2024-04-04 06:05:35 +00:00
|
|
|
#!/usr/bin/env nix-shell
|
|
|
|
#!nix-shell -i python3 -p python3
|
|
|
|
import struct
|
|
|
|
import sys
|
|
|
|
import pathlib
|
|
|
|
|
|
|
|
|
2024-04-04 07:23:35 +00:00
|
|
|
# Samples per Opus frame at 48 kHz, indexed by the 5-bit "config" field stored
# in the top bits of a packet's TOC byte (RFC 6716, section 3.1).  Ogg Opus
# granule positions count 48 kHz samples per channel, so these values depend
# neither on the channel count nor on the input sample rate.
FRAME_SIZES = [
    480, 960, 1920, 2880,  # SILK NB: 10, 20, 40, 60 ms
    480, 960, 1920, 2880,  # SILK MB
    480, 960, 1920, 2880,  # SILK WB
    480, 960,  # Hybrid SWB: 10, 20 ms
    480, 960,  # Hybrid FB
    120, 240, 480, 960,  # CELT NB: 2.5, 5, 10, 20 ms
    120, 240, 480, 960,  # CELT WB
    120, 240, 480, 960,  # CELT SWB
    120, 240, 480, 960,  # CELT FB
]


def crc32ogg(seq):
    """Return the Ogg page checksum of *seq* (an iterable of byte values).

    This is a CRC-32 with generator polynomial 0x04C11DB7, a zero initial
    value, no bit reflection and no final XOR.  The result always fits in
    32 bits.
    """
    crc = 0
    for b in seq:
        crc ^= b << 24
        for _ in range(8):
            crc <<= 1
            # Bit 32 is the bit that was just shifted out of the register;
            # XOR-ing with 0x104C11DB7 both clears it and applies the polynomial.
            if crc & 0x100000000:
                crc ^= 0x104C11DB7
    return crc


def paginate(sequence: int, is_last: bool, granule_pos, content: bytes):
    """Wrap one packet in a single Ogg page and return it as a bytearray.

    Args:
        sequence: Page sequence number; page 0 gets the begin-of-stream flag.
        is_last: Set the end-of-stream flag on this page.
        granule_pos: Granule position stored in the page header.
        content: The packet payload; it must fit in one page.

    Raises:
        ValueError: If *content* needs more than the 255 lacing values a
            single page can hold (i.e. it is 65025 bytes or longer).
    """
    # A packet is laced as N segments of 255 bytes followed by one shorter
    # (possibly empty) segment that terminates it.
    full_segments, remainder = divmod(len(content), 255)
    if full_segments >= 255:
        raise ValueError(
            f"packet of {len(content)} bytes does not fit in a single Ogg page"
        )

    flags = 2 if sequence == 0 else 0
    if is_last:
        flags |= 4

    # Version, flags, position, serial number, sequence number, checksum, segments, segment table
    page = bytearray(
        b"OggS"
        + struct.pack(
            "<BBQIIIB", 0, flags, granule_pos, 0xACAB1234, sequence, 0, full_segments + 1
        )
        + (b"\xff" * full_segments)
        + bytes([remainder])
        + content
    )
    # The checksum is computed over the whole page with its own field zeroed,
    # then patched in at byte offset 22.
    struct.pack_into("<I", page, 22, crc32ogg(page))
    return page


def _opus_packet_samples(packet):
    """Return the duration of one Opus packet in 48 kHz samples."""
    toc = packet[0]
    frame_code = toc & 3
    if frame_code == 0:
        num_frames = 1
    elif frame_code == 3:
        # Code 3 stores the frame count in the low six bits of the second byte.
        num_frames = packet[1] & 0x3F if len(packet) > 1 else 0
    else:
        num_frames = 2
    # The TOC stereo bit (0x04) is deliberately ignored: duration is counted
    # per channel, so it is the same for mono and stereo packets.
    return FRAME_SIZES[toc >> 3] * num_frames


