Data and AI on Power


Simplify Data Access to Multiple Data Sources Using Trino on IBM Power

By Natalie Jann posted Mon November 07, 2022 03:07 AM

  
Are you annoyed by writing custom code for every database you need to connect to in order to get all the data for your machine learning (ML) models?
Here is a handy solution: use Trino, now fully integrated into our Kubeflow on Power distribution for machine learning operations (MLOps)!

Trino is a distributed query engine that lets you query all kinds of databases, relational or non-relational, using SQL. That alone greatly harmonizes data access. But it gets even better: within the same SQL query, you can join data from multiple database systems on the fly. Yes, you heard right: a single SQL query across all of your databases.
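To give you a feel for it, here is a purely illustrative cross-catalog query (all catalog, schema, table, and column names are made up); a real, runnable example follows later in this post:

# Illustrative only - the catalog, schema, table and column names below are fictitious.
cross_db_query = """
SELECT c.name, o.total
FROM postgresql.public.customers c
JOIN mongodb.shop.orders o ON c.id = o.customer_id
"""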

Overall, Trino can help you save lots of time and storage space, and avoid maintaining duplicate code. For example, you'll now only need one Kubeflow component inside your ML pipeline to query all your data sources via Trino.

Let's walk through an example that shows these features in action.
To follow along, you need:
- Access to an OpenShift on IBM Power cluster via CLI
- A Kubeflow installation (on top of OpenShift)
- A PostgreSQL database and a MongoDB instance (ideally also running in your OpenShift cluster)

The basic setup of the example developed below looks as follows:
High-level architecture overview for this blog's example

Installing Trino on OpenShift

First, you need to set up Trino. For that we will use the official Helm Chart.

At the time of writing, the latest release (0.8.0) still includes a bug, which is why we clone the code and modify it directly as shown below (see footnote 1). Start in a fresh namespace and get the chart's code:
oc new-project trino --display-name="Trino" --description="Virtualizes access to datalakes via SQL."
git clone https://github.com/trinodb/charts.git
cd charts/charts

Next, you need to customize the Helm chart's configuration in trino/values.yaml. On the one hand, this concerns general server settings; on the other, it covers the definition of your catalogs, i.e. the databases you want Trino to talk to.

Adapt the connection parameters under additionalCatalogs as needed to connect to your databases before executing the subsequent commands. You can find more information about how to configure the connectors in the Trino documentation for MongoDB and PostgreSQL.
# Write the catalog definitions to a temporary file
cat > trino-catalogs.txt <<'EOF'
additionalCatalogs:
  mongodb: |
    connector.name=mongodb
    mongodb.connection-url=mongodb://<username>:<password>@<mongodb-service-name>.<namespace>:<port>/?authSource=<authentication-database>
    mongodb.schema-collection=schemadef
  postgresql: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://<postgresql-service-name>.<namespace>:<port>/<database>
    connection-user=<username>
    connection-password=<password>
EOF
# Insert the catalog definitions in place of the empty placeholder, then drop the placeholder line
sed -i "/additionalCatalogs: {}/r trino-catalogs.txt" trino/values.yaml
sed -i "/additionalCatalogs: {}/d" trino/values.yaml
rm -f trino-catalogs.txt
Note: The configuration for MongoDB includes the property mongodb.schema-collection. You need to create that collection manually and insert one document with a schema definition per collection you want to query via Trino; a sketch of such a document follows below.
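As a rough sketch of what this can look like, the following snippet (assuming the pymongo library is available and reusing the placeholder connection details from above) inserts a schema definition for the weather collection queried later in this post; the field names and Trino types are only examples that you need to adapt to your data:

# Sketch: create the schema definition collection expected by Trino's MongoDB connector.
# The connection placeholders mirror the catalog configuration above; the field names
# and types are examples only - adjust them to match your collection.
from pymongo import MongoClient

mongo = MongoClient("mongodb://<username>:<password>@<mongodb-service-name>.<namespace>:<port>/?authSource=<authentication-database>")
mongo["weather"]["schemadef"].insert_one({
    "table": "ny",  # the collection to expose via Trino
    "fields": [
        {"name": "_id", "type": "varchar", "hidden": False},
        {"name": "precipitation", "type": "double", "hidden": False}
    ]
})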

Moreover, we need to fix some permission-related settings. OpenShift chooses a random UID/GID when starting containers, whereas Trino assumes that both are 1000. We'll fix that in the next two steps:
# Write the desired securityContext to a temporary file
cat > trino-security-context.txt <<'EOF'
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
EOF
# Add the securityContext below the serviceAccountName line of the worker deployment
sed -i "/serviceAccountName: {{ include \"trino.serviceAccountName\" . }}/r trino-security-context.txt" trino/templates/deployment-worker.yaml
rm -f trino-security-context.txt

Of course, you may edit values.yaml directly in an editor. However, the approach shown above is more convenient if you want to script the whole setup process. Now, complete the installation:
helm install trino ./trino

# Allow the pods in the trino namespace to run with the UID/GID requested above
oc adm policy add-scc-to-user anyuid -z default -n trino

You should get a confirmation of the successful installation, and oc get pods should show you a trino-coordinator pod and two trino-worker pods.

Using Trino in Kubeflow Pipelines

Next, start a new Jupyter Notebook server via the Kubeflow UI:
Jupyter Notebook creation in the Kubeflow UI


Specify quay.io/ibm/kubeflow-notebook-image-ppc64le:latest as the base image and connect to the notebook once it is running.

Now, it is time to define a function that handles the connection to Trino. For that, you can use the trino-python-client library already installed in the base image.
from kfp.components import OutputPath

def connect2trino(
    catalog: str,
    col_names: list,
    host: str,
    prep_dataset_dir: OutputPath(str),
    query: str,
    port: int=8080,
    schema: str='default'
):
    # Imports inside the function body so that the component is self-contained
    from trino.dbapi import Connection
    import pandas as pd

    # Connect to the Trino coordinator and run the given query
    with Connection(
        catalog=catalog,
        host=host,
        port=port,
        schema=schema,
        user='anybody'
    ) as conn:

        link = conn.cursor()
        link.execute(query)
        df = pd.DataFrame(link.fetchall())

    # Name the result columns and write the data set to the component's output path
    df.columns = col_names
    df.to_csv(prep_dataset_dir)

The function's parameters define which catalog (database) and which schema Trino should connect to.
If your query joins data from multiple databases, it does not matter which of them you pick for these arguments: you have to use the fully qualified table name (<catalog>.<schema>.<table>) in the query anyway. In addition, the function signature encompasses the query to run, the expected column names, and where to write the query result.
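Before turning the function into a pipeline component, you can optionally give it a quick try directly in the notebook. The following smoke test is just a sketch: it assumes the notebook pod can reach the Trino service at trino.trino:8080 and writes the result to a temporary file.

# Optional smoke test: call the plain Python function directly from the notebook.
# 'SELECT 1 AS ok' does not touch any table, so it only verifies the Trino connection.
connect2trino(
    catalog='postgresql',
    col_names=['ok'],
    host='trino.trino',
    prep_dataset_dir='/tmp/trino_smoke_test.csv',  # any local path works outside a pipeline
    query='SELECT 1 AS ok',
    schema='public'
)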

Then, make this function a component:
from kfp.components import create_component_from_func

trino_connection_comp = create_component_from_func(
    func=connect2trino,
    base_image='quay.io/ibm/kubeflow-notebook-image-ppc64le:latest'
)
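If you want to reuse the component in other notebooks or pipelines and your kfp 1.x SDK supports it, you can additionally pass the output_component_file argument to persist the component specification as YAML; the file name below is just an example:

# Optional: also write the component specification to a YAML file so that it can be
# loaded elsewhere via kfp.components.load_component_from_file (file name is an example).
trino_connection_comp = create_component_from_func(
    func=connect2trino,
    base_image='quay.io/ibm/kubeflow-notebook-image-ppc64le:latest',
    output_component_file='connect2trino_component.yaml'
)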

For the purpose of demonstration, we will create a small one-step pipeline that uses our Trino component:
from kfp import Client, dsl

with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace') as f:
    NAMESPACE = f.read()

@dsl.pipeline(
  name='Trino Demo',
  description='An example pipeline that connects to 2 databases via Trino'
)
def trino_pipeline():
    query_trino_task = trino_connection_comp(
        catalog='mongodb',
        col_names=['date', 'precipitation', 'close_price'],
        host='trino.trino',
        query="SELECT * FROM mongodb.weather.ny w JOIN postgresql.public.stockhistory s ON w._id = s.Date WHERE s.Date < date '2022-08-05' ORDER BY date ASC",
        schema='weather',
    )

client = Client()
client.create_run_from_pipeline_func(
    trino_pipeline,
    namespace=NAMESPACE,
)

The example contains some exemplary arguments for the Trino connection and query. Adapt them to suit your own database setup.
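For instance, a task that only touches the PostgreSQL catalog could look roughly as follows (inside a pipeline function like the one above; the selected columns are hypothetical, so replace them with your own):

# Hypothetical single-catalog variant of the task above - the column names are
# placeholders that must match your PostgreSQL table.
query_trino_task = trino_connection_comp(
    catalog='postgresql',
    col_names=['date', 'close_price'],
    host='trino.trino',
    query="SELECT Date, Close FROM postgresql.public.stockhistory ORDER BY Date ASC",
    schema='public'
)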

Now that you know what a basic Trino-with-Kubeflow setup can look like, start experimenting with more sophisticated queries.
If you dare, try to include it in your own data retrieval pipelines!
Also, stay tuned for updates on our ongoing efforts to integrate Trino with Kubeflow: expect Trino and examples to be packaged automatically into our Kubeflow on IBM Power distribution very soon!



1) Once there is a new release of the Trino Helm chart, proceed as follows to install Trino (adjust the version number in the last command accordingly):
helm repo add trino https://trinodb.github.io/charts/

wget https://raw.githubusercontent.com/trinodb/charts/main/charts/trino/values.yaml -O trino/values.yaml
Modify the values.yaml file as shown in the article. Then, the last step is:
helm install trino trino/trino --version 0.8.0 -f trino/values.yaml

 

