The Streams Python development guide mentions how you can quickly read local files from a Python application. However, what if the file is in a project in Cloud Pak for Data? How can you access files in a project from a Streams notebook?
In this post I’ll cover how you can use data from a file in a Streams Python notebook running in IBM Cloud Pak for Data.
Note for other languages: If you are developing with SPL, working with local files and directories is easy to do with the DirectoryScan operator. Applications running in the IBM Cloud can apply the cloud-readiness techniques discussed on this page. The Streams Java API technique is discussed here.
How to do it
First, add the file to your project in IBM Cloud Pak for Data. Then, you can include the file in the Streams application, or Topology, from the notebook.
Step 1: Add the file to the project
If you haven't already, add the file to the project.
Open the Python notebook from which you would like to access the file.
Once the file is added to the project, the path to the file is /project_data/data_asset/<file-name>.
Test it out by running
!ls -ls /project_data/data_asset/trades.csv
from the notebook.
This should print some details about the file.
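You can run the same check from Python itself rather than a shell command. This is a minimal sketch that tests whether the sample file from this post exists at the expected data asset path:

```python
import os

# Default path to a data asset in a Cloud Pak for Data project
# ("trades.csv" is the sample file used throughout this post)
file_path = "/project_data/data_asset/trades.csv"

if os.path.isfile(file_path):
    print("Found", file_path, "-", os.path.getsize(file_path), "bytes")
else:
    print("File not found:", file_path)
```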
If this works, skip to step 2. Otherwise, if the file is not in that directory, follow these steps to find the path to the file relative to the project.
Optional: Find the path to the file relative to the notebook
From the notebook, click Find and add data > Files.
This will bring up a list of files in the project. Find the file you want to use and click Insert to code > pandas DataFrame:
This will insert the following code snippet in a cell:
import pandas as pd
df_data_1 = pd.read_csv('/project_data/data_asset/trades.csv')
From the snippet we can copy the path to the file:
Note: Your file might not be visible in this list if it is not a CSV file, but it will still be present in the project's data asset directory, /project_data/data_asset.
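If you prefer not to use the menus, you can also list the data asset directory directly from a notebook cell. A small sketch, assuming the default /project_data/data_asset location:

```python
import os

def list_data_assets(data_asset_dir="/project_data/data_asset"):
    """Return the sorted file names in the project's data asset directory."""
    if not os.path.isdir(data_asset_dir):
        return []
    return sorted(os.listdir(data_asset_dir))

# In a Cloud Pak for Data notebook this prints the project's files:
for name in list_data_assets():
    print(name)
```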
Step 2: Include the file in the Streams application
We can already access the file from the notebook, but an extra step is needed to use it from the Streams application.
This is because the Streams application does not run in the same host where the project and the file are stored. Therefore, to open a file from a Streams application at runtime, we must make it available on the host where Streams is running.
This graphic illustrates how it works:
As depicted in the diagram, the project, file, and notebook reside on one host, and the Topology created by the notebook is executed on another. That is why a Streams application cannot access the file unless it is made available on the host where the Streams instance is running.

If your Streams Topology includes code to read a file, use Topology.add_file_dependency to make the file available on the host where the Topology will be executed. This is because you define the Streams Topology in the notebook, but the code is actually executed on the Streams instance when the Topology is submitted.
The extra steps are to:
- Include the file in the code bundle that is sent to the Streams instance using Topology.add_file_dependency.
- At runtime, use streamsx.ec.get_application_directory() to get the path to the file when you are ready to read it.
Putting it together
The basic steps are:

1. Find the path to the data set from the notebook.

As mentioned, each data set is within /project_data/data_asset/.
Note: Since this path is subject to change, use Find and add data > Insert to code to find the correct path to the file.

2. Add the file to the Topology
After declaring the topology, use Topology.add_file_dependency to include the file:

topo = Topology("MyApp")

# include the file as a dependency
path_to_data_sets = "/project_data/data_asset/"
file_name = "trades.csv"
topo.add_file_dependency(path_to_data_sets + file_name, "etc")

lines_from_file = topo.source(CSVFileReader(file_name))
At runtime, the file will be included under the "etc" folder of your application directory. The application directory is a special folder that includes files specific to your application. The target folder must be either "etc" or "opt".

3. Access the full path to the file at runtime
import streamsx.ec

class CSVFileReader:
    def __init__(self, file):  # this is called in the notebook
        self.file_name = file

    def __call__(self):  # called at runtime on the Streams instance
        # open the file from the application directory
        with open(streamsx.ec.get_application_directory()
                  + "/etc/" + self.file_name) as handle:
            ...
Here is a complete example. We have data in a CSV file called trades.csv that has been added to our project, with the following contents:

#ticker, date, time, askprice

And we want to generate a stream of tuples, keeping only the rows where the askprice is greater than 0.
import csv
import streamsx.ec
from streamsx.topology.topology import Topology
from streamsx.topology import context
from icpd_core import icpd_util

# The CSVFileReader class opens the file at runtime and generates tuples:
class CSVFileReader:
    def __init__(self, file):
        self.file_name = file

    def __call__(self):
        # Convert each row in the file to a dict
        header = ["ticker", "date", "time", "askprice"]
        with open(streamsx.ec.get_application_directory()
                  + "/etc/" + self.file_name) as handle:
            reader = csv.DictReader(handle, delimiter=',',
                                    fieldnames=header)
            # Use this to skip the header line if your file has one
            next(reader)
            for row in reader:
                yield row  # submit each row

file = "trades.csv"
topo = Topology("AnalyzeData")
# add the file as a dependency
topo.add_file_dependency('/project_data/data_asset/' + file, "etc")
# use the CSVFileReader class to open the file on the Streams runtime host
lines_from_file = topo.source(CSVFileReader(file))
lines_from_file.filter(lambda tpl: float(tpl["askprice"]) > 0).print()

streams_instance_name = "sample-streams"  ## Change this to your Streams instance
cfg = icpd_util.get_service_instance_details(name=streams_instance_name)
# Disable SSL certificate verification if necessary
cfg[context.ConfigParams.SSL_VERIFY] = False
contextType = context.ContextTypes.DISTRIBUTED
context.submit(contextType, topo, config=cfg)

The output looks like this:
OrderedDict([('ticker', 'GLD'), ('date', '27-MAR-2020'), ('time', '14:06:09.854'), ('askprice', '50.7')])
OrderedDict([('ticker', 'IYF'), ('date', '27-MAR-2020'), ('time', '14:12:38.019'), ('askprice', '103.69')])
OrderedDict([('ticker', 'IOO'), ('date', '27-MAR-2020'), ('time', '14:13:20.873'), ('askprice', '64.02')])
OrderedDict([('ticker', 'AU'), ('date', '27-MAR-2020'), ('time', '14:13:32.877'), ('askprice', '49')])
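The parsing and filtering logic in the example can be sanity-checked locally with plain Python, without a Streams instance. This sketch uses a couple of in-memory rows in the same format as trades.csv; the ZRO row is invented here purely to show the filter in action:

```python
import csv
import io

header = ["ticker", "date", "time", "askprice"]
# In-memory stand-in for trades.csv (the ZRO row is made up to exercise the filter)
sample = ("#ticker, date, time, askprice\n"
          "GLD,27-MAR-2020,14:06:09.854,50.7\n"
          "ZRO,27-MAR-2020,14:07:00.000,0\n")

reader = csv.DictReader(io.StringIO(sample), delimiter=',', fieldnames=header)
next(reader)  # skip the header line, as in the CSVFileReader class
kept = [row for row in reader if float(row["askprice"]) > 0]
for row in kept:
    print(row)  # only the GLD row passes the askprice > 0 filter
```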
Learn more about the Streams Python API in the Python development guide and the full API reference, including Topology.add_file_dependency().