i think i can scrape guilds live with selenium now

This commit is contained in:
Artemis Tosini 2024-03-27 18:25:41 +00:00
parent e364bd3657
commit c1d0668a3a
Signed by: artemist
GPG key ID: EE5227935FE3FF18
3 changed files with 128 additions and 34 deletions

View file

@ -96,6 +96,7 @@
python3 python3
python3Packages.black python3Packages.black
python3Packages.ipython python3Packages.ipython
python3Packages.trio
(python3Packages.selenium.overrideAttrs (old: { (python3Packages.selenium.overrideAttrs (old: {
postInstall = old.postInstall + '' postInstall = old.postInstall + ''
for ver in v85 v120 v121 v122; do for ver in v85 v120 v121 v122; do

View file

@ -23,48 +23,75 @@ def download_file(url, path):
time.sleep(0.1) time.sleep(0.1)
def get_guilds(filename):
    """Collect guild objects from a dump of Discord gateway events.

    Args:
        filename: path of the dump. A name ending in ``.json`` is read as
            plain text; anything else is treated as a zlib stream and
            inflated first.

    Returns:
        A list holding every entry of ``d["guilds"]`` from each ``READY``
        event followed by the ``d`` payload of each ``GUILD_CREATE`` event,
        in the order the events appear in the file.
    """
    path = str(filename)
    if path.endswith(".json"):
        # TODO: specify if it's compressed or not better
        with open(path, "r", encoding="utf-8") as handle:
            jsondata = handle.read()
    else:
        with open(path, "rb") as handle:
            bindata = handle.read()
        # A decompress object (rather than zlib.decompress) tolerates a
        # stream that was only sync-flushed and never properly finished.
        jsondata = zlib.decompressobj().decompress(bindata).decode("utf-8")

    decoder = json.JSONDecoder()
    guilds = []
    pos = 0
    # The dump is a run of concatenated JSON objects; jump to the next "{"
    # and decode from there, skipping separators between documents. Walking
    # an index avoids copying the remaining text after every object.
    while (pos := jsondata.find("{", pos)) != -1:
        obj, pos = decoder.raw_decode(jsondata, pos)
        typ = obj.get("t", "")
        if typ == "READY":
            guilds.extend(obj["d"]["guilds"])
        elif typ == "GUILD_CREATE":
            guilds.append(obj["d"])
    return guilds
def safe_name(s):
    """Return ``s`` made usable as a single path component.

    ``/`` and NUL are replaced with spaces. The names ``""``, ``"."`` and
    ``".."`` get a leading underscore so that an externally supplied name
    can never resolve to the current or parent directory.
    """
    # if you're on windows do something else here, i don't care
    cleaned = s.replace("/", " ").replace("\0", " ")
    if cleaned in ("", ".", ".."):
        return f"_{cleaned}"
    return cleaned
def main(*args):
    """Download every sticker and emoji of the guilds found in a dump.

    Args:
        *args: argument vector; ``args[1]`` is the dump file handed to
            ``get_guilds`` (presumably ``args[0]`` is the program name, as
            in ``sys.argv`` -- confirm against the caller).

    Files are written to ``out/discord/stickers/<guild>/`` and
    ``out/discord/emoji/<guild>/``; a directory is only created when the
    guild has something to put in it.

    Raises:
        SystemExit: if no dump file was given.
    """
    if len(args) < 2:
        raise SystemExit("usage: pass the path of the gateway event dump")

    guilds = get_guilds(args[1])
    base_dir = pathlib.Path("out") / "discord"

    for guild in guilds:
        name = guild["properties"]["name"]
        print(f"Processing guild '{name}'")
        sticker_objs = guild["stickers"]
        emoji_objs = guild["emojis"]

        dir_name = safe_name(name)
        sticker_dir = base_dir / "stickers" / dir_name
        emoji_dir = base_dir / "emoji" / dir_name
        if sticker_objs:
            sticker_dir.mkdir(exist_ok=True, parents=True)
        if emoji_objs:
            emoji_dir.mkdir(exist_ok=True, parents=True)

        for sticker in sticker_objs:
            print(f"Found sticker '{sticker['name']}'")
            extension = STICKER_EXTENSIONS[sticker["format_type"]]
            if extension == "json":
                # Lottie is weird for some reason
                url = f"https://discord.com/stickers/{sticker['id']}.json"
            else:
                url = f"https://media.discordapp.net/stickers/{sticker['id']}.{extension}?size=4096"
                # Set passthrough=false for APNGs if you don't want animated
            filename = sticker_dir / f"{safe_name(sticker['name'])}.{extension}"
            print(f"Downloading {url} to {filename}")
            download_file(url, filename)

        for emoji in emoji_objs:
            print(f"Found emoji '{emoji['name']}'")
            extension = "gif" if emoji["animated"] else "png"
            url = f"https://cdn.discordapp.com/emojis/{emoji['id']}.{extension}"
            filename = emoji_dir / f"{safe_name(emoji['name'])}.{extension}"
            print(f"Downloading {url} to {filename}")
            download_file(url, filename)

View file

@ -1,8 +1,27 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""
Scrape guilds with selenium
You probably shouldn't use this, but if you really must:
* Set the CHROME environment variable to the path to your chrome or chromium
* You probably want to connect to an existing chrome instance so you can log in first,
run `chromium --user-data-dir=$HOME/.cache/chromium-emoji-script --remote-debugging-port=9222`
to start chromium then `env CHROME=$(command -v chromium) ./scrape_guilds.py 127.0.0.1:9222`
* It just gives you json, you have to dump later
"""
import base64
import json
import os.path import os.path
import os import os
from selenium import webdriver import sys
import zlib
from selenium import webdriver
import trio
def setup_driver():
options = webdriver.ChromeOptions() options = webdriver.ChromeOptions()
if location := os.getenv("CHROME"): if location := os.getenv("CHROME"):
options.binary_location = location options.binary_location = location
@ -10,9 +29,56 @@ options.add_argument("--start-maximized")
options.add_argument( options.add_argument(
"--user-data-dir=" + os.path.expanduser("~/.cache/chromium-emoji-script") "--user-data-dir=" + os.path.expanduser("~/.cache/chromium-emoji-script")
) )
options.add_experimental_option("detach", True) if len(sys.argv) > 1:
driver = webdriver.Chrome(options=options) options.add_experimental_option("debuggerAddress", sys.argv[1])
# Selenium says this API is deprecated but there's no better working option in Python # options.add_experimental_option("detach", True)
# (okay, maybe there is but they don't really have docs) return webdriver.Chrome(options=options)
async def handle_events(listener, out_file):
    """Record Discord gateway messages observed on the browser's websockets.

    Args:
        listener: async iterable of DevTools network events. Only events
            whose class is named ``WebSocketCreated`` or
            ``WebSocketFrameReceived`` are acted on.
        out_file: object with awaitable ``write(str)`` and ``flush()``
            methods; each decoded message is appended as one JSON line.

    Only frames belonging to the most recently created socket whose URL
    starts with ``wss://gateway.discord.gg/`` are decoded. The coroutine
    runs until ``listener`` is exhausted.
    """
    # A zlib-stream message is complete once it ends with the Z_SYNC_FLUSH
    # marker; until then the bytes are only a fragment.
    zlib_suffix = b"\x00\x00\xff\xff"
    valid_id = None
    decompress = None
    pending = bytearray()
    async for event in listener:
        typ = event.__class__.__name__
        if typ == "WebSocketCreated":
            if event.url.startswith("wss://gateway.discord.gg/"):
                # New gateway connection: the compression context is per
                # connection, so start over.
                valid_id = event.request_id
                decompress = zlib.decompressobj()
                pending.clear()
        elif typ == "WebSocketFrameReceived":
            if event.request_id != valid_id:
                continue

            frame = event.response
            if getattr(frame, "opcode", None) == 1:
                # Text frame: DevTools hands over the payload as plain text,
                # not base64, and there is nothing to inflate.
                text = frame.payload_data
            else:
                pending += base64.b64decode(frame.payload_data)
                if not pending.endswith(zlib_suffix):
                    # Fragment of a larger message; wait for the rest.
                    continue
                text = decompress.decompress(bytes(pending))
                pending.clear()

            message = json.loads(text)
            message_type = message.get("t")
            await out_file.write(json.dumps(message) + "\n")
            # The data we actually want, might as well flush
            if message_type in ["READY", "GUILD_CREATE"]:
                await out_file.flush()
            # NOTE(review): logged for every message; confirm this was not
            # meant to be limited to the flushed types.
            print("Got message of type", message_type)
async def main():
    """Attach to the browser and append gateway events to a log file.

    Events are appended to ``out/discord/events.json`` under the current
    working directory (created if missing). Runs until the event listener
    ends or the task is cancelled.
    """
    out_path = await trio.Path.cwd() / "out" / "discord" / "events.json"
    await out_path.parent.mkdir(parents=True, exist_ok=True)

    driver = setup_driver()
    # Hold the file in a context manager so buffered lines are written out
    # and the handle is closed even when the run is interrupted.
    async with await out_path.open("a") as out_file:
        async with driver.bidi_connection() as conn:
            devtools, session = conn.devtools, conn.session
            await session.execute(devtools.network.enable())
            listener = session.listen(
                devtools.network.WebSocketCreated,
                devtools.network.WebSocketFrameReceived,
                buffer_size=1024,
            )
            await handle_events(listener, out_file)


if __name__ == "__main__":
    trio.run(main)