alibaba / alibaba-rsocket-broker Goto Github PK
View Code? Open in Web Editor NEWAlibaba RSocket Broker: Mesh, Streaming & IoT
Home Page: https://alibroker.info
License: Apache License 2.0
Alibaba RSocket Broker: Mesh, Streaming & IoT
Home Page: https://alibroker.info
License: Apache License 2.0
在RSocket Broker的场景中,一个RSocket长连接会传输各种类型的Payload,虽然RSocket长连接支持全局的Payload的data mime type,但是实际中会存在不用的业务场景使用不同的数据编码格式,如RPC调用和Kafka消息可能就是不同的数据类型,这个时候就需要给每一个Payload设置独立的数据编码格式,对于RPC场景,可能还存在对返回数据的数据编码需求,如以下签名:
User findUserByNick(String nick);
有可能是请求的时候是Text/Plain编码,而返回的对象结果要求是JSON编码,这种不同的编码需求主要是出于个性化和性能要求。 在RSocket 281的Metadata规范中,增加了data encoding和accept data encoding两者元数据类型,这样可以保证满足该场景的要求。
在RSocket Broker中,建议服务提供方支持多种数据类型,目前主要包括:
Composite Metadata Extension: data Mime/type definition per-stream: rsocket/rsocket#281
提供RSocket Broker 开发环境Docker镜像,方便使用Docker或者Docker Compose的开发者快速启动RSocket Broker进行相关的测试。 目前已经RSocket Broker已经使用Jib进行镜像制作,但是还没有提供到docker hub上。
RSocket默认支持Interceptor机制,你可以通过ClientRSocketFactory.addRequesterPlugin() 添加你自定义的RSocketInterceptor,但是这些interceptor相对比较低级一些,如果你要获取具体的信息还需要进行ByteBuf解析,尤其是metadata。 如目前的Zipkin Trace实现,实现相对麻烦一些,如果以Interceptor实现,就会简单很多。
如果Protobuf Message对象,通过Jackson进行json解析或者序列化,会出现错误。 需要为Jackson添加protobuf的datatype支持。 解决的方法是将jackson-datatype-protobuf添加到依赖中即可。
和HTTP REST API到RSocket转换一样,是否要增加gRPC到RSocket协议转换的gateway? 有需求的留言一下,目前内部已经有一个原型版本。
Ensure Maven build easy for end user, CI etc. The Maven version is 3.6.3.
在某些情况下,需要支持broker到broker直接的RPC调用,例如访问broker的一些配置等,目前一个场景就是broker要支持外部应用应用接入,应用无法直接访问broker的内部IP,如office的机器访问云vpc内部的rsocker broker集群,这个时候broker要有两个ip,所以对外部应用推送就需要走外部IP或者域名推送,内部应用则走内部IP推送。
解决思路:
Broker RPC特性可以方便后续对broker的功能的扩展,broker之间通讯也比较简洁。
看readme.md中的介绍,解决了“传统设计中众多的问题”,但是当前这些问题都有成熟的方案。
不太了解rsocket-broker在阿里目前都有哪些方面的实际应用?
2020-04-28 00:02:11.427 INFO 1 --- [tor-tcp-epoll-4] b.r.b.r.RSocketBrokerHandlerRegistryImpl : RST-500200: Succeed to accept connection from rsocket-user-service
2020-04-28 00:02:11.460 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.CompositeMetadataRSocket : RST-600500: Failed to parse composite metadata
java.lang.NullPointerException: null
at java.util.Objects.requireNonNull(Objects.java:203) ~[na:1.8.0_242]
at io.micrometer.core.instrument.ImmutableTag.(ImmutableTag.java:35) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tag.of(Tag.java:29) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Tags.of(Tags.java:254) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.MeterRegistry.counter(MeterRegistry.java:363) ~[micrometer-core-1.3.2.jar:1.3.2]
at io.micrometer.core.instrument.Metrics.counter(Metrics.java:76) ~[micrometer-core-1.3.2.jar:1.3.2]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.metrics(CompositeMetadataRSocket.java:155) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:70) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
2020-04-28 00:02:11.463 ERROR 1 --- [tor-tcp-epoll-4] c.a.r.listen.impl.RSocketListenerImpl : RST-200501: Exception during rsocket call
io.rsocket.exceptions.InvalidException: RST-600500: Failed to parse composite metadata
at com.alibaba.rsocket.listen.CompositeMetadataRSocket.requestResponse(CompositeMetadataRSocket.java:76) ~[alibaba-rsocket-core-0.1.0-SNAPSHOT.jar:na]
at io.rsocket.RSocketResponder.requestResponse(RSocketResponder.java:193) [rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.RSocketResponder.handleFrame(RSocketResponder.java:299) [rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:554) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:630) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.subscribe(FluxGroupBy.java:696) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Flux.subscribe(Flux.java:8174) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:188) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1637) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:317) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.rsocket.internal.ClientServerInputMultiplexer.lambda$new$1(ClientServerInputMultiplexer.java:116) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drainLoop(FluxGroupBy.java:380) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.drain(FluxGroupBy.java:316) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:201) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) ~[reactor-netty-0.9.4.RELEASE.jar:0.9.4.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.45.Final-linux-x86_64.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
${rsocket.port:0}!=0
@Configuration
@ConditionalOnExpression("'${rsocket.port}'!='0'")
public class RSocketListenerAutoConfiguration {
spring-autoconfigure-metadata.properties
文件导致RSocketAutoConfiguration中@ConditionalOnClass加载了类而报错 @Bean
@ConditionalOnClass(PrometheusMeterRegistry.class)
public MetricsService metricsService(PrometheusMeterRegistry meterRegistry) {
return new MetricsServicePrometheusImpl(meterRegistry);
}
文档建议添加如下依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure-processor</artifactId>
<optional>true</optional>
</dependency>
参考文档
报错信息如下
java.lang.IllegalStateException: Failed to introspect Class [com.alibaba.spring.boot.rsocket.RSocketAutoConfiguration] from ClassLoader [sun.misc.Launcher$AppClassLoader@18b4aac2]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:481) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:358) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.util.ReflectionUtils.getUniqueDeclaredMethods(ReflectionUtils.java:414) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.lambda$getTypeForFactoryMethod$2(AbstractAutowireCapableBeanFactory.java:743) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) ~[na:1.8.0_241]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.getTypeForFactoryMethod(AbstractAutowireCapableBeanFactory.java:742) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.determineTargetType(AbstractAutowireCapableBeanFactory.java:681) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.predictBeanType(AbstractAutowireCapableBeanFactory.java:649) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.isFactoryBean(AbstractBeanFactory.java:1605) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBeanNamesForType(DefaultListableBeanFactory.java:523) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeanNamesForType(DefaultListableBeanFactory.java:494) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:616) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.getBeansOfType(DefaultListableBeanFactory.java:608) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.getBeansOfType(AbstractApplicationContext.java:1242) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.boot.SpringApplication.getExitCodeFromMappedException(SpringApplication.java:880) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.getExitCodeFromException(SpringApplication.java:868) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.handleExitCode(SpringApplication.java:855) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.handleRunFailure(SpringApplication.java:806) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.M4.jar:2.3.0.M4]
at com.example.spring.webflux.SpringWebfluxDemoApplication.main(SpringWebfluxDemoApplication.java:29) [classes/:na]
Caused by: java.lang.NoClassDefFoundError: io/micrometer/prometheus/PrometheusMeterRegistry
at java.lang.Class.getDeclaredMethods0(Native Method) ~[na:1.8.0_241]
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701) ~[na:1.8.0_241]
at java.lang.Class.getDeclaredMethods(Class.java:1975) ~[na:1.8.0_241]
at org.springframework.util.ReflectionUtils.getDeclaredMethods(ReflectionUtils.java:463) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 21 common frames omitted
Caused by: java.lang.ClassNotFoundException: io.micrometer.prometheus.PrometheusMeterRegistry
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[na:1.8.0_241]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[na:1.8.0_241]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[na:1.8.0_241]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[na:1.8.0_241]
... 25 common frames omitted
Disconnected from the target VM, address: '127.0.0.1:1064', transport: 'socket'
Process finished with exit code 1
目前RSocket Broker已经提供对各个应用的Prometheus的采集支持,主要是通过MetricsService这个RSocket服务完成的。 而且提供了MetricsScrapeController可以输出Prometheus的格式。
实现一个基于RSocket Broker的Prometheus Service Discovery,完成对接入到RSocket Broker的所有应用进行Prometheus的metrics抓取。
https://prometheus.io/blog/2018/07/05/implementing-custom-sd/
https://github.com/prometheus/prometheus/tree/master/discovery
Hessian序列化增加Java 8数据类型支持,如Java time类型,Optional支持。
兼容Dubbo Hessian Lite,相关的代码来自Dubbo Hessian Lite,去除Java反射逻辑,RSocket支持Java 8+版本,不需要使用反射来支持Java 8数据类型。
实现逻辑: META-INF/hessian/serializers添加对应的Serializer类。
其他类型支持,如Joda time, 需要自己添加对应的Serializer类。
Upgrade to RSocket Java SDK 1.0.0 🌹
https://github.com/rsocket/rsocket-java/releases/tag/1.0.0
考虑到开发和部署的便捷性,RSocket Broker Server内置支持RSocket Service的HTTP访问,格式如下,请注意 "/api" 前缀。
POST http://127.0.0.1:9998/api/com.alibaba.user.UserService/findById
Authorization: Bearer jwt_token
Content-Type: application/json
[
1
]
这里有一个小问题: Broker Console目前是用Vaadin开发,而Vaadin现在还不能支持WebFlux,所以RSocket REST API并不能很好地利用WebFlux的Reactive特性,性能和thread模型还是Spring MVC的。
Vaadin的reactive支持要等待这里: vaadin/spring#565
而alibaba-broker-http-gateway还继续保留,这个是完全Reactive的,同时方便开发者对其他协议进行集成,如gRPC, Dubbo等。 如果大家有gRPC -> RSocket的需求,我们会考虑在http gateway中添加gRPC的集成。
如果大家有更好的建议,欢迎留言。
使用Travis CI进行持续集成,同时在README.md添加build status图标。
目前使用Spring Boot默认的banner,调整为粉色的 Alibaba RSocket Broker
at com.alibaba.rsocket.broker.web.ui.ServicesView.lambda$services$0(ServicesView.java:50) ~[classes/:na]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[na:1.8.0_231]
at java.util.concurrent.ConcurrentHashMap$ValueSpliterator.forEachRemaining(ConcurrentHashMap.java:3566) ~[na:1.8.0_231]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[na:1.8.0_231]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[na:1.8.0_231]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[na:1.8.0_231]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[na:1.8.0_231]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[na:1.8.0_231]
目前的应用状态主要包括 connected, serving, paused, stopped,同时触发不同的逻辑。 使用状态机管理逻辑可能更清晰,代码也容易管理。 目前spring state machine的flux支持还没有发布,不过目前可以做一些测试,性能不用担心,主要是StateMachine对象的大小。
Distribute the server as tar.gz and make it easy to start.
EMQ可以达到百万单机长连接数,咱们这个RSocket Broker单机长连接数大概能达到多少?
目前tracing的产品比较多,AP和规范也各自不同,考虑能在Broker上适配主流的Tracing产品。
Bridge between OpenTracing and Brave: https://github.com/openzipkin-contrib/brave-opentracing
准备Alpha1的版本发布,一些要解决的核心问题:
如果大家还有一些比较关注的问题,这里可以留言一下。
在不少场景中,我们调用远程服务, 获取资源对象,然后再对象输出。 如下述代码中,我们调用远程服务,获取用户,然后再将用户信息以REST API方式输出。 但是这里有序列化的问题: RPC网络调用有两次序列化和反序列化,然后REST API输出又再进行JSON序列化,代码如下:
@GetMapping(value = "/user/{id}", produces = "application/json")
public Mono<User> jsonBytes(@PathVariable Integer id) {
return userService.findUserById(id);
}
能否有一个机制,在服务端就输出对应的数据格式,如JSON,而通讯的过程中都不要涉及多次序列化的问题,而是将服务端输出的bytes直接输出给最终的调用者,代码如下:
@GetMapping(value = "/user/{id}", produces = "application/json")
public Mono<ByteBuffer> jsonBytes(@PathVariable Integer id) {
return userService.findUserById(id);
}
目前路由的算法是 group + service name + version,然后对其进行hashcode,然后在进行bitmap匹配查找。 可以考虑在调用方提前将路由信息进行HashCode化,然后基于Integer Hashcode进行bitmap匹配,这样只需要读取composite metadata的前8个字节(1 + 3 + 4)就可以完成路由匹配。 考虑到Hashcode各个语言的一致性性,还是采用 MurmurHash3 算法。
目前RSocket Service是基于Java Interface的,所以需要通过Proxy机制代理进行RSocket接口调用。
对比JDK Proxy, Javassist 和 Byte Buddy 发现,ByteBuddy的性能最高。 一个简单的性能测试数据如下,而且ByteBuddy对Java Interface的default method处理也比较友好。
ByteBuddyProxyTest.testOperation thrpt 2 1391028488.716 ops/s
JavassistProxyTest.testOperation thrpt 2 582515791.244 ops/s
JdkProxyTest.testOperation thrpt 2 233792969.032 ops/s
所以Java Proxy代理机制调整到ByteBuddy上。
https://github.com/txd-team/docsite
看了下dubbo、nacos都是直接用docsite搭建的官网,我们需不需要现在开始也搞一下呢?
目前RSocket主要涉及到的Load Balance到Broker 集群的多连接管理。 Alibaba RSocket Broker采用share nothing架构,也就是集群中broker相互之间不通讯,不承担消息转发。 目前客户端SDK的设计思路是通过集群推过来的拓扑结构变更,SDK端完成连接的重连和路由更新等。 目前代码只有基础功能,需要再细致完善,提升稳定性。
准备1.0.0.M2发布:
简化K8S下的broker集群部署,主要解决的问题是各个Pod独立域名、JWT RSA Token的问题。
MESSAGE_RSOCKET_BINARY_ROUTING 等在项目中的io.rsocket.metadata.WellKnownMimeType类中,在Rsocket-core-1.0.0.jar中存在同名类,导致加载时报找不到Field。
经过排查,classloader加载了Rsocket-core-1.0.0.jar 这个里面的WellKnownMimeType报找不到Field。
启动server的时候报这个是啥意思?
[INFO] Added 40 dependencies to 'D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json'
[INFO] Updated npm D:\IdeaProjects\alibaba-rsocket-broker\alibaba-broker-server\target\frontend\package.json.
[INFO] Running npm install
...
npm ERR! Unexpected end of JSON input while parsing near '...2oYSQ4\nrAd/cAHSAmngL'
maven 版本 :3.63
nodejs 版本:v12.13.1
是不是我本地网络比较慢,包下载的不完整?
目前RSocket Broker支持RxJava 2, RxJava 3 估计在在1月底发布,要兼容 RxJava 3。 主要在RSocketRequestRpcProxy 和 RSocketResponderSupport 进行调整。
主要是根据 @serializable 进行判断,目前考虑主要支持JSON, CBOR和Protobuf三种数据序列化格式,方便各种Kotlin应用进行对接。
import kotlinx.serialization.Serializable
@Serializable
data class Data(val a: Int, val b: String = "42")
目前的DataEncodingMetadata并不是标准规范,待 rsocket/rsocket#281 确定后进行调整,可能会存在不兼容的情况。
目前RSocket Broker的控制台使用Vaadin开发,主要是方便Java程序员,同时减少各种REST API调用的问题,不知道多少同学对其他JS框架的了解程度。 这里调查一下,是否需要将控制台UI调整到Vue、React等框架? 但是Node环境和基本开发这个都是需要的。
JDK 14将于3月17日发布,添加了非常多的特性,这些特性对开发和性能提升都有帮助。虽然项目是基于JDK 8开发的,但是要兼容JDK 14。
希望建立钉钉或者QQ群之类的沟通群组,方便大家遇到问题及时沟通。
有roadmap话,我们这些菜鸟更方便参与进来^_^
Deploy into maven repo
Now use Maven Jib plugin to build broker Docker images, please consider to use Native Spring Boot Buildpacks to build Docker image. https://spring.io/blog/2020/01/27/creating-docker-images-with-spring-boot-2-3-0-m1
目前RSocket Service接口支持Reactor,RxJava2, RxJava 3,还没有对Kolin Coroutines和 Flow的支持。 Kotlin Coroutines和Reactive都可以相互转换的,这里调研一下大家是否有在Kotlin? 是否有意愿使用Kotlin的原生接口?
interface UserService {
suspend fun getAdmin(): String
suspend fun getNickById(id: Int): String
fun getAllNames(): Flow<String>
}
代码为 Example 中的代码,按照文档步骤操作后访问正常,
过了六小时未操作后再次调用接口服务器会报 500 异常,手动将 Broker、Respones、Request 重启后正常访问
2019-12-11 10:27:20.625 INFO 97481 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8181
2019-12-11 10:27:20.628 INFO 97481 --- [ main] c.a.s.b.r.demo.RSocketRequesterApp : Started RSocketRequesterApp in 1.702 seconds (JVM running for 2.302)
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115)
at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110)
at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146)
at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-12-11 16:14:48.833 ERROR 97481 --- [ctor-http-nio-4] a.w.r.e.AbstractErrorWebExceptionHandler : [fc5bd7d9] 500 Server Error for HTTP GET "/user/1"
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 65000 ms
at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler com.alibaba.spring.boot.rsocket.demo.PortalController#user(Integer) [DispatcherHandler]
|_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
|_ checkpoint ⇢ HTTP GET "/user/1" [ExceptionHandlingWebHandler]
Stack trace:
at io.rsocket.RSocketRequester.terminate(RSocketRequester.java:115) ~[rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.keepalive.KeepAliveSupport.tryTimeout(KeepAliveSupport.java:110) ~[rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.keepalive.KeepAliveSupport$ClientKeepAliveSupport.onIntervalTick(KeepAliveSupport.java:146) ~[rsocket-core-1.0.0-RC5.jar:na]
at io.rsocket.keepalive.KeepAliveSupport.lambda$start$0(KeepAliveSupport.java:54) ~[rsocket-core-1.0.0-RC5.jar:na]
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_212]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_212]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_212]
目前Gossip还有Reactor Dysprosium版本适配问题,使用本地修改过的 scalecube-cluster 2.4.10 SNASHOT可以工作,还需要等待scalecube-cluster进行更新。
还是有一些变化,主要是UriHandler.java 和 UriTransportRegistry.java 被删除啦,目前RSocket Broker要根据schema做transport层识别,可能需要将这些代码转换到broker内部,RSocket-CLI是添加了这些代码。
如下改变:
Reactor Addons已经支持RxJava 3的适配
reactor/reactor-addons#224
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.