oryxproject / oryx
Oryx 2: Lambda architecture on Apache Spark, Apache Kafka for real-time large scale machine learning
Home Page: http://oryx.io
License: Apache License 2.0
To implement a few types of functionality that will be required, it seems that all data and model/update events will require a timestamp of some kind.
This is a placeholder to track adding a basic k-means speed app.
Now that we will be dealing with more complex data, and consuming it at times via CSV, we need to employ a real CSV parser like Commons CSV instead of merely splitting on comma.
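A minimal sketch with Commons CSV showing why a real parser matters: a quoted field may itself contain commas, which a naive split(",") would mangle. The sample record is made up:

```java
import java.io.Reader;
import java.io.StringReader;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVRecord;

public class CSVParseExample {
  public static void main(String[] args) throws Exception {
    // A quoted field containing commas; split(",") would break it into pieces
    Reader in = new StringReader("user1,\"item,with,commas\",5.0");
    for (CSVRecord record : CSVFormat.DEFAULT.parse(in)) {
      String user = record.get(0);
      String item = record.get(1);   // "item,with,commas", parsed intact
      float value = Float.parseFloat(record.get(2));
      System.out.println(user + " / " + item + " / " + value);
    }
  }
}
```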
There are several types of exceptions that appear regularly in the logs when integration tests run. They do not appear to affect the result, and tests pass. They may just be corner cases, from ZK and Kafka being rapidly started and stopped. This is a to-do to see if we can ever figure them out.
Updated to include what's being observed most recently. It's almost all:
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:123)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at java.lang.Thread.run(Thread.java:745)
Rarely:
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.poll(SocketChannelImpl.java:956)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:204)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
at kafka.utils.Utils$.read(Utils.scala:380)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:111)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
and a few in the web tests:
java.lang.IllegalStateException: ServiceLocatorImpl(__HK2_Generated_73,74,527137253) has been shut down
at org.glassfish.grizzly.servlet.FilterChainImpl.doFilter(FilterChainImpl.java:151)
at org.glassfish.grizzly.servlet.FilterChainImpl.invokeFilterChain(FilterChainImpl.java:106)
at org.glassfish.grizzly.servlet.ServletHandler.doServletService(ServletHandler.java:221)
at org.glassfish.grizzly.servlet.ServletHandler.service(ServletHandler.java:169)
at org.glassfish.grizzly.http.server.HttpHandler$1.run(HttpHandler.java:219)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork(AbstractThreadPool.java:565)
at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.run(AbstractThreadPool.java:545)
at java.lang.Thread.run(Thread.java:745)
Other exceptions seem to have gone away.
A new user or item shows up quickly in the serving layer due to updates from the speed layer. However, if a batch model build was already in progress before it saw the new user/item, it will publish a model without the new user/item, and the serving layer will end up removing the user/item vector. This removal mechanism is needed to purge stale users/items in general, but this case should be fixed with some bookkeeping in the serving layer.
Modify this to use float[][] rather than List<float[]>, to avoid the overhead of a List iterator. This also keeps it consistent with the present implementation in DotsFunction.
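A rough sketch of the array-based form, not the actual DotsFunction code:

```java
public class DotSumExample {
  // A plain float[][] iterates with an index: the enhanced for loop over an
  // array compiles to an index loop and allocates no Iterator object,
  // unlike iterating a List<float[]>
  static double sumOfDots(float[][] vectors, float[] x) {
    double total = 0.0;
    for (float[] v : vectors) {
      double dot = 0.0;
      for (int i = 0; i < x.length; i++) {
        dot += v[i] * x[i];
      }
      total += dot;
    }
    return total;
  }

  public static void main(String[] args) {
    float[][] vectors = { {1f, 2f}, {3f, 4f} };
    System.out.println(sumOfDots(vectors, new float[] {1f, 1f})); // 3 + 7 = 10
  }
}
```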
Typesafe Config is also used by things like Spark. While there are no immediate namespace conflicts in the joined config file, it's best to put every app config value into an oryx parent namespace.
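For example, values would then be read through Typesafe Config under the oryx root, safely apart from anything Spark defines. These key names are hypothetical, not the project's actual keys:

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigExample {
  public static void main(String[] args) {
    Config config = ConfigFactory.load();
    // All app keys live under the "oryx" root, so they cannot collide with
    // keys that Spark or other Typesafe Config users define
    int port = config.getInt("oryx.serving.api.port");        // hypothetical key
    String topic = config.getString("oryx.input-queue.name"); // hypothetical key
    System.out.println(port + " " + topic);
  }
}
```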
Right now the app-tier modules also start with oryx-ml, which just doesn't seem right. There is also no appropriate module for sharing code between the provided apps' serving and speed/batch implementations, which should not depend on one another. The proposal is:
- oryx-ml-mllib to oryx-app-mllib
- oryx-ml-oryx to oryx-app
- oryx-ml-oryx-serving to oryx-app-serving
- a new oryx-app-common for the shared code
Usages of a DStream from Kafka need to have their checkpoint directory set to improve recovery in case of a failure.
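A minimal sketch of setting the checkpoint directory on the streaming context; the HDFS path and app name are illustrative:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CheckpointExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("OryxSpeedLayer");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    // Without this, stateful operations and driver recovery can't rebuild
    // the DStream lineage after a failure
    jssc.checkpoint("hdfs:///user/oryx/checkpoint"); // hypothetical path
    // ... the Kafka DStream and the rest of the layer would be wired up here,
    // then jssc.start() / jssc.awaitTermination()
  }
}
```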
This is a placeholder to track adding a basic k-means batch app.
In general, the serving layer needs a framework for exposing the Kafka input queue.
The app needs to be able to control how many executors to use and how much memory to give them. The application can expose a simplified version of what Spark exposes in its config file.
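A sketch of how such simplified settings might be translated into Spark's own properties; the values are illustrative, and spark.executor.instances applies when running on YARN:

```java
import org.apache.spark.SparkConf;

public class ExecutorConfigExample {
  public static void main(String[] args) {
    // Hypothetical values an oryx config file might carry; the app would read
    // them with Typesafe Config and translate them to Spark properties
    int numExecutors = 4;
    String executorMemory = "2g";

    SparkConf conf = new SparkConf()
        .setAppName("OryxBatchLayer")
        .set("spark.executor.instances", Integer.toString(numExecutors))
        .set("spark.executor.memory", executorMemory);
    System.out.println(conf.toDebugString());
  }
}
```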
The code consistently refers to queues, but Kafka's term is "topic", and the code is closely enough bound to Kafka that it should use the same term.
I would like to ask about the license of the new Oryx v2 (oryxproject): is this code/product free to use for a company's commercial goals?
And also a side question: since I'm moving from Myrrix to Oryx, should I use the latest oryxproject Oryx v2, or can I still use the old Oryx or Myrrix as well, in terms of future development?
The ALS implementation of multithreaded topN has a concurrency bug as it modifies a PriorityQueue without correct synchronization.
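A simplified sketch of the kind of fix involved: every mutation of the shared queue must hold a common lock, or the heap's internal array can be corrupted by concurrent offers and polls. The real code tracks scored items, not bare doubles:

```java
import java.util.PriorityQueue;
import java.util.Queue;

public class TopNExample {
  // Min-heap of the N best scores seen so far, shared across worker threads
  private final Queue<Double> topScores = new PriorityQueue<>();
  private final int n = 10;

  void offer(double score) {
    synchronized (topScores) {
      if (topScores.size() < n) {
        topScores.add(score);
      } else if (score > topScores.peek()) { // peek() is the smallest retained score
        topScores.poll();
        topScores.add(score);
      }
    }
  }
}
```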
[ERROR] error: error while loading , error in opening zip file
[ERROR] error: scala.reflect.internal.MissingRequirementError: object scala.runtime in compiler mirror not found.
[ERROR] at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
[ERROR] at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:40)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getPackage(Mirrors.scala:172)
[ERROR] at scala.reflect.internal.Mirrors$RootsBase.getRequiredPackage(Mirrors.scala:175)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage$lzycompute(Definitions.scala:183)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackage(Definitions.scala:183)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass$lzycompute(Definitions.scala:184)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.RuntimePackageClass(Definitions.scala:184)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr$lzycompute(Definitions.scala:1024)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.AnnotationDefaultAttr(Definitions.scala:1023)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses$lzycompute(Definitions.scala:1153)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.syntheticCoreClasses(Definitions.scala:1152)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode$lzycompute(Definitions.scala:1196)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.symbolsNotPresentInBytecode(Definitions.scala:1196)
[ERROR] at scala.reflect.internal.Definitions$DefinitionsClass.init(Definitions.scala:1261)
[ERROR] at scala.tools.nsc.Global$Run.<init>(Global.scala:1290)
[ERROR] at scala.tools.nsc.Driver.doCompile(Driver.scala:32)
[ERROR] at scala.tools.nsc.Main$.doCompile(Main.scala:79)
[ERROR] at scala.tools.nsc.Driver.process(Driver.scala:54)
[ERROR] at scala.tools.nsc.Driver.main(Driver.scala:67)
[ERROR] at scala.tools.nsc.Main.main(Main.scala)
[ERROR] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[ERROR] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[ERROR] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[ERROR] at java.lang.reflect.Method.invoke(Method.java:483)
[ERROR] at scala_maven_executions.MainHelper.runMain(MainHelper.java:164)
[ERROR] at scala_maven_executions.MainWithArgsInFile.main(MainWithArgsInFile.java:26)
Spark 1.2.0 should no longer force consumers to always start from the beginning of a topic after recovering from failure. It may / should be possible to just use a consistent consumer.id so that the job picks up reading where it left off. This would probably be better semantically for the Batch and Speed Layer.
This is a placeholder to track adding a basic random decision forest batch app. As we're still on Spark 1.1, the MLlib-based portion will have to be based only on a DecisionTree.
Right now, hyperparameters are only supported if they are numeric. It should be possible to test discrete unordered values, for example different node quality measures ("entropy", "gini", etc.).
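A hypothetical sketch of what sampling a discrete unordered hyperparameter could look like; none of these names come from the codebase:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class DiscreteHyperParamExample {
  public static void main(String[] args) {
    // Alongside numeric ranges, a discrete unordered value set could be
    // sampled uniformly during a hyperparameter search
    List<String> impurityMeasures = Arrays.asList("entropy", "gini", "variance");
    Random random = new Random();
    String candidate = impurityMeasures.get(random.nextInt(impurityMeasures.size()));
    System.out.println("Trying impurity = " + candidate);
  }
}
```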
KMeansUpdateIT is disabled right now since it mysteriously hangs if you run it. The second call to KMeans.train never returns. It doesn't even seem to get scheduled, since the last thing in the logs is the call to train, and the Spark UI just shows the parent foreachRDD from BatchLayer executing -- in my case, stuck at 8 / 18 partitions.
Suneel has been investigating, but I am currently looking at it too.
This is a placeholder to track adding a basic random decision forest speed app.
At the moment, we're going to launch the batch / speed / serving binaries directly, and embed Spark as a client, rather than use spark-submit, for simplicity. We still need a convenience script that will set up the classpath and environment variables.
Right now it will use 4040 or another free port, but this should be configurable, and set to random ports explicitly in the unit tests.
There's a bug that prevents SSL configuring correctly. I'll fix and add a test.
This is a placeholder to track adding a basic k-means serving app.
Could we make Oryx runnable on Windows, as we can run Spark on Windows?
It would make it easier for us to try Oryx.
Thanks...
We should override the default Tomcat error page with the simple/pretty one from Oryx 1, without getting JSPX involved.
... includes Variation of Information, the Van Dongen criterion, and Silhouette (http://en.wikipedia.org/wiki/Silhouette_%28clustering%29)
Since we're bothering with special high-speed primitive collections, it's worth trying http://openhft.net/products/koloboke-collections/ which was recommended on the Flink mailing list for speed/memory efficiency.
... and at least Long everywhere else, except the Spark MLlib implementation which requires Integer.
It might be useful to reintroduce the simple HTML page that let people enter queries to endpoints from the browser.
Endpoints in the provided apps tend to return things like text/csv content. Browsers like Chrome present this as a download rather than displaying the text. Ideally the service would know it can return text/plain. Chrome's Accept header is:
text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
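A minimal servlet-style sketch of what that negotiation could look like, assuming the endpoint can render the same body as either type; the method name is hypothetical:

```java
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ContentTypeNegotiation {
  // Fall back to text/plain when the client doesn't ask for text/csv,
  // so browsers render the response inline instead of downloading it
  static void setResponseType(HttpServletRequest request, HttpServletResponse response) {
    String accept = request.getHeader("Accept");
    if (accept != null && accept.contains("text/csv")) {
      response.setContentType("text/csv");
    } else {
      response.setContentType("text/plain");
    }
  }
}
```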
It would be nice to get the known items for a user, since this is already stored by the ALS endpoint.
This is a placeholder to track adding a basic random decision forest serving app.
It's necessary to have some form of decay for data to age out old users and items. This should be ported to the new framework for 2.x.
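One common form is exponential decay by half-life; a minimal sketch, where the half-life and weighting scheme are illustrative rather than the ported design:

```java
public class DecayExample {
  // Halve an interaction's weight every halfLifeDays days, so stale
  // users/items fade out of the model over time
  static double decayedWeight(double weight, double ageDays, double halfLifeDays) {
    return weight * Math.pow(0.5, ageDays / halfLifeDays);
  }

  public static void main(String[] args) {
    System.out.println(decayedWeight(1.0, 0, 30));   // 1.0: fresh data keeps full weight
    System.out.println(decayedWeight(1.0, 30, 30));  // 0.5: one half-life old
    System.out.println(decayedWeight(1.0, 90, 30));  // 0.125: three half-lives old
  }
}
```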
@kkasravi @smarthi I probably need your help on this one. If you run ALSServingModelManagerIT you'll see it fails. (You can go directly there with mvn -DskipTests install and mvn verify -pl oryx-ml-oryx-serving.)
The problem is just that OryxApplication and the WebListener ModelManagerListener don't initialize. It should be simple to fix, but I don't know what to tweak.
I thought this had something to do with it, but does it?
ContextConfig contextConfig = new ContextConfig();
context.addLifecycleListener(contextConfig);
It would be a better test if Kafka / ZK processes used a random free port each time, to expose potential problems from using the default ports. It might stop tests from accidentally talking to the wrong instance. This also makes it a little more realistic to parallelize tests.
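A common trick for tests is to bind to port 0, which asks the OS for any free ephemeral port. A small sketch; closing the socket immediately leaves a tiny race window, usually acceptable in tests:

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortFinder {
  // Bind to port 0, read back the port the OS assigned, then release it
  static int chooseFreePort() throws IOException {
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    }
  }

  public static void main(String[] args) throws IOException {
    System.out.println("Kafka could use port " + chooseFreePort());
    System.out.println("ZK could use port " + chooseFreePort());
  }
}
```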
The HyperParamTuningIT regularly fails because the test scenario is not quite robust. It should be modified so that the intended model is very clear.
The Serving Layer has to read models from the update queue at startup. It's not suitable to simply wait until the next model appears, and it's potentially too slow to read from the beginning of the topic. Ideally, the starting offset is chosen intelligently, so that it reads from the last model on the topic forward.
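A sketch of one possible approach, using the modern KafkaConsumer API (newer than the consumer code of the time): seek back a bounded distance from the end of the topic and scan forward, keeping the last model seen. The topic name, "MODEL" keying, and look-back size are all assumptions:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class LastModelReader {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    TopicPartition tp = new TopicPartition("oryx-update", 0); // hypothetical topic
    long lookBack = 1000L; // heuristic: assume the last model is in the final 1000 messages
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      long end = consumer.endOffsets(Collections.singletonList(tp)).get(tp);
      consumer.seek(tp, Math.max(0, end - lookBack));
      String lastModel = null;
      while (consumer.position(tp) < end) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        for (ConsumerRecord<String, String> record : records) {
          if ("MODEL".equals(record.key())) { // assumes model messages are keyed "MODEL"
            lastModel = record.value();
          }
        }
      }
      System.out.println("Most recent model: " + lastModel);
    }
  }
}
```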
Before the real first release, I think we'll want to move to Java 8, to take advantage of lambdas. Using Spark without these in Java makes the result significantly harder to parse and develop.
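For illustration, the same Spark transformation written both ways; the anonymous-class form is what Java 7 forces on every transformation today:

```java
import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class LambdaComparison {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "LambdaComparison");
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("a", "bb", "ccc"));

    // Java 7: an anonymous class for even the simplest transformation
    JavaRDD<Integer> lengths7 = lines.map(new Function<String, Integer>() {
      @Override
      public Integer call(String s) {
        return s.length();
      }
    });

    // Java 8: the same operation as a lambda
    JavaRDD<Integer> lengths8 = lines.map(s -> s.length());

    System.out.println(lengths7.collect() + " " + lengths8.collect());
    sc.stop();
  }
}
```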
Hi, Sean!
I am from China and could not contact you by email or IM, so I am writing an issue here!
I am the architect at WUBA, the NASDAQ-listed company.
I found your project on GitHub several days ago and am very interested in your great work!
After comparing it to other open-source recommender implementations, Oryx 2 is more modern: it uses the latest open-source dependencies, such as Kafka to transfer data, Spark to compute, and HDFS as the distributed file system to store data, and it uses the Lambda architecture.
I have analyzed the source code; the project is new, and the documentation and code are not complete. Could I join to write some code or documentation?
Kafka might need to use a ZK chroot; it does in CDH for example. This should be supported in the config.
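For illustration, a chroot is expressed as a suffix on the ZooKeeper connect string, scoping all of Kafka's znodes under /kafka instead of the ZooKeeper root; the host names and group id here are made up:

```java
import java.util.Properties;

public class ZkChrootExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // The /kafka suffix is the chroot; every host in the list shares it,
    // and it is written once at the end of the connect string
    props.put("zookeeper.connect", "zk1.example.com:2181,zk2.example.com:2181/kafka");
    props.put("group.id", "oryx-batch"); // hypothetical group id
    // ... pass props to the Kafka consumer configuration ...
  }
}
```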
Now that https://issues.apache.org/jira/browse/SPARK-4196 is fixed, checkpointing can move forward when we move to Spark 1.2.0. There is still a problem with a reference to SparkContext in BatchUpdateFunction:
- field (class "com.cloudera.oryx.lambda.BatchUpdateFunction", name: "sparkContext", type: "class org.apache.spark.api.java.JavaSparkContext")
- object (class "com.cloudera.oryx.lambda.BatchUpdateFunction", com.cloudera.oryx.lambda.BatchUpdateFunction@e0a4232)
- field (class "org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2", name: "foreachFunc$2", type: "interface org.apache.spark.api.java.function.Function2")
- object (class "org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2", <function2>)
- field (class "org.apache.spark.streaming.dstream.ForEachDStream", name: "org$apache$spark$streaming$dstream$ForEachDStream$$foreachFunc", type: "interface scala.Function2")
- object (class "org.apache.spark.streaming.dstream.ForEachDStream", org.apache.spark.streaming.dstream.ForEachDStream@548d5ed3)
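One way to avoid serializing the context with the closure is to recover it from the RDD inside call() rather than holding it as a field. A sketch under that assumption, simplified to a String RDD rather than the real pair RDD:

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

// Instead of a JavaSparkContext field (which Spark then tries to serialize
// along with the foreachRDD closure), rebuild the context from the RDD itself
public class BatchUpdateFunctionSketch
    implements Function2<JavaRDD<String>, Time, Void> {

  @Override
  public Void call(JavaRDD<String> rdd, Time time) {
    JavaSparkContext sparkContext =
        JavaSparkContext.fromSparkContext(rdd.context());
    // ... run the batch update using sparkContext ...
    return null;
  }
}
```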
The executor or driver doesn't seem to be getting the config, even though the file is deployed to executors and -Dconfig.file is set:
java.lang.IllegalArgumentException: Wrong FS: file://xxxxx.cloudera.com:8020/tmp/Oryx/data, expected: hdfs://sssss.cloudera.com:8020
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:192)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:651)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:104)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:716)
at org.apache.hadoop.hdfs.DistributedFileSystem$14.doCall(DistributedFileSystem.java:712)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:712)
at org.apache.hadoop.fs.Globber.listStatus(Globber.java:69)
at org.apache.hadoop.fs.Globber.glob(Globber.java:217)
at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1625)
at com.cloudera.oryx.lambda.BatchUpdateFunction.call(BatchUpdateFunction.java:90)
at com.cloudera.oryx.lambda.BatchUpdateFunction.call(BatchUpdateFunction.java:45)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
...
http://kafka.apache.org/documentation.html#compaction might be usable to keep only the last known MODEL message on the update queue, and perhaps to intelligently discard updates before it.
Right now, the various interfaces that a user app implements assume use of the Java Spark APIs. This is not a good assumption. This should be refactored so that it can just as easily accommodate the Scala Spark APIs.
Not clear yet how important this will be, but we may need to support more than one Kafka consumer.
The RescorerProvider abstraction still needs to be ported into the ALS serving app.