How to use local files in a Streams notebook in Cloud Pak for Data 

Mon August 10, 2020 05:25 PM

The Streams Python development guide mentions how you can quickly read local files from a Python application.  However, what if the file is in a project in Cloud Pak for Data? How can you access files in a project from a Streams notebook?
In this post I’ll cover how you can use data from a file in a Streams Python notebook running in IBM Cloud Pak for Data.


Note for other languages: If you are developing with SPL, working with local files and directories is easy to do with the FileSource and DirectoryScan operators. Applications running in the IBM Cloud can apply the cloud-readiness techniques discussed on this page. The Streams Java API technique is discussed here.

How to do it

First, add the file to your project in IBM Cloud Pak for Data. Then, you can include the file in the Streams application, or Topology, from the notebook.

Step 1: Add the file to the project

 If you haven't already, add the file to the project.
 
Open the Python notebook from which you would like to access the file.

Once the file is added to the project, the path to the file is /project_data/data_asset/<filename>

Test it out by running !ls -ls /project_data/data_asset/trades.csv from the notebook.

This should print some details about the file.
If this works, skip to step 2. Otherwise, if the file is not in that directory, follow these steps to find the path to the file relative to the project.
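If you prefer to check from Python rather than with a shell command, the same test can be sketched as below. This is a minimal sketch: describe_data_asset is a hypothetical helper name, not part of any Streams or Cloud Pak for Data library, and the default directory is simply the path mentioned above.

```python
from pathlib import Path


def describe_data_asset(file_name, base_dir="/project_data/data_asset"):
    """Return (path, size_in_bytes) for a project file.

    Hypothetical helper for illustration only. base_dir defaults to the
    project data directory described in this post; pass another directory
    if your project stores files elsewhere.
    """
    path = Path(base_dir) / file_name
    if not path.is_file():
        raise FileNotFoundError(
            f"{path} not found; use 'Insert to code' to look up the path"
        )
    return str(path), path.stat().st_size
```

In the notebook, calling describe_data_asset("trades.csv") should return the full path and the file size if the file is where this post expects it, and raise FileNotFoundError otherwise.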

Optional: Find the path to the file relative to the notebook

From the notebook, click Find and add data > Files. This brings up a list of the files in the project. Find the file you want to use and click Insert to code > pandas DataFrame.


This will insert the following code snippet in a cell:
import pandas as pd
df_data_1 = pd.read_csv('/project_data/data_asset/trades.csv')
df_data_1.head()

From the snippet we can copy the path to the file: /project_data/data_asset/trades.csv
Note: If your file is not a CSV file, it might not be visible in this list, but it will still be present in the same directory as the path passed to pandas.read_csv.

Step 2: Include the file in the Streams application

We can already access the file from the notebook, but a few extra steps are needed to use it from a Streams application.

This is because the Streams application does not run in the same host where the project and the file are stored. Therefore, to open a file from a Streams application at runtime, we must make it available on the host where Streams is running.

This graphic illustrates how it works:


 As depicted in the diagram, the project, file and notebook reside on one host, and the Topology created by the notebook is executed on another. That is why a Streams application cannot access the file unless it is made available on the host where the Streams instance is running.

If your Streams Topology includes code to read a file, use Topology.add_file_dependency to make the file available on the host where the Topology will be executed. This is because you define the Streams Topology in the notebook, but the code is actually executed on the Streams instance when the Topology is submitted.

The extra steps are to:
  • Include the file in the code bundle that is sent to the Streams instance using Topology.add_file_dependency().
  • Use streamsx.ec.get_application_directory() to get the path to the file when you are ready to read it.

Putting it together

The basic steps are:

1. Find the path to the data set from the notebook. As mentioned, each data set is at /project_data/data_asset/<filename>. Note: Since this path is subject to change, use Find and add data > Files > Insert to code to confirm the correct path to the file.

2. Add the file to the Topology
After declaring the topology, add the file as a dependency:
from streamsx.topology.topology import Topology

topo = Topology("MyApp")
# include the file as a dependency
path_to_data_sets = "/project_data/data_asset/"
file_name = "trades.csv"
topo.add_file_dependency(path_to_data_sets + file_name, "etc")
# CSVFileReader is the class defined in step 3
lines_from_file = topo.source(CSVFileReader(file_name))
 


At runtime, the file will be included in the "etc" folder of your application directory. The application directory is a special folder that includes files specific to your application. The target folder must be either etc or opt.
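To make the mapping concrete, here is a small sketch of how a notebook path and a target folder translate into the location of the file at runtime. runtime_path and ALLOWED_TARGETS are hypothetical names used only for illustration; they are not part of the streamsx API. In a real application the app_dir value would come from streamsx.ec.get_application_directory(), and "/app" below is a made-up stand-in.

```python
import os

# The two target folders named in this post.
ALLOWED_TARGETS = ("etc", "opt")


def runtime_path(app_dir, target, source_path):
    """Return where a dependency is expected to be found at runtime.

    Hypothetical helper: it only joins path components and checks the
    target folder name. It does not call Streams.
    """
    if target not in ALLOWED_TARGETS:
        raise ValueError(f"target must be one of {ALLOWED_TARGETS}, got {target!r}")
    # Only the file name is kept; the notebook-side directory is dropped.
    return os.path.join(app_dir, target, os.path.basename(source_path))


# "/app" is an assumed placeholder for the application directory.
example = runtime_path("/app", "etc", "/project_data/data_asset/trades.csv")
```

The point of the sketch is that the notebook-side directory (/project_data/data_asset/) does not exist on the Streams host, so the code that opens the file must build its path from the application directory instead.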

3. Access the full path to the file at runtime:


import streamsx.ec
class CSVFileReader:
    def __init__(self, file): # this is called in the notebook
        self.file_name = file
    def __call__(self): # called at runtime on the Streams instance
        # open the file from the application directory
        with open(streamsx.ec.get_application_directory() + "/etc/" + self.file_name) as handle:
            for line in handle:
                yield line.strip()  # submit each line as a tuple

Complete example

Here is a complete example. We have data in a CSV file called trades.csv that has been added to our project, with contents like the following:
#ticker, date, time, askprice
"AEC","27-MAR-2020","06:16:53.068",0
"AGU","27-MAR-2020","06:16:53.068",0
"AOF","27-MAR-2020","06:16:53.068",0
"AWF","27-MAR-2020","06:16:54.074",0

We want to generate a stream of tuples from the file and keep only the tuples whose askprice is greater than 0.
from streamsx.topology.topology import Topology
from streamsx.topology import context  # provides context.submit, ConfigParams, ContextTypes
from icpd_core import icpd_util
import csv
import streamsx.ec

# The CSVFileReader class opens the file at runtime and generates tuples:

class CSVFileReader:
    def __init__(self, file):
        self.file_name = file
    def __call__(self):
        # Convert each row in the file to a dict
        header = ["ticker","date", "time", "askprice"]
        with open(streamsx.ec.get_application_directory()
               + "/etc/" + self.file_name) as handle:
            reader = csv.DictReader(handle, delimiter=',',
                                    fieldnames=header)
            #Use this to skip the header line if your file has one
            next(reader)
            for row in reader:
                yield row  # submit each row  


file = "trades.csv"
topo = Topology("AnalyzeData")
# add the file as a dependency
topo.add_file_dependency('/project_data/data_asset/' + file, "etc")

# use the CSVFileReader class to open the file on the Streams runtime host
lines_from_file = topo.source(CSVFileReader(file))
lines_from_file.filter(lambda tpl: float(tpl["askprice"]) > 0).print()

streams_instance_name = "sample-streams"  # Change this to the name of your Streams instance
try:
    cfg = icpd_util.get_service_instance_details(name=streams_instance_name, instance_type="streams")
except TypeError:
    # Fall back for icpd_util versions that do not accept instance_type
    cfg = icpd_util.get_service_instance_details(name=streams_instance_name)

# Disable SSL certificate verification if necessary
cfg[context.ConfigParams.SSL_VERIFY] = False
contextType = context.ContextTypes.DISTRIBUTED
context.submit(contextType, topo, config=cfg)


Sample Output:
OrderedDict([('ticker', 'GLD'), ('date', '27-MAR-2020'), ('time', '14:06:09.854'), ('askprice', '50.7')])
OrderedDict([('ticker', 'IYF'), ('date', '27-MAR-2020'), ('time', '14:12:38.019'), ('askprice', '103.69')])
OrderedDict([('ticker', 'IOO'), ('date', '27-MAR-2020'), ('time', '14:13:20.873'), ('askprice', '64.02')])
OrderedDict([('ticker', 'AU'), ('date', '27-MAR-2020'), ('time', '14:13:32.877'), ('askprice', '49')])
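Before submitting the Topology, it can help to dry-run the parsing and filtering logic directly in the notebook, without Streams. The following is a minimal sketch under stated assumptions: read_rows is a hypothetical stand-in for the body of CSVFileReader.__call__, and the sample text combines the four rows shown earlier with one non-zero row copied from the sample output above.

```python
import csv
import io

# Four rows from the trades.csv excerpt, plus one non-zero row taken
# from the sample output, so that the filter has something to keep.
SAMPLE = '''#ticker, date, time, askprice
"AEC","27-MAR-2020","06:16:53.068",0
"AGU","27-MAR-2020","06:16:53.068",0
"AOF","27-MAR-2020","06:16:53.068",0
"AWF","27-MAR-2020","06:16:54.074",0
"GLD","27-MAR-2020","14:06:09.854",50.7
'''


def read_rows(handle):
    """Yield one dict per data row, mirroring CSVFileReader.__call__."""
    header = ["ticker", "date", "time", "askprice"]
    reader = csv.DictReader(handle, delimiter=",", fieldnames=header)
    next(reader)  # skip the header line
    for row in reader:
        yield row


rows = list(read_rows(io.StringIO(SAMPLE)))
# Same predicate as the filter() call in the Topology.
kept = [row for row in rows if float(row["askprice"]) > 0]
print(kept)
```

With this sample, only the GLD row passes the filter. Once the logic behaves as expected locally, the same code can be used inside the class that runs on the Streams instance, where the file is opened from the application directory instead of an in-memory string.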


Learn more about the Streams Python API in the Python development guide and the full API reference, including the reference for Topology.add_file_dependency().