Running a Spark Task
Once you have a Union account, install union:
pip install union

Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"

Then run the following commands to execute the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>

The source code for this example can be found here.
import datetime
import random
from operator import add
import flytekit
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.spark import Spark

Create an ImageSpec to automate the retrieval of a prebuilt Spark image.
custom_image = ImageSpec(python_version="3.9", registry="ghcr.io/flyteorg", packages=["flytekitplugins-spark"])

Replace ghcr.io/flyteorg with a container registry you have permission to publish to.
To upload the image to the local registry in the demo cluster, set the registry to localhost:30000.
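If you exported IMAGE_SPEC_REGISTRY earlier, you can read the registry from the environment instead of hard-coding it. The following is a minimal sketch, not part of the original example: it assumes the variable is set in the shell that builds the image and falls back to the demo cluster's local registry otherwise.

import os

from flytekit import ImageSpec

# Read the registry from the environment variable exported earlier;
# fall back to the demo cluster's local registry if it is not set (assumption).
custom_image = ImageSpec(
    python_version="3.9",
    registry=os.environ.get("IMAGE_SPEC_REGISTRY", "localhost:30000"),
    packages=["flytekitplugins-spark"],
)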
To create a Spark task, add a flytekitplugins.spark.Spark config to the Flyte task.
The spark_conf parameter accepts the configuration options you would typically use when setting up a Spark cluster.
You can also provide hadoop_conf as an input if needed.
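For instance, a task that also needs Hadoop settings could pass both dictionaries. The snippet below is only a hypothetical sketch; spark_with_hadoop and the Hadoop property shown are illustrative and not required by this example.

# Hypothetical config combining Spark and Hadoop settings (illustrative values only).
spark_with_hadoop = Spark(
    spark_conf={
        "spark.executor.instances": "2",
        "spark.executor.memory": "1000M",
    },
    hadoop_conf={
        # Example Hadoop property for the GCS connector; replace with whatever your storage backend needs.
        "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    },
)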
@task(
    task_config=Spark(
        # This configuration is applied to the Spark cluster
        spark_conf={
            "spark.driver.memory": "1000M",
            "spark.executor.memory": "1000M",
            "spark.executor.cores": "1",
            "spark.executor.instances": "2",
            "spark.driver.cores": "1",
            "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar",
        }
    ),
    limits=Resources(mem="2000M"),
    container_image=custom_image,
)
def hello_spark(partitions: int) -> float:
print("Starting Spark with Partitions: {}".format(partitions))
n = 1 * partitions
sess = flytekit.current_context().spark_session
count = sess.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
pi_val = 4.0 * count / n
return pi_valThe hello_spark task initiates a new Spark cluster.
When executed locally, it sets up a single-node client-only cluster.
However, when executed remotely, it dynamically scales the cluster size based on the specified Spark configuration.
For this example, we define the function that the map-reduce operation invokes within the Spark cluster: each call samples a random point in the unit square and returns 1 if the point falls inside the unit circle, so 4.0 * count / n approximates π.
def f(_):
    x = random.random() * 2 - 1
    y = random.random() * 2 - 1
    return 1 if x**2 + y**2 <= 1 else 0

Additionally, we specify a standard Flyte task that won't be executed on the Spark cluster.
@task(
    cache_version="2",
    container_image=custom_image,
)
def print_every_time(value_to_print: float, date_triggered: datetime.datetime) -> int:
    print("My printed value: {} @ {}".format(value_to_print, date_triggered))
    return 1

Finally, define a workflow that connects your tasks in a sequence. Remember, Spark and non-Spark tasks can be chained together as long as their parameter specifications match.
@workflow
def my_spark(triggered_date: datetime.datetime = datetime.datetime(2020, 9, 11)) -> float:
    """
    This workflow can be used like any other workflow. Since the image is a property of the task,
    the workflow does not care about how the image is configured.
    """
    pi = hello_spark(partitions=1)
    print_every_time(value_to_print=pi, date_triggered=triggered_date)
    return pi

You can execute the workflow locally. Certain aspects of Spark, such as links to Hive metastores, may not work in local execution, but these limitations are inherent to Spark itself and are not introduced by Flyte.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(
        f"Running my_spark(triggered_date=datetime.datetime.now()) {my_spark(triggered_date=datetime.datetime.now())}"
    )
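To run the workflow on a remote Union cluster instead, pass this file and the my_spark workflow name to union run, as in the commands at the top of this page. The path below is a placeholder for wherever the example sits in your checkout; triggered_date can be omitted to use the default.

# Replace the path with the actual location of this example file.
$ union run --remote path/to/spark_example.py my_spark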