IBM TechXchange Integration Group Japan

IBM TechXchange Integration Group Japan

 View Only

IBM Data ReplicationのChange Data Captureを使って、Db2のデータとスキーマ情報をIBM Event Streamsに連携し、Kafka Connect Sink JDBC Connectorを使って、SQL Serverにリアルタイムレプリケーションを行う。

By HIDEO SAITOH posted 10 hours ago

  

IBM Data ReplicationのChange Data Captureを使って、Db2のデータとスキーマ情報をIBM Event Streamsに連携し、Kafka Connect Sink JDBC Connectorを使って、SQL Serverにリアルタイムレプリケーションを行う。

概要

このブログでは、IBM Data Replicationの強力な**Change Data Capture (CDC)**機能を利用し、Db2データベースのデータとスキーマの変更をリアルタイムで捕捉・連携する手法を解説します。

連携先として、エンタープライズレベルのKafkaプラットフォームであるIBM Event Streams(Apache Kafkaをベース)を使用します。Db2で発生したトランザクションレベルの変更イベントは、Event Streamsのトピックに順序性を保って連携されます。

さらに、これらのイベントを最終的なターゲットであるSQL Serverデータベースへ書き込むため、Kafka ConnectフレームワークのJDBC Sink Connectorを使用します。これにより、Db2 → Event Streams → SQL Serverというエンドツーエンドのリアルタイム・レプリケーション環境を構築します。

本記事を通じて、異種データベース間(Db2 → SQL Server)で高速かつ低遅延なデータ同期を実現するための、モダンなイベント駆動型データ連携基盤の構築手順とノウハウを習得できます。

前提

Db2 → IBM Event Streamsへの連携のブログはこちらを前提としています。

Kafka Connectは、WindowsのOS上でStandaloneで稼働させます。

設定手順概要

