Cloud Pak for Data (CP4D) is a containerized software solution with various supported components such as Watson Machine Learning (WML), Watson Studio, and Spark Analytics. It enables customers to deploy CP4D on their preferred cloud provider for training and inference of foundation models and machine learning models. This blog focuses on deploying an inference endpoint for a Long Short-Term Memory (LSTM) model in a CP4D deployment space with custom requirements.
For more details on CP4D, refer to IBM Cloud Pak for Data documentation.
Exploring the setup
Our setup utilizes CP4D version 4.8 deployed on OpenShift Container Platform (OCP) 4.12 running on an IBM Power10 server. Key components added to our CP4D setup include Watson Machine Learning (WML), Watson Studio, and Spark Analytics.
For deployment instructions, refer to Installing IBM Cloud Pak for Data documentation.
Prerequisites
Before proceeding, ensure you meet the following prerequisites:
- A CP4D 4.8 deployment with the Watson Machine Learning, Watson Studio, and Spark Analytics components installed
- A CP4D username and API key
- A deployment space containing the trained LSTM model ('.h5') and mapper ('.pickle') files as data assets
- A Jupyter notebook environment (for example, in Watson Studio) to run the steps below
- Access to the Db2 database that stores the credit card transaction history
Perform the following steps to code and deploy the LSTM model's inference endpoint in the CP4D deployment space.
Steps
1. Set up credentials
In a Jupyter notebook cell, provide your API key, CP4D URL, CP4D username, and the CP4D version based on your environment.
username = '<CP4D_username>'
api_key = '<api_key>'
url = '<CP4D_URL>'

wml_credentials = {
    "username": username,
    "apikey": api_key,
    "url": url,
    "instance_id": 'openshift',
    "version": '4.8'
}
2. Install dependencies and configure WML API client
Install the ibm-watson-machine-learning package and configure the WML API client using the WML credentials. This client instance is used in the subsequent steps to deploy the inference function in the CP4D deployment space.
!pip install -U ibm-watson-machine-learning
from ibm_watson_machine_learning import APIClient
client = APIClient(wml_credentials)
3. Set default deployment space
Obtain the deployment space IDs using the following command.
client.spaces.list(limit=100)
Set the default deployment space by running the following command.
space_id = 'd47b60e3-3e76-4d2d-aac9-8ab2899657d1'
client.set.default_space(space_id)
Note: In step 4, you can make the necessary modifications to align the code with your specific model requirements.
4. Model configuration
Create a Python closure that encapsulates the model inference logic. WML requires the deployable function to be a closure: an outer function that performs one-time setup and returns the inner function, which executes the model's inference logic for each request.
Because the inner function runs in the deployment space, essential model files, such as the '.h5' model file and the mapper '.pickle' file, must be fetched through the WML APIs. Consequently, another instance of the APIClient is created inside the closure.
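At its core, the deployable function follows a simple pattern, sketched below with illustrative names; the full credit card fraud detection function that follows implements this same shape.
# Minimal sketch of the closure shape WML expects; names are illustrative.
def my_deployable_function():
    # One-time setup runs here: package installs, credentials,
    # and loading model artifacts from the deployment space.
    def score(payload):
        # payload arrives as {"input_data": [{"fields": [...], "values": [...]}]}
        return {"predictions": [{"fields": ["prediction"], "values": [[0.0]]}]}
    return score  # WML invokes the returned inner function for each request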
def CreditCardFraudDetection():
    # Install the required packages inside the deployment runtime.
    import subprocess
    subprocess.run(['conda', 'config', '--add', 'channels', 'conda-forge'])
    subprocess.run(['conda', 'config', '--remove', 'channels', 'defaults'])
    subprocess.run(['pip', 'install', 'scikit-learn==1.0.2'])
    subprocess.run(['conda', 'install', 'pydot', '-y'])
    subprocess.run(['pip', 'install', 'sklearn-pandas'])
    subprocess.run(['pip', 'install', '-U', 'ibm-watson-machine-learning'])
    subprocess.run(['pip', 'install', 'pandas==2.1.2'])

    # To use the Cloud Pak for Data (CP4D) API service inside the deployment,
    # you need to create an API client.
    username = '<CP4D_username>'
    api_key = '<api_key>'
    url = '<CP4D_URL>'
    wml_credentials = {
        "username": username,
        "apikey": api_key,
        "url": url,
        "instance_id": 'openshift',
        "version": '4.8'
    }

    # Set the deployment space where the model.h5 and mapper.pkl files are located
    # as the default space, so you don't need to pass the space_id on every API call.
    space_id = 'd47b60e3-3e76-4d2d-aac9-8ab2899657d1'
    from ibm_watson_machine_learning import APIClient
    client = APIClient(wml_credentials)
    client.set.default_space(space_id)
    # Packages required for inference.
    import tensorflow as tf
    import keras
    import numpy as np
    import pandas as pd
    import math
    import os
    import joblib
    import pydot
    import warnings
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.simplefilter(action='ignore', category=DeprecationWarning)
    import sklearn
    import tempfile
    from sklearn_pandas import DataFrameMapper
    from sklearn.preprocessing import LabelEncoder
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.preprocessing import FunctionTransformer
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import LabelBinarizer
    from sklearn.impute import SimpleImputer
    import ibm_db, json, requests, operator
    import pickle
    from io import BytesIO
    import dill
    from tensorflow import keras
    # Pre-defined helper used by the mapper file.
    def timeEncoder(X):
        X_hm = X['Time'].str.split(':', expand=True)
        d = pd.to_datetime(dict(year=X['Year'], month=X['Month'], day=X['Day'], hour=X_hm[0], minute=X_hm[1])).astype(int)
        return pd.DataFrame(d)

    # Pre-defined helper used by the mapper file.
    def amtEncoder(X):
        amt = X.apply(lambda x: x[1:]).astype(float).map(lambda amt: max(1, amt)).map(math.log)
        return pd.DataFrame(amt)

    # Pre-defined helper used by the mapper file.
    def decimalEncoder(X, length=5):
        dnew = pd.DataFrame()
        for i in range(length):
            dnew[i] = np.mod(X, 10)
            X = np.floor_divide(X, 10)
        return dnew

    # Pre-defined helper used by the mapper file.
    def fraudEncoder(X):
        return np.where(X == 'Yes', 1, 0).astype(int)

    seq_length = 7
    batch_size = 1

    # To access a file from the deployment space, you need the ID associated with
    # that file. To find it, open the file's detailed view in the deployment space;
    # the ID is shown on the right-hand side of the screen.
    data_asset_id = "82f77208-9996-4a06-af1d-f8c37f958f22"
    model_data = client.data_assets.get_content(data_asset_id)
    temp_hdf5_file = tempfile.NamedTemporaryFile(delete=False, suffix=".h5")
    temp_hdf5_file.write(model_data)
    temp_hdf5_file.close()
    model = keras.models.load_model(temp_hdf5_file.name)

    data_asset_id = "a8edb454-3ee8-45cf-afcc-0cff99d69fb3"
    mapper_file_path = client.data_assets.download(data_asset_id, filename='mapper.pkl')
    # Note: the mapper relies on the pre-defined helpers above (timeEncoder,
    # amtEncoder, decimalEncoder, and fraudEncoder). Because those helpers live
    # inside a closure, the mapper must be saved and loaded with the 'dill'
    # package; the standard 'pickle' package cannot serialize objects that
    # reference functions defined inside a closure.
    with open(mapper_file_path, 'rb') as file:
        mapper = dill.load(file)
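    # For reference, the mapper would have been serialized with dill (not
    # pickle) in the training environment, along the lines of:
    #     with open('mapper.pkl', 'wb') as f:
    #         dill.dump(mapper, f)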
    pd.options.mode.chained_assignment = None  # default='warn'
    # Custom helper used by the inference function.
    def gen_predict_batch(tdf, mapper):
        new_df = mapper.transform(tdf).drop(['Is Fraud?'], axis=1)
        xbatch = new_df.to_numpy().reshape(batch_size, seq_length, -1)
        xbatch_t = np.transpose(xbatch, axes=(1, 0, 2))
        return xbatch_t

    # Custom helper used by the inference function.
    def predict_user_card_combination(tdf, mapper, model, user, card):
        # Extract the rows for the specified user and card combination.
        user_card_df = tdf[(tdf['User'] == user) & (tdf['Card'] == card)]
        if user_card_df.empty:
            return "No data found for the specified user-card combination"
        batch_predictions = []
        for i in range(0, len(user_card_df), seq_length):
            batch_data = user_card_df.iloc[i:i+seq_length]  # A batch of 7 user-card transactions
            xbatch_t = np.asarray(gen_predict_batch(batch_data, mapper))
            predictions = model.predict(xbatch_t)
            result = float(predictions[seq_length - 1][0][0])
            batch_predictions.append(result)
        return batch_predictions

    # Custom helper used by the inference function.
    def process_db2_to_pandas(listDicts):
        tdf = pd.DataFrame(listDicts)
        tdf['MERCHANT_NAME'] = tdf['MERCHANT_NAME'].astype(str)
        tdf["MERCHANT_CITY"].replace('ONLINE', ' ONLINE', regex=True, inplace=True)
        tdf["MERCHANT_STATE"].fillna(np.nan, inplace=True)
        tdf['ZIP'].fillna(np.nan, inplace=True)
        tdf['IS_ERRORS'].fillna('missing_value', inplace=True)
        tdf.sort_values(by=['USER_ID', 'CARD'], inplace=True)
        tdf.reset_index(inplace=True, drop=True)
        tdf.rename(columns={"INDEX": "Index",
                            "USER_ID": "User",
                            "CARD": "Card",
                            "YEAR": "Year",
                            "MONTH": "Month",
                            "DAY": "Day",
                            "TIME": "Time",
                            "AMOUNT": "Amount",
                            "USE_CHIP": "Use Chip",
                            "MERCHANT_NAME": "Merchant Name",
                            "MERCHANT_CITY": "Merchant City",
                            "MERCHANT_STATE": "Merchant State",
                            "ZIP": "Zip",
                            "IS_ERRORS": "Errors?",
                            "IS_FRAUD": "Is Fraud?"}, inplace=True)
        return tdf

    # Custom helper that appends the incoming transaction to the history.
    def add_row_to_dataframe(dataframe, user, card, year, month, day, time, amount, use_chip, merchant_name, merchant_city, merchant_state, zip_code, mcc):
        new_row = {
            'User': user,
            'Card': card,
            'Year': year,
            'Month': month,
            'Day': day,
            'Time': time,
            'Amount': amount,
            'Use Chip': use_chip,
            'Merchant Name': merchant_name,
            'Merchant City': merchant_city,
            'Merchant State': merchant_state,
            'Zip': zip_code,
            'MCC': mcc
        }
        dataframe = pd.concat([dataframe, pd.DataFrame([new_row])], ignore_index=True)
        return dataframe
    # The main inference function: it connects to the database to retrieve the
    # transaction history, normalizes the data, and runs inference with the model.
    def prediction_ccf(input):
        conn = ibm_db.connect("DATABASE=BLUDB; HOSTNAME=<hostname>; PORT=50000; PROTOCOL=TCPIP; UID=db2inst1; PWD=<password>;", "", "")
        sql = "select * from credit.indexed_trans where user_id = ? and card = ? and year = ? and month <= ? limit ?"
        user_id = input["input_data"][0]["values"][0][0]
        card = input["input_data"][0]["values"][1][0]
        year, month, day = input["input_data"][0]["values"][2][0], input["input_data"][0]["values"][3][0], input["input_data"][0]["values"][4][0]
        time = input["input_data"][0]["values"][5][0]
        amount = input["input_data"][0]["values"][6][0]
        use_chip = input["input_data"][0]["values"][7][0]
        merchant_name = input["input_data"][0]["values"][8][0]
        merchant_city = input["input_data"][0]["values"][9][0]
        if (len(input["input_data"][0]["values"][10]) == 0) or (type(input["input_data"][0]["values"][10][0]) == str and len(input["input_data"][0]["values"][10][0]) == 0):
            merchant_state = np.nan
        else:
            merchant_state = input["input_data"][0]["values"][10][0]
        if len(input["input_data"][0]["values"][11]) == 0:
            zip_code = np.nan
        else:
            zip_code = input["input_data"][0]["values"][11][0]
        mcc = input["input_data"][0]["values"][12][0]

        # Fetch 6 prior transactions; together with the incoming transaction
        # they form a sequence of seq_length (7) for the LSTM.
        num_rows_to_fetch = 6
        if num_rows_to_fetch % 6 == 0:
            stmt = ibm_db.prepare(conn, sql)
            ibm_db.bind_param(stmt, 1, user_id)
            ibm_db.bind_param(stmt, 2, card)
            ibm_db.bind_param(stmt, 3, year)
            ibm_db.bind_param(stmt, 4, month)
            ibm_db.bind_param(stmt, 5, num_rows_to_fetch)
            ibm_db.execute(stmt)
            listDicts = []
            dictResult = ibm_db.fetch_assoc(stmt)
            while dictResult != False:
                listDicts.append(dictResult)
                dictResult = ibm_db.fetch_assoc(stmt)
            tdf = process_db2_to_pandas(listDicts)
            tdf = add_row_to_dataframe(tdf, user_id, card, year, month, day, time,
                                       amount, use_chip, merchant_name, merchant_city,
                                       merchant_state, zip_code, mcc)
            # Prediction function call
            prediction_result = predict_user_card_combination(tdf, mapper, model, user_id, card)
        else:
            prediction_result = "Number of rows to fetch isn't divisible by 6. Please enter a number that is divisible by 6."
        prediction_response = {
            'predictions': [{'fields': ['is_fraud'],
                             'values': [prediction_result]
                             }]
        }
        return prediction_response

    return prediction_ccf
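Before storing the function, you can optionally smoke-test the closure locally in your notebook, assuming the data assets and the Db2 database are reachable from it. The payload below mirrors the structure that prediction_ccf parses; the field values are illustrative only.
# Optional local smoke test of the closure; all values below are illustrative.
scorer = CreditCardFraudDetection()
sample_payload = {
    "input_data": [{
        # One single-element list per field, in the order prediction_ccf reads them:
        # User, Card, Year, Month, Day, Time, Amount, Use Chip, Merchant Name,
        # Merchant City, Merchant State, Zip, MCC
        "values": [[0], [0], [2020], [3], [15], ["13:25"], ["$57.00"],
                   ["Chip Transaction"], ["-3455150809145852"], ["La Verne"],
                   ["CA"], [91750.0], [5912]]
    }]
}
print(scorer(sample_payload))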
5. Create inference function
Create the inference function inside the deployment space.
# Creating the Python function in the deployment space with a specific software specification
pyfunc_swspec_id = client.software_specifications.get_uid_by_name("runtime-23.1-py3.10")

meta_data = {
    client.repository.FunctionMetaNames.NAME: 'CCF Detection MMA 2GB deployment',
    client.repository.FunctionMetaNames.DESCRIPTION: 'Credit Card Fraud Detection MMA',
    client.repository.FunctionMetaNames.SOFTWARE_SPEC_UID: pyfunc_swspec_id
}

function_details = client.repository.store_function(meta_props=meta_data, function=CreditCardFraudDetection)
6. Deploy the model
Deploy the model by providing the inference function, software specification, and hardware specification details.
# Deploying the model to the deployment space
function_uid = client.repository.get_function_uid(function_details)

meta_props = {
    client.deployments.ConfigurationMetaNames.NAME: "CCF Detection MMA 2GB -1 deployment",
    client.deployments.ConfigurationMetaNames.DESCRIPTION: "CreditCardFraudDetectionMMA",
    client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {'name': 'XXS'},
    client.deployments.ConfigurationMetaNames.ONLINE: {}
}

deployment_details = client.deployments.create(function_uid, meta_props=meta_props)
print(deployment_details)
deployment_id = client.deployments.get_uid(deployment_details)
Navigate to the 'Deployments' tab in your designated deployment space to confirm that the new deployment was created successfully.
Note: The hardware specification name 'XXS' refers to 'Extra extra small', which is 1 vCPU and 2 GB of memory.
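If you are unsure which hardware specifications are available in your environment, you can list them with the same client:
# List the hardware specifications available to this WML instance
client.hardware_specifications.list()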
7. Scale the deployment
Adjust the number of replicas to support a larger number of concurrent users.
Although multiple users can perform semi-concurrent inferencing against a single REST API replica (pod), the average wait time may increase roughly linearly after the first two users. If your inferencing workload is latency sensitive, the following code lets you create multiple replicas of the same REST API endpoint using the 'num_nodes' parameter.
Obtain the deployment_id by running the following command.
# Getting the deployment ids of the deployments
client.deployments.list(limit=100)
Scale the deployment by running the following command.
deployment_id = "999fb9e3-fd33-4d1b-9556-942179c2c041"
change_meta = {
client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {
"name":"XXS",
"num_nodes":20}
}
client.deployments.update(deployment_id, change_meta)
Adjust the num_nodes parameter to set the desired number of replicas. In this example, the number of replicas is set to 20, but you can modify this value based on your specific requirements.
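With the replicas in place, you can test the endpoint from your notebook using the WML client's scoring call. This is a sketch: the payload shape matches what prediction_ccf expects, and the field values are illustrative.
# Score the deployed function through WML; the values are illustrative.
scoring_payload = {
    client.deployments.ScoringMetaNames.INPUT_DATA: [{
        "values": [[0], [0], [2020], [3], [15], ["13:25"], ["$57.00"],
                   ["Chip Transaction"], ["-3455150809145852"], ["La Verne"],
                   ["CA"], [91750.0], [5912]]
    }]
}
response = client.deployments.score(deployment_id, scoring_payload)
print(response)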
Summary
This blog post guided you through the process of creating a deployment (micro-service) for the inference function in the Cloud Pak for Data platform, along with the necessary prerequisites. After setting up the deployment, you learned how to scale it to meet the demands of a higher number of concurrent users. Utilize the API endpoint to test your deployment, and fine-tune the scaling based on real-world usage patterns.
For any queries or additional information, feel free to comment below or reach out to us at theresax@ca.ibm.com or Revanth.Atmakuri@ibm.com or Shadman.Kaif@ibm.com.