Genomic alignment

Code available here.

This tutorial builds a bioinformatics pipeline that aligns raw sequencing reads to a reference genome. The workflow downloads a reference genome and paired-end sequencing data, performs quality filtering, builds a reference index, and aligns the filtered reads with the Bowtie 2 aligner, processing the samples in parallel.

It’s a good showcase of how Flyte handles real bioinformatics workloads:

  • Per-task resources so quality filtering, indexing, and alignment each request exactly the CPU and memory they need.
  • cache="auto" on the download and indexing steps, so re-runs skip work that hasn’t changed.
  • Fan-out parallelism across samples with asyncio.gather.
  • System dependencies (fastp, bowtie2) installed into the container image with apt.

Define the container image

Because the pipeline shells out to bioinformatics tools, we build a custom image with flyte.Image.from_uv_script and install fastp (quality filtering) and bowtie2 (alignment) via apt.

genomic_alignment.py
main_img = (
    flyte.Image.from_uv_script(
        __file__,
        name="alignment-tutorial",
    )
    .with_apt_packages("fastp", "bowtie2")
)

The Python dependencies are declared at the top of the file as inline script metadata, the format uv reads when it runs a script:

# /// script
# requires-python = "3.12"
# dependencies = [
#    "flyte",
#    "requests",
# ]
# main = "alignment_wf"
# ///

Define the task environments

Each stage runs in its own TaskEnvironment with tailored resources. The top-level base_env declares the others as depends_on so the tasks it calls are available at run time.

genomic_alignment.py
fetch_env = flyte.TaskEnvironment(
    name="alignment-tutorial-fetch",
    image=main_img,
    cache="auto",
)

fastp_env = flyte.TaskEnvironment(
    name="alignment-tutorial-fastp",
    image=main_img,
    resources=flyte.Resources(memory="2Gi"),
)

index_env = flyte.TaskEnvironment(
    name="alignment-tutorial-index",
    image=main_img,
    resources=flyte.Resources(memory="10Gi"),
    cache="auto",
)

align_env = flyte.TaskEnvironment(
    name="alignment-tutorial-align",
    image=main_img,
    resources=flyte.Resources(cpu=2, memory="10Gi"),
)

base_env = flyte.TaskEnvironment(
    name="alignment-tutorial",
    image=main_img,
    depends_on=[fetch_env, fastp_env, index_env, align_env],
)

Define the data classes

We model the reference genome, sequencing reads, and alignment results as dataclasses. flyte.io.File and flyte.io.Dir reference offloaded data in blob storage, so large genomic files are passed between tasks by reference rather than copied through the orchestrator.

genomic_alignment.py
@dataclass
class Reference:
    """
    Represents a reference FASTA and associated index files.

    Attributes:
        ref_name (str): Name or identifier of the reference file.
        ref_dir (Dir): Directory containing the reference and any index files.
        index_name (str): Index string to pass to tools requiring it.
        indexed_with (str): Name of tool used to create the index.
    """

    ref_name: str
    ref_dir: Dir
    index_name: str | None = None
    indexed_with: str | None = None


# Sequencing reads are the raw data generated from a sequencing experiment.


@dataclass
class Reads:
    """
    Represents a sequencing reads sample via its associated FastQ files.

    Attributes:
        sample (str): The name or identifier of the raw sequencing sample.
        read1 (File): A File object representing the path to the raw R1 read file.
        read2 (File): A File object representing the path to the raw R2 read file.
    """

    sample: str
    read1: File | None = None
    read2: File | None = None

    def get_read_fnames(self):
        return (
            f"{self.sample}_1.fastq.gz",
            f"{self.sample}_2.fastq.gz",
        )


# Finally, we define an `Alignment` data class to represent an alignment file.


@dataclass
class Alignment:
    """
    Represents an alignment file and its associated sample.

    Attributes:
        sample (str): The name or identifier of the sample.
        aligner (str): The name of the aligner used to generate the alignment file.
        format (str): The format of the alignment file (e.g., SAM, BAM).
        alignment (File): A File object representing the path to the alignment file.
    """

    sample: str
    aligner: str
    format: str | None = None
    alignment: File | None = None

    def get_alignment_fname(self):
        return f"{self.sample}_{self.aligner}_aligned.{self.format}"
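
To make the naming helpers concrete, here is a minimal sketch that uses simplified stand-ins for the data classes above. Plain path strings replace flyte.io.File so the snippet runs without Flyte; the names ReadsSketch and AlignmentSketch are hypothetical and exist only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ReadsSketch:
    """Stand-in for Reads; plain path strings replace flyte.io.File."""

    sample: str
    read1: str | None = None
    read2: str | None = None

    def get_read_fnames(self) -> tuple[str, str]:
        return (f"{self.sample}_1.fastq.gz", f"{self.sample}_2.fastq.gz")


@dataclass
class AlignmentSketch:
    """Stand-in for Alignment; plain path strings replace flyte.io.File."""

    sample: str
    aligner: str
    format: str | None = None
    alignment: str | None = None

    def get_alignment_fname(self) -> str:
        return f"{self.sample}_{self.aligner}_aligned.{self.format}"


reads = ReadsSketch(sample="ERR250683-tiny")
print(reads.get_read_fnames())
# ('ERR250683-tiny_1.fastq.gz', 'ERR250683-tiny_2.fastq.gz')

aln = AlignmentSketch("ERR250683-tiny", "bowtie2", "sam")
print(aln.get_alignment_fname())
# ERR250683-tiny_bowtie2_aligned.sam
```

Because format defaults to None, get_alignment_fname only yields a sensible file name once format has been set, which is why the alignment task passes "sam" when it constructs the object.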

Fetch assets

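The first task downloads the reference genome and paired-end reads from remote URLs and materializes them as File/Dir objects. The fetch_file helper it calls is not shown in this excerpt. Because fetch_env sets cache="auto", repeat runs with the same inputs skip the download.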

genomic_alignment.py
@fetch_env.task
async def fetch_assets(
    ref_url: str, read_urls: List[str]
) -> tuple[Reference, List[Reads]]:
    """
    Fetch assets from remote URLs.
    """
    # Download reference genome
    ref_dir = Path("/tmp/reference_genome")
    ref_dir.mkdir(exist_ok=True, parents=True)
    ref = fetch_file(ref_url, str(ref_dir))
    ref_obj = Reference(
        ref_name=ref.name,
        ref_dir=await Dir.from_local(str(ref_dir)),
    )

    # Download sequencing reads
    dl_loc = Path("/tmp/reads")
    dl_loc.mkdir(exist_ok=True, parents=True)

    samples: dict[str, Reads] = {}
    for url in read_urls:
        fp = fetch_file(url, str(dl_loc))
        sample = fp.stem.split("_")[0]

        if sample not in samples:
            samples[sample] = Reads(sample=sample)

        if ".fastq.gz" in fp.name or "fasta" in fp.name:
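            # removesuffix drops the exact suffix; str.strip would remove a set of characters instead
            mate = fp.name.removesuffix(".fastq.gz").removesuffix(".filt").split("_")[-1]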
            if "1" in mate:
                samples[sample].read1 = await File.from_local(str(fp))
            elif "2" in mate:
                samples[sample].read2 = await File.from_local(str(fp))

    return ref_obj, list(samples.values())
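
The task above relies on a fetch_file helper that takes a URL and a local directory and returns a path. Its definition does not appear in this tutorial, so the following is only a sketch of what such a helper could look like. It is an assumption, not the script's actual code: it uses urllib from the standard library to stay self-contained, whereas the script lists requests as a dependency and may well use that instead.

```python
import shutil
import tempfile
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def fetch_file(url: str, local_dir: str) -> Path:
    """Hypothetical helper: download url into local_dir and return the local path."""
    # Use the last path segment of the URL as the local file name
    fname = Path(urlparse(url).path).name
    dest = Path(local_dir) / fname
    dest.parent.mkdir(parents=True, exist_ok=True)
    # Stream the response to disk so large files are not held in memory
    with urllib.request.urlopen(url) as resp, open(dest, "wb") as out:
        shutil.copyfileobj(resp, out)
    return dest


# Demonstrate with a local file:// URL so the sketch runs offline
src_dir = Path(tempfile.mkdtemp())
src = src_dir / "ERR250683-tiny_1.fastq.gz"
src.write_bytes(b"placeholder bytes, not real FASTQ data")

fp = fetch_file(src.as_uri(), tempfile.mkdtemp())
sample = fp.stem.split("_")[0]
print(fp.name, sample)
# ERR250683-tiny_1.fastq.gz ERR250683-tiny
```

Returning a Path keeps the helper compatible with how fetch_assets uses the result, since it reads fp.name and fp.stem to derive the sample name.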

Quality filtering with fastp

pyfastp runs fastp with its default settings, which filter out low-quality reads and trim adapters; deduplication is not enabled by this command. Its environment requests 2Gi of memory so it can handle larger read files.

genomic_alignment.py
@fastp_env.task
async def pyfastp(rs: Reads) -> Reads:
    """
    Perform quality filtering and preprocessing using Fastp on a Reads object.

    Args:
        rs (Reads): A Reads object containing raw sequencing data to be processed.

    Returns:
        Reads: A Reads object representing the filtered and preprocessed data.
    """
    ldir = Path(tempfile.mkdtemp())
    samp = Reads(rs.sample)
    o1, o2 = samp.get_read_fnames()
    o1p = ldir / o1
    o2p = ldir / o2

    assert rs.read1 is not None and rs.read2 is not None
    r1 = await rs.read1.download()
    r2 = await rs.read2.download()

    cmd = [
        "fastp",
        "-i", str(r1),
        "-I", str(r2),
        "-o", str(o1p),
        "-O", str(o2p),
    ]
    subprocess.run(cmd, check=True)

    samp.read1 = await File.from_local(str(o1p))
    samp.read2 = await File.from_local(str(o2p))

    return samp
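
Every tool in this pipeline is invoked through subprocess.run(cmd, check=True). When a tool fails, the raised CalledProcessError does not include the tool's error output in its message. The sketch below shows one way to surface that output. The run_tool helper is hypothetical and not part of the tutorial script; it is demonstrated with the Python interpreter so it runs without fastp installed.

```python
import shutil
import subprocess
import sys


def run_tool(cmd: list[str]) -> str:
    """Hypothetical helper: run an external tool and return its stdout."""
    # Fail early with a clear message if the executable is not installed
    if shutil.which(cmd[0]) is None:
        raise FileNotFoundError(f"{cmd[0]} not found on PATH")
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        # Include the tool's stderr so the failure is visible in task logs
        raise RuntimeError(
            f"{cmd[0]} exited with code {proc.returncode}:\n{proc.stderr}"
        )
    return proc.stdout


# Demonstrate with the Python interpreter itself instead of fastp
out = run_tool([sys.executable, "-c", "print('ok')"])
print(out.strip())
# ok
```

Inside a task, a call such as run_tool(cmd) could stand in for subprocess.run(cmd, check=True) with the same command list.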

Build the Bowtie 2 index

The index depends only on the reference genome, which rarely changes, so index_env sets cache="auto" and repeat runs reuse the existing index.

genomic_alignment.py
@index_env.task
async def bowtie2_index(ref: Reference) -> Reference:
    """
    Generate Bowtie2 index files from a reference genome.

    Args:
        ref (Reference): A Reference object representing the reference genome.

    Returns:
        Reference: The same reference object with the index_name and indexed_with attributes set.
    """
    ref_dir = await ref.ref_dir.download()
    idx_name = "bt2_idx"
    cmd = [
        "bowtie2-build",
        str(Path(str(ref_dir)) / ref.ref_name),
        str(Path(str(ref_dir)) / idx_name),
    ]
    subprocess.run(cmd, check=True)
    return Reference(
        ref.ref_name,
        await Dir.from_local(str(ref_dir)),
        idx_name,
        "bowtie2",
    )
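
For a small reference, bowtie2-build writes six files that share the index prefix (large indexes use the .bt2l extension instead). As a sanity check before alignment, a task could confirm that they all exist. The following is a sketch under that assumption; missing_index_files is a hypothetical helper, and the demo uses empty placeholder files rather than a real index.

```python
import tempfile
from pathlib import Path

# Suffixes bowtie2-build writes for a small index (large indexes end in .bt2l)
BT2_SUFFIXES = (".1.bt2", ".2.bt2", ".3.bt2", ".4.bt2", ".rev.1.bt2", ".rev.2.bt2")


def missing_index_files(ref_dir: Path, index_name: str) -> list[str]:
    """Hypothetical check: return the expected index files that are absent."""
    expected = [f"{index_name}{suffix}" for suffix in BT2_SUFFIXES]
    return [name for name in expected if not (ref_dir / name).exists()]


# Simulate an index directory that lacks the last file
ref_dir = Path(tempfile.mkdtemp())
for suffix in BT2_SUFFIXES[:-1]:
    (ref_dir / f"bt2_idx{suffix}").touch()

missing = missing_index_files(ref_dir, "bt2_idx")
print(missing)
# ['bt2_idx.rev.2.bt2']
```

An empty list means the prefix passed to bowtie2 with -x should resolve to a complete index.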

Align reads

Each sample is aligned to the indexed reference with Bowtie 2, producing a SAM file.

genomic_alignment.py
@align_env.task
async def bowtie2_align_paired_reads(idx: Reference, fs: Reads) -> Alignment:
    """
    Perform paired-end alignment using Bowtie 2 on a filtered sample.

    Args:
        idx (Reference): A Reference object containing the Bowtie 2 index.
        fs (Reads): A filtered Reads object containing sample data to be aligned.

    Returns:
        Alignment: An Alignment object representing the alignment result.
    """
    assert idx.indexed_with == "bowtie2", "Reference index must be generated with bowtie2"
    assert idx.index_name is not None
    assert fs.read1 is not None and fs.read2 is not None

    ref_dir = await idx.ref_dir.download()
    r1 = await fs.read1.download()
    r2 = await fs.read2.download()

    ldir = Path(tempfile.mkdtemp())
    alignment = Alignment(fs.sample, "bowtie2", "sam")
    al = ldir / alignment.get_alignment_fname()

    cmd = [
        "bowtie2",
        "-x", str(Path(str(ref_dir)) / idx.index_name),
        "-1", str(r1),
        "-2", str(r2),
        "-S", str(al),
    ]
    subprocess.run(cmd, check=True)

    alignment.alignment = await File.from_local(str(al))
    return alignment
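
Once a SAM file has been produced, a quick way to inspect it is to count mapped and unmapped records. The sketch below reads the FLAG field (the second tab-separated column), where bit 0x4 marks an unmapped read. The count_mapped function and the toy records are illustrative and not part of the tutorial script.

```python
import tempfile
from pathlib import Path


def count_mapped(sam_path: Path) -> tuple[int, int]:
    """Return (mapped, unmapped) record counts for a SAM file."""
    mapped = unmapped = 0
    with open(sam_path) as fh:
        for line in fh:
            # Header lines start with '@' and carry no alignment records
            if line.startswith("@") or not line.strip():
                continue
            flag = int(line.split("\t")[1])
            # Bit 0x4 of the FLAG field marks an unmapped read
            if flag & 0x4:
                unmapped += 1
            else:
                mapped += 1
    return mapped, unmapped


# Toy SAM content: one mapped pair (flags 99/147) and one unmapped pair (77/141)
records = [
    ["@HD", "VN:1.6", "SO:unsorted"],
    ["@SQ", "SN:chr1", "LN:1000"],
    ["r1", "99", "chr1", "10", "42", "4M", "=", "50", "44", "ACGT", "IIII"],
    ["r1", "147", "chr1", "50", "42", "4M", "=", "10", "-44", "ACGT", "IIII"],
    ["r2", "77", "*", "0", "0", "*", "*", "0", "0", "ACGT", "IIII"],
    ["r2", "141", "*", "0", "0", "*", "*", "0", "0", "ACGT", "IIII"],
]
sam = Path(tempfile.mkdtemp()) / "toy_bowtie2_aligned.sam"
sam.write_text("\n".join("\t".join(r) for r in records) + "\n")

counts = count_mapped(sam)
print(counts)
# (2, 2)
```

The counts cover every record in the file, so secondary or supplementary alignments, if present, are included.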

Orchestrate the workflow

The top-level task fetches the assets, filters every sample in parallel, builds the index, and aligns all samples. Parallelism across samples is achieved with asyncio.gather rather than a separate @dynamic decorator.

genomic_alignment.py
@base_env.task
async def alignment_wf() -> List[Alignment]:
    # Prepare raw samples from remote URLs
    ref, samples = await fetch_assets(
        ref_url="https://github.com/unionai-oss/unionbio/raw/main/tests/assets/references/GRCh38_short.fasta",
        read_urls=[
            "https://github.com/unionai-oss/unionbio/raw/main/tests/assets/sequences/raw/ERR250683-tiny_1.fastq.gz",
            "https://github.com/unionai-oss/unionbio/raw/main/tests/assets/sequences/raw/ERR250683-tiny_2.fastq.gz",
        ],
    )

    # Filter all samples in parallel
    filtered_samples = list(
        await asyncio.gather(*[pyfastp(rs=s) for s in samples])
    )

    # Generate a bowtie2 index or load it from cache
    bowtie2_idx = await bowtie2_index(ref=ref)

    # Align all filtered samples in parallel using bowtie2
    sams = list(
        await asyncio.gather(
            *[bowtie2_align_paired_reads(idx=bowtie2_idx, fs=s) for s in filtered_samples]
        )
    )

    return sams
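
The fan-out in the workflow uses plain asyncio.gather. The standalone sketch below illustrates its two relevant properties: all coroutines run concurrently, and results come back in input order regardless of which finishes first. The process_sample coroutine is a stand-in that sleeps instead of calling a Flyte task.

```python
import asyncio
import time


async def process_sample(name: str, seconds: float) -> str:
    """Stand-in for a per-sample task such as filtering or alignment."""
    await asyncio.sleep(seconds)  # simulates waiting on remote work
    return f"{name}: done"


async def main() -> list[str]:
    samples = [("sample_a", 0.3), ("sample_b", 0.1), ("sample_c", 0.3)]
    # gather schedules all coroutines at once and returns results in input order
    return list(await asyncio.gather(*[process_sample(n, s) for n, s in samples]))


start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print(results)
# ['sample_a: done', 'sample_b: done', 'sample_c: done']
print(f"elapsed: {elapsed:.2f}s")  # close to the slowest sample, not the sum
```

Because results keep the input order, the list returned by the workflow lines up with the list of samples that went in.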

Run the workflow

This example needs no secrets or external API keys; it pulls public test data from GitHub.

From the example directory, run it as a uv script:

cd v2/tutorials/genomic_alignment
uv run --script genomic_alignment.py

Or submit it with the Flyte CLI:

flyte run genomic_alignment.py alignment_wf

When the run completes, each returned Alignment points to a SAM file in blob storage that you can download from the run’s outputs in the UI.