# Hybrid app-task graphs

Apps and tasks can interact with each other: tasks can call apps via HTTP, and apps can trigger task execution via the Flyte SDK. This page covers both patterns.

## Call app from task

Tasks can call apps by making HTTP requests to the app's endpoint. This is useful when:

- You need to use a long-running service during task execution
- You want to call a model serving endpoint from a batch processing task
- You need to interact with an API from a workflow

### Example: FastAPI app called from a task

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
#    "httpx",
# ]
# ///

"""Example of a task calling an app."""

import pathlib
import httpx
from fastapi import FastAPI
import flyte
from flyte.app.extras import FastAPIAppEnvironment

app = FastAPI(title="Add One", description="Adds one to the input", version="1.0.0")

image = flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages("fastapi", "uvicorn", "httpx")

# {{docs-fragment app-definition}}
app_env = FastAPIAppEnvironment(
    name="add-one-app",
    app=app,
    description="Adds one to the input",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
)
# {{/docs-fragment app-definition}}

# {{docs-fragment task-env}}
task_env = flyte.TaskEnvironment(
    name="add_one_task_env",
    image=image,
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    depends_on=[app_env],  # Ensure app is deployed before task runs
)
# {{/docs-fragment task-env}}

# {{docs-fragment app-endpoint}}
@app.get("/")
async def add_one(x: int) -> dict[str, int]:
    """Main endpoint for the add-one app."""
    return {"result": x + 1}
# {{/docs-fragment app-endpoint}}

# {{docs-fragment task}}
@task_env.task
async def add_one_task(x: int) -> int:
    print(f"Calling app at {app_env.endpoint}")
    async with httpx.AsyncClient() as client:
        response = await client.get(app_env.endpoint, params={"x": x})
        response.raise_for_status()
        return response.json()["result"]
# {{/docs-fragment task}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    deployments = flyte.deploy(task_env)
    print(f"Deployed task environment: {deployments}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/task_calling_app.py*

Key points:

- The task environment uses `depends_on=[app_env]` to ensure the app is deployed first
- Access the app endpoint via `app_env.endpoint`
- Use standard HTTP client libraries (like `httpx`) to make requests
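The app may still be starting, or may fail transiently, when the task first calls it. The following is a minimal sketch of a retry wrapper with exponential backoff, using only the standard library. The helper name `call_with_retries` and its parameters are hypothetical and not part of the Flyte SDK.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def call_with_retries(
    fetch: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
) -> T:
    """Await fetch(), retrying with exponential backoff on any exception."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await fetch()
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == attempts - 1:
                raise
            # Back off: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable")
```

Inside `add_one_task`, the `client.get(...)` and `raise_for_status()` calls could be wrapped in a small coroutine function and passed as `fetch`. In practice you would likely narrow the `except` clause to the HTTP client's transport and status errors.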

## Call task from app (webhooks / APIs)

Apps can trigger task execution using the Flyte SDK. This is useful for:

- Webhooks that trigger workflows
- APIs that need to run batch jobs
- Services that need to execute tasks asynchronously

Webhooks are HTTP endpoints that trigger actions in response to external events. Flyte apps can serve as webhook endpoints that trigger task runs, workflows, or other operations.

### Example: Basic webhook app

Here's a simple webhook that triggers Flyte tasks:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook that triggers Flyte tasks."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

# {{docs-fragment auth}}
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials
# {{/docs-fragment auth}}

# {{docs-fragment lifespan}}
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield
    # Cleanup if needed
# {{/docs-fragment lifespan}}

# {{docs-fragment app}}
app = FastAPI(
    title="Flyte Webhook Runner",
    description="A webhook service that triggers Flyte task runs",
    version="1.0.0",
    lifespan=lifespan,
)

@app.get("/health")
async def health_check():
    """Health check endpoint."""
    return {"status": "healthy"}
# {{/docs-fragment app}}

# {{docs-fragment webhook-endpoint}}
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    """
    Trigger a Flyte task run via webhook.

    Returns information about the launched run.
    """
    # Fetch the task
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    # Run the task
    run = await flyte.run.aio(task, **inputs)

    return {
        "url": run.url,
        "id": run.id,
        "status": "started",
    }
# {{/docs-fragment webhook-endpoint}}

# {{docs-fragment env}}
env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    description="A webhook service that triggers Flyte task runs",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,  # We handle auth in the app
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)
# {{/docs-fragment env}}

# {{docs-fragment deploy}}
if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
# {{/docs-fragment deploy}}
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/webhook/basic_webhook.py*

Once deployed, you can trigger tasks via HTTP POST:

```bash
curl -X POST "https://your-webhook-url/run-task/flytesnacks/development/my_task/v1" \
  -H "Authorization: Bearer test-api-key" \
  -H "Content-Type: application/json" \
  -d '{"input_key": "input_value"}'
```

Response:

```json
{
  "url": "https://console.union.ai/...",
  "id": "abc123",
  "status": "started"
}
```
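The same request can be built from Python. This sketch uses only the standard library and mirrors the `curl` call above; the function name `build_run_request` is hypothetical, and the URL and API key are the same placeholders used in the `curl` example.

```python
import json
import urllib.request
from urllib.parse import quote


def build_run_request(
    base_url: str,
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    api_key: str,
) -> urllib.request.Request:
    """Build the POST request for /run-task/{project}/{domain}/{name}/{version}."""
    # Percent-encode each path segment so unusual names cannot alter the route.
    path = "/".join(quote(part, safe="") for part in (project, domain, name, version))
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/run-task/{path}",
        data=json.dumps(inputs).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


request = build_run_request(
    "https://your-webhook-url",
    "flytesnacks",
    "development",
    "my_task",
    "v1",
    {"input_key": "input_value"},
    api_key="test-api-key",
)
# Sending the request requires a deployed webhook:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(json.load(response))
```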

### Advanced webhook patterns

**Webhook with validation**

Use Pydantic for input validation:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook with Pydantic validation."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
from pydantic import BaseModel
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="Flyte Webhook Runner with Validation",
    description="A webhook service that triggers Flyte task runs with Pydantic validation",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment validation-model}}
class TaskInput(BaseModel):
    data: dict
    priority: int = 0
# {{/docs-fragment validation-model}}

# {{docs-fragment validated-webhook}}
@app.post("/run-task/{project}/{domain}/{name}/{version}")
async def run_task(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: TaskInput,  # Validated input
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    run = await flyte.run.aio(task, **inputs.model_dump())

    return {
        "run_id": run.id,
        "url": run.url,
    }
# {{/docs-fragment validated-webhook}}

env = FastAPIAppEnvironment(
    name="webhook-with-validation",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/webhook_validation.py*

**Webhook with response waiting**

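Wait for the task to complete before responding. This keeps the HTTP request open for the duration of the run, so it is best suited to short tasks: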

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A webhook that waits for task completion."""

import pathlib
from fastapi import FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from starlette import status
import os
from contextlib import asynccontextmanager
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY", "test-api-key")
security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> HTTPAuthorizationCredentials:
    """Verify the API key from the bearer token."""
    if credentials.credentials != WEBHOOK_API_KEY:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Could not validate credentials",
        )
    return credentials

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="Flyte Webhook Runner (Wait for Completion)",
    description="A webhook service that triggers Flyte task runs and waits for completion",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment wait-webhook}}
@app.post("/run-task-and-wait/{project}/{domain}/{name}/{version}")
async def run_task_and_wait(
    project: str,
    domain: str,
    name: str,
    version: str,
    inputs: dict,
    credentials: HTTPAuthorizationCredentials = Security(verify_token),
):
    task = remote.Task.get(
        project=project,
        domain=domain,
        name=name,
        version=version,
    )

    run = await flyte.run.aio(task, **inputs)
    # Await completion without blocking the event loop
    # (assumes wait exposes an .aio variant, as flyte.run does)
    await run.wait.aio()

    return {
        "run_id": run.id,
        "url": run.url,
        "status": run.status,
        "outputs": run.outputs(),
    }
# {{/docs-fragment wait-webhook}}

env = FastAPIAppEnvironment(
    name="webhook-wait-completion",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    env_vars={"WEBHOOK_API_KEY": os.getenv("WEBHOOK_API_KEY", "test-api-key")},
)

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/webhook_wait.py*
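Because a waiting webhook holds the connection open, it can help to bound how long the handler waits. The sketch below shows one way to do that with `asyncio.wait_for`. The helper `wait_with_timeout` is hypothetical, and the assumption is that the handler passes it an awaitable that completes when the run finishes.

```python
import asyncio
from typing import Any, Awaitable


async def wait_with_timeout(waiter: Awaitable[Any], timeout_s: float) -> bool:
    """Return True if waiter finished within timeout_s seconds, else False."""
    try:
        # asyncio.wait_for cancels the local waiter if the timeout expires.
        await asyncio.wait_for(waiter, timeout=timeout_s)
        return True
    except asyncio.TimeoutError:
        return False
```

A handler could return the outputs when this yields `True`, and otherwise respond with the run URL so the caller can check on the run later.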

**Webhook with secret management**

Use Flyte secrets for API keys:

```python
env = FastAPIAppEnvironment(
    name="webhook-runner",
    app=app,
    secrets=flyte.Secret(key="webhook-api-key", as_env_var="WEBHOOK_API_KEY"),
    # ...
)
```

Then access in your app:

```python
WEBHOOK_API_KEY = os.getenv("WEBHOOK_API_KEY")
```
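When the key comes from a secret there is no sensible default, so the app can fail fast if the variable is missing and compare tokens in constant time. This is a minimal sketch using the standard library; `load_required_secret` and `token_matches` are hypothetical helper names.

```python
import hmac
import os


def load_required_secret(name: str) -> str:
    """Return the environment variable `name`, or raise if it is unset or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Required secret {name!r} is not set")
    return value


def token_matches(presented: str, expected: str) -> bool:
    """Compare two tokens in constant time to avoid leaking timing information."""
    return hmac.compare_digest(presented.encode("utf-8"), expected.encode("utf-8"))
```

A `verify_token` dependency could call `token_matches(credentials.credentials, WEBHOOK_API_KEY)` in place of a plain `!=` comparison.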

### Webhook security and best practices

- **Authentication**: Always secure webhooks with authentication (API keys, tokens, etc.).
- **Input validation**: Validate webhook inputs using Pydantic models.
- **Error handling**: Handle errors gracefully and return meaningful error messages.
- **Async operations**: Use async/await for I/O operations.
- **Health checks**: Include health check endpoints.
- **Logging**: Log webhook requests for debugging and auditing.
- **Rate limiting**: Consider implementing rate limiting for production.

Security considerations:

- Store API keys in Flyte secrets, not in code.
- Always use HTTPS in production.
- Validate all inputs to prevent injection attacks.
- Implement proper access control mechanisms.
- Log all webhook invocations for security auditing.
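As an illustration of the rate-limiting suggestion, here is a sketch of an in-memory sliding-window limiter keyed by caller. The class name and parameters are hypothetical. State lives in a single process, so a deployment with several replicas would likely need a shared store or a gateway-level limit instead.

```python
import time
from collections import defaultdict, deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_calls per key within any window_s-second window."""

    def __init__(
        self,
        max_calls: int,
        window_s: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.window_s = window_s
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self._calls = defaultdict(deque)  # key -> timestamps of accepted calls

    def allow(self, key: str) -> bool:
        now = self.clock()
        calls = self._calls[key]
        # Drop timestamps that have aged out of the window.
        while calls and now - calls[0] >= self.window_s:
            calls.popleft()
        if len(calls) >= self.max_calls:
            return False
        calls.append(now)
        return True
```

A webhook handler could call `allow()` with the client's API key or IP address and respond with HTTP 429 when it returns `False`.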

### Example: GitHub webhook

Here's an example webhook that triggers tasks based on GitHub events:

```python
# /// script
# requires-python = ">=3.12"
# dependencies = [
#    "flyte>=2.0.0b52",
#    "fastapi",
# ]
# ///

"""A GitHub webhook that triggers Flyte tasks based on GitHub events."""

import pathlib
import hmac
import hashlib
import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, Header, HTTPException
import flyte
import flyte.remote as remote
from flyte.app.extras import FastAPIAppEnvironment

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize Flyte before accepting requests."""
    await flyte.init_in_cluster.aio()
    yield

app = FastAPI(
    title="GitHub Webhook Handler",
    description="Triggers Flyte tasks based on GitHub events",
    version="1.0.0",
    lifespan=lifespan,
)

# {{docs-fragment github-webhook}}
@app.post("/github-webhook")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str = Header(None),
):
    """Handle GitHub webhook events."""
    body = await request.body()

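    # Verify signature; reject requests missing the header or a configured secret
    secret = os.getenv("GITHUB_WEBHOOK_SECRET")
    if not secret or not x_hub_signature_256:
        raise HTTPException(status_code=403, detail="Missing signature")
    signature = hmac.new(
        secret.encode(),
        body,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes in constant time
    expected_signature = f"sha256={signature}"
    if not hmac.compare_digest(x_hub_signature_256.encode(), expected_signature.encode()):
        raise HTTPException(status_code=403, detail="Invalid signature")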

    # Process webhook
    event = await request.json()
    event_type = request.headers.get("X-GitHub-Event")

    if event_type == "push":
        # Trigger deployment task
        task = remote.Task.get(
            project="my-project",
            domain="development",
            name="deploy-task",
            version="v1",
        )
        run = await flyte.run.aio(task, commit=event["after"])
        return {"run_id": run.id, "url": run.url}

    return {"status": "ignored"}
# {{/docs-fragment github-webhook}}

# {{docs-fragment env}}
env = FastAPIAppEnvironment(
    name="github-webhook",
    app=app,
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "fastapi",
        "uvicorn",
    ),
    resources=flyte.Resources(cpu=1, memory="512Mi"),
    requires_auth=False,
    secrets=flyte.Secret(key="GITHUB_WEBHOOK_SECRET", as_env_var="GITHUB_WEBHOOK_SECRET"),
)
# {{/docs-fragment env}}

if __name__ == "__main__":
    flyte.init_from_config(root_dir=pathlib.Path(__file__).parent)
    app_deployment = flyte.deploy(env)
    print(f"Deployed GitHub webhook: {app_deployment[0].summary_repr()}")
```

*Source: https://github.com/unionai/unionai-examples/blob/main/v2/user-guide/build-apps/fastapi/github_webhook.py*
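The signature check can also be factored into a pure function, which makes it easy to unit test without a running app. This is a sketch under that assumption; `verify_github_signature` is a hypothetical helper name, and it treats a missing header or empty secret as a failed check.

```python
import hashlib
import hmac
from typing import Optional


def verify_github_signature(secret: str, body: bytes, header: Optional[str]) -> bool:
    """Check an X-Hub-Signature-256 header value against the raw request body."""
    if not secret or not header:
        return False
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = f"sha256={digest}"
    # Constant-time comparison on bytes.
    return hmac.compare_digest(header.encode("utf-8"), expected.encode("utf-8"))
```

The check must run against the raw request bytes, as in the handler above, because re-serializing the parsed JSON can change the payload and invalidate the signature.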

### Gradio agent UI

For AI agents, a Gradio app lets you build an interactive UI that kicks off agent runs. The app uses `flyte.with_runcontext()` to run the agent task either locally or on a remote cluster, controlled by an environment variable.

```python
import os
import flyte
import flyte.app
from research_agent import agent

RUN_MODE = os.getenv("RUN_MODE", "remote")

serving_env = flyte.app.AppEnvironment(
    name="research-agent-ui",
    image=flyte.Image.from_debian_base(python_version=(3, 12)).with_pip_packages(
        "gradio", "langchain-core", "langchain-openai", "langgraph",
    ),
    secrets=flyte.Secret(key="OPENAI_API_KEY", as_env_var="OPENAI_API_KEY"),
    port=7860,
)

def run_query(request: str):
    """Kick off the agent as a Flyte task."""
    result = flyte.with_runcontext(mode=RUN_MODE).run(agent, request=request)
    result.wait()
    return result.outputs()[0]

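def create_demo():
    """Minimal illustrative UI; the original example does not show this helper."""
    import gradio as gr

    return gr.Interface(fn=run_query, inputs="text", outputs="text")

@serving_env.server
def app_server():
    create_demo().launch(server_name="0.0.0.0", server_port=7860)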

if __name__ == "__main__":
    create_demo().launch()
```

The `RUN_MODE` variable gives you a smooth development progression:

1. **Fully local**: `RUN_MODE=local python agent_app.py`. Everything runs in your local Python environment, great for rapid iteration.
2. **Local app, remote task**: `python agent_app.py`. The UI runs locally but the agent executes on the cluster with full compute resources.
3. **Full remote**: `flyte deploy agent_app.py serving_env`. Both the UI and agent run on the cluster.
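Since one environment variable selects the execution mode, a typo in its value is easy to make. The sketch below validates it up front. The helper `resolve_run_mode` is hypothetical, and the allowed set only covers the two modes described in the list above.

```python
import os
from typing import Mapping

ALLOWED_RUN_MODES = ("local", "remote")  # the two modes described above


def resolve_run_mode(environ: Mapping[str, str] = os.environ) -> str:
    """Read RUN_MODE, defaulting to "remote", and reject unknown values."""
    mode = environ.get("RUN_MODE", "remote").strip().lower()
    if mode not in ALLOWED_RUN_MODES:
        raise ValueError(f"RUN_MODE must be one of {ALLOWED_RUN_MODES}, got {mode!r}")
    return mode
```

Calling this once at import time, in place of the bare `os.getenv`, surfaces a bad value when the app starts instead of on the first query.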

## Best practices

1. **Use `depends_on`**: Always specify dependencies to ensure proper deployment order.
2. **Handle errors**: Implement proper error handling for HTTP requests.
3. **Use async clients**: Use async HTTP clients (`httpx.AsyncClient`) in async contexts.
4. **Initialize Flyte**: For apps calling tasks, initialize Flyte in the app's startup.
5. **Endpoint access**: Use `app_env.endpoint` or an `AppEndpoint` parameter to access app URLs.
6. **Webhook security**: Secure webhooks with auth, validation, and HTTPS.

---
**Source**: https://github.com/unionai/unionai-docs/blob/main/content/user-guide/build-apps/hybrid-graphs.md
**HTML**: https://www.union.ai/docs/v2/union/user-guide/build-apps/hybrid-graphs/
