Coder Social home page Coder Social logo

davidmarquis / redisq Goto Github PK

View Code? Open in Web Editor NEW
103.0 9.0 37.0 57 KB

RedisQ - Java implementation of a reliable Pub/Sub message queue based on Redis

Home Page: http://blog.radiant3.ca/2013/01/03/reliable-delivery-message-queues-with-redis/

License: MIT License

Java 92.59% Gherkin 7.41%
redis java queue consumer producer reliable

redisq's People

Contributors

davidmarquis avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

redisq's Issues

is good project but Configuration file writing is tedious

my code :
`
//base redis mq
@bean
public PayloadSerializer payloadSerializer(){
return new com.github.davidmarquis.redisq.serialization.GsonPayloadSerializer();
}
@bean("redisOps")
public RedisOps buildRedisMQOps(){
RedisOps ro = new com.github.davidmarquis.redisq.persistence.RedisOps();
//ro.setRedisTemplate(redisTemplate);
return ro;
}
@bean("redisMemberQueue")
public RedisMessageQueue redisMsQueue(){
RedisMessageQueue mq = new com.github.davidmarquis.redisq.RedisMessageQueue();
mq.setQueueName("queue.member");
//mq.setRedisOps(redisOps);
return mq;
}
// loop event
@bean("memberSignUpProducer")
public MessageProducer signUpProducer(@qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue){
DefaultMessageProducer dmq = new com.github.davidmarquis.redisq.producer.DefaultMessageProducer<>();
dmq.setQueue(redisMemberQueue);
return dmq;
}
@bean("memberSignInProducer")
public MessageProducer signInProducer(@qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue){
DefaultMessageProducer dmq = new com.github.davidmarquis.redisq.producer.DefaultMessageProducer<>();
dmq.setQueue(redisMemberQueue);
return dmq;
}
@bean("memberSignOutProducer")
public MessageProducer signOutProducer(@qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue){
DefaultMessageProducer dmq = new com.github.davidmarquis.redisq.producer.DefaultMessageProducer<>();
dmq.setQueue(redisMemberQueue);
return dmq;
}
// loop listener
@bean
public MessageConsumer promoteRoleConsumer(@qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue, @qualifier("promoteRole") MessageListener promoteRole){
MessageConsumer messageConsumer = new com.github.davidmarquis.redisq.consumer.MessageConsumer<>();
messageConsumer.setQueue(redisMemberQueue);
messageConsumer.setConsumerId("SignUpEvent.PromoteRole");
messageConsumer.setMessageListener(promoteRole);
return messageConsumer;
}
@bean("promoteRole")
public MessageListener promoteRoleListener(){
return new MemberSignUpPromoteRoleListener();
}

@Bean
public MessageConsumer<MemberSignUpEvent> inviteCodeConsumer(@Qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue, @Qualifier("inviteCode") MessageListener<MemberSignUpEvent> inviteCode){
    MessageConsumer<MemberSignUpEvent> messageConsumer = new com.github.davidmarquis.redisq.consumer.MessageConsumer<>();
    messageConsumer.setQueue(redisMemberQueue);
    messageConsumer.setConsumerId("SignUpEvent.InviteCode");
    messageConsumer.setMessageListener(inviteCode);
    return messageConsumer;
}
@Bean("inviteCode")
public MessageListener<MemberSignUpEvent> inviteCodeListener(){
    return new MemberSignUpInviteCodeListener();
}

@Bean
public MessageConsumer<MemberSignUpEvent> registerNoticeConsumer(@Qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue, @Qualifier("registerNotice") MessageListener<MemberSignUpEvent> registerNotice){
    MessageConsumer<MemberSignUpEvent> messageConsumer = new com.github.davidmarquis.redisq.consumer.MessageConsumer<>();
    messageConsumer.setQueue(redisMemberQueue);
    messageConsumer.setConsumerId("SignUpEvent.Notice");
    messageConsumer.setMessageListener(registerNotice);
    return messageConsumer;
}
@Bean("registerNotice")
public MessageListener<MemberSignUpEvent> registerNoticeListener(){
    return new MemberSignUpNoticeListener();
}

@Bean
public MessageConsumer<MemberSignInEvent> loginNoticeConsumer(@Qualifier("redisMemberQueue") RedisMessageQueue redisMemberQueue, @Qualifier("loginNotice") MessageListener<MemberSignInEvent> loginNotice){
    MessageConsumer<MemberSignInEvent> messageConsumer = new com.github.davidmarquis.redisq.consumer.MessageConsumer<>();
    messageConsumer.setQueue(redisMemberQueue);
    messageConsumer.setConsumerId("SignInEvent.Notice");
    messageConsumer.setMessageListener(loginNotice);
    return messageConsumer;
}
@Bean("loginNotice")
public MessageListener<MemberSignInEvent> loginNoticeListener(){
    return new MemberSignInNoticeListener();
}

`

How can this work be simplified??

RedisConsumer get messages after downtime

Reliable delivery: Each consumer on a queue will receive each message even if they are temporarily offline.

and after start consumer how we can get them ? (maybe i have something missing)

We get only new messages.

thx

@Bean
   public RedisMessageQueue myQueue(){
       RedisMessageQueue redisMessageQueue = new RedisMessageQueue();
       redisMessageQueue.setQueueName("Sending");
       redisMessageQueue.setRedisOps(redisOps());


       redisMessageQueue.setDefaultConsumerId("defaultGetter");

       return redisMessageQueue;
   }

   @Bean
   public RedisOps redisOps(){
       RedisOps redisOps = new RedisOps();
       redisOps.setRedisTemplate(redisTemplateQ());

       return redisOps;
   }


   @Bean
   public MessageProducer messageProducer(){
      MessageProducerImpl messageProducer = new MessageProducerImpl();
      messageProducer.setQueue(myQueue());
      messageProducer.setRedisOps(redisOps());
      messageProducer.setDefaultTimeToLive(10);
      messageProducer.setDefaultTimeToLiveUnit(TimeUnit.MINUTES);
      return messageProducer;
   }

   @Bean
   public MessageConsumer messageConsumer(){
       MessageConsumer messageConsumer = new MessageConsumer();
       messageConsumer.setQueue(myQueue());
       messageConsumer.setConsumerId("someConsumerId");
       messageConsumer.setMessageListener(new SomeListener());
       messageConsumer.setAutoStartConsumers(true);
       messageConsumer.setRedisOps(redisOps());
       messageConsumer.setRetryStrategy(new MaxRetriesStrategy(5));

       messageConsumer.setThreadingStrategy(new MultiThreadingStrategy(4));
       return messageConsumer;
   }

hi, can you give a sample

hi, nice unit tests! but can you give an integral simple? contains how to use listener, producer, consumer, queue, serialization, retry. many thanks :)

Queue is missing when getting down

I try to use multi consumer on two different jvm (both connect to the same redis), i try to shutdown one of them when submitting queue, the queue is missing.
I loop ten message from 0 to 9, "FIRST" indicate first consumer and "SECOND" indicate second consumer. (every message must be a pair). Here are the messages that i get:
JVM 1:
2017-04-21 09:36:55.040 INFO 99957 --- [umer[my-queue]4] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 0
2017-04-21 09:37:05.069 INFO 99957 --- [umer[my-queue]2] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 1
2017-04-21 09:37:05.073 INFO 99957 --- [umer[my-queue]1] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 1
2017-04-21 09:37:15.105 INFO 99957 --- [umer[my-queue]3] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 2
2017-04-21 09:37:27.100 INFO 99957 --- [umer[my-queue]2] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 3
2017-04-21 09:37:37.082 INFO 99957 --- [umer[my-queue]2] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 4
2017-04-21 09:37:39.031 INFO 99957 --- [umer[my-queue]1] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 4
2017-04-21 09:37:49.102 INFO 99957 --- [umer[my-queue]2] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 5
2017-04-21 09:37:51.048 INFO 99957 --- [umer[my-queue]4] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 5
2017-04-21 09:37:59.036 INFO 99957 --- [umer[my-queue]3] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 6
2017-04-21 09:38:03.029 INFO 99957 --- [umer[my-queue]0] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 6
2017-04-21 09:38:11.056 INFO 99957 --- [umer[my-queue]0] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 7
2017-04-21 09:38:13.095 INFO 99957 --- [umer[my-queue]0] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 7
2017-04-21 09:38:27.021 INFO 99957 --- [umer[my-queue]4] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 8
2017-04-21 09:38:29.080 INFO 99957 --- [umer[my-queue]4] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 8
2017-04-21 09:38:35.062 INFO 99957 --- [umer[my-queue]4] c.m.redis.listener.SimpleListener : Getting FIRST message ... this is a message 9
2017-04-21 09:38:37.123 INFO 99957 --- [umer[my-queue]3] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 9

JVM 2:
2017-04-21 09:36:41.954 INFO 824 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2017-04-21 09:36:42.176 INFO 824 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8081 (http)
Found key spring.boot.redis.test, value=foo
2017-04-21 09:36:48.584 INFO 824 --- [ main] c.microservices.redis.RedisApplication : Started RedisApplication in 23.61 seconds (JVM running for 25.782)
2017-04-21 09:36:57.759 INFO 824 --- [umer[my-queue]3] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 0
2017-04-21 09:37:18.361 INFO 824 --- [umer[my-queue]3] c.m.redis.listener.SecondListener : Getting SECOND message ... this is a message 2
2017-04-21 09:37:22.687 INFO 824 --- [ Thread-15] ationConfigEmbeddedWebApplicationContext : Closing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@30946e09: startup date [Fri Apr 21 09:36:27 ICT 2017]; root of context hierarchy
2017-04-21 09:37:22.731 INFO 824 --- [ Thread-15] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown

I lost the number 3 message. Not consumed by the FIRST consumer.

My code:

@bean
JedisConnectionFactory jedisConnectionFactory() {
JedisConnectionFactory jedisConFactory = new JedisConnectionFactory();
jedisConFactory.setHostName("192.168.1.41");
jedisConFactory.setPort(6379);
return jedisConFactory;
}

@Bean
RedisOps redisOps(){
	return new RedisOps();
}

@Bean
RedisMessageQueue myQueue(){
	RedisMessageQueue redisMessageQueue = new RedisMessageQueue();
	redisMessageQueue.setQueueName("my-queue");
	redisMessageQueue.setQueueDequeueStrategy(new RandomQueueDequeueStrategy(redisOps()));
	return redisMessageQueue;
}

@Bean
MessageProducer<TextPackage> messageProducer(){
	MessageProducerImpl<TextPackage> messageProducer = new MessageProducerImpl<TextPackage>();
	messageProducer.setQueue(myQueue());
	return messageProducer;
}

@Bean
MessageListener<TextPackage> messageListener(){
	return new SimpleListener();
}

@Bean
MessageListener<TextPackage> secondListener(){
	return new SecondListener();
}

@Bean
MessageConsumer<TextPackage> messageConsumer(){
	ThreadingStrategy threadingStrategy = new MultiThreadingStrategy(5);
	MaxRetriesStrategy<TextPackage> maxRetriesStrategy = new MaxRetriesStrategy<TextPackage>(3);
	
	MessageConsumer<TextPackage> messageConsumer = new MessageConsumer<TextPackage>();
	messageConsumer.setQueue(myQueue());
	messageConsumer.setMessageListener(messageListener());
	messageConsumer.setConsumerId("1");
	messageConsumer.setThreadingStrategy(threadingStrategy);
	messageConsumer.setRetryStrategy(maxRetriesStrategy);
	return messageConsumer;
}

@Bean
MessageConsumer<TextPackage> secondConsumer(){
	ThreadingStrategy threadingStrategy = new MultiThreadingStrategy(5);
	MaxRetriesStrategy<TextPackage> maxRetriesStrategy = new MaxRetriesStrategy<TextPackage>(3);
	
	MessageConsumer<TextPackage> messageConsumer = new MessageConsumer<TextPackage>();
	messageConsumer.setQueue(myQueue());
	messageConsumer.setMessageListener(secondListener());
	messageConsumer.setConsumerId("2");
	messageConsumer.setThreadingStrategy(threadingStrategy);
	messageConsumer.setRetryStrategy(maxRetriesStrategy);
	return messageConsumer;
}

@Bean
RedisTemplate redisTemplate(){
	RedisTemplate redisTemplate = new RedisTemplate();
	redisTemplate.setConnectionFactory(jedisConnectionFactory());
	redisTemplate.setKeySerializer(new StringRedisSerializer());
	return redisTemplate;
}

@Autowired
private RedisTemplate redisTemplate;

@Autowired
private MessageProducer<TextPackage> messageProducer;

Listener:
public class SimpleListener implements MessageListener {

private Logger log = LoggerFactory.getLogger(SimpleListener.class);

@Override
public void onMessage(Message<TextPackage> message)
		throws RetryableMessageException {
	// TODO Auto-generated method stub
	TextPackage textPackage = message.getPayload();
	log.info("Getting FIRST message ... "+textPackage.getText());
}

}

Please give me advice, thanks.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.