Customized Observable base class
Let's create an abstraction, EdgeChain, on top of RxJava3, which will have customized versions of the map, zip, and subscribe functions. We'll create two subclasses of EdgeChain, called MRKLChain and ReactChain, with different implementations for parsing. We'll also create an EndPoint class to encapsulate the API endpoint details.
First, let's define the EdgeChain class:
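import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

public abstract class EdgeChain<T> {
    protected Observable<T> observable;

    protected EdgeChain(Observable<T> observable) {
        this.observable = observable;
    }

    // Customized map: apply mapper to every emitted item.
    public abstract <R> EdgeChain<R> transform(Function<T, R> mapper);
    // Customized zip: merge items pairwise with zipper.
    public abstract <R> EdgeChain<R> combine(BiFunction<T, T, R> zipper);
    // Customized subscribe: start the stream and deliver items or the error.
    public abstract void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
}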
Customized Endpoint base class
Now, let's create the EndPoint class:
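public class EndPoint {
    private String url;
    private int maxRetries;
    private BackoffStrategy backoffStrategy;

    public EndPoint(String url, int maxRetries, BackoffStrategy backoffStrategy) {
        this.url = url;
        this.maxRetries = maxRetries;
        this.backoffStrategy = backoffStrategy;
    }

    // Getters used by the chains (setters omitted for brevity)
    public String getUrl() { return url; }
    public int getMaxRetries() { return maxRetries; }
    public BackoffStrategy getBackoffStrategy() { return backoffStrategy; }
}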
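The EndPoint constructor takes a BackoffStrategy, and later examples call BackoffStrategy.exponential(), but the type itself is never shown. The following is a minimal sketch of what it might look like. The delayFor method, the 100 ms base delay, and the doubling rule are assumptions made for illustration, not part of the original design.

```java
import java.time.Duration;

class BackoffSketch {
    // Hypothetical shape of BackoffStrategy; nested here only to keep the sketch in one file.
    interface BackoffStrategy {
        // Maps a 1-based retry attempt number to the time to wait before that retry.
        Duration delayFor(int attempt);

        // Assumed default: 100 ms base delay, doubled on each further attempt.
        static BackoffStrategy exponential() {
            return exponential(Duration.ofMillis(100));
        }

        static BackoffStrategy exponential(Duration base) {
            return attempt -> {
                if (attempt < 1) {
                    throw new IllegalArgumentException("attempt must be >= 1");
                }
                // Cap the shift so the multiplier stays well inside the long range.
                int shift = Math.min(attempt - 1, 20);
                return base.multipliedBy(1L << shift);
            };
        }
    }

    public static void main(String[] args) {
        BackoffStrategy strategy = BackoffStrategy.exponential();
        // Prints delays of 100, 200, 400, and 800 ms for attempts 1 to 4.
        for (int attempt = 1; attempt <= 4; attempt++) {
            System.out.println("attempt " + attempt + " -> "
                    + strategy.delayFor(attempt).toMillis() + " ms");
        }
    }
}
```

A strategy of this shape keeps the retry policy out of EndPoint itself, so a fixed or jittered delay could be swapped in without touching the chains.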
Example implementations
This implementation allows you to create customized observable abstractions with different parsing implementations in the subclasses. The EndPoint class encapsulates the API endpoint details, and each chain class reads from its EndPoint to call the actual URL. The EndPoint inside a chain is optional, and the chain's behavior depends on whether the EndPoint is declared, mandatory but empty, or not necessary.
Now, you can create instances of the MRKLChain and ReactChain classes and use their transform, combine, and execute methods to perform the required operations. Remember to provide an EndPoint instance with the required API endpoint details, the number of retries, and the backoff strategy.
Let's create the MRKLChain subclass:
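import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

// Generic in T so that transform and combine can return a chain of a new item type.
public class MRKLChain<T> extends EdgeChain<T> {
    private final EndPoint endPoint;

    public MRKLChain(Observable<T> observable, EndPoint endPoint) {
        super(observable);
        this.endPoint = endPoint;
    }

    @Override
    public <R> MRKLChain<R> transform(Function<T, R> mapper) {
        return new MRKLChain<>(observable.map(mapper), endPoint);
    }

    @Override
    public <R> MRKLChain<R> combine(BiFunction<T, T, R> zipper) {
        // Zips the stream with itself, so each item is paired with its own copy.
        return new MRKLChain<>(observable.zipWith(observable, zipper), endPoint);
    }

    @Override
    public void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        observable
            // Retry.backoff belongs to Reactor, not RxJava3. retry(n) resubscribes up to n times
            // without delay; honoring endPoint.getBackoffStrategy() would need retryWhen.
            .retry(endPoint.getMaxRetries())
            .subscribe(onNext, onError);
    }
}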
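The execute method is meant to retry a failing call up to the configured number of retries, waiting between attempts according to the backoff strategy. Independent of RxJava3, that intended behavior can be sketched in plain Java. RetrySketch, callWithRetry, and the delay function parameter are hypothetical names introduced for illustration only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntToLongFunction;

class RetrySketch {
    // Runs task; on failure waits delayMillis(attempt) and tries again,
    // allowing up to maxRetries retries after the first attempt.
    static <T> T callWithRetry(Callable<T> task, int maxRetries, IntToLongFunction delayMillis)
            throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                attempt++;
                if (attempt > maxRetries) {
                    throw e; // retries exhausted: surface the last error
                }
                Thread.sleep(delayMillis.applyAsLong(attempt));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // The task fails twice, then succeeds on the third call.
        String result = callWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("temporary failure");
            }
            return "ok";
        }, 3, attempt -> 10L << (attempt - 1)); // waits 10 ms, then 20 ms
        System.out.println(result + " after " + calls.get() + " calls"); // prints "ok after 3 calls"
    }
}
```

In the RxJava3 version the same policy would be expressed with retryWhen and a timer per attempt; the loop above only shows the control flow.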
Now, let's create the ReactChain subclass:
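import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.BiFunction;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;

// Generic in T so that transform and combine can return a chain of a new item type.
public class ReactChain<T> extends EdgeChain<T> {
    private final EndPoint endPoint;

    public ReactChain(Observable<T> observable, EndPoint endPoint) {
        super(observable);
        this.endPoint = endPoint;
    }

    @Override
    public <R> ReactChain<R> transform(Function<T, R> mapper) {
        return new ReactChain<>(observable.map(mapper), endPoint);
    }

    @Override
    public <R> ReactChain<R> combine(BiFunction<T, T, R> zipper) {
        // Zips the stream with itself, so each item is paired with its own copy.
        return new ReactChain<>(observable.zipWith(observable, zipper), endPoint);
    }

    @Override
    public void execute(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        observable
            // Retry.backoff belongs to Reactor, not RxJava3. retry(n) resubscribes up to n times
            // without delay; honoring endPoint.getBackoffStrategy() would need retryWhen.
            .retry(endPoint.getMaxRetries())
            .subscribe(onNext, onError);
    }
}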
Combined Example
Let's create an example where two chains, MRKLChain and ReactChain, are used together with transform, combine, and execute. We'll use a simple transformation function to parse and reformat the text in each chain. We'll also combine the results of the two chains before passing them to another instance of the chain. RxJava3 has no forkJoin operator (that name comes from RxJS); for sources that each emit a single item, combineLatest or zip gives the equivalent result.
First, let's create some sample API endpoints and transformation functions:
EndPoint endPoint1 = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
EndPoint endPoint2 = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
Function<String, List<String>> mrklParser = text -> Arrays.asList(text.split("\\s+"));
Function<String, List<String>> reactParser = text -> Arrays.asList(text.split("[\\r\\n]+"));
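To make the two split rules concrete, the sketch below applies them to the sample strings used in this example. It uses java.util.function.Function from the JDK instead of the RxJava3 Function type so that it runs without extra dependencies; ParserDemo and the constant names are illustrative only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ParserDemo {
    // Same split rules as mrklParser and reactParser above.
    static final Function<String, List<String>> MRKL_PARSER =
            text -> Arrays.asList(text.split("\\s+"));       // split on runs of whitespace
    static final Function<String, List<String>> REACT_PARSER =
            text -> Arrays.asList(text.split("[\\r\\n]+"));  // split on line breaks only

    public static void main(String[] args) {
        System.out.println(MRKL_PARSER.apply("sample text for MRKLChain"));
        // prints [sample, text, for, MRKLChain]
        System.out.println(REACT_PARSER.apply("sample text\nfor ReactChain"));
        // prints [sample text, for ReactChain]
    }
}
```

The first rule yields one entry per word, while the second keeps each line intact.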
Next, let's create instances of MRKLChain and ReactChain:
MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just("sample text for MRKLChain"), endPoint1);
ReactChain<String> reactChain = new ReactChain<>(Observable.just("sample text\nfor ReactChain"), endPoint2);
Now, let's transform the data, combine the results using combineLatest, and execute the combined chain:
// The observable field is protected, so this code must live in the same package as the chains.
Observable<List<String>> mrklTransformed = mrklChain.transform(mrklParser).observable;
Observable<List<String>> reactTransformed = reactChain.transform(reactParser).observable;
Observable<List<String>> combined = Observable.combineLatest(mrklTransformed, reactTransformed, (mrklData, reactData) -> {
    List<String> result = new ArrayList<>(mrklData);
    result.addAll(reactData);
    return result;
});
// The combined stream carries List<String>, so the chain is typed accordingly.
MRKLChain<List<String>> combinedChain = new MRKLChain<>(combined, endPoint1);
combinedChain.execute(
    result -> System.out.println("Combined result: " + result),
    error -> System.err.println("Error: " + error)
);
In this example, the MRKLChain and ReactChain instances are created with sample text and API endpoints. The transformation functions, mrklParser and reactParser, are used to parse and reformat the text. The combineLatest operator is used to combine the results of the two chains before passing them to another instance of the chain (combinedChain). The execute method is used to process the combined result or handle any errors.
This demonstrates how to use the customized observable abstractions to call external APIs, parse and reformat the result, and pass the reformatted data to another instance of the abstraction for further processing.
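The combining step can also be traced without RxJava3. The sketch below swaps in CompletableFuture from the JDK, which is a different mechanism from the Observable-based chains, purely to show the data flow: two independent results are parsed, then merged once both are ready. CombineSketch and its combine method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombineSketch {
    static List<String> combine(String mrklText, String reactText) {
        CompletableFuture<List<String>> mrkl =
                CompletableFuture.supplyAsync(() -> Arrays.asList(mrklText.split("\\s+")));
        CompletableFuture<List<String>> react =
                CompletableFuture.supplyAsync(() -> Arrays.asList(reactText.split("[\\r\\n]+")));
        // thenCombine runs once both futures have completed,
        // much like combining two single-item streams.
        return mrkl.thenCombine(react, (mrklData, reactData) -> {
            List<String> result = new ArrayList<>(mrklData);
            result.addAll(reactData);
            return result;
        }).join();
    }

    public static void main(String[] args) {
        System.out.println("Combined result: "
                + combine("sample text for MRKLChain", "sample text\nfor ReactChain"));
        // prints Combined result: [sample, text, for, MRKLChain, sample text, for ReactChain]
    }
}
```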
Putting it inside Spring Boot Webflux
To integrate the EdgeChain abstractions into a Spring Boot Webflux application, you can create a Spring Webflux controller with an endpoint for each chain and expose the chains as RESTful APIs. By doing this, you can call the chain endpoints from your code or from external clients, without worrying about whether they execute on your local machine or on a remote host.
Let's continue using RxJava3 with Spring Boot Webflux. You can use Observable directly in the controller and return a Mono by converting the Observable. Because Mono.from accepts a Reactive Streams Publisher and an RxJava3 Observable is not one, the Observable is first turned into a Flowable with toFlowable. This way, we can continue using RxJava3, and the extra configuration needed is minimal.
First, let's create a ChainController class that will handle the incoming requests and delegate the processing to the appropriate chain instances:
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/chains")
public class ChainController {
    @PostMapping("/mrkl")
    public Mono<ResponseEntity<String>> processMRKLChain(@RequestBody String input) {
        // Create MRKLChain instance and process the input
        // Return the result as a Mono<ResponseEntity<String>>
    }

    @PostMapping("/react")
    public Mono<ResponseEntity<String>> processReactChain(@RequestBody String input) {
        // Create ReactChain instance and process the input
        // Return the result as a Mono<ResponseEntity<String>>
    }
}
Now, let's implement the processMRKLChain and processReactChain methods in the ChainController. You can keep the execute method in the EdgeChain, MRKLChain, and ReactChain classes as they were before:
// Also needs imports for io.reactivex.rxjava3.core.BackpressureStrategy,
// io.reactivex.rxjava3.core.Observable, and org.springframework.http.HttpStatus.
// mrklParser and reactParser are assumed to be Function<String, String> here.
@PostMapping("/mrkl")
public Mono<ResponseEntity<String>> processMRKLChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
    MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just(input), endPoint);
    // Observable is not a Publisher, so convert it to a Flowable before Mono.from.
    return Mono.from(mrklChain.transform(mrklParser).observable.toFlowable(BackpressureStrategy.BUFFER))
        .map(result -> ResponseEntity.ok().body(result))
        .onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
}

@PostMapping("/react")
public Mono<ResponseEntity<String>> processReactChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
    ReactChain<String> reactChain = new ReactChain<>(Observable.just(input), endPoint);
    // Observable is not a Publisher, so convert it to a Flowable before Mono.from.
    return Mono.from(reactChain.transform(reactParser).observable.toFlowable(BackpressureStrategy.BUFFER))
        .map(result -> ResponseEntity.ok().body(result))
        .onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
}
In this implementation, we create MRKLChain and ReactChain instances and process the input using the transform method. The result is an Observable, which is converted to a Flowable and then to a Mono using the Mono.from method. This approach allows you to keep using RxJava3 in your Spring Boot Webflux application and expose the chains as RESTful API endpoints. When you call these endpoints from your code or an external client, they will process the input using the corresponding chain and return the result.
Cleaning up response classes, etc. Pure cosmetics
We can create a custom class, ArkResponse, that wraps the Mono<ResponseEntity<String>> and provides a method to create an instance of ArkResponse from an Observable<String>.
Here's the ArkResponse class:
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import reactor.core.publisher.Mono;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Observable;

public class ArkResponse {
    private final Mono<ResponseEntity<String>> response;

    private ArkResponse(Mono<ResponseEntity<String>> response) {
        this.response = response;
    }

    public static ArkResponse fromObservable(Observable<String> observable) {
        // Mono.from needs a Publisher; Flowable is one, Observable is not.
        Mono<ResponseEntity<String>> mono = Mono.from(observable.toFlowable(BackpressureStrategy.BUFFER))
            .map(result -> ResponseEntity.ok().body(result))
            .onErrorResume(e -> Mono.just(ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Error: " + e.getMessage())));
        return new ArkResponse(mono);
    }

    public Mono<ResponseEntity<String>> getResponse() {
        return response;
    }
}
Now, let's update the ChainController to return ArkResponse instead of Mono<ResponseEntity<String>>:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/chains")
public class ChainController {
    @PostMapping("/mrkl")
    public ArkResponse processMRKLChain(@RequestBody String input) {
        // Create MRKLChain instance and process the input
        // Return the result as an ArkResponse
    }

    @PostMapping("/react")
    public ArkResponse processReactChain(@RequestBody String input) {
        // Create ReactChain instance and process the input
        // Return the result as an ArkResponse
    }
}
Update the processMRKLChain and processReactChain methods in the ChainController to use ArkResponse.fromObservable:
@PostMapping("/mrkl")
public ArkResponse processMRKLChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
    MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just(input), endPoint);
    return ArkResponse.fromObservable(mrklChain.transform(mrklParser).observable);
}

@PostMapping("/react")
public ArkResponse processReactChain(@RequestBody String input) {
    EndPoint endPoint = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
    ReactChain<String> reactChain = new ReactChain<>(Observable.just(input), endPoint);
    return ArkResponse.fromObservable(reactChain.transform(reactParser).observable);
}
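With this implementation, the ArkResponse class wraps the Mono<ResponseEntity<String>> and the ChainController methods return ArkResponse instances, reducing verbosity in the controller. To access the underlying Mono<ResponseEntity<String>> when needed, you can use the getResponse() method provided by the ArkResponse class. Spring Webflux does not unwrap a custom return type on its own, so unless a HandlerResultHandler for ArkResponse is registered, the controller would typically need to hand getResponse() back to the framework.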
Tying it all together
In this example, we'll use two chains, MRKLChain and ReactChain, to transform and reformat the input text using simple transformation functions. Then, we'll combine the results of these two chains using the zip operator from RxJava3 (RxJava3 has no forkJoin; that name comes from RxJS) and pass the combined result to another instance of the chain.
First, let's create simple transformation functions for MRKLChain and ReactChain. We'll use lambda expressions to define these functions:
Function<String, String> mrklParser = input -> "MRKL: " + input.toUpperCase();
Function<String, String> reactParser = input -> "React: " + input.toLowerCase();
Now, let's create instances of MRKLChain and ReactChain and apply the transformation functions using the transform method:
EndPoint endPoint1 = new EndPoint("https://api.example.com/data1", 3, BackoffStrategy.exponential());
MRKLChain<String> mrklChain = new MRKLChain<>(Observable.just("Hello MRKL"), endPoint1);
Observable<String> mrklTransformed = mrklChain.transform(mrklParser).observable;

EndPoint endPoint2 = new EndPoint("https://api.example.com/data2", 3, BackoffStrategy.exponential());
ReactChain<String> reactChain = new ReactChain<>(Observable.just("Hello React"), endPoint2);
Observable<String> reactTransformed = reactChain.transform(reactParser).observable;
Next, let's combine the results of the two chains. RxJava3 has no forkJoin operator; since each source emits a single item, zip pairs the two results in the same way:
import io.reactivex.rxjava3.core.Observable;

Observable<String> combined = Observable.zip(mrklTransformed, reactTransformed, (mrklResult, reactResult) -> mrklResult + " | " + reactResult);
Now, we'll create another instance of the chain (e.g., MRKLChain) and pass the combined result to it. We'll also apply a transformation function that prepends a label to the result:
Function<String, String> combinedParser = input -> "Combined: " + input;
MRKLChain<String> combinedChain = new MRKLChain<>(combined, endPoint1);
Observable<String> combinedTransformed = combinedChain.transform(combinedParser).observable;
Finally, let's create a new endpoint in the ChainController that uses the combined chain:
@PostMapping("/combined")
public ArkResponse processCombinedChain() {
    return ArkResponse.fromObservable(combinedTransformed);
}
In this example, we created instances of MRKLChain and ReactChain, applied simple transformation functions to reformat the input text, combined the results by pairing them with a single operator, and passed the combined result to another instance of the chain. We also added a new endpoint in the ChainController to handle the combined chain.
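To see what the combined endpoint would return for the sample inputs, the same functions can be applied in sequence in plain Java. This is only a trace of the data flow, using java.util.function types from the JDK rather than the chains themselves; PipelineTrace and run are hypothetical names.

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;

class PipelineTrace {
    static String run(String mrklInput, String reactInput) {
        // Same lambdas as mrklParser, reactParser, and combinedParser above.
        Function<String, String> mrklParser = input -> "MRKL: " + input.toUpperCase();
        Function<String, String> reactParser = input -> "React: " + input.toLowerCase();
        // Same rule as the function that joins the two chain results.
        BinaryOperator<String> joiner =
                (mrklResult, reactResult) -> mrklResult + " | " + reactResult;
        Function<String, String> combinedParser = input -> "Combined: " + input;

        String joined = joiner.apply(mrklParser.apply(mrklInput), reactParser.apply(reactInput));
        return combinedParser.apply(joined);
    }

    public static void main(String[] args) {
        System.out.println(run("Hello MRKL", "Hello React"));
        // prints Combined: MRKL: HELLO MRKL | React: hello react
    }
}
```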