Comments (51)
I discovered that the InterruptableWaiter was interrupting the await() in callWithRetries, though I don't yet understand that part of the program.
Best, Dan.
from lyra.
This was because the channel.close() in recoverExchangesAndQueues was throwing an AlreadyClosedException, but the catch was on (IOException ignored). I changed it to catch Exception.
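To illustrate why the narrower catch missed it: as far as I can tell, AlreadyClosedException in the RabbitMQ client is an unchecked exception (it derives from ShutdownSignalException, which is a RuntimeException), so a catch clause on IOException never sees it. Here is a minimal sketch using stand-in types; `ChannelAlreadyClosed`, `Resource` and `CatchScopeDemo` are hypothetical names for this example, not Lyra or amqp-client classes.

```java
import java.io.IOException;

// Hypothetical stand-in for an unchecked "already closed" exception.
class ChannelAlreadyClosed extends RuntimeException {
    ChannelAlreadyClosed(String message) {
        super(message);
    }
}

final class CatchScopeDemo {
    private CatchScopeDemo() {}

    // Hypothetical stand-in for something with a close() that may throw IOException.
    interface Resource {
        void close() throws IOException;
    }

    // Only IOException is handled here; unchecked exceptions propagate to the caller.
    static boolean closeCatchingIOException(Resource resource) {
        try {
            resource.close();
            return true;
        } catch (IOException ignored) {
            return false;
        }
    }

    // Handles both checked I/O failures and unchecked failures from close().
    static boolean closeQuietly(Resource resource) {
        try {
            resource.close();
            return true;
        } catch (IOException | RuntimeException ignored) {
            return false;
        }
    }
}
```

Catching RuntimeException alongside IOException is a bit narrower than catching Exception, though for a best-effort close either would likely do.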
In this case though, will the connection continue to be resumed? I assume that there'll be an incoming ShutdownListener to re-trigger the reconnection process?
Best, Dan.
from lyra.
The same thing can, of course, happen with the next channelHandler.recoverChannel(true).
Does

    try {
        for (ChannelHandler channelHandler : channels.values())
            if (channelHandler.canRecover()) {
                channelHandler.recoverChannel(true);
            }
    } catch (Exception ignored) {}

make sense?
I also added a
if(delegate.isOpen()) listener.onChannelRecovery(proxy);
from lyra.
Alternatively, this could be fixed, potentially more elegantly, by handling the InterruptedException, but I wasn't sure how to distinguish valid interrupts from invalid ones.
If you get a chance, you can see my final code of the day at https://github.com/fakeh/lyra
from lyra.
Hi Dan - Was traveling yesterday so I didn't get to look at this. Looking now...
from lyra.
I need to rethink some of the queue/exchange recovery logic a bit. Working on that along with getting the tests together. This stuff definitely isn't quite ready for release yet :)
from lyra.
Hi there,
I've been testing Lyra with my changes quite intensely overnight, where the connection to AMQP has been broken at random intervals (with 1 to 20 seconds up time followed by 1 to 20 seconds down time) and incoming requests. With Lyra trying to reconnect every 5 seconds, this has previously been enough to reliably produce bugs within a few minutes of operation. With the code currently in my fork the test has been running all night and is still performing well, though I appreciate that my code is quite hacky when compared with yours.
from lyra.
I just pushed a commit that re-works the way exchange/queue/binding recovery is done, along with some tests for these areas. Let me know how this looks against the tests you wrote.
from lyra.
I merged this commit, which returns callWithRetries back to its original state, and reworks the exchange, queue & binding recovery into separate methods in RetryableResource which catch and selectively throw their exceptions.
Exceptions are being thrown which cancel the next connection recovery:
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource recoverQueue
SEVERE: Failed to recover queue OsgiShellManagementAgent for 0ab0bcfc-5968-436e-b55e-e0085ef9c0d4 via cxn-1
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:768)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
at net.jodah.lyra.internal.ResourceDeclaration.invoke(ResourceDeclaration.java:22)
at net.jodah.lyra.internal.RetryableResource.recoverQueue(RetryableResource.java:169)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:311)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:288)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:260)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 16 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@5b21409a
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:768)
at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
at net.jodah.lyra.internal.ResourceDeclaration.invoke(ResourceDeclaration.java:22)
at net.jodah.lyra.internal.RetryableResource.recoverQueue(RetryableResource.java:169)
at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:311)
at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:288)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:260)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
... 16 more
Caused by: java.io.EOFException
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource callWithRetries
SEVERE: Throwable thrown, but ignored
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at net.jodah.lyra.internal.util.concurrent.InterruptableWaiter.await(InterruptableWaiter.java:31)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:84)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:222)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:245)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource callWithRetries
INFO: No more retry attempts for cxn-1, because the resource is closed or is not retryable
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
at java.net.Socket.connect(Socket.java:579)
at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:227)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:222)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:48)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:222)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:245)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
After which no more connection recovery attempts are made.
Presumably because
    public static boolean isCausedByConnectionClosure(Exception e) {
        ShutdownSignalException sse = Exceptions.extractCause(e, ShutdownSignalException.class);
        return sse != null && Exceptions.isConnectionClosure(sse);
    }
returns true.
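For anyone following along, here is a rough sketch of what a cause-chain lookup of this kind typically does. This is not Lyra's actual Exceptions.extractCause implementation; `CauseChain` and the depth limit are assumptions made for the example.

```java
final class CauseChain {
    private CauseChain() {}

    // Walks t and its causes and returns the first one that is an instance of type,
    // or null if none is found. The depth cap guards against cyclic cause chains.
    static <T extends Throwable> T extractCause(Throwable t, Class<T> type) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (type.isInstance(current)) {
                return type.cast(current);
            }
            current = current.getCause();
        }
        return null;
    }
}
```

In the traces above the chain is IOException, then ShutdownSignalException, then EOFException, so a lookup for ShutdownSignalException would find the middle link.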
from lyra.
You can access the merge at https://github.com/fakeh/lyra/tree/MergeWithJHalterman
from lyra.
I tried reproducing this with your merges, but I haven't had any luck yet. Do you have a test you can share or describe what you're doing?
Also, I pushed another commit that is somewhat related to this work which you'll want to merge.
from lyra.
I run the following script, which opens up a port proxying to an AMQP instance. You can run it on the same machine as the client, but you'll need to change the port. Customise the top to set the AMQP host's ip/port and local proxy port.
The port is open for a random period, between 1 and 20 seconds. It's then closed for another 1 to 20 seconds.
Ctrl-C quits.
    #!/usr/bin/env bash
    ip=192.168.1.251
    rport=5672
    lport=5672
    function cleanup {
      rm -f backpipe
      # Kill at the end too as, e.g., Ctrl-C won't remove the last listener, if one was opened.
      lsof -iTCP:$lport -F p | cut -c 2- | xargs kill -9
    }
    trap cleanup EXIT # Always clean up when the script exits.
    mkfifo backpipe
    while true; do
      (nc -l $lport 0<backpipe | nc $ip $rport 1>backpipe) &
      pid=$!
      sleeping=$(( (RANDOM % 20) + 1 ))s
      echo "Killing ${pid} in ${sleeping}"
      sleep $sleeping
      kill -9 $pid
      # Don't know why the above doesn't close the channel.
      lsof -iTCP:$lport -sTCP:LISTEN -F p | cut -c 2- | xargs kill -9
      sleeping=$(( (RANDOM % 20) + 1 ))s
      echo "Opening in ${sleeping}"
      sleep $sleeping
    done
I've not had to wait more than a few minutes for this to demonstrate the scenario above.
The client itself is configured:
RecoveryPolicy recoveryPolicy = RecoveryPolicies.recoverAlways().withInterval(Duration.seconds(5));
RetryPolicy retryPolicy = RetryPolicies.retryAlways().withInterval(Duration.seconds(5));
Config config = new Config()
    .withRecoveryPolicy(recoveryPolicy)
    .withRetryPolicy(RetryPolicies.retryNever())
    .withConnectRetryPolicy(retryPolicy);
and makes ten or so consumers on one connection/channel; all use the same exchange, but each has its own queue and ~five bindings.
Sorry it's not a unit test. I wanted it to be, but I couldn't quite figure out TestNG or your abstractions in a way I thought was productive.
Cheers, Dan.
from lyra.
So I run the shell script and run a client on my box (OSX) - nothing doing. How is the shell script intended to work?
from lyra.
Saving it to a file, giving it chmod +x and executing it doesn't give any output?
You should see "Killing in x seconds", "Opening in x seconds" repeatedly. This means that the port configured at the top of the script is proxying to the IP and remote port configured in the script for up to twenty seconds before the proxy is forcibly closed.
If you're running it on the same machine as the RabbitMQ instance, you'll need to change the local port. You'll definitely need to change the IP address.
from lyra.
I ran it from the same machine as my Rabbit client, which is connected to a remote Rabbit server. The bash script printed output, but didn't affect the Rabbit client at all.
from lyra.
The Rabbit client would need to be configured to connect to localhost as its RabbitMQ server, and the IP address in the script set to the IP of the remote RabbitMQ instance.
from lyra.
I like the proxy script, and I was able to reproduce the failure with your fork just as you described (after a few minutes), but I couldn't reproduce it with the current Lyra master after running for about 20 minutes. Perhaps it was a bad merge or some other changes you made? The current Lyra master looks good though.
from lyra.
That's certainly a possibility; I'm new to git, though I was pretty confident I'd got it right. May I ask what part of the master code should handle the case above?
from lyra.
From the stacktrace it looks like connection recovery fails, in which case the next ShutdownListener invocation should be picked up and a new recovery should start.
from lyra.
Closing for now unless this pops up in Lyra master.
from lyra.
It took a while, but the script was able to generate an exception thrown out of callWithRetries with the 0.4.0 release:
java.io.IOException
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:234)
at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:229)
at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:48)
at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:229)
at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:255)
at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:42)
at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:93)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
... 11 more
Caused by: java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:189)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)
The SSE had:
_hardError true
_initiatedByApplication false
_reason SocketException (id=6584)
_ref AMQConnection (id=6585)
from lyra.
Thanks for sharing. Was it the same script you provided previously or were there any changes? Also, how long did it run till the failure occurred?
from lyra.
Yes, the same script.
I've seen an exception thrown out twice so far: once after about forty minutes, and the next time after much longer, more like five hours. I was surprised the first time, so didn't note it down.
from lyra.
Could
if(reason instanceof Exception) return true;
be a catch-all solution if added to
private static boolean isRetryable(ShutdownSignalException e) {
?
from lyra.
Yes, I think that would be a catch all. Ideally we don't want to retry on certain exceptions that are fatal though, such as authorization failures, NoRouteToHost, etc. In theory, anything that cannot be resolved with some number of retries, we're better off just throwing than retrying. So I try to differentiate.
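As a rough illustration of that kind of differentiation (not Lyra's actual isRetryable logic), a classifier might check for known-fatal causes first and only then fall back to the broader retryable types. Which types land in which bucket below is purely an assumption for the example; `RetryClassifier` is a hypothetical name, and SecurityException merely stands in for an authorization failure.

```java
import java.io.EOFException;
import java.net.NoRouteToHostException;
import java.net.SocketException;

final class RetryClassifier {
    private RetryClassifier() {}

    // Returns true if retrying might plausibly help for this failure reason.
    static boolean isRetryable(Throwable reason) {
        // NoRouteToHostException is a subclass of SocketException,
        // so the fatal check has to come before the broader one.
        if (reason instanceof NoRouteToHostException) {
            return false;
        }
        // Stand-in for an authorization failure (assumption for this sketch).
        if (reason instanceof SecurityException) {
            return false;
        }
        // Covers "Connection reset" and "Connection refused" (ConnectException
        // is also a SocketException), plus a stream that ended unexpectedly.
        return reason instanceof SocketException || reason instanceof EOFException;
    }
}
```

The ordering matters because a plain `instanceof SocketException` test would also match the fatal subclasses.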
from lyra.
Hi Jonathan,
why not consider java.net.SocketException as a potentially retryable exception?
from lyra.
@frascuchon I would certainly consider it. What scenario are you seeing this with? Can I reproduce it?
from lyra.
Hi @jhalterman,
I see that there is no reconnection when a producer is running during a RabbitMQ server restart.
from lyra.
I think this is very similar to the problem we were discussing in email regarding questionable proxy behavior, and trying to think of ways to reproduce it, @jhalterman . I managed to write a unit test this morning that reproduces it reliably.
Here's a self-contained Maven project containing a TestNG test... just "mvn test" should be enough to run it.
https://dl.dropboxusercontent.com/u/108601349/badproxy.tgz
The test contains a simple proxy server (that I borrowed/stole from java2s.com and modified a bit) that initially proxies requests to a real AMQP server, then it is switched to a mode of answering connections, waiting a bit, and then hanging up, then switched back to normal. You'll need to configure the correct hostname, port, username and password.
After the test forces a disconnect, Lyra will begin a recovery that will never get anywhere, and it won't retry after the initial connection back to the proxy server completes, even though it will also disconnect.
from lyra.
@spatula75 I tried your test case, and I think the reason the connection never recovers is that the proxy fails to send the server's response to the client's AMQConnection, which is left waiting forever. This occurs when doProxy is false. I updated the runServer method to use an additional thread for handling server responses to make sure they're always sent. With this change, the connection recovers just fine:
    // Note: client, streamFromClient, streamToClient, doProxy and LOGGER are
    // presumably fields of the enclosing test class; they are not declared here.
    public void runServer(String host, int remoteport, int localport) throws IOException {
        // Create a ServerSocket to listen for connections with
        ServerSocket ss = new ServerSocket(localport);
        final byte[] request = new byte[1024];
        final byte[] reply = new byte[4096];
        while (true) {
            Socket server = null;
            try {
                // Wait for a connection on the local port
                client = ss.accept();
                LOGGER.info("Got a client connection!");
                streamFromClient = client.getInputStream();
                streamToClient = client.getOutputStream();
                // Make a connection to the real server.
                // If we cannot connect to the server, send an error to the
                // client, disconnect, and continue waiting for connections.
                try {
                    LOGGER.info("Connecting to real server...");
                    server = new Socket(host, remoteport);
                } catch (IOException e) {
                    PrintWriter out = new PrintWriter(streamToClient);
                    out.print("Proxy server cannot connect to " + host + ":" + remoteport + ":\n" + e + "\n");
                    out.flush();
                    client.close();
                    continue;
                }
                // Get server streams.
                final InputStream streamFromServer = server.getInputStream();
                final OutputStream streamToServer = server.getOutputStream();
                // A thread to read the client's requests and pass them
                // to the server. A separate thread so this happens asynchronously.
                runAsDaemon(new Runnable() {
                    public void run() {
                        int bytesRead;
                        try {
                            while ((bytesRead = streamFromClient.read(request)) != -1) {
                                LOGGER.info("Read {} bytes from client", bytesRead);
                                streamToServer.write(request, 0, bytesRead);
                                streamToServer.flush();
                            }
                        } catch (IOException e) {
                            LOGGER.error("Exception reading from client", e);
                        }
                        // The client closed the connection to us, so close our
                        // connection to the server.
                        try {
                            streamToServer.close();
                        } catch (IOException e) {
                            // ignored
                        }
                    }
                });
                // Read the server's responses
                // and pass them back to the client.
                Thread t = runAsDaemon(new Runnable() {
                    public void run() {
                        int bytesRead;
                        try {
                            while ((bytesRead = streamFromServer.read(reply)) != -1) {
                                LOGGER.info("Read {} bytes from server", bytesRead);
                                streamToClient.write(reply, 0, bytesRead);
                                streamToClient.flush();
                            }
                        } catch (IOException e) {
                            LOGGER.error("Exception reading from server", e);
                        }
                        // The server closed its connection to us, so close our
                        // connection to the client.
                        try {
                            streamToClient.close();
                        } catch (IOException e) {
                            // ignored
                        }
                    }
                });
                if (!doProxy) {
                    LOGGER.info("I'm no longer proxying. Waiting and then hanging up.");
                    Thread.sleep(20000);
                    LOGGER.info("Goodbye, client!");
                    client.close();
                    continue;
                }
                t.join();
            } catch (IOException e) {
                LOGGER.error("IOException", e);
            } catch (InterruptedException e) {
                LOGGER.error("Interrupted", e);
            } finally {
                try {
                    if (server != null)
                        server.close();
                    if (client != null)
                        client.close();
                } catch (IOException e) {
                    // ignored
                }
            }
        }
    }

    private Thread runAsDaemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);
        t.start();
        return t;
    }
I understand that reproducing a failure like this in a test can be hard - but if you think there's still an outstanding issue please do see if you can hit it.
from lyra.
Yeah, that's the point of the test case: when the proxy is no longer proxying (and we disconnect and reconnect), even after it starts proxying again, we never attempt a reconnect again, despite the first reconnect failing. It's a situation we see in the Amazon EC2 world from time to time when it thinks it has nowhere to send an AMQP connection, so the ELB answers, does nothing, and hangs up. Then, rather than reconnecting according to the policy, no further reconnect attempts are made, even if the ELB starts working again.
from lyra.
This makes sense - I just had to get my head back into what this was all about :) I'm wondering if there's much we can do about this right now. See the stacktrace from a paused recovery thread:
Thread [lyra-recovery-1] (Suspended)
waiting for: BlockingValueOrException<V,E> (id=28)
Object.wait(long) line: not available [native method]
BlockingValueOrException<V,E>(Object).wait() line: 503
BlockingValueOrException<V,E>(BlockingCell<T>).get() line: 50
BlockingValueOrException<V,E>(BlockingCell<T>).uninterruptibleGet() line: 89
BlockingValueOrException<V,E>.uninterruptibleGetValue() line: 33
AMQChannel$SimpleBlockingRpcContinuation(AMQChannel$BlockingRpcContinuation<T>).getReply() line: 343
AMQConnection.start() line: 313
ConnectionFactory.newConnection(ExecutorService, Address[]) line: 516
ConnectionHandler$3.call() line: 243
ConnectionHandler$3.call() line: 236
ConnectionHandler(RetryableResource).callWithRetries(Callable<T>, RecurringPolicy<?>, RecurringStats, Set<Class<Exception>>, boolean, boolean) line: 51
ConnectionHandler.createConnection(RecurringPolicy<?>, Set<Class<Exception>>, boolean) line: 236
ConnectionHandler.recoverConnection() line: 271
ConnectionHandler.access$100(ConnectionHandler) line: 41
ConnectionHandler$ConnectionShutdownListener$1.run() line: 95
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1110
ThreadPoolExecutor$Worker.run() line: 603
Thread.run() line: 722
Lyra calls the amqp-client library to create a new connection, which is successfully created internally, but the subsequent attempt to read hangs forever since the proxy is disabled. The connection timeout (which you set to 10 seconds) doesn't come into play here since it only affects the initial TCP connection, not the subsequent read attempt.
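The same distinction can be shown with plain java.net sockets, independent of amqp-client: the connect timeout bounds only the TCP connect, while a separate read timeout (SO_TIMEOUT) is what bounds a read from a peer that accepts and then stays silent. A minimal sketch, with `SilentPeerDemo` being a name made up for this example:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

final class SilentPeerDemo {
    private SilentPeerDemo() {}

    // Connects to a local listener that never sends anything and reports
    // whether the read gave up with a timeout.
    static boolean readTimesOut(int connectTimeoutMs, int readTimeoutMs) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 picks a free port. The listener never calls accept(), but the OS
        // still completes the TCP handshake, so connect() succeeds right away.
        try (ServerSocket silent = new ServerSocket(0, 1, loopback);
             Socket client = new Socket()) {
            client.connect(new InetSocketAddress(loopback, silent.getLocalPort()), connectTimeoutMs);
            // Without this, the read below would block indefinitely.
            client.setSoTimeout(readTimeoutMs);
            try {
                client.getInputStream().read();
                return false;
            } catch (SocketTimeoutException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Calling `SilentPeerDemo.readTimesOut(10_000, 500)` connects immediately and then gives up on the read after roughly half a second, which is the bound that is missing in the hung recovery thread above.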
Aside from this test - any idea why ELB might accept connections but not send any data? That seems to be the killer thing here. If the ELB stops accepting connections - fine, we should be able to recover from that. If it takes connections but doesn't communicate, as this test does, that's a harder problem.
from lyra.
Yeah, unfortunately what happens is the ELB always accepts a connection, regardless of whether it believes it has any healthy AMQP servers to which it can proxy. Then, if it doesn't believe it has any healthy servers right now, it will just drop the connection again, so you get this connect/disconnect behavior, instead of connecting to AMQP like you normally would, or having a connection-refused situation like you would if AMQP were unavailable.
We ran into this when we were doing some failover testing, because ELB will very quickly decide that an AMQP instance is "unhealthy", but it takes comparatively longer for it to decide that it's "healthy" again. So when we were failing from one server to the next, to the next, it was marking them each unhealthy, so by the time we got to the last one in the list, even though the first several were actually back online, the ELB wouldn't send traffic back to them and would just drop the connection instead.
from lyra.
Ok - Your test seems to capture that behavior well then. I'm guessing this issue could affect any amqp-client connection though, not just recovery attempts. It's probably just that it pops up during recovery since that's when the ELB scenario occurs.
Perhaps @michaelklishin could weigh in on this. Michael - basically the problem is that when rabbitmq connections are proxied through ELB, ELB might accept a TCP connection but not respond to the initial handshake which leaves the client hanging forever. See the call stack 2 posts up. The first idea that comes to my mind is that everything that happens inside AMQConnection.start() should be covered by the connection timeout setting. Thoughts?
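To make the idea concrete, here is one generic way a blocking step can be given an overall deadline from the outside. This is only a sketch of the concept, not how amqp-client or Lyra implement it; `BoundedCall` is a hypothetical helper.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedCall {
    private BoundedCall() {}

    // Runs task on a daemon worker thread and waits at most timeoutMs for its result.
    // A timeout surfaces as IllegalStateException with a TimeoutException as its cause.
    static <T> T call(Callable<T> task, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck worker must not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // best effort; has no effect on uninterruptible waits
            throw new IllegalStateException("timed out after " + timeoutMs + " ms", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One caveat: the trace above is parked in uninterruptibleGet(), so an external deadline like this would only let the caller stop waiting; the worker thread would stay blocked until the socket is closed. That is why a timeout inside the client library itself seems like the cleaner fix.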
from lyra.
I won't swear to this, but I think when I was playing with this, I did try it with just a bare AMQP connection and saw it time out when the heartbeat interval had passed because the handshake didn't complete, but it has been a little while since I was toying with it. I think in that case, I just started up netcat on 5671 and tried to connect to a connection that didn't do anything, and I believe it did throw an exception as expected. It would be a useful thing to verify though, because my memory is like a sieve.
from lyra.
@spatula75 Do you know if ELB eventually closes the client connection or how long it takes?
from lyra.
The behavior varies. The first time, it seems to take about 20 seconds before the connection gets closed. After that, it's more of an immediate connect/close.
from lyra.
Folks, line numbers in the stack trace don't match master or 3.5.2, AFAICT. If you have a way to reproduce this reasonably quickly, can you please try with 3.5.2 (the issue is likely there, I just want to see an up-to-date trace). Thank you!
from lyra.
@michaelklishin Sure, here's a stacktrace taken with 3.5.2:
Thread [lyra-recovery-1] (Suspended)
waiting for: BlockingValueOrException<V,E> (id=28)
Object.wait(long) line: not available [native method]
BlockingValueOrException<V,E>(Object).wait() line: 503
BlockingValueOrException<V,E>(BlockingCell<T>).get() line: 50
BlockingValueOrException<V,E>(BlockingCell<T>).uninterruptibleGet() line: 89
BlockingValueOrException<V,E>.uninterruptibleGetValue() line: 33
AMQChannel$SimpleBlockingRpcContinuation(AMQChannel$BlockingRpcContinuation<T>).getReply() line: 348
AMQConnection.start() line: 294
ConnectionFactory.newConnection(ExecutorService, Address[]) line: 603
ConnectionHandler$3.call() line: 243
ConnectionHandler$3.call() line: 236
ConnectionHandler(RetryableResource).callWithRetries(Callable<T>, RecurringPolicy<?>, RecurringStats, Set<Class<Exception>>, boolean, boolean) line: 51
ConnectionHandler.createConnection(RecurringPolicy<?>, Set<Class<Exception>>, boolean) line: 236
ConnectionHandler.recoverConnection() line: 271
ConnectionHandler.access$100(ConnectionHandler) line: 41
ConnectionHandler$ConnectionShutdownListener$1.run() line: 95
ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1110
ThreadPoolExecutor$Worker.run() line: 603
Thread.run() line: 722
from lyra.
Should be fixed in rabbitmq/rabbitmq-java-client#65, will be in 3.5.3 (scheduled in 4 weeks).
from lyra.
Awesome!
from lyra.
@spatula75 Here's some log output running your test against Michael's changes along with a few tweaks in Lyra:
[TestNG] Running:
/private/var/folders/my/jry08ttx4_l7_5gpjmv42zcc0000gn/T/testng-eclipse-1758990684/testng-customsuite.xml
INFO [2015-05-13 16:10:11,905] ConnectionHandler - Creating connection cxn-1 to [localhost:55672]
INFO [2015-05-13 16:10:11,907] RudeProxyTest - Got a client connection!
INFO [2015-05-13 16:10:11,907] RudeProxyTest - Connecting to real server...
INFO [2015-05-13 16:10:11,921] RudeProxyTest - Read 8 bytes from client
INFO [2015-05-13 16:10:11,927] RudeProxyTest - Read 468 bytes from server
INFO [2015-05-13 16:10:11,982] RudeProxyTest - Read 382 bytes from client
INFO [2015-05-13 16:10:11,991] RudeProxyTest - Read 20 bytes from server
INFO [2015-05-13 16:10:11,996] RudeProxyTest - Read 20 bytes from client
INFO [2015-05-13 16:10:11,996] RudeProxyTest - Read 16 bytes from client
INFO [2015-05-13 16:10:12,042] RudeProxyTest - Read 13 bytes from server
INFO [2015-05-13 16:10:12,042] ConnectionHandler - Created connection cxn-1 to amqp://127.0.0.1:55672/
INFO [2015-05-13 16:10:12,050] RudeProxyTest - Read 13 bytes from client
INFO [2015-05-13 16:10:12,051] RudeProxyTest - Read 16 bytes from server
INFO [2015-05-13 16:10:12,073] ConnectionHandler - Created channel-1 on cxn-1
INFO [2015-05-13 16:10:12,075] RudeProxyTest - Read 30 bytes from client
INFO [2015-05-13 16:10:12,080] RudeProxyTest - Read 12 bytes from server
INFO [2015-05-13 16:10:12,082] RudeProxyTest - Read 20 bytes from client
INFO [2015-05-13 16:10:12,092] RudeProxyTest - Read 51 bytes from server
INFO [2015-05-13 16:10:12,094] RudeProxyTest - Read 57 bytes from client
INFO [2015-05-13 16:10:12,101] RudeProxyTest - Read 12 bytes from server
INFO [2015-05-13 16:10:12,105] RudeProxyTest - Read 51 bytes from client
INFO [2015-05-13 16:10:12,112] RudeProxyTest - Read 44 bytes from server
INFO [2015-05-13 16:10:12,115] ChannelHandler - Created consumer-amq.ctag-Nc4IAzpZDDF0KZDaNOUcxQ of amq.gen-Ja861vZAtjHxpfGQNyRApg via channel-1 on cxn-1
INFO [2015-05-13 16:10:12,117] RudeProxyTest - Read 55 bytes from client
INFO [2015-05-13 16:10:12,121] RudeProxyTest - Read 93 bytes from server
INFO [2015-05-13 16:10:12,123] RudeProxyTest - Consumed derp
ERROR [2015-05-13 16:10:12,126] RudeProxyTest - Exception reading from client
java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:150)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at java.net.SocketInputStream.read(SocketInputStream.java:107)
at vc.bjn.badproxy.RudeProxyTest$3.run(RudeProxyTest.java:165)
ERROR [2015-05-13 16:10:12,127] ChannelHandler - Channel channel-1 on cxn-1 was closed unexpectedly
ERROR [2015-05-13 16:10:12,129] ConnectionHandler - Connection cxn-1 was closed unexpectedly
INFO [2015-05-13 16:10:12,131] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO [2015-05-13 16:10:12,131] RudeProxyTest - Got a client connection!
INFO [2015-05-13 16:10:12,131] RudeProxyTest - I'm no longer proxying. Waiting and then hanging up.
INFO [2015-05-13 16:10:19,140] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO [2015-05-13 16:10:28,147] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO [2015-05-13 16:10:32,133] RudeProxyTest - Goodbye, client!
INFO [2015-05-13 16:10:32,134] RudeProxyTest - Got a client connection!
INFO [2015-05-13 16:10:32,134] RudeProxyTest - Connecting to real server...
INFO [2015-05-13 16:10:32,140] RudeProxyTest - Read 8 bytes from client
INFO [2015-05-13 16:10:32,141] RudeProxyTest - Got a client connection!
INFO [2015-05-13 16:10:32,141] RudeProxyTest - Connecting to real server...
INFO [2015-05-13 16:10:32,142] RudeProxyTest - Read 8 bytes from client
INFO [2015-05-13 16:10:32,146] RudeProxyTest - Read 468 bytes from server
INFO [2015-05-13 16:10:32,147] RudeProxyTest - Read 382 bytes from client
INFO [2015-05-13 16:10:32,149] RudeProxyTest - Read 20 bytes from server
INFO [2015-05-13 16:10:32,149] RudeProxyTest - Read 20 bytes from client
INFO [2015-05-13 16:10:32,149] RudeProxyTest - Read 16 bytes from client
INFO [2015-05-13 16:10:32,182] RudeProxyTest - Read 13 bytes from server
INFO [2015-05-13 16:10:32,182] ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:55672/
INFO [2015-05-13 16:10:32,183] ConnectionHandler - Recovering exchange test via cxn-1
INFO [2015-05-13 16:10:32,183] RudeProxyTest - Read 13 bytes from client
INFO [2015-05-13 16:10:32,191] RudeProxyTest - Read 16 bytes from server
INFO [2015-05-13 16:10:32,192] RudeProxyTest - Read 30 bytes from client
INFO [2015-05-13 16:10:32,194] RudeProxyTest - Read 12 bytes from server
INFO [2015-05-13 16:10:32,198] RudeProxyTest - Read 20 bytes from client
INFO [2015-05-13 16:10:32,201] RudeProxyTest - Read 51 bytes from server
INFO [2015-05-13 16:10:32,201] ConnectionHandler - Recovered queue amq.gen-Ja861vZAtjHxpfGQNyRApg as amq.gen-x__GTOc1NfEOAyrs-hTO3g via cxn-1
INFO [2015-05-13 16:10:32,202] ConnectionHandler - Recovering queue binding from test to amq.gen-x__GTOc1NfEOAyrs-hTO3g with # via cxn-1
INFO [2015-05-13 16:10:32,202] RudeProxyTest - Read 57 bytes from client
INFO [2015-05-13 16:10:32,211] RudeProxyTest - Read 12 bytes from server
INFO [2015-05-13 16:10:32,212] RudeProxyTest - Read 21 bytes from client
INFO [2015-05-13 16:10:32,221] RudeProxyTest - Read 12 bytes from server
INFO [2015-05-13 16:10:32,222] ChannelHandler - Recovering channel-1 on cxn-1
INFO [2015-05-13 16:10:32,222] RudeProxyTest - Read 13 bytes from client
INFO [2015-05-13 16:10:32,230] RudeProxyTest - Read 16 bytes from server
INFO [2015-05-13 16:10:32,230] ChannelHandler - Recovered channel-1 on cxn-1
INFO [2015-05-13 16:10:32,230] ChannelHandler - Recovering consumer-amq.ctag-Nc4IAzpZDDF0KZDaNOUcxQ of amq.gen-x__GTOc1NfEOAyrs-hTO3g via channel-1 on cxn-1
INFO [2015-05-13 16:10:32,231] RudeProxyTest - Read 51 bytes from client
INFO [2015-05-13 16:10:32,240] RudeProxyTest - Read 44 bytes from server
This looks pretty good to me. I'll make a new release of Lyra soon that works with the changes coming in amqp-client 3.5.3. In the meantime, you should be able to build and use either one yourself if you wish.
from lyra.
Agreed, this looks like exactly the behavior we would expect: more than one attempt to recover, and once the proxy switches back into "actually proxying" mode, recovery succeeds and things return to normal.
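For anyone reading the log later, the pattern it shows (several failed recovery attempts while the proxy hangs up, then success once it proxies again) can be sketched roughly as below. This is not Lyra's code: `RecoverySketch`, `recover`, the attempt limit and the interval are all made-up names and values for illustration, and the "proxy" is just a lambda that fails twice before succeeding.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RecoverySketch {

    /**
     * Calls connect until it returns true or maxAttempts is used up.
     * Returns the number of attempts taken; if every attempt fails,
     * rethrows the last failure (or an IllegalStateException if none was thrown).
     */
    static int recover(Callable<Boolean> connect, int maxAttempts, long intervalMillis)
            throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (connect.call()) {
                    return attempt; // connection is back, stop retrying
                }
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
            if (attempt < maxAttempts) {
                Thread.sleep(intervalMillis); // wait before the next attempt
            }
        }
        throw last != null
                ? last
                : new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws Exception {
        final int[] calls = {0};
        // Hypothetical stand-in for the rude proxy: hangs up twice, then proxies.
        int used = recover(() -> {
            if (++calls[0] < 3) {
                throw new IOException("proxy hung up");
            }
            return true;
        }, 5, 10);
        System.out.println("recovered after " + used + " attempts");
    }
}
```

The point is only that a single failed attempt must not end recovery; each failure should lead to another attempt until the policy's limit is reached.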
from lyra.
Hi again!
I also occasionally see blocking in the BVOE (BlockingValueOrException) class and have written some messages to the RMQ mailing list about it:
- http://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2014-February/034081.html
- https://groups.google.com/d/msg/rabbitmq-users/MWxAkY7Hsko/u6Q74ICrdpUJ
Reducing contention on the Channel reduced the occurrence.
from lyra.
Happy to report that using amqp-client 3.5.3 (released last week), RudeProxyTest passes.
from lyra.
I just released Lyra 0.5.1, which includes the TimeoutException stuff to correspond with the 3.5.3 release.
from lyra.
Awesome! Any idea how long it normally takes things to propagate to maven central?
from lyra.
I think it's usually within an hour...
from lyra.
👍
from lyra.
Lyra 0.5.1 still depends on amqp-client 3.5.1 in the pom. Were you going to increase that to 3.5.3? https://github.com/jhalterman/lyra/blob/lyra-0.5.1/pom.xml#L46
from lyra.
Sigh... I suck.
Let's try this again - 0.5.2 - should be in Maven central shortly.
from lyra.