Coder Social home page Coder Social logo

rhapsody's Introduction

Rhapsody

License

Rhapsody is an asynchronous event processing library that builds on the Reactive Streams Specification to deliver reusable functionalities under the following categories:

  • At-Least-Once Processing
  • Quality of Service
  • Observability

While delivering features that fall in to the above categories, Rhapsody aims to maintain the inherent developmental attributes afforded by the Reactive Streams Specification:

Where appropriate, Rhapsody is backed by Project Reactor to build Transformers and inject side-effect behaviors (like Logging and Metric collection).

Background

Rhapsody started off as an Inner Source library at Vrbo, an Expedia Group brand. At time-of-inception, Vrbo was heavily expanding its usage of Kafka as the de facto asynchronous eventing system of choice.

The need for an alternative asynchronous event processing framework arose out of Vrbo's growing diversity of event processing use cases. Traditionally available libraries, like Kafka Streams, were not quite flexible enough to support the superset of functionalites that developers were trying to build on top of Kafka. In particular, traditional event processing libraries make assumptions about what infrastructures are in play, what processing topologies should be available, and how easy it should be to design for failure. Vrbo's needs for event processing use cases include(d) heterogeneous infrastructure endpoints (like RabbitMQ-to-Kafka), I/O-bound processing (requires API or Database interaction), Extended Architecture (XA) / Two Phase Commit Processing, Batch Processing, Event Deduplication, and others.

And so, Rhapsody was born to attempt providing an event processing framework that provides at-least-once processing (such as what we get from other traditional streaming libraries), arbitrary parallelism (to address I/O-bound processing scalability), and arbitrary interoperability with today's and the future's streaming infrastructures.

Basics

Before getting started, it's important to note that Rhapsody does not aim to be a replacement for any fluent Reactive Streams libraries. The goal of Rhapsody is to build on existing Reactive Streams implementations and provide functionality that primarily addresses usage with infinite asynchronous flows of data while adhering to the Reactive Streams API. In particular, Rhapsody heavily integrates with Project Reactor and related projects, like Reactor Kafka and Reactor RabbitMQ, to avoid re-implementation of existing Publisher and Subscriber implementations.

Why use Reactive Streams?

When it comes to developing asynchronous processes, developers are typically faced with two basic questions:

  1. What infrastructure(s) are to be used
  2. What libraries will be used to interface with the chosen infrastructure(s)

In the absence of an abstraction for asynchronous processing, it is common for developers to inadvertently lock their systems into a particular infrastructure or infrastructure-specific framework.

Reactive Streams provides precisely this abstraction for asynchronous processing, allowing developers the flexibility to switch out infrastructures as overriding requirements evolve over time. Additionally, the abstraction lends itself to easier testing.

Project Reactor

We highly recommend getting familiar with Project Reactor via its Learning Resources if you are not already familiar with Reactive Streams or any of its implementations.

At-Least-Once Processing

At-least-once processing is typically a table stakes requirement in practical asynchronous use cases. The key abstraction around which Rhapsody builds at-least-once processing is Acknowledgeable. The goal behind Acknowledgeability is to restore a lightweight form of the bi-directional communication in control flow that we lose when moving from synchronous to asynchronous code.

In synchronous control flows, we have the (dubious) benefit of being able to tightly couple the processing of any given input/event/message to its successful completion or abnormal termination (Error/Exception). The same goes for asynchronous control flows where there is neither the presence of backpressure or thread boundaries. In either case, when the controlling thread completes the processing/emission of a unit of data without erroneous termination, there is a reasonable implication that the corresponding data has been successfully processed, or, at worst, any Errors resulting from the processing of that data were gracefully handled. An opposite, mutually-exclusive implication is made when that processing results in an Error/Exception being raised/thrown.

In contrast, asynchronous control flows that may incorporate backpressure and/or arbitrary numbers of asynchronous boundaries do not typically have a semantic for communicating "successful completion" or "abnormal termination" to the emission sources of processed data. Acknowledgeability aims to address this by providing "channels" for Acknowledgement and Nacknowledgement (negative acknowledgement) that are logically coupled to the originating data emitted by a Publisher and propagated with that data's downstream transformations. For example, negatively acknowledging the processing/transformation of a Kafka Record allows us to emit the corresponding Error (and hence not committing past its offset) and subsequently resubscribe such that the Error-inducing Record is eventually reprocessed.

Quality of Service

Rhapsody has incrementally evolved to include commonly desired Quality of Service functionalities, like Rate Limiting, Deduplication, and Maximum In-Flight Acknowledgeability. Like most of the features provided by Rhapsody, these are implemented as Publisher Transformers.

Observability

Rhapsody aims to provide observability in to Reactive Streams by leveraging existing Project Reactor integrations and integrating with standard observability APIs, like SLF4J for logging, Micrometer for metrics, and OpenTracing

Getting Started

Rhapsody is a Java library that requires JDK 1.8+ for building and integration.

Building

Rhapsody is built using Maven. Installing Maven locally is optional as you can use the Maven Wrapper:

./mvnw clean verify

Usage

Check out the Samples to see Rhapsody in action

Contributing

Please refer to CONTRIBUTING for information on how to contribute to Rhapsody

Legal

This project is available under the Apache 2.0 License.

Copyright 2020 Expedia, Inc.

rhapsody's People

Contributors

dependabot[bot] avatar nikos912000 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

rhapsody's Issues

Subscriber metric count is not durable

Describe the bug

The MetricsTransformer uses a Gauge to keep track of Subscriber count. I've noticed that if the associated Flux is canceled and resubscribed, the count is lost.

To Reproduce

Use the MetricsTransformer in a running application; Unsubscribe the stream. The count will not be accurate. Resubscribe and the count might be "accurate" for a while before the weak reference to the no-longer accessible AtomicInteger is garbage collected

Expected behavior

Count should be accurate across resubscriptions

Project Cannot be Built with JDK11

Describe the bug

Project does not build with JDK 11

To Reproduce

Use JDK 11 to build the project

Expected behavior

Project should build successfully

Additional context

Looks like we need to add JAXB API and Runtime for testing purposes

Acknowledgeable should be able to handle Consumers that throw

Describe the feature

There's currently several methods on Acknowledgeable for handling filtering, mapping, and consuming. One new case I've run in to is the need to consume a data item with a method that throws a checked Exception.

Rational

Rather than having to deal with wrapping/unwrapping Exceptions, it should be possible to just nacknowledge an Acknowledgeable with any thrown Throwable.

Example Scenario

I want to consume Acknowledgeable<T> with the following:

public void doSomething(T t) throws Throwable {
    throw new IOException();
}

as:

try {
    acknowledegable.throwingConsume(t -> doSomething(t), Acknowledgeable::acknowledge);
} catch (Throwable error) {
    acknowledgeable.nacknowledge(error);
}

Use Maven Wrapper for local build

It'd be handy to introduce a Maven Wrapper (example) in order to avoid issues due to different versions of Maven.

I've done these changes so I'll raise the PR for this and ref this issue.

WorkBufferer may drop items under heavy load

Describe the bug

Under heavy load, WorkBufferer may drop items, leading to eventual hanging

To Reproduce

See example code on how to reproduce dropped items with FluxGroupBy here: reactor/reactor-core#2352

Will follow up here with example that shows WorkBufferer example Example with WorkBufferer:

    @Test
    public void workIsNotDroppedUnderHeavyLoad() throws Exception {
        AtomicLong upstream = new AtomicLong(0L);
        AtomicLong downstream = new AtomicLong(0L);
        CountDownLatch latch = new CountDownLatch(1);

        Flux.fromStream(Stream.iterate(randomLong(BUFFER_CONFIG.getBufferConcurrency()), last -> randomLong(BUFFER_CONFIG.getBufferConcurrency())))
            .map(number -> TestWork.create(WorkType.INTENT, Long.toString(number)))
            .flatMap(intent -> Flux.concat(
                Mono.just(intent),
                Mono.just(TestWork.create(WorkType.COMMIT, intent.workHeader().subject()))
                    .delayElement(randomDuration(BUFFER_CONFIG.getBufferDuration().multipliedBy(2)))),
                BUFFER_CONFIG.getBufferConcurrency())
            .take(Duration.ofSeconds(10L))
            .doOnNext(next -> upstream.incrementAndGet())
            .transform(WorkBufferer.identity(BUFFER_CONFIG))
            .doOnNext(buffer -> {
                // Mimic real-world computationally-bound processing overhead
                long startNano = System.nanoTime();
                while (System.nanoTime() - startNano < 1_000_000) ;
            })
            .map(Collection::size)
            .subscribe(downstream::addAndGet, System.err::println, latch::countDown);

        latch.await();
        assertEquals(upstream.get(), downstream.get());
        System.out.println("Emitted: " + downstream.get());
    }

Expected behavior

No Work should be dropped

Additional context

While a bug with FluxGroupBy overflow seems to have been identified, I think there is an inherent bug with WorkBufferer code that can't be solved without a code change. Simply put, premature cancellation of GroupedFlux from FluxGroupBy that can happen concurrently with FluxGroupBy publishing (either due to downstream request or upstream emission) simply cannot mitigate race condition between GroupedFlux cancellation and emission, and therefore is susceptible to dropping items.

Note that same solution will need to be applied to DeduplicatingTransformer

Consider Repackaging Under `com.expediagroup.reactivestreams` vs. `com.expediagroup.rhapsody`

Currently, all code is packaged under com.expediagroup.rhapsody. For a project name, Rhapsody seems fine (if not possibly arbitrary and/or vague in meaning), but, according to Clean Code principles, names used in code should be expressive.

Avoiding code packaging under the Project name makes possible future renames of the project much simpler, as users would only need to change the target of their build dependencies and not have to make code changes.

This is, of course, a backward incompatible change, so, if decided to do so, should be done while this project is still nascent

NoSuchMethodError from AbstractAllOrNothingPartitionAssignor when running with Java < 9

Describe the bug

When trying to use (an extension of) AbstractAllOrNothingPartitionAssignor, if running with Java 8, a NoSuchMethodError is thrown:

reactor.core.scheduler.Schedulers: KafkaScheduler worker in group main failed with an uncaught exception
! java.lang.NoSuchMethodError: java.nio.ByteBuffer.flip()Ljava/nio/ByteBuffer;
! at com.expediagroup.rhapsody.kafka.partitioning.AbstractAllOrNothingPartitionAssignor.deserializeBornEpochMilli(AbstractAllOrNothingPartitionAssignor.java:101)
! at com.expediagroup.rhapsody.kafka.partitioning.AbstractAllOrNothingPartitionAssignor.extractBornEpochMilli(AbstractAllOrNothingPartitionAssignor.java:95)

To Reproduce

Use AbstractAllOrNothingPartitionAssignor in a Java 8 environment

Expected behavior

An Error should not be thrown when using an extension of AbstractAllOrNothingPartitionAssignor

Additional context

https://stackoverflow.com/questions/61267495/exception-in-thread-main-java-lang-nosuchmethoderror-java-nio-bytebuffer-flip

Bump to Newer Versions of Kafka and Confluent

Describe the feature

We're currently on older versions of Kafka and Confluent, 2.0.0 and 4.1.3, respectively. These are fairly old versions at around ~1.5 years old (at time of issue creation).

Let's at least bump these to 2.2.1 and 5.2.1, respectively to be less than a year old on them. Note that I'm initially targeting these versions to avoid other required dependency bumps on things like Jackson.

Note: May also require going from kafka_2.11 to kafka_2.12 for testing purposes

Rational

Let's try not to be too old on dependencies

Move Dropwizard-Specific code in to `dropwizard` Module

Describe the feature

There is currently some Dropwizard-specific code for Metrics that depends on Codahale that would likely be cleaner in a Dropwizard-specific Module.

In addition, we should be able to move some Dropwizard Bundle code that we've found useful with Expedia that wires Stanzas in to the Server LifeCycle

Rational

Since Rhapsody is a streaming framework for web services, it makes sense to have minimal resources that play nice with common web frameworks, like Dropwizard (and, Spring, etc.)

Example Scenario

If I'm using a Dropwizard application, it would be nice to pull in a Rhapsody Dropwizard module and immediately have an idea of the classes I might want to use

Stanza Framework

Describe the feature

Would be nice to have "stanza" templates in place such that Flows can be started and stopped, and, by default, have streams instrumented with resubscription, rate limiting, and metrics

Rational

Rhapsody is a lightweight decoration framework, and this type of wire framing has proved useful before in the Vrbo ecosystem

Example Scenario

I want to add a new stream to a Dropwizard application. I simply extend an existing Stanza, add my streaming logic, and it is already instrumented with QoS and observability

Add Samples for Dropwizard

Describe the feature

We currently have samples for "core" functionality. If we're going to be adding Dropwizard code (#41), then we should have samples for that once available

Rational

Show people the intended patterns of usage between Rhapsody and Dropwizard

Implement Non-blocking Work Preparation

Describe the feature

Add the abstractions and base implementations to allow for non-blocking (two-phased) Work preparation.

Rational

Currently, Work preparation is constrained to be blocking. This is due to WorkBufferTranslator using blocking abstractions of WorkPreparer and FailureConsumer, which are blocking interfaces.

It should be possible to implement (and not refactor, which would be a breaking change) a fully reactive version of Work Buffer preparation that uses reactive definitions of the interfaces this process relies on.

Example Scenario

This would allow major efficiency improvements in cases where reactive implementations of the processes are possible, thus reducing the need for inefficient scaling solutions, like high parallelism with a high number of threads.

Upgrade to Reactor >3.4

Describe the feature

