Introduction
Are you working in HR and struggling to predict whether the employees on your team will keep working or are considering leaving the organisation? No worries! You don't need to be an astrologer to predict this: with the power of data science, we can estimate it from historical employee data. Let us begin our journey into employee attrition prediction with ZenML, a simple yet powerful MLOps tool, and Streamlit.
Learning Objectives
In this article, we will learn:
- What ZenML is, and why and how to use it
- Why to use MLflow and how to integrate it with ZenML
- Why we need a deployment pipeline
- How to implement the employee attrition prediction project and make predictions
This article was published as a part of the Data Science Blogathon.
Project Implementation
Problem Statement: Predict whether an employee will leave an organisation, based on factors such as age, income and performance.
Solution: Build a Logistic Regression model that predicts whether an employee will leave (attrition).
Dataset: IBM HR Analytics Employee Attrition & Performance
[Source]: https://www.kaggle.com/datasets/pavansubhasht/ibm-hr-analytics-attrition-dataset
Before looking at the implementation, let us first see why we are using ZenML.
Why ZenML?
ZenML is a simple and powerful MLOps orchestration tool used to create ML pipelines, cache pipeline steps and save computational resources. ZenML also integrates with many ML tools, which makes it a convenient choice for building ML pipelines. With it, we can track our pipeline steps and evaluation metrics, view our pipelines visually in a dashboard, and more.
In this project, we will implement a traditional training pipeline with ZenML and integrate MLflow with ZenML for experiment tracking. We will also implement a continuous deployment pipeline using ZenML's MLflow integration, which ingests and cleans the data, trains the model, and redeploys it when it meets a minimum evaluation criterion. With this pipeline, a newly trained model replaces the old one on the MLflow deployment server only if its accuracy exceeds the configured minimum accuracy threshold.
Common ZenML Terms
- Pipelines: The sequence of steps in our project.
- Components: Stack components, the building blocks of a stack, each providing a particular function in our MLOps workflow (for example, an orchestrator, artifact store or experiment tracker).
- Stacks: Collections of stack components, running locally or in the cloud.
- Artifacts: The inputs and outputs of a step, which are stored in the artifact store.
- Artifact Store: Storage for saving and version-tracking our artifacts.
- Materializers: Components that define how artifacts are stored in and retrieved from the artifact store.
- Flavors: Specific implementations of a stack component for a particular tool or use case (for example, the mlflow flavor of the experiment tracker used below).
- ZenML Server: A deployment of ZenML used to run pipelines with remote stack components; it also serves the dashboard.
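To make these terms concrete, here is a toy, pure-Python model of how some of them relate: steps are plain functions, a pipeline runs them in sequence, and every step output is saved as a versioned artifact in an in-memory artifact store. This is only an illustration of the concepts under our own simplifying assumptions; ToyArtifactStore, run_toy_pipeline and the step functions are hypothetical names, and this is not ZenML's API or implementation.

```python
from typing import Any, Callable, Dict, List, Tuple


class ToyArtifactStore:
    """Hypothetical in-memory artifact store that keeps every version of each artifact."""

    def __init__(self) -> None:
        self.versions: Dict[str, List[Any]] = {}

    def save(self, name: str, value: Any) -> int:
        # Append a new version and return its 1-based version number
        self.versions.setdefault(name, []).append(value)
        return len(self.versions[name])

    def load(self, name: str, version: int = 0) -> Any:
        # version=0 means "latest"; otherwise versions are numbered from 1
        history = self.versions[name]
        return history[-1] if version == 0 else history[version - 1]


Step = Tuple[str, Callable[[Any], Any]]


def run_toy_pipeline(steps: List[Step], data: Any, store: ToyArtifactStore) -> Any:
    """Run steps in order; each output feeds the next step and is stored as an artifact."""
    for name, fn in steps:
        data = fn(data)
        store.save(name, data)
    return data


def ingest(rows):
    return list(rows)


def keep_leavers(rows):
    return [r for r in rows if r["Attrition"] == "Yes"]


store = ToyArtifactStore()
raw = [{"Age": 25, "Attrition": "Yes"}, {"Age": 40, "Attrition": "No"}]
steps = [("ingest", ingest), ("keep_leavers", keep_leavers)]
result = run_toy_pipeline(steps, raw, store)
run_toy_pipeline(steps, raw, store)  # a second run creates version 2 of each artifact
print(result, len(store.versions["ingest"]))
```

Keeping every version, rather than overwriting, is what lets a real artifact store show the lineage of each run.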
Pre-requisites and Basic ZenML Commands
- Python 3.7 or higher: Get it from here: https://www.python.org/downloads/
- Create and activate a virtual environment:
# Create a virtual environment
python3 -m venv venv
# Activate the virtual environment in your project folder
source venv/bin/activate
- ZenML Commands:
All the basic ZenML commands and their purposes are given below:
# Install ZenML
pip install zenml
# Install the extra dependencies needed to run the ZenML server and dashboard locally
pip install "zenml[server]"
# See the installed ZenML version
zenml version
# Initialise a new ZenML repository in the current folder
zenml init
# Start the ZenML server and dashboard locally
zenml up
# Open the ZenML dashboard in the browser
zenml show
These are the essential commands for working with ZenML.
Integration of MLflow with ZenML
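We use MLflow as the experiment tracker, to track our models, artifacts and hyperparameter values. Below, we install the MLflow integration, register the experiment tracker and model deployer stack components, and register a stack that uses them. In the stack registration command, -a and -o set the artifact store and orchestrator (here, the default ones), -d sets the model deployer, -e sets the experiment tracker, and --set makes the new stack the active one.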
#Integrating mlflow with ZenML
zenml integration install mlflow -y
#Register the experiment tracker
zenml experiment-tracker register mlflow_tracker_employee --flavor=mlflow
#Registering the model deployer
zenml model-deployer register mlflow_employee --flavor=mlflow
#Registering the stack
zenml stack register mlflow_stack_employee -a default -o default -d mlflow_employee -e mlflow_tracker_employee --set
ZenML Stack List
Project Structure
employee-attrition-prediction/ # Project directory
├── data/
│ └── HR-Employee-Attrition.csv # Dataset file
│
├── pipelines/
│ ├── deployment_pipeline.py # Deployment pipeline
│ ├── training_pipeline.py # Training pipeline
│ └── utils.py
│
├── src/ # Source code
│ ├── data_cleaning.py # Data cleaning and preprocessing
│ ├── evaluation.py # Model evaluation
│ └── model_dev.py # Model development
│
├── steps/ # code files for ZenML steps
│ ├── ingest_data.py # Ingestion of data
│ ├── clean_data.py # Data cleaning and preprocessing
│ ├── model_train.py # Train the model
│ ├── evaluation.py # Model evaluation
│ └── config.py
│
├── streamlit_app.py # Streamlit web application
│
├── run_deployment.py # Code for running deployment and prediction pipeline
├── run_pipeline.py # Code for running training pipeline
│
├── requirements.txt # List of project required packages
├── README.md # Project documentation
└── .zen/ # ZenML directory (created automatically after ZenML initialization)
Data Ingestion
We first ingest the HR-Employee-Attrition.csv dataset from the data folder.
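import pandas as pd
from zenml import step
class IngestData:
    """Loads the raw HR attrition dataset from the data folder."""
    def get_data(self) -> pd.DataFrame:
        df = pd.read_csv("./data/HR-Employee-Attrition.csv")
        return df
@step
def ingest_data() -> pd.DataFrame:
    # Use a separate name so the local variable does not shadow the step function
    ingestor = IngestData()
    df = ingestor.get_data()
    return df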
@step is a ZenML decorator that turns the function ingest_data() into a step of the pipeline.
Exploratory Data Analysis
import pandas as pd
df = pd.read_csv("./data/HR-Employee-Attrition.csv")
# Understand the data: column types and non-null counts
df.info()
# Summary statistics of the numerical columns
df.describe()
# Check some sample rows
df.head()
# Check the null values in each column
df.isnull().sum()
# Check the percentage of people who stayed with and left the company
df['Attrition'].value_counts()
df_left = df[df['Attrition'] == "Yes"]
df_stayed = df[df['Attrition'] == "No"]
left_percentage = df_left.shape[0] * 100 / df.shape[0]
stayed_percentage = df_stayed.shape[0] * 100 / df.shape[0]
print(f"The percentage of people who left the company is: {left_percentage}")
print(f"The percentage of people who stayed with the company is: {stayed_percentage}")
# Analyse the differences in features between people who stayed and people who left
df_left.describe()
df_stayed.describe()
Output
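The two percentages above can also be computed in one call: value_counts(normalize=True) returns each class's share of the rows. The sketch below uses a tiny made-up DataFrame in place of the real dataset, so the numbers are illustrative only.

```python
import pandas as pd

# Made-up sample standing in for the HR dataset (3 of 10 employees left)
df = pd.DataFrame({"Attrition": ["Yes"] * 3 + ["No"] * 7})

# Share of each class as a percentage of all rows
percentages = df["Attrition"].value_counts(normalize=True) * 100
print(percentages)
```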
Observations
- Employees who left had worked fewer years at the company than those who stayed.
- Employees who left were younger than those who stayed.
- Employees who left lived farther from the office than those who stayed.
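Observations like these come from comparing feature averages between the two groups. As an alternative to reading two separate describe() tables, a groupby puts the group means side by side. The rows below are made up and use only three of the dataset's column names, so the values are illustrative only.

```python
import pandas as pd

df = pd.DataFrame({
    "Attrition": ["Yes", "Yes", "No", "No"],
    "Age": [26, 30, 40, 44],
    "YearsAtCompany": [1, 3, 8, 10],
    "DistanceFromHome": [20, 14, 5, 3],
})

# Mean of each feature for employees who left ("Yes") vs. stayed ("No")
comparison = df.groupby("Attrition")[["Age", "YearsAtCompany", "DistanceFromHome"]].mean()
print(comparison)
```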
Data Cleaning and Processing
- Data Cleaning: We removed unneeded columns such as "EmployeeCount", "EmployeeNumber" and "StandardHours", then converted the features whose only values are Yes or No into binary 1 or 0.
- One-Hot Encoding: We then one-hot encoded the categorical columns 'BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole' and 'MaritalStatus'.
import logging
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
# DataStrategy is the project's abstract base class for data-handling strategies (defined elsewhere, not shown here)
class DataPreProcessStrategy(DataStrategy):
    """This class is used to preprocess the given dataset"""
    def __init__(self, encoder=None):
        self.encoder = encoder
    def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            print("Column Names Before Preprocessing:", data.columns)
            # Drop columns that carry no useful signal for prediction
            data = data.drop(["EmployeeCount", "EmployeeNumber", "StandardHours"], axis=1)
            if 'Attrition' in data.columns:
                print("Attrition column found in data.")
            else:
                print("Attrition column not found in data.")
            # Map Yes/No columns to 1/0
            data["Attrition"] = data["Attrition"].apply(lambda x: 1 if x == "Yes" else 0)
            data["Over18"] = data["Over18"].apply(lambda x: 1 if x == "Yes" else 0)
            data["OverTime"] = data["OverTime"].apply(lambda x: 1 if x == "Yes" else 0)
            # Extract categorical variables
            cat = data[['BusinessTravel', 'Department', 'EducationField', 'Gender', 'JobRole', 'MaritalStatus']]
            # Perform one-hot encoding on categorical variables
            onehot = OneHotEncoder()
            cat_encoded = onehot.fit_transform(cat).toarray()
            # Convert cat_encoded to a DataFrame (columns named 0, 1, 2, ...); keep the
            # original row index so that pd.concat aligns the rows correctly
            cat_df = pd.DataFrame(cat_encoded, index=data.index)
            # Extract numerical variables
            numerical = data[['Age', 'Attrition', 'DailyRate', 'DistanceFromHome', 'Education', 'EnvironmentSatisfaction', 'HourlyRate', 'JobInvolvement', 'JobLevel', 'JobSatisfaction', 'MonthlyIncome', 'MonthlyRate', 'NumCompaniesWorked', 'Over18', 'OverTime', 'PercentSalaryHike', 'PerformanceRating', 'RelationshipSatisfaction', 'StockOptionLevel', 'TotalWorkingYears', 'TrainingTimesLastYear', 'WorkLifeBalance', 'YearsAtCompany', 'YearsInCurrentRole', 'YearsSinceLastPromotion', 'YearsWithCurrManager']]
            # Concatenate the encoded categorical and the numerical columns side by side
            data = pd.concat([cat_df, numerical], axis=1)
            print("Column Names After Preprocessing:", data.columns)
            print("Preprocessed Data:")
            print(data.head())
            return data
        except Exception as e:
            logging.error(f"Error in preprocessing the data: {e}")
            raise e
Output
After all the cleaning and preprocessing, the data looks like this; as the image shows, it now contains only numerical data.
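Because cat_encoded is a plain NumPy array, pd.DataFrame(cat_encoded) gives the one-hot columns integer names 0, 1, 2, and so on, with one column per category value. By our count, the six categorical columns of the IBM dataset have 3, 3, 6, 2, 9 and 3 distinct values, which gives 26 encoded columns; this is consistent with the integer columns 0 to 25 that predictor() expects later. The small sketch below, with made-up rows and only two of the categorical columns, shows this naming behaviour.

```python
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# Two made-up categorical columns: 3 travel categories + 2 genders = 5 encoded columns
cat = pd.DataFrame({
    "BusinessTravel": ["Travel_Rarely", "Non-Travel", "Travel_Frequently", "Travel_Rarely"],
    "Gender": ["Male", "Female", "Female", "Male"],
})

onehot = OneHotEncoder()
cat_encoded = onehot.fit_transform(cat).toarray()
cat_df = pd.DataFrame(cat_encoded)

print(list(cat_df.columns))  # integer column names, one per category value
print(onehot.categories_)    # the category order behind those columns
```

One consequence of fitting the encoder inside handle_data() is that any data passed through it later must contain every category value; otherwise the encoder learns fewer categories and produces fewer columns than the model expects.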
Splitting the Data
We then split the data into training and test sets in an 80:20 ratio.
import logging
from typing import Tuple
import pandas as pd
from sklearn.model_selection import train_test_split
class DataDivideStrategy(DataStrategy):
    """Splits the preprocessed data into training and test sets."""
    def handle_data(self, data: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series, pd.Series]:
        try:
            # Check if 'Attrition' is present in the data
            if 'Attrition' in data.columns:
                X = data.drop(['Attrition'], axis=1)
                Y = data['Attrition']
                # 80:20 split; random_state makes the split reproducible
                X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)
                return X_train, X_test, Y_train, Y_test
            else:
                raise ValueError("'Attrition' column not found in data.")
        except Exception as e:
            logging.error(f"Error in data handling: {str(e)}")
            raise e
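Because the attrition classes are imbalanced (far fewer employees left than stayed), a purely random split can give the test set a somewhat different share of leavers than the training set. The stratify argument of train_test_split keeps the class proportions the same in both parts. This is an optional variation we suggest, not what the project's code does, and the data below is synthetic.

```python
from sklearn.model_selection import train_test_split

# Synthetic data: 20 "left" (1) and 80 "stayed" (0) labels
X = [[i] for i in range(100)]
y = [1] * 20 + [0] * 80

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# With stratify=y, both splits keep the 20% share of the positive class
print(len(X_train), len(X_test), sum(y_train) / len(y_train), sum(y_test) / len(y_test))
```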
Model Training
Since this is a classification problem, we use Logistic Regression here; other classification algorithms such as Random Forest or Gradient Boosting could also be used.
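from zenml import pipeline
# Step functions, assumed to live in the steps/ modules shown in the project structure
from steps.ingest_data import ingest_data
from steps.clean_data import clean_df
from steps.model_train import train_model
from steps.evaluation import evaluate_model
@pipeline
def train_pipeline(data_path: str):
    # ingest_data() reads ./data/HR-Employee-Attrition.csv itself, so data_path is not passed on
    df = ingest_data()
    # Clean the data and split it into training and test sets
    X_train, X_test, y_train, y_test = clean_df(df)
    model = train_model(X_train, X_test, y_train, y_test)
    evaluation_metrics = evaluate_model(model, X_test, y_test)
Here, the @pipeline decorator is used to define the function train_pipeline() as a pipeline in ZenML.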
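The train_model() step itself is not shown in the article. As a hedged sketch of what a logistic regression training function could look like with scikit-learn, the example below standardises the features before fitting, since columns such as MonthlyIncome are on a much larger scale than the one-hot columns, and unscaled inputs can slow the solver's convergence. The helper name train_logistic_regression and the synthetic data are our own.

```python
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler


def train_logistic_regression(X_train, y_train):
    """Hypothetical helper: standardise features, then fit a logistic regression."""
    model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=1000))
    model.fit(X_train, y_train)
    return model


# Synthetic data: the label is 1 when the first feature is positive
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
y = (X[:, 0] > 0).astype(int)

model = train_logistic_regression(X, y)
print(model.score(X, y))
```

Wrapping the scaler and the classifier in one scikit-learn pipeline means the same scaling is applied automatically whenever the model predicts.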
Evaluation
For binary classification problems, we use evaluation metrics such as accuracy, precision, recall, F1 score and ROC-AUC. We import classification_report from the scikit-learn library, which calculates precision, recall, F1 score and accuracy and returns them as a classification report.
Code:
import logging
import numpy as np
from sklearn.metrics import classification_report
class ClassificationReport:
    @staticmethod
    def calculate_scores(y_true: np.ndarray, y_pred: np.ndarray):
        try:
            logging.info("Calculate Classification Report")
            # output_dict=True returns the report as a nested dict instead of a string
            report = classification_report(y_true, y_pred, output_dict=True)
            logging.info(f"Classification Report:\n{report}")
            return report
        except Exception as e:
            logging.error(f"Error in calculating Classification Report: {e}")
            raise e
Classification Report:
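With output_dict=True, classification_report returns a nested dictionary: one entry per class label (as a string) with precision, recall, f1-score and support, plus an overall "accuracy" value and averaged entries. The deployment trigger later compares a single float accuracy against a threshold, so a value such as report["accuracy"] can be pulled out of this dictionary. The labels below are made up.

```python
from sklearn.metrics import classification_report

y_true = [0, 1, 1, 0]
y_pred = [0, 1, 0, 0]

report = classification_report(y_true, y_pred, output_dict=True)

accuracy = report["accuracy"]        # overall share of correct predictions
recall_left = report["1"]["recall"]  # share of actual leavers the model caught
print(accuracy, recall_left)
```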
To see the training pipeline in the dashboard, we need to run run_pipeline.py:
from pipelines.training_pipeline import train_pipeline
from zenml.client import Client
if __name__ == "__main__":
    # Print the tracking URI of the active stack's MLflow experiment tracker
    uri = Client().active_stack.experiment_tracker.get_tracking_uri()
    print(uri)
    train_pipeline(data_path="./data/HR-Employee-Attrition.csv")
Running it prints the MLflow tracking URI, and ZenML also prints a dashboard URL for the pipeline run, which looks like this:
Dashboard URL: http://127.0.0.1:8237/workspaces/default/pipelines/6e7941f4-cf74-4e30-b3e3-ff4428b823d2/runs/2274fc18-aba1-4536-aaee-9d2ed7d26323/dag
Click the URL to view the training pipeline in the ZenML dashboard. Here, the pipeline image is split into several parts so that each part can be seen more clearly.
Overall, the training pipeline looks like this in the dashboard:
Model Deployment
Deployment Trigger
from zenml.steps import BaseParameters
class DeploymentTriggerConfig(BaseParameters):
    """Parameters that control when a trained model is deployed."""
    min_accuracy: float = 0.5
The DeploymentTriggerConfig class sets the min_accuracy parameter: the minimum accuracy a model must exceed to be deployed.
Setting up Deployment Trigger
from zenml import step
@step(enable_cache=False)
def deployment_trigger(
    accuracy: float,
    config: DeploymentTriggerConfig,
) -> bool:
    # Deploy only if the model's accuracy is strictly above the threshold
    return accuracy > config.min_accuracy
Here, deployment_trigger() returns True only when the model's accuracy exceeds the minimum accuracy, so the model is deployed only in that case. We explain why caching is disabled here in the next section.
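Stripped of the ZenML decorators, the deployment decision is a simple comparison. The sketch below restates it in plain Python and adds a hypothetical variant, should_replace(), that also requires the new model to beat the accuracy of the currently deployed one; the pipeline in this article only checks the fixed min_accuracy threshold. Both function names are illustrative.

```python
from typing import Optional


def should_deploy(accuracy: float, min_accuracy: float = 0.5) -> bool:
    """Same rule as deployment_trigger(): deploy only strictly above the threshold."""
    return accuracy > min_accuracy


def should_replace(accuracy: float, current_accuracy: Optional[float], min_accuracy: float = 0.5) -> bool:
    """Hypothetical stricter rule: pass the threshold AND beat the deployed model, if any."""
    if not should_deploy(accuracy, min_accuracy):
        return False
    return current_accuracy is None or accuracy > current_accuracy


print(should_deploy(0.87))
print(should_replace(0.87, current_accuracy=0.89))
```

The stricter variant would need the deployed model's accuracy to be stored somewhere (for example, logged to the experiment tracker), which is an extra design decision beyond the article's pipeline.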
Continuous Deployment Pipeline
from zenml import pipeline
from zenml.config import DockerSettings
from zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUT
from zenml.integrations.constants import MLFLOW
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step
# Make sure the MLflow integration is available if the pipeline runs in a container
docker_settings = DockerSettings(required_integrations=[MLFLOW])
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def continuous_deployment_pipeline(
    data_path: str,
    min_accuracy: float = 0.5,  # same default threshold as DeploymentTriggerConfig
    workers: int = 1,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
):
    df = ingest_data()
    # Clean the data and split into training/test sets
    X_train, X_test, Y_train, Y_test = clean_df(df)
    model = train_model(X_train, X_test, Y_train, Y_test)
    # evaluate_model() is expected to return the model's accuracy as a float
    evaluation_metrics = evaluate_model(model, X_test, Y_test)
    # Compare the accuracy with min_accuracy to decide whether to deploy
    deployment_decision = deployment_trigger(
        accuracy=evaluation_metrics,
        config=DeploymentTriggerConfig(min_accuracy=min_accuracy),
    )
    mlflow_model_deployer_step(
        model=model,
        deploy_decision=deployment_decision,
        workers=workers,
        timeout=timeout,
    )
In continuous_deployment_pipeline(), we ingest the data, clean it, train the model, evaluate it, and deploy it only if it passes the deployment_trigger() check, that is, only if its accuracy exceeds the min_accuracy threshold. This is how continuous_deployment_pipeline() works.
Caching means storing the outputs of previously executed pipeline steps in the artifact store. When nothing has changed between a previous run and the current run of a step (for example, its code and inputs are the same), ZenML can reuse the previous output instead of running the step again. Caching speeds up pipeline runs and saves computational resources. However, for pipelines like continuous_deployment_pipeline(), where the data, parameters or outputs can change between runs and every run should execute the steps again, it is better to turn caching off. That is why we set enable_cache=False here.
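The sketch below is a toy cache in plain Python, not ZenML's implementation, under the simplifying assumption that a step's cache key is built only from its name and input values. It shows both sides of the trade-off: a repeated call with unchanged inputs is served from the cache without recomputation, while a step that reads external data internally (like a step that reads a CSV file) keeps returning its old output after the data changes, because that data is not part of the key. ToyStepCache and the step functions are hypothetical.

```python
import hashlib
import json
from typing import Any, Callable, Dict


class ToyStepCache:
    """Hypothetical cache keyed only on a step's name and its JSON-serialisable inputs."""

    def __init__(self) -> None:
        self.outputs: Dict[str, Any] = {}
        self.executions = 0  # how many times a step body actually ran

    def run(self, name: str, fn: Callable[..., Any], **inputs: Any) -> Any:
        # Build a deterministic cache key from the step name and its inputs
        key = hashlib.sha256(json.dumps([name, inputs], sort_keys=True).encode()).hexdigest()
        if key not in self.outputs:
            self.executions += 1
            self.outputs[key] = fn(**inputs)
        return self.outputs[key]


def square(x: int) -> int:
    return x * x


cache = ToyStepCache()
cache.run("square", square, x=4)
cache.run("square", square, x=4)  # same inputs: served from the cache, square() is not called again

# A step with no inputs that reads external state, like a step that reads a CSV file
external_source = {"rows": 100}


def count_rows() -> int:
    return external_source["rows"]


first = cache.run("count_rows", count_rows)
external_source["rows"] = 150  # new data arrives
second = cache.run("count_rows", count_rows)  # the key is unchanged, so the old output is reused
print(cache.executions, first, second)
```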
Inference Pipeline
We use the inference pipeline to make predictions on new data using the deployed model. Let's see how we use this pipeline in our project.
inference_pipeline()
@pipeline(enable_cache=False, settings={"docker": docker_settings})
def inference_pipeline(pipeline_name: str, pipeline_step_name: str):
    # Load and preprocess the new data to predict on
    data = dynamic_importer()
    # Find the model server deployed by the continuous deployment pipeline
    service = prediction_service_loader(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        running=False,
    )
    prediction = predictor(service=service, data=data)
    return prediction
Here, inference_pipeline() works in the following order:
- dynamic_importer(): First, dynamic_importer() loads the new data and prepares it.
- prediction_service_loader(): Next, prediction_service_loader() loads the deployed model's prediction service, based on the pipeline name and step name parameters.
- predictor(): Finally, predictor() makes predictions on the new data with the deployed model.
Let us look at each of these functions below:
dynamic_importer()
@step(enable_cache=False)
def dynamic_importer() -> str:
    # get_data_for_test() (in pipelines/utils.py) loads and preprocesses the new data
    data = get_data_for_test()
    return data
Here, dynamic_importer() calls get_data_for_test() from utils.py, which loads the new data, preprocesses it and returns it as a JSON string.
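get_data_for_test() is not shown here. Since predictor() later removes the "columns" and "index" keys from the JSON and reads "data", the string is consistent with pandas' "split" orientation. A minimal sketch of producing such a string, assuming that orientation and using a hypothetical helper name and a made-up sample frame, might look like this:

```python
import json

import pandas as pd


def to_split_json(df: pd.DataFrame) -> str:
    """Hypothetical helper: serialise a DataFrame as {"columns": ..., "index": ..., "data": ...}."""
    return df.to_json(orient="split")


sample = pd.DataFrame({"Age": [30, 45], "MonthlyIncome": [5000, 12000]})
payload = to_split_json(sample)
print(json.loads(payload))
```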
prediction_service_loader()
from zenml import step
from zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import MLFlowModelDeployer
from zenml.integrations.mlflow.services import MLFlowDeploymentService
@step(enable_cache=False)
def prediction_service_loader(
    pipeline_name: str,
    pipeline_step_name: str,
    running: bool = True,
    model_name: str = "model",
) -> MLFlowDeploymentService:
    # Get the MLflow model deployer registered in the active stack
    mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()
    # Look up existing model servers deployed by the given pipeline and step
    existing_services = mlflow_model_deployer_component.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=pipeline_step_name,
        model_name=model_name,
        running=running,
    )
    if not existing_services:
        raise RuntimeError(
            f"No MLflow deployment service found for pipeline {pipeline_name}, "
            f"step {pipeline_step_name} and model {model_name}."
        )
    return existing_services[0]
In prediction_service_loader(), we load the deployment service for the deployed model, based on the given parameters. A deployment service is a runtime environment in which the deployed model is ready to accept inference requests and make predictions on new data. The call to mlflow_model_deployer_component.find_model_server() searches for existing deployment services that match the given parameters, such as the pipeline name and pipeline step name. If none is found, the deployment pipeline has either not been run yet or has a problem, so the step raises a RuntimeError; otherwise, it returns the first matching service.
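The lookup logic can be illustrated without ZenML. In the toy sketch below, which uses hypothetical ToyService records rather than MLflow deployment services, services are filtered by pipeline name, step name and model name. Following our understanding of the running flag, running=True keeps only running services, while running=False (as passed by inference_pipeline()) also accepts stopped ones. An empty result raises RuntimeError; otherwise the first match is used.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyService:
    """Hypothetical stand-in for a deployed model server."""
    pipeline_name: str
    pipeline_step_name: str
    model_name: str
    is_running: bool


def find_toy_services(services: List[ToyService], pipeline_name: str, pipeline_step_name: str,
                      model_name: str = "model", running: bool = True) -> List[ToyService]:
    # running=True keeps only running services; running=False also accepts stopped ones
    return [
        s for s in services
        if s.pipeline_name == pipeline_name
        and s.pipeline_step_name == pipeline_step_name
        and s.model_name == model_name
        and (s.is_running or not running)
    ]


def load_toy_service(services: List[ToyService], pipeline_name: str, pipeline_step_name: str,
                     running: bool = True) -> ToyService:
    matches = find_toy_services(services, pipeline_name, pipeline_step_name, running=running)
    if not matches:
        raise RuntimeError(f"No service found for pipeline {pipeline_name}, step {pipeline_step_name}")
    return matches[0]  # a simple choice: take the first match


registry = [ToyService("continuous_deployment_pipeline", "mlflow_model_deployer_step", "model", is_running=False)]
service = load_toy_service(registry, "continuous_deployment_pipeline", "mlflow_model_deployer_step", running=False)
print(service)
```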
predictor()
import json
import numpy as np
import pandas as pd
from zenml import step
from zenml.integrations.mlflow.services import MLFlowDeploymentService
@step
def predictor(
    service: MLFlowDeploymentService,
    data: str,
) -> np.ndarray:
    """Run an inference request against a prediction service"""
    service.start(timeout=21)  # should be a NOP if already started
    # The input is a JSON string with "columns", "index" and "data" keys
    data = json.loads(data)
    data.pop("columns")
    data.pop("index")
    # Column order used in training: the 26 one-hot columns (0-25), then the numerical features
    columns_for_df = [
        0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,"Age","DailyRate","DistanceFromHome","Education","EnvironmentSatisfaction","HourlyRate","JobInvolvement","JobLevel","JobSatisfaction","MonthlyIncome","MonthlyRate","NumCompaniesWorked","Over18","OverTime","PercentSalaryHike","PerformanceRating","RelationshipSatisfaction","StockOptionLevel","TotalWorkingYears","TrainingTimesLastYear","WorkLifeBalance","YearsAtCompany","YearsInCurrentRole","YearsSinceLastPromotion","YearsWithCurrManager",
    ]
    df = pd.DataFrame(data["data"], columns=columns_for_df)
    # Convert the DataFrame into a list of row dictionaries for the model server
    json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
    data = np.array(json_list)
    prediction = service.predict(data)
    return prediction
With the deployed model's service and the new data available, predictor() makes the predictions.
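The reshaping inside predictor() can be checked in isolation. The sketch below, with a made-up two-column payload, decodes a "split"-oriented JSON string the same way predictor() does, and shows that list(df.T.to_dict().values()) produces the same list of row dictionaries as the more direct df.to_dict(orient="records") for a DataFrame with a unique index.

```python
import json

import pandas as pd

payload = json.dumps({"columns": ["Age", "MonthlyIncome"], "index": [0, 1], "data": [[30, 5000], [45, 12000]]})

data = json.loads(payload)
data.pop("columns")  # predictor() supplies its own column list instead
data.pop("index")
df = pd.DataFrame(data["data"], columns=["Age", "MonthlyIncome"])

rows_via_transpose = list(df.T.to_dict().values())
rows_via_records = df.to_dict(orient="records")
print(rows_via_transpose == rows_via_records)
```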
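To see the continuous deployment and inference pipelines visually, we need to run run_deployment.py, where the configurations to deploy and predict are defined.
import click
DEPLOY = "deploy"
PREDICT = "predict"
DEPLOY_AND_PREDICT = "deploy_and_predict"
@click.command()
@click.option(
    "--config",
    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),
    default=DEPLOY_AND_PREDICT,
    help="Optionally you can choose to only run the deployment "
    "pipeline to train and deploy a model (`deploy`), or to "
    "only run a prediction against the deployed model "
    "(`predict`). By default both will be run "
    "(`deploy_and_predict`).",
)
def main(config: str):
    deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT
    predict = config == PREDICT or config == DEPLOY_AND_PREDICT
    ...  # run continuous_deployment_pipeline() when deploy is True and inference_pipeline() when predict is True (not shown)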
Here, we can run the continuous deployment pipeline, the inference pipeline, or both, with the following commands:
# Run both the continuous deployment pipeline and the inference pipeline (default: deploy_and_predict)
python run_deployment.py
# Run only the continuous deployment pipeline
python run_deployment.py --config deploy
# Run only the inference pipeline against the deployed model
python run_deployment.py --config predict
After executing the commands, you can see the ZenML dashboard URL, like this:
Dashboard URL: http://127.0.0.1:8237/workspaces/default/pipelines/b437cf1a-971c-4a23-a3b6-c296c1cdf8ca/runs/58826e07-6139-453d-88f9-b3c771bb6695/dag
Enjoy your pipeline visualisations in the dashboard:
Continuous deployment Pipeline
The continuous deployment pipeline (from data ingestion to mlflow_model_deployer_step) looks like this:
Inference Pipeline
Building A Streamlit Application
Streamlit is an open-source, Python-based framework for creating UIs; with it, we can build web apps quickly without backend or frontend development knowledge. First, we need to install Streamlit on our PC.
The commands to install Streamlit and run the app on our local system are:
#install streamlit in our local PC
pip install streamlit
#to run the streamlit local web server
streamlit run streamlit_app.py
Code:
import json
import numpy as np
import pandas as pd
import streamlit as st
from pipelines.deployment_pipeline import prediction_service_loader
# Define a global variable to keep track of the service status
service_started = False
def start_service():
    global service_started
    # Find the model server deployed by the continuous deployment pipeline
    service = prediction_service_loader(
        pipeline_name="continuous_deployment_pipeline",
        pipeline_step_name="mlflow_model_deployer_step",
        running=False,
    )
    service.start(timeout=21)  # Start the service
    service_started = True
    return service
def stop_service(service):
    global service_started
    service.stop()  # Stop the service
    service_started = False
def main():
    st.title("Employee Attrition Prediction")
    age = st.sidebar.slider("Age", 18, 65, 30)
    monthly_income = st.sidebar.slider("Monthly Income", 0, 20000, 5000)
    total_working_years = st.sidebar.slider("Total Working Years", 0, 40, 10)
    years_in_current_role = st.sidebar.slider("Years in Current Role", 0, 20, 5)
    years_since_last_promotion = st.sidebar.slider("Years Since Last Promotion", 0, 15, 2)
    if st.button("Predict"):
        # Start the prediction service for this request, so `service` is always defined
        service = start_service()
        input_data = {
            "Age": [age],
            "MonthlyIncome": [monthly_income],
            "TotalWorkingYears": [total_working_years],
            "YearsInCurrentRole": [years_in_current_role],
            "YearsSinceLastPromotion": [years_since_last_promotion],
        }
        df = pd.DataFrame(input_data)
        # Convert the input row into a list of row dictionaries for the model server
        json_list = json.loads(json.dumps(list(df.T.to_dict().values())))
        data = np.array(json_list)
        pred = service.predict(data)
        # The logistic regression model returns a class label, not a probability
        st.success(
            "Predicted Employee Attrition (0 = stays, 1 = leaves): {}".format(
                pred[0]
            )
        )
        # Stop the service after prediction
        stop_service(service)
if __name__ == "__main__":
    main()
Here, we have created a Streamlit web app called "Employee Attrition Prediction", in which users provide inputs such as age and monthly income. When the user clicks the "Predict" button, the input data is sent to the deployed model, and the prediction is made and displayed to the user. Note that the model was trained on all the preprocessed features, so the data sent to the service must have the same columns, in the same order, as the training data; the five sliders shown here cover only some of them. When we run streamlit_app.py, we get a network URL like this:
By clicking the network URL, we can see the Streamlit UI used to make predictions.
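One way to send the model a complete row, sketched below under our own assumptions, is to start from a full row of default values (for example, training-set medians or the most common category) and overwrite only the features the user set in the sidebar. TRAINING_COLUMNS, DEFAULTS and build_model_input are hypothetical placeholders, not the project's code; a real version would list all the training columns in their training order.

```python
import pandas as pd

# Hypothetical column order and placeholder defaults (e.g. medians from the training set)
TRAINING_COLUMNS = ["Age", "DistanceFromHome", "MonthlyIncome", "OverTime", "TotalWorkingYears"]
DEFAULTS = {"Age": 36, "DistanceFromHome": 7, "MonthlyIncome": 4900, "OverTime": 0, "TotalWorkingYears": 10}


def build_model_input(user_inputs: dict) -> pd.DataFrame:
    """Fill every training column with a default, then apply the user's values."""
    unknown = set(user_inputs) - set(TRAINING_COLUMNS)
    if unknown:
        raise ValueError(f"Unexpected features: {sorted(unknown)}")
    row = {**DEFAULTS, **user_inputs}
    # Fixing the column order guarantees it matches the order used in training
    return pd.DataFrame([row], columns=TRAINING_COLUMNS)


df = build_model_input({"Age": 28, "MonthlyIncome": 3000})
print(df)
```

Rejecting unknown feature names early makes a typo in the app fail loudly instead of silently sending a default value to the model.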
In the ZenML dashboard, you can view all your stacks, the components they use, and the number of pipelines and pipeline runs, which makes the MLOps workflow easier to follow.
ZenML Dashboard:
Stacks:
Components:
Number of Pipelines:
Number of runs:
Conclusion
We have successfully built an end-to-end MLOps project for employee attrition prediction. We ingested the data, cleaned it, trained and evaluated the model, triggered deployment and deployed the model, loaded new data and searched for an existing model service to make predictions, and collected user inputs through a Streamlit web app to make predictions. This can help the HR department make data-driven decisions.
GitHub Code: https://github.com/VishalKumar-S/Employee-attrition-rate-MLOps-Project
Key Takeaways
- ZenML acts as a powerful orchestration tool that integrates with other ML tools.
- The continuous deployment pipeline ensures that only models meeting the minimum accuracy threshold are deployed.
- Caching helps us save computational resources, and logging helps us track the pipeline, debug it and trace errors.
- Dashboards give us a clear view of the ML pipeline workflow.
Frequently Asked Questions
A. Yes, ZenML is a free, open-source MLOps tool. However, using ZenML Cloud, which provides ZenML-managed servers with additional support from their team, costs extra.
A. With FastAPI, Flask or Shiny, building an interactive UI usually requires more web-development knowledge, such as HTML/CSS. With Streamlit, we do not need front-end knowledge.
A. ZenML provides a framework to manage and orchestrate ML pipelines, and by integrating it with MLflow we can also track our ML experiments, their artifacts, parameters and logged metrics. This gives us more information about how each step executed.
A. The company should create retention strategies for skilled employees who are at high risk of leaving, such as salary adjustments, engagement programs, training programs for career and personal growth, and a good work environment, which benefits both the employees' career growth and the company's growth.
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.