Sqlite3
Once you have a Union account, install union:
pip install union
Export the following environment variable to build and push images to your own container registry:
# replace with your registry name
export IMAGE_SPEC_REGISTRY="<your-container-registry>"
Then run the following commands to run the workflow:
$ git clone https://github.com/unionai/unionai-examples
$ cd unionai-examples
$ union run --remote <path/to/file.py> <workflow_name> <params>
The source code for this example can be found here.
import pandas
from flytekit import kwtypes, task, workflow
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
SQLite3 queries in Flyte produce a Schema output. The data in this example is taken from here.
from flytekit.types.schema import FlyteSchema
EXAMPLE_DB = "https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"
The task is declared as a regular task. Alternatively, it can be declared within a workflow at the point of use (an example appears later).
sql_task = SQLite3Task(
    name="sqlite3.sample",
    query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
    inputs=kwtypes(limit=int),
    output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
    task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
)
As described elsewhere, a FlyteSchema can be received as a pandas DataFrame, and Flyte will convert it automatically.
@task
def print_and_count_columns(df: pandas.DataFrame) -> int:
    # Despite the name, this returns the number of rows (the length of the first column).
    return len(df[df.columns[0]])
The task can be used normally in the workflow, passing the declared inputs:
@workflow
def wf() -> int:
    return print_and_count_columns(df=sql_task(limit=100))
It can also be executed locally.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running main {wf()}")
As mentioned earlier, the SQL task can also be written inline, as follows:
@workflow
def query_wf() -> int:
    df = SQLite3Task(
        name="sqlite3.sample_inline",
        query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
        inputs=kwtypes(limit=int),
        output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
        task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
    )(limit=100)
    return print_and_count_columns(df=df)
It can also be executed locally.
if __name__ == "__main__":
    print(f"Running {__file__} main...")
    print(f"Running main {query_wf()}")
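To get a feel for what the templated query does without running Flyte at all, the following is a minimal sketch that uses only Python's standard sqlite3 module. It is not how flytekit implements SQLite3Task: the render_query helper is hypothetical and only imitates filling the {{.inputs.limit}} placeholder, and the in-memory tracks table with its ten rows is made up for illustration instead of being loaded from the Chinook database.

```python
import re
import sqlite3


def render_query(template: str, **inputs) -> str:
    """Hypothetical helper: replace each {{.inputs.<name>}} placeholder with the matching input value."""

    def substitute(match: re.Match) -> str:
        return str(inputs[match.group(1)])

    return re.sub(r"\{\{\s*\.inputs\.(\w+)\s*\}\}", substitute, template)


# In-memory stand-in for the `tracks` table (assumption: made-up rows, not the real data).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE tracks (TrackId INTEGER PRIMARY KEY, Name TEXT)")
con.executemany(
    "INSERT INTO tracks (TrackId, Name) VALUES (?, ?)",
    [(i, f"Track {i}") for i in range(1, 11)],
)

# Fill the placeholder with limit=3, then run the resulting SQL.
query = render_query(
    "select TrackId, Name from tracks limit {{.inputs.limit}}", limit=3
)
rows = con.execute(query).fetchall()
con.close()

print(query)
print(rows)
```

Because the value is substituted directly into the SQL text, declaring the input as an int, as the example does with kwtypes(limit=int), keeps arbitrary strings out of the query.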