Core task types#

class flytekit.SQLTask(*args, **kwargs)#

Base task type for all SQL tasks. See flytekit.extras.sqlite3.task.SQLite3Task and flytekitplugins.athena.task.AthenaTask for examples of how to use it as a base class.

class SQLite3Task(*args, **kwargs)#

Run client side SQLite3 queries that optionally return a FlyteSchema object.

Note

This is a pre-built container task. That is, your user container will not be used at task execution time; the image defined in this task definition will be used instead.

See the integrations guide for additional usage examples, as well as the base class flytekit.extend.PythonCustomizedContainerTask.
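
For example, a minimal sketch (the task name, query, and dataset URI are illustrative):

from flytekit import kwtypes
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task

sql_task = SQLite3Task(
    name="my_sqlite3_task",  # illustrative name
    query_template="SELECT TrackId, Name FROM tracks LIMIT {{ .inputs.limit }}",
    inputs=kwtypes(limit=int),
    task_config=SQLite3Config(
        # Any reachable archive containing a SQLite database file works here.
        uri="https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip",
        compressed=True,
    ),
)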

execute(**kwargs)#

This method will be invoked to execute the task.

Return type:

Any

classmethod interpolate_query(query_template, **kwargs)#

This function will fill in the query template with the provided kwargs and return the interpolated query. Please note that when SQL tasks run in Flyte, this step is done by the task executor.

Return type:

Any
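
For example (a sketch; the template and input value are illustrative):

query = SQLite3Task.interpolate_query(
    "SELECT * FROM tracks LIMIT {{ .inputs.limit }}", limit=5
)
# query == "SELECT * FROM tracks LIMIT 5"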

class flytekit.ContainerTask(*args, **kwargs)#

This is an intermediate class that represents Flyte tasks that run a container at execution time. The vast majority of tasks fall into this category - the typical @task-decorated tasks, for instance, all run a container. An example of a task that does not run a container is the Athena SQL task.
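
A minimal sketch of such a raw container task (the image, directories, and command are illustrative):

from flytekit import ContainerTask, kwtypes

greeting_task = ContainerTask(
    name="echo_greeting",  # illustrative name
    image="alpine:3.18",
    input_data_dir="/var/inputs",    # where inputs are materialized in the container
    output_data_dir="/var/outputs",  # where outputs are collected from
    inputs=kwtypes(name=str),
    outputs=kwtypes(greeting=str),
    command=[
        "/bin/sh",
        "-c",
        "echo 'Hello, {{.inputs.name}}!' | tee /var/outputs/greeting",
    ],
)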

class IOStrategy(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
class MetadataFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
execute(**kwargs)#

This method will be invoked to execute the task.

Return type:

LiteralMap

get_config(settings)#

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom configuration defined for this task.

Parameters:

settings (SerializationSettings)

Return type:

Dict[str, str] | None

get_container(settings)#

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameters:

settings (SerializationSettings)

Return type:

Container

get_k8s_pod(settings)#

Returns the Kubernetes pod definition (if any) that is used to run the task on hosted Flyte.

Parameters:

settings (SerializationSettings)

Return type:

K8sPod

class flytekit.PythonFunctionTask(*args, **kwargs)#

A Python function task should be used as the base for all extensions that wrap a Python function. It will automatically detect the interface of the Python function and, when serialized on the hosted Union platform, handles writing the execution command that executes the function.

It is advised that this task be used via the @task decorator, as in the following minimal example (the function name and body are illustrative):

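from flytekit import task

@task
def my_task(x: int) -> str:  # illustrative; the interface is int -> str
    return str(x)
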
In the above code, the name of the function, the module, and the interface (inputs = int, outputs = str) are auto-detected.

class ExecutionBehavior(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
compile_into_workflow(ctx, task_function, **kwargs)#

In the case of dynamic workflows, this function will produce a workflow definition at execution time which will then proceed to be executed.

Parameters:
  • ctx (FlyteContext)

  • task_function (Callable)

Return type:

DynamicJobSpec | LiteralMap

dynamic_execute(task_function, **kwargs)#

By the time this function is invoked, the local_execute function should have unwrapped the Promises and Flyte literal wrappers so that the kwargs we are working with here are now Python native literal values. This function is also expected to return Python native literal values.

Since the user code within a dynamic task constitutes a workflow, we have to first compile that workflow and then execute it.

When running in production, the task stops after the compilation step and creates a file representing the newly generated workflow, instead of executing it.

Parameters:

task_function (Callable)

Return type:

Any
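
For context, a dynamic task that exercises this path might look like the following sketch (names are illustrative):

from typing import List

from flytekit import dynamic, task

@task
def double(x: int) -> int:
    return 2 * x

@dynamic
def fan_out(n: int) -> List[int]:
    # The loop below is compiled into a workflow at execution time;
    # each call to double() becomes a node in that workflow.
    return [double(x=i) for i in range(n)]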

execute(**kwargs)#

This method will be invoked to execute the task. If you do decide to override this method you must also handle dynamic tasks or you will no longer be able to use the task as a dynamic task generator.

Return type:

Any

property name: str#

Returns the name of the task.

class flytekit.PythonInstanceTask(*args, **kwargs)#

This class should be used as the base class for all tasks that do not have a user-defined function body but do have a platform-defined execute method (execute() needs to be overridden). This base class ensures that the module loader will invoke the right class automatically, by capturing the module name and the variable name within that module.
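
A minimal sketch of such a subclass (the constructor wiring shown is an assumption based on the parent classes; real extensions typically pass richer configuration):

from typing import Any

from flytekit.core.interface import Interface
from flytekit.core.python_function_task import PythonInstanceTask


class GreetingTask(PythonInstanceTask):
    # No user-defined function body; the platform-defined execute() does the work.
    def __init__(self, name: str, greeting: str, **kwargs):
        self._greeting = greeting
        super().__init__(
            name=name,
            task_config=None,
            task_type="greeting-task",  # hypothetical custom task type
            interface=Interface(inputs={"who": str}, outputs={"o0": str}),
            **kwargs,
        )

    def execute(self, **kwargs) -> Any:
        return f"{self._greeting}, {kwargs['who']}!"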

class flytekit.LaunchPlan(name, workflow, parameters, fixed_inputs, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, trigger=None, overwrite_cache=None)#

Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the core concepts if you are unfamiliar with them.

Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow:

@workflow
def wf(a: int, c: str) -> str:
    ...

Create the default launch plan with:

LaunchPlan.get_or_create(workflow=wf)

If you specify additional parameters, you'll also have to give the launch plan a unique name. Default and fixed inputs can be expressed as Python native values, like so (the launch plan name below is illustrative):
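
LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_1",  # illustrative; must be unique
    default_inputs={"a": 3},
    fixed_inputs={"c": "4"},
)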

Additionally, a launch plan can be configured to run on a schedule and emit notifications.

Please see the relevant Schedule and Notification objects as well.
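
For example (a sketch; the cron expression, recipient, and name are illustrative):

from flytekit import CronSchedule, LaunchPlan
from flytekit.core.notification import Email
from flytekit.models.core.execution import WorkflowExecutionPhase

sched = CronSchedule(schedule="0 10 * * *")  # illustrative: every day at 10:00
email_notif = Email(
    phases=[WorkflowExecutionPhase.SUCCEEDED],
    recipients_email=["admin@example.com"],  # illustrative recipient
)
LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_2",  # illustrative
    schedule=sched,
    notifications=[email_notif],
)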

To configure the remaining parameters, you’ll need to import the relevant model objects as well.

from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig

Then use them as follows (the values shown are illustrative):
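
LaunchPlan.get_or_create(
    workflow=wf,
    name="your_lp_name_3",  # illustrative
    labels=Labels({"label": "foo"}),
    annotations=Annotations({"anno": "bar"}),
    raw_output_data_config=RawOutputDataConfig("s3://my-bucket/raw-output"),  # illustrative location
)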

Parameters:
  • name (str)

  • workflow (_annotated_workflow.WorkflowBase)

  • parameters (_interface_models.ParameterMap)

  • fixed_inputs (_literal_models.LiteralMap)

  • schedule (Optional[_schedule_model.Schedule])

  • notifications (Optional[List[_common_models.Notification]])

  • labels (Optional[_common_models.Labels])

  • annotations (Optional[_common_models.Annotations])

  • raw_output_data_config (Optional[_common_models.RawOutputDataConfig])

  • max_parallelism (Optional[int])

  • security_context (Optional[security.SecurityContext])

  • trigger (Optional[LaunchPlanTriggerBase])

  • overwrite_cache (Optional[bool])

static get_default_launch_plan(ctx, workflow)#

Users should probably call the get_or_create function defined below instead. A default launch plan is the one that will just pick up whatever default values are defined in the workflow function signature (if any) and use the default auth information supplied during serialization, with no notifications or schedules.

Parameters:
  • ctx (FlyteContext) – This is not flytekit.current_context(); it is an internal context object. Users familiar with flytekit internals may nevertheless use it.

  • workflow (WorkflowBase) – The workflow to create a launch plan for.

Return type:

LaunchPlan

classmethod get_or_create(workflow, name=None, default_inputs=None, fixed_inputs=None, schedule=None, notifications=None, labels=None, annotations=None, raw_output_data_config=None, max_parallelism=None, security_context=None, auth_role=None, trigger=None, overwrite_cache=None)#

This function offers a friendlier interface for creating launch plans. If the name for the launch plan is not supplied, this assumes you are looking for the default launch plan for the workflow. If it is specified, it will be used. If creating the default launch plan, none of the other arguments may be specified.

The resulting launch plan is also cached; if this function is called again with the same name, the cached version is returned.

Parameters:
  • security_context (SecurityContext | None) – Security context for the execution

  • workflow (WorkflowBase) – The Workflow to create a launch plan for.

  • name (str | None) – If you supply a name, keep in mind that it needs to be unique. That is, project, domain, version, and this name together form a primary key. If you do not supply a name, this function will assume you want the default launch plan for the given workflow.

  • default_inputs (Dict[str, Any] | None) – Default inputs, expressed as Python values.

  • fixed_inputs (Dict[str, Any] | None) – Fixed inputs, expressed as Python values. At call time, these cannot be changed.

  • schedule (Schedule | None) – Optional schedule to run on.

  • notifications (List[Notification] | None) – Notifications to send.

  • labels (Labels | None) – Optional labels to attach to executions created by this launch plan.

  • annotations (Annotations | None) – Optional annotations to attach to executions created by this launch plan.

  • raw_output_data_config (RawOutputDataConfig | None) – Optional location of offloaded data for things like S3, etc.

  • auth_role (AuthRole | None) – Add an auth role if necessary.

  • max_parallelism (int | None) – Controls the maximum number of task nodes that can run in parallel for the entire workflow. This is useful for achieving fairness. Note: MapTasks are regarded as a single unit, and the parallelism/concurrency of MapTasks is independent of this setting.

  • trigger (LaunchPlanTriggerBase | None) – [alpha] This is a new syntax for specifying schedules.

  • overwrite_cache (bool | None)

Return type:

LaunchPlan
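
The caching behavior can be sketched as follows (assuming the wf workflow from above; the launch plan name is illustrative):

lp1 = LaunchPlan.get_or_create(workflow=wf, name="my_lp", default_inputs={"a": 3})
lp2 = LaunchPlan.get_or_create(workflow=wf, name="my_lp", default_inputs={"a": 3})
assert lp1 is lp2  # the second call returns the cached launch plan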