Real-Time Word Streaming with Azure OpenAI and Python FastAPI
Introduction
User Requirement: The goal is to receive responses from Azure OpenAI in real time, with each word delivered as soon as it is generated.
Motivations: Real-time responses create a conversational experience that feels more human-like, enhancing user engagement and satisfaction.
User Journey
Solution Tech Stack
Underlying Technologies
Server-Sent Events (SSE)
SSE is a technology that facilitates a continuous connection between a client and a server, allowing real-time data to be pushed over HTTP.
In our application, employing SSE enables our Python application to send each word of the response immediately as it is created. This capability enhances the conversational experience by making it more dynamic and engaging.
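To make the idea concrete, here is a minimal, standalone sketch (hypothetical code, not taken from this project) of what SSE framing looks like: each event is a "data: ..." line terminated by a blank line, pushed one after another over a single long-lived HTTP response.

import asyncio

# Hypothetical illustration of SSE framing: one "data: ..." line plus a blank
# line per event, sent over the same connection as the events are produced.
async def sse_events():
    for word in ["Hello", "streaming", "world"]:
        yield f"data: {word}\n\n"   # one SSE event per word
        await asyncio.sleep(0.1)    # simulate the model producing the next token

async def main():
    async for event in sse_events():
        print(event, end="")

if __name__ == "__main__":
    asyncio.run(main())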
Asynchronous Programming
Asynchronous programming enables tasks to run concurrently, meaning the completion of one task does not hinder the execution of others.
In our application, this allows the Python application to deliver each word of the response while simultaneously fetching the next segment from Azure OpenAI.
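As a rough illustration (hypothetical code, separate from the application built later), the sketch below overlaps fetching the next chunk with sending the current one; because awaiting I/O never blocks the event loop, neither operation has to wait for the other.

import asyncio

# Illustrative only: start fetching the next chunk while the current chunk is
# still being sent to the client, using an asyncio task.
async def fetch_next_chunk(i: int) -> str:
    await asyncio.sleep(0.2)   # stand-in for a network call to Azure OpenAI
    return f"chunk-{i}"

async def send_to_client(chunk: str) -> None:
    await asyncio.sleep(0.1)   # stand-in for writing to the HTTP response
    print(f"sent {chunk}")

async def main():
    chunk = await fetch_next_chunk(0)
    for i in range(1, 4):
        next_fetch = asyncio.create_task(fetch_next_chunk(i))  # begin fetching early
        await send_to_client(chunk)                            # send while the fetch runs
        chunk = await next_fetch

if __name__ == "__main__":
    asyncio.run(main())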
FastAPI
FastAPI is a web framework designed for building APIs with Python.
In this project, we utilize it to construct an API that accepts user prompts. These prompts are then forwarded to Azure OpenAI, which streams the response back to the user as it is generated.
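For readers new to the framework, here is the smallest possible FastAPI app (a hypothetical example, separate from the streaming API shown later): a decorated async function becomes an HTTP endpoint.

from fastapi import FastAPI

app = FastAPI()

# Hypothetical "hello world" endpoint, just to show the framework's shape.
@app.get("/health")
async def health():
    return {"status": "ok"}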
The Infrastructure
The Terraform script provisions a Resource Group along with an Azure OpenAI instance and its associated model.
locals {
  open_ai_instance_models = flatten([
    for instance in var.open_ai_instances : [
      for model in instance.models : {
        instance_name = instance.name
        model_name    = model.name
        model_version = model.version
      }
    ]
  ])
}

resource "azurerm_resource_group" "resource_group" {
  name     = var.resource_group_name
  location = var.location
}

resource "azurerm_cognitive_account" "ai_services" {
  for_each = { for open_ai_instance in var.open_ai_instances : open_ai_instance.name => open_ai_instance }

  name                          = each.value.name
  location                      = each.value.region
  resource_group_name           = azurerm_resource_group.resource_group.name
  kind                          = "OpenAI"
  sku_name                      = each.value.sku
  custom_subdomain_name         = each.value.custom_subdomain_name
  public_network_access_enabled = true
}

resource "azurerm_cognitive_deployment" "model" {
  for_each = { for open_ai_instance_model in local.open_ai_instance_models : open_ai_instance_model.model_name => open_ai_instance_model }

  name                 = each.value.model_name
  cognitive_account_id = azurerm_cognitive_account.ai_services[each.value.instance_name].id

  model {
    format  = "OpenAI"
    name    = each.value.model_name
    version = each.value.model_version
  }

  scale {
    type = "Standard"
  }
}
You can customize your deployment by modifying the vars.tfvars file with your desired values.
location            = "uksouth"
resource_group_name = "azure-open-ai-rg"

open_ai_instances = [
  {
    name                  = "dev-openai-1"
    region                = "uksouth"
    sku                   = "S0"
    custom_subdomain_name = "ai-service-dev-openai-1"
    models = [
      {
        name    = "gpt-35-turbo"
        version = "0301"
      },
    ]
  },
]
Retrieve the Endpoint, Keys, and Deployment Model name from the Azure Portal.
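The application reads these three values from the environment variables AZURE_OPEN_AI_ENDPOINT, AZURE_OPEN_AI_API_KEY, and AZURE_OPEN_AI_DEPLOYMENT_MODEL. As a minimal sketch (an optional addition, not part of the project's code), you could run a pre-flight check like this before starting the app:

import os

# Optional pre-flight check (illustrative): fail fast if any of the values
# retrieved from the Azure Portal have not been exported as environment variables.
required = [
    "AZURE_OPEN_AI_ENDPOINT",
    "AZURE_OPEN_AI_API_KEY",
    "AZURE_OPEN_AI_DEPLOYMENT_MODEL",
]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All Azure OpenAI settings found.")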
The Code
Application
The initial part of the application code includes:
- Initialization of the FastAPI application.
- Setting up authentication for Azure OpenAI.
- Defining a Prompt model that specifies the type of input expected from users, which is text in this case.
The application is now prepared to accept prompts and communicate with Azure OpenAI.
import os
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import openai
import uvicorn

# App
app = FastAPI()

# Azure OpenAI Authentication
endpoint = os.environ["AZURE_OPEN_AI_ENDPOINT"]
api_key = os.environ["AZURE_OPEN_AI_API_KEY"]

client = openai.AsyncAzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2023-09-01-preview"
)

# Azure OpenAI Model Configuration
deployment = os.environ["AZURE_OPEN_AI_DEPLOYMENT_MODEL"]
temperature = 0.7

# Prompt
class Prompt(BaseModel):
    input: str
The subsequent section of the application code establishes the API for streaming responses from Azure OpenAI back to the user. Here’s what it does:
- The stream function accepts the user's input (the prompt) and asynchronously requests a response from Azure OpenAI.
- The stream_processor function processes the Azure OpenAI response asynchronously, yielding each piece of content as it arrives.
- The stream function returns a StreamingResponse, which uses SSE to stream the response to the user word by word.
# Generate Stream
async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                yield delta.content

# API Endpoint
@app.post("/stream")
async def stream(prompt: Prompt):
    azure_open_ai_response = await client.chat.completions.create(
        model=deployment,
        temperature=temperature,
        messages=[{"role": "user", "content": prompt.input}],
        stream=True
    )

    return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")
The final part of the code runs the application, making it accessible for users.
if __name__ == "__main__":
    uvicorn.run("main:app", port=8000)
Testing
To test the application, execute the curl command below. Important notes to consider:
- The application must be running while you execute this command, so use a separate terminal.
- You can modify the input prompt sent to the API.
- You may need to adjust the application's IP address depending on your machine's default settings.
curl -L \
  -H "Accept: application/json" \
  -H "Connection: close" \
  -H "Content-Type: application/json" \
  -X POST -d '{"input": "write a random 100 word paragraph"}' \
  http://127.0.0.1:8000/stream --no-buffer \
  --verbose
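If you prefer to test from Python rather than curl, a minimal client sketch using the httpx library (an assumption; any streaming HTTP client would do) could look like this:

import httpx

# Stream the response from the local API and print each chunk as it arrives.
with httpx.stream(
    "POST",
    "http://127.0.0.1:8000/stream",
    json={"input": "write a random 100 word paragraph"},
    timeout=60,
) as response:
    for chunk in response.iter_text():
        print(chunk, end="", flush=True)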
Automation
The setup of the infrastructure and application can be automated in various ways. The accompanying GitHub repository uses a Makefile for this; follow its README to create the infrastructure and run the application yourself.
Summary
In summary, we have successfully developed a solution that meets the user requirement—our API fetches responses from Azure OpenAI in real-time, delivering each word to the user as soon as it's available. This functionality allows the application to simulate human-like responses, greatly improving user experience.
For further development, consider integrating the Terraform code with your Azure Landing Zone and working towards deploying your application in production. You could also create a front-end interface (like a chatbot) and establish automated deployment pipelines.
About the Author: Rukaya Ogazi-Khan is an Azure architect at Version 1.