Coder Social home page Coder Social logo

scalecube / scalecube-gateway Goto Github PK

View Code? Open in Web Editor NEW
18.0 20.0 7.0 855 KB

ScaleCube API Gateway is the single entry point for service consumers. handles incoming requests and proxy/route to the appropriate microservice instance.

License: Apache License 2.0

Java 97.80% HTML 2.20%

scalecube-gateway's Introduction

scalecube-gateway

ScaLecube API Gateway allows service consumers interact with scalecube microservices cluster.

image

Read about it here:

API-Gateway:

Available api-gateways are rsocket, http and websocket

Basic API-Gateway example:

    Microservices.builder()
        .discovery(options -> options.seeds(seed.discovery().address()))
        .services(...) // OPTIONAL: services (if any) as part of this node.

        // configure list of gateways plugins exposing the apis
        .gateway(options -> new WebsocketGateway(options.id("ws").port(8080)))
        .gateway(options -> new HttpGateway(options.id("http").port(7070)))
        .gateway(options -> new RSocketGateway(options.id("rsws").port(9090)))
        
        .startAwait();
        
        // HINT: you can try connect using the api sandbox to these ports to try the api.
        // http://scalecube.io/api-sandbox/app/index.html

Service API-Gateway providers:

releases: https://github.com/scalecube/scalecube-services/releases

io.scalecube scalecube-services-transport-jackson ${scalecube.version} io.scalecube scalecube-services-transport-protostuff ${scalecube.version} io.scalecube scalecube-services-discovery ${scalecube.version}

scalecube-gateway's People

Contributors

aharonha avatar artem-v avatar dependabot[bot] avatar dmytro-lazebnyi avatar io-scalecube-ci avatar ronenhamias avatar segabriel avatar snripa avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

scalecube-gateway's Issues

NPE when sending bad request with null qualifier

Motivation

Get rid of huge stack trace with NPE when sending bad request.

Description

NPE when sending bad request: e.g sid=null and qualifier=null. Then following stack shall happen with NullPointerException:

at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334) ~[reactor-core-3.4.5.jar:3.4.5]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334) ~[reactor-core-3.4.5.jar:3.4.5]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:334) ~[reactor-core-3.4.5.jar:3.4.5]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onError(FluxPeekFuseable.java:553) ~[reactor-core-3.4.5.jar:3.4.5]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onError(MonoPeekTerminal.java:250) ~[reactor-core-3.4.5.jar:3.4.5]
	at io.scalecube.services.gateway.ws.WebsocketGatewayAcceptor.lambda$onRequest$15(WebsocketGatewayAcceptor.java:178) ~[scalecube-services-gateway-netty-2.10.8.jar:?]
	at io.scalecube.services.gateway.ws.GatewayMessages.toErrorResponse(GatewayMessages.java:44) ~[scalecube-services-gateway-netty-2.10.8.jar:?]
	at io.scalecube.services.exceptions.DefaultErrorMapper.toMessage(DefaultErrorMapper.java:66) ~[scalecube-services-api-2.10.18.jar:?]
	at io.scalecube.services.api.ServiceMessage.error(ServiceMessage.java:70) ~[scalecube-services-api-2.10.18.jar:?]
	at io.scalecube.services.api.ServiceMessage$Builder.qualifier(ServiceMessage.java:311) ~[scalecube-services-api-2.10.18.jar:?]

In HttpGatewayAcceptor on client close or disconnect stream still hangs

At HttpGatewayAcceptor when client is being connected and not yet received response, at this moment client can disconnect or connection can be closed but the stream of service call is being left untouched, i.e. client disappears but resource is occupied.
On http client close we must dispose stream produced by service call and free resources on service side.

Add null check on gateway session handler

if override mapMessage and it returns null as a result, we have no notification why it stops working (no responses)

  @Override
  public ServiceMessage mapMessage(
      GatewaySession session, ServiceMessage message, Context context) {
    return null;
  }

Add tests on gateway templates

We had a not working gateway runner during a few weeks (#420). Need to add tests on gateway templates with the client-SDK.

Use NonBlockingHashMapLong instead of Map on websocket sessions

In io.scalecube.services.gateway.ws.WebsocketGatewaySession
line 27 private final Map<Long, Disposable> subscriptions = new NonBlockingHashMapLong<>(1024); change Map -> NonBlockingHashMapLong.

In io.scalecube.services.gateway.transport.websocket.WebsocketSession
lines 40-41 private final Map<Long, Processor<ServiceMessage, ServiceMessage>> inboundProcessors = new NonBlockingHashMapLong<>(1024); change Map -> NonBlockingHashMapLong.

Remove GatewayClientSettings, create more flexible configuration api

Motivation

io.scalecube.services.gateway.transport.GatewayClientSettings becomes less and less flexible. It was expected to be generic settings class for any transport. But with time it becomes more clear that it was mistake to think in this direction, because it's impossible to have transport-agnostic configuration.
It's already clear that followRedirect is for http transport (so we leaked http to configuration => configration is not generic), keepAliveInterval is for websocket transport (so we leaked wesocket to configuration level => so configuration is not generic).

Plus new settings are comming: for http it has to be possible to specify custom ConnectionProvider (.newConnection() or .builder() for settings such as maxConnections maxIdleTime maxLifeTime pendingAcquireTimeout)(io.scalecube.services.gateway.transport.http.HttpGatewayClient line 43), for websocket client must have ability to specify settings for reactor.netty.http.client.WebsocketClientSpec (because client very much might want to setup maxFramePayloadLength protocols proxyPing compress).

Expected updates

  • Remove of GatewayClientSettings.
  • Corresp. configration settings must start to live on corresp. transport client classes.

OutOfDirectMemoryError (memory leak?)

ResourceLeakDetector LEAK
java.lang.Thread.run(Thread.java:748) [sc-cluster-io-epoll-1]
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:333)
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:432)
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:781)
io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75)
io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114)
io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:349)
Created at:
Recent access records:
E 2019-09-09T10:04:18,667 i.n.u.ResourceLeakDetector LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
OutOfDirectMemoryError
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:333) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:432) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:781) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) ~[netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) ~[netty-transport-native-unix-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:342) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:147) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PoolArena.allocate(PoolArena.java:215) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:245) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:748) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:772) ~[netty-buffer-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.internal.PlatformDependent.allocateDirectNoCleaner(PlatformDependent.java:622) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:667) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 byte(s) of direct memory (used: 251658247, max: 259522560)
E 2019-09-09T08:24:00,768 r.n.t.TcpServer [id: 0x6c2f3449, L:/xxxxx:8080 - R:/yyyyy:51740] onUncaughtException(SimpleConnection{channel=[id: 0x6c2f3449, L:/xxxxx:8080 - R:/yyyyy:51740]}) [http-gateway-epoll-1]

Get rid of io.scalecube.services.gateway.GatewayMetrics

  • Dependency on codahale:
    Prod code must not include codahale.
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
  • Naive implementation:
    All followign metrics could be provided without compiling into prod code.
  private final Counter connectionCounter;
  private final Meter requestMeter;
  private final Meter responseMeter;
  private final Meter serviceResponseMeter;

Change method signature for GatewaySessionHandler#onConnectionOpen

Change
io.scalecube.services.gateway.GatewaySessionHandler#onConnectionOpen
onConnectionOpen(... Map<String, List<String>> headers) -> onConnectionOpen(... Map<String, String> headers).
While it's formally correct to have multimap for headers, in realilty it's super ugly to deal with it. If multimapness will be really needed then client would tokenize by himself.

Fix annoying callback-not-implemented-exception

E 2019-11-01T15:33:34,112 r.c.p.Operators Operator called default onErrorDropped [websocket-gateway-client-epoll-1]
reactor.core.Exceptions$ErrorCallbackNotImplemented: ConnectionClosedException{errorMessage=Connection closed}
Caused by: io.scalecube.services.exceptions.ConnectionClosedException: Connection closed
	at io.scalecube.services.gateway.transport.websocket.WebsocketSession.lambda$null$1(WebsocketSession.java:84) ~[scalecube-services-gateway-client-transport-2.8.11.jar:?]
	at java.util.concurrent.ConcurrentMap.forEach(ConcurrentMap.java:114) ~[?:1.8.0_222]
	at io.scalecube.services.gateway.transport.websocket.WebsocketSession.lambda$new$2(WebsocketSession.java:83) ~[scalecube-services-gateway-client-transport-2.8.11.jar:?]
	at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:101) [reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
	at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:267) [reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:502) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:495) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:474) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:415) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:540) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:529) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:101) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1183) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:769) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:745) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:616) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.channel.epoll.EpollEventLoop.closeAll(EpollEventLoop.java:388) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:351) [netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

Add deferWithContext in RSocket gateway

Subj.

We have a test disabled because of defer context not implemented in rsocket gw:
io.scalecube.services.gateway.rsocket.RSocketLocalGatewayAuthTest#testCallSecuredMethod_authenticated

  @Disabled("https://github.com/scalecube/scalecube-gateway/issues/121")
  void testCallSecuredMethod_authenticated() {
    // authenticate session
    extension.client().requestOne(createSessionReq(ALLOWED_USER), String.class).block(TIMEOUT);
    // call secured service
    final String req = "echo";
    StepVerifier.create(clientService.requestOne(req))
        .expectNextMatches(resp -> resp.equals(ALLOWED_USER + "@" + req))
        .expectComplete()
        .verify();
  }

In io.scalecube.services.gateway.http.HttpGatewayAcceptor get rid of perf headers

io.scalecube.services.gateway.http.HttpGatewayAcceptor

Prod code must not include perf headers.

  private static final String SERVICE_RECV_TIME = "service-recv-time";
  private static final String SERVICE_SEND_TIME = "service-send-time";
  private static final String CLIENT_RECV_TIME = "client-recv-time";
  private static final String CLIENT_SEND_TIME = "client-send-time";

Remove methods: io.scalecube.services.gateway.http.HttpGatewayAcceptor#enrichRequest and io.scalecube.services.gateway.http.HttpGatewayAcceptor#enrichResponse.

NPE on websocket gateway client

NPE:

java.lang.NullPointerException: null
	at io.scalecube.services.api.Qualifier.getQualifierNamespace(Qualifier.java:28) ~[scalecube-services-api-2.9.1-RC8.jar:?]
	at io.exberry.exchange.operation.gateway.GatewaySessionHandlerImpl.isOperationEvents(GatewaySessionHandlerImpl.java:151) ~[app.jar:?]
	at io.exberry.exchange.operation.gateway.GatewaySessionHandlerImpl.onResponse(GatewaySessionHandlerImpl.java:133) ~[app.jar:?]
	at io.scalecube.services.gateway.ws.WebsocketGatewaySession.lambda$send$3(WebsocketGatewaySession.java:104) ~[scalecube-services-gateway-netty-2.8.16-RC5.jar:?]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2317) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:137) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onSubscribe(MonoFlatMap.java:103) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:171) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoDefer

Presumably this is sig CANCEL and/or COMPLETE: io.scalecube.services.gateway.ws.GatewayMessages#newCancelMessage
io.scalecube.services.gateway.ws.GatewayMessages#newCompleteMessage those are two places where qualifier is explicitly not being set.

Proposed solution (@segabriel) -- store original request qualifier somewhere and when it's needed to send CANCEL and/or CANCEL then use that original saved somewhere message qualfier.

Write tests for `cancel gateway client subscription`

Motivation:
Verify that dispose on gateway client subscrioption shall result in closing logical stream on service (check local and remote modes).

