Using App Connect Enterprise to generate Embeddings with WatsonX AI

By DAVID CRIGHTON posted 5 days ago

  

As the value unlocked by Large Language Models and AI technologies accelerates at an ever increasing pace, it becomes increasingly desirable to make enterprise data available to these models through patterns such as Retrieval Augmented Generation (RAG). The integration layer within an enterprise is uniquely positioned to contribute to the building of LLM-friendly datastores because of its existing access to disparate data from a multitude of data sources, often in a combined or joined form that may not be represented anywhere else in the enterprise.

When using the RAG pattern the first step is to generate "Embeddings" for the data. These embeddings are vector representations of the data that can be used to assess the semantic similarity of documents in order to pass relevant context to LLMs. The vectors would typically be stored in a Vector Database such as a Milvus engine deployed as part of WatsonX.data.
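Although the similarity comparison itself happens downstream of the integration layer (typically inside the vector database), a short sketch helps make the idea concrete. The following Java snippet computes the cosine similarity of two embedding vectors; the class name and the toy three-dimensional vectors are illustrative only, as real embeddings from models such as ibm/slate-125m-english-rtrvr have hundreds of dimensions:

```java
public class CosineSimilarity {
    // Cosine similarity: dot(a, b) / (|a| * |b|), ranging from -1 to 1,
    // with values near 1 indicating semantically similar texts.
    public static double cosine(double[] a, double[] b) {
        double dot = 0.0, normA = 0.0, normB = 0.0;
        for (int i = 0; i < a.length; i++) {
            dot += a[i] * b[i];
            normA += a[i] * a[i];
            normB += b[i] * b[i];
        }
        return dot / (Math.sqrt(normA) * Math.sqrt(normB));
    }

    public static void main(String[] args) {
        // Toy vectors standing in for real embeddings.
        double[] v1 = {0.029, 0.013, 0.742};
        double[] v2 = {0.031, 0.011, 0.755};  // close to v1
        double[] v3 = {0.901, -0.442, 0.002}; // far from v1
        System.out.printf("cos(v1, v2) = %.4f%n", cosine(v1, v2));
        System.out.printf("cos(v1, v3) = %.4f%n", cosine(v1, v3));
    }
}
```

A retrieval step would compute this score between the prompt's embedding and each stored document embedding, returning the highest-scoring documents as context.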

In this tutorial we examine two different ways that you could use App Connect Enterprise to generate vector embeddings; in a future article we will examine how these might be stored in a Vector Database. First we will demonstrate the WatsonX Request Discovery Connector Node, and then we will demonstrate how the same use case could be achieved with HTTPRequest Nodes.

Requirements

To follow this tutorial you will need access to a Project Space in a WatsonX Machine Learning instance. At the time of writing, the flows developed in this article were runnable against a trial instance of WatsonX.

You will also need a copy of App Connect Enterprise installed. If you would like to follow the instructions for the WatsonX Request Node then this will need to be version 13.0.2.0 or higher. If you are following the HTTPRequest Node tutorial then any version of App Connect Enterprise should be sufficient.

Obtaining the required information from WatsonX

In order to run these flows you will need to generate some information from your WatsonX instance. You will need:

  • An IAM API Key
  • The Project Id
  • The WatsonX.ai URI

Generating an IAM API Key

To connect to WatsonX you will need to generate an IAM API Key that permits login from an application. To do this, launch WatsonX and click on the "Create API Key" button in the "Developer access" section:

You will be taken to the IAM panel for creating API keys:

Click on the "Create Button" and then fill in the following options:

When you click the "Create" button you will be shown the following screen:

Press the eye icon to display the API Key, or use the "Copy" or "Download" links to save it. Note that you will not be able to retrieve the API Key again if you lose it, so be sure to keep a note of it.

Once you have noted down the API Key you can return to the Developer Access Panel and take note of the Project Id and the WatsonX URI.

Using the WatsonX.ai Request Node

The simplest way to interact with WatsonX.ai is using the WatsonX.ai Request Node that was introduced in App Connect Enterprise version 13.0.2.0. This node is a Discovery Connector Node, meaning that when you use it the Discovery Wizard will guide you through selecting the appropriate values for interacting with WatsonX.ai.

Additionally the WatsonX.ai Request Node handles the authentication sequence on the flow developer's behalf, which simplifies the required message flow design.

To get started drop a WatsonX.ai Request Node onto the palette and then click the "Launch Connector Discovery" button:

In the next screen create a new Policy Project to store the output of Connector Discovery so that it can be used at runtime. Do this by clicking the "New" Button:

Enter a name for the Policy Project and then press Finish:

When we connect to WatsonX to perform Connector Discovery we will use a credential (our IAM API Key). This will be stored in an external directory vault, which we can then access in an Integration Server at runtime. Press the "Select" button and then choose either a directory on disk or a Workspace Project to store the external vault. In this instance we have used a Workspace Project:

If this is a newly created vault, then when you enter the vault key used to secure access to the vault you will be prompted to "Create an external directory vault"; otherwise you will be prompted to "Test access". Enter the vault key and then press Launch Discovery:

You will be presented with a new window containing the Connector Discovery Wizard. This will be pre-populated with "IBM watsonX.ai", but it will not yet be connected because we have not supplied a credential to the discovery process. Press the twisty to see the available actions associated with the WatsonX.ai connector:

For this tutorial we will be using the "Generate Embeddings" API so select this and continue to the next screen where you will be prompted to "Connect" this connector.

This will allow you to enter your WatsonX.ai server URI and the IAM API Key which you obtained in the previous section.

On the next screen you can map the inputs of the Generate Embeddings API using JSONata expressions. For this tutorial, however, we will be providing an input message directly to the WatsonX.ai Request Node, so open the "Controls" panel and select the "Use input message unchanged" option. Once this is selected you can hit "Save" and then return to the message flow editor:

The message flow design

The message flow design when using the WatsonX.ai Request Node is very simple. We use an HTTPInput node to receive incoming data as a JSON message; we will assume that the incoming message is a simple object containing an array of items that we want to generate embeddings for. For example:

{ 
  "inputs":  [
    "This is a string to generate embeddings from", 
    "This is another input string which we can generate embeddings from"
    ] 
}

We need to enrich this data with the details for the model that we want to use and then transform the message into the appropriate format that the WatsonX.AI Request Node is expecting.

The correct format is shown below in trace node output format:

(0x01000000:Object):JSON            = ( ['json' : 0x7fecd4100060]
    (0x01000000:Object):Data = (
      (0x03000000:NameValue):version    = '2024-03-14' (CHARACTER)
      (0x01001000:Array    ):inputs     = (
        (0x03000000:NameValue):dummy = 'This is a string to generate embeddings from' (CHARACTER)
        (0x03000000:NameValue):dummy = 'This is another input string which we can generate embeddings from' (CHARACTER)
      )
      (0x03000000:NameValue):model_id   = 'ibm/slate-125m-english-rtrvr' (CHARACTER)
      (0x03000000:NameValue):project_id = '1111111-1111-1111-1111-11111111' (CHARACTER)
    )
  )

We will use a Mapping Node in conjunction with the schemas to populate the static pieces of data and then use an ESQL Compute Node to add the actual inputs from the input message. So our overall message flow design looks like this:

Let's examine the Graphical Map that we have used to set the static piece of the data:

Here we can see that we have first added an Environment mapping to the map. This allows us to store the input message in the environment as we will use this later in the downstream flow.

We then create a new output message based on the request schema that was generated for us by the WatsonX.ai Request Node. We then have simple "Assign Mappings" for each of the pieces of static data. For example here we have set the modelId to ibm/slate-125m-english-rtrvr:

We can now examine the code for the ESQL Compute Node:

CREATE COMPUTE MODULE WatsonXDemoFlow_Compute
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN		
		CALL CopyEntireMessage();
		CREATE FIELD OutputRoot.JSON.Data.inputs IDENTITY (JSON.Array)inputs;
		DECLARE inputStrings INTEGER CARDINALITY(Environment.Variables.inputs.*[]) + 1;
		DECLARE I INTEGER 1;
		WHILE I < inputStrings DO	
		  CREATE LASTCHILD OF OutputRoot.JSON.Data.inputs TYPE NameValue NAME 'dummy' VALUE Environment.Variables.inputs.*[I];
		  SET I = I+1;
		END WHILE;		
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

Remember that the structure of the input message requires us to create an array of JSON Strings. The first thing we do is use the CREATE FIELD statement with the IDENTITY keyword to ensure that the "inputs" element is an array:

CREATE FIELD OutputRoot.JSON.Data.inputs IDENTITY (JSON.Array)inputs;

We then iterate over the elements we stashed earlier in the Environment, creating new children of the OutputRoot.JSON.Data.inputs array. Note that there is a quirk of the JSON parser where elements in an array must be of type NameValue in order to be serialised, so we assign a "dummy" NAME for these elements. This name will not appear in the serialised output:

DECLARE inputStrings INTEGER CARDINALITY(Environment.Variables.inputs.*[]) + 1;
DECLARE I INTEGER 1;
WHILE I < inputStrings DO	
   CREATE LASTCHILD OF OutputRoot.JSON.Data.inputs TYPE NameValue NAME 'dummy' VALUE Environment.Variables.inputs.*[I];
   SET I = I+1;
END WHILE;		

Once the ESQL Compute Node has executed, the data is ready to pass to the WatsonX.ai Request Node, and the output can be passed directly to the HTTPReply Node to be returned to the originating HTTP client.

Running The Flow

Before we deploy our flow, however, we need to configure the Integration Server to use the external directory vault that contains the credential that we used for Connector Discovery. To do this we update the Credentials stanza of server.conf.yaml to reference the fully qualified path to the external directory vault:

Credentials:
  ExternalDirectoryVault:
    directory: '/home/davicrig/IBM/ACET13/WatsonX/TEST_EXT_DIR_VAULT'   # Optional path to an external directory vault that is shared by this and other integration nodes and integration servers. Default ''.

When the Integration Server is started we also need to specify the vault key used to secure vault access via the --ext-vault-key parameter:

IntegrationServer --work-dir /home/davicrig/IBM/ACET13/WatsonX/WATSONX_REQUESTNODE --ext-vault-key vaultKey

Once the Integration Server has started we can deploy our Application and Flow and then execute it using curl for example:

curl http://localhost:7800/getEmbeddingsWatsonXRequest -d "{ \"inputs\": [\"This is a string to generate embeddings from\", \"This is another input string which we can generate embeddings from\"] }"

The output will be the embeddings generated by WatsonX.ai. The vectors are rather large so I have truncated the data to display the structure:

{
    "model_id": "ibm\/slate-125m-english-rtrvr",
    "results": [
        {
            "embedding": [
                0.029097699,
<snip>
                0.013107353
            ]
        },
        {
            "embedding": [
                0.0035335228,
 <snip>
                0.027013943
            ]
        }
    ],
    "input_token_count": 27,
    "created_at": "2025-02-21T14:02:14.831Z"
}

Using the HTTP Request Node

It is also possible to call the WatsonX.AI APIs directly using the HTTPRequest Node. You might want to do this if you do not have access to ACE 13.0.2.0 or newer in your environment, or if you want more explicit control over the HTTP traffic. For example, using the HTTPRequest node it may be possible to achieve better performance by carefully managing memory usage when manipulating the message tree to form requests.

The auth sequence

One of the main disadvantages of using the HTTPRequest Node directly is that you will need to handle the authentication sequence for WatsonX.AI yourself. In this sequence, IAM is used to exchange an API Key for a Bearer Token that is presented on subsequent requests. The validity of a Bearer Token is much shorter than that of an API Key, so the flow will need some way to dynamically refresh its token when required.
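For readers unfamiliar with this exchange, the shape of the IAM request can be sketched in plain Java (11+). This is a standalone illustration, not code from the flow: the class name is invented, and actually sending the request with java.net.http.HttpClient and parsing the access_token out of the JSON response are omitted:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class IamTokenRequest {
    // IAM exchanges an API Key for a Bearer Token via a form-urlencoded POST.
    public static String buildBody(String apiKey) {
        return "grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey="
                + URLEncoder.encode(apiKey, StandardCharsets.UTF_8);
    }

    // Assemble the POST; sending it and extracting access_token from the
    // JSON response are left out of this sketch.
    public static HttpRequest build(String apiKey) {
        return HttpRequest.newBuilder()
                .uri(URI.create("https://iam.cloud.ibm.com/identity/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(apiKey)))
                .build();
    }
}
```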

In this instance the logic is encapsulated in a subflow as shown below:

The subflow will first check if a Bearer Token is available already in a token cache. 

If the token is present then the Bearer Token is placed in the Environment for later use and control returns to the parent flow.

If on the other hand the token is not present then a request for the Bearer Token is made using the HTTPRequest Node and then the resulting token is added to the cache and placed in the environment for later use.

The token cache itself is implemented using some simple Java:

public class TokenStash {
	private static String token = null;
	private static long refreshTime = System.currentTimeMillis() + 60 * 60 * 1000; //refresh every hour

	public static void stash(String accessToken) {
		token = accessToken;
		refreshTime = System.currentTimeMillis() + 60 * 60 * 1000;
	}
	
	public static String get() {
		String retVal = null;
		if(token != null && System.currentTimeMillis() < refreshTime) {
			retVal = token;
		} else {
			token = null;
		}
		return retVal;
	}

}

Here we can see that we store the token as a static member of the TokenStash class and invalidate it every hour. Once the token is invalidated, the next execution of the parent flow will refresh it.

For clarity we have chosen not to worry about the case where multiple message flow threads populate the token cache concurrently, as the last obtained token will still remain valid for subsequent requests. This could be resolved by synchronizing access to the TokenStash.token member.
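If stricter behaviour were ever needed, one option is to keep the token and its expiry together in a single immutable entry behind an AtomicReference, so a reader never sees a token paired with a stale expiry time. The SafeTokenStash class below is a hypothetical alternative to the tutorial's TokenStash, not part of the flow (it uses a record, so requires Java 16 or later):

```java
import java.util.concurrent.atomic.AtomicReference;

public class SafeTokenStash {
    // Token and expiry travel together, so readers see a consistent pair.
    private record Entry(String token, long refreshTime) {}

    private static final AtomicReference<Entry> current = new AtomicReference<>();

    public static void stash(String accessToken) {
        // Refresh every hour, matching the behaviour described in the article.
        current.set(new Entry(accessToken, System.currentTimeMillis() + 60L * 60 * 1000));
    }

    public static String get() {
        Entry e = current.get();
        return (e != null && System.currentTimeMillis() < e.refreshTime()) ? e.token() : null;
    }
}
```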

We can examine how the cache can be used from the checkForToken Node:

			if(TokenStash.get() != null) {
				String accessToken = TokenStash.get();
				List<MbElement> envNodeSet= (List<MbElement>) outAssembly.getGlobalEnvironment().getRootElement().evaluateXPath("accessToken");
				if(envNodeSet.isEmpty()) {
					MbElement accessTokenElement = outAssembly.getGlobalEnvironment().getRootElement().createElementAsFirstChild(MbElement.TYPE_NAME_VALUE);
					accessTokenElement.setName("accessToken");
					accessTokenElement.setValue(accessToken);
				} else {
					envNodeSet.get(0).setValue(accessToken);
				}
				alt.propagate(outAssembly);
				return;
			}

Similarly we can see how the StashToken node writes the token to the cache:

			//The token will be in JSON.Data.access_token
			List<MbElement> nodeset = (List<MbElement>) outMessage.getRootElement().getLastChild().evaluateXPath("Data/access_token");
			String accessToken = nodeset.get(0).getValueAsString();
			TokenStash.stash(accessToken);
			
			//also put the token in the env for use later in the flow
			List<MbElement> envNodeSet= (List<MbElement>) outAssembly.getGlobalEnvironment().getRootElement().evaluateXPath("accessToken");
			if(envNodeSet.isEmpty()) {
				MbElement accessTokenElement = outAssembly.getGlobalEnvironment().getRootElement().createElementAsFirstChild(MbElement.TYPE_NAME_VALUE);
				accessTokenElement.setName("accessToken");
				accessTokenElement.setValue(accessToken);
			} else {
				envNodeSet.get(0).setValue(accessToken);
			}

So now that we have seen how to write and read tokens from the cache, let's consider how we actually obtain the Bearer Token in the first place. To do this we need to make a request to the IAM service. The request that we are going to build can be represented by the following curl command:

curl -X POST \  
-H 'Content-Type: application/x-www-form-urlencoded' \  
--data 'grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=<apiKey>' \  
'https://iam.cloud.ibm.com/identity/token'

The API Key is a credential, and we should not store it in plain text or hard-code it in the message flow. Instead we will define it as a User Defined Credential and store it in the vault. We can do this with a command such as:

mqsicredentials --ext-vault-dir /home/davicrig/IBM/ACET13/WatsonX/TEST_EXT_DIR_VAULT --vault-key vaultKey --create --credential-type userdefined --credential-name WatsonXAPIKey --api-key <apiKey>

This command adds a new credential called WatsonXAPIKey of type userdefined to store the API Key. We can retrieve it in Java code using the following snippet:

MbCredential apiKey = MbCredential.getCredential("userdefined", "WatsonXAPIKey");
Map<String, char[]> credentialProperties = apiKey.reloadAndRetrieveProperties();
char[] apiKeyBytes = credentialProperties.get(MbCredential.API_KEY);

This is the approach used in the getApiToken Java Compute Node which also constructs the rest of the body of the IAM request:

MbCredential apiKey = MbCredential.getCredential("userdefined", "WatsonXAPIKey");
Map<String, char[]> credentialProperties = apiKey.reloadAndRetrieveProperties();
char[] apiKeyBytes = credentialProperties.get(MbCredential.API_KEY);
			
//the request for a bearer token is form-urlencoded, we will just create a BLOB
//the form needs to be grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=<myKeyHere>
StringBuilder sb = new StringBuilder();
sb.append("grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=");
sb.append(apiKeyBytes);
			
outMessage.getRootElement().createElementAsFirstChild(MbBLOB.PARSER_NAME).createElementAsLastChild(MbElement.TYPE_NAME_VALUE, "BLOB", sb.toString().getBytes("UTF-8"));

We then use the HTTPHeader Node to add the required Content-Type header, and our request is ready to pass to IAM via the getBearerToken HTTPRequest Node:

Note that the Bearer Token acquired can be used for any WatsonX.ai request and can be used for multiple requests in sequence within the same message flow if required. Packaging this as a subflow allows this functionality to be reused as required.

The main flow design

The main message flow is quite simple and follows a similar approach to the WatsonX.AI Request Node example as shown by the flow diagram below:

The logic implemented here is as follows:

  • The incoming JSON message data is placed in the Environment for future use, as the output message may be used to create the IAM request for the Bearer Token.
  • The Bearer Token is obtained from the subflow as described in the previous section.
  • A Compute Node is used to construct the API request.
  • The request is made by the HTTPRequest Node.
  • The data is passed back to the originating HTTP client via an HTTPReply Node.

We can examine how we construct the API request in the prepareEmbeddingRequest Compute Node:

DECLARE model_id, project_id, instance_uri, api_version EXTERNAL CHARACTER;

CREATE COMPUTE MODULE WatsonXGetEmbeddings_prepareEmbeddingsRequest
	CREATE FUNCTION Main() RETURNS BOOLEAN
	BEGIN
		-- CALL CopyMessageHeaders();
		-- CALL CopyEntireMessage();
		--Our message needs to look like this:
		-- curl -X POST \
		-- -H "Authorization: Bearer myToken" \
		-- -H "Accept: application/json" \
		-- -H "Content-type: application/json" \
		-- --data-raw '{
		--   "inputs": [
		--     "Youth craves thrills while adulthood cherishes wisdom.",
		--     "Youth seeks ambition while adulthood finds contentment.",
		--     "Dreams chased in youth while goals pursued in adulthood."
		--   ],
		--   "model_id": "ibm/slate-125m-english-rtrvr",
		--   "project_id": "982672ca-e63d-46a3-8526-7bc7e65e403a"
		-- }' \
		-- "https://us-south.ml.cloud.ibm.com/ml/v1/text/embeddings?version=2024-05-31"
        
		--First setup the header
		SET OutputRoot.HTTPRequestHeader."Content-Type" = 'application/json';
		SET OutputRoot.HTTPRequestHeader.Accept = 'application/json';
		SET OutputRoot.HTTPRequestHeader.Authorization = 'Bearer ' || Environment.accessToken;
		
		--Now setup the body
		CREATE LASTCHILD OF OutputRoot DOMAIN('JSON');
		SET OutputRoot.JSON.Data.model_id = model_id;
		SET OutputRoot.JSON.Data.project_id = project_id;	
		CREATE FIELD OutputRoot.JSON.Data.inputs IDENTITY (JSON.Array)inputs;
		DECLARE inputStrings INTEGER CARDINALITY(Environment.Variables.inputs.*[]) + 1;
		DECLARE I INTEGER 1;
		WHILE I < inputStrings DO	
		  CREATE LASTCHILD OF OutputRoot.JSON.Data.inputs TYPE NameValue NAME 'dummy' VALUE Environment.Variables.inputs.*[I];
		  SET I = I+1;
		END WHILE;		
		
		--Now build the URI
		SET OutputLocalEnvironment.Destination.HTTP.RequestURL = 'https://' || instance_uri || '/ml/v1/text/embeddings?version=' || api_version;
		
		RETURN TRUE;
	END;

	CREATE PROCEDURE CopyMessageHeaders() BEGIN
		DECLARE I INTEGER 1;
		DECLARE J INTEGER;
		SET J = CARDINALITY(InputRoot.*[]);
		WHILE I < J DO
			SET OutputRoot.*[I] = InputRoot.*[I];
			SET I = I + 1;
		END WHILE;
	END;

	CREATE PROCEDURE CopyEntireMessage() BEGIN
		SET OutputRoot = InputRoot;
	END;
END MODULE;

The first thing we do is declare some external variables to hold the model_id, project_id, instance_uri and api_version. This allows us to implement these variables as User Defined Properties so they can be passed to the flow as configuration instead of being hardcoded. This would allow us, for example, to use a different WatsonX Project Space in Production from the one used in Test.

We can see in the comment the structure of the HTTP Request which we are building:

		-- curl -X POST \
		-- -H "Authorization: Bearer myToken" \
		-- -H "Accept: application/json" \
		-- -H "Content-type: application/json" \
		-- --data-raw '{
		--   "inputs": [
		--     "Youth craves thrills while adulthood cherishes wisdom.",
		--     "Youth seeks ambition while adulthood finds contentment.",
		--     "Dreams chased in youth while goals pursued in adulthood."
		--   ],
		--   "model_id": "ibm/slate-125m-english-rtrvr",
		--   "project_id": "982672ca-e63d-46a3-8526-7bc7e65e403a"
		-- }' \
		-- "https://us-south.ml.cloud.ibm.com/ml/v1/text/embeddings?version=2024-05-31"

This sample was taken from the WatsonX.ai API documentation. The first thing we do in the ESQL Compute Node is create the required headers:

		--First setup the header
		SET OutputRoot.HTTPRequestHeader."Content-Type" = 'application/json';
		SET OutputRoot.HTTPRequestHeader.Accept = 'application/json';
		SET OutputRoot.HTTPRequestHeader.Authorization = 'Bearer ' || Environment.accessToken;

Note that the Bearer Token is obtained from the Environment where it was placed by the getWatsonXBearerToken subflow.

Then we construct a new JSON object and copy in the project_id and model_id using the values set in the UDPs that we declared earlier:

		--Now setup the body
		CREATE LASTCHILD OF OutputRoot DOMAIN('JSON');
		SET OutputRoot.JSON.Data.model_id = model_id;
		SET OutputRoot.JSON.Data.project_id = project_id;

We then use the same approach as in the WatsonX.AI Request Node example to construct the input Strings, remembering that the array elements must be of NameValue type even though the Name will not be serialised:

		CREATE FIELD OutputRoot.JSON.Data.inputs IDENTITY (JSON.Array)inputs;
		DECLARE inputStrings INTEGER CARDINALITY(Environment.Variables.inputs.*[]) + 1;
		DECLARE I INTEGER 1;
		WHILE I < inputStrings DO	
		  CREATE LASTCHILD OF OutputRoot.JSON.Data.inputs TYPE NameValue NAME 'dummy' VALUE Environment.Variables.inputs.*[I];
		  SET I = I+1;
		END WHILE;	

Finally we construct the URI for the HTTPRequest Node to use by setting the appropriate Destination field in the LocalEnvironment. Again we access the UDPs we defined earlier to get the values for the instance_uri and api_version:

		--Now build the URI
		SET OutputLocalEnvironment.Destination.HTTP.RequestURL = 'https://' || instance_uri || '/ml/v1/text/embeddings?version=' || api_version;

The flow is almost complete; the last step is to create the UDPs that will be associated with the ESQL EXTERNAL variables. We do this using the User Defined Properties tab in the main flow editing canvas:

We chose to supply defaults for the model_id and api_version, and made the project_id and instance_uri mandatory.

When we build the BAR file we can populate these values with the data we obtained from WatsonX.ai when we set up the prerequisites:

We can call our flow using a curl command similar to the previous example:

curl -v http://localhost:7800/getEmbeddingsHTTPRequest -d "{ \"inputs\": [\"This is a string to generate embeddings from\", \"This is another input string which we can generate embeddings from\"] }"

Once again I have displayed the output in a truncated form:

{
    "model_id": "ibm/slate-125m-english-rtrvr",
    "created_at": "2025-02-21T14:46:17.527Z",
    "results": [
        {
            "embedding": [
                0.029097699,
<snip>
                0.013107353
            ]
        },
        {
            "embedding": [
                0.0035335228,
 <snip>
                0.027013943
            ]
        }
    ],
    "input_token_count": 27
}

Conclusion

In this article we have seen how to interact with WatsonX.ai to generate embeddings using both the WatsonX.ai Request Node and the HTTPRequest Node. This provides the first step needed to expose data flowing through the integration platform to Large Language Models via the RAG pattern. In a full solution these embeddings would need to be stored in a vector database and then retrieved based on semantic similarity with the prompt. App Connect Enterprise is also an ideal fit for performing both of these tasks, and the process used in this article should be considered part of a broader strategy for building AI into your Integration Platform.

As well as being used to generate embeddings for application data flowing through the Integration Platform, this approach could also be used to create embeddings for log data produced by the flow, giving LLMs access to semantically similar log data. This could, for example, provide valuable insights when dealing with production incidents.

If you would like to follow along with this tutorial you can get the project files from the following location 
