Extending Flytekit#
This package contains useful classes and methods for extending Flytekit.
- class flytekit.extend.get_serializable(entity_mapping, settings, entity, options=None)#
The flytekit authoring code produces objects representing Flyte entities (tasks, workflows, etc.). To register them, they must be converted into objects that Union Admin understands. These are essentially the IDL objects, but this function currently translates to the layer above (e.g. SdkTask); it will be changed to produce the IDL objects directly in the future.
- Parameters:
entity_mapping (OrderedDict) – This is an ordered dict that will be mutated in place. This argument exists because there is a natural ordering to the entities at registration time: underlying tasks have to be registered before the workflows that use them. The recursive search done by this function and the functions above forms a natural topological sort, finding the dependent entities and adding them to this parameter before the parent entity this function is called with.
settings (SerializationSettings) – used to pick up project/domain/name - to be deprecated.
entity (PythonTask | BranchNode | Node | LaunchPlan | WorkflowBase | ReferenceWorkflow | ReferenceTask | ReferenceLaunchPlan | ReferenceEntity | ArrayNode) – The local flyte entity to try to convert (along with its dependencies)
options (Options | None) – Optional set of options that can be used to add additional metadata for launch plans
- Returns:
The resulting control plane entity. In addition to being returned, it is added to the mutable entity_mapping parameter.
- Return type:
TaskSpec | LaunchPlan | WorkflowSpec | Node | BranchNode | ArrayNode
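The dependency-first ordering described for entity_mapping can be illustrated with a minimal, stdlib-only sketch. The Entity class and the serialize function below are hypothetical stand-ins invented for illustration; they are not flytekit classes and this is not how get_serializable is implemented.

```python
from collections import OrderedDict
from dataclasses import dataclass


@dataclass(frozen=True)
class Entity:
    """Hypothetical stand-in for a task or workflow (not a flytekit class)."""
    name: str
    dependencies: tuple = ()


def serialize(entity_mapping: OrderedDict, entity: Entity) -> str:
    """Add entity and everything it depends on to entity_mapping, dependencies first."""
    if entity in entity_mapping:
        # Already converted while visiting another parent.
        return entity_mapping[entity]
    for dep in entity.dependencies:
        serialize(entity_mapping, dep)
    spec = f"spec:{entity.name}"  # placeholder for the converted object
    entity_mapping[entity] = spec
    return spec


t1 = Entity("t1")
t2 = Entity("t2")
wf = Entity("wf", dependencies=(t1, t2))
parent = Entity("parent_wf", dependencies=(wf, t1))

mapping = OrderedDict()
result = serialize(mapping, parent)
order = [e.name for e in mapping]
print(order)  # ['t1', 't2', 'wf', 'parent_wf']
```

Because each entity is inserted only after its dependencies, iterating over the mapping yields an order that is safe for registration.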
- flytekit.extend.context_manager#
alias of the flytekit.core.context_manager module
- class flytekit.extend.IgnoreOutputs#
This exception should be raised to indicate that the outputs generated by this task can be safely ignored. This is useful in the case of distributed training or peer-to-peer parallel algorithms.
- class flytekit.extend.ExecutionState(working_dir, mode=None, engine_dir=None, branch_eval_mode=None, user_space_params=None)#
This is the context that is active when executing a task or a local workflow. It carries the state needed for execution, such as temporary directories and the ExecutionParameters that are passed to the user.
- Attributes:
mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc).
working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs are uploaded
engine_dir (os.PathLike):
branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute.
user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd handler, a logging handler, the current execution id and a working directory.
- Parameters:
working_dir (PathLike | str)
mode (Mode | None)
engine_dir (PathLike | str | None)
branch_eval_mode (BranchEvalMode | None)
user_space_params (ExecutionParameters | None)
- class Mode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
Defines the possible execution modes, which in turn affects execution behavior.
- branch_complete()#
Indicates that we are within a conditional / if-else block and the active branch is not done. Defaults to SKIPPED.
- take_branch()#
Indicates that we are within an if-else block and the current branch has evaluated to true. Useful only in local execution mode
- with_params(working_dir=None, mode=None, engine_dir=None, branch_eval_mode=None, user_space_params=None)#
Produces a copy of the current execution state and overrides the copy’s parameters with passed parameter values.
- Parameters:
working_dir (Optional[os.PathLike])
mode (Optional[Mode])
engine_dir (Optional[os.PathLike])
branch_eval_mode (Optional[BranchEvalMode])
user_space_params (Optional[ExecutionParameters])
- Return type:
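The copy-and-override behaviour of with_params can be sketched with a frozen dataclass. The State class, its reduced field set, and the "local" / "hosted" mode strings are assumptions made for illustration only; the real ExecutionState has more fields and uses its own Mode enum.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class State:
    """Hypothetical, simplified stand-in for an execution state object."""
    working_dir: str
    mode: Optional[str] = None
    engine_dir: Optional[str] = None

    def with_params(self, working_dir=None, mode=None, engine_dir=None) -> "State":
        # Only arguments that were actually passed (not None) override the copy.
        candidates = (("working_dir", working_dir), ("mode", mode), ("engine_dir", engine_dir))
        overrides = {key: value for key, value in candidates if value is not None}
        return replace(self, **overrides)


base = State(working_dir="/tmp/run", mode="local")
derived = base.with_params(mode="hosted")
print(base.mode, derived.mode, derived.working_dir)
```

The original object is left untouched, so a state can be shared safely while nested contexts derive their own variants.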
- class flytekit.extend.ImageConfig(default_image=None, images=None)#
We recommend using ImageConfig.auto(img_name=None) to create an ImageConfig. For example, ImageConfig.auto(img_name="ghcr.io/flyteorg/flytecookbook:v1.0.0") will create an ImageConfig.
ImageConfig holds available images which can be used at registration time. A default image can be specified along with optional additional images. Each image in the config must have a unique name.
- Attributes:
default_image (Optional[Image]): The default image to be used as a container for task serialization.
images (List[Image]): Optional, additional images which can be used in task container definitions.
- classmethod auto(config_file=None, img_name=None)#
Reads from the config file or from img_name. Note that this function does not take into account the flytekit default images (see the Dockerfiles at the base of this repo). To pick those up, see the auto_default_image function.
- Parameters:
config_file (str | ConfigFile | None)
img_name (str | None)
- Returns:
- Return type:
- classmethod from_images(default_image, m=None)#
Allows you to programmatically create an ImageConfig. Usually only the default_image is required, unless your workflow uses multiple images.
ImageConfig.from_images(
    "ghcr.io/flyteorg/flytecookbook:v1.0.0",
    {
        "spark": "ghcr.io/flyteorg/myspark:...",
        "other": "...",
    },
)
- Returns:
- Parameters:
default_image (str)
m (Dict[str, str] | None)
- static validate_image(_, param, values)#
Validates the image to match the standard format. Also validates that only one default image is provided. A default image is one that is specified as default=<image_uri> or just <image_uri>. All other images should be provided with a name, in the format name=<image_uri>.
This method can be used with the CLI.
- Parameters:
_ (Any) – click argument, ignored here.
param (str) – the click argument, here should be “image”
values (tuple) – user-supplied images
- Returns:
- Return type:
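The naming rules above (default=<image_uri>, a bare <image_uri>, or name=<image_uri>, with at most one default) can be sketched as a small parser. The split_images function and the registry.example.com URIs are hypothetical; this is not the validation code that validate_image actually runs.

```python
from typing import Dict, Optional, Tuple


def split_images(values: Tuple[str, ...]) -> Tuple[Optional[str], Dict[str, str]]:
    """Split user-supplied image strings into one default image and named images."""
    default_image: Optional[str] = None
    named: Dict[str, str] = {}
    for value in values:
        name, sep, uri = value.partition("=")
        if not sep:
            # No "name=" prefix: a bare <image_uri> counts as the default.
            name, uri = "default", value
        if not name or not uri:
            raise ValueError(f"malformed image specification: {value!r}")
        if name == "default":
            if default_image is not None:
                raise ValueError("only one default image may be provided")
            default_image = uri
        elif name in named:
            raise ValueError(f"duplicate image name: {name!r}")
        else:
            named[name] = uri
    return default_image, named


default, others = split_images(
    ("registry.example.com/app:1.0", "spark=registry.example.com/spark:1.0")
)
print(default, others)
```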
- class flytekit.extend.Interface(inputs=None, outputs=None, output_tuple_name=None, docstring=None)#
A Python native interface object, like inspect.signature but simpler.
- Parameters:
inputs (Union[Optional[Dict[str, Type]], Optional[Dict[str, Tuple[Type, Any]]]])
outputs (Union[Optional[Dict[str, Type]], Optional[Dict[str, Optional[Type]]]])
output_tuple_name (Optional[str])
docstring (Optional[Docstring])
- remove_inputs(vars)#
This method is useful for removing variables from the Union backend inputs specification, because they are implicit, local-only inputs or will be supplied by the library at runtime (for example, a Spark session). It creates a new instance of the interface with the requested variables removed.
- Parameters:
vars (List[str] | None)
- Return type:
- with_inputs(extra_inputs)#
Use this to add additional inputs to the interface. This is useful for adding implicit inputs that are added without the user requesting them.
- Parameters:
extra_inputs (Dict[str, Type])
- Return type:
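Both remove_inputs and with_inputs are described as producing a new interface rather than changing the existing one. A minimal sketch of that pattern follows, assuming a hypothetical SimpleInterface class that tracks only input names and types; the real Interface also carries outputs, defaults, and a docstring.

```python
from typing import Dict, List, Optional


class SimpleInterface:
    """Hypothetical, pared-down interface: just a name-to-type mapping of inputs."""

    def __init__(self, inputs: Optional[Dict[str, type]] = None):
        self.inputs: Dict[str, type] = dict(inputs or {})

    def remove_inputs(self, names: Optional[List[str]]) -> "SimpleInterface":
        # Return a new object without the named inputs; self is left unchanged.
        if not names:
            return self
        kept = {k: v for k, v in self.inputs.items() if k not in names}
        return SimpleInterface(kept)

    def with_inputs(self, extra_inputs: Dict[str, type]) -> "SimpleInterface":
        # Return a new object with the extra (implicit) inputs appended.
        if not extra_inputs:
            return self
        clashes = set(extra_inputs) & set(self.inputs)
        if clashes:
            raise ValueError(f"inputs already defined: {sorted(clashes)}")
        return SimpleInterface({**self.inputs, **extra_inputs})


user_facing = SimpleInterface({"a": int, "b": str})
with_session = user_facing.with_inputs({"spark_session": object})
backend_view = with_session.remove_inputs(["spark_session"])
print(list(with_session.inputs), list(backend_view.inputs))
```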
- class flytekit.extend.Promise(var, val, type=None)#
This object is a wrapper and exists for three main reasons. Let’s assume we’re dealing with a task like
@task
def t1() -> (int, str): ...
1. Handling the duality between compilation and local execution - when the task function is run in a local execution mode inside a workflow function, a Python integer and string are produced. When the task is being compiled as part of the workflow, the task call creates a Node instead, and the task returns two Promise objects that point to that Node.
2. One needs to be able to call
x = t1().with_overrides(...)
If the task returns an integer or a (int, str) tuple like t1 above, calling with_overrides on the result would throw an error. This Promise object adds that.
3. Assorted handling for conditionals.
- Parameters:
var (str)
val (Union[NodeOutput, _literals_models.Literal])
type (Optional[_type_models.LiteralType])
- property attr_path: List[str | int]#
The attribute path the promise will be resolved with.
- property is_ready: bool#
Returns whether the Promise is READY (it is not a reference and the val is actually ready)
Usage
p = Promise(...)
...
if p.is_ready:
    print(p.val)
else:
    print(p.ref)
- property ref: NodeOutput#
If the promise is NOT READY / Incomplete, then it maps to the origin node that owns the promise
- property val: Literal#
If the promise is ready, then this holds the actual evaluated value in Flyte’s type system
- property var: str#
Name of the variable bound with this promise
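The ready / not-ready duality exposed by is_ready, val, and ref can be modelled with a small wrapper. SimplePromise and NodeRef are hypothetical names used only for this sketch; the real Promise wraps Flyte Literal and NodeOutput objects and does considerably more.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class NodeRef:
    """Hypothetical stand-in for a reference to a node output."""
    node_id: str
    var: str


class SimplePromise:
    """Holds either a concrete value (ready) or a reference to a node (not ready)."""

    def __init__(self, var: str, val: Any):
        self._var = var
        self._ref: Optional[NodeRef] = val if isinstance(val, NodeRef) else None
        self._val: Any = None if isinstance(val, NodeRef) else val

    @property
    def var(self) -> str:
        return self._var

    @property
    def is_ready(self) -> bool:
        # Ready means the promise carries a value instead of pointing at a node.
        return self._ref is None

    @property
    def val(self) -> Any:
        return self._val

    @property
    def ref(self) -> Optional[NodeRef]:
        return self._ref


local_result = SimplePromise("o0", 5)                     # local execution: value known
compiled_result = SimplePromise("o0", NodeRef("n0", "o0"))  # compilation: points at a node

for p in (local_result, compiled_result):
    print(p.val if p.is_ready else p.ref)
```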
- class flytekit.extend.DictTransformer#
Transformer that transforms a univariate dictionary Dict[str, T] to a Literal Map, or transforms an untyped dictionary to a JSON (struct/Generic)
- static dict_to_generic_literal(ctx, v, allow_pickle)#
Creates a flyte-specific Literal value from a native python dictionary.
- Parameters:
ctx (FlyteContext)
v (dict)
allow_pickle (bool)
- Return type:
- get_literal_type(t)#
Transforms a native python dictionary to a flyte-specific LiteralType
- Parameters:
t (Type[dict])
- Return type:
- guess_python_type(literal_type)#
Converts the Flyte LiteralType to a python object type.
- Parameters:
literal_type (LiteralType)
- Return type:
Union[Type[dict], Dict[Type, Type]]
- to_literal(ctx, python_val, python_type, expected)#
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states the mismatch.
- Parameters:
ctx (FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes
python_val (Any) – The actual value to be transformed
python_type (Type[dict]) – The assumed type of the value (this matches the declared type on the function)
expected (LiteralType) – Expected Literal Type
- Return type:
- to_python_value(ctx, lv, expected_python_type)#
Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.
- Parameters:
ctx (FlyteContext) – FlyteContext
lv (Literal) – The received literal Value
expected_python_type (Type[dict]) – Expected native python type that should be returned
- Return type:
dict
- class flytekit.extend.T#
alias of TypeVar(‘T’)
- class flytekit.extend.TypeEngine#
Core extensible TypeEngine of Flytekit. This should be used to extend the capabilities of Flytekit's type system. Users can implement their own TypeTransformers and register them with the TypeEngine, which allows special handling of user objects.
- classmethod dict_to_literal_map(ctx, d, type_hints=None)#
Given a dictionary mapping string keys to python values and a dictionary containing guessed types for such string keys, convert to a LiteralMap.
- Parameters:
ctx (FlyteContext)
d (Dict[str, Any])
type_hints (Dict[str, type] | None)
- Return type:
LiteralMap
- classmethod get_available_transformers()#
Returns all python types for which transformers are available
- Return type:
KeysView[Type]
- classmethod get_transformer(python_type)#
The TypeEngine hierarchy for Flytekit. This method looks up and selects the type transformer. The algorithm is as follows:
d = dictionary of registered transformers, where each key is a python type
v = lookup type
- Step 1:
If the type is annotated with a TypeTransformer instance, use that.
- Step 2:
find a transformer that matches v exactly
- Step 3:
find a transformer that matches the generic type of v. e.g List[int], Dict[str, int] etc
- Step 4:
Walk the inheritance hierarchy of v and find a transformer that matches the first base class. This is potentially non-deterministic - will depend on the registration pattern.
- Special case:
If v inherits from Enum, use the Enum transformer even if Enum is not the first base class.
TODO: let's make this deterministic by using an ordered dict
- Step 5:
If v is a dataclass, use the dataclass transformer.
- Step 6:
Otherwise, the Pickle transformer is used.
- Parameters:
python_type (Type)
- Return type:
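The lookup steps listed above can be sketched in plain Python. This is a simplified model that follows the documented order of steps, not flytekit's actual implementation; the Transformer marker class, the find_transformer function, and the registry contents are all hypothetical.

```python
import dataclasses
import enum
import typing


@dataclasses.dataclass(frozen=True)
class Transformer:
    """Hypothetical marker object; only the name matters in this sketch."""
    name: str


DATACLASS = Transformer("dataclass")
PICKLE = Transformer("pickle")


def find_transformer(registry: typing.Dict[typing.Any, Transformer], v: typing.Any) -> Transformer:
    # Step 1: a Transformer instance attached through Annotated wins outright.
    if typing.get_origin(v) is typing.Annotated:
        base, *metadata = typing.get_args(v)
        for item in metadata:
            if isinstance(item, Transformer):
                return item
        v = base
    # Step 2: exact match on the lookup type.
    if v in registry:
        return registry[v]
    # Step 3: match on the generic origin, e.g. list for List[int].
    origin = typing.get_origin(v)
    if origin in registry:
        return registry[origin]
    if isinstance(v, type):
        # Special case: Enum subclasses use the Enum transformer regardless of base order.
        if issubclass(v, enum.Enum) and enum.Enum in registry:
            return registry[enum.Enum]
        # Step 4: walk the inheritance hierarchy and take the first registered base.
        for base in v.__mro__[1:]:
            if base in registry:
                return registry[base]
    # Step 5: dataclasses get the dataclass transformer.
    if dataclasses.is_dataclass(v):
        return DATACLASS
    # Step 6: everything else falls back to pickling.
    return PICKLE


class Color(str, enum.Enum):
    RED = "red"


@dataclasses.dataclass
class Point:
    x: int


registry = {
    int: Transformer("int"),
    str: Transformer("str"),
    list: Transformer("list"),
    enum.Enum: Transformer("enum"),
}
custom = Transformer("custom")
lookups = (typing.Annotated[int, custom], int, typing.List[int], bool, Color, Point, complex)
chosen = [find_transformer(registry, t).name for t in lookups]
print(chosen)  # ['custom', 'int', 'list', 'int', 'enum', 'dataclass', 'pickle']
```

Note how Color resolves to the Enum transformer even though str precedes Enum in its bases, which is the special case called out in the steps.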
- classmethod guess_python_type(flyte_type)#
Transforms a flyte-specific LiteralType to a regular python type.
- Parameters:
flyte_type (LiteralType)
- Return type:
Type[T]
- classmethod guess_python_types(flyte_variable_dict)#
Transforms a dictionary of flyte-specific Variable objects to a dictionary of regular python types.
- Parameters:
flyte_variable_dict (Dict[str, Variable])
- Return type:
Dict[str, type]
- classmethod lazy_import_transformers()#
Only load the transformers if needed.
- classmethod literal_map_to_kwargs(ctx, lm, python_types=None, literal_types=None)#
Given a LiteralMap (usually an input into a task - intermediate), convert to kwargs for the task
- Parameters:
ctx (FlyteContext)
lm (LiteralMap)
python_types (Dict[str, type] | None)
literal_types (Dict[str, Variable] | None)
- Return type:
Dict[str, Any]
- classmethod named_tuple_to_variable_map(t)#
Converts a python-native NamedTuple to a flyte-specific VariableMap of named literals.
- Parameters:
t (NamedTuple)
- Return type:
VariableMap
- classmethod register(transformer, additional_types=None)#
This should be used for all types that respond with the right type annotation when you use the type(…) function
- Parameters:
transformer (TypeTransformer)
additional_types (List[Type] | None)
- classmethod to_literal(ctx, python_val, python_type, expected)#
Converts a python value of a given type and expected LiteralType into a resolved Literal value.
- Parameters:
ctx (FlyteContext)
python_val (Any)
python_type (Type)
expected (LiteralType)
- Return type:
- classmethod to_literal_type(python_type)#
Converts a python type into a flyte-specific LiteralType
- Parameters:
python_type (Type)
- Return type:
- classmethod to_python_value(ctx, lv, expected_python_type)#
Converts a Literal value with an expected python type into a python value.
- Parameters:
ctx (FlyteContext)
lv (Literal)
expected_python_type (Type)
- Return type:
Any
- class flytekit.extend.TypeTransformer(name, t, enable_type_assertions=True)#
Base transformer type that should be implemented for every python native type that can be handled by flytekit
- Parameters:
name (str)
t (Type[T])
enable_type_assertions (bool)
- abstract get_literal_type(t)#
Converts the python type to a Flyte LiteralType
- Parameters:
t (Type[T])
- Return type:
- guess_python_type(literal_type)#
Converts the Flyte LiteralType to a python object type.
- Parameters:
literal_type (LiteralType)
- Return type:
Type[T]
- property python_type: Type[T]#
This returns the python type
- to_html(ctx, python_val, expected_python_type)#
Converts any python val (dataframe, int, float) to an HTML string, which will be wrapped in an HTML div
- Parameters:
ctx (FlyteContext)
python_val (T)
expected_python_type (Type[T])
- Return type:
str
- abstract to_literal(ctx, python_val, python_type, expected)#
Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should not use type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the Transformer implementer should raise an AssertionError that clearly states the mismatch.
- Parameters:
ctx (FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes
python_val (T) – The actual value to be transformed
python_type (Type[T]) – The assumed type of the value (this matches the declared type on the function)
expected (LiteralType) – Expected Literal Type
- Return type:
- abstract to_python_value(ctx, lv, expected_python_type)#
Converts the given Literal to a Python Type. If the conversion cannot be done, an AssertionError should be raised.
- Parameters:
ctx (FlyteContext) – FlyteContext
lv (Literal) – The received literal Value
expected_python_type (Type[T]) – Expected native python type that should be returned
- Return type:
T | None
- property type_assertions_enabled: bool#
Indicates if the transformer wants type assertions to be enabled at the core type engine layer
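The to_literal / to_python_value contract (round-trip conversion, relying on the declared type, and raising AssertionError on a mismatch) can be illustrated without flytekit. SketchTransformer and FractionTransformer are hypothetical; the ctx and expected arguments are omitted and a plain dict stands in for a Flyte Literal, so this is a model of the contract rather than a working plugin.

```python
from abc import ABC, abstractmethod
from fractions import Fraction
from typing import Any, Dict, Generic, Type, TypeVar

T = TypeVar("T")


class SketchTransformer(ABC, Generic[T]):
    """Hypothetical, reduced version of a transformer base class."""

    def __init__(self, name: str, t: Type[T]):
        self.name = name
        self.python_type = t

    @abstractmethod
    def to_literal(self, python_val: T, python_type: Type[T]) -> Any:
        ...

    @abstractmethod
    def to_python_value(self, lv: Any, expected_python_type: Type[T]) -> T:
        ...


class FractionTransformer(SketchTransformer[Fraction]):
    def __init__(self) -> None:
        super().__init__("fraction", Fraction)

    def to_literal(self, python_val: Fraction, python_type: Type[Fraction]) -> Dict[str, int]:
        # Check the value against the declared python_type and state the mismatch clearly.
        if python_type is not Fraction or not isinstance(python_val, python_type):
            raise AssertionError(f"expected a Fraction for declared type {python_type}, got {python_val!r}")
        return {"numerator": python_val.numerator, "denominator": python_val.denominator}

    def to_python_value(self, lv: Dict[str, int], expected_python_type: Type[Fraction]) -> Fraction:
        if expected_python_type is not Fraction:
            raise AssertionError(f"cannot convert to {expected_python_type}")
        return Fraction(lv["numerator"], lv["denominator"])


transformer = FractionTransformer()
literal = transformer.to_literal(Fraction(3, 4), Fraction)
restored = transformer.to_python_value(literal, Fraction)
print(literal, restored)
```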
- class flytekit.extend.PythonCustomizedContainerTask(*args, **kwargs)#
Please take a look at the comments for flytekit.extend.ExecutableTemplateShimTask as well. This class should be subclassed and a custom Executor provided as a default to this parent class constructor when building a new external-container flytekit-only plugin.
This class provides authors of new task types the basic scaffolding to create task-template based tasks. In order to write such a task, authors need to
- subclass the ShimTaskExecutor class and override the execute_from_model function. This function is where all the business logic should go. Keep in mind though that you, the plugin author, will not have access to anything that’s not serialized within the TaskTemplate, which is why you’ll also need to
- subclass this class, and override the get_custom function to include all the information the executor will need to run.
- Also pass the executor you created as the executor_type argument of this class’s constructor.
Keep in mind that the total size of the TaskTemplate still needs to be small, since these will be accessed frequently by the Flyte engine.
- get_config(settings)#
Returns the task config as a serializable dictionary. This task config consists of metadata about the custom defined for this task.
- Parameters:
settings (SerializationSettings)
- Return type:
Dict[str, str]
- get_container(settings)#
Returns the container definition (if any) that is used to run the task on hosted Flyte.
- Parameters:
settings (SerializationSettings)
- Return type:
Container
- get_custom(settings)#
Return additional plugin-specific custom data (if any) as a serializable dictionary.
- Parameters:
settings (SerializationSettings)
- Return type:
Dict[str, Any]
- property task_template: TaskTemplate | None#
Override the base class implementation to serialize on first call.
- class flytekit.extend.ExecutableTemplateShimTask(tt, executor_type, *args, **kwargs)#
The canonical @task decorated Python function task is pretty simple to reason about. At execution time (either locally or on a Flyte cluster), the function runs.
This class, along with the ShimTaskExecutor class below, represents another execution pattern. This pattern has two components:
- The TaskTemplate, or something like it, such as a FlyteTask.
- An executor, which can use information from the task template (including the custom field)
Basically at execution time (both locally and on a Flyte cluster), the task template is given to the executor, which is responsible for computing and returning the results.
Note
The interface at execution time will have to be derived from the Flyte IDL interface, which means it may be lossy. This is because when a task is serialized from Python into the TaskTemplate some information is lost because Flyte IDL can’t keep track of every single Python type (or Java type if writing in the Java flytekit).
This class also implements the dispatch_execute and execute functions to make it look like a PythonTask that the entrypoint.py can execute, even though this class doesn’t inherit from PythonTask.
- Parameters:
tt (_task_model.TaskTemplate)
executor_type (Type[ShimTaskExecutor])
- dispatch_execute(ctx, input_literal_map)#
This function is largely similar to the base PythonTask, with the exception that we have to infer the Python interface before executing. Also, we refer to self.task_template rather than just self, as task classes that derive from the base PythonTask do.
- Parameters:
ctx (FlyteContext)
input_literal_map (LiteralMap)
- Return type:
LiteralMap | DynamicJobSpec
- execute(**kwargs)#
Rather than running here, send everything to the executor.
- Return type:
Any
- property name: str#
Return the name of the underlying task.
- post_execute(_, rval)#
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
- Parameters:
_ (ExecutionParameters | None)
rval (Any)
- Return type:
Any
- pre_execute(user_params)#
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
- Parameters:
user_params (ExecutionParameters | None)
- Return type:
ExecutionParameters | None
- class flytekit.extend.ShimTaskExecutor(*args, **kwargs)#
- execute_from_model(tt, **kwargs)#
This function must be overridden and is where all the business logic for running a task should live. Keep in mind that you’re only working with the TaskTemplate. You won’t have access to any information in the task that wasn’t serialized into the template.
- Parameters:
tt (TaskTemplate) – This is the template, the serialized form of the task.
kwargs – These are the Python native input values to the task.
- Returns:
Python native output values from the task.
- Return type:
Any
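The template-plus-executor pattern can be sketched with plain classes. TemplateSketch, ExecutorSketch, ScaleExecutor, and ShimTaskSketch are hypothetical names that mirror the roles of TaskTemplate, ShimTaskExecutor, a plugin author's executor, and the shim task; none of them use the real flytekit base classes. The JSON round trip is an assumption used to mimic the rule that only serialized information reaches the executor.

```python
import json
from typing import Any, Dict, Type


class TemplateSketch:
    """Hypothetical stand-in for a serialized task template."""

    def __init__(self, name: str, custom: Dict[str, Any]):
        self.name = name
        self.custom = custom


class ExecutorSketch:
    """Hypothetical stand-in for the executor base class."""

    def execute_from_model(self, tt: TemplateSketch, **kwargs: Any) -> Any:
        raise NotImplementedError


class ScaleExecutor(ExecutorSketch):
    def execute_from_model(self, tt: TemplateSketch, **kwargs: Any) -> Any:
        # All business logic lives here and may only use what the template carries.
        return kwargs["x"] * tt.custom["factor"]


class ShimTaskSketch:
    """Pairs a template with an executor type, as the shim task does."""

    def __init__(self, tt: TemplateSketch, executor_type: Type[ExecutorSketch]):
        self.task_template = tt
        self.executor = executor_type()

    def execute(self, **kwargs: Any) -> Any:
        # Rather than running here, send everything to the executor.
        return self.executor.execute_from_model(self.task_template, **kwargs)


# Round-trip the custom data through JSON to mimic serialization into the template.
custom = json.loads(json.dumps({"factor": 3}))
task = ShimTaskSketch(TemplateSketch("scale", custom), ScaleExecutor)
result = task.execute(x=7)
print(result)  # 21
```

Keeping the custom data small and JSON-friendly in the sketch reflects the advice that the template should stay small because the engine accesses it frequently.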