Introduction
Have you ever heard people speaking about their CI/CD pipelines and wondered if there is something like that for Machine Learning? Want to be able to automatically run tests on your ML models every time the data or models change?
In this article, I’m going to explain how you can validate your Machine Learning models with Apache Airflow and the Deepchecks validation package right before your model is built successfully in the CI/CD pipeline. Assuming your model is built on an Airflow pipeline, this article will demonstrate how you can integrate the rich test suites of Deepchecks into the workflows.
What is Apache Airflow?
From the docs:
“Airflow is a platform to programmatically author, schedule, and monitor workflows. Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. The Airflow scheduler executes your tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap. The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.”
Basically, Airflow is a tool that helps run workflows consisting of multiple stages (a DAG). It is responsible for the scheduling and the orchestration of the DAGs runs. The DAGs are defined with Python code and as a result, the workflows become more maintainable, versionable, testable, and collaborative. For more info about DAGs, visit Airflow Docs.
Defining our DAG
Below is an example DAG definition that validates a model using Airflow and Deepchecks, and its correspondent DAG on the Airflow platform:
with DAG( dag_id="deepchecks_airflow_integration", schedule_interval="@daily", default_args={ "owner": "airflow", "retries": 1, "retry_delay": timedelta(minutes=5), "start_date": datetime(2021, 1, 1), }, catchup=False, ) as dag: load_adult_dataset = PythonOperator( task_id="load_adult_dataset", python_callable=load_adult_dataset ) integrity_report = PythonOperator( task_id="integrity_report", python_callable=dataset_integrity_step ) load_adult_model = PythonOperator( task_id="load_adult_model", python_callable=load_adult_model ) evaluation_report = PythonOperator( task_id="evaluation_report", python_callable=model_evaluation_step ) load_adult_dataset >> integrity_report load_adult_dataset >> load_adult_model >> evaluation_report
As it can be seen, this DAG defines 2 validation steps:
- To validate the data – we define a dataset integrity step that is being called after the data is loaded.
- To evaluate the model – we define a model evaluation step that is being called after the data and the pre-trained model are loaded.
In the next section, we will explain how to declare such steps using Deepchecks, the open-source Python library for ML validation.
What is Deepchecks?
In short, Deepchecks is an open-source Python library for testing ML/DL models and data. The library can help us out with various testing and validation needs throughout our projects — we can verify the data’s integrity, inspect the distributions, confirm valid data splits (for example, the train/test split), evaluate the performance of our model, and more!
The Deepchecks package contains many different checks – that perform a single check on the data and model (for example, detecting a feature drift between the train and the test data), and suites – which are an ordered collection of checks. The suite object enables displaying a concluding report for all of the checks that ran. Deepchecks already comes with some built-in suites like the data integrity suite, the model evaluation suite, and more. Check the full list of built-in suites here.
Deepchecks & Airflow
In this article, we will use Deepchecks in an Airflow workflow in order to validate a model. Our model will be a simple RandomForest model that is trained on the well-known adult dataset.
We will use the integration tutorial provided in the Deepchecks docs. Deepchecks can be used within an Airflow workflow stage and run a suite in order to validate a model.
Validating the Integrity of the Training Data
We can define a workflow stage that will validate the integrity of our training data, using the built-in integrity suite. The below snippet demonstrates just that:
def dataset_integrity_step(**context) from deepchecks.tabular.suites import single_dataset_integrity from deepchecks.tabular.datasets.classification.adult import _CAT_FEATURES, _target from deepchecks.tabular import Dataset adult_train = pd.read_csv(context.get("ti").xcom_pull(key="train_path")) ds_train = Dataset(adult_train, label=_target, cat_features=_CAT_FEATURES) train_results = single_dataset_integrity().run(ds_train) try: os.mkdir('suite_results') except OSError: print("Creation of the directory {} failed".format(dir_path)) run_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") train_results.save_as_html(os.path.join(dir_path, f'train_integrity_{run_time}.html'))
The output of the stage is an HTML report of the suite, which looks like this:
Validating a Model
Now, we will define a workflow stage that validates the performance of a pre-trained model. We will use the model_evaluation suite for that.
def model_evaluation_step(**context) from deepchecks.tabular.suites import model_evaluation from deepchecks.tabular.datasets.classification.adult import _CAT_FEATURES, _target from deepchecks.tabular import Dataset adult_model = joblib.load(context.get("ti").xcom_pull(key="adult_model")) adult_train = pd.read_csv(context.get("ti").xcom_pull(key="train_path")) adult_test = pd.read_csv(context.get("ti").xcom_pull(key="test_path")) ds_train = Dataset(adult_train, label=_target, cat_features=_CAT_FEATURES) ds_test = Dataset(adult_test, label=_target, cat_features=_CAT_FEATURES) evaluation_results = model_evaluation().run(ds_train, ds_test, adult_model) run_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") evaluation_results.save_as_html(os.path.join(dir_path, f'model_evaluation_{run_time}.html'))
This will result in the following report:
Wrapping Up
In this short article, we demonstrated an approach to validating ML models and data using Airflow and Deepchecks. We have defined 2 Airflow stages that validate different aspects of the model building pipelines: First, we checked the data for integrity issues, and later evaluated the model performance.
Feel free to use the snippets provided here in your projects and workflows, I can’t wait to hear what it would find!
In order to understand more, and to download the code in the article, please visit the following documentation page in Deepchecks docs.