Integration Group Japan

IBM Integration Group Japan

 View Only

IBM Data ReplicationのChange Data Captureを使って、Db2のデータとスキーマ情報をIBM Event Streamsに連携し、Kafka Connect Sink JDBC Connectorを使って、SQL Serverにリアルタイムレプリケーションを行う。

By HIDEO SAITOH posted Tue October 21, 2025 09:11 PM

  

IBM Data ReplicationのChange Data Captureを使って、Db2のデータとスキーマ情報をIBM Event Streamsに連携し、Kafka Connect Sink JDBC Connectorを使って、SQL Serverにリアルタイムレプリケーションを行う。

概要

このブログでは、IBM Data Replicationの強力な**Change Data Capture (CDC)**機能を利用し、Db2データベースのデータとスキーマの変更をリアルタイムで捕捉・連携する手法を解説します。

連携先として、エンタープライズレベルのKafkaプラットフォームであるIBM Event Streams(Apache Kafkaをベース)を使用します。Db2で発生したトランザクションレベルの変更イベントは、Event Streamsのトピックに順序性を保って連携されます。

さらに、これらのイベントを最終的なターゲットであるSQL Serverデータベースへ書き込むため、Kafka ConnectフレームワークのJDBC Sink Connectorを使用します。これにより、Db2 → Event Streams → SQL Serverというエンドツーエンドのリアルタイム・レプリケーション環境を構築します。

本記事を通じて、異種データベース間(Db2 → SQL Server)で高速かつ低遅延なデータ同期を実現するための、モダンなイベント駆動型データ連携基盤の構築手順とノウハウを習得できます。

前提

Db2 → IBM Event Streamsへの連携のブログはこちらを前提としています。

Kafka Connectは、WindowsのOS上でStandaloneで稼働させます。

設定手順概要

