# Actors
> This bundle contains all pages in the Actors section.
> Source: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors/

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors ===

# Actors

> **📝 Note**
>
> An LLM-optimized bundle of this entire section is available at [`section.md`](https://www.union.ai/docs/v1/union/user-guide/core-concepts/section.md).
> This single file contains all pages in this section, optimized for AI coding agent context.

Actors allow you to reuse a container and environment between tasks, avoiding the cost of starting a new container for each task. This can be useful when you have a task that requires a lot of setup or has a long startup time.

To create an actor, instantiate the [`ActorEnvironment`](https://www.union.ai/docs/v1/union/api-reference/union-sdk/packages/union.actor/page.md) class, then add the instance as a decorator to the task that requires that environment.

### `ActorEnvironment` parameters

* **container_image:** The container image to use for the task. This image must have the `union` Python package installed, so you must override the default (`cr.flyte.org/flyteorg/flytekit:py3.11-latest`).

* **environment:** Environment variables as key, value pairs in a Python dictionary.
* **limits:** Compute resource limits.
* **replica_count:** The number of workers to provision that are able to accept tasks.
* **requests:** Compute resource requests per task.
* **secret_requests:** Keys (ideally descriptive) that can identify the secrets supplied at runtime. For more information, see [Managing secrets](https://www.union.ai/docs/v1/union/user-guide/development-cycle/managing-secrets/page.md).
* **ttl_seconds:** How long to keep the Actor alive while no tasks are being run.

The following example shows how to create a basic `ActorEnvironment` and use it for one task:

```python
# hello_world.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=union.Resources(
        cpu="2",
        mem="300Mi",
    ),
    container_image=image,
)

@actor.task
def say_hello() -> str:
    return "hello"

@union.workflow
def wf():
    say_hello()
```

You can learn more about the trade-offs between actors and regular tasks, as well as the efficiency gains you can expect, in **Core concepts > Actors > Actors and regular tasks**.

## Caching on Actor Replicas

The `@actor_cache` decorator provides a powerful mechanism to cache the results of Python callables on individual actor replicas. This is particularly beneficial for workflows involving repetitive tasks, such as data preprocessing, model loading, or initialization of shared resources, where caching can minimize redundant operations and improve overall efficiency. Once a callable is cached on a replica, subsequent tasks that use the same actor can access the cached result, significantly improving performance and efficiency.

### When to Use `@actor_cache`

- **Shared Initialization Costs:**
  For expensive, shared initialization processes that multiple tasks rely on.

- **Repetitive Task Execution:**
  When tasks repeatedly require the same resource or computation on the same actor replica.

- **Complex Object Caching:**
  Use custom Python objects as keys to define unique cache entries.

Below is a simplified example showcasing the use of `@actor_cache`. It caches the model loaded by the `load_model` function:

```python
# caching_basic.py

from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=1,
)

@union.actor_cache
def load_model(state: int) -> callable:
    sleep(4)  # simulate model loading
    return lambda value: state + value

@actor.task
def evaluate(value: int, state: int) -> int:
    model = load_model(state=state)
    return model(value)

@union.workflow
def wf(init_value: int = 1, state: int = 3) -> int:
    out = evaluate(value=init_value, state=state)
    out = evaluate(value=out, state=state)
    out = evaluate(value=out, state=state)
    out = evaluate(value=out, state=state)
    return out
```

> [!NOTE]
> To use `@actor_cache`, you must use `union` version `0.1.121` or later.

![Actor caching example 1](https://www.union.ai/docs/v1/union/_static/images/user-guide/core-concepts/actors/caching/actor-cache-example-1.png)

You can see that the first call of `evaluate` takes considerable time: it involves allocating a node for the task, creating a container, and loading the model. Subsequent calls of `evaluate` execute in a fraction of the time.

You can see examples of more advanced actor usage in **Core concepts > Actors > Actor examples**.

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors/actors-and-regular-tasks ===

# Actors and regular tasks

When deciding whether to use actors or traditional tasks in your workflows, it's important to consider the benefits
and trade-offs. This page outlines key scenarios where actors shine and where they may not be the best fit.

| When to Use Actors | When Not to Use Actors |
| ------------------ | ---------------------- |
| **Short Running Tasks** Traditional tasks spin up a new container and pod for each task, which adds overhead. Actors allow tasks to run on the same container, removing the repeated cost of pod creation, image pulling, and initialization. Actors offer the most benefit for short running tasks where the startup overhead is a larger component of total task runtime. | **Long Running Tasks** For long running tasks, container initialization overhead is minimal, so the performance benefits of actors become negligible when task runtime significantly exceeds startup time. |
| **Map Tasks with Large Input Arrays** Map tasks by default share the same image and resource definitions, making them a great use case for actors. Actors provide the greatest benefit when the input array is larger than the desired concurrency. For example, consider an input array with 2,000 entries and a concurrency level of 50. Without actors, map tasks would spin up 2,000 individual containers, one for each entry. With actors, only 50 containers are needed, corresponding to the number of replicas, dramatically reducing overhead. | **Map Tasks with Small Input Arrays** When the number of actor replicas matches the input array size, the same number of pods and containers is initialized as when the map task runs without an actor. For example, if there are 10 inputs and 10 replicas, 10 pods are created, resulting in no reduction in overhead. |
| **State Management and Efficient Initialization** Actors excel when state persistence between tasks is valuable. You can use `@actor_cache` to cache Python objects. For example, this lets you load a large model or dataset into memory once per replica, and access it across tasks run on that replica. You can also serve a model or initialize shared resources in an init container. Each task directed to that actor replica can then reuse the same model or resource. | **Strict Task Isolation Is Critical** While actors clear Python caches, global variables, and custom environment variables after each task, they still share the same container. The shared environment introduces edge cases where you could intentionally or unintentionally impact downstream tasks. For example, if you write to a file in one task, that file will remain mutated for the next task that is run on that actor replica. If strict isolation between tasks is a hard requirement, regular tasks provide a safer option. |
| **Shared Dependencies and Resources** If multiple tasks can use the same container image and have consistent resource requirements, actors are a natural fit. | |

## Efficiency Gains from Actors with Map Tasks

Let's see how using Actors with map tasks can cut runtime in half!

We compare three scenarios:

1. **Regular map tasks without specifying concurrency.** This is the fastest expected configuration, as Flyte spawns as many pods as there are elements in the input array, allowing Kubernetes to manage scheduling based on available resources.
2. **Regular map tasks with fixed concurrency.** This limits the number of pods that are alive at any given time.
3. **Map tasks with Actors.** Here we set the number of replicas to match the concurrency of the previous example.

This lets us compare actors to vanilla map tasks both when speed is maximized and when live pods are matched one-to-one.

### "Hello World" Benchmark

This benchmark simply runs a task that returns "Hello World", which is near-instantaneous.

| Task Type      | Concurrency/Replicas | Duration (seconds) |
| -------------- | -------------------- | ------------------ |
| Without Actors | unbound              | 111                |
| Without Actors | 25                   | 1195               |
| With Actors    | 25                   | 42                 |

**Key Takeaway:** For near instantaneous tasks, using a 25-replica Actor with map tasks reduces runtime by 96% if live pods are matched, and 62% when map task concurrency is unbounded.

### "5s Sleep" Benchmark

This benchmark simply runs a task that sleeps for five seconds.

| Task Type      | Concurrency/Replicas | Duration (seconds) |
| -------------- | -------------------- | ------------------ |
| Without Actors | unbound              | 174                |
| Without Actors | 100                  | 507                |
| With Actors    | 100                  | 87                 |

**Key Takeaway:** For five-second long tasks, using a 100-replica Actor with map tasks reduces runtime by 83% if live pods are matched, and 50% when map task concurrency is unbounded.

If you have short running map tasks, you can cut your runtime in half. If you are already using concurrency limits on your map tasks, you can expect even better improvements!

=== PAGE: https://www.union.ai/docs/v1/union/user-guide/core-concepts/actors/actor-examples ===

# Actor examples

### Refactoring from Regular Tasks to Actors

Notice that converting a non-actor workflow to use actors is as simple as replacing the `@union.task` decorator with the `@actor.task` decorator. Additionally, task decorator arguments can be moved either to the actor environment or the actor task decorator, depending on whether they apply to the entire environment (e.g. resource specifications) or to a single task execution (e.g. caching arguments).

```diff
import union

+ actor = union.ActorEnvironment(
+    name = "myenv",
+    replica_count = 10,
+    ttl_seconds = 120,
+    requests = union.Resources(mem="1Gi"),
+    container_image = "myrepo/myimage-with-scipy:latest",
+)
+
- @union.task(requests=union.Resources(mem="1Gi"))
+ @actor.task
def add_numbers(a: float, b: float) -> float:
    return a + b

- @union.task(container_image="myrepo/myimage-with-scipy:latest")
+ @actor.task
def calculate_distance(point_a: list[int], point_b: list[int]) -> float:
    from scipy.spatial.distance import euclidean
    return euclidean(point_a, point_b)

- @union.task(cache=True, cache_version="v1")
+ @actor.task(cache=True, cache_version="v1")
def is_even(number: int) -> bool:
    return number % 2 == 0

@union.workflow
def distance_add_wf(point_a: list[int], point_b: list[int]) -> float:
    distance = calculate_distance(point_a=point_a, point_b=point_b)
    return add_numbers(a=distance, b=1.5)

@union.workflow
def is_even_wf(point_a: list[int]) -> list[bool]:
    return union.map(is_even)(number=point_a)
```

## Multiple instances of the same task

In this example, the `actor.task`-decorated task is invoked multiple times in one workflow, and will use the same `ActorEnvironment` on each invocation:

```python
# plus_one.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=300,
    requests=union.Resources(cpu="2", mem="500Mi"),
    container_image=image,
)

@actor.task
def plus_one(input: int) -> int:
    return input + 1

@union.workflow
def wf(input: int = 0) -> int:
    a = plus_one(input=input)
    b = plus_one(input=a)
    c = plus_one(input=b)
    return plus_one(input=c)

```

## Multiple tasks

Every task in the following example executes in the same `ActorEnvironment`.
You can share one environment across multiple tasks in the same workflow, and across workflow definitions, using both subworkflows and launch plans:

```python
# multiple_tasks.py

import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    requests=union.Resources(cpu="1", mem="450Mi"),
    container_image=image,
)

@actor.task
def say_hello(name: str) -> str:
    return f"hello {name}"

@actor.task
def scream_hello(name: str) -> str:
    return f"HELLO {name}"

@union.workflow
def my_child_wf(name: str) -> str:
    return scream_hello(name=name)

my_child_wf_lp = union.LaunchPlan.get_default_launch_plan(union.current_context(), my_child_wf)

@union.workflow
def my_parent_wf(name: str) -> str:
    a = say_hello(name=name)
    b = my_child_wf(name=a)
    return my_child_wf_lp(name=b)
```

## Custom PodTemplates

Both tasks in the following example will be executed in the same `ActorEnvironment`, which is created with a `PodTemplate` for additional configuration.

```python
# pod_template.py

import os

from kubernetes.client.models import (
    V1Container,
    V1PodSpec,
    V1ResourceRequirements,
    V1EnvVar,
)
import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union", "flytekitplugins-pod"],
)

pod_template = union.PodTemplate(
    primary_container_name="primary",
    pod_spec=V1PodSpec(
        containers=[
            V1Container(
                name="primary",
                image=image,
                resources=V1ResourceRequirements(
                    requests={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                    limits={
                        "cpu": "1",
                        "memory": "1Gi",
                    },
                ),
                env=[V1EnvVar(name="COMP_KEY_EX", value="compile_time")],
            ),
        ],
    ),
)

actor = union.ActorEnvironment(
    name="my-actor",
    replica_count=1,
    ttl_seconds=30,
    pod_template=pod_template,
)

@actor.task
def get_and_set() -> str:
    os.environ["RUN_KEY_EX"] = "run_time"
    return os.getenv("COMP_KEY_EX")

@actor.task
def check_set() -> str:
    return os.getenv("RUN_KEY_EX")

@union.workflow
def wf() -> tuple[str,str]:
    return get_and_set(), check_set()
```

## Example: `@actor_cache` with `map`

With map tasks, each task is executed within the same environment, making actors a natural fit for this pattern. If a task has an expensive operation, like model loading, caching it with `@actor_cache` can improve performance. This example shows how to cache model loading in a mapped task to avoid redundant work and save resources.

```python
# caching_map_task.py

from functools import partial
from pathlib import Path
from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=2,
)

class MyModel:
    """Simple model that multiples value with model_state."""

    def __init__(self, model_state: int):
        self.model_state = model_state

    def __call__(self, value: int):
        return self.model_state * value

@union.task(container_image=image, cache=True, cache_version="v1")
def create_model_state() -> union.FlyteFile:
    working_dir = Path(union.current_context().working_directory)
    model_state_path = working_dir / "model_state.txt"
    model_state_path.write_text("4")
    return model_state_path

@union.actor_cache
def load_model(model_state_path: union.FlyteFile) -> MyModel:
    # Simulate model loading time. This can take a long time
    # because the FlyteFile download is large, or when the
    # model is loaded onto the GPU.
    sleep(10)
    with model_state_path.open("r") as f:
        model_state = int(f.read())

    return MyModel(model_state=model_state)

@actor.task
def inference(value: int, model_state_path: union.FlyteFile) -> int:
    model = load_model(model_state_path)
    return model(value)

@union.workflow
def run_inference(values: list[int] = list(range(20))) -> list[int]:
    model_state = create_model_state()
    inference_ = partial(inference, model_state_path=model_state)
    return union.map(inference_)(value=values)
```

## Example: Caching with Custom Objects

Finally, we can cache custom objects by defining the `__hash__` and `__eq__` methods. These methods allow `@actor_cache` to determine if an object is the same between runs, ensuring that expensive operations are skipped if the object hasn’t changed.

```python
# caching_custom_object.py

from time import sleep
import os

import union

image = union.ImageSpec(
    registry=os.environ.get("DOCKER_REGISTRY", None),
    packages=["union"],
)

actor = union.ActorEnvironment(
    name="my-actor",
    container_image=image,
    replica_count=1,
)

class MyObj:
    def __init__(self, state: int):
        self.state = state

    def __hash__(self):
        return hash(self.state)

    def __eq__(self, other):
        return self.state == other.state

@union.actor_cache
def get_state(obj: MyObj) -> int:
    sleep(2)
    return obj.state

@actor.task
def construct_and_get_value(state: int) -> int:
    obj = MyObj(state=state)
    return get_state(obj)

@union.workflow
def wf(state: int = 2) -> int:
    value = construct_and_get_value(state=state)
    value = construct_and_get_value(state=value)
    value = construct_and_get_value(state=value)
    value = construct_and_get_value(state=value)
    return value
```

