flytekit.core.context_manager
These classes provide functionality related context management.
| Class |
Description |
BranchEvalMode |
This is a 3-way class, with the None value meaning that we are not within a conditional context. |
CompilationState |
Compilation state is used during the compilation of a workflow or task. |
ExecutionParameters |
This is a run-time user-centric context object that is accessible to every @task method. |
ExecutionState |
This is the context that is active when executing a task or a local workflow. |
FlyteContext |
This is an internal-facing context object, that most users will not have to deal with. |
FlyteContextManager |
FlyteContextManager manages the execution context within Flytekit. |
FlyteEntities |
This is a global Object that tracks various tasks and workflows that are declared within a VM during the. |
OutputMetadata |
|
OutputMetadataTracker |
This class is for the users to set arbitrary metadata on output literals. |
SecretsManager |
This provides a secrets resolution logic at runtime. |
| Protocol |
Description |
SerializableToString |
This protocol is used by the Artifact create_from function. |
| Property |
Type |
Description |
flyte_context_Var |
ContextVar |
|
This is a 3-way class, with the None value meaning that we are not within a conditional context. The other two
values are
- Active - This means that the next
then should run
- Skipped - The next
then should not run
Compilation state is used during the compilation of a workflow or task. It stores the nodes that were
created when walking through the workflow graph.
Attributes:
prefix (str): This is because we may one day want to be able to have subworkflows inside other workflows. If
users choose to not specify their node names, then we can end up with multiple “n0"s. This prefix allows
us to give those nested nodes a distinct name, as well as properly identify them in the workflow.
mode (int): refer to flytekit.extend.ExecutionState.Mode
task_resolver (Optional[TaskResolverMixin]): Please see flytekit.extend.TaskResolverMixin
nodes (Optional[List]): Stores currently compiled nodes so far.
class CompilationState(
prefix: str,
mode: int,
task_resolver: Optional[TaskResolverMixin],
nodes: List,
)
| Parameter |
Type |
Description |
prefix |
str |
|
mode |
int |
|
task_resolver |
Optional[TaskResolverMixin] |
|
nodes |
List |
|
| Method |
Description |
add_node() |
|
with_params() |
Create a new CompilationState where the mode and task resolver are defaulted to the current object, but they. |
| Parameter |
Type |
Description |
n |
Node |
|
def with_params(
prefix: str,
mode: Optional[int],
resolver: Optional[TaskResolverMixin],
nodes: Optional[List],
) -> CompilationState
Create a new CompilationState where the mode and task resolver are defaulted to the current object, but they
and all other args are taken if explicitly provided as an argument.
Usage:
s.with_params(“p”, nodes=[])
| Parameter |
Type |
Description |
prefix |
str |
|
mode |
Optional[int] |
|
resolver |
Optional[TaskResolverMixin] |
|
nodes |
Optional[List] |
|
This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using
flytekit.current_context()
This object provides the following objections
- a statsd handler
- a logging handler
- the execution ID as an
flytekit.models.core.identifier.WorkflowExecutionIdentifier object
- a working directory for the user to write arbitrary files to
Please do not confuse this object with the flytekit.FlyteContext object.
class ExecutionParameters(
execution_date,
tmp_dir,
stats,
execution_id: typing.Optional[_identifier.WorkflowExecutionIdentifier],
logging,
raw_output_prefix,
output_metadata_prefix,
checkpoint,
decks,
task_id: typing.Optional[_identifier.Identifier],
enable_deck: bool,
kwargs,
)
| Parameter |
Type |
Description |
execution_date |
|
Date when the execution is running |
tmp_dir |
|
temporary directory for the execution |
stats |
|
handle to emit stats |
execution_id |
typing.Optional[_identifier.WorkflowExecutionIdentifier] |
Identifier for the execution |
logging |
|
handle to logging |
raw_output_prefix |
|
|
output_metadata_prefix |
|
|
checkpoint |
|
Checkpoint Handle to the configured checkpoint system |
decks |
|
|
task_id |
typing.Optional[_identifier.Identifier] |
|
enable_deck |
bool |
|
kwargs |
**kwargs |
|
| Property |
Type |
Description |
checkpoint |
None |
|
decks |
None |
A list of decks of the tasks, and it will be rendered to a html at the end of the task execution. |
default_deck |
None |
|
enable_deck |
None |
Returns whether deck is enabled or not |
execution_date |
None |
This is a datetime representing the time at which a workflow was started. This is consistent across all tasks executed in a workflow or sub-workflow. > [!NOTE] > Do NOT use this execution_date to drive any production logic. It might be useful as a tag for data to help in debugging. |
execution_id |
None |
This is the identifier of the workflow execution within the underlying engine. It will be consistent across all task executions in a workflow or sub-workflow execution. > [!NOTE] > Do NOT use this execution_id to drive any production logic. This execution ID should only be used as a tag on output data to link back to the workflow run that created it. |
logging |
None |
A handle to a useful logging object. TODO: Usage examples |
output_metadata_prefix |
None |
|
raw_output_prefix |
None |
|
secrets |
None |
|
stats |
None |
A handle to a special statsd object that provides usefully tagged stats. TODO: Usage examples and better comments |
task_id |
None |
At production run-time, this will be generated by reading environment variables that are set by the backend. |
timeline_deck |
None |
|
working_directory |
None |
A handle to a special working directory for easily producing temporary files. TODO: Usage examples |
def get(
key: str,
) -> typing.Any
Returns task specific context if present else raise an error. The returned context will match the key
| Parameter |
Type |
Description |
key |
str |
|
def has_attr(
attr_name: str,
) -> bool
| Parameter |
Type |
Description |
attr_name |
str |
|
def new_builder(
current: Optional[ExecutionParameters],
) -> Builder
| Parameter |
Type |
Description |
current |
Optional[ExecutionParameters] |
|
def with_enable_deck(
enable_deck: bool,
) -> Builder
| Parameter |
Type |
Description |
enable_deck |
bool |
|
This is the context that is active when executing a task or a local workflow. This carries the necessary state to
execute.
Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the
user etc.
Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs
are uploaded
engine_dir (os.PathLike):
branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute.
user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd
handler, a logging handler, the current execution id and a working directory.
class ExecutionState(
working_dir: Union[os.PathLike, str],
mode: Optional[ExecutionState.Mode],
engine_dir: Optional[Union[os.PathLike, str]],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
)
| Parameter |
Type |
Description |
working_dir |
Union[os.PathLike, str] |
|
mode |
Optional[ExecutionState.Mode] |
|
engine_dir |
Optional[Union[os.PathLike, str]] |
|
branch_eval_mode |
Optional[BranchEvalMode] |
|
user_space_params |
Optional[ExecutionParameters] |
|
| Method |
Description |
branch_complete() |
Indicates that we are within a conditional / ifelse block and the active branch is not done. |
is_local_execution() |
|
take_branch() |
Indicates that we are within an if-else block and the current branch has evaluated to true. |
with_params() |
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values. |
Indicates that we are within a conditional / ifelse block and the active branch is not done.
Default to SKIPPED
Indicates that we are within an if-else block and the current branch has evaluated to true.
Useful only in local execution mode
def with_params(
working_dir: Optional[os.PathLike],
mode: Optional[Mode],
engine_dir: Optional[os.PathLike],
branch_eval_mode: Optional[BranchEvalMode],
user_space_params: Optional[ExecutionParameters],
) -> ExecutionState
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
| Parameter |
Type |
Description |
working_dir |
Optional[os.PathLike] |
|
mode |
Optional[Mode] |
|
engine_dir |
Optional[os.PathLike] |
|
branch_eval_mode |
Optional[BranchEvalMode] |
|
user_space_params |
Optional[ExecutionParameters] |
|
This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally
available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and
compile workflows, serialize Flyte entities, etc.
Even though this object as a current_context function on it, it should not be called directly. Please use the
flytekit.FlyteContextManager object instead.
Please do not confuse this object with the flytekit.ExecutionParameters object.
class FlyteContext(
file_access: FileAccessProvider,
level: int,
flyte_client: Optional['friendly_client.SynchronousFlyteClient'],
compilation_state: Optional[CompilationState],
execution_state: Optional[ExecutionState],
serialization_settings: Optional[SerializationSettings],
in_a_condition: bool,
origin_stackframe: Optional[traceback.FrameSummary],
output_metadata_tracker: Optional[OutputMetadataTracker],
worker_queue: Optional[Controller],
)
| Parameter |
Type |
Description |
file_access |
FileAccessProvider |
|
level |
int |
|
flyte_client |
Optional['friendly_client.SynchronousFlyteClient'] |
|
compilation_state |
Optional[CompilationState] |
|
execution_state |
Optional[ExecutionState] |
|
serialization_settings |
Optional[SerializationSettings] |
|
in_a_condition |
bool |
|
origin_stackframe |
Optional[traceback.FrameSummary] |
|
output_metadata_tracker |
Optional[OutputMetadataTracker] |
|
worker_queue |
Optional[Controller] |
|
| Property |
Type |
Description |
user_space_params |
None |
|
This method exists only to maintain backwards compatibility. Please use
FlyteContextManager.current_context() instead.
Users of flytekit should be wary not to confuse the object returned from this function
with {{< py_func_ref flytekit.current_context >}}
def enter_conditional_section()
Returns the deck that was created as part of the last execution.
The return value depends on the execution environment. In a notebook, the return value is compatible with
IPython.display and should be rendered in the notebook.
with flytekit.new_context() as ctx:
my_task(...)
ctx.get_deck()
OR if you wish to explicitly display
from IPython import display
display(ctx.get_deck())
def get_origin_stackframe_repr()
def new_compilation_state(
prefix: str,
) -> CompilationState
Creates and returns a default compilation state. For most of the code this should be the entrypoint
of compilation, otherwise the code should always uses - with_compilation_state
| Parameter |
Type |
Description |
prefix |
str |
|
def new_execution_state(
working_dir: Optional[os.PathLike],
) -> ExecutionState
Creates and returns a new default execution state. This should be used at the entrypoint of execution,
in all other cases it is preferable to use with_execution_state
| Parameter |
Type |
Description |
working_dir |
Optional[os.PathLike] |
|
def set_stackframe(
s: traceback.FrameSummary,
)
| Parameter |
Type |
Description |
s |
traceback.FrameSummary |
|
def with_client(
c: SynchronousFlyteClient,
) -> Builder
| Parameter |
Type |
Description |
c |
SynchronousFlyteClient |
|
def with_compilation_state(
c: CompilationState,
) -> Builder
| Parameter |
Type |
Description |
c |
CompilationState |
|
def with_execution_state(
es: ExecutionState,
) -> Builder
| Parameter |
Type |
Description |
es |
ExecutionState |
|
def with_file_access(
fa: FileAccessProvider,
) -> Builder
| Parameter |
Type |
Description |
fa |
FileAccessProvider |
|
def with_new_compilation_state()
def with_output_metadata_tracker(
t: OutputMetadataTracker,
) -> Builder
| Parameter |
Type |
Description |
t |
OutputMetadataTracker |
|
def with_serialization_settings(
ss: SerializationSettings,
) -> Builder
| Parameter |
Type |
Description |
ss |
SerializationSettings |
|
def with_worker_queue(
wq: Controller,
) -> Builder
| Parameter |
Type |
Description |
wq |
Controller |
|
FlyteContextManager manages the execution context within Flytekit. It holds global state of either compilation
or Execution. It is not thread-safe and can only be run as a single threaded application currently.
Context’s within Flytekit is useful to manage compilation state and execution state. Refer to CompilationState
and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.
Typical usage is
FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
pass
# If required - not recommended you can use
FlyteContextManager.push_context()
# but correspondingly a pop_context should be called
FlyteContextManager.pop_context()
def add_signal_handler(
handler: typing.Callable[[int, FrameType], typing.Any],
)
| Parameter |
Type |
Description |
handler |
typing.Callable[[int, FrameType], typing.Any] |
|
def get_origin_stackframe(
limit,
) -> traceback.FrameSummary
| Parameter |
Type |
Description |
limit |
|
|
Re-initializes the context and erases the entire context
def push_context(
ctx: FlyteContext,
f: Optional[traceback.FrameSummary],
) -> FlyteContext
| Parameter |
Type |
Description |
ctx |
FlyteContext |
|
f |
Optional[traceback.FrameSummary] |
|
def with_context(
b: FlyteContext.Builder,
) -> Generator[FlyteContext, None, None]
| Parameter |
Type |
Description |
b |
FlyteContext.Builder |
|
This is a global Object that tracks various tasks and workflows that are declared within a VM during the
registration process
class OutputMetadata(
artifact: 'Artifact',
dynamic_partitions: Optional[typing.Dict[str, str]],
time_partition: Optional[datetime],
additional_items: Optional[typing.List[SerializableToString]],
)
| Parameter |
Type |
Description |
artifact |
'Artifact' |
|
dynamic_partitions |
Optional[typing.Dict[str, str]] |
|
time_partition |
Optional[datetime] |
|
additional_items |
Optional[typing.List[SerializableToString]] |
|
This class is for the users to set arbitrary metadata on output literals.
Attributes:
output_metadata Optional[TaskOutputMetadata]: is a sparse dictionary of metadata that the user wants to attach
to each output of a task. The key is the output value (object) and the value is an OutputMetadata object.
class OutputMetadataTracker(
output_metadata: typing.Dict[typing.Any, OutputMetadata],
)
| Parameter |
Type |
Description |
output_metadata |
typing.Dict[typing.Any, OutputMetadata] |
|
def add(
obj: typing.Any,
metadata: OutputMetadata,
)
| Parameter |
Type |
Description |
obj |
typing.Any |
|
metadata |
OutputMetadata |
|
def get(
obj: typing.Any,
) -> Optional[OutputMetadata]
| Parameter |
Type |
Description |
obj |
typing.Any |
|
def with_params(
output_metadata: Optional[TaskOutputMetadata],
) -> OutputMetadataTracker
Produces a copy of the current object and set new things
| Parameter |
Type |
Description |
output_metadata |
Optional[TaskOutputMetadata] |
|
This provides a secrets resolution logic at runtime.
The resolution order is
- Try env var first. The env var should have the configuration.SECRETS_ENV_PREFIX. The env var will be all upper
cased
- If not then try the file where the name matches lower case
configuration.SECRETS_DEFAULT_DIR/<group>/configuration.SECRETS_FILE_PREFIX<key>
All configuration values can always be overridden by injecting an environment variable
class SecretsManager(
secrets_cfg: typing.Optional[SecretsConfig],
)
| Parameter |
Type |
Description |
secrets_cfg |
typing.Optional[SecretsConfig] |
|
| Method |
Description |
get() |
Retrieves a secret using the resolution order -> Env followed by file. |
get_secrets_env_var() |
Returns a string that matches the ENV Variable to look for the secrets. |
get_secrets_file() |
Returns a path that matches the file to look for the secrets. |
def get(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
encode_mode: str,
) -> str
Retrieves a secret using the resolution order -> Env followed by file. If not found raises a ValueError
param encode_mode, defines the mode to open files, it can either be “r” to read file, or “rb” to read binary file
| Parameter |
Type |
Description |
group |
Optional[str] |
|
key |
Optional[str] |
|
group_version |
Optional[str] |
|
encode_mode |
str |
|
def get_secrets_env_var(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
) -> str
Returns a string that matches the ENV Variable to look for the secrets
| Parameter |
Type |
Description |
group |
Optional[str] |
|
key |
Optional[str] |
|
group_version |
Optional[str] |
|
def get_secrets_file(
group: Optional[str],
key: Optional[str],
group_version: Optional[str],
) -> str
Returns a path that matches the file to look for the secrets
| Parameter |
Type |
Description |
group |
Optional[str] |
|
key |
Optional[str] |
|
group_version |
Optional[str] |
|
This protocol is used by the Artifact create_from function. Basically these objects are serialized when running,
and then added to a literal’s metadata.
protocol SerializableToString()
def serialize_to_string(
ctx: FlyteContext,
variable_name: str,
) -> typing.Tuple[str, str]
| Parameter |
Type |
Description |
ctx |
FlyteContext |
|
variable_name |
str |
|