def _iter_ktss_packets(ktss, offset):
    """Yield each non-empty packet payload stored in *ktss* from *offset* on."""
    end = len(ktss)
    # Weird length encoding here, not opus standard: every record starts with
    # a big-endian 32-bit payload length followed by 4 bytes that are skipped.
    # Stop once there is no room left for a record header plus a TOC byte.
    while offset + 8 < end:
        (packet_len,) = struct.unpack_from(">I", ktss, offset)
        if packet_len:
            yield ktss[offset + 8 : offset + 8 + packet_len]
        # An empty record has no TOC byte to interpret, so it is skipped.
        offset += packet_len + 8


def write_opus(ktss: bytes, filename: pathlib.Path):
    """Convert one KTSS entry to an Ogg Opus file written to *filename*.

    Entries whose codec byte (offset 0x20) is not 9 are ignored and no file is
    created.  Each stored packet is emitted on its own Ogg page, after an
    OpusHead page and an OpusTags page.

    Args:
        ktss: The raw KTSS entry, starting at its "KTSS" magic.
        filename: Destination path; it is created or overwritten.
    """
    # We only deal with opus
    if ktss[0x20] != 9:
        return

    channels = ktss[0x29]
    (sample_rate,) = struct.unpack_from("<I", ktss, 0x2C)
    (start_offset,) = struct.unpack_from("<I", ktss, 0x40)
    (skip,) = struct.unpack_from("<H", ktss, 0x58)
    stream_count = ktss[0x5A]
    coupled_count = ktss[0x5B]
    channel_mapping = ktss[0x5C : 0x5C + channels]

    # OpusHead fields: version, channel count, pre-skip, input sample rate,
    # output gain, mapping family, stream count, coupled count, mapping table.
    opus_header = (
        b"OpusHead"
        + struct.pack(
            "<BBHIHBBB",
            1,
            channels,
            skip,
            sample_rate,
            0,
            1,
            stream_count,
            coupled_count,
        )
        + channel_mapping
    )
    # Channel mapping is apparently incorrect for 6 channels but I don't care

    comment_header = b"OpusTags\4\0\0\0ktss\0\0\0\0\0\0\0\0"

    # Gather the packets up front so the final page can be flagged as the end
    # of the stream even when padding follows the last packet.
    packets = list(_iter_ktss_packets(ktss, start_offset))
    final_sequence = len(packets) + 1

    with filename.open("wb") as out:
        out.write(paginate(0, False, 0, opus_header))
        out.write(paginate(1, False, 0, comment_header))

        granule_pos = 0
        for sequence, packet in enumerate(packets, start=2):
            granule_pos += _opus_packet_samples(packet)
            out.write(
                paginate(sequence, sequence == final_sequence, granule_pos, packet)
            )
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Split the KTSR container named by argv[1] into Opus files in argv[2].

    Every KTSS entry found is passed to write_opus() together with the path
    "<index>.opus" inside the output directory, which is created if missing.
    After each entry the running index and the next offset are printed.

    Raises:
        ValueError: If the input does not start with the KTSR magic, an entry
            does not carry the KTSS magic at the expected place, or an entry
            declares a size of zero.
    """
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <KTSR file> <output directory>")

    # i don't care about efficiency
    with open(sys.argv[1], "rb") as source:
        data = source.read()
    outdir = pathlib.Path(sys.argv[2])

    if data[:4] != b"KTSR":
        raise ValueError("what are you doing")

    outdir.mkdir(parents=True, exist_ok=True)

    # there's some more stuff but i don't care, go straight to the entry
    off = 0x40
    idx = 0
    while off < len(data):
        # The KTSS payload sits 0x40 bytes into each entry.
        if data[off + 0x40 : off + 0x44] != b"KTSS":
            raise ValueError("oh no offset broken")

        (entry_size,) = struct.unpack_from("<I", data, off + 4)
        (size,) = struct.unpack_from("<I", data, off + 0x44)
        if entry_size == 0:
            # A zero stride would re-read the same entry forever.
            raise ValueError(f"entry at {hex(off)} has zero size")

        ktss = data[off + 0x40 : off + 0x40 + size]
        write_opus(ktss, outdir / f"{idx}.opus")

        off += entry_size
        idx += 1
        print(idx, hex(off))


if __name__ == "__main__":
    main()
|