David Espejo

Good Systems Gone Bad: The Art of Performing at Scale

Picture this: it's Friday afternoon and you're browsing your usual event-ticketing portal, trying to find something interesting to do this weekend, maybe a sports match or a concert. You finally find something and start the process of picking a location, paying and confirming your tickets.

Sounds simple.

The reality is that something like an event-ticketing system has to be designed to handle multiple situations. What if many people try to reserve the same seats at the same time? What if a highly anticipated concert causes an unprecedented surge in requests to the website? Event organizers also want to maximize profit and sell the entire ticket inventory as fast as possible. How can the platform tell them which locations were the most popular in a particular venue, so they can plan accordingly for future events? And even after the ticket is sold, the work is still not over: the system has to guarantee that every customer can access their ticket reliably and without issues.

Who doesn’t love crowded concerts?
Photo by ActionVance on unsplash.com/photos/eXVd7gDPO9A 

During the InfoQ 2022 conference, a team from SeatGeek went over the design decisions they had to make to manage these challenges. In summary, they apply controls to throttle demand and handle concurrency and, as a result, manage unpredictable but historically growing demand. In other words, they are making the system more scalable.

ML systems deal with similar, and possibly bigger, scalability challenges. In these scenarios, failing to meet expectations under increased workload hurts business metrics and hinders the successful adoption of AI for production use cases.

The first step, then, should be describing how load is aggregated on ML systems.

How ML systems grow

A machine learning system can grow in different ways:

ML model count growth. It’s not uncommon to hear from organizations running thousands of DAGs or ML pipelines for multiple use cases and even multiple customers. For example, Gojek, an Indonesian company that operates more than 20 digital on-demand services in five countries, shared its story of running 1,000+ DAGs in production.

Growth in traffic volume. A surge in demand probably comes to mind most often when thinking about system growth.

Growth in complexity. This one is especially relevant to ML. Not only will a project eventually run more workflows and serve more users, but each execution will also generate metadata that should be captured and managed so ML engineers and data scientists can reproduce experiments consistently.

In this post, we’ll try to provide a framework to define scalability in the context of ML systems and their growth patterns, and describe how Flyte incorporates these ideas at different levels. 

What makes a system scalable?

If you think of scalability as the ability of a system to keep performing at an acceptable level under increasing load, you’re right — but only partially.

The problem with the traditional definition of scalability is that it leaves many aspects undefined:

  • What’s the cost of that “ability”?
  • What is the acceptable level of performance?
  • Is increasing load the only aspect that puts scalability to the test?

A study by Weinstock, C. and Goodenough, J. (2006) from Carnegie Mellon University proposed a more sophisticated approach to the subject:

“Scalability is the ability to handle increasing workloads by repeatedly applying a cost-effective strategy for extending a system’s capacity.”

Let’s unpack this.

This definition implies many things:

  • Something needs to, and can, be done to improve the system’s ability to handle increasing workloads; that’s what they call a “strategy”.
  • The strategy has to meet two requirements:
      ◦ It can be applied repeatedly
      ◦ It’s cost-effective
  • In the end, the strategy aims to extend the capacity of the system. 
Response Metric vs. Demand for a Resource.
Administrators of the system were able to take quick action twice to keep the response acceptable; eventually, the system reached a point where no reasonable strategy kept the response metric from taking on an unacceptable value.
Image from “On System Scalability,” Weinstock, C. and Goodenough, J. (2006), page 14.

In other words, scalability isn’t just a matter of fine-tuning some parameters of the system so it can handle more workloads or throttle resource demands. It requires mechanisms that administrators can execute multiple times at a low cost, until the system reaches a point where the key performance metrics stop improving. How soon or late this happens is what makes a system more or less scalable. No system is infinitely scalable; sooner or later, it will reach a point where further performance improvements require a strategy that’s unfeasible or unacceptably expensive.

How Flyte handles growth

The Flyte platform comprises multiple components, described in the Components Architecture section of Flyte’s documentation. Flyte is designed to anticipate a considerable amount and diversity of growth and, consequently, introduces mechanisms at different levels to help the system cope with it.

In this section, we list some of the main mechanisms (or strategies) that can be repeatedly applied to Flyte to improve scalability at a low cost relative to the overall resource consumption of the system. That should give you an idea of how far you’d be from hitting a scalability ceiling by adopting a platform as robust as Flyte.

By-default scalability

This section covers strategies that require no fine-tuning from end users; they simply leverage what Flyte brings by default.

Goroutines

flytepropeller, the component that handles workflow executions, uses goroutines to run multiple workflows concurrently. Goroutines are Go’s native concurrency mechanism: they let the Go runtime schedule execution units within the same address space without making requests to the underlying OS.

Goroutines only interact with the Go runtime, working as a lightweight abstraction on top of the more expensive OS threads.
Image from “Analysis of the Go runtime scheduler,” Deshpande, N., et al. Available at cs.columbia.edu/~aho/cs6998/reports/12-12-11_DeshpandeSponslerWeiss_GO.pdf

Goroutines are very different from traditional OS threads. Threads serve as the substrate over which goroutines are scheduled, but goroutines have a much lower resource footprint: about 2 kilobytes of memory for a goroutine vs. roughly 1 megabyte for a thread.

The way flytepropeller uses goroutines enables a high degree of concurrency at a low cost, improving scalability without requiring any changes to the code or the platform configuration.

Map Tasks

Flyte tasks run in containers. Booting up containers is usually fast, but if multiple requests start to aggregate, the container creation operation can quickly become a bottleneck.

Designed to handle parallelism, Map Tasks enable you to execute numerous instances of a Task within a single node of the workflow graph, without having to create a separate node for each instance. Use Map Tasks especially when a large number of runs use identical code logic over different data, or when multiple data batches need to be processed concurrently.

Here is a commented version of an example from the Map Task docs:

from typing import List

from flytekit import Resources, map_task, task, workflow

# Define a task to be used in the Map Task
@task
def a_mappable_task(a: int) -> str:
    inc = a + 2
    stringified = str(inc)
    return stringified


# Define a task to reduce the mapped output to a string
@task
def coalesce(b: List[str]) -> str:
    coalesced = "".join(b)
    return coalesced


# The a_mappable_task is executed for each element in the list
@workflow
def my_map_workflow(a: List[int]) -> str:
    mapped_out = map_task(a_mappable_task)(a=a).with_overrides(
        requests=Resources(mem="300Mi"),
        limits=Resources(mem="500Mi"),
        retries=1,
    )
    coalesced = coalesce(b=mapped_out)
    return coalesced

Artifact versioning

An often unexplored aspect of scalability involves a system’s ability to manage a growing number of diverse artifacts generated during multiple execution cycles. 

Traditional software already enjoys this capability through Git-based systems, where maintaining hermetic versions of code makes it easy to work on branches, merge them and revert unwanted changes. For its part, Flyte achieves the seemingly impossible: versioning an entire ML workflow — which is basically a live (running) algorithm — capturing all the dependencies necessary to reproduce a particular experiment reliably. It also lets users change the structure of a workflow between versions without worrying about the consequences for the pipelines already in production.

Screen capture of the Flyte console (UI), where the Workflow Version is visible. 
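To see what this looks like from code, here's a minimal sketch that uses flytekit's FlyteRemote client to fetch and execute one specific, immutable version of a registered workflow. The project, domain, workflow name and version string are hypothetical placeholders.

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

# Connect to a Flyte backend using the locally available configuration
remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",   # hypothetical project
    default_domain="development",    # hypothetical domain
)

# Fetch one specific, immutable version of a registered workflow
wf = remote.fetch_workflow(
    name="workflows.example.my_map_workflow",  # hypothetical workflow name
    version="v2023-05-01-abc123",              # hypothetical version string
)

# Launch exactly that version, with exactly the dependencies it was registered with
execution = remote.execute(wf, inputs={"a": [1, 2, 3]})
print(execution.id.name)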

Native, reliable and massively scalable scheduler

One of the most common bottlenecks in ML orchestrators is the scheduler component. Under increased load, there’s a chance that higher latency causes missed schedules for recurring tasks.

Flyte’s native scheduler is designed as a robust system with multiple mechanisms to prevent missed schedules during periods of increased load or when recovering from failures.

Logical architecture diagram for Flyte’s native scheduler.
Steps in green mark the schedule execution path without failures; steps in red mark the additional steps performed when the scheduler is not available.

Let’s describe the life of a workflow schedule under normal conditions:

  1. A user controls the activation or deactivation of launch plans, which, in turn, activate or deactivate the associated schedules.
  2. Once the user activates a launch plan, the Schedule Management System is instructed to create a schedule. Flyte supports both fixed-rate and cron-based schedules (a sketch of both follows this list).
  3. The GoCron wrapper, which uses and extends an open-source cron library for Go, reads the new schedule.
  4. The GoCron wrapper creates a goroutine for each schedule before submitting it to the executor. These are independent units of execution with a very low resource footprint, enabling consistent operation of multiple schedules in parallel.
  5. Next, the executor creates a unique ID for each execution, based on the timestamp and the name of the schedule. That way, the executor informs flyteadmin not only of the intended schedule time but also of the actual timestamp when the execution was created, ensuring precise execution of both cron-based and fixed-rate schedules.
  6. The snapshotter subsystem reads the latest execution time for each schedule and updates a timestamp. It then periodically persists a compacted version of the timestamp map in the relational database.
  7. Flyteadmin receives the schedule and triggers the actual workflow execution on the data plane.
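As a concrete reference for steps 1 and 2, here's a minimal sketch of how fixed-rate and cron-based schedules are attached to a launch plan with flytekit; the workflow and launch plan names are hypothetical.

from datetime import timedelta

from flytekit import CronSchedule, FixedRate, LaunchPlan, task, workflow


@task
def say_hello() -> str:
    return "hello"


@workflow
def hello_wf() -> str:
    return say_hello()


# Cron-based schedule: run every 5 minutes
cron_lp = LaunchPlan.get_or_create(
    workflow=hello_wf,
    name="hello_wf_every_5_min",   # hypothetical launch plan name
    schedule=CronSchedule(schedule="*/5 * * * *"),
)

# Fixed-rate schedule: run every 10 minutes
fixed_lp = LaunchPlan.get_or_create(
    workflow=hello_wf,
    name="hello_wf_every_10_min",  # hypothetical launch plan name
    schedule=FixedRate(duration=timedelta(minutes=10)),
)

Once these launch plans are registered, activating or deactivating them (for example, through flytectl or the Flyte console) activates or deactivates the associated schedules, as step 1 describes.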

If the scheduler goes down, the Catchup-all subsystem kicks in to:

  1. Read the last execution timestamp from the database or the snapshotter.
  2. Submit any missed schedule, up to the current time, to the GoCron wrapper so the corresponding executions are created.

Schedules in Flyte are versioned and leverage the idempotent nature of flyteadmin to ensure that, even after recovering from failures, existing and new executions can run in parallel without creating conflicts. In addition, Flyte’s hermetic versioning system ensures that only one version of a schedule is active at any given time.

All these mechanisms guarantee that even during periods of intense load or after partial failures, no schedule will be missed.

On-demand parameters to extend scalability

In this section, we go over some of the Flyte parameters frequently used to extend its native ability to cope with load:

Max_parallelism

Diagram of a Workflow with a large fan-out

In a workflow execution, flytepropeller tries to evaluate as many nodes as possible in a single loop, using a breadth-first traversal algorithm. That greedy behavior can potentially turn it into a performance bottleneck. The max_parallelism parameter controls the maximum number of tasks that can be executed concurrently.

As a way to throttle resource demand, it’s especially useful for workflows with a large fan-out: typically more than 1,000 nodes. You can set it at the platform level or on a launch plan, or even override it with a different value at the execution level (learn how in the docs); a short sketch follows.
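For example, here's a minimal, hypothetical sketch of capping parallelism at the launch plan level with flytekit; the workflow below is a small stand-in for one with a much larger fan-out, and the launch plan name is made up.

from flytekit import LaunchPlan, task, workflow


@task
def noop(i: int) -> int:
    return i


@workflow
def big_fanout_wf():
    # Stand-in for a workflow that fans out into hundreds or thousands of task nodes
    for i in range(10):
        noop(i=i)


# Evaluate at most 50 task nodes of this workflow concurrently
throttled_lp = LaunchPlan.get_or_create(
    workflow=big_fanout_wf,
    name="big_fanout_wf_throttled",  # hypothetical launch plan name
    max_parallelism=50,
)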

Bear in mind that if you use Map Tasks in conjunction with max_parallelism, the Map Tasks will execute without respecting the max_parallelism set on the workflow: Map Tasks have their own parallelism control, which defaults to executing all of the subtasks at the same time, as the snippet below shows.
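As an illustration, and assuming your flytekit version exposes the concurrency argument on map_task, the map task's own parallelism can be capped like this (re-declaring the mappable task from the earlier example for completeness):

from typing import List

from flytekit import map_task, task, workflow


@task
def a_mappable_task(a: int) -> str:
    return str(a + 2)


@workflow
def capped_map_wf(a: List[int]) -> List[str]:
    # Run at most 10 mapped instances at a time instead of all of them at once
    return map_task(a_mappable_task, concurrency=10)(a=a)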

Workers

Workers are the units of processing (goroutines) available to evaluate workflows: the larger the number, the more workflows can be evaluated in parallel. Monitoring is essential to determine when more workers are needed. flytepropeller exposes the flyte:propeller:all:free_workers_count metric, which can be used with the Propeller Grafana dashboard. If that metric keeps decreasing, or as a preventive measure, the number of workers can be adjusted in the Helm values file:

For the flyte-core chart:

core:
    …
    # -- follows the structure specified [here](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/config).
    propeller:
      rawoutput-prefix: s3://my-s3-bucket/
      metadata-prefix: metadata/propeller
      workers: 4
      …

For the flyte-binary chart:
…
inline:
  core:
    propeller:
      workers: 4
…

Given that flytepropeller uses goroutines, adding more workers can be done repeatedly at a low cost. (Typically, 500 to 800 workers with 4 to 8 CPUs have shown consistently good performance.)

Sharding Flytepropeller

If all of the above is still not enough to satisfy the scalability requirements of a platform built with Flyte, flytepropeller can be scaled out using reliable, low-cost techniques.

As described in a previous post, the default deployment strategy for flytepropeller is to have a single instance. Additional instances can be added, but this is a manual operation with a relatively high cost in terms of the resources needed to spin up a new Pod. 

What Flyte introduced was a horizontal scaling mechanism that:

  • Performs automated scale-out operations, orchestrated by a Propeller Manager
  • Guarantees workflow consistency with no corruption in case of failures
  • Lets users configure the sharding strategy and number of replicas

The available sharding strategies are:

  • Consistent hash. This technique evenly distributes Workflow CRDs across the flytepropeller instances in charge of managing them, and it keeps the load balanced even during partial failures or when more flytepropeller instances are added (see the sketch after this list).
  • Project and domain. This is a more deterministic option where the metadata of a project or domain is used to associate it with a particular shard.
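To make the consistent-hashing idea concrete, here's a small, self-contained Python sketch of a hash ring. It illustrates the general technique only, not flytepropeller's actual implementation, and the shard names and workflow ID are made up.

import bisect
import hashlib
from typing import List


def _hash(key: str) -> int:
    # Stable hash of a key, independent of Python's per-process hash seed
    return int(hashlib.md5(key.encode()).hexdigest()[:16], 16)


class HashRing:
    """Maps workflow IDs to shards so that adding or removing a shard
    only moves a small fraction of the existing assignments."""

    def __init__(self, shards: List[str], vnodes: int = 100):
        # Place several virtual nodes per shard on the ring for a smoother spread
        self._ring = sorted(
            (_hash(f"{shard}-{i}"), shard) for shard in shards for i in range(vnodes)
        )
        self._keys = [h for h, _ in self._ring]

    def shard_for(self, workflow_id: str) -> str:
        # Walk clockwise to the first virtual node at or after the key's position
        idx = bisect.bisect(self._keys, _hash(workflow_id)) % len(self._ring)
        return self._ring[idx][1]


ring = HashRing(["flytepropeller-0", "flytepropeller-1", "flytepropeller-2"])
print(ring.shard_for("flytesnacks/development/wf-123"))  # hypothetical workflow ID

Because each shard owns many virtual nodes, removing one flytepropeller instance only reassigns the keys that pointed to its virtual nodes, which is what keeps the load balanced during partial failures.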

Both options can be configured in the Helm chart:

For flyte-core:

…
flytepropeller:
  enabled: true
  manager: false # set to true to enable the Propeller Manager
…
configmap:
  core:
    shard: # sharding strategy: use either hash, project, or domain
      # add the parameters for the selected strategy as described in https://docs.flyte.org/en/latest/deployment/configuration/performance.html#automatic-scale-out

For flyte-binary, add the configuration under the inline section of the Helm values file. 

Conclusion and next steps

Scalability is one of the key aspects of designing infrastructure for data-intensive applications. Flyte approaches scalability by providing robust controls at different levels, ensuring not only the ability to cope with increased load, but also that doing so requires little intervention from administrators and that, when needed, scaling strategies can be applied at a low cost.

What we explored in this post is only a subset of the features Flyte incorporates to handle concurrent and increased load.

To learn more, check out the Optimizing Performance section in the documentation.

Machine Learning
AI Orchestration