If you like what we're working on, please  star us on GitHub. This enables us to continue to give back to the community.

Time Series Predictive Maintenance of NASA Turbo Fan Engines — Data Validation using Deepchecks: Part 1

NASA Turbofan Jet Engine Data Set was shared on Kaggle for a Predictive Maintenance use-case challenge.

The dataset consists of engine settings and sensor readings from 26 sensors. The target variable is RUL (Remaining Useful Life) of the units of the turbo fan engines. RUL is equivalent of number of flights remained for the engine after the last datapoint in the test dataset. This is a multi-variate time series data set, and we are given train and test set for the evaluation of the model.

The goal of this post is to built a basic model for the RUL prediction, and the focus is on showing how to use deepchecks for data validation. We will see how the “checks” from Deepchecks modules provide us with important insights that help us for feature engineering. In this post, you will also learn how to create custom check for evaluating your data.

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from sklearn.preprocessing import OrdinalEncoder, MinMaxScaler, RobustScaler
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, cross_val_score, cross_val_predict, StratifiedShuffleSplit
from sklearn.ensemble import RandomForestClassifier
from imblearn.ensemble import BalancedRandomForestClassifier, BalancedBaggingClassifier, RUSBoostClassifier, EasyEnsembleClassifier
from imblearn.over_sampling import SMOTE, ADASYN, RandomOverSampler, BorderlineSMOTE, SVMSMOTE, KMeansSMOTE
from imblearn.under_sampling import TomekLinks, RandomUnderSampler, NearMiss, ClusterCentroids
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, classification_report, precision_score, f1_score, roc_auc_score, recall_score, confusion_matrix, roc_curve, precision_recall_curve
import pickle
from deepchecks.tabular.datasets.classification import iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from deepchecks.tabular.suites import default_suites
from deepchecks.tabular import Dataset
sns.set()

from deepchecks.tabular.suites import *
from deepchecks.tabular import Dataset

import deepchecks
deepchecks.__version__

# define filepath to read data
dir_path = './CMaps/'

# define column names for easy indexing
index_names = ['unit_nr', 'time_cycles']
setting_names = ['setting_1', 'setting_2', 'setting_3']
sensor_names = ['s_{}'.format(i) for i in range(1,22)] 
col_names = index_names + setting_names + sensor_names

# read data
train = pd.read_csv((dir_path+'train_FD001.txt'), sep='\s+', header=None, names=col_names)
test = pd.read_csv((dir_path+'test_FD001.txt'), sep='\s+', header=None, names=col_names)
y_test = pd.read_csv((dir_path+'RUL_FD001.txt'), sep='\s+', header=None, names=['RUL'])

test["RUL"] = y_test
# inspect first few rows
train.head()

Now once we have the train data, we are going to compute the RUL.

# Compute linear declining RUL is computed 
def add_remaining_useful_life(df):
    
    # Get the total number of cycles for each unit
    grouped_by_unit = df.groupby(by="unit_nr")
    max_cycle = grouped_by_unit["time_cycles"].max()
    
    # Merge the max cycle back into the original frame
    result_frame = df.merge(max_cycle.to_frame(name='max_cycle'), left_on='unit_nr', right_index=True)
    
    # Calculate remaining useful life for each row
    remaining_useful_life = result_frame["max_cycle"] - result_frame["time_cycles"]
    result_frame["RUL"] = remaining_useful_life
    
    # drop max_cycle as it's no longer needed
    result_frame = result_frame.drop("max_cycle", axis=1)
    return result_frame
  
train = add_remaining_useful_life(train)
train[index_names+['RUL']].head()

Now that we have the train data set along with the target variable, we can use Deepchecks data_integrity check to look at various aspects of the data validation.

## Create Deepchecks Dataset
df_dataset_train = Dataset(train, label='RUL', features=train.drop(["unit_nr","time_cycles","RUL"], axis=1))

#Run data_integrity check
data_integrity().run(df_dataset_train)

Let’s now see the results.

  • Displayed as absolute values.
  • NaN values (where the correlation could not be calculated) are displayed as 0.0, total of 30 NaNs in this display.

