In Part 1, we saw how to connect to an Iceberg catalog in watsonx.data and use it to create a namespace and a table with a defined schema.
In this second part of the Iceberg REST Catalog Java Client blog series, we explore how to commit updates to multiple tables in a single atomic operation.
Important: Calling commit() on an update (such as updateSchema()) that was created from a Transaction object only stages the change in memory. It is not an actual commit to the catalog. Nothing is persisted until the transaction itself is committed through the catalog.
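To make the distinction concrete, here is a toy model in plain Java. The classes are hypothetical stand-ins, not the Iceberg API. A transaction captures the table's columns when it starts and applies each update to its own copy, so the table itself stays unchanged until the transaction is committed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model (not the Iceberg API): a transaction works on a private copy of the
// table's columns, so "committing" an update inside it only changes that copy.
public class StagedUpdateDemo {

    // Hypothetical stand-in for a catalog table: just an ordered list of column names.
    static final class ToyTable {
        final List<String> columns = new ArrayList<>();

        ToyTable(List<String> initial) {
            columns.addAll(initial);
        }
    }

    // Hypothetical stand-in for a transaction: remembers the starting columns
    // and stages every change on a separate, in-memory copy.
    static final class ToyTransaction {
        final List<String> startColumns;
        final List<String> stagedColumns;

        ToyTransaction(ToyTable table) {
            startColumns = List.copyOf(table.columns);
            stagedColumns = new ArrayList<>(table.columns);
        }

        // Analogous to updateSchema().addColumn(...).commit() on a transaction.
        void addColumn(String name) {
            stagedColumns.add(name);
        }

        // Analogous to updateSchema().renameColumn(...).commit() on a transaction.
        void renameColumn(String from, String to) {
            int i = stagedColumns.indexOf(from);
            if (i < 0) {
                throw new IllegalArgumentException("No such column: " + from);
            }
            stagedColumns.set(i, to);
        }
    }

    public static void main(String[] args) {
        ToyTable table = new ToyTable(List.of("Trek_id", "Difficulty"));
        ToyTransaction tx = new ToyTransaction(table);
        tx.addColumn("minimum_age");
        System.out.println("table:  " + table.columns);    // table:  [Trek_id, Difficulty]
        System.out.println("staged: " + tx.stagedColumns); // staged: [Trek_id, Difficulty, minimum_age]
    }
}
```

The staged copy and the start snapshot are exactly the two pieces of state that a multi-table commit later needs: what the table looked like when the transaction began, and what it should look like afterwards.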
Loading tables from watsonx.data
The following code creates two tables in watsonx.data, loads them, and starts a transaction for each table.
We update each table's schema through its transaction object, which stages the change in memory. We then call the catalog's commitTransaction method to commit the changes to both tables in a single atomic operation.
import org.apache.iceberg.BaseTransaction;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
/**
 * Example to show how to commit schema changes to multiple tables in a single atomic transaction.
 * Assumes the enclosing class has a {@code catalog} field holding the RESTCatalog connected to
 * watsonx.data in Part 1, and that the Trek_company namespace and its tables do not exist yet.
 */
public void multiTableAtomicCommit() {
    Namespace namespace = Namespace.of("Trek_company");
    catalog.createNamespace(namespace);
    TableIdentifier identifier1 = TableIdentifier.of(namespace, "trekyaari");
    TableIdentifier identifier2 = TableIdentifier.of(namespace, "trekIndia");
    // Schema used to create both tables
    Schema schema = new Schema(
            Types.NestedField.optional(1, "Trek_id", Types.LongType.get()),
            Types.NestedField.optional(2, "Trek_name", Types.StringType.get()),
            Types.NestedField.optional(3, "Location", Types.StringType.get()),
            Types.NestedField.optional(4, "Duration", Types.DoubleType.get()),
            Types.NestedField.optional(5, "Difficulty", Types.StringType.get())
    );
    // Create the two tables used in this multi-table atomic commit demo
    catalog.createTable(identifier1, schema);
    catalog.createTable(identifier2, schema);
    // Load both tables and start a transaction on each
    Table table1 = catalog.loadTable(identifier1);
    Table table2 = catalog.loadTable(identifier2);
    Transaction t1Transaction = table1.newTransaction();
    Transaction t2Transaction = table2.newTransaction();
    // Stage (in memory only) a new minimum_age column in table1
    t1Transaction.updateSchema().addColumn("minimum_age", Types.LongType.get()).commit();
    // Stage (in memory only) renaming the Difficulty column in table2 to challenge
    t2Transaction.updateSchema().renameColumn("Difficulty", "challenge").commit();
    // Describe each change as: identifier, metadata at transaction start, metadata after staged updates
    TableCommit tableCommit1 =
            TableCommit.create(
                    identifier1,
                    ((BaseTransaction) t1Transaction).startMetadata(),
                    ((BaseTransaction) t1Transaction).currentMetadata());
    TableCommit tableCommit2 =
            TableCommit.create(
                    identifier2,
                    ((BaseTransaction) t2Transaction).startMetadata(),
                    ((BaseTransaction) t2Transaction).currentMetadata());
    // The actual commit for both transactions happens here, atomically
    catalog.commitTransaction(tableCommit1, tableCommit2);
    // Reload both tables to see the committed schemas
    table1 = catalog.loadTable(identifier1);
    table2 = catalog.loadTable(identifier2);
    System.out.println("Below you can see Table1 updated schema after commit");
    System.out.println(table1.schema());
    System.out.println();
    System.out.println("Below you can see Table2 updated schema after commit");
    System.out.println(table2.schema());
}
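Each TableCommit pairs a table identifier with the transaction's start and current metadata. As we understand the Iceberg REST client, these two metadata objects are turned into a list of updates plus requirements (for example, that the table UUID and the current schema ID are still the ones the transaction started from), which the server checks before applying anything. The sketch below is a hypothetical, simplified model of that idea in plain Java. Metadata, Requirement, and ToyTableCommit are illustrative names, not Iceberg classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified model of a table commit (not Iceberg's TableCommit class):
// requirements are derived by comparing the transaction's start and current metadata.
public class ToyTableCommit {

    // Stand-in for TableMetadata, keeping only the fields this sketch needs.
    record Metadata(String uuid, int currentSchemaId, List<String> columns) {}

    // A check the catalog runs against the table's latest metadata before applying the commit.
    record Requirement(String description, Predicate<Metadata> check) {}

    final String identifier;
    final Metadata updated;
    final List<Requirement> requirements = new ArrayList<>();

    ToyTableCommit(String identifier, Metadata start, Metadata updated) {
        this.identifier = identifier;
        this.updated = updated;
        // The commit must target the same table the transaction started from.
        requirements.add(new Requirement("table uuid is " + start.uuid(),
                latest -> latest.uuid().equals(start.uuid())));
        // If the transaction changed the schema, nobody else may have changed it meanwhile.
        if (start.currentSchemaId() != updated.currentSchemaId()) {
            int expected = start.currentSchemaId();
            requirements.add(new Requirement("current schema id is " + expected,
                    latest -> latest.currentSchemaId() == expected));
        }
    }

    // Returns the descriptions of every requirement that the latest metadata violates.
    List<String> violations(Metadata latest) {
        List<String> failed = new ArrayList<>();
        for (Requirement r : requirements) {
            if (!r.check().test(latest)) {
                failed.add(r.description());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Metadata start = new Metadata("uuid-2", 0, List.of("Trek_id", "Difficulty"));
        Metadata updated = new Metadata("uuid-2", 1, List.of("Trek_id", "challenge"));
        ToyTableCommit commit = new ToyTableCommit("Trek_company.trekIndia", start, updated);

        System.out.println(commit.violations(start)); // []

        // Someone deleted Difficulty directly, so the table's current schema id is now 1.
        Metadata latest = new Metadata("uuid-2", 1, List.of("Trek_id"));
        System.out.println(commit.violations(latest)); // [current schema id is 0]
    }
}
```

Checking requirements against the table's latest state, instead of replaying the staged updates, is what lets the catalog detect that a transaction was built on stale metadata.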
Results
The output in the Java client shows that both tables were updated: table1 has the new minimum_age column, and the Difficulty column in table2 is renamed to challenge.
Image 1: Java client
Introducing a conflict
Now, let's introduce a conflict that makes the multi-table commit fail, so that neither transaction is applied.
The following code creates and loads two tables from watsonx.data and starts a transaction for each table.
As before, we update the schemas through the transaction objects, which stages the changes in memory. Before committing, however, we delete the Difficulty column from table2 directly, outside any transaction. This change is committed to the catalog immediately. We then attempt to commit both transactions together.
The second transaction renames Difficulty, a column that no longer exists in table2's current schema, so its commit is rejected. Because the multi-table commit is atomic, the first transaction fails as well, and no changes are made to table1.
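The all-or-nothing behavior can be illustrated with a toy catalog in plain Java. This is a hypothetical sketch, not how watsonx.data or Iceberg implement it: each staged commit records the schema ID it started from, and the catalog checks every table's expectation before applying any change. The main method replays the scenario described above: a direct change to trekIndia bumps its schema ID, so the whole commit is rejected and trekyaari is left untouched.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy catalog (not the Iceberg or watsonx.data API) that commits several
// table changes all-or-nothing, using an expected schema id as each commit's requirement.
public class ToyAtomicCatalog {

    record Schema(int id, List<String> columns) {}

    // A staged change: the schema id the transaction started from, plus the new columns.
    record Commit(String table, int expectedSchemaId, List<String> newColumns) {}

    final Map<String, Schema> tables = new HashMap<>();

    // Direct, single-table commit: always applies and bumps the schema id.
    synchronized void commitDirect(String table, List<String> newColumns) {
        Schema current = tables.get(table);
        tables.put(table, new Schema(current.id() + 1, List.copyOf(newColumns)));
    }

    // Multi-table commit: validate every requirement first, then apply all changes.
    synchronized void commitTransaction(List<Commit> commits) {
        for (Commit c : commits) {
            int actual = tables.get(c.table()).id();
            if (actual != c.expectedSchemaId()) {
                throw new IllegalStateException("Conflict on " + c.table()
                        + ": expected schema " + c.expectedSchemaId() + " but found " + actual);
            }
        }
        for (Commit c : commits) {
            commitDirect(c.table(), c.newColumns());
        }
    }

    public static void main(String[] args) {
        ToyAtomicCatalog catalog = new ToyAtomicCatalog();
        List<String> cols = List.of("Trek_id", "Difficulty");
        catalog.tables.put("trekyaari", new Schema(0, cols));
        catalog.tables.put("trekIndia", new Schema(0, cols));

        // Staged changes, both based on schema id 0.
        Commit addColumn = new Commit("trekyaari", 0, List.of("Trek_id", "Difficulty", "minimum_age"));
        Commit rename = new Commit("trekIndia", 0, List.of("Trek_id", "challenge"));

        // Direct change to trekIndia: delete Difficulty (its schema id becomes 1).
        catalog.commitDirect("trekIndia", List.of("Trek_id"));

        try {
            catalog.commitTransaction(List.of(addColumn, rename));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Conflict on trekIndia: expected schema 0 but found 1
        }
        System.out.println(catalog.tables.get("trekyaari").columns()); // [Trek_id, Difficulty]
        System.out.println(catalog.tables.get("trekIndia").columns()); // [Trek_id]
    }
}
```

In a real catalog, the validate and apply steps must run under a lock or a database transaction; the synchronized methods here only stand in for that.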
// In addition to the imports used by multiTableAtomicCommit():
import org.apache.iceberg.exceptions.CommitFailedException;
/**
 * Example to show that a conflict on one table makes the whole multi-table commit fail.
 * Assumes the same {@code catalog} field, and that the Trek_company namespace and tables
 * from the previous example have been dropped before running this method.
 */
public void multiTableAtomicCommitConflict() {
    Namespace namespace = Namespace.of("Trek_company");
    catalog.createNamespace(namespace);
    TableIdentifier identifier1 = TableIdentifier.of(namespace, "trekyaari");
    TableIdentifier identifier2 = TableIdentifier.of(namespace, "trekIndia");
    // Schema used to create both tables
    Schema schema = new Schema(
            Types.NestedField.optional(1, "Trek_id", Types.LongType.get()),
            Types.NestedField.optional(2, "Trek_name", Types.StringType.get()),
            Types.NestedField.optional(3, "Location", Types.StringType.get()),
            Types.NestedField.optional(4, "Duration", Types.DoubleType.get()),
            Types.NestedField.optional(5, "Difficulty", Types.StringType.get())
    );
    // Create the two tables used in this multi-table atomic commit demo
    catalog.createTable(identifier1, schema);
    catalog.createTable(identifier2, schema);
    // Load both tables created above
    Table table1 = catalog.loadTable(identifier1);
    Table table2 = catalog.loadTable(identifier2);
    // Start each transaction from its own freshly loaded table instance
    Transaction t1Transaction = catalog.loadTable(identifier1).newTransaction();
    // Stage (in memory only) a new minimum_age column in table1
    t1Transaction.updateSchema().addColumn("minimum_age", Types.LongType.get()).commit();
    // Stage (in memory only) renaming the Difficulty column in table2 to challenge
    Transaction t2Transaction = catalog.loadTable(identifier2).newTransaction();
    t2Transaction.updateSchema().renameColumn("Difficulty", "challenge").commit();
    // Delete Difficulty, the column being renamed in t2Transaction, to cause a conflict.
    // This is an actual commit: it calls the single-table commit API immediately.
    table2.updateSchema().deleteColumn("Difficulty").commit();
    TableCommit tableCommit1 =
            TableCommit.create(
                    identifier1,
                    ((BaseTransaction) t1Transaction).startMetadata(),
                    ((BaseTransaction) t1Transaction).currentMetadata());
    TableCommit tableCommit2 =
            TableCommit.create(
                    identifier2,
                    ((BaseTransaction) t2Transaction).startMetadata(),
                    ((BaseTransaction) t2Transaction).currentMetadata());
    try {
        // Fails: table2 changed after t2Transaction started, so neither commit is applied
        catalog.commitTransaction(tableCommit1, tableCommit2);
    } catch (CommitFailedException e) {
        // The REST client reports a commit rejected by the catalog as CommitFailedException
        System.out.println("Multi-table commit failed: " + e.getMessage());
    }
    // Reload both tables to see their schemas after the failed commit
    table1 = catalog.loadTable(identifier1);
    table2 = catalog.loadTable(identifier2);
    System.out.println("Below you can see Table1 schema after the failed commit");
    System.out.println(table1.schema());
    System.out.println();
    System.out.println("Below you can see Table2 schema after the failed commit");
    System.out.println(table2.schema());
}
Results
After the failed commit, you'll observe that the Difficulty column is deleted from table2, because that change was committed directly. However, the minimum_age column is not added to table1: the rename of Difficulty in table2 could not be applied because the column had already been deleted, so the catalog rejected the entire multi-table commit.
Image 2: Error displayed in the Java Client