設定手順は、以下のようなStepで行っていきます。

  1. Kafka Connectの入手
  2. JDBC Sink Connectorの入手(Aiven-Open/jdbc-connector-for-apache-kafkaを利用: jdbc-connector-for-apache-kafka-6.10.0
  3. avro Converterの入手(apicurio-registry-distro-connect-converter-3.0.15を利用)
  4. Kafka Connectのconnect-standalone.propertiesの設定
  5. JDBC Sink Connectorのjdbc-sink.propertiesの設定
  6. Kafka Connectorの起動
  7. CDCを使って、レプリケーションを実行し、Db2のデータがSQL Serverに反映されることを確認

Kafka Connectの入手

  • Apache Kafkaのサイトより、kafka_2.13-4.1.0.tgz をダウンロードします。
  • ダウンロードした、ファイルを展開します。今回は、Cドライブ直下に展開しました。(c:\kafka_2.13-4.1.0)

JDBC Sink Connector の入手

  • IBM Event AutomationのサイトよりConnectorのカタログにアクセスします。
    https://ibm.github.io/event-automation/connectors/

  • Sink Connectorで、JDBCのコネクターを検索します。

  • Get Connectorをクリックして、Connectorをダウンロードできるサイトに移動します。


  • 表示されているリンクをクリックします。


  • jdbc-connector-for-apache-kafka-6.10.0.zipをダウンロードします。
  • ダウンロードしたものを、c:\kafka_2.13-4.1.0\pluginに展開します。
    c:\kafka_2.13-4.1.0には、pluginのディレクトリは無いので、mkdirコマンドで、作成してください。


avro Converterの入手

  • IBM Event AutomationのサイトよりConnectorのサイトにアクセスします。
    https://ibm.github.io/event-automation/connectors/


  • Convertersのタブをクリックします。
    Apicurio Avroのコンバーターが表示されます。


  • Apicurio Avroコンバーターの所をクリックします。
    Get Converterの所をクリックして、Converterの入手先に移動します。


  • 3.0.1をダウンロードするよう案内されますが、3.0.15が最新なので、mavenのサイトから、直接3.0.15をダウンロードすることにします。

  • 以下のサイトにアクセスします。
    https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter


  • jarの所をクリックして、ダウンロードします。


  • ダウンロードしたものは、jdbcのコネクター同様、pluginのディレクトリに展開します。


Kafka Connectのconnect-standalone.propertiesの設定

c:\kafka_2.13-4.1.0\bin\windows\config\connect-standalone.propertiesに以下の値を設定します。

パラメータ
設定値
意味・役割
bootstrap.servers
es-development-kafka-bootstrap-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com:443
Kafkaクラスタの接続先リスト(ブートストラップサーバー)。Kafka Connectがイベントストリーム(IBM Event Streams)と通信を開始するために必要なホスト名とポート番号を指定します。
security.protocol
SASL_SSL
Kafkaクラスタとの通信に使用するセキュリティプロトコル。ここでは、認証にSASL、通信の暗号化にSSL/TLSを使用することを指定しています。
sasl.mechanism
SCRAM-SHA-512
SASL認証に使用するメカニズム。ここではSCRAM-SHA-512(Salted Challenge Response Authentication Mechanism)を指定しており、強力なパスワードベースの認証を実現します。
sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="es-user" password="LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD";
SASL認証のためのJAAS (Java Authentication and Authorization Service) 設定。ここでは、ユーザー名とパスワードを使ってSCRAM-SHA-512でログインするために必要な設定を記述しています。
ssl.protocol
TLSv1.2
SSL/TLS通信に使用するプロトコルバージョンを指定。セキュリティを確保するためTLSv1.2を使用しています。
ssl.truststore.location
c:/Users/Administrator/Downloads/es-cert.p12
サーバー証明書(Kafka Brokerの証明書)を信頼するために使用するトラストストアファイルのパス。SSL/TLS通信時にサーバーの信頼性を検証するために使用されます。
ssl.truststore.password
Rkmej7ekUrI1
トラストストアファイル (ssl.truststore.locationで指定) を開くためのパスワード。
ssl.truststore.type
PKCS12
トラストストアファイルの形式(タイプ)。ここではPKCS12形式であることを指定しています。
key.converter
org.apache.kafka.connect.storage.StringConverter
Kafkaトピックに格納されているメッセージのキーを、Connectの内部形式へ、またはその逆に変換するためのコンバータクラス。ここでは、キーをシンプルな文字列 (String) として処理します。
value.converter
io.apicurio.registry.utils.converter.AvroConverter
Kafkaトピックに格納されているメッセージの値を、Connectの内部形式へ、またはその逆に変換するためのコンバータクラス。ここでは、Avro形式を使用し、スキーマの管理にApicurio Schema Registryを使用することを示しています。
value.converter.apicurio.registry.auth.username
es-user
Avroコンバータが使用するApicurio Schema Registryに接続するためのユーザー名。
value.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
Avroコンバータが使用するApicurio Schema Registryに接続するためのパスワード。
value.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
Apicurio Schema RegistryのURL。Avroデータが使用するスキーマ情報の登録や取得に使用されます。
key.converter.schemas.enable
TRUE
キーのコンバータがスキーマ情報を含めるかどうかを指定。trueに設定されているため、スキーマ情報を含めます。
value.converter.schemas.enable
TRUE
値のコンバータがスキーマ情報を含めるかどうかを指定。trueに設定されているため、スキーマ情報を含めます。
offset.storage.file.filename
C:/Users/Administrator/Downloads/connect.offsets
Kafka Connectがオフセット情報(どこまでデータを処理したか)を保存するファイルのパス。Connectを再起動しても重複処理を避けるために重要です。
offset.flush.interval.ms
10000
オフセット情報をストレージに書き込む(フラッシュする)間隔 (ミリ秒)。10000ms = 10秒ごとに書き込まれます。
plugin.discovery
only_scan
コネクタプラグインの発見方法に関する設定。only_scanはプラグインパスのみをスキャンすることを意味します。
plugin.path
c:/kafka_2.13-4.1.0/plugin
Kafka Connectのプラグイン(JDBC Sink Connectorなど)が格納されているファイルシステムパス。このディレクトリ内のJARファイルなどがコネクタとしてロードされます。

JDBC Sink Connectorのjdbc-sink.propertiesの設定

c:\kafka_2.13-4.1.0\bin\windows\config\jdbc-sink.propertiesに以下の値を設定します。

JDBC Sink Connector設定

パラメータ 設定値 意味・役割
name
jdbc-sink-connector
このコネクタインスタンスに付ける一意の名前。
connector.class
io.aiven.connect.jdbc.JdbcSinkConnector
使用するコネクタのJavaクラス。Aiven提供のJDBC Sink Connectorを指定しています。
tasks.max
1
コネクタの並列実行タスクの最大数。ここでは1つのタスクで実行されます。
driver.class
com.microsoft.sqlserver.jdbc.SQLServerDriver
接続先のSQL Serverに使用するJDBCドライバクラス。
topics
es.db2toes.sourcedb.db2admin.employee
コネクタがデータを受け取るKafkaトピック名。このトピックのデータがSQL Serverに書き込まれます。

SQL Server 接続設定

パラメータ 設定値 意味・役割
connection.url
jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks2022;
integratedSecurity=false;encrypt=false;trustServerCertificate=true
SQL Serverへの接続に使用するJDBC接続URL。ターゲットデータベース、ホスト名、ポート、接続オプションを指定します。(一行で記述します)
connection.user
sa
SQL Serverに接続するためのユーザー名。
connection.password
password
SQL Serverに接続するためのパスワード。
connection.ds.pool.size
5
データベース接続プールで保持する最大接続数。コネクタタスクが利用できる接続数を制限します。

データ書き込み・テーブル操作設定

パラメータ 設定値 意味・役割
insert.mode
upsert
データの書き込みモード。upsertは、ターゲットテーブルに主キーを持つ行が存在すれば更新し、存在しなければ挿入することを意味します (Change Data Captureの一般的な操作)。
table.name.format
db2_employee
Kafkaのトピック名からSQL Serverのテーブル名に変換するフォーマット。ここでは、トピックに関わらず固定でdb2_employeeというテーブル名を使用。
pk.mode
record_key
主キー (Primary Key) の定義方法。ここではKafkaメッセージのキーを主キーとして使用することを指定しています。
pk.fields
EMPNO
pk.mode=record_keyで設定された主キーとして使用するフィールド名。
auto.create
TRUE
ターゲットのSQL Serverにテーブルが存在しない場合、自動的にテーブルを作成することを許可します。
insert.mode.databaselevel
TRUE
insert.modeをデータベースレベルで制御することを示唆しています。

コンバータとスキーマレジストリ設定

パラメータ 設定値 意味・役割
key.converter
io.apicurio.registry.utils.converter.AvroConverter
メッセージのキーを処理するためのAvroコンバータ。
value.converter
io.apicurio.registry.utils.converter.AvroConverter
メッセージの値を処理するためのAvroコンバータ。
key.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
キーのスキーマを取得するためのSchema RegistryのURL。
key.converter.apicurio.registry.auth.username
es-user
キーのSchema Registryに接続するためのユーザー名。
key.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
キーのSchema Registryに接続するためのパスワード。
value.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
値のスキーマを取得するためのSchema RegistryのURL。
value.converter.apicurio.registry.auth.username
es-user
値のSchema Registryに接続するためのユーザー名。
value.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
値のSchema Registryに接続するためのパスワード。

Consumer/TLS/SASL 設定 (Kafka Consumer Security)

パラメータ 設定値 意味・役割
consumer.ssl.protocol
TLSv1.2
ConsumerがKafkaと通信するためのTLSバージョン。
consumer.ssl.truststore.location
c:/Users/Administrator/Downloads/es-cert.p12
ConsumerがKafka Brokerの証明書を信頼するためのトラストストアのパス。
consumer.ssl.truststore.password
Rkmej7ekUrI1
トラストストアファイルを開くためのパスワード。
consumer.ssl.truststore.type
PKCS12
トラストストアファイルの形式。
consumer.override.security.protocol
SASL_SSL
Consumerの通信プロトコルとしてSASL認証とSSL暗号化を組み合わせたものを使用します。
consumer.override.sasl.mechanism
SCRAM-SHA-512
ConsumerのSASL認証メカニズム。
consumer.override.sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="es-user" password="LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD";
ConsumerがSCRAM認証でログインするための認証情報(ユーザー名とパスワード)。

Kafka Connectorの起動

  • connect-standalone.batを使って、起動します。
    cd c:\kafka_2.13-4.1.0\bin\windows\bin
    connect-standalone.bat .\config\connect-standalone.properties .\config\jdbc-sink.properties

CDCを使って、レプリケーションを実行し、Db2のデータがSQL Serverに反映されることを確認

  • CDCのconsoleより、リフレッシュで全件をKafkaにデータを送信します。


  • Event Streamsのトピックを確認します。
    トピックには、データが反映されています。


  • Kafka Connectのログをみると、データベースに処理していることが確認できます。


  • SQL Server で、テーブルができているか確認します。
    dbo.db2_employeeというテーブルが作成され、データが入っていることが確認できます。


    Db2のデータは以下の通りです。


  • CDCのコンソールから、Mirrorの設定を行います。
    常にデータベースの変更をキャプチャするためにMirrorにします。


    下記のようにMirroringになっていることが確認できます。
  • Db2のデータを一部Updateし、SQL Serverに反映されるかどうか確認します。
    empnoが000010の人のbonusを2000にUpdateします。Updateする前は、bonusは1000です。



  • SQL Serverのデータを確認します。
    2000に変更されていました。


  • Event Streamsのトピックを確認すると、更新データの258番目のオフセットにデータが来ています。

まとめ

IBM Data Replication(Change Data Capture)とモダンなイベント駆動型データ連携基盤であるIBM Event Streamsに加えて、Kafka ConnectのJDBC Sink Connectorを使うことで、異種データベース間(Db2 → SQL Server)で高速かつ低遅延なデータ同期できることがわかりました。

#IBMCloudPakforIntegration(ICP4I) #Kafka #DataReplication #Db2f

0 comments
20 views

Permalink