flytekit.core.promise
| Property |
Type |
Description |
T |
TypeVar |
|
def async_flyte_entity_call_handler(
entity: SupportsNodeCreation,
args,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, Tuple, None]
This is a limited async version of the main call handler.
| Parameter |
Type |
Description |
entity |
SupportsNodeCreation |
|
args |
*args |
|
kwargs |
**kwargs |
|
def binding_data_from_python_std(
ctx: _flyte_context.FlyteContext,
expected_literal_type: _type_models.LiteralType,
t_value: Any,
t_value_type: typing.Type[T],
nodes: List[Node],
) -> _literals_models.BindingData
| Parameter |
Type |
Description |
ctx |
_flyte_context.FlyteContext |
|
expected_literal_type |
_type_models.LiteralType |
|
t_value |
Any |
|
t_value_type |
typing.Type[T] |
|
nodes |
List[Node] |
|
def binding_from_python_std(
ctx: _flyte_context.FlyteContext,
var_name: str,
expected_literal_type: _type_models.LiteralType,
t_value: Any,
t_value_type: type,
) -> Tuple[_literals_models.Binding, List[Node]]
| Parameter |
Type |
Description |
ctx |
_flyte_context.FlyteContext |
|
var_name |
str |
|
expected_literal_type |
_type_models.LiteralType |
|
t_value |
Any |
|
t_value_type |
type |
|
def create_and_link_node(
ctx: FlyteContext,
entity: SupportsNodeCreation,
overridden_interface: Optional[Interface],
add_node_to_compilation_state: bool,
node_id: str,
kwargs,
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]
This method is used to generate a node with bindings within a flytekit workflow. this is useful to traverse the
workflow using regular python interpreter and generate nodes and promises whenever an execution is encountered
| Parameter |
Type |
Description |
ctx |
FlyteContext |
FlyteContext |
entity |
SupportsNodeCreation |
RemoteEntity |
overridden_interface |
Optional[Interface] |
utilize this interface instead of the one provided by the entity. This is useful for ArrayNode as there’s a mismatch between the underlying interface and inputs |
add_node_to_compilation_state |
bool |
bool that enables for nodes to be created but not linked to the workflow. This is useful when creating nodes nested under other nodes such as ArrayNode |
node_id |
str |
str if provided, this will be used as the node id. |
kwargs |
**kwargs |
Dict[str, Any] default inputs passed from the user to this entity. Can be promises. :return: Optional[Union[Tuple[Promise], Promise, VoidPromise]] |
def create_and_link_node_from_remote(
ctx: FlyteContext,
entity: HasFlyteInterface,
overridden_interface: Optional[_interface_models.TypedInterface],
add_node_to_compilation_state: bool,
node_id: str,
_inputs_not_allowed: Optional[Set[str]],
_ignorable_inputs: Optional[Set[str]],
kwargs,
) -> Optional[Union[Tuple[Promise], Promise, VoidPromise]]
This method is used to generate a node with bindings especially when using remote entities, like FlyteWorkflow,
FlyteTask and FlyteLaunchplan.
This method is kept separate from the similar named method create_and_link_node as remote entities have to be
handled differently. The major difference arises from the fact that the remote entities do not have a python
interface, so all comparisons need to happen using the Literals.
| Parameter |
Type |
Description |
ctx |
FlyteContext |
FlyteContext |
entity |
HasFlyteInterface |
RemoteEntity |
overridden_interface |
Optional[_interface_models.TypedInterface] |
utilize this interface instead of the one provided by the entity. This is useful for ArrayNode as there’s a mismatch between the underlying interface and inputs |
add_node_to_compilation_state |
bool |
bool that enables for nodes to be created but not linked to the workflow. This is useful when creating nodes nested under other nodes such as ArrayNode |
node_id |
str |
str if provided, this will be used as the node id. |
_inputs_not_allowed |
Optional[Set[str]] |
Set of all variable names that should not be provided when using this entity. Useful for Launchplans with fixed inputs |
_ignorable_inputs |
Optional[Set[str]] |
Set of all variable names that are optional, but if provided will be overridden. Useful for launchplans with default inputs |
kwargs |
**kwargs |
Dict[str, Any] default inputs passed from the user to this entity. Can be promises. :return: Optional[Union[Tuple[Promise], Promise, VoidPromise]] |
def create_native_named_tuple(
ctx: FlyteContext,
promises: Union[Tuple[Promise], Promise, VoidPromise, None],
entity_interface: Interface,
) -> Optional[Tuple]
Creates and returns a Named tuple with all variables that match the expected named outputs. this makes
it possible to run things locally and expect a more native behavior, i.e. address elements of a named tuple
by name.
| Parameter |
Type |
Description |
ctx |
FlyteContext |
|
promises |
Union[Tuple[Promise], Promise, VoidPromise, None] |
|
entity_interface |
Interface |
|
def create_task_output(
promises: Optional[Union[List[Promise], Promise]],
entity_interface: Optional[Interface],
) -> Optional[Union[Tuple[Promise], Promise]]
| Parameter |
Type |
Description |
promises |
Optional[Union[List[Promise], Promise]] |
|
entity_interface |
Optional[Interface] |
|
def extract_obj_name(
name: str,
) -> str
Generates a shortened name, without the module information. Useful for node-names etc. Only extracts the final
object information often separated by . in the python fully qualified notation
| Parameter |
Type |
Description |
name |
str |
|
def flyte_entity_call_handler(
entity: SupportsNodeCreation,
args,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, Tuple, None]
This function is the call handler for tasks, workflows, and launch plans (which redirects to the underlying
workflow). The logic is the same for all three, but we did not want to create base class, hence this separate
method. When one of these entities is () aka called, there are three things we may do:
#. Compilation Mode - this happens when the function is called as part of a workflow (potentially
dynamic task?). Instead of running the user function, produce promise objects and create a node.
#. Workflow Execution Mode - when a workflow is being run locally. Even though workflows are functions
and everything should be able to be passed through naturally, we’ll want to wrap output values of the
function into objects, so that potential .with_cpu or other ancillary functions can be attached to do
nothing. Subsequent tasks will have to know how to unwrap these. If by chance a non-Flyte task uses a
task output as an input, things probably will fail pretty obviously.
#. Start a local execution - This means that we’re not already in a local workflow execution, which means that
we should expect inputs to be native Python values and that we should return Python native values.
| Parameter |
Type |
Description |
entity |
SupportsNodeCreation |
|
args |
*args |
|
kwargs |
**kwargs |
|
def get_primitive_val(
prim: Primitive,
) -> Any
| Parameter |
Type |
Description |
prim |
Primitive |
|
def resolve_attr_path_in_dict(
d: dict,
attr_path: List[Union[str, int]],
) -> Any
| Parameter |
Type |
Description |
d |
dict |
|
attr_path |
List[Union[str, int]] |
|
def resolve_attr_path_in_pb_struct(
st: _struct.Struct,
attr_path: List[Union[str, int]],
) -> Union[_struct.Struct, _struct.ListValue]
Resolves the protobuf struct (e.g. dataclass) with attribute path.
Note that the return type can be google.protobuf.struct_pb2.Struct or google.protobuf.struct_pb2.ListValue.
| Parameter |
Type |
Description |
st |
_struct.Struct |
|
attr_path |
List[Union[str, int]] |
|
def resolve_attr_path_in_promise(
p: Promise,
) -> Promise
resolve_attr_path_in_promise resolves the attribute path in a promise and returns a new promise with the resolved value
This is for local execution only. The remote execution will be resolved in flytepropeller.
| Parameter |
Type |
Description |
p |
Promise |
|
def resolve_attr_path_recursively(
v: Any,
) -> Any
This function resolves the attribute path in a nested structure recursively.
| Parameter |
Type |
Description |
v |
Any |
|
def to_binding(
p: Promise,
) -> _literals_models.Binding
| Parameter |
Type |
Description |
p |
Promise |
|
def translate_inputs_to_literals(
ctx: FlyteContext,
incoming_values: Dict[str, Any],
flyte_interface_types: Dict[str, _interface_models.Variable],
native_types: Dict[str, type],
) -> Dict[str, _literals_models.Literal]
The point of this function is to extract out Literals from a collection of either Python native values (which would
be converted into Flyte literals) or Promises (the literals in which would just get extracted).
When calling a task inside a workflow, a user might do something like this.
def my_wf(in1: int) -> int:
a = task_1(in1=in1)
b = task_2(in1=5, in2=a)
return b
If this is the case, when task_2 is called in local workflow execution, we’ll need to translate the Python native
literal 5 to a Flyte literal.
More interesting is this:
def my_wf(in1: int, in2: int) -> int:
a = task_1(in1=in1)
b = task_2(in1=5, in2=[a, in2])
return b
Here, in task_2, during execution we’d have a list of Promises. We have to make sure to give task2 a Flyte
LiteralCollection (Flyte’s name for list), not a Python list of Flyte literals.
This helper function is used both when sorting out inputs to a task, as well as outputs of a function.
| Parameter |
Type |
Description |
ctx |
FlyteContext |
Context needed in case a non-primitive literal needs to be translated to a Flyte literal (like a file) |
incoming_values |
Dict[str, Any] |
This is a map of your task’s input or wf’s output kwargs basically |
flyte_interface_types |
Dict[str, _interface_models.Variable] |
One side of an {{< py_class_ref flytekit.models.interface.TypedInterface >}} basically. |
native_types |
Dict[str, type] |
Map to native Python type. |
def translate_inputs_to_native(
ctx: FlyteContext,
incoming_values: Dict[str, Any],
flyte_interface_types: Dict[str, _interface_models.Variable],
) -> Dict[str, _literals_models.Literal]
| Parameter |
Type |
Description |
ctx |
FlyteContext |
|
incoming_values |
Dict[str, Any] |
|
flyte_interface_types |
Dict[str, _interface_models.Variable] |
|
ComparisonExpression refers to an expression of the form (lhs operator rhs), where lhs and rhs are operands
and operator can be any comparison expression like <, >, <=, >=, ==, !=
class ComparisonExpression(
lhs: Union['Promise', Any],
op: ComparisonOps,
rhs: Union['Promise', Any],
)
| Parameter |
Type |
Description |
lhs |
Union['Promise', Any] |
|
op |
ComparisonOps |
|
rhs |
Union['Promise', Any] |
|
| Property |
Type |
Description |
lhs |
None |
|
op |
None |
|
rhs |
None |
|
A Conjunction Expression is an expression of the form either (A and B) or (A or B).
where A, B are two expressions (comparison or conjunctions) and (and, or) are logical truth operators.
A conjunctionExpression evaluates to True or False depending on the logical operator and the truth values of
each of the expressions A & B
class ConjunctionExpression(
lhs: Union[ComparisonExpression, 'ConjunctionExpression'],
op: ConjunctionOps,
rhs: Union[ComparisonExpression, 'ConjunctionExpression'],
)
| Parameter |
Type |
Description |
lhs |
Union[ComparisonExpression, 'ConjunctionExpression'] |
|
op |
ConjunctionOps |
|
rhs |
Union[ComparisonExpression, 'ConjunctionExpression'] |
|
| Property |
Type |
Description |
lhs |
None |
|
op |
None |
|
rhs |
None |
|
protocol HasFlyteInterface()
| Property |
Type |
Description |
interface |
None |
|
name |
None |
|
def construct_node_metadata()
protocol LocallyExecutable()
def local_execute(
ctx: FlyteContext,
kwargs,
) -> Union[Tuple[Promise], Promise, VoidPromise, None]
| Parameter |
Type |
Description |
ctx |
FlyteContext |
|
kwargs |
**kwargs |
|
def local_execution_mode()
class NodeOutput(
node: Node,
var: str,
attr_path: Optional[List[Union[str, int]]],
)
| Parameter |
Type |
Description |
node |
Node |
|
var |
str |
The name of the variable this NodeOutput references |
attr_path |
Optional[List[Union[str, int]]] |
|
| Property |
Type |
Description |
attr_path |
None |
The attribute path the promise will be resolved with. :rtype: list[union[str, int]] |
is_empty |
None |
|
node |
None |
Return Node object. |
node_id |
None |
Override the underlying node_id property to refer to the Node’s id. This is to make sure that overriding node IDs from with_overrides gets serialized correctly. :rtype: Text |
var |
None |
Variable name must refer to an output variable for the node. :rtype: Text |
def from_flyte_idl(
pb2_object,
)
| Parameter |
Type |
Description |
pb2_object |
|
|
def serialize_to_string()
:rtype: Text
:rtype: flyteidl.core.types.OutputReference
def with_attr(
key,
) -> NodeOutput
| Parameter |
Type |
Description |
key |
|
|
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like ::
@task
def t1() -> (int, str): ...
#. Handling the duality between compilation and local execution - when the task function is run in a local execution
mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as
part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that
point to that Node.
#. One needs to be able to call ::
x = t1().with_overrides(...)
If the task returns an integer or a (int, str) tuple like t1 above, calling with_overrides on the
result would throw an error. This Promise object adds that.
#. Assorted handling for conditionals.
class Promise(
var: str,
val: Union[NodeOutput, _literals_models.Literal],
type: typing.Optional[_type_models.LiteralType],
)
| Parameter |
Type |
Description |
var |
str |
|
val |
Union[NodeOutput, _literals_models.Literal] |
|
type |
typing.Optional[_type_models.LiteralType] |
|
| Property |
Type |
Description |
attr_path |
None |
The attribute path the promise will be resolved with. :rtype: List[Union[str, int]] |
is_ready |
None |
Returns if the Promise is READY (is not a reference and the val is actually ready) Usage :: p = Promise(…) … if p.is_ready(): print(p.val) else: print(p.ref) |
ref |
None |
If the promise is NOT READY / Incomplete, then it maps to the origin node that owns the promise |
val |
None |
If the promise is ready then this holds the actual evaluate value in Flyte’s type system |
var |
None |
Name of the variable bound with this promise |
def is_(
v: bool,
) -> ComparisonExpression
| Parameter |
Type |
Description |
v |
bool |
|
def with_overrides(
node_name: Optional[str],
aliases: Optional[Dict[str, str]],
requests: Optional[Resources],
limits: Optional[Resources],
timeout: Optional[Union[int, datetime.timedelta, object]],
retries: Optional[int],
interruptible: Optional[bool],
name: Optional[str],
task_config: Optional[Any],
container_image: Optional[str],
accelerator: Optional[BaseAccelerator],
cache: Optional[Union[bool, Cache]],
args,
kwargs,
)
| Parameter |
Type |
Description |
node_name |
Optional[str] |
|
aliases |
Optional[Dict[str, str]] |
|
requests |
Optional[Resources] |
|
limits |
Optional[Resources] |
|
timeout |
Optional[Union[int, datetime.timedelta, object]] |
|
retries |
Optional[int] |
|
interruptible |
Optional[bool] |
|
name |
Optional[str] |
|
task_config |
Optional[Any] |
|
container_image |
Optional[str] |
|
accelerator |
Optional[BaseAccelerator] |
|
cache |
Optional[Union[bool, Cache]] |
|
args |
*args |
|
kwargs |
**kwargs |
|
def with_var(
new_var: str,
) -> Promise
| Parameter |
Type |
Description |
new_var |
str |
|
protocol SupportsNodeCreation()
| Property |
Type |
Description |
name |
None |
|
python_interface |
None |
|
def construct_node_metadata()
This object is returned for tasks that do not return any outputs (declared interface is empty)
VoidPromise cannot be interacted with and does not allow comparisons or any operations
class VoidPromise(
task_name: str,
ref: Optional[NodeOutput],
)
| Parameter |
Type |
Description |
task_name |
str |
|
ref |
Optional[NodeOutput] |
|
| Property |
Type |
Description |
ref |
None |
|
task_name |
None |
|
def runs_before(
args,
kwargs,
)
This is a placeholder and should do nothing. It is only here to enable local execution of workflows
where a task returns nothing.
| Parameter |
Type |
Description |
args |
*args |
|
kwargs |
**kwargs |
|
def with_overrides(
args,
kwargs,
)
| Parameter |
Type |
Description |
args |
*args |
|
kwargs |
**kwargs |
|