
Universal Config-Driven MLOps Framework

A Generalized Pipeline for Machine Learning Systems

📅 Last updated: March 9, 2026  ·  👤 Vansh Modi

1. Abstract

Machine learning models deployed in production environments suffer from performance degradation over time due to data drift, concept drift, seasonal patterns, and evolving business conditions. Existing MLOps solutions are often tightly coupled to specific frameworks, model types, or infrastructure choices, making them difficult to generalize across diverse real-world use cases.

This document proposes a Universal Config-Driven MLOps Framework — a generalized, configuration-first pipeline architecture designed to support multiple machine learning paradigms including Classification, Regression, Time-Series Forecasting, and Deep Learning models — without requiring code-level changes when switching between tasks.

The framework is built around a central config.yaml file that governs every stage of the ML lifecycle: from data ingestion and versioning through feature engineering, model training, evaluation, explainability, deployment, and continuous monitoring. Orchestration is handled by Apache Airflow, experiment tracking and model registry by MLflow, data versioning by DVC, model explainability by SHAP, and CI/CD automation via GitHub Actions.

The pipeline incorporates automated drift detection, threshold-based retraining triggers, canary and blue-green deployment strategies, and a feature store layer to prevent training-serving skew. All components are observable through a unified monitoring layer tracking prediction drift, feature distribution shifts, and business KPIs in real time.

KEY CONTRIBUTIONS
  • A single YAML configuration that drives the entire ML pipeline without code changes
  • A multi-paradigm training strategy supporting sklearn, XGBoost, LightGBM, and PyTorch
  • An integrated data versioning layer (DVC) for full reproducibility
  • A feature store between feature engineering and training to eliminate training-serving skew
  • Automated drift detection and retraining with configurable thresholds
  • End-to-end CI/CD pipeline via GitHub Actions for continuous delivery
At a glance: 16 pipeline stages · 7+ integrated tools · 4 ML paradigms · 1 config file controls all · 3 deployment strategies · reproducible experiments

Keywords: MLOps, Config-Driven Pipelines, Apache Airflow, MLflow, DVC, SHAP, Feature Store, Model Drift Detection, CI/CD, Reproducibility, AutoML, Production ML

2. Introduction

2.1 Motivation

Machine learning models degrade over time due to:

⚠ Data Drift

Input feature distributions change over time, causing model assumptions to break.

⚠ Concept Drift

The relationship between features and target variable evolves, reducing prediction accuracy.

🔍 Seasonal Changes

Cyclical patterns in data make static models unreliable across different periods.

📊 Business Evolution

Evolving business conditions alter the relevance and distribution of key features.

Without monitoring and retraining mechanisms, model performance declines.

2.2 MLOps Concept

MLOps applies DevOps principles to machine learning systems, enabling:

Capability Description
Automation Automated training and deployment pipelines
Reproducibility Full experiment tracking and artifact versioning
Monitoring Continuous production performance evaluation
Governance Controlled model promotion with approval gates

3. System Architecture Overview

Pipeline Architecture Flow

📊 Data Sources
🔍 Data Ingestion
📂 Data Versioning (DVC)
✅ Data Validation
⚙ Feature Engineering
🗄 Feature Store
✂ Data Splitting
📊 Model Training
📊 Model Evaluation
├── Metrics (MAPE, WAPE, RMSE)
└── Explainability (SHAP)
📊 Approval Gate + Human Review
📊 Model Registry (MLflow)
📊 Deployment
📊 Monitoring
🔍 Retraining Trigger (Airflow)

Layer Architecture Diagram

DATA LAYER
├ Data Sources
├ Data Ingestion
└ Data Versioning
FEATURE LAYER
├ Feature Engineering
└ Feature Store
TRAINING LAYER
├ Training
├ Evaluation
└ Explainability
GOVERNANCE LAYER
├ Approval Gate
└ Human Review
SERVING LAYER
├ Model Registry
├ Deployment
└ Monitoring

Architecture Components

Component Function
Airflow Workflow orchestration
MLflow Experiment tracking and model registry
Python ML Modules Training and evaluation logic
Monitoring Module Production performance tracking
Explainability Module Model interpretation via SHAP
Feature Store Versioned offline & online features
Data Versioning Dataset version control (DVC)
Approval Gate Automated quality enforcement
Human Review Layer Manual validation and final approval

4. Configuration-Driven Pipeline Design

The pipeline is configuration-driven to support multiple ML scenarios without code changes.

Example Configuration

```yaml
pipeline:
  problem_type: classification
  model_type: xgboost

training:
  optimizer: adam
  learning_rate: 0.001
  batch_size: 32
  epochs: 50

evaluation:
  primary_metric: f1_score
  threshold: 0.8
```
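
To make the "no code changes" claim concrete, here is a minimal dispatch sketch. The `MODEL_REGISTRY` mapping and trainer names are illustrative assumptions, not part of the framework; in the real pipeline the `config` dict below would come from `yaml.safe_load(open('config.yaml'))`.

```python
# Minimal sketch of config-driven model dispatch (illustrative only).
# In practice this dict is loaded from config.yaml via yaml.safe_load().
config = {
    "pipeline": {"problem_type": "classification", "model_type": "xgboost"},
    "training": {"learning_rate": 0.001, "batch_size": 32, "epochs": 50},
    "evaluation": {"primary_metric": "f1_score", "threshold": 0.8},
}

# Hypothetical registry mapping (problem_type, model_type) to trainer names
MODEL_REGISTRY = {
    ("classification", "xgboost"): "XGBClassifierTrainer",
    ("regression", "lightgbm"): "LGBMRegressorTrainer",
    ("time_series", "sarimax"): "SarimaxTrainer",
}

def resolve_trainer(cfg):
    """Select a trainer purely from config; switching tasks needs no code edit."""
    key = (cfg["pipeline"]["problem_type"], cfg["pipeline"]["model_type"])
    try:
        return MODEL_REGISTRY[key]
    except KeyError:
        raise ValueError(f"No trainer registered for {key}")

trainer = resolve_trainer(config)
```

Switching from classification to forecasting is then a one-line config change, not a code change.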

5. Data Ingestion Layer

Collect raw data from external systems into the pipeline.

Supported Data Sources

Source Type Examples
Databases PostgreSQL, MySQL, MongoDB
Data Warehouses Snowflake, BigQuery, Redshift
Streaming Systems Apache Kafka, Kinesis
Files CSV, Parquet, JSON, Avro

Ingestion Flow

External Source
Extract
Load
Raw Dataset

Data Versioning Layer

Between Data Ingestion and Validation, production pipelines require data versioning to ensure reproducibility and rollback capabilities.

Tool Purpose
DVC File-level dataset versioning, pipeline caching, remote storage
Delta Lake Versioned tables with ACID transactions for large-scale data

6. Data Versioning — DVC

Data Version Control (DVC) sits between Data Ingestion and Data Validation. It tracks dataset versions, pipeline stages, and experiment artifacts, enabling full reproducibility and rollback.

Why Data Versioning?

📂

Reproducibility

Every model version is tied to an exact dataset version — retrain any experiment identically.

↩️

Rollback

If a new dataset degrades model performance, roll back to a previous data version instantly.

🔍

Auditability

Track who changed what data and when — critical for regulated environments.

DVC Core Workflow

Raw Data Arrives
dvc add data/
git commit .dvc file
dvc push (remote)
Data Validation

DVC Setup & Initialization

```bash
# Install DVC
pip install dvc dvc-s3   # or dvc-gcs / dvc-azure

# Initialize in your Git repo
git init
dvc init

# Configure remote storage (S3 example)
dvc remote add -d myremote s3://my-ml-bucket/dvc-store
dvc remote modify myremote region us-east-1

# Commit DVC config
git add .dvc/config
git commit -m "Initialize DVC with S3 remote"
```

Versioning a Dataset

```bash
# Track a raw data file with DVC
dvc add data/raw/ad_sales.csv
# DVC creates: data/raw/ad_sales.csv.dvc (pointer file)

# Add pointer to Git, ignore actual data in Git
git add data/raw/ad_sales.csv.dvc .gitignore
git commit -m "Dataset v1.0: ad_sales Jan-Dec 2024"
dvc push   # Upload data to remote storage

# Later — new data arrives (Jan 2025)
dvc add data/raw/ad_sales.csv   # updates .dvc pointer
git add data/raw/ad_sales.csv.dvc
git commit -m "Dataset v2.0: add Jan 2025 data"
dvc push
```

Reproducing Any Data Version

```bash
# Checkout a specific Git commit → DVC restores that dataset version
git checkout <commit-hash>
dvc checkout   # pulls correct data from remote storage

# Or use Git tags for named versions
git checkout dataset-v1.0
dvc checkout
```

DVC Pipeline Stages

DVC can also version the full pipeline — not just data but every transformation step:

```yaml
# dvc.yaml — define pipeline stages
stages:
  ingest:
    cmd: python src/ingest.py
    deps:
      - src/ingest.py
    outs:
      - data/raw/
  preprocess:
    cmd: python src/preprocess.py
    deps:
      - src/preprocess.py
      - data/raw/
    outs:
      - data/processed/
    params:
      - params.yaml:
          - preprocess.normalize
          - preprocess.fill_strategy
  train:
    cmd: python src/train.py
    deps:
      - src/train.py
      - data/processed/
    outs:
      - models/sarimax_model.pkl
    metrics:
      - metrics/eval.json
```

```bash
# Run the full pipeline (only re-runs changed stages)
dvc repro

# View pipeline DAG
dvc dag

# Compare metrics across experiments
dvc metrics show
dvc metrics diff HEAD~1
```

DVC Key Commands Reference

Command Purpose
dvc init Initialize DVC in a Git repository
dvc add <file> Track a data file or directory
dvc push Upload cached data to remote storage
dvc pull Download data for the current Git commit
dvc checkout Restore data files to match current .dvc pointers
dvc repro Re-run pipeline stages that have changed
dvc dag Visualize pipeline dependency graph
dvc metrics show Display tracked evaluation metrics
dvc metrics diff Compare metrics between commits

Integration with MLflow

DVC handles data versioning; MLflow handles experiment tracking. Together they form a complete reproducibility stack:

Concern Tool What It Tracks
Data Version DVC Dataset hash, remote path, Git commit
Code Version Git Source code, configs, .dvc pointer files
Experiment MLflow Parameters, metrics, model artifacts
```python
import mlflow
import subprocess

# Log the exact DVC data version alongside the MLflow run
with mlflow.start_run():
    # Capture current Git commit (which pins the .dvc pointer)
    git_hash = subprocess.check_output(['git', 'rev-parse', 'HEAD']).decode().strip()
    mlflow.set_tag('data.git_commit', git_hash)
    mlflow.set_tag('data.dvc_path', 'data/processed/')

    # ... training code ...
    mlflow.log_metric('wape', wape_score)
    mlflow.sklearn.log_model(model, 'sarimax_model')
```

7. Data Validation

Data validation ensures input data quality before training.

Rule Description
Schema Validation Required columns exist with correct types
Missing Value Detection Flags incomplete records above threshold
Range Checks Detects values outside expected bounds
Drift Detection Distribution change vs. training baseline

Drift Detection Methods

Method Description
Population Stability Index (PSI) Distribution comparison between datasets
KL Divergence Probability distribution divergence measure

PSI Interpretation

PSI Value Interpretation
< 0.1 Stable
0.1 – 0.25 Moderate Drift
> 0.25 Significant Drift
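
The PSI thresholds above can be made concrete with a small, dependency-free implementation. This is a sketch: it bins on the baseline's min/max range, whereas production systems typically bin on the training data's quantiles.

```python
import math

def psi(expected, actual, bins=10):
    """Population Stability Index between a baseline sample and a new sample."""
    lo, hi = min(expected), max(expected)
    span = hi - lo or 1.0            # guard against a constant baseline

    def bin_fractions(sample):
        counts = [0] * bins
        for x in sample:
            i = int((x - lo) / span * bins)
            counts[min(max(i, 0), bins - 1)] += 1
        # floor at a tiny fraction so log() is defined for empty bins
        return [max(c / len(sample), 1e-4) for c in counts]

    e, a = bin_fractions(expected), bin_fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))
```

Feeding identical samples yields PSI ≈ 0 (stable); a shifted production sample pushes PSI past the 0.25 retraining trigger.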

8. Feature Engineering

Feature engineering transforms raw data into model-ready features.

Technique Example
Normalization Min-max or Z-score scaling of numeric features
Encoding One-hot, label, or target encoding of categoricals
Aggregation Grouped statistics (mean, std, count)
Lag Features Time-series historical values
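
The first two techniques in the table reduce to a few lines each. A dependency-free sketch (in practice you would use fitted scalers/encoders from sklearn so the same parameters apply at serving time):

```python
def minmax_scale(xs):
    """Min-max normalization to the [0, 1] range."""
    lo, hi = min(xs), max(xs)
    return [(x - lo) / (hi - lo) for x in xs]

def zscore_scale(xs):
    """Z-score standardization: zero mean, unit variance."""
    mu = sum(xs) / len(xs)
    sd = (sum((x - mu) ** 2 for x in xs) / len(xs)) ** 0.5
    return [(x - mu) / sd for x in xs]

def one_hot(value, categories):
    """One-hot encode a categorical value against a fixed category list."""
    return [1 if value == c else 0 for c in categories]
```

Note that the scaling parameters (min/max, mean/std) must be computed on training data only and reused at inference, which is exactly the skew the feature store below guards against.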

Feature Store

A Feature Store sits between Feature Engineering and Training to prevent training-serving skew and manage features as independent assets.

Tool Purpose
Feast Feature store
Tecton Managed feature platform
Redis Online feature serving

Feature store capabilities:

  • Versioned features: Ensures identical features are used across experiments.
  • Offline training features: High-throughput batch serving for training models.
  • Online inference features: Low-latency serving for real-time predictions.

9. Data Splitting Strategy

Different ML tasks require fundamentally different splitting strategies. Incorrect splitting can lead to data leakage, overfitting, and misleading evaluation results. Choosing the right strategy is critical for building trustworthy models.

Why Data Splitting Matters

📊

Unbiased Evaluation

Test data must never influence training — ensures honest performance estimates.

🔍

Prevent Data Leakage

Information from the future or test set must not leak into training features.

📊

Generalization

Models must perform well on unseen data, not just memorize the training set.

Splitting Strategies by Problem Type

Strategy Problem Type Key Principle When to Use
Random Split Classification, Regression Randomly shuffle and divide data i.i.d. data with no temporal dependency
Stratified Split Classification (imbalanced) Preserve class distribution in each split When target classes are unevenly distributed
Temporal Split Time Series Split by time — no future data in training Forecasting, any time-dependent data
Group Split Any (grouped data) Keep all samples from same group in one split Patient data, user sessions, store-level data
K-Fold CV Any (small datasets) Rotate train/test across K partitions Limited data where every sample matters
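
The group-split principle ("keep all samples from the same group in one split") is easy to get wrong with a plain random split. A minimal sketch, equivalent in spirit to sklearn's `GroupShuffleSplit`:

```python
import random

def group_split(groups, test_frac=0.2, seed=42):
    """Split sample indices so that no group appears in both train and test."""
    uniq = sorted(set(groups))
    rng = random.Random(seed)
    rng.shuffle(uniq)
    n_test = max(1, int(len(uniq) * test_frac))
    test_groups = set(uniq[:n_test])
    train_idx = [i for i, g in enumerate(groups) if g not in test_groups]
    test_idx = [i for i, g in enumerate(groups) if g in test_groups]
    return train_idx, test_idx
```

Splitting by group (patient, user, store) rather than by row prevents the model from "recognizing" a group it has already seen during training.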

Classification / Regression Split

Standard Train-Val-Test Split

Dataset % Purpose
Train 70% Model learns patterns from this data
Validation 15% Hyperparameter tuning and early stopping
Test 15% Final unbiased performance evaluation

Stratified Splitting

When classes are imbalanced (e.g., 95% negative, 5% positive), random splitting may produce splits with zero minority class samples. Stratified splitting ensures each subset has the same class ratio as the full dataset.

```python
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.15,
    stratify=y,        # preserve class distribution
    random_state=42
)
```

⚠ Warning Never use stratified split for time-series data — it violates temporal ordering.

Time Series Splitting

Time-series data has temporal dependency — future data cannot be used to predict the past. Random shuffling would cause catastrophic data leakage.

📊 Past Data (Train)
📊 Validation Window
🔍 Future (Test)

Forward-Chaining (Expanding Window) Validation

Training window expands over time. Each fold adds more historical data while testing on the next unseen period.

Fold 1: [Train: Jan-Mar] → [Test: Apr]
Fold 2: [Train: Jan-Apr] → [Test: May]
Fold 3: [Train: Jan-May] → [Test: Jun]
Fold 4: [Train: Jan-Jun] → [Test: Jul]
Fold 5: [Train: Jan-Jul] → [Test: Aug]

Sliding Window Validation

Fixed-size training window slides forward. Useful when older data becomes less relevant.

Fold 1: [Train: Jan-Mar] → [Test: Apr]
Fold 2: [Train: Feb-Apr] → [Test: May]
Fold 3: [Train: Mar-May] → [Test: Jun]
Fold 4: [Train: Apr-Jun] → [Test: Jul]

```python
from sklearn.model_selection import TimeSeriesSplit

tscv = TimeSeriesSplit(n_splits=5)
for train_idx, test_idx in tscv.split(X):
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    model.fit(X_train, y_train)
    score = model.score(X_test, y_test)
```

10. Model Training

The training layer supports multiple algorithm types.

Supported Algorithms

Problem Type Algorithms
Classification Logistic Regression, Random Forest, SVM, XGBoost
Regression Linear Regression, XGBoost, LightGBM
Time Series ARIMA, SARIMAX, Prophet
Deep Learning CNN, LSTM, Transformer

How Does the Model Know Its Error? — Loss Functions

During training, the model uses a loss function (also called cost function or objective function) to measure how far its predictions are from the actual values. The optimizer then adjusts model weights to minimize this loss.

The Training Loop:

📊 Input Batch
📊 Forward Pass (predict)
📊 Loss = Error(predicted, actual)
🔍 Backward Pass (gradients)
⚙ Optimizer Updates Weights

↻ Repeat for every batch × every epoch until loss converges

Loss Functions by Problem Type

Problem Type Loss Function Formula / Description When to Use
Classification Binary Cross-Entropy $\mathcal{L} = -\bigl[y \cdot \log(\hat{y}) + (1-y) \cdot \log(1-\hat{y})\bigr]$ Binary classification (2 classes)
Categorical Cross-Entropy $\mathcal{L} = -\sum_{i=1}^{C} y_i \cdot \log(\hat{y}_i)$ Multi-class classification
Focal Loss $\mathcal{L} = -\alpha_t (1 - p_t)^\gamma \cdot \log(p_t)$ Severely imbalanced classes
Regression Mean Squared Error (MSE) $\text{MSE} = \dfrac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2$ Default regression — penalizes large errors heavily
Mean Absolute Error (MAE) $\text{MAE} = \dfrac{1}{n}\sum_{i=1}^{n}|y_i - \hat{y}_i|$ Robust to outliers
Huber Loss $L_\delta = \begin{cases} \frac{1}{2}(y-\hat{y})^2 & |y-\hat{y}| \le \delta \\ \delta|y-\hat{y}| - \frac{1}{2}\delta^2 & \text{otherwise} \end{cases}$ Best of both — smooth + robust
Time Series MSE / RMSE $\text{RMSE} = \sqrt{\dfrac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$ Standard time-series forecasting
Log-Cosh Loss $\mathcal{L} = \sum_{i=1}^{n} \log\bigl(\cosh(\hat{y}_i - y_i)\bigr)$ Smooth approximation to MAE, handles outliers
Deep Learning Cross-Entropy + Softmax Softmax output → CE loss Neural network classifiers
Contrastive / Triplet Loss Learns similarity between embeddings Siamese networks, embedding learning
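
The three regression losses in the table can be checked against each other with a few lines of plain Python (a sketch; frameworks compute these on tensors, not lists):

```python
import math

def mse(y, yhat):
    """Mean Squared Error: penalizes large errors quadratically."""
    return sum((a - b) ** 2 for a, b in zip(y, yhat)) / len(y)

def mae(y, yhat):
    """Mean Absolute Error: linear penalty, robust to outliers."""
    return sum(abs(a - b) for a, b in zip(y, yhat)) / len(y)

def huber(y, yhat, delta=1.0):
    """Huber loss: quadratic inside |r| <= delta, linear outside."""
    total = 0.0
    for a, b in zip(y, yhat):
        r = abs(a - b)
        total += 0.5 * r * r if r <= delta else delta * r - 0.5 * delta * delta
    return total / len(y)
```

For a residual of 0.5 (inside delta), Huber matches the MSE-style term 0.5·r²; for a residual of 3.0 it grows only linearly, like MAE.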

Loss Convergence During Training

The optimizer's goal is to reach the minimum loss. Training loss should decrease over epochs. If validation loss starts rising while training loss keeps dropping, the model is overfitting.

11. Model Evaluation

Evaluation metrics depend on problem type.

Classification Metrics

Metric Formula / Description
Accuracy $\text{Accuracy} = \dfrac{\text{TP} + \text{TN}}{\text{TP} + \text{TN} + \text{FP} + \text{FN}}$
Precision $\text{Precision} = \dfrac{\text{TP}}{\text{TP} + \text{FP}}$
Recall $\text{Recall} = \dfrac{\text{TP}}{\text{TP} + \text{FN}}$
F1 Score $F_1 = 2 \cdot \dfrac{\text{Precision} \cdot \text{Recall}}{\text{Precision} + \text{Recall}}$
ROC-AUC $\text{AUC} = \int_0^1 \text{TPR}(\text{FPR}^{-1}(x))\, dx$
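
The four count-based formulas above follow directly from the confusion matrix. A dependency-free sketch (sklearn's `classification_report` computes the same quantities from labels):

```python
def classification_metrics(tp, tn, fp, fn):
    """Compute core classification metrics from confusion-matrix counts."""
    total = tp + tn + fp + fn
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = (2 * precision * recall / (precision + recall)
          if precision + recall else 0.0)
    return {
        "accuracy": (tp + tn) / total,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }
```

For example, TP=8, TN=88, FP=2, FN=2 gives precision = recall = F1 = 0.8 while accuracy is 0.96, illustrating why accuracy alone is misleading on imbalanced data.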

12. Explainability Layer

Explainability improves model transparency and builds stakeholder trust.

SHAP (SHapley Additive exPlanations)

SHAP values explain the contribution of each feature to a prediction:

$$f(x) = \varphi_0 + \sum_{i=1}^{M} \varphi_i$$

Where: $\varphi_0$ = base value (average model prediction), $\varphi_i$ = SHAP contribution of feature $i$
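
The additivity property above can be verified numerically with a brute-force exact Shapley computation. This is a self-contained sketch for tiny feature counts, not the optimized TreeSHAP/KernelSHAP algorithms the shap library actually uses; absent features are set to a baseline value.

```python
from itertools import combinations
from math import factorial

def shapley_values(f, x, baseline):
    """Exact Shapley values of model f at point x, relative to a baseline point."""
    M = len(x)

    def value(S):
        # Features in coalition S take their value from x; others from baseline
        z = [x[i] if i in S else baseline[i] for i in range(M)]
        return f(z)

    phi = []
    for i in range(M):
        others = [j for j in range(M) if j != i]
        total = 0.0
        for k in range(len(others) + 1):
            for S in combinations(others, k):
                # Shapley weight for a coalition of size k
                w = factorial(k) * factorial(M - k - 1) / factorial(M)
                total += w * (value(set(S) | {i}) - value(set(S)))
        phi.append(total)
    return phi
```

For a linear model f(x) = 2·x₀ + 3·x₁ the contributions are exactly the coefficients, and base value plus the sum of contributions reproduces f(x), as the formula requires.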

SHAP Feature Importance

Plot Type Purpose
Summary Plot Global feature importance across all predictions
Waterfall Plot Step-by-step single prediction explanation
Force Plot Feature push/pull visualization for one prediction

Explainability Types

Type Visualization Use Case
Global Summary plot, feature ranking Model validation, governance
Local Waterfall plot, force plot Single prediction explanation

Why SHAP?

⚠ SHAP is NOT Causal

SHAP explains how the model uses features — not what happens if you change a real-world feature. Causal inference requires A/B testing, DAGs, propensity score matching, or do-calculus.

13. Approval Gate & Human-in-the-Loop

The Approval Gate is a critical control checkpoint between model evaluation and model registry. It enforces quality standards and optionally routes the decision to a human reviewer.

13.1 Purpose

📊

Quality Guard

Prevent underperforming models from reaching production

📊

Compliance

Enforce business and regulatory compliance checks

📊

Human Oversight

Enable expert review for high-stakes decisions

13.2 Automated Gate Checks

Check Condition
Primary Metric Threshold New model metric > configured minimum
Regression Guard No significant drop in secondary metrics
SHAP Drift Check Feature importance stable vs. previous model
Data Quality Flag No critical validation warnings in pipeline
Fairness Check Bias metrics within acceptable bounds
```yaml
approval_gate:
  auto_approve: false
  primary_metric_threshold: 0.85
  regression_tolerance: 0.02
  require_human_review: true
  escalate_on_shap_drift: true
```
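
A sketch of how a gate task might evaluate that config against candidate and production metrics. Function and key names are illustrative assumptions, not the framework's actual API:

```python
def evaluate_approval_gate(cfg, candidate_metric, production_metric):
    """Illustrative gate logic: threshold check + regression guard,
    then route to auto-approval or human review per config."""
    checks = {
        "meets_threshold": candidate_metric >= cfg["primary_metric_threshold"],
        "no_regression": (production_metric - candidate_metric)
                         <= cfg["regression_tolerance"],
    }
    if not all(checks.values()):
        return "rejected", checks
    if cfg.get("require_human_review") or not cfg.get("auto_approve"):
        return "pending_human_review", checks
    return "auto_approved", checks
```

With the config shown above, even a passing model is routed to a human reviewer; only flipping `auto_approve`/`require_human_review` enables fully automatic promotion.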

13.3 Human-in-the-Loop (HITL) Workflow

📊 Approval Gate (Automated Checks)
✅ All pass + auto_approve=true → Registry
⚠ Fail OR require_human_review=true
📊 Notification Sent to Reviewer
📊 Reviewer Dashboard
📊 Human Decision
Approve

Model promoted to Staging/Production via MLflow

Reject

Model archived, alert triggered, team notified

Defer

Retraining triggered with reviewer comments

14. Experiment Tracking with MLflow

MLflow records all experiment details for reproducibility.

Category Examples
Parameters learning rate, batch size, epochs
Metrics accuracy, RMSE, WAPE
Artifacts SHAP plots, confusion matrices
Models Serialized model files (pickle, ONNX)
```python
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("wape", wape)
mlflow.log_param("learning_rate", 0.001)
mlflow.sklearn.log_model(model, "model")
```

15. Model Registry

MLflow Registry manages the model lifecycle through stages.

Stage Description
None Newly logged model — awaiting evaluation
Staging Candidate model under review
Production Active deployed model serving predictions
Archived Deprecated / replaced model

Promotion Criteria

⚠ Critical: MLflow Tags are Always Strings

mlflow.set_tag() stores values as strings internally. Always pass "True" not True — otherwise the string comparison in Airflow's approval gate will fail silently ("True" != True).
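
A minimal, MLflow-free illustration of the pitfall (pure Python; `human_approved` is the hypothetical tag name used later in the DAG breakdown):

```python
# What mlflow.set_tag("human_approved", True) effectively persists:
stored = str(True)

assert stored == "True"              # the tag comes back as the string "True"
assert (stored == True) is False     # naive boolean comparison fails silently

# Correct check in a downstream approval gate:
human_approved = stored == "True"
assert human_approved
```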

16. Airflow Pipeline Orchestration

Airflow manages pipeline execution through Directed Acyclic Graphs (DAGs).

Example DAG

data_ingestion
data_versioning
data_validation
feature_engineering
feature_store_sync
model_training
model_evaluation (includes shap_analysis)
approval_gate (automated)
human_review_layer (HITL)
model_registration
deployment

Scheduling

Pipeline Frequency
Training Pipeline Weekly
Monitoring Pipeline Daily

DAG Task Breakdown

# Task ID Responsibility
1 data_ingestion Pull from source systems, validate schema
2 data_validation Nulls, distributions, outlier checks, PSI drift
3 feature_engineering Lag features, rolling stats, encoding
4 data_splitting Split into Train (70%), Validation (15%), Test (15%), respecting temporal ordering where required
5 model_training Run training with configured hyperparameters
6 model_evaluation Compute MAE · RMSE · WAPE · Bias · MASE, log to MLflow
7 shap_analysis Generate SHAP plots, log artifacts to MLflow
8 human_review Data Scientist reviews MLflow run, sets human_approved="True" tag
9 approval_gate Reads human_approved + metric thresholds — halts DAG if any check fails
10 model_registration Push to MLflow Model Registry → Staging → Production
11 deployment Serve model to production endpoint via FastAPI

✔ Airflow Guarantees: Cron scheduling · Dependency enforcement · Automatic retry logic · Retraining automation via TriggerDagRunOperator · Email alerts on failure

17. Monitoring System

Production monitoring continuously tracks model performance.

Metric Purpose
Rolling WAPE Forecast degradation tracking
Accuracy Trend Classification drift detection
RMSE Trend Regression model degradation
Bias Systematic prediction error detection
Prediction Drift Production prediction distribution vs. training-time prediction distribution
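
Two of the table's signals, WAPE and Bias, are one-liners. A dependency-free sketch:

```python
def wape(actual, forecast):
    """Weighted Absolute Percentage Error: sum of |errors| over sum of actuals."""
    return sum(abs(a - f) for a, f in zip(actual, forecast)) / sum(actual)

def bias(actual, forecast):
    """Mean Error; a persistently non-zero value signals systematic
    over- or under-forecasting even when WAPE looks acceptable."""
    return sum(f - a for a, f in zip(actual, forecast)) / len(actual)
```

For example, forecasts of 90 and 110 against actuals of 100 and 100 give a WAPE of 10% but zero bias: the errors are symmetric, not systematic.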

Drift Types

📊 Data Drift

Changes in the input statistical distributions.

🔍 Feature Importance Drift

Changes in which features the model relies on (SHAP shift).

📈 Prediction Drift

Model output distribution changing relative to the training baseline.

⚠ Concept Drift

The core relationship between inputs and targets shifts.

Monitoring Signals — Thresholds & Actions

Signal Threshold Breach Action
WAPE (rolling) ↑ Rising trend Trigger retraining DAG
RMSE > trained baseline Investigate data quality / anomalies
Bias (ME) Persistently ≠ 0 Retrain with updated exogenous features
Data Drift (PSI) PSI / KL divergence ↑ Re-validate features, re-run the data pipeline
SHAP Shift Importance rank change Pre-drift warning — act proactively

18. Drift Detection

Drift detection monitors changes in input data distributions over time.

Method Purpose
PSI Distribution shift between training and production data
KL Divergence Statistical divergence between probability distributions
SHAP Shift Feature importance change over time

Example Trigger: PSI > 0.25 → Automatic model retraining initiated via Airflow
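
KL divergence, the second method in the table, compares two binned distributions directly. A minimal sketch (assumes `p` and `q` are already normalized histograms over the same bins):

```python
import math

def kl_divergence(p, q, eps=1e-10):
    """KL divergence D(p || q) between two discrete probability distributions.
    Asymmetric: D(p || q) != D(q || p) in general."""
    return sum(pi * math.log(pi / max(qi, eps)) for pi, qi in zip(p, q) if pi > 0)
```

As with PSI, a production pipeline would compare the binned training-feature distribution (`p`) against the current window (`q`) and alert past a configured threshold.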

SHAP + MAPE Drift Matrix

Use this matrix to diagnose what type of degradation is happening based on two independent signals:

Scenario MAPE/WAPE SHAP Status Meaning Action
Model Aging ↑ Rising Stable Data drifted, model hasn't seen new patterns Retrain on recent data
Concept Drift ↑ Rising Changed Underlying patterns shifted fundamentally Investigate features, retrain with new architecture
Early Warning Stable Changed Feature behaviour shifting before metrics degrade Act proactively — investigate now, retrain soon
Healthy Stable Stable Model operating within acceptable bounds No action needed — continue monitoring

🔍 Airflow fires retraining DAG if ANY condition is true:

  • Rolling WAPE trend rising beyond threshold
  • RMSE > trained baseline threshold
  • Bias (Mean Error) persistently ≠ 0
  • Data drift detected (PSI / KL divergence)
  • SHAP feature importance distribution shifted

19. Automated Retraining

Retraining occurs automatically when monitoring detects degradation.

Retraining Triggers

📊 Rolling WAPE

Forecast error trending upward beyond threshold

📊 Accuracy Drop

Classification accuracy below configured threshold

📊 RMSE Spike

Regression error rising above baseline

🔍 PSI Drift

Data distribution shift detected (PSI > 0.25)

🔍 SHAP Shift

Feature importance has changed significantly — model assumptions may be invalid.

Airflow automatically triggers the full training DAG when any trigger fires.

20. Deployment Strategies

Method Example Best For
REST API FastAPI, Flask Real-time predictions
Batch Inference Scheduled predictions Large-scale offline scoring
Streaming Kafka pipelines Low-latency event-driven predictions

Infrastructure

📊

Docker

Containerized model serving

Kubernetes

Scalable orchestration

Cloud Services

AWS, GCP, Azure managed ML

21. Final Architecture View

The complete end-to-end framework.

Feature Store → Airflow pipeline (Split, Train, Eval, Explain, Approve, Register) → Registry → Serving

Managed centrally by MLflow Tracking.

Project Directory Structure

```
/mlops_framework/
├── data/
├── training/
├── evaluation/
│   ├── metrics.py
│   ├── shap_analysis.py
│   └── approval_config.yaml
├── monitoring/
├── deployment/
└── mlruns/
```

Compatible with: Regression · Time-series · Classification · Tree-based · Deep learning

Framework Benefits

  • Standardized Policy: Consistent evaluation across all projects
  • Explainable AI: SHAP for every model in registry
  • Automated Lifecycle: Zero manual steps end-to-end (except optional HITL)
  • Reduced Risk: Dual-metric threshold gating limits bad deployments
  • Reproducibility: Every run fully tracked in MLflow

22. Enterprise Foundations: CI & Infrastructure

A machine learning model running on a laptop is a script. A model running in an automated, scalable, and secure environment is a product. This section defines the underlying enterprise architecture required to support the Universal MLOps Framework.

22.1 Continuous Integration (CI)

While Airflow handles Continuous Training (CT) of the models, CI tools handle the testing and validation of the Python code and DAG configurations.

Component Technology Purpose
Source Control Git / GitHub Version control for train.py, config files, and DAGs.
CI Pipeline GitHub Actions Code linting (Flake8), type checking (mypy), and Unit Tests (PyTest) trigger on every pull request.
CD Pipeline Pending Continuous delivery and automated deployment remain to be explored.
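
A representative workflow for the CI checks in the table might look like the following. This is a hypothetical sketch: file paths, the Python version, and the requirements file name are assumptions, not part of the framework.

```yaml
# .github/workflows/ci.yml — illustrative CI pipeline
name: ci
on: [pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt flake8 mypy pytest
      - run: flake8 src/ dags/      # linting
      - run: mypy src/              # type checking
      - run: pytest tests/          # unit tests (incl. DAG import checks)
```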

22.2 Infrastructure Manageability

Consistency between development, staging, and production environments is non-negotiable.

📊

Containerization (Docker)

Every training job and inference endpoint runs in an isolated, identical Docker container. "It works on my machine" is eliminated.

📊

Future Expansions

Technologies like Kubernetes / Kubeflow and Terraform (IaC) for large-scale cluster management and automated provisioning remain to be explored.

📊 Business Value ROI:

By implementing these foundations alongside the CT framework, organizations can: (1) Reduce model time-to-market from months to hours. (2) Prevent million-dollar forecasting disasters via automated safety gates.

23. End-to-End Example: Ad Sales Demand Forecasting

Let’s walk through the entire MLOps pipeline with a real-world example — forecasting weekly ad sales demand. We’ll see exactly what happens at each stage, with Airflow orchestrating the pipeline and MLflow tracking everything.

📊 Business Problem: A retail company wants to predict ad-driven product demand for the next 7 days to optimize inventory and ad spend allocation.

📊 Data: 2 years of daily sales data with features: price, ad_spend, competitor_price, promotions, holidays, day_of_week.

📊 Models: ARIMA, SARIMAX, and LSTM are trained and compared.

Stage 1: Airflow DAG Definition

Airflow orchestrates the entire pipeline as a DAG. Every Monday at 2 AM, it triggers the full training pipeline.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'mlops-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['ml-alerts@company.com']
}

dag = DAG(
    'ad_sales_training_pipeline',
    default_args=default_args,
    description='Weekly ad sales forecasting pipeline',
    schedule_interval='0 2 * * MON',  # Every Monday 2 AM
    start_date=datetime(2025, 1, 1),
    catchup=False
)
```

Stage 2: Data Ingestion

📊 What Happens:

  • Pull latest sales data from PostgreSQL
  • Fetch ad spend data from marketing API
  • Load competitor pricing from CSV export
  • Merge all sources into a unified dataset
```python
def ingest_data(**context):
    sales = pd.read_sql(query, pg_conn)
    ads = fetch_ad_api(start, end)
    merged = sales.merge(ads, on='date')
    merged.to_parquet('/data/raw/sales.parquet')
    context['ti'].xcom_push(key='rows', value=len(merged))

t1 = PythonOperator(
    task_id='data_ingestion',
    python_callable=ingest_data,
    dag=dag
)
```

Stage 3: Data Validation

```python
def validate_data(**context):
    df = pd.read_parquet('/data/raw/sales.parquet')

    # Schema validation
    required = ['date', 'sales', 'price', 'ad_spend']
    assert all(c in df.columns for c in required)

    # Missing values
    missing_pct = df.isnull().sum() / len(df)
    assert missing_pct.max() < 0.05   # < 5% missing

    # Drift detection (PSI)
    psi = calculate_psi(df['ad_spend'], baseline)
    mlflow.log_metric('psi_ad_spend', psi)
    if psi > 0.25:
        alert('Significant drift detected in ad_spend!')

t2 = PythonOperator(
    task_id='data_validation',
    python_callable=validate_data,
    dag=dag
)
```

✅ Result: PSI = 0.08 (stable), 0 missing values, schema valid.

Stage 4: Feature Engineering

```python
def engineer_features(**context):
    df = pd.read_parquet('/data/raw/sales.parquet')

    # Lag features
    df['lag_1'] = df['sales'].shift(1)
    df['lag_7'] = df['sales'].shift(7)

    # Rolling statistics
    df['rolling_mean_7'] = df['sales'].rolling(7).mean()
    df['rolling_std_7'] = df['sales'].rolling(7).std()

    # Cyclical encoding
    df['day_sin'] = np.sin(2 * np.pi * df['dayofweek'] / 7)
    df['day_cos'] = np.cos(2 * np.pi * df['dayofweek'] / 7)

    # Holiday flag
    df['is_holiday'] = df['date'].isin(holidays)

    df.dropna().to_parquet('/data/features/sales.parquet')

t3 = PythonOperator(
    task_id='feature_engineering',
    python_callable=engineer_features,
    dag=dag
)
```

Stage 5: Model Training + MLflow Tracking

All three models are trained and every parameter, metric, and artifact is logged to MLflow.

```python
import mlflow

def train_models(**context):
    df = pd.read_parquet('/data/features/sales.parquet')
    train, test = temporal_split(df, test_days=30)
    mlflow.set_experiment('ad_sales_forecasting')

    # --- Model 1: ARIMA ---
    with mlflow.start_run(run_name='ARIMA'):
        model = ARIMA(train['sales'], order=(1, 1, 1))
        fitted = model.fit()
        preds = fitted.forecast(steps=30)
        rmse = np.sqrt(mean_squared_error(test['sales'], preds))
        wape = np.sum(np.abs(test['sales'] - preds)) / np.sum(test['sales'])
        mlflow.log_params({'p': 1, 'd': 1, 'q': 1})
        mlflow.log_metrics({'rmse': rmse, 'wape': wape})
        mlflow.statsmodels.log_model(fitted, 'arima_model')

    # --- Model 2: SARIMAX ---
    with mlflow.start_run(run_name='SARIMAX'):
        exog = train[['ad_spend', 'price', 'is_holiday']]
        test_exog = test[['ad_spend', 'price', 'is_holiday']]
        model = SARIMAX(train['sales'], exog=exog,
                        order=(1, 1, 1), seasonal_order=(0, 1, 1, 7))
        fitted = model.fit(disp=False)
        preds = fitted.forecast(steps=30, exog=test_exog)
        rmse = np.sqrt(mean_squared_error(test['sales'], preds))
        wape = np.sum(np.abs(test['sales'] - preds)) / np.sum(test['sales'])
        mlflow.log_params({'p': 1, 'd': 1, 'q': 1, 'P': 0, 'D': 1, 'Q': 1, 's': 7})
        mlflow.log_metrics({'rmse': rmse, 'wape': wape})
        mlflow.statsmodels.log_model(fitted, 'sarimax_model')

    # --- Model 3: LSTM ---
    with mlflow.start_run(run_name='LSTM'):
        X_seq, y_seq = create_sequences(train, window=14)
        model = build_lstm(units=64, dropout=0.2)
        model.fit(X_seq, y_seq, epochs=50, batch_size=32,
                  validation_split=0.15,
                  callbacks=[EarlyStopping(patience=10)])
        preds = model.predict(X_test_seq).flatten()
        rmse = np.sqrt(mean_squared_error(test['sales'], preds))
        wape = np.sum(np.abs(test['sales'] - preds)) / np.sum(test['sales'])
        mlflow.log_params({'units': 64, 'dropout': 0.2, 'epochs': 50, 'window': 14})
        mlflow.log_metrics({'rmse': rmse, 'wape': wape})
        mlflow.keras.log_model(model, 'lstm_model')

t4 = PythonOperator(
    task_id='model_training',
    python_callable=train_models,
    dag=dag
)
```

Stage 6: Model Evaluation — MLflow Comparison

Model     RMSE    WAPE    MAPE    Winner?
ARIMA     142.3   12.4%   14.1%
SARIMAX    98.7    7.2%    8.5%   📊 Best
LSTM      115.6    9.8%   11.2%
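In the pipeline itself, this comparison runs over the metrics logged in Stage 5. A minimal sketch of the selection logic, with the table's values hard-coded for illustration (a real implementation would fetch them via `mlflow.search_runs` instead):

```python
# Metrics as logged in Stage 5; lower WAPE wins.
runs = {
    'ARIMA':   {'rmse': 142.3, 'wape': 0.124},
    'SARIMAX': {'rmse': 98.7,  'wape': 0.072},
    'LSTM':    {'rmse': 115.6, 'wape': 0.098},
}

# Pick the run with the lowest WAPE
best_name = min(runs, key=lambda name: runs[name]['wape'])
print(best_name)  # SARIMAX
```

WAPE is used as the primary selection metric because it is scale-free and robust to the low-volume days where MAPE explodes.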

Stage 7: SHAP Explainability

import mlflow
import numpy as np
import pandas as pd
import shap
import matplotlib.pyplot as plt

def run_shap(**context):
    best_model = load_best_model()  # SARIMAX
    explainer = shap.Explainer(best_model.predict, X_test)
    shap_values = explainer(X_test)

    # Log the SHAP summary plot to MLflow.
    # beeswarm draws onto the current figure, so grab it via plt.gcf()
    shap.plots.beeswarm(shap_values, show=False)
    mlflow.log_figure(plt.gcf(), 'shap_summary.png')

    # Log the feature-importance ranking (mean |SHAP| per feature)
    importance = pd.DataFrame({
        'feature': feature_names,
        'mean_shap': np.abs(shap_values.values).mean(axis=0),
    }).sort_values('mean_shap', ascending=False)
    mlflow.log_table(importance, 'feature_importance.json')

Top Features: ad_spend, price, is_holiday, lag_7

Stage 8: Approval Gate + Human Review

✅ Automated Checks:

  • WAPE 7.2% < threshold 10% → PASS
  • RMSE 98.7 < baseline 120 → PASS
  • PSI 0.08 < 0.25 → PASS
  • SHAP drift: none detected → PASS
  • Fairness: N/A (forecasting) → PASS

📊 Human Review:

  • Reviewer: data-science-lead
  • Reviewed SHAP plots → features look reasonable
  • Compared with current production model → 15% improvement
  • Decision: APPROVED
  • Notes: "SARIMAX captures weekly seasonality well"
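The automated half of the gate can be sketched as a pure function over the logged metrics. `approval_gate` and its threshold names are hypothetical; in practice the thresholds would be read from `config.yaml`:

```python
def approval_gate(metrics: dict, thresholds: dict) -> dict:
    """Run every automated check; all must pass before human review begins."""
    checks = {
        'wape': metrics['wape'] < thresholds['wape_max'],      # accuracy floor
        'rmse': metrics['rmse'] < thresholds['rmse_baseline'], # beats baseline
        'psi':  metrics['psi']  < thresholds['psi_max'],       # no input drift
    }
    checks['all_passed'] = all(checks.values())
    return checks

# Values from the Stage 8 checklist above
result = approval_gate(
    {'wape': 0.072, 'rmse': 98.7, 'psi': 0.08},
    {'wape_max': 0.10, 'rmse_baseline': 120.0, 'psi_max': 0.25},
)
```

Only when `all_passed` is true does the pipeline notify the human reviewer; a single failed check short-circuits promotion.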

Stage 9: Model Registry — MLflow

import mlflow
from mlflow.tracking import MlflowClient

def register_model(**context):
    best_run = get_best_run('ad_sales_forecasting', metric='wape')

    # Register the best model
    model_uri = f"runs:/{best_run.run_id}/sarimax_model"
    mv = mlflow.register_model(model_uri, 'ad-sales-forecaster')

    client = MlflowClient()

    # Promote to Staging first
    client.transition_model_version_stage(
        name='ad-sales-forecaster',
        version=mv.version,
        stage='Staging',
    )

    # After validation in staging, promote to Production.
    # archive_existing_versions retires the old production model in the same call.
    client.transition_model_version_stage(
        name='ad-sales-forecaster',
        version=mv.version,
        stage='Production',
        archive_existing_versions=True,
    )
Stage progression: None → Staging → ✅ Production

Stage 10: Deployment

# FastAPI endpoint serving predictions
from fastapi import FastAPI
from pydantic import BaseModel
import mlflow.pyfunc

class ForecastRequest(BaseModel):
    # Illustrative request schema matching the feature set
    ad_spend: float
    price: float
    is_holiday: bool

app = FastAPI()
model = mlflow.pyfunc.load_model('models:/ad-sales-forecaster/Production')

@app.post('/predict')
def predict(data: ForecastRequest):
    features = prepare_features(data)
    prediction = model.predict(features)
    return {
        'forecast': prediction.tolist(),
        'model_version': model.metadata.run_id,
    }

Stage 11: Monitoring + Retraining Trigger

# Airflow daily monitoring DAG
def monitor_performance(**context):
    actuals = fetch_actual_sales(yesterday)
    predictions = fetch_predictions(yesterday)

    rolling_wape = compute_rolling_wape(actuals, predictions, window=7)
    mlflow.log_metric('rolling_wape', rolling_wape)

    if rolling_wape > 0.10:  # 10% threshold
        trigger_dag('ad_sales_training_pipeline')  # Retrain!
        alert('Model degradation detected. Retraining triggered.')
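`compute_rolling_wape` is assumed above; a plausible implementation (hypothetical, matching the WAPE formula used in Stage 5) computes WAPE over the trailing window only:

```python
import numpy as np

def compute_rolling_wape(actuals, predictions, window: int = 7) -> float:
    """WAPE over the most recent `window` observations:
    sum(|actual - pred|) / sum(actual) on the trailing window."""
    a = np.asarray(actuals, dtype=float)[-window:]
    p = np.asarray(predictions, dtype=float)[-window:]
    return float(np.sum(np.abs(a - p)) / np.sum(a))
```

Windowing over the last 7 days smooths out single-day noise (holidays, stockouts) so a one-off miss does not trigger a retrain.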

⚠ Week 6: Rolling WAPE hits 11.2% → Airflow automatically triggers retraining pipeline → New SARIMAX trained on fresh data → Approval Gate → Deployed.

Complete Airflow DAG Wiring

# Task dependencies: the full pipeline
t1  = PythonOperator(task_id='data_ingestion', ...)
t2  = PythonOperator(task_id='data_validation', ...)
t3  = PythonOperator(task_id='feature_engineering', ...)
t4  = PythonOperator(task_id='data_splitting', ...)
t5  = PythonOperator(task_id='model_training', ...)
t6  = PythonOperator(task_id='model_evaluation', ...)
t7  = PythonOperator(task_id='shap_analysis', ...)
t8  = PythonOperator(task_id='approval_gate', ...)
t9  = PythonOperator(task_id='model_registration', ...)
t10 = PythonOperator(task_id='deployment', ...)

# Airflow DAG chain
t1 >> t2 >> t3 >> t4 >> t5 >> t6 >> t7 >> t8 >> t9 >> t10
📊 t1: ingest
✅ t2: validate
⚙ t3: features
✂ t4: split
📊 t5: train (MLflow tracks)
📊 t6: evaluate
🔍 t7: SHAP
📊 t8: approval gate + human
📊 t9: register (MLflow)
📊 t10: deploy

23. Conclusion

This document presents a generalized MLOps framework capable of supporting diverse machine learning systems through a modular pipeline architecture. By integrating Airflow orchestration, MLflow experiment tracking, explainability tools, automated monitoring, and a structured Approval Gate with Human-in-the-Loop review, the framework enables continuous model improvement, reliable deployment, and scalable ML system management.

The Human Review Layer ensures that critical model promotions are subject to expert oversight, maintaining trust, explainability, and governance across all environments.