stomp's Introduction

stomp

Go language implementation of a STOMP client library.

Features:

  • Supports STOMP Specifications Versions 1.0, 1.1, 1.2 (https://stomp.github.io/)
  • Protocol negotiation to select the latest mutually supported protocol
  • Heart beating for testing the underlying network connection
  • Tested against RabbitMQ v3.0.1

Usage Instructions

go get github.com/go-stomp/stomp/v3

For API documentation, see https://pkg.go.dev/github.com/go-stomp/stomp/v3

Breaking changes between the previous version and the current version are documented in breaking_changes.md.

License

Copyright 2012 - Present The go-stomp authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

stomp's People

Contributors

anthonyraymond, asiragusa, codemedic, colinzuo, daveshanley, daytong, delorus, ebenoist, extbe, fjolnir-dvorak, fribeiro1, froderik, hanjm, jjeffery, laurentluce, lvtiendev, maelick, moredure, mschneider82, somuchforsubtlety, theraphim, thomaslee, voronelf, wmarbut, worg, zhangyang0108

stomp's Issues

Feature request: producer side connection pool

Hi, I am developing a simple STOMP-based command framework and I wondered if there is any plan to implement a connection pool. Happy to contribute code if willing to consider it for a merge.

Custom headers for Unsubscribe

It is useful to be able to pass custom headers to Unsubscribe(). RabbitMQ for example asks to pass the same "durable" and "auto-delete" headers to Unsubscribe for durable topic subscriptions.

How to set heartbeat correctly?

I set heartbeat to different values with

stomp.ConnOpt.HeartBeat(1*time.Second, 1*time.Second)

but I still get "Error message: read timeout" or "Error message: connection closed".

v2 doesn't support semantic versioning

Hi,
I'm getting an error when trying the new Go 1.11 module versioning:

go.mod

require (
github.com/go-stomp/stomp v2.0.0
)

\go.mod:5: invalid module version "v2.0.0": unknown revision v2.0.0

no check for receipt content

Hi everyone,
First of all, thanks for this nice library.

I have two questions:

  1. If I see correctly, the connection Disconnect method only checks for whether the received next frame has the command RECEIPT, but does not check whether the received receipt's value is the same one as the sent receipt's one... shouldn't the code do that? Am I missing something here?

  2. Moreover, checking for whether the request contains a receipt and then checking the receipt is in almost all methods (e.g. in the connection Send method), but not in the Subscribe method. Why not? (note that this is slightly different than issue #40)

Thanks a lot for your answer in advance.
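
The check asked about in the first question could look roughly like this. It is a minimal sketch, not go-stomp's code: `Frame`, `checkReceipt`, and `ErrReceiptMismatch` are hypothetical stand-ins, and only the RECEIPT command and the `receipt-id` header name come from the STOMP specification.

```go
package main

import (
	"errors"
	"fmt"
)

// Frame is a simplified, hypothetical stand-in for a STOMP frame.
type Frame struct {
	Command string
	Header  map[string]string
}

// ErrReceiptMismatch is a hypothetical error for an unexpected receipt.
var ErrReceiptMismatch = errors.New("unexpected receipt-id")

// checkReceipt verifies that resp is a RECEIPT frame whose receipt-id
// header equals the receipt value that was sent with the request.
func checkReceipt(sentReceipt string, resp Frame) error {
	if resp.Command != "RECEIPT" {
		return fmt.Errorf("expected RECEIPT, got %s", resp.Command)
	}
	if resp.Header["receipt-id"] != sentReceipt {
		return ErrReceiptMismatch
	}
	return nil
}

func main() {
	resp := Frame{Command: "RECEIPT", Header: map[string]string{"receipt-id": "42"}}
	fmt.Println(checkReceipt("42", resp)) // matching id
	fmt.Println(checkReceipt("43", resp)) // mismatched id
}
```

Whether a mismatch should be treated as an error or simply skipped while waiting for the right receipt is a design decision this sketch does not settle.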

processLoop can terminate w/o surfacing any errors.

If the underlying connection to a producer goes away, say because the STOMP server is forcibly restarted w/o any clean shutdown, then processLoop in conn.go can terminate w/o any errors being surfaced (e.g., the case f, ok := <-c.readCh: will get a "connection close" error and terminate the processLoop). If this happens any producer with an open connection can keep sending requests, and if those requests are set for auto ack they will silently fail until the writeCh buffer is filled at which point I believe the writes will just stall.

I think one solution to this problem is to incorporate the MustDisconnect method in #13 and to then have a "defer c.MustDisconnect()" in processLoop before launching the for {} loop.

Monitor incoming frames

Is there a way to monitor incoming frames from code, e.g. by registering a function that can then log each frame?

Deadlock occurs when Conn.Send function is called very fast

I send data to an ActiveMQ server with the following code:

for {
	msg = fmt.Sprintf("aaaaa=%v", count)
	client.DeliverMsg("/queue/yyyyyyy", []byte(msg))
	count++
}

When I then shut down the ActiveMQ server, a deadlock occurs.

I read the code and found that the problem is here (conn.go):

func (c *Conn) Send(destination, contentType string, body []byte, opts ...func(*frame.Frame) error) error {
	c.closeMutex.Lock()
	defer c.closeMutex.Unlock()
	if c.closed {
		return ErrAlreadyClosed
	}

	f, err := createSendFrame(destination, contentType, body, opts)
	if err != nil {
		return err
	}

	if _, ok := f.Header.Contains(frame.Receipt); ok {
		// receipt required
		request := writeRequest{
			Frame: f,
			C:     make(chan *frame.Frame),
		}

		c.writeCh <- request
		response := <-request.C
		if response.Command != frame.RECEIPT {
			return newError(response)
		}
	} else {
		// no receipt required
		request := writeRequest{Frame: f}
		c.writeCh <- request
	}

	return nil
}

In Send, the request is written to c.writeCh with "c.writeCh <- request"; the buffer length of c.writeCh is initialized to 20.

The processLoop function reads from writeCh:

case req, ok := <-c.writeCh:
	// stop the write timeout
	if writeTimer != nil {
		writeTimer.Stop()
		writeTimer = nil
		writeTimeoutChannel = nil
	}
	if !ok {
		sendError(channels, errors.New("write channel closed"))
		return
	}
	if req.C != nil {
		if receipt, ok := req.Frame.Header.Contains(frame.Receipt); ok {
			// remember the channel for this receipt
			channels[receipt] = req.C
		}
	}

	switch req.Frame.Command {
	case frame.SUBSCRIBE:
		id, _ := req.Frame.Header.Contains(frame.Id)
		channels[id] = req.C
	case frame.UNSUBSCRIBE:
		id, _ := req.Frame.Header.Contains(frame.Id)
		// is this trying to be too clever -- add a receipt
		// header so that when the server responds with a
		// RECEIPT frame, the corresponding channel will be closed
		req.Frame.Header.Set(frame.Receipt, id)
	}

	// frame to send
	err := writer.Write(req.Frame)
	if err != nil {
		sendError(channels, err)
		return
	}
}

If I call Conn.Send very fast, writeCh fills up and Conn.Send blocks at "c.writeCh <- request".

Then, in processLoop, writer.Write(req.Frame) fails if the ActiveMQ server suddenly shuts down.

Before processLoop returns, the "defer c.MustDisconnect()" in processLoop is called.

MustDisconnect() calls c.closeMutex.Lock(), but closeMutex is already held by Conn.Send, which is blocked at "c.writeCh <- request". The mutex is therefore never unlocked, and a deadlock occurs.
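
One common way to avoid this kind of deadlock is to never hold a lock while blocked on a channel send, and to let the sender also wait on a "done" channel. The following is a sketch under that assumption, not a patch for conn.go: the `conn` type, `send`, and `shutdown` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("connection closed")

// conn is a hypothetical, stripped-down connection.
type conn struct {
	writeCh   chan string
	done      chan struct{} // closed exactly once on shutdown
	closeOnce sync.Once
}

func newConn(buf int) *conn {
	return &conn{writeCh: make(chan string, buf), done: make(chan struct{})}
}

// send holds no lock while blocked: it either queues the message or
// returns as soon as done is closed.
func (c *conn) send(msg string) error {
	select {
	case <-c.done:
		return errClosed // already shut down, do not queue anything
	default:
	}
	select {
	case c.writeCh <- msg:
		return nil
	case <-c.done:
		return errClosed
	}
}

// shutdown can be called from the processing loop without waiting for senders.
func (c *conn) shutdown() {
	c.closeOnce.Do(func() { close(c.done) })
}

func main() {
	c := newConn(1)
	fmt.Println(c.send("first")) // fits in the buffer
	result := make(chan error)
	go func() { result <- c.send("second") }() // blocks unless shutdown wins the race
	c.shutdown()
	fmt.Println(<-result)
}
```

Because writeCh itself is never closed in this sketch, a late sender cannot hit a "send on closed channel" panic either.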

Add stomp.Dial timeout

STOMP client gets stuck (waiting for connectivity) when dialing if the server is not available.

This happens due to the usage of net.Dial default Dialer:

stomp/conn.go, line 50 in a96d395:

c, err := net.Dial(network, addr)

There are different options:

A better approach should use context to manage this issue: https://golang.org/src/net/dial.go?s=11711:11799#L359
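
A context-based dial can be sketched with the standard library's `net.Dialer.DialContext`. The helper name `dialWithTimeout` is hypothetical, and the example connects to a local listener rather than a real broker so that it runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// dialWithTimeout opens a connection but gives up once timeout has elapsed.
func dialWithTimeout(network, addr string, timeout time.Duration) (net.Conn, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // only bounds the dial; an established connection is unaffected
	var d net.Dialer
	return d.DialContext(ctx, network, addr)
}

func main() {
	// Local listener so the example does not depend on an external broker.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		fmt.Println("listen failed:", err)
		return
	}
	defer ln.Close()

	conn, err := dialWithTimeout("tcp", ln.Addr().String(), 2*time.Second)
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected to", conn.RemoteAddr())
}
```

Since `stomp.Connect` is described elsewhere on this page as taking an existing connection, the resulting `net.Conn` could presumably be handed to it instead of calling `stomp.Dial`.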

Is there a current method to create a subscription id?

I need to guarantee a consistent and unique subscription id across a number of processes. Is there a way to set the id with the current library when subscribing? I see that a random number is generated, but there doesn't seem to be a way to override it.

Thank you!

After a frame error, call to Subscribe results in a SIGSEGV

It'd be nice if it just returned an error to say the connection has been closed

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x80 pc=0x677a41]

goroutine 1 [running]:
github.com/go-stomp/stomp/v3.(*Conn).Subscribe(0x0, 0xc000025320, 0x90, 0x0, 0xc000cd1bc0, 0x4, 0x4, 0x0, 0x0, 0x0)
github.com/go-stomp/stomp/[email protected]/conn.go:595 +0x61

Support for activemq failover protocol

Would it be reasonable to implement the activemq failover protocol?
I'm thinking about doing this myself, but i'm not sure if it's relevant to the project or if it should be made separately.

On network error connections are kept as close_wait and never close

When there's an active connection and network goes down the connections are kept in close_wait state and stay forever in this state, calling disconnect waits [forever] for the disconnect receipt from the server [which is impossible in that situation], so the client must be restarted in order to close those connections.

I'll backport @guotie's MustDisconnect method to be able to force-close those connections.

super useful

this looks really good.

i need to communicate with servers that are behind a router in a NAT environment.
Like your home network.

  1. SSH into them
  2. communicate with them over a message bus.

Can i use stomp to do this do you think?

Read and write timeouts don't work as expected

With a client not sending/receiving messages, the read timeout is always called or the write timeout is always called based on which one is set to the lowest value. It seems that there is a bug in the following code in conn.go:

var readTimer *time.Timer
var writeTimer *time.Timer

for {
    if readTimer == nil {
        readTimer := time.NewTimer(…)
    }
    if writeTimer == nil {
        writeTimer := time.NewTimer(…)
    }
    …
}

Notice the “:=” inside the if statement and the var before the loop. readTimer and writeTimer are back to nil at the start of each loop iteration and so they are set to a new Timer every time.

We should do:

readTimer = time.NewTimer(…)
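
The effect of ":=" versus "=" here can be reproduced in isolation. This is a small stand-alone demonstration, not the library's loop; `countTimers` is a hypothetical helper that counts how many timers get created.

```go
package main

import (
	"fmt"
	"time"
)

// countTimers runs n loop iterations and reports how many timers were created.
// With shadow=true it reproduces the ":=" pattern from the report.
func countTimers(n int, shadow bool) int {
	var timer *time.Timer
	created := 0
	for i := 0; i < n; i++ {
		if timer == nil {
			if shadow {
				// ":=" declares a new variable scoped to this block;
				// the outer timer stays nil.
				timer := time.NewTimer(time.Hour)
				timer.Stop()
			} else {
				// "=" assigns to the outer variable, so the timer is reused.
				timer = time.NewTimer(time.Hour)
			}
			created++
		}
	}
	if timer != nil {
		timer.Stop()
	}
	return created
}

func main() {
	fmt.Println(countTimers(3, true))  // 3
	fmt.Println(countTimers(3, false)) // 1
}
```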

Getting non-standard headers from the CONNECT response

I need a custom header from the connect response and I saw this is already documented in a TODO.

// TODO(jpj): make any non-standard headers in the CONNECTED
// frame available. This could be implemented as:
// (a) a callback function supplied as an option; or
// (b) a property of the Conn structure (eg CustomHeaders)
// Neither options are particularly elegant, so wait until
// there is a real need for this.

I would be happy to contribute the code for this in either of the two listed options.

I also created a shim that turns a websocket connection into an io.ReadWriteCloser so I can connect to Spring STOMP servers. I'm happy to also create a PR for that if it's not considered out of scope.

How to prevent closing the connection

Do we need to fetch a new connection every time we send a message?
As the documentation mentions, the underlying connection is closed when sending a message fails.

var conn *stomp.Conn
if conn, err = stomp.Dial("tcp",
	Broker.URI,
	stomp.ConnOpt.Login(Broker.User, Broker.Password)); err != nil {
	panic(fmt.Sprintf("Could not connect to ActiveMQ using brokerUri %v. Can not continue.", Broker.URI))
}

Elsewhere in the code, I use the above "global" connection instance to send messages:

if err := conn.Send(queue, "text/plain", []byte(message)); err != nil {
	log.Printf("Failed to send message to Queue %v.  Error is %v, Payload is %v", queue, err.Error(), message)
	return err
}

But during failures, I see that the connection is closed and subsequent Send calls fail.
Is there a way to keep the connection open even during failures, or do we need to fetch a new connection every time we want to send messages?
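
One possible pattern is a thin wrapper that re-dials only after a send has failed, rather than dialing for every message. The sketch below is stdlib-only and entirely hypothetical: `sender`, `reconnectingSender`, `stubSender`, and the `dial` factory are illustrative names, and a real adapter would wrap `stomp.Dial` and `Conn.Send`.

```go
package main

import (
	"errors"
	"fmt"
)

// sender is a hypothetical interface covering the one call used here.
type sender interface {
	Send(destination, contentType string, body []byte) error
}

var errBroken = errors.New("connection already closed")

// stubSender stands in for a real connection in this example.
type stubSender struct{ err error }

func (s stubSender) Send(string, string, []byte) error { return s.err }

// reconnectingSender re-dials once when a send fails and retries the message.
type reconnectingSender struct {
	dial func() (sender, error) // hypothetical factory for new connections
	conn sender
}

func (r *reconnectingSender) Send(destination, contentType string, body []byte) error {
	if r.conn == nil {
		c, err := r.dial()
		if err != nil {
			return err
		}
		r.conn = c
	}
	err := r.conn.Send(destination, contentType, body)
	if err == nil {
		return nil
	}
	// Assume the old connection is unusable after a failure and replace it.
	r.conn = nil
	c, dialErr := r.dial()
	if dialErr != nil {
		return fmt.Errorf("send failed: %v; re-dial failed: %w", err, dialErr)
	}
	r.conn = c
	return r.conn.Send(destination, contentType, body)
}

func main() {
	dials := 0
	r := &reconnectingSender{dial: func() (sender, error) {
		dials++
		if dials == 1 {
			return stubSender{err: errBroken}, nil // first connection is broken
		}
		return stubSender{}, nil
	}}
	fmt.Println(r.Send("/queue/example", "text/plain", []byte("hello")), dials)
}
```

Retrying blindly can deliver a message twice if the first attempt actually reached the broker, so this approach suits idempotent messages best.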

Can we have a updated release?

The last tagged release was in 2014 and is very far behind master. I'm using this library in a project and I'd like to have a stable build to use rather than master.

Proper way to check for closed connection

Hi there, is there any way to check if a Conn is valid before sending a message? I'm currently doing the following:

err := conn.Send(queueName, "text/plain", message)
if err != nil && err.Error() == "connection already closed" {
	// report error
}

The Conn struct has a closed field, but since it is not exported and has no accessor method, I have no way to check the connection before sending.
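
Comparing against a sentinel error value is usually more robust than matching the message string. The sketch below defines its own `ErrAlreadyClosed` as a stand-in for the value that the `Send` excerpt elsewhere on this page returns; whether the library's value behaves the same way with `errors.Is` is an assumption to verify. `send` and `isClosed` are hypothetical helpers.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrAlreadyClosed is a local stand-in so the sketch is self-contained.
var ErrAlreadyClosed = errors.New("connection already closed")

// send is a hypothetical function that fails the way a closed Conn would.
func send(closed bool) error {
	if closed {
		return fmt.Errorf("send to /queue/example: %w", ErrAlreadyClosed)
	}
	return nil
}

// isClosed reports whether err indicates a closed connection, even when wrapped.
func isClosed(err error) bool {
	return errors.Is(err, ErrAlreadyClosed)
}

func main() {
	fmt.Println(isClosed(send(true)))
	fmt.Println(isClosed(send(false)))
}
```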

Impossible to remove durable subscription in ActiveMQ

In ActiveMQ, a client must send two unsubscribe messages to remove a durable subscription [stackoverflow].

UNSUBSCRIBE
destination:/topic/MyTopic

^@

UNSUBSCRIBE
destination:/topic/MyTopic
activemq.subscriptionName:MySubscriptionName

^@

The first unsubscribe makes the subscription inactive and the second removes it. But because of the condition at subscription.go L63-65, it is impossible to send unsubscribe twice.

Not able to run client_test in example folder

I tried to run client_test; it always returns the error message "failed to send to server".
The root cause is in this line: err = conn.Send(*queueName, "text/plain", []byte(text), nil). A nil option is passed to the Send method, and the inner function createSendFrame returns an error if any option is nil.

So I think the fix is as follows:

  1. In the main.go file of the client_test folder, in the function sendMessages, change
     err = conn.Send(*queueName, "text/plain", []byte(text), nil)
     to
     err = conn.Send(*queueName, "text/plain", []byte(text))
  2. In conn.go, in the function createSendFrame, change
     if opt == nil {
         return nil, ErrNilOption
     }
     to
     if opt == nil {
         continue
     }
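
The second change amounts to an option loop that tolerates nil entries. Here is a minimal stand-alone sketch of that behaviour; `frame`, `applyOpts`, and `header` are hypothetical stand-ins, not the library's createSendFrame.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for the library's frame type.
type frame struct{ header map[string]string }

// applyOpts applies each option in order, skipping nil entries instead of failing.
func applyOpts(f *frame, opts ...func(*frame) error) error {
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		if err := opt(f); err != nil {
			return err
		}
	}
	return nil
}

// header returns an option that sets one header.
func header(key, value string) func(*frame) error {
	return func(f *frame) error {
		f.header[key] = value
		return nil
	}
}

func main() {
	f := &frame{header: map[string]string{}}
	// A bare nil argument becomes a single nil element of the variadic slice.
	err := applyOpts(f, nil, header("persistent", "true"))
	fmt.Println(err, f.header)
}
```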

Support RECEIPT for SUBSCRIBE operations

As far as I can tell, go-stomp doesn't support receipts for subscribe operations. I haven't seen a SubscribeOpt for this. When I manually add the header using stomp.SubscribeOpt.Header, I do see a RECEIPT from the server, but this appears to close the Subscription's channel in go-stomp.

I'm working with a system where you subscribe to a topic and then trigger an operation that posts messages to the topic. Unless I add a sleep in, I can lose the initial messages that are posted.

Heartbeat not responding

ActiveMQ: 5.15.9
Client:v2.0.3
ConnectOptions:

	options := []func(*stomp.Conn) error{
		stomp.ConnOpt.Login(user, pwd),
		stomp.ConnOpt.Host("/" + host),
		stomp.ConnOpt.UseStomp,
		stomp.ConnOpt.HeartBeat(time.Minute, time.Minute),
	}

If no data has been sent for a while, readTimeoutChannel in processLoop is triggered.

Use of Server for production?

Hello, I am very happy to have discovered go-stomp 😄
It looks like a very nice package.

Even though it is not mentioned in the README, I see there is a server package,
and the stompd cmd.
Is there a reason the server isn't mentioned in the README?

@worg @jjeffery
Is anyone regularly using the server in any kind of production usage?

I am considering trying to add Web-STOMP support to the server, but I'd like to get any feedback about its state and the server todo-list if possible.

Thank you in advance for any feedback about the server, whether it has worked for you, or if you think it is working okay or not.

Thanks,
Cameron

Setting reply-to header

What is the correct way to set the "reply-to" header? I'm sending it in the Send method as follows: stomp.SendOpt.Header("reply-to", "Some-queue-name"). But in the ActiveMQ queue listing for that particular message, "reply-to" is displayed under "Properties" instead of "Headers".
So what is the correct way to set the "reply-to" header?

V3 - Module Support

Module support is a breaking change: we're updating the import path to github.com/go-stomp/stomp/v3.
No other breaking changes are introduced; see #94.

Subscribe will panic if Conn is closed before

The problem
In Subscribe function, a write request is sent to writeCh of Conn

// TODO is this safe? There is no check if writeCh is actually open.
c.writeCh <- request

This will cause panic if the connection processLoop has returned before that, thus closing writeCh

panic: send on closed channel

Solution
Similar to #52 we need to check for closed connection

func (c *Conn) Subscribe(destination string, ack AckMode, opts ...func(*frame.Frame) error) (*Subscription, error) {
	c.closeMutex.Lock()
	defer c.closeMutex.Unlock()
	if c.closed {
		c.conn.Close()
		return nil, ErrClosedUnexpectedly
	}
        ...

Data race Conn

The data race detector complains that the Conn.closed boolean is read and written without synchronization from two different goroutines.
It would be nice if this didn't happen.
Here is the race detector output (MY_REPOS = my repository, redacted):

==================
WARNING: DATA RACE
Read at 0x00c0421983e0 by goroutine 35:
  github.com/go-stomp/stomp.(*Conn).MustDisconnect()
      C:/Users/patrik/go/src/github.com/go-stomp/stomp/conn.go:392 +0x4c
  github.com/go-stomp/stomp.processLoop()
      C:/Users/patrik/go/src/github.com/go-stomp/stomp/conn.go:270 +0xa4d

Previous write at 0x00c0421983e0 by goroutine 11:
  github.com/go-stomp/stomp.(*Conn).Disconnect()
      C:/Users/patrik/go/src/github.com/go-stomp/stomp/conn.go:384 +0x35b
  MY_REPOS/stompclient.(*StompClient).connectionManager()
      C:/Users/patrik/go/src/MY_REPOS/stompclient/stompclient.go:332 +0xea2

Goroutine 35 (running) created at:
  github.com/go-stomp/stomp.Connect()
      C:/Users/patrik/go/src/github.com/go-stomp/stomp/conn.go:178 +0xb7f
  MY_REPOS/stompclient.(*StompClient).connectionManager()
      C:/Users/patrik/go/src/MY_REPOS/stompclient/stompclient.go:228 +0x25f6

Goroutine 11 (running) created at:
  MY_REPOS/stompclient.New()
      C:/Users/patrik/go/src/MY_REPOS/stompclient/stompclient.go:114 +0x518
  main.main()
      C:/Users/patrik/go/src/MY_REPOS/device_controller/main.go:73 +0x531
==================

I guess all places that can access c.closed from different goroutines need a mutex around them.
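
A mutex-guarded flag along those lines might look like this. It is a sketch with hypothetical names (`conn`, `markClosed`, `isClosed`, `closeConcurrently`), not the library's Conn; every read and write of `closed` goes through the same mutex, so the race detector has nothing to report.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical connection whose closed flag is guarded by a mutex.
type conn struct {
	mu     sync.Mutex
	closed bool
}

// markClosed sets the flag and reports whether this call was the one that closed it.
func (c *conn) markClosed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		return false
	}
	c.closed = true
	return true
}

// isClosed reads the flag under the same mutex.
func (c *conn) isClosed() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.closed
}

// closeConcurrently closes one conn from n goroutines and returns how many
// of them observed themselves as the closer.
func closeConcurrently(n int) int {
	c := &conn{}
	results := make(chan bool, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- c.markClosed()
		}()
	}
	wg.Wait()
	close(results)
	winners := 0
	for won := range results {
		if won {
			winners++
		}
	}
	return winners
}

func main() {
	fmt.Println(closeConcurrently(8)) // 1
}
```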

Content-length header suppression for sending TextMessages to ActiveMQ?

Is there currently a way to suppress or remove the content-length header? I ask because ActiveMQ uses the content-length header to determine whether to store a message as a TextMessage or a BytesMessage. When a STOMP client sends a TextMessage to a STOMP consumer there isn't a problem, but when the message is sent to ActiveMQ and consumed by an ActiveMQ consumer, unmarshalling the incoming message becomes difficult, because ActiveMQ stores it as a BytesMessage while the consumer is expecting a TextMessage.

Thanks

Cannot send with expires

The following code sends successfully:
w.Send("/queue/room_" + room_no, "", bs)

but the following code does not:
w.Send("/queue/room_" + room_no, "", bs, stomp.SendOpt.Receipt, stomp.SendOpt.Header("expires", "2049-12-31 23:59:59" /*time.Unix(end,0).Format("2006-01-02 15:04:05")*/ )/*, stomp.SendOpt.Header("priority", fmt.Sprintf("%d", p))*/)

ActiveMQ 5.15.6
docker run -dt -P webcenter/activemq

support verb headers

I'm connecting to ActiveMQ using AckClient in an attempt to pull up to X messages and then ack the last one. Sadly, ActiveMQ requires a nonstandard activemq.prefetchSize header to be specified in the SUBSCRIBE headers for this behavior to work. Currently, the library doesn't seem to support this. Any way this could be a thing?

Is there any way I can mock stomp.Conn?

Hey guys,

I'm writing a unit test in my project using your library. But I don't want to send the event to Amazon MQ when I run a unit test.

Is there any way I can mock the stomp.Conn?

Currently, I'm trying to use FakeConn but when I pass the FakeServer to stomp.Connect(FakeServer), it's just stuck there.

func mockingActiveMQ() *stomp.Conn {
	_, fakeServer := testutil.NewFakeConn(nil)

	stompConn, err := stomp.Connect(fakeServer)
	if err != nil {
		panic(err)
	}

	return stompConn
}
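
An alternative to faking the network connection is to have application code depend on a small interface and substitute a fake in tests. The sketch below is stdlib-only and hypothetical: `Sender`, `fakeSender`, and `publishEvent` are illustrative names, and because the real `Conn.Send` also takes variadic options, a thin adapter around `*stomp.Conn` would be needed to satisfy this simplified interface.

```go
package main

import "fmt"

// Sender is a hypothetical interface covering the one call the application needs.
type Sender interface {
	Send(destination, contentType string, body []byte) error
}

// fakeSender records messages instead of contacting a broker.
type fakeSender struct{ sent []string }

func (f *fakeSender) Send(destination, contentType string, body []byte) error {
	f.sent = append(f.sent, destination+": "+string(body))
	return nil
}

// publishEvent is example application code that depends only on Sender.
func publishEvent(s Sender, event string) error {
	return s.Send("/queue/events", "text/plain", []byte(event))
}

func main() {
	fake := &fakeSender{}
	if err := publishEvent(fake, "user-created"); err != nil {
		fmt.Println("unexpected error:", err)
		return
	}
	fmt.Println(fake.sent)
}
```

With this shape, unit tests never open a connection at all, so nothing can hang waiting for a CONNECTED frame.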

use Apache license text to display on pkg.go.dev

This module's contents don't show up on pkg.go.dev because:

  1. The license filename isn't recognized (see https://pkg.go.dev/license-policy).
  2. The license contents would not be recognized. (Not legal advice: You've copied the Apache 2.0 header, designed to be added to the top of every file; it is not appropriate for the whole module.)

If you change the filename and use the exact text of the Apache 2.0 license, pkg.go.dev will show your module doc and readme.

Panic: send on closed channel

The function Send in stomp/conn.go (line 412) sometimes panics when writing to a queue.
Here are two images of the panic. My program uses termUI, which is why the stack trace is not very readable. The panic does not seem to be reliably reproducible.

[Two screenshots of the panic: panic_activemq_stomp, panic_activemq_stomp_2]
