Iceberg REST API on watsonx.data: Java Client Part 3, Appending Data Files

By Hemant Marve posted 19 days ago

  

In Part 1 and Part 2 of this blog series, we covered basic operations and how to configure the Iceberg REST catalog Java client to connect to watsonx.data Iceberg catalogs.

In this blog, we’ll explore how to append data files to an existing Iceberg table. We’ll also look into the Iceberg data module, which offers convenient classes for generating records and handling the actual file-writing process.

To append data files, add the following Maven dependencies to your project, in addition to the ones used in the earlier parts:

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-data</artifactId>
  <version>1.6.1</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-common</artifactId>
  <version>1.6.1</version>
</dependency>

<dependency>
  <groupId>org.apache.iceberg</groupId>
  <artifactId>iceberg-parquet</artifactId>
  <version>1.6.1</version>
</dependency>

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-avro</artifactId>
  <version>1.15.0</version>
</dependency>

<dependency>
  <groupId>org.apache.parquet</groupId>
  <artifactId>parquet-common</artifactId>
  <version>1.15.0</version>
</dependency>

The following code shows how to append data to an Iceberg table. First, we’ll create an Iceberg table. Then, we’ll generate records and create a data file. Finally, we’ll append the generated data file to the Iceberg table.

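// Imports used by the methods in this post. ImmutableList and ImmutableMap are
// assumed here to be the Guava classes relocated inside Iceberg; plain Guava
// (com.google.common.collect) works the same way.
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;

    /**
     * Entry point that calls all the other methods.
     * The catalog field is the REST catalog client configured in the earlier parts.
     * @throws IOException if writing the data file fails
     */
    public void appendingDataFilesIcebergDemo() throws IOException {
        Namespace namespace = Namespace.of("Trek_company");

        // Create the namespace first if it doesn't exist
        if(!catalog.namespaceExists(namespace)) {
            catalog.createNamespace(namespace);
        }

        TableIdentifier identifier = TableIdentifier.of(namespace, "trekyaari");
        Table table = null;

        // Load the table if it already exists; otherwise create it
        if(catalog.tableExists(identifier)) {
            table = catalog.loadTable(identifier);
        }
        else{
            // Create the demo table
            table = createDemoTable();
        }
        ImmutableList<GenericRecord> records = createDemoRecords(table.schema());

        // Write the records to a data file and append that file to the Iceberg table
        appendingFileDemo(table, records);
    }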

    /**
     * Creates the unpartitioned demo table Trek_company.trekyaari.
     * @return the newly created table
     */
    public Table createDemoTable(){
        // Each field needs a unique ID; all columns are optional (nullable)
        Schema schema = new Schema(
                Types.NestedField.optional(1, "trek_id", Types.StringType.get()),
                Types.NestedField.optional(2, "trek_name", Types.StringType.get()),
                Types.NestedField.optional(3, "location", Types.StringType.get()),
                Types.NestedField.optional(4, "duration", Types.DoubleType.get()),
                Types.NestedField.optional(5, "difficulty", Types.StringType.get())
        );

        Namespace trekCompany = Namespace.of("Trek_company");
        TableIdentifier name = TableIdentifier.of(trekCompany, "trekyaari");
        return catalog.createTable(name, schema, PartitionSpec.unpartitioned());
    }

    /**
     * Creates demo records to write to the data file.
     * @param schema schema of the target table
     * @return immutable list of records matching the schema
     */
    public ImmutableList<GenericRecord> createDemoRecords(Schema schema){
        // Template record; copy(...) returns a new record with the given fields set
        GenericRecord record = GenericRecord.create(schema);
        ImmutableList.Builder<GenericRecord> builder = ImmutableList.builder();
        builder.add(record.copy(ImmutableMap.of(
                "trek_id", UUID.randomUUID().toString(), "trek_name", "Triund Trek",
                "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Intermediate")));
        builder.add(record.copy(ImmutableMap.of(
                "trek_id", UUID.randomUUID().toString(), "trek_name", "Kheerganga Trek",
                "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Beginner")));
        builder.add(record.copy(ImmutableMap.of(
                "trek_id", UUID.randomUUID().toString(), "trek_name", "Kashmir Great Lakes Trek",
                "location", "Kashmir", "duration", 2.0, "difficulty", "Difficult")));
        builder.add(record.copy(ImmutableMap.of(
                "trek_id", UUID.randomUUID().toString(), "trek_name", "Hampta Pass Trek",
                "location", "Himachal Pradesh", "duration", 1.0, "difficulty", "Moderate")));
        return builder.build();
    }

    /**
     * Writes the records to a Parquet data file and appends that file to the Iceberg table.
     * @param table table to append to
     * @param records records to write
     * @throws IOException if writing or closing the data file fails
     */
    public void appendingFileDemo(Table table, List<GenericRecord> records) throws IOException {
        // A random UUID keeps the file name unique under the table location
        String filepath = table.location() + "/" + UUID.randomUUID().toString();
        OutputFile file = table.io().newOutputFile(filepath);

        // The spec is unpartitioned because the demo table is unpartitioned
        DataWriter<GenericRecord> dataWriter =
                Parquet.writeData(file)
                        .schema(table.schema())
                        .createWriterFunc(GenericParquetWriter::buildWriter)
                        .overwrite()
                        .withSpec(PartitionSpec.unpartitioned())
                        .build();

        try {
            for (GenericRecord record : records) {
                // Write each record to the data file
                dataWriter.write(record);
            }
        } finally {
            dataWriter.close();
        }

        // File metadata (path, size, record count, column metrics); only available after close()
        DataFile dataFile = dataWriter.toDataFile();

        // Append the data file and commit, which creates a new table snapshot
        table.newAppend().appendFile(dataFile).commit();
    }
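
The code above writes the data file directly under the table location, with a bare UUID as its name. As a minimal sketch of an alternative naming scheme, the helper below places the file under a data/ subfolder and adds a .parquet extension, which matches how Iceberg's default layout usually looks. The class name DataFilePaths, the method newDataFilePath, and the bucket path in main are hypothetical and used only for illustration.

```java
import java.util.UUID;

public class DataFilePaths {

    /**
     * Builds "<tableLocation>/data/<random-uuid>.parquet".
     * A trailing slash on the table location is tolerated.
     */
    public static String newDataFilePath(String tableLocation) {
        // Strip one trailing slash so the result never contains "//data"
        String base = tableLocation.endsWith("/")
                ? tableLocation.substring(0, tableLocation.length() - 1)
                : tableLocation;
        return base + "/data/" + UUID.randomUUID() + ".parquet";
    }

    public static void main(String[] args) {
        // Hypothetical table location, for illustration only
        System.out.println(newDataFilePath("s3://my-bucket/Trek_company/trekyaari"));
    }
}
```

In appendingFileDemo, this would replace the filepath line with String filepath = DataFilePaths.newDataFilePath(table.location());. Iceberg can also generate such a path itself through table.locationProvider().newDataLocation(fileName), which may be the better choice if the table uses a custom data location.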

Results:

The following image shows the results of the code in watsonx.data.

You can see that the data file is successfully appended using the Java client.

Data File appended successfully using Java client
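
Besides checking the watsonx.data console, the append can also be confirmed from the Java client. The sketch below assumes the same table object as above and uses IcebergGenerics from the iceberg-data module to read the rows back; the class name AppendVerifier and the method verifyAppend are hypothetical, and the column names are those of the demo table.

```java
import java.io.IOException;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class AppendVerifier {

    /** Prints the latest snapshot and all rows, and returns the row count. */
    public static long verifyAppend(Table table) throws IOException {
        // Pick up the latest committed metadata
        table.refresh();

        Snapshot snapshot = table.currentSnapshot();
        if (snapshot != null) {
            // The summary of an append snapshot includes counts of added files and records
            System.out.println("Snapshot " + snapshot.snapshotId()
                    + " operation=" + snapshot.operation()
                    + " summary=" + snapshot.summary());
        }

        long count = 0;
        // Full table scan; the iterable must be closed to release file handles
        try (CloseableIterable<Record> rows = IcebergGenerics.read(table).build()) {
            for (Record row : rows) {
                System.out.println(row.getField("trek_name") + " | " + row.getField("location"));
                count++;
            }
        }
        return count;
    }
}
```

Calling AppendVerifier.verifyAppend(table) right after appendingFileDemo should list the four demo treks on a freshly created table. If the table already held data, earlier rows are listed as well.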
