Hybrid app-task graphs
Apps and tasks can interact with each other: tasks can call apps via HTTP, and apps can trigger task execution via the Flyte SDK. This page covers both patterns.
Call app from task
Tasks can call apps by making HTTP requests to the app’s endpoint. This is useful when:
- You need to use a long-running service during task execution
- You want to call a model serving endpoint from a batch processing task
- You need to interact with an API from a workflow
Example: FastAPI app called from a task
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "httpx",
# ]
# ///
"""Example of a task calling an app."""

import pathlib

import httpx
from fastapi import FastAPI

import flyte
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(title="Add One", description="Adds one to the input", version="1.0.0")

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx")

app_env = FastAPIAppEnvironment(
    name="add-one-app",
    app=app,
    description="Adds one to the input",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)

task_env = flyte.TaskEnvironment(
    name="add_one_task_env",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    depends_on=[app_env],  # Ensure app is deployed before task runs
)


@app.get("/")
async def add_one(x: int) -> dict[str, int]:
    """Main endpoint for the add-one app."""
    return {"result": x + 1}


@task_env.task
async def add_one_task(x: int) -> int:
    print(f"Calling app at {app_env.endpoint}")
    async with httpx.AsyncClient() as client:
        response = await client.get(app_env.endpoint, params={"x": x})
        response.raise_for_status()
        return response.json()["result"]


if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    deployments = flyte.deploy(task_env)
    print(f"Deployed task environment: {deployments}")
Key points:
- The task environment uses depends_on=[app_env] to ensure the app is deployed first
- Access the app endpoint via app_env.endpoint
- Use standard HTTP client libraries (like httpx) to make requests
Call task from app (webhooks / APIs)
Apps can trigger task execution using the Flyte SDK. This is useful for:
- Webhooks that trigger workflows
- APIs that need to run batch jobs
- Services that need to execute tasks asynchronously
Webhooks are HTTP endpoints that trigger actions in response to external events. Flyte apps can serve as webhook endpoints that trigger task runs, workflows, or other operations.
Example: Basic webhook app
Here’s a simple webhook that triggers Flyte tasks:
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///
"""A webhook that triggers Flyte tasks."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager

import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()


async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield
    # Cleanup if needed


app = FastAPI(
    title="Flyte Webhook Runner",
    description="A webhook service that triggers Flyte task runs",
    version="1.0.0",
    lifespan=lifespan,
)


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}


@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    """
    Trigger a Flyte task run via webhook.

    Returns information about the launched run.
    """
    # Fetch the task
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    # Run the task
    run = await flyte.run.aio(task, **inputs)

    return {
        "url": run.url,
        "id": run.id,
        "status": "started",
    }


env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    description="A webhook service that triggers Flyte task runs",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,  # We handle auth in the app
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
Once deployed, you can trigger tasks via HTTP POST:
curl -X POST "https://your-webhook-url/run-task/flytesnacks/development/my_task/v1" \
  -H "Authorization: Bearer test-api-key" \
  -H "Content-Type: application/json" \
  -d '{"input_key": "input_value"}'
Response:
{
  "url": "https://console.union.ai/...",
  "id": "abc123",
  "status": "started"
}
Advanced webhook patterns
Webhook with validation
Use Pydantic for input validation:
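from pydantic import BaseModel


# Hypothetical input model for illustration: replace the fields with the
# inputs your task actually accepts.
class TaskInput(BaseModel):
    input_key: str


@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: TaskInput,  # Validated input
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    # model_dump() turns the validated model into keyword arguments for the task
    run = await flyte.run.aio(task, **inputs.model_dump())

    return {
        "run_id": run.id,
        "url": run.url,
    }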
Webhook with response waiting
Wait for task completion:
@app.post("/run-task-and-wait/{project}/{domain}/{name}/{version}")
async def run_task_and_wait(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    run = await flyte.run.aio(task, **inputs)
    run.wait()  # Wait for completion

    return {
        "run_id": run.id,
        "url": run.url,
        "status": run.status,
        "outputs": run.outputs(),
    }
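Waiting for completion holds the HTTP request open until the run finishes, so it can help to bound the wait. The following is a minimal sketch using only the standard library; wait_with_timeout is a hypothetical helper, not part of the Flyte SDK, and it assumes the thing being waited on is an ordinary blocking callable.

```python
import asyncio
import time


async def wait_with_timeout(blocking_wait, timeout_s: float) -> bool:
    """Run a blocking wait callable in a worker thread, giving up after timeout_s.

    Returns True if the callable finished in time, False otherwise.
    """
    try:
        await asyncio.wait_for(asyncio.to_thread(blocking_wait), timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        # The worker thread keeps running in the background;
        # only the caller stops waiting for it.
        return False


def demo() -> tuple:
    """Exercise the helper with stand-ins for a short and a long wait."""
    fast = asyncio.run(wait_with_timeout(lambda: time.sleep(0.01), timeout_s=1.0))
    slow = asyncio.run(wait_with_timeout(lambda: time.sleep(0.3), timeout_s=0.05))
    return fast, slow
```

In the handler above, one possible use would be to pass run.wait as the callable and, when the helper returns False, respond with the run URL so the caller can poll instead of waiting.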
Webhook with secret management
Use Flyte secrets for API keys:
env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    secrets=flyte.Secret(key="webhook-api-key", as_env_var="WEBHOOK_API_KEY"),
    # ...
)
Then access in your app:
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY")
Webhook security and best practices
- Authentication: Always secure webhooks with authentication (API keys, tokens, etc.).
- Input validation: Validate webhook inputs using Pydantic models.
- Error handling: Handle errors gracefully and return meaningful error messages.
- Async operations: Use async/await for I/O operations.
- Health checks: Include health check endpoints.
- Logging: Log webhook requests for debugging and auditing.
- Rate limiting: Consider implementing rate limiting for production.
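As an illustration of the rate limiting item, here is a minimal token bucket sketch in plain Python. TokenBucket is a hypothetical class written for this page, not a Flyte or FastAPI API, and it only limits requests within a single process; an app running several replicas would need a shared store instead.

```python
import time


class TokenBucket:
    """Allow up to `capacity` requests in a burst, refilled at `rate` tokens per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start with a full bucket
        self.clock = clock             # injectable clock makes the class testable
        self.updated = clock()

    def allow(self) -> bool:
        """Consume one token if available and report whether the request may proceed."""
        now = self.clock()
        elapsed = now - self.updated
        self.updated = now
        # Refill according to elapsed time, never exceeding capacity.
        self.tokens = min(float(self.capacity), self.tokens + elapsed * self.rate)
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A webhook handler could keep one bucket per API key and return HTTP 429 whenever allow() returns False.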
Security considerations:
- Store API keys in Flyte secrets, not in code.
- Always use HTTPS in production.
- Validate all inputs to prevent injection attacks.
- Implement proper access control mechanisms.
- Log all webhook invocations for security auditing.
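One detail worth sketching for API key checks: comparing secrets with == or != can leak timing information, and a missing key should never be treated as a match. The helper below is a hypothetical illustration using the standard library's hmac.compare_digest; it is not taken from the Flyte SDK.

```python
import hmac
from typing import Optional


def api_key_matches(presented: str, expected: Optional[str]) -> bool:
    """Compare a presented API key with the configured one in constant time."""
    if not expected:
        # Fail closed: with no key configured, nothing is accepted.
        return False
    # Compare as bytes so non-ASCII input cannot raise a TypeError.
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
```

In a bearer-token dependency, this check could stand in for a direct string comparison, with the expected key read from the environment variable populated by the secret.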
Example: GitHub webhook
Here’s an example webhook that triggers tasks based on GitHub events:
# Assumes app, flyte, and remote are set up as in the basic webhook example.
import hashlib
import hmac
import os

from fastapi import Header, HTTPException, Request


@app.post("/github-webhook")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str | None = Header(None),
):
    """Handle GitHub webhook events."""
    body = await request.body()

    # Verify signature; reject the request if the header or the secret is missing
    secret = os.getenv("GITHUB_WEBHOOK_SECRET")
    if not secret or not x_hub_signature_256:
        raise HTTPException(status_code=403, detail="Invalid signature")

    signature = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256,
    ).hexdigest()
    expected_signature = f"sha256={signature}"

    # Compare as bytes in constant time
    if not hmac.compare_digest(x_hub_signature_256.encode(), expected_signature.encode()):
        raise HTTPException(status_code=403, detail="Invalid signature")

    # Process webhook
    event = await request.json()
    event_type = request.headers.get("X-GitHub-Event")

    if event_type == "push":
        # Trigger deployment task
        task = remote.Task.get(
            project="my-project",
            domain="development",
            name="deploy-task",
            version="v1",
        )

        run = await flyte.run.aio(task, commit=event["after"])
        return {"run_id": run.id, "url": run.url}

    return {"status": "ignored"}
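To exercise a handler like this without GitHub in the loop, the signing logic can be factored into small functions that both the handler and its tests call. This is a sketch under that assumption; sign_payload and is_valid_signature are hypothetical names, and the sha256= prefix follows the format used in the handler above.

```python
import hashlib
import hmac
from typing import Optional


def sign_payload(secret: str, body: bytes) -> str:
    """Return the signature header value for a raw request body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def is_valid_signature(secret: str, body: bytes, header_value: Optional[str]) -> bool:
    """Check a received signature header against the raw body in constant time."""
    if not header_value:
        return False  # a missing header is never valid
    expected = sign_payload(secret, body)
    return hmac.compare_digest(header_value.encode("utf-8"), expected.encode("utf-8"))
```

A test can then build a payload, sign it with sign_payload, and send both to the endpoint. The signature must be computed over the exact raw bytes of the body, not a re-serialized copy of the parsed JSON.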
Gradio agent UI
For AI agents, a Gradio app lets you build an interactive UI that kicks off agent runs. The app uses flyte.with_runcontext() to run the agent task either locally or on a remote cluster, controlled by an environment variable.
import os

import gradio as gr

import flyte
import flyte.app

from research_agent import agent

RUN_MODE = os.getenv("RUN_MODE", "remote")

serving_env = flyte.app.AppEnvironment(
    name="research-agent-ui",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "gradio", "langchain-core", "langchain-openai", "langgraph",
    ),
    secrets=flyte.Secret(key="OPENAI_API_KEY", as_env_var="OPENAI_API_KEY"),
    port=7860,
)


def run_query(request: str):
    """Kick off the agent as a Flyte task."""
    result = flyte.with_runcontext(mode=RUN_MODE).run(agent, request=request)
    result.wait()
    return result.outputs()[0]


def create_demo() -> gr.Interface:
    # Illustrative placeholder: the UI definition is not shown in this example,
    # so this assumes a single text box in and a single text box out.
    return gr.Interface(fn=run_query, inputs="text", outputs="text")


@serving_env.server
def app_server():
    create_demo().launch(server_name="0.0.0.0", server_port=7860)


if __name__ == "__main__":
    create_demo().launch()
The RUN_MODE variable gives you a smooth development progression:
- Fully local: RUN_MODE=local python agent_app.py. Everything runs in your local Python environment, great for rapid iteration.
- Local app, remote task: python agent_app.py. The UI runs locally but the agent executes on the cluster with full compute resources.
- Full remote: flyte deploy agent_app.py serving_env. Both the UI and agent run on the cluster.
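Because the mode comes from an environment variable, a typo would otherwise surface only when the first query runs. The sketch below validates it up front. resolve_run_mode is a hypothetical helper, and it accepts only the two modes described on this page; the SDK may support others.

```python
import os
from typing import Mapping, Optional

# Only the modes discussed on this page; an assumption, not the SDK's full list.
ALLOWED_RUN_MODES = ("local", "remote")


def resolve_run_mode(env: Optional[Mapping[str, str]] = None, default: str = "remote") -> str:
    """Read RUN_MODE from the environment, normalize it, and reject unknown values."""
    env = os.environ if env is None else env
    mode = env.get("RUN_MODE", default).strip().lower()
    if mode not in ALLOWED_RUN_MODES:
        raise ValueError(
            f"Unsupported RUN_MODE {mode!r}; expected one of {ALLOWED_RUN_MODES}"
        )
    return mode
```

With this in place, the module-level assignment could become RUN_MODE = resolve_run_mode(), which fails at import time instead of on the first request.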
Best practices
- Use depends_on: Always specify dependencies to ensure proper deployment order.
- Handle errors: Implement proper error handling for HTTP requests.
- Use async clients: Use async HTTP clients (httpx.AsyncClient) in async contexts.
- Initialize Flyte: For apps calling tasks, initialize Flyte in the app’s startup.
- Endpoint access: Use app_env.endpoint or an AppEndpoint parameter for accessing app URLs.
- Webhook security: Secure webhooks with auth, validation, and HTTPS.
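The error handling item can be made concrete with a small retry wrapper for calls from a task to an app, which may fail briefly while the app is starting or scaling. This is a minimal standard-library sketch; call_with_retries and its parameters are hypothetical and not part of Flyte or httpx.

```python
import asyncio


async def call_with_retries(make_request, attempts: int = 3, base_delay_s: float = 0.5,
                            retry_on: tuple = (Exception,)):
    """Await make_request(), retrying on the given exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(attempts):
        try:
            return await make_request()
        except retry_on as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Delay doubles each time: base, 2 * base, 4 * base, ...
                await asyncio.sleep(base_delay_s * 2 ** attempt)
    # Every attempt failed; surface the most recent error to the caller.
    raise last_error
```

Inside a task, make_request could be a small async function that performs the client.get call and raise_for_status(), with retry_on narrowed to httpx.HTTPError so that unrelated bugs are not retried.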