Pryce Turner
Samhita Alla

Human-in-the-Loop Pipelines


By walking through a genomic alignment pipeline and a Streamlit app, explore how Union makes it easier to connect external inputs to pipelines.

Reinforcement Learning from Human Feedback (RLHF) has proven effective at eliciting reliable responses from Large Language Models (LLMs) because it aligns model behavior closely with human values. In data, machine learning, analytics and bioinformatics pipelines, Human-in-the-Loop (HITL) methodologies play a similarly pivotal role. Not only do they integrate human feedback into pipelines, but they also simplify the development of real-world applications.

An example of HITL in action is a workflow designed to wait for an explicit approval signal before advancing with a machine learning model deployment. Consider a scenario where you need to evaluate the model's output before granting approval for downstream deployment to a serving layer. Or think about deploying a model gradually, as in a canary deployment. This method requires feedback from users for the rollout to move forward, and HITL pipelines can bridge the gap between automation and human judgment.

Typically, HITL is managed in a separate stack, creating fragmentation when building and executing the entire pipeline. Integrating HITL directly into pipelines accelerates development, and Union.ai offers native support for human inputs. Flyte, the open-source AI pipeline orchestrator that powers Union, supports the provision of external inputs during the execution of a pipeline — a feature especially advantageous in tasks such as model deployment, data labeling and active learning. Beyond human input, Flyte enables any external process to provide inputs seamlessly to the pipeline. 

Clicking on the icon of the `title` task node or the Resume button on the sidebar will create a modal form that you can use to provide the custom title input.

This article delves into how Flyte's HITL feature can be leveraged in genomic sequencing analysis during read alignment. We also demonstrate the creation of a Streamlit app that allows users to run the pipeline interactively using Flyte.

The variant discovery pipeline

We’re using HITL in the first section of the variant discovery pipeline: alignment. In bioinformatics and genomics, genomic alignment refers to the process of aligning DNA or RNA sequences to a reference genome. The goal is to identify the positions where the sequences from the sample (query sequences) match or are homologous to the corresponding regions in an established standard for comparison: the reference genome.

Genomic alignment is a fundamental step in many genomic analyses, including variant calling, which applies tools to identify genomic variations such as single nucleotide polymorphisms (SNPs), insertions, deletions and structural variations. 

Accurate downstream analysis requires effective genomic alignment. Because high-quality results depend on high-quality data, you need an extra set of eyes for an explicit verification step. 

The role of human-in-the-loop 

We’re highlighting the HITL step within the alignment pipeline of the variant discovery workflow. Here, human input serves as a gate: a reviewer checks read quality and preprocessing results before the more computationally intensive downstream analysis begins.

Aligning reads to a reference genome is a crucial first step in a variant discovery pipeline. First off, we use FastQC, a very common tool written in Java for quantifying read quality. Here’s how it’s implemented:

fastqc = ShellTask(
    name="fastqc",
    debug=True,
    container_image=base_image,
    metadata=TaskMetadata(retries=3, cache=True, cache_version="1"),
    script="""
    mkdir {outputs.qc}
    fastqc {inputs.seq_dir}/*.fastq.gz --outdir={outputs.qc}
    """,
    inputs=kwtypes(seq_dir=FlyteDirectory),
    output_locs=[
        OutputLocation(var="qc", var_type=FlyteDirectory, location="/root/qc")
    ],
)

We’re using Flyte’s native `ShellTask` to make an output directory and then run FastQC on a directory of `.fastq.gz` files. The reports are written to that directory, which the task exposes as the `qc` output for use later in the workflow.

For every file processed, FastQC creates a summary with a simple PASS/WARN/FAIL across a number of metrics. We’re making use of workflow conditionals here to fail a workflow automatically if there are any FAILs in any included sample. This is a quick and early test of sample quality that doesn’t require any external input.
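
As a rough illustration of that check, the sketch below parses the tab-separated `summary.txt` that FastQC writes for each input file (status, module name, file name) and reports whether any module failed. The function names are hypothetical and this is not necessarily how the workflow in this article implements it; in Flyte, a boolean like this could feed a workflow conditional.

```python
from typing import Dict, List


def parse_fastqc_summary(summary_text: str) -> List[Dict[str, str]]:
    """Parse the tab-separated lines of a FastQC summary.txt file."""
    rows = []
    for line in summary_text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        # Each line is: STATUS <tab> module name <tab> file name
        status, module, filename = line.split("\t", 2)
        rows.append({"status": status, "module": module, "file": filename})
    return rows


def any_failures(summary_text: str) -> bool:
    """Return True if any module in the summary is marked FAIL."""
    return any(row["status"] == "FAIL" for row in parse_fastqc_summary(summary_text))


# Made-up summary content for a single sample, for demonstration only.
example = (
    "PASS\tBasic Statistics\tsample_1.fastq.gz\n"
    "WARN\tPer base sequence content\tsample_1.fastq.gz\n"
    "FAIL\tAdapter Content\tsample_1.fastq.gz\n"
)
print(any_failures(example))  # True
```

Applied across every sample's summary, a single `True` would be enough to stop the workflow before any further compute is spent.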

Moving on, we’ll use Fastp, a tool designed for quickly preprocessing high-throughput sequencing data. It is specifically developed for tasks such as quality control, adapter removal and filtering raw sequencing reads. Fastp is implemented as a regular Python task in Flyte and operates on a per-sample basis, taking in a `fastq.gz` file for each read and outputting its filtered equivalent. It also outputs a report, which we’ll similarly capture to use later. Here’s the code:

@task(requests=Resources(cpu="1", mem="2Gi"), container_image=base_image)
def pyfastp(rs: RawSample) -> FiltSample:
    ldir = Path(current_context().working_directory)
    o1p = ldir.joinpath(f"{rs.sample}_1_filt.fq.gz")
    o2p = ldir.joinpath(f"{rs.sample}_2_filt.fq.gz")
    rep = ldir.joinpath(f"{rs.sample}_report.json")
    
    logger.debug(f"Writing filtered reads to {o1p} and {o2p} and report to {rep}")

    cmd = ["fastp", "-i", rs.raw_r1, "-I", rs.raw_r2, "-o", o1p, "-O", o2p, "-j", rep]
    logger.debug(f"Running command: {cmd}")

    subproc_raise(cmd)

    return FiltSample(
        sample=rs.sample,
        filt_r1=FlyteFile(path=str(o1p)),
        filt_r2=FlyteFile(path=str(o2p)),
        report=FlyteFile(path=str(rep)),
    )

Two additional things are worth noting. First, the resource request in the task decorator bumps up memory for the pod to accommodate Fastp’s larger memory footprint. Second, we call the command-line process through `subproc_raise` rather than a `ShellTask`, which lets us keep things organized by unpacking the inputs from a dataclass and packing the outputs into one. We’ll also use a map task to process any number of samples in parallel in the workflow.
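
The implementation of `subproc_raise` isn't shown here. As a minimal sketch of what such a helper might look like, assuming its job is to run a command and raise an error when it fails, the hypothetical `run_or_raise` below wraps `subprocess.run` from the standard library:

```python
import subprocess
import sys
from typing import List, Tuple


def run_or_raise(command: List[str]) -> Tuple[str, str]:
    """Run a command, returning (stdout, stderr); raise if it exits non-zero.

    Hypothetical stand-in for a helper like subproc_raise.
    """
    try:
        result = subprocess.run(
            [str(arg) for arg in command],  # accept Path objects as arguments
            capture_output=True,
            text=True,
            check=True,  # raise CalledProcessError on a non-zero exit code
        )
    except subprocess.CalledProcessError as exc:
        raise RuntimeError(
            f"Command {exc.cmd} failed with exit code {exc.returncode}: {exc.stderr}"
        ) from exc
    except FileNotFoundError as exc:
        raise RuntimeError(f"Executable not found: {command[0]}") from exc
    return result.stdout, result.stderr


# Demonstration with the current Python interpreter instead of fastp.
out, _ = run_or_raise([sys.executable, "-c", "print('ok')"])
print(out.strip())  # ok
```

Raising on failure matters in a task like `pyfastp`: an unhandled exception fails the task, which lets the `retries` setting or the workflow's own error handling take over instead of passing missing files downstream.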

Finally, after laying that groundwork and collecting reports from these tools, we can pass them to a report-rendering task that takes advantage of MultiQC. MultiQC is an excellent way to visualize the results of myriad supported tools. After copying the various reports to pod storage, we can run MultiQC with this snippet from the larger Python function:

@task(container_image=multiqc_image_spec, disable_deck=False)
def render_multiqc(
    fqc: FlyteDirectory, filt_reps: List[FiltSample], sams: List[List[SamFile]]
) -> FlyteFile:
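    # `ldir` (a local directory holding the copied reports) is set up in the
    # part of the function that is not shown here.
    final_report = ldir.joinpath("multiqc_report.html")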
    mqc_cmd = ["multiqc", str(ldir), "-n", str(final_report)]

    logger.debug(f"Generating MultiQC report at {final_report} with command: {mqc_cmd}")
    subproc_raise(mqc_cmd)

    with open(final_report, "r") as f:
        report_html = f.read()

    current_context().default_deck.append(report_html)
    return FlyteFile(path=str(final_report))
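
The copy step that precedes this snippet is not shown. One way it might look, as a sketch only, is a small helper that gathers report files into a single directory for MultiQC to scan. The name `stage_reports` and the placeholder file names are assumptions for illustration:

```python
import shutil
import tempfile
from pathlib import Path
from typing import Iterable


def stage_reports(report_paths: Iterable[Path], dest: Path) -> Path:
    """Copy report files into one directory so they can be scanned together."""
    dest.mkdir(parents=True, exist_ok=True)
    for src in report_paths:
        shutil.copy(src, dest / src.name)
    return dest


# Demonstration with placeholder files standing in for real tool reports.
with tempfile.TemporaryDirectory() as tmp:
    tmp_path = Path(tmp)
    src_dir = tmp_path / "src"
    src_dir.mkdir()
    reports = []
    for name in ("sampleA_report.json", "sampleB_report.json"):
        report = src_dir / name
        report.write_text("{}")
        reports.append(report)
    staged = stage_reports(reports, tmp_path / "multiqc_in")
    print(sorted(f.name for f in staged.iterdir()))
```

Because files are copied by name into a flat directory, reports need distinct file names, which the per-sample prefix used in `pyfastp` already provides.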

Calling this task and gating on its approval happens in a single call in the workflow itself:

approve_filter = approve(
    render_multiqc(fqc=fqc_dir, filt_reps=filtered_samples, sams=[]),
    "filter-approval",
    timeout=timedelta(hours=2),
)

Here we’ve imported `approve` from Flytekit and wrapped the output of `render_multiqc` in it, which pauses the workflow until the report is approved. If the timeout is exceeded, the workflow will fail.

The `render_multiqc` task is also defined with `disable_deck=False` in the decorator to render the Flyte Deck. We’ve then appended the raw HTML to the default deck. Decks — a powerful visualization tool built straight into the Flyte console — also appear in the Streamlit app below. 

With this one line, the workflow will compile and render the report, while pausing execution until an external actor manually verifies that everything looks good. Scientists can quickly inspect and approve these reports to get a bird’s eye view of the sequencing run, and highlight any issues before alignment starts. Once the pipeline is allowed to proceed with verifiably good data, it generates indices and aligns reads to compare performance with two different aligners. Finally, it generates a complete report containing the alignment statistics and saves it for future reference. To see a more complete breakdown of the downstream steps, please refer to the full alignment tutorial.
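
To make the gate semantics concrete, here is a toy model in plain Python: execution blocks until an external actor signals a decision, and fails if nobody responds before the timeout. This is a conceptual sketch only, not Flyte's implementation. The `ApprovalGate` class is hypothetical, and treating a rejection as a failure is an assumption of the sketch.

```python
import threading


class ApprovalGate:
    """Toy model of an approval gate: block until approved or time out."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._decided = threading.Event()
        self._approved = False

    def signal(self, approved: bool) -> None:
        """Called by the external actor (a reviewer, a UI, another process)."""
        self._approved = approved
        self._decided.set()

    def wait(self, timeout_seconds: float) -> None:
        """Block the caller; raise if rejected or if nobody responds in time."""
        if not self._decided.wait(timeout_seconds):
            raise TimeoutError(f"Gate '{self.name}' timed out")
        if not self._approved:
            raise RuntimeError(f"Gate '{self.name}' was rejected")


gate = ApprovalGate("filter-approval")
# Simulate a reviewer approving shortly after the report is rendered.
threading.Timer(0.1, gate.signal, args=(True,)).start()
gate.wait(timeout_seconds=5)
print("approved, continuing to alignment")
```

The point of the model is the ordering guarantee: nothing after `wait` runs until a decision arrives, which is what keeps the expensive alignment steps from starting on unreviewed data.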

Creating a Streamlit app

The genomic alignments workflow is entirely manageable through the Streamlit UI. This means users can approve the workflow directly from Streamlit instead of interacting separately with Union.

The app uses FlyteRemote to trigger executions and retrieve execution data programmatically. You can access the Streamlit app code here.

External Flyte inputs

The `approve` construct isn't the only way to provide external inputs to Flyte. You can also use `sleep()` to pause an execution for a fixed duration and `wait_for_input()` to pause it until an external input is supplied. All these constructs enable human feedback within the pipeline. Find documentation on implementing these features here: https://docs.flyte.org/projects/cookbook/en/latest/auto_examples/advanced_composition/waiting_for_external_inputs.html

In summary, Flyte's HITL feature is designed to cater to scenarios where a workflow execution needs to pause for a specified duration or until it receives external inputs unrelated to workflow execution. Thanks to Flyte's multi-tenancy support, multiple users can execute HITL pipelines concurrently, so numerous workloads can run simultaneously without disruptions. Because HITL is integral and native to Flyte, the entire process of building, executing and maintaining the pipelines becomes significantly more efficient.