設定手順は、以下のようなStepで行っていきます。

  1. Kafka Connectの入手
  2. JDBC Sink Connectorの入手(Aiven-Open/jdbc-connector-for-apache-kafkaを利用: jdbc-connector-for-apache-kafka-6.10.0
  3. avro Converterの入手(apicurio-registry-distro-connect-converter-3.0.15を利用)
  4. Kafka Connectのconnect-standalone.propertiesの設定
  5. JDBC Sink Connectorのjdbc-sink.propertiesの設定
  6. Kafka Connectorの起動
  7. CDCを使って、レプリケーションを実行し、Db2のデータがSQL Serverに反映されることを確認

Kafka Connectの入手

  • Apache Kafkaのサイトより、kafka_2.13-4.1.0.tgz をダウンロードします。
  • ダウンロードした、ファイルを展開します。今回は、Cドライブ直下に展開しました。(c:\kafka_2.13-4.1.0)

JDBC Sink Connector の入手

  • IBM Event AutomationのサイトよりConnectorのカタログにアクセスします。
    https://ibm.github.io/event-automation/connectors/

  • Sink Connectorで、JDBCのコネクターを検索します。

  • Get Connectorをクリックして、Connectorをダウンロードできるサイトに移動します。


  • 表示されているリンクをクリックします。


  • jdbc-connector-for-apache-kafka-6.10.0.zipをダウンロードします。
  • ダウンロードしたものを、c:\kafka_2.13-4.1.0\pluginに展開します。
    c:\kafka_2.13-4.1.0には、pluginのディレクトリは無いので、mkdirコマンドで、作成してください。


avro Converterの入手

  • IBM Event AutomationのサイトよりConnectorのサイトにアクセスします。
    https://ibm.github.io/event-automation/connectors/


  • Convertersのタブをクリックします。
    Apicurio Avroのコンバーターが表示されます。


  • Apicurio Avroコンバーターの所をクリックします。
    Get Converterの所をクリックして、Converterの入手先に移動します。


  • 3.0.1をダウンロードするよう案内されますが、3.0.15が最新なので、mavenのサイトから、直接3.0.15をダウンロードすることにします。

  • 以下のサイトにアクセスします。
    https://mvnrepository.com/artifact/io.apicurio/apicurio-registry-utils-converter


  • jarの所をクリックして、ダウンロードします。


  • ダウンロードしたものは、jdbcのコネクター同様、pluginのディレクトリに展開します。


Kafka Connectのconnect-standalone.propertiesの設定

c:\kafka_2.13-4.1.0\bin\windows\config\connect-standalone.propertiesに以下の値を設定します。

パラメータ
設定値
意味・役割
bootstrap.servers
es-development-kafka-bootstrap-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com:443
Kafkaクラスタの接続先リスト(ブートストラップサーバー)。Kafka Connectがイベントストリーム(IBM Event Streams)と通信を開始するために必要なホスト名とポート番号を指定します。
security.protocol
SASL_SSL
Kafkaクラスタとの通信に使用するセキュリティプロトコル。ここでは、認証にSASL、通信の暗号化にSSL/TLSを使用することを指定しています。
sasl.mechanism
SCRAM-SHA-512
SASL認証に使用するメカニズム。ここではSCRAM-SHA-512(Salted Challenge Response Authentication Mechanism)を指定しており、強力なパスワードベースの認証を実現します。
sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="es-user" password="LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD";
SASL認証のためのJAAS (Java Authentication and Authorization Service) 設定。ここでは、ユーザー名とパスワードを使ってSCRAM-SHA-512でログインするために必要な設定を記述しています。
ssl.protocol
TLSv1.2
SSL/TLS通信に使用するプロトコルバージョンを指定。セキュリティを確保するためTLSv1.2を使用しています。
ssl.truststore.location
c:/Users/Administrator/Downloads/es-cert.p12
サーバー証明書(Kafka Brokerの証明書)を信頼するために使用するトラストストアファイルのパス。SSL/TLS通信時にサーバーの信頼性を検証するために使用されます。
ssl.truststore.password
Rkmej7ekUrI1
トラストストアファイル (ssl.truststore.locationで指定) を開くためのパスワード。
ssl.truststore.type
PKCS12
トラストストアファイルの形式(タイプ)。ここではPKCS12形式であることを指定しています。
key.converter
org.apache.kafka.connect.storage.StringConverter
Kafkaトピックに格納されているメッセージのキーを、Connectの内部形式へ、またはその逆に変換するためのコンバータクラス。ここでは、キーをシンプルな文字列 (String) として処理します。
value.converter
io.apicurio.registry.utils.converter.AvroConverter
Kafkaトピックに格納されているメッセージの値を、Connectの内部形式へ、またはその逆に変換するためのコンバータクラス。ここでは、Avro形式を使用し、スキーマの管理にApicurio Schema Registryを使用することを示しています。
value.converter.apicurio.registry.auth.username
es-user
Avroコンバータが使用するApicurio Schema Registryに接続するためのユーザー名。
value.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
Avroコンバータが使用するApicurio Schema Registryに接続するためのパスワード。
value.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
Apicurio Schema RegistryのURL。Avroデータが使用するスキーマ情報の登録や取得に使用されます。
key.converter.schemas.enable
TRUE
キーのコンバータがスキーマ情報を含めるかどうかを指定。trueに設定されているため、スキーマ情報を含めます。
value.converter.schemas.enable
TRUE
値のコンバータがスキーマ情報を含めるかどうかを指定。trueに設定されているため、スキーマ情報を含めます。
offset.storage.file.filename
C:/Users/Administrator/Downloads/connect.offsets
Kafka Connectがオフセット情報(どこまでデータを処理したか)を保存するファイルのパス。Connectを再起動しても重複処理を避けるために重要です。
offset.flush.interval.ms
10000
オフセット情報をストレージに書き込む(フラッシュする)間隔 (ミリ秒)。10000ms = 10秒ごとに書き込まれます。
plugin.discovery
only_scan
コネクタプラグインの発見方法に関する設定。only_scanはプラグインパスのみをスキャンすることを意味します。
plugin.path
c:/kafka_2.13-4.1.0/plugin
Kafka Connectのプラグイン(JDBC Sink Connectorなど)が格納されているファイルシステムパス。このディレクトリ内のJARファイルなどがコネクタとしてロードされます。

JDBC Sink Connectorのjdbc-sink.propertiesの設定

c:\kafka_2.13-4.1.0\bin\windows\config\jdbc-sink.propertiesに以下の値を設定します。

JDBC Sink Connector設定

パラメータ 設定値 意味・役割
name
jdbc-sink-connector
このコネクタインスタンスに付ける一意の名前。
connector.class
io.aiven.connect.jdbc.JdbcSinkConnector
使用するコネクタのJavaクラス。Aiven提供のJDBC Sink Connectorを指定しています。
tasks.max
1
コネクタの並列実行タスクの最大数。ここでは1つのタスクで実行されます。
driver.class
com.microsoft.sqlserver.jdbc.SQLServerDriver
接続先のSQL Serverに使用するJDBCドライバクラス。
topics
es.db2toes.sourcedb.db2admin.employee
コネクタがデータを受け取るKafkaトピック名。このトピックのデータがSQL Serverに書き込まれます。

SQL Server 接続設定

パラメータ 設定値 意味・役割
connection.url
jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks2022;
integratedSecurity=false;encrypt=false;trustServerCertificate=true
SQL Serverへの接続に使用するJDBC接続URL。ターゲットデータベース、ホスト名、ポート、接続オプションを指定します。(一行で記述します)
connection.user
sa
SQL Serverに接続するためのユーザー名。
connection.password
password
SQL Serverに接続するためのパスワード。
connection.ds.pool.size
5
データベース接続プールで保持する最大接続数。コネクタタスクが利用できる接続数を制限します。

データ書き込み・テーブル操作設定

パラメータ 設定値 意味・役割
insert.mode
upsert
データの書き込みモード。upsertは、ターゲットテーブルに主キーを持つ行が存在すれば更新し、存在しなければ挿入することを意味します (Change Data Captureの一般的な操作)。
table.name.format
db2_employee
Kafkaのトピック名からSQL Serverのテーブル名に変換するフォーマット。ここでは、トピックに関わらず固定でdb2_employeeというテーブル名を使用。
pk.mode
record_key
主キー (Primary Key) の定義方法。ここではKafkaメッセージのキーを主キーとして使用することを指定しています。
pk.fields
EMPNO
pk.mode=record_keyで設定された主キーとして使用するフィールド名。
auto.create
TRUE
ターゲットのSQL Serverにテーブルが存在しない場合、自動的にテーブルを作成することを許可します。
insert.mode.databaselevel
TRUE
insert.modeをデータベースレベルで制御することを示唆しています。

コンバータとスキーマレジストリ設定

パラメータ 設定値 意味・役割
key.converter
io.apicurio.registry.utils.converter.AvroConverter
メッセージのキーを処理するためのAvroコンバータ。
value.converter
io.apicurio.registry.utils.converter.AvroConverter
メッセージの値を処理するためのAvroコンバータ。
key.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
キーのスキーマを取得するためのSchema RegistryのURL。
key.converter.apicurio.registry.auth.username
es-user
キーのSchema Registryに接続するためのユーザー名。
key.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
キーのSchema Registryに接続するためのパスワード。
value.converter.apicurio.registry.url
https://es-development-ibm-es-ac-reg-external-openshift-operators.apps.itz-g5ry1p.infra01-lb.dal14.techzone.ibm.com
値のスキーマを取得するためのSchema RegistryのURL。
value.converter.apicurio.registry.auth.username
es-user
値のSchema Registryに接続するためのユーザー名。
value.converter.apicurio.registry.auth.password
LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD
値のSchema Registryに接続するためのパスワード。

Consumer/TLS/SASL 設定 (Kafka Consumer Security)

パラメータ 設定値 意味・役割
consumer.ssl.protocol
TLSv1.2
ConsumerがKafkaと通信するためのTLSバージョン。
consumer.ssl.truststore.location
c:/Users/Administrator/Downloads/es-cert.p12
ConsumerがKafka Brokerの証明書を信頼するためのトラストストアのパス。
consumer.ssl.truststore.password
Rkmej7ekUrI1
トラストストアファイルを開くためのパスワード。
consumer.ssl.truststore.type
PKCS12
トラストストアファイルの形式。
consumer.override.security.protocol
SASL_SSL
Consumerの通信プロトコルとしてSASL認証とSSL暗号化を組み合わせたものを使用します。
consumer.override.sasl.mechanism
SCRAM-SHA-512
ConsumerのSASL認証メカニズム。
consumer.override.sasl.jaas.config
org.apache.kafka.common.security.scram.ScramLoginModule required username="es-user" password="LCfM4LMrzArIfTk0VJLJdTtMvFL2FHGD";
ConsumerがSCRAM認証でログインするための認証情報(ユーザー名とパスワード)。

Kafka Connectorの起動

  • connect-standalone.batを使って、起動します。
    cd c:\kafka_2.13-4.1.0\bin\windows\bin
    connect-standalone.bat .\config\connect-standalone.properties .\config\jdbc-sink.properties

CDCを使って、レプリケーションを実行し、Db2のデータがSQL Serverに反映されることを確認

  • CDCのconsoleより、リフレッシュで全件をKafkaにデータを送信します。


  • Event Streamsのトピックを確認します。
    トピックには、データが反映されています。


  • Kafka Connectのログをみると、データベースに処理していることが確認できます。


  • SQL Server で、テーブルができているか確認します。
    dbo.db2_employeeというテーブルが作成され、データが入っていることが確認できます。


    Db2のデータは以下の通りです。


  • CDCのコンソールから、Mirrorの設定を行います。
    常にデータベースの変更をキャプチャするためにMirrorにします。


    下記のようにMirroringになっていることが確認できます。
  • Db2のデータを一部Updateし、SQL Serverに反映されるかどうか確認します。
    empnoが000010の人のbonusを2000にUpdateします。Updateする前は、bonusは1000です。



  • SQL Serverのデータを確認します。
    2000に変更されていました。


  • Event Streamsのトピックを確認すると、更新データの258番目のオフセットにデータが来ています。

まとめ

IBM Data Replication(Change Data Capture)とモダンなイベント駆動型データ連携基盤であるIBM Event Streamsに加えて、Kafka ConnectのJDBC Sink Connectorを使うことで、異種データベース間(Db2 → SQL Server)で高速かつ低遅延なデータ同期できることがわかりました。

#IBMCloudPakforIntegration(ICP4I) #Kafka #DataReplication #Db2f

0 comments
6 views

Permalink