Basic authoring#

class flytekit.task(_task_function=None, task_config=None, cache=False, cache_serialize=False, cache_version='', cache_ignore_input_vars=(), retries=0, interruptible=None, deprecated='', timeout=0, container_image=None, environment=None, requests=None, limits=None, secret_requests=None, execution_mode=ExecutionBehavior.DEFAULT, node_dependency_hints=None, task_resolver=None, docs=None, disable_deck=None, enable_deck=None, deck_fields=(DeckField.SOURCE_CODE, DeckField.DEPENDENCIES, DeckField.TIMELINE, DeckField.INPUT, DeckField.OUTPUT), pod_template=None, pod_template_name=None, accelerator=None)#

This is the core decorator to use for any task type in flytekit.

Tasks are the building blocks of Flyte. They represent user code. Tasks have the following properties:

  • Versioned (usually tied to the git revision SHA1)

  • Strong interfaces (specified inputs and outputs)

  • Declarative

  • Independently executable

  • Unit testable

For a simple Python task:

@task
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

For specific task types:

@task(task_config=Spark(), retries=3)
def my_task(x: int, y: typing.Dict[str, str]) -> str:
    ...

Please see some cookbook task examples for additional information.
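For illustration, here is a sketch combining several of the parameters documented below; the task body, resource values, and environment variable are placeholders chosen for the example, not recommendations.

from datetime import timedelta
import typing

from flytekit import Resources, task

@task(
    cache=True,
    cache_version="1.0",            # bump manually if the body changes but the signature doesn't
    retries=3,                      # retry up to 3 times during a workflow execution
    timeout=timedelta(minutes=10),
    requests=Resources(cpu="1", mem="500Mi"),
    limits=Resources(cpu="2", mem="1Gi"),
    environment={"MY_ENV_VAR": "value"},   # hypothetical environment variable
)
def process(x: int, y: typing.Dict[str, str]) -> str:
    return y.get(str(x), "missing")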

Parameters:
  • _task_function (Callable[[~P], FuncOut] | None) – This argument is implicitly passed and represents the decorated function

  • task_config (T | None) – This argument provides configuration for a specific task type. Please refer to the plugins documentation for the right object to use.

  • cache (bool) – Boolean that indicates if caching should be enabled

  • cache_serialize (bool) – Boolean that indicates if identical (i.e., same inputs) instances of this task should be executed in serial when caching is enabled. This means that given multiple concurrent executions over identical inputs, only a single instance executes and the rest wait to reuse the cached results. This parameter does nothing without also setting the cache parameter.

  • cache_version (str) – Cache version to use. Changes to the task signature will automatically trigger a cache miss, but you can always manually update this field as well to force a cache miss. You should also manually bump this version if the function body/business logic has changed, but the signature hasn’t.

  • cache_ignore_input_vars (Tuple[str, ...]) – Input variables that should not be included when calculating hash for cache.

  • retries (int) – Number of times to retry this task during a workflow execution.

  • interruptible (bool | None) – [Optional] Boolean that indicates that this task can be interrupted and/or scheduled on nodes with lower QoS guarantees. This can directly reduce the cost per execution, at the cost of performance penalties due to potential interruptions. Requires additional Union platform level configuration. If no value is provided, the task will inherit this attribute from its workflow, as follows:

    • No value set for interruptible at the task or workflow level – task is not interruptible

    • Task has interruptible=True, but workflow has no value set – task is interruptible

    • Workflow has interruptible=True, but task has no value set – task is interruptible

    • Workflow has interruptible=False, but task has interruptible=True – task is interruptible

    • Workflow has interruptible=True, but task has interruptible=False – task is not interruptible

  • deprecated (str) – A string that can be used to provide a warning message for a deprecated task. An absent or empty string indicates that the task is active and not deprecated.

  • timeout (timedelta | int) – The maximum amount of time for which one execution of this task may run. The execution will be terminated if the runtime exceeds the given timeout (approximately).

  • container_image (str | ImageSpec | None) –

    By default the configured FLYTE_INTERNAL_IMAGE is used for every task. This directive can be used to provide an alternate image for a specific task. This is useful for the cases in which images bloat because of various dependencies and a dependency is only required for this or a set of tasks, and they vary from the default.

    # Use the default image name `fqn` and alter the tag to `tag-{{default.tag}}`, i.e. the tag of the default image
    # with a prefix. In this case, it is assumed that an image like
    # flytecookbook:tag-gitsha is published along with the default flytecookbook:gitsha
    @task(container_image='{{.images.default.fqn}}:tag-{{images.default.tag}}')
    def foo():
        ...
    
    # Refer to configurations to configure fqns for other images besides default. In this case it will
    # look up an image named xyz
    @task(container_image='{{.images.xyz.fqn}}:{{images.default.tag}}')
    def foo2():
        ...
    

  • environment (Dict[str, str] | None) – Environment variables that should be added for this task's execution.

  • requests (Resources | None) – Specify compute resource requests for your task. For Pod-plugin tasks, these values will apply only to the primary container.

  • limits (Resources | None) – Compute limits. Specify compute resource limits for your task. For Pod-plugin tasks, these values will apply only to the primary container. For more information, please see flytekit.Resources.

  • secret_requests (List[Secret] | None) – Keys that can identify the secrets supplied at runtime. Ideally the secret keys should be semi-descriptive. The key values will be available at runtime if the backend is configured to provide secrets and if secrets are available in the configured secrets store. Possible secret stores include Vault, Confidant, Kubernetes secrets, AWS KMS, etc. Refer to Secret to understand how to specify the request for a secret; it may change based on the backend provider. A usage sketch follows this entry, after the return type.

  • execution_mode (ExecutionBehavior) – This is mainly for internal use. Please ignore. It is filled in automatically.

  • node_dependency_hints (Iterable[PythonFunctionTask | LaunchPlan | WorkflowBase] | None) –

    A list of tasks, launchplans, or workflows that this task depends on. This is only for dynamic tasks/workflows, where flyte cannot automatically determine the dependencies prior to runtime. Even on dynamic tasks this is optional, but in some scenarios it will make registering the workflow easier, because it allows registration to be done the same as for static tasks/workflows.

    For example, this is useful for running launchplans dynamically, because launchplans must be registered on flyteadmin before they can be run. Tasks and workflows do not have this requirement.

    @workflow
    def workflow0():
        ...
    
    launchplan0 = LaunchPlan.get_or_create(workflow0)
    
    # Specify node_dependency_hints so that launchplan0 will be registered on flyteadmin, despite this being a
    # dynamic task.
    @dynamic(node_dependency_hints=[launchplan0])
    def launch_dynamically():
        # To run a sub-launchplan it must have previously been registered on flyteadmin.
        return [launchplan0]*10
    

  • task_resolver (TaskResolverMixin | None) – Provide a custom task resolver.

  • disable_deck (bool | None) – (deprecated) If true, this task will not output a Deck HTML file.

  • enable_deck (bool | None) – If true, this task will output a Deck HTML file.

  • deck_fields (Tuple[DeckField, ...] | None) – If specified and enable_deck is True, this task will output a Deck HTML file with the fields specified in the tuple.

  • docs (Documentation | None) – Documentation about this task

  • pod_template (PodTemplate | None) – Custom PodTemplate for this task.

  • pod_template_name (str | None) – The name of the existing PodTemplate resource which will be used in this task.

  • accelerator (BaseAccelerator | None) – The accelerator to use for this task.

Return type:

Callable[[~P], FuncOut] | Callable[[Callable[[~P], FuncOut]], PythonFunctionTask[T]] | PythonFunctionTask[T]
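As a sketch of the secret_requests parameter above: the secret group and key names below are placeholders, and the value is only resolvable at runtime if your backend's secret store is configured to supply it.

import flytekit
from flytekit import Secret, task

# Hypothetical secret group/key; replace with values provisioned in your secret store.
SECRET_GROUP = "user-secrets"
SECRET_KEY = "api_token"

@task(secret_requests=[Secret(group=SECRET_GROUP, key=SECRET_KEY)])
def call_external_service() -> int:
    # The secret value is resolved at runtime via the execution context.
    token = flytekit.current_context().secrets.get(SECRET_GROUP, SECRET_KEY)
    return len(token)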

class flytekit.workflow(_workflow_function=None, failure_policy=None, interruptible=False, on_failure=None, docs=None)#

This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks.

Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because while we can determine the entire structure of a task by looking at the function’s signature, workflows need to run through the function itself because the body of the function is what expresses the workflow structure. It’s also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding).

Example:
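A minimal sketch of a workflow composing two tasks (the tasks t1 and t2 and their logic are illustrative only):

import typing

from flytekit import task, workflow

@task
def t1(a: int) -> typing.Tuple[int, str]:
    return a + 2, "world"

@task
def t2(a: str, b: str) -> str:
    return b + a

@workflow
def my_wf(a: int, b: str) -> typing.Tuple[int, str]:
    # x, y and d are promises at workflow-compilation time, not concrete values.
    x, y = t1(a=a)
    d = t2(a=y, b=b)
    return x, d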

Again, users should keep in mind that even though the body of the function looks like regular Python, it is actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not your typical Python values. So even though you may have a task t1() -> int, when a = t1() is called, a will not be an integer, so if you try range(a) you'll get an error.

Please see the user guide for more usage examples.

Parameters:
  • _workflow_function (Callable[[~P], FuncOut] | None) – This argument is implicitly passed and represents the decorated function.

  • failure_policy (WorkflowFailurePolicy | None) – Use the options in flytekit.WorkflowFailurePolicy

  • interruptible (bool) – Whether or not tasks launched from this workflow are by default interruptible

  • on_failure (WorkflowBase | Task | None) – Invoke this workflow or task on failure. The workflow / task has to match the signature of the current workflow, with an additional parameter called error of type Error.

  • docs (Documentation | None) – Description entity for the workflow

Return type:

Callable[[~P], FuncOut] | Callable[[Callable[[~P], FuncOut]], PythonFunctionWorkflow] | PythonFunctionWorkflow

class flytekit.kwtypes(**kwargs)#

This is a small helper function to convert the keyword arguments to an OrderedDict of types.

kwtypes(a=int, b=str)
Return type:

OrderedDict[str, Type]
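A small usage sketch:

from flytekit import kwtypes

# Builds an ordered mapping of argument names to types, useful for declaring
# the interface of tasks whose inputs can't be inferred from a Python signature.
interface = kwtypes(a=int, b=str)
# OrderedDict([('a', <class 'int'>), ('b', <class 'str'>)])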

class flytekit.current_context#

Use this method to get a handle to the specific parameters available inside a Flyte task.

Usage

flytekit.current_context().logging.info(...)

Available params are documented in flytekit.core.context_manager.ExecutionParameters. There are some special params that should be available.

Return type:

ExecutionParameters

class flytekit.ExecutionParameters(execution_date, tmp_dir, stats, execution_id, logging, raw_output_prefix, output_metadata_prefix=None, checkpoint=None, decks=None, task_id=None, **kwargs)#

This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using

flytekit.current_context()

This object provides the following:

  • a statsd handler

  • a logging handler

  • the execution ID as a flytekit.models.core.identifier.WorkflowExecutionIdentifier object

  • a working directory for the user to write arbitrary files to

Please do not confuse this object with the flytekit.FlyteContext object.
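A sketch of accessing a few of these handles inside a task; the task itself is illustrative, and it assumes the stats handle exposes the usual statsd-style incr call.

import os

import flytekit
from flytekit import task

@task
def dump_context() -> str:
    ctx = flytekit.current_context()
    ctx.logging.info("execution id: %s", ctx.execution_id)
    ctx.stats.incr("dump_context.calls")  # tagged statsd counter (assumed incr API)
    path = os.path.join(ctx.working_directory, "scratch.txt")
    with open(path, "w") as f:
        f.write("temporary data")
    return path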

Parameters:
  • execution_id (Optional[_identifier.WorkflowExecutionIdentifier])

  • task_id (Optional[_identifier.Identifier])

class Builder(current: 'typing.Optional[ExecutionParameters]' = None)#
Parameters:

current (Optional[ExecutionParameters])

property decks: List#

A list of decks for the task; they will be rendered to HTML at the end of the task execution.

property execution_date: datetime#

This is a datetime representing the time at which a workflow was started. This is consistent across all tasks executed in a workflow or sub-workflow.

Note

Do NOT use this execution_date to drive any production logic. It might be useful as a tag for data to help in debugging.

property execution_id: WorkflowExecutionIdentifier#

This is the identifier of the workflow execution within the underlying engine. It will be consistent across all task executions in a workflow or sub-workflow execution.

Note

Do NOT use this execution_id to drive any production logic. This execution ID should only be used as a tag on output data to link back to the workflow run that created it.

get(key)#

Returns the task-specific context matching the given key if present, else raises an error.

Parameters:

key (str)

Return type:

Any

property logging: Logger#

A handle to a useful logging object. TODO: Usage examples

property stats: TaggableStats#

A handle to a special statsd object that provides usefully tagged stats. TODO: Usage examples and better comments

property task_id: Identifier | None#

At production run-time, this will be generated by reading environment variables that are set by the backend.

property working_directory: str#

A handle to a special working directory for easily producing temporary files. TODO: Usage examples

class flytekit.FlyteContext(file_access, level=0, flyte_client=None, compilation_state=None, execution_state=None, serialization_settings=None, in_a_condition=False, origin_stackframe=None, output_metadata_tracker=None)#

This is an internal-facing context object, that most users will not have to deal with. It’s essentially a globally available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and compile workflows, serialize Flyte entities, etc.

Even though this object has a current_context function on it, that function should not be called directly. Please use the flytekit.FlyteContextManager object instead.

Please do not confuse this object with the flytekit.ExecutionParameters object.

Parameters:
  • file_access (FileAccessProvider)

  • level (int)

  • flyte_client (Optional['friendly_client.SynchronousFlyteClient'])

  • compilation_state (Optional[CompilationState])

  • execution_state (Optional[ExecutionState])

  • serialization_settings (Optional[SerializationSettings])

  • in_a_condition (bool)

  • origin_stackframe (Optional[traceback.FrameSummary])

  • output_metadata_tracker (Optional[OutputMetadataTracker])

class Builder(file_access: 'FileAccessProvider', level: 'int' = 0, compilation_state: 'Optional[CompilationState]' = None, execution_state: 'Optional[ExecutionState]' = None, flyte_client: "Optional['friendly_client.SynchronousFlyteClient']" = None, serialization_settings: 'Optional[SerializationSettings]' = None, in_a_condition: 'bool' = False, output_metadata_tracker: 'Optional[OutputMetadataTracker]' = None)#
Parameters:
  • file_access (FileAccessProvider)

  • level (int)

  • compilation_state (Optional[CompilationState])

  • execution_state (Optional[ExecutionState])

  • flyte_client (Optional['friendly_client.SynchronousFlyteClient'])

  • serialization_settings (Optional[SerializationSettings])

  • in_a_condition (bool)

  • output_metadata_tracker (Optional[OutputMetadataTracker])

enter_conditional_section()#

Used by the condition block to indicate that a new conditional section has been started.

Return type:

Builder

new_compilation_state(prefix='')#

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.

Parameters:

prefix (str)

Return type:

CompilationState

new_execution_state(working_dir=None)#

Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.

Parameters:

working_dir (PathLike | str | None)

Return type:

ExecutionState

static current_context()#

This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() instead.

Users of flytekit should be wary not to confuse the object returned from this function with flytekit.current_context()

Return type:

FlyteContext

get_deck()#

Returns the deck that was created as part of the last execution.

The return value depends on the execution environment. In a notebook, the return value is compatible with IPython.display and should be rendered in the notebook.

with flytekit.new_context() as ctx:
    my_task(...)
ctx.get_deck()

Or, if you wish to display it explicitly:

from IPython import display
display(ctx.get_deck())
Return type:

Union[str, ‘IPython.core.display.HTML’]

new_compilation_state(prefix='')#

Creates and returns a default compilation state. For most of the code this should be the entrypoint of compilation; otherwise the code should always use with_compilation_state.

Parameters:

prefix (str)

Return type:

CompilationState

new_execution_state(working_dir=None)#

Creates and returns a new default execution state. This should be used at the entrypoint of execution; in all other cases it is preferable to use with_execution_state.

Parameters:

working_dir (PathLike | None)

Return type:

ExecutionState

class flytekit.core.array_node_map_task.map_task(target, concurrency=None, min_successes=None, min_success_ratio=1.0, **kwargs)#

Wrapper that creates a map task utilizing either the existing ArrayNodeMapTask or the drop-in replacement ArrayNode implementation.

Parameters:
  • target (LaunchPlan | PythonFunctionTask) – The Flyte entity that will be mapped over

  • concurrency (int | None) – If specified, this limits the number of mapped tasks that can run in parallel to the given batch size. If the size of the input exceeds the concurrency value, then multiple batches will be run serially until all inputs are processed. If set to 0, this means unbounded concurrency. If left unspecified, the array node will inherit parallelism from the workflow.

  • min_successes (int | None) – The minimum number of successful executions

  • min_success_ratio (float) – The minimum ratio of successful executions
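A sketch of mapping a task over a list of inputs; the task and concurrency value are illustrative.

import typing

from flytekit import map_task, task, workflow

@task
def square(x: int) -> int:
    return x * x

@workflow
def squares(xs: typing.List[int]) -> typing.List[int]:
    # Run at most 4 mapped instances at a time.
    return map_task(square, concurrency=4)(x=xs)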

class flytekit.core.workflow.ImperativeWorkflow(name, failure_policy=None, interruptible=False)#

An imperative workflow is a programmatic analogue to the typical @workflow function-based workflow and is better suited to programmatic applications.

Assuming you have some tasks already defined, you can create an equivalent workflow imperatively by declaring workflow inputs, adding the tasks as nodes, and declaring workflow outputs, as in the sketch below. The result is identical on the back-end to a workflow written with the @workflow decorator.

Note that in the decorated equivalent, a NamedTuple is only needed so the output can be given the same name as in the imperative example. The imperative paradigm makes the naming of workflow outputs easier, but this isn't a big deal in function-workflows because names tend to not be necessary.
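A sketch of building such a workflow imperatively; the task, input name, and output name are illustrative.

from flytekit import task
from flytekit.core.workflow import ImperativeWorkflow

@task
def t1(a: str) -> str:
    return a + " world"

# Declare the workflow, its input, a node running t1, and a named output.
wb = ImperativeWorkflow(name="my.imperative.workflow.example")
wb.add_workflow_input("in1", str)
node = wb.add_entity(t1, a=wb.inputs["in1"])
wb.add_workflow_output("from_n0t1", node.outputs["o0"])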

Parameters:
add_entity(entity, **kwargs)#

Anytime you add an entity, all the inputs to the entity must be bound.

Parameters:

entity (PythonTask | LaunchPlan | WorkflowBase)

Return type:

Node

add_workflow_input(input_name, python_type)#

Adds an input to the workflow.

Parameters:
  • input_name (str)

  • python_type (Type)

Return type:

Promise

add_workflow_output(output_name, p, python_type=None)#

Add an output with the given name from the given node output.

Parameters:
property compilation_state: CompilationState#

Compilation is done a bit at a time, one task or other entity call at a time. This is why this workflow class has to keep track of its own compilation state.

execute(**kwargs)#

Called by local_execute. This function is how local execution for imperative workflows runs. Because all inputs to an entity must already be bound when it is added via add_entity, we can iterate through the nodes in order without running into dependency issues; that is, the user is forced to declare entities in a topological order. To keep track of outputs, we create a map seeded with only the workflow inputs (if any). As nodes run, their outputs are stored in this map. After all nodes have run, workflow-level outputs are filled in the same way as any other node's.

property inputs: Dict[str, Promise]#

This holds the input promises to the workflow. The nodes in these Promise objects should always point to the global start node.

ready()#
This function returns whether or not the workflow is in a ready state, which means:
  • Has at least one node

  • All workflow inputs are bound

These conditions assume that all nodes and workflow i/o changes were done with the functions above, which do additional checking.

Return type:

bool

class flytekit.core.node_creation.create_node(entity, *args, **kwargs)#

This is the function you want to call if you need to specify dependencies between tasks that don’t consume and/or don’t produce outputs. For example, if you have t1() and t2(), both of which do not take in nor produce any outputs, how do you specify that t2 should run before t1?

t1_node = create_node(t1)
t2_node = create_node(t2)

t2_node.runs_before(t1_node)
# OR
t2_node >> t1_node

This works for tasks that take inputs as well, say a t3(in1: int)

t3_node = create_node(t3, in1=some_int)  # basically calling t3(in1=some_int)

You can still use this method to handle setting certain overrides

t3_node = create_node(t3, in1=some_int).with_overrides(...)

Outputs, if there are any, will be accessible. A t4() -> (int, str)

t4_node = create_node(t4)
In compilation, node.o0 has the promise:

t5(in1=t4_node.o0)

If t1 produces only one output, note that in local execution, you still get a wrapper object that needs to be dereferenced by the output name.

t1_node = create_node(t1)
t2(t1_node.o0)
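A sketch of using create_node inside a workflow to order two side-effect-only tasks; the tasks are illustrative.

from flytekit import task, workflow
from flytekit.core.node_creation import create_node

@task
def prepare():
    print("prepare resources")

@task
def cleanup():
    print("clean up resources")

@workflow
def ordered_wf():
    # Neither task produces outputs, so express the ordering explicitly.
    prep = create_node(prepare)
    clean = create_node(cleanup)
    prep >> clean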
Parameters:

entity (Union[PythonTask, LaunchPlan, WorkflowBase, RemoteEntity])

Return type:

Union[Node, VoidPromise]

class flytekit.core.promise.NodeOutput(node, var, attr_path=None)#
Parameters:
  • node (Node)

  • var (str)

  • attr_path (Optional[List[Union[str, int]]])

property node: Node#

Return Node object.

property node_id#

Override the underlying node_id property to refer to the Node's id. This is to make sure that overriding node IDs from with_overrides gets serialized correctly.

Return type:

Text

class flytekit.FlyteContextManager#

FlyteContextManager manages the execution context within Flytekit. It holds the global state of either compilation or execution. It is not thread-safe and can currently only be run as a single-threaded application. Contexts within Flytekit are useful to manage compilation state and execution state. Refer to CompilationState and ExecutionState for more information. FlyteContextManager provides a singleton stack to manage these contexts.

Typical usage is

FlyteContextManager.initialize()
with FlyteContextManager.with_context(o) as ctx:
  pass

# If required (not recommended) you can use
FlyteContextManager.push_context()
# but correspondingly a pop_context should be called
FlyteContextManager.pop_context()
static initialize()#

Re-initializes the context and erases the entire context.