Extending Flytekit

This package contains useful classes and methods for extending Flytekit.

flytekit.extend.get_serializable(entity_mapping, settings, entity, options=None)

The flytekit authoring code produces objects representing Flyte entities (tasks, workflows, etc.). To register these, they need to be converted into objects that Union Admin understands: essentially the IDL objects. (This function currently translates to the layer above the IDL, e.g. SdkTask; this will be changed to translate to the IDL objects directly in the future.)

Parameters:
  • entity_mapping (OrderedDict) – An ordered dict that will be mutated in place. This argument exists because there is a natural ordering to the entities at registration time: underlying tasks have to be registered before the workflows that use them. The recursive search performed by this function and its helpers forms a natural topological sort, finding the dependent entities and adding them to this parameter before the parent entity that this function is called with.

  • settings (SerializationSettings) – Used to pick up the project, domain, and name. To be deprecated.

  • entity (PythonTask | BranchNode | Node | LaunchPlan | WorkflowBase | ReferenceWorkflow | ReferenceTask | ReferenceLaunchPlan | ReferenceEntity | ArrayNode) – The local Flyte entity to try to convert (along with its dependencies).

  • options (Options | None) – Optionally pass in a set of options that can be used to add additional metadata for launch plans.

Returns:

The resulting control plane entity. In addition to being added to the mutable entity_mapping parameter, it is also returned.

Return type:

TaskSpec | LaunchPlan | WorkflowSpec | Node | BranchNode | ArrayNode
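The ordering contract for entity_mapping can be illustrated with a standalone sketch. Entity and serialize below are hypothetical stand-ins, not flytekit APIs, and strings stand in for control plane objects; the point is only that a depth-first traversal that records each entity after its dependencies yields a registration-safe order.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import List


@dataclass(eq=False)  # eq=False keeps identity hashing so entities can be dict keys
class Entity:
    """Hypothetical stand-in for a local Flyte entity (task, workflow, launch plan)."""
    name: str
    dependencies: List["Entity"] = field(default_factory=list)


def serialize(entity_mapping: "OrderedDict[Entity, str]", entity: Entity) -> str:
    """Hypothetical sketch: convert dependencies first, then the entity itself."""
    if entity in entity_mapping:
        # Already converted earlier in the traversal: reuse the cached result.
        return entity_mapping[entity]
    for dep in entity.dependencies:
        serialize(entity_mapping, dep)
    spec = f"spec:{entity.name}"  # placeholder for the control plane object
    entity_mapping[entity] = spec  # recorded only after all of its dependencies
    return spec


t1, t2 = Entity("t1"), Entity("t2")
wf = Entity("wf", [t1, t2])
lp = Entity("lp", [wf])

mapping: "OrderedDict[Entity, str]" = OrderedDict()
serialize(mapping, lp)
print([e.name for e in mapping])  # ['t1', 't2', 'wf', 'lp']
```

Because the mapping doubles as a cache, an entity shared by several workflows is converted once and keeps its earliest position.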

flytekit.extend.context_manager

alias of the module flytekit.core.context_manager

class flytekit.extend.IgnoreOutputs

This exception should be used to indicate that the outputs generated by this task can be safely ignored. This is useful in distributed training or peer-to-peer parallel algorithms.
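A minimal sketch of the distributed-training use case, assuming a data-parallel job in which only rank 0's outputs matter. IgnoreOutputs here is a local stand-in class, and train_worker and collect are hypothetical helpers; in flytekit the exception would be raised from inside the task body and, as we understand it, handled by the task entrypoint, which skips uploading outputs, rather than by a loop like this.

```python
from typing import Dict, List


class IgnoreOutputs(Exception):
    """Local stand-in for flytekit.extend.IgnoreOutputs (hypothetical, for illustration)."""


def train_worker(rank: int) -> Dict[str, float]:
    # Hypothetical training step: every worker computes, but only rank 0 reports.
    result = {"loss": 0.1 * (rank + 1)}
    if rank != 0:
        raise IgnoreOutputs(f"rank {rank} outputs are not needed")
    return result


def collect(world_size: int) -> List[Dict[str, float]]:
    outputs = []
    for rank in range(world_size):
        try:
            outputs.append(train_worker(rank))
        except IgnoreOutputs:
            # Treated as success, but the outputs are discarded.
            continue
    return outputs


print(collect(4))  # [{'loss': 0.1}]
```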

class flytekit.extend.ExecutionState(working_dir, mode=None, engine_dir=None, branch_eval_mode=None, user_space_params=None)

This is the context that is active when executing a task or a local workflow. It carries the state needed for execution, such as temporary directories and the ExecutionParameters that are passed to the user.

Attributes:
  • mode (ExecutionState.Mode) – Defines the context in which the task is executed (local, hosted, etc.).

  • working_dir (os.PathLike) – Specifies the remote, external directory where inputs, outputs, and other protobufs are uploaded.

  • engine_dir (os.PathLike)

  • branch_eval_mode (Optional[BranchEvalMode]) – Used to determine whether a branch node should execute.

  • user_space_params (Optional[ExecutionParameters]) – Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution ID, and a working directory.

Parameters:
  • working_dir (PathLike | str)

  • mode (Mode | None)

  • engine_dir (PathLike | str | None)

  • branch_eval_mode (BranchEvalMode | None)

  • user_space_params (ExecutionParameters | None)

class Mode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)

Defines the possible execution modes, which in turn affect execution behavior.

branch_complete()

Indicates that we are within a conditional (if/else) block and the active branch is not done. Defaults to SKIPPED.

take_branch()

Indicates that we are within an if/else block and the current branch has evaluated to true. Useful only in local execution mode.

with_params(working_dir=None, mode=None, engine_dir=None, branch_eval_mode=None, user_space_params=None)

Produces a copy of the current execution state and overrides the copy's parameters with the passed parameter values. Parameters that are not passed keep their current values.

Parameters:
  • working_dir (Optional[os.PathLike])

  • mode (Optional[Mode])

  • engine_dir (Optional[os.PathLike])

  • branch_eval_mode (Optional[BranchEvalMode])

  • user_space_params (Optional[ExecutionParameters])

Return type:

ExecutionState
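The copy-and-override behavior of with_params can be sketched without flytekit. MiniExecutionState and its two-member Mode enum are hypothetical simplifications (only three of the five fields are modeled); as described above, the original state is left untouched and fields that are not passed keep their values.

```python
import dataclasses
from enum import Enum
from typing import Optional


class Mode(Enum):
    # Hypothetical subset of execution modes, for illustration only.
    TASK_EXECUTION = 1
    LOCAL_WORKFLOW_EXECUTION = 2


@dataclasses.dataclass(frozen=True)
class MiniExecutionState:
    """Hypothetical, simplified stand-in for ExecutionState."""
    working_dir: str
    mode: Optional[Mode] = None
    engine_dir: Optional[str] = None

    def with_params(
        self,
        working_dir: Optional[str] = None,
        mode: Optional[Mode] = None,
        engine_dir: Optional[str] = None,
    ) -> "MiniExecutionState":
        # Copy the state, overriding only the fields that were passed in.
        return dataclasses.replace(
            self,
            working_dir=working_dir if working_dir is not None else self.working_dir,
            mode=mode if mode is not None else self.mode,
            engine_dir=engine_dir if engine_dir is not None else self.engine_dir,
        )


base = MiniExecutionState(working_dir="/tmp/work")
local = base.with_params(mode=Mode.LOCAL_WORKFLOW_EXECUTION)
print(base.mode, local.mode, local.working_dir)  # None Mode.LOCAL_WORKFLOW_EXECUTION /tmp/work
```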

class flytekit.extend.ImageConfig(default_image=None, images=None)

We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig. For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.

ImageConfig holds available images which can be used at registration time. A default image can be specified along with optional additional images. Each image in the config must have a unique name.

Attributes:
  • default_image (Optional[Image]) – The default image to be used as a container for task serialization.

  • images (List[Image]) – Optional, additional images which can be used in task container definitions.

Parameters:
  • default_image (Image | None)

  • images (List[Image] | None)

classmethod auto(config_file=None, img_name=None)

Reads from a config file or from img_name. Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of the flytekit repository). To pick those up, see the auto_default_image function.

Parameters:
  • config_file (str | ConfigFile | None)

  • img_name (str | None)

Return type:

ImageConfig

find_image(name)

Return an image, by name, if it exists.

Return type:

Image | None

classmethod from_images(default_image, m=None)

Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images:

from flytekit.extend import ImageConfig

ImageConfig.from_images(
    "ghcr.io/flyteorg/flytecookbook:v1.0.0",
    {
        "spark": "ghcr.io/flyteorg/myspark:...",
        "other": "...",
    },
)

Parameters:
  • default_image (str)

  • m (Dict[str, str] | None)

static validate_image(_, param, values)

Validates that each image matches the standard format, and that only one default image is provided. A default image is one specified as default=<image_uri> or just <image_uri>. All other images must be given a name, in the format name=<image_uri>. This method can be used as a click callback for the CLI.

Parameters:
  • _ (Any) – click argument, ignored here.

  • param (str) – the click argument; here it should be "image".

  • values (tuple) – user-supplied images.

Return type:

ImageConfig
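The naming rules that validate_image enforces can be sketched as a small standalone parser. parse_images is hypothetical and returns plain strings instead of Image objects; it only mirrors the described rules (a bare URI or default=<uri> is the default, everything else is name=<uri>, at most one default). The spark image URI is a made-up example.

```python
from typing import Dict, Optional, Tuple


def parse_images(values: Tuple[str, ...]) -> Tuple[Optional[str], Dict[str, str]]:
    """Hypothetical parser following the rules described for validate_image."""
    default: Optional[str] = None
    named: Dict[str, str] = {}
    for value in values:
        name, sep, uri = value.partition("=")
        if not sep:
            # A bare <image_uri> is treated as the default image.
            name, uri = "default", value
        if name == "default":
            if default is not None:
                raise ValueError("Only one default image can be specified")
            default = uri
        else:
            named[name] = uri
    return default, named


default, named = parse_images(
    ("ghcr.io/flyteorg/flytecookbook:v1.0.0", "spark=registry.example.com/spark:v1")
)
print(default)  # ghcr.io/flyteorg/flytecookbook:v1.0.0
print(named)  # {'spark': 'registry.example.com/spark:v1'}
```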

class flytekit.extend.Interface(inputs=None, outputs=None, output_tuple_name=None, docstring=None)

A Python native interface object, like inspect.signature but simpler.

Parameters:
  • inputs (Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]])

  • outputs (Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]])

  • output_tuple_name (Optional[str])

  • docstring (Optional[Docstring])

remove_inputs(vars)

This method is useful for removing some variables from the Union backend inputs specification, because these are implicit local-only inputs or will be supplied by the library at runtime (for example, a Spark session). It creates a new instance of the interface with the requested variables removed.

Parameters:

vars (List[str] | None)

Return type:

Interface

with_inputs(extra_inputs)

Use this to add additional inputs to the interface. This is useful for adding implicit inputs that are added without the user requesting them.

Parameters:

extra_inputs (Dict[str, Type])

Return type:

Interface

with_outputs(extra_outputs)

Adds extra outputs that are expected from a task specification.

Parameters:

extra_outputs (Dict[str, Type])

Return type:

Interface
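The three methods above all return a new interface rather than mutating the existing one. A minimal sketch of that pattern, using a hypothetical MiniInterface with plain dicts instead of flytekit's Interface (no tuple names, docstrings, or defaults are modeled):

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class MiniInterface:
    """Hypothetical, simplified stand-in for flytekit's Interface."""
    inputs: Dict[str, type] = field(default_factory=dict)
    outputs: Dict[str, type] = field(default_factory=dict)

    def remove_inputs(self, vars: Optional[List[str]]) -> "MiniInterface":
        # New instance without the named inputs; None removes nothing.
        drop = set(vars or [])
        kept = {k: v for k, v in self.inputs.items() if k not in drop}
        return MiniInterface(kept, dict(self.outputs))

    def with_inputs(self, extra_inputs: Dict[str, type]) -> "MiniInterface":
        return MiniInterface({**self.inputs, **extra_inputs}, dict(self.outputs))

    def with_outputs(self, extra_outputs: Dict[str, type]) -> "MiniInterface":
        return MiniInterface(dict(self.inputs), {**self.outputs, **extra_outputs})


local = MiniInterface({"spark_session": object, "n": int}, {"o0": float})
backend = local.remove_inputs(["spark_session"])
print(sorted(backend.inputs))  # ['n']
print(sorted(local.inputs))  # ['n', 'spark_session']
print(sorted(backend.with_outputs({"o1": str}).outputs))  # ['o0', 'o1']
```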

class flytekit.extend.Promise(var, val, type=None)

This object is a wrapper and exists for three main reasons. Let's assume we're dealing with a task like:

from typing import Tuple
from flytekit import task
@task
def t1() -> Tuple[int, str]: ...
  1. Handling the duality between compilation and local execution: when the task function is run in a local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.

  2. One needs to be able to call

    x = t1().with_overrides(...)

    If the task returns an integer or an (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. The Promise object provides that method.

  3. Assorted handling for conditionals.

Parameters:
  • var (str)

  • val (Union[NodeOutput, _literals_models.Literal])

  • type (Optional[_type_models.LiteralType])

property attr_path: List[str | int]

The attribute path the promise will be resolved with.

property is_ready: bool

Returns whether the Promise is READY (that is, it is not a reference and the val is actually ready).

Usage

p = Promise(...)
...
if p.is_ready:
    print(p.val)
else:
    print(p.ref)

property ref: NodeOutput

If the promise is NOT READY (incomplete), this maps to the origin node that owns the promise.

property val: Literal

If the promise is ready, this holds the actual evaluated value in Flyte's type system.

property var: str

Name of the variable bound with this promise.
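The ready/reference duality can be modeled in a few lines. MiniPromise and NodeRef are hypothetical stand-ins (NodeRef plays the role of NodeOutput, and plain Python values play the role of Literal); the sketch only shows that exactly one of val and ref is meaningful, and that is_ready is read as a property rather than called.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class NodeRef:
    """Hypothetical stand-in for NodeOutput: the owning node and its output variable."""
    node_id: str
    var: str


class MiniPromise:
    """Hypothetical model of a promise holding either a ready value or a node reference."""

    def __init__(self, var: str, val: Any):
        self._var = var
        self._val = val
        self._ready = not isinstance(val, NodeRef)

    @property
    def var(self) -> str:
        return self._var

    @property
    def is_ready(self) -> bool:
        return self._ready

    @property
    def val(self) -> Any:
        if not self._ready:
            raise ValueError("Promise is not ready; use .ref")
        return self._val

    @property
    def ref(self) -> NodeRef:
        if self._ready:
            raise ValueError("Promise is ready; use .val")
        return self._val


for p in (MiniPromise("o0", 42), MiniPromise("o0", NodeRef("n0", "o0"))):
    # is_ready is a property, so it is read without calling it.
    print(p.val if p.is_ready else p.ref)
# 42
# NodeRef(node_id='n0', var='o0')
```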

class flytekit.extend.DictTransformer

Transformer that transforms a univariate dictionary Dict[str, T] to a literal map, or transforms an untyped dictionary to JSON (a Struct/Generic literal).

static dict_to_generic_literal(ctx, v, allow_pickle)

Creates a Flyte-specific Literal value from a native Python dictionary.

Parameters:
  • ctx (FlyteContext)

  • v (dict)

  • allow_pickle (bool)

Return type:

Literal

get_literal_type(t)

Transforms a native Python dictionary type to a Flyte-specific LiteralType.

Parameters:

t (Type[dict])

Return type:

LiteralType

guess_python_type(literal_type)

Converts the Flyte LiteralType to a python object type.

Parameters:

literal_type (LiteralType)

Return type:

Union[Type[dict], Dict[Type, Type]]

to_literal(ctx, python_val, python_type, expected)

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError, clearly stating what the mismatch was.

Parameters:
  • ctx – A FlyteContext, useful in accessing the filesystem and other attributes.

  • python_val – The actual value to be transformed.

  • python_type – The assumed type of the value (this matches the declared type on the function).

  • expected – Expected Literal Type.

Return type:

Literal

to_python_value(ctx, lv, expected_python_type)

Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.

Parameters:
  • ctx – FlyteContext.

  • lv – The received literal value.

  • expected_python_type – Expected native Python type that should be returned.

Return type:

dict
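The split described for DictTransformer (typed Dict[str, T] versus untyped dict) can be illustrated with a small standalone helper built on typing.get_origin and typing.get_args. dict_literal_kind is hypothetical and returns labels rather than LiteralTypes; treating dicts with non-str keys as "generic" is an assumption of this sketch.

```python
import typing
from typing import Any, Dict


def dict_literal_kind(python_type: Any) -> str:
    """Hypothetical helper: which representation the description above implies.

    Dict[str, T] -> "map" (a literal map keyed by strings);
    anything else, e.g. a bare dict -> "generic" (a JSON-like struct).
    """
    origin = typing.get_origin(python_type)
    args = typing.get_args(python_type)
    if origin is dict and len(args) == 2 and args[0] is str:
        return "map"
    return "generic"


print(dict_literal_kind(Dict[str, int]))  # map
print(dict_literal_kind(dict))  # generic
print(dict_literal_kind(Dict[int, str]))  # generic
```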

class flytekit.extend.T

alias of TypeVar('T')

class flytekit.extend.TypeEngine

Core extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit's type system. Users can implement their own TypeTransformers and register them with the TypeEngine. This allows special handling of user objects.

classmethod dict_to_literal_map(ctx, d, type_hints=None)

Given a dictionary mapping string keys to python values and a dictionary containing guessed types for such string keys, convert to a LiteralMap.

Parameters:
  • ctx (FlyteContext)

  • d (Dict[str, Any])

  • type_hints (Dict[str, type] | None)

Return type:

LiteralMap

classmethod get_available_transformers()

Returns all Python types for which transformers are available.

Return type:

KeysView[Type]

classmethod get_transformer(python_type)

The TypeEngine hierarchy for Flytekit. This method looks up and selects the type transformer. The algorithm is as follows:

d = dictionary of registered transformers, where each key is a Python type; v = the type being looked up

Step 1:

If the type is annotated with a TypeTransformer instance, use that.

Step 2:

Find a transformer that matches v exactly.

Step 3:

Find a transformer that matches the generic type of v, e.g. List[int], Dict[str, int], etc.

Step 4:

Walk the inheritance hierarchy of v and find a transformer that matches the first base class. This is potentially non-deterministic: it will depend on the registration pattern.

Special case:

If v inherits from Enum, use the Enum transformer even if Enum is not the first base class.

TODO: make this deterministic by using an ordered dict.

Step 5:

If v is a dataclass, use the dataclass transformer.

Step 6:

Otherwise, the pickle transformer is used.

Parameters:

python_type (Type)

Return type:

TypeTransformer[T]
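A standalone sketch of the lookup order above, assuming a plain dict registry and using strings in place of TypeTransformer instances. REGISTRY and get_transformer here are illustrative, not flytekit's implementation: Step 1 is omitted, and Step 4 walks the method resolution order, which is one deterministic reading of "the first base class".

```python
import dataclasses
import typing
from enum import Enum
from typing import Any, Dict, List

# Hypothetical registry: Python type -> transformer name. Strings stand in for
# TypeTransformer instances so the sketch runs without flytekit.
REGISTRY: Dict[Any, str] = {
    int: "IntTransformer",
    list: "ListTransformer",
    dict: "DictTransformer",
    Enum: "EnumTransformer",
}


def get_transformer(v: Any) -> str:
    # Step 1 (a TypeTransformer attached via typing.Annotated) is omitted here.
    # Step 2: exact match.
    if v in REGISTRY:
        return REGISTRY[v]
    # Step 3: match the generic origin, e.g. List[int] -> list.
    origin = typing.get_origin(v)
    if origin in REGISTRY:
        return REGISTRY[origin]
    if isinstance(v, type):
        # Special case: any Enum subclass uses the Enum transformer.
        if issubclass(v, Enum):
            return REGISTRY[Enum]
        # Step 4: walk the base classes in MRO order (skipping v itself).
        for base in v.__mro__[1:]:
            if base in REGISTRY:
                return REGISTRY[base]
        # Step 5: dataclasses.
        if dataclasses.is_dataclass(v):
            return "DataclassTransformer"
    # Step 6: fall back to pickling.
    return "PickleTransformer"


class Color(Enum):
    RED = "red"


class MyInt(int):
    pass


@dataclasses.dataclass
class Point:
    x: int


print(get_transformer(int))  # IntTransformer
print(get_transformer(List[int]))  # ListTransformer
print(get_transformer(Color))  # EnumTransformer
print(get_transformer(MyInt))  # IntTransformer
print(get_transformer(Point))  # DataclassTransformer
print(get_transformer(set))  # PickleTransformer
```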

classmethod guess_python_type(flyte_type)

Transforms a Flyte-specific LiteralType to a regular Python type.

Parameters:

flyte_type (LiteralType)

Return type:

type

classmethod guess_python_types(flyte_variable_dict)

Transforms a dictionary of Flyte-specific Variable objects to a dictionary of regular Python types.

Parameters:

flyte_variable_dict (Dict[str, Variable])

Return type:

Dict[str, type]

classmethod lazy_import_transformers()

Only load the transformers if needed.

classmethod literal_map_to_kwargs(ctx, lm, python_types=None, literal_types=None)

Given a LiteralMap (usually the intermediate input to a task), convert it to kwargs for the task.

Parameters:
  • ctx (FlyteContext)

  • lm (LiteralMap)

  • python_types (Dict[str, type] | None)

  • literal_types (Dict[str, Variable] | None)

Return type:

Dict[str, Any]

classmethod named_tuple_to_variable_map(t)

Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.

Parameters:

t (NamedTuple)

Return type:

VariableMap

classmethod register(transformer, additional_types=None)

This should be used for all types that respond with the right type annotation when you use the type(…) function.

Parameters:
  • transformer (TypeTransformer)

  • additional_types (List[Type] | None)

classmethod to_literal(ctx, python_val, python_type, expected)

Converts a Python value of a given type and expected LiteralType into a resolved Literal value.

Parameters:
  • ctx (FlyteContext)

  • python_val (Any)

  • python_type (Type[T])

  • expected (LiteralType)

Return type:

Literal

classmethod to_literal_type(python_type)

Converts a Python type into a Flyte-specific LiteralType.

Parameters:

python_type (Type)

Return type:

LiteralType

classmethod to_python_value(ctx, lv, expected_python_type)

Converts a Literal value with an expected Python type into a Python value.

Parameters:
  • ctx (FlyteContext)

  • lv (Literal)

  • expected_python_type (Type)

Return type:

Any

class flytekit.extend.TypeTransformer(name, t, enable_type_assertions=True)

Base transformer type that should be implemented for every Python native type that can be handled by flytekit.

Parameters:
  • name (str)

  • t (Type[T])

  • enable_type_assertions (bool)

abstract get_literal_type(t)

Converts the python type to a Flyte LiteralType

Parameters:

t (Type[T])

Return type:

LiteralType

guess_python_type(literal_type)

Converts the Flyte LiteralType to a python object type.

Parameters:

literal_type (LiteralType)

Return type:

Type[T]

property python_type: Type[T]

Returns the Python type.

to_html(ctx, python_val, expected_python_type)

Converts any Python value (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div.

Parameters:
  • ctx (FlyteContext)

  • python_val (T)

  • expected_python_type (Type[T])

Return type:

str

abstract to_literal(ctx, python_val, python_type, expected)

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError, clearly stating what the mismatch was.

Parameters:
  • ctx – A FlyteContext, useful in accessing the filesystem and other attributes.

  • python_val – The actual value to be transformed.

  • python_type – The assumed type of the value (this matches the declared type on the function).

  • expected – Expected Literal Type.

Return type:

Literal

abstract to_python_value(ctx, lv, expected_python_type)

Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.

Parameters:
  • ctx – FlyteContext.

  • lv – The received literal value.

  • expected_python_type – Expected native Python type that should be returned.

Return type:

T | None
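The to_literal / to_python_value contract can be illustrated without flytekit. MiniLiteral, MiniTransformer, and ComplexTransformer are hypothetical; a real transformer would subclass flytekit.extend.TypeTransformer, build a real Literal, also implement get_literal_type, and be registered with TypeEngine.register. The sketch keeps only the parts described above: decide based on the declared python_type and raise an AssertionError that states the mismatch.

```python
from typing import Any, Generic, Type, TypeVar

T = TypeVar("T")


class MiniLiteral:
    """Hypothetical stand-in for a Flyte Literal that wraps a string scalar."""

    def __init__(self, value: str):
        self.value = value


class MiniTransformer(Generic[T]):
    """Hypothetical base class mirroring TypeTransformer's constructor and python_type."""

    def __init__(self, name: str, t: Type[T]):
        self._name = name
        self._t = t

    @property
    def python_type(self) -> Type[T]:
        return self._t


class ComplexTransformer(MiniTransformer[complex]):
    """Hypothetical transformer for Python's built-in complex numbers."""

    def __init__(self) -> None:
        super().__init__("complex", complex)

    def to_literal(self, python_val: Any, python_type: Type) -> MiniLiteral:
        # Check the declared python_type; isinstance only validates the value against it.
        if python_type is not complex or not isinstance(python_val, complex):
            raise AssertionError(f"Cannot convert {python_val!r} declared as {python_type} to a complex literal")
        return MiniLiteral(f"{python_val.real},{python_val.imag}")

    def to_python_value(self, lv: MiniLiteral, expected_python_type: Type) -> complex:
        if expected_python_type is not complex:
            raise AssertionError(f"Expected complex, got {expected_python_type}")
        real, imag = lv.value.split(",")
        return complex(float(real), float(imag))


tf = ComplexTransformer()
lit = tf.to_literal(1 + 2j, complex)
print(lit.value)  # 1.0,2.0
print(tf.to_python_value(lit, complex))  # (1+2j)
```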

property type_assertions_enabled: bool

Indicates whether the transformer wants type assertions to be enabled at the core type engine layer.

class flytekit.extend.PythonCustomizedContainerTask(*args, **kwargs)

Please also take a look at the comments for flytekit.extend.ExecutableTemplateShimTask. This class should be subclassed, and a custom executor provided as a default to this parent class constructor, when building a new external-container flytekit-only plugin.

This class provides authors of new task types the basic scaffolding to create task-template based tasks. In order to write such a task, authors need to:

  • subclass the ShimTaskExecutor class and override the execute_from_model function. This function is where all the business logic should go. Keep in mind, though, that you, the plugin author, will not have access to anything that's not serialized within the TaskTemplate, which is why you'll also need to

  • subclass this class, and override the get_custom function to include all the information the executor will need to run.

  • pass the executor you created as the executor_type argument of this class's constructor.

Keep in mind that the total size of the TaskTemplate still needs to be small, since these will be accessed frequently by the Flyte engine.
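The three steps above can be sketched as a dependency-free model of the shim pattern. Every class here (TemplateStub, ExecutorBase, SqlLikeExecutor, CustomizedTask) and the run helper are hypothetical stand-ins; the real classes are ShimTaskExecutor and PythonCustomizedContainerTask, whose get_custom also takes SerializationSettings. What the sketch shows is the constraint: the executor sees only what get_custom placed into the serialized template.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Type


@dataclass
class TemplateStub:
    """Hypothetical stand-in for a serialized TaskTemplate; only `custom` carries plugin data."""
    name: str
    custom: Dict[str, Any] = field(default_factory=dict)


class ExecutorBase:
    """Hypothetical stand-in for ShimTaskExecutor."""

    def execute_from_model(self, tt: TemplateStub, **kwargs) -> Any:
        raise NotImplementedError


class SqlLikeExecutor(ExecutorBase):
    # Step 1: the business logic may only use what was serialized into the template.
    def execute_from_model(self, tt: TemplateStub, **kwargs) -> Any:
        return tt.custom["query"].format(**kwargs)


class CustomizedTask:
    """Hypothetical stand-in for a PythonCustomizedContainerTask subclass."""

    def __init__(self, name: str, query: str, executor_type: Type[ExecutorBase]):
        self._name = name
        self._query = query
        self._executor_type = executor_type  # Step 3: the executor is passed in

    def get_custom(self) -> Dict[str, Any]:
        # Step 2: everything the executor needs must be placed here.
        return {"query": self._query}

    def serialize(self) -> TemplateStub:
        return TemplateStub(name=self._name, custom=self.get_custom())


def run(template: TemplateStub, executor_type: Type[ExecutorBase], **inputs) -> Any:
    # At execution time only the template and the executor type are available.
    return executor_type().execute_from_model(template, **inputs)


task = CustomizedTask("count_rows", "SELECT COUNT(*) FROM {table}", SqlLikeExecutor)
print(run(task.serialize(), SqlLikeExecutor, table="users"))  # SELECT COUNT(*) FROM users
```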

get_config(settings)

Returns the task config as a serializable dictionary. This task config consists of metadata about the custom data defined for this task.

Parameters:

settings (SerializationSettings)

Return type:

Dict[str, str]

get_container(settings)

Returns the container definition (if any) that is used to run the task on hosted Flyte.

Parameters:

settings (SerializationSettings)

Return type:

Container

get_custom(settings)

Return additional plugin-specific custom data (if any) as a serializable dictionary.

Parameters:

settings (SerializationSettings)

Return type:

Dict[str, Any]

property task_template: TaskTemplate | None

Override the base class implementation to serialize on first call.

class flytekit.extend.ExecutableTemplateShimTask(tt, executor_type, *args, **kwargs)

The canonical @task-decorated Python function task is simple to reason about. At execution time (either locally or on a Flyte cluster), the function runs.

This class, along with the ShimTaskExecutor class below, represents another execution pattern. This pattern has two components:

  • The TaskTemplate, or something similar such as a FlyteTask.

  • An executor, which can use information from the task template (including the custom field).

At execution time (both locally and on a Flyte cluster), the task template is given to the executor, which is responsible for computing and returning the results.

Note

The interface at execution time will have to be derived from the Flyte IDL interface, which means it may be lossy. This is because when a task is serialized from Python into the TaskTemplate, some information is lost: Flyte IDL can't keep track of every single Python type (or Java type if writing in the Java flytekit).

This class also implements the dispatch_execute and execute functions to make it look like a PythonTask that the entrypoint.py can execute, even though this class doesn’t inherit from PythonTask.

Parameters:
  • tt (TaskTemplate)

  • executor_type (Type[ShimTaskExecutor])

dispatch_execute(ctx, input_literal_map)

This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing. Also, we refer to self.task_template rather than just self, as task classes that derive from the base PythonTask do.

Parameters:
  • ctx (FlyteContext)

  • input_literal_map (LiteralMap)

Return type:

LiteralMap | DynamicJobSpec

execute(**kwargs)

Rather than running here, send everything to the executor.

Return type:

Any

property name: str

Return the name of the underlying task.

post_execute(_, rval)

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameters:
  • _ (ExecutionParameters | None)

  • rval (Any)

Return type:

Any

pre_execute(user_params)

This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.

Parameters:

user_params (ExecutionParameters | None)

Return type:

ExecutionParameters | None

class flytekit.extend.ShimTaskExecutor(*args, **kwargs)

execute_from_model(tt, **kwargs)

This function must be overridden and is where all the business logic for running a task should live. Keep in mind that you’re only working with the TaskTemplate. You won’t have access to any information in the task that wasn’t serialized into the template.

Parameters:
  • tt (TaskTemplate) – This is the template, the serialized form of the task.

  • kwargs – These are the Python native input values to the task.

Returns:

Python native output values from the task.

Return type:

Any