Comments (51)

fakeh avatar fakeh commented on August 16, 2024

I discovered that the InterruptableWaiter was interrupting the await() in callWithRetries, though I don't yet understand that part of the program.
Best, Dan.

from lyra.

fakeh avatar fakeh commented on August 16, 2024

This was because the channel.close() in recoverExchangesAndQueues was throwing an AlreadyClosedException, but the surrounding catch only handled IOException (catch (IOException ignored)), so the exception escaped. I changed it to catch Exception.
In this case though, will the connection continue to be resumed? I assume that there'll be an incoming ShutdownListener to re-trigger the reconnection process?

Best, Dan.
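
For context on why widening the catch made a difference: in the RabbitMQ Java client, AlreadyClosedException extends ShutdownSignalException, which is a RuntimeException, so a catch (IOException ...) clause never sees it. A minimal sketch of that behaviour follows. It uses a stand-in exception class so that it runs without the client library; FakeAlreadyClosedException, closeChannel and both wrapper methods are hypothetical names, not Lyra or amqp-client code.

```java
import java.io.IOException;

public class CatchScopeDemo {
  // Hypothetical stand-in for an unchecked "already closed" exception.
  public static class FakeAlreadyClosedException extends RuntimeException {
    public FakeAlreadyClosedException(String message) {
      super(message);
    }
  }

  // Hypothetical close() that declares IOException but fails with an unchecked exception.
  public static void closeChannel() throws IOException {
    throw new FakeAlreadyClosedException("channel is already closed");
  }

  // Mirrors the narrower handler: only IOException is swallowed.
  public static String closeCatchingIOException() {
    try {
      closeChannel();
      return "closed";
    } catch (IOException ignored) {
      return "ignored";
    }
  }

  // Mirrors the widened handler: any Exception is swallowed.
  public static String closeCatchingException() {
    try {
      closeChannel();
      return "closed";
    } catch (Exception ignored) {
      return "ignored";
    }
  }

  public static void main(String[] args) {
    try {
      closeCatchingIOException();
    } catch (RuntimeException e) {
      System.out.println("escaped the IOException handler: " + e.getMessage());
    }
    System.out.println("wider handler result: " + closeCatchingException());
  }
}
```

Catching Exception hides every failure in that block, so a narrower alternative would be to catch the specific unchecked type alongside IOException.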

fakeh avatar fakeh commented on August 16, 2024

The same thing can, of course, happen with the next channelHandler.recoverChannel(true).

Does

        try{
            for (ChannelHandler channelHandler : channels.values())
                if (channelHandler.canRecover()){
                    channelHandler.recoverChannel(true);
                }
        }catch(Exception ignored){}

make sense?

I also added a

                if(delegate.isOpen()) listener.onChannelRecovery(proxy);

fakeh avatar fakeh commented on August 16, 2024

Alternatively, this could be fixed, potentially more elegantly, by handling the InterruptedException, but I wasn't sure how to distinguish valid interrupts from invalid ones.

If you get a chance, you can see my final code of the day at https://github.com/fakeh/lyra
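
One possible way to tell deliberate interrupts from stray ones is to have the interrupting code record its intent in a flag before calling interrupt(). The following is only a sketch of that idea under that assumption; FlaggedSleeper, Outcome and cancel are hypothetical names and this is not how Lyra's InterruptableWaiter is implemented.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class FlaggedSleeper {
  public enum Outcome { COMPLETED, CANCELLED, STRAY_INTERRUPT }

  private final AtomicBoolean cancelRequested = new AtomicBoolean(false);

  // Called by code that legitimately wants to end the wait early.
  public void cancel(Thread waiter) {
    cancelRequested.set(true); // record intent before interrupting
    waiter.interrupt();
  }

  // Sleeps for up to the given time and reports why the sleep ended.
  public Outcome sleep(long millis) {
    try {
      Thread.sleep(millis);
      return Outcome.COMPLETED;
    } catch (InterruptedException e) {
      // getAndSet clears the flag so the next wait starts fresh.
      return cancelRequested.getAndSet(false) ? Outcome.CANCELLED : Outcome.STRAY_INTERRUPT;
    }
  }

  public static void main(String[] args) {
    FlaggedSleeper sleeper = new FlaggedSleeper();

    Thread.currentThread().interrupt(); // interrupt without setting the flag
    System.out.println(sleeper.sleep(50));

    sleeper.cancel(Thread.currentThread()); // flag set, then interrupt
    System.out.println(sleeper.sleep(50));

    System.out.println(sleeper.sleep(50)); // no interrupt pending
  }
}
```

A retry loop could then abandon the wait on CANCELLED and simply resume waiting on STRAY_INTERRUPT. The flag is shared per sleeper, so a stray interrupt that lands between cancel() setting the flag and the wait observing it would still be reported as CANCELLED.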

jhalterman avatar jhalterman commented on August 16, 2024

Hi Dan - Was traveling yesterday so I didn't get to look at this. Looking now...

jhalterman avatar jhalterman commented on August 16, 2024

I need to rethink some of the queue/exchange recovery logic a bit. Working on that along with getting the tests together. This stuff definitely isn't quite ready for release yet :)

fakeh avatar fakeh commented on August 16, 2024

Hi there,
I've been testing Lyra with my changes quite intensely overnight, where the connection to AMQP has been broken at random intervals (with 1 to 20 seconds up time followed by 1 to 20 seconds down time) and incoming requests. With Lyra trying to reconnect every 5 seconds, this has previously been enough to reliably produce bugs within a few minutes of operation. With the code currently in my fork the test has been running all night and is still performing well, though I appreciate that my code is quite hacky when compared with yours.

jhalterman avatar jhalterman commented on August 16, 2024

I just pushed a commit that re-works the way exchange/queue/binding recovery is done, along with some tests for these areas. Let me know how this looks against the tests you wrote.

fakeh avatar fakeh commented on August 16, 2024

I merged this commit, which returns callWithRetries to its original state and reworks the exchange, queue and binding recovery into separate methods in RetryableResource, which catch and selectively rethrow their exceptions.

Exceptions are being thrown which cancel the next connection recovery:

Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource recoverQueue
SEVERE: Failed to recover queue OsgiShellManagementAgent for 0ab0bcfc-5968-436e-b55e-e0085ef9c0d4 via cxn-1
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:768)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
    at net.jodah.lyra.internal.ResourceDeclaration.invoke(ResourceDeclaration.java:22)
    at net.jodah.lyra.internal.RetryableResource.recoverQueue(RetryableResource.java:169)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:311)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:288)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:260)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 16 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$3 call
INFO: Recovering connection cxn-1 to [Lcom.rabbitmq.client.Address;@5b21409a
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:768)
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:601)
    at net.jodah.lyra.internal.util.Reflection.invoke(Reflection.java:11)
    at net.jodah.lyra.internal.ResourceDeclaration.invoke(ResourceDeclaration.java:22)
    at net.jodah.lyra.internal.RetryableResource.recoverQueue(RetryableResource.java:169)
    at net.jodah.lyra.internal.ConnectionHandler.recoverQueues(ConnectionHandler.java:311)
    at net.jodah.lyra.internal.ConnectionHandler.recoverExchangesAndQueues(ConnectionHandler.java:288)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:260)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 16 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource callWithRetries
SEVERE: Throwable thrown, but ignored
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at net.jodah.lyra.internal.util.concurrent.InterruptableWaiter.await(InterruptableWaiter.java:31)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:84)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:222)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:245)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.RetryableResource callWithRetries
INFO: No more retry attempts for cxn-1, because the resource is closed or is not retryable
Feb 01, 2014 11:53:47 AM net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1 run
SEVERE: Failed to recover connection cxn-1
java.net.ConnectException: Connection refused
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:391)
    at java.net.Socket.connect(Socket.java:579)
    at com.rabbitmq.client.ConnectionFactory.createFrameHandler(ConnectionFactory.java:445)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:504)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:227)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:222)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:48)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:222)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:245)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:41)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:92)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)

After this, no more connection recovery attempts are made.

Presumably because

public static boolean isCausedByConnectionClosure(Exception e) {
  ShutdownSignalException sse = Exceptions.extractCause(e, ShutdownSignalException.class);
  return sse != null && Exceptions.isConnectionClosure(sse);
}

returns true.
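
To illustrate how a check like this can find a ShutdownSignalException buried inside the IOException from the trace, here is a rough sketch of walking a cause chain. It is not Lyra's Exceptions.extractCause, whose body isn't shown in this thread; CauseChain is a hypothetical class, and IllegalStateException stands in for ShutdownSignalException so that the example runs without the client library.

```java
import java.io.EOFException;
import java.io.IOException;

public class CauseChain {
  // Returns the first throwable in the cause chain that is an instance of type, or null.
  public static <T extends Throwable> T extractCause(Throwable t, Class<T> type) {
    int depth = 0;
    while (t != null && depth++ < 32) { // depth cap guards against cyclic cause chains
      if (type.isInstance(t)) {
        return type.cast(t);
      }
      t = t.getCause();
    }
    return null;
  }

  public static void main(String[] args) {
    // Same shape as the trace: IOException -> shutdown signal -> EOFException.
    IllegalStateException shutdown = new IllegalStateException("connection error", new EOFException());
    IOException wrapped = new IOException(shutdown);

    System.out.println(extractCause(wrapped, IllegalStateException.class) == shutdown);
    System.out.println(extractCause(wrapped, EOFException.class) != null);
    System.out.println(extractCause(wrapped, InterruptedException.class) == null);
  }
}
```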

fakeh avatar fakeh commented on August 16, 2024

You can access the merge at https://github.com/fakeh/lyra/tree/MergeWithJHalterman

jhalterman avatar jhalterman commented on August 16, 2024

I tried reproducing this with your merges, but I haven't had any luck yet. Do you have a test you can share or describe what you're doing?

Also, I pushed another commit that is somewhat related to this work which you'll want to merge.

fakeh avatar fakeh commented on August 16, 2024

I run the following script, which opens up a port proxying to an AMQP instance. You can run it on the same machine as the client, but you'll need to change the port. Customise the top to set the AMQP host's ip/port and local proxy port.

The port is open for a random period, between 1 and 20 seconds. It's then closed for another 1 to 20 seconds.

Ctrl-C quits.

#!/usr/bin/env bash
ip=192.168.1.251
rport=5672
lport=5672

function cleanup {
        rm -f backpipe
        lsof -iTCP:$lport -F p | cut -c 2- | xargs kill -9 # kill at the end too, as e.g. Ctrl-C won't remove the last one if one was opened.
}
trap cleanup EXIT #always cleanup when the script exits.
mkfifo backpipe

while true; do
        (nc -l $lport 0<backpipe | nc $ip $rport 1>backpipe) &
        pid=$!
        sleeping=$(( RANDOM % 20 + 1 ))s
        echo "Killing ${pid} in ${sleeping}"
        sleep $sleeping
        kill -9 $pid
        lsof -iTCP:$lport -sTCP:LISTEN -F p | cut -c 2- | xargs kill -9 #don't know why the above doesn't close the channel.
        sleeping=$(( RANDOM % 20 + 1 ))s
        echo "Opening in ${sleeping}"
        sleep $sleeping
done

I've not had to wait more than a few minutes for this to demonstrate the scenario above.

The client itself is configured:


        RecoveryPolicy recoveryPolicy = RecoveryPolicies.recoverAlways().withInterval(Duration.seconds(5));
        RetryPolicy retryPolicy = RetryPolicies.retryAlways().withInterval(Duration.seconds(5));

        Config config = new Config()
            .withRecoveryPolicy(recoveryPolicy)
            .withRetryPolicy(RetryPolicies.retryNever())
            .withConnectRetryPolicy(retryPolicy)

and creates ten or so consumers on one connection/channel; all use the same exchange, but each has its own queue and about five bindings.

Sorry it's not a unit test. I wanted it to be, but I couldn't quite figure out TestNG or your abstractions in a way I thought was productive.

Cheers, Dan.

jhalterman avatar jhalterman commented on August 16, 2024

So I run the shell script and run a client on my box (OSX) - nothing doing. How is the shell script intended to work?

fakeh avatar fakeh commented on August 16, 2024

Saving it to a file, giving it chmod +x and executing it doesn't give any output?

You should see "Killing <pid> in <n>s" and "Opening in <n>s" repeatedly. This means that the local port configured at the top of the script is proxying to the IP and remote port configured in the script for up to twenty seconds before the proxy is forcibly closed.

If you're running it on the same machine as the RabbitMQ instance, you'll need to change the local port. You'll definitely need to change the IP address.

jhalterman avatar jhalterman commented on August 16, 2024

I ran it from the same machine as my Rabbit client, which is connected to a remote Rabbit server. The bash script printed output, but didn't affect the Rabbit client at all.

fakeh avatar fakeh commented on August 16, 2024

The Rabbit client would need to be configured to connect to localhost, as though that were the RabbitMQ server, and the IP address in the script set to the IP of the remote RabbitMQ instance.

jhalterman avatar jhalterman commented on August 16, 2024

I like the proxy script, and I was able to reproduce the failure with your fork just as you described (after a few minutes), but I couldn't reproduce it with the current Lyra master after running for about 20 minutes. Perhaps it was a bad merge or some other changes you made? The current Lyra master looks good though.

fakeh avatar fakeh commented on August 16, 2024

That's certainly a possibility; I'm new to git, though I was pretty confident I'd got it right. May I ask what part of the master code should handle the case above?

jhalterman avatar jhalterman commented on August 16, 2024

From the stacktrace it looks like connection recovery fails, in which case the next ShutdownListener invocation should be picked up and a new recovery should start.

jhalterman avatar jhalterman commented on August 16, 2024

Closing for now unless this pops up in Lyra master.

fakeh avatar fakeh commented on August 16, 2024

It took a while, but the script was able to generate an exception thrown out of callWithRetries with the 0.4.0 release:

java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:234)
    at net.jodah.lyra.internal.ConnectionHandler$3.call(ConnectionHandler.java:229)
    at net.jodah.lyra.internal.RetryableResource.callWithRetries(RetryableResource.java:48)
    at net.jodah.lyra.internal.ConnectionHandler.createConnection(ConnectionHandler.java:229)
    at net.jodah.lyra.internal.ConnectionHandler.recoverConnection(ConnectionHandler.java:255)
    at net.jodah.lyra.internal.ConnectionHandler.access$100(ConnectionHandler.java:42)
    at net.jodah.lyra.internal.ConnectionHandler$ConnectionShutdownListener$1.run(ConnectionHandler.java:93)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1061)
    at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:575)
    at java.lang.Thread.run(Thread.java:722)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
    ... 11 more
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:189)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

The SSE had:
_hardError true
_initiatedByApplication false
_reason SocketException (id=6584)
_ref AMQConnection (id=6585)

jhalterman avatar jhalterman commented on August 16, 2024

Thanks for sharing. Was it the same script you provided previously or were there any changes? Also, how long did it run till the failure occurred?

fakeh avatar fakeh commented on August 16, 2024

Yes, the same script.
I've had an exception thrown out twice so far: once after about forty minutes, and the next time after much longer, more like five hours. I was surprised the first time, so I didn't note it down.

fakeh avatar fakeh commented on August 16, 2024

Could

    if(reason instanceof Exception) return true;

be a catch-all solution if added to

  private static boolean isRetryable(ShutdownSignalException e) {

?

jhalterman avatar jhalterman commented on August 16, 2024

Yes, I think that would be a catch all. Ideally we don't want to retry on certain exceptions that are fatal though, such as authorization failures, NoRouteToHost, etc. In theory, anything that cannot be resolved with some number of retries, we're better off just throwing than retrying. So I try to differentiate.
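
As a rough sketch of what differentiating could look like, the classifier below walks the cause chain and treats some causes as fatal and others as worth retrying. This is not Lyra's isRetryable: RetryClassifier is a hypothetical class, and which exception types count as fatal or transient here is an assumption made for illustration.

```java
import java.io.EOFException;
import java.io.IOException;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.UnknownHostException;

public class RetryClassifier {
  // Walks the cause chain; a fatal cause anywhere wins over a transient one.
  public static boolean isRetryable(Throwable t) {
    boolean transientSeen = false;
    for (Throwable c = t; c != null; c = c.getCause()) {
      // NoRouteToHostException extends SocketException, so fatal types are tested first.
      if (c instanceof NoRouteToHostException || c instanceof UnknownHostException) {
        return false;
      }
      if (c instanceof SocketException || c instanceof EOFException) {
        transientSeen = true;
      }
    }
    return transientSeen;
  }

  public static void main(String[] args) {
    System.out.println(isRetryable(new IOException(new SocketException("Connection reset"))));
    System.out.println(isRetryable(new IOException(new EOFException())));
    System.out.println(isRetryable(new IOException(new NoRouteToHostException("No route to host"))));
    System.out.println(isRetryable(new IllegalArgumentException("not a network failure")));
  }
}
```

The ordering matters because several fatal-looking exceptions are subclasses of SocketException, so a plain instanceof SocketException check on its own would retry them as well.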

frascuchon avatar frascuchon commented on August 16, 2024

Hi Jonathan,

Why not consider java.net.SocketException as a potentially retryable exception?

jhalterman avatar jhalterman commented on August 16, 2024

@frascuchon I would certainly consider it. What scenario are you seeing this with? Can I reproduce it?

frascuchon avatar frascuchon commented on August 16, 2024

Hi @jhalterman,

I see that there is no reconnection when a producer is running during a RabbitMQ server restart.

spatula75 avatar spatula75 commented on August 16, 2024

I think this is very similar to the problem we were discussing in email regarding questionable proxy behavior, and trying to think of ways to reproduce it, @jhalterman . I managed to write a unit test this morning that reproduces it reliably.

Here's a self-contained Maven project containing a TestNG test... just "mvn test" should be enough to run it.

https://dl.dropboxusercontent.com/u/108601349/badproxy.tgz

The test contains a simple proxy server (that I borrowed/stole from java2s.com and modified a bit) that initially proxies requests to a real AMQP server, then it is switched to a mode of answering connections, waiting a bit, and then hanging up, then switched back to normal. You'll need to configure the correct hostname, port, username and password.

After the test forces a disconnect, Lyra will begin a recovery that will never get anywhere, and it won't retry after the initial connection back to the proxy server completes, even though it will also disconnect.

jhalterman avatar jhalterman commented on August 16, 2024

@spatula75 I tried your test case, and I think the reason the connection never recovers is that the proxy fails to send the server's response to the client's AMQConnection, which is left waiting forever. This occurs when doProxy is false. I updated the runServer method to use an additional thread for handling server responses to make sure they're always sent. With this change, the connection recovers just fine:

  public void runServer(String host, int remoteport, int localport) throws IOException {
    // Create a ServerSocket to listen for connections with
    ServerSocket ss = new ServerSocket(localport);

    final byte[] request = new byte[1024];
    final byte[] reply = new byte[4096];

    while (true) {
      Socket server = null;
      try {
        // Wait for a connection on the local port
        client = ss.accept();

        LOGGER.info("Got a client connection!");

        streamFromClient = client.getInputStream();
        streamToClient = client.getOutputStream();

        // Make a connection to the real server.
        // If we cannot connect to the server, send an error to the
        // client, disconnect, and continue waiting for connections.
        try {
          LOGGER.info("Connecting to real server...");
          server = new Socket(host, remoteport);
        } catch (IOException e) {
          PrintWriter out = new PrintWriter(streamToClient);
          out.print("Proxy server cannot connect to " + host + ":" + remoteport + ":\n" + e + "\n");
          out.flush();
          client.close();
          continue;
        }

        // Get server streams.
        final InputStream streamFromServer = server.getInputStream();
        final OutputStream streamToServer = server.getOutputStream();

        // Start a thread to read the client's requests and pass them
        // to the server, so both directions are relayed asynchronously.
        runAsDaemon(new Runnable() {
          public void run() {
            int bytesRead;
            try {
              while ((bytesRead = streamFromClient.read(request)) != -1) {
                LOGGER.info("Read {} bytes from client", bytesRead);
                streamToServer.write(request, 0, bytesRead);
                streamToServer.flush();
              }
            } catch (IOException e) {
              LOGGER.error("Exception reading from client", e);
            }

            // the client closed the connection to us, so close our
            // connection to the server.
            try {
              streamToServer.close();
            } catch (IOException e) {
            }
          }
        });

        // Read the server's responses
        // and pass them back to the client.
        Thread t = runAsDaemon(new Runnable() {
          public void run() {
            int bytesRead;
            try {
              while ((bytesRead = streamFromServer.read(reply)) != -1) {
                LOGGER.info("Read {} bytes from server", bytesRead);
                streamToClient.write(reply, 0, bytesRead);
                streamToClient.flush();
              }
            } catch (IOException e) {
              LOGGER.error("Exception reading from server", e);
            }

            // the server closed the connection to us, so close our
            // connection to the client.
            try {
              streamToClient.close();
            } catch (IOException e) {
            }
          }
        });

        if (!doProxy) {
          LOGGER.info("I'm no longer proxying. Waiting and then hanging up.");
          Thread.sleep(20000);
          LOGGER.info("Goodbye, client!");
          client.close();
          continue;
        }

        t.join();
      } catch (IOException e) {
        LOGGER.error("IOException", e);
      } catch (InterruptedException e) {
        LOGGER.error("Interrupted", e);
      } finally {
        try {
          if (server != null)
            server.close();
          if (client != null)
            client.close();
        } catch (IOException e) {
        }
      }
    }
  }

  private Thread runAsDaemon(Runnable r) {
    Thread t = new Thread(r);
    t.setDaemon(true);
    t.start();
    return t;
  }

I understand that reproducing a failure like this in a test can be hard - but if you think there's still an outstanding issue please do see if you can hit it.

spatula75 avatar spatula75 commented on August 16, 2024

Yeah, that's the point of the test case: when the proxy is no longer proxying (and we disconnect and reconnect), even after it starts proxying again, we never attempt a reconnect again, despite the first reconnect failing. It's a situation we see in the Amazon EC2 world from time to time when it thinks it has nowhere to send an AMQP connection, so the ELB answers, does nothing, and hangs up. Then, rather than reconnecting according to the policy, no further reconnect attempts are made, even if the ELB starts working again.

jhalterman avatar jhalterman commented on August 16, 2024

This makes sense - I just had to get my head back into what this was all about :) I'm wondering if there's much we can do about this right now. See the stacktrace from a paused recovery thread:

Thread [lyra-recovery-1] (Suspended)    
    waiting for: BlockingValueOrException<V,E>  (id=28) 
    Object.wait(long) line: not available [native method]   
    BlockingValueOrException<V,E>(Object).wait() line: 503  
    BlockingValueOrException<V,E>(BlockingCell<T>).get() line: 50   
    BlockingValueOrException<V,E>(BlockingCell<T>).uninterruptibleGet() line: 89    
    BlockingValueOrException<V,E>.uninterruptibleGetValue() line: 33    
    AMQChannel$SimpleBlockingRpcContinuation(AMQChannel$BlockingRpcContinuation<T>).getReply() line: 343    
    AMQConnection.start() line: 313 
    ConnectionFactory.newConnection(ExecutorService, Address[]) line: 516   
    ConnectionHandler$3.call() line: 243    
    ConnectionHandler$3.call() line: 236    
    ConnectionHandler(RetryableResource).callWithRetries(Callable<T>, RecurringPolicy<?>, RecurringStats, Set<Class<Exception>>, boolean, boolean) line: 51 
    ConnectionHandler.createConnection(RecurringPolicy<?>, Set<Class<Exception>>, boolean) line: 236    
    ConnectionHandler.recoverConnection() line: 271 
    ConnectionHandler.access$100(ConnectionHandler) line: 41    
    ConnectionHandler$ConnectionShutdownListener$1.run() line: 95   
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1110  
    ThreadPoolExecutor$Worker.run() line: 603   
    Thread.run() line: 722  

Lyra calls the amqp-client library to create a new connection, which is successfully created internally, but the subsequent attempt to read hangs forever since the proxy is disabled. The connection timeout (which you set to 10 seconds) doesn't come into play here since it only affects the initial TCP connection, not the subsequent read attempt.

Aside from this test - any idea why ELB might accept connections but not send any data? That seems to be the killer thing here. If the ELB stops accepting connections - fine, we should be able to recover from that. If it takes connections but doesn't communicate, as this test does, that's a harder problem.
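
The difference between a connect timeout and a read timeout can be seen with plain sockets. The sketch below is not amqp-client code and says nothing about which socket options that library sets; SilentPeerDemo is a hypothetical class that connects to a local listener which completes the TCP handshake but never sends anything, much like the disabled proxy.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SilentPeerDemo {
  // Connects to a listener that never sends a byte and reports how the first read ends.
  public static String readFromSilentPeer(int connectTimeoutMs, int readTimeoutMs) {
    InetAddress loopback = InetAddress.getLoopbackAddress();
    // The OS completes the TCP handshake from the listen backlog even though
    // accept() is never called, so the peer "answers" but stays silent.
    try (ServerSocket silent = new ServerSocket(0, 1, loopback);
         Socket client = new Socket()) {
      // The connect timeout only bounds this call, and it succeeds quickly.
      client.connect(new InetSocketAddress(loopback, silent.getLocalPort()), connectTimeoutMs);
      // The read timeout is a separate setting; without it read() could block indefinitely.
      client.setSoTimeout(readTimeoutMs);
      try {
        int b = client.getInputStream().read();
        return b == -1 ? "peer closed" : "data";
      } catch (SocketTimeoutException e) {
        return "read timed out";
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readFromSilentPeer(1000, 200));
  }
}
```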

spatula75 avatar spatula75 commented on August 16, 2024

Yeah, unfortunately what happens is the ELB always accepts a connection, regardless of whether it believes it has any healthy AMQP servers to which it can proxy. Then, if it doesn't believe it has any healthy servers right now, it will just drop the connection again, so you get this connect/disconnect behavior, instead of connecting to AMQP like you normally would, or having a connection-refused situation like you would if AMQP were unavailable.

We ran into this when we were doing some failover testing, because ELB will very quickly decide that an AMQP instance is "unhealthy", but it takes comparatively longer for it to decide that it's "healthy" again. So when we were failing from one server to the next, to the next, it was marking them each unhealthy, so by the time we got to the last one in the list, even though the first several were actually back online, the ELB wouldn't send traffic back to them and would just drop the connection instead.

jhalterman avatar jhalterman commented on August 16, 2024

Ok - Your test seems to capture that behavior well then. I'm guessing this issue could affect any amqp-client connection though, not just recovery attempts. It's probably just that it pops up during recovery since that's when the ELB scenario occurs.

Perhaps @michaelklishin could weigh in on this. Michael - basically the problem is that when rabbitmq connections are proxied through ELB, ELB might accept a TCP connection but not respond to the initial handshake which leaves the client hanging forever. See the call stack 2 posts up. The first idea that comes to my mind is that everything that happens inside AMQConnection.start() should be covered by the connection timeout setting. Thoughts?
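
One generic way to put a deadline around a blocking step is to run it on a helper thread and wait for the result with a timeout. The sketch below shows only that general pattern; DeadlineCall and callWithDeadline are hypothetical names, it requires Java 8 or later, and it is not a description of how amqp-client handles this.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeadlineCall {
  // Runs task on a daemon helper thread; returns its result, or null if the deadline passes.
  public static <T> T callWithDeadline(Callable<T> task, long timeoutMs) {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "deadline-call");
      t.setDaemon(true); // do not keep the JVM alive for a stuck task
      return t;
    });
    Future<T> future = executor.submit(task);
    try {
      return future.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      future.cancel(true); // interrupts the helper thread
      return null;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the caller's interrupt status
      future.cancel(true);
      return null;
    } catch (ExecutionException e) {
      throw new IllegalStateException("task failed", e.getCause());
    } finally {
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    CountDownLatch neverReleased = new CountDownLatch(1);
    // Simulates a handshake that never receives a reply.
    String stuck = callWithDeadline(() -> {
      neverReleased.await();
      return "connected";
    }, 200);
    System.out.println("stuck handshake result: " + stuck);
    System.out.println("fast handshake result: " + callWithDeadline(() -> "connected", 200));
  }
}
```

Cancelling the future only interrupts the helper thread. The call stack in this thread goes through uninterruptibleGetValue, so an interrupt alone would probably not release a wait like that one, and closing the underlying socket is likely to be needed as well.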

spatula75 avatar spatula75 commented on August 16, 2024

I won't swear to this, but I think when I was playing with this, I did try it with just a bare AMQP connection and saw it time out when the heartbeat interval had passed because the handshake didn't complete, but it has been a little while since I was toying with it. I think in that case, I just started up netcat on 5671 and tried to connect to a connection that didn't do anything, and I believe it did throw an exception as expected. It would be a useful thing to verify though, because my memory is like a sieve.

jhalterman avatar jhalterman commented on August 16, 2024

@spatula75 Do you know if ELB eventually closes the client connection or how long it takes?

spatula75 avatar spatula75 commented on August 16, 2024

The behavior varies. The first time, it seems to take about 20 seconds before the connection gets closed. After that, it's more of an immediate connect/close.

from lyra.

michaelklishin avatar michaelklishin commented on August 16, 2024

Folks, line numbers in the stack trace don't match master or 3.5.2, AFAICT. If you have a way to reproduce this reasonably quickly, can you please try with 3.5.2? (The issue is likely still there, I just want to see an up-to-date trace.) Thank you!

from lyra.

jhalterman avatar jhalterman commented on August 16, 2024

@michaelklishin Sure, here's a stacktrace taken with 3.5.2:

Thread [lyra-recovery-1] (Suspended)    
    waiting for: BlockingValueOrException<V,E>  (id=28) 
    Object.wait(long) line: not available [native method]   
    BlockingValueOrException<V,E>(Object).wait() line: 503  
    BlockingValueOrException<V,E>(BlockingCell<T>).get() line: 50   
    BlockingValueOrException<V,E>(BlockingCell<T>).uninterruptibleGet() line: 89    
    BlockingValueOrException<V,E>.uninterruptibleGetValue() line: 33    
    AMQChannel$SimpleBlockingRpcContinuation(AMQChannel$BlockingRpcContinuation<T>).getReply() line: 348    
    AMQConnection.start() line: 294 
    ConnectionFactory.newConnection(ExecutorService, Address[]) line: 603   
    ConnectionHandler$3.call() line: 243    
    ConnectionHandler$3.call() line: 236    
    ConnectionHandler(RetryableResource).callWithRetries(Callable<T>, RecurringPolicy<?>, RecurringStats, Set<Class<Exception>>, boolean, boolean) line: 51 
    ConnectionHandler.createConnection(RecurringPolicy<?>, Set<Class<Exception>>, boolean) line: 236    
    ConnectionHandler.recoverConnection() line: 271 
    ConnectionHandler.access$100(ConnectionHandler) line: 41    
    ConnectionHandler$ConnectionShutdownListener$1.run() line: 95   
    ThreadPoolExecutor.runWorker(ThreadPoolExecutor$Worker) line: 1110  
    ThreadPoolExecutor$Worker.run() line: 603   
    Thread.run() line: 722  

from lyra.

michaelklishin avatar michaelklishin commented on August 16, 2024

Should be fixed in rabbitmq/rabbitmq-java-client#65, will be in 3.5.3 (scheduled in 4 weeks).

from lyra.

spatula75 avatar spatula75 commented on August 16, 2024

Awesome!

from lyra.

jhalterman avatar jhalterman commented on August 16, 2024

@spatula75 Here's some log output running your test against Michael's changes along with a few tweaks in Lyra:

[TestNG] Running:
  /private/var/folders/my/jry08ttx4_l7_5gpjmv42zcc0000gn/T/testng-eclipse-1758990684/testng-customsuite.xml

INFO  [2015-05-13 16:10:11,905] ConnectionHandler - Creating connection cxn-1 to [localhost:55672]
INFO  [2015-05-13 16:10:11,907] RudeProxyTest - Got a client connection!
INFO  [2015-05-13 16:10:11,907] RudeProxyTest - Connecting to real server...
INFO  [2015-05-13 16:10:11,921] RudeProxyTest - Read 8 bytes from client
INFO  [2015-05-13 16:10:11,927] RudeProxyTest - Read 468 bytes from server
INFO  [2015-05-13 16:10:11,982] RudeProxyTest - Read 382 bytes from client
INFO  [2015-05-13 16:10:11,991] RudeProxyTest - Read 20 bytes from server
INFO  [2015-05-13 16:10:11,996] RudeProxyTest - Read 20 bytes from client
INFO  [2015-05-13 16:10:11,996] RudeProxyTest - Read 16 bytes from client
INFO  [2015-05-13 16:10:12,042] RudeProxyTest - Read 13 bytes from server
INFO  [2015-05-13 16:10:12,042] ConnectionHandler - Created connection cxn-1 to amqp://127.0.0.1:55672/
INFO  [2015-05-13 16:10:12,050] RudeProxyTest - Read 13 bytes from client
INFO  [2015-05-13 16:10:12,051] RudeProxyTest - Read 16 bytes from server
INFO  [2015-05-13 16:10:12,073] ConnectionHandler - Created channel-1 on cxn-1
INFO  [2015-05-13 16:10:12,075] RudeProxyTest - Read 30 bytes from client
INFO  [2015-05-13 16:10:12,080] RudeProxyTest - Read 12 bytes from server
INFO  [2015-05-13 16:10:12,082] RudeProxyTest - Read 20 bytes from client
INFO  [2015-05-13 16:10:12,092] RudeProxyTest - Read 51 bytes from server
INFO  [2015-05-13 16:10:12,094] RudeProxyTest - Read 57 bytes from client
INFO  [2015-05-13 16:10:12,101] RudeProxyTest - Read 12 bytes from server
INFO  [2015-05-13 16:10:12,105] RudeProxyTest - Read 51 bytes from client
INFO  [2015-05-13 16:10:12,112] RudeProxyTest - Read 44 bytes from server
INFO  [2015-05-13 16:10:12,115] ChannelHandler - Created consumer-amq.ctag-Nc4IAzpZDDF0KZDaNOUcxQ of amq.gen-Ja861vZAtjHxpfGQNyRApg via channel-1 on cxn-1
INFO  [2015-05-13 16:10:12,117] RudeProxyTest - Read 55 bytes from client
INFO  [2015-05-13 16:10:12,121] RudeProxyTest - Read 93 bytes from server
INFO  [2015-05-13 16:10:12,123] RudeProxyTest - Consumed derp
ERROR [2015-05-13 16:10:12,126] RudeProxyTest - Exception reading from client
java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:150)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
    at java.net.SocketInputStream.read(SocketInputStream.java:107)
    at vc.bjn.badproxy.RudeProxyTest$3.run(RudeProxyTest.java:165)
ERROR [2015-05-13 16:10:12,127] ChannelHandler - Channel channel-1 on cxn-1 was closed unexpectedly
ERROR [2015-05-13 16:10:12,129] ConnectionHandler - Connection cxn-1 was closed unexpectedly
INFO  [2015-05-13 16:10:12,131] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO  [2015-05-13 16:10:12,131] RudeProxyTest - Got a client connection!
INFO  [2015-05-13 16:10:12,131] RudeProxyTest - I'm no longer proxying. Waiting and then hanging up.
INFO  [2015-05-13 16:10:19,140] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO  [2015-05-13 16:10:28,147] ConnectionHandler - Recovering connection cxn-1 to [localhost:55672]
INFO  [2015-05-13 16:10:32,133] RudeProxyTest - Goodbye, client!
INFO  [2015-05-13 16:10:32,134] RudeProxyTest - Got a client connection!
INFO  [2015-05-13 16:10:32,134] RudeProxyTest - Connecting to real server...
INFO  [2015-05-13 16:10:32,140] RudeProxyTest - Read 8 bytes from client
INFO  [2015-05-13 16:10:32,141] RudeProxyTest - Got a client connection!
INFO  [2015-05-13 16:10:32,141] RudeProxyTest - Connecting to real server...
INFO  [2015-05-13 16:10:32,142] RudeProxyTest - Read 8 bytes from client
INFO  [2015-05-13 16:10:32,146] RudeProxyTest - Read 468 bytes from server
INFO  [2015-05-13 16:10:32,147] RudeProxyTest - Read 382 bytes from client
INFO  [2015-05-13 16:10:32,149] RudeProxyTest - Read 20 bytes from server
INFO  [2015-05-13 16:10:32,149] RudeProxyTest - Read 20 bytes from client
INFO  [2015-05-13 16:10:32,149] RudeProxyTest - Read 16 bytes from client
INFO  [2015-05-13 16:10:32,182] RudeProxyTest - Read 13 bytes from server
INFO  [2015-05-13 16:10:32,182] ConnectionHandler - Recovered connection cxn-1 to amqp://127.0.0.1:55672/
INFO  [2015-05-13 16:10:32,183] ConnectionHandler - Recovering exchange test via cxn-1
INFO  [2015-05-13 16:10:32,183] RudeProxyTest - Read 13 bytes from client
INFO  [2015-05-13 16:10:32,191] RudeProxyTest - Read 16 bytes from server
INFO  [2015-05-13 16:10:32,192] RudeProxyTest - Read 30 bytes from client
INFO  [2015-05-13 16:10:32,194] RudeProxyTest - Read 12 bytes from server
INFO  [2015-05-13 16:10:32,198] RudeProxyTest - Read 20 bytes from client
INFO  [2015-05-13 16:10:32,201] RudeProxyTest - Read 51 bytes from server
INFO  [2015-05-13 16:10:32,201] ConnectionHandler - Recovered queue amq.gen-Ja861vZAtjHxpfGQNyRApg as amq.gen-x__GTOc1NfEOAyrs-hTO3g via cxn-1
INFO  [2015-05-13 16:10:32,202] ConnectionHandler - Recovering queue binding from test to amq.gen-x__GTOc1NfEOAyrs-hTO3g with # via cxn-1
INFO  [2015-05-13 16:10:32,202] RudeProxyTest - Read 57 bytes from client
INFO  [2015-05-13 16:10:32,211] RudeProxyTest - Read 12 bytes from server
INFO  [2015-05-13 16:10:32,212] RudeProxyTest - Read 21 bytes from client
INFO  [2015-05-13 16:10:32,221] RudeProxyTest - Read 12 bytes from server
INFO  [2015-05-13 16:10:32,222] ChannelHandler - Recovering channel-1 on cxn-1
INFO  [2015-05-13 16:10:32,222] RudeProxyTest - Read 13 bytes from client
INFO  [2015-05-13 16:10:32,230] RudeProxyTest - Read 16 bytes from server
INFO  [2015-05-13 16:10:32,230] ChannelHandler - Recovered channel-1 on cxn-1
INFO  [2015-05-13 16:10:32,230] ChannelHandler - Recovering consumer-amq.ctag-Nc4IAzpZDDF0KZDaNOUcxQ of amq.gen-x__GTOc1NfEOAyrs-hTO3g via channel-1 on cxn-1
INFO  [2015-05-13 16:10:32,231] RudeProxyTest - Read 51 bytes from client
INFO  [2015-05-13 16:10:32,240] RudeProxyTest - Read 44 bytes from server

This looks pretty good to me. I'll make a new release of Lyra sometime soon which will work with the changes coming in amqp-client 3.5.3. You should be able to build and use either in the meantime if you wish.

from lyra.

spatula75 avatar spatula75 commented on August 16, 2024

Agreed, this looks like exactly the behavior we would expect: more than one attempt to recover, and once the proxy switches back into "actually proxying" mode, recovery succeeds and things return to normal.
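
As a rough illustration of that "keep trying until the proxy cooperates" pattern, here is a minimal retry loop. It is only a sketch and is not Lyra's callWithRetries; `RecoveryLoop`, `retry`, and the fixed interval are assumptions made for the example.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not Lyra's implementation.
public class RecoveryLoop {

    /** Calls attempt until it succeeds or maxAttempts is reached, sleeping between tries. */
    public static <T> T retry(Callable<T> attempt, int maxAttempts, long intervalMillis) throws Exception {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        Exception last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.call();
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop, not as a failed attempt.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (i < maxAttempts) Thread.sleep(intervalMillis);
            }
        }
        throw last; // every attempt failed; rethrow the most recent failure
    }

    public static void main(String[] args) throws Exception {
        // Simulated connection attempt: fails twice, then succeeds.
        AtomicInteger calls = new AtomicInteger();
        String result = retry(() -> {
            if (calls.incrementAndGet() < 3) throw new IOException("proxy is not forwarding yet");
            return "recovered";
        }, 5, 100);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

Run as written, this prints "recovered after 3 attempts". The loop only helps if each attempt is guaranteed to fail or finish in bounded time, which is what the amqp-client change provides.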

from lyra.

fakeh avatar fakeh commented on August 16, 2024

Hi again!
I also occasionally see blocking in the BlockingValueOrException (BVOE) class and have written some messages to the RMQ mailing list about it:

Reducing contention on the Channel reduced the occurrence.

from lyra.

spatula75 avatar spatula75 commented on August 16, 2024

Happy to report that using amqp-client 3.5.3 (released last week), RudeProxyTest passes.

from lyra.

jhalterman avatar jhalterman commented on August 16, 2024

I just released Lyra 0.5.1, which includes the TimeoutException handling to correspond with the amqp-client 3.5.3 release.

from lyra.

spatula75 avatar spatula75 commented on August 16, 2024

Awesome! Any idea how long it normally takes things to propagate to Maven Central?

from lyra.

jhalterman avatar jhalterman commented on August 16, 2024

I think it's usually within an hour...

from lyra.

michaelklishin avatar michaelklishin commented on August 16, 2024

👍

from lyra.

jplock avatar jplock commented on August 16, 2024

Lyra 0.5.1 still depends on amqp-client 3.5.1 in the pom. Were you going to increase that to 3.5.3? https://github.com/jhalterman/lyra/blob/lyra-0.5.1/pom.xml#L46

from lyra.

jhalterman avatar jhalterman commented on August 16, 2024

Sigh... I suck.

Let's try this again - 0.5.2 - should be in Maven Central shortly.

from lyra.
