In Part 1 and Part 2 of this blog series, we covered basic operations and how to configure the Iceberg REST catalog Java client to connect to watsonx.data Iceberg catalogs.
In this blog, we’ll explore how to append data files to an existing Iceberg table. We’ll also look at the Iceberg data module, which provides convenient classes for generating records and handling the actual file writing.
To append data files, add the following Maven dependencies to your project:
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-data</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-common</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-parquet</artifactId>
    <version>1.6.1</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.15.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.15.0</version>
</dependency>
The following code shows how to append data to an Iceberg table. First, we’ll create an Iceberg table. Then, we’ll generate records and create a data file. Finally, we’ll append the generated data file to the Iceberg table.
/**
 * Base method that drives the demo: it creates the table if needed,
 * generates records, and appends a data file.
 * @throws IOException
 */
public void appendingDataFilesIcebergDemo() throws IOException {
    Namespace namespace = Namespace.of("Trek_company");
    // if the namespace doesn't exist, create it first
    if (!catalog.namespaceExists(namespace)) {
        catalog.createNamespace(namespace);
    }
    TableIdentifier identifier = TableIdentifier.of(namespace, "trekyaari");
    Table table;
    if (catalog.tableExists(identifier)) {
        // load the table if it already exists
        table = catalog.loadTable(identifier);
    } else {
        // otherwise, create the demo table
        table = createDemoTable();
    }
    ImmutableList<GenericRecord> records = createDemoRecords(table.schema());
    // finally, write the records to a data file and append it to the Iceberg table
    appendingFileDemo(table, records);
}
/**
 * Creates the demo table.
 * @return the newly created table
 */
public Table createDemoTable() {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "trek_id", Types.StringType.get()),
        Types.NestedField.optional(2, "trek_name", Types.StringType.get()),
        Types.NestedField.optional(3, "location", Types.StringType.get()),
        Types.NestedField.optional(4, "duration", Types.DoubleType.get()),
        Types.NestedField.optional(5, "difficulty", Types.StringType.get())
    );
    Namespace trekCompany = Namespace.of("Trek_company");
    TableIdentifier name = TableIdentifier.of(trekCompany, "trekyaari");
    return catalog.createTable(name, schema, PartitionSpec.unpartitioned());
}
/**
 * Creates demo records to write to the data file.
 * @param schema the table schema
 * @return the list of demo records
 */
public ImmutableList<GenericRecord> createDemoRecords(Schema schema) {
    GenericRecord record = GenericRecord.create(schema);
    ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
    builder.add(record.copy(ImmutableMap.of("trek_id", UUID.randomUUID().toString(), "trek_name", "Triund Trek", "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Intermediate")));
    builder.add(record.copy(ImmutableMap.of("trek_id", UUID.randomUUID().toString(), "trek_name", "Kheerganga Trek", "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Beginner")));
    builder.add(record.copy(ImmutableMap.of("trek_id", UUID.randomUUID().toString(), "trek_name", "Kashmir Great Lakes Trek", "location", "Kashmir", "duration", 2.0, "difficulty", "Difficult")));
    builder.add(record.copy(ImmutableMap.of("trek_id", UUID.randomUUID().toString(), "trek_name", "Hampta Pass Trek", "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Moderate")));
    return builder.build();
}
/**
 * Writes the records to a Parquet data file and appends the file to the Iceberg table.
 * @param table the target Iceberg table
 * @param records the records to write
 * @throws IOException
 */
public void appendingFileDemo(Table table, List<GenericRecord> records) throws IOException {
    String filepath = table.location() + "/" + UUID.randomUUID().toString();
    OutputFile file = table.io().newOutputFile(filepath);
    DataWriter<GenericRecord> dataWriter =
        Parquet.writeData(file)
            .schema(table.schema())
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .overwrite()
            .withSpec(PartitionSpec.unpartitioned())
            .build();
    try {
        for (GenericRecord record : records) {
            // write each record to the data file
            dataWriter.write(record);
        }
    } finally {
        dataWriter.close();
    }
    // DataFile holds the file's metadata (path, format, record count, etc.)
    DataFile dataFile = dataWriter.toDataFile();
    // append the data file and commit it as a new snapshot
    table.newAppend().appendFile(dataFile).commit();
}
Results:
The following image shows the results of the code in watsonx.data. You can see that the data file was successfully appended using the Java client.
Data File appended successfully using Java client
#watsonx.data