Replicating SQL Data to Databricks in IBM Data Replication

By Phil Koza

  


Overview

It’s no wonder Databricks is so popular these days! Databricks helps manage and analyze large volumes of data, particularly in hybrid cloud environments. It simplifies the process of building data engineering applications with high volume workloads. Databricks' architecture, based on a Lakehouse concept, offers an open, unified foundation for all data and governance, enabling seamless access to data for advanced analytics and AI.

If you’re a Databricks user, though, you might find yourself struggling to capture valuable transactional data and make it available quickly within Databricks, given that Databricks auto-commits, limits the number of JDBC statement parameters, and restricts batches by limiting the number of rows that can be inserted or deleted in a single statement. On top of that, JDBC batch operations tend to perform poorly.

Fortunately, IBM Data Replication has a solution – you can now use the CDC FlexRep engine for Databricks to efficiently deliver near-real-time data from mainframe and distributed sources to Databricks and gain timely insights from your data. Provision data into Databricks workspaces located in various regions to ensure consistent, low-latency access for teams or customers.

Data replication is the process of storing data in multiple locations to ensure consistency and availability. This pattern provides guidance for replicating data from a SQL database to Databricks using IBM Data Replication’s Change Data Capture (CDC). CDC uses JDBC to replicate data and supports specific JDBC driver versions for Databricks.

Supported JDBC Drivers

CDC supports the following JDBC driver versions for connecting to Databricks:

Simba Driver (version 2.7.1 or 2.7.3)

OSS Driver (version 2.7.1 or 2.7.3)

You can download either driver from the Databricks website. Databricks can run on any cloud platform, as long as the workspace is reachable through the JDBC driver.

Supported Databricks Data Types

CDC supports Databricks data types that conform to the SQL92 standard, including the following (a sample table definition appears after the list):

Bigint

Binary

Boolean

Date

Decimal

Double

Float

Int

Smallint

String

Timestamp

Tinyint
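
For illustration, here is a hypothetical target table defined with a selection of the supported types; the catalog, schema, table, and column names (main.cdc_target.orders and its columns) are placeholders chosen for this example, not names used by CDC itself.

-- Hypothetical target table using a selection of the supported data types
CREATE TABLE IF NOT EXISTS main.cdc_target.orders (
  order_id    BIGINT  NOT NULL,
  customer_id INT,
  status      STRING,
  amount      DECIMAL(18,2),
  discount    DOUBLE,
  is_priority BOOLEAN,
  order_date  DATE,
  updated_at  TIMESTAMP
);

The order_id column is declared NOT NULL so that it can later serve as a single-column primary key, as discussed under Defining Replication Keys below.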

Installing CDC and Creating a Databricks Instance

Procedure

1.     Download the latest CDC 11.4.0.5 installer and specify "Flexrep" as the target database.

2.     Create a CDC instance using the CDC config tool.

3.     Specify the JDBC driver and JDBC URL. The JDBC URL follows this format:

jdbc:databricks://<hostname>:<port>/<schema>[;property=value[;…]]

For example,

jdbc:databricks://adb-2945880371306113.13.azuredatabricks.net:443/default;transportMode=http;ssl=1;AuthMech=3;httpPath=/sql/1.0/warehouses/b28338c62ba57585

4.     Provide the username as “token”, and the password as the personal access token for the CDC user. The token identifies the CDC user.

5.     Specify a schema where CDC can create metadata tables. Tables to be replicated can be in this schema or they can be in any schema in the default catalog for the CDC user.

6.     Grant the CDC user authority to create tables in and drop tables from the specified schema (a sample set of grants follows this procedure).

If you need to replicate tables from a catalog other than the CDC user’s default catalog, create a separate CDC user whose default catalog is the catalog with the desired tables. Create a separate CDC instance with that user.
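
As a sketch of steps 5 and 6, assuming a Unity Catalog workspace, the schema and grants might look like the following. The catalog, schema, and user names are placeholders, the exact set of privileges depends on your governance model, and dropping tables that the CDC user did not create may additionally require ownership or the MANAGE privilege.

-- Schema in which CDC will create its metadata tables (names are placeholders)
CREATE SCHEMA IF NOT EXISTS main.cdc_meta;

-- Allow the CDC user to reach the catalog and to create, read, and modify tables in the schema
GRANT USE CATALOG ON CATALOG main TO `cdc_user@example.com`;
GRANT USE SCHEMA, CREATE TABLE, SELECT, MODIFY ON SCHEMA main.cdc_meta TO `cdc_user@example.com`;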

Creating a Subscription and Configuring for Databricks

Procedure

1.     Create a subscription as you would for replicating from any CDC source to any CDC target.

2.     Specify a subscription-level fast apply user exit to use a "fast apply" algorithm. Choose one of the following fast apply algorithms:

-       GroupByTableNetEffectConvertUpdates

-       ParallelizeByTableNetEffectConvertUpdates

-       ParallelizeByHashNetEffectConvertUpdates

All of these algorithms are "NetEffect" algorithms that preserve source transaction consistency and consolidate multiple operations for a row. For example, if a row is inserted then deleted in the same unit of work (UOW), then CDC throws away both the insert and delete. CDC reorders operations in the UOW so that all deletes on each table are consecutive, and all inserts on each table are consecutive. This allows the deletes for each table to be applied as a single batch, and the inserts for each table to be applied as a single batch. To achieve good performance when applying to Databricks, you must apply a batch of operations at a time; single-row operations are very slow.
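
To make the batching concrete, here is a sketch, using the hypothetical orders table from earlier, of how a consolidated UOW could be applied when the table has a single-column replication key. This is not the exact SQL that CDC generates, but it illustrates why multi-row statements matter for throughput.

-- Net deletes for the table applied as one multi-row statement
DELETE FROM main.cdc_target.orders
WHERE order_id IN (1001, 1002, 1003);

-- Net inserts for the table applied as one multi-row statement
INSERT INTO main.cdc_target.orders
  (order_id, customer_id, status, amount, discount, is_priority, order_date, updated_at)
VALUES
  (2001, 17, 'NEW',     40.00, 0.0, false, DATE'2024-05-01', TIMESTAMP'2024-05-01 10:15:00'),
  (2002, 23, 'SHIPPED', 99.95, 0.1, true,  DATE'2024-05-01', TIMESTAMP'2024-05-01 10:16:00');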

Selecting a Fast Apply Algorithm

To decide which fast apply algorithm to use for your Databricks target, consider these characteristics.

GroupByTableNetEffectConvertUpdates uses a single thread. Select it if you do not need parallelism.

ParallelizeByHashNetEffectConvertUpdates is a parallel apply algorithm in which the work is split amongst the apply threads via a hashing algorithm. This ensures that every apply thread receives a uniform share of the workload, even if some tables are busy and others are relatively idle.

ParallelizeByTableNetEffectConvertUpdates is a parallel apply algorithm in which the tables are split amongst the apply threads via a round-robin algorithm. Select it if your tables have roughly the same activity level.

Configuring Fast Apply Algorithm Properties

Procedure

1.     UOW threshold - Set the "UOW threshold" to a value greater than or equal to

<number of tables in a typical UOW> * global_max_batch_size * 2

For example, with 10 tables in a typical UOW and global_max_batch_size set to 2000, the UOW threshold should be at least 10 * 2000 * 2 = 40,000.

2.     Threads - For parallel fast apply algorithms, set the number of apply threads initially to 4 and increase as needed to achieve the desired throughput.

Special Considerations for Databricks

1.     To avoid exceeding Databricks' 255 SQL parameter limit, set the CDC system property target_apply_single_multirow_statement_column_values_inline to true to append insert or delete column values as literal strings instead of SQL parameters.

2.     Increase the CDC system property global_max_batch_size to a value much larger than the default (50) for better mirroring performance, typically 2000 or more. However, be cautious of very long SQL statements that may degrade performance. Increase the batch size until performance starts to decrease for the typical workload. Set the fast apply Unit Of Work (UOW) threshold to be substantially larger than the value set for this property, as a batch cannot span a UOW.

3.     Increase the CDC system properties jdbc_refresh_max_batch_size and jdbc_refresh_commit_after_max_operations to larger values (e.g., 5000 or more) for better table refresh (load) performance.  Note that jdbc_refresh_max_batch_size cannot exceed the value set for jdbc_refresh_commit_after_max_operations.

4.     Enable parallel refresh by setting target_allow_parallel_jdbc_refresh to true and specifying the desired level of parallelism (target_num_refresh_apply_threads, default is 4).

5.     Set target_apply_insert_batch_via_single_statement and  target_apply_delete_batch_via_single_statement to true to apply multiple inserts and deletes in a single batch for optimal performance.

Summary of Required Properties for Databricks

Property Name                                                  Default Value   Recommended Value
target_apply_insert_batch_via_single_statement                 false           true
target_apply_delete_batch_via_single_statement                 false           true
target_apply_single_multirow_statement_column_values_inline    false           true
global_max_batch_size                                          50              2000 or more
jdbc_refresh_max_batch_size                                    500             5000 or more
jdbc_refresh_commit_after_max_operations                       1000            5000 or more
target_allow_parallel_jdbc_refresh                             false           true
target_num_refresh_apply_threads                               4               8 or more

Defining Replication Keys

1.     CDC requires a set of target table columns that uniquely identify a row, called the "replication key."

2.     The best replication key columns are primary key columns. If a primary key constraint is declared, CDC will automatically choose the primary key columns.

3.     In a Databricks workspace that supports Unity Catalog and whose default catalog is not the Hive catalog, primary key constraints are supported. It is highly recommended to declare primary key constraints on all target tables.

4.     If a primary key constraint is not declared, explicitly set the replication key columns to the set of columns that logically form a unique key when creating a table mapping, and declare these columns as NOT NULL (see the example after this list).

5.     A multi-column primary key will have much slower multi-row delete performance than a single-column primary key, due to Databricks SQL optimizer issues. Declare a single-column primary key whenever possible.
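
Assuming the target table is managed by Unity Catalog (such as the hypothetical orders table above), a single-column primary key could be declared as follows. In Databricks the constraint is informational rather than enforced, but CDC will automatically choose it as the replication key.

-- The key column must be NOT NULL before a primary key constraint can be added
ALTER TABLE main.cdc_target.orders ALTER COLUMN order_id SET NOT NULL;

-- Single-column primary key: informational in Databricks, chosen by CDC as the replication key
ALTER TABLE main.cdc_target.orders ADD CONSTRAINT orders_pk PRIMARY KEY (order_id);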

How to Get Started

To get started applying data to Databricks, install or upgrade to IDR 11.4.0.5-5776 or later, available from IBM Fix Central.
