Building a Watchdog for IBM Data Replication on CP4D: Pause, Heal, Resume!

Authors: @SHAILESH JAMLOKI @SHERIN JOSE

IBM Data Replication on CP4D is a powerful tool for keeping data flowing across systems in near real-time. But like any long-running process, replication jobs can occasionally hit runtime issues — especially memory stalls that aren't always visible in the job status but show up in the logs. Instead of relying on manual checks, we can build a lightweight Python-based watchdog that monitors logs, detects these stalls, and automatically pauses and resumes the job to keep it healthy. Let’s walk through how to do it.

🧩 Section 1: Why Replication Jobs Need a Watchdog

  • 🔁 Replication jobs are long-running — they can run continuously for hours or days.

  • ⚠️ Memory stalls like “Waiting for memory” are common in Oracle XStream and other sources.

  • 🕵️‍♂️ Such issues often appear only in event logs, not in job status — making them easy to miss.

  • 👀 Manual monitoring is unreliable — especially during off-hours or holidays.

  • 🕒 Delays in detection mean lost time, data lag, or failed SLAs.

  • ✅ A watchdog can detect patterns in real time, act immediately, and keep your pipelines healthy.

  • 📬 Optional alerting keeps humans in the loop, without constant babysitting.

🛠️ Section 2: The Watchdog Blueprint: What We’re Building

  • 🧠 A smart Python script that watches your replication jobs like a hawk.

  • 🔍 Periodically polls job logs via IBM Data Replication API on CP4D.

  • 🧾 Looks for a specific warning pattern — e.g., Oracle XStream apply receiver states: [Waiting for memory] (see the matching snippet after this list).

  • ⏸️ Automatically pauses the job when the pattern is detected — giving the system breathing room.

  • ⏳ Waits for a configurable cooldown period (e.g., 60 seconds).

  • ▶️ Resumes the job once memory is likely reclaimed.

  • 📧 Sends email alerts at each step — so you're always in the know.

  • 🔁 Loops every few minutes — giving you continuous protection with zero manual effort.
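
Because the script in Section 3 matches log lines with re.search, any square brackets in the configured pattern are regex metacharacters and should be escaped (or the pattern kept to the plain substring "Waiting for memory"). A minimal sketch, assuming the log event is available as a plain string:

import re

# Hypothetical log line of the kind described above (illustrative only).
event = "Oracle XStream apply receiver states: [Waiting for memory]"

# Escape the brackets so they are matched literally instead of being read as a character class.
pattern = re.escape("[Waiting for memory]")

if re.search(pattern, event):
    print("Trigger pattern found - the pause/heal/resume cycle would start here.")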

🔁 Section 3: How It Works: Code Walkthrough

  • ✅ Authenticate with IBM Cloud using the IAM API key to get a bearer token.

  • 🧾 Fetch the latest job run ID and use it to access the job’s /logs.

  • 🔍 Scan logs for a specific message like "Waiting for memory".

  • ⏸️ If found, pause the job using the /pause API.

  • ⏳ Wait for a short cooldown (e.g., 60 seconds).

  • ▶️ Resume the job using the /resume API.

  • 📬 Send email alerts at each step — trigger found, paused, resumed, or errors.

  • 🔁 Repeat this check every few minutes in a loop.

Sample Python Script

import json
import time
import requests
import smtplib
import re
from email.mime.text import MIMEText

# Load config
with open("config.json") as f:
    config = json.load(f)

BASE_URL = config["base_url"]
JOB_ID = config["job_id"]
API_KEY = config["api_key"]
PROJECT_ID = config["project_id"]
POLL_INTERVAL = config["poll_interval_seconds"]
SEARCH_STRING = config["search_string"]
PAUSE_WAIT_SECONDS = config["pause_wait_seconds"]

# Email alert utility
def send_email(subject, body):
    settings = config.get("email_settings", {})
    if not settings.get("enabled", False):
        return

    msg = MIMEText(body)
    msg["Subject"] = subject
    msg["From"] = settings["from_email"]
    msg["To"] = settings["to_email"]

    try:
        with smtplib.SMTP(settings["smtp_server"], settings["smtp_port"]) as server:
            server.starttls()
            server.login(settings["from_email"], settings["from_password"])
            server.sendmail(settings["from_email"], settings["to_email"], msg.as_string())
        print("[Alert] Email sent.")
    except Exception as e:
        print(f"[Error] Failed to send email: {e}")

# IAM Authentication
def get_access_token():
    url = "https://iam.cloud.ibm.com/identity/token"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {
        "apikey": API_KEY,
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey"
    }
    response = requests.post(url, headers=headers, data=data)
    response.raise_for_status()
    return response.json()["access_token"]

# Job info
def get_latest_run_id(token):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    runs = response.json().get("results", [])
    if runs:
        return runs[0]["metadata"]["asset_id"], runs[0]["entity"]["job_run"]["state"]
    return None, None

# Job logs
def get_logs(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/logs?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.text

# Job controls
def pause_job(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/pause?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(url, headers=headers)
    response.raise_for_status()

def resume_job(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}/resume?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.post(url, headers=headers)
    response.raise_for_status()

def get_run_status(token, run_id):
    url = f"{BASE_URL}/v2/jobs/{JOB_ID}/runs/{run_id}?project_id={PROJECT_ID}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    return response.json()["entity"]["job_run"]["state"]

# Main loop
def monitor_job():
    prevlogs = []
    while True:
        try:
            token = get_access_token()
            run_id, state = get_latest_run_id(token)
            if not run_id:
                print("[Info] No active job run found.")
            else:
                logs = json.loads(get_logs(token, run_id))["results"]
                # Only look at log events that were not seen in a previous poll.
                new_events = [event for event in logs if event not in prevlogs]
                triggered = False
                for event in new_events:
                    # Remember everything up to and including this event so it is not re-processed.
                    prevlogs = logs[:logs.index(event) + 1]
                    if re.search(SEARCH_STRING, event):
                        triggered = True
                        print("[Trigger] Pattern found. Pausing...")
                        send_email("Trigger Alert", f"Pattern found in replication job {JOB_ID}. Pausing it now.")
                        pause_job(token, run_id)

                        # Wait until the job reports it is paused, then hold for the cooldown and resume.
                        while True:
                            time.sleep(5)
                            if get_run_status(token, run_id) == "Paused":
                                print("[Status] Job paused. Waiting to resume...")
                                send_email("Job Paused", f"Replication job {JOB_ID} paused. Will resume in {PAUSE_WAIT_SECONDS} seconds.")
                                time.sleep(PAUSE_WAIT_SECONDS)
                                resume_job(token, run_id)
                                # Wait until the job reports it is running again.
                                while True:
                                    time.sleep(5)
                                    if get_run_status(token, run_id) == "Running":
                                        print("[Status] Job resumed...")
                                        send_email("Job Resumed", f"Replication job {JOB_ID} resumed.")
                                        break
                                break
                if not triggered:
                    print("[Info] No pattern detected. Will check again later.")

        except Exception as e:
            error_msg = f"[Error] {e}"
            print(error_msg)
            send_email("Error in Job Monitor", error_msg)

        time.sleep(POLL_INTERVAL)

if __name__ == "__main__":
    monitor_job()

Sample Configuration File

{
    "base_url":"<apiserver url eg:  http://localhost:9080/replication>",
    "job_id":"e519f345-b200-4421-91bb-0c4133be1fax",
    "project_id":"2a787056-03e7-42c0-b59b-bdca42e695f3",
    "api_key":"xxxxxxxx",
    "poll_interval_seconds":10,
    "search_string":"Detected new table ([\\w.]+) in the source database.",
    "pause_wait_seconds":60,
    "email_settings":
    {
        "enabled":true,
        "from_email":"abc@gmail.com",
        "from_password":"xxxxxx",
        "to_email":"xyz@gmail.com",
        "smtp_server":"smtp.gmail.com",
        "smtp_port":587
    }
}

✅ Section 4: Making It Work for You: Tips & Extensions

  • 🛠️ Run it as a background job using cron, systemd, or a lightweight container (e.g., Docker) on your CP4D edge node or utility VM.

  • 🧪 Test in a non-production job first, especially pause/resume behavior, to ensure you're not affecting critical data flows.

  • 🔁 Make the search string configurable — you can monitor other log patterns (e.g., failed connections, latency spikes).

  • 📈 Add logging to a file or dashboard for long-term visibility of pause/resume frequency.

  • 🔄 Extend to monitor multiple jobs by looping through a list of job IDs in the config file (see the multi-job sketch after this list).

  • 📬 Integrate with Slack or MS Teams for real-time alerts using webhooks instead of (or alongside) email (see the webhook sketch after this list).

  • 🔐 Use environment variables or a secrets manager for secure storage of API keys and SMTP passwords (see the environment-variable sketch after this list).

  • ⏱️ Add exponential backoff or retry logic if the pause/resume APIs fail intermittently (see the retry sketch after this list).

  • 🧩 Plug it into your observability stack (like Prometheus/Grafana) to correlate job stalls with system metrics.

  • 🚫 Guard against infinite loops — optionally stop the watchdog if a job pauses/resumes too often in a short window (see the rate-limit sketch after this list).
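
Multi-job sketch: to watch several jobs, the config could carry a list of IDs and the main loop could iterate over them. This is a minimal sketch, assuming a hypothetical job_ids array in config.json and that the helper functions above are refactored to take the job ID as a parameter; check_job is a hypothetical wrapper around the per-job pause/heal/resume logic from monitor_job():

# Hypothetical config entry: "job_ids": ["<job-id-1>", "<job-id-2>"]
def monitor_all_jobs():
    while True:
        try:
            token = get_access_token()
            for job_id in config.get("job_ids", [config["job_id"]]):
                # check_job: hypothetical helper holding the per-job logic, parameterized by job_id
                check_job(token, job_id)
        except Exception as e:
            print(f"[Error] {e}")
        time.sleep(POLL_INTERVAL)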
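
Webhook sketch: Slack and Microsoft Teams incoming webhooks both accept a simple JSON payload with a text field, so a small helper can replace or complement send_email(). The webhook URL below is a placeholder you would generate in your own workspace:

import requests

def send_webhook_alert(webhook_url, message):
    # Slack and Microsoft Teams incoming webhooks both accept {"text": "..."}.
    try:
        resp = requests.post(webhook_url, json={"text": message}, timeout=10)
        resp.raise_for_status()
    except Exception as e:
        print(f"[Error] Failed to send webhook alert: {e}")

# Example usage (placeholder URL):
# send_webhook_alert("https://hooks.slack.com/services/XXX/YYY/ZZZ",
#                    f"Replication job {JOB_ID} paused due to a memory stall.")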
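
Environment-variable sketch: the API key and SMTP password can come from environment variables injected by your scheduler, container runtime, or secrets manager instead of sitting in config.json. A minimal sketch, assuming it runs alongside the config loading in the script above; the variable names are examples only:

import os

# Example variable names only; use whatever your deployment tooling injects.
API_KEY = os.environ.get("CPD_API_KEY") or config.get("api_key", "")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD", "")

if not API_KEY:
    raise RuntimeError("No API key found in CPD_API_KEY or config.json")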
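
Retry sketch: for intermittent API failures, a small retry wrapper with exponential backoff can be placed around the pause/resume calls. A minimal, generic sketch:

import time

def with_retries(func, *args, max_attempts=4, base_delay=5, **kwargs):
    """Call func(*args, **kwargs), retrying with exponential backoff on failure."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1))  # 5s, 10s, 20s, ...
            print(f"[Retry] Attempt {attempt} failed ({e}). Retrying in {delay}s...")
            time.sleep(delay)

# Example usage with the pause call from the script above:
# with_retries(pause_job, token, run_id)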
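
Rate-limit sketch: to guard against a pause/resume loop, the watchdog can keep timestamps of recent cycles and stop (or just alert) when too many occur within a window. The thresholds below are arbitrary examples:

import time
from collections import deque

MAX_TRIGGERS = 3         # allow at most this many pause/resume cycles...
WINDOW_SECONDS = 1800    # ...within this window (30 minutes)

recent_triggers = deque()

def record_trigger_and_check():
    """Record a pause/resume cycle; return False if the job is flapping too often."""
    now = time.time()
    recent_triggers.append(now)
    # Drop timestamps that have aged out of the window.
    while recent_triggers and now - recent_triggers[0] > WINDOW_SECONDS:
        recent_triggers.popleft()
    return len(recent_triggers) <= MAX_TRIGGERS

# In monitor_job(), after a resume completes:
# if not record_trigger_and_check():
#     send_email("Watchdog stopped", f"Job {JOB_ID} paused/resumed too often; manual review needed.")
#     break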

With just a bit of scripting and API access, you can turn your IBM Data Replication job into a self-healing pipeline. By continuously monitoring logs, detecting memory-related stalls, and responding with automated pause/resume actions, this watchdog helps reduce downtime and manual intervention — while keeping you in the loop with alerts. It’s a small investment that pays off in smoother, more resilient data operations on CP4D.

References: 

IBM Data Replication on CP4D: available API endpoints documentation
