Comments (13)
This might also be a useful approach: streadway/amqp#519
from amqp091-go.
Hi. I had the same problem with https://github.com/streadway/amqp.
I had to look at the channel's shutdown function: it blocks while sending a notification to the notifyChanClose channel.
Once I consumed that notification, the consumer channel was closed afterwards.
Instead of:
select {
case <-notifyConnClose:
	log.Println("connection closed")
case <-notifyChanClose:
	log.Println("channel closed")
}
I have this:
select {
case <-s.notifyConnClose:
	log.Warn().Msg("Connection closed. Reconnecting...")
	select {
	case <-s.notifyChanClose:
		log.Warn().Msg("Channel closed(2). Re-running init...")
	case <-time.After(time.Second):
	}
	return false
case <-s.notifyChanClose:
	log.Warn().Msg("Channel closed. Re-running init...")
}
This is the line that blocks and prevents the consumer channel from being closed: here.
Hi,
thank you very much for investigating the issue both here and in #32.
I can reproduce the issue with a local integration test.
As far as I understand, during an abnormal disconnection (which can happen, for example, when forcing a connection close from the UI or simulating a network issue), a shutdown process is invoked, and the shutdown function in channel.go blocks here:
for _, c := range ch.closes {
	c <- e
}
because the notifyChanClose channel in the select statement of the example is no longer consumed once notifyConnClose has been consumed. The lock acquired by the shutdown method is therefore never released, which creates the deadlock.
The options are:
- Better document the usage and behaviour of the library during a shutdown event, both in the documentation and in the library's shutdown code.
- Let the shutdown behave asynchronously, as already suggested, causing some notifications to be lost.
This library has been widely used for some time and serves as a wrapper for other libraries. Modifying this behaviour could cause issues elsewhere, so we are more inclined to proceed with the first option.
In that case, we will state in the documentation that, during a shutdown caused by an abnormal connection drop, the notification channels for both connections and channels need to be consumed, or buffered channels need to be used.
I tried your workaround on my repro project above and it works. Thank you very much.
Do you think a quick fix like this, in the lines you mention, would be a good solution to this problem?
if e != nil {
	for _, c := range ch.closes {
		select {
		case c <- e:
		default:
		}
	}
}
Or should the blocking remain as intended functionality (i.e. we should listen on both the connection and channel close channels in the event of an abnormal disconnection), but be documented better?
This would drop a notification in the library. I have changed my reconnect code, which is based on the example (here), to use buffered channels, but I still need to check whether it improves the behaviour.
Based on your code, the change would be:
notifyConnClose := make(chan *amqp.Error)
notifyChanClose := make(chan *amqp.Error)
to
notifyConnClose := make(chan *amqp.Error, 1)
notifyChanClose := make(chan *amqp.Error, 1)
This means the above code no longer blocks, and I hope the mutexes don't deadlock. I still had problems when the channel or connection was closed during a call; in my case it was in DeclareQueue.
It was basically the same problem: the channel's shutdown handler wants to send a notification and waits there, while the code in DeclareQueue waits for signals on some channels.
Another issue suggests that the notifications should be handled in a different goroutine, but I hope buffered channels have the same effect.
I can also reproduce this issue.
@DanielePalaia Thanks for the response.
Would Option 2 plus a new major version (indicating a backward-incompatible change) also be an option?
And can you elaborate a bit on the bold part here?
Let the shutdown behave asynchronously, as already suggested, causing some notifications to be lost.
We understood the problem, but we'd like to avoid putting timeouts on every single channel.
Here we are analyzing a single channel, but the same applies to all channels.
Given a simple Go program:
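package main

import "fmt"

func handleInt(ch chan int) {
	fmt.Printf("Handle %d", <-ch)
}

func main() {
	fmt.Println("Starting...")
	ch := make(chan int)
	go handleInt(ch)
	ch <- 23 // blocks until handleInt receives the value
	// note: main may return before handleInt's Printf runs
}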
If we comment out the receive:
func handleInt(ch chan int) {
	// fmt.Printf("Handle %d", <-ch)
}
We get a deadlock, but this is how Go works :)!
Speaking with the team, we'd tend to add some documentation, even if this is more about how to use Go channels than about how to use the library.
@andygrunwald thank you for your contributions.
We'd like to avoid introducing a 2.x version: there would be too many versions to handle, and we also have several clients to follow :)
That is a fair design choice. Thanks for the context on this.
In the documentation you are going to provide, please include an explanation (and maybe an example) of the difference in behavior between a graceful close of a connection and an unexpected one.
When I first encountered the issue, that part was what threw me off the most.
The problem happens when an unexpected disconnection occurs (for example, when I turn off my internet connection). In that case, even though the notifyConnClose channel receives a message, the deliveryChan is not closed, and the range loop blocks forever.
In the event of a graceful disconnection via connection.Close(), notifyConnClose fires and the deliveryChan is closed.
Having different behavior for different types of disconnection was counterintuitive to me.
Hi,
we updated the documentation to explain this use-case scenario (https://pkg.go.dev/github.com/rabbitmq/amqp091-go), and we also added comments to the NotifyClose functions of the Connection and Channel structs.
I think we can close this one for now.
@DanielePalaia Are these the commits?
@andygrunwald yes, those are the ones!