App usage patterns
Apps and tasks can interact in various ways: calling each other via HTTP, webhooks, WebSockets, or direct browser usage. This page describes the different patterns and when to use them.
Patterns overview
- Call app from task: A task makes HTTP requests to an app
- Call task from app (webhooks / APIs): An app triggers task execution via the Flyte SDK
- Call app from app: One app makes HTTP requests to another app
- WebSocket-based interaction: Real-time, bidirectional communication
- Browser-based access: Users access apps directly through the browser
Call app from task
Tasks can call apps by making HTTP requests to the app’s endpoint. This is useful when:
- You need to use a long-running service during task execution
- You want to call a model serving endpoint from a batch processing task
- You need to interact with an API from a workflow
Example: Task calling an app
"""Example of a task calling an app."""
import httpx
from fastapi import FastAPI
import flyte
from flyte.app.extras import FastAPIAppEnvironment
app = FastAPI(title="Add One", description="Adds one to the input", version="1.0.0")
image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx")
app_env = FastAPIAppEnvironment(
name="add-one-app",
app=app,
description="Adds one to the input",
image=image,
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=False,
)
task_env = flyte.TaskEnvironment(
name="add_one_task_env",
image=image,
resources=flyte.Resources(cpu=1, memory="512Mi"),
depends_on=[app_env], # Ensure app is deployed before task runs
)
@app.get("/")
async def add_one(x: int) -> dict[str, int]:
"""Main endpoint for the add-one app."""
return {"result": x + 1}
@task_env.task
async def add_one_task(x: int) -> int:
print(f"Calling app at {app_env.endpoint}")
async with httpx.AsyncClient() as client:
response = await client.get(app_env.endpoint, params={"x": x})
response.raise_for_status()
return response.json()["result"]
Key points:
- The task environment uses
depends_on=[app_env]to ensure the app is deployed first - Access the app endpoint via
app_env.endpoint - Use standard HTTP client libraries (like
httpx) to make requests
Call task from app (webhooks / APIs)
Apps can trigger task execution using the Flyte SDK. This is useful for:
- Webhooks that trigger workflows
- APIs that need to run batch jobs
- Services that need to execute tasks asynchronously
Webhooks are HTTP endpoints that trigger actions in response to external events. Flyte apps can serve as webhook endpoints that trigger task runs, workflows, or other operations.
Example: Basic webhook app
Here’s a simple webhook that triggers Flyte tasks:
"""A webhook that triggers Flyte tasks."""
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()
async def verify_token(
credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
"""Verify the API key from the bearer token."""
if credentials.credentials != WEBHOOK_API_KEY:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Could not validate credentials",
)
return credentials
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize Flyte before accepting requests."""
await flyte.init_in_cluster.aio()
yield
# Cleanup if needed
app = FastAPI(
title="Flyte Webhook Runner",
description="A webhook service that triggers Flyte task runs",
version="1.0.0",
lifespan=lifespan,
)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "healthy"}
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
project: str,
domain: str,
name: str,
version: str,
inputs: dict,
credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
"""
Trigger a Flyte task run via webhook.
Returns information about the launched run.
"""
# Fetch the task
task = await remote.TaskDetails.fetch(
project=project,
domain=domain,
name=name,
version=version,
)
# Run the task
run = await flyte.run.aio(task, **inputs)
return {
"url": run.url,
"id": run.id,
"status": "started",
}
env = FastAPIAppEnvironment(
name="webhook-runner",
app=app,
description="A webhook service that triggers Flyte task runs",
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"fastapi",
"uvicorn",
),
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=False, # We handle auth in the app
env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)
Once deployed, you can trigger tasks via HTTP POST:
curl -X POST "https://your-webhook-url/run-task/flytesnacks/development/my_task/v1" \
-H "Authorization: Bearer test-api-key" \
-H "Content-Type: application/json" \
-d '{"input_key": "input_value"}'Response:
{
"url": "https://console.union.ai/...",
"id": "abc123",
"status": "started"
}Advanced webhook patterns
Webhook with validation
Use Pydantic for input validation:
from pydantic import BaseModel
class TaskInput(BaseModel):
data: dict
priority: int = 0
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
project: str,
domain: str,
name: str,
version: str,
inputs: TaskInput, # Validated input
credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
task = await remote.TaskDetails.fetch(
project=project,
domain=domain,
name=name,
version=version,
)
run = await flyte.run.aio(task, **inputs.dict())
return {
"run_id": run.id,
"url": run.url,
}Webhook with response waiting
Wait for task completion:
@app.post("/run-task-and-wait/{project}/{domain}/{name}/{version}")
async def run_task_and_wait(
project: str,
domain: str,
name: str,
version: str,
inputs: dict,
credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
task = await remote.TaskDetails.fetch(
project=project,
domain=domain,
name=name,
version=version,
)
run = await flyte.run.aio(task, **inputs)
run.wait() # Wait for completion
return {
"run_id": run.id,
"url": run.url,
"status": run.status,
"outputs": run.outputs(),
}Webhook with secret management
Use Flyte secrets for API keys:
env = FastAPIAppEnvironment(
name="webhook-runner",
app=app,
secrets=flyte.Secret(key="webhook-api-key", as_env_var="WEBHOOK_API_KEY"),
# ...
)Then access in your app:
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY")Webhook security and best practices
- Authentication: Always secure webhooks with authentication (API keys, tokens, etc.).
- Input validation: Validate webhook inputs using Pydantic models.
- Error handling: Handle errors gracefully and return meaningful error messages.
- Async operations: Use async/await for I/O operations.
- Health checks: Include health check endpoints.
- Logging: Log webhook requests for debugging and auditing.
- Rate limiting: Consider implementing rate limiting for production.
Security considerations:
- Store API keys in Flyte secrets, not in code.
- Always use HTTPS in production.
- Validate all inputs to prevent injection attacks.
- Implement proper access control mechanisms.
- Log all webhook invocations for security auditing.
Example: GitHub webhook
Here’s an example webhook that triggers tasks based on GitHub events:
from fastapi import FastAPI, Request, Header
import hmac
import hashlib
app = FastAPI(title="GitHub Webhook Handler")
@app.post("/github-webhook")
async def github_webhook(
request: Request,
x_hub_signature_256: str = Header(None),
):
"""Handle GitHub webhook events."""
body = await request.body()
# Verify signature
secret = os.getenv("GITHUB_WEBHOOK_SECRET")
signature = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
expected_signature = f"sha256={signature}"
if not hmac.compare_digest(x_hub_signature_256, expected_signature):
raise HTTPException(status_code=403, detail="Invalid signature")
# Process webhook
event = await request.json()
event_type = request.headers.get("X-GitHub-Event")
if event_type == "push":
# Trigger deployment task
task = await remote.TaskDetails.fetch(...)
run = await flyte.run.aio(task, commit=event["after"])
return {"run_id": run.id, "url": run.url}
return {"status": "ignored"}Call app from app
Apps can call other apps by making HTTP requests. This is useful for:
- Microservice architectures
- Proxy/gateway patterns
- A/B testing setups
- Service composition
Example: App calling another app
import httpx
from fastapi import FastAPI
import flyte
from flyte.app.extras import FastAPIAppEnvironment
# Backend app
app1 = FastAPI(title="Backend API")
env1 = FastAPIAppEnvironment(
name="backend-api",
app=app1,
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"fastapi", "uvicorn", "httpx"
),
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=False,
)
@app1.get("/greeting/{name}")
async def greeting(name: str) -> str:
return f"Hello, {name}!"
# Frontend app that calls the backend
app2 = FastAPI(title="Frontend API")
env2 = FastAPIAppEnvironment(
name="frontend-api",
app=app2,
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"fastapi", "uvicorn", "httpx"
),
resources=flyte.Resources(cpu=1, memory="512Mi"),
requires_auth=False,
depends_on=[env1], # Ensure backend is deployed first
)
@app2.get("/greeting/{name}")
async def greeting_proxy(name: str):
"""Proxy that calls the backend app."""
async with httpx.AsyncClient() as client:
response = await client.get(f"{env1.endpoint}/greeting/{name}")
response.raise_for_status()
return response.json()Key points:
- Use
depends_on=[env1]to ensure dependencies are deployed first - Access the app endpoint via
env1.endpoint - Use HTTP clients (like
httpx) to make requests between apps
Using AppEndpoint input
You can pass app endpoints as inputs for more flexibility:
env2 = FastAPIAppEnvironment(
name="frontend-api",
app=app2,
inputs=[
flyte.app.Input(
name="backend_url",
value=flyte.app.AppEndpoint(app_name="backend-api"),
env_var="BACKEND_URL",
),
],
# ...
)
@app2.get("/greeting/{name}")
async def greeting_proxy(name: str):
backend_url = os.getenv("BACKEND_URL")
async with httpx.AsyncClient() as client:
response = await client.get(f"{backend_url}/greeting/{name}")
return response.json()WebSocket-based patterns
WebSockets enable bidirectional, real-time communication between clients and servers. Flyte apps can serve WebSocket endpoints for real-time applications like chat, live updates, or streaming data.
Example: Basic WebSocket app
Here’s a simple FastAPI app with WebSocket support:
"""A FastAPI app with WebSocket support."""
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import asyncio
import json
from datetime import UTC, datetime
import flyte
from flyte.app.extras import FastAPIAppEnvironment
app = FastAPI(
title="Flyte WebSocket Demo",
description="A FastAPI app with WebSocket support",
version="1.0.0",
)
class ConnectionManager:
"""Manages WebSocket connections."""
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""Accept and register a new WebSocket connection."""
await websocket.accept()
self.active_connections.append(websocket)
print(f"Client connected. Total: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""Remove a WebSocket connection."""
self.active_connections.remove(websocket)
print(f"Client disconnected. Total: {len(self.active_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""Send a message to a specific WebSocket connection."""
await websocket.send_text(message)
async def broadcast(self, message: str):
"""Broadcast a message to all active connections."""
for connection in self.active_connections:
try:
await connection.send_text(message)
except Exception as e:
print(f"Error broadcasting: {e}")
manager = ConnectionManager()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time communication."""
await manager.connect(websocket)
try:
# Send welcome message
await manager.send_personal_message(
json.dumps({
"type": "system",
"message": "Welcome! You are connected.",
"timestamp": datetime.now(UTC).isoformat(),
}),
websocket,
)
# Listen for messages
while True:
data = await websocket.receive_text()
# Echo back to sender
await manager.send_personal_message(
json.dumps({
"type": "echo",
"message": f"Echo: {data}",
"timestamp": datetime.now(UTC).isoformat(),
}),
websocket,
)
# Broadcast to all clients
await manager.broadcast(
json.dumps({
"type": "broadcast",
"message": f"Broadcast: {data}",
"timestamp": datetime.now(UTC).isoformat(),
"connections": len(manager.active_connections),
})
)
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(
json.dumps({
"type": "system",
"message": "A client disconnected",
"connections": len(manager.active_connections),
})
)
env = FastAPIAppEnvironment(
name="websocket-app",
app=app,
image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
"fastapi",
"uvicorn",
"websockets",
),
resources=flyte.Resources(cpu=1, memory="1Gi"),
requires_auth=False,
)
WebSocket patterns
Echo server
@app.websocket("/echo")
async def echo(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
passBroadcast server
@app.websocket("/broadcast")
async def broadcast(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
await manager.broadcast(data)
except WebSocketDisconnect:
manager.disconnect(websocket)Real-time data streaming
@app.websocket("/stream")
async def stream_data(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Generate or fetch data
data = {"timestamp": datetime.now().isoformat(), "value": random.random()}
await websocket.send_json(data)
await asyncio.sleep(1) # Send update every second
except WebSocketDisconnect:
passChat application
class ChatRoom:
def __init__(self, name: str):
self.name = name
self.connections: list[WebSocket] = []
async def join(self, websocket: WebSocket):
self.connections.append(websocket)
async def leave(self, websocket: WebSocket):
self.connections.remove(websocket)
async def broadcast(self, message: str, sender: WebSocket):
for connection in self.connections:
if connection != sender:
await connection.send_text(message)
rooms: dict[str, ChatRoom] = {}
@app.websocket("/chat/{room_name}")
async def chat(websocket: WebSocket, room_name: str):
await websocket.accept()
if room_name not in rooms:
rooms[room_name] = ChatRoom(room_name)
room = rooms[room_name]
await room.join(websocket)
try:
while True:
data = await websocket.receive_text()
await room.broadcast(data, websocket)
except WebSocketDisconnect:
await room.leave(websocket)Using WebSockets with Flyte tasks
You can trigger Flyte tasks from WebSocket messages:
@app.websocket("/task-runner")
async def task_runner(websocket: WebSocket):
await websocket.accept()
try:
while True:
# Receive task request
message = await websocket.receive_text()
request = json.loads(message)
# Trigger Flyte task
task = await remote.TaskDetails.fetch(
project=request["project"],
domain=request["domain"],
name=request["task"],
version=request["version"],
)
run = await flyte.run.aio(task, **request["inputs"])
# Send run info back
await websocket.send_json({
"run_id": run.id,
"url": run.url,
"status": "started",
})
# Optionally stream updates
async for update in run.stream():
await websocket.send_json({
"status": update.status,
"message": update.message,
})
except WebSocketDisconnect:
passWebSocket client example
Connect from Python:
import asyncio
import websockets
import json
async def client():
uri = "ws://your-app-url/ws"
async with websockets.connect(uri) as websocket:
# Send message
await websocket.send("Hello, Server!")
# Receive message
response = await websocket.recv()
print(f"Received: {response}")
asyncio.run(client())Browser-based apps
For browser-based apps (like Streamlit), users interact directly through the web interface. The app URL is accessible in a browser, and users interact with the UI directly - no API calls needed from other services.
To access a browser-based app:
- Deploy the app
- Navigate to the app URL in a browser
- Interact with the UI directly
Best practices
- Use
depends_on: Always specify dependencies to ensure proper deployment order. - Handle errors: Implement proper error handling for HTTP requests.
- Use async clients: Use async HTTP clients (
httpx.AsyncClient) in async contexts. - Initialize Flyte: For apps calling tasks, initialize Flyte in the app’s startup.
- Endpoint access: Use
app_env.endpointorAppEndpointinput for accessing app URLs. - Authentication: Consider authentication when apps call each other (set
requires_auth=Trueif needed). - Webhook security: Secure webhooks with auth, validation, and HTTPS.
- WebSocket robustness: Implement connection management, heartbeats, and rate limiting.
Summary
| Pattern | Use Case | Implementation |
|---|---|---|
| Task → App | Batch processing using inference services | HTTP requests from task |
| App → Task | Webhooks, APIs triggering workflows | Flyte SDK in app |
| App → App | Microservices, proxies, agent routers, LLM routers | HTTP requests between apps |
| Browser → App | User-facing dashboards | Direct browser access |
Choose the pattern that best fits your architecture and requirements.