ibmstreams / streamsx.objectstorage

The com.ibm.streamsx.objectstorage toolkit supports Object Storage services with an S3 API, such as the IBM Cloud Object Storage service.

Home Page: https://ibmstreams.github.io/streamsx.objectstorage

License: Other

Java 78.32% Makefile 3.17% Python 18.49% Shell 0.02%
ibm-streams object-storage stream-processing toolkit

streamsx.objectstorage's People

Contributors

anouri, apyasic, brandtol, chanskw, ddebrunner, dependabot[bot], dmhursh, markheger


streamsx.objectstorage's Issues

iss#9-stocator-based-impl: BucketScanSample failed

Checking variables in object name 'output_%TIME.txt'
Checking object name section '%TIME.txt' validity
com.ibm.tools.attach.AttachNotSupportedException: target not found
at com.ibm.tools.attach.javaSE.VirtualMachineImpl.attachTarget(VirtualMachineImpl.java:87)
at com.ibm.tools.attach.javaSE.AttachProviderImpl.attachVirtualMachine(AttachProviderImpl.java:37)
at ibm.tools.attach.J9AttachProvider.attachVirtualMachine(J9AttachProvider.java:55)
at com.sun.tools.attach.VirtualMachine.attach(VirtualMachine.java:231)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:508)
at org.ehcache.sizeof.impl.AgentLoader.loadAgent(AgentLoader.java:119)
at org.ehcache.sizeof.impl.AgentSizeOf.&lt;init&gt;(AgentSizeOf.java:43)
at org.ehcache.sizeof.SizeOf.newInstance(SizeOf.java:85)
at org.ehcache.sizeof.SizeOf.newInstance(SizeOf.java:79)
at org.ehcache.impl.internal.sizeof.DefaultSizeOfEngine.&lt;init&gt;(DefaultSizeOfEngine.java:45)
at org.ehcache.impl.internal.sizeof.DefaultSizeOfEngineProvider.createSizeOfEngine(DefaultSizeOfEngineProvider.java:61)
at org.ehcache.impl.internal.store.heap.OnHeapStore$Provider.createStoreInternal(OnHeapStore.java:1727)
at org.ehcache.impl.internal.store.heap.OnHeapStore$Provider.createStore(OnHeapStore.java:1696)
at org.ehcache.impl.internal.store.heap.OnHeapStore$Provider.createStore(OnHeapStore.java:1677)
at org.ehcache.core.EhcacheManager.getStore(EhcacheManager.java:493)
at org.ehcache.core.EhcacheManager.createNewEhcache(EhcacheManager.java:310)
at org.ehcache.core.EhcacheManager.createCache(EhcacheManager.java:259)
at org.ehcache.core.EhcacheManager.init(EhcacheManager.java:566)
at org.ehcache.config.builders.CacheManagerBuilder.build(CacheManagerBuilder.java:72)
at com.ibm.streamsx.objectstorage.OSObjectRegistry.&lt;init&gt;(OSObjectRegistry.java:146)
at com.ibm.streamsx.objectstorage.BaseObjectStorageSink.initialize(BaseObjectStorageSink.java:885)
at com.ibm.streamsx.objectstorage.s3.S3ObjectStorageSink.initialize(S3ObjectStorageSink.java:39)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)
27 Nov 2017 13:53:54.372 [20657] ERROR #splapptrc,J[0],P[0],OSink M[?:com.ibm.streamsx.objectstorage.OSObjectRegistryListener.onEvent:-1] - Failed to close OSObject with path '//output_2017-11-27T135353.txt'. Error message: s3a://output_2017-11-27T135353.txt is a directory
27 Nov 2017 13:53:54.623 [20657] ERROR #splapptrc,J[0],P[0],OSink M[?:com.ibm.streamsx.objectstorage.OSObjectRegistryListener.onEvent:-1] - Failed to close OSObject with path '//output_2017-11-27T135353.txt'. Error message: s3a://output_2017-11-27T135353.txt is a directory
27 Nov 2017 13:53:54.990 [20657] ERROR #splapptrc,J[0],P[0],OSink M[?:com.ibm.streamsx.objectstorage.OSObjectRegistryListener.onEvent:-1] - Failed to close OSObject with path '//output_2017-11-27T135353.txt'. Error message: s3a://output_2017-11-27T135353.txt is a directory
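The 'output_%TIME.txt' pattern being checked above relies on variable substitution in the object name. A minimal sketch of how such a placeholder could be expanded (`expand_object_name` is a hypothetical helper for illustration, not the toolkit's actual implementation):

```python
from datetime import datetime

def expand_object_name(pattern: str, now: datetime) -> str:
    # Hypothetical helper: replace the %TIME variable with a timestamp,
    # matching the 'output_2017-11-27T135353.txt' shape seen in the trace.
    return pattern.replace("%TIME", now.strftime("%Y-%m-%dT%H%M%S"))

print(expand_object_name("output_%TIME.txt", datetime(2017, 11, 27, 13, 53, 53)))
# output_2017-11-27T135353.txt
```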

Change protocol parameter to cos - s3d is not supported

Valid protocols are s3a and cos.

Exception in thread "Thread-11" java.lang.IllegalArgumentException: No Object Storage client implementation found for protocol 's3d'
at com.ibm.streamsx.objectstorage.client.ObjectStorageClientFactory.getObjectStorageClient(ObjectStorageClientFactory.java:28)
at com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.createObjectStorageClient(AbstractObjectStorageOperator.java:163)
at com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.initialize(AbstractObjectStorageOperator.java:87)
at com.ibm.streamsx.objectstorage.BaseObjectStorageSink.initialize(BaseObjectStorageSink.java:773)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)
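The factory's fail-fast rejection of an unknown protocol can be sketched as follows (a hedged illustration in Python; `SUPPORTED_PROTOCOLS` and `get_object_storage_client` are hypothetical names, not the toolkit's Java API):

```python
SUPPORTED_PROTOCOLS = {"s3a", "cos"}

def get_object_storage_client(protocol: str) -> str:
    # Fail fast with a clear message, as the Java factory does for 's3d'.
    if protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError(
            f"No Object Storage client implementation found for protocol '{protocol}'"
        )
    return f"{protocol}-client"
```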

Build error in demo applications

/opt/ibm/IBM_Streams/4.2.2.0.20170817153217/bin/sc -a -t ../../../com.ibm.streamsx.objectstorage:../../../../streamsx.avro/com.ibm.streamsx.avro:/opt/ibm/IBM_Streams/4.2.2.0.20170817153217/toolkits/com.ibm.streamsx.json --rebuild-toolkits -M com.ibm.streamsx.objectstorage.avro.swift::AvroWriteSample --output-directory=output/swift
com.ibm.streamsx.objectstorage.avro.swift/AvroWriteSample.spl:3:5: CDISP0760W WARNING: The com.ibm.streamsx.objectstorage.swift namespace is not defined in the use directive.
com.ibm.streamsx.objectstorage.avro.swift/AvroWriteSample.spl:4:5: CDISP0760W WARNING: The com.ibm.streamsx.objectstorage.swift namespace is not defined in the use directive.
com.ibm.streamsx.objectstorage.avro.swift/AvroWriteSample.spl:46:56: CDISP0053E ERROR: An unknown identifier was referenced in the SPL program: SwiftObjectStorageSink.
com.ibm.streamsx.objectstorage.avro.swift/AvroWriteSample.spl:67:38: CDISP0053E ERROR: An unknown identifier was referenced in the SPL program: SwiftObjectStorageSource.

Issues with COS (S3) Samples

I tried to run the sample for S3 and I ran into the following issues:

  1. The project page does not have documentation for this sample, only the Swift sample.
  2. The Readme.md for the sample does not mention where to get your credentials.

More importantly, I couldn't match the name of the credentials required by the sample to what was listed in the Credentials page of my cloud instance. This might be because of the new authentication mechanism.
The required parameters for the sample are
ObjectStorage-AccessKeyID, ObjectStorage-SecretAccessKey, and ObjectStorage-Endpoint.
But these parameters do not exist in the JSON Credentials object for my instance:

{
  "apikey": "jqN2-k7u",
  "endpoints": "https://cos-service.bluemix.net/endpoints",
  "iam_apikey_description": "nnnn",
  "iam_apikey_name": "adsfdsf72e5f9d062",
  "iam_role_crn": "dfsf",
  "iam_serviceid_crn": "dffd90c5f566",
  "resource_instance_id": "fdfdfd:"
}

I tried mapping apikey, iam_apikey_name, and endpoints to those parameters respectively, but it didn't work.
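One way to tell the two credential styles apart is to inspect the JSON keys: IAM-style objects (like the one above) carry apikey, while HMAC-style objects are assumed to carry access key fields instead. A rough sketch (`credential_style` is a hypothetical helper, and the HMAC key names are assumptions, not taken from this issue):

```python
import json

def credential_style(creds_json: str) -> str:
    # Hypothetical helper: classify a credentials object by its keys.
    creds = json.loads(creds_json)
    if "apikey" in creds:
        return "iam"
    if "access_key_id" in creds and "secret_access_key" in creds:
        return "hmac"
    return "unknown"

iam_creds = '{"apikey": "jqN2-k7u", "endpoints": "https://cos-service.bluemix.net/endpoints"}'
print(credential_style(iam_creds))  # iam
```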

Define versioning scheme

How is versioning being used for this toolkit?

It seems to be "rushing" into 1.x releases with an API that is still changing (e.g. #8). A typical approach is to use 0.x for initial releases while the API is evolving, with 1.0 being the first stable point.

Is it expected that the first stable release will be 2.0?

@chanskw

Object Sink operator

The toolkit shall contain an operator that can write objects to the Bluemix Object Storage service.
Things we should consider/specify

  • Supported interface: Swift and/or S3
  • Credential handling/token refresh (is a Streams application config an option?)
  • Object naming should be flexible and user-controlled (see the FileSink options)
  • Object naming might be aligned with Message Hub naming
  • Input formats: at least binary should be supported; others may be handled by the application
  • Error handling

The FileSink operator for reference:
https://www.ibm.com/support/knowledgecenter/SSCRJU_4.2.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$spl/op$spl.adapter$FileSink.html

The HDFSFileSink operator:
https://www.ibm.com/support/knowledgecenter/SSCRJU_4.2.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$com.ibm.streamsx.hdfs/ix$Operator.html

iss#9-stocator-based-impl: NPE if object has no expiry

Exception in thread "Thread-18" java.lang.NullPointerException: Null expiry
at org.ehcache.config.builders.CacheConfigurationBuilder.withExpiry(CacheConfigurationBuilder.java:294)
at com.ibm.streamsx.objectstorage.OSObjectRegistry.&lt;init&gt;(OSObjectRegistry.java:140)
at com.ibm.streamsx.objectstorage.BaseObjectStorageSink.initialize(BaseObjectStorageSink.java:885)
at com.ibm.streamsx.objectstorage.s3.S3ObjectStorageSink.initialize(S3ObjectStorageSink.java:39)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)

Build error in com.ibm.streamsx.objectstorage.sample

[exec] com.ibm.streamsx.objectstorage.sample/ObjectReadWriteSample.spl:31:5: CDISP0054E ERROR: The objectStorageProjectID parameter is unknown and was not defined for the ObjectStorageSink operator.
[exec]
[exec] CDISP0092E ERROR: Because of previous compilation errors, the compile process cannot continue.
[exec] make: *** [distributed] Error 1

iss#9-stocator-based-impl: split sample projects

  • Samples depending on avro, json, and messagehub are moved to samples/demo.
  • com.ibm.streamsx.objectstorage.sample contains basic samples only. Some applications are moved to com.ibm.streamsx.objectstorage.sample.complex.

Problem in ReadWriteSample if bucket does not exist

The sample app tries to create a bucket by using the native functions.
Unfortunately, the Source/Sink operators already connect to COS in their initialize function and require that the bucket exists at that point.

28 Mar 2018 09:45:39.954 [13206] ERROR #splapptrc,J[0],P[0],ObjStSource M[?:com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.initialize:-1] - Failed to connect to endpoint 's3-api.us-geo.objectstorage.softlayer.net'. Exception: 'Bucket streams-test-bucket-xxxx does not exist'
28 Mar 2018 09:45:39.962 [13206] ERROR #splapplog,J[0],P[0],ObjStSource M[JavaOp.cpp:log:83] - OBJST1266I Attempting to connect to URI: '{0}'. java.io.FileNotFoundException: Bucket streams-test-bucket-xxxx does not exist
28 Mar 2018 09:45:39.963 [13206] ERROR #splapptrc,J[0],P[0],ObjStSource,spl_javaop M[?:com.ibm.streams.operator.internal.jni.JNIBridge.:-1] - Failed to connect to endpoint 's3-api.us-geo.objectstorage.softlayer.net'
28 Mar 2018 09:45:39.963 [13206] ERROR #splapptrc,J[0],P[0],ObjStSource,spl_javaop M[?:?:0] - java.lang.Exception: Failed to connect to endpoint 's3-api.us-geo.objectstorage.softlayer.net'
28 Mar 2018 09:45:39.964 [13206] ERROR #splapptrc,J[0],P[0],ObjStSource,spl_javaop M[?:?:0] - com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.initialize(AbstractObjectStorageOperator.java:101)

iss#9-stocator-based-impl: "/" is required at the end of the URL

Exception in thread "Thread-14" java.lang.StringIndexOutOfBoundsException
at java.lang.String.substring(String.java:1402)
at com.ibm.streamsx.objectstorage.Utils.getHost(Utils.java:77)
at com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.genServiceExtendedURI(AbstractObjectStorageOperator.java:240)
at com.ibm.streamsx.objectstorage.AbstractObjectStorageOperator.initialize(AbstractObjectStorageOperator.java:75)
at com.ibm.streamsx.objectstorage.BaseObjectStorageSource.initialize(BaseObjectStorageSource.java:121)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)
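The StringIndexOutOfBoundsException suggests Utils.getHost slices the URI with fixed substring offsets that assume a trailing "/". A sketch of a more tolerant extraction using a standard URL parser (`get_host` here is an illustration in Python, not the toolkit's Java code):

```python
from urllib.parse import urlparse

def get_host(uri: str) -> str:
    # Parse instead of slicing, so "s3a://host" and "s3a://host/" both work.
    return urlparse(uri).netloc

print(get_host("s3a://s3-api.us-geo.objectstorage.softlayer.net"))
print(get_host("s3a://s3-api.us-geo.objectstorage.softlayer.net/"))
# both print: s3-api.us-geo.objectstorage.softlayer.net
```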

Input attributes parameters should be attributes, not names of the attributes.

ObjectSource's objectDataAttribute and objectNameAttribute should be of type attribute, not rstrings.

objectDataAttribute might be for the output port, but its description says input; it's hard to tell because the SPLDOC doesn't fully describe what the operator does.

Same for ObjectSink, though I think in that case both are input attributes.

See https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.0.0/com.ibm.streams.spl-java-operators.doc/api/com/ibm/streams/operator/TupleAttribute.html

S3ObjectStorageSink java.lang.NullPointerException if the objectNameAttribute parameter is set and closeOnPunct is not set

28 Mar 2018 12:08:36.322 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:com.ibm.streams.operator.internal.jni.JNIBridge.:-1] - Null expiry
28 Mar 2018 12:08:36.323 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - java.lang.NullPointerException: Null expiry
28 Mar 2018 12:08:36.323 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - org.ehcache.config.builders.UserManagedCacheBuilder.withExpiry(UserManagedCacheBuilder.java:455)
28 Mar 2018 12:08:36.323 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - com.ibm.streamsx.objectstorage.internal.sink.OSObjectRegistry.&lt;init&gt;(OSObjectRegistry.java:161)
28 Mar 2018 12:08:36.324 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - com.ibm.streamsx.objectstorage.BaseObjectStorageSink.initialize(BaseObjectStorageSink.java:906)
28 Mar 2018 12:08:36.324 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - com.ibm.streamsx.objectstorage.s3.S3ObjectStorageSink.initialize(S3ObjectStorageSink.java:39)
28 Mar 2018 12:08:36.324 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
28 Mar 2018 12:08:36.324 [7496] ERROR #splapptrc,J[0],P[0],ObjStSink,spl_javaop M[?:?:0] - com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)
Exception in thread "Thread-14" java.lang.NullPointerException: Null expiry
at org.ehcache.config.builders.UserManagedCacheBuilder.withExpiry(UserManagedCacheBuilder.java:455)
at com.ibm.streamsx.objectstorage.internal.sink.OSObjectRegistry.&lt;init&gt;(OSObjectRegistry.java:161)
at com.ibm.streamsx.objectstorage.BaseObjectStorageSink.initialize(BaseObjectStorageSink.java:906)
at com.ibm.streamsx.objectstorage.s3.S3ObjectStorageSink.initialize(S3ObjectStorageSink.java:39)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)

Problem when running java unit test

Job build and execution completed. Starting results validation.
-> Expected operator output location: '/mnt/hgfs/share/git-public/streamsx.objectstorage/test/java/com.ibm.streamsx.objectstorage.test/unittests/testrun432477979/../../../../../test/java/com.ibm.streamsx.objectstorage.test/data/expected/com.ibm.streamsx.objectstorage.unitest.sink.parquet.TestCloseBySizeParquetGzip'
-> Expected output tuples count: '8'
-> Actual operator output location: '/tmp/ost_unitest/com.ibm.streamsx.objectstorage.unitest.sink.parquet.TestCloseBySizeParquetGzip/'
-> Actual output tuples count: '10'
Failed to detect a valid hadoop home directory
java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:448)
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:419)
at org.apache.hadoop.util.Shell.&lt;clinit&gt;(Shell.java:496)
at org.apache.hadoop.util.StringUtils.&lt;clinit&gt;(StringUtils.java:79)
at org.apache.hadoop.fs.FileSystem$Cache$Key.&lt;init&gt;(FileSystem.java:2972)
at org.apache.hadoop.fs.FileSystem$Cache$Key.&lt;init&gt;(FileSystem.java:2967)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2829)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:389)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:181)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:356)
at org.apache.parquet.hadoop.ParquetReader.&lt;init&gt;(ParquetReader.java:110)
at org.apache.parquet.hadoop.ParquetReader.&lt;init&gt;(ParquetReader.java:62)
at com.ibm.streamsx.objectstorage.test.OSTParquetFileUtils.readFileLineByLine(OSTParquetFileUtils.java:36)
at com.ibm.streamsx.objectstorage.test.OSTParquetFileUtils.contentEquals(OSTParquetFileUtils.java:84)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.checkOperatorOutputData(BaseObjectStorageTestSink.java:205)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.checkOperatorOutputData(BaseObjectStorageTestSink.java:228)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.validateResults(BaseObjectStorageTestSink.java:299)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.createObjectTest(BaseObjectStorageTestSink.java:129)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.runUnitest(BaseObjectStorageTestSink.java:169)
at com.ibm.streamsx.objectstorage.unitest.sink.BaseObjectStorageTestSink.runUnitest(BaseObjectStorageTestSink.java:177)
at com.ibm.streamsx.objectstorage.unitest.sink.parquet.TestCloseBySizeParquetGzip.testCloseBySizeParquetConfigGzip(TestCloseBySizeParquetGzip.java:41)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:90)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
at java.lang.reflect.Method.invoke(Method.java:508)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at junit.framework.JUnit4TestAdapter.run(JUnit4TestAdapter.java:39)
at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.run(JUnitTestRunner.java:532)
at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.launch(JUnitTestRunner.java:1179)
at org.apache.tools.ant.taskdefs.optional.junit.JUnitTestRunner.main(JUnitTestRunner.java:1030)
setsid exited with exit code 0

Why is HADOOP_HOME needed, and what is required to set it properly?

iss#9-stocator-based-impl: bucket naming convention

On branch "iss#9-stocator-based-impl":

Using the sample com.ibm.streamsx.objectstorage.s3.sample,
it is observed that the bucket name handling is different:
the operators expect that the bucket name with .service appended exists at launch time, but ReadWriteSample.spl wants to create the bucket on its own before writing to Object Storage.
Why is .service added to the bucket name?

Extend toolkit documentation

  • The operator descriptions need to be extended.
  • Describe when to use the S3ObjectStorage* operators and when the ObjectStorage* operators.

Native functions: add IAM support

The native functions support the S3 client with BasicAWSCredentials only:
initialize(String accessKeyID, String secretAccessKey, String endpoint)

IAM support shall be provided as a second option.
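The requested shape can be sketched as two credential types dispatched by one entry point (all names here are hypothetical Python illustrations; the real native functions are SPL/Java):

```python
from dataclasses import dataclass

@dataclass
class HmacCredentials:
    # Mirrors the existing initialize(accessKeyID, secretAccessKey, endpoint).
    access_key_id: str
    secret_access_key: str
    endpoint: str

@dataclass
class IamCredentials:
    # The requested second option: IAM API key based authentication.
    api_key: str
    endpoint: str

def initialize(creds) -> str:
    # Dispatch on the credential type instead of supporting HMAC only.
    if isinstance(creds, HmacCredentials):
        return "s3-basic"
    if isinstance(creds, IamCredentials):
        return "s3-iam"
    raise TypeError("unsupported credential type")
```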

Build: run maven goals in Batch mode (-B)

Tried compiling the toolkit and saw some ugly output while the necessary packages were downloaded from the central repo:

[exec] 253/253 KB   46/46 KB   8/608 KB   253/253 KB   46/46 KB   12/608 KB   253/253 KB   46/46 KB
16/608 KB   253/253 KB   46/46 KB   20/608 KB   253/253 KB   46/46 KB   24/608 KB   253/253 KB   46/46
KB   28/608 KB   253/253 KB   46/46 KB   32/608 KB   253/253 KB   46/46 KB   36/608 KB   253/253 KB
46/46 KB   40/608 KB   253/253 KB   46/46 KB   44/608 KB   253/253 KB   46/46 KB   48/608 KB   253/253
KB   46/46 KB   52/608 KB   253/253 KB   46/46 KB   56/608 KB   253/253 KB   4/27 KB   46/46 KB   56/608
KB   253/253 KB   4/27 KB   46/46 KB   60/608 KB   253/253 KB   4/27 KB   46/46 KB   64/608 KB   253/253
KB   8/27 KB   46/46 KB   64/608 KB   253/253 KB   12/27 KB   46/46 KB   64/608 KB   253/253 KB   16/27
KB   46/46 KB   64/608 KB   253/253 KB   20/27 KB   46/46 KB   64/608 KB
253/253 KB   24/27 KB   64/608 KB              Downloaded: https://repo.maven.apache.org/maven2
/com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar (46 KB at 72.6
KB/sec)253/253 KB   27/27 KB   64/608 KB
[exec] 253/253 KB   27/27 KB   68/608 KB   253/253 KB   27/27 KB   72/608 KB   253/253 KB   27/27 KB
76/608 KB   253/253 KB   27/27 KB   80/608 KB                                       Downloaded:
https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-
core-2.6.6.jar (253 KB at 396.9 KB/sec)27/27 KB   84/608 KB 

Auxiliary functions

The toolkit may provide helper functions to perform other operations on objects.
Candidates may be :

  • Rename/move
  • Delete
  • Read/change ACL (if possible)
  • Read/change other metadata (if possible)

Enhancement request to add IGC support to ObjectStorage operators

Add IGC support to the ObjectStorage operators to allow their use in applications requiring data lineage and flow metadata to be reported when an application using these assets is submitted. This capability is currently supported by other sink operators such as File, Kafka, JMS, MQTT, ...

Background Streams/IGC related information:

https://www.slideshare.net/lisanl/data-governance-with-ibm-streams-v41
http://ibmstreams.github.io/streamsx.documentation/docs/4.1/governance/governance-quickstart/
https://www.ibm.com/support/knowledgecenter/en/SSCRJU_4.1.0/com.ibm.streams.studio.doc/doc/tworking-with-governance-catalogs.html

iss#9-stocator-based-impl: Missing resource files

Exception in thread "Thread-14" java.lang.ExceptionInInitializerError
at java.lang.J9VMInternals.ensureError(J9VMInternals.java:141)
at java.lang.J9VMInternals.recordInitializationFailure(J9VMInternals.java:130)
at com.ibm.streamsx.objectstorage.BaseObjectStorageScan.checkStrictMode(BaseObjectStorageScan.java:352)
at com.ibm.streamsx.objectstorage.BaseObjectStorageScan.initialize(BaseObjectStorageScan.java:421)
at com.ibm.streamsx.objectstorage.s3.S3ObjectStorageScan.initialize(S3ObjectStorageScan.java:34)
at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:736)
at com.ibm.streams.operator.internal.jni.JNIBridge.&lt;init&gt;(JNIBridge.java:274)
Caused by: java.util.MissingResourceException: Can't find bundle for base name com.ibm.streamsx.objectstorage.messages.messages, locale en_US
at java.util.ResourceBundle.throwMissingResourceException(ResourceBundle.java:1590)
at java.util.ResourceBundle.getBundleImpl(ResourceBundle.java:1406)
at java.util.ResourceBundle.getBundle(ResourceBundle.java:784)
at com.ibm.streamsx.objectstorage.Messages.&lt;clinit&gt;(Messages.java:16)
... 5 more

com.ibm.streamsx.objectstorage.messages.messages needs to be part of the jar file.

Adapt trace levels

  • Debug traces need to be converted to TraceLevel.TRACE if they are used on a per-tuple basis.
  • Check whether the trace level is enabled (TRACE.isLoggable(TraceLevel.TRACE)) before calling the log method.
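Guarding per-tuple trace calls looks like this in sketch form (shown in Python's logging terms; the toolkit itself would use TraceLevel.TRACE and isLoggable):

```python
import logging

trace = logging.getLogger("objectstorage.sink")

def on_tuple(tuple_data: dict) -> None:
    # Check the level first, so the message (and any expensive string
    # building) is skipped entirely when tracing is off.
    if trace.isEnabledFor(logging.DEBUG):
        trace.debug("processing tuple: %r", tuple_data)
```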

Stocator-Based Implementation

Combine Stocator (an open source project for interacting with Object Storage using the Swift and S3 dialects) and HDFS Toolkit capabilities for a performant implementation of extensible object storage operators. The following presentation covers the proposed architecture and operator details.

Object Source Operator

The toolkit shall contain an operator that can read objects from the Bluemix Object Storage service.
Things we should consider/specify

  • Supported interface: Swift and/or S3
  • Credential handling/token refresh (is a Streams application config an option?)
  • Object name specification: providing an input port to retrieve container/bucket and object names would be preferred.
  • Output formats: at least binary should be supported; others may be handled by the application
  • Error handling

Sample application from Bluemix
https://www.ibm.com/blogs/bluemix/2016/02/leverage-object-storage-for-streaming-analytics/

The Streams FileSource operator for reference
https://www.ibm.com/support/knowledgecenter/SSCRJU_4.2.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$spl/op$spl.adapter$FileSource.html

ObjectStorageSink creates objects with zero bytes

Sometimes objects have size 0 when ObjectStorageSink or S3ObjectStorageSink is used with closeOnPunct: true. It doesn't matter whether s3a or cos is selected.

For example, test_s3a_write_n_objects_iam in test_object_storage.py validates the objects in streamsx-os-test-bucket-us-iam-15218224521909282:
test_data_0 100001
test_data_1 100001
test_data_2 100001
test_data_3 100001
test_data_4 0
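A validation pass over a listing like the one above can flag the bad objects mechanically; a small sketch (`find_empty_objects` is a hypothetical helper, and the listing reuses the sizes reported above):

```python
def find_empty_objects(listing: dict) -> list:
    # Return the names of objects whose reported size is zero bytes.
    return sorted(name for name, size in listing.items() if size == 0)

listing = {
    "test_data_0": 100001,
    "test_data_1": 100001,
    "test_data_2": 100001,
    "test_data_3": 100001,
    "test_data_4": 0,
}
print(find_empty_objects(listing))  # ['test_data_4']
```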

Scan/watch operator

The toolkit shall contain an operator that can detect changes in the Bluemix Object Storage service, either by receiving notifications and/or by reading metadata on accounts/containers/buckets.

Things we should consider/specify

  • Supported interface: Swift and/or S3
  • Credential handling/token refresh (is a Streams application config an option?)
  • Error handling
  • Scan intervals/sorting, if applicable

The DirectoryScan operator for reference:
https://www.ibm.com/support/knowledgecenter/SSCRJU_4.2.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$spl/op$spl.adapter$DirectoryScan.html

A filesystem notification based dir watcher operator for reference:
https://www.ibm.com/support/knowledgecenter/SSCRJU_4.2.0/com.ibm.streams.toolkits.doc/spldoc/dita/tk$com.ibm.streams.teda/op$com.ibm.streams.teda.adapter$DirectoryWatch.html

Fix error handling in operators.

The sink operator catches exceptions, logs them and then just continues on.

https://github.com/IBMStreams/streamsx.objectstorage/blob/master/com.ibm.streamsx.objectstorage/impl/java/src/com/ibm/streamsx/objectstorage/s3/ObjectSink.java#L153

This is not good practice: errors go unnoticed, and there is no indication from the operator that something is wrong.

Typically the exception should be rethrown so the runtime logs it; alternatively, add retry logic or an error output port.

Applications can catch exceptions using the @catch annotation.
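The suggested alternative, bounded retries followed by rethrowing so the runtime sees the failure, can be sketched as (`write_object_with_retry` is a hypothetical Python illustration; the real sink is Java):

```python
def write_object_with_retry(write, retries: int = 3):
    # Try a few times, then rethrow instead of logging and continuing,
    # so the error is not silently swallowed.
    last_exc = None
    for _ in range(retries):
        try:
            return write()
        except Exception as exc:  # illustration only; narrow this in real code
            last_exc = exc
    raise last_exc
```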
