Spark on Databricks agent#

Note

To use the Databricks agent, you must pip install flytekitplugins-spark.

flytekitplugins.spark.new_spark_session(name, conf=None)#

Optionally creates a new Spark session and returns it. In cluster mode (running in hosted Flyte), the Spark conf passed in is disregarded.

This method is safe to call from any other method; that is one reason this code fragment is duplicated with the pre-execute step. For example, in a notebook scenario you might want to call it from a separate kernel.

Parameters:
  • name (str)

  • conf (Dict[str, str])
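
A minimal sketch of calling this from a local or notebook kernel; the session name and conf values are illustrative, and the conf is ignored when running in cluster mode on hosted Flyte:

from flytekitplugins.spark import new_spark_session

# Create (or reuse) a Spark session locally, e.g. from a notebook kernel.
# The conf below is illustrative and is disregarded in cluster mode.
sess = new_spark_session(
    name="interactive-analysis",
    conf={"spark.driver.memory": "2g"},
)
df = sess.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df.show()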

class flytekitplugins.spark.Spark(spark_conf=None, hadoop_conf=None, executor_path=None, applications_path=None)#

Use this to configure a SparkContext for your task. Tasks marked with this will automatically execute natively on K8s as a distributed Spark execution.

Args:

  • spark_conf: Dictionary of Spark config. The variables should match what Spark expects.

  • hadoop_conf: Dictionary of Hadoop conf. The variables should match a typical Hadoop configuration for Spark.

  • executor_path: Python binary executable to use for PySpark in the driver and executors.

  • applications_path: Path to a bundled JAR, Python, or R file of the application to execute (the main file).

Parameters:
  • spark_conf (Dict[str, str] | None)

  • hadoop_conf (Dict[str, str] | None)

  • executor_path (str | None)

  • applications_path (str | None)
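
A hedged sketch of how this configuration is typically attached to a task; the conf keys and task body are illustrative, and the Spark session is obtained through flytekit's current_context():

import random

import flytekit
from flytekit import task
from flytekitplugins.spark import Spark

@task(
    task_config=Spark(
        # Illustrative Spark settings; use whatever your cluster expects.
        spark_conf={"spark.executor.instances": "2", "spark.driver.memory": "1g"},
    )
)
def estimate_pi(partitions: int) -> float:
    sess = flytekit.current_context().spark_session  # session provided by the plugin
    n = 100000 * partitions

    def inside(_: int) -> bool:
        x, y = random.random(), random.random()
        return x * x + y * y <= 1.0

    count = sess.sparkContext.parallelize(range(n), partitions).filter(inside).count()
    return 4.0 * count / n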

class flytekitplugins.spark.SparkDataFrameSchemaReader(from_path, cols, fmt)#

Implements how a Spark DataFrame should be read using the open method of FlyteSchema.

Parameters:
  • from_path (str)

  • cols (Dict[str, type] | None)

  • fmt (SchemaFormat)

class flytekitplugins.spark.SparkDataFrameSchemaWriter(to_path, cols, fmt)#

Implements how a Spark DataFrame should be written using the open method of FlyteSchema.

Parameters:
  • to_path (str)

  • cols (Dict[str, type] | None)

  • fmt (SchemaFormat)
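
Taken together, the reader and writer let a FlyteSchema be materialized as a Spark DataFrame. The sketch below is hedged: it assumes both tasks run with a Spark config so a session is available, and the schema columns and data are illustrative:

import flytekit
import pyspark.sql
from flytekit import kwtypes, task
from flytekit.types.schema import FlyteSchema
from flytekitplugins.spark import Spark

# Hypothetical typed schema used only for illustration.
MySchema = FlyteSchema[kwtypes(name=str, age=int)]

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def create_df() -> MySchema:
    sess = flytekit.current_context().spark_session
    # Returning a Spark DataFrame for a FlyteSchema output goes through the schema writer.
    return sess.createDataFrame([("Alice", 5), ("Bob", 10)], ["name", "age"])

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def total_age(s: MySchema) -> int:
    # Opening the schema as a Spark DataFrame goes through the schema reader.
    df: pyspark.sql.DataFrame = s.open(pyspark.sql.DataFrame).all()
    return df.agg({"age": "sum"}).collect()[0][0]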

class flytekitplugins.spark.SparkDataFrameTransformer#

Transforms Spark DataFrames to and from a Schema (typed/untyped).

get_literal_type(t)#

Converts the python type to a Flyte LiteralType

Parameters:

t (Type[DataFrame])

Return type:

LiteralType

to_literal(ctx, python_val, python_type, expected)#

Converts a given python_val to a Flyte Literal, assuming the given python_val matches the declared python_type. Implementers should refrain from using type(python_val) and should instead rely on the passed-in python_type. If these do not match (or are not allowed), the transformer implementer should raise an AssertionError that clearly states the mismatch.

Parameters:
  • ctx – A FlyteContext, useful in accessing the filesystem and other attributes

  • python_val – The actual value to be transformed

  • python_type – The assumed type of the value (this matches the declared type on the function)

  • expected – Expected Literal Type

Return type:

Literal

to_python_value(ctx, lv, expected_python_type)#

Converts the given Literal to a Python type. If the conversion cannot be done, an AssertionError should be raised.

Parameters:
  • ctx – FlyteContext

  • lv – The received literal value

  • expected_python_type – Expected native Python type that should be returned

Return type:

T
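
In practice this transformer is rarely called directly; flytekit's type engine invokes the registered handler when a task is annotated with pyspark.sql.DataFrame. A hedged sketch follows (the conf and data are illustrative, and depending on the flytekit version the conversion may instead route through the structured dataset handlers documented below):

import flytekit
import pyspark.sql
from flytekit import task
from flytekitplugins.spark import Spark

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def make_df() -> pyspark.sql.DataFrame:
    # On output, the type engine converts the returned DataFrame to a Flyte literal.
    sess = flytekit.current_context().spark_session
    return sess.createDataFrame([(1, "x"), (2, "y")], ["id", "label"])

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def count_rows(df: pyspark.sql.DataFrame) -> int:
    # On input, the literal is materialized back into a Spark DataFrame.
    return df.count()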

class flytekitplugins.spark.ParquetToSparkDecodingHandler#
decode(ctx, flyte_value, current_task_metadata)#

This is called by the dataset transformer engine to translate a Flyte Literal value into a Python instance.

Parameters:
  • ctx (FlyteContext) – A FlyteContext, useful in accessing the filesystem and other attributes

  • flyte_value (StructuredDataset) – This will be a Flyte IDL StructuredDataset Literal - do not confuse this with the StructuredDataset class defined also in this module.

  • current_task_metadata (StructuredDatasetMetadata) – Metadata object containing the type (and columns if any) for the currently executing task. This type may have more or less information than the type information bundled inside the incoming flyte_value.

Returns:

This function can either return an instance of the dataframe that this decoder handles, or an iterator of those dataframes.

Return type:

DataFrame
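
A hedged sketch of where this handler comes into play: when a task asks for a Spark DataFrame from a Parquet-backed StructuredDataset, the decoder reads the Parquet data into pyspark. The Spark conf below is illustrative:

import pyspark.sql
from flytekit import task
from flytekit.types.structured import StructuredDataset
from flytekitplugins.spark import Spark

@task(task_config=Spark(spark_conf={"spark.executor.instances": "1"}))
def read_sd(sd: StructuredDataset) -> int:
    # Requesting a Spark DataFrame from the StructuredDataset routes through
    # the registered Parquet-to-Spark decoding handler.
    df: pyspark.sql.DataFrame = sd.open(pyspark.sql.DataFrame).all()
    return df.count()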

class flytekitplugins.spark.DatabricksV2(spark_conf=None, hadoop_conf=None, executor_path=None, applications_path=None, databricks_conf=None, databricks_instance=None)#

Use this to configure a Databricks task. Tasks marked with this will automatically execute natively on the Databricks platform as a distributed Spark execution.

Args:

  • databricks_conf: Databricks job configuration compliant with Jobs API version 2.1, which also supports 2.0 use cases. For the configuration structure, see https://docs.databricks.com/dev-tools/api/2.0/jobs.html#request-structure. For updates in API 2.1, refer to https://docs.databricks.com/en/workflows/jobs/jobs-api-updates.html.

  • databricks_instance: Domain name of your deployment. Use the form <account>.cloud.databricks.com.

Parameters:
  • spark_conf (Dict[str, str] | None)

  • hadoop_conf (Dict[str, str] | None)

  • executor_path (str | None)

  • applications_path (str | None)

  • databricks_conf (Dict[str, str | dict] | None)

  • databricks_instance (str | None)
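
A hedged sketch of a Databricks-backed task; the databricks_conf block follows the Jobs API 2.0/2.1 request structure, and the cluster settings, instance domain, and task body are illustrative placeholders to adjust for your workspace:

import flytekit
from flytekit import task
from flytekitplugins.spark import DatabricksV2

@task(
    task_config=DatabricksV2(
        spark_conf={"spark.driver.memory": "1000M"},
        databricks_conf={
            "run_name": "flytekit databricks example",  # illustrative
            "new_cluster": {  # adjust to your workspace
                "spark_version": "12.2.x-scala2.12",
                "node_type_id": "n2-highmem-4",
                "num_workers": 2,
            },
            "timeout_seconds": 3600,
            "max_retries": 1,
        },
        databricks_instance="<account>.cloud.databricks.com",  # your deployment domain
    )
)
def count_on_databricks(partitions: int) -> int:
    sess = flytekit.current_context().spark_session
    return sess.sparkContext.parallelize(range(100000), partitions).count()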