oryxproject / oryx
1.8K stars · 209 watchers · 405 forks · 7.29 MB

Oryx 2: Lambda architecture on Apache Spark, Apache Kafka for real-time large scale machine learning

Home Page: http://oryx.io

License: Apache License 2.0

Languages: Java 97.32% · Scala 1.40% · Shell 1.28%
Topics: lambda-architecture, oryx, apache-spark, machine-learning, kafka, apache-kafka, java, cloudera

oryx's People

Contributors

cimox, dependabot[bot], dsdinter, faviovazquez, harscoet, hsiangawang, hyukjinkwon, kkasravi, lengweiping1983, lfrancke, msumner91, mzorro, peterdavehello, smarthi, srowen

oryx's Issues

Queue events require a timestamp

To implement a few types of functionality that will be required, it seems that all data and model/update events will require a timestamp of some kind.
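
A minimal sketch of what such an event could look like; the class and field names are illustrative, not existing Oryx types:

    // Hypothetical shape of a timestamped event; names are illustrative only.
    public final class TimestampedEvent<T> {
      private final long timestamp; // e.g. epoch millis when the event was received
      private final T payload;      // the data point or model/update message itself
      public TimestampedEvent(long timestamp, T payload) {
        this.timestamp = timestamp;
        this.payload = payload;
      }
      public long getTimestamp() { return timestamp; }
      public T getPayload() { return payload; }
    }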

Use a real CSV parser

Now that we will be dealing with more complex data, and consuming it at times via CSV, we need to employ a real CSV parser like Commons CSV instead of merely splitting on comma.
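
For illustration, a minimal sketch of parsing one line with Commons CSV, which handles quoting and embedded commas that a plain split would mangle (the sample line is made up):

    import java.io.IOException;
    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    // Parse a line with Commons CSV instead of line.split(","); quoted fields
    // with embedded commas come back as single values.
    static String[] parseLine(String line) throws IOException {
      try (CSVParser parser = CSVParser.parse(line, CSVFormat.DEFAULT)) {
        CSVRecord record = parser.iterator().next();
        String[] fields = new String[record.size()];
        for (int i = 0; i < record.size(); i++) {
          fields[i] = record.get(i);
        }
        return fields;
      }
    }
    // parseLine("user1,\"item,with,commas\",5.0") -> ["user1", "item,with,commas", "5.0"]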

Explain / fix non-fatal exceptions during integration tests

There are several types of exceptions that appear regularly in the logs when integration tests run. They do not appear to affect the result, and tests pass. They may just be corner cases arising from ZooKeeper and Kafka being rapidly started and stopped. This is a to-do to see whether we can ever explain or eliminate them.

Updated to include what's being observed most recently. It's almost all:

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
    at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
    at kafka.network.MultiSend.writeTo(Transmission.scala:101)
    at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
    at kafka.network.Processor.write(SocketServer.scala:472)
    at kafka.network.Processor.run(SocketServer.scala:342)
    at java.lang.Thread.run(Thread.java:745)

Rarely:

java.nio.channels.ClosedByInterruptException
    at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
    at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:956)
    at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
    at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    at kafka.utils.Utils$.read(Utils.scala:380)
    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
    at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

and a few in the web tests:

java.lang.IllegalStateException: ServiceLocatorImpl(__HK2_Generated_73,74,527137253) has been shut down
    at org.glassfish.grizzly.servlet.FilterChainImpl.doFilter(FilterChainImpl.java:151)
    at org.glassfish.grizzly.servlet.FilterChainImpl.invokeFilterChain(FilterChainImpl.java:106)
    at org.glassfish.grizzly.servlet.ServletHandler.doServletService(ServletHandler.java:221)
    at org.glassfish.grizzly.servlet.ServletHandler.service(ServletHandler.java:169)
    at org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:219)
    at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:565)
    at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:545)
    at java.lang.Thread.run(Thread.java:745)

Other exceptions seem to have gone away.

New user/item may get temporarily lost on ALS model update

A new user or item shows up quickly in the serving layer due to updates from the speed layer. However, if a batch model build was already in progress before it saw the new user/item, it will publish a model without that user/item, and the serving layer will end up removing the user/item vector. This removal mechanism is needed to purge stale users/items in general, but the problem should be fixed with some bookkeeping in the serving layer.

Modify CosineAverageFunction to use float[][]

Modify this to use float[][] as opposed to List<float[]> so as to avoid the overhead of a List iterator. This should also keep it consistent with the present implementation in DotsFunction.
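
A minimal sketch of the intended shape (not the actual Oryx implementation), assuming the function averages cosine similarity between one vector and a set of others:

    // Average cosine similarity over float[][] with plain indexed loops;
    // iterating an array avoids the Iterator object a List would allocate.
    static double averageCosine(float[] target, float[][] vectors) {
      double targetNorm = norm(target);
      double total = 0.0;
      for (float[] vector : vectors) {
        double dot = 0.0;
        for (int i = 0; i < vector.length; i++) {
          dot += target[i] * vector[i];
        }
        total += dot / (targetNorm * norm(vector));
      }
      return total / vectors.length;
    }

    static double norm(float[] v) {
      double sumOfSquares = 0.0;
      for (float x : v) {
        sumOfSquares += x * x;
      }
      return Math.sqrt(sumOfSquares);
    }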

Config namespace can conflict with others

Typesafe Config is also used by other components such as Spark. While there are no immediate namespace conflicts in the merged config file, it's best to put every app config value under an oryx parent namespace.
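
For example, reading only the oryx subtree via the Typesafe Config API keeps app settings isolated from Spark's (the key name below is hypothetical, for illustration):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    // Read only Oryx's subtree of the merged configuration; keys belonging to
    // other users of Typesafe Config, like Spark, live outside "oryx".
    Config oryx = ConfigFactory.load().getConfig("oryx");
    String inputTopic = oryx.getString("input-topic"); // hypothetical key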

Rename app-tier to modules to include "app"; add module for code common to serving and speed/batch for apps

Right now the app-tier modules also start with oryx-ml, which just doesn't seem right. There is also no appropriate module for sharing code between the provided app's serving and speed/batch implementations, which should not depend on one another. The proposal is:

  • Rename oryx-ml-mllib to oryx-app-mllib
  • Rename oryx-ml-oryx to oryx-app
  • Rename oryx-ml-oryx-serving to oryx-app-serving
  • Create oryx-app-common

Need to expose basic executor settings

The app needs to be able to control how many executors to use and how much memory to give each one. The application can expose a simplified version of what Spark exposes in its config file.
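
Whatever simplified form the app config takes, it would presumably translate into standard Spark properties along these lines (values are illustrative):

    import org.apache.spark.SparkConf;

    // Standard Spark executor properties the simplified settings would map to:
    SparkConf sparkConf = new SparkConf()
        .set("spark.executor.instances", "4") // number of executors (on YARN)
        .set("spark.executor.memory", "2g")   // memory per executor
        .set("spark.executor.cores", "2");    // cores per executor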

Call queues "topics" instead

The code consistently refers to queues, but Kafka's term is "topic", and the code is closely enough bound to Kafka that it should use the same term.

oryxProject licence

I would like to ask about the licence of the new Oryx v2 (oryxproject): is this code/product free to use for a company's commercial goals?
Also a side question, since I'm moving from Myrrix to Oryx: in terms of future development, should I use the latest oryxproject Oryx v2, or can I still use the old Oryx or Myrrix?

ALS topN isn't thread safe

The ALS implementation of multithreaded topN has a concurrency bug as it modifies a PriorityQueue without correct synchronization.
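
One conventional fix, sketched here under the assumption that worker threads currently offer into one shared queue (this is not the actual Oryx code): keep a thread-confined queue per worker and merge on a single thread afterward.

    import java.util.List;
    import java.util.PriorityQueue;

    // Each worker fills its own PriorityQueue; only this single-threaded merge
    // touches the combined result, so no queue is ever mutated concurrently.
    static PriorityQueue<Double> mergeTopN(List<PriorityQueue<Double>> perThread, int n) {
      PriorityQueue<Double> merged = new PriorityQueue<>(n); // min-heap of the best n
      for (PriorityQueue<Double> partial : perThread) {
        for (double score : partial) {
          if (merged.size() < n) {
            merged.add(score);
          } else if (score > merged.peek()) {
            merged.poll();
            merged.add(score);
          }
        }
      }
      return merged;
    }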

object scala.runtime in compiler mirror

[ERROR] error: error while loading , error in opening zip file
[ERROR] error: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found.
[ERROR] at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
[ERROR] at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getPackage(Mirrors.scala:172)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getRequiredPackage(Mirrors.scala:175)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage$lzycompute(Definitions.scala:183)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage(Definitions.scala:183)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass$lzycompute(Definitions.scala:184)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass(Definitions.scala:184)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr$lzycompute(Definitions.scala:1024)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr(Definitions.scala:1023)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses$lzycompute(Definitions.scala:1153)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses(Definitions.scala:1152)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode$lzycompute(Definitions.scala:1196)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode(Definitions.scala:1196)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1261)
[ERROR] at scala.tools.nsc.Global$Run.(Global.scala:1290)
[ERROR] at scala.tools.nsc.Driver.doCompile(Driver.scala:32)
[ERROR] at scala.tools.nsc.Main$.doCompile(Main.scala:79)
[ERROR] at scala.tools.nsc.Driver.process(Driver.scala:54)
[ERROR] at scala.tools.nsc.Driver.main(Driver.scala:67)
[ERROR] at scala.tools.nsc.Main.main(Main.scala)
[ERROR] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[ERROR] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR] at java.lang.reflect.Method.invoke(Method.java:483)
[ERROR] at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
[ERROR] at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)

Investigate consumer.id for better Spark Streaming + Kafka failure recovery

Spark 1.2.0 should no longer force consumers to always start from the beginning of a topic after recovering from failure. It may be possible to just use a consistent consumer.id so that the job picks up reading where it left off. This would probably be semantically better for the Batch and Speed Layers.
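
A sketch of the idea against the Spark 1.x receiver-based API (topic and ID values are illustrative): pass a fixed consumer.id in the Kafka params so the same registered consumer resumes after a restart.

    import java.util.HashMap;
    import java.util.Map;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    static JavaPairReceiverInputDStream<String, String> input(JavaStreamingContext jssc) {
      Map<String, String> kafkaParams = new HashMap<>();
      kafkaParams.put("zookeeper.connect", "localhost:2181");
      kafkaParams.put("group.id", "OryxGroup");       // illustrative
      kafkaParams.put("consumer.id", "OryxConsumer"); // fixed across restarts
      Map<String, Integer> topics = new HashMap<>();
      topics.put("OryxInput", 1); // illustrative topic, 1 receiver thread
      return KafkaUtils.createStream(jssc, String.class, String.class,
          StringDecoder.class, StringDecoder.class,
          kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_2());
    }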

Add basic RDF batch app

This is a placeholder to track adding a basic random decision forest batch app. As we're still on Spark 1.1, the MLlib-based portion will have to be based only on a DecisionTree.
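
A minimal sketch of training a single tree with the Spark 1.1 MLlib API (parameter values are illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.mllib.regression.LabeledPoint;
    import org.apache.spark.mllib.tree.DecisionTree;
    import org.apache.spark.mllib.tree.model.DecisionTreeModel;

    static DecisionTreeModel trainSingleTree(JavaRDD<LabeledPoint> trainingData) {
      int numClasses = 2;
      Map<Integer, Integer> categoricalFeaturesInfo = new HashMap<>(); // all continuous
      String impurity = "gini"; // or "entropy"
      int maxDepth = 5;
      int maxBins = 32;
      return DecisionTree.trainClassifier(trainingData, numClasses,
          categoricalFeaturesInfo, impurity, maxDepth, maxBins);
    }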

HyperParamRange needs to support discrete values

Right now, hyperparameters are only supported if they are numeric. It should also be possible to test discrete unordered values, such as different node quality measures ("entropy", "gini", etc.).
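
A sketch of what a discrete counterpart to the numeric range could look like; this type and its method are hypothetical, not existing Oryx classes:

    import java.util.Arrays;
    import java.util.List;

    // Hypothetical unordered, discrete analogue of a numeric HyperParamRange.
    public final class DiscreteValues {
      private final List<String> values;
      public DiscreteValues(String... values) {
        this.values = Arrays.asList(values);
      }
      // Return up to howMany candidate values for the hyperparameter search.
      public List<String> getTrialValues(int howMany) {
        return values.subList(0, Math.min(howMany, values.size()));
      }
    }
    // e.g. new DiscreteValues("entropy", "gini").getTrialValues(2)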

KMeansUpdateIT fails / hangs

KMeansUpdateIT is disabled right now since it mysteriously hangs when run. The second call to KMeans.train never returns. It doesn't even seem to get scheduled, since the last thing in the logs is the call to train, and the Spark UI just shows the parent foreachRDD from BatchLayer executing -- in my case, it's stuck at 8 / 18 partitions.

Suneel has been investigating, but I am currently looking at it too.

Try to return text/plain to browsers

Endpoints in the provided apps tend to return content such as text/csv. Browsers like Chrome present this as a download rather than displaying the text. Ideally the service would know it can return text/plain. Chrome's Accept header is:

text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
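
Since */*;q=0.8 matches anything, one possible approach is plain JAX-RS content negotiation: list text/plain among the producible types and let the runtime pick per request. The resource below is illustrative, not an actual Oryx endpoint.

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;

    // Declaring both types lets JAX-RS serve text/plain to a browser (matched
    // via */*) while CSV-preferring clients still get text/csv.
    @Path("/recommend")
    public final class RecommendResource {
      @GET
      @Produces({"text/plain", "text/csv"})
      public String get() {
        return "itemA,0.9\nitemB,0.8\n";
      }
    }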

Implement decay factor in ALS

It's necessary to have some form of decay so that old users and items age out of the data. This should be ported to the new framework for 2.x.
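
For illustration, the simplest form is exponential decay (the names and exact weighting scheme here are assumptions, not the ported design):

    // Weight of an observation after ageInIntervals model-build intervals,
    // given a decay factor in (0,1]; old users/items fade toward zero weight.
    static double decayedWeight(double weight, double decayFactor, int ageInIntervals) {
      return weight * Math.pow(decayFactor, ageInIntervals);
    }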

`Application` not initializing yet when Tomcat starts

@kkasravi @smarthi I probably need your help on this one. If you run ALSServingModelManagerIT you'll see it fails. (You can get there directly with mvn -DskipTests install and then mvn verify -pl oryx-ml-oryx-serving.)

The problem is just that OryxApplication and the WebListener ModelManagerListener don't initialize. It should be simple to fix, but I don't know what to tweak.

I thought this had something to do with it, but does it?

    // ContextConfig is the Tomcat listener that configures the context,
    // processing web.xml and scanning annotations such as @WebListener
    ContextConfig contextConfig = new ContextConfig();
    context.addLifecycleListener(contextConfig);

Tests should not use the same port for Kafka/ZK each time

It would be a better test if Kafka / ZK processes used a random free port each time, to expose potential problems from using the default ports. It might stop tests from accidentally talking to the wrong instance. This also makes it a little more realistic to parallelize tests.
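
The standard trick, as a sketch:

    import java.io.IOException;
    import java.net.ServerSocket;

    // Bind port 0 so the OS picks a free ephemeral port, then hand that port
    // to the embedded Kafka/ZK instance under test.
    static int chooseFreePort() throws IOException {
      try (ServerSocket socket = new ServerSocket(0)) {
        return socket.getLocalPort();
      }
    }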

Fix flaky HyperParamTuningIT

The HyperParamTuningIT regularly fails because the test scenario is not quite robust. It should be modified so that the intended model is unambiguous.

Serving layer needs to pick a smart offset into update topic

The Serving Layer has to read models from the update topic at startup. It's not suitable to simply wait until the next model appears, and it's also potentially too slow to read from the beginning of the topic. Ideally the starting offset is chosen intelligently so that reading begins at or near the last model published to the topic.
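
A sketch of the intended startup logic only; the look-back bound and how offsets would be obtained are assumptions, not real Kafka API calls:

    // Start a bounded distance before the end of the topic rather than at the
    // beginning (too slow) or at the end (would wait for the next model).
    static long chooseStartOffset(long earliest, long latest, long maxLookback) {
      return Math.max(earliest, latest - maxLookback);
    }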

Update to Java 8

Before the first real release, I think we'll want to move to Java 8 to take advantage of lambdas. Using Spark from Java without them makes the code significantly harder to read and develop.
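
The same map call in Java 7 and Java 8, assuming rdd is a JavaRDD<String>; the anonymous-class ceremony is what lambdas remove:

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;

    // Java 7: anonymous class implementing Spark's Function interface
    JavaRDD<Integer> lengths7 = rdd.map(new Function<String, Integer>() {
      @Override
      public Integer call(String line) {
        return line.length();
      }
    });

    // Java 8: the same operation as a lambda
    JavaRDD<Integer> lengths8 = rdd.map(line -> line.length());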

Hi Sean, can I join you as a contributor?

Hi Sean!
I am from China and could not contact you by email or IM, so I am writing an issue here. I am an architect at WUBA, the Nasdaq-listed company.
I found your project on GitHub several days ago and am very interested in your great work! Compared with other open-source recommender implementations, Oryx 2 is more modern: it uses the latest open-source dependencies, such as Kafka to transfer data, Spark to compute, and HDFS as the distributed file system for storage, and it uses the Lambda architecture.

I have been analyzing the source code. The project is new, and the documentation and code are not complete. Could I join to write some code or documentation?

Checkpointing causes NotSerializableException for BatchUpdateFunction SparkContext

Now that https://issues.apache.org/jira/browse/SPARK-4196 is fixed, checkpointing can move forward when we move to Spark 1.2.0. There is still a problem with a reference to SparkContext in BatchUpdateFunction:

    - field (class "com.cloudera.oryx.lambda.BatchUpdateFunction", name: "sparkContext", type: "class org.apache.spark.api.java.JavaSparkContext")
    - object (class "com.cloudera.oryx.lambda.BatchUpdateFunction", com.cloudera.oryx.lambda.BatchUpdateFunction@e0a4232)
    - field (class "org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2", name: "foreachFunc$2", type: "interface org.apache.spark.api.java.function.Function2")
    - object (class "org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2", <function2>)
    - field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
    - object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@548d5ed3)
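
The usual remedy, sketched here (not necessarily how it was actually fixed): mark the field transient so checkpoint serialization skips it, and re-acquire the context when the function runs rather than capturing it at construction.

    import org.apache.spark.api.java.JavaSparkContext;

    // Excluded from serialization; must be reconstructed after deserialization,
    // e.g. from the context wrapping the RDD passed into call().
    private transient JavaSparkContext sparkContext;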

Config not being read on driver and/or executor

The executor or driver doesn't seem to be getting the config, even though the file is deployed to executors and -Dconfig.file is set:

java.lang.IllegalArgumentException: Wrong FS: file://xxxxx.cloudera.com:8020/tmp/Oryx/data, expected: hdfs://sssss.cloudera.com:8020
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:192)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:651)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:716)
    at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:712)
    at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
    at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1625)
    at com.cloudera.oryx.lambda.BatchUpdateFunction.call(BatchUpdateFunction.java:90)
    at com.cloudera.oryx.lambda.BatchUpdateFunction.call(BatchUpdateFunction.java:45)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
...

Support user functions using Scala Spark API

Right now, the various interfaces that a user app implements assume use of the Java Spark APIs. This is not a good assumption. They should be refactored to accommodate the Scala Spark APIs just as easily.
