Data Replication

CDC for Kafka Topic Message parse

  • 1.  CDC for Kafka Topic Message parse

    Posted 16 days ago
    Hello All,

    I am very new to IBM IIDR and Kafka integration.
    I need to parse the Kafka topic values into the same data types that are used in the source datasource (Oracle, Db2, etc.).

    For example:
    In the example below, CDC created the schema definition in the Kafka schema registry (Avro), and the source attributes have
    different data types (DECIMAL, VARCHAR, etc.).

    {"subject":"cdckafka.oraclekafka.sourcedb.kafka.cdcurrencytp-value","version":1,"id":3,"schema":"{\"type\":\"record\",\"name\":\"CDCURRENCYTP\",\"namespace\":\"value.SOURCEDB.KAFKA\",\"fields\":[{\"name\":\"LANG_TP_CD\",\"type\":[{\"type\":\"string\",\"logicalType\":\"DECIMAL\",\"dbColumnName\":\"LANG_TP_CD\",\"precision\":19,\"scale\":0},\"null\"],\"default\":\"0\"},{\"name\":\"CURRENCY_TP_CD\",\"type\":[{\"type\":\"string\",\"logicalType\":\"DECIMAL\",\"dbColumnName\":\"CURRENCY_TP_CD\",\"precision\":19,\"scale\":0},\"null\"],\"default\":\"0\"},{\"name\":\"NAME\",\"type\":[{\"type\":\"string\",\"logicalType\":\"VARCHAR\",\"dbColumnName\":\"NAME\",\"length\":120},\"null\"],\"default\":\"\"},{\"name\":\"DESCRIPTION\",\"type\":[{\"type\":\"string\",\"logicalType\":\"VARCHAR\",\"dbColumnName\":\"DESCRIPTION\",\"length\":255},\"null\"],\"default\":\"\"},{\"name\":\"EXPIRY_DT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"TIMESTAMP\",\"dbColumnName\":\"EXPIRY_DT\",\"length\":29},\"null\"],\"default\":\"\"},{\"name\":\"LAST_UPDATE_DT\",\"type\":[{\"type\":\"string\",\"logicalType\":\"TIMESTAMP\",\"dbColumnName\":\"LAST_UPDATE_DT\",\"length\":29},\"null\"],\"default\":\"\"},{\"name\":\"CURRENCY_CODE\",\"type\":[{\"type\":\"string\",\"logicalType\":\"CHARACTER\",\"dbColumnName\":\"CURRENCY_CODE\",\"length\":3},\"null\"],\"default\":\"\"}]}"}
    However, when we consume the messages from the topic, all values are converted to string format.

    {"LANG_TP_CD":{"string":"100"},"COUNTRY_TP_CD":{"string":"197"},"NAME":{"string":"Finland"},"DESCRIPTION":null,"EXPIRY_DT":null,"LAST_UPDATE_DT":{"string":"2001-09-28T22:20:11.000000000000"},"ISO_CODE":{"string":"FI"}}
    {"LANG_TP_CD":{"string":"100"},"COUNTRY_TP_CD":{"string":"198"},"NAME":{"string":"Switzerland"},"DESCRIPTION":null,"EXPIRY_DT":null,"LAST_UPDATE_DT":{"string":"2006-10-24T22:20:11.000000000000"},"ISO_CODE":{"string":"CH"}}
    {"LANG_TP_CD":{"string":"100"},"COUNTRY_TP_CD":{"string":"7"},"NAME":{"string":"Argentina"},"DESCRIPTION":{"string":"Argentina"},"EXPIRY_DT":null,"LAST_UPDATE_DT":{"string":"2001-08-29T00:00:00.000000000000"},"ISO_CODE":{"string":"AR"}}

    Could you please suggest how to read the messages in the same format that is used in the data source?

    Thanks,
    Manish






    ------------------------------
    manish bachhania
    ------------------------------


  • 2.  RE: CDC for Kafka Topic Message parse

    Posted 16 days ago
    Hello Manish

    The set of primitive data types offered by Avro as per their documentation is:

    Primitive Types

    The set of primitive type names is:

    • null: no value
    • boolean: a binary value
    • int: 32-bit signed integer
    • long: 64-bit signed integer
    • float: single precision (32-bit) IEEE 754 floating-point number
    • double: double precision (64-bit) IEEE 754 floating-point number
    • bytes: sequence of 8-bit unsigned bytes
    • string: unicode character sequence

    So I think you simply map each source data type to the closest corresponding Avro data type from the list above.
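    One way to apply this on the consumer side is sketched below. It is only an illustration, not an IIDR feature: it assumes the registry schema carries the `logicalType` annotations shown in the first post (DECIMAL, VARCHAR, TIMESTAMP, CHARACTER) and that timestamps arrive with 12 fractional digits as in the sample messages. The names `parse_timestamp`, `logical_types`, `convert_record` and `CONVERTERS` are hypothetical helpers, and the abbreviated `SCHEMA_JSON` is cut down from the schema in the thread.

```python
import json
from datetime import datetime
from decimal import Decimal


def parse_timestamp(text):
    # The sample messages carry 12 fractional digits; datetime keeps at most 6.
    head, _, frac = text.partition(".")
    frac = (frac + "000000")[:6]
    return datetime.strptime(f"{head}.{frac}", "%Y-%m-%dT%H:%M:%S.%f")


# Assumed mapping from the CDC logicalType annotation to a Python type.
CONVERTERS = {
    "DECIMAL": Decimal,
    "TIMESTAMP": parse_timestamp,
    "VARCHAR": str,
    "CHARACTER": str,
}


def logical_types(schema_json):
    """Return {field name: logicalType} read from the registry schema string."""
    schema = json.loads(schema_json)
    result = {}
    for field in schema["fields"]:
        branches = field["type"] if isinstance(field["type"], list) else [field["type"]]
        for branch in branches:
            if isinstance(branch, dict) and "logicalType" in branch:
                result[field["name"]] = branch["logicalType"]
    return result


def convert_record(record, types):
    """Convert one decoded message into typed Python values."""
    out = {}
    for name, value in record.items():
        # The console consumer prints unions as {"string": "100"}; unwrap those.
        if isinstance(value, dict):
            value = value.get("string")
        if value is None:
            out[name] = None
        else:
            # Fields without a known logicalType stay as strings.
            out[name] = CONVERTERS.get(types.get(name), str)(value)
    return out


# Abbreviated schema and one message, both taken from the thread.
SCHEMA_JSON = json.dumps({
    "type": "record",
    "name": "EXAMPLE",
    "fields": [
        {"name": "LANG_TP_CD", "type": [{"type": "string", "logicalType": "DECIMAL", "precision": 19, "scale": 0}, "null"]},
        {"name": "NAME", "type": [{"type": "string", "logicalType": "VARCHAR", "length": 120}, "null"]},
        {"name": "DESCRIPTION", "type": [{"type": "string", "logicalType": "VARCHAR", "length": 255}, "null"]},
        {"name": "LAST_UPDATE_DT", "type": [{"type": "string", "logicalType": "TIMESTAMP", "length": 29}, "null"]},
    ],
})
MESSAGE = '{"LANG_TP_CD":{"string":"100"},"NAME":{"string":"Finland"},"DESCRIPTION":null,"LAST_UPDATE_DT":{"string":"2001-09-28T22:20:11.000000000000"}}'

typed = convert_record(json.loads(MESSAGE), logical_types(SCHEMA_JSON))
print(typed)
```

    With a real Avro deserializer the values would normally arrive as plain strings rather than `{"string": ...}` wrappers; the sketch accepts both forms. The schema string itself would be fetched from the schema registry, which is left out here.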

    Regards

    Robert

    ------------------------------
    Robert Philo
    ------------------------------



  • 3.  RE: CDC for Kafka Topic Message parse

    Posted 16 days ago
    Thanks, Robert, for your quick response!

    Could you please guide me on how to map a source column to the corresponding target Avro data type?

    The source-target column mapping option is disabled in Management Console when the target is Kafka.

     


    ------------------------------
    manish bachhania
    ------------------------------



  • 4.  RE: CDC for Kafka Topic Message parse

    Posted 16 days ago
    Hello Manish

    The mapping is automatic with the Kafka target. CDC simply takes all the source columns and maps them automatically into Kafka.

    It follows from this that if you wish to enrich the data, you either have to add a source derived column (which would appear to the Kafka agent as if it were actually part of the source table schema) or use the KCOP functionality, for example the audit mode.

    Regards

    Robert

    ------------------------------
    Robert Philo
    ------------------------------



  • 5.  RE: CDC for Kafka Topic Message parse

    Posted 16 days ago
    Hello Robert,

    Thanks again for your input!

    I created the derived column in the mapping; however, it is not populated in the Kafka topic.

    Only the source columns are populated in the Kafka topic, all converted to a string data type.

    kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic cdckafka.kafka.cdaccounttp --from-beginning --property schema.registry.url=http://localhost:8081
    {"LANG_TP_CD":{"string":"100"},"ACCT_TP_CD":{"string":"1"},"NAME":{"string":"Checking"},"DESCRIPTION":null,"EXPIRY_DT":null,"LAST_UPDATE_DT":{"string":"2002-06-12T09:35:51.065000000000"}}
    {"LANG_TP_CD":{"string":"100"},"ACCT_TP_CD":{"string":"2"},"NAME":{"string":"Savings"},"DESCRIPTION":null,"EXPIRY_DT":null,"LAST_UPDATE_DT":{"string":"2002-04-30T16:10:37.000000000000"}}


    I am also trying to enable the Kafka custom operation processor (KCOP) based on the following documentation:
      https://www.ibm.com/docs/en/idr/11.4.0?topic=kafka-enabling-integrated-custom-operation-processors-kcop

    However, the user exits menu is disabled for me.
    I am using the Confluent Kafka ZooKeeper server and schema registry URL in the Kafka properties.

     
    Could you please help with the configuration and tell me which steps I am missing for the derived column and KCOP configuration?

    Thanks in advance!

    Thanks,
    Manish


    ------------------------------
    manish bachhania
    ------------------------------



  • 6.  RE: CDC for Kafka Topic Message parse

    Posted 16 days ago
    Hello Manish

    Are you using one of the latest builds of CDC for Kafka (from IBM Fix Central, not the base Passport Advantage version)?

    When you create a source derived column, you should check the column filtering to ensure that it is selected. For some reason a source derived column is not selected by default, and this is probably why the target cannot see it.

    Regards

    Robert

    ------------------------------
    Robert Philo
    ------------------------------



  • 7.  RE: CDC for Kafka Topic Message parse

    Posted 6 days ago
    Hello Robert,

    Thanks for your input!

    I was using an old IIDR version and have now upgraded to a build from IBM Fix Central.
    I am now able to use the user exit features for the Kafka custom operation processor (KCOP).

    I appreciate your help!

    Thanks,
    Manish

    ------------------------------
    manish bachhania
    ------------------------------