Introduction
Imagine you are running a busy restaurant. When a customer orders a meal, the waiter takes the order to the kitchen, stands next to the chef for ten minutes waiting for the food to cook, and only returns to serve other customers once the meal is ready. The restaurant would go out of business on its first day.
Unfortunately, this is exactly how traditional synchronous Python web servers (like standard Flask or WSGI-based Django) operate.
When a client calls your backend API to stream a Large Language Model (LLM) response, the server may spend 5 to 10 seconds waiting on the network while tokens are generated. On a synchronous server, the worker thread handling that request blocks for the entire stream and cannot process any other request until it completes. If your server is configured with 20 worker threads and all 20 are waiting on LLM streams, the 21st request sits in a queue and may time out, even though the server's CPU and memory usage are near zero.
To handle 10,000+ concurrent connections without buying expensive server clusters, we must use Asynchronous Python (asyncio) with frameworks like FastAPI or Tornado. Asynchronous servers act like smart waiters: they place an order with the kitchen, immediately go serve other tables, and return to collect the food only when the kitchen signals it is ready.
This guide details how to build non-blocking streaming servers in Python.
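To make the contrast above concrete, here is a minimal sketch that uses only the standard library. It simulates slow upstream calls with sleeps rather than real LLM requests, and the constants (WORKERS, REQUESTS, WAIT_SECONDS) are hypothetical values chosen to keep the run short. A small thread pool has to process the requests in batches, while the event loop waits on all of them at once.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

WORKERS = 2          # hypothetical worker-thread count (20 in the text above)
REQUESTS = 6         # simulated concurrent clients
WAIT_SECONDS = 0.1   # stand-in for the 5-10 s LLM stream


def blocking_request(_index: int) -> None:
    # A synchronous handler: the worker thread is stuck for the whole wait.
    time.sleep(WAIT_SECONDS)


async def async_request() -> None:
    # An async handler: the wait is handed back to the event loop.
    await asyncio.sleep(WAIT_SECONDS)


def run_threaded() -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:
        list(pool.map(blocking_request, range(REQUESTS)))
    return time.perf_counter() - start


async def run_async() -> float:
    start = time.perf_counter()
    await asyncio.gather(*(async_request() for _ in range(REQUESTS)))
    return time.perf_counter() - start


threaded_seconds = run_threaded()          # requests queue behind 2 workers
async_seconds = asyncio.run(run_async())   # all waits overlap on one thread
print(f"threaded: {threaded_seconds:.2f}s, async: {async_seconds:.2f}s")
```

With these numbers the threaded run needs about three rounds of waiting and the async run about one. Exact timings vary by machine.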
The Asynchronous Event Loop Mechanics
The following diagram illustrates how a single-threaded asyncio event loop serves multiple concurrent client connections: each task runs until it awaits I/O, and the loop polls the non-blocking sockets to decide which task to resume next:
    Client A Request   Client B Request   Client C Request
            │                  │                  │
            ▼                  ▼                  ▼
+----------------------------------------------------------------+
|                    FastAPI / Tornado Server                    |
|                                                                |
|  +----------------------------------------------------------+  |
|  |                    Asyncio Event Loop                    |  |
|  |                                                          |  |
|  |   +------------+     +------------+     +------------+   |  |
|  |   |   Task A   |     |   Task B   |     |   Task C   |   |  |
|  |   | (Client A) |     | (Client B) |     | (Client C) |   |  |
|  |   +----+-------+     +----+-------+     +----+-------+   |  |
|  |        |                  |                  |           |  |
|  +--------|------------------|------------------|-----------+  |
+-----------|------------------|------------------|--------------+
            |                  |                  |
            v (Await IO)       v (Await IO)       v (Await IO)
      +-----+-----+      +-----+-----+      +-----+-----+
      |  Gemini   |      | Database  |      |   Cache   |
      |  Network  |      |   Read    |      |  Lookup   |
      +-----------+      +-----------+      +-----------+
When Task A awaits network IO from the Gemini API, the event loop pauses its execution, registers the socket, and switches to Task B. This co-operative multitasking allows a single thread to handle thousands of open connections.
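The task switching described above can be observed with a small standard-library sketch. The handle_client coroutine and its delays are hypothetical stand-ins: asyncio.sleep plays the role of awaiting a socket, and no real network call is made.

```python
import asyncio
import threading

events = []


async def handle_client(name: str, io_seconds: float) -> int:
    events.append(f"{name} start")
    # Stand-in for awaiting network I/O; the loop switches tasks here.
    await asyncio.sleep(io_seconds)
    events.append(f"{name} done")
    return threading.get_ident()


async def main() -> list:
    return await asyncio.gather(
        handle_client("A", 0.3),   # slowest upstream
        handle_client("B", 0.1),
        handle_client("C", 0.2),
    )


thread_ids = asyncio.run(main())
print(events)
print("distinct threads used:", len(set(thread_ids)))
```

All three tasks start before any of them finishes, they complete in order of their simulated I/O time (B, C, A), and every task reports the same thread identifier.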
Step-by-Step FastAPI Implementation
To keep an asynchronous server fast under load, we must maintain a shared connection pool using httpx.AsyncClient and stream responses back to the client chunk-by-chunk.
# filepath: src/server.py
import os
from typing import AsyncGenerator

import httpx
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

app = FastAPI(title="High-Concurrency LLM Backend")

# Configure a shared connection pool
# max_keepalive_connections: Number of idle connections to keep open
# max_connections: Maximum limit of concurrent sockets
connection_limits = httpx.Limits(max_keepalive_connections=200, max_connections=1000)
async_client = httpx.AsyncClient(limits=connection_limits, timeout=60.0)


class GenerationRequest(BaseModel):
    prompt: str


async def gemini_stream_generator(prompt: str) -> AsyncGenerator[bytes, None]:
    '''
    Streams token responses directly from Google's Gemini API
    without blocking the application event loop.
    '''
    api_key = os.environ.get("GEMINI_API_KEY")
    if not api_key:
        yield b"Error: GEMINI_API_KEY environment variable is missing."
        return

    # alt=sse asks the API for server-sent events, which matches the
    # text/event-stream media type returned by the route below.
    url = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        f"gemini-1.5-flash:streamGenerateContent?alt=sse&key={api_key}"
    )
    payload = {
        "contents": [{
            "parts": [{"text": prompt}]
        }]
    }

    try:
        # Stream response from external API
        async with async_client.stream("POST", url, json=payload) as response:
            if response.status_code != 200:
                yield f"API Error (Status {response.status_code})".encode()
                return
            # Read the stream chunks as they arrive. Each iteration awaits
            # the socket, so control returns to the event loop without an
            # explicit asyncio.sleep().
            async for raw_chunk in response.aiter_bytes():
                yield raw_chunk
    except httpx.RequestError as exc:
        yield f"Network failure during LLM connection: {exc}".encode()
    except Exception as exc:
        yield f"Unexpected stream disruption: {exc}".encode()


@app.post("/api/v1/generate-stream")
async def generate_stream(request: GenerationRequest):
    if not request.prompt.strip():
        raise HTTPException(status_code=400, detail="Prompt cannot be empty.")
    return StreamingResponse(
        gemini_stream_generator(request.prompt),
        media_type="text/event-stream"
    )


@app.on_event("shutdown")
async def shutdown_event():
    # Clean up the shared connection pool during server shutdown
    await async_client.aclose()
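To build intuition for what a max_connections cap does under load, here is a rough analogy that uses only the standard library. It is not how httpx implements its pool: an asyncio.Semaphore stands in for the limit, and the names (MAX_CONNECTIONS, fake_upstream_call) and the tiny cap of 3 are hypothetical. Calls beyond the cap wait without blocking the event loop and proceed as slots free up.

```python
import asyncio

MAX_CONNECTIONS = 3   # hypothetical small cap (1000 in the server above)
TOTAL_CALLS = 10


async def fake_upstream_call(slots: asyncio.Semaphore, stats: dict) -> None:
    # Wait for a free slot; other tasks keep running while this one waits.
    async with slots:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)   # stand-in for the upstream request
        stats["active"] -= 1


async def main() -> dict:
    slots = asyncio.Semaphore(MAX_CONNECTIONS)
    stats = {"active": 0, "peak": 0}
    await asyncio.gather(
        *(fake_upstream_call(slots, stats) for _ in range(TOTAL_CALLS))
    )
    return stats


stats = asyncio.run(main())
print(stats)
```

All ten calls complete, but no more than three are ever in flight at once, which is the behavior a shared, bounded pool is meant to provide.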
Alternative: Tornado Concurrency Handler
Tornado is a mature, high-performance asynchronous web framework that is well-suited for raw network socket handling. Here is how you implement a non-blocking streaming handler in Tornado:
# filepath: src/tornado_server.py
import json
import os

import tornado.ioloop
import tornado.web
from tornado.httpclient import AsyncHTTPClient, HTTPRequest


class StreamHandler(tornado.web.RequestHandler):
    async def post(self):
        # Reject malformed JSON with a 400 instead of an unhandled 500
        try:
            body = json.loads(self.request.body)
        except json.JSONDecodeError:
            raise tornado.web.HTTPError(400, "Request body must be valid JSON.")
        prompt = body.get("prompt", "")

        api_key = os.environ.get("GEMINI_API_KEY")
        if not api_key:
            raise tornado.web.HTTPError(500, "GEMINI_API_KEY is not set.")

        url = (
            "https://generativelanguage.googleapis.com/v1beta/models/"
            f"gemini-1.5-flash:streamGenerateContent?key={api_key}"
        )
        payload = {
            "contents": [{"parts": [{"text": prompt}]}]
        }
        client = AsyncHTTPClient()

        def handle_chunk(chunk):
            # Write chunks directly to the response buffer as they are received
            self.write(chunk)
            self.flush()

        request = HTTPRequest(
            url,
            method="POST",
            headers={"Content-Type": "application/json"},
            body=json.dumps(payload),
            streaming_callback=handle_chunk,
            request_timeout=60.0
        )
        await client.fetch(request)
        self.finish()


def make_app():
    return tornado.web.Application([
        (r"/api/v1/stream", StreamHandler),
    ])


if __name__ == "__main__":
    app = make_app()
    app.listen(8080)
    tornado.ioloop.IOLoop.current().start()
Technical Deep Dive: Event Loops & Concurrency Tuning
1. The Cardinal Sin: Blocking the Event Loop
Because an asynchronous server runs all of its tasks on a single thread, any blocking call stalls every concurrent request on that event loop.
- Do not use time.sleep(): This blocks the thread. Use await asyncio.sleep().
- Do not use standard SQL or HTTP clients: Synchronous clients like requests or psycopg2 block the loop. Use async clients like httpx or asyncpg.
- Identifying Blocking Calls: If a single step of a request holds the event loop for more than about 50 milliseconds, every other connection waits behind it and latency climbs. You can find such calls by enabling asyncio debug mode in your development environment, which logs a warning for any callback or task step that runs longer than 100 milliseconds by default (loop.slow_callback_duration):
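import asyncio
import logging
logging.basicConfig(level=logging.WARNING)
# main() is your application's entry-point coroutine
asyncio.run(main(), debug=True)
# Logs warning messages like: "Executing <Task...> took 0.150 seconds"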
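As a self-contained illustration of the debug-mode check, the sketch below runs a deliberately blocking coroutine and captures what the asyncio logger reports. The handler class and coroutine names are hypothetical. The sketch relies on the default slow-callback threshold of 0.1 seconds, so the 0.2 second blocking sleep should be flagged.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collects log messages in memory so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def blocking_handler() -> None:
    # Deliberately wrong: time.sleep() holds the event loop for 0.2 s.
    time.sleep(0.2)


async def main() -> None:
    await asyncio.create_task(blocking_handler())


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio_logger.setLevel(logging.WARNING)

# debug=True enables slow-callback detection on the event loop.
asyncio.run(main(), debug=True)

slow_warnings = [m for m in handler.messages if "took" in m]
for message in slow_warnings:
    print(message)
```

Replacing time.sleep(0.2) with await asyncio.sleep(0.2) should make the warning disappear, because the wait no longer occupies the loop.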
2. Offloading CPU-Bound Operations
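If your server needs to run CPU-heavy operations (such as resizing images, parsing large JSON blocks, or running local machine learning models), you must move that work off the event loop. Use a process pool for CPU-bound pure-Python code, because threads in the same process still contend for the Global Interpreter Lock (GIL); a thread pool is enough for blocking I/O or for libraries that release the GIL while they work: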
import asyncio
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor(max_workers=4)


def heavy_image_processing(image_bytes: bytes) -> bytes:
    # Synchronous, CPU-heavy work (placeholder: substitute the real transformation)
    processed_bytes = image_bytes
    return processed_bytes


async def process_image_route(raw_image: bytes):
    loop = asyncio.get_running_loop()
    # Offload the work to a worker process so it can run on a separate CPU core
    result = await loop.run_in_executor(
        process_pool,
        heavy_image_processing,
        raw_image,
    )
    return result
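For the thread-pool side of this advice, here is a minimal sketch that uses asyncio.to_thread (Python 3.9+), which runs a blocking function in the loop's default thread pool. The blocking_io function is a hypothetical stand-in for a synchronous client call, and the heartbeat task exists only to show that the event loop keeps running while the blocking call is in progress.

```python
import asyncio
import time


def blocking_io() -> str:
    # Stand-in for a synchronous library call (hypothetical).
    time.sleep(0.3)
    return "done"


async def heartbeat(ticks: list) -> None:
    # Records a tick every 50 ms for as long as the loop is responsive.
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)


async def main() -> tuple:
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking call runs in a worker thread; the loop stays free.
    result = await asyncio.to_thread(blocking_io)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

If blocking_io were called directly inside main, the heartbeat would not tick at all until the call returned.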
3. Tuning the Linux Kernel and Host Environment
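- File Descriptor Limits: Every open socket connection is treated as a file descriptor by the operating system. On many Linux systems the default soft limit is 1,024 open file descriptors per process, which caps concurrent connections well below what the event loop can otherwise handle. Raise the limit in your deployment scripts (the hard limit must permit the new value) to support thousands of concurrent connections: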
ulimit -n 65536
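- Faster Event Loop: uvloop is a drop-in replacement for the default asyncio event loop, built on libuv. Install it with pip install uvloop and set it as the event loop policy before the server starts:
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())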
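The file descriptor limit discussed in this section can also be inspected, and raised up to the hard limit, from inside the Python process using the standard resource module (Unix only). This is a sketch rather than a recommended deployment practice. The helper name raise_fd_soft_limit is hypothetical, and the target of 65536 mirrors the ulimit value above.

```python
import resource

TARGET_FDS = 65536  # mirrors the ulimit value used above


def raise_fd_soft_limit(target: int = TARGET_FDS) -> tuple:
    """Raise the soft RLIMIT_NOFILE toward target, capped by the hard limit."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard == resource.RLIM_INFINITY:
        wanted = target
    else:
        wanted = min(target, hard)
    # Only ever raise the limit; leave an unlimited soft limit untouched.
    if soft != resource.RLIM_INFINITY and wanted > soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (wanted, hard))
        except (ValueError, OSError):
            # Some platforms reject values above a kernel-level maximum.
            pass
    return resource.getrlimit(resource.RLIMIT_NOFILE)


before = resource.getrlimit(resource.RLIMIT_NOFILE)
after = raise_fd_soft_limit()
print("soft/hard before:", before)
print("soft/hard after: ", after)
```

An unprivileged process cannot exceed the hard limit, so a low hard limit still has to be raised in the service manager or container configuration.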
Cross-Reading Recommendations
For details on managing stateful channels or configuring database backends, check out these articles:
- Scaling Stateful WebSockets: Event-Driven Real-Time Sync with FastAPI & Redis: Learn how to scale persistent WebSocket channels across multiple server instances.
- Architecting Agentic RAG: Production AI Knowledge Systems with Gemini & PostgreSQL: Learn how to query vector databases asynchronously without blocking your application event loop.