Assume that we have three microservices: A, B and C. Microservice A declares a queue, QUEUE_1, and performs a basicPublish(...). B calls basicConsume(...) to retrieve the messages from QUEUE_1 and then uses another queue, QUEUE_2, to publish a message to C. Finally, C consumes the message from QUEUE_2.
I came up with some solutions, but they don't work as intended. What I basically need is the equivalent of the distributed tracing commonly used for microservice communication: is it possible to get a trace (traceID) associated with each send and receive in RabbitMQ for a given workflow A -> B -> C?
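To make the goal concrete, here is a minimal, broker-free sketch of the behaviour being asked for: the trace ID is written into the message headers when A publishes, read back when B consumes, and reused when B publishes to C. Everything in it is an assumption made for illustration: the in-memory queues stand in for QUEUE_1 and QUEUE_2, and the `Message` class, the `x-trace-id` header name and the `runWorkflow` helper are hypothetical. It is not how RabbitMQ or any tracing library implements propagation, only the idea behind it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Broker-free sketch: a trace ID travels in message headers from A to B to C. */
final class TracePropagationSketch {

    /** Hypothetical header name; real tracers define their own keys. */
    static final String TRACE_HEADER = "x-trace-id";

    /** Stand-in for an AMQP message: headers plus body. */
    static final class Message {
        final Map<String, String> headers;
        final String body;

        Message(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** "Inject" step: copy the trace ID into the headers before enqueueing. */
    static void publish(BlockingQueue<Message> queue, String traceId, String body) {
        Map<String, String> headers = new HashMap<>();
        headers.put(TRACE_HEADER, traceId);
        queue.add(new Message(headers, body));
    }

    /** Runs A -> B -> C once and returns the trace ID seen at each hop, in order. */
    static List<String> runWorkflow(String body) {
        BlockingQueue<Message> queue1 = new LinkedBlockingQueue<>(); // stands in for QUEUE_1
        BlockingQueue<Message> queue2 = new LinkedBlockingQueue<>(); // stands in for QUEUE_2
        List<String> seen = new ArrayList<>();

        // A: starts the trace and publishes to QUEUE_1.
        String traceId = UUID.randomUUID().toString();
        seen.add(traceId);
        publish(queue1, traceId, body);

        // B: consumes from QUEUE_1, extracts the trace ID, republishes with the same ID.
        Message atB = queue1.poll();
        String traceAtB = atB.headers.get(TRACE_HEADER);
        seen.add(traceAtB);
        publish(queue2, traceAtB, "SENDING AGAIN : " + atB.body);

        // C: consumes from QUEUE_2 and extracts the trace ID.
        Message atC = queue2.poll();
        seen.add(atC.headers.get(TRACE_HEADER));
        return seen;
    }

    public static void main(String[] args) {
        List<String> ids = runWorkflow("hello");
        System.out.println("A=" + ids.get(0) + " B=" + ids.get(1) + " C=" + ids.get(2));
    }
}
```

With real RabbitMQ the headers would live in the message's `AMQP.BasicProperties`, and a tracing wrapper would be expected to do the inject and extract steps. The point of the sketch is only that B must publish with the context it extracted from the incoming delivery, not with a fresh one.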
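// Microservice B: consumes from QUEUE_1 and republishes each message to QUEUE_2.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Span;
import io.opentracing.contrib.rabbitmq.TracingChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class B {
    public static final String RECV_QUEUE_NAME = "QUEUE_1";
    public static final String SEND_QUEUE_NAME = "QUEUE_2";

    @Autowired
    private JaegerTracer tracer;

    // Publishes one message to QUEUE_2 through a tracing channel; returns false on failure.
    public boolean sendMessage(String message) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the connection (and its channel) after publishing.
        try (Connection connection = factory.newConnection()) {
            Channel channel = connection.createChannel();
            TracingChannel tracingChannel = new TracingChannel(channel, tracer);
            tracingChannel.queueDeclare(SEND_QUEUE_NAME, true, false, true, null);
            tracingChannel.basicPublish("", SEND_QUEUE_NAME, null,
                    message.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (Exception e) {
            // Report the failure instead of silently swallowing it.
            e.printStackTrace();
            return false;
        }
    }

    // Registers a consumer on QUEUE_1; the connection stays open so deliveries keep arriving.
    public void consumeAll(List<String> res) throws Exception {
        // This span only covers consumer registration: it is never activated and is
        // finished before any delivery arrives, so later spans are not its children.
        Span span = tracer.buildSpan("B-ConsumeAll").start();
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        TracingChannel tracingChannel = new TracingChannel(channel, tracer);
        tracingChannel.queueDeclare(RECV_QUEUE_NAME, true, false, true, null);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            res.add(message);
            try {
                // Forward the message to C via QUEUE_2.
                sendMessage("SENDING AGAIN : " + message);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(" [x] B Received '" + message + "'");
        };
        // autoAck = true; the cancel callback is a no-op.
        tracingChannel.basicConsume(RECV_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        span.finish();
    }
}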
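
// Microservice C (separate application): consumes from QUEUE_2.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import io.jaegertracing.internal.JaegerTracer;
import io.opentracing.Span;
import io.opentracing.contrib.rabbitmq.TracingChannel;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class RabbitService {
    public static final String RECV_QUEUE_NAME = "QUEUE_2";

    @Autowired
    private JaegerTracer tracer;

    // Registers a consumer on QUEUE_2; the connection stays open so deliveries keep arriving.
    public void consumeAll(List<String> res) throws Exception {
        // This span only covers consumer registration: it is finished before any delivery arrives.
        Span span = tracer.buildSpan("C-ConsumeAll").start();
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        TracingChannel tracingChannel = new TracingChannel(channel, tracer);
        tracingChannel.queueDeclare(RECV_QUEUE_NAME, true, false, true, null);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            res.add(message);
            System.out.println(" [x] C Received '" + message + "'");
        };
        // autoAck = true; the cancel callback is a no-op.
        tracingChannel.basicConsume(RECV_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        span.finish();
    }
}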