Building a Watchdog for IBM Data Replication on CP4D: Pause, Heal, Resume!
Authors: @SHAILESH JAMLOKI @SHERIN JOSE
IBM Data Replication on CP4D is a powerful tool for keeping data flowing across systems in near real-time. But like any long-running process, replication jobs can occasionally hit runtime issues — especially memory stalls that aren't always visible in the job status but show up in the logs. Instead of relying on manual checks, we can build a lightweight Python-based watchdog that monitors logs, detects these stalls, and automatically pauses and resumes the job to keep it healthy. Let’s walk through how to do it.
🧩 Section 1: Why Replication Jobs Need a Watchdog
- 🔁 Replication jobs are long-running — they can run continuously for hours or days.
- ⚠️ Memory stalls like “Waiting for memory” are common in Oracle XStream and other sources.
- 🕵️‍♂️ Such issues often appear only in event logs, not in job status — making them easy to miss.
- 👀 Manual monitoring is unreliable — especially during off-hours or holidays.
- 🕒 Delays in detection mean lost time, data lag, or failed SLAs.
- ✅ A watchdog can detect patterns in real time, act immediately, and keep your pipelines healthy.
- 📬 Optional alerting keeps humans in the loop, without constant babysitting.
🛠️ Section 2: The Watchdog Blueprint: What We’re Building
- 🧠 A smart Python script that watches your replication jobs like a hawk.
- 🔍 Periodically polls job logs via the IBM Data Replication API on CP4D.
- 🧾 Looks for a specific warning pattern — e.g., Oracle XStream apply receiver states: [Waiting for memory].
- ⏸️ Automatically pauses the job when the pattern is detected — giving the system breathing room.
- ⏳ Waits for a configurable cooldown period (e.g., 60 seconds).
- ▶️ Resumes the job once memory is likely reclaimed.
- 📧 Sends email alerts at each step — so you're always in the know.
- 🔁 Loops every few minutes — giving you continuous protection with zero manual effort.
🔁 Section 3: How It Works: Code Walkthrough
- ✅ Authenticate with IBM Cloud using the IAM API key to get a bearer token.
- 🧾 Fetch the latest job run ID and use it to access the job’s /logs.
- 🔍 Scan logs for a specific message like "Waiting for memory".
- ⏸️ If found, pause the job using the /pause API.
- ⏳ Wait for a short cooldown (e.g., 60 seconds).
- ▶️ Resume the job using the /resume API.
- 📬 Send email alerts at each step — trigger found, paused, resumed, or errors.
- 🔁 Repeat this check every few minutes in a loop.
Sample Python Script
import json
import time
import requests
import smtplib
import re
from email.mime.text import MIMEText

# Load config
with open("config.json") as f:
    config = json.load(f)

BASE_URL = config["base_url"]
JOB_ID = config["job_id"]
API_KEY = config["api_key"]
PROJECT_ID = config["project_id"]
POLL_INTERVAL = config["poll_interval_seconds"]
SEARCH_STRING = config["search_string"]
PAUSE_WAIT_SECONDS = config["pause_wait_seconds"]

# Email alert utility
def send_email(subject, body):
    settings = config.get("email_settings", {})
    if not settings.get("enabled", False):
        return
    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = settings["from_email"]
    msg["To"] = settings["to_email"]
    try:
        with smtplib.SMTP(settings["smtp_server"], settings["smtp_port"]) as server:
            server.starttls()
            server.login(settings["from_email"], settings["from_password"])
            server.sendmail(settings["from_email"], settings["to_email"], msg.as_string())
        print("[Alert] Email sent.")
    except Exception as e:
        print(f"[Error] Failed to send email: {e}")

# IAM authentication: exchange the API key for a bearer token
def get_access_token():
    url = "https://iam.cloud.ibm.com/identity/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "apikey": API_KEY,
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey"
    }
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    return response.json()["access_token"]

# Job info: latest run ID and its current state
def get_latest_run_id(token):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    runs = response.json().get("results", [])
    if runs:
        return runs[0]["metadata"]["asset_id"], runs[0]["entity"]["job_run"]["state"]
    return None, None

# Job logs for a given run
def get_logs(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/logs?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

# Job controls
def pause_job(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/pause?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(url, headers=headers)
    response.raise_for_status()

def resume_job(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/resume?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(url, headers=headers)
    response.raise_for_status()

def get_run_status(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()["entity"]["job_run"]["state"]

# Main loop
def monitor_job():
    prevlogs = []
    while True:
        try:
            token = get_access_token()
            run_id, state = get_latest_run_id(token)
            if not run_id:
                print("[Info] No active job run found.")
            else:
                logs = json.loads(get_logs(token, run_id))["results"]
                # Only process log events not seen in a previous poll
                logtoprocess = [event for event in logs if event not in prevlogs]
                for event in logtoprocess:
                    # Remember everything up to and including this event
                    prevlogs = logs[:logs.index(event) + 1]
                    if re.search(SEARCH_STRING, event):
                        print("[Trigger] Pattern found. Pausing...")
                        send_email("Trigger Alert", f"Pattern found in replication job {JOB_ID}. Pausing it now.")
                        pause_job(token, run_id)
                        # Wait until the run reports the Paused state
                        while True:
                            time.sleep(5)
                            if get_run_status(token, run_id) == "Paused":
                                print("[Status] Job paused. Waiting to resume...")
                                send_email("Job Paused", f"Replication job {JOB_ID} paused. Will resume in {PAUSE_WAIT_SECONDS} seconds.")
                                time.sleep(PAUSE_WAIT_SECONDS)
                                resume_job(token, run_id)
                                # Wait until the run is Running again
                                while True:
                                    time.sleep(5)
                                    if get_run_status(token, run_id) == "Running":
                                        print("[Status] Job resumed...")
                                        send_email("Job Resumed", f"Replication job {JOB_ID} resumed.")
                                        break
                                break
                        # One pause/resume cycle per poll is enough; stop scanning this batch
                        break
                else:
                    # for/else: runs only when no event in this batch matched the pattern
                    print("[Info] No pattern detected. Will check again later.")
        except Exception as e:
            error_msg = f"[Error] {e}"
            print(error_msg)
            send_email("Error in Job Monitor", error_msg)
        time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    monitor_job()
Sample Configuration File
{
  "base_url": "<apiserver url eg: http://localhost:9080/replication>",
  "job_id": "e519f345-b200-4421-91bb-0c4133be1fax",
  "project_id": "2a787056-03e7-42c0-b59b-bdca42e695f3",
  "api_key": "xxxxxxxx",
  "poll_interval_seconds": 10,
  "search_string": "Detected new table ([\\w.]+) in the source database.",
  "pause_wait_seconds": 60,
  "email_settings": {
    "enabled": true,
    "from_email": "abc@gmail.com",
    "from_password": "xxxxxx",
    "to_email": "xyz@gmail.com",
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587
  }
}
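A note on search_string: the script passes it to re.search, so it is treated as a regular expression (the sample value above happens to match a different event, a newly detected source table). To target the memory stall discussed in this post, you could use a pattern along these lines, with the square brackets escaped:
"search_string": "Oracle XStream apply receiver states: \\[Waiting for memory\\]"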
✅ Section 4: Making It Work for You: Tips & Extensions
- 🛠️ Run it as a background job using cron, systemd, or a lightweight container (e.g., Docker) on your CP4D edge node or utility VM.
- 🧪 Test in a non-production job first, especially pause/resume behavior, to ensure you're not affecting critical data flows.
- 🔁 Make the search string configurable — you can monitor other log patterns (e.g., failed connections, latency spikes).
- 📈 Add logging to a file or dashboard for long-term visibility of pause/resume frequency.
- 🔄 Extend to monitor multiple jobs by looping through a list of job IDs in the config file.
- 📬 Integrate with Slack or MS Teams for real-time alerts using webhooks instead of (or alongside) email (a minimal sketch follows this list).
- 🔐 Use environment variables or a secrets manager for secure storage of API keys and SMTP passwords.
- ⏱️ Add exponential backoff or retry logic if the pause/resume APIs fail intermittently.
- 🧩 Plug it into your observability stack (like Prometheus/Grafana) to correlate job stalls with system metrics.
- 🚫 Guard against infinite loops — optionally stop the watchdog if a job pauses/resumes too often in a short window.
With just a bit of scripting and API access, you can turn your IBM Data Replication job into a self-healing pipeline. By continuously monitoring logs, detecting memory-related stalls, and responding with automated pause/resume actions, this watchdog helps reduce downtime and manual intervention — while keeping you in the loop with alerts. It’s a small investment that pays off in smoother, more resilient data operations on CP4D.
References:
IBM Data Replication on CP4D: available API endpoints documentation