Python — codecai
Async-first binding for Python 3.10+. Decode streams, encode IDs, watch tool calls, translate across vocabs.
codecai is the Python binding. It's async-first (built around httpx-style async streams), pure Python with no compiled extension. Python 3.10+ is required for the type annotations.
Install
pip install codecai
The four-step shape
Every Codec client follows the same four steps. In Python:
from codecai import (
    load_map,               # 1. fetch + verify the vocab map
    Detokenizer,            # 2. (and 4.) IDs → text
    decode_msgpack_stream,  # 3. binary stream → frames
)
1. Load the vocab map
vocab_map = await load_map(
    url="https://cdn.jsdelivr.net/gh/wdunn001/codec-maps/maps/qwen/qwen2.json",
    hash="sha256:887311099cdc09e7022001a01fa1da396750d669b7ed2c242a000b9badd09791",
)
load_map is async because it fetches over the network. It verifies the response bytes against hash and caches the parsed map.
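To pin a map file you already trust, the value load_map expects is the SHA-256 digest of the raw map bytes with a sha256: prefix. A quick way to compute one with nothing but the standard library:

import hashlib

with open("qwen2.json", "rb") as f:
    digest = hashlib.sha256(f.read()).hexdigest()
print(f"sha256:{digest}")  # paste this as the hash= argument to load_map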
If the vendor publishes their own map under /.well-known/codec/, skip the URL/hash and resolve from (origin, id):
from codecai import discover_map
vocab_map = await discover_map(origin="https://example.com", id="qwen2")
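A reasonable pattern is to try discovery first and fall back to a pinned CDN map. A sketch, assuming discover_map raises when the well-known document is missing or invalid (codecai's exact exception type isn't shown here, so the handler below is deliberately broad):

try:
    vocab_map = await discover_map(origin="https://example.com", id="qwen2")
except Exception:  # narrow this to codecai's discovery error in real code
    vocab_map = await load_map(
        url="https://cdn.jsdelivr.net/gh/wdunn001/codec-maps/maps/qwen/qwen2.json",
        hash="sha256:887311099cdc09e7022001a01fa1da396750d669b7ed2c242a000b9badd09791",
    )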
2. Send a request
A normal /v1/completions POST with stream_format added:
import httpx
async with httpx.AsyncClient() as client:
    async with client.stream(
        "POST", "http://localhost:8000/v1/completions",
        json={
            "model": "Qwen/Qwen2.5-7B-Instruct",
            "prompt": "Explain entropy in one paragraph.",
            "stream_format": "msgpack",
            "max_tokens": 256,
        },
        headers={"Accept-Encoding": "gzip"},
    ) as resp:
        # see step 3
        ...
Why client.stream over client.post?
stream doesn't buffer the response body — you want each chunk to flow through the decoder as it arrives.
3. Decode the binary stream
async for frame in decode_msgpack_stream(resp.aiter_raw()):
    # frame.ids: list[int]
    # frame.done: bool
    # frame.finish_reason: str | None
    ...
decode_msgpack_stream consumes an async byte iterator and yields CodecFrame objects. Use decode_protobuf_stream if your server is configured for protobuf instead.
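If you don't need incremental rendering, collecting the whole response is just list bookkeeping over the documented frame fields:

all_ids: list[int] = []
finish_reason = None

async for frame in decode_msgpack_stream(resp.aiter_raw()):
    all_ids.extend(frame.ids)
    if frame.done:
        finish_reason = frame.finish_reason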
4. Detokenize at the edge
detok = Detokenizer(vocab_map)

async for frame in decode_msgpack_stream(resp.aiter_raw()):
    text = detok.render(frame.ids, partial=not frame.done)
    print(text, end="", flush=True)
Detokenizer is stateful and stream-safe — it buffers split UTF-8 sequences across render calls. Pass partial=True while the stream is open; the final call (or any call where you know the stream is done) should be partial=False so the buffer flushes.
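Concretely, a multi-byte character whose bytes straddle two frames stays buffered until it completes. The IDs below are illustrative placeholders, not real Qwen2 tokens:

detok = Detokenizer(vocab_map)
# suppose token 501 carries the first byte of a multi-byte character
# and token 502 carries the rest (illustrative IDs)
chunk1 = detok.render([501], partial=True)   # incomplete bytes stay buffered
chunk2 = detok.render([502], partial=False)  # final call flushes the buffer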
Encoding (sending IDs, not text)
If you already have token IDs:
from codecai import BPETokenizer

tok = BPETokenizer(vocab_map)
ids = tok.encode("System: be concise.\nUser: what's BPE?")
async with client.stream(
    "POST", "http://localhost:8000/v1/completions",
    json={
        "model": "Qwen/Qwen2.5-7B-Instruct",
        "prompt": ids,  # list[int]
        "stream_format": "msgpack",
        "max_tokens": 256,
    },
) as resp:
    ...
BPETokenizer.encode() produces bit-identical IDs to the upstream model’s tokenizer.
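A quick sanity check, assuming the map applies no normalization that rewrites the input and that render with its defaults performs a final (non-partial) pass:

s = "System: be concise.\nUser: what's BPE?"
assert Detokenizer(vocab_map).render(tok.encode(s)) == s  # round-trips exactly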
Watching for tool calls
from codecai import Detokenizer, ToolWatcher

detok = Detokenizer(vocab_map)
watcher = ToolWatcher(vocab_map, start="<tool_call>", end="</tool_call>")

async for frame in decode_msgpack_stream(resp.aiter_raw()):
    for ev in watcher.feed(frame.ids):
        if ev.kind == "passthrough":
            forward(ev.ids)
        else:  # ev.kind == "captured"
            text = detok.render(ev.ids)
            tool, args = parse_tool_call(text)
            await dispatch(tool, args)
The watcher matches reserved control IDs with a single uint32 compare per token. It never detokenizes — that’s the whole point. See Tool calling for the agentic-loop pattern.
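forward, dispatch, and parse_tool_call above are application code. A minimal parse_tool_call sketch, assuming the captured region detokenizes to a JSON object of the common {"name": ..., "arguments": {...}} shape (check your model's actual format):

import json

def parse_tool_call(text: str) -> tuple[str, dict]:
    # text is the detokenized span between <tool_call> and </tool_call>
    payload = json.loads(text)
    return payload["name"], payload.get("arguments", {})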
Negotiating zstd-with-dict
For deployments with pre-trained zstd dictionaries (see Protocol » Compression), codecai ships small helpers in codecai.compression. The pattern: load the dict bytes once, advertise zstd in Accept-Encoding, validate the server’s Codec-Zstd-Dict header before decompressing.
from codecai import (
    CodecZstdDictError,
    hash_zstd_dict,
    select_zstd_dict_for_response,
)

# 1. Load whatever dicts you trust — typically fetched from the
#    tokenizer map's zstd_dictionaries[] entry, hash-verified.
with open("qwen2.5-msgpack-v1.dict", "rb") as f:
    msgpack_dict = f.read()
loaded_dicts = {hash_zstd_dict(msgpack_dict): msgpack_dict}
# 2. Send the request with zstd advertised.
async with httpx.AsyncClient() as http:
    async with http.stream(
        "POST", "http://localhost:8000/v1/completions",
        headers={"Accept-Encoding": "zstd, gzip"},
        json={
            "model": "Qwen/Qwen2.5-7B-Instruct",
            "prompt": "Explain entropy.",
            "stream_format": "msgpack",
            "max_tokens": 256,
        },
    ) as resp:
        # 3. Decide how to read the body based on what the server returned.
        try:
            zdict_bytes = select_zstd_dict_for_response(
                resp.headers, loaded_dicts=loaded_dicts,
            )
        except CodecZstdDictError:
            # Server used a dict we don't have — fetch it from the map's
            # zstd_dictionaries[] entry whose hash matches, or retry with
            # Accept-Encoding: gzip. Don't try to decompress a guess.
            raise

        if zdict_bytes is None:
            # Not zstd — httpx auto-decompresses gzip/br. Decode normally.
            async for frame in decode_msgpack_stream(resp.aiter_raw()):
                ...
        else:
            # zstd-with-dict — use a streaming zstd decoder seeded with
            # the right dict bytes, then feed its output to the codec
            # frame decoder (see the sketch below).
            ...
select_zstd_dict_for_response returns the matching dict bytes when the response is Content-Encoding: zstd and the server’s Codec-Zstd-Dict header points at a dict you’ve loaded. Returns None when the response isn’t zstd (so your normal gzip / identity path stays untouched). Raises CodecZstdDictError on any of: missing header on a zstd response, malformed sha256: value, hash unknown to your local registry. Wrong-dict zstd produces garbage bytes — failing fast is the only safe option.
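The elided else branch can be filled in with the third-party zstandard package. A sketch, not part of codecai itself:

import zstandard  # pip install zstandard

async def decompressed(resp, zdict_bytes):
    dctx = zstandard.ZstdDecompressor(
        dict_data=zstandard.ZstdCompressionDict(zdict_bytes)
    )
    dobj = dctx.decompressobj()
    async for chunk in resp.aiter_raw():
        out = dobj.decompress(chunk)
        if out:
            yield out

async for frame in decode_msgpack_stream(decompressed(resp, zdict_bytes)):
    ...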
The matching server-side helper in sglang and vLLM is set_zstd_dict(stream_format, dict_bytes) — see sglang » zstd, with a dict.
Translating across vocabularies
from codecai import Translator
qwen = await load_map(
    url="https://cdn.jsdelivr.net/gh/wdunn001/codec-maps/maps/qwen/qwen2.json",
    hash="sha256:887311099cdc09e7022001a01fa1da396750d669b7ed2c242a000b9badd09791",
)
llama = await load_map(
    url="https://cdn.jsdelivr.net/gh/wdunn001/codec-maps/maps/meta-llama/llama-3.json",
    hash="sha256:79b707aea8c2b41c2883ec7913b0c4a0c880044ac844d89a9a03e779eb92db04",
)

tr = Translator(qwen, llama)

async for frame in decode_msgpack_stream(qwen_resp.aiter_raw()):
    llama_ids = tr.translate(frame.ids, partial=not frame.done)
    forward_to_llama_agent(llama_ids)
Cross-vocab handoff that never goes through UTF-8. See Translator.
A complete agent loop
The packages/demo-python/ directory in the main repo has a runnable agent example with real tool dispatch. Highlights:
import asyncio, httpx
from codecai import Detokenizer, ToolWatcher, decode_msgpack_stream, load_map

async def run_turn(prompt_ids, vocab_map, http):
    detok = Detokenizer(vocab_map)
    watcher = ToolWatcher(vocab_map, start="<tool_call>", end="</tool_call>")
    async with http.stream("POST", SERVER + "/v1/completions", json={
        "model": MODEL, "prompt": prompt_ids,
        "stream_format": "msgpack", "max_tokens": 512,
    }) as resp:
        async for frame in decode_msgpack_stream(resp.aiter_raw()):
            for ev in watcher.feed(frame.ids):
                if ev.kind == "passthrough":
                    yield ("text", detok.render(ev.ids, partial=not frame.done))
                else:
                    yield ("tool", detok.render(ev.ids))
            if frame.done:
                # async generators can't `return` a value, so the finish
                # reason is yielded as a final event instead
                yield ("done", frame.finish_reason)
                return
Yields ("text", str) for normal output and ("tool", str) when the model emits a tool-call region.
Production checklist
- Pin the map hash. Mismatch = supply-chain alarm.
- Use client.stream, not client.post. Otherwise the entire response buffers in memory.
- Reuse Detokenizer and BPETokenizer across requests; both are stateless across streams (the detokenizer's buffer resets at each new stream).
- Async ergonomics. Wrap the decode loop in a try/finally if you need to clean up — aiter_raw() doesn't auto-close. See the sketch below.
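The cleanup pattern from the last item, with should_stop standing in for whatever early-exit condition you have:

async with http.stream("POST", SERVER + "/v1/completions", json=body) as resp:
    raw = resp.aiter_raw()
    try:
        async for frame in decode_msgpack_stream(raw):
            if should_stop(frame):
                break
    finally:
        await raw.aclose()  # aiter_raw() won't close itself on early exit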