
reactor's Introduction

Reactor Project

Join the chat at https://gitter.im/reactor/reactor

Download

Starting from 3.0, Reactor is organized into multiple projects, most notably reactor-core, reactor-netty, and the reactor-addons (see Detail of Projects below).

A set of compatible versions for all these projects is curated under a BOM ("Bill of Materials") hosted under this very repository.

Using the BOM with Maven

In Maven, you need to import the BOM first:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2024.0.0-M1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

Notice we use the <dependencyManagement> section and the import scope.

Next, add your dependencies to the relevant reactor projects as usual, except without a <version>:

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Using the BOM with Gradle

Gradle 5.0+

Use the platform keyword to import the Maven BOM within the dependencies block, then add dependencies to your project without a version number.

dependencies {
     // import BOM
     implementation platform('io.projectreactor:reactor-bom:2024.0.0-M1')

     // add dependencies without a version number
     implementation 'io.projectreactor:reactor-core'
}

Gradle 4.x and earlier

Gradle versions prior to 5.0 have no core support for Maven BOMs, but you can use Spring's gradle-dependency-management plugin.

First, apply the plugin from the Gradle Plugin Portal (check and change the version if a new one has been released):

plugins {
    id "io.spring.dependency-management" version "1.0.11.RELEASE"
}

Then use it to import the BOM:

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2024.0.0-M1"
     }
}

Then add a dependency to your project without a version number:

dependencies {
     compile 'io.projectreactor:reactor-core'
}

BOM Versioning Scheme

The BOM can be imported in Maven, which will provide a set of default artifact versions to use whenever the corresponding dependency is added to a pom without an explicitly provided version.

As the versions of the different artifacts are not necessarily aligned, the BOM represents a release train with a heterogeneous range of versions that are curated to work together. The version follows the YYYY.MINOR.PATCH-QUALIFIER scheme since Europium, where:

  • YYYY is the year of the first GA release in a given release cycle (like 3.4.0 for 3.4.x)
  • .MINOR is a 0-based number incrementing with each new release cycle; in the case of the BOM, it allows discerning between release cycles in case two get first released the same year
  • .PATCH is a 0-based number incrementing with each service release
  • -QUALIFIER is a textual qualifier, which is omitted in the case of GA releases (see below)

On top of the artifact version, each release train has an associated codename, a chemical name from the Periodic Table of Elements in increasing alphabetical order, for reference in discussions.

So far, the release trains' codenames are:

  • Aluminium for the 3.0.x generation of Reactor-Core
  • Bismuth for the 3.1.x generation
  • Californium for the 3.2.x generation
  • Dysprosium for the 3.3.x generation
  • Europium (2020.0) for the 3.4.x generation

NOTE: Up until Dysprosium, the BOM was versioned using a release train scheme with a codename followed by a qualifier, and the qualifiers were slightly different. For example: Aluminium-RELEASE (first GA release, would now be something like YYYY.0.0), Bismuth-M1, Californium-SR1 (service release, would now be something like YYYY.0.1), Dysprosium-RC1, Dysprosium-BUILD-SNAPSHOT (after each patch we'd go back to the same snapshot version; it would now be something like YYYY.0.X-SNAPSHOT, so we get one snapshot per PATCH).

Contributing, Community / Support


As hinted above, this repository is for hosting the BOM and for cross-cutting issues only. Most of the time, if you're looking to open an issue or a PR, it should be done in a more specific repository corresponding to one of the actual artifacts.

All projects follow the same detailed contributing guidelines which you can find here.

That document also gives some ways you can get answers to your questions.

Documentation

Detail of Projects

Reactor Core

Reactive foundations for applications and frameworks, with a Reactive Extensions-inspired API built around the Mono (1 element) and Flux (n elements) types.

Reactor Netty

TCP and HTTP client and server.

Reactor Addons

Extra projects adding features to reactor:

Snapshot Artifacts

While Stable Releases are synchronized with Maven Central, fresh snapshot and milestone artifacts are provided in the repo.spring.io repositories.

To add this repo to your Maven build, add it to the <repositories> section like the following:

<repositories>
	<repository>
	    <id>spring-snapshot</id>
	    <name>Spring Snapshot Repository</name>
	    <url>https://repo.spring.io/snapshot</url>
	    <snapshots>
	        <enabled>true</enabled>
	    </snapshots>
	</repository>
</repositories>

To add it to your Gradle build, use the repositories configuration like this:

repositories {
	maven { url 'https://repo.spring.io/libs-snapshot' }
	mavenCentral()
}

You should then be able to import a -SNAPSHOT version of the BOM, like 2020.0.{NUMBER}-SNAPSHOT for the snapshot of the {NUMBER}th service release of 2020.0 (Europium).

Sponsored by VMware

reactor's People

Contributors

0xflotus, bclozel, bsideup, chemicl, dependabot[bot], ericbottard, marcingrzejszczak, olegdokuka, parkerm, pderop, rouilleur, simonbasle, smaldini, spring-builds, violetagg


reactor's Issues

Dispatcher-aware SLF4J logging implementation

Doing logging in an async handler can be extremely detrimental to the overall performance of an application. It spends a lot of time trying to lock output streams and the like.

I propose we create our own Dispatcher-based SLF4J logging implementation that will eliminate these problems by having its own dedicated Dispatcher(s) for doing logging. Then we can make the SLF4J API completely asynchronous and eliminate locking issues when using standard logging tools in asynchronous handlers.
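
A minimal illustration of the idea only, not the proposed Dispatcher-based implementation itself: hand the actual log call off to a dedicated thread so the async handler never blocks on an appender's output-stream lock. The class and method names below are hypothetical.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical sketch: a single dedicated logging thread stands in for the
    // dedicated Dispatcher(s) the issue proposes.
    class AsyncLog {
        private static final Logger LOG = LoggerFactory.getLogger(AsyncLog.class);
        private static final ExecutorService LOG_THREAD = Executors.newSingleThreadExecutor();

        static void info(final String message) {
            // the caller returns immediately; the actual appender work happens elsewhere
            LOG_THREAD.execute(() -> LOG.info(message));
        }
    }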

Clean Up SSL Handshaking

The TcpNioSSLConnection was pretty much a direct port from Spring Integration and had to be munged to work with the Reactor threading model.

It needs some work to make it cleaner.

Promise's state tracking is not thread-safe

Promise currently keeps track of its own state, separate from that of its Composable superclass. A side effect of this is that it's not thread-safe: success and error handlers may not be called if they're registered while the state is being changed.
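
A minimal sketch of one way to close the race, assuming nothing about Reactor's internals: keep the completed value in a single AtomicReference and drain handlers from a concurrent queue, so a handler registered while the state changes is still invoked exactly once. All names below are hypothetical.

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    // Hypothetical sketch, not Reactor's Promise: completion state lives in one
    // AtomicReference so registration and completion can race safely.
    class SafePromise<T> {
        private final AtomicReference<T> value = new AtomicReference<>();
        private final Queue<Consumer<T>> handlers = new ConcurrentLinkedQueue<>();

        void onSuccess(Consumer<T> handler) {
            handlers.add(handler);
            T v = value.get();
            if (v != null) {
                drain(v); // completion may have happened while we were registering
            }
        }

        void complete(T v) {
            if (value.compareAndSet(null, v)) {
                drain(v);
            }
        }

        private void drain(T v) {
            Consumer<T> h;
            while ((h = handlers.poll()) != null) {
                h.accept(v); // poll() ensures each handler runs at most once
            }
        }
    }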

Add support for ExceptionMappers

I think it would be great to have something like JAX-RS ExceptionMappers.
You could register a handler for a given exception type and it would be handled outside of the Consumer logic.

Add support for catch-all errors with SLF4J logging

We can provide a default option to attach on(T(Throwable), consumer) to trigger SLF4J error logging as long as you have reactor-logback set up.
This would probably be best combined with a solid Configuration Builder.
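
As an illustration of what the attached consumer might look like, using java.util.function.Consumer here purely to keep the sketch self-contained; the registration via on(T(Throwable), consumer) and the reactor-logback setup are as described above.

    import java.util.function.Consumer;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical catch-all error consumer: whatever Throwable reaches it is
    // handed to SLF4J and routed by the configured backend (e.g. reactor-logback).
    class Slf4jErrorConsumer implements Consumer<Throwable> {
        private static final Logger LOG = LoggerFactory.getLogger(Slf4jErrorConsumer.class);

        @Override
        public void accept(Throwable error) {
            LOG.error("Unhandled error event", error);
        }
    }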

Improve Context and Dispatcher lifecycle

The current handling of Context and Dispatcher lifecycle is a little bit muddled.

The BlockingQueueDispatcher, RingBufferDispatcher, and ThreadPoolExecutorDispatcher all start themselves implicitly in their constructors. BlockingQueueDispatcher and RingBufferDispatcher repeat this start processing in their start() methods, so, if start() is called, they're relying on the start processing being idempotent. ThreadPoolExecutorDispatcher does nothing in its start method.

Context doesn't start anything implicitly. Its start() implementation calls start() on all of its Dispatchers so this, too, is relying on Dispatcher start processing being idempotent. The various Dispatchers that it provides static access to are only started by virtue of their implicit start in their constructors.

The above all feels a bit muddled to me, and it's unclear when a user should call start(), if at all. At the moment it's not necessary to call start() which makes the need to call stop() and destroy() to clean things up asymmetric.
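
A minimal sketch of one way to make the lifecycle explicit while keeping start() safe to call any number of times, using an AtomicBoolean guard; the class is hypothetical and is not one of the Dispatchers named above.

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical sketch: start()/stop() are idempotent and symmetric, so it no
    // longer matters whether the constructor, the Context, or the user calls start().
    abstract class LifecycleAwareDispatcher {
        private final AtomicBoolean started = new AtomicBoolean(false);

        public final void start() {
            if (started.compareAndSet(false, true)) {
                doStart(); // runs exactly once per start/stop cycle
            }
        }

        public final void stop() {
            if (started.compareAndSet(true, false)) {
                doStop();
            }
        }

        protected abstract void doStart();

        protected abstract void doStop();
    }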

Add Support for Direct Buffers

Spring Integration NIO support has an option to use NIO direct buffers. This is possible because the data is copied from the ByteBuffer after the read event is fired and processed, and before read events are re-enabled.

With reactor-tcp, the goal was to minimize data copying to the greatest extent possible, so the read buffer is passed to async decode and read events are immediately re-enabled. This means that read events need to use a "new" buffer each time (because the decoding of the previous buffer may not have completed).

It is generally not advised to "churn" direct buffers because the memory allocation can be more expensive than simple heap-based byte buffers.

In order to support direct buffers in reactor-tcp we would likely need some kind of central pool of buffers that can be "borrowed", with a reference count, and returned to the pool when there are no more references.
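
A rough sketch of such a pool, assuming nothing about reactor-tcp's Buffer types: direct buffers are borrowed with a reference count of one, retained by each additional holder, and returned to the pool when the count reaches zero. All names here are hypothetical.

    import java.nio.ByteBuffer;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical sketch of a central pool of reference-counted direct buffers.
    class DirectBufferPool {
        private final ConcurrentLinkedQueue<ByteBuffer> free = new ConcurrentLinkedQueue<>();
        private final int bufferSize;

        DirectBufferPool(int bufferSize) {
            this.bufferSize = bufferSize;
        }

        PooledBuffer borrow() {
            ByteBuffer buf = free.poll();
            if (buf == null) {
                buf = ByteBuffer.allocateDirect(bufferSize); // allocate lazily instead of churning
            }
            buf.clear();
            return new PooledBuffer(buf, this);
        }

        void recycle(ByteBuffer buf) {
            free.offer(buf);
        }

        static final class PooledBuffer {
            final ByteBuffer buffer;
            private final DirectBufferPool pool;
            private final AtomicInteger refCount = new AtomicInteger(1); // the borrower holds the first reference

            PooledBuffer(ByteBuffer buffer, DirectBufferPool pool) {
                this.buffer = buffer;
                this.pool = pool;
            }

            void retain() {
                refCount.incrementAndGet();
            }

            void release() {
                if (refCount.decrementAndGet() == 0) {
                    pool.recycle(buffer); // last reference gone, hand the buffer back
                }
            }
        }
    }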

README needs updating.

Looks like the README needs to be updated to reference the logback module, as well as to correct the samples after the notify(Object) change.

Remove `singleUse` Boolean from `ConnectionFactorySupport`

Remove the boolean and all references to it.

On the client side, the factory shouldn't be concerned whether a connection is shared or not, it should just hand a new one out when asked. The client should be responsible for reusing and/or closing the connection.

On the server side, socket timeouts can be used to close sockets left open by badly behaved clients.

Use TRACE level logging as much as possible instead of DEBUG

There's lots of logging in the tcp module that's related to the low-level manipulation of bytes, buffers, and the like. Because logging can be so detrimental to an async application's performance, and to reduce the logging overhead in a standard OOTB configuration, we should probably make that logging TRACE level, with only top-level actions logged in DEBUG. Things like connections started and stopped could be DEBUG level, but information about how many bytes are read and whatnot should probably be TRACE level.
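
For illustration, the split might look like this with plain SLF4J (the class and call sites below are hypothetical): lifecycle events at DEBUG, per-read byte counts at TRACE behind an isTraceEnabled() guard.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical call sites showing the proposed DEBUG/TRACE split.
    class ConnectionLogging {
        private static final Logger LOG = LoggerFactory.getLogger(ConnectionLogging.class);

        void onConnectionOpened(String remoteAddress) {
            LOG.debug("Connection opened: {}", remoteAddress); // top-level action
        }

        void onBytesRead(int byteCount) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Read {} bytes", byteCount); // low-level byte/buffer detail
            }
        }
    }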

Event persistence

Related to #27, we need to provide a persistence mechanism for events so they don't get lost. When in batch mode (or "paused"), a Reactor should have the capability to store events for later publication when coming out of batch mode (or "resumed").

Reuse existing Buffer class if possible

It would be great if we could reuse the existing Buffer class from reactor-core. The reactor-tcp Buffers class is good because it deals with multiple ByteBuffers, but I think it would be better to use a reactor.io.Buffer that wraps a raw ByteBuffer, because the Buffer class has lots of helper methods to turn the internal ByteBuffer into various things, to read data as most of the primitive types, and it also supports dynamic resizing (though in this case we're using fixed sizes and zero copy; when you're creating these by writing a response, the situation will be different).

It might even be generally useful to have the Buffers class moved to reactor-core, where Buffer already is, so the zero-copy functionality could be used in other things like file access and other byte-centric tasks.

Race in startup/initialisation of TcpNioServerConnectionFactory

There appears to be a race during the startup/initialisation processing of TcpNioServerConnectionFactory. The build is intermittently failing with output similar to the following:

reactor.tcp.TcpServerReactorTests > tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses STANDARD_ERROR
21-May-2013 05:22:19        Exception in thread "pool-11-thread-1" java.lang.IllegalArgumentException: Factory not initialized
21-May-2013 05:22:19                at reactor.support.Assert.notNull(Assert.java:30)
21-May-2013 05:22:19                at reactor.tcp.TcpNioServerConnectionFactory.run(TcpNioServerConnectionFactory.java:89)
21-May-2013 05:22:19                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
21-May-2013 05:22:19                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
21-May-2013 05:22:19                at java.lang.Thread.run(Thread.java:722)
21-May-2013 05:22:49    Gradle Worker 6 finished executing tests.
21-May-2013 05:22:49    
21-May-2013 05:22:49    reactor.tcp.TcpServerReactorTests > tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses FAILED
21-May-2013 05:22:49        java.lang.IllegalStateException: Operation was not complete within 30000ms
21-May-2013 05:22:49            at reactor.tcp.test.TimeoutUtils.doWithTimeout(TimeoutUtils.java:33)
21-May-2013 05:22:49            at reactor.tcp.TcpServerReactorTests.awaitAlive(TcpServerReactorTests.java:106)
21-May-2013 05:22:49            at reactor.tcp.TcpServerReactorTests.tcpServerReactorCanReceiveRequestsNotifyConsumersAndSendResponses(TcpServerReactorTests.java:80)
21-May-2013 05:22:49    Process 'Gradle Worker 6' finished with exit value 0 (state: SUCCEEDED)

Don't create whenConsumer before it is actually needed

In Composable, currently:

    protected Composable<T> when(Selector sel, final Consumer<T> consumer) {
        Consumer<Event<T>> whenConsumer = new Consumer<Event<T>>() {
            @Override
            public void accept(Event<T> ev) {
                consumer.accept(ev.getData());
            }
        };
        if (!isComplete()) {
            observable.on(sel, whenConsumer);
        }else{
            R.schedule(consumer, value, observable);
        }
        return this;
    }

Creation of whenConsumer can be done inside the first if statement.
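
For reference, here is the same method with the allocation moved inside the branch, a sketch of the suggested change that keeps the original code's types as-is:

    protected Composable<T> when(Selector sel, final Consumer<T> consumer) {
        if (!isComplete()) {
            // the wrapping Consumer is only needed when it will actually be registered
            Consumer<Event<T>> whenConsumer = new Consumer<Event<T>>() {
                @Override
                public void accept(Event<T> ev) {
                    consumer.accept(ev.getData());
                }
            };
            observable.on(sel, whenConsumer);
        } else {
            R.schedule(consumer, value, observable);
        }
        return this;
    }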

Add a builder API for Composable

Due to the recent constructor injection changes, it's not very easy to create a Composable that is based on an existing Reactor or that uses a specific Dispatcher implementation.

It would be nice to have a general-purpose builder API for Reactors and things that need Dispatchers so a Builder class that's independent is probably the best long-term way to go.

For the time being, though, I suggest we add a Builder inner class to Composable and have the static from method return this Builder just to provide a quick and easy alternative to what was previously possible by using the from method, then calling setDispatcher.

Promise API

Create a Promise API that provides users an easy way to be notified when deferred values become available.

Configuration API

A simple configuration API to manage reactors, dispatchers, balancing strategies and others options.

Reactor integration with Ratpack

Ratpack is a lightweight Sinatra-like micro web framework. It does very focused things and does them well.
I think a dedicated module (whether it's a Ratpack plugin or a reactor-ratpack artifact) would be very efficient for enabling new async features. It will make even more sense when we have distribution support, especially for people combining Ratpack with other, heavier backend-integrated frameworks, let's say Grails :).

Registration cancelAfterUse does not guarantee that a Consumer will only be called once

This problem is somewhat similar to #44.

Registration.cancelAfterUse can be used to cancel a registration after it's been used. However, there's no guarantee of how many times it will be used before it's cancelled. This is due to a possible race between the registration being made (which makes it available for selection) and cancelAfterUse being called. It may be sufficient to document that cancelAfterUse cannot be used to guarantee at-most-once use, and perhaps provide an alternative mechanism that does guarantee at-most-once use.
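
A sketch of what such an alternative mechanism could look like, independent of Registration itself (java.util.function.Consumer is used to keep the sketch self-contained, and the name OnceConsumer is made up): the at-most-once guarantee lives in the Consumer, so it holds no matter how the cancellation race plays out.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    // Hypothetical wrapper guaranteeing at-most-once delivery regardless of when
    // the underlying registration is actually cancelled.
    final class OnceConsumer<T> implements Consumer<T> {
        private final Consumer<T> delegate;
        private final AtomicBoolean used = new AtomicBoolean(false);

        OnceConsumer(Consumer<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void accept(T value) {
            if (used.compareAndSet(false, true)) {
                delegate.accept(value); // any later invocation is silently dropped
            }
        }
    }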

Add Debug/Trace Logging for Missing Selectors

I incorrectly used

protected static final reactor.fn.Selector WRITE = $("tcp.write");
...
this.ioReactor.on(WRITE, new Consumer<Event<SelectionKey>> () {

        @Override
        public void accept(Event<SelectionKey> keyEvent) {
            handleWriteSelection(ioSelector, keyEvent.getData());
        }
});
...
this.getIoReactor().notify(WRITE, Fn.event(key));

Since the Selector didn't match, the event was discarded silently (this previously worked on an older iteration of Reactor).

Consider adding DEBUG/TRACE logging to make tracking down these issues easier.

Fix was:

protected static final String WRITE_KEY = "tcp.write";

protected static final reactor.fn.Selector WRITE = $(WRITE_KEY);

...

this.getIoReactor().notify(WRITE_KEY, Fn.event(key));

Create an HttpCodec

I already have some code from an earlier version of Reactor that uses the Apache HttpComponents core classes for doing HTTP parsing. It's actually quite straightforward and would be an easy way to get HTTP support on top of a raw TCP Reactor.

I also have some custom parsing code that has no dependencies on anything but the JDK. That also might be useful in case the user decided they didn't want to depend on the httpcomponents library. Making the HttpCodec pluggable, then, would allow different parsing implementations to be plugged in, depending on the use case.

If the HTTP parsing classes in Jetty or Tomcat were usable directly, we could also provide implementations that used those classes in case the deployment platform was already one of those servers. I haven't looked to see if this is possible yet, but it would be interesting to find that out.

Separate load balancing out of CachingRegistry

CachingRegistry currently has two responsibilities:

  • Providing a registry of Selectors
  • Load-balancing the Selectors that match a particular key

We should split the two responsibilities into two separate classes.

This is somewhat related to #24

Create a generic, type-safe Tuple

It would be useful to have a Tuple that could hold arbitrary values of different types, for managing APIs that provide interfaces with multiple arguments. If there's no Tuple class, the user would have to create their own classes to hold the type-safe references to different objects.

That's still a possibility, of course, but it would be generally useful to have a Tuple class that allowed holding multiple values, along with a hierarchy of Tuple1, Tuple2, etc. that provides a generic signature and methods for getting the values of the Tuple in a type-safe way.
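
A sketch of what one member of that hierarchy could look like; the accessor names match the getT1()/getT2() usage shown in the Selector/Object helper issue further below, while the static factory is illustrative.

    // Hypothetical sketch of a fixed-arity, type-safe tuple.
    final class Tuple2<T1, T2> {
        private final T1 t1;
        private final T2 t2;

        Tuple2(T1 t1, T2 t2) {
            this.t1 = t1;
            this.t2 = t2;
        }

        static <A, B> Tuple2<A, B> of(A a, B b) {
            return new Tuple2<A, B>(a, b);
        }

        T1 getT1() {
            return t1;
        }

        T2 getT2() {
            return t2;
        }
    }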

build fails horribly!

If I do this:

./gradlew build

I get this:

  • Exception is:
    org.gradle.api.UncheckedIOException: java.io.IOException: Input/output error
    at org.gradle.util.hash.HashUtil.createHash(HashUtil.java:56)
    at org.gradle.util.hash.HashUtil.createHash(HashUtil.java:34)
    at org.gradle.api.internal.changedetection.DefaultHasher.hash(DefaultHasher.java:24)
    at org.gradle.api.internal.changedetection.CachingHasher.hash(CachingHasher.java:42)
    at org.gradle.api.internal.changedetection.DefaultFileSnapshotter.snapshot(DefaultFileSnapshotter.java:44)
    at org.gradle.api.internal.changedetection.InputFilesChangedUpToDateRule.create(InputFilesChangedUpToDateRule.java:35)
    at org.gradle.api.internal.changedetection.CompositeUpToDateRule.create(CompositeUpToDateRule.java:35)
    at org.gradle.api.internal.changedetection.DefaultTaskArtifactStateRepository$HistoricExecution.calcCurrentState(DefaultTaskArtifactStateRepository.java:80)
    at org.gradle.api.internal.changedetection.DefaultTaskArtifactStateRepository$HistoricExecution.isUpToDate(DefaultTaskArtifactStateRepository.java:88)
    at org.gradle.api.internal.changedetection.DefaultTaskArtifactStateRepository$TaskArtifactStateImpl.isUpToDate(DefaultTaskArtifactStateRepository.java:128)
    at org.gradle.api.internal.changedetection.ShortCircuitTaskArtifactStateRepository$ShortCircuitArtifactState.isUpToDate(ShortCircuitTaskArtifactStateRepository.java:77)
    at org.gradle.api.internal.changedetection.FileCacheBroadcastTaskArtifactStateRepository$1.isUpToDate(FileCacheBroadcastTaskArtifactStateRepository.java:37)
    at org.gradle.api.internal.tasks.execution.SkipUpToDateTaskExecuter.execute(SkipUpToDateTaskExecuter.java:44)
    at org.gradle.api.internal.tasks.execution.ValidatingTaskExecuter.execute(ValidatingTaskExecuter.java:57)
    at org.gradle.api.internal.tasks.execution.SkipEmptySourceFilesTaskExecuter.execute(SkipEmptySourceFilesTaskExecuter.java:41)
    at org.gradle.api.internal.tasks.execution.SkipTaskWithNoActionsExecuter.execute(SkipTaskWithNoActionsExecuter.java:51)
    at org.gradle.api.internal.tasks.execution.SkipOnlyIfTaskExecuter.execute(SkipOnlyIfTaskExecuter.java:52)
    at org.gradle.api.internal.tasks.execution.ExecuteAtMostOnceTaskExecuter.execute(ExecuteAtMostOnceTaskExecuter.java:42)
    at org.gradle.api.internal.AbstractTask.executeWithoutThrowingTaskFailure(AbstractTask.java:275)
    at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor.executeTask(DefaultTaskPlanExecutor.java:52)
    at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor.processTask(DefaultTaskPlanExecutor.java:38)
    at org.gradle.execution.taskgraph.DefaultTaskPlanExecutor.process(DefaultTaskPlanExecutor.java:30)
    at org.gradle.execution.taskgraph.DefaultTaskGraphExecuter.execute(DefaultTaskGraphExecuter.java:84)
    at org.gradle.execution.SelectedTaskExecutionAction.execute(SelectedTaskExecutionAction.java:29)
    at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:61)
    at org.gradle.execution.DefaultBuildExecuter.access$200(DefaultBuildExecuter.java:23)
    at org.gradle.execution.DefaultBuildExecuter$2.proceed(DefaultBuildExecuter.java:67)
    at org.gradle.api.internal.changedetection.TaskCacheLockHandlingBuildExecuter$1.run(TaskCacheLockHandlingBuildExecuter.java:31)
    at org.gradle.internal.Factories$1.create(Factories.java:22)
    at org.gradle.cache.internal.DefaultCacheAccess.useCache(DefaultCacheAccess.java:124)
    at org.gradle.cache.internal.DefaultCacheAccess.useCache(DefaultCacheAccess.java:112)
    at org.gradle.cache.internal.DefaultPersistentDirectoryStore.useCache(DefaultPersistentDirectoryStore.java:134)
    at org.gradle.api.internal.changedetection.DefaultTaskArtifactStateCacheAccess.useCache(DefaultTaskArtifactStateCacheAccess.java:79)
    at org.gradle.api.internal.changedetection.TaskCacheLockHandlingBuildExecuter.execute(TaskCacheLockHandlingBuildExecuter.java:29)
    at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:61)
    at org.gradle.execution.DefaultBuildExecuter.access$200(DefaultBuildExecuter.java:23)
    at org.gradle.execution.DefaultBuildExecuter$2.proceed(DefaultBuildExecuter.java:67)
    at org.gradle.execution.DryRunBuildExecutionAction.execute(DryRunBuildExecutionAction.java:32)
    at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:61)
    at org.gradle.execution.DefaultBuildExecuter.execute(DefaultBuildExecuter.java:54)
    at org.gradle.initialization.DefaultGradleLauncher.doBuildStages(DefaultGradleLauncher.java:165)
    at org.gradle.initialization.DefaultGradleLauncher.doBuild(DefaultGradleLauncher.java:113)
    at org.gradle.initialization.DefaultGradleLauncher.run(DefaultGradleLauncher.java:81)
    at org.gradle.launcher.cli.ExecuteBuildAction.run(ExecuteBuildAction.java:38)
    at org.gradle.launcher.exec.InProcessGradleLauncherActionExecuter.execute(InProcessGradleLauncherActionExecuter.java:39)
    at org.gradle.launcher.exec.InProcessGradleLauncherActionExecuter.execute(InProcessGradleLauncherActionExecuter.java:25)
    at org.gradle.launcher.cli.RunBuildAction.run(RunBuildAction.java:50)
    at org.gradle.api.internal.Actions$RunnableActionAdapter.execute(Actions.java:171)
    at org.gradle.launcher.cli.CommandLineActionFactory$ParseAndBuildAction.execute(CommandLineActionFactory.java:201)
    at org.gradle.launcher.cli.CommandLineActionFactory$ParseAndBuildAction.execute(CommandLineActionFactory.java:174)
    at org.gradle.launcher.cli.CommandLineActionFactory$WithLogging.execute(CommandLineActionFactory.java:170)
    at org.gradle.launcher.cli.CommandLineActionFactory$WithLogging.execute(CommandLineActionFactory.java:139)
    at org.gradle.launcher.cli.ExceptionReportingAction.execute(ExceptionReportingAction.java:33)
    at org.gradle.launcher.cli.ExceptionReportingAction.execute(ExceptionReportingAction.java:22)
    at org.gradle.launcher.Main.doAction(Main.java:48)
    at org.gradle.launcher.bootstrap.EntryPoint.run(EntryPoint.java:45)
    at org.gradle.launcher.Main.main(Main.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.gradle.launcher.bootstrap.ProcessBootstrap.runNoExit(ProcessBootstrap.java:50)
    at org.gradle.launcher.bootstrap.ProcessBootstrap.run(ProcessBootstrap.java:32)
    at org.gradle.launcher.GradleMain.main(GradleMain.java:26)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at org.gradle.wrapper.BootstrapMainStarter.start(BootstrapMainStarter.java:33)
    at org.gradle.wrapper.WrapperExecutor.execute(WrapperExecutor.java:130)
    at org.gradle.wrapper.GradleWrapperMain.main(GradleWrapperMain.java:48)
    Caused by: java.io.IOException: Input/output error
    at java.io.FileInputStream.readBytes(Native Method)
    at java.io.FileInputStream.read(FileInputStream.java:220)
    at org.gradle.util.hash.HashUtil.createHash(HashUtil.java:46)
    ... 70 more

BUILD FAILED

can't we go back to maven?

At least it just works (mostly)

Add vert.x module

Either a literal vert.x module or an embedded vert.x inside a Dispatcher implementation...?

Provide a thread-safe way for a Consumer to access its Registration

A Consumer's Registration is returned when the Consumer is passed to Reactor.on. As things stand, if the Consumer wants to access its own Registration it cannot do so without being at risk of a race condition: the Consumer's accept() method may be called before Reactor.on has returned so it won't have access to its Registration.

One solution would be for a Consumer to optionally implement RegistrationAware and have the registry pass the Registration to the Consumer before it's actually added to the registry - this would ensure that the Registration is set before accept can be called.
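
A minimal sketch of that shape, with Registration reduced to a one-method stub and java.util.function.Consumer standing in for Reactor's Consumer, purely to keep the sketch self-contained:

    import java.util.function.Consumer;

    // Hypothetical stub: just enough of Registration for the sketch.
    interface Registration {
        void cancel();
    }

    // The proposed callback: the registry sets the Registration before the
    // consumer is added, so it is visible before accept() can ever run.
    interface RegistrationAware {
        void setRegistration(Registration registration);
    }

    // Example consumer that can safely cancel itself from inside accept().
    class SelfCancellingConsumer implements Consumer<Object>, RegistrationAware {
        private volatile Registration registration;

        @Override
        public void setRegistration(Registration registration) {
            this.registration = registration;
        }

        @Override
        public void accept(Object event) {
            registration.cancel(); // no race with Reactor.on returning
        }
    }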

Ordering guarantees

A reader of the blog post made a very interesting comment about Event ordering [1]. I consider this related to #41 and #27.

I'm comparing Reactor with Akka and Jetlang. I really like the clean Composable/Promise design, and the terseness of Reactor compared especially to Akka since I don't require remote transparency and prefer a concise foundation framework for Java.

One thing that seems missing in Reactor is a guarantee on the serialized invocation of Consumers. One answer in the Google group mentions the possibility of creating a separate reactor with a BlockingQueueDispatcher for such serialized invocations. But in that case, we end up creating a bunch of heavyweight threads to have such reactors.

Do you have any plan to introduce such a serialization guarantee? I mean like a Fiber in Jetlang getting messages in order, or like an actor processing messages in its mailbox one by one.

[1] - http://blog.springsource.org/2013/05/13/reactor-a-foundation-for-asynchronous-applications-on-the-jvm/#comment-346260

JavaConfig support for Reactor

Something like @EnableReactors which creates some default plumbing, with suitable methods to reconfigure the defaults if required.
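
A sketch of the shape such an annotation might take; only the @EnableReactors name comes from this issue, while the attribute and the imported configuration it alludes to are hypothetical.

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    // Hypothetical: a real version would likely @Import a @Configuration class
    // that registers the default Reactors and Dispatchers as beans.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface EnableReactors {

        // an attribute a user might override to replace the default plumbing
        String defaultDispatcher() default "ringBuffer";
    }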

Bulk Eventing

One of the key LMAX RingBuffer features is event bulking; we have some use cases, especially w.r.t. data ingestion, where this would be very interesting to explore.

Create an XML namespace for Reactor

For XML-based Spring configuration we should provide an XML namespace for creating Reactors and Dispatchers (and anything else we think would benefit from namespace support).

Registration cancellation is not thread-safe

CachableRegistration.cancel removes an entry from registrations and sets refreshRequired to true, but does so without holding the write lock. This isn't thread-safe and, among other things, can result in ConcurrentModificationExceptions being thrown when iterating over the registrations.

Helper to create unique Selector/Object combos

It's a little verbose to create a Selector/Object combination now if you want to do explicit p2p eventing. You have to do something like:

Object objKey = new Object();
Selector objSel = $(objKey);

reactor.on(objSel, ...);
reactor.notify(objKey, ...);

It would be useful to have a helper function that returned a Tuple2<Selector, Object> that would generate these things for you so that the above would become:

Tuple2<Selector, Object> obj = Fn.createSelector(); // or called something else

reactor.on(obj.getT1(), ...);
reactor.notify(obj.getT2(), ...);

Composable does not support null values

When the value is null, the Composable is considered to be incomplete. This makes it impossible for the value to actually be null. Instead, we need to use a valueSet boolean, or similar, to track completion.
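
A minimal sketch of the valueSet approach, kept separate from Composable itself: completion is tracked by a flag, so null becomes a perfectly valid value.

    // Hypothetical holder: complete/incomplete is independent of whether the value is null.
    class ValueHolder<T> {
        private T value;
        private boolean valueSet;

        synchronized void set(T value) {
            this.value = value;
            this.valueSet = true;
        }

        synchronized boolean isComplete() {
            return valueSet; // true even when the completed value is null
        }

        synchronized T get() {
            return value;
        }
    }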

ComposableThroughputTests: not clear what is being tested - created Composable does not do anything

The test creates this Composable (the System.out.println calls are mine):

    private Composable<Integer> createComposable(Dispatcher dispatcher) {
        return new Composable<Integer>(dispatcher)
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer) {
                        System.out.println("mapping: "+integer);
                        return integer;
                    }
                })
                .reduce(new Function<Composable.Reduce<Integer, Integer>, Integer>() {
                    @Override
                    public Integer apply(Composable.Reduce<Integer, Integer> r) {
                        int last = (null != r.getLastValue() ? r.getLastValue() : 1);
                        System.out.println("reducing: last value: "+r.getLastValue()+", nextValue: "+r.getNextValue());
                        return last + r.getNextValue();
                    }
                })
                .consume(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) {
                        System.out.println("consuming: "+integer);
                        latch.countDown();
                    }
                });
    }

When I run the test, only the consume method prints to the console; map and reduce do not do anything.

Not sure what the test is trying to test: just the speed of consuming (in which case drop map and reduce), or cumulative processing as well (in which case there is a bug somewhere, because map and reduce don't produce any output).
