Genomic alignment
Code available here.
This tutorial builds a bioinformatics pipeline that aligns raw sequencing reads to a reference genome. The workflow downloads a reference genome and paired-end sequencing data, performs quality filtering, builds a reference index, and aligns the filtered reads with the Bowtie 2 aligner — running each sample in parallel.
It’s a good showcase of how Flyte handles real bioinformatics workloads:
- Per-task resources so quality filtering, indexing, and alignment each request exactly the CPU and memory they need.
- cache="auto" on the download and indexing steps, so re-runs skip work that hasn’t changed.
- Fan-out parallelism across samples with asyncio.gather.
- System dependencies (fastp, bowtie2) installed into the container image with apt.
Define the container image
Because the pipeline shells out to bioinformatics tools, we build a custom image with flyte.Image.from_uv_script and install fastp (quality filtering) and bowtie2 (alignment) via apt.
import flyte

main_img = (
    flyte.Image.from_uv_script(
        __file__,
        name="alignment-tutorial",
    )
    .with_apt_packages("fastp", "bowtie2")
)
The Python dependencies are declared at the top of the file using the uv script style:
# /// script
# requires-python = "3.12"
# dependencies = [
#    "flyte",
#    "requests",
# ]
# main = "alignment_wf"
# ///
Define the task environments
Each stage runs in its own TaskEnvironment with tailored resources. The top-level base_env declares the others as depends_on so the tasks it calls are available at run time.
fetch_env = flyte.TaskEnvironment(
    name="alignment-tutorial-fetch",
    image=main_img,
    cache="auto",
)

fastp_env = flyte.TaskEnvironment(
    name="alignment-tutorial-fastp",
    image=main_img,
    resources=flyte.Resources(memory="2Gi"),
)

index_env = flyte.TaskEnvironment(
    name="alignment-tutorial-index",
    image=main_img,
    resources=flyte.Resources(memory="10Gi"),
    cache="auto",
)

align_env = flyte.TaskEnvironment(
    name="alignment-tutorial-align",
    image=main_img,
    resources=flyte.Resources(cpu=2, memory="10Gi"),
)

base_env = flyte.TaskEnvironment(
    name="alignment-tutorial",
    image=main_img,
    depends_on=[fetch_env, fastp_env, index_env, align_env],
)
Define the data classes
We model the reference genome, sequencing reads, and alignment results as dataclasses. flyte.io.File and flyte.io.Dir reference offloaded data in blob storage, so large genomic files are passed between tasks by reference rather than copied through the orchestrator.
from dataclasses import dataclass

from flyte.io import Dir, File


@dataclass
class Reference:
    """
    Represents a reference FASTA and associated index files.

    Attributes:
        ref_name (str): Name or identifier of the reference file.
        ref_dir (Dir): Directory containing the reference and any index files.
        index_name (str): Index string to pass to tools requiring it.
        indexed_with (str): Name of tool used to create the index.
    """

    ref_name: str
    ref_dir: Dir
    index_name: str | None = None
    indexed_with: str | None = None


# Sequencing reads are the raw data generated from a sequencing experiment.
@dataclass
class Reads:
    """
    Represents a sequencing reads sample via its associated FastQ files.

    Attributes:
        sample (str): The name or identifier of the raw sequencing sample.
        read1 (File): A File object representing the path to the raw R1 read file.
        read2 (File): A File object representing the path to the raw R2 read file.
    """

    sample: str
    read1: File | None = None
    read2: File | None = None

    def get_read_fnames(self):
        return (
            f"{self.sample}_1.fastq.gz",
            f"{self.sample}_2.fastq.gz",
        )


# Finally, we define an `Alignment` data class to represent an alignment file.
@dataclass
class Alignment:
    """
    Represents an alignment file and its associated sample.

    Attributes:
        sample (str): The name or identifier of the sample.
        aligner (str): The name of the aligner used to generate the alignment file.
        format (str): The format of the alignment file (e.g., SAM, BAM).
        alignment (File): A File object representing the path to the alignment file.
    """

    sample: str
    aligner: str
    format: str | None = None
    alignment: File | None = None

    def get_alignment_fname(self):
        return f"{self.sample}_{self.aligner}_aligned.{self.format}"
Fetch assets
The first task downloads the reference genome and paired-end reads from remote URLs and materializes them as File/Dir objects. It’s cached, so repeat runs skip the download.
from pathlib import Path
from typing import List


@fetch_env.task
async def fetch_assets(
    ref_url: str, read_urls: List[str]
) -> tuple[Reference, List[Reads]]:
    """
    Fetch assets from remote URLs.
    """
    # fetch_file (not shown here) is used as a helper that downloads a URL
    # into a directory and returns the local path.

    # Download reference genome
    ref_dir = Path("/tmp/reference_genome")
    ref_dir.mkdir(exist_ok=True, parents=True)
    ref = fetch_file(ref_url, str(ref_dir))
    ref_obj = Reference(
        ref_name=ref.name,
        ref_dir=await Dir.from_local(str(ref_dir)),
    )

    # Download sequencing reads
    dl_loc = Path("/tmp/reads")
    dl_loc.mkdir(exist_ok=True, parents=True)
    samples: dict[str, Reads] = {}
    for url in read_urls:
        fp = fetch_file(url, str(dl_loc))
        # The sample name is everything before the first underscore
        sample = fp.stem.split("_")[0]
        if sample not in samples:
            samples[sample] = Reads(sample=sample)
        if ".fastq.gz" in fp.name or "fasta" in fp.name:
            # removesuffix drops the exact suffix; str.strip would remove a set of characters
            mate = fp.name.removesuffix(".fastq.gz").removesuffix(".filt").split("_")[-1]
            if "1" in mate:
                samples[sample].read1 = await File.from_local(str(fp))
            elif "2" in mate:
                samples[sample].read2 = await File.from_local(str(fp))

    return ref_obj, list(samples.values())
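The fetch_file helper called above is not shown in this section. As a rough sketch of what such a helper might look like, assuming it downloads a URL into a local directory and returns a pathlib.Path: the version below (hypothetical name fetch_file_sketch) uses urllib from the standard library as a stand-in, whereas the script's dependency list suggests the original relies on requests.

```python
import shutil
import urllib.request
from pathlib import Path
from urllib.parse import urlparse


def fetch_file_sketch(url: str, local_dir: str) -> Path:
    """Download url into local_dir and return the local path (hypothetical helper)."""
    dest_dir = Path(local_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)

    # Use the last path segment of the URL as the file name.
    fname = Path(urlparse(url).path).name
    if not fname:
        raise ValueError(f"Cannot derive a file name from {url!r}")
    dest = dest_dir / fname

    # Stream the response to disk so large files are not held in memory.
    with urllib.request.urlopen(url) as resp, open(dest, "wb") as out:
        shutil.copyfileobj(resp, out)
    return dest
```

Returning a Path rather than a string is what lets the caller use attributes such as .name and .stem to derive the sample name.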
Quality filtering with fastp
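pyfastp runs fastp with its default settings, which filter out low-quality reads and trim adapters (deduplication is only performed when fastp is given the --dedup flag, which this command does not pass). Its environment requests 2Gi of memory so it can process larger read files.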
import subprocess
import tempfile


@fastp_env.task
async def pyfastp(rs: Reads) -> Reads:
    """
    Perform quality filtering and preprocessing using Fastp on a Reads object.

    Args:
        rs (Reads): A Reads object containing raw sequencing data to be processed.

    Returns:
        Reads: A Reads object representing the filtered and preprocessed data.
    """
    # Write filtered reads to a fresh temporary directory
    ldir = Path(tempfile.mkdtemp())
    samp = Reads(rs.sample)
    o1, o2 = samp.get_read_fnames()
    o1p = ldir / o1
    o2p = ldir / o2

    # Pull the raw reads from blob storage to local disk
    assert rs.read1 is not None and rs.read2 is not None
    r1 = await rs.read1.download()
    r2 = await rs.read2.download()

    cmd = [
        "fastp",
        "-i", str(r1),
        "-I", str(r2),
        "-o", str(o1p),
        "-O", str(o2p),
    ]
    subprocess.run(cmd, check=True)

    # Upload the filtered reads and return references to them
    samp.read1 = await File.from_local(str(o1p))
    samp.read2 = await File.from_local(str(o2p))
    return samp
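With subprocess.run(cmd, check=True), a failing tool raises CalledProcessError, whose message reports the exit status but not what the tool printed. One alternative, sketched below under the assumption that you would rather see the tool's stderr in the exception itself, is a small wrapper; run_tool is a hypothetical name and is not part of the tutorial code.

```python
import subprocess
import sys


def run_tool(cmd: list[str]) -> str:
    """Run cmd, return its stdout, and include its stderr in the error on failure."""
    proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(
            f"{cmd[0]} exited with code {proc.returncode}:\n{proc.stderr.strip()}"
        )
    return proc.stdout


# Demonstrated with the Python interpreter so the snippet runs without fastp installed.
print(run_tool([sys.executable, "-c", "print('ok')"]))
```

The trade-off is that captured output no longer streams to the task logs while the tool is running, which may matter for long jobs.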
Build the Bowtie 2 index
A reference index rarely changes, so this task is cached.
@index_env.task
async def bowtie2_index(ref: Reference) -> Reference:
    """
    Generate Bowtie2 index files from a reference genome.

    Args:
        ref (Reference): A Reference object representing the reference genome.

    Returns:
        Reference: The same reference object with the index_name and indexed_with attributes set.
    """
    ref_dir = await ref.ref_dir.download()
    idx_name = "bt2_idx"

    cmd = [
        "bowtie2-build",
        str(Path(str(ref_dir)) / ref.ref_name),
        str(Path(str(ref_dir)) / idx_name),
    ]
    subprocess.run(cmd, check=True)

    return Reference(
        ref.ref_name,
        await Dir.from_local(str(ref_dir)),
        idx_name,
        "bowtie2",
    )
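For a small genome, bowtie2-build writes six files next to the reference, named after the index prefix. A quick sanity check before uploading the directory could look like the sketch below; missing_index_files is a hypothetical helper, and the demo uses empty placeholder files rather than a real bowtie2-build run. Large genomes produce files ending in .bt2l instead, which this sketch does not handle.

```python
import tempfile
from pathlib import Path

# Suffixes bowtie2-build writes for a small index.
BT2_SUFFIXES = (".1.bt2", ".2.bt2", ".3.bt2", ".4.bt2", ".rev.1.bt2", ".rev.2.bt2")


def missing_index_files(ref_dir: Path, idx_name: str) -> list[str]:
    """Return the expected index file names that are absent from ref_dir."""
    return [
        f"{idx_name}{suffix}"
        for suffix in BT2_SUFFIXES
        if not (ref_dir / f"{idx_name}{suffix}").exists()
    ]


# Demo: create all but the last expected file, then report what is missing.
demo_dir = Path(tempfile.mkdtemp())
for suffix in BT2_SUFFIXES[:-1]:
    (demo_dir / f"bt2_idx{suffix}").touch()
print(missing_index_files(demo_dir, "bt2_idx"))  # ['bt2_idx.rev.2.bt2']
```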
Align reads
Each sample is aligned to the indexed reference with Bowtie 2, producing a SAM file.
@align_env.task
async def bowtie2_align_paired_reads(idx: Reference, fs: Reads) -> Alignment:
    """
    Perform paired-end alignment using Bowtie 2 on a filtered sample.

    Args:
        idx (Reference): A Reference object containing the Bowtie 2 index.
        fs (Reads): A filtered Reads object containing sample data to be aligned.

    Returns:
        Alignment: An Alignment object representing the alignment result.
    """
    assert idx.indexed_with == "bowtie2", "Reference index must be generated with bowtie2"
    assert idx.index_name is not None
    assert fs.read1 is not None and fs.read2 is not None

    # Pull the index and the filtered reads to local disk
    ref_dir = await idx.ref_dir.download()
    r1 = await fs.read1.download()
    r2 = await fs.read2.download()

    ldir = Path(tempfile.mkdtemp())
    alignment = Alignment(fs.sample, "bowtie2", "sam")
    al = ldir / alignment.get_alignment_fname()

    cmd = [
        "bowtie2",
        "-x", str(Path(str(ref_dir)) / idx.index_name),
        "-1", str(r1),
        "-2", str(r2),
        "-S", str(al),
    ]
    subprocess.run(cmd, check=True)

    alignment.alignment = await File.from_local(str(al))
    return alignment
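align_env requests two CPUs, but the command above does not pass a thread count, and Bowtie 2 runs a single alignment thread unless -p (--threads) is given. A minimal sketch of a command builder that threads this value through follows; build_bowtie2_cmd is a hypothetical helper, and the value 2 is chosen only to mirror cpu=2 in align_env.

```python
from pathlib import Path


def build_bowtie2_cmd(
    index_prefix: Path, r1: Path, r2: Path, out_sam: Path, threads: int = 1
) -> list[str]:
    """Build a paired-end bowtie2 command line with an explicit thread count."""
    if threads < 1:
        raise ValueError("threads must be at least 1")
    return [
        "bowtie2",
        "-p", str(threads),       # number of alignment threads
        "-x", str(index_prefix),  # index prefix, without the .bt2 suffixes
        "-1", str(r1),
        "-2", str(r2),
        "-S", str(out_sam),
    ]


cmd = build_bowtie2_cmd(
    Path("ref/bt2_idx"),
    Path("s_1.fastq.gz"),
    Path("s_2.fastq.gz"),
    Path("s_bowtie2_aligned.sam"),
    threads=2,
)
print(" ".join(cmd))
```

Keeping the thread count next to the resource request makes it less likely that the two drift apart when the environment is resized.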
Orchestrate the workflow
The top-level task fetches the assets, filters every sample in parallel, builds the index, and aligns all samples. Parallelism across samples is achieved with asyncio.gather rather than a separate @dynamic decorator.
import asyncio


@base_env.task
async def alignment_wf() -> List[Alignment]:
    # Prepare raw samples from remote URLs
    ref, samples = await fetch_assets(
        ref_url="https://github.com/unionai-oss/unionbio/raw/main/tests/assets/references/GRCh38_short.fasta",
        read_urls=[
            "https://github.com/unionai-oss/unionbio/raw/main/tests/assets/sequences/raw/ERR250683-tiny_1.fastq.gz",
            "https://github.com/unionai-oss/unionbio/raw/main/tests/assets/sequences/raw/ERR250683-tiny_2.fastq.gz",
        ],
    )

    # Filter all samples in parallel
    filtered_samples = list(
        await asyncio.gather(*[pyfastp(rs=s) for s in samples])
    )

    # Generate a bowtie2 index or load it from cache
    bowtie2_idx = await bowtie2_index(ref=ref)

    # Align all filtered samples in parallel using bowtie2
    sams = list(
        await asyncio.gather(
            *[
                bowtie2_align_paired_reads(idx=bowtie2_idx, fs=s)
                for s in filtered_samples
            ]
        )
    )
    return sams
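The fan-out used in the workflow can be seen in isolation with plain asyncio. In the sketch below, process is a stand-in for a per-sample task such as pyfastp, and the sample names are made up for illustration. The point it shows is that asyncio.gather runs the coroutines concurrently yet returns results in the order the inputs were given, which is why the filtered samples line up with the original list.

```python
import asyncio


async def process(sample: str) -> str:
    """Stand-in for a per-sample task such as pyfastp."""
    # Longer names sleep longer, so completion order differs from input order.
    await asyncio.sleep(0.01 * len(sample))
    return f"{sample}:filtered"


async def fan_out(samples: list[str]) -> list[str]:
    # gather schedules every coroutine concurrently and preserves input order.
    return list(await asyncio.gather(*[process(s) for s in samples]))


results = asyncio.run(fan_out(["sample_long_name", "s1", "s22"]))
print(results)  # ['sample_long_name:filtered', 's1:filtered', 's22:filtered']
```

By default, the first exception raised by any coroutine propagates out of gather; passing return_exceptions=True collects exceptions alongside results instead.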
Run the workflow
This example has no secrets or external API keys — it pulls public test data from GitHub.
From the example directory, run it as a uv script:
cd v2/tutorials/genomic_alignment
uv run --script genomic_alignment.py
Or submit it with the Flyte CLI:
flyte run genomic_alignment.py alignment_wf
When the run completes, each returned Alignment points to a SAM file in blob storage that you can download from the run’s outputs in the UI.