Sparkit-learn

PySpark + Scikit-learn = Sparkit-learn

About

Sparkit-learn is an experimental library that supports scikit-learn functionality and its API on PySpark. The main goal is to provide an API that stays close to sklearn's.

The driving principle was to "Think locally, execute distributively." To accommodate this concept, the basic data block is always an array or a (sparse) matrix, and operations are executed at the block level.
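
To make the idea concrete, here is a block-wise computation in plain PySpark and NumPy (a minimal sketch, independent of sparkit-learn itself; sc is an existing SparkContext):

import numpy as np

# each record is a whole NumPy block, so the mapped function runs
# ordinary local array code on every block
blocks = sc.parallelize([np.arange(5), np.arange(5, 10)], 2)

# sum of squares: computed locally per block, then reduced across partitions
blocks.map(lambda block: np.dot(block, block)).sum()  # 285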

Quick start

Sparkit-learn introduces two important distributed data formats:

  • ArrayRDD:

    A numpy.array-like distributed array

     from splearn.rdd import ArrayRDD
    
     data = range(20)
     # PySpark RDD with 2 partitions
     rdd = sc.parallelize(data, 2) # each partition with 10 elements
     # ArrayRDD
     # each partition will contain blocks with 5 elements
     X = ArrayRDD(rdd, block_size=5) # 4 blocks, 2 in each partition

    Basic operations:

     len(X) # 4 - number of blocks
     X.shape # (20,) - the shape of the whole dataset
    
     X # returns an ArrayRDD
     # <class 'splearn.rdd.ArrayRDD'> from PythonRDD...
    
     X.collect() # get the dataset
     # [array([0, 1, 2, 3, 4]),
     #  array([5, 6, 7, 8, 9]),
     #  array([10, 11, 12, 13, 14]),
     #  array([15, 16, 17, 18, 19])]
    
     X[1].collect() # indexing
     # [array([5, 6, 7, 8, 9])]
    
     X[1] # also returns an ArrayRDD!
    
     X[1::2].collect() # slicing
     # [array([5, 6, 7, 8, 9]),
     #  array([15, 16, 17, 18, 19])]
    
     X[1::2] # returns an ArrayRDD as well
    
     X.tolist() # returns the dataset as a list
     # [0, 1, 2, ... 17, 18, 19]
     X.toarray() # returns the dataset as a numpy.array
     # array([ 0,  1,  2, ... 17, 18, 19])
    
     # pyspark.rdd operations will still work
     X.getNumPartitions() # 2 - number of partitions
  • DictRDD:

    A column-based data format; each column is a numpy.array.

     from splearn.rdd import DictRDD
    
     data_X = range(20)
     data_y = list(range(2)) * 10
     # PySpark RDD with 2 partitions
     X_rdd = sc.parallelize(data_X, 2) # each partition with 10 elements
     y_rdd = sc.parallelize(data_y, 2) # each partition with 10 elements
     zipped_rdd = X_rdd.zip(y_rdd) # zip the two rdd's together
     # DictRDD
     # each partition will contain blocks with 5 elements
     Z = DictRDD(zipped_rdd, columns=('X', 'y'),  block_size=5) # 4 blocks, 2/partition
    
     # or:
     import numpy as np
    
     data = np.array([list(range(20)), list(range(2)) * 10]).T
     rdd = sc.parallelize(data, 2)
     Z = DictRDD(rdd, columns=('X', 'y'),  block_size=5)

    Basic operations:

     len(Z) # 4 - number of blocks
     Z.shape # (20,2) - the shape of the whole dataset
     Z.columns # returns ('X', 'y')
    
     Z # returns a DictRDD
     #<class 'splearn.rdd.DictRDD'> from PythonRDD...
    
     Z.collect()
     # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
     #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
     #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0])),
     #  (array([15, 16, 17, 18, 19]), array([1, 0, 1, 0, 1]))]
    
     Z[:, 'y'] # column select - returns an ArrayRDD
     Z[:, 'y'].collect()
     # [array([0, 1, 0, 1, 0]),
     #  array([1, 0, 1, 0, 1]),
     #  array([0, 1, 0, 1, 0]),
     #  array([1, 0, 1, 0, 1])]
    
     Z[:-1, ['X', 'y']] # slicing - DictRDD
     Z[:-1, ['X', 'y']].collect()
     # [(array([0, 1, 2, 3, 4]), array([0, 1, 0, 1, 0])),
     #  (array([5, 6, 7, 8, 9]), array([1, 0, 1, 0, 1])),
     #  (array([10, 11, 12, 13, 14]), array([0, 1, 0, 1, 0]))]
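
Since column selection returns an ArrayRDD, these operations compose; a single column can be materialized on the driver using only the methods shown above:

Z[:, 'X'].toarray() # column select, then collect as one numpy.array
# array([ 0,  1,  2, ... 17, 18, 19])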

Basic workflow

With the use of the aforementioned data structures, the basic workflow is almost identical to sklearn's.

Distributed vectorizing of texts

SparkCountVectorizer

from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkCountVectorizer
from sklearn.feature_extraction.text import CountVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_vect = CountVectorizer()
dist_vect = SparkCountVectorizer()

result_local = local_vect.fit_transform(X)
result_dist = dist_vect.fit_transform(X_rdd)  # ArrayRDD
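
To verify that the two paths agree, the distributed blocks can be stacked back into a single matrix (a sketch; assumes the blocks are scipy.sparse matrices and that both vectorizers learn the same, alphabetically sorted vocabulary):

import scipy.sparse as sp

# stack the per-block sparse matrices and compare with the local result
assert (sp.vstack(result_dist.collect()) != result_local).nnz == 0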

SparkHashingVectorizer

from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from sklearn.feature_extraction.text import HashingVectorizer

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_vect = HashingVectorizer()
dist_vect = SparkHashingVectorizer()

result_local = local_vect.fit_transform(X)
result_dist = dist_vect.fit_transform(X_rdd)  # ArrayRDD
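
HashingVectorizer is stateless (there is no vocabulary to learn), so each block can be hashed independently of the others. A quick shape check (a sketch; 1048576 is sklearn's default n_features, i.e. 2**20):

result_local.shape              # (len(X), 1048576)
result_dist.collect()[0].shape  # (block_size, 1048576) - a single block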

SparkTfidfTransformer

from splearn.rdd import ArrayRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
X_rdd = ArrayRDD(sc.parallelize(X, 4))  # sc is SparkContext

local_pipeline = Pipeline((
	('vect', HashingVectorizer()),
	('tfidf', TfidfTransformer())
))
dist_pipeline = SparkPipeline((
	('vect', SparkHashingVectorizer()),
	('tfidf', SparkTfidfTransformer())
))

result_local = local_pipeline.fit_transform(X)
result_dist = dist_pipeline.fit_transform(X_rdd)  # ArrayRDD
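
The fitted distributed pipeline can transform further ArrayRDDs in the same way (a sketch; X_new is a hypothetical second batch of texts, and SparkPipeline is assumed to mirror sklearn's Pipeline.transform):

X_new_rdd = ArrayRDD(sc.parallelize(X_new, 4))   # hypothetical new texts
result_new = dist_pipeline.transform(X_new_rdd)  # ArrayRDD of tf-idf blocks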

Distributed Classifiers

import numpy as np

from splearn.rdd import DictRDD
from splearn.feature_extraction.text import SparkHashingVectorizer
from splearn.feature_extraction.text import SparkTfidfTransformer
from splearn.svm import SparkLinearSVC
from splearn.pipeline import SparkPipeline

from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.svm import LinearSVC
from sklearn.pipeline import Pipeline

X = [...]  # list of texts
y = [...]  # list of labels
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD(X_rdd.zip(y_rdd), columns=('X', 'y'))

local_pipeline = Pipeline((
	('vect', HashingVectorizer()),
	('tfidf', TfidfTransformer()),
	('clf', LinearSVC())
))
dist_pipeline = SparkPipeline((
	('vect', SparkHashingVectorizer()),
	('tfidf', SparkTfidfTransformer()),
	('clf', SparkLinearSVC())
))

local_pipeline.fit(X, y)
dist_pipeline.fit(Z, clf__classes=np.unique(y))

y_pred_local = local_pipeline.predict(X)
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
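
For evaluation, the distributed predictions can be pulled back to the driver (a sketch; assumes predict returns an ArrayRDD, as in the Quick start examples):

from sklearn.metrics import accuracy_score

# materialize the distributed predictions as one numpy array and score them
accuracy_score(y, y_pred_dist.toarray())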

Distributed Model Selection

import numpy as np

from splearn.rdd import DictRDD
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB

from sklearn.grid_search import GridSearchCV
from sklearn.naive_bayes import MultinomialNB

X = [...]
y = [...]
X_rdd = sc.parallelize(X, 4)
y_rdd = sc.parallelize(y, 4)
Z = DictRDD(X_rdd.zip(y_rdd), columns=('X', 'y'))

parameters = {'alpha': [0.1, 1, 10]}
fit_params = {'classes': np.unique(y)}

local_estimator = MultinomialNB()
local_grid = GridSearchCV(estimator=local_estimator,
                          param_grid=parameters)

estimator = SparkMultinomialNB()
grid = SparkGridSearchCV(estimator=estimator,
                         param_grid=parameters,
                         fit_params=fit_params)

local_grid.fit(X, y)
grid.fit(Z)
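
After fitting, the familiar post-fit attributes should be available on both objects (a sketch; assumes SparkGridSearchCV inherits GridSearchCV's attributes):

local_grid.best_params_  # best parameter setting found locally
grid.best_params_        # best parameter setting found distributed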

Run IPython from notebooks directory

PYTHONPATH=${PYTHONPATH}:.. IPYTHON_OPTS="notebook" ${SPARK_HOME}/bin/pyspark --master local[4] --driver-memory 2G

Requirements

  • NumPy[>=1.9.0]
  • SciPy[>=0.14.0]
  • Scikit-learn[=0.16-git]
  • Spark[>=1.1.0]

Special thanks

We would like to thank:

  • scikit-learn community
  • spylearn community
  • pyspark community
