First, you need to set up Trino. For that, we will use the official Helm chart.
At the time of writing, the latest release (0.8.0) still contains a bug, which is why we clone the code and modify it directly, as shown below (1). Start in a fresh namespace and get the chart's code:
oc new-project trino --display-name="Trino" --description="Virtualizes access to datalakes via SQL."
git clone https://github.com/trinodb/charts.git
cd charts/charts
Next, you need to customize the Helm chart's configuration in trino/values.yaml. On the one hand, this concerns general server settings; on the other, it encompasses the definition of your catalogs, i.e., the databases you want Trino to talk to.
Adapt the connection parameters under additionalCatalogs as needed to connect to your databases before executing the subsequent commands. You can find more information on how to configure the connectors in the Trino documentation for the MongoDB and PostgreSQL connectors.
cat > trino-catalogs.txt <<'EOF'
additionalCatalogs:
  mongodb: |
    connector.name=mongodb
    mongodb.connection-url=mongodb://<username>:<password>@<mongodb-service-name>.<namespace>:<port>/?authSource=<authentication-database>
    mongodb.schema-collection=schemadef
  postgresql: |
    connector.name=postgresql
    connection-url=jdbc:postgresql://<postgresql-service-name>.<namespace>:<port>/<database>
    connection-user=<username>
    connection-password=<password>
EOF
sed -i "/additionalCatalogs: {}/r trino-catalogs.txt" trino/values.yaml
sed -i "/additionalCatalogs: {}/d" trino/values.yaml
rm -f trino-catalogs.txt
Note: The configuration for MongoDB includes the property mongodb.schema-collection. You need to create this collection manually and insert one document with a schema definition for each collection you want to query via Trino.
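To illustrate, here is a minimal sketch of how such a schema document could be created with pymongo, assuming the weather data queried later in this article; the connection placeholders, field names, and Trino types are assumptions you need to adapt to your data:
# Minimal sketch, assuming the pymongo package and the placeholder
# connection values from above; field names and types are hypothetical.
from pymongo import MongoClient

client = MongoClient(
    'mongodb://<username>:<password>@<mongodb-service-name>.<namespace>:<port>'
    '/?authSource=<authentication-database>'
)

# One document per collection you want to query, stored in 'schemadef'.
# Note that the types are Trino types, not MongoDB types.
client['weather']['schemadef'].insert_one({
    'table': 'ny',
    'fields': [
        {'name': '_id', 'type': 'timestamp', 'hidden': False},
        {'name': 'precipitation', 'type': 'double', 'hidden': False},
    ],
})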
Moreover, we need to fix some permission-related settings. OpenShift chooses a random UID/GID when starting containers, whereas Trino assumes that both are 1000. We'll fix that in the next two steps:
cat > trino-security-context.txt <<'EOF'
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
EOF
sed -i "/serviceAccountName: {{ include \"trino.serviceAccountName\" . }}/r trino-security-context.txt" trino/templates/deployment-worker.yaml
rm -f trino-security-context.txt
Of course, you may edit values.yaml directly in an editor. However, the approach shown above is more convenient if you want to script the whole setup process. Now, complete the installation:
helm install trino ./trino
oc adm policy add-scc-to-user anyuid -z default -n trino
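Before continuing, you can optionally verify the deployment with a quick smoke test. The following sketch assumes the trino Python client package and that the code runs inside the cluster, where the service is reachable as trino.trino (Helm release trino in the trino namespace):
from trino.dbapi import Connection

# Smoke test: list the configured catalogs.
with Connection(host='trino.trino', port=8080, user='anybody') as conn:
    cur = conn.cursor()
    cur.execute('SHOW CATALOGS')
    print(cur.fetchall())  # should include 'mongodb' and 'postgresql'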
Next, start a new Jupyter Notebook server via the Kubeflow UI and define a function that runs a query against Trino:
from kfp.components import OutputPath

def connect2trino(
    catalog: str,
    col_names: list,
    host: str,
    prep_dataset_dir: OutputPath(str),
    query: str,
    port: int = 8080,
    schema: str = 'default'
):
    # Imports happen inside the function so that they are available
    # in the component's container at runtime.
    from trino.dbapi import Connection
    import pandas as pd

    with Connection(
        catalog=catalog,
        host=host,
        port=port,
        schema=schema,
        user='anybody'
    ) as conn:
        link = conn.cursor()
        link.execute(query)
        # Fetch the full result set and persist it as a CSV artifact.
        df = pd.DataFrame(link.fetchall())
        df.columns = col_names
        df.to_csv(prep_dataset_dir)
The function's parameters define which catalog (database) and which schema Trino should connect to. If your query joins data from multiple databases, it does not matter which of them you specify in the arguments, since you have to use the fully qualified table specification (<connector>.<schema>.<table>) in the query anyway. Besides that, the function signature encompasses the query to run, the expected column names, and the path to write the query result to.
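For illustration, a hypothetical single-database call could rely on those defaults and use an unqualified table name instead; the column names below are assumptions:
# Sketch: with catalog and schema set, the table name needs no qualification.
connect2trino(
    catalog='postgresql',
    col_names=['date', 'close_price'],  # hypothetical columns
    host='trino.trino',
    prep_dataset_dir='stock.csv',       # plain file path when called directly
    query='SELECT * FROM stockhistory', # resolves to postgresql.public.stockhistory
    schema='public',
)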
Then, make this function a component:
from kfp.components import create_component_from_func

trino_connection_comp = create_component_from_func(
    func=connect2trino,
    base_image='quay.io/ibm/kubeflow-notebook-image-ppc64le:latest'
)
For demonstration purposes, we will create a small, one-step pipeline that uses our Trino component:
from kfp import Client, dsl

# Determine the namespace the notebook runs in, so that the
# pipeline run is started in the same namespace.
with open('/var/run/secrets/kubernetes.io/serviceaccount/namespace') as f:
    NAMESPACE = f.read()

@dsl.pipeline(
    name='Trino Demo',
    description='An example pipeline that connects to 2 databases via Trino'
)
def trino_pipeline():
    query_trino_task = trino_connection_comp(
        catalog='mongodb',
        col_names=['date', 'precipitation', 'close_price'],
        host='trino.trino',
        query="SELECT * FROM mongodb.weather.ny w JOIN postgresql.public.stockhistory s ON w._id = s.Date WHERE s.Date < date '2022-08-05' ORDER BY date ASC",
        schema='weather',
    )

client = Client()
client.create_run_from_pipeline_func(
    trino_pipeline,
    namespace=NAMESPACE,
)
The example contains exemplary arguments for the Trino connection and query; adapt them to suit your database setup.
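In a real pipeline, such a query result would typically feed a downstream step. As a sketch, a hypothetical consumer component could read the CSV artifact via an InputPath parameter:
from kfp.components import InputPath, create_component_from_func

# Hypothetical downstream step: read the CSV produced by connect2trino.
def show_dataset(prep_dataset_dir: InputPath(str)):
    import pandas as pd
    df = pd.read_csv(prep_dataset_dir, index_col=0)
    print(df.head())

show_dataset_comp = create_component_from_func(
    func=show_dataset,
    base_image='quay.io/ibm/kubeflow-notebook-image-ppc64le:latest'
)

# Inside trino_pipeline(), the artifact would be wired like this:
#     show_dataset_comp(query_trino_task.outputs['prep_dataset_dir'])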
Now that you know what a basic Trino-with-Kubeflow setup can look like, start experimenting with more sophisticated queries, for example the aggregation sketched below. If you dare, try to include it in your own data retrieval pipelines!
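For instance, a more elaborate query could aggregate across both catalogs; the column names used here (precipitation, Close) are hypothetical and must match your data:
# Hypothetical cross-catalog aggregation: average monthly closing price
# on rainy days.
query = """
SELECT date_trunc('month', s.Date) AS month,
       avg(s.Close) AS avg_close          -- hypothetical column
FROM mongodb.weather.ny w
JOIN postgresql.public.stockhistory s ON w._id = s.Date
WHERE w.precipitation > 0                 -- rainy days only
GROUP BY 1
ORDER BY 1
"""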
Also, stay tuned for updates on our ongoing efforts to integrate Trino with Kubeflow - expect Trino and examples to be packaged directly into our Kubeflow on IBM Power distribution very soon!
(1) Once a new release of the Trino Helm chart is available, proceed as follows to install Trino instead:
helm repo add trino https://trinodb.github.io/charts/
mkdir -p trino
wget https://raw.githubusercontent.com/trinodb/charts/main/charts/trino/values.yaml -O trino/values.yaml
Modify the values.yaml file as shown in the article. Then, the last step is:
helm install trino trino/trino -f trino/values.yaml