rabbitmq / amqp091-go
An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`
License: Other
The client-provided connection name is very useful for identifying connections in the Management UI and during debugging sessions. In the Java client, it's easy to set a client-provided name with ConnectionFactory.newConnection(String clientProvidedName). In this client, a user has to set a configuration property with a specific name, for example:
connection, err := amqp.DialConfig(
	amqpURI,
	amqp.Config{Properties: map[string]interface{}{"connection_name": "sample-producer"}},
)
This isn't great because the user is required to know the property key connection_name, and the Properties map has to be crafted manually.
We should either provide a simple function like amqp.SetConnectionName(amqp.Config, string), or a constant with the connection name key (and any other well-known properties).
Reference: redis/go-redis#1279
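A minimal sketch of the proposed helper. `Config` and `Table` below are local stand-ins for `amqp.Config` and `amqp.Table` so that the snippet compiles without the library; `SetConnectionName` and `ConnectionNameKey` are hypothetical names, not existing amqp091-go API.

```go
package main

// Table and Config are minimal stand-ins for amqp091-go's amqp.Table and
// amqp.Config. With the real package, the helper would take *amqp.Config.
type Table map[string]interface{}

type Config struct {
	Properties Table
}

// ConnectionNameKey is the client property that RabbitMQ shows as the
// connection name in the management UI (hypothetical exported constant).
const ConnectionNameKey = "connection_name"

// SetConnectionName (hypothetical) sets the client-provided connection name,
// allocating the Properties table if the caller has not done so yet.
func SetConnectionName(cfg *Config, name string) {
	if cfg.Properties == nil {
		cfg.Properties = Table{}
	}
	cfg.Properties[ConnectionNameKey] = name
}
```

Taking a pointer, rather than the `amqp.Config` value in the proposed signature, lets the helper allocate `Properties` when it is nil; with a value parameter that allocation would be lost when the function returns.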
There are several places in PR #78 where a panic has been inserted if an error is returned from low-level functions. Panicking is not appropriate, but neither is disregarding the error. These errors should be logged at the error or warning level.
Hey, I want to use the x-delayed-message plugin for RabbitMQ. The RabbitMQ documentation says:
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
How can I do this with the amqp091-go package?
Can you create a release for this project and follow semantic versioning, please? It is OK to start with a v0 version to allow backward-incompatible changes, so v0.1.0 could be a good start.
This will allow users to pin a specific version instead of a commit hash and set up auto-updating (e.g. Dependabot).
Thanks!
I notice that these docs were copy-pasted from the Channel.NotifyClose method, but the receiver wasn't changed, so they should say Connection instead of Channel.
Lines 284 to 288 in 8b6de9a
In case of a (suspected) network error, a program using DeferredConfirmation can wait indefinitely in the .Wait() method.
Linked issue in benthos: benthosdev/benthos#1299
The program can implement a timeout around the Wait method, but that will not remove the pending confirmation from the channel's confirms/deferredConfirmations in the amqp library.
First, are you OK with adding a cancellation mechanism (a new method, a context parameter, ...)?
If yes, I see several possible solutions:
(d *DeferredConfirmation) Wait()
(this needs to change so that it can remove itself from deferredConfirmations)
Which one suits you best? Or do you have another suggestion?
PublishWithContext blocks forever when the connection is blocked
I've been a long-time user of the streadway/amqp library and recently switched to this repository.
Since then, I have seen one instance of the error in the subject being sent as a close notification (Exception (505) Reason: "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead"). This happened very shortly after program start (within the first second).
Searching online, the most common explanation seems to be "you're using concurrency, but the library isn't designed for concurrency". However, this library has a lock on Publish(), which serializes everything, so I don't think the explanation is that simple in this instance.
My usage is fairly vanilla. There are pub-sub "fanout" exchanges. The publish happens with:
c.Publish("", key, true, false,
	amqp091.Publishing{
		DeliveryMode: amqp091.Persistent,
		ContentType:  contentType,
		Body:         body,
	})
The message itself is binary. It may be small (several bytes) or large (100 KB+, but probably under 1 MB).
What I'm looking for are some plausible theories as to how this could happen. Is there some sort of race at startup? Are we meant to wait for something after ExchangeDeclare? We also have a consumer subscription on the same Channel. I don't think consuming messages can cause this sort of thing, but maybe in some weird edge case?
Is there any additional information I should be looking at?
I'm copying and pasting from streadway/amqp#514, since I guess this fork is more actively maintained :)
Hi,
I've encountered an issue when trying to use AMQPLAIN authentication. It seems that the current implementation does not work out of the box.
I looked for an official reference for AMQPLAIN authentication but could not find one, and other client implementations use a different wire format from this one.
To reproduce, run this gist against a rabbitmq container:
% docker run -d -p 5672:5672 rabbitmq
% go run main.go guest guest amqp://127.0.0.1:5672/
2021/09/09 10:51:38 can't dial: Exception (403) Reason: "username or password not allowed"
I'll add a PR later.
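A sketch of an AMQPLAIN response encoder, assuming RabbitMQ reads the SASL response as the body of an AMQP field table (without the table's own 4-byte length prefix) holding LOGIN and PASSWORD as long strings. This is my reading of the broker's behaviour, not an official specification, and amqplainResponse is a hypothetical name.

```go
package main

import (
	"bytes"
	"encoding/binary"
)

// amqplainResponse encodes LOGIN and PASSWORD as field-table entries,
// omitting the table's leading 4-byte length (assumption, see above).
func amqplainResponse(username, password string) string {
	var buf bytes.Buffer
	writeLongStrField(&buf, "LOGIN", username)
	writeLongStrField(&buf, "PASSWORD", password)
	return buf.String()
}

// writeLongStrField writes one table entry: short-string name,
// 'S' type tag, then a long string (4-byte big-endian length + bytes).
func writeLongStrField(buf *bytes.Buffer, name, value string) {
	buf.WriteByte(byte(len(name))) // short string: 1-byte length
	buf.WriteString(name)
	buf.WriteByte('S') // field type: long string
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(value)))
	buf.Write(n[:])
	buf.WriteString(value)
}
```

A plain "LOGIN:...PASSWORD:..." text encoding would not match this layout, which would be consistent with the 403 shown above.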
Hi, I already created an issue for this in streadway/amqp#518, but that was for streadway/amqp. The issue still reproduces with this library, so I decided to add it here too.
I have created a sample project with a few files; download them and run main to reproduce the issue. The files are at:
https://gist.github.com/melardev/1b9c7e1b1a4ac37cb31e57dc6cde99c7
The code is bad and not thread-safe, but it was easier to reproduce my issue with it than with my real project, which does make an effort to keep things thread-safe. The idea is to show the library hanging even though the connection is already closed. It may get stuck in any function call, mainly QueueDeclare and channel.Close(), but most of the time it gets stuck in channel.Close().
The issue is that closing a channel has to acquire a lock (https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L640) that is already held here: https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L408. Basically, the connection is waiting for the channel to be closed, but channel.shutdown() never exits its main loop, even if the connection is closed...
This is my complete RabbitMQ code inside the rabbitmq package. I use the StartConsumer function to consume messages from RabbitMQ in a goroutine:
go rabbit.StartConsumer("myqueue", "", handler, 1)
After a while my consumer stops working and doesn't receive any messages. I also cannot see any error.
package rabbitmq

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"time"

	"github.com/getsentry/sentry-go"
	amqp "github.com/rabbitmq/amqp091-go"
	log "github.com/sirupsen/logrus"
)

type RabbitMQ struct {
	conn                  *amqp.Connection
	queues                map[string]amqp.Queue
	connString            string
	rabbitConnCloseError  chan *amqp.Error
	rabbitChannCloseError chan *amqp.Error
	recoveryConsumer      []RecoveryConsumer
	ch                    *amqp.Channel
	// exchange_name string
}

type RecoveryConsumer struct {
	queueName   string
	routingKey  string
	handler     func(d amqp.Delivery)
	concurrency int8
}

type (
	Delivery = amqp.Delivery
)

func (r *RabbitMQ) IfExist(queueName string) bool {
	for _, item := range r.recoveryConsumer {
		if item.queueName == queueName {
			return false
		}
	}
	return true
}

func (r *RabbitMQ) RecoverConsumers() {
	for _, i := range r.recoveryConsumer {
		go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
		log.Infof("Consumer for %v successfully recovered", i.queueName)
	}
}

func (r *RabbitMQ) Reconnector() {
	for { //nolint //nolint
		select {
		case err := <-r.rabbitConnCloseError:
			log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
			log.Debug("[RabbitMQ] Reconnecting after connection closed")
			sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
			r.connection()
			r.RecoverConsumers()
		case err := <-r.rabbitChannCloseError:
			log.Errorf("[RabbitMQ] Channel Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
			log.Debug("[RabbitMQ] Reconnecting after channel closed")
			sentry.CaptureException(fmt.Errorf("[RabbitMQ] Channel Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
			r.ch.Close()
			r.RecoverConsumers()
		}
	}
}

func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
	r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
	if virthost != "/" || len(virthost) > 0 {
		r.connString += virthost
	}
	r.connection()
	go r.Reconnector()
}

func (r *RabbitMQ) connection() {
	if r.conn != nil {
		if !r.conn.IsClosed() {
			return
		} else {
			log.Info("Reconnecting to RabbitMQ...")
		}
	}
	var err error
	r.conn, err = amqp.Dial(r.connString)
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
	}
	r.conn.Config.Heartbeat = 5 * time.Second
	r.queues = make(map[string]amqp.Queue)
	r.rabbitConnCloseError = make(chan *amqp.Error)
	r.conn.NotifyClose(r.rabbitConnCloseError)
	log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
	log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}

func (r *RabbitMQ) CreateChannel() *amqp.Channel {
	ch, err := r.conn.Channel()
	if err != nil {
		log.Error(err)
		return nil
	}
	return ch
}

func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
	q, err := ch.QueueDeclare(
		name,  // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to declare a queue", err)
	}
	r.queues[name] = q
	// r.ch.ExchangeDeclare()
}

func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
	_, err := ch.QueueDeclare(
		name,  // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		ch.Close()
		log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
		sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
	}
}

func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
	span := sentry.StartSpan(context.TODO(), "publish message")
	defer span.Finish()
	err := ch.Publish(
		"",                   // exchange
		r.queues[queue].Name, // routing key
		false,                // mandatory
		false,                // immediate
		amqp.Publishing{
			Headers:         map[string]interface{}{},
			ContentType:     "application/json",
			ContentEncoding: "",
			DeliveryMode:    amqp.Persistent,
			Priority:        0,
			CorrelationId:   "",
			ReplyTo:         "",
			Expiration:      "",
			MessageId:       "",
			Timestamp:       time.Now().UTC(),
			Type:            "",
			UserId:          "",
			AppId:           "",
			Body:            body,
		})
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed to publish a message", err)
	}
	log.Debugf("Send message: %s", string(body))
}

func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
	// prefetch 4x as many messages as we can handle at once
	var err error
	ok := r.IfExist(queueName)
	if ok {
		r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
			queueName:   queueName,
			routingKey:  routingKey,
			handler:     handler,
			concurrency: int8(concurrency),
		})
	}
	r.ch, err = r.conn.Channel()
	if err != nil {
		log.Error(err)
	}
	r.ch.NotifyClose(r.rabbitChannCloseError)
	prefetchCount := concurrency * 1
	err = r.ch.Qos(prefetchCount, 0, false)
	if err != nil {
		sentry.CaptureException(err)
		log.Errorf("%s: %s", "Failed QOS", err)
	}
	r.QueueAttach(r.ch, queueName)
	msgs, err := r.ch.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed consume message", err)
		sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
		os.Exit(1)
	}
	go func() {
		for msg := range msgs {
			handler(msg)
		}
		log.Error("[RabbitMQ] Rabbit consumer closed")
	}()
}

func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
	st := time.Now()
	for time.Since(st).Seconds() < 1 {
		msg, ok, err := ch.Get(queueName, true)
		if err != nil {
			log.Errorf("Can't consume queue. Error: %s", err.Error())
			sentry.CaptureException(err)
			return nil
		}
		if ok {
			return msg.Body
		}
		time.Sleep(50 * time.Millisecond)
	}
	return nil
}
Hello.
https://go.dev/play/p/cuOKquzMu7K
2022/10/11 19:50:50 Failed to connect to RabbitMQ: x509: certificate signed by unknown authority
exit status 1
Please help.
Can you tell me, with an example, how to add a certificate to this code so that it can connect to the RabbitMQ server?
Running our tests with the -race flag reports a race condition in the Connection module. Pasting one stack trace here to investigate.
WARNING: DATA RACE
Write at 0x00c00022a050 by goroutine 22:
github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1()
/Users/acedres/workspace/amqp091-go/connection.go:443 +0x50e
sync.(*Once).doSlow()
/usr/local/Cellar/go/1.17.3/libexec/src/sync/once.go:68 +0x127
sync.(*Once).Do()
/usr/local/Cellar/go/1.17.3/libexec/src/sync/once.go:59 +0x46
github.com/rabbitmq/amqp091-go.(*Connection).shutdown()
/Users/acedres/workspace/amqp091-go/connection.go:407 +0x7d
github.com/rabbitmq/amqp091-go.(*Connection).reader()
/Users/acedres/workspace/amqp091-go/connection.go:542 +0x344
github.com/rabbitmq/amqp091-go.Open·dwrap·20()
/Users/acedres/workspace/amqp091-go/connection.go:250 +0x58
Previous write at 0x00c00022a050 by goroutine 20:
github.com/rabbitmq/amqp091-go.(*Connection).openComplete()
/Users/acedres/workspace/amqp091-go/connection.go:847 +0x13d
github.com/rabbitmq/amqp091-go.(*Connection).openVhost()
/Users/acedres/workspace/amqp091-go/connection.go:832 +0x1ce
=== RUN TestChannelOpen
github.com/rabbitmq/amqp091-go.(*Connection).openTune()
/Users/acedres/workspace/amqp091-go/connection.go:818 +0xb04
github.com/rabbitmq/amqp091-go.(*Connection).openStart()
/Users/acedres/workspace/amqp091-go/connection.go:754 +0x4e7
github.com/rabbitmq/amqp091-go.(*Connection).open()
/Users/acedres/workspace/amqp091-go/connection.go:726 +0xc4
github.com/rabbitmq/amqp091-go.Open()
/Users/acedres/workspace/amqp091-go/connection.go:251 +0x664
github.com/rabbitmq/amqp091-go.TestOpen()
/Users/acedres/workspace/amqp091-go/client_test.go:275 +0x267
testing.tRunner()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x47
Goroutine 22 (running) created at:
github.com/rabbitmq/amqp091-go.Open()
/Users/acedres/workspace/amqp091-go/connection.go:250 +0x618
github.com/rabbitmq/amqp091-go.TestOpen()
/Users/acedres/workspace/amqp091-go/client_test.go:275 +0x267
testing.tRunner()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x47
Goroutine 20 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x726
testing.runTests.func1()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1598 +0x99
testing.tRunner()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
testing.runTests()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1596 +0x7ca
testing.(*M).Run()
/usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1504 +0x9d1
main.main()
_testmain.go:251 +0x22b
This is a simplified version of my code.
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	amqpUri := "amqpuri"
	conn, err := amqp.Dial(amqpUri)
	if err != nil {
		return
	}
	notifyConnClose := make(chan *amqp.Error)
	conn.NotifyClose(notifyConnClose)
	log.Println("RabbitMQ client connected")
	ch, err := conn.Channel()
	if err != nil {
		return
	}
	notifyChanClose := make(chan *amqp.Error)
	ch.NotifyClose(notifyChanClose)
	queueName := "testqueue"
	_, err = ch.QueueDeclare(queueName, false, false, false, false, nil)
	if err != nil {
		return
	}
	deliveryChan, err := ch.Consume(queueName, "", false, false, false, false, nil)
	if err != nil {
		return
	}
	go func() {
		select {
		case <-notifyConnClose:
			log.Println("connection closed")
		case <-notifyChanClose:
			log.Println("channel closed")
		}
	}()
	for d := range deliveryChan {
		log.Println(string(d.Body))
		d.Ack(false)
		//ch.Close() //comment out to test graceful close.
	}
	log.Println("terminating...")
}
In this code I open a connection and a channel and register a notification (Go) channel for each of them, so that I am notified when they are closed.
Then I declare a queue and start consuming messages from it by ranging over deliveryChan, the <-chan amqp.Delivery returned by the Consume function.
The problem happens when an unexpected disconnection occurs (for example, when I turn off my internet connection). In that case, even though notifyConnClose receives a message, deliveryChan is not closed, and the range loop blocks forever.
In the event of a graceful disconnection via connection.Close(), notifyConnClose receives a message and deliveryChan is closed.
In the event of an unexpected disconnection, given that I can't close the <-chan amqp.Delivery from my code, how am I supposed to proceed and get the loop to end?
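One way to stop the loop from blocking, sketched with stand-in types (Delivery and Error mirror amqp.Delivery and *amqp.Error) so it runs without a broker: instead of ranging over the delivery channel alone, select on both the delivery channel and the close notification, and return as soon as either one reports the end. consume is a hypothetical name.

```go
package main

// Delivery and Error stand in for amqp091-go's amqp.Delivery and *amqp.Error.
type Delivery struct{ Body []byte }
type Error struct{ Reason string }

// consume handles deliveries until the delivery channel is closed or a close
// notification arrives. It returns the close error (nil on a graceful
// shutdown) so that the caller can decide whether to reconnect.
func consume(deliveries <-chan Delivery, closed <-chan *Error, handle func(Delivery)) *Error {
	for {
		select {
		case d, ok := <-deliveries:
			if !ok {
				return nil
			}
			handle(d)
		case err := <-closed:
			return err
		}
	}
}
```

With the real types, the loop in the example above would become something like `if err := consume(deliveryChan, notifyConnClose, handler); err != nil { /* reconnect */ }`, and the separate goroutine that only logs the close would no longer be needed.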
Hello,
we are using go.uber.org/goleak for our internal amqp wrapper package and stumbled over a goroutine leak.
This happens in an internal test case where we send 2 messages to a non-existing exchange. This causes an error that is received by our reconnect goroutine on the NotifyClose channel, which then closes the connection; the connection is then also closed by our client (after the reconnect goroutine has terminated).
In our test case we call Close only once on the channel (unnecessarily) and once on the connection.
I managed to reproduce the same hang and goroutine leak by calling Close in parallel on the same connection. This is not the same scenario as in our internal test case, but it reproduces the problem. :-)
I'm using amqp091-go commit 6cac2fa.
This issue can be reproduced with the following test case:
//go:build integration
// +build integration

package amqp091

import (
	"sync"
	"testing"

	"go.uber.org/goleak"
)

func TestGoRoutineLeakOnParallelConClose(t *testing.T) {
	const routines = 2
	defer goleak.VerifyNone(t)

	c := integrationConnection(t, t.Name())

	var wg sync.WaitGroup
	startSigCh := make(chan interface{})
	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-startSigCh
			err := c.Close()
			if err != nil {
				t.Logf("close failed in routine %d: %s", id, err.Error())
			}
		}(i)
	}
	close(startSigCh)
	t.Log("waiting for go-routines to terminate")
	wg.Wait()
}
Test output:
=== RUN TestGoRoutineLeakOnParallelConClose
bug_test.go:36: waiting for go-routines to terminate
bug_test.go:30: close failed in routine 0: Exception (504) Reason: "channel/connection is not open"
leaks.go:78: found unexpected goroutines:
[Goroutine 11 in state chan send, with github.com/rabbitmq/amqp091-go.(*Connection).dispatch0 on top of the stack:
goroutine 11 [chan send]:
github.com/rabbitmq/amqp091-go.(*Connection).dispatch0(0xc0001263c0, {0x8c5828?, 0xc00000e240})
/home/fho/git/amqp091-go/connection.go:483 +0x40f
github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc00005af18?, {0x8c5828, 0xc00000e240})
/home/fho/git/amqp091-go/connection.go:456 +0x59
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0001263c0, {0x8c4980?, 0xc000010068})
/home/fho/git/amqp091-go/connection.go:550 +0x225
created by github.com/rabbitmq/amqp091-go.Open
/home/fho/git/amqp091-go/connection.go:251 +0x5eb
]
--- FAIL: TestGoRoutineLeakOnParallelConClose (0.46s)
On my machine it happens on almost every execution, though some runs succeed without the leak.
It might be necessary to run the test case multiple times to hit the issue:
while go test -race -count=1 -run=TestGoRoutineLeakOnParallelConClose -v -tags integration; do : ; done
Update:
I think I now understand how it happens:
1. The reader goroutine, in c.demux(), passed the msg to the rpc chan [2].
2. The call() method had not read the msg from the rpc channel yet; it was waiting in its select loop [4]. Because the errors channel is closed, call() returns without reading the msg from the rpc channel.
3. The reader goroutine hangs forever in dispatch0() trying to send the msg to the rpc channel [2], because the channel is unbuffered.
I guess this scenario, where call() returns before reading a msg from the rpc chan, could also be triggered when Close() is called only once but an error happens shortly after a response message was received: call() could return because an error is read from c.errors while dispatch0 is sending a message to the rpc chan.
[1] https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L350
[2] https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L483
[3] https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L425
[4] https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L692
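A sketch of one way to break the cycle described in the update, using hypothetical names that only model the hand-off between dispatch0() and call(): the sending side also selects on a shutdown signal, so it gives up instead of blocking forever on the unbuffered rpc channel. Giving the rpc channel a buffer of one would be another option; neither is necessarily the fix the library adopted.

```go
package main

import "errors"

var errClosed = errors.New("connection closed")

// rpcMux models the reader/caller hand-off: the reader delivers a response
// on rpc, and closed is closed on shutdown so a reader blocked on the
// unbuffered send can give up instead of leaking.
type rpcMux struct {
	rpc    chan string
	closed chan struct{}
}

// dispatch returns errClosed rather than blocking forever when nobody will
// ever receive the response.
func (m *rpcMux) dispatch(msg string) error {
	select {
	case m.rpc <- msg:
		return nil
	case <-m.closed:
		return errClosed
	}
}
```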
I'm trying to publish a message similar to this:
channel.PublishWithContext(
	ctx,
	exchange,
	messageName,
	false,
	false,
	amqp091.Publishing{
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Body:            payload,
		Headers:         amqp091.Table{"_trace": []string{"a", "b"}},
	},
)
and I receive the error
table field "_trace" value []string not supported
I guess the problem lies in the slice validation at https://github.com/streadway/amqp/blob/master/types.go#L232. Shouldn't it be done using reflect?
Passing a string array is not a problem with the PHP amqp library, so it seems to me that this is not a limitation of RabbitMQ.
I'm using
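Going by the error, the Table validation accepts []interface{} as an AMQP field array but not typed slices such as []string. A minimal workaround sketch (stringsToFieldArray is a hypothetical helper) is to convert the slice before putting it into the headers; each element then has to be a supported field type, which string is.

```go
package main

// stringsToFieldArray converts []string into []interface{}, the slice type
// that amqp091-go's Table accepts as a field array.
func stringsToFieldArray(ss []string) []interface{} {
	out := make([]interface{}, len(ss))
	for i, s := range ss {
		out[i] = s
	}
	return out
}

// Usage with the library:
//
//	Headers: amqp091.Table{"_trace": stringsToFieldArray([]string{"a", "b"})},
```

An explicit conversion at the call site avoids the library having to guess an AMQP type for arbitrary slice types via reflect, which is the trade-off behind the question above.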
I use RabbitMQ 3.11.3, and the official documentation says "Single active consumer for streams is a feature available in RabbitMQ 3.11 and more."
However, when I use amqp091-go to declare a queue of type stream, like this:
queue, err := channel.QueueDeclare(
	"q5",
	true,
	false,
	false,
	false,
	amqp091.Table{
		"x-single-active-consumer": true,
		"x-queue-type":             "stream",
	},
)
it always reports an error: 2022/11/23 16:56:19 Failed to declare a queue: Exception (406) Reason: "PRECONDITION_FAILED - invalid arg 'x-single-active-consumer' for queue 'q5' in vhost '/' of queue type rabbit_stream_queue"
What can I do about it?
It's great that this project is being actively maintained. While investigating a race condition, I noticed that 65f6e25 updated the source code link in the code files but simultaneously replaced the existing copyright in those files with one for VMware or its affiliates, rather than just adding the new claim.
Based on changes in other commits such as 34291f5, I believe the VMware copyright should have been added to the files, leaving the existing copyright in place, unless the original contributors to this project have formally transferred their rights to VMware.
I made sure that no concurrent goroutine uses a shared channel.
Follow-up to #96
Version 2 should remove deprecated API functions and "clean up" those that are left, perhaps coming up with a better name than PublishWithDeferredConfirm.
To be honest, if you're publishing without confirmations, the function should be called PublishButDontComplainWhenYouLoseData
Hi,
When writing persistent messages with this library, the messages are written to the "queues" folder, but with the PHP amqp library they are written to the "msg_store_persistent" folder. Where is this configured? This is not an issue with the library, but I need help figuring it out. Thanks.
Originally posted by mgdotson September 28, 2022
Using the documentation example against a local Docker container (docker run -p5672:5672 -p 15672:15672 -p5671:5671 -p15692:15692 rabbitmq:3.9.17-management), putting the server into a blocking situation causes channel.Close() to hang and never finish.
Steps to reproduce:
rabbitmqctl set_vm_memory_high_watermark 0.00000001
rabbitmqctl set_vm_memory_high_watermark 0.4
queue.Close()
At this point, the code will close the done channel but will hang on the queue.channel.Close() call.
Even if we wrap this in a context timeout so the calling code can finish, in a long-running process this could cause leaks over time, especially if multiple channels end up in this situation because of a blocked server.
This is also not the only scenario that can cause a channel.Close() call to hang.
Best practices? TCP settings?
During a memory alarm, RabbitMQ won't read from the publisher channel; therefore, it does not send a confirmation before the client example "gives up" on the confirmation:
amqp091-go/example_client_test.go
Lines 295 to 302 in 048b5b2
The problem is that the client sends a new message without waiting for any previous "given up" confirmation, which is not correct. The documentation of Channel.NotifyPublish(), which works very similarly to Channel.NotifyConfirm(), states:
It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().
It is also advisable for the caller to consume from the channel returned till it is closed to avoid possible deadlocks
The current implementation of the example client does, in fact, deadlock in the situation described in the repro steps: one goroutine is trying to deliver a confirmation; it grabs a lock on the confirms struct and sends a notification to a chan amqp.Confirmation that nobody is listening to. Then, during the close sequence, Channel.Close() calls confirms.Close(), which blocks on acquiring the lock on the confirms struct. Because nobody is receiving on the chan amqp.Confirmation, this is a deadlock.
Goroutine dump (only the two relevant goroutines):
Goroutine 5
runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
runtime.chansend (/Users/acedres/go/go1.18.6/src/runtime/chan.go:258)
runtime.chansend1 (/Users/acedres/go/go1.18.6/src/runtime/chan.go:144)
github.com/rabbitmq/amqp091-go.(*confirms).confirm (/Users/acedres/workspace/amqp091-go/confirms.go:56)
github.com/rabbitmq/amqp091-go.(*confirms).One (/Users/acedres/workspace/amqp091-go/confirms.go:82)
github.com/rabbitmq/amqp091-go.(*Channel).dispatch (/Users/acedres/workspace/amqp091-go/channel.go:336)
github.com/rabbitmq/amqp091-go.(*Channel).recvMethod (/Users/acedres/workspace/amqp091-go/channel.go:373)
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN (/Users/acedres/workspace/amqp091-go/connection.go:545)
github.com/rabbitmq/amqp091-go.(*Connection).demux (/Users/acedres/workspace/amqp091-go/connection.go:500)
github.com/rabbitmq/amqp091-go.(*Connection).reader (/Users/acedres/workspace/amqp091-go/connection.go:600)
github.com/rabbitmq/amqp091-go.Open.func1 (/Users/acedres/workspace/amqp091-go/connection.go:265)
runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
created at: github.com/rabbitmq/amqp091-go.Open (/Users/acedres/workspace/amqp091-go/connection.go:265)
Goroutine 35
runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
runtime.goparkunlock (/Users/acedres/go/go1.18.6/src/runtime/proc.go:367)
runtime.semacquire1 (/Users/acedres/go/go1.18.6/src/runtime/sema.go:144)
sync.runtime_SemacquireMutex (/Users/acedres/go/go1.18.6/src/runtime/sema.go:71)
sync.(*Mutex).lockSlow (/Users/acedres/go/go1.18.6/src/sync/mutex.go:162)
sync.(*Mutex).Lock (/Users/acedres/go/go1.18.6/src/sync/mutex.go:81)
github.com/rabbitmq/amqp091-go.(*confirms).Close (/Users/acedres/workspace/amqp091-go/confirms.go:105)
github.com/rabbitmq/amqp091-go.(*Channel).shutdown.func1 (/Users/acedres/workspace/amqp091-go/channel.go:148)
sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
github.com/rabbitmq/amqp091-go.(*Channel).shutdown (/Users/acedres/workspace/amqp091-go/channel.go:102)
github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1 (/Users/acedres/workspace/amqp091-go/connection.go:483)
sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
github.com/rabbitmq/amqp091-go.(*Connection).shutdown (/Users/acedres/workspace/amqp091-go/connection.go:453)
github.com/rabbitmq/amqp091-go.(*Connection).send.func1 (/Users/acedres/workspace/amqp091-go/connection.go:433)
runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
created at: github.com/rabbitmq/amqp091-go.(*Connection).send (/Users/acedres/workspace/amqp091-go/connection.go:433)
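A self-contained model of the pattern described above, using hypothetical types rather than the library's confirms.go: a confirmation is sent to the listener while a mutex is held, and Close needs the same mutex. Draining the listener in its own goroutine until it is closed, as the quoted documentation advises, keeps both from blocking.

```go
package main

import "sync"

// Confirmation stands in for amqp091-go's amqp.Confirmation.
type Confirmation struct {
	DeliveryTag uint64
	Ack         bool
}

// confirms models the behaviour from the goroutine dump: confirm sends to the
// listener while holding mu, and Close needs mu to close the listener.
type confirms struct {
	mu       sync.Mutex
	listener chan Confirmation
}

func (c *confirms) confirm(conf Confirmation) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.listener <- conf // blocks until someone receives
}

func (c *confirms) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	close(c.listener)
}

// drainConfirms reads every confirmation until the channel is closed, which
// keeps confirm (and therefore Close) from blocking. The returned channel is
// closed once draining has finished.
func drainConfirms(ch <-chan Confirmation, handle func(Confirmation)) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for conf := range ch {
			handle(conf)
		}
	}()
	return done
}
```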
Hello,
I have a cluster of three RabbitMQ nodes and want to use them properly according to the docs, but I cannot find any mention of how to specify their FQDNs. Is there a way to use clustered RabbitMQ at all?
Thanks in advance.
The old issue is https://github.com/streadway/amqp/issues/339
Thanks for keeping this package moving forward.
Would you consider keeping a changelog in the repository?
I notice that this package is now at version 1.2.0, compared to 1.1.0, which I've been using. I can find no clear description of the differences, but running a diff shows it contains #20, #23, and #25, and by looking at those I can infer that at least #23 justifies the minor version bump.
If a CHANGELOG.md is maintained in the main branch and updated with each merge, it would be easier to review at release time and ensure version tags are updated where appropriate. E.g. #26 has been merged, which warrants a version update, but that doesn't seem to be recorded anywhere.
Hi there.
I'm starting to migrate from the github.com/streadway/amqp package and delve into the new features of this package. Currently I want to use the PublishWithDeferredConfirm method, and it doesn't look production-ready to me. The problem occurs when the connection is closed while waiting for a publish confirmation from the AMQP server.
In more detail:
The Channel type has a shutdown method in which resources are cleaned up and closed; it also calls the Close method of the confirms type. But that method does not properly clean up its resources, such as deferredConfirmations and any awaiting DeferredConfirmation.
dc, err := c.PublishWithDeferredConfirm(exchange, key, false, false, msg)
if err != nil {
	log.Fatalf("c.PublishWithDeferredConfirm: %s", err)
}
dc.Wait() // <- deadlock
Possible solution:
Add one more method to the deferredConfirmations type and call it from confirms.Close:
// Close closes all awaiting DeferredConfirmation with Nack.
func (d *deferredConfirmations) Close() error {
	d.m.Lock()
	defer d.m.Unlock()

	for k, v := range d.confirmations {
		v.confirmation = Confirmation{DeliveryTag: k, Ack: false}
		v.wg.Done()
		delete(d.confirmations, k)
	}
	return nil
}
But it seems to me that such a situation should be handled with an explicit error like ErrClosed, which would be a breaking change to the package API.
Package version: v1.3.0
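A sketch of the ErrClosed variant, with hypothetical, simplified types rather than the library's own: Close fails every outstanding confirmation with an explicit error instead of a synthetic Nack, so a waiter can tell a broker nack from a lost channel. As noted above, doing this in the library would change the public API.

```go
package main

import (
	"errors"
	"sync"
)

// ErrClosed is returned to waiters whose confirmation can no longer arrive.
var ErrClosed = errors.New("channel closed before confirmation")

type pendingConfirm struct {
	done chan struct{}
	ack  bool
	err  error
}

// deferredConfirmations models the map of outstanding confirmations,
// keyed by delivery tag.
type deferredConfirmations struct {
	mu      sync.Mutex
	pending map[uint64]*pendingConfirm
}

func (d *deferredConfirmations) Add(tag uint64) *pendingConfirm {
	d.mu.Lock()
	defer d.mu.Unlock()
	p := &pendingConfirm{done: make(chan struct{})}
	d.pending[tag] = p
	return p
}

// Confirm settles one confirmation when the broker's ack/nack arrives.
func (d *deferredConfirmations) Confirm(tag uint64, ack bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if p, ok := d.pending[tag]; ok {
		p.ack = ack
		delete(d.pending, tag)
		close(p.done)
	}
}

// Close fails every outstanding confirmation with ErrClosed and wakes its waiter.
func (d *deferredConfirmations) Close() {
	d.mu.Lock()
	defer d.mu.Unlock()
	for tag, p := range d.pending {
		p.err = ErrClosed
		delete(d.pending, tag)
		close(p.done)
	}
}

// Wait blocks until the confirmation is settled or the channel is closed.
func (p *pendingConfirm) Wait() (bool, error) {
	<-p.done
	return p.ack, p.err
}
```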
I just can't install it with go install github.com/rabbitmq/amqp091-go
It shows:
package github.com/rabbitmq/amqp091-go is not a main package
Can anyone help?
I am forwarding an issue I found.
Channel.Publish deadlocks if the confirmation notification Go channel is not consumed in time.
The core dump below shows how the deadlock happens while acquiring the mutex in confirms 0xc000b9b940.
Channel.Publish finished. It acquired the mutex in confirms 0xc000b9b940 and waits for the confirmation notification channel to be consumed. The next Channel.Publish waits for the mutex in confirms 0xc000b9b940 to increase confirms.published.
The workaround is to consume the confirmation notification Go channel in a separate goroutine and synchronize Publish with another Go channel.
What a mess!
(dlv) gr 2869 bt
0 0x000000000043a4c5 in runtime.gopark
at /usr/local/go/src/runtime/proc.go:307
1 0x000000000044af85 in runtime.goparkunlock
at /usr/local/go/src/runtime/proc.go:312
2 0x000000000044af85 in runtime.semacquire1
at /usr/local/go/src/runtime/sema.go:144
3 0x000000000046c267 in sync.runtime_SemacquireMutex
at /usr/local/go/src/runtime/sema.go:71
4 0x0000000000487b45 in sync.(*Mutex).lockSlow
at /usr/local/go/src/sync/mutex.go:138
5 0x0000000000a82d9a in sync.(*Mutex).Lock
at /usr/local/go/src/sync/mutex.go:81
6 0x0000000000a82d9a in github.com/streadway/amqp.(*confirms).Publish
at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:32
7 0x0000000000a81edf in github.com/streadway/amqp.(*Channel).Publish
at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:1360
....
(dlv) gr 2869 frame 6 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
~r0 = (unreadable empty OP stack)
(dlv) gr 98 bt
0 0x000000000043a4c5 in runtime.gopark
at /usr/local/go/src/runtime/proc.go:307
1 0x0000000000405aea in runtime.chansend
at /usr/local/go/src/runtime/chan.go:258
2 0x0000000000405895 in runtime.chansend1
at /usr/local/go/src/runtime/chan.go:143
3 0x0000000000a82e57 in github.com/streadway/amqp.(*confirms).confirm
at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:45
4 0x0000000000a82fc5 in github.com/streadway/amqp.(*confirms).One
at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:66
5 0x0000000000a7ea65 in github.com/streadway/amqp.(*Channel).dispatch
at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:314
6 0x0000000000a7ec05 in github.com/streadway/amqp.(*Channel).recvMethod
at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:351
7 0x0000000000a84f55 in github.com/streadway/amqp.(*Connection).dispatchN
at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:477
8 0x0000000000a84a67 in github.com/streadway/amqp.(*Connection).demux
at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:436
9 0x0000000000a851f6 in github.com/streadway/amqp.(*Connection).reader
at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:528
10 0x0000000000470001 in runtime.goexit
at /usr/local/go/src/runtime/asm_amd64.s:1374
(dlv) gr 98 frame 4 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
confirmed = github.com/streadway/amqp.Confirmation {DeliveryTag: 1, Ack: true
See PR in streadway.
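A simplified model of the lock ordering in the dump above, using only the standard library. confirmer, Publish, Confirm and publishAll are hypothetical names, not the library's code: the simulated reader holds a mutex while sending on an unbuffered listener channel, and Publish needs the same mutex. Draining the listener from a dedicated goroutine, as in the workaround, keeps both sides moving.

```go
package main

import (
	"fmt"
	"sync"
)

// confirmer loosely models the pattern in the dump: Confirm holds mu while
// sending on an unbuffered listener, and Publish needs mu too.
type confirmer struct {
	mu        sync.Mutex
	published uint64
	listener  chan uint64
}

// Publish increments the publish counter under the mutex.
func (c *confirmer) Publish() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.published++
	return c.published
}

// Confirm blocks while holding the mutex until someone receives the tag.
func (c *confirmer) Confirm(tag uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.listener <- tag
}

// publishAll publishes n messages while a dedicated goroutine drains confirms.
func publishAll(n int) []uint64 {
	c := &confirmer{listener: make(chan uint64)}
	wire := make(chan uint64, n) // simulated acks arriving from the broker

	// Simulated reader goroutine: turns each ack into a Confirm call.
	readerDone := make(chan struct{})
	go func() {
		for tag := range wire {
			c.Confirm(tag)
		}
		close(readerDone)
	}()

	// The workaround: a dedicated goroutine consumes the listener channel.
	var confirmed []uint64
	drained := make(chan struct{})
	go func() {
		for tag := range c.listener {
			confirmed = append(confirmed, tag)
		}
		close(drained)
	}()

	for i := 0; i < n; i++ {
		wire <- c.Publish()
	}
	close(wire)
	<-readerDone
	close(c.listener)
	<-drained
	return confirmed
}

func main() {
	fmt.Println(len(publishAll(1000))) // 1000
}
```

Removing the draining goroutine makes the first Confirm block while holding the mutex, and the next Publish then waits forever, which is the same shape as goroutines 98 and 2869 in the dump.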
I'd love to see Go interfaces as part of this package. Without them, writing unit tests for code that integrates with amqp is miserable.
I can take care of that.
See streadway/amqp#489.
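Until the package ships interfaces, a common Go approach is for the calling code to define a small interface with only the methods it uses. In this sketch Publishing is a local stand-in for amqp091.Publishing so that it compiles without the library; Publisher, notifyOrderCreated, fakePublisher and the exchange and routing-key names are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

// Publishing is a local stand-in for amqp091.Publishing.
type Publishing struct {
	ContentType string
	Body        []byte
}

// Publisher is a consumer-defined interface; its method has the same shape
// as (*amqp091.Channel).Publish.
type Publisher interface {
	Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error
}

// notifyOrderCreated is the code under test; it depends only on Publisher.
func notifyOrderCreated(p Publisher, orderID string) error {
	if orderID == "" {
		return errors.New("empty order id")
	}
	return p.Publish("orders", "order.created", false, false, Publishing{
		ContentType: "text/plain",
		Body:        []byte(orderID),
	})
}

// fakePublisher records publishes so tests can assert on them.
type fakePublisher struct {
	keys   []string
	bodies []string
}

func (f *fakePublisher) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
	f.keys = append(f.keys, key)
	f.bodies = append(f.bodies, string(msg.Body))
	return nil
}

func main() {
	fake := &fakePublisher{}
	err := notifyOrderCreated(fake, "42")
	fmt.Println(err, fake.keys, fake.bodies) // <nil> [order.created] [42]
}
```

With amqp091.Publishing in place of the stand-in, *amqp091.Channel satisfies such an interface implicitly, so production code can pass a real channel while tests pass the fake.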
I tried the example code but got the following:
PS D:\Deployment\mqtt\simple-producer> go run publisher
2022/07/10 20:21:04 [INFO] dialing "amqp://guest:guest@localhost:5672/"
2022/07/10 20:21:06 [ERROR] Dial: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
exit status 1
[Issue]
I have two single active consumers (A and B).
Consumer A was active first; B was ready as the second.
Then I killed the active consumer A while another process was publishing 10000 messages.
I expected the active consumer to switch from A to B,
but it did not.
[Reproduce]
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmqtest rabbitmq:3.10.6-management
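consumer.go:
package main

import (
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/rabbitmq/amqp091-go"
)

func main() {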
conn, err := amqp091.DialConfig("amqp://guest:guest@localhost:5672/", amqp091.Config{
Heartbeat: time.Second * 30,
})
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
if err := ch.Qos(200, 0, false); err != nil {
panic(err)
}
queueArgs := make(amqp091.Table)
queueArgs["x-single-active-consumer"] = true
_, err = ch.QueueDeclare("queue",
true, // durable
false, // auto delete
false, //exclusive
false, //noWait
queueArgs, // queue args
)
if err != nil {
panic(err)
}
msgs, err := ch.Consume("queue", "consumer", false, false, false, false, nil)
if err != nil {
panic(err)
}
d := make(chan bool)
go func() {
for msg := range msgs {
fmt.Println(string(msg.Body))
_ = msg.Ack(true)
}
d <- true
}()
sigs := make(chan os.Signal, 1)
// SIGKILL cannot be caught, so only SIGINT and SIGTERM are registered.
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
if err := ch.Cancel("consumer", false); err != nil {
panic(err)
}
fmt.Println("cancel consume")
if err := ch.Close(); err != nil {
panic(err)
}
if err := conn.Close(); err != nil {
panic(err)
}
<-d
fmt.Println("terminate")
}
go run consumer.go (A session)
go run consumer.go (B session)
publisher.go:
package main

import (
"fmt"

"github.com/rabbitmq/amqp091-go"
)

func main() {
conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
ch, err := conn.Channel()
if err != nil {
panic(err)
}
for i := 0; i < 10000; i++ {
err := ch.Publish("", "queue", false, false, amqp091.Publishing{
DeliveryMode: 0,
ContentType: "text/plain",
Body: []byte(fmt.Sprintf("%d", i)),
})
if err != nil {
panic(err)
}
}
if err := ch.Close(); err != nil {
panic(err)
}
if err := conn.Close(); err != nil {
panic(err)
}
fmt.Println("terminate")
}
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ...
queue_name channel_pid consumer_tag ack_required prefetch_count active arguments
queue <[email protected].0> consumer true 200 false []
queue <[email protected].0> consumer true 200 true []
go run publisher.go
# active consumer log
1
2
3
4
...
^C (it prints "terminate")
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ...
As a result, I expected a transition from A to B, but it did not happen.
All consumers are gone.
However, I found error logs of rabbitmq server.
It's too long, so I'm attaching it as a file.
rabbit-error-logs.txt
It doesn't seem to be a problem with amqp091-go.
I'd like to bump this issue raised in streadway/amqp:
It's unfortunate that it is not possible to determine if a specific message was/wasn't returned. Is there a workaround that I am missing?
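One possible workaround, sketched under assumptions: give every message a unique ID (for example in Publishing.MessageId) and match messages that arrive on the NotifyReturn channel by the same field of the returned message. returnTracker and its methods are hypothetical, and the return is simulated so the sketch runs without a broker.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

// returnTracker is a hypothetical helper that correlates returns by message ID.
type returnTracker struct {
	mu       sync.Mutex
	next     uint64
	returned map[string]bool
}

func newReturnTracker() *returnTracker {
	return &returnTracker{returned: make(map[string]bool)}
}

// NextID generates the ID to put on the next outgoing message.
func (t *returnTracker) NextID() string {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.next++
	return strconv.FormatUint(t.next, 10)
}

// MarkReturned would be called for each returned message, using its ID.
func (t *returnTracker) MarkReturned(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.returned[id] = true
}

// WasReturned reports whether a return has been seen for this ID so far.
func (t *returnTracker) WasReturned(id string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.returned[id]
}

func main() {
	tr := newReturnTracker()
	a, b := tr.NextID(), tr.NextID()
	tr.MarkReturned(b) // simulate a basic.return for the second message
	fmt.Println(tr.WasReturned(a), tr.WasReturned(b)) // false true
}
```

A false from WasReturned only means "not returned yet". RabbitMQ's publisher confirms documentation states that for an unroutable mandatory message the basic.return is sent before the basic.ack, which may help bound that window when confirms are enabled.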
I notice increasing demand from users for built-in integration with distributed tracing.
One such example is Knative, where distributed tracing was integrated from the start.
Some time ago, the two major projects OpenCensus and OpenTracing merged into OpenTelemetry, and its Tracing API is now considered mature: https://github.com/open-telemetry/opentelemetry-go.
As a first step towards full distributed tracing support, I propose integrating trace context propagation.
Feedback is much appreciated!
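Context propagation mostly needs a way to read and write string values in message headers. A minimal sketch, assuming the headers map is amqp091.Table (a map[string]interface{}); the local Table type stands in for it here. The Get/Set/Keys method set matches OpenTelemetry's propagation.TextMapCarrier interface, so a propagator could inject into and extract from this carrier; headerCarrier itself is a hypothetical name.

```go
package main

import "fmt"

// Table stands in for amqp091.Table, which is a map[string]interface{}.
type Table map[string]interface{}

// headerCarrier is a hypothetical adapter exposing headers as string pairs.
type headerCarrier Table

// Get returns the header value for key, or "" if it is absent or not a string.
func (c headerCarrier) Get(key string) string {
	if v, ok := c[key].(string); ok {
		return v
	}
	return ""
}

// Set stores a string header value.
func (c headerCarrier) Set(key, value string) { c[key] = value }

// Keys lists all header names.
func (c headerCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	headers := Table{}
	// The conversion shares the same map, so Set writes into headers.
	headerCarrier(headers).Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Println(headerCarrier(headers).Get("traceparent"))
}
```

On the publishing side the carrier would wrap Publishing.Headers before injection; on the consuming side it would wrap Delivery.Headers before extraction.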
go mod init test-amqp
amqp.go:
package amqp_client
import (
"errors"
"log"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type Client struct {
queueName string
logger *log.Logger
connection *amqp.Connection
channel *amqp.Channel
done chan bool
notifyConnClose chan *amqp.Error
notifyChanClose chan *amqp.Error
notifyConfirm chan amqp.Confirmation
isReady bool
}
const (
// When reconnecting to the server after connection failure
reconnectDelay = 5 * time.Second
// When setting up the channel after a channel exception
reInitDelay = 2 * time.Second
// When resending messages the server didn't confirm
resendDelay = 5 * time.Second
)
var (
errNotConnected = errors.New("not connected to a server")
errAlreadyClosed = errors.New("already closed: not connected to the server")
errShutdown = errors.New("client is shutting down")
)
// New creates a new consumer state instance, and automatically
// attempts to connect to the server.
func New(queueName, addr string) *Client {
client := Client{
logger: log.New(os.Stdout, "", log.LstdFlags),
queueName: queueName,
done: make(chan bool),
}
go client.handleReconnect(addr)
return &client
}
// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (client *Client) handleReconnect(addr string) {
for {
client.isReady = false
client.logger.Println("Attempting to connect")
conn, err := client.connect(addr)
if err != nil {
client.logger.Println("Failed to connect. Retrying...")
select {
case <-client.done:
return
case <-time.After(reconnectDelay):
}
continue
}
if done := client.handleReInit(conn); done {
break
}
}
}
// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
conn, err := amqp.Dial(addr)
if err != nil {
return nil, err
}
client.changeConnection(conn)
client.logger.Println("Connected!")
return conn, nil
}
// handleReInit will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (client *Client) handleReInit(conn *amqp.Connection) bool {
for {
client.isReady = false
err := client.init(conn)
if err != nil {
client.logger.Println("Failed to initialize channel. Retrying...")
select {
case <-client.done:
return true
case <-time.After(reInitDelay):
}
continue
}
select {
case <-client.done:
return true
case <-client.notifyConnClose:
client.logger.Println("Connection closed. Reconnecting...")
return false
case <-client.notifyChanClose:
client.logger.Println("Channel closed. Re-running init...")
}
}
}
// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
ch, err := conn.Channel()
if err != nil {
return err
}
err = ch.Confirm(false)
if err != nil {
return err
}
_, err = ch.QueueDeclare(
client.queueName,
false, // Durable
false, // Delete when unused
false, // Exclusive
false, // No-wait
nil, // Arguments
)
if err != nil {
return err
}
client.changeChannel(ch)
client.isReady = true
client.logger.Println("Setup!")
return nil
}
// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
client.connection = connection
client.notifyConnClose = make(chan *amqp.Error)
client.connection.NotifyClose(client.notifyConnClose)
}
// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
client.channel = channel
client.notifyChanClose = make(chan *amqp.Error)
client.notifyConfirm = make(chan amqp.Confirmation, 1)
client.channel.NotifyClose(client.notifyChanClose)
client.channel.NotifyPublish(client.notifyConfirm)
}
// Push will push data onto the queue, and wait for a confirm.
// If no confirm is received within resendDelay,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (client *Client) Push(data []byte) error {
if !client.isReady {
return errors.New("failed to push: not connected")
}
for {
err := client.UnsafePush(data)
if err != nil {
client.logger.Println("Push failed. Retrying...")
select {
case <-client.done:
return errShutdown
case <-time.After(resendDelay):
}
continue
}
select {
case confirm := <-client.notifyConfirm:
if confirm.Ack {
client.logger.Println("Push confirmed!")
return nil
}
case <-time.After(resendDelay):
}
client.logger.Println("Push didn't confirm. Retrying...")
}
}
// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (client *Client) UnsafePush(data []byte) error {
if !client.isReady {
return errNotConnected
}
return client.channel.Publish(
"", // Exchange
client.queueName, // Routing key
false, // Mandatory
false, // Immediate
amqp.Publishing{
ContentType: "text/plain",
Body: data,
},
)
}
// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) Consume() (<-chan amqp.Delivery, error) {
if !client.isReady {
return nil, errNotConnected
}
return client.channel.Consume(
client.queueName,
"", // Consumer
false, // Auto-Ack
false, // Exclusive
false, // No-local
false, // No-Wait
nil, // Args
)
}
// Close will cleanly shutdown the channel and connection.
func (client *Client) Close() error {
if !client.isReady {
return errAlreadyClosed
}
close(client.done)
err := client.channel.Close()
if err != nil {
return err
}
err = client.connection.Close()
if err != nil {
return err
}
client.isReady = false
return nil
}
amqp_test.go:
package amqp_client
import "testing"
func TestAmqp(t *testing.T) {
client := New("test_queue", "amqp://guest:guest@localhost:5672/")
defer client.Close()
client.Push([]byte("test"))
}
docker run --rm -d --net host --name some-rabbit rabbitmq
Run the test with -race:
go test -race ./...
Test fails with the following message:
==================
WARNING: DATA RACE
Write at 0x00c000100228 by goroutine 8:
test-amqp.(*Client).handleReconnect()
/tmp/test-amqp/amqp.go:57 +0x5d
test-amqp.New.func1()
/tmp/test-amqp/amqp.go:49 +0x58
Previous read at 0x00c000100228 by goroutine 7:
test-amqp.(*Client).Push()
/tmp/test-amqp/amqp.go:180 +0x68
test-amqp.TestAmqp()
/tmp/test-amqp/amqp_test.go:8 +0xc4
testing.tRunner()
/usr/local/go/src/testing/testing.go:1439 +0x213
testing.(*T).Run.func1()
/usr/local/go/src/testing/testing.go:1486 +0x47
Goroutine 8 (running) created at:
test-amqp.New()
/tmp/test-amqp/amqp.go:49 +0x324
test-amqp.TestAmqp()
/tmp/test-amqp/amqp_test.go:6 +0x4f
testing.tRunner()
/usr/local/go/src/testing/testing.go:1439 +0x213
testing.(*T).Run.func1()
/usr/local/go/src/testing/testing.go:1486 +0x47
Goroutine 7 (finished) created at:
testing.(*T).Run()
/usr/local/go/src/testing/testing.go:1486 +0x724
testing.runTests.func1()
/usr/local/go/src/testing/testing.go:1839 +0x99
testing.tRunner()
/usr/local/go/src/testing/testing.go:1439 +0x213
testing.runTests()
/usr/local/go/src/testing/testing.go:1837 +0x7e4
testing.(*M).Run()
/usr/local/go/src/testing/testing.go:1719 +0xa71
main.main()
_testmain.go:47 +0x2e4
==================
2022/04/16 07:32:08 Attempting to connect
2022/04/16 07:32:08 Connected!
2022/04/16 07:32:08 Setup!
Found 1 data race(s)
FAIL test-amqp 0.021s
FAIL
Go Version: 1.18.1-bullseye
Library version: v1.3.4
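The reported race is on the plain isReady field, written by the handleReconnect goroutine and read by Push. A minimal sketch of one fix, assuming a mutex-guarded flag is acceptable (readyFlag is a hypothetical name; sync/atomic.Bool would also work but needs Go 1.19, newer than the 1.18 used here):

```go
package main

import (
	"fmt"
	"sync"
)

// readyFlag is a mutex-guarded replacement for a plain isReady bool.
type readyFlag struct {
	mu    sync.RWMutex
	ready bool
}

// Set is called by the reconnect goroutine.
func (f *readyFlag) Set(v bool) {
	f.mu.Lock()
	f.ready = v
	f.mu.Unlock()
}

// Get is called by Push, UnsafePush, Consume and Close.
func (f *readyFlag) Get() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.ready
}

func main() {
	var f readyFlag
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(2)
		go func(v bool) { // writer, like handleReconnect
			defer wg.Done()
			f.Set(v)
		}(i%2 == 0)
		go func() { // reader, like Push
			defer wg.Done()
			_ = f.Get()
		}()
	}
	wg.Wait()
	f.Set(true)
	fmt.Println(f.Get()) // true
}
```

In the client above, client.channel and client.notifyConfirm are also reassigned by the reconnect goroutine while Push reads them, so they would need the same protection for go test -race to pass.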
Hi,
I would like to ask if OpenTelemetry support should be part of this library.
When service A communicates through RabbitMQ with service B, service A needs to propagate the TraceID and SpanID to service B so that the time taken can be measured.
I saw this issue which may help:
open-telemetry/opentelemetry-go-contrib#376
example_client_test.go shows a great way to build a producer with reconnection support.
The consuming part does not cover reconnection support:
amqp091-go/example_client_test.go
Lines 257 to 274 in 51fade5
Even if reconnection support is mentioned as a non-goal for the library itself, I do think showcasing this in the example might be a good thing.
Right now, the implementation is only possible with a hack,
mainly due to these two issues:
These deadlocks prevent the consumer delivery channel from being closed during an unexpected connection loss (e.g., when the RabbitMQ server goes down).
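A possible shape for consumer-side reconnection in an example, assuming the delivery channel is closed when the connection is lost (the deadlocks mentioned above are what currently prevent that). openFunc, consumeForever and runDemo are hypothetical; deliveries are plain strings so the sketch runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// openFunc is a hypothetical hook: in real code it would dial, open a channel,
// declare the queue and return the result of ch.Consume.
type openFunc func() (<-chan string, error)

// consumeForever re-opens the consumer whenever opening fails or the delivery
// channel is closed, until stop is closed.
func consumeForever(open openFunc, handle func(string), retryDelay time.Duration, stop <-chan struct{}) {
	for {
		if deliveries, err := open(); err == nil {
			for d := range deliveries { // ends once the channel is closed
				handle(d)
			}
		}
		select { // check stop first so a requested stop always wins
		case <-stop:
			return
		default:
		}
		select {
		case <-stop:
			return
		case <-time.After(retryDelay):
		}
	}
}

// runDemo simulates: broker down, then a connection lost after two
// messages, then a final session that requests stop.
func runDemo() ([]string, int) {
	stop := make(chan struct{})
	attempts := 0
	open := func() (<-chan string, error) {
		attempts++
		switch attempts {
		case 1:
			return nil, errors.New("connection refused")
		case 2:
			ch := make(chan string, 2)
			ch <- "a"
			ch <- "b"
			close(ch)
			return ch, nil
		default:
			ch := make(chan string, 1)
			ch <- "c"
			close(ch)
			close(stop)
			return ch, nil
		}
	}
	var got []string
	consumeForever(open, func(d string) { got = append(got, d) }, time.Millisecond, stop)
	return got, attempts
}

func main() {
	got, attempts := runDemo()
	fmt.Println(got, attempts) // [a b c] 3
}
```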
The deadlock happens when the connection is closing. I have since lost the core dump, but here is the sequence:
connection.NotifyClose
The documentation should state that a separate goroutine should be dedicated to consuming the connection close notification Go channel.
The code sends two messages in sequence; the first is bad and the next one is good:
err := channel.Publish("Exchange_does_not_exist", .....)
// here err == nil
// the message is not delivered because the exchange does not exist (OK), but channel.IsClosed() == false!
err = channel.Publish(good_message)
// here err == nil
// channel.IsClosed() == false!
The message is not delivered, and no error is returned; it was sent into the void.
A few seconds later, channel.IsClosed() becomes true.
I am forced to check the channel status with channel.QueueInspect(), which works.
Question:
Why does channel.Publish() always return nil?
Why does channel.IsClosed() not switch to true immediately, so that subsequent valid messages are lost with no error?
Hello,
I have one channel with two queues, and after a few minutes (about 15 minutes) I get a 504 error:
Exception (504) Reason: channel/connection is not open
My code:
RabbitConnection, err := rabbitmq.Dial("amqp://" + username + ":" + password + "@" + host + "/")
if err != nil {
return err
}
//conn.Close()
RabbitChannel, err = RabbitConnection.Channel()
if err != nil {
return err
}
//RabbitChannel.Close()
_, err = RabbitChannel.QueueDeclare(
LsQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
_, err = RabbitChannel.QueueDeclare(
CoreQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return err
}
err = RabbitChannel.Confirm(false)
if err != nil {
return err
}
[root@edge-chengli-00001 rabbitmq-queue-declarer]# docker logs -f 81d03ce572f3
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x54 pc=0x7aaddc]
goroutine 465 [running]:
github.com/rabbitmq/amqp091-go.(*Channel).setClosed(...)
/go/pkg/mod/github.com/rabbitmq/[email protected]/channel.go:94
github.com/rabbitmq/amqp091-go.updateChannel(...)
/go/pkg/mod/github.com/rabbitmq/[email protected]/types.go:321
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN(0xc000344140, {0xd7a958, 0xc00019e4e0?})
/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:516 +0xbc
github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc000085f28?, {0xd7a958, 0xc00019e4e0})
/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:477 +0x5b
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000344140, {0xd78300?, 0xc0005aa090?})
/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:577 +0x227
created by github.com/rabbitmq/amqp091-go.Open
/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:258 +0x34a
I found an issue when:
I caused my program to hang; for now, I'm just deleting the connection.
I don't know if this is intended, but it's better to report it :)
The AMQP 0-9-1 reference guide at https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.update-secret describes a method to update a secret (password) on an existing connection without having to disconnect and reconnect. This makes things a lot easier when authentication uses rabbitmq_auth_backend_oauth2, which issues JWT tokens with a limited lifespan. If such a token expires and is not updated, the connection becomes unusable. Updating the token makes the connection usable again without the hassle of reconnecting and resubscribing consumers.
It would be very nice if this could be supported in a future version, as we depend heavily on this feature for authentication and authorization in our backend.
I'm using NotifyPublish to get confirms.
At the end of the run, I try to close the connection and the associated channel, but it hangs indefinitely.
Here's a test to reproduce the issue (reproduces 80-90% of the time):
func TestCloseHandBug(t *testing.T) {
ctx := context.Background()
testQueue := "test"
c, err := amqp091.Dial(fmt.Sprintf("amqp://%s:%s@%s%s", Config.Username, Config.Password, Config.Hostname, Config.VHost))
if err != nil {
t.Fatalf("Error connecting to server: %s", err)
}
t.Log("connected")
closeChan := make(chan struct{})
// close connection
defer func() {
close(closeChan)
if c != nil {
t.Log("disconnecting")
//TODO: program hangs here - removing c.close and adding
//TODO: <-time.After(time.Second * 120) doesn't help. closing channel hangs
if err := c.Close(); err != nil {
t.Logf("disconnect error: %s", err)
return
}
}
c = nil
t.Log("disconnected")
}()
ch, err := c.Channel()
if err != nil {
t.Fatalf("couldn't open channel: %s", err)
}
err = ch.Qos(
16, // prefetch count
0, // prefetch size
false, // global
)
if err != nil {
t.Fatalf("couldn't configure qos: %s", err)
}
// handle confirms TODO: without this, disconnection works
confirms := ch.NotifyPublish(make(chan amqp091.Confirmation))
go func() {
for {
select {
case <-confirms:
// do something here in real scenario
continue
case <-closeChan:
return
case <-ctx.Done():
return
}
}
}()
err = ch.Confirm(false)
if err != nil {
t.Fatalf("couldn't configure confirm: %s", err)
}
ch.ExchangeDeclare("amq.topic", "topic", true, false, false, false, nil)
// handle close channel TODO: removing this func doesn't solve the issue
go func() {
for {
select {
case <-closeChan:
t.Logf("closing channel. id=%d",1)
//TODO: program hangs here as well
if err := ch.Close(); err != nil {
t.Logf("closing channel err. id=%d. err=%v", 1, err)
return
}
t.Logf("channel closed. id=%d",1)
return
case <-ctx.Done():
return
}
}
}()
_, err = ch.QueueDelete(testQueue, false, false, false)
if err != nil {
t.Fatalf("error deleting test queue: %s", err)
}
key := "subscribe.send"
_, err = ch.QueueDeclare(testQueue, true, false, false, false, nil)
if err != nil {
t.Fatalf("error declaring queue: %s", err)
}
err = ch.QueueBind(testQueue, key, "amq.topic", false, nil)
if err != nil {
t.Fatalf("error binding queue: %s", err)
}
delivery, err := ch.Consume(testQueue, "", false, false, false, false, nil)
if err != nil {
t.Fatalf("error consuming: %s", err)
}
expected := "hello world!"
t.Logf("sending. id=%d, data=%s",1, []byte(expected))
m := amqp091.Publishing{
ContentType: "text/plain",
Body: []byte(expected),
DeliveryMode: amqp091.Persistent,
}
err = ch.Publish("amq.topic", key, true, false, m)
if err != nil {
t.Fatalf("Error publishing: %s", err)
}
var actual string
select {
case message := <-delivery:
message.Ack(false)
actual = string(message.Body)
case <-time.After(time.Second / 2):
t.Fatalf("Timeout receiving message")
}
if actual != expected {
t.Fatalf("Expected message %s, but got %s", expected, actual)
}
}
Here is the result:
=== RUN TestCloseHandBug
provider_test.go:125: connected
provider_test.go:224: sending. id=1, data=hello world!
provider_test.go:133: disconnecting
provider_test.go:187: closing channel. id=1
l <- confirmation in func (c *confirms) confirm(confirmation Confirmation)
c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in connection.close
c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in channel.close
c.m.Lock() in func (c *confirms) Close() error (originating in go c.shutdown(&Error{)
Notes:
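In the test above, the goroutine reading confirms returns as soon as closeChan is closed, while one of the traces shows shutdown blocked on l <- confirmation. The NotifyPublish documentation says the listener channel is closed when the Channel is closed, so one plausible mitigation is to keep receiving until that happens. A stdlib-only model (drainUntilClosed and simulate are hypothetical, and confirmations are plain bools):

```go
package main

import (
	"fmt"
	"sync"
)

// drainUntilClosed keeps receiving until the sender closes the channel,
// instead of returning early on a separate "done" signal.
func drainUntilClosed(confirms <-chan bool) int {
	n := 0
	for range confirms {
		n++
	}
	return n
}

// simulate models a shutdown path that still delivers `late` confirmations
// on an unbuffered channel and then closes it.
func simulate(late int) int {
	confirms := make(chan bool)
	var wg sync.WaitGroup
	var count int
	wg.Add(1)
	go func() {
		defer wg.Done()
		count = drainUntilClosed(confirms)
	}()
	for i := 0; i < late; i++ {
		confirms <- true // would block forever if the receiver had already returned
	}
	close(confirms)
	wg.Wait()
	return count
}

func main() {
	fmt.Println(simulate(3)) // 3
}
```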
Hello
package main
import (
"context"
"github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
var wg sync.WaitGroup
func amqp(ctx context.Context) {
defer func() {
wg.Done()
}()
c, err := amqp091.Dial("amqp://guest:[email protected]:5672/")
if err != nil {
panic("connection error")
}
defer c.Close()
<-ctx.Done()
}
const n = 16
func main() {
ctx, cancel := context.WithCancel(context.Background())
wg.Add(n)
for i := 0; i < n; i++ {
go amqp(ctx)
}
cancel()
wg.Wait()
time.Sleep(time.Hour)
}
The above code causes 100% CPU usage after a short while.
The problem does not always occur, so you may have to run it several times to reproduce it.
Strace attached after the problem occurred:
strace: Process 3167635 attached
% time seconds usecs/call calls errors syscall
------ ----------- ----------- --------- --------- ----------------
100,00 4,571892 4571892 1 1 futex
0,00 0,000010 5 2 rt_sigprocmask
0,00 0,000002 1 2 getpid
0,00 0,000001 0 2 gettid
0,00 0,000001 0 2 tgkill
0,00 0,000000 0 1 rt_sigaction
------ ----------- ----------- --------- --------- ----------------
100.00 4,571906 10 1 total
Steps to reproduce
The call to Delivery.Ack blocks indefinitely instead of returning an error.