streamsx.objectstorage package
Object Storage integration for IBM Streams
For details of implementing applications in Python for IBM Streams including IBM Cloud Pak for Data and the Streaming Analytics service running on IBM Cloud see:
Overview
IBM® Cloud Object Storage (COS) makes it possible to store practically limitless amounts of data, simply and cost effectively. It is commonly used for data archiving and backup, web and mobile applications, and as scalable, persistent storage for analytics.
Cloud Object Storage or any other S3 compatible object storage can be used.
This module allows a Streams application to create objects in parquet format with write_parquet, or to write string messages with write, from a stream of tuples. Objects can be listed with scan and read with read.
Credentials
Select one of the following options to define your Cloud Object Storage credentials:
- Streams application configuration
- Setting the Cloud Object Storage service credentials JSON directly to the credentials parameter of the functions.
By default, an application configuration named cos is used. A different configuration name can be specified using the credentials parameter of write(), write_parquet(), scan() or read().
In addition to IAM token-based authentication, it is also possible to authenticate using a signature created from a pair of access and secret keys.
Provide the HMAC keys as a dictionary with the credentials parameter, for example:
credentials = {}
credentials['access_key_id'] = '7exampledonotusea6440da12685eee02'
credentials['secret_access_key'] = '8not8ed850cddbece407exampledonotuse43r2d2586'
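The rules above for the credentials parameter can be summarized in a small helper. This is only a sketch of the selection logic as described in this section, not the package's own code. The function name describe_credentials is hypothetical, and the helper does not call streamsx.objectstorage.

```python
import json

# Default application configuration name, as stated in the text above
DEFAULT_APP_CONFIG = 'cos'


def describe_credentials(credentials=None):
    """Classify a value intended for the ``credentials`` parameter.

    Hypothetical helper: it mirrors the rules described in this section
    and returns a (kind, value) tuple.
    """
    if credentials is None:
        # No value given: the application configuration 'cos' is used
        return ('app_config', DEFAULT_APP_CONFIG)
    if isinstance(credentials, dict):
        # A dictionary with both HMAC keys selects HMAC authentication
        if {'access_key_id', 'secret_access_key'} <= set(credentials):
            return ('hmac', credentials)
        return ('service_credentials', credentials)
    try:
        parsed = json.loads(credentials)
    except ValueError:
        # Not JSON, so treat the string as an application configuration name
        return ('app_config', credentials)
    if isinstance(parsed, dict):
        return ('service_credentials', parsed)
    return ('app_config', credentials)


hmac_keys = {
    'access_key_id': '7exampledonotusea6440da12685eee02',
    'secret_access_key': '8not8ed850cddbece407exampledonotuse43r2d2586',
}
print(describe_credentials())
print(describe_credentials(hmac_keys)[0])
```

A helper like this can be useful in tests or start-up checks to confirm which kind of credentials an application is about to pass on.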
Endpoints
You must create a bucket before launching an application that uses this module.
When running the application in a Streaming Analytics service instance, it is recommended, for best performance, to create a bucket with:
- Resiliency: regional
- Location: Near your Streaming Analytics service, for example us-south
- Storage class: Standard
With the settings above, it is recommended to use the private endpoint for the US-South region:
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'
Note:
- Use public endpoints for applications that are hosted outside of IBM Cloud.
- Use cross-region endpoints for buckets created with cross-region resiliency.
- Set the URL of the object storage service with the endpoint parameter.
Find the list of endpoints and the endpoint description here: IBM® Cloud Object Storage Endpoints
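As an illustration, the regional endpoint name can be derived from the region. The sketch below assumes that other regions follow the same naming scheme as the us-south endpoint shown above. That is an assumption, so check the result against the IBM Cloud Object Storage endpoint list. The function name regional_endpoint is hypothetical.

```python
def regional_endpoint(region, private=True):
    """Build a regional COS endpoint host name.

    Assumption: the naming scheme of the us-south endpoint shown above
    also applies to other regions. Verify against the endpoint list.
    """
    prefix = 's3.private' if private else 's3'
    return '{}.{}.cloud-object-storage.appdomain.cloud'.format(prefix, region)


# Private endpoint, for an application running inside IBM Cloud
endpoint = regional_endpoint('us-south')
print(endpoint)

# Public endpoint, for an application hosted outside of IBM Cloud
print(regional_endpoint('us-south', private=False))
```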
To access any other Amazon S3 compatible object storage server, you need to set the endpoint parameter, for example for the MinIO server running at https://play.min.io:9000:
endpoint='play.min.io:9000'
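The endpoint value is the host and port without the URL scheme. A minimal sketch of deriving it from a server URL with the Python standard library follows. The helper name endpoint_from_url is hypothetical, and mapping the scheme to the ssl_enabled parameter is a convenience assumed here, based on the parameter description further below.

```python
from urllib.parse import urlsplit


def endpoint_from_url(url):
    """Split a server URL into (endpoint, ssl_enabled).

    Hypothetical helper: 'endpoint' is host[:port] without the scheme,
    'ssl_enabled' is True for https and False for http.
    """
    parts = urlsplit(url)
    if parts.scheme not in ('http', 'https') or not parts.netloc:
        raise ValueError('expected an http(s) URL, got {!r}'.format(url))
    return parts.netloc, parts.scheme == 'https'


endpoint, ssl_enabled = endpoint_from_url('https://play.min.io:9000')
print(endpoint, ssl_enabled)
```

The two values could then be passed as the endpoint and ssl_enabled arguments of write(), write_parquet(), scan() or read().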
Sample
A simple hello world example of a Streams application that writes string messages to an object, scans for the created object and reads its content:
from streamsx.topology.topology import Topology
from streamsx.topology.schema import CommonSchema
from streamsx.topology.context import submit
import streamsx.objectstorage as cos
topo = Topology('ObjectStorageHelloWorld')
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
# sample bucket with resiliency "regional" and location "us-south"
bucket = 'streamsx-py-sample'
# US-South region private endpoint
endpoint='s3.private.us-south.cloud-object-storage.appdomain.cloud'
# Write a stream to COS
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')
scanned = cos.scan(topo, bucket=bucket, endpoint=endpoint, directory='/sample')
# read text file line by line
r = cos.read(scanned, bucket=bucket, endpoint=endpoint)
# print each line (tuple)
r.print()
submit('STREAMING_ANALYTICS_SERVICE', topo)
# Use for IBM Streams including IBM Cloud Pak for Data
# submit('DISTRIBUTED', topo)
streamsx.objectstorage.scan(topology, bucket, endpoint, pattern='.*', directory='/', credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Scan a directory in a bucket for object names.
Scans an object storage directory and emits the names of new or modified objects that are found in the directory.
Example scanning the directory /sample for objects matching the pattern:
import streamsx.objectstorage as cos
# endpoint is a required argument, see the Endpoints section
scans = cos.scan(topo, bucket='your-bucket-name', endpoint=endpoint, directory='/sample', pattern=r'SAMPLE_[0-9]*\.ascii\.text$')
Parameters:
- topology (Topology) – Topology to contain the returned stream.
- bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
- endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- pattern (str) – Limits the object names that are listed to the names that match the specified regular expression.
- directory (str) – Specifies the name of the directory to be scanned. Subdirectories are not scanned.
- credentials (str|dict) – Credentials in JSON or the name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.
- name (str) – Operator name in the Streams context, defaults to a generated name.
Returns: Object names stream with schema CommonSchema.String.
Return type: Stream
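Before submitting an application, it can help to try the pattern locally. The sketch below uses Python's re module on a list of made-up object names. The toolkit evaluates the expression itself, so how it matches (for example whether names include a directory prefix) is not covered here. The assumption is only that this simple expression behaves the same way in Python.

```python
import re

# Pattern from the scan() example above, written as a raw string
pattern = re.compile(r'SAMPLE_[0-9]*\.ascii\.text$')

# Hypothetical object names, for illustration only
names = [
    'SAMPLE_0.ascii.text',
    'SAMPLE_12.ascii.text',
    'SAMPLE_1.ascii.text.bak',
    'notes.txt',
]

# Keep the names that match from the start of the name to its end
matching = [n for n in names if pattern.match(n)]
print(matching)
```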
streamsx.objectstorage.read(stream, bucket, endpoint, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Read an object in a bucket.
Reads the object specified in the input stream and emits content of the object.
Example of reading objects using the object names from the scanned stream:
import streamsx.objectstorage as cos
r = cos.read(scanned, bucket=bucket, endpoint=endpoint)
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples with object names to be read. Expects CommonSchema.String in the input stream.
- bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
- endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- credentials (str|dict) – Credentials in JSON or the name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.
- name (str) – Operator name in the Streams context, defaults to a generated name.
Returns: Object content line by line with schema CommonSchema.String.
Return type: Stream
streamsx.objectstorage.write(stream, bucket, endpoint, object, time_per_object=10.0, header=None, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Write strings to an object.
Adds a COS-Writer where each tuple on stream is written into an object.
Example of creating an object with two lines:
import streamsx.objectstorage as cos
to_cos = topo.source(['Hello', 'World!'])
to_cos = to_cos.as_string()
cos.write(to_cos, bucket, endpoint, '/sample/hw%OBJECTNUM.txt')
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Expects CommonSchema.String in the input stream.
- bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
- endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- object (str) – Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.text. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
- time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
- header (str) – Specifies the content of the header row. This header is added as the first line in the object. Use this parameter when you write strings in CSV format and want to query the objects with the IBM SQL Query service. By default, no header row is generated.
- credentials (str|dict) – Credentials in JSON or the name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.
- name (str) – Sink name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink
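When writing CSV data with a header row, both the header and the string tuples have to be formatted consistently. The following sketch prepares them with the Python csv module. The column names, rows and the helper to_csv_line are hypothetical and serve only as an illustration.

```python
import csv
import io


def to_csv_line(values):
    """Format one row as a single CSV line without a trailing newline."""
    buf = io.StringIO()
    csv.writer(buf).writerow(values)
    return buf.getvalue().rstrip('\r\n')


columns = ['id', 'name']                      # hypothetical column names
rows = [(1, 'Hello'), (2, 'World, again')]    # hypothetical data

header = to_csv_line(columns)
lines = [to_csv_line(row) for row in rows]
print(header)
print(lines)

# In a topology, the prepared values would be used along these lines:
# to_cos = topo.source(lines).as_string()
# cos.write(to_cos, bucket, endpoint, '/sample/data_%OBJECTNUM.csv', header=header)
```

Using the csv module for both the header and the rows keeps quoting consistent, for example when a value contains a comma.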
streamsx.objectstorage.write_parquet(stream, bucket, endpoint, object, time_per_object=10.0, credentials=None, ssl_enabled=None, vm_arg=None, name=None)
Create objects in parquet format.
Adds a COS-Writer where each tuple on stream is written into an object in parquet format.
Example of creating objects in parquet format from a stream named ‘js’ in JSON format:
import streamsx.objectstorage as cos
...
# JSON to tuple
to_cos = js.map(schema='tuple<rstring a, int32 b>')
cos.write_parquet(to_cos, bucket=bucket, endpoint=endpoint, object='/parquet/sample/hw%OBJECTNUM.parquet')
Parameters:
- stream (streamsx.topology.topology.Stream) – Stream of tuples to be written to an object. Supports streamsx.topology.schema.StreamSchema (schema for a structured stream) as input. Attributes are mapped to parquet columns.
- bucket (str) – Bucket name. The bucket must have been created in your Cloud Object Storage service before using this function.
- endpoint (str) – Endpoint for Cloud Object Storage. Select the endpoint for your bucket location and resiliency: IBM® Cloud Object Storage Endpoints. Use a private endpoint when running in the IBM Cloud Streaming Analytics service.
- object (str) – Name of the object to be created in your bucket, for example SAMPLE_%OBJECTNUM.parquet. %OBJECTNUM is an object number, starting at 0. When a new object is opened for writing, the number is incremented.
- time_per_object (int|float|datetime.timedelta) – Specifies the approximate time, in seconds, after which the current output object is closed and a new object is opened for writing.
- credentials (str|dict) – Credentials in JSON or the name of the application configuration containing the credentials for Cloud Object Storage. When set to None, the application configuration cos is used.
- ssl_enabled (bool) – Set to False if you want to use HTTP instead of HTTPS. By default, SSL is enabled and HTTPS is used.
- vm_arg (str) – Arbitrary JVM arguments can be passed. For example, increase the JVM's maximum heap size with '-Xmx8192m'.
- name (str) – Sink name in the Streams context, defaults to a generated name.
Returns: Stream termination.
Return type: streamsx.topology.topology.Sink
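The object and time_per_object parameters together determine which objects appear in the bucket. The sketch below only illustrates the documented behavior: %OBJECTNUM starts at 0 and is incremented for each new object, and time_per_object may be given as int, float or timedelta. Both helper functions are hypothetical and the actual handling inside the package may differ, for example in how the number is formatted.

```python
from datetime import timedelta


def object_names(template, count):
    """Expand %OBJECTNUM for the first `count` objects, numbering from 0."""
    return [template.replace('%OBJECTNUM', str(n)) for n in range(count)]


def seconds(time_per_object):
    """Normalize an int, float or timedelta value to seconds as a float."""
    if isinstance(time_per_object, timedelta):
        return time_per_object.total_seconds()
    return float(time_per_object)


names = object_names('/parquet/sample/hw%OBJECTNUM.parquet', 3)
print(names)
print(seconds(timedelta(minutes=1)))
```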
streamsx.objectstorage.download_toolkit(url=None, target_dir=None)
Downloads the latest Objectstorage toolkit from GitHub.
Example for updating the Objectstorage toolkit for your topology with the latest toolkit from GitHub:
import streamsx.objectstorage as objectstorage
import streamsx.spl.toolkit
# download toolkit from GitHub
objectstorage_toolkit_location = objectstorage.download_toolkit()
# add the toolkit to topology
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
Example for updating the topology with a specific version of the Objectstorage toolkit using a URL:
import streamsx.objectstorage as objectstorage
import streamsx.spl.toolkit
url1100 = 'https://github.com/IBMStreams/streamsx.objectstorage/releases/download/v1.10.0/streamsx.objectstorage.toolkits-1.10.0-20190730-1132.tgz'
objectstorage_toolkit_location = objectstorage.download_toolkit(url=url1100)
streamsx.spl.toolkit.add_toolkit(topology, objectstorage_toolkit_location)
Parameters:
- url (str) – Link to the toolkit archive (*.tgz) to be downloaded. Use this parameter to download a specific version of the toolkit.
- target_dir (str) – The directory where the toolkit is unpacked to. If a relative path is given, the path is appended to the system temporary directory, for example to /tmp on Unix/Linux systems. If target_dir is None, a location relative to the system temporary directory is chosen.
Returns: The location of the downloaded Objectstorage toolkit.
Return type: str
Note
This function requires an outgoing Internet connection.
New in version 1.3.
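The handling of a relative target_dir described above can be pictured with the standard library. This is a sketch of the documented rule, not the function's actual implementation. The helper name resolve_target_dir is hypothetical, and the None case is left out because the chosen location is not specified here.

```python
import os
import tempfile


def resolve_target_dir(target_dir):
    """Return the directory a given target_dir would refer to.

    Absolute paths are kept; relative paths are appended to the system
    temporary directory, as described for download_toolkit().
    """
    if os.path.isabs(target_dir):
        return target_dir
    return os.path.join(tempfile.gettempdir(), target_dir)


print(resolve_target_dir('toolkits/objectstorage'))
```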
streamsx.objectstorage.configure_connection(instance, name='cos', credentials=None)
Configures IBM Streams for a certain connection.
Creates an application configuration object containing the required properties with connection information.
Example for creating a configuration for a Streams instance with connection details:
from streamsx.rest import Instance
import streamsx.topology.context
from icpd_core import icpd_util
import streamsx.objectstorage as cos
cfg = icpd_util.get_service_instance_details(name='your-streams-instance')
cfg[streamsx.topology.context.ConfigParams.SSL_VERIFY] = False
instance = Instance.of_service(cfg)
app_cfg = cos.configure_connection(instance, credentials='my_credentials_json')
Parameters:
- instance (streamsx.rest_primitives.Instance) – IBM Streams instance object.
- name (str) – Name of the application configuration, default name is ‘cos’.
- credentials (str|dict) – The service credentials for IBM Cloud Object Storage.
Returns: Name of the application configuration.
Warning
The function can be used only in IBM Cloud Pak for Data.
New in version 1.1.