In this three-part blog series, we explore how to use the Iceberg Java client to manage and operate on Iceberg tables in watsonx.data by using the Metadata Service (MDS) implementation of the Iceberg REST Catalog in watsonx.data.
Prerequisites
· For the Iceberg REST Catalog Java client to connect to watsonx.data, add the following Maven dependencies:
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-api</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-aws</artifactId>
    <version>3.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-core</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-aws</artifactId>
    <version>1.6.1</version>
</dependency>
· Required roles in watsonx.data:
o For GET/HEAD endpoints: Admin, Metastore Admin, or Metastore Viewer
o For all other endpoints: Admin or Metastore Admin
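To see which of the calls in this post each role can make, the following minimal Java sketch encodes the role rule above and tags each REST Catalog operation used in steps 6-10 with the HTTP method it sends under the Iceberg REST Catalog specification. The WxdRoleCheck class and CatalogOp enum are hypothetical, and the sketch assumes MDS applies the rule purely by HTTP method, as the list above describes.

```java
import java.util.List;

public class WxdRoleCheck {
    // Roles listed in the prerequisites for each kind of endpoint.
    public static final List<String> READ_ROLES = List.of("Admin", "Metastore Admin", "Metastore Viewer");
    public static final List<String> WRITE_ROLES = List.of("Admin", "Metastore Admin");

    // REST Catalog operations used in this post and the HTTP method each one sends.
    public enum CatalogOp {
        GET_CONFIG("GET"),        // GET /v1/config, sent by RESTCatalog.initialize (step 6)
        CREATE_NAMESPACE("POST"), // step 7
        CREATE_TABLE("POST"),     // step 8
        LOAD_TABLE("GET"),        // step 9
        COMMIT_TABLE("POST");     // step 10 (schema update commit)

        public final String httpMethod;

        CatalogOp(String httpMethod) {
            this.httpMethod = httpMethod;
        }
    }

    // GET and HEAD endpoints accept the read roles; every other method needs a write role.
    public static List<String> allowedRoles(String httpMethod) {
        if (httpMethod.equals("GET") || httpMethod.equals("HEAD")) {
            return READ_ROLES;
        }
        return WRITE_ROLES;
    }

    public static void main(String[] args) {
        for (CatalogOp op : CatalogOp.values()) {
            System.out.println(op + " (" + op.httpMethod + "): " + allowedRoles(op.httpMethod));
        }
    }
}
```

Under this assumption, a Metastore Viewer can initialize the catalog and load tables (step 9), but creating namespaces, creating tables, and committing updates (steps 7, 8, and 10) need Admin or Metastore Admin.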
Complete the following steps to connect to the Iceberg tables in watsonx.data:
NOTE: Steps 1-4 are performed in watsonx.data, and steps 5-10 are performed on the client that connects to watsonx.data.
1. In the watsonx.data console, go to Infrastructure manager and create an Iceberg catalog associated with a bucket of type S3A.
2. Navigate to the catalog details page and note the MDS REST endpoint. You will use it when configuring the Java client.
NOTE: For CPD, use the Metastore External REST endpoint.
Image 1: Catalog details in watsonx.data on CPD
Image 2: Catalog details in watsonx.data SaaS
NOTE: If you are connecting to watsonx.data on CPD, complete steps 3-5. If you are connecting to watsonx.data SaaS, proceed to step 6.
3. Run the following command in the CPD console to fetch the MDS certificates:
echo QUIT | openssl s_client -showcerts -connect <mds-rest-endpoint-host>:443 | awk '/-----BEGIN CERTIFICATE-----/ {p=1}; p; /-----END CERTIFICATE-----/ {p=0}' > mds.cert
4. Display the contents of mds.cert by running cat mds.cert. The file contains the concatenated certificate chain returned by the MDS endpoint.
Image 3: Concatenated certificate
5. Import both certificates from mds.cert into the client's Java trust store, using a different alias for each. The JVM uses this trust store at runtime to validate the TLS connection to MDS.
keytool -import -alias <alias_name> -file <certificate_file> -keystore <truststore_file> -storepass <truststore_password>
Example:
sudo keytool -import -alias mycert -file mycert.crt -keystore $JAVA_HOME/lib/security/cacerts -storepass changeit
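keytool -import stores one trusted certificate per alias, so to import both certificates in mds.cert you can first split the file into one file per certificate. The following is a minimal Java sketch of that split, assuming Java 11 or later (for Path.of and Files.readString). The MdsCertSplitter class and the output names mds-0.crt, mds-1.crt, and so on are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MdsCertSplitter {
    // One PEM certificate block: the same BEGIN/END range that the awk filter in step 3 keeps.
    private static final Pattern PEM_BLOCK = Pattern.compile(
            "-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----", Pattern.DOTALL);

    /** Splits concatenated PEM text (such as mds.cert) into individual certificate blocks. */
    public static List<String> splitPem(String pem) {
        List<String> blocks = new ArrayList<>();
        Matcher matcher = PEM_BLOCK.matcher(pem);
        while (matcher.find()) {
            blocks.add(matcher.group());
        }
        return blocks;
    }

    /** Reads mds.cert (or the file given as the first argument) and writes mds-0.crt, mds-1.crt, ... */
    public static void main(String[] args) throws IOException {
        Path input = Path.of(args.length > 0 ? args[0] : "mds.cert");
        List<String> blocks = splitPem(Files.readString(input));
        for (int i = 0; i < blocks.size(); i++) {
            Path output = Path.of("mds-" + i + ".crt");
            Files.writeString(output, blocks.get(i) + System.lineSeparator());
            System.out.println("Wrote " + output);
        }
    }
}
```

You can then run keytool once per file, for example sudo keytool -import -alias mds-0 -file mds-0.crt -keystore $JAVA_HOME/lib/security/cacerts -storepass changeit, and repeat with alias mds-1 for mds-1.crt.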
6. Configure the Iceberg REST Catalog in the Java client. The following is sample code:
import org.apache.iceberg.*;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.rest.RESTCatalog;
import java.util.HashMap;
import java.util.Map;

public class RestCatalogAPIService {
    // Java client for the watsonx.data MDS Iceberg REST Catalog
    private final RESTCatalog catalog = new RESTCatalog();

    /**
     * Configures the Iceberg RESTCatalog to connect to a watsonx.data Iceberg catalog.
     * Replace <wxd-catalog-name> with the name of your watsonx.data Iceberg catalog.
     */
    public RestCatalogAPIService() {
        // Configuration for the REST catalog Java API client
        Map<String, String> properties = new HashMap<>();
        properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
        // URL pointing to the MDS REST service
        properties.put(CatalogProperties.URI, "https://<mds-rest-domain>/mds/iceberg");
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "<wxd-catalog-name>");
        properties.put("header.Authorization", "Bearer <bearer-token>");
        // properties.put("header.Authorization", "Basic <base64 encoded <username:password>>");
        properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
        properties.put(S3FileIOProperties.ACCESS_KEY_ID, "<access-key>");
        properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, "<secret-key>");
        String bucketEndpoint = "<endpoint>";
        properties.put(S3FileIOProperties.ENDPOINT, bucketEndpoint);
        // Forces the AWS region used by the S3 client
        System.setProperty("aws.region", "<region>");
        properties.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true");
        properties.put(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true");
        catalog.initialize("<wxd-catalog-name>", properties);
    }
}
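The step 6 sample hard-codes the endpoint and credentials. As an alternative, the following minimal sketch builds the same core properties from environment variables and chooses between the Bearer and Basic Authorization headers shown in the sample. The WxdCatalogConfig class and the WXD_* variable names are hypothetical. The property keys are written as the string values of the CatalogProperties and S3FileIOProperties constants (for example "uri" for CatalogProperties.URI) so that the sketch compiles without Iceberg on the classpath; in your project you can keep using the constants.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

public class WxdCatalogConfig {
    /** Builds REST catalog properties from environment-style variables (hypothetical WXD_* names). */
    public static Map<String, String> fromEnv(Map<String, String> env) {
        Map<String, String> props = new HashMap<>();
        props.put("catalog-impl", "org.apache.iceberg.rest.RESTCatalog");   // CatalogProperties.CATALOG_IMPL
        props.put("uri", require(env, "WXD_MDS_URI"));                       // CatalogProperties.URI
        props.put("warehouse", require(env, "WXD_CATALOG"));                 // CatalogProperties.WAREHOUSE_LOCATION
        props.put("header.Authorization", authorization(env));               // sent as an HTTP header
        props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");          // CatalogProperties.FILE_IO_IMPL
        props.put("s3.access-key-id", require(env, "WXD_S3_ACCESS_KEY"));    // S3FileIOProperties.ACCESS_KEY_ID
        props.put("s3.secret-access-key", require(env, "WXD_S3_SECRET_KEY")); // S3FileIOProperties.SECRET_ACCESS_KEY
        props.put("s3.endpoint", require(env, "WXD_S3_ENDPOINT"));           // S3FileIOProperties.ENDPOINT
        return props;
    }

    /** Prefers a bearer token; otherwise builds a Basic header from base64("username:password"). */
    static String authorization(Map<String, String> env) {
        String token = env.get("WXD_TOKEN");
        if (token != null && !token.isEmpty()) {
            return "Bearer " + token;
        }
        String raw = require(env, "WXD_USERNAME") + ":" + require(env, "WXD_PASSWORD");
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    /** Returns the named variable, failing fast if it is missing or empty. */
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + name);
        }
        return value;
    }
}
```

You could then call catalog.initialize("<wxd-catalog-name>", WxdCatalogConfig.fromEnv(System.getenv())) and add the region and preload settings from the sample in the same way.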
7. Create a namespace. The namespace appears as a schema in the watsonx.data Query workspace.
8. Create an Iceberg table within the schema/namespace.
9. Load the table created in step 8.
10. Commit updates directly to the table loaded in step 9, without using a transaction.
NOTE: Refer to the following code for steps 7 – 10.
import org.apache.iceberg.*;
import org.apache.iceberg.aws.s3.S3FileIOProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import java.util.HashMap;
import java.util.Map;

public class RestCatalogAPIService {
    // Java client for the watsonx.data MDS Iceberg REST Catalog
    private final RESTCatalog catalog = new RESTCatalog();

    /**
     * Configures the Iceberg RESTCatalog to connect to a watsonx.data Iceberg catalog.
     * Replace <wxd-catalog-name> with the name of your watsonx.data Iceberg catalog.
     */
    public RestCatalogAPIService() {
        // Configuration for the REST catalog Java API client
        Map<String, String> properties = new HashMap<>();
        properties.put(CatalogProperties.CATALOG_IMPL, "org.apache.iceberg.rest.RESTCatalog");
        // URL pointing to the MDS REST service
        properties.put(CatalogProperties.URI, "https://<mds-rest-domain>/mds/iceberg");
        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "<wxd-catalog-name>");
        properties.put("header.Authorization", "Bearer <bearer-token>");
        // properties.put("header.Authorization", "Basic <base64 encoded <username:password>>");
        properties.put(CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.aws.s3.S3FileIO");
        properties.put(S3FileIOProperties.ACCESS_KEY_ID, "<access-key>");
        properties.put(S3FileIOProperties.SECRET_ACCESS_KEY, "<secret-key>");
        String bucketEndpoint = "<endpoint>";
        properties.put(S3FileIOProperties.ENDPOINT, bucketEndpoint);
        // Forces the AWS region used by the S3 client
        System.setProperty("aws.region", "<region>");
        properties.put(S3FileIOProperties.PRELOAD_CLIENT_ENABLED, "true");
        properties.put(S3FileIOProperties.USE_ARN_REGION_ENABLED, "true");
        catalog.initialize("<wxd-catalog-name>", properties);
    }

    /**
     * Creates a schema (namespace) in the configured watsonx.data Iceberg catalog.
     * @param namespace name of the schema
     */
    public void createNamespace(String namespace) {
        try {
            // Create the namespace
            catalog.createNamespace(Namespace.of(namespace));
            System.out.println("Namespace created successfully: " + namespace);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a table in the configured watsonx.data Iceberg catalog.
     * @param schema    schema (namespace) in which to create the table
     * @param tableName name of the table
     */
    public void createTable(String schema, String tableName) {
        Namespace namespace = Namespace.of(schema);
        TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
        // Table schema defining the columns and their data types
        Schema tableSchema = new Schema(
            Types.NestedField.optional(1, "Trek_id", Types.LongType.get()),
            Types.NestedField.optional(2, "Trek_name", Types.StringType.get()),
            Types.NestedField.optional(3, "Location", Types.StringType.get()),
            Types.NestedField.optional(4, "Duration", Types.DoubleType.get()),
            Types.NestedField.optional(5, "Difficulty", Types.StringType.get())
        );
        // Create the table
        catalog.createTable(identifier, tableSchema);
        System.out.println("Table created successfully: " + schema + "." + tableName);
    }

    /**
     * Loads an Iceberg table from watsonx.data.
     * @param schema    schema (namespace) that contains the table
     * @param tableName name of the table
     */
    public void loadTable(String schema, String tableName) {
        try {
            TableIdentifier identifier = TableIdentifier.of(Namespace.of(schema), tableName);
            // Load the table
            Table table = catalog.loadTable(identifier);
            System.out.println(String.format("%s.%s table loaded successfully: %s", schema, tableName, table));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Commits a schema update directly to an Iceberg table, without a transaction.
     * @param schema    schema (namespace) that contains the table
     * @param tableName name of the table
     */
    public void commitUpdatesToSingleTable(String schema, String tableName) {
        try {
            TableIdentifier identifier = TableIdentifier.of(Namespace.of(schema), tableName);
            // Load the table
            Table table = catalog.loadTable(identifier);
            System.out.println(String.format("%s.%s table loaded successfully: %s", schema, tableName, table));
            // Single commit to the Iceberg table: add a new optional column
            table.updateSchema().addColumn("new_column", Types.StringType.get()).commit();
            System.out.println(String.format("New column added successfully to table %s.%s", schema, tableName));
            // Reload the table to read the schema after the commit
            table = catalog.loadTable(identifier);
            System.out.println("Updated table schema after commit:");
            System.out.println(table.schema());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Examples
The following images provide some examples:
Image 4: REST Catalog configuration example
Image 5: Java client console output
Image 6: Schema and table created successfully in the watsonx.data iceberg_data catalog