Event Streams

Migrating your Apache Kafka cluster using MirrorMaker 2

By Dale Lane, posted Thu March 25, 2021 09:26 AM

Written by Dale Lane and Andrew Borley


You have a Kafka cluster that you have been using for a while. Your cluster has many topics, and the topics have many messages.

Now you've decided to move to a new Kafka cluster somewhere else.

How can you take your topics with you?





Terminology

For the purposes of this post, we'll refer to the two Kafka clusters as:

  • "origin" - your existing Kafka cluster that you are migrating from
  • "target" - your new Kafka cluster that you are migrating to

The instructions here use the IBM Event Streams Operator as a convenient way to configure and run MirrorMaker 2, but neither the "origin" nor the "target" Kafka cluster needs to be managed by the Event Streams Operator for this to work.

We'll be running MirrorMaker 2 in a namespace called "migration".


Overview

This post is a brief introduction to how you can set up and run MirrorMaker 2 for data migration using the IBM Event Streams Operator.

MirrorMaker 2 can be run in a variety of ways for a variety of use cases - for example, it can run as a continuous background mirroring process for disaster recovery. For the purposes of this post, however, the requirement is a one-off data migration, so that is what we describe here.


Prerequisites

Before you start, you need to obtain credentials and TLS certificates for both of your Kafka clusters:

Credentials from the "origin" cluster

If your "origin" cluster requires authentication, you will need to create credentials for MirrorMaker 2 to use.

If your "origin" cluster is managed by IBM Event Streams, you could do this by creating a KafkaUser resource like the one below.

apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: mm2-credentials
  labels:
    eventstreams.ibm.com/cluster: origin
  namespace: origin-ns
spec:
  authentication:
    type: scram-sha-512
  authorization:
    acls:
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Create
        resource:
          type: cluster
      - host: '*'
        operation: Read
        resource:
          type: cluster
      - host: '*'
        operation: Describe
        resource:
          type: cluster
      - host: '*'
        operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: group
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
    type: simple

This will create a new secret with your credentials in the namespace where your "origin" Kafka cluster is running. Copy these into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.

apiVersion: v1
kind: Secret
metadata:
  name: origin-cluster-credentials
  namespace: migration
data:
  password: YzZPSHZNUHhlWTZm

These credentials will allow MirrorMaker 2 to find and consume from all of the topics on the "origin" cluster. If you are running a different type of Kafka cluster, you will need to create equivalent credentials for it. However you create your credentials, you should end up with a Secret similar to the one above.
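However you create the credentials, copying the generated password into the namespace where MirrorMaker 2 will run can be scripted with kubectl. This is a sketch only, assuming the secret and namespace names used in the examples above:

```shell
# Read the password from the secret generated alongside the KafkaUser
# (assumes a secret named mm2-credentials in the origin-ns namespace)
PASSWORD=$(kubectl get secret mm2-credentials -n origin-ns \
    -o jsonpath='{.data.password}' | base64 -d)

# Recreate it as a secret in the namespace where MirrorMaker 2 will run
kubectl create secret generic origin-cluster-credentials \
    -n migration \
    --from-literal=password="$PASSWORD"
```

Note that `kubectl create secret` base64-encodes the literal value for you, so the resulting Secret matches the one shown above.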

TLS certificate from the "origin" cluster

If your "origin" cluster requires TLS, you may need to obtain a CA cert for MirrorMaker 2 to use.

If your "origin" cluster is managed by IBM Event Streams, you will be able to find this in a Secret called something like origin-cluster-ca-cert in the namespace where your Kafka cluster is running.

Copy this into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.

apiVersion: v1
kind: Secret
metadata:
  name: origin-cluster-ca-cert
  namespace: migration
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJ...0tLS0tCg==
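As with the credentials, the certificate can be copied across namespaces with kubectl. A sketch, assuming the secret names used above:

```shell
# Extract the CA certificate from the secret in the Kafka cluster's namespace
CA_CERT=$(kubectl get secret origin-cluster-ca-cert -n origin-ns \
    -o jsonpath='{.data.ca\.crt}' | base64 -d)

# Recreate it as a secret in the namespace where MirrorMaker 2 will run
kubectl create secret generic origin-cluster-ca-cert \
    -n migration \
    --from-literal=ca.crt="$CA_CERT"
```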

Credentials from the "target" cluster

If your "target" cluster requires authentication, you will need to create credentials for MirrorMaker 2 to use.

If your "target" cluster is managed by IBM Event Streams, you could do this by creating a KafkaUser resource like the one below.

Note: This is not the same as before, as the credentials for the "target" Kafka cluster need to allow creating topics.

apiVersion: eventstreams.ibm.com/v1beta1
kind: KafkaUser
metadata:
  name: mm2-credentials
  labels:
    eventstreams.ibm.com/cluster: target
  namespace: target-ns
spec:
  authentication:
    type: scram-sha-512
  authorization:
    acls:
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Write
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: DescribeConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: AlterConfigs
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Alter
        resource:
          name: '*'
          patternType: literal
          type: topic
      - host: '*'
        operation: Create
        resource:
          type: cluster
      - host: '*'
        operation: Alter
        resource:
          type: cluster
      - host: '*'
        operation: DescribeConfigs
        resource:
          type: cluster
      - host: '*'
        operation: Read
        resource:
          name: '*'
          patternType: literal
          type: group
      - host: '*'
        operation: Describe
        resource:
          name: '*'
          patternType: literal
          type: group
    type: simple

This will create a new secret with your credentials in the namespace where your "target" Kafka cluster is running. Copy these into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.

apiVersion: v1
kind: Secret
metadata:
  name: target-cluster-credentials
  namespace: migration
data:
  password: TDQzaEoxVVlSMUds

These credentials will allow MirrorMaker 2 to create and produce to topics on the "target" cluster. If you are running a different type of Kafka cluster, you will need to create equivalent credentials for it. However you create your credentials, you should end up with a Secret similar to the one above.

TLS certificate from the "target" cluster

If your "target" cluster requires TLS, you may need to obtain a CA cert for MirrorMaker 2 to use.

If your "target" cluster is managed by IBM Event Streams, you will be able to find this in a Secret called something like target-cluster-ca-cert in the namespace where your Kafka cluster is running.

Copy this into a secret in a namespace on the Kubernetes cluster where you want to run MirrorMaker 2.

apiVersion: v1
kind: Secret
metadata:
  name: target-cluster-ca-cert
  namespace: migration
data:
  ca.crt: LS0tLS1CRUdJTiBDRVJUSUZJQ0FUR...0tLS0K

Start MirrorMaker 2

Create the following KafkaMirrorMaker2 resource so that the IBM Event Streams Operator can configure MirrorMaker 2 to start the data migration.

The definition below includes comments that explain where you may want to customize the values for your own environment.

apiVersion: eventstreams.ibm.com/v1alpha1
kind: KafkaMirrorMaker2
metadata:
  name: data-migration
  namespace: migration
spec:
  # How many instances of MirrorMaker 2 do you want to run in parallel?
  #
  # If you have a large "origin" cluster with a lot of data to migrate
  #  then you can increase this value.
  replicas: 1


  clusters:

  # connection details for your "origin" cluster
  - alias: "origin-cluster"
    # change this to provide the bootstrap servers address for your "origin" cluster
    bootstrapServers: "origin-kafka-bootstrap-origin-ns.apps.my-origin-cluster-name.cp.fyre.ibm.com:443"
    # remove this section if your "origin" cluster does not require authentication
    authentication:
      # update this to match the authentication method used by your "origin" cluster
      type: scram-sha-512
      username: mm2-credentials
      passwordSecret:
        # name of the secret where you stored the
        #  credentials for your "origin" cluster
        secretName: origin-cluster-credentials
        # name of the key in the secret that contains
        #  the password
        password: password
    # remove this section if your "origin" cluster does not require a custom TLS CA certificate
    tls:
      trustedCertificates:
        # name of the secret where you stored the
        #  TLS certificate for your "origin" cluster
        - secretName: origin-cluster-ca-cert
          # name of the key in the secret that contains
          #  the certificate
          certificate: ca.crt

  # connection details for your "target" cluster
  - alias: "target-cluster"
    # change this to provide the bootstrap servers address for your "target" cluster
    bootstrapServers: "target-kafka-bootstrap-target-ns.apps.my-target-cluster-name.cp.fyre.ibm.com:443"
    # remove this section if your "target" cluster does not require authentication
    authentication:
      # update this to match the authentication method used by your "target" cluster
      type: scram-sha-512
      username: mm2-credentials
      passwordSecret:
        # name of the secret where you stored the
        #  credentials for your "target" cluster
        secretName: target-cluster-credentials
        # name of the key in the secret that contains
        #  the password
        password: password
    # remove this section if your "target" cluster does not require a custom TLS CA certificate
    tls:
      trustedCertificates:
        # name of the secret where you stored the
        #  TLS certificate for your "target" cluster
        - secretName: target-cluster-ca-cert
          # name of the key in the secret that contains
          #  the certificate
          certificate: ca.crt
    config:
      # These topics will be created on the "target" Kafka
      #  cluster for MirrorMaker 2 to store its state.
      # Make sure that these names don't match the names of any
      #  of your existing topics.
      # We will delete these topics once MirrorMaker 2 has finished.
      offset.storage.topic: migration-connect-cluster-offsets
      config.storage.topic: migration-connect-cluster-configs
      status.storage.topic: migration-connect-cluster-status

  connectCluster: "target-cluster"

  mirrors:
    - sourceCluster: "origin-cluster"
      targetCluster: "target-cluster"
      sourceConnector:
        config:
          # the replication factor that will be used for
          #  all topics created on the "target" Kafka cluster
          replication.factor: 1

          # don't try to copy permissions across from the "origin"
          #  cluster to the "target" cluster
          sync.topic.acls.enabled: "false"
          # create topics on the "target" cluster with names that
          #  match the names of the topics on the "origin" cluster
          replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""
          # replication factor for the internal offset-syncs topic
          offset-syncs.topic.replication.factor: 1

      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 1
          refresh.groups.interval.seconds: 600
          emit.checkpoints.interval.seconds: 60
          # ensures that consumer group offsets on the "target" cluster
          #  are correctly mapped to consumer groups on the "origin" cluster
          replication.policy.class: "io.strimzi.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""

      # Which topics should be migrated from the
      #  "origin" cluster to the "target" cluster ?
      # If you don't want to migrate all of your topics, modify this pattern to
      #  match only the topics you want. 
      topicsPattern: ".*"

      # Which consumer groups should be migrated from the
      #  "origin" cluster to the "target" cluster ?
      # If you don't want to migrate all of your groups, modify this pattern to
      #  match only the groups you want. 
      groupsPattern: ".*"
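Once you have saved the resource definition to a file, you can apply it and monitor progress with kubectl. A sketch - the file name is an assumption, and the exact names of the generated deployment and pods may vary with the operator version:

```shell
# Apply the KafkaMirrorMaker2 resource to start the migration
kubectl apply -f data-migration.yaml

# Check that the resource has been reconciled by the operator
kubectl get kafkamirrormaker2 data-migration -n migration

# Follow the logs of the MirrorMaker 2 pods to monitor progress
# (the deployment is typically named after the resource)
kubectl logs -n migration deployment/data-migration-mirrormaker2 -f
```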

Stop MirrorMaker 2 and clean up

Once MirrorMaker 2 has finished migrating all of your topics, you can delete it and the resources that it created while running.

  • Delete the data-migration KafkaMirrorMaker2 resource
  • Delete the three topics that MirrorMaker 2 created to store its state (in the config above, these start with the name migration-connect-cluster-)

Don't delete the origin-cluster.checkpoints.internal topic, as you will use this in the next step.
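These clean-up steps can be scripted with kubectl and the Kafka CLI tools. A sketch - the bootstrap address placeholder and the client.properties file (containing your "target" cluster connection settings) are assumptions:

```shell
# Delete the KafkaMirrorMaker2 resource to stop MirrorMaker 2
kubectl delete kafkamirrormaker2 data-migration -n migration

# Delete the three internal state topics on the "target" cluster,
# but NOT origin-cluster.checkpoints.internal
for topic in migration-connect-cluster-offsets \
             migration-connect-cluster-configs \
             migration-connect-cluster-status; do
  kafka-topics.sh --bootstrap-server <target-bootstrap-address>:443 \
      --command-config client.properties \
      --delete --topic "$topic"
done
```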


Update your topic replication factors

MirrorMaker 2 does not migrate the replication factor for topics it creates on the "target" cluster to match the "origin" cluster. All topics that it creates on the "target" cluster will have the replication factor you specify in the MirrorMaker 2 config.

You should update the replication factor on the topics on your "target" cluster to match your requirements before continuing.
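Kafka does not let you change a topic's replication factor directly; you increase it by reassigning partitions to additional brokers with kafka-reassign-partitions.sh. A sketch, assuming a hypothetical single-partition topic my-topic being moved from replication factor 1 to 3 across brokers 0, 1 and 2:

```shell
# Describe the desired replica placement for each partition
cat > increase-rf.json <<'EOF'
{
  "version": 1,
  "partitions": [
    { "topic": "my-topic", "partition": 0, "replicas": [0, 1, 2] }
  ]
}
EOF

# Apply the reassignment (connection details are assumptions)
kafka-reassign-partitions.sh \
    --bootstrap-server <target-bootstrap-address>:443 \
    --command-config client.properties \
    --reassignment-json-file increase-rf.json \
    --execute
```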


Resume your client applications

Kafka client applications that were using the "origin" Kafka cluster can now start using the "target" Kafka cluster.

MirrorMaker 2's MirrorCheckpointConnector automatically stores consumer group offset checkpoints for consumer groups on the "origin" Kafka cluster. Each checkpoint maps the last committed offset for each consumer group on the "origin" cluster to the equivalent offset on the "target" cluster.

These checkpoints are stored in the origin-cluster.checkpoints.internal topic on the "target" cluster.

You can use these checkpoints in your Kafka consumer applications to start consuming from an offset on the "target" cluster that is equivalent to the last committed offset on the "origin" cluster.

If your application is written in Java, Kafka's RemoteClusterUtils class provides the translateOffsets() utility method to retrieve the destination cluster offsets for a consumer group from the checkpoints topic.

You can then use the KafkaConsumer.seek() method to override the offsets that the consumer will use on the next poll.

For example, the following Java code snippet will update the example-group consumer group offset from the "origin" cluster to the "target" cluster equivalent.

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

// Retrieve the mapped offsets for the destination cluster topic-partitions
// (the alias must match the "origin" cluster alias in the MirrorMaker 2 config)
Map<TopicPartition, OffsetAndMetadata> targetOffsetsMap = RemoteClusterUtils.translateOffsets(properties, "origin-cluster", "example-group", Duration.ofMillis(10000));

// Update the KafkaConsumer to start at the mapped offsets for every topic-partition
targetOffsetsMap.forEach((topicPartition, offsetAndMetadata) -> kafkaConsumer.seek(topicPartition, offsetAndMetadata));

// Retrieve records from the target cluster, starting from the mapped offsets
ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(Duration.ofMillis(10000));

If you want your client application to continue processing messages on the "target" cluster from the point it had reached on the "origin" cluster, you can alternatively use the following command:

kafka-consumer-groups.sh \
    --bootstrap-server target-kafka-bootstrap-target-ns.apps.my-target-cluster-name.cp.fyre.ibm.com:443 \
    --dry-run \
    --group example-group \
    --reset-offsets --to-datetime 2021-03-24T21:00:00.000 \
    --all-topics  

Replace the timestamp in the command with the time that you ran MirrorMaker 2. Once you're satisfied that the command is correctly configured, replace --dry-run with --execute to apply the reset.

Using this command to administratively reset the consumer group offset avoids the need to make code changes to your client application.

Once you have finished updating your Kafka consumers and no longer need the offset checkpoints that MirrorMaker 2 has stored, you can delete the origin-cluster.checkpoints.internal topic.
