Coder Social home page Coder Social logo

ibmstreams / streamsx.sparkmllib Goto Github PK

View Code? Open in Web Editor NEW
8.0 20.0 15.0 2.29 MB

Toolkit for real-time scoring using Apache Spark MLLib library

Home Page: http://ibmstreams.github.io/streamsx.sparkMLLib

License: Apache License 2.0

Java 90.12% Shell 9.88%
spark-mllib-library ibm-streams spark stream-processing toolkit

streamsx.sparkmllib's Introduction

streamsx.sparkMLlib

Toolkit for real-time scoring using Spark MLLib library.

This toolkit implements the NLS feature. Use the guidelines for the message bundle that are described in Messages and National Language Support for toolkits

To learn more about Streams:

Developing and running applications that use the SparkMLLib Toolkit

To create applications that use the SparkMLLib Toolkit, you must configure either Streams Studio or the SPL compiler to be aware of the location of the toolkit.

Before you begin

  • Install IBM InfoSphere Streams. Configure the product environment variables by entering the following command: source /bin/streamsprofile.sh
  • Generate a Spark model as described in the next section and save it to the local filesystem or HDFS.
  • The worker nodes which execute the toolkit code do not require an separate Apache Spart installation.

Spark Models

  • This toolkit provides a number of operators that can load a stored Spark MLlib model and use it to perform real time scoring on incoming tuple data.
  • To generate the model files point the java classpath either to an installed Apache Spark version (e.g.: $SPARK_HOME/jars) or to the download directory of the streamsx.sparkmllib toolkit (e.g.: $STREAMS_SPLPATH/com.ibm.streamsx.sparkmllib/opt/downloaded)

For example, the SparkCollaborativeFilteringALS operator can load a Spark collaborative filtering model (of type MatrixFactorizationModel in the Spark API). In order for the operator to be able to use this model within Streams, the Spark program that created the original model must store the model. The following scala code demonstrates how the model can be saved to HDFS:

		//Generate a MatrixFactorizationModel by training against test data
		val model = ALS.train(training, rank, numIter, lambda)
		
		//Save the generated model to the filesystem
		model.save(sparkContext, "hdfs://some/path/my_model")

Once the model has been persisted, the path to the persisted model would be passed in as a parameter to the SparkCollaborativeFilteringALS operator. The following code demonstrates how this would be done in the SPL program:

	(stream<int32 user, int32 counter, list<int32> analysisResult> SparkCollaborativeFilteringALSOut) as
			SparkCollaborativeFilteringALSOp1 =
			SparkCollaborativeFilteringALS(InputPort1)
		{
			param
				analysisType : RecommendProducts ;
				attr1 : Beacon_1_out0.user ;
				attr2 : Beacon_1_out0.counter ;
				modelPath : "hdfs://some/path/my_model" ;
		}

On initialization, the operator will load the model. Each incoming tuple will be used to generate a score using the model and the score would be passed as an attribute called 'analysisResult' on the output schema.

To Use this Toolkit in you Application

After the location of the toolkit is communicated to the compiler, the SPL artifacts that are specified in the toolkit can be used by an application. The application can include a use directive to bring the necessary namespaces into scope. Alternatively, you can fully qualify the operators that are provided by toolkit with their namespaces as prefixes.

  1. Make sure that a trained Spark model has been saved to the local file system or on HDFS. Alternatively you can bundle the model files into the sab-file (see sample).
  2. Configure the SPL compiler to find the toolkit root directory. Use one of the following methods:
  • Set the STREAMS_SPLPATH environment variable to the root directory of a toolkit or multiple toolkits (with : as a separator). For example: export STREAMS_SPLPATH=$STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib
  • Specify the -t or --spl-path command parameter when you run the sc command. For example: sc -t $STREAMS_INSTALL/toolkits/com.ibm.streamsx.sparkmllib -M MyMain where MyMain is the name of the SPL main composite. Note: These command parameters override the STREAMS_SPLPATH environment variable.
  • Add the toolkit location in InfoSphere Streams Studio.
  1. Develop your application.
  2. Build your application. You can use the sc command or Streams Studio.
  3. Start the InfoSphere Streams instance.
  4. Run the application. You can submit the application as a job by using the streamtool submitjob command or by using Streams Studio.

What's changed

CHANGELOG.md

streamsx.sparkmllib's People

Contributors

alex-cook4 avatar ankitpas avatar anouri avatar chanskw avatar dependabot[bot] avatar joergboe avatar manojsingh101 avatar natashadsilva avatar zollnapa avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

streamsx.sparkmllib's Issues

Allow scoring result attribute to be specified

Currently, all spark mllib operators assume that the operator output schema has an attribute called 'analysisResult' of a specific type (usually, float64 or int32). This is too rigid. We should allow users to pick the scoring result attribute from the output schema (similar to how we allow input attributes to be specified as parameters).

Add documentation on what sparkMaster parameter is

More details on how the sparkmaster is used, e.g. when should I use something apart from the default.

In messing around with spark, I've found you can't have multiple spark contexts in a single JVM, so does this have an impact on how my operators from this toolkit can be fused?

Somewhat related to the discussion in issue #2

SPLDOC allows pages and overviews for a namespace, so the documentation could be in a single location and referenced by each operator.

