
Every Python Concept a Generative AI Developer Actually Needs to Know

Monday, June 22, 2026, by DhanushKumar
Last updated on June 22, 2026 by Editorial Team. Author(s): DhanushKumar. Originally published on Towards AI.

From async coroutines that power real-time LLM streaming to memory tricks that let you process million-document datasets: the complete map, written for engineers building with AI today.

Most Python tutorials teach you the language. This one teaches you the language as a GenAI engineer uses it, where every concept has a direct line to a real problem you will hit building LLM pipelines, RAG systems, and AI agents.

Async / Await: The Heartbeat of Every LLM App

Here is the brutal truth about building LLM applications: your code spends most of its time waiting. Waiting for the LLM to respond. Waiting for an embedding API. Waiting for a vector database. Without async, a single-threaded server handles one request at a time and sits idle during every wait. With async, one thread can keep thousands of requests in flight concurrently.

What actually happens when you write await

When Python hits an await expression whose result is not ready yet, it suspends the current coroutine and hands control back to the event loop. The event loop runs whatever else is ready and resumes your coroutine once the awaited result is available. No extra threads. No OS context switches between coroutines. Pure cooperative multitasking.
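The hand-off described above can be observed directly. The following is a minimal sketch, not taken from the original article: the `worker` coroutine and the shared `log` list are hypothetical names, and `asyncio.sleep(0)` stands in for a real network wait. Each `await` gives the event loop a chance to run the other coroutine, so the two workers interleave on a single thread.

```python
import asyncio


async def worker(name: str, steps: int, log: list) -> None:
    """Record one step, then yield control back to the event loop."""
    for i in range(steps):
        log.append(f"{name}{i}")
        # sleep(0) suspends this coroutine for one event-loop iteration,
        # which lets any other ready coroutine make progress.
        await asyncio.sleep(0)


async def main() -> list:
    log: list = []
    # Both workers run on the same thread; gather starts them in order.
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log


log = asyncio.run(main())
print(log)  # ['A0', 'B0', 'A1', 'B1', 'A2', 'B2']
```

The alternating order shows that neither worker runs to completion before the other starts: control changes hands only at the `await` points, which is what "cooperative" means here.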
    import asyncio

    import anthropic

    client = anthropic.AsyncAnthropic()

    async def ask_claude(prompt: str, label: str) -> str:
        # await suspends this coroutine while the request is in flight,
        # so the event loop can run the other calls in the meantime
        message = await client.messages.create(
            model="claude-opus-4-5",
            max_tokens=512,
            messages=[{"role": "user", "content": prompt}],
        )
        return f"[{label}] {message.content[0].text}"

    async def main():
        questions = [
            ("What is a transformer architecture?", "A"),
            ("Explain RAG in one paragraph.", "B"),
            ("What is chain-of-thought prompting?", "C"),
            ("Describe the attention mechanism briefly.", "D"),
            ("What is a vector database used for?", "E"),
        ]
        # All 5 fire at once - total time ≈ slowest single call (~2s)
        # Sequential would take ~10s
        results = await asyncio.gather(
            *[ask_claude(question, label) for question, label in questions]
        )
        for r in results:
            print(r)

    asyncio.run(main())

⚡ Real-World Impact

Sequential LLM calls for 100 documents × 3 seconds each = 300 seconds, or 5 minutes. With asyncio.gather() the calls overlap and can finish in roughly 3–5 seconds, provided the API's rate limits allow 100 requests in flight at once. That is a 60× or better speedup with zero extra hardware.

Tasks: fire and forget (then collect later)

asyncio.create_task() schedules a coroutine on the event loop and returns a Task immediately, without waiting for the result. This lets you kick off parallel work and collect results later, which is perfect for RAG pipelines where you retrieve from a vector store and a web search at the same time.
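Before wiring this into a real pipeline, the scheduling behavior can be checked in isolation. This is a minimal sketch under stated assumptions: `fake_search` is a hypothetical stand-in for a retrieval call, and the 0.2 s and 0.3 s delays are arbitrary values chosen only to make the overlap measurable.

```python
import asyncio
import time


async def fake_search(source: str, delay: float) -> str:
    """Hypothetical retrieval call; asyncio.sleep simulates network latency."""
    await asyncio.sleep(delay)
    return f"{source} results"


async def main() -> tuple:
    start = time.perf_counter()
    # create_task schedules both coroutines right away and returns Task handles.
    task_vector = asyncio.create_task(fake_search("vector", 0.2))
    task_web = asyncio.create_task(fake_search("web", 0.3))
    # Awaiting collects each result; the two sleeps overlap instead of adding up.
    vector_hits = await task_vector
    web_hits = await task_web
    elapsed = time.perf_counter() - start
    return vector_hits, web_hits, elapsed


vector_hits, web_hits, elapsed = asyncio.run(main())
print(vector_hits, "|", web_hits, "|", f"{elapsed:.2f}s")
```

The elapsed time should land near the slower of the two delays (about 0.3 s) rather than their sum (0.5 s), which is the property the RAG pipeline relies on.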
    import asyncio

    # search_vector_db, search_web, build_context and call_llm are
    # application helpers assumed to be defined elsewhere.

    async def rag_pipeline(query: str) -> str:
        # Phase 1: kick off both retrievals simultaneously
        task_vector = asyncio.create_task(search_vector_db(query))
        task_web = asyncio.create_task(search_web(query))

        # Both tasks are now scheduled; do other prep work in the meantime
        system_prompt = "You are a helpful research assistant"

        # Collect results - each await blocks only until that task is ready
        vector_hits = await task_vector
        web_hits = await task_web
        context = build_context(vector_hits, web_hits)

        # Phase 2: single LLM call with full context
        return await call_llm(system_prompt, context, query)

    # Total latency ≈ max(vector_latency, web_latency) + llm_latency

Streaming tokens in real time with async generators

ChatGPT-style streaming, where tokens appear as they're generated, maps naturally onto async generators. Instead of waiting for the full response, you yield each chunk of text as it arrives and forward it to the client immediately.

    import asyncio

    import anthropic

    client = anthropic.AsyncAnthropic()

    async def stream_response(prompt: str):
        """Async generator - yields text as it arrives from the LLM."""
        async with client.messages.stream(
            model="claude-opus-4-5",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield text  # each chunk arrives here, roughly ~50 ms apart

    async def handle_request(prompt: str) -> str:
        full_text = ""
        async for token in stream_response(prompt):
            print(token, end="", flush=True)  # real-time display
            full_text += token
        print()
        return full_text

    asyncio.run(handle_request("Explain diffusion models simply."))

Locks: protecting shared state across coroutines

Even though asyncio is single-threaded, race conditions still exist. A coroutine can only be interrupted at an await, so the danger is a read-await-write sequence: if two coroutines both read a shared counter, await something, and then write back, one of the updates is lost. asyncio.Lock ensures only one coroutine is inside the critical section at a time.
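The lost-update problem is easy to reproduce. The sketch below is illustrative only: `unsafe_increment`, `safe_increment`, and `run` are hypothetical names, and `asyncio.sleep(0)` plays the role of any real await (an API call, a database write) that sits between the read and the write.

```python
import asyncio

counter = 0


async def unsafe_increment() -> None:
    global counter
    current = counter        # read
    await asyncio.sleep(0)   # suspension point: other coroutines run here
    counter = current + 1    # write back a value that may be stale


async def safe_increment(lock: asyncio.Lock) -> None:
    global counter
    async with lock:         # only one coroutine at a time past this line
        current = counter
        await asyncio.sleep(0)
        counter = current + 1


async def run(n: int, use_lock: bool) -> int:
    global counter
    counter = 0
    lock = asyncio.Lock()
    if use_lock:
        await asyncio.gather(*(safe_increment(lock) for _ in range(n)))
    else:
        await asyncio.gather(*(unsafe_increment() for _ in range(n)))
    return counter


unsafe_total = asyncio.run(run(100, use_lock=False))
safe_total = asyncio.run(run(100, use_lock=True))
print(f"without lock: {unsafe_total}, with lock: {safe_total}")
```

Without the lock, the coroutines read the counter before any of them has written, so nearly all increments are lost. With the lock, each read-await-write sequence completes before the next one starts and the total reaches 100.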
    import asyncio
    from collections import defaultdict

    request_counts: dict[str, int] = defaultdict(int)
    lock = asyncio.Lock()

    # call_embedding_api is assumed to be defined elsewhere.

    async def tracked_embed(text: str, model: str) -> list[float]:
        async with lock:
            request_counts[model] += 1
            if request_counts[model] > 1000:
                raise RuntimeError(f"Daily limit hit for {model}")
        # Lock released here, so the embedding calls themselves still overlap
        return await call_embedding_api(text, model)

Section 02: Threading (When Your Library Doesn't Speak Async)

Many powerful Python libraries (requests, some database drivers, HuggingFace's synchronous API) are blocking. You can't just slap await on them. But you also can't leave performance on the table. Threading is the answer.

The GIL: what it blocks and what it doesn't

The Global Interpreter Lock (GIL) is a mutex in CPython that prevents more than one thread from running Python bytecode at the same time. That makes threading sound useless, but it isn't, because the GIL is released in two common situations and held in a third:

I/O operations: network calls, file reads/writes, socket operations. The GIL is released while the OS handles the I/O.
→ Threads work great here

C extensions: NumPy, PyTorch ops, SciPy. Their heavy operations run C code that typically releases the GIL while it executes.
→ Threads work great here

Pure Python CPU work: loops, string operations, pure Python math. Only one thread executes bytecode at a time, so threads add no parallelism.
→ Use multiprocessing instead

threadpool_embedding.py

    from concurrent.futures import ThreadPoolExecutor, as_completed

    from sentence_transformers import SentenceTransformer

    # Blocking library - can't be awaited directly, but threads work fine
    model = SentenceTransformer("all-MiniLM-L6-v2")

    def embed_text(text: str, idx: int) -> tuple[int, list[float]]:
        embedding = model.encode(text)  # GIL released while the C extension runs
        return idx, embedding.tolist()

    texts = [f"Document chunk {i}" for i in range(50)]

    with ThreadPoolExecutor(max_workers=8) as pool:
        # Map each future back to its chunk index
        futures = {pool.submit(embed_text, t, i): i for i, t in enumerate(texts)}
        results = {}
        for future in as_completed(futures):
            idx, embedding = future.result()
            results[idx] = embedding

    print(f"Embedded {len(results)} chunks")

Synchronization primitives: the full toolkit

    import threading
    import time

    model_ready = threading.Event()
    api_sem = threading.Semaphore(5)  # max 5 concurrent inferences

    def load_model():
        print("Loading model weights...")
        time.sleep(3)       # simulate loading a 7B-parameter model
        model_ready.set()   # unblocks ALL waiting threads at once
        print("Model ready!")

    def inference_worker(worker_id: int):
        model_ready.wait()  # block here until the model is loaded
        with api_sem:       # at most 5 […]