Build RAG with watsonx.data Milvus and Haystack

By Divya, posted Mon May 05, 2025 01:21 AM

Today, building intelligent applications is all about unlocking the value hidden in your organization’s data. A popular and effective approach is Retrieval-Augmented Generation (RAG), which combines the strengths of a search engine with the creativity of a generative model to deliver more accurate, context-aware responses.

In this blog, we’ll show you how to set up a RAG system using Haystack and Milvus. This combination allows you to build fast, scalable applications that generate insights grounded in real, reliable data.

What is Haystack?

Haystack is an open-source framework for building production-ready applications with large language models (LLMs). It provides a flexible and composable architecture that allows you to build complex language processing pipelines by connecting different components.

Key features of Haystack:

  • Component-based architecture: Build custom pipelines by connecting modular components (a minimal sketch follows this list)
  • Framework agnostic: Works with various LLM providers and vector databases
  • Production-ready: Designed for scalability and real-world application development
  • Full RAG support: Provides all the tools needed for retrieval-augmented generation
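
To make the component-based architecture concrete, here is a minimal sketch of two custom components wired into a pipeline. The UppercaseConverter and ExclamationAdder classes are hypothetical examples invented for illustration, not part of Haystack; they only show how the @component decorator and Pipeline.connect fit together:

# Minimal sketch of Haystack's component model (illustrative only)
from haystack import Pipeline, component

@component
class UppercaseConverter:
    """Hypothetical component that upper-cases its input text."""
    @component.output_types(text=str)
    def run(self, text: str):
        return {"text": text.upper()}

@component
class ExclamationAdder:
    """Hypothetical component that appends an exclamation mark."""
    @component.output_types(text=str)
    def run(self, text: str):
        return {"text": text + "!"}

pipeline = Pipeline()
pipeline.add_component("upper", UppercaseConverter())
pipeline.add_component("exclaim", ExclamationAdder())
pipeline.connect("upper.text", "exclaim.text")

# Prints: {'exclaim': {'text': 'HELLO HAYSTACK!'}}
print(pipeline.run({"upper": {"text": "hello haystack"}}))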

Step-by-Step Implementation

1. Create Milvus Instance on watsonx.data

To provision a Milvus instance, refer to Getting Started with IBM watsonx.data Milvus.

2. Set up a Watson Machine Learning service instance and API key

  1. Create a Watson Machine Learning service instance (the Lite plan is a free option).
  2. Generate an API key in WML and save it for use in this tutorial; one way to store it is sketched after this list.
  3. Associate the WML service instance with the project you created in watsonx.ai.
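
Rather than hard-coding the API key in a notebook, a common pattern is to export it as an environment variable and read it at runtime. This is a minimal sketch; the WATSONX_APIKEY variable name is our own choice, not an SDK convention:

# Read the WML API key from an environment variable (variable name is arbitrary)
import os

api_key = os.environ.get("WATSONX_APIKEY")
if api_key is None:
    raise RuntimeError("Set the WATSONX_APIKEY environment variable before running this notebook.")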

3. Setting Up the Environment

First, we'll set up our Python environment and install the necessary packages:

# Install required packages
!pip install --upgrade pymilvus milvus-haystack haystack-ai ibm-watsonx-ai

4. Data Preparation

For this tutorial, we'll use a public domain text about Leonardo Da Vinci from Project Gutenberg:

# Download sample data for demonstration
import os
import urllib.request

url = "https://www.gutenberg.org/cache/epub/7785/pg7785.txt"
file_path = "./davinci.txt"

if not os.path.exists(file_path):
    urllib.request.urlretrieve(url, file_path)
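
As a quick sanity check, you can preview the first few lines of the downloaded file:

# Quick check: preview the start of the downloaded text
with open(file_path, "r", encoding="utf-8") as f:
    for _ in range(5):
        print(f.readline().rstrip())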

5. IBM watsonx.ai Configuration

Next, we'll configure our connection to IBM watsonx.ai:

# Set up IBM watsonx API credentials
watsonx_credentials = {
    "url": "<url>",  # Replace with your watsonx URL
    "apikey": "<apikey>",  # Replace with your watsonx API key
}

project_id = "<project_id>"  # Replace with your project ID

6. Initializing watsonx.ai Components

Now we'll initialize the embedding and language models from IBM watsonx.ai:

# Import watsonx libraries
from ibm_watsonx_ai import APIClient
from ibm_watsonx_ai.foundation_models.embeddings import Embeddings
from ibm_watsonx_ai.foundation_models import ModelInference
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams

# Initialize the IBM watsonx client
client = APIClient(watsonx_credentials)

# Configure the embedding model
embedding_model_id = "ibm/slate-30m-english-rtrvr"
embedding_params = {
    EmbedParams.TRUNCATE_INPUT_TOKENS: 128,
    EmbedParams.RETURN_OPTIONS: {"input_text": True},
}

# Initialize the embedding model
watsonx_embeddings = Embeddings(
    model_id=embedding_model_id,
    credentials=watsonx_credentials,
    params=embedding_params,
    project_id=project_id,
    space_id=None,
    verify=False,
)

# Configure the LLM generation model
generation_model_id = "ibm/granite-3-3-8b-instruct"
generation_params = {
    "max_new_tokens": 1024,
    "temperature": 0,
    "top_p": 0.9,
    "repetition_penalty": 1.05,
}

# Initialize the LLM
watsonx_llm = ModelInference(
    model_id=generation_model_id,
    credentials=watsonx_credentials,
    params=generation_params,
    project_id=project_id,
)
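
Before wiring these models into pipelines, it is worth a quick smoke test that both respond. The sketch below uses the same embed_query and generate calls the rest of this tutorial relies on; the exact embedding dimension and generated text will depend on the models you configured:

# Smoke test: embed a sample sentence and generate a short completion
sample_vector = watsonx_embeddings.embed_query("Leonardo da Vinci was a polymath.")
print(f"Embedding dimension: {len(sample_vector)}")  # e.g. 384 for the slate-30m model

response = watsonx_llm.generate(prompt="Complete this sentence: Leonardo da Vinci was")
print(response["results"][0]["generated_text"])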

7. Integrating watsonx.ai with Haystack

Haystack provides a modular approach to building NLP pipelines through components that can be connected in various ways. To use IBM watsonx.ai models within Haystack, we'll first create a few wrapper functions around watsonx.ai's API calls, then wrap those functions in custom Haystack components:

# Create simple wrapper functions to integrate watsonx with Haystack
def embed_documents(texts):
    """Embed a list of documents using watsonx."""
    return watsonx_embeddings.embed_documents(texts=texts)

def embed_query(text):
    """Embed a single query using watsonx."""
    return watsonx_embeddings.embed_query(text=text)

def generate_text(prompt):
    """Generate text using the watsonx LLM."""
    response = watsonx_llm.generate(prompt=prompt)
    return response["results"][0]["generated_text"]

Now we'll create custom Haystack components that use these wrapper functions. These components will slot into our pipelines, enabling us to use watsonx.ai within the Haystack framework:

# Import Haystack component framework
from typing import List

from haystack import component
from haystack.dataclasses import Document

# Haystack's @component decorator lets you plug your own logic into pipelines.
@component
class watsonxDocumentEmbedder:
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        texts = [doc.content for doc in documents]
        embeddings = embed_documents(texts)

        for doc, embedding in zip(documents, embeddings):
            doc.embedding = embedding

        return {"documents": documents}

@component
class watsonxTextEmbedder:
    @component.output_types(embedding=List[float], text=str)
    def run(self, text: str):
        embedding = embed_query(text)
        return {"embedding": embedding, "text": text}

@component
class watsonxGenerator:
    @component.output_types(replies=List[str])
    def run(self, prompt: str):
        generated_text = generate_text(prompt)
        return {"replies": [generated_text]}

8. Setting Up Milvus

Now let's set up our Milvus vector database:

# Import Haystack and Milvus components
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.components.builders import PromptBuilder
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

# Initialize the Milvus document store
document_store = MilvusDocumentStore(
    connection_args={
        "uri": "https://<hostname>:<port>",  # Replace with your watsonx.data Milvus URI or IP
        "user": "<user>",
        "password": "<password>",
        "secure": True,  # Set to True if TLS is enabled
        "server_pem_path": "<path to ca.cert>",  # Replace with the path to your CA certificate
    },
    drop_old=True,  # Drop any existing collection with the same name
)
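
At this point the store should be reachable. A quick check confirms the connection; the count will be 0 until we index documents in the next step:

# Confirm the connection to Milvus; no documents are indexed yet
print(f"Documents currently in the store: {document_store.count_documents()}")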

9. Building the Indexing Pipeline

Now that we have our components set up, we'll create an indexing pipeline to process our documents and store them in the Milvus vector database.

This pipeline will:

  1. Load the text file
  2. Split it into smaller chunks for better retrieval
  3. Generate embeddings for each chunk using watsonx.ai
  4. Store both the text and embeddings in Milvus

# Create an indexing pipeline to process and store documents
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", TextFileToDocument())
indexing_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
indexing_pipeline.add_component("embedder", watsonxDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))

# Connect indexing pipeline components
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

# Run the indexing pipeline
print("Running indexing pipeline...")
indexing_pipeline.run({"converter": {"sources": [file_path]}})
print(f"Number of documents indexed: {document_store.count_documents()}")

10. Testing Document Retrieval

Before building our complete RAG system, let's test the retrieval capabilities to ensure we can find relevant documents. We'll create a simple retrieval pipeline and test it with a question about the "Warrior" painting mentioned in our document.

# Define a test question
question = 'Where is the painting "Warrior" currently stored?'

# Create and run a simple retrieval pipeline
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("embedder", watsonxTextEmbedder())
retrieval_pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
retrieval_pipeline.connect("embedder.embedding", "retriever.query_embedding")

retrieval_results = retrieval_pipeline.run({"embedder": {"text": question}})

# Display retrieved documents
for i, doc in enumerate(retrieval_results["retriever"]["documents"], 1):
    print(f"Document {i}:")
    print(doc.content)
    print("-" * 50)


11. Building the Complete RAG Pipeline

Now that we've confirmed our retrieval works, let's build the complete RAG pipeline. This pipeline will:

  1. Convert the user query into an embedding
  2. Retrieve relevant context from Milvus
  3. Create a prompt that includes the query and retrieved context
  4. Generate a response using the watsonx.ai language model

# Define a prompt template for RAG
prompt_template = """
Answer the following query based on the provided context. If the context does
not include an answer, reply with 'I don't know'.

Query: {{query}}

Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Answer:
"""

# Create the full RAG pipeline
rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", watsonxTextEmbedder())
rag_pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component("generator", watsonxGenerator())

# Connect RAG pipeline components
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")

# Run the RAG pipeline
rag_results = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question},
    }
)

# Display the final answer
print("\nRAG Answer:")
print(rag_results["generator"]["replies"][0])


We can see the answer generated by our RAG pipeline, grounded in the retrieved context.
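
To ask further questions, you can wrap the pipeline call in a small helper. The ask function below is our own convenience wrapper, not a Haystack API:

# Convenience wrapper (our own helper, not part of Haystack) for asking new questions
def ask(query: str) -> str:
    results = rag_pipeline.run(
        {
            "text_embedder": {"text": query},
            "prompt_builder": {"query": query},
        }
    )
    return results["generator"]["replies"][0]

print(ask("Where did Leonardo da Vinci die?"))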

Conclusion

In this tutorial, we've built a complete Retrieval-Augmented Generation (RAG) system by integrating three powerful technologies:

1. IBM watsonx.ai provided the AI brains of our system with:
   - The Slate embedding model to create semantic representations of text
   - The Granite language model to generate natural language responses

2. Milvus served as our vector database, enabling:
   - Efficient storage of document embeddings
   - Fast similarity search to find relevant context

3. Haystack tied everything together with:
   - Modular pipeline components
   - Flexible document processing
   - Seamless integration of different technologies

This RAG system demonstrates how enterprises can leverage their private data to enhance AI capabilities. By retrieving relevant information and providing it as context to language models, we ensure more accurate, factual, and contextually appropriate responses.
