Netflix's Metaflow: Reproducible machine learning pipelines

 3 years ago
source link: https://www.cortex.dev/post/reproducible-machine-learning-pipelines-with-metaflow-and-cortex

From training to deployment with Metaflow and Cortex

Caleb Kaiser
Founding Team @ Cortex Labs

If we were to design an optimal machine learning pipeline, it would be:

  • Scalable. As workloads increased, it would scale up without issue.
  • Reproducible. We would be able to draw a line from any model to its data.
  • Configurable. It wouldn’t lock us into particular frameworks or tools.

Typically, pipelines make a trade-off in at least one of these areas. A pipeline might be scalable but rely on a platform that puts limits on data scientists. Or a pipeline might be completely configurable but glued together by a mess of ad hoc code that is impossible to reproduce.

In this piece, I want to introduce a way to build this kind of ideal pipeline without any tradeoffs. To do this, we’ll be using Metaflow, the open source data science framework from Netflix, and Cortex, our open source deployment platform for machine learning.

Let’s start by defining our pipeline.

Defining a pipeline in Metaflow

Metaflow is a data science framework that provides a single API for managing different pieces of the infrastructure stack. It places an emphasis on scalability, reproducibility, and usability.

At a high level, Metaflow allows us to define pipelines as DAGs, called “flows,” in which data undergoes a sequence of transformations called “steps.” These steps persist transformed data as “data artifacts,” which are accessible by subsequent steps throughout the flow.

For example, say we had a training flow that loaded data (probably produced by another flow), trained multiple models with different strategies, evaluated the different models, and saved the top performer:

import os

from metaflow import FlowSpec, current, step


def select_model(a, b):
    if a > b:
        return a
    return b


class TrainingFlow(FlowSpec):
    """
    The flow performs the following steps:
    1) Loads the new data from NewDataFlow and initializes a new model.
    2) In parallel branches:
       - A) Fits the model with approach A.
       - B) Fits the model with approach B.
    3) Evaluates both models and exports the best.
    """

    @step
    def start(self):
        """
        Use the Metaflow client to retrieve the latest successful run
        from our NewDataFlow, then initialize new model.
        """

        self.next(self.train_approach_a, self.train_approach_b)

    @step
    def train_approach_a(self):
        """
        Train model A
        """

        self.model = 42  # Store resulting trained model
        self.next(self.join)

    @step
    def train_approach_b(self):
        """
        Train model B
        """

        self.model = 43  # Store resulting trained model
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Pick the best of both models
        """

        # Evaluate best model
        self.model = select_model(inputs.train_approach_a.model,
                                  inputs.train_approach_b.model)

        self.model_artifact = '/'.join([current.pathspec, 'model'])
        self.next(self.end)

    @step
    def end(self):
        print("All done!")


if __name__ == '__main__':
    TrainingFlow()

This is just a snapshot of our full pipeline, which I’ll be adding to in the next section, but even with just this snippet we have a repeatable training pipeline that can scale to run on many machines. We also have, thanks to Metaflow’s Client API, a way to version, audit, and reproduce these training runs.

For example, to instantiate a given step from a previous run, we can simply pass the flow name, run ID, and step name to the Metaflow Client as a single slash-separated path:

from metaflow import Step

step = Step('TrainingFlow/4/join')

To access an artifact from a particular run, the logic is very similar:

from metaflow import Step

print(Step('TrainingFlow/4/join').task.data.model_artifact)

This means that every time a flow is executed, Metaflow automatically versions and records it using a standard taxonomy. As a result, we can trace any given model’s lineage from raw data to final export.
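The taxonomy in question is the pathspec, a slash-separated string of the form FlowName/RunID/StepName/TaskID/ArtifactName, where shorter prefixes address a run, a step, or a task. As a rough illustration, the sketch below splits such a string into labeled parts using only the standard library. Note that parse_pathspec is a hypothetical helper, not part of Metaflow, and the task ID in the example is made up.

```python
from typing import Dict

# Levels of a Metaflow pathspec, from outermost to innermost.
PATHSPEC_LEVELS = ("flow", "run_id", "step", "task_id", "artifact")


def parse_pathspec(pathspec: str) -> Dict[str, str]:
    """Split a pathspec into labeled components (hypothetical helper)."""
    parts = pathspec.strip("/").split("/")
    if not 1 <= len(parts) <= len(PATHSPEC_LEVELS) or not all(parts):
        raise ValueError(f"not a valid pathspec: {pathspec!r}")
    # zip stops at the shorter sequence, so prefixes such as
    # "TrainingFlow/4" only yield the levels that are present.
    return dict(zip(PATHSPEC_LEVELS, parts))


# The artifact path built in the join step has all five levels.
# The task ID "7" is invented for illustration.
lineage = parse_pathspec("TrainingFlow/4/join/7/model")
print(lineage["flow"], lineage["run_id"], lineage["step"])
```

Reading a pathspec this way makes the lineage explicit: every artifact name carries the flow, run, and step that produced it.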

There is much more to Metaflow, and I’d encourage you to check out their documentation to learn more, but as an introduction, this should serve to get us started.

Now, let’s talk a bit about triggering deployments in Metaflow.

Deploying models with Cortex

In this section, I’m going to take our training flow from before and add a step for deploying our model as a production API on AWS. To do this, we’re going to use Cortex.

Cortex is a deployment platform for machine learning. On the surface, it provides simple interfaces for building prediction services, deploying them to production, and managing an inference cluster.

Under the hood, Cortex automates all of the cloud infrastructure needed for inference (autoscaling, GPU/ASIC support, load balancing, prediction tracking, etc.) and implements an automated deployment process in which model serving code is packaged, versioned, and deployed to the cluster.

We can trigger a deployment using Cortex’s Python client within our training flow like this:

import os

from metaflow import FlowSpec, current, step
import cortex


def select_model(a, b):
    if a > b:
        return a
    return b


class TrainingFlow(FlowSpec):
    """
    The flow performs the following steps:
    1) Loads the new data from NewDataFlow and initializes a new model.
    2) In parallel branches:
       - A) Fits the model with approach A.
       - B) Fits the model with approach B.
    3) Evaluates both models and exports the best.
    """

    @step
    def start(self):
        """
        Use the Metaflow client to retrieve the latest successful run
        from our NewDataFlow, then initialize new model.
        """

        self.next(self.train_approach_a, self.train_approach_b)

    @step
    def train_approach_a(self):
        """
        Train model A
        """

        self.model = 42  # Store resulting trained model
        self.next(self.join)

    @step
    def train_approach_b(self):
        """
        Train model B
        """

        self.model = 43  # Store resulting trained model
        self.next(self.join)

    @step
    def join(self, inputs):
        """
        Pick the best of both models
        """

        # Evaluate best model
        self.model = select_model(inputs.train_approach_a.model,
                                  inputs.train_approach_b.model)

        self.model_artifact = '/'.join([current.pathspec, 'model'])
        self.next(self.deploy)

    @step
    def deploy(self):
        """
        Deploy model to Cortex cluster
        """

        cortex_client = cortex.client("aws")

        api_config = {
            "name": "api-classifier",
            "kind": "RealtimeAPI",
            "predictor": {
                "type": "python",
                "path": "predictor.py",
                "config": {
                    "model_artifact": self.model_artifact
                },
                "env": {
                    "USERNAME": "cortex",
                    "METAFLOW_DATASTORE_SYSROOT_S3": "XXX",
                    "METAFLOW_DEFAULT_DATASTORE": "s3",
                    "METAFLOW_DEFAULT_METADATA": "service",
                    "METAFLOW_SERVICE_URL": "XXX",
                }
            }
        }

        self.deployments = cortex_client.deploy(api_config, project_dir=".")

        self.next(self.end)

    @step
    def end(self):
        print("All done!")


if __name__ == '__main__':
    TrainingFlow()

You’ll notice the client includes a deploy() method, which takes a configuration object defining our API. This configuration passes along the location of the model artifact, together with the environment settings the Metaflow client needs inside the API, so the flow's metadata is available for logging purposes. Now, when we audit a deployment, we can connect it all the way back to the run that produced it, extending our lineage from data to deployment.
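To make that link between the flow and the deployment easier to see, the configuration can be assembled by a small function. The sketch below is plain Python with no Cortex or Metaflow dependency. The name build_api_config is a hypothetical helper, the keys simply mirror the ones used in the deploy step above, the task ID in the example pathspec is made up, and the "XXX" values remain placeholders for your own datastore and metadata service.

```python
from typing import Any, Dict


def build_api_config(model_artifact: str, metaflow_env: Dict[str, str]) -> Dict[str, Any]:
    """Assemble the API configuration passed to deploy() (hypothetical helper)."""
    return {
        "name": "api-classifier",
        "kind": "RealtimeAPI",
        "predictor": {
            "type": "python",
            "path": "predictor.py",
            # The pathspec is the link from the deployed API back to the run.
            "config": {"model_artifact": model_artifact},
            # Copied so later changes to the caller's dict do not leak in.
            "env": dict(metaflow_env),
        },
    }


# Placeholder values, as in the flow above.
env = {
    "USERNAME": "cortex",
    "METAFLOW_DATASTORE_SYSROOT_S3": "XXX",
    "METAFLOW_DEFAULT_DATASTORE": "s3",
    "METAFLOW_DEFAULT_METADATA": "service",
    "METAFLOW_SERVICE_URL": "XXX",
}
api_config = build_api_config("TrainingFlow/4/join/7/model", env)
print(api_config["predictor"]["config"]["model_artifact"])
```

Keeping the artifact pathspec as the only run-specific input means the same function can describe a deployment for any run of the flow.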

The configuration object also references a predictor.py script, which is where the actual prediction service is defined. A Cortex predictor looks like this:

# Import whatever dependencies your API needs
import os

from metaflow import DataArtifact

os.environ['METAFLOW_USER'] = 'cortex'


class PythonPredictor:
    def __init__(self, config):
        # Initialize your model here
        print("Got config to be %s" % str(config))
        self.model = DataArtifact(config['model_artifact']).data

    def predict(self, payload):
        # Payload is the request your endpoint receives
        prediction = self.model
        return prediction

The structure is very simple. We initialize our model in the __init__() method, which runs on initial deployment, and we generate predictions in the predict() method. Similar to steps in Metaflow, these Python methods can contain whatever logic you want to implement.
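Because a predictor is an ordinary Python class, its lifecycle can be exercised locally before anything is deployed. The sketch below is a stand-in under stated assumptions: it swaps the DataArtifact lookup for a hypothetical load_artifact function backed by an in-memory dict, so it runs without Metaflow or Cortex. The pathspec and the stored value are made up.

```python
from typing import Any, Dict

# Stand-in for the Metaflow datastore: pathspec -> stored artifact (made-up data).
FAKE_DATASTORE: Dict[str, Any] = {"TrainingFlow/4/join/7/model": 43}


def load_artifact(pathspec: str) -> Any:
    """Hypothetical replacement for DataArtifact(pathspec).data."""
    return FAKE_DATASTORE[pathspec]


class PythonPredictor:
    def __init__(self, config: Dict[str, Any]) -> None:
        # Called once per instance: load the model named in the config.
        self.model = load_artifact(config["model_artifact"])

    def predict(self, payload: Dict[str, Any]) -> Any:
        # Called once per request. Like the toy predictor in this article,
        # it returns the stored "model" value and ignores the payload.
        return self.model


# Mimic the serving pattern: construct once, then call predict per request.
predictor = PythonPredictor({"model_artifact": "TrainingFlow/4/join/7/model"})
responses = [predictor.predict({"input": i}) for i in range(3)]
print(responses)
```

A check like this catches configuration mistakes, such as a missing model_artifact key, before they surface in a running API.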

Now, when we run the flow, the model will be trained, evaluated, and deployed to production with zero downtime or extra configuration needed.

Because Cortex provides native support for A/B testing and traffic splitting, we can even run complex deployment strategies without breaking Metaflow’s lineage.

For example, if after selecting a best model, we wanted to test how the model performed in different formats—say ONNX vs TensorFlow—we could export two versions of the model, deploy them both in an A/B test, and log their performance. Because our training flow is connected to our deployment, we can then pass this information back and forth between Cortex and Metaflow without issue.
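To give a feel for what a weighted split does to incoming requests, here is a toy simulation in plain Python. It does not use Cortex's traffic splitting feature or its configuration format. The API names and the 70/30 weights are invented for illustration.

```python
import random
from collections import Counter
from typing import Dict


def route(weights: Dict[str, float], rng: random.Random) -> str:
    """Pick one API name with probability proportional to its weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]


# Hypothetical A/B test: two exports of the same model, split 70/30.
weights = {"classifier-onnx": 70, "classifier-tensorflow": 30}
rng = random.Random(0)  # fixed seed so the simulation is repeatable

total_requests = 10_000
counts = Counter(route(weights, rng) for _ in range(total_requests))
for name, hits in sorted(counts.items()):
    print(f"{name}: {hits / total_requests:.1%} of requests")
```

Over many requests, each variant's share approaches its weight, which is what lets us compare the two exports on similar traffic.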

An easier path to production machine learning

Over the years, a number of end-to-end data science platforms have been released, and most of them fall into the same traps:

  • Providing a smooth interface, with zero transparency into what’s happening under the hood, killing reproducibility and auditing.
  • Solving one part of the stack well, like training, but “bolting on” under-developed solutions for other parts, like deployment.
  • Locking data scientists and machine learning engineers into a narrow stack by only supporting specific frameworks and integrations.

The result is a platform that makes production machine learning easy—if you stay strictly within the confines of the system. When you have a diverse set of problems to solve, however, this is difficult to do.

Metaflow and Cortex represent a fundamentally different, human-centric approach. The emphasis is not on providing a magic solution to a narrow set of problems, but on providing an easy interface for building solutions to any problem.

If you’re interested in digging into either platform, check out the links below:
