vvsuperman / coolmq Goto Github PK
View Code? Open in Web Editor NEW消息最终一致性方案,基于rabbitmq的分布式事务解决方案
消息最终一致性方案,基于rabbitmq的分布式事务解决方案
你好,这里面没看到消息入库的过程啊!是什么情况呢
这个能支持rocketmq吗
// 消息发送到RabbitMQ交换器后接收ack回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if(returnFlag){
logger.error("mq发送错误,无对应的的交换机,confirm回掉,ack={},correlationData={} cause={} returnFlag={}",
ack, correlationData, cause, returnFlag);
}
logger.info("confirm回调,ack={} correlationData={} cause={}", ack, correlationData, cause);
String msgId = correlationData.getId();
/** 只要消息能投入正确的消息队列,并持久化,就返回ack为true*/
if(ack){
logger.info("消息已正确投递到队列, correlationData:{}", correlationData);
//清除重发缓存
String dbCoordinatior = ((CompleteCorrelationData)correlationData).getCoordinator();
DBCoordinator coordinator = (DBCoordinator)applicationContext.getBean(dbCoordinatior);
coordinator.setMsgSuccess(msgId);
}else{
logger.error("消息投递至交换机失败,业务号:{},原因:{}",correlationData.getId(),cause);
}
});
coordinator.setMsgSuccess(msgId);报 java.lang.IllegalArgumentException: non null hash key required
redis在该项目中使用的意义是什么?感觉在该项目中不使用redis也能实现消息的最终可消费吧
MyAnonFanoutListener这个注解是什么作用?没看到哪里有用到
将队列的定义配置化,使得配置中心可以直接定义队列
1.如果没用你封装的注解,报错
2.使用你封装的注解,redis地址配置不生效
后面我就不想修改了.不知道怎么上的生产
微信里longer童鞋提出这样的问题:
此时业务操作事务还未提交,但消息已经发送了,于是出现不一致的情况
如果把提交放到前面
1 执行业务操作-> 2 提交-> 3 发送消息
但此时如果2提交后机器挂了,又会出现不一致的情况。因为发消息是一个异步的过程,是没法在一个事务中进行控制的,即使用事务的传播特性:
事务A:
事务B:执行业务操作
发送消息
B先提交,如果A失败B也会回滚。但即使这样,如果B后面一步出现断电等极端情况依然会产生不一致
考虑到断电的极端情况,必须要有第三方守护进程来进行一个判断,于是可以使用如下的逻辑:
来看下可能的异常:如果业务成功,发送消息失败(断电等情况), 此时守护进程查到db中有一条“我要做业务操作啦”,同时业务方提供一个回查接口,并查到业务确实成功了,进行消息重发。RocketMQ就是这种解决方案,但还有一个问题:如果业务B方需要业务A的结果,但A的结果此时是无法得到的,貌似RocketMQ并未考虑该情况,所以需要在回查接口中返回这个结果,方便重发。
org.springframework.data.redis.RedisConnectionFailureException: Cannot get Jedis connection; nested exception is redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:204) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.getConnection(JedisConnectionFactory.java:348) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:129) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:92) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:79) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
at org.springframework.boot.actuate.health.RedisHealthIndicator.doHealthCheck(RedisHealthIndicator.java:52) ~[spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.actuate.health.AbstractHealthIndicator.health(AbstractHealthIndicator.java:43) ~[spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.actuate.health.CompositeHealthIndicator.health(CompositeHealthIndicator.java:68) [spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.actuate.endpoint.HealthEndpoint.invoke(HealthEndpoint.java:85) [spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.actuate.endpoint.HealthEndpoint.invoke(HealthEndpoint.java:35) [spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at org.springframework.boot.actuate.endpoint.jmx.DataEndpointMBean.getData(DataEndpointMBean.java:46) [spring-boot-actuator-1.5.8.RELEASE.jar:1.5.8.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
at sun.reflect.misc.Trampoline.invoke(MethodUtil.java:71) [na:1.8.0_151]
at sun.reflect.GeneratedMethodAccessor119.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
at sun.reflect.misc.MethodUtil.invoke(MethodUtil.java:275) [na:1.8.0_151]
at javax.management.modelmbean.RequiredModelMBean$4.run(RequiredModelMBean.java:1252) [na:1.8.0_151]
at java.security.AccessController.doPrivileged(Native Method) [na:1.8.0_151]
at java.security.ProtectionDomain$JavaSecurityAccessImpl.doIntersectionPrivilege(ProtectionDomain.java:80) [na:1.8.0_151]
at javax.management.modelmbean.RequiredModelMBean.invokeMethod(RequiredModelMBean.java:1246) [na:1.8.0_151]
at javax.management.modelmbean.RequiredModelMBean.invoke(RequiredModelMBean.java:1085) [na:1.8.0_151]
at org.springframework.jmx.export.SpringModelMBean.invoke(SpringModelMBean.java:90) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at javax.management.modelmbean.RequiredModelMBean.getAttribute(RequiredModelMBean.java:1562) [na:1.8.0_151]
at org.springframework.jmx.export.SpringModelMBean.getAttribute(SpringModelMBean.java:109) [spring-context-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttribute(DefaultMBeanServerInterceptor.java:647) [na:1.8.0_151]
at com.sun.jmx.mbeanserver.JmxMBeanServer.getAttribute(JmxMBeanServer.java:678) [na:1.8.0_151]
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1445) [na:1.8.0_151]
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76) [na:1.8.0_151]
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309) [na:1.8.0_151]
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401) [na:1.8.0_151]
at javax.management.remote.rmi.RMIConnectionImpl.getAttribute(RMIConnectionImpl.java:639) [na:1.8.0_151]
at sun.reflect.GeneratedMethodAccessor85.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:357) [na:1.8.0_151]
at sun.rmi.transport.Transport$1.run(Transport.java:200) [na:1.8.0_151]
at sun.rmi.transport.Transport$1.run(Transport.java:197) [na:1.8.0_151]
at java.security.AccessController.doPrivileged(Native Method) [na:1.8.0_151]
at sun.rmi.transport.Transport.serviceCall(Transport.java:196) [na:1.8.0_151]
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568) [na:1.8.0_151]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826) [na:1.8.0_151]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683) [na:1.8.0_151]
at java.security.AccessController.doPrivileged(Native Method) [na:1.8.0_151]
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682) [na:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_151]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_151]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
at redis.clients.util.Pool.getResource(Pool.java:53) ~[jedis-2.9.0.jar:na]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:226) ~[jedis-2.9.0.jar:na]
at redis.clients.jedis.JedisPool.getResource(JedisPool.java:16) ~[jedis-2.9.0.jar:na]
at org.springframework.data.redis.connection.jedis.JedisConnectionFactory.fetchJedisConnector(JedisConnectionFactory.java:194) ~[spring-data-redis-1.8.8.RELEASE.jar:na]
... 50 common frames omitted
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
at redis.clients.jedis.Connection.connect(Connection.java:207) ~[jedis-2.9.0.jar:na]
at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:93) ~[jedis-2.9.0.jar:na]
at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1767) ~[jedis-2.9.0.jar:na]
at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:106) ~[jedis-2.9.0.jar:na]
at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:868) ~[commons-pool2-2.4.2.jar:2.4.2]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) ~[commons-pool2-2.4.2.jar:2.4.2]
at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) ~[commons-pool2-2.4.2.jar:2.4.2]
at redis.clients.util.Pool.getResource(Pool.java:49) ~[jedis-2.9.0.jar:na]
... 53 common frames omitted
Caused by: java.net.ConnectException: Connection refused: connect
at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) ~[na:1.8.0_151]
at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) ~[na:1.8.0_151]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) ~[na:1.8.0_151]
at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) ~[na:1.8.0_151]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[na:1.8.0_151]
at java.net.Socket.connect(Socket.java:589) ~[na:1.8.0_151]
at redis.clients.jedis.Connection.connect(Connection.java:184) ~[jedis-2.9.0.jar:na]
... 60 common frames omitted
我用客户端连接是没问题的
redis配置:
spring.redis.host: 10.16.0.202
spring.redis.port: 6379
spring.redis.timeout: 10000
spring.redis.pool.max-active: 8
spring.redis.pool.max-wait: -1
spring.redis.pool.max-idle: 8
spring.redis.pool.min-idle: 0
@vvsuperman balabala
看了下代码,请问能否带上消费模块的例子,还有一些代码没有开源出来,比如说,发邮件和 入库那块的用到的服务,本人邮箱[email protected]
rt
视屏教程不能看吗?
coordinator.setMsgReady(bizName, rabbitMetaMessage);
coordinator.setMsgSuccess(msgId);
发送消息确认的回调的key都不一样 搞锤子 浪费我时间恶心人
TransactionSender这个切面是在Transactional注解指定的切面之前执行的吗?
如果是之前执行的那没问题,如果是之后执行的,那么有问题,当本地业务方法执行成功了,也提交事务了,但rabbitmq的消息没发送到mq server。
能否补充一个详细的例子
这段代码如何与DeadLetterMessageListener的onMessage联系起来(这段代码如何生效的?)
// 入死信队列
channel.basicReject(deliveryTag, false);
abstractmessagelistener onmessage中increment出现两次有问题吧?还有建议代码重构下。
这个demo具体是怎么实现事物回滚的?是通过事物补偿吗?
没有看到业务上MqSender注解的使用,没有业务DEMO拼命。业务上不方便测试效果。能不能后期加上
你好,你什么时候有时间上传下消费侧的代码。谢谢!
有没有具体的例子 比如
跨库 转账
跨库 减库存 更新订单状态
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.