Data and AI on Power

Data and AI on Power

IBM Power systems provide a robust and scalable platform for a wide range of data and AI workloads, offering benefits in performance, security, and ease of use.

 View Only

Coding an inferencing endpoint for Long-Short Term Memory model in Cloud Pak for Data deployment space

By Theresa Xu posted Sat January 06, 2024 11:52 AM

  

Cloud Pak for Data (CP4D) is a containerized software solution with various supported components like Watson Machine Learning (WML), Watson Studio (WS), and Spark Analytics. This enables customers to deploy CP4D on their preferred cloud providers for training and inference of foundational and machine learning models. This blog focuses on deploying an inference Long-Short Term Memory (LSTM) model in CP4D deployment space with custom requirements.

For more details on CP4D, refer to IBM Cloud Pak for Data documentation.

Exploring the setup

Our setup utilizes CP4D version 4.8 deployed on OpenShift Container Platform (OCP) 4.12 running on an IBM Power10 server. Key components added to our CP4D setup include Watson Machine Learning (WML), Watson Studio, and Spark Analytics.

For deployment instructions, refer to Installing IBM Cloud Pak for Data documentation.

Prerequisites

Before proceeding, ensure you meet the following prerequisites:

  • CP4D: Install CP4D version 4.8 on OCP version 4.12.
  • Deployment space for LSTM model inference endpoint: To deploy the inference endpoint of the LSTM model in CP4D, create a deployment space. For detailed instructions, refer to Creating deployment spaces.
  • Jupyter notebook access in CP4D: Create a project to access Jupyter notebooks in CP4D. For detailed instructions, refer to Creating a project.
  • Inference function deployment: To deploy the inference function in the deployment space, create a Jupyter notebook. For detailed instructions, refer to Creating a notebook file in the notebook editor.
  • API Key for CP4D deployment space: Obtain an API key for using APIs in the CP4D deployment space environment by following these steps:
    1. Select 'Profile'.
    2. Choose 'Profile & Settings'.
    3. Select 'API Key'.

    Note: The API key provides secure communication, granting access similar to the CP4D web client.

  • Inference function creation and deployment: Open the Jupyter notebook to create and deploy the inference function in the CP4D deployment space. For detailed instructions, refer to Coding and running a notebook.

Perform the following steps to code and deploy the LSTM model's inference endpoint in CP4D deployment space.

Steps

1. Set up credentials

In a Jupyter notebook cell, provide your API key, CP4D URL, CP4D username, and the OpenShift version based on your CP4D environment.

username = '<CP4D_username>'
api_key = '<api_key>'
url = '<CP4D_URL>'

wml_credentials = {
    "username": username,
    "apikey": api_key,
    "url": url,
    "instance_id": 'openshift',
    "version": '4.8'
}

2. Install dependencies and configure WML API client

Install the ibm-watson-machine-learning package and configure the WML API client using the WML credentials. This client instance, created with the provided credentials, will play a key role in the subsequent steps for deploying the inference function within the CP4D deployment space.

!pip install -U ibm-watson-machine-learning
from ibm_watson_machine_learning import APIClient
client = APIClient(wml_credentials)

3. Set default deployment space

Obtain the deployment space IDs using the following command.

client.spaces.list(limit=100)

Set the default deployment space by running the following command.

space_id = 'd47b60e3-3e76-4d2d-aac9-8ab2899657d1'
client.set.default_space(space_id)

Note: In step 4 you can make necessary modifications to align with your specific model requirements.

4. Model configuration

Create a Python closure function that encapsulates the logic for model inference. WML mandates the inference function to be a Python closure, with the outer function returning the primary inner inference function. Notably, this inner function is crucial for executing the model's inference logic.

To access essential model files, such as the '.h5' file and the mapper '.pickle' file, within the inference function, leveraging APIs becomes imperative. Consequently, within this inference function, another instance of the APIClient needs to be created.

def CreditCardFraudDetection():
    
    # To install some of the required packages
    import subprocess
    subprocess.run(['conda', 'config', '--add', 'channels', 'conda-forge'])
    subprocess.run(['conda', 'config', '--remove', 'channels', 'defaults'])
    subprocess.run(['pip', 'install', 'scikit-learn==1.0.2', '-y'])
    subprocess.run(['conda', 'install', 'pydot', '-y'])
    subprocess.run(['pip', 'install', 'sklearn-pandas', '-y'])
    subprocess.run(['pip', 'install', '-U', 'ibm-watson-machine-learning'])
    subprocess.run(['pip', 'install', 'pandas==2.1.2'])

    # In order to use the Cloud Pak for Data (CP4D) API service inside the deployment, you need to create a API client. 
    username = '<CP4D_username>'
    api_key = '<api_key>'
    url = '<CP4D_URL>'

    wml_credentials = {
        "username": username,
        "apikey": api_key,
        "url": url,
        "instance_id": 'openshift',
        "version": '4.8'
    }


    # To specify the deployment-space where the model.h5 and mapper.pkl files are located, you need to set a default space. Therefore, you don’t required to provide the space_id everything you required to use the API client. 
    space_id = 'd47b60e3-3e76-4d2d-aac9-8ab2899657d1'
    from ibm_watson_machine_learning import APIClient

    client = APIClient(wml_credentials)
    client.set.default_space(space_id)

    # The required package import to do inference.
    import tensorflow as tf
    import keras
    import numpy as np
    import pandas as pd
    import math
    import os
    import joblib
    import pydot
    import warnings
    warnings.simplefilter(action='ignore', category=FutureWarning)
    warnings.simplefilter(action='ignore', category=DeprecationWarning)
    import sklearn
    import tempfile
    from sklearn_pandas import DataFrameMapper
    from sklearn.preprocessing import LabelEncoder
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.preprocessing import FunctionTransformer
    from sklearn.preprocessing import MinMaxScaler
    from sklearn.preprocessing import LabelBinarizer
    from sklearn.impute import SimpleImputer
    import ibm_db, json, requests, operator
    import pickle
    from io import BytesIO
    import dill
    from tensorflow import keras
    
    # This is a pre-defined function that would be used by the mapper file.
    def timeEncoder(X):
        X_hm = X['Time'].str.split(':', expand=True)
        d = pd.to_datetime(dict(year=X['Year'],month=X['Month'],day=X['Day'],hour=X_hm[0],minute=X_hm[1])).astype(int)
        return pd.DataFrame(d)

    # This is a pre-defined function that would be used by the mapper file.
    def amtEncoder(X):
        amt = X.apply(lambda x: x[1:]).astype(float).map(lambda amt: max(1,amt)).map(math.log)
        return pd.DataFrame(amt)

    # This is a pre-defined function that would be used by the mapper file.
    def decimalEncoder(X,length=5):
        dnew = pd.DataFrame()
        for i in range(length):
            dnew[i] = np.mod(X,10)
            X = np.floor_divide(X,10)
        return dnew

    # This is a pre-defined function that would be used by the mapper file.
    def fraudEncoder(X):
        return np.where(X == 'Yes', 1, 0).astype(int)

    seq_length = 7
    batch_size = 1

    # To specify the file that is required to access, you need to specify the id associated with that file in your deployment space. You can you do that by selecting the file for detailed view of the file and find the id on the right-hand side screen.
    data_asset_id = "82f77208-9996-4a06-af1d-f8c37f958f22"

    model_data = client.data_assets.get_content(data_asset_id)

    temp_hdf5_file = tempfile.NamedTemporaryFile(delete=False, suffix=".h5")
    temp_hdf5_file.write(model_data)
    temp_hdf5_file.close()

    model = keras.models.load_model(temp_hdf5_file.name)

    data_asset_id = "a8edb454-3ee8-45cf-afcc-0cff99d69fb3"
    mapper_file_path = client.data_assets.download(data_asset_id, filename='mapper.pkl')
    
    #Note the mapper_file requires some predefined functions (like timeEncoder, amtEncoder, decimalEncoder, and fraudEncoder etc), and since this is a closure function you can only use ‘dill’ package to save and load the mapper. As the general ‘pickle’ package wouldn’t support to either save or load the mapper that’s inside a closure function.
    with open(mapper_file_path, 'rb') as file:
        mapper = dill.load(file)

    pd.options.mode.chained_assignment = None  # default='warn'
    seq_length = 7
    batch_size = 1

    # This is a custom function that is required by the inference function.    
    def gen_predict_batch(tdf, mapper):
        new_df = mapper.transform(tdf).drop(['Is Fraud?'],axis=1)
        xbatch = new_df.to_numpy().reshape(batch_size, seq_length, -1)
        xbatch_t = np.transpose(xbatch, axes=(1,0,2))
        return xbatch_t

    # This is a custom function that is required by the inference function.
    def predict_user_card_combination(tdf, mapper, model, user, card):

        # Extract the rows for the specified user and card combination
        user_card_df = tdf[(tdf['User'] == user) & (tdf['Card'] == card)]

        if user_card_df.empty:
            return "No data found for the specified user-card combination"

        batch_predictions = []
        for i in range(0, len(user_card_df), seq_length):
            batch_data = user_card_df.iloc[i:i+seq_length]  # Get a batch of 7 user-card combinations
            xbatch_t = np.asarray(gen_predict_batch(batch_data, mapper))
            
            predictions = model.predict(xbatch_t)

            result = float(predictions[seq_length - 1][0][0])
            batch_predictions.append(result)

        return batch_predictions

    # This is a custom function that is required by the inference function.
    def process_db2_to_pandas(listDicts):
        tdf = pd.DataFrame(listDicts)
        tdf['MERCHANT_NAME'] = tdf['MERCHANT_NAME'].astype(str)
        tdf["MERCHANT_CITY"].replace('ONLINE', ' ONLINE', regex=True, inplace=True)
        tdf["MERCHANT_STATE"].fillna(np.nan, inplace=True)
        tdf['ZIP'].fillna(np.nan, inplace=True)
        tdf['IS_ERRORS'].fillna('missing_value', inplace=True)
        tdf.sort_values(by=['USER_ID','CARD'], inplace=True)
        tdf.reset_index(inplace=True, drop=True)

        tdf.rename(columns={"INDEX": "Index",
                            "USER_ID": "User",
                            "CARD": "Card",
                            "YEAR": "Year",
                            "MONTH": "Month",
                            "DAY": "Day",
                            "TIME": "Time",
                            "AMOUNT": "Amount",
                            "USE_CHIP": "Use Chip",
                            "MERCHANT_NAME": "Merchant Name",
                            "MERCHANT_CITY": "Merchant City",
                            "MERCHANT_STATE": "Merchant State", 
                            "ZIP": "Zip",
                            "IS_ERRORS": "Errors?",
                            "IS_FRAUD": "Is Fraud?"}, inplace=True)
        return tdf
    
    def add_row_to_dataframe(dataframe, user, card, year, month, day, time, amount, use_chip, merchant_name, merchant_city, merchant_state, zip_code, mcc):
        new_row = {
            'User': user,
            'Card': card,
            'Year': year,
            'Month': month,
            'Day': day,
            'Time': time,
            'Amount': amount,
            'Use Chip': use_chip,
            'Merchant Name': merchant_name,
            'Merchant City': merchant_city,
            'Merchant State': merchant_state,
            'Zip': zip_code,
            'MCC': mcc
        }

        dataframe = pd.concat([dataframe, pd.DataFrame([new_row])], ignore_index=True)

        return dataframe

    # This is the main inference function that would connect to the database for retrieving information, data normalization, and inference with the model.
    def prediction_ccf(input):
        conn = ibm_db.connect("DATABASE=BLUDB; HOSTNAME=<hostname>; PORT=50000; PROTOCOL=TCPIP; UID=db2inst1; PWD=<password>;", "", "")
        sql = "select * from credit.indexed_trans where user_id = ? and card = ? and year = ? and month <= ? limit ?"

        user_id = input["input_data"][0]["values"][0][0]
        card = input["input_data"][0]["values"][1][0]
        year, month, day = input["input_data"][0]["values"][2][0], input["input_data"][0]["values"][3][0], input["input_data"][0]["values"][4][0]
        time = input["input_data"][0]["values"][5][0]
        amount = input["input_data"][0]["values"][6][0]
        use_chip = input["input_data"][0]["values"][7][0]
        merchant_name = input["input_data"][0]["values"][8][0]
        merchant_city = input["input_data"][0]["values"][9][0]

        if (len(input["input_data"][0]["values"][10]) == 0) or (type(input["input_data"][0]["values"][10][0]) == str and len(input["input_data"][0]["values"][10][0]) == 0):
            merchant_state = np.nan        
        else:
            merchant_state = input["input_data"][0]["values"][10][0]

        if len(input["input_data"][0]["values"][11]) == 0:
            zip_code = np.nan
        else:
            zip_code = input["input_data"][0]["values"][11][0]
        mcc = input["input_data"][0]["values"][12][0]
        num_rows_to_fetch = 6

        if num_rows_to_fetch % 6 == 0:
            print("Number of rows is divisible by 6.")
            stmt = ibm_db.prepare( conn, sql )
            ibm_db.bind_param(stmt, 1, user_id)
            ibm_db.bind_param(stmt, 2, card)
            ibm_db.bind_param(stmt, 3, year)
            ibm_db.bind_param(stmt, 4, month)
            ibm_db.bind_param(stmt, 5, num_rows_to_fetch)
            ibm_db.execute(stmt)
            listDicts = []
            dictResult  = ibm_db.fetch_assoc(stmt)
            while dictResult  != False:
                listDicts.append(dictResult)
                dictResult  = ibm_db.fetch_assoc(stmt)

            tdf = process_db2_to_pandas(listDicts)
            tdf = add_row_to_dataframe(tdf, user_id, card, year, month, day, time, 
                                       amount, use_chip, merchant_name, merchant_city, 
                                       merchant_state, zip_code, mcc)

            # Prediction function call
            prediction_result = predict_user_card_combination(tdf, mapper, model, user_id, card)
        else:
            prediction_result = "Number of rows isn't divisible by 7. Please enter a number that is divisible by 7."

        prediction_response = {
            'predictions': [{'fields': ['is_fraud'],
                             'values': [prediction_result]
                             }]
        }
        return prediction_response

    return prediction_ccf

5. Create inference function

Create the inference function inside the deployment space.

# Creating the python fuction in the deployment space with specific software specification
pyfunc_swspec_id = client.software_specifications.get_uid_by_name("runtime-23.1-py3.10")

meta_data = {
    client.repository.FunctionMetaNames.NAME: 'CCF Detection MMA 2GB deployment',
    client.repository.FunctionMetaNames.DESCRIPTION: 'Credit Card Fraud Detection MMA',
    client.repository.FunctionMetaNames.SOFTWARE_SPEC_UID: pyfunc_swspec_id
}

function_details = client.repository.store_function(meta_props=meta_data, function=CreditCardFraudDetection)

6. Deploy the model

Deploy the model by providing the inference function, software specification, and hardware specification details.

# Deploying the model to the deployment space
function_uid = client.repository.get_function_uid(function_details)

meta_props = {
   client.deployments.ConfigurationMetaNames.NAME: "CCF Detection MMA 2GB -1 deployment",
   client.deployments.ConfigurationMetaNames.DESCRIPTION: "CreditCardFraudDetectionMMA",
   client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: { 'name': 'XXS'},
   client.deployments.ConfigurationMetaNames.ONLINE: {   }
}
deployment_details = client.deployments.create(function_uid, meta_props=meta_props)

print(deployment_details)
deployment_id = client.deployments.get_uid(deployment_details)

Navigating to the 'deployments' tab in your designated deployment space to confirm the successful creation of a new deployment.

Note: The hardware specification name ‘XSS’ refers to the ‘Extra extra small’ which is 1 vCPU and 2 GB Memory.

7. Scale the deployment

Adjust the number of replicas to support running high number of concurrent users.

Although multiple users can perform semi-concurrent inferencing with one REST API replica/pod, the average wait time may increase linearly after the first two users. If your inferencing workload is latency-sensitive, the following code enables you to create multiple replicas of the same REST API endpoint using the parameter 'num_nodes.

Obtain the deployment_id by the running the following command.

# Getting the deployment ids of the deployments
client.deployments.list(limit=100)

Scale the deployment by running the following command.
deployment_id = "999fb9e3-fd33-4d1b-9556-942179c2c041"
change_meta = {
                client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {
                                       "name":"XXS",
                                       "num_nodes":20}
            }
client.deployments.update(deployment_id, change_meta)

Adjust the num_nodes parameter to set the desired number of replicas. In this example, the number of replicas is set to 20, but you can modify this value based on your specific requirements.

Summary

This blog post guided you through the process of creating a deployment (micro-service) for the inference function in the Cloud Pak for Data platform, along with the necessary prerequisites. After setting up the deployment, you learned how to scale it to meet the demands of a higher number of concurrent users. Utilize the API endpoint to test your deployment, and fine-tune the scaling based on real-world usage patterns.

For any queries or additional information, feel free to comment below or reach out to us at theresax@ca.ibm.com or Revanth.Atmakuri@ibm.com or Shadman.Kaif@ibm.com.

0 comments
24 views

Permalink