Take similar tests (more or less) WebsocketClientConnectionTest, RsocketClientConnectionTest, HttpClientConnectionTest as a reference(s).

Unexpected message type

Stacktrace
E 2019-09-09T11:55:00,269 i.s.s.g.w.WebsocketGatewayAcceptor Exception occurred on request: GatewayMessage {headers: {q=/om2.exchange.auth/createSession, sessionId=1568023078647, sid=1}, data: bb-670}, session=1568023078647 [http-gateway-client-epoll-4]
... 25 more
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:89) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.http.HttpClientCodec$Encoder.encode(HttpClientCodec.java:167) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:86) ~[netty-codec-http-4.1.36.Final.jar:4.1.36.Final]
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpRequest, state: 1
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:338) ~[netty-transport-native-epoll-4.1.36.Final-linux-x86_64.jar:4.1.36.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.run(PooledConnectionProvider.java:566) ~[reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:470) ~[reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441) ~[reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.MonoSource.subscribe(MonoSource.java:51) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:305) ~[reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at reactor.core.publisher.Mono.subscribe(Mono.java:3710) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:141) ~[reactor-core-3.2.9.RELEASE.jar:3.2.9.RELEASE]
at reactor.netty.http.HttpOperations.lambda$send$0(HttpOperations.java:104) ~[reactor-netty-0.8.8.RELEASE.jar:0.8.8.RELEASE]
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:305) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1036) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.channel.CombinedChannelDuplexHandler.write(CombinedChannelDuplexHandler.java:348) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107) ~[netty-codec-4.1.36.Final.jar:4.1.36.Final]
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpRequest, state: 1

Copy message headers to corresp. client transport headers

Subj.

For http:
In HttpGatewayClient pass headers. See constructor io.scalecube.services.gateway.transport.http.HttpGatewayClient#HttpGatewayClient

For websocket:
Take headrs from client transport settings and set then into underlying websocket client transport. See construcrtor io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClient#WebsocketGatewayClient

For rsocket:
Use HeaderCodec to encode map of headers and put them into .setupPayload() .

Enhance RSocketGateway to support authentication

Motivation:

Investigate what's missing in RSocketGateway to perform authentication similar to how it's done on WebsocketGateway (see diagram below). Implement missing parts, change/update RSocketGateway as needed. Add tests (somehow auth tests for websocket gateway are missing, please add websocket auth tests as well). Scenarios for tests could be copied from exchange project. All services in the tests may reside on gateway, no need in a distributed setup with seed or/and services on another microservices instances.

Reminder how websocket auth supposed to work:

Est. connection and calling createSession:
AuthArchP1


Calling secured methods:
AuthArchP2


Closing connection:
AuthArchP3

Buffer leak in websocket gateway client transport

Test to reproduce leaks:
service-gateway in WebsocketGatewayTest

@test
public void test() {
for (int i = 0; i < 100; i++) {
System.out.println(service.manyStream(30L).blockFirst());
}
}
LEAK with

io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClientCodec.decode(WebsocketGatewayClientCodec.java:170)
io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClientCodec.decode(WebsocketGatewayClientCodec.java:33)
io.scalecube.services.gateway.transport.websocket.WebsocketSession.lambda$new$0(WebsocketSession.java:58)

Seems like the cause of LEAK in SimpleOrderbookTests is "cancel" signal which is generated by .take(1) method. For example we received 1 item and canceled Flux stream but then from network we received one more item which will be ignored(ByteBuf will not be release) because flux already canceled.
To see this we need to enable "debug" log level for "reactor"

D 1004-1511:10,404 r.c.p.Operators onNextDropped: io.rsocket.util.ByteBufPayload@41a1d079

To fix this we could add session.removeProcessor(sid); in the end of method handleCancel in WebsocketGatewayClient, but I'm not sure that this is the right way to do, need to discuss with the @artem-v

Second leak

io.scalecube.services.transport.rsocket.RSocketClientTransport.lambda$connect$1(RSocketClientTransport.java:63)
Seems like same issue with "Cancel" but I'm still looking how to fix it

Require scalecube-services-gateway-runner ARM64 docker image

I am trying to use scalecube/scalecube-services-gateway-runner docker image over ARM64v8 but it seems it does not have an arm64 supported tag in the docker hub.
I have gone through this commit, it shows that the support for service-gateway-runner has been migrated or completely removed.

I have tried finding the source of the docker image but unable to find the latest dockerfile for scalecube/scalecube-services-gateway-runner docker image.

Do you have any plans for releasing an ARM64 supported docker image?

I am happy to help in providing the arm64 support, but as a start can you point to the dockerfile, that is used to build this image..

Make stack traces shorter

Make such stack traces shorter:

E 2020-05-15T09:57:45,095 i.s.s.g.GatewaySessionHandler exchangeId=-1|clientSessionId=1589525851346| [websocket-gateway-epoll-2]
io.scalecube.services.exceptions.MessageCodecException: Failed to decode message
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'fgsdfg': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 7]
	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:717) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3585) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3561) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:2896) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchFalse(UTF8StreamJsonParser.java:2849) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:841) ~[jackson-core-2.11.0.jar:2.11.0]
	at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:757) ~[jackson-core-2.11.0.jar:2.11.0]
	at io.scalecube.services.gateway.ws.GatewayMessageCodec.decode(GatewayMessageCodec.java:125) ~[scalecube-services-gateway-netty-2.8.16-RC2.jar:?]
	at io.scalecube.services.gateway.ws.WebsocketGatewayAcceptor.lambda$onRequest$9(WebsocketGatewayAcceptor.java:126) ~[scalecube-services-gateway-netty-2.8.16-RC2.jar:?]
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.MonoDeferWithContext.subscribe(MonoDeferWithContext.java:54) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4218) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4329) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4049) ~[reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at io.scalecube.services.gateway.ws.WebsocketGatewayAcceptor.lambda$onConnect$7(WebsocketGatewayAcceptor.java:119) ~[scalecube-services-gateway-netty-2.8.16-RC2.jar:?]
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onNext(FluxFilter.java:240) [reactor-core-3.3.5.RELEASE.jar:3.3.5.RELEASE]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:331) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:352) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:493) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.http.server.WebsocketServerOperations.onInboundNext(WebsocketServerOperations.java:154) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) [reactor-netty-0.9.7.RELEASE.jar:0.9.7.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102) [netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) [netty-handler-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) [netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) [netty-codec-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [netty-transport-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) [netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) [netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [netty-transport-native-epoll-4.1.48.Final-linux-x86_64.jar:4.1.48.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.48.Final.jar:4.1.48.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.48.Final.jar:4.1.48.Final]
	at java.lang.Thread.run(Thread.java:834) [?:?]

Add interceptors on websocket gateway side

This part only for WebSocket gateway acceptor so far.

I need to add more audit stuff and it should write all incoming and outgoing traffic/content.
I guess the context approach (reactor.util.context.Context) can be used for it. And need to add something similar to the following invocation of two interfaces:

public interface WebsocketRequestInterceptor {

  Context handle(String gatewaySessionId, ByteBuf request, Context ctx);
}


public interface WebsocketResponseInterceptor {

  void handle(String gatewaySessionId, ByteBuf response, Context ctx);
}

As you can see WebsocketRequestInterceptor returns a new context instance which should be used with reactor.core.publisher.Mono#subscriberContext(java.util.function.Function<reactor.util.context.Context,reactor.util.context.Context>) and this context will go along with the request.

Also, it can be a replacement for invoking GatewayMetrics, it means it can be pluggable.

WDYT guys?

rsocket.onClose doesn't work

io.scalecube.services.gateway.rsocket.RSocketClientSdkDisconnectTest doesn't work after reverting of rsocket version.
See #25

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.