advantageous / reakt

Fluent Java Reactive interfaces. Promises, Streams, Callbacks, Async results, interfaces for Java 8 that are lambda expression friendly.

Home Page: http://advantageous.github.io/reakt/

License: Apache License 2.0

Java 100.00%

reakt's People

Contributors

mammatusplatypus, richardhightower, sailorgeoffrey

reakt's Issues

Create promise.thenMapExpect

This is a minor addition so no need to do a major release.
It is a backwards compatible change.

Get rid of this boilerplate code

serviceQueue = serviceBuilder
                .setServiceObject(SomeImpl(mgmt!!, someService!!, repo!!))
                .addQueueCallbackHandler(object : QueueCallBackHandler{
                    override fun queueProcess() {
                        mgmt?.process();
                    }
                })
                .buildAndStartAll()

Reakt 3 Simplify Promise and Callback.

We added a CallbackHandle and a PromiseHandle that simplify the interfaces of Promise and Callback. (Checked in and it works.) Promises has a deferCall method that is given a Consumer<CallbackHandle> and returns a PromiseHandle. The PromiseHandle only has then, catchError, …. The CallbackHandle only has resolve and reject. In version 3 of Reakt, PromiseHandle and CallbackHandle will become Promise and Callback, and what are now Promise and Callback will become SPI classes, probably called PromiseHandler and CallbackHandler.

Create Promise.thenMap

We need something like this.

Promise<Result> resultPromise = Promise.<Result>promise();

Promise<Employee> employeePromise = resultPromise.<Employee> thenMap((promise)-> {
         Result result = promise.get(); 
         Employee e = new Employee(result.getCol("id"), result.getCol("firstName"), ...);
         return e;
});

Or rewritten as....

Promise<Employee> employeePromise = Promise.<Result>promise().  
                                  <Employee> thenMap((promise)-> {
                        Result r = promise.get(); 
                        return new Employee(r.getCol("id"), r.getCol("firstName"), ...);
});

This is like an async map. thenMap must take a Function that converts one type of object to another.
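A minimal sketch of the idea, using a hypothetical MiniPromise class rather than Reakt's Promise. It assumes handlers are registered before the promise is resolved and that the mapper receives the resolved value directly (as a plain Function), which is only one possible reading of the proposal above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, minimal stand-in for a promise; NOT Reakt's Promise class.
// Handlers must be registered before resolve/reject is called.
class MiniPromise<T> {
    private final List<Consumer<T>> successHandlers = new ArrayList<>();
    private final List<Consumer<Throwable>> errorHandlers = new ArrayList<>();

    MiniPromise<T> then(Consumer<T> handler) {
        successHandlers.add(handler);
        return this;
    }

    MiniPromise<T> catchError(Consumer<Throwable> handler) {
        errorHandlers.add(handler);
        return this;
    }

    void resolve(T value) {
        successHandlers.forEach(h -> h.accept(value));
    }

    void reject(Throwable error) {
        errorHandlers.forEach(h -> h.accept(error));
    }

    // Returns a new promise that completes with mapper(value),
    // or is rejected with the same error as this promise.
    <U> MiniPromise<U> thenMap(Function<? super T, ? extends U> mapper) {
        MiniPromise<U> mapped = new MiniPromise<>();
        then(value -> mapped.resolve(mapper.apply(value)));
        catchError(mapped::reject);
        return mapped;
    }
}

class ThenMapDemo {
    static String run() {
        MiniPromise<Integer> idPromise = new MiniPromise<>();
        StringBuilder out = new StringBuilder();
        // Map the Integer result to a String before handling it.
        idPromise.thenMap(id -> "employee-" + id).then(s -> out.append(s));
        idPromise.resolve(123);
        return out.toString();
    }
}
```

The mapped promise is a separate object, so an error on the source promise still reaches handlers registered on the mapped one.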

Blocking.thenMap returns the wrong type of promise

This code

    @Override
    public <U> Promise<U> thenMap(Function<? super T, ? extends U> mapper) {
        final Promise<U> mappedPromise = Promises.promise();
        this.whenComplete(p -> {
            final BlockingPromise<T> promise = (BlockingPromise) p;
            if (promise._success()) {
                final T t = promise.result.get().get();
                final U mapped = mapper.apply(t);
                mappedPromise.reply(mapped);
            } else {
                mappedPromise.reject(promise.cause());
            }
        });
        return mappedPromise;
    }

Should be

    @Override
    public <U> Promise<U> thenMap(Function<? super T, ? extends U> mapper) {
        final Promise<U> mappedPromise = Promises.blockingPromise();
        this.whenComplete(p -> {
            final BlockingPromise<T> promise = (BlockingPromise) p;
            if (promise._success()) {
                final T t = promise.result.get().get();
                final U mapped = mapper.apply(t);
                mappedPromise.reply(mapped);
            } else {
                mappedPromise.reject(promise.cause());
            }
        });
        return mappedPromise;
    }

Create invokable promises

Remote proxy generation should support returning a Reakt invokable promise.

        employeeService.lookupEmployee("123")
               .then((employee)-> {...}).catchError(...).invoke();

It should support reactor too.

        employeeService.lookupEmployee("123")
               .reactor(mgmt.reactor())
               .then((employee)-> {...}).catchError(...).invoke();

Do we just add an invoke method and an isInvokable flag?
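One way to picture it, as a rough sketch only: the class InvokableSketch and the lookupEmployee stand-in below are hypothetical and are not generated proxy code. The sketch assumes isInvokable simply reports whether invoke() can still be called; the issue leaves the exact meaning of the flag open.

```java
import java.util.function.Consumer;

// Hypothetical sketch of an "invokable" promise; not the Reakt implementation.
class InvokableSketch<T> {
    // The deferred work: receives this promise so it can resolve or reject it.
    private final Consumer<InvokableSketch<T>> call;
    private Consumer<T> onSuccess = value -> { };
    private Consumer<Throwable> onError = error -> { };
    private boolean invoked;

    InvokableSketch(Consumer<InvokableSketch<T>> call) {
        this.call = call;
    }

    InvokableSketch<T> then(Consumer<T> handler) {
        onSuccess = handler;
        return this;
    }

    InvokableSketch<T> catchError(Consumer<Throwable> handler) {
        onError = handler;
        return this;
    }

    // Assumption: the flag means "invoke() has not been called yet".
    boolean isInvokable() {
        return !invoked;
    }

    // Nothing runs until invoke() is called, so handlers are in place first.
    void invoke() {
        if (invoked) {
            throw new IllegalStateException("already invoked");
        }
        invoked = true;
        call.accept(this);
    }

    void resolve(T value) { onSuccess.accept(value); }

    void reject(Throwable error) { onError.accept(error); }
}

class EmployeeLookupSketch {
    // Stand-in for a generated remote proxy method.
    static InvokableSketch<String> lookupEmployee(String id) {
        return new InvokableSketch<>(promise -> promise.resolve("employee-" + id));
    }
}
```

Because the call is deferred, the caller can finish wiring then and catchError before any work starts.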

invoke with Reactor

        repository.fetchRawMetricsSince(snapshot.timestamp)
                .then { rawMetrics -> becomeMaster(snapshot, callback, rawMetrics) }
                .catchError { error -> rejectBeingMaster(snapshot, callback, error) }
                .invokeWithPromise(mgmt.reactor().promise());

We need an invokeWithReactor so that invocations happen on the same thread as an actor or event loop if needed.
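The threading idea can be sketched roughly as follows. ReactorSketch and onReactorThread are hypothetical names, not Reakt's Reactor API; the sketch assumes the owning actor or event loop calls process() periodically on its own thread.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of a reactor; NOT Reakt's Reactor class.
class ReactorSketch {
    private final Queue<Runnable> pending = new ConcurrentLinkedQueue<>();

    // Wraps a handler so that, whichever thread completes the call,
    // the handler itself only runs later, inside process().
    <T> Consumer<T> onReactorThread(Consumer<T> handler) {
        return value -> pending.add(() -> handler.accept(value));
    }

    // Called periodically by the actor or event loop that owns this reactor.
    void process() {
        Runnable task;
        while ((task = pending.poll()) != null) {
            task.run();
        }
    }
}
```

A result delivered from an I/O thread is only queued; the handler body runs when the owner next calls process(), so it can touch the owner's state without extra locking.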

We need a breaker1.ifBrokeOrBroke(breaker2)

Often there are actions I want to take to clear a broken breaker, which might mean initializing more than one system.

breaker1.ifBrokeOrBroke(breaker2) {
     //do something if either breaker is broken
}
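In Java this could look something like the sketch below. BreakerSketch is a hypothetical class, not the Reakt Breaker; the boolean constructor argument stands in for however a real breaker tracks its state.

```java
// Hypothetical sketch; illustrative only, not the Reakt Breaker API.
class BreakerSketch {
    private final boolean broken;

    BreakerSketch(boolean broken) {
        this.broken = broken;
    }

    boolean isBroken() {
        return broken;
    }

    // Runs the action if this breaker or the other breaker is broken.
    BreakerSketch ifBrokeOrBroke(BreakerSketch other, Runnable action) {
        if (this.isBroken() || other.isBroken()) {
            action.run();
        }
        return this;
    }
}
```

Usage would then read `breaker1.ifBrokeOrBroke(breaker2, () -> reinitialize())`, where reinitialize is whatever recovery step covers both systems.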

Create thenCallback that is like thenPromise for chaining a callback.


    /**
     * Allows you to pass an existing promise as a handler.
     *
     * @param promise promise
     * @return this, fluent
     */
    default Promise<T> thenPromise(Promise<T> promise) {
        this.catchError(promise::reject).then(promise::resolve);
        return this;
    }
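A possible shape for thenCallback, sketched with hypothetical minimal types (CallbackSketch and ChainingPromiseSketch) so that it compiles on its own. It mirrors the thenPromise body above but accepts a plain callback; the real method would live on Reakt's Promise interface.

```java
import java.util.function.Consumer;

// Hypothetical minimal types for illustration; not the Reakt interfaces.
interface CallbackSketch<T> {
    void resolve(T value);

    void reject(Throwable error);
}

class ChainingPromiseSketch<T> implements CallbackSketch<T> {
    private Consumer<T> onSuccess = v -> { };
    private Consumer<Throwable> onError = e -> { };

    ChainingPromiseSketch<T> then(Consumer<T> handler) {
        onSuccess = handler;
        return this;
    }

    ChainingPromiseSketch<T> catchError(Consumer<Throwable> handler) {
        onError = handler;
        return this;
    }

    // Analogue of thenPromise, but forwarding the outcome to a plain callback.
    ChainingPromiseSketch<T> thenCallback(CallbackSketch<T> callback) {
        return this.catchError(callback::reject).then(callback::resolve);
    }

    @Override
    public void resolve(T value) {
        onSuccess.accept(value);
    }

    @Override
    public void reject(Throwable error) {
        onError.accept(error);
    }
}
```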

ServiceManagementBundleBuilder should provide a default health manager

java.lang.NullPointerException: ServiceHealthManager must be set

at java.util.Objects.requireNonNull(Objects.java:228)
at io.advantageous.qbit.admin.ServiceManagementBundleBuilder.getServiceHealthManager(ServiceManagementBundleBuilder.java:117)
at io.advantageous.qbit.admin.ServiceManagementBundleBuilder.build(ServiceManagementBundleBuilder.java:167)

For easy integration with other libs make Callback also a Consumer. And expose an error consumer.

For release notes

A callback is now a consumer that has an errorConsumer

/**
 * A {@code Callback} is a {@code Consumer} and can be used anywhere a consumer is used.
 * This is for easy integration with non-Reakt libs and code bases.
 *
 * @param <T> type of result returned from callback
 */
public interface Callback<T> extends Consumer<T> {
    ...

    /**
     * Bridge between Consumer world and Callback world
     * Performs this operation on the given argument.
     *
     * @param t the input argument
     */
    @Override
    default void accept(T t) {
        reply(t);
    }

    /**
     * Used to convert the error handling of the callback or promise
     * into a Consumer so you can easily integrate with non-Reakt code.
     *
     * @return Consumer version of error handling.
     */
    default Consumer<Throwable> errorConsumer() {
        return this::reject;
    }

    /**
     * Used to easily cast this callback to a consumer.
     *
     * @return Consumer version of this callback.
     */
    default Consumer<T> consumer() {
        return this;
    }
}
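A small usage sketch of the bridge. ConsumerCallback is a trimmed, hypothetical copy of the interface above (only reply, reject, accept and errorConsumer) so the example is self-contained; RecordingCallback and ConsumerBridgeDemo are illustration-only names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Trimmed, hypothetical copy of the interface so the example compiles on its own.
interface ConsumerCallback<T> extends Consumer<T> {
    void reply(T result);

    void reject(Throwable error);

    @Override
    default void accept(T t) {
        reply(t);
    }

    default Consumer<Throwable> errorConsumer() {
        return this::reject;
    }
}

// Records what it receives so the flow is easy to inspect.
class RecordingCallback implements ConsumerCallback<String> {
    final List<String> events = new ArrayList<>();

    @Override
    public void reply(String result) {
        events.add("reply:" + result);
    }

    @Override
    public void reject(Throwable error) {
        events.add("reject:" + error.getMessage());
    }
}

class ConsumerBridgeDemo {
    static List<String> run() {
        RecordingCallback callback = new RecordingCallback();
        // Any API that accepts a Consumer can take the callback directly.
        Arrays.asList("a", "b").forEach(callback);
        // Error paths of non-Reakt code can be wired through errorConsumer().
        Consumer<Throwable> onError = callback.errorConsumer();
        onError.accept(new IllegalStateException("boom"));
        return callback.events;
    }
}
```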

Create friendly methods for basic types like promiseBoolean, and promiseString, and promise(Employee.class).

We need this in both the reactor and the Promises utility class.

class Promises

    /**
     * Returns a String promise
     * Added to make static imports possible.
     *
     * @return returns a string promise
     */
    static Promise<String> promiseString() {
        return new BasePromise<>();
    }

    /**
     * Returns an Integer promise
     * Added to make static imports possible.
     *
     * @return returns an int promise
     */
    static Promise<Integer> promiseInt() {
        return new BasePromise<>();
    }


    /**
     * Returns a Long promise
     * Added to make static imports possible.
     *
     * @return returns a long promise
     */
    static Promise<Long> promiseLong() {
        return new BasePromise<>();
    }

    /**
     * Returns a Double promise
     * Added to make static imports possible.
     *
     * @return returns a double promise
     */
    static Promise<Double> promiseDouble() {
        return new BasePromise<>();
    }


    /**
     * Returns a Float promise
     * Added to make static imports possible.
     *
     * @return returns a float promise
     */
    static Promise<Float> promiseFloat() {
        return new BasePromise<>();
    }

    /**
     * Returns a void promise for notify of outcome but no value returned.
     * <p>
     * Callback replyDone can be used instead of reply on the service side.
     *
     * @return void promise
     */
    static Promise<Void> promiseNotify() {
        return new BasePromise<>();
    }

    /**
     * Boolean promise
     * Added to make static imports possible.
     *
     * @return promises a boolean
     */
    static Promise<Boolean> promiseBoolean() {
        return new BasePromise<>();
    }


    /**
     * Generic promise.
     * Added to make static imports possible.
     *
     * @param cls type
     * @param <T> promise of a result of T
     * @return new Promise of type T
     */
    @SuppressWarnings("unused")
    static <T> Promise<T> promise(Class<T> cls) {
        return new BasePromise<>();
    }


    /**
     * Generic list promise.
     * Added to make static imports possible.
     *
     * @param componentType component type of list
     * @param <T>           promise a list of type T
     * @return new Promise for a list of type T
     */
    @SuppressWarnings("unused")
    static <T> Promise<List<T>> promiseList(Class<T> componentType) {
        return new BasePromise<>();
    }

    /**
     * Generic collection promise.
     * Added to make static imports possible.
     *
     * @param componentType component type of collection
     * @param <T>           promise a collection of type T
     * @return new Promise for a collection of type T
     */
    @SuppressWarnings("unused")
    static <T> Promise<Collection<T>> promiseCollection(Class<T> componentType) {
        return new BasePromise<>();
    }

    /**
     * Generic map promise.
     * Added to make static imports possible.
     *
     * @param keyType   type of map key
     * @param valueType type of map value
     * @param <K>       promise a map of  key type K
     * @param <V>       promise a map of  value type V
     * @return new Promise for a map of key type K and value type V
     */
    @SuppressWarnings("unused")
    static <K, V> Promise<Map<K, V>> promiseMap(Class<K> keyType, Class<V> valueType) {
        return new BasePromise<>();
    }

    /**
     * Generic set promise.
     * Added to make static imports possible.
     *
     * @param componentType component type of set
     * @param <T>           promise a set of type T
     * @return new Promise for a set of type T
     */
    @SuppressWarnings("unused")
    static <T> Promise<Set<T>> promiseSet(Class<T> componentType) {
        return new BasePromise<>();
    }

We also have them for the reactor (as methods on the reactor), and we have them for blocking and replay promises on the Promises utility class.
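The Class arguments are unused at runtime; their job is to let the compiler infer the type parameter so callers can avoid explicit type witnesses such as `Promises.<Employee>promise()`. A self-contained sketch of that effect, using hypothetical TypedPromise and PromiseFactorySketch stand-ins rather than the Reakt classes:

```java
import java.util.List;

// Hypothetical stand-ins; not the Reakt classes.
class TypedPromise<T> {
    private T value;

    void resolve(T value) {
        this.value = value;
    }

    T get() {
        return value;
    }
}

class PromiseFactorySketch {
    static TypedPromise<String> promiseString() {
        return new TypedPromise<>();
    }

    // The Class argument only pins down T for the compiler.
    @SuppressWarnings("unused")
    static <T> TypedPromise<T> promise(Class<T> cls) {
        return new TypedPromise<>();
    }

    @SuppressWarnings("unused")
    static <T> TypedPromise<List<T>> promiseList(Class<T> componentType) {
        return new TypedPromise<>();
    }
}

class PromiseFactoryDemo {
    static int run() {
        // No explicit <Integer> witness is needed; T comes from Integer.class.
        TypedPromise<List<Integer>> ids = PromiseFactorySketch.promiseList(Integer.class);
        ids.resolve(java.util.Arrays.asList(1, 2, 3));
        return ids.get().size();
    }
}
```

With a static import of the factory methods, a chained call such as `promise(String.class).resolve("x")` type-checks without any generic annotations at the call site.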

Add convenience method for testing: invokeAsBlockingPromise

    /**
     * Used for testing and legacy integration.
     * This turns an async promise into a blocking promise.
     * @return blocking promise
     */
    default Promise<T> invokeAsBlockingPromise() {
        Promise<T> blockingPromise = Promises.blockingPromise();
        this.invokeWithPromise(blockingPromise);
        return blockingPromise;
    }

    /**
     * Used for testing and legacy integration.
     * This turns an async promise into a blocking promise.
     * @param duration duration
     * @return blocking promise
     */
    default Promise<T> invokeAsBlockingPromise(Duration duration) {
        Promise<T> blockingPromise = Promises.blockingPromise(duration);
        this.invokeWithPromise(blockingPromise);
        return blockingPromise;
    }
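Why a blocking promise helps in tests can be sketched without Reakt at all. BlockingSketch below is a hypothetical class built on a CountDownLatch, not Reakt's BlockingPromise; wrapping failures in IllegalStateException is an assumption made only for this illustration.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a blocking promise; not Reakt's BlockingPromise.
class BlockingSketch<T> {
    private final CountDownLatch done = new CountDownLatch(1);
    private final Duration timeout;
    private volatile T value;
    private volatile Throwable error;

    BlockingSketch(Duration timeout) {
        this.timeout = timeout;
    }

    void resolve(T value) {
        this.value = value;
        done.countDown();
    }

    void reject(Throwable error) {
        this.error = error;
        done.countDown();
    }

    // Blocks the calling thread until the outcome arrives or the timeout expires.
    T get() {
        try {
            if (!done.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("timed out waiting for result");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        if (error != null) {
            throw new IllegalStateException(error);
        }
        return value;
    }
}

class BlockingDemo {
    static String run() {
        BlockingSketch<String> promise = new BlockingSketch<>(Duration.ofSeconds(5));
        // Simulate an async service replying on another thread.
        new Thread(() -> promise.resolve("employee-123")).start();
        // The test thread simply waits for the result.
        return promise.get();
    }
}
```

A test can then assert on the returned value directly instead of coordinating threads by hand.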

Resolver interface

Problem: Promise is a Result, a Consumer, and a Callback, so it has many methods besides resolve, then, and catchError. This makes it confusing which methods to use when working with the invoke method.


Handler methods (PromiseHandle)
catchError returns promise handle 
then returns promise handle 
thenSafe returns promise handle 
invoke returns void
asPromise returns promise

Resolver methods (CallbackHandle)
resolve returns void
reject  returns void
asPromise returns promise.


Promises.invoker takes a PromiseResolver consumer and returns a PromiseHandle.
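The split could be sketched roughly like this. All type names carry a Sketch suffix to mark them as hypothetical; thenSafe and asPromise are left out for brevity, and the invoker body is an assumption about how the two handles might be connected.

```java
import java.util.function.Consumer;

// Resolver side: the callee only ever sees resolve and reject.
interface CallbackHandleSketch<T> {
    void resolve(T value);

    void reject(Throwable error);
}

// Handler side: the caller only sees the fluent handler methods and invoke.
interface PromiseHandleSketch<T> {
    PromiseHandleSketch<T> then(Consumer<T> handler);

    PromiseHandleSketch<T> catchError(Consumer<Throwable> handler);

    void invoke();
}

class HandleSketches {
    // Plays the role of the invoker described above (hypothetical body).
    static <T> PromiseHandleSketch<T> invoker(Consumer<CallbackHandleSketch<T>> call) {
        return new PromiseHandleSketch<T>() {
            private Consumer<T> onSuccess = v -> { };
            private Consumer<Throwable> onError = e -> { };

            @Override
            public PromiseHandleSketch<T> then(Consumer<T> handler) {
                onSuccess = handler;
                return this;
            }

            @Override
            public PromiseHandleSketch<T> catchError(Consumer<Throwable> handler) {
                onError = handler;
                return this;
            }

            @Override
            public void invoke() {
                // Hand the callee a narrow view that can only resolve or reject.
                call.accept(new CallbackHandleSketch<T>() {
                    @Override
                    public void resolve(T value) {
                        onSuccess.accept(value);
                    }

                    @Override
                    public void reject(Throwable error) {
                        onError.accept(error);
                    }
                });
            }
        };
    }
}
```

Neither side can reach the other's methods, which is the point of the proposal: a caller cannot accidentally resolve the promise, and a callee cannot register handlers.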