Globalizing the toolkit

Provide log messages for different locales via unique message ids: German (de_DE), French (fr_FR), Italian (it_IT), Spanish (es_ES). Brazilian Portuguese (pt_BR), Japanese (ja_JP), Simplified Chinese (zh_CN), Russian (ru_RU), Traditional Chinese (zh_TW), Korean (ko_KR).
#17

Hardcoded lib directory

With Spark 2.x, the lib directory is no longer part of the distribution. It is now called jars. It would probably be better if the SPARK_LIB environment variable is used for pointing to the right lib directory. This will ensure that there are no class not found exceptions thrown.

Add support for new models in Spark

Spark has added save/load support to KMeans in Spark 1.4. Save/load is also supported on the logistic regression and isotonic regression models. We should provide operators that can perform analytics using these models

Provide ability to reload models

At present, the specified Spark models are loaded when the operator initializes. We should provide a mechanism to reload these models (perhaps through a control port).

Proposal for extension - Additional output attributes from models

Spark models have additional prediction information which may be required in certain cases. e.g. Naive Bayes has list of of probabilities, KMeans has cluster centers and cost, etc. It would be good to have these as optional output attributes in operators.
I forked the sparkMLLib toolkit and modified it for according to my project requirement.
https://github.com/manojsingh101/streamsx.sparkMLLib

The modifications are for SparkNaiveBayes operator to output list of probabilities for each prediction class.
Changes:

  1. Add a new boolean attribute as parameter "getProbabilities"
  2. Add a new output attribute of type list which would required only if getProbabilities=true
  3. Add a new function to generate a feature vector of float64 from a line of text to pass to model.
    com.ibm.streamsx.sparkmllib.utils.Util.getFeatureVector
  4. A sample application to use the new operator.

Can we extend other models to output similar relevant attributes to make these operators more feature rich?

Help with setup

Hi,

I have the following problem: I've added the streamsx.sparkmllib Toolkit in InfoSphere Streams Studio. Using the SparkCollaborativeFilteringALS operator, I get the following exception:

Exception in thread "Thread-11" java.lang.NoClassDefFoundError: org.apache.spark.SparkConf at com.ibm.streamsx.sparkmllib.AbstractSparkMLlibOperator.initialize(AbstractSparkMLlibOperator.java:132) at com.ibm.streams.operator.internal.runtime.api.OperatorAdapter.initialize(OperatorAdapter.java:713) at com.ibm.streams.operator.internal.jni.JNIBridge.(JNIBridge.java:275) Caused by: java.lang.ClassNotFoundException: org.apache.spark.SparkConf at java.net.URLClassLoader.findClass(URLClassLoader.java:607) at java.lang.ClassLoader.loadClassHelper(ClassLoader.java:797) at java.lang.ClassLoader.loadClass(ClassLoader.java:775) at java.lang.ClassLoader.loadClass(ClassLoader.java:752) ... 3 more

CDISR5033E: An exception occurred during the execution of the SparkCollaborativeFilteringALS_3 operator. Processing element number 22 is terminating. CDISR4000E The processing element ID 22 of the application::MyProjectSpark_7 job with job ID 7 was submitted by the spark user, and then was shut down unexpectedly. The error is: Distillery Exception: 'virtual void SPL::PEImpl::process()' [./src/SPL/Runtime/ProcessingElement/PEImpl.cpp:873] with msg: Runtime failures occurred in the following operators: SparkCollaborativeFilteringALS_3..

Am I missing something during the setup?

Thank you in advance.

Shall we open a new branch for toolkit globalization

Starting from master we plan to implement localization support for the internet toolkit.

Shall we open a new branch for this work?
Proposal:
keep on using master branch
bump version number from 1.0.0 to 1.1.0

Fix spldoc issues

Currently generating spldoc causes errors. In addition, the toolkit is missing a description that will be useful to users

proposal for extension

The toolkit looks great. Great Job, @ankitpas!

The approach proposed by @ankitpas when streams operator communicates with a corresponding Spark job
is very interesting and absolutely make sense. As an extension to an existing toolkit we wanted to propose
an alternative approach which is similar to spss toolkit one when a model is "published" by Spark to Streams
and used by corresponding Streams operator for real-time data scoring. The advantage of this approach
is that it doesn't require spark runtime to be installed or accessible by streams machine. We already have an initial prototype
working, and wanted to discuss with you different options on how to integrate it with an existing implementation:

  1. Existing operators extension to support two model execution modes: spark and streams.
    The first mode is the one implemented by @ankitpas when model is executed by spark job.
    The second mode is the one we're about to contribute when model is executed by a corresponding streams operator.
    The advantage of this approach is in a fact that a toolkit user will have the same set of powerful operators to use and
    would be able easily switch between the two.
    The disadvantage of this approach is that operator implementation would become much more complex.
  2. New operator(s) that will run spark model. Then we have to decide which namespace to choose for this group
    of operators. Probably, something like com.ibm.streamsx.sparkmllib.scoring ?

proposal for separate namespace for each algorithm/utility category

@ankitpas as Spark mllib provides wide range of learning algorithms and utilities, including classification, regression, clustering, collaborative filtering, dimensionality reduction
we probably should consider to have a separate namespace for each algorithms and utility category to improve the toolkit user and developer experience.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.