Comments (7)
@lukebakken That's what appears to be happening. I determined this by putting log statements in a few places and passing the channel id into the confirmations code to test whether the channel-mismatch hypothesis was true (thankfully, it wasn't). With the logging (removed, of course, in the commit) I was able to see that the confirm would sometimes be called before ch.confirms.Publish().
from amqp091-go.
I've found that if I add a log.Printf() on line 149 in confirms.go, the if block, which should not be entered, is in fact entered when I get a hanging DeferredConfirmation.Wait(). (Sorry, I don't know how to reference code very well here in GitHub; the line number might be off by one due to the extra "log" import in my copy.)
I'm wondering whether the confirmation message is somehow getting dispatched to the wrong channel, so that the intended channel never receives it?
Here's what I changed:
func (d *deferredConfirmations) Confirm(confirmation Confirmation) {
	d.m.Lock()
	defer d.m.Unlock()

	dc, found := d.confirmations[confirmation.DeliveryTag]
	if !found {
		log.Printf("Confirmed unpublished tag: %d\n", confirmation.DeliveryTag)
		// We should never receive a confirmation for a tag that hasn't
		// been published, but a test causes this to happen.
		return
	}
	dc.setAck(confirmation.Ack)
	delete(d.confirmations, confirmation.DeliveryTag)
}
Or, perhaps more helpfully, when I panic instead of merely logging, I get the following:
go run main.go
2023/03/16 00:00:58 Connecting to amqp://guest:guest@localhost/
2023/03/16 00:00:58 Pushing 10000 messages to 10 confirming publishers
panic: Confirmed unpublished tag: 53
goroutine 21 [running]:
github.com/rabbitmq/amqp091-go.(*deferredConfirmations).Confirm(0xc0000a03c0, {0x13f?, 0x36?})
/home/dec/projects/amqp091-go/confirms.go:150 +0x136
github.com/rabbitmq/amqp091-go.(*confirms).One(0xc0000b64b0, {0x0?, 0x58?})
/home/dec/projects/amqp091-go/confirms.go:81 +0x94
github.com/rabbitmq/amqp091-go.(*Channel).dispatch(0xc0000c26c0, {0x6a4e40, 0xc00027d5c0})
/home/dec/projects/amqp091-go/channel.go:352 +0x71e
github.com/rabbitmq/amqp091-go.(*Channel).recvMethod(0xc0000c26c0, {0x6a45c0?, 0xc00027a780})
/home/dec/projects/amqp091-go/channel.go:389 +0x146
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN(0xc0001a2000, {0x6a45c0?, 0xc00027a780?})
/home/dec/projects/amqp091-go/connection.go:615 +0x1d2
github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc00005ef28?, {0x6a45c0, 0xc00027a780})
/home/dec/projects/amqp091-go/connection.go:566 +0x5b
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0001a2000, {0x7f4243495868?, 0xc0000b2070?})
/home/dec/projects/amqp091-go/connection.go:670 +0x23d
created by github.com/rabbitmq/amqp091-go.Open
/home/dec/projects/amqp091-go/connection.go:268 +0x34c
exit status 2
After some more determined testing, my hypothesis about messages being dispatched to incorrect channels turned out to be false.
It appears to be a timing issue, in which the goroutine doing the publishing gets interrupted between sending the basicPublish (ch.send) and generating the DeferredConfirmation (ch.confirms.Publish()).
My fix creates the DeferredConfirmation in advance, and rolls it back on error.
The only alternative I can think of would be to lock the reading side (message dispatch to the channel) until the publish and the creation of the confirmation are done.
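To make the ordering concrete, here is a minimal sketch of the "register first, roll back on error" idea as a toy model. It is not the library's code: registry, deferred, publish, rollback and errSend are hypothetical names invented for illustration, and the send callback merely stands in for writing the basicPublish frame. The rollback assumes publishes on one channel are serialized, so the most recently issued tag is always the one being undone.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errSend is a hypothetical error standing in for a failed frame write.
var errSend = errors.New("send failed")

// deferred is a toy stand-in for a DeferredConfirmation.
type deferred struct {
	tag  uint64
	ack  bool
	done chan struct{}
}

// registry is a toy stand-in for the map of pending confirmations.
type registry struct {
	mu      sync.Mutex
	next    uint64
	pending map[uint64]*deferred
}

func newRegistry() *registry {
	return &registry{pending: make(map[uint64]*deferred)}
}

// add registers the next delivery tag before anything is sent.
func (r *registry) add() *deferred {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.next++
	d := &deferred{tag: r.next, done: make(chan struct{})}
	r.pending[d.tag] = d
	return d
}

// rollback undoes add after a failed send so the tag is not skipped.
// Assumption: publishes are serialized, so d holds the latest tag.
func (r *registry) rollback(d *deferred) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.pending, d.tag)
	r.next--
}

// confirm resolves a pending tag and reports false for an unknown one.
func (r *registry) confirm(tag uint64, ack bool) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	d, found := r.pending[tag]
	if !found {
		return false
	}
	d.ack = ack
	close(d.done)
	delete(r.pending, tag)
	return true
}

// publish registers the confirmation first and only then sends, so a
// confirm that arrives immediately still finds its tag.
func publish(r *registry, send func(tag uint64) error) (*deferred, error) {
	d := r.add()
	if err := send(d.tag); err != nil {
		r.rollback(d)
		return nil, err
	}
	return d, nil
}

func main() {
	r := newRegistry()

	// Simulate a broker whose confirm arrives before send even returns.
	d, err := publish(r, func(tag uint64) error {
		if !r.confirm(tag, true) {
			return errors.New("confirmed unpublished tag")
		}
		return nil
	})
	if err != nil {
		panic(err)
	}
	<-d.done // does not hang: the tag was registered before the send
	fmt.Println("tag", d.tag, "acked:", d.ack)

	// A failed send is rolled back and leaves nothing pending.
	_, err = publish(r, func(uint64) error { return errSend })
	fmt.Println("failed publish:", err, "pending:", len(r.pending))
}
```

With the opposite order (send first, register afterwards), the simulated early confirm would look up a tag that is not in the map yet, which is the "Confirmed unpublished tag" case from the panic above.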
In retrospect, I must admit that the code which can reproduce the error above was always tested against a local instance (although on different machines).
This issue is probably even less likely to come up in production instances (I suppose?) due to network latency.
> RabbitMQ is indeed not confirming the message
Extremely unlikely!
> I might have found an issue in this library that is only revealed under particular circumstances
Much more likely! Thanks for the code and detailed information. We'll take a look.
Thank you @calloway-jacob! I've been working on customer issues, and will review this soon. Have a great weekend ☘️
@calloway-jacob that's interesting. So the publish/confirmation happens so fast it "beats" the call to ch.confirms.Publish()? Makes sense to me.
@calloway-jacob I wish every OSS user took the time like you did to dig into an issue. Thanks a lot.