Currently using a version of Reactor that's over a year old. Would be good to upgrade to latest 3.4.x train

Rational

Don't get so behind in versions that it's difficult to upgrade

Example Scenario

I want to use latest features from Reactor, like Sinks, but Rhapsody ties me to an old version

Tracing Span is not active when Acknowledgeable is `published`

Describe the bug

On Acknowledgeable methods like filter, map, etc., TracingAcknowledgeble activates the span associated with its data item. This allows for tracing operations to be applied when the passed Predicates/Functions/etc are executed.

This is not currently done for Acknowledgeable::publish which prohibits propagating the Span with possibly-async operations

To Reproduce

Apply publish method on TracingAcknowledgeable extension. Its Span is not active within the execution of the publishing function

Expected behavior

The Span contained by the TracingAcknowledgeable should be active in the passed publishing function

BOM module is inheriting from base pom

Describe the bug

The BOM module is inheriting from the base pom which means it's inheriting the base pom's dependency management section. This prohibits intended usage of importing the bom pom.

To Reproduce

Import the bom pom. You'll have more in the dependency management section than just what's declared in the bom pom

Expected behavior

Only dependencies explicitly listed in the bom pom should be imported

Additional context

See how many dependencies are listed in the bom pom: https://mvnrepository.com/artifact/com.expediagroup/rhapsody-bom/0.5.0

Upgrade to OpenTracing 0.33.0

Describe the feature

OpenTracing 0.33.0 looks to be the last-to-be-released version of OpenTracing. OpenTelemetry looks to be how tracing (and possibly also metrics) will be interfaced in the future.

Rational

We already have an issue to migrate to OpenTelemetry in the future. OpenTelemetry looks to use most interfaces (and breaking changes) from OpenTracing 0.33.0

To make that upgrade easier, let's use OpenTracing 0.33.0

Could use a StanzaBundle that integrates with JMX

Describe the feature

It would be nice to have a StanzaBundle that integrates with JMX

Rational

Especially in cases where an application may have more than one StanzaBundle, it is beneficial to be able to start and stop certain Stanzas without having to redeploy

Example Scenario

I have an application where multiple Stanzas are logically daisy-chained together. It would be ideal to be able to stop the first Stanza in that chain in order to clear out the pipeline

Inconsistent max-in-flight configurations

This is less of a bug, and more just pointing out inconsistency...

KafkaFluxFactory uses ReceiverAcknowledgementStrategy to figure out how many "max in flight" records are allowed (messages which are not yet acknowledged). This configuration is currently called max.in.flight.per.topic.partition. While the "max.in.flight" portion is accurate, the "per.topic.partition" is misleading.

Disclaimer: The reason it's called this is b/c I originally intended it to be a per-topic-partition restriction, but this proved to not be implementable.

KafkaSenderFactory also has a max.in.flight configuration that controls how many records are allowed to be in flight per sent publisher.

It would be better to make these configurations clearer, especially when the same Kafka configuration is shared between a publisher and a subscriber.

Thinking:
receiver.max.in.flight.per.subscription - Configuration for Receiver-backed Kafka Flux which controls the max number of in-flight unacknowledged records per subscription
sender.max.in.flight.per.send - Configuration for Sender-backed Kafka sends which controls the max number of in-flight records per sent Publisher

Enable Stanza state deduction

Describe the feature

We've come across a need to expose Stanza control to JMX. Exposing any Stanza to external control requires knowing what the current state of the Stanza is. We can deduce the state of the Stanza based on what the current Disposable is set to and whether or not the Disposable itself is disposed.

Rational

Provides bare minimum insight in to Stanza status.

Example Scenario

If I wrap my Stanza in an MBean and bind to JMX, I should be able to know the "running state" of the Stanza, in addition to being able to start and stop it.

License not recognised by GitHub

Describe the bug

The license is still not recognised by GitHub which is a bit annoying!

To Reproduce

See the license section in the repo's main page.

Expected behavior

License shows as Apache 2.0.

Additional context

Possible reasons:

  • Since there was a bit missing from the Apache license, GitHub might be using a cached version.
  • We've both a LICENSE and a LICENSE_HEADER file which is possibly causing issues. GitHub's using a custom installation of licensee but not sure what kind of name checks they apply. A quick workaround would be to rename the LICENSE_HEADER.

Any chance to wipe out the existing one and add the Apache from the GitHub UI?

Security and Deprecation Warnings

  • GitHub is currently warning about a security problem by depending on qpid 6.1.4, with remediation being Upgrade org.apache.qpid:qpid-broker to version 6.1.5
  • We're using deprecated methods in certain tests

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.