written by Dale Lane and Andrew Borley
You have a Kafka cluster that you have been using for a while. Your cluster has many topics, and the topics have many messages.
Now you've decided to start using a new, different Kafka cluster somewhere else.
How can you take your topics with you?
Terminology
For the purposes of this post, we'll refer to the two Kafka clusters as:
- "origin" - your existing Kafka cluster that you are migrating from
- "target" - your new Kafka cluster that you are migrating to
The instructions here use the IBM Event Streams Operator as a convenient way to configure and run MirrorMaker 2, but neither the "origin" nor the "target" Kafka cluster needs to be managed by the Event Streams Operator for this to work.
We'll be running MirrorMaker 2 in a namespace called "migration".
Overview
This post is a brief introduction to how you can set up and run MirrorMaker 2 for data migration using the IBM Event Streams Operator.
MirrorMaker 2 can be run in a variety of ways for a variety of use cases - for example, it can be run as a continuous background mirroring process for disaster recovery. For the purposes of this post, however, the requirement is a one-off data migration, so that is what we will describe here.
Prerequisites
Before you start, you need to obtain credentials and TLS certificates for both of your Kafka clusters:
Credentials from the "origin" cluster
If your "origin" cluster requires authentication, you will need to create credentials for MirrorMaker 2 to use.
If your "origin" cluster is managed by IBM Event Streams, you could do this by creating a KafkaUser resource like the one below.
apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: mm2-credentials
  labels:
    eventstreams.ibm.com/cluster: origin
  namespace: origin-ns
spec:
  authentication:
    type: scram-sha-512
  authorization:
    acls:
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Create
        resource:
          type: cluster
      - host: '*'
        operation: Read
        resource:
          type: cluster
      - host: '*'
        operation: Describe
        resource:
          type: cluster
      - host: '*'
        operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: group
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
    type: simple
This will create a new secret with your credentials in the namespace where your "origin" Kafka cluster is running. Copy these into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.
apiVersion: v1
kind: Secret
metadata:
  name: origin-cluster-credentials
  namespace: migration
data:
  password: YzZPSHZNUHhlWTZm
These credentials will allow MirrorMaker 2 to find and consume from all of the topics on the "origin" cluster. If you are running a different type of Kafka cluster, you will need to do something equivalent for it. However you create your credentials, you should end up with a Secret similar to the one above.
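Note that the values under data in a Kubernetes Secret are base64-encoded. If you are building the Secret by hand, you can produce the encoded value like this (a small Python sketch; the password here is made up):

```python
import base64

# Hypothetical password - substitute the real one from your "origin" cluster
password = "my-origin-password"

# Kubernetes Secret 'data' values must be base64-encoded
encoded = base64.b64encode(password.encode("utf-8")).decode("ascii")
print(encoded)  # paste this as the 'password' value in the Secret
```

Alternatively, you can let kubectl do the encoding for you by creating the Secret with --from-literal.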
TLS certificate from the "origin" cluster
If your "origin" cluster requires TLS, you may need to obtain a CA cert for MirrorMaker 2 to use.
If your "origin" cluster is managed by IBM Event Streams, you will be able to find this in a Secret called something like origin-cluster-ca-cert in the namespace where your Kafka cluster is running.
Copy this into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.
apiVersion: v1
kind: Secret
metadata:
  name: origin-cluster-ca-cert
  namespace: migration
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJ...0tLS0tCg==
Credentials from the "target" cluster
If your "target" cluster requires authentication, you will need to create credentials for MirrorMaker 2 to use.
If your "target" cluster is managed by IBM Event Streams, you could do this by creating a KafkaUser resource like the one below.
Note: This is not the same as before, as the credentials for the "target" Kafka cluster need to allow creating topics.
apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: mm2-credentials
  labels:
    eventstreams.ibm.com/cluster: target
  namespace: target-ns
spec:
  authentication:
    type: scram-sha-512
  authorization:
    acls:
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: AlterConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Alter
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Create
        resource:
          type: cluster
      - host: '*'
        operation: Alter
        resource:
          type: cluster
      - host: '*'
        operation: DescribeConfigs
        resource:
          type: cluster
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: group
    type: simple
This will create a new secret with your credentials in the namespace where your "target" Kafka cluster is running. Copy these into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.
apiVersion: v1
kind: Secret
metadata:
  name: target-cluster-credentials
  namespace: migration
data:
  password: TDQzaEoxVVlSMUds
These credentials will allow MirrorMaker 2 to create and produce to topics on the "target" cluster. If you are running a different type of Kafka cluster, you will need to do something equivalent for it. However you create your credentials, you should end up with a Secret similar to the one above.
TLS certificate from the "target" cluster
If your "target" cluster requires TLS, you may need to obtain a CA cert for MirrorMaker 2 to use.
If your "target" cluster is managed by IBM Event Streams, you will be able to find this in a Secret called something like target-cluster-ca-cert in the namespace where your Kafka cluster is running.
Copy this into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.
apiVersion: v1
kind: Secret
metadata:
  name: target-cluster-ca-cert
  namespace: migration
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FUR...0tLS0K
Start MirrorMaker 2
Create the following KafkaMirrorMaker2 resource so that the IBM Event Streams Operator can configure MirrorMaker 2 and start the data migration. The configuration options below are commented to explain where you may want to customize the values for your own use.
apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: data-migration
  namespace: migration
spec:
  # How many instances of MirrorMaker 2 do you want to run in parallel?
  #
  # If you have a large "origin" cluster with a lot of data to migrate
  # then you can increase this value.
  replicas: 1
  clusters:
    # connection details for your "origin" cluster
    - alias: "origin-cluster"
      # change this to provide the bootstrap servers address for your "origin" cluster
      bootstrapServers: "origin-kafka-bootstrap-origin-ns.apps.my-origin-cluster-name.cp.fyre.ibm.com:443"
      # remove this section if your "origin" cluster does not require authentication
      authentication:
        # update this to match the authentication method used by your "origin" cluster
        type: scram-sha-512
        username: mm2-credentials
        passwordSecret:
          # name of the secret where you stored the
          # credentials for your "origin" cluster
          secretName: origin-cluster-credentials
          # name of the key in the secret that contains
          # the password
          password: password
      # remove this section if your "origin" cluster does not require a custom TLS CA certificate
      tls:
        trustedCertificates:
          # name of the secret where you stored the
          # TLS certificate for your "origin" cluster
          - secretName: origin-cluster-ca-cert
            # name of the key in the secret that contains
            # the certificate
            certificate: ca.crt
    # connection details for your "target" cluster
    - alias: "target-cluster"
      # change this to provide the bootstrap servers address for your "target" cluster
      bootstrapServers: "target-kafka-bootstrap-target-ns.apps.my-target-cluster-name.cp.fyre.ibm.com:443"
      # remove this section if your "target" cluster does not require authentication
      authentication:
        # update this to match the authentication method used by your "target" cluster
        type: scram-sha-512
        username: mm2-credentials
        passwordSecret:
          # name of the secret where you stored the
          # credentials for your "target" cluster
          secretName: target-cluster-credentials
          # name of the key in the secret that contains
          # the password
          password: password
      # remove this section if your "target" cluster does not require a custom TLS CA certificate
      tls:
        trustedCertificates:
          # name of the secret where you stored the
          # TLS certificate for your "target" cluster
          - secretName: target-cluster-ca-cert
            # name of the key in the secret that contains
            # the certificate
            certificate: ca.crt
      config:
        # These topics will be created on the "target" Kafka
        # cluster for MirrorMaker 2 to store its state.
        # Make sure that these names don't match the names of any
        # of your existing topics.
        # We will delete these topics once MirrorMaker 2 has finished.
        offset.storage.topic: migration-connect-cluster-offsets
        config.storage.topic: migration-connect-cluster-configs
        status.storage.topic: migration-connect-cluster-status
  connectCluster: "target-cluster"
  mirrors:
    - sourceCluster: "origin-cluster"
      targetCluster: "target-cluster"
      sourceConnector:
        config:
          # the replication factor that will be used for
          # all topics created on the "target" Kafka cluster
          replication.factor: 1
          # don't try to copy permissions across from the "origin"
          # cluster to the "target" cluster
          sync.topic.acls.enabled: "false"
          # create topics on the "target" cluster with names that
          # match the names of the topics on the "origin" cluster
          replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""
          # syncing offsets
          offset-syncs.topic.replication.factor: 1
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 1
          refresh.groups.interval.seconds: 600
          emit.checkpoints.interval.seconds: 60
          # ensures that consumer group offsets on the "target" cluster
          # are correctly mapped to consumer groups on the "origin" cluster
          replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""
      # Which topics should be migrated from the
      # "origin" cluster to the "target" cluster?
      # If you don't want to migrate all of your topics, modify this pattern to
      # match only the topics you want.
      topicsPattern: ".*"
      # Which consumer groups should be migrated from the
      # "origin" cluster to the "target" cluster?
      # If you don't want to migrate all of your groups, modify this pattern to
      # match only the groups you want.
      groupsPattern: ".*"
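The IdentityReplicationPolicy setting is what makes this behave as a migration rather than a mirror: MirrorMaker 2's default replication policy renames remote topics by prefixing them with the source cluster alias and a separator, whereas the identity policy keeps the original names. A small Python sketch of the difference (the function names here are ours, for illustration only):

```python
def default_policy_name(source_alias: str, topic: str, separator: str = ".") -> str:
    """Mimics MirrorMaker 2's DefaultReplicationPolicy: remote topics
    are renamed by prefixing the source cluster alias."""
    return f"{source_alias}{separator}{topic}"


def identity_policy_name(source_alias: str, topic: str) -> str:
    """Mimics IdentityReplicationPolicy: topics keep their original names."""
    return topic


print(default_policy_name("origin-cluster", "orders"))   # origin-cluster.orders
print(identity_policy_name("origin-cluster", "orders"))  # orders
```

With the identity policy, your client applications can switch to the "target" cluster without changing the topic names they use.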
Stop MirrorMaker 2 and clean up
Once MirrorMaker 2 has finished migrating all of your topics, you can delete it and the resources that it created while running.
- Delete the data-migration KafkaMirrorMaker2 resource
- Delete the three topics that MirrorMaker 2 created to store its state (in the config above, their names start with migration-connect-cluster-)
Don't delete the origin-cluster.checkpoints.internal topic, as you will use this in the next step.
Update your topic replication factors
MirrorMaker 2 does not copy replication factors from the "origin" cluster: all topics that it creates on the "target" cluster will have the replication factor you specified in the MirrorMaker 2 config.
You should update the replication factor on the topics on your "target" cluster to match your requirements before continuing.
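One way to raise a topic's replication factor is Kafka's kafka-reassign-partitions.sh tool, which takes a JSON reassignment plan listing the desired replicas for each partition. As a sketch (the topic name, partition count, and broker IDs below are made up - adjust them to match your "target" cluster), such a plan could be generated like this:

```python
import json

# Hypothetical values - adjust to your "target" cluster
topic = "my-topic"
partitions = 3
replicas = [0, 1, 2]  # broker IDs; three replicas = replication factor 3

plan = {
    "version": 1,
    "partitions": [
        {"topic": topic, "partition": p, "replicas": replicas}
        for p in range(partitions)
    ],
}

# Save this JSON to a file and pass it to
# kafka-reassign-partitions.sh --reassignment-json-file ... --execute
print(json.dumps(plan, indent=2))
```

In practice you would spread the preferred (first) replica of each partition across different brokers to balance leadership.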
Resume your client applications
Kafka client applications that were using the "origin" Kafka cluster can now start using the "target" Kafka cluster.
MirrorMaker 2's MirrorCheckpointConnector automatically stores consumer group offset checkpoints for consumer groups on the "origin" Kafka cluster. Each checkpoint maps the last committed offset for each consumer group in the "origin" cluster to the equivalent offset in the "target" cluster.
These checkpoints are stored in the origin-cluster.checkpoints.internal topic on the "target" cluster.
You can use these checkpoints in your Kafka consumer applications to start consuming from an offset on the "target" cluster that is equivalent to the last committed offset on the "origin" cluster.
If your application is written in Java, Kafka's RemoteClusterUtils class provides the translateOffsets() utility method to retrieve the destination cluster offsets for a consumer group from the checkpoints topic. You can then use the KafkaConsumer.seek() method to override the offsets that the consumer will use on the next poll.
For example, the following Java code snippet will update the example-group consumer group offset from the "origin" cluster to the "target" cluster equivalent.
import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

// Retrieve the mapped offsets for the destination cluster topic-partitions
// ("origin-cluster" is the source cluster alias from the MirrorMaker 2 config)
Map<TopicPartition, OffsetAndMetadata> targetOffsetsMap = RemoteClusterUtils.translateOffsets(properties, "origin-cluster", "example-group", Duration.ofMillis(10000));
// Update the KafkaConsumer to start at the mapped offsets for every topic-partition
targetOffsetsMap.forEach((topicPartition, offsetAndMetadata) -> kafkaConsumer.seek(topicPartition, offsetAndMetadata));
// Retrieve records from the target cluster, starting from the mapped offsets
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10000));
Alternatively, if you want your client application to continue processing messages on the "target" cluster from the point it had reached on the "origin" cluster, you can use the following command:
kafka-consumer-groups.sh \
  --bootstrap-server target-kafka-bootstrap-target-ns.apps.my-target-cluster-name.cp.fyre.ibm.com:443 \
  --dry-run \
  --group example-group \
  --reset-offsets --to-datetime 2021-03-24T21:00:00.000 \
  --all-topics
Replace the timestamp in the command with the time that you ran MirrorMaker 2. Once you're satisfied that the command is correctly configured, replace --dry-run with --execute to apply the new offsets.
Using this command to administratively reset the consumer group offset avoids the need to make code changes to your client application.
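The --to-datetime flag expects a timestamp in the form YYYY-MM-DDTHH:mm:SS.sss. If you want to generate one programmatically, this Python sketch (using an arbitrary example time) produces the expected format:

```python
from datetime import datetime

# Arbitrary example time - use the time that you ran MirrorMaker 2
migration_time = datetime(2021, 3, 24, 21, 0, 0)

# kafka-consumer-groups.sh expects millisecond precision: YYYY-MM-DDTHH:mm:SS.sss
ts = migration_time.strftime("%Y-%m-%dT%H:%M:%S.") + f"{migration_time.microsecond // 1000:03d}"
print(ts)  # 2021-03-24T21:00:00.000
```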
Once you have finished updating your Kafka consumers and no longer need the offset checkpoints that MirrorMaker 2 has stored, you can delete the origin-cluster.checkpoints.internal topic.