Concurrency control
Concurrency control allows you to limit the number of concurrently running workflow executions for a specific launch plan, identified by its unique `project`, `domain`, and `name`.
This control is applied across all versions of that launch plan.
To clone and run the example code on this page, see the Flytesnacks repo.
How it works
When a new execution for a launch plan with a `ConcurrencyPolicy` is requested, Flyte performs a check to count the currently active executions for that same launch plan (`project`/`domain`/`name`), irrespective of their versions.
This check is done using a database query that joins the `executions` table with the `launch_plans` table.
It filters for executions that are in an active phase (e.g., `QUEUED`, `RUNNING`, `ABORTING`) and belong to the launch plan name being triggered.
If the number of active executions is already at or above the `max_concurrency` limit defined in the policy of the launch plan version being triggered, the new execution is handled according to the specified `behavior`.
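The sketch below is a minimal, illustrative model of that admission decision, not FlyteAdmin's actual implementation; the `Execution` class, the `ACTIVE_PHASES` set, and the `may_create_execution` helper are names made up for this example. In the real system the count comes from the SQL join described above.

```python
from dataclasses import dataclass
from typing import Iterable

# Hypothetical, simplified model of the check FlyteAdmin performs at launch time.
ACTIVE_PHASES = {"QUEUED", "RUNNING", "ABORTING"}


@dataclass
class Execution:
    launch_plan_name: str  # project/domain/name identifier; version is ignored
    phase: str


def may_create_execution(
    executions: Iterable[Execution],
    launch_plan_name: str,
    max_concurrency: int,
) -> bool:
    # Count active executions across *all* versions of this launch plan name.
    active = sum(
        1
        for e in executions
        if e.launch_plan_name == launch_plan_name and e.phase in ACTIVE_PHASES
    )
    # Enforce the limit defined by the launch plan version being triggered.
    return active < max_concurrency
```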
Basic usage
Here’s an example of how to define a launch plan with concurrency control:
```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior, LaunchPlan, workflow


@workflow
def my_workflow() -> str:
    return "Hello, World!"


# Create a launch plan with concurrency control
concurrency_limited_lp = LaunchPlan.get_or_create(
    name="my_concurrent_lp",
    workflow=my_workflow,
    concurrency=ConcurrencyPolicy(
        max_concurrency=3,
        behavior=ConcurrencyLimitBehavior.SKIP,
    ),
)
```
Scheduled workflows with concurrency control
Concurrency control is particularly useful for scheduled workflows to prevent overlapping executions:
```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior, CronSchedule, LaunchPlan, workflow


@workflow
def scheduled_workflow() -> str:
    # This workflow might take a long time to complete
    return "Processing complete"


# Create a scheduled launch plan with concurrency control
scheduled_lp = LaunchPlan.get_or_create(
    name="my_scheduled_concurrent_lp",
    workflow=scheduled_workflow,
    concurrency=ConcurrencyPolicy(
        max_concurrency=1,  # Only allow one execution at a time
        behavior=ConcurrencyLimitBehavior.SKIP,
    ),
    schedule=CronSchedule(schedule="*/5 * * * *"),  # Runs every 5 minutes
)
```
Defining the policy
A `ConcurrencyPolicy` is defined with two main parameters:
- `max_concurrency` (integer): The maximum number of workflows that can be running concurrently for this launch plan name.
- `behavior` (enum): What to do when the `max_concurrency` limit is reached. Currently, only `SKIP` is supported, which means new executions will not be created if the limit is hit.
```python
from flytekit import ConcurrencyPolicy, ConcurrencyLimitBehavior

policy = ConcurrencyPolicy(
    max_concurrency=5,
    behavior=ConcurrencyLimitBehavior.SKIP,
)
```
Key behaviors and considerations
Version-agnostic check, version-specific enforcement
The concurrency check counts all active workflow executions of a given launch plan (`project`/`domain`/`name`).
However, the enforcement (i.e., the `max_concurrency` limit and `behavior`) is based on the `ConcurrencyPolicy` defined in the specific version of the launch plan you are trying to launch.
Example scenario:
- Launch plan `MyLP` version `v1` has a `ConcurrencyPolicy` with `max_concurrency = 3`.
- Three executions of `MyLP` (they could be `v1` or any other version) are currently running.
- You try to launch `MyLP` version `v2`, which has a `ConcurrencyPolicy` with `max_concurrency = 10`.
  - Result: This `v2` execution will launch successfully because its own limit (10) is not breached by the current 3 active executions.
- Now, with 4 total active executions (3 original + the new `v2`), you try to launch `MyLP` version `v1` again.
  - Result: This `v1` execution will fail. The check sees 4 active executions, and `v1`'s policy only allows a maximum of 3.
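To make the arithmetic concrete, here is a small, self-contained sketch of the same scenario. The variable names are purely illustrative and not part of the flytekit API.

```python
# Walking through the scenario with the decision rule described above:
# active executions are counted across all versions, while the limit comes
# from the launch plan version being launched.

active_executions = 3        # MyLP executions currently running (any version)

v1_max_concurrency = 3       # policy on MyLP version v1
v2_max_concurrency = 10      # policy on MyLP version v2

# Launching v2: 3 < 10, so the execution is created.
assert active_executions < v2_max_concurrency

active_executions += 1       # the new v2 execution is now active (4 total)

# Launching v1 again: 4 >= 3, so the execution is skipped.
assert not (active_executions < v1_max_concurrency)
```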
Concurrency limit on manual trigger
If you manually trigger an execution (via `pyflyte`, for example) that would breach the concurrency limit, you should see this error in the console:
```
_InactiveRpcError:
<_InactiveRpcError of RPC that terminated with:
    status = StatusCode.RESOURCE_EXHAUSTED
    details = "Concurrency limit (1) reached for launch plan my_workflow_lp. Skipping execution."
>
```
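If you launch programmatically, you can catch this error and handle the skip. The sketch below uses `FlyteRemote` from flytekit; the project/domain values and the string match on `RESOURCE_EXHAUSTED` are assumptions, since the exact exception type surfaced may vary by flytekit version.

```python
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Assumed project/domain; adjust to your deployment.
remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",
    default_domain="development",
)

# Fetch the launch plan defined earlier on this page.
lp = remote.fetch_launch_plan(name="my_concurrent_lp")

try:
    execution = remote.execute(lp, inputs={})
    print(f"Created execution: {execution.id.name}")
except Exception as exc:
    # Assumption: the RESOURCE_EXHAUSTED status is visible in the error message.
    if "RESOURCE_EXHAUSTED" in str(exc):
        print("Execution skipped: concurrency limit reached.")
    else:
        raise
```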
Scheduled execution behavior
When the scheduler attempts to trigger an execution and the concurrency limit is met, the creation will fail and the error message from FlyteAdmin will be logged in the FlyteScheduler logs. This happens transparently to the user: a skipped execution will not appear as skipped in the UI or on the project executions page.
Limitations
“At most” enforcement
While the system aims to respect `max_concurrency`, it acts as an "at most" limit.
Due to the nature of scheduling, workflow execution durations, and the timing of the concurrency check (at launch time), there may be periods where the number of active executions is below `max_concurrency` even if the system could theoretically run more.
For example, if `max_concurrency` is 5 and all 5 workflows finish before the next scheduled check/trigger, the count will drop.
The system prevents exceeding the limit but does not actively try to keep `max_concurrency` instances running at all times.
Notifications for skipped executions
Currently, there is no built-in notification system for skipped executions. When a scheduled execution is skipped due to concurrency limits, it will be logged in FlyteScheduler but no user notification will be sent. This is an area for future enhancement.
Best practices
- Use with scheduled workflows: Concurrency control is most beneficial for scheduled workflows that might take longer than the schedule interval to complete.
- Set appropriate limits: Consider your system resources and the resource requirements of your workflows when setting `max_concurrency`.
- Monitor skipped executions: Regularly check FlyteAdmin logs to monitor whether executions are being skipped due to concurrency limits; a complementary client-side check is sketched after this list.
- Version management: Be aware that different versions of the same launch plan can have different concurrency policies, but the check is performed across all versions.
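As a complement to checking FlyteAdmin logs, you can verify from the client side that roughly the expected number of executions ran in a recent window. This is a hedged sketch using `FlyteRemote.recent_executions`; the attribute path `spec.launch_plan.name` and the project/domain values are assumptions that may need adjusting for your flytekit version.

```python
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",   # assumed project
    default_domain="development",    # assumed domain
)

# Pull the most recent executions and count those belonging to the
# concurrency-limited launch plan; compare the count against the schedule
# cadence to spot gaps caused by skipped runs.
recent = remote.recent_executions(limit=100)
mine = [
    e for e in recent
    if e.spec.launch_plan.name == "my_scheduled_concurrent_lp"  # assumed attribute path
]
print(f"{len(mine)} recent executions for my_scheduled_concurrent_lp")
```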