Note – data sampling: Data is sampled from the original dataset, running on 10000 samples out of 20631. Sample size can be controlled with the “n_samples” parameter.

Open source package for ml validation

Build Test Suites for ML Models & Data with Deepchecks

Get StartedOur GithubOur Github

In the data_integrity check above we see that setting_3, s_1, s_5, s_10, s_16, s_18, s_19 are having single value in all rows. Hence these columns are not much of use to us. Similarly, while looking at the descriptive statistics of the train data we see that the setting values are having very low standard deviation, hence it won’t be much of a contribution to predict the target, hence we will drop them.

# drop unwanted columns 
drop_labels = ['setting_1','setting_2','setting_3','s_1','s_5','s_6','s_10','s_16','s_18','s_19']

X_train = train.drop(drop_labels, axis=1)
X_test_interim = test.drop(drop_labels, axis=1)

Let’s look at the correlation matrix of the data.

import seaborn as sns
corr = X_train.corr()
mask = np.triu(np.ones_like(corr, dtype=bool))

# Set up the matplotlib figure
f, ax = plt.subplots(figsize=(11, 9))

# Generate a custom diverging colormap
cmap = sns.diverging_palette(230, 20, as_cmap=True)
sns.heatmap(corr, mask=mask, cmap=cmap, vmax=.3, center=0,
            square=True, linewidths=.5, cbar_kws={"shrink": .5})

We see some of the important features include s_2, s_4, s_4, s_8, s_11. unit_nr and time_cycles is the index features and RUL is the target feature.

Now, as an example, let’s see how to build a custom check using Deepchecks package.

In this custom check we are going to build a check that runs through a particular column and filter by the index of the unit_nr and run a ADFuller test. The output of the test will include adf value, p-value, number of observations, critical values and IC best value. For now, the adf value and p-value are the most important. The more negative the test statistic, the more stationary the signal. When the p-value is below 0.05, it can be assumed the time-series is stationary.

import pandas as pd

from deepchecks.core import CheckResult, ConditionCategory, ConditionResult
from deepchecks.tabular import SingleDatasetCheck
from statsmodels.tsa.stattools import adfuller

class StationarityCheck(SingleDatasetCheck):
    """Description about the check. Name of the check will be the class name splitted in upper case letters."""

    def __init__(self, col_a: str, u_no: int, **kwargs):
        super().__init__(**kwargs)
        self.col_a = col_a
        self.u_no = u_no

    def run_logic(self, context, dataset_kind) -> CheckResult:
        dataset = context.get_data_by_kind(dataset_kind)

        # Get from the dataset the data
        data: pd.DataFrame = dataset.data

        # LOGIC HERE 
        col_a = self.col_a
        u_no = self.u_no
        
        test_series = data.loc[data['unit_nr']== u_no, col_a]
        adf, pvalue, usedlag, n_obs, critical_values, icbest = adfuller(test_series, maxlag=1)
        
        result = {
            'adf': adf,
            'pvalue':pvalue,
            'n_obs':n_obs,
            'critical_values':critical_values
        }
        
        return CheckResult(result, display="Results are displayed")

    def stationarity_test(self, threshold: float = 0.05):
        # Here we define the condition function, which accepts as input the `CheckResult.value`
        
        def condition(result):
            adf = result['adf']
            pvalue = result['pvalue']
            n_obs = result['n_obs']
            critical_values = result['critical_values']
            
            
            category = ConditionCategory.PASS if pvalue < 0.05 else ConditionCategory.FAIL
            
            if(category == ConditionCategory.FAIL):
                message = f'The af value is: {adf}' + f' \nThe time series is not stationary. P value is : {pvalue}' + f'\n The number of critical observations are : {n_obs}'
            else:
                message = 'No issues found'
            return ConditionResult(category,message)

        # Here we define the name of condition
        name = f'Stationarity of time series'
        # Now add it on the class instance
        return self.add_condition(name, condition)
    

See below the results of the test on s_2 and s_4. In sensor 2, the af value is -3.8 and the p-value is 0.00024 which means that the s_2 value is stationary and the test is significant. IN the sensor 4, the sf value is -2.05 and the p-value is 0.26 which means that the test is not significant.

Now, to make the time series values stationary, we will use this function.

def find_max_diff(series):
    maxdiff = 0
    do = True
    adf, pvalue, usedlag, nobs, critical_values, icbest = adfuller(series, maxlag=1)
    if pvalue < 0.05:
        do = False
    
    while do:
        maxdiff += 1
        adf, pvalue, usedlag, nobs, critical_values, icbest = adfuller(series.diff(maxdiff).dropna(), maxlag=1)
        if pvalue < 0.05:  # if significant, stop differencing and testing for stationarity
            do = False
    return maxdiff


def make_stationary(df_input, columns):
    df = df_input.copy()

    for unit_nr in range(1, df['unit_nr'].max()+1):
        for col in columns:
            maxdiff = find_max_diff(df.loc[df['unit_nr']==unit_nr, col])
            if maxdiff > 0:
                df.loc[df['unit_nr']==unit_nr, col] = df.loc[df['unit_nr']==unit_nr, col].diff(maxdiff)
    df.dropna(inplace=True)
    return df
    
     
remaining_sensors = X_train.columns.difference(index_names+['RUL'])
intermediate_df = make_stationary(X_train, remaining_sensors)

intermediate_df.head()  # stationary data!

Now we will create train, test set ready for a basic linear regression model.

from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score

# target variable
y_train = X_train.pop('RUL')
X_test = X_test_interim.groupby('unit_nr').last().reset_index()
X_test.drop("RUL", inplace=True, axis=1)

def evaluate(y_true, y_hat, label='test'):
    mse = mean_squared_error(y_true, y_hat)
    rmse = np.sqrt(mse)
    variance = r2_score(y_true, y_hat)
    print('{} set RMSE:{}, R2:{}'.format(label, rmse, variance))
    
    
# create and fit model
lm = LinearRegression()
lm.fit(intermediate_df[remaining_sensors], intermediate_df['RUL'])

# predict and evaluate
y_hat_train = lm.predict(intermediate_df[remaining_sensors])
evaluate(intermediate_df['RUL'], y_hat_train, 'train')

With this we get a baseline model and metrics as

train set RMSE:51.633756137911796, R2:0.43408640113852315

Now that we have a baseline model, we need to see how the model is performing. We will run a model validation check to inspect upon the predictions made by the model. We will run the RegressionErrorDistribution check on the linear model that we built.

df_dataset_train_new = Dataset(intermediate_df, label='RUL', features=intermediate_df[remaining_sensors])

from deepchecks.tabular.checks import RegressionErrorDistribution

result = RegressionErrorDistribution().run(df_dataset_train_new, lm)
result

The results from the RegressionErrorDistribution would like

You see the predictions values which are far away from the actual RUL. This is showing the top three over estimation and underestimation cases. You can tweak

RegressionErrorDistribution(n_top_samples: int=3, n_bins: int=40)

Now, let’s use another model validation function- RegressionSystematicError.

from deepchecks.tabular.checks import RegressionSystematicError

result2 = RegressionSystematicError().run(df_dataset_train_new, lm)
result2

We see that the mean of the model prediction error is -2.55, which shows slight systematic error.

The linear model is only considering the current state of the machine to predict the RUL, but there is more information hidden in the past values of the sensor value. Hence we would like to add lagged values of the sensor to cover the time-series aspect to the data.

def add_lag_features(df, lag_val, columns):
    temp = df.copy()
    for i in range(lag_val):
        lag_cols = [col + '_lag_{}'.format(i+1) for col in columns]
        temp[lag_cols] = temp.groupby('unit_nr')[columns].shift(i)
    
    temp.dropna(inplace=True)
    return temp
## Finding the right lagged_value 

# add lags and evaluate models to find optimal lag length
import statsmodels.api as sm

metrics = pd.DataFrame(columns=['rmse', 'AIC', 'BIC'])
nr_of_lags = 30
for i in range(0, nr_of_lags+1):
    X_train = add_lag_features(intermediate_df, i, remaining_sensors)
    X_train = X_train.drop(index_names, axis=1)
    y_train = X_train.pop('RUL')
    
    model = sm.OLS(y_train, sm.add_constant(X_train.values))
    result = model.fit()

    metrics = metrics.append(pd.DataFrame(data=[[np.sqrt(result.mse_resid), round(result.aic,2), round(result.bic,2)]],
                               columns=['rmse', 'AIC', 'BIC']),
                               ignore_index = True)

display(metrics)

 

 

Now, let’s plot and see where the does the curve flatten. (Change the “.r” according to where you find the plot to be flattening)

plt.figure(figsize=(15,5))
plt.plot(metrics['AIC'].diff(), marker='.')  # plot the difference to see where it flattens out
plt.plot(14, metrics['AIC'].diff()[15], '.r')
plt.xlabel("Nr of lags")
plt.ylabel("AIC rate of change")
plt.show()
plt.close()

Let’s now scale the data and add lags to both train and test data.

# train and evaluate model with 0 to n lags
lags = 15

from sklearn.preprocessing import StandardScaler

X_train_interim = train.drop(drop_labels, axis=1)
scaler = StandardScaler()
scaler.fit(X_train_interim[remaining_sensors])

# prep data

X_train_interim[remaining_sensors] = scaler.transform(X_train_interim[remaining_sensors])
X_train_interim = make_stationary(X_train_interim, remaining_sensors)
X_train_interim = add_lag_features(X_train_interim, lags, remaining_sensors)
X_train_interim = sm.add_constant(X_train_interim)
X_train = X_train_interim.drop(index_names, axis=1)
y_train = X_train.pop("RUL")

## Transform the X_test data

X_test_interim = test.drop(drop_labels, axis=1)
X_test_interim.drop("RUL", axis=1, inplace=True)
X_test_interim[remaining_sensors] = scaler.transform(X_test_interim[remaining_sensors])
X_test_interim = make_stationary(X_test_interim, remaining_sensors)
X_test_interim = add_lag_features(X_test_interim, lags, remaining_sensors)
X_test_interim = X_test_interim.groupby('unit_nr').last().reset_index()
X_test_interim = sm.add_constant(X_test_interim)
X_test = X_test_interim.drop(index_names, axis=1)

## Convert X_train and X_test to Deepchecks data
df_dataset_train = Dataset(X_train, label=y_train, features=X_train)
df_dataset_test = Dataset(X_test, label=y_test, features=X_test)

Now that we have the transformed and computer train and test data. We need to run a train_test_validation() check.

train_test_validation().run(train_dataset=df_dataset_train, test_dataset=df_dataset_test)

What we see here is that:

  • Feature Label Correlation has changed, the PPS score difference is above the threshold of 0.2.
  • The Train Test Feature Drift is over 0.2 for categorical and over 0.1 for numerical features.
  • The Train Test Label Drift is over 0.1 for numerical label we have here.
  • The Whole Dataset Drift shows that the drift value between the entire Train and Test dataset is over 0.25.

The viz results below are self-explanatory, where you can see details about the checks and graphs showcasing the values and distribution.

Additional Outputs

  • The Drift score is a measure for the difference between two distributions, in this check – the test and train distributions.
  • The check shows the drift score and distributions for the features, sorted by drift score and showing only the top 5 features, according to drift score.
  • If available, the plot titles also show the feature importance (FI) rank.

These checks and visualization are very helpful to understand and validate the data before you start building the model. Once you have this information in hand, you can explore treatments and transformations you can do in the data to correct these issues.

In the next blog, we will take about exploring the data validation treatments.

Stay tuned !

Subscribe to Our Newsletter

Do you want to stay informed? Keep up-to-date with industry news, the latest trends in MLOps, and observability of ML systems.

Related articles

How to Automate Data Drift Thresholding in Machine Learning
How to Automate Data Drift Thresholding in Machine Learning
Best Practices for Computer Vision Model Deployment
Best Practices for Computer Vision Model Deployment
Benefits of MLOps Tools for ML Data
Benefits of MLOps Tools for ML Data