Build RAG with watsonx.data Milvus and Haystack
Today, building intelligent applications is all about unlocking the value hidden in your organization’s data. A popular and effective approach is Retrieval-Augmented Generation (RAG), which combines the strengths of a search engine with the creativity of a generative model to deliver more accurate, context-aware responses.
In this blog, we’ll show you how to set up a RAG system using Haystack, IBM watsonx.data Milvus, and watsonx.ai foundation models. This combination lets you build fast, scalable applications that generate insights grounded in real, reliable data.
What is Haystack?
Haystack is an open-source framework for building production-ready applications with large language models (LLMs). It provides a flexible and composable architecture that allows you to build complex language processing pipelines by connecting different components.
Key features of Haystack:
- Component-based architecture: Build custom pipelines by connecting modular components
- Framework agnostic: Works with various LLM providers and vector databases
- Production-ready: Designed for scalability and real-world application development
- Full RAG support: Provides all the tools needed for retrieval-augmented generation
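To make the component model concrete, here is a minimal, self-contained sketch (a toy example, unrelated to the RAG pipeline we build later) showing how two custom components are wired into a Haystack pipeline:
# Minimal illustration of Haystack's component/pipeline model (toy example)
from haystack import Pipeline, component

@component
class Uppercaser:
    @component.output_types(text=str)
    def run(self, text: str):
        # Any Python logic can live inside a component's run() method
        return {"text": text.upper()}

@component
class Exclaimer:
    @component.output_types(text=str)
    def run(self, text: str):
        return {"text": text + "!"}

pipeline = Pipeline()
pipeline.add_component("upper", Uppercaser())
pipeline.add_component("exclaim", Exclaimer())
pipeline.connect("upper.text", "exclaim.text")

print(pipeline.run({"upper": {"text": "hello haystack"}}))
# {'exclaim': {'text': 'HELLO HAYSTACK!'}}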

Step-by-Step Implementation
1. Create a Milvus Instance on watsonx.data
To provision a Milvus instance, refer to Getting Started with IBM watsonx.data Milvus.
2. Set up a Watson Machine Learning service instance and API key
- Create a Watson Machine Learning service instance (you can choose the Lite plan, which is a free instance).
- Generate an API Key in WML. Save this API key for use in this tutorial.
- Associate the WML service instance with the project you created in watsonx.ai.
3. Setting Up the Environment
First, we'll set up our Python environment and install the necessary packages:
# Install required packages
!pip install --upgrade pymilvus milvus-haystack haystack-ai ibm-watsonx-ai
4. Data Preparation
For this tutorial, we'll use a public domain text about Leonardo Da Vinci from Project Gutenberg:
# Download sample data for demonstration
import os
import urllib.request

url = "https://www.gutenberg.org/cache/epub/7785/pg7785.txt"
file_path = "./davinci.txt"

if not os.path.exists(file_path):
    urllib.request.urlretrieve(url, file_path)
5. IBM watsonx.ai Configuration
Next, we'll configure our connection to IBM watsonx.ai:
# Set up IBM watsonx API credentials
watsonx_credentials = {
    "url": "<url>",  # Replace with your watsonx URL
    "apikey": "<apikey>",  # Replace with your watsonx API key
}

project_id = "<project_id>"  # Replace with your project ID
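As a small optional sketch (the environment variable names below are our own, not part of the SDK), you can keep these secrets out of the notebook by reading them from environment variables instead:
# Optional: load credentials from environment variables (illustrative variable names)
import os

watsonx_credentials = {
    "url": os.environ["WATSONX_URL"],
    "apikey": os.environ["WATSONX_APIKEY"],
}
project_id = os.environ["WATSONX_PROJECT_ID"]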
6. Initializing watsonx.ai Components
Now we'll initialize the embedding and language models from IBM watsonx.ai:
# Import watsonx libraries
from ibm_watsonx_ai import APIClient
from ibm_watsonx_ai.foundation_models.embeddings import Embeddings
from ibm_watsonx_ai.foundation_models import ModelInference
from ibm_watsonx_ai.metanames import EmbedTextParamsMetaNames as EmbedParams

# Initialize the IBM watsonx client
client = APIClient(watsonx_credentials)

# Configure the embedding model
embedding_model_id = "ibm/slate-30m-english-rtrvr"
embedding_params = {
    EmbedParams.TRUNCATE_INPUT_TOKENS: 128,
    EmbedParams.RETURN_OPTIONS: {'input_text': True},
}

# Initialize the embedding model
watsonx_embeddings = Embeddings(
    model_id=embedding_model_id,
    credentials=watsonx_credentials,
    params=embedding_params,
    project_id=project_id,
    space_id=None,
    verify=False,
)

# Configure the LLM generation model
generation_model_id = "ibm/granite-3-3-8b-instruct"
generation_params = {
    "max_new_tokens": 1024,
    "temperature": 0,
    "top_p": 0.9,
    "repetition_penalty": 1.05,
}

# Initialize the LLM
watsonx_llm = ModelInference(
    model_id=generation_model_id,
    credentials=watsonx_credentials,
    params=generation_params,
    project_id=project_id,
)
7. Integrating watsonx.ai with Haystack
Haystack builds NLP pipelines from modular components that can be connected in various ways. To integrate IBM watsonx.ai models with Haystack, we first write thin wrapper functions around the watsonx.ai API calls, and then wrap those functions in custom Haystack components:
# Create simple wrapper functions to integrate watsonx with Haystack
def embed_documents(texts):
    """Wrapper function to embed documents using watsonx"""
    return watsonx_embeddings.embed_documents(texts=texts)

def embed_query(text):
    """Wrapper function to embed a single query using watsonx"""
    return watsonx_embeddings.embed_query(text=text)

def generate_text(prompt):
    """Wrapper function to generate text using the watsonx LLM"""
    response = watsonx_llm.generate(prompt=prompt)
    return response['results'][0]['generated_text']
Now we'll create custom Haystack components that use these wrapper functions. These components will slot into our pipelines, enabling us to use watsonx.ai within the Haystack framework:
# Import the Haystack component framework
from typing import List

from haystack import component
from haystack.dataclasses import Document

# Haystack's @component decorator allows you to plug your own logic into pipelines.
@component
class watsonxDocumentEmbedder:
    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        # Embed each document's content and attach the vector to the document
        texts = [doc.content for doc in documents]
        embeddings = embed_documents(texts)
        for doc, embedding in zip(documents, embeddings):
            doc.embedding = embedding
        return {"documents": documents}

@component
class watsonxTextEmbedder:
    @component.output_types(embedding=List[float], text=str)
    def run(self, text: str):
        # Embed a single query string
        embedding = embed_query(text)
        return {"embedding": embedding, "text": text}

@component
class watsonxGenerator:
    @component.output_types(replies=List[str])
    def run(self, prompt: str):
        # Generate a completion for the rendered prompt
        generated_text = generate_text(prompt)
        return {"replies": [generated_text]}
8. Setting Up Milvus
Now let's set up our Milvus vector database:
# Import Haystack and Milvus components
from haystack import Pipeline
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.components.builders import PromptBuilder
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

# Initialize the Milvus document store
document_store = MilvusDocumentStore(
    connection_args={
        "uri": "https://<hostname>:<port>",  # Replace with your watsonx.data Milvus URI or IP
        "user": "<user>",
        "password": "<password>",
        "secure": True,  # Set to True if TLS is enabled
        "server_pem_path": "/root/path of ca.cert",  # Path to the CA certificate
    },
    drop_old=True,
)
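If you just want to experiment locally without a watsonx.data Milvus instance, milvus-haystack can also point at a Milvus Lite database stored in a local file (a sketch only; the rest of this tutorial assumes the watsonx.data connection above):
# Local-only alternative: Milvus Lite keeps the collection in a local file
local_document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus_demo.db"},
    drop_old=True,
)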
9. Building the Indexing Pipeline
Now that we have our components set up, we'll create an indexing pipeline to process our documents and store them in the Milvus vector database.
This pipeline will:
- Load the text file
- Split it into smaller chunks for better retrieval
- Generate embeddings for each chunk using watsonx.ai
- Store both the text and embeddings in Milvus
# Create an indexing pipeline to process and store documents
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", TextFileToDocument())
indexing_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
indexing_pipeline.add_component("embedder", watsonxDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))

# Connect the indexing pipeline components
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

# Run the indexing pipeline
print("Running indexing pipeline...")
indexing_pipeline.run({"converter": {"sources": [file_path]}})
print(f"Number of documents indexed: {document_store.count_documents()}")
10. Testing Document Retrieval
Before building our complete RAG system, let's test the retrieval capabilities to ensure we can find relevant documents. We'll create a simple retrieval pipeline and test it with a question about the "Warrior" painting mentioned in our document.
# Define a test question
question = 'Where is the painting "Warrior" currently stored?'

# Create and run a simple retrieval pipeline
retrieval_pipeline = Pipeline()
retrieval_pipeline.add_component("embedder", watsonxTextEmbedder())
retrieval_pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
retrieval_pipeline.connect("embedder.embedding", "retriever.query_embedding")

retrieval_results = retrieval_pipeline.run({"embedder": {"text": question}})

# Display the retrieved documents
for i, doc in enumerate(retrieval_results["retriever"]["documents"], 1):
    print(f"Document {i}:")
    print(doc.content)
    print("-" * 50)

11. Building the Complete RAG Pipeline
Now that we've confirmed our retrieval works, let's build the complete RAG pipeline. This pipeline will:
- Convert the user query into an embedding
- Retrieve relevant context from Milvus
- Create a prompt that includes the query and retrieved context
- Generate a response using the watsonx.ai language model
# Define a prompt template for RAG
prompt_template = """
Answer the following query based on the provided context. If the context does
not include an answer, reply with 'I don't know'.
Query: {{query}}
Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
Answer:
"""
# Create the full RAG pipeline
rag_pipeline = Pipeline()
rag_pipeline.add_component("text_embedder", watsonxTextEmbedder())
rag_pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component("generator", watsonxGenerator())
# Connect RAG pipeline components
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")
# Run the RAG pipeline
rag_results = rag_pipeline.run(
    {
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question},
    }
)
# Display the final answer
print("\nRAG Answer:")
print(rag_results["generator"]["replies"][0])

As expected, the RAG pipeline returns an answer grounded in the context retrieved from Milvus.
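The same pipeline can be reused for additional questions simply by running it with a new query (an illustrative follow-up):
# Ask a follow-up question with the same RAG pipeline
followup = "Who was Leonardo da Vinci's teacher?"
followup_results = rag_pipeline.run(
    {
        "text_embedder": {"text": followup},
        "prompt_builder": {"query": followup},
    }
)
print(followup_results["generator"]["replies"][0])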
Conclusion
In this tutorial, we've built a complete Retrieval-Augmented Generation (RAG) system by integrating three powerful technologies:
1. IBM watsonx.ai provided the AI brains of our system with:
- The Slate embedding model to create semantic representations of text
- The Granite language model to generate natural language responses
2. Milvus served as our vector database, enabling:
- Efficient storage of document embeddings
- Fast similarity search to find relevant context
3. Haystack tied everything together with:
- Modular pipeline components
- Flexible document processing
- Seamless integration of different technologies
This RAG system demonstrates how enterprises can leverage their private data to enhance AI capabilities. By retrieving relevant information and providing it as context to language models, we ensure more accurate, factual, and contextually appropriate responses.
References:
- watsonx-ai-python-sdk
- Choosing a foundation model in watsonx.ai | IBM watsonx
- Supported foundation models in watsonx.ai | IBM watsonx
- Retrieval-augmented generation | IBM watsonx
- Converting text to text embeddings | IBM watsonx