rabbitmq / amqp091-go

An AMQP 0-9-1 Go client maintained by the RabbitMQ team. Originally by @streadway: `streadway/amqp`

License: Other

Go 97.07% Shell 0.76% Makefile 0.48% PowerShell 1.69%

amqp091-go's People

Contributors

0x6e6562, akrennmair, alraujo, andygrunwald, chunyilyu, danielepalaia, dependabot[bot], domodwyer, edercarloscosta, fho, gerhard, hagna, hjr265, hodbn, kpurdon, lukebakken, makasim, michaelklishin, mirahimage, mordyovits, peterbourgon, pinkfish, pnordahl, raqbit, slagiewka, t2y, tie, umapathy-cisco, yywing, zerpet

amqp091-go's Issues

Provide a friendly way to set connection name

The client-provided connection name is very useful for identifying connections in the Management UI and during debugging sessions. In the Java client, it's quite easy to set a client-provided name with ConnectionFactory.newConnection(String clientName). In this client, a user has to set a configuration property with a specific name, for example:

connection, err := amqp.DialConfig(
	amqpURI,
	amqp.Config{Properties: map[string]interface{}{"connection_name": "sample-producer"}},
)

This isn't great because the user is required to know the property key connection_name and the Properties map has to be crafted manually.

We should either provide a simple function like amqp.SetConnectionName(amqp.Config, string), or a constant with the connection name key (and any other well-known properties).
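
For illustration, a minimal sketch of what such a helper could look like on the caller side (the setConnectionName name is hypothetical and not part of the library's API; only the "connection_name" property key comes from the issue above):

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// setConnectionName is a hypothetical helper: it fills in the well-known
// "connection_name" property so callers don't have to craft the map by hand.
func setConnectionName(cfg *amqp.Config, name string) {
	if cfg.Properties == nil {
		cfg.Properties = amqp.Table{}
	}
	cfg.Properties["connection_name"] = name
}

func main() {
	cfg := amqp.Config{}
	setConnectionName(&cfg, "sample-producer")

	conn, err := amqp.DialConfig("amqp://guest:guest@localhost:5672/", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}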

Provide a logging interface

Reference: redis/go-redis#1279

There are several places in PR #78 where a panic has been inserted if an error is returned from low-level functions. Panicking is not appropriate, but neither is disregarding the error. These errors should be logged at the error or warning level.
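
For illustration, a minimal sketch of what a pluggable logging interface could look like, loosely following the go-redis approach referenced above (the Logging and SetLogger names here are assumptions, not necessarily the library's eventual API):

package amqp091

import "log"

// Logging is a hypothetical minimal logging abstraction the library could
// call instead of panicking or silently dropping errors.
type Logging interface {
	Printf(format string, v ...interface{})
}

// Logger is the package-level logger; it defaults to the standard library logger.
var Logger Logging = log.Default()

// SetLogger lets applications plug in their own logger (e.g. logrus, zap).
func SetLogger(l Logging) {
	Logger = l
}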

unable to use properties

Hey, I want to use the x-delayed-message plugin for RabbitMQ. The plugin documentation says:

 AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);  

How can I do this with the amqp091-go package?
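
A rough Go equivalent of the Java snippet above, assuming an exchange named "my-exchange" has already been declared with the x-delayed-message type (a sketch, not an official example):

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	// The "x-delay" header tells the delayed-message exchange how long
	// (in milliseconds) to hold the message before routing it.
	err = ch.Publish(
		"my-exchange", // exchange
		"",            // routing key
		false,         // mandatory
		false,         // immediate
		amqp.Publishing{
			Headers:     amqp.Table{"x-delay": 5000},
			ContentType: "text/plain",
			Body:        []byte("hello after 5 seconds"),
		})
	if err != nil {
		log.Fatal(err)
	}
}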

Semver release

Can you create a release for this project and follow semantic versioning, please? It is OK to start with a v0 version to allow backward-incompatible changes, so v0.1.0 could be a good start.

This will allow users to pin a specific version instead of a commit hash and set up auto-updating (e.g. Dependabot).

Thanks!

Documentation

I notice that these docs are copy-pasted from the Channel.NotifyClose method, but the receiver wasn't changed, so it should be Connection instead of Channel:

amqp091-go/connection.go

Lines 284 to 288 in 8b6de9a

The chan provided will be closed when the Channel is closed and on a
graceful close, no error will be sent.
In case of a non graceful close the error will be notified synchronously by the library
so that it will be necessary to consume the Channel from the caller in order to avoid deadlocks

Add possible cancel of DeferredConfirmation

In case of a (presumed) network error, a program using DeferredConfirmation can wait indefinitely on the .Wait() method.
Linked issue in benthos: benthosdev/benthos#1299

The program can implement a timeout over the Wait method, but that will not clear the confirmation from the channel's confirms/deferredConfirmations in the amqp library (a sketch of such a caller-side timeout is shown below).

First, is it OK with you to add a cancellation solution (new method, added context, ...)?
If yes, I see several possible solutions:

  • Add a cancellable context to PublishWithDeferredConfirm and clear the linked confirmation on context cancellation
  • Add a CancelConfirm method to Channel that would call confirm.One (or a new method Cancel)
  • Add a cancellable context to (d *DeferredConfirmation) Wait() (this needs changes so that it can clear itself from deferredConfirmations)
  • Add a Cancel method to DeferredConfirmation (this needs changes so that it can clear itself from deferredConfirmations)

Which one suits you best? Or do you have another suggestion?
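
For reference, a minimal sketch of the caller-side timeout mentioned above, assuming DeferredConfirmation.Wait() simply blocks until the broker acks or nacks; note it does not remove the pending confirmation from the channel, which is exactly the gap this issue describes:

import (
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// waitWithTimeout runs dc.Wait() in its own goroutine and gives up after the
// timeout. ok reports whether a confirmation arrived in time; the goroutine
// (and the library-side pending confirmation) still lingers until the channel
// is closed.
func waitWithTimeout(dc *amqp.DeferredConfirmation, timeout time.Duration) (acked bool, ok bool) {
	done := make(chan bool, 1)
	go func() { done <- dc.Wait() }()
	select {
	case acked = <-done:
		return acked, true
	case <-time.After(timeout):
		return false, false
	}
}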

Exception (505) Reason: "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead"

I've been a long-time user of the streadway/amqp library. Recently updated to this repository.

Since then, I have one instance of the error in the subject being sent as a close notification (Exception (505) Reason: "UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead"). This happened very shortly after program start (within the first second).

Searching around online, the most common explanation seems to be "you're using concurrency, but the library isn't designed for concurrency". However, this library has a lock on Publish(), which serializes everything, so I don't think the explanation is that simple in this instance.

My usage is fairly vanilla. There are pub-sub "fanout" exchanges. The publish happens with

c.Publish("", key, true, false,
	amqp091.Publishing{
		DeliveryMode: amqp091.Persistent,
		ContentType:  contentType,
		Body:         body,
	})

The message itself is binary. It may be small (several bytes), or it may be large (100kb+, but probably not as big as 1MB).

What I'm looking for are some plausible theories as to how this could happen. Is there some sort of race at startup? Are we meant to wait for something after ExchangeDeclare? We also have a consumer subscription on the same Channel. I don't think consuming messages can cause this sort of thing, but maybe in some weird instance?

Is there some additional information I should be looking at?

AMQPLAIN authentication does not work

I'm copying and pasting from streadway/amqp#514, since I guess this fork is more actively maintained :)

Hi,

I've encountered an issue when trying to use AMQPLAIN authentication. It seems that the current implementation does not work out of the box.

I looked for an official reference for AMQPLAIN authentication but could not find one; other implementations use different wire formats.

To reproduce, run this gist against a rabbitmq container:

% docker run -d -p 5672:5672 rabbitmq
% go run main.go guest guest amqp://127.0.0.1:5672/
2021/09/09 10:51:38 can't dial: Exception (403) Reason: "username or password not allowed"

I'll add a PR later.

Connection/Channel Deadlock

Hi, I already created an issue for this on streadway/amqp#518, but that was for streadway/amqp. The issue still reproduces with this library, so I decided to add it here too.

I have created a sample project with a few files; download them and run main to reproduce the issue. The files are at:
https://gist.github.com/melardev/1b9c7e1b1a4ac37cb31e57dc6cde99c7

The code is bad and not thread-safe, but it was easier to reproduce my issue with it than with my real project, which makes an effort to keep things thread-safe. The idea is to show the library hanging even though the connection is already closed. It may get stuck in any function call, mainly QueueDeclare and channel.Close(), but most of the time it gets stuck on channel.Close(). The issue is that closing a channel has to acquire a lock (https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L640) which is already held here (https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L408); basically the connection is waiting for the channel to be closed, but channel.shutdown() never exits its main loop, even though the connection is closed.


After a while consumer utilisation is 0% and consumer not working inside goroutine

This is my complete RabbitMQ code inside the rabbitmq package. I use the StartConsumer function to consume messages from RabbitMQ in a goroutine:

go rabbit.StartConsumer("myqueue", "", handler, 1)

After a while my consumer stops working and doesn't receive any messages. I also cannot see any error.

package rabbitmq

import (
	"context"
	"fmt"
	"os"
	"runtime"
	"time"

	"github.com/getsentry/sentry-go"
	log "github.com/sirupsen/logrus"
	amqp "github.com/rabbitmq/amqp091-go"
)

type RabbitMQ struct {
	conn                  *amqp.Connection
	queues                map[string]amqp.Queue
	connString            string
	rabbitConnCloseError  chan *amqp.Error
	rabbitChannCloseError chan *amqp.Error
	recoveryConsumer      []RecoveryConsumer
	ch                    *amqp.Channel
	// exchange_name string
}

type RecoveryConsumer struct {
	queueName   string
	routingKey  string
	handler     func(d amqp.Delivery)
	concurrency int8
}

type (
	Delivery = amqp.Delivery
)

func (r *RabbitMQ) IfExist(queueName string) bool {
	for _, item := range r.recoveryConsumer {
		if item.queueName == queueName {
			return false
		}
	}
	return true
}

func (r *RabbitMQ) RecoverConsumers() {
	for _, i := range r.recoveryConsumer {
		go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
		log.Infof("Consumer for %v successfully recovered", i.queueName)
	}
}

func (r *RabbitMQ) Reconnector() {
	for { //nolint //nolint
		select {
		case err := <-r.rabbitConnCloseError:
			log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
			log.Debug("[RabbitMQ] Reconnecting after connection closed")
			sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
			r.connection()
			r.RecoverConsumers()
		case err := <-r.rabbitChannCloseError:
			log.Errorf("[RabbitMQ] Channel Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
			log.Debug("[RabbitMQ] Reconnecting after channel closed")
			sentry.CaptureException(fmt.Errorf("[RabbitMQ] Channel Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
			r.ch.Close()
			r.RecoverConsumers()
		}
	}
}

func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
	r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
	if virthost != "/" || len(virthost) > 0 {
		r.connString += virthost
	}
	r.connection()
	go r.Reconnector()
}

func (r *RabbitMQ) connection() {
	if r.conn != nil {
		if !r.conn.IsClosed() {
			return
		} else {
			log.Info("Reconnecting to RabbitMQ...")
		}
	}

	var err error
	r.conn, err = amqp.Dial(r.connString)
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
	}
	r.conn.Config.Heartbeat = 5 * time.Second
	r.queues = make(map[string]amqp.Queue)

	r.rabbitConnCloseError = make(chan *amqp.Error)
	r.conn.NotifyClose(r.rabbitConnCloseError)
	log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
	log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}

func (r *RabbitMQ) CreateChannel() *amqp.Channel {
	ch, err := r.conn.Channel()
	if err != nil {
		log.Error(err)
		return nil
	}
	return ch
}

func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
	q, err := ch.QueueDeclare(
		name,  // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		log.Fatalf("%s: %s", "Failed to declare a queue", err)
	}
	r.queues[name] = q
	// r.ch.ExchangeDeclare()
}

func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
	_, err := ch.QueueDeclare(
		name,  // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		ch.Close()
		log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
		sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
	}
}

func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
	span := sentry.StartSpan(context.TODO(), "publish message")
	defer span.Finish()
	err := ch.Publish(
		"",                   // exchange
		r.queues[queue].Name, // routing key
		false,                // mandatory
		false,                // immediate
		amqp.Publishing{
			Headers:         map[string]interface{}{},
			ContentType:     "application/json",
			ContentEncoding: "",
			DeliveryMode:    amqp.Persistent,
			Priority:        0,
			CorrelationId:   "",
			ReplyTo:         "",
			Expiration:      "",
			MessageId:       "",
			Timestamp:       time.Now().UTC(),
			Type:            "",
			UserId:          "",
			AppId:           "",
			Body:            body,
		})
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed to publish a message", err)
	}

	log.Debugf("Send message: %s", string(body))
}

func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
	// prefetch 4x as many messages as we can handle at once
	var err error
	ok := r.IfExist(queueName)
	if ok {
		r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
			queueName:   queueName,
			routingKey:  routingKey,
			handler:     handler,
			concurrency: int8(concurrency),
		})
	}

	r.ch, err = r.conn.Channel()
	if err != nil {
		log.Error(err)
	}

	r.ch.NotifyClose(r.rabbitChannCloseError)

	prefetchCount := concurrency * 1
	err = r.ch.Qos(prefetchCount, 0, false)
	if err != nil {
		sentry.CaptureException(err)
		log.Errorf("%s: %s", "Failed QOS", err)
	}
	r.QueueAttach(r.ch, queueName)

	msgs, err := r.ch.Consume(
		queueName, // queue
		"",        // consumer
		true,      // auto-ack
		false,     // exclusive
		false,     // no-local
		false,     // no-wait
		nil,       // args
	)
	if err != nil {
		sentry.CaptureException(err)
		log.Fatalf("%s: %s", "Failed consume message", err)
		sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
		os.Exit(1)
	}

	go func() {
		for msg := range msgs {
			handler(msg)
		}
		log.Error("[RabbitMQ] Rabbit consumer closed")
	}()
}

func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
	st := time.Now()
	for time.Since(st).Seconds() < 1 {
		msg, ok, err := ch.Get(queueName, true)
		if err != nil {
			log.Errorf("Can't consume queue. Error: %s", err.Error())
			sentry.CaptureException(err)
			return nil
		}
		if ok {
			return msg.Body
		}
		time.Sleep(50 * time.Millisecond)
	}
	return nil
}

tls problem

Hello.

https://go.dev/play/p/cuOKquzMu7K
2022/10/11 19:50:50 Failed to connect to RabbitMQ: x509: certificate signed by unknown authority
exit status 1

Please help.
Could you tell me, with an example, how to configure a certificate in this code so that it can connect to the RabbitMQ server?
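
A minimal sketch of dialing over TLS with a custom CA using the library's DialTLS call (the certificate path and hostname are placeholders):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"log"
	"os"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// Load the CA certificate that signed the server's certificate.
	caCert, err := os.ReadFile("/path/to/ca_certificate.pem")
	if err != nil {
		log.Fatal(err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		log.Fatal("failed to parse CA certificate")
	}

	tlsCfg := &tls.Config{RootCAs: pool}

	// DialTLS uses the provided TLS configuration for amqps:// URLs.
	conn, err := amqp.DialTLS("amqps://guest:guest@rabbit.example.com:5671/", tlsCfg)
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %v", err)
	}
	defer conn.Close()
	log.Println("connected over TLS")
}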

Potential race condition in Connection module

Running our tests with the -race flag reports a race condition in the Connection module. Pasting one stack trace here to investigate.

WARNING: DATA RACE
Write at 0x00c00022a050 by goroutine 22:
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1()
      /Users/acedres/workspace/amqp091-go/connection.go:443 +0x50e
  sync.(*Once).doSlow()
      /usr/local/Cellar/go/1.17.3/libexec/src/sync/once.go:68 +0x127
  sync.(*Once).Do()
      /usr/local/Cellar/go/1.17.3/libexec/src/sync/once.go:59 +0x46
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown()
      /Users/acedres/workspace/amqp091-go/connection.go:407 +0x7d
  github.com/rabbitmq/amqp091-go.(*Connection).reader()
      /Users/acedres/workspace/amqp091-go/connection.go:542 +0x344
  github.com/rabbitmq/amqp091-go.Open·dwrap·20()
      /Users/acedres/workspace/amqp091-go/connection.go:250 +0x58

Previous write at 0x00c00022a050 by goroutine 20:
  github.com/rabbitmq/amqp091-go.(*Connection).openComplete()
      /Users/acedres/workspace/amqp091-go/connection.go:847 +0x13d
  github.com/rabbitmq/amqp091-go.(*Connection).openVhost()
      /Users/acedres/workspace/amqp091-go/connection.go:832 +0x1ce
=== RUN   TestChannelOpen
  github.com/rabbitmq/amqp091-go.(*Connection).openTune()
      /Users/acedres/workspace/amqp091-go/connection.go:818 +0xb04
  github.com/rabbitmq/amqp091-go.(*Connection).openStart()
      /Users/acedres/workspace/amqp091-go/connection.go:754 +0x4e7
  github.com/rabbitmq/amqp091-go.(*Connection).open()
      /Users/acedres/workspace/amqp091-go/connection.go:726 +0xc4
  github.com/rabbitmq/amqp091-go.Open()
      /Users/acedres/workspace/amqp091-go/connection.go:251 +0x664
  github.com/rabbitmq/amqp091-go.TestOpen()
      /Users/acedres/workspace/amqp091-go/client_test.go:275 +0x267
  testing.tRunner()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x47

Goroutine 22 (running) created at:
  github.com/rabbitmq/amqp091-go.Open()
      /Users/acedres/workspace/amqp091-go/connection.go:250 +0x618
  github.com/rabbitmq/amqp091-go.TestOpen()
      /Users/acedres/workspace/amqp091-go/client_test.go:275 +0x267
  testing.tRunner()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x47

Goroutine 20 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1306 +0x726
  testing.runTests.func1()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1598 +0x99
  testing.tRunner()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1259 +0x22f
  testing.runTests()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1596 +0x7ca
  testing.(*M).Run()
      /usr/local/Cellar/go/1.17.3/libexec/src/testing/testing.go:1504 +0x9d1
  main.main()
      _testmain.go:251 +0x22b

Consumer channel isn't closed in the event of unexpected disconnection

This is a simplified version of my code.

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	amqpUri := "amqpuri"
	conn, err := amqp.Dial(amqpUri)
	if err != nil {
		return
	}
	notifyConnClose := make(chan *amqp.Error)
	conn.NotifyClose(notifyConnClose)
	log.Println("RabbitMQ client connected")

	ch, err := conn.Channel()
	if err != nil {
		return
	}
	notifyChanClose := make(chan *amqp.Error)
	ch.NotifyClose(notifyChanClose)

	queueName := "testqueue"
	_, err = ch.QueueDeclare(queueName, false, false, false, false, nil)
	if err != nil {
		return
	}

	deliveryChan, err := ch.Consume(queueName, "", false, false, false, false, nil)
	if err != nil {
		return
	}

	go func() {
		select {
		case <-notifyConnClose:
			log.Println("connection closed")
		case <-notifyChanClose:
			log.Println("channel closed")
		}
	}()

	for d := range deliveryChan {
		log.Println(string(d.Body))
		d.Ack(false)
		//ch.Close()  //comment out to test graceful close.
	}

	log.Println("terminating...")
}

In this code I am getting a connection and a channel and registering a notification (go)channel for both the connection and channel to be notified when they are closed.

Then I declare a queue and start consuming messages from it by ranging on the deliveryChan <-chan amqp.Delivery returned by the consume function.

The problem happens when an unexpected disconnection occurs (for example, I turn off my internet). In that case, even though the notifyConnClose channel gets a message, the deliveryChan is not closed, and the range loop blocks forever.

In the event of a graceful disconnection via connection.Close(), the notifyConnClose channel gets a message and the deliveryChan is closed.

In the event of an unexpected disconnection, given that I can't close the <-chan amqp.Delivery from my code, how am I supposed to proceed and get the loop to end?
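
One way to make the loop exit in this situation is to select on the close notifications alongside the delivery channel instead of ranging over deliveries alone; a sketch that would replace the range loop in main above (same variables as in the example):

for {
	select {
	case d, ok := <-deliveryChan:
		if !ok {
			log.Println("delivery channel closed")
			return
		}
		log.Println(string(d.Body))
		d.Ack(false)
	case err := <-notifyConnClose:
		log.Printf("connection closed: %v", err)
		return
	case err := <-notifyChanClose:
		log.Printf("channel closed: %v", err)
		return
	}
}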

reader go routine hangs and leaks when Connection.Close() is called multiple times

Hello,

we are using go.uber.org/goleak for our internal amqp wrapper package and stumbled over a goroutine leak.
This happens in an internal test case where we send 2 messages to a non-existing exchange. This causes an error that is received by our reconnect goroutine on the NotifyClose channel, which then closes the connection; the connection is also closed in our client (after the reconnect goroutine terminated).
In our test case we only call Close once on the channel (unnecessarily) and once on the connection.

I managed to reproduce the same hang and goroutine leak by calling Close in parallel on the same connection. This is not the same scenario that happens in our internal test case, but it reproduces it. :-)

I'm using amqp091-go commit 6cac2fa.

This issue can be reproduced with the following testcase:

//go:build integration
// +build integration

package amqp091

import (
	"sync"
	"testing"

	"go.uber.org/goleak"
)

func TestGoRoutineLeakOnParallelConClose(t *testing.T) {
	const routines = 2
	defer goleak.VerifyNone(t)
	c := integrationConnection(t, t.Name())

	var wg sync.WaitGroup
	startSigCh := make(chan interface{})

	for i := 0; i < routines; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			<-startSigCh

			err := c.Close()
			if err != nil {
				t.Logf("close failed in routine %d: %s", id, err.Error())
			}
		}(i)
	}
	close(startSigCh)

	t.Log("waiting for go-routines to terminate")
	wg.Wait()
}

Test output:

=== RUN   TestGoRoutineLeakOnParallelConClose
    bug_test.go:36: waiting for go-routines to terminate
    bug_test.go:30: close failed in routine 0: Exception (504) Reason: "channel/connection is not open"
    leaks.go:78: found unexpected goroutines:
        [Goroutine 11 in state chan send, with github.com/rabbitmq/amqp091-go.(*Connection).dispatch0 on top of the stack:
        goroutine 11 [chan send]:
        github.com/rabbitmq/amqp091-go.(*Connection).dispatch0(0xc0001263c0, {0x8c5828?, 0xc00000e240})
        	/home/fho/git/amqp091-go/connection.go:483 +0x40f
        github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc00005af18?, {0x8c5828, 0xc00000e240})
        	/home/fho/git/amqp091-go/connection.go:456 +0x59
        github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc0001263c0, {0x8c4980?, 0xc000010068})
        	/home/fho/git/amqp091-go/connection.go:550 +0x225
        created by github.com/rabbitmq/amqp091-go.Open
        	/home/fho/git/amqp091-go/connection.go:251 +0x5eb
        ]
--- FAIL: TestGoRoutineLeakOnParallelConClose (0.46s)

On my machine it happens on almost every execution, some succeed without the leak though.
It might be necessary to run the testcase multiple times to run into the issue:

while go test -race -count=1 -run=TestGoRoutineLeakOnParallelConClose -v -tags integration; do : ; done

Update:
I think I now understand how it happens:

  • Close() is called twice; both calls send out a close message [1]
  • For both messages the reader reads the response frame, calls c.demux() and passes the msg to the rpc chan [2]
  • The shutdown closes the errors channel [3]
  • For one of the received responses, the call() method has not yet read the msg from the rpc channel; it is waiting in the select loop [4]. Because the errors channel is closed, call() returns without reading the msg from the rpc channel, and the reader goroutine hangs forever in dispatch0() trying to send the msg to the rpc channel [2], because that channel is unbuffered.

I guess this scenario, where call() returns before reading a msg from the rpc chan, could also be triggered when Close() is called only once but an error happens shortly after a message response was received:
call() could return because an error is read from c.errors while dispatch0 is sending a message to the rpc chan.

Footnotes

  1. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L350
  2. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L483
  3. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L425
  4. https://github.com/rabbitmq/amqp091-go/blob/6cac2faf74b0e761395b4da4ebfa3fe4a8eb8b59/connection.go#L692

Publishing Headers are unable to store Table with slice values

I'm trying to publish a message similar to:

channel.PublishWithContext(
	ctx,
	exchange,
	messageName,
	false,
	false,
	amqp091.Publishing{
		ContentType:     "application/json",
		ContentEncoding: "utf-8",
		Body:            payload,
		Headers:         amqp091.Table{"_trace": []string{"a", "b"}},
	},
)

and I receive the error
table field "_trace" value []string not supported
I guess the problem lies in the slice validation (https://github.com/streadway/amqp/blob/master/types.go#L232); shouldn't it be done using reflection?
Passing a vector of strings is not a problem via the PHP amqp library, so it seems to me that this is not a limitation of RabbitMQ.

I'm using

  • go version 1.18.1
  • amqp091-go v1.5.0
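
As a side note, a workaround that stays within the types the Table field validation accepts is to convert the slice to []interface{} first; a small sketch (the toTableSlice helper is illustrative, not part of the library):

// toTableSlice converts a []string into the []interface{} form accepted by
// amqp091.Table field validation; each element stays a plain string.
func toTableSlice(values []string) []interface{} {
	out := make([]interface{}, len(values))
	for i, v := range values {
		out[i] = v
	}
	return out
}

// Usage in the Publishing above:
//   Headers: amqp091.Table{"_trace": toTableSlice([]string{"a", "b"})},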

Cannot support stream queue with SAC

I use RabbitMQ 3.11.3, and the official docs say "Single active consumer for streams is a feature available in RabbitMQ 3.11 and more",
but when I use amqp091-go to declare a queue whose type is stream, like this:
queue, err := channel.QueueDeclare(
	"q5",
	true,
	false,
	false,
	false,
	amqp091.Table{
		"x-single-active-consumer": true,
		"x-queue-type":             "stream",
	},
)
it always reports the error: 2022/11/23 16:56:19 Failed to declare a queue: Exception (406) Reason: "PRECONDITION_FAILED - invalid arg 'x-single-active-consumer' for queue 'q5' in vhost '/' of queue type rabbit_stream_queue"
What can I do about it?

copyright issue?

It's great that this project is being actively maintained. While investigating a race condition I noticed that 65f6e25 updated the source code link in the code files, but simultaneously replaced the existing copyright in the code files with one for VMWare or affiliates, rather than just adding the new claim.

Based on changes in other commits such as 34291f5 I believe the VMWare copyright should have been added to the files, leaving the existing copyright in place. Unless the original contributors to this project have formally transferred their rights to VMWare?

Remove deprecated API functions

Follow-up to #96

Version 2 should remove deprecated API functions and "clean up" those that are left, perhaps coming up with a better name than PublishWithDeferredConfirm.

To be honest if you're publishing without confirmations the function should be called PublishButDontComplainWhenYouLoseData

Persistent messages folder

Hi,

When writing persistent messages using this library, the messages are written to the "queues" folder, but when using the PHP amqp library they are written to the "msg_store_persistent" folder. Where is this configuration set? This is not an issue with the library, but I need help figuring this out, thanks.

Example client can deadlock in Close due to unconsumed confirmations

Discussed in #121

Originally posted by mgdotson September 28, 2022
Using the documentation example running against a local Docker container (docker run -p5672:5672 -p 15672:15672 -p5671:5671 -p15692:15692 rabbitmq:3.9.17-management), putting the server into a blocking situation causes channel.Close() to hang and never finish.

Steps to reproduce:

  1. Start the container with docker run above
  2. Start the server code and watch a couple of message confirmations
  3. In the docker image, run: rabbitmqctl set_vm_memory_high_watermark 0.00000001
  4. Watch the server code display "Push didn't confirm. Retrying..." (let 4 or so pass)
  5. In the docker image, run: rabbitmqctl set_vm_memory_high_watermark 0.4
  6. Notice Push confirmations happen
  7. Context will time out and run the queue.Close()

At this point, the code will close the done channel but will hang on the queue.channel.Close() command.

Even if we wrap this in a context timeout to allow the calling code to finish, if this is a long-running process, this could cause leaks over time, especially if there are multiple channels that end up in this situation due to a blocked server.

This is also not the only scenario that can cause a channel.Close() call to hang.

Best practices? TCP settings?

Analysis

During a memory alarm, RabbitMQ won't read from the publisher channel; therefore, it does not send a confirmation before the client example "gives up" on the confirmation:

select {
case confirm := <-client.notifyConfirm:
	if confirm.Ack {
		client.logger.Println("Push confirmed!")
		return nil
	}
case <-time.After(resendDelay):
}

The problem is that the client sends a new message and does not wait for any previously "given up" confirmation. This is not correct. The documentation of Channel.NotifyPublish(), which works very similarly to Channel.NotifyConfirm(), states:

It's advisable to wait for all Confirmations to arrive before calling Channel.Close() or Connection.Close().

It is also advisable for the caller to consume from the channel returned till it is closed to avoid possible deadlocks

The current implementation of the example client does, in fact, deadlock in the situation described in the repro steps: one goroutine is trying to deliver a confirmation, grabs a lock on the confirms struct, and sends a notification to a chan amqp.Confirmation that nobody is listening to. Then, during the close sequence, Channel.Close() tries to confirms.Close(), which blocks on acquiring a lock on the confirms struct. Because nobody is receiving on the chan amqp.Confirmation, this is a deadlock.
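
A sketch of the pattern the documentation recommends: drain the confirmation channel in a dedicated goroutine until the library closes it, so nothing is left blocked trying to deliver a confirmation when Channel.Close() runs (ch is assumed to be an open *amqp.Channel in confirm mode; this is illustrative, not the fixed example client):

confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 1))
go func() {
	// Drain every confirmation until the library closes the channel during
	// shutdown; this keeps the dispatch goroutine from blocking on send.
	for c := range confirms {
		log.Printf("delivery tag %d acked=%v", c.DeliveryTag, c.Ack)
	}
}()

// ... publish as usual ...

// Safe to close: no goroutine is stuck trying to deliver a confirmation.
if err := ch.Close(); err != nil {
	log.Printf("channel close: %v", err)
}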

Goroutine dump (only the two relevant ones):

Goroutine 5
  runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
  runtime.chansend (/Users/acedres/go/go1.18.6/src/runtime/chan.go:258)
  runtime.chansend1 (/Users/acedres/go/go1.18.6/src/runtime/chan.go:144)
  github.com/rabbitmq/amqp091-go.(*confirms).confirm (/Users/acedres/workspace/amqp091-go/confirms.go:56)
  github.com/rabbitmq/amqp091-go.(*confirms).One (/Users/acedres/workspace/amqp091-go/confirms.go:82)
  github.com/rabbitmq/amqp091-go.(*Channel).dispatch (/Users/acedres/workspace/amqp091-go/channel.go:336)
  github.com/rabbitmq/amqp091-go.(*Channel).recvMethod (/Users/acedres/workspace/amqp091-go/channel.go:373)
  github.com/rabbitmq/amqp091-go.(*Connection).dispatchN (/Users/acedres/workspace/amqp091-go/connection.go:545)
  github.com/rabbitmq/amqp091-go.(*Connection).demux (/Users/acedres/workspace/amqp091-go/connection.go:500)
  github.com/rabbitmq/amqp091-go.(*Connection).reader (/Users/acedres/workspace/amqp091-go/connection.go:600)
  github.com/rabbitmq/amqp091-go.Open.func1 (/Users/acedres/workspace/amqp091-go/connection.go:265)
  runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
  created at: github.com/rabbitmq/amqp091-go.Open (/Users/acedres/workspace/amqp091-go/connection.go:265)

Goroutine 35
  runtime.gopark (/Users/acedres/go/go1.18.6/src/runtime/proc.go:362)
  runtime.goparkunlock (/Users/acedres/go/go1.18.6/src/runtime/proc.go:367)
  runtime.semacquire1 (/Users/acedres/go/go1.18.6/src/runtime/sema.go:144)
  sync.runtime_SemacquireMutex (/Users/acedres/go/go1.18.6/src/runtime/sema.go:71)
  sync.(*Mutex).lockSlow (/Users/acedres/go/go1.18.6/src/sync/mutex.go:162)
  sync.(*Mutex).Lock (/Users/acedres/go/go1.18.6/src/sync/mutex.go:81)
  github.com/rabbitmq/amqp091-go.(*confirms).Close (/Users/acedres/workspace/amqp091-go/confirms.go:105)
  github.com/rabbitmq/amqp091-go.(*Channel).shutdown.func1 (/Users/acedres/workspace/amqp091-go/channel.go:148)
  sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
  sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
  github.com/rabbitmq/amqp091-go.(*Channel).shutdown (/Users/acedres/workspace/amqp091-go/channel.go:102)
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1 (/Users/acedres/workspace/amqp091-go/connection.go:483)
  sync.(*Once).doSlow (/Users/acedres/go/go1.18.6/src/sync/once.go:68)
  sync.(*Once).Do (/Users/acedres/go/go1.18.6/src/sync/once.go:59)
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown (/Users/acedres/workspace/amqp091-go/connection.go:453)
  github.com/rabbitmq/amqp091-go.(*Connection).send.func1 (/Users/acedres/workspace/amqp091-go/connection.go:433)
  runtime.goexit (/Users/acedres/go/go1.18.6/src/runtime/asm_arm64.s:1270)
  created at: github.com/rabbitmq/amqp091-go.(*Connection).send (/Users/acedres/workspace/amqp091-go/connection.go:433)

documentation of changes triggering version updates

Thanks for keeping this package moving forward.

Would you consider keeping a changelog in the repository source?

I notice that this package is now version 1.2.0, compared to 1.1.0 that I've been using. I can find no clear description of the differences, but running a diff shows it contains #20, #23, and #25, and by looking at those I can infer at least #23 justifies the minor update.

If a CHANGELOG.md is maintained in the main branch and updated with each merge, that would make it easier to review at release time and to ensure version tags are updated where appropriate. E.g. #26 has been merged, which warrants a version update, but that doesn't seem to be recorded anywhere.

Possible deadlock on DeferredConfirmation.Wait()

Hi there. 👋

I'm starting to migrate from the github.com/streadway/amqp package and delve into the new features of this package. Currently I want to use the PublishWithDeferredConfirm method, and it doesn't look production-ready to me. The case in which the problem occurs is closing the connection while waiting for a publication confirmation from the AMQP server.

What I mean in more detail:
The Channel type has a shutdown method in which resources are cleaned up and closed; it also calls the Close method of the confirms type. But that method does not properly clean up its own resources, such as deferredConfirmations and any awaiting DeferredConfirmation.

dc, err := c.PublishWithDeferredConfirm(exchange, key, false, false, msg)
if err != nil {
	log.Fatalf("c.PublishWithDeferredConfirm: %s", err)
}
dc.Wait() // <- deadlock

Possible solution:
Add one more method to the deferredConfirmations type and call it from confirms.Close.

// Close closes all awaiting DeferredConfirmation with Nack.
func (d *deferredConfirmations) Close() error {
	d.m.Lock()
	defer d.m.Unlock()

	for k, v := range d.confirmations {
		v.confirmation = Confirmation{DeliveryTag: k, Ack: false}
		v.wg.Done()
		delete(d.confirmations, k)
	}

	return nil
}

But it seems to me that such a situation should be handled with an explicit error like ErrClosed, and that would be a breaking change to the package API.

Package version: v1.3.0

Cannot install on go

I just can't install it via go install github.com/rabbitmq/amqp091-go.
It shows

package github.com/rabbitmq/amqp091-go is not a main package

Can anyone help?
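
For context, amqp091-go is a library rather than a command, so go install (which builds a main package) reports exactly this. A minimal sketch of adding it as a dependency of your own program instead:

// In a module (created with `go mod init example.com/app`), add the client
// as a dependency with:
//
//	go get github.com/rabbitmq/amqp091-go
//
// and import it from your own main package:
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	log.Println("connected")
}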

A possible dead lock when publishing message with confirmation

I'm redirecting an issue I found.

Channel.Publish ends up in a deadlock if the confirmation notification Go channel is not consumed in time.

The core dump below shows how the deadlock happens when acquiring the mutex in confirms 0xc000b9b940.

  1. In Goroutine 98, the confirmation arrived before the Channel.Publish function finished. It acquired the mutex in confirms 0xc000b9b940 and is waiting for the confirmation notification channel to be consumed.
  2. Goroutine 2869 wants to acquire the mutex in confirms 0xc000b9b940 to increase confirms.published.

The workaround is to consume the confirmation notification Go channel in a separate goroutine and synchronize Publish with another Go channel.

What a mess!

(dlv) gr 2869 bt
 0  0x000000000043a4c5 in runtime.gopark
    at /usr/local/go/src/runtime/proc.go:307
 1  0x000000000044af85 in runtime.goparkunlock
    at /usr/local/go/src/runtime/proc.go:312
 2  0x000000000044af85 in runtime.semacquire1
    at /usr/local/go/src/runtime/sema.go:144
 3  0x000000000046c267 in sync.runtime_SemacquireMutex
    at /usr/local/go/src/runtime/sema.go:71
 4  0x0000000000487b45 in sync.(*Mutex).lockSlow
    at /usr/local/go/src/sync/mutex.go:138
 5  0x0000000000a82d9a in sync.(*Mutex).Lock
    at /usr/local/go/src/sync/mutex.go:81
 6  0x0000000000a82d9a in github.com/streadway/amqp.(*confirms).Publish
    at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:32
 7  0x0000000000a81edf in github.com/streadway/amqp.(*Channel).Publish
    at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:1360
  ....

(dlv) gr 2869 frame 6 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
~r0 = (unreadable empty OP stack)
    
(dlv) gr 98 bt
 0  0x000000000043a4c5 in runtime.gopark
    at /usr/local/go/src/runtime/proc.go:307
 1  0x0000000000405aea in runtime.chansend
    at /usr/local/go/src/runtime/chan.go:258
 2  0x0000000000405895 in runtime.chansend1
    at /usr/local/go/src/runtime/chan.go:143
 3  0x0000000000a82e57 in github.com/streadway/amqp.(*confirms).confirm
    at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:45
 4  0x0000000000a82fc5 in github.com/streadway/amqp.(*confirms).One
    at /go/pkg/mod/github.com/streadway/[email protected]/confirms.go:66
 5  0x0000000000a7ea65 in github.com/streadway/amqp.(*Channel).dispatch
    at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:314
 6  0x0000000000a7ec05 in github.com/streadway/amqp.(*Channel).recvMethod
    at /go/pkg/mod/github.com/streadway/[email protected]/channel.go:351
 7  0x0000000000a84f55 in github.com/streadway/amqp.(*Connection).dispatchN
    at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:477
 8  0x0000000000a84a67 in github.com/streadway/amqp.(*Connection).demux
    at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:436
 9  0x0000000000a851f6 in github.com/streadway/amqp.(*Connection).reader
    at /go/pkg/mod/github.com/streadway/[email protected]/connection.go:528
10  0x0000000000470001 in runtime.goexit
    at /usr/local/go/src/runtime/asm_amd64.s:1374

(dlv) gr 98 frame 4 args
c = ("*github.com/streadway/amqp.confirms")(0xc000b9b940)
confirmed = github.com/streadway/amqp.Confirmation {DeliveryTag: 1, Ack: true

See PR in streadway.

Interfaces

I'd love to see Go interfaces as a part of this package. It makes everyone's life miserable to write unit tests when you have an integration with amqp.
I can take care of that.

target machine actively refused connection

Trying the example code, but got the below:

PS D:\Deployment\mqtt\simple-producer> go run publisher
2022/07/10 20:21:04 [INFO] dialing "amqp://guest:guest@localhost:5672/"
2022/07/10 20:21:06 [ERROR] Dial: dial tcp [::1]:5672: connectex: No connection could be made because the target machine actively refused it.
exit status 1

SAC not working properly

[Issue]

I have 2 single active consumers (A and B).

Consumer A was active first; consumer B was ready.

Then I killed the active consumer A while another process published 10000 messages.

I was expecting the ACTIVE consumer to transition from A to B,

but it didn't.

[Reproduce]

  1. run rabbitmq server
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmqtest rabbitmq:3.10.6-management
  2. run consumer
func main() {
	conn, err := amqp091.DialConfig("amqp://guest:guest@localhost:5672/", amqp091.Config{
		Heartbeat: time.Second * 30,
	})
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	if err := ch.Qos(200, 0, false); err != nil {
		panic(err)
	}

	queueArgs := make(amqp091.Table)
	queueArgs["x-single-active-consumer"] = true
	_, err = ch.QueueDeclare("queue",
		true,      // durable
		false,     // auto delete
		false,     //exclusive
		false,     //noWait
		queueArgs, // queue args
	)
	if err != nil {
		panic(err)
	}

	msgs, err := ch.Consume("queue", "consumer", false, false, false, false, nil)
	if err != nil {
		panic(err)
	}
	d := make(chan bool)
	go func() {
		for msg := range msgs {
			fmt.Println(string(msg.Body))
			_ = msg.Ack(true)
		}
		d <- true
	}()

	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL)
	<-sigs

	if err := ch.Cancel("consumer", false); err != nil {
		panic(err)
	}
	fmt.Println("cancel consume")
	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	<-d
	fmt.Println("terminate")
}
go run consumer.go (A session)
go run consumer.go (B session)
  3. run producer
func main() {
	conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}

	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10000; i++ {
		err := ch.Publish("", "queue", false, false, amqp091.Publishing{
			DeliveryMode: 0,
			ContentType:  "text/plain",
			Body:         []byte(fmt.Sprintf("%d", i)),
		})
		if err != nil {
			panic(err)
		}
	}

	if err := ch.Close(); err != nil {
		panic(err)
	}
	if err := conn.Close(); err != nil {
		panic(err)
	}
	fmt.Println("terminate")
}
  4. check active consumers
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ...
queue_name	channel_pid	consumer_tag	ack_required	prefetch_count	active	arguments
queue	<[email protected].0>	consumer	true	200	false	[]
queue	<[email protected].0>	consumer	true	200	true	[]
  5. run publisher
go run publisher.go
  6. kill the active consumer when a message is received.
    It is important that the active consumer does not receive all messages;
    it should be killed in the middle of receiving messages.
# active consumer log
1
2
3
4
...
^C (it will then print the text "terminate")
  7. check active consumers
root@2d6192fe4fed:/# rabbitmqctl list_consumers
Listing consumers in vhost / ..

As a result, I was expecting a transition from A to B, but it didn't happen.
All consumers are gone.

However, I found error logs on the RabbitMQ server.
They're too long, so I'm attaching them as a file.
rabbit-error-logs.txt

It doesn't seem to be a problem with amqp091-go.

Feature Proposal: W3C Context propagation integration via OpenTelemetry

I notice increasing demand/expectations from users for built-in integration with distributed tracing.
One such example is Knative, where distributed tracing was integrated from the start.

Some time ago the two major parties, OpenCensus and OpenTracing, merged into OpenTelemetry, and the Tracing API is now considered mature - https://github.com/open-telemetry/opentelemetry-go.

As a first step towards full distributed tracing support, I propose integrating tracing context propagation.

  • I suggest using headers to carry context information, as is done for example here - https://github.com/knative-sandbox/eventing-rabbitmq/blob/main/cmd/ingress/main.go#L153 (a sketch of header-based propagation appears below).
  • I suggest having the same header names across all officially supported RabbitMQ clients.
  • I argue that merely providing the header naming suggestion in the official docs would be a great step forward, as it sets the ground for future development and solves compatibility issues immediately.

Feedback is much appreciated!
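
To make the proposal concrete, a minimal sketch of injecting the W3C trace context into publishing headers via the OpenTelemetry propagation API (the tableCarrier and injectTraceHeaders names are illustrative, not an agreed-upon design):

package tracing

import (
	"context"

	amqp "github.com/rabbitmq/amqp091-go"
	"go.opentelemetry.io/otel/propagation"
)

// tableCarrier adapts amqp.Table to OpenTelemetry's TextMapCarrier interface,
// so trace headers can be written straight into the publishing headers.
type tableCarrier amqp.Table

func (c tableCarrier) Get(key string) string {
	if v, ok := c[key].(string); ok {
		return v
	}
	return ""
}

func (c tableCarrier) Set(key, value string) { c[key] = value }

func (c tableCarrier) Keys() []string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	return keys
}

// injectTraceHeaders copies the W3C traceparent/tracestate headers from ctx
// into the table that will be sent as message headers.
func injectTraceHeaders(ctx context.Context, headers amqp.Table) {
	propagation.TraceContext{}.Inject(ctx, tableCarrier(headers))
}

// PublishTraced publishes a message whose headers carry the trace context.
func PublishTraced(ctx context.Context, ch *amqp.Channel, exchange, key string, body []byte) error {
	headers := amqp.Table{}
	injectTraceHeaders(ctx, headers)
	return ch.PublishWithContext(ctx, exchange, key, false, false, amqp.Publishing{
		Headers:     headers,
		ContentType: "application/json",
		Body:        body,
	})
}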

Data race in the client example

How to reproduce

  • Set up a Go module:
go mod init test-amqp
  • Code example:
package amqp_client

import (
	"errors"
	"log"
	"os"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

type Client struct {
	queueName       string
	logger          *log.Logger
	connection      *amqp.Connection
	channel         *amqp.Channel
	done            chan bool
	notifyConnClose chan *amqp.Error
	notifyChanClose chan *amqp.Error
	notifyConfirm   chan amqp.Confirmation
	isReady         bool
}

const (
	// When reconnecting to the server after connection failure
	reconnectDelay = 5 * time.Second

	// When setting up the channel after a channel exception
	reInitDelay = 2 * time.Second

	// When resending messages the server didn't confirm
	resendDelay = 5 * time.Second
)

var (
	errNotConnected  = errors.New("not connected to a server")
	errAlreadyClosed = errors.New("already closed: not connected to the server")
	errShutdown      = errors.New("client is shutting down")
)

// New creates a new consumer state instance, and automatically
// attempts to connect to the server.
func New(queueName, addr string) *Client {
	client := Client{
		logger:    log.New(os.Stdout, "", log.LstdFlags),
		queueName: queueName,
		done:      make(chan bool),
	}
	go client.handleReconnect(addr)
	return &client
}

// handleReconnect will wait for a connection error on
// notifyConnClose, and then continuously attempt to reconnect.
func (client *Client) handleReconnect(addr string) {
	for {
		client.isReady = false
		client.logger.Println("Attempting to connect")

		conn, err := client.connect(addr)

		if err != nil {
			client.logger.Println("Failed to connect. Retrying...")

			select {
			case <-client.done:
				return
			case <-time.After(reconnectDelay):
			}
			continue
		}

		if done := client.handleReInit(conn); done {
			break
		}
	}
}

// connect will create a new AMQP connection
func (client *Client) connect(addr string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(addr)

	if err != nil {
		return nil, err
	}

	client.changeConnection(conn)
	client.logger.Println("Connected!")
	return conn, nil
}

// handleReconnect will wait for a channel error
// and then continuously attempt to re-initialize both channels
func (client *Client) handleReInit(conn *amqp.Connection) bool {
	for {
		client.isReady = false

		err := client.init(conn)

		if err != nil {
			client.logger.Println("Failed to initialize channel. Retrying...")

			select {
			case <-client.done:
				return true
			case <-time.After(reInitDelay):
			}
			continue
		}

		select {
		case <-client.done:
			return true
		case <-client.notifyConnClose:
			client.logger.Println("Connection closed. Reconnecting...")
			return false
		case <-client.notifyChanClose:
			client.logger.Println("Channel closed. Re-running init...")
		}
	}
}

// init will initialize channel & declare queue
func (client *Client) init(conn *amqp.Connection) error {
	ch, err := conn.Channel()

	if err != nil {
		return err
	}

	err = ch.Confirm(false)

	if err != nil {
		return err
	}
	_, err = ch.QueueDeclare(
		client.queueName,
		false, // Durable
		false, // Delete when unused
		false, // Exclusive
		false, // No-wait
		nil,   // Arguments
	)

	if err != nil {
		return err
	}

	client.changeChannel(ch)
	client.isReady = true
	client.logger.Println("Setup!")

	return nil
}

// changeConnection takes a new connection to the queue,
// and updates the close listener to reflect this.
func (client *Client) changeConnection(connection *amqp.Connection) {
	client.connection = connection
	client.notifyConnClose = make(chan *amqp.Error)
	client.connection.NotifyClose(client.notifyConnClose)
}

// changeChannel takes a new channel to the queue,
// and updates the channel listeners to reflect this.
func (client *Client) changeChannel(channel *amqp.Channel) {
	client.channel = channel
	client.notifyChanClose = make(chan *amqp.Error)
	client.notifyConfirm = make(chan amqp.Confirmation, 1)
	client.channel.NotifyClose(client.notifyChanClose)
	client.channel.NotifyPublish(client.notifyConfirm)
}

// Push will push data onto the queue, and wait for a confirm.
// If no confirms are received until within the resendTimeout,
// it continuously re-sends messages until a confirm is received.
// This will block until the server sends a confirm. Errors are
// only returned if the push action itself fails, see UnsafePush.
func (client *Client) Push(data []byte) error {
	if !client.isReady {
		return errors.New("failed to push: not connected")
	}
	for {
		err := client.UnsafePush(data)
		if err != nil {
			client.logger.Println("Push failed. Retrying...")
			select {
			case <-client.done:
				return errShutdown
			case <-time.After(resendDelay):
			}
			continue
		}
		select {
		case confirm := <-client.notifyConfirm:
			if confirm.Ack {
				client.logger.Println("Push confirmed!")
				return nil
			}
		case <-time.After(resendDelay):
		}
		client.logger.Println("Push didn't confirm. Retrying...")
	}
}

// UnsafePush will push to the queue without checking for
// confirmation. It returns an error if it fails to connect.
// No guarantees are provided for whether the server will
// receive the message.
func (client *Client) UnsafePush(data []byte) error {
	if !client.isReady {
		return errNotConnected
	}
	return client.channel.Publish(
		"",               // Exchange
		client.queueName, // Routing key
		false,            // Mandatory
		false,            // Immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        data,
		},
	)
}

// Consume will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (client *Client) Consume() (<-chan amqp.Delivery, error) {
	if !client.isReady {
		return nil, errNotConnected
	}
	return client.channel.Consume(
		client.queueName,
		"",    // Consumer
		false, // Auto-Ack
		false, // Exclusive
		false, // No-local
		false, // No-Wait
		nil,   // Args
	)
}

// Close will cleanly shutdown the channel and connection.
func (client *Client) Close() error {
	if !client.isReady {
		return errAlreadyClosed
	}
	close(client.done)
	err := client.channel.Close()
	if err != nil {
		return err
	}
	err = client.connection.Close()
	if err != nil {
		return err
	}

	client.isReady = false
	return nil
}
  • Write a test file amqp_test.go:
package amqp_client

import "testing"

func TestAmqp(t *testing.T) {
	client := New("test_queue", "amqp://guest:guest@localhost:5672/")
	defer client.Close()
	client.Push([]byte("test"))
}
  • Run rabbitmq:
docker run --rm -d --net host --name some-rabbit rabbitmq
  • Run test with -race:
go test -race ./...

Error Message

Test fails with the following message:

==================
WARNING: DATA RACE
Write at 0x00c000100228 by goroutine 8:
  test-amqp.(*Client).handleReconnect()
      /tmp/test-amqp/amqp.go:57 +0x5d
  test-amqp.New.func1()
      /tmp/test-amqp/amqp.go:49 +0x58

Previous read at 0x00c000100228 by goroutine 7:
  test-amqp.(*Client).Push()
      /tmp/test-amqp/amqp.go:180 +0x68
  test-amqp.TestAmqp()
      /tmp/test-amqp/amqp_test.go:8 +0xc4
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47

Goroutine 8 (running) created at:
  test-amqp.New()
      /tmp/test-amqp/amqp.go:49 +0x324
  test-amqp.TestAmqp()
      /tmp/test-amqp/amqp_test.go:6 +0x4f
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.(*T).Run.func1()
      /usr/local/go/src/testing/testing.go:1486 +0x47

Goroutine 7 (finished) created at:
  testing.(*T).Run()
      /usr/local/go/src/testing/testing.go:1486 +0x724
  testing.runTests.func1()
      /usr/local/go/src/testing/testing.go:1839 +0x99
  testing.tRunner()
      /usr/local/go/src/testing/testing.go:1439 +0x213
  testing.runTests()
      /usr/local/go/src/testing/testing.go:1837 +0x7e4
  testing.(*M).Run()
      /usr/local/go/src/testing/testing.go:1719 +0xa71
  main.main()
      _testmain.go:47 +0x2e4
==================
2022/04/16 07:32:08 Attempting to connect
2022/04/16 07:32:08 Connected!
2022/04/16 07:32:08 Setup!
Found 1 data race(s)
FAIL	test-amqp	0.021s
FAIL

Environment

Go Version: 1.18.1-bullseye
Library version: v1.3.4
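
For what it's worth, the race is on the unsynchronized isReady flag (written by handleReconnect/init, read by Push and friends). A minimal sketch of one way to make that flag safe with a mutex (illustrative only, not the fixed example client):

package amqp_client

import "sync"

// readyFlag guards the ready state with a mutex, so the reconnect goroutine
// and callers like Push no longer race on a plain bool field.
type readyFlag struct {
	mu    sync.Mutex
	ready bool
}

func (f *readyFlag) set(v bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.ready = v
}

func (f *readyFlag) get() bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.ready
}

Client would then embed a readyFlag (or just the mutex) and route every read and write of isReady through get/set.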

Example Client: Implementation of a Consumer with reconnection support

example_client_test.go shows a great way to build a Producer with reconnection support.

The consuming part does not cover reconnection support:

// Stream will continuously put queue items on the channel.
// It is required to call delivery.Ack when it has been
// successfully processed, or delivery.Nack when it fails.
// Ignoring this will cause data to build up on the server.
func (session *Session) Stream() (<-chan amqp.Delivery, error) {
	if !session.isReady {
		return nil, errNotConnected
	}
	return session.channel.Consume(
		session.name,
		"",    // Consumer
		false, // Auto-Ack
		false, // Exclusive
		false, // No-local
		false, // No-Wait
		nil,   // Args
	)
}

Even if reconnection support is mentioned as a non-goal for the library itself, I do think showcasing this in the example might be a good thing.

Right now, the implementation is only possible with a hack involved, mainly due to these two issues:

These deadlocks prevent the consumer delivery channel from being closed during an unexpected connection loss (e.g., when the RabbitMQ server goes down).

A possible dead lock in connection close notification Go channel

The deadlock happens when the connection is closing. I have since lost the core dump, but here is the sequence:

  1. Create the connection close notification Go channel by registering it with connection.NotifyClose.
  2. The connection is disconnected. It is in the closing state.
  3. Create a channel on the closing connection.
  4. If the connection close notification Go channel is not consumed, step 3 deadlocks.

The documentation should state that a separate goroutine should be dedicated to consuming the connection close notification Go channel.
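
A sketch of that pattern: register the notification channel and drain it in a dedicated goroutine so nothing can block on it (the buffer of 1 is just an extra safety margin; names are illustrative):

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func dialWithCloseListener(uri string) (*amqp.Connection, error) {
	conn, err := amqp.Dial(uri)
	if err != nil {
		return nil, err
	}

	// Dedicated consumer of close notifications; the loop ends when the
	// library closes the channel during connection shutdown.
	closeNotify := conn.NotifyClose(make(chan *amqp.Error, 1))
	go func() {
		for closeErr := range closeNotify {
			log.Printf("connection closed: %v", closeErr)
		}
	}()

	return conn, nil
}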

Unexpected behavior of channel.IsClosed()

The code sends two messages in sequence - the first is bad and the next one is good:

err := channel.Publish("Exchange_does_not_exist", .....)
// here err == nil
// the message is not delivered - the exchange does not exist - OK, but channel.IsClosed() == false !!

err := channel.Publish(good_message)
// here err == nil
// channel.IsClosed() == false !!
// the message is not delivered, with no error. It was sent into emptiness...

A few seconds later, channel.IsClosed() becomes "true"...
I am forced to check the channel status with channel.QueueInspect() - it works.
Questions:
Why does channel.Publish() always return nil?
Why does channel.IsClosed() not switch to true at once, so that some subsequent valid messages go into emptiness with no error?

504 channel/connection is not open after a few minutes

Hello,
I have one channel with 2 queues, and after a few minutes (about 15 minutes) I get a 504 error:
Exception (504) Reason: channel/connection is not open
My code:

RabbitConnection, err := rabbitmq.Dial("amqp://" + username + ":" + password + "@" + host + "/")
if err != nil {
	return err
}

//conn.Close()
RabbitChannel, err = RabbitConnection.Channel()
if err != nil {
	return err
}
//RabbitChannel.Close()

_, err = RabbitChannel.QueueDeclare(
	LsQueue, // name
	true,   // durable
	false,   // delete when unused
	false,   // exclusive
	false,   // no-wait
	nil,     // arguments
)
_, err = RabbitChannel.QueueDeclare(
	CoreQueue, // name
	true,     // durable
	false,     // delete when unused
	false,     // exclusive
	false,     // no-wait
	nil,       // arguments
)

err = RabbitChannel.Confirm(false)
if err != nil {
	return err
}

bug: close causes panic

[root@edge-chengli-00001 rabbitmq-queue-declarer]# docker logs -f 81d03ce572f3
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x54 pc=0x7aaddc]

goroutine 465 [running]:
github.com/rabbitmq/amqp091-go.(*Channel).setClosed(...)
	/go/pkg/mod/github.com/rabbitmq/[email protected]/channel.go:94
github.com/rabbitmq/amqp091-go.updateChannel(...)
	/go/pkg/mod/github.com/rabbitmq/[email protected]/types.go:321
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN(0xc000344140, {0xd7a958, 0xc00019e4e0?})
	/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:516 +0xbc
github.com/rabbitmq/amqp091-go.(*Connection).demux(0xc000085f28?, {0xd7a958, 0xc00019e4e0})
	/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:477 +0x5b
github.com/rabbitmq/amqp091-go.(*Connection).reader(0xc000344140, {0xd78300?, 0xc0005aa090?})
	/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:577 +0x227
created by github.com/rabbitmq/amqp091-go.Open
	/go/pkg/mod/github.com/rabbitmq/[email protected]/connection.go:258 +0x34a

Channel Close gets stuck after closing a connection (via management UI)

I found an issue when:

  1. I close a connection from the management UI (force-closing it; I haven't had time to test other connection close scenarios)
  2. The program is waiting for a channel or connection close notification, and receives it
  3. It checks whether the channel is closed - it is not - so it proceeds to close it, but the method gets stuck and never returns

This caused my program to hang; for now I just discard the connection instead.

Don't know if this is intended, but better to report it :)

Support connection.update-secret

The AMQP 0-9-1 reference guide at https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.update-secret describes a method to update a secret / password on existing connections without having to disconnect and reconnect. This makes things a lot easier when authentication is done via rabbitmq_auth_backend_oauth2, which uses JWT tokens with a limited lifespan. If such a token expires and is not updated, the connection becomes unusable; updating the token makes the connection usable again without the hassle of reconnecting and re-subscribing consumers.

It would be very nice if this could be supported in future versions, as we depend heavily on this feature for authentication and authorization in our backend.
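A hypothetical usage sketch of what this could look like - the UpdateSecret method shown here is the requested addition, not an existing API at the time of this request, and fetchFreshJWT is an illustrative helper:

// Refresh the OAuth 2 token before it expires and push the new secret to the
// existing connection, so consumers do not have to be re-registered.
token, err := fetchFreshJWT() // illustrative helper, not part of the library
if err != nil {
	return err
}
if err := conn.UpdateSecret(token, "JWT refreshed"); err != nil { // requested API
	return err
}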

Closing connection and/or channel hangs when NotifyPublish is used

I'm using NotifyPublish to get confirms.
At the end of the run, I'm trying to close the connection and the associated channel, but it hangs indefinitely.

Here's a test to reproduce the issue (reproduces 80-90% of the time):


func TestCloseHandBug(t *testing.T) {
	ctx := context.Background()
	testQueue := "test"

	c, err := amqp091.Dial(fmt.Sprintf("amqp://%s:%s@%s%s", Config.Username, Config.Password, Config.Hostname, Config.VHost))
	if err != nil {
		t.Fatalf("Error connecting to server: %s", err)
	}
	t.Log("connected")

	closeChan := make(chan struct{})

	// close connection
	defer func() {
		close(closeChan)
		if c != nil {
			t.Log("disconnecting")
			//TODO: program hangs here - removing c.close and adding
			//TODO: <-time.After(time.Second * 120) doesn't help. closing channel hangs
			if err := c.Close(); err != nil {
				t.Logf("disconnect error: %s", err)
				return
			}
		}

		c = nil
		t.Log("disconnected")
	}()

	ch, err := c.Channel()
	if err != nil {
		t.Fatalf("couldn't open channel: %s", err)
	}

	err = ch.Qos(
		16,    // prefetch count
		0,     // prefetch size
		false, // global
	)
	if err != nil {
		t.Fatalf("couldn't configure qos: %s", err)
	}

	// handle confirms TODO: without this, disconnection works
	confirms := ch.NotifyPublish(make(chan amqp091.Confirmation))
	go func() {
		for {
			select {
			case <-confirms:
				// do something here in real scenario
				continue
			case <-closeChan:
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	err = ch.Confirm(false)
	if err != nil {
		t.Fatalf("couldn't configure confirm: %s", err)
	}

	ch.ExchangeDeclare("amq.topic", "topic", true, false, false, false, nil)

	// handle close channel TODO: removing this func doesn't solve the issue
	go func() {
		for {
			select {
			case <-closeChan:
				t.Logf("closing channel. id=%d",1)
				//TODO: program hangs here as well
				if err := ch.Close(); err != nil {
					t.Logf("closing channel err. id=%d. err=%d",1, err)
					return
				}
				t.Logf("channel closed. id=%d",1)
				return
			case <-ctx.Done():
				return
			}
		}
	}()

	_, err = ch.QueueDelete(testQueue, false, false, false)
	if err != nil {
		t.Fatalf("error deleting test queue: %s", err)
	}

	key := "subscribe.send"

	_, err = ch.QueueDeclare(testQueue, true, false, false, false, nil)
	if err != nil {
		t.Fatalf("error declaring queue: %s", err)
	}

	err = ch.QueueBind(testQueue, key, "amq.topic", false, nil)
	if err != nil {
		t.Fatalf("error binding queue: %s", err)
	}

	delivery, err := ch.Consume(testQueue, "", false, false, false, false, nil)
	if err != nil {
		t.Fatalf("error consuming: %s", err)
	}

	expected := "hello world!"

	t.Logf("sending. id=%d, data=%s",1, []byte(expected))

	m := amqp091.Publishing{
		ContentType:  "text/plain",
		Body:         []byte(expected),
		DeliveryMode: amqp091.Persistent,
	}

	err = ch.Publish("amq.topic", key, true, false, m)
	if err != nil {
		t.Fatalf("Error publishing: %s", err)
	}

	var actual string
	select {
	case message := <-delivery:
		message.Ack(false)
		actual = string(message.Body)
	case <-time.After(time.Second / 2):
		t.Fatalf("Timeout recieving message")
	}

	if actual != expected {
		t.Fatalf("Expected message %s, but got %s", expected, actual)
	}
}


Here is the result:
=== RUN TestCloseHandBug
provider_test.go:125: connected
provider_test.go:224: sending. id=1, data=hello world!
provider_test.go:133: disconnecting
provider_test.go:187: closing channel. id=1

  • One goroutine waits on l <- confirmation in func (c *confirms) confirm(confirmation Confirmation)
  • another goroutine waits on c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in connection.close
  • and another goroutine waits in the same place: c.destructor.Do(func() { in func (c *Connection) shutdown(err *Error), originating in channel.close
  • another goroutine waits on c.m.Lock() in func (c *confirms) Close() error (originating in go c.shutdown(&Error{)

Notes:

  1. If I remove the "handle close channel" function, the issue still occurs.
  2. If I remove the c.close() part in the "close connection" section, waiting instead for a long time before closing the test, I will still never get to the "channel closed" part - it looks like closing the channel hangs indefinitely in this case as well.
  3. Sometimes it works fine (10-20% of the time):
    === RUN TestCloseHandBug
    provider_test.go:125: connected
    provider_test.go:225: sending. id=1, data=hello world!
    provider_test.go:133: disconnecting
    provider_test.go:187: closing channel. id=1
    provider_test.go:142: disconnected
    --- PASS: TestCloseHandBug (1.67s)
    PASS
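A minimal workaround sketch, relying on the documented behaviour that a NotifyPublish listener channel is closed by the library when the Channel shuts down: drain the confirmations until the library closes the Go channel, instead of abandoning the listener on an external signal, so the internal confirmation delivery can never block Close:

confirms := ch.NotifyPublish(make(chan amqp091.Confirmation, 16))
go func() {
	// The range exits only when the library closes the channel during shutdown,
	// so every pending confirmation is consumed and Close cannot block on it.
	for confirmation := range confirms {
		_ = confirmation // handle the ack/nack here in a real scenario
	}
}()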

100% CPU usage

Hello

package main

import (
	"context"
	"github.com/rabbitmq/amqp091-go"
	"sync"
	"time"
)

var wg sync.WaitGroup

func amqp(ctx context.Context) {
	defer func() {
		wg.Done()
	}()
	c, err := amqp091.Dial("amqp://guest:[email protected]:5672/")
	if err != nil {
		panic("connection error")
	}
	defer c.Close()

	<-ctx.Done()
}

const n = 16

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wg.Add(n)
	for i := 0; i < n; i++ {
		go amqp(ctx)
	}

	cancel()
	wg.Wait()

	time.Sleep(time.Hour)
}

The above code causes 100% CPU usage after a short while.
The problem does not always occur, so you may have to run it several times to reproduce it.
strace output captured after the problem occurred:

strace: Process 3167635 attached
% time     seconds  usecs/call     calls    errors syscall
------ ----------- ----------- --------- --------- ----------------
100,00    4,571892     4571892         1         1 futex
  0,00    0,000010           5         2           rt_sigprocmask
  0,00    0,000002           1         2           getpid
  0,00    0,000001           0         2           gettid
  0,00    0,000001           0         2           tgkill
  0,00    0,000000           0         1           rt_sigaction
------ ----------- ----------- --------- --------- ----------------
100.00    4,571906                    10         1 total

Call to Delivery.Ack blocks indefinitely in case of disconnection

Steps to reproduce

  1. Create a queue on the RabbitMQ server
  2. Call Consume and start consuming messages
  3. While processing a message, and BEFORE Delivery.Ack is called, disconnect from the RabbitMQ server (for example, turn off your network connection)

The call to Delivery.Ack blocks indefinitely instead of returning an error.
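A hedged workaround sketch (not a library API): perform the Ack in a goroutine and give up after a timeout, so a dead TCP connection cannot block the consumer loop forever. The underlying write may still be stuck until the connection itself errors out; this only unblocks the caller. Here msg stands for the amqp091.Delivery being processed, and the timeout is illustrative:

ackDone := make(chan error, 1)
go func() { ackDone <- msg.Ack(false) }()

select {
case err := <-ackDone:
	if err != nil {
		log.Printf("ack failed: %v", err)
	}
case <-time.After(5 * time.Second):
	log.Print("ack timed out - assuming the connection is dead, reconnecting")
}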

No access to this vhost

// dns: amqp://customservice:customservice123@IP:5672/zssb
mqConn, err = amqp.Dial(mq.dns)


Exception (403) Reason: "no access to this vhost"
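The broker reports this when the authenticated user has no permissions configured on the requested vhost (zssb here), or when the vhost does not exist. Assuming a default setup, granting permissions - for example rabbitmqctl set_permissions -p zssb customservice ".*" ".*" ".*" - or creating the vhost should resolve the 403; this is server-side configuration, not something the client library can fix.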
