diff --git a/flake.nix b/flake.nix index 36abdb4..e700fd2 100644 --- a/flake.nix +++ b/flake.nix @@ -96,6 +96,7 @@ python3 python3Packages.black python3Packages.ipython + python3Packages.trio (python3Packages.selenium.overrideAttrs (old: { postInstall = old.postInstall + '' for ver in v85 v120 v121 v122; do diff --git a/scripts/emoji/get_discord_emoji.py b/scripts/emoji/get_discord_emoji.py index dcf3bc6..5afb654 100755 --- a/scripts/emoji/get_discord_emoji.py +++ b/scripts/emoji/get_discord_emoji.py @@ -23,48 +23,75 @@ def download_file(url, path): time.sleep(0.1) -def get_data(filename): - bindata = open(filename, "rb").read() - decompress = zlib.decompressobj() - jsondata = decompress.decompress(bindata).decode("utf-8") +def get_guilds(filename): + if filename.endswith(".json"): + # TODO: specify if it's compressed or not better + jsondata = open(filename, "r").read() + else: + bindata = open(filename, "rb").read() + decompress = zlib.decompressobj() + jsondata = decompress.decompress(bindata).decode("utf-8") + decoder = json.JSONDecoder() - while len(jsondata) > 0: - obj, end = decoder.raw_decode(jsondata) - if obj.get("t", "") == "READY": - return obj - jsondata = jsondata[end:] + guilds = [] + while (offset := jsondata.find("{")) != -1: + obj, end = decoder.raw_decode(jsondata[offset:]) + jsondata = jsondata[end + offset :] + + typ = obj.get("t", "") + if typ == "READY": + guilds.extend(obj["d"]["guilds"]) + elif typ == "GUILD_CREATE": + guilds.append(obj["d"]) + + return guilds + + +def safe_name(s): + # if you're on windows do something else here, i don't care + return s.replace("/", " ") def main(*args): - data = get_data(args[1]) + guilds = get_guilds(args[1]) - base_dir = pathlib.Path("guilds") + base_dir = pathlib.Path("out") / "discord" - for guild in data["d"]["guilds"]: + for guild in guilds: name = guild["properties"]["name"] print(f"Processing guild '{name}'") - guild_dir = base_dir / name.replace("/", " ") - sticker_dir = guild_dir / "stickers" - 
sticker_dir.mkdir(parents=True, exist_ok=True) - emoji_dir = guild_dir / "emoji" - emoji_dir.mkdir(exist_ok=True) + sticker_objs = guild["stickers"] + emoji_objs = guild["emojis"] - for sticker in guild["stickers"]: - break + dir_name = safe_name(name) + sticker_dir = base_dir / "stickers" / dir_name + emoji_dir = base_dir / "emoji" / dir_name + + if len(sticker_objs) > 0: + sticker_dir.mkdir(exist_ok=True, parents=True) + if len(emoji_objs) > 0: + emoji_dir.mkdir(exist_ok=True, parents=True) + + for sticker in sticker_objs: print(f"Found sticker '{sticker['name']}'") extension = STICKER_EXTENSIONS[sticker["format_type"]] - url = f"https://media.discordapp.net/stickers/{sticker['id']}.{extension}?size=4096" + if extension == "json": + # Lottie is weird for some reason + url = f"https://discord.com/stickers/{sticker['id']}.json" + else: + url = f"https://media.discordapp.net/stickers/{sticker['id']}.{extension}?size=4096" # Set passthrough=false for APNGs if you don't want animated - filename = sticker_dir / f"{sticker['name']}.{extension}" + + filename = sticker_dir / f"{safe_name(sticker['name'])}.{extension}" print(f"Downloading {url} to {filename}") download_file(url, filename) - for emoji in guild["emojis"]: + for emoji in emoji_objs: print(f"Found emoji '{emoji['name']}'") extension = "gif" if emoji["animated"] else "png" url = f"https://cdn.discordapp.com/emojis/{emoji['id']}.{extension}" - filename = emoji_dir / f"{emoji['name']}.{extension}" + filename = emoji_dir / f"{safe_name(emoji['name'])}.{extension}" print(f"Downloading {url} to {filename}") download_file(url, filename) diff --git a/scripts/emoji/scrape_guilds.py b/scripts/emoji/scrape_guilds.py index f80f308..63218b6 100755 --- a/scripts/emoji/scrape_guilds.py +++ b/scripts/emoji/scrape_guilds.py @@ -1,18 +1,84 @@ #!/usr/bin/env python3 +""" +Scrape guilds with selenium +You probably shouldn't use this, but if you really must: +* Set the CHROME environment variable to the path to your chrome or 
chromium +* You probably want to connect to an existing chrome instance so you can log in first, + run `chromium --user-data-dir=$HOME/.cache/chromium-emoji-script --remote-debugging-port=9222` + to start chromium then `env CHROME=$(command -v chromium) ./scrape_guilds.py 127.0.0.1:9222` +* It just gives you json, you have to dump later + +""" + +import base64 +import json import os.path import os +import sys +import zlib + from selenium import webdriver +import trio -options = webdriver.ChromeOptions() -if location := os.getenv("CHROME"): - options.binary_location = location -options.add_argument("--start-maximized") -options.add_argument( - "--user-data-dir=" + os.path.expanduser("~/.cache/chromium-emoji-script") -) -options.add_experimental_option("detach", True) -driver = webdriver.Chrome(options=options) -# Selenium says this API is deprecated but there's no better working option in Python -# (okay, maybe there is but they don't really have docs) +def setup_driver(): + options = webdriver.ChromeOptions() + if location := os.getenv("CHROME"): + options.binary_location = location + options.add_argument("--start-maximized") + options.add_argument( + "--user-data-dir=" + os.path.expanduser("~/.cache/chromium-emoji-script") + ) + if len(sys.argv) > 1: + options.add_experimental_option("debuggerAddress", sys.argv[1]) + # options.add_experimental_option("detach", True) + return webdriver.Chrome(options=options) + + +async def handle_events(listener, out_file): + valid_id = None + decompress = None + + async for event in listener: + typ = event.__class__.__name__ + if typ == "WebSocketCreated": + if event.url.startswith("wss://gateway.discord.gg/"): + valid_id = event.request_id + decompress = zlib.decompressobj() + elif typ == "WebSocketFrameReceived": + if event.request_id != valid_id: + continue + message = json.loads( + decompress.decompress(base64.b64decode(event.response.payload_data)) + ) + message_type = message.get("t") + await 
out_file.write(json.dumps(message) + "\n") + # The data we actually want, might as well flush + if message_type in ["READY", "GUILD_CREATE"]: + await out_file.flush() + + print("Got message of type", message_type) + + +async def main(): + out_path = await trio.Path.cwd() / "out" / "discord" / "events.json" + await out_path.parent.mkdir(parents=True, exist_ok=True) + out_file = await out_path.open("a") + + driver = setup_driver() + async with driver.bidi_connection() as conn: + devtools, session = conn.devtools, conn.session + + await session.execute(devtools.network.enable()) + listener = session.listen( + devtools.network.WebSocketCreated, + devtools.network.WebSocketFrameReceived, + buffer_size=1024, + ) + + await handle_events(listener, out_file) + + +if __name__ == "__main__": + trio.run(main)