threedotslabs / watermill

Building event-driven applications the easy way in Go.

Home Page: https://watermill.io

License: MIT License

Makefile 0.11% Go 91.03% Shell 0.21% HTML 8.65%
watermill event-driven event-sourcing sagas events stream-processing reactive go golang rabbitmq

watermill's People

Contributors

0michalsokolowski0, 0xflotus, alexcuse, boreq, breml, checkmunza, czeslavo, dependabot[bot], dkotik, elgohr, fossabot, jbszczepaniak, kochetkov-av, m110, ma-hartma, maclav3, martinforreal, mehran-prs, minghsu0107, roblaszczak, sagikazarmark, simonjanss, smixi, stong1994, terev, thpk, tjnet, unjello, vladtenlive, xswordsx

watermill's Issues

Integration with go cloud development toolkit pubsub

Hey guys. First of all, compliments on the project! Recently I came across the pubsub abstraction in the Go Cloud Development Kit (Go CDK): https://gocloud.dev/pages/pubsub. I think integrating with this toolkit would be a great addition to your system. It would save us from writing our own implementations for AWS and Azure, and let us leverage the power of the CDK 🎉

Wildcard subscription

It would be convenient to be able to subscribe to multiple topics via a wildcard character (asterisk):

Subscribe(ctx, "user.*")
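
To make the request concrete, here is a minimal sketch of how a wildcard pattern could be matched against known topic names. It uses the standard library's `path.Match`; `matchTopics` is a hypothetical helper, not part of Watermill, and how a real Pub/Sub would discover topics is left out.

```go
package main

import (
	"fmt"
	"path"
)

// matchTopics returns the topics matching a glob-style pattern such as "user.*".
// Hypothetical helper for illustration only; it is not a Watermill API.
func matchTopics(pattern string, topics []string) ([]string, error) {
	var matched []string
	for _, topic := range topics {
		ok, err := path.Match(pattern, topic)
		if err != nil {
			return nil, err // the pattern itself is malformed
		}
		if ok {
			matched = append(matched, topic)
		}
	}
	return matched, nil
}

func main() {
	topics := []string{"user.created", "user.deleted", "order.placed"}
	matched, err := matchTopics("user.*", topics)
	if err != nil {
		panic(err)
	}
	fmt.Println(matched) // [user.created user.deleted]
}
```

Note that with `path.Match` the asterisk matches any run of characters except `/`, so `user.*` would also match `user.created.v2`. A broker-native wildcard may behave differently.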

Better integration with opentracing

Jaeger is a kind of multi-language standard for OpenTracing: https://www.jaegertracing.io/
The required header name

const MessageUUIDHeaderKey = "_watermill_message_uuid"

is Watermill-specific, but messages produced or consumed by this library can be used by anything else. So I can suggest a few things:

Why would it be cool to have it?
If, for example, two companies use Watermill and Jaeger, they have to do the same work twice. With built-in support, the integration would be faster and more comfortable.

MySQL Binlog subscriber

A MySQL binlog subscriber may be useful for implementing a tool like Debezium.

To check before implementation:

This implementation should be added to the https://github.com/ThreeDotsLabs/watermill-sql/ repository. It needs to be compatible with the already existing SQL Subscriber.

That means you should use the SQL publisher for publishing messages, and the binlog subscriber to read them.
We would, of course, keep the already existing SQL-based subscriber.

Add CloudEvents integration

CloudEvents is a specification for sending events in the cloud native environment. They also provide SDKs.

It might be a good addition to the project to support CloudEvents. Although it's not stable yet, as of 0.3 the "core" is considered to be stable enough.

Cross-languages example

@roblaszczak
The architectural constructs are really nice.
But if someone has to use Python or Rust or some other language for their microservice, what options are there?

A gRPC API wrapping the Watermill bus and SQL layers?
A gRPC Gateway wrapping gRPC?
Other ideas?

Or is this out of scope?

Failing "go get -u github.com/ThreeDotsLabs/watermill" on Windows

The following fails on my Windows machine. It looks like Watermill has some exotic external dependencies which do not work out of the box on Windows, and it is not documented that bzr is required.

This makes it hard to get a working environment for Watermill, at least on Windows. It would be great if this could be simplified, or maybe some of the external deps could be internalized.

$ go get -u github.com/ThreeDotsLabs/watermill
go: finding github.com/armon/go-metrics latest
go: finding github.com/streadway/amqp latest
go: finding github.com/pascaldekloe/goe latest
go: finding golang.org/x/net latest
go: finding golang.org/x/sync latest
go: finding golang.org/x/oauth2 latest
go: finding github.com/eapache/go-xerial-snappy latest
go: finding golang.org/x/crypto latest
go: finding github.com/google/btree latest
go: finding golang.org/x/sys latest
go: finding golang.org/x/lint latest
go: finding google.golang.org/genproto latest
go: finding golang.org/x/build latest
go: finding github.com/shurcooL/gopherjslib latest
go: labix.org/v2/[email protected]: bzr branch --use-existing-dir https://launchpad.net/mgo/v2 . in C:\private-stuff\go-workspace\pkg\mod\cache\vcs\ca61c737a32b1e09a0919e15375f9c2b6aa09860cc097f1333b3c3d29e040ea8: exit status 4:
    bzr: ERROR: httplib.IncompleteRead: IncompleteRead(34 bytes read)

    Traceback (most recent call last):
         File "bzrlib\commands.pyo", line 920, in exception_to_return_code
         File "bzrlib\commands.pyo", line 1131, in run_bzr
         File "bzrlib\commands.pyo", line 673, in run_argv_aliases
         File "bzrlib\commands.pyo", line 695, in run
         File "bzrlib\cleanup.pyo", line 136, in run_simple
         File "bzrlib\cleanup.pyo", line 166, in _do_with_cleanups
         File "bzrlib\builtins.pyo", line 1438, in run
         File "bzrlib\controldir.pyo", line 779, in open_tree_or_branch
         File "bzrlib\controldir.pyo", line 459, in _get_tree_branch
         File "bzrlib\bzrdir.pyo", line 1082, in open_branch
         File "bzrlib\branch.pyo", line 2375, in open
         File "bzrlib\controldir.pyo", line 687, in open
         File "bzrlib\controldir.pyo", line 716, in open_from_transport
         File "bzrlib\transport\__init__.pyo", line 1718, in do_catching_redirections
         File "bzrlib\controldir.pyo", line 704, in find_format
         File "bzrlib\controldir.pyo", line 1149, in find_format
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 235, in probe_transport
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 182, in probe_http_transport
         File "socket.pyo", line 348, in read
         File "httplib.pyo", line 522, in read
         File "httplib.pyo", line 565, in _read_chunked
    IncompleteRead: IncompleteRead(34 bytes read)

    bzr 2.5.1 on python 2.6.6 (Windows-post2008Server-6.2.9200)
    arguments: ['bzr', 'branch', '--use-existing-dir',
        'https://launchpad.net/mgo/v2', '.']
    plugins: bzrtools[2.5.0], changelog_merge[2.5.1], colo[0.4.0],
        explorer[1.2.2], fastimport[0.14.0dev], git[0.6.8], launchpad[2.5.1],
        loom[2.3.0dev], netrc_credential_store[2.5.1], news_merge[2.5.1],
        pipeline[1.4.0], qbzr[0.22.3], rewrite[0.6.4dev], svn[1.2.2],
        upload[1.2.0dev], xmloutput[0.8.8]
    encoding: 'cp1252', fsenc: 'mbcs', lang: 'en_US.UTF-8'

    *** Bazaar has encountered an internal error.  This probably indicates a
          bug in Bazaar.  You can help us fix it by filing a bug report at
              https://bugs.launchpad.net/bzr/+filebug
          including this traceback and a description of the problem.
go: finding grpc.go4.org latest
go: finding golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e
go: golang.org/x/[email protected]: unknown revision 1e06a53dbb7e
go: finding github.com/shurcooL/github_flavored_markdown latest
go: finding github.com/shurcooL/httpgzip latest
go: launchpad.net/[email protected]: bzr branch --use-existing-dir https://launchpad.net/~niemeyer/gocheck/trunk . in C:\private-stuff\go-workspace\pkg\mod\cache\vcs\f46ce2ae80d31f9b0a29099baa203e3b6d269dace4e5357a2cf74bd109e13339: exit status 4:
    bzr: ERROR: httplib.IncompleteRead: IncompleteRead(34 bytes read)

    Traceback (most recent call last):
         File "bzrlib\commands.pyo", line 920, in exception_to_return_code
         File "bzrlib\commands.pyo", line 1131, in run_bzr
         File "bzrlib\commands.pyo", line 673, in run_argv_aliases
         File "bzrlib\commands.pyo", line 695, in run
         File "bzrlib\cleanup.pyo", line 136, in run_simple
         File "bzrlib\cleanup.pyo", line 166, in _do_with_cleanups
         File "bzrlib\builtins.pyo", line 1438, in run
         File "bzrlib\controldir.pyo", line 779, in open_tree_or_branch
         File "bzrlib\controldir.pyo", line 459, in _get_tree_branch
         File "bzrlib\bzrdir.pyo", line 1082, in open_branch
         File "bzrlib\branch.pyo", line 2375, in open
         File "bzrlib\controldir.pyo", line 687, in open
         File "bzrlib\controldir.pyo", line 716, in open_from_transport
         File "bzrlib\transport\__init__.pyo", line 1718, in do_catching_redirections
         File "bzrlib\controldir.pyo", line 704, in find_format
         File "bzrlib\controldir.pyo", line 1149, in find_format
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 235, in probe_transport
         File "C:/Program Files (x86)/Bazaar/plugins\git\__init__.py", line 182, in probe_http_transport
         File "socket.pyo", line 348, in read
         File "httplib.pyo", line 522, in read
         File "httplib.pyo", line 565, in _read_chunked
     IncompleteRead: IncompleteRead(34 bytes read)

     bzr 2.5.1 on python 2.6.6 (Windows-post2008Server-6.2.9200)
     arguments: ['bzr', 'branch', '--use-existing-dir',
        'https://launchpad.net/~niemeyer/gocheck/trunk', '.']
    plugins: bzrtools[2.5.0], changelog_merge[2.5.1], colo[0.4.0],
        explorer[1.2.2], fastimport[0.14.0dev], git[0.6.8], launchpad[2.5.1],
        loom[2.3.0dev], netrc_credential_store[2.5.1], news_merge[2.5.1],
        pipeline[1.4.0], qbzr[0.22.3], rewrite[0.6.4dev], svn[1.2.2],
        upload[1.2.0dev], xmloutput[0.8.8]
    encoding: 'cp1252', fsenc: 'mbcs', lang: 'en_US.UTF-8'

    *** Bazaar has encountered an internal error.  This probably indicates a
          bug in Bazaar.  You can help us fix it by filing a bug report at
          https://bugs.launchpad.net/bzr/+filebug
          including this traceback and a description of the problem.
go get: error loading module requirements

Event Sourcing Completeness

As of today, we have a fantastic infrastructure for building a complete Event
Sourced system with a few components.

  • Messages
  • Pub/Sub
    • Routing
    • Middlewares

And we have a few components related to Event Sourcing, following the CQRS docs.

However, I would like to track the completeness of Watermill based on what you
are trying to accomplish.

Completeness could mean that Watermill either supports the feature with some
level of abstraction, or documents it well enough for people to implement it
themselves.

These are some notes on what I think the completeness list would be. Feel
free to send feedback about it; I am mostly interested in starting a discussion
and having a thread for it.

Commands

  • Command Bus: Message bus for commands.
  • Command: data structure representing a command.
  • Command Handler: handles the incoming commands.
  • Router: routes the incoming command to its command handler.
  • Validations: validates the command before dispatching the command.

Events

  • Event Bus: Message bus for events.
  • Event: data structure representing an event.
  • Event Handler: handles the incoming events.
  • Event Store: stores events.
  • Snapshot: a snapshot of the event stream.

Extra

Also, add some extra abstractions or documentation, helping people to deal with
common use cases.

  • Idempotence: refers to the ability of a system to produce the same
    outcome even if an event or message is received more than once.
  • Aggregate Id: This field is used to associate the particular event to
    a specific Aggregate Root.
  • Causation ID: the ID of the command causing an event, or the event
    causing a command dispatch.
  • Correlation ID: the ID used to correlate related commands/events.
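
As a rough illustration of the idempotence item, the sketch below skips events whose ID has already been processed. Everything here is hypothetical: the `Event` struct only mirrors the field names from the list above, and the in-memory `Deduplicator` stands in for whatever durable store a real system would need.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical event envelope; the fields mirror the list above.
type Event struct {
	ID            string
	AggregateID   string
	CausationID   string
	CorrelationID string
}

// Deduplicator remembers processed event IDs in memory (illustration only).
type Deduplicator struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[string]struct{})}
}

// Handle calls fn only the first time an event ID is seen.
// It reports whether fn actually ran successfully.
func (d *Deduplicator) Handle(e Event, fn func(Event) error) (bool, error) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[e.ID]; ok {
		return false, nil // duplicate delivery, nothing to do
	}
	if err := fn(e); err != nil {
		return false, err // not marked as seen, so a redelivery is retried
	}
	d.seen[e.ID] = struct{}{}
	return true, nil
}

func main() {
	d := NewDeduplicator()
	applied := 0
	apply := func(Event) error { applied++; return nil }

	e := Event{ID: "evt-1", AggregateID: "room-7", CorrelationID: "req-1"}
	for i := 0; i < 3; i++ { // simulate the same event delivered three times
		if _, err := d.Handle(e, apply); err != nil {
			panic(err)
		}
	}
	fmt.Println(applied) // 1
}
```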

Aggregates

  • Aggregate: comprises its state (Aggregate Root), public command
    functions (Command Handlers), and state mutators (Event Handlers).
  • Aggregate Root: data structure representing an aggregate.

Process Manager

  • Process Manager: is responsible for coordinating one or more
    workflows. It handles events and dispatches commands in response.

Saga

  • Saga: distribution of multiple workflows across multiple systems,
    each providing a path (fork) of compensating actions if any of the steps
    in the workflow fails.

Read Model Projection

  • Read Model Projection: read model can be built using an event handler
    and whichever storage provider you prefer.

Thoughts?

SMTP-based MQ engine

I was looking for an SMTP-based MQ pub/sub capability: basically using Gmail or any other SMTP provider as a "poor man's" MQ for a small number of messages. Has anyone seen such a system that can be wrapped as a Watermill component?

[watermill-amqp] construct subscriptions using a single AMQP connection

At present, amqp/subscriber.go establishes a new AMQP connection each time a Subscriber is instantiated. RabbitMQ recommends sharing a single connection. The AMQP connection is heavy, consuming ~ 100 KB of RAM each on the RabbitMQ side and slow to establish. (Citation.)

Ideally, Watermill's amqp.NewSubscriber method would automatically reuse a single connection, creating instead a new Channel. If this isn't feasible, please consider adding a new method that accepts an existing *amqp.Connection parameter (vs the current amqp.Config param.)

It's worth mentioning, the above might require updating reconnect logic, to manage the case of concurrent reconnect attempts by multiple subscribers sharing the same connection.
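
A minimal sketch of the idea, under stated assumptions: `Conn` is a hypothetical stand-in for an AMQP connection, and `SharedConn` is not an existing Watermill type. A mutex makes sure that concurrent subscribers noticing a dropped connection trigger only one redial.

```go
package main

import (
	"fmt"
	"sync"
)

// Conn is a hypothetical stand-in for an AMQP connection.
type Conn interface {
	IsClosed() bool
}

// SharedConn hands the same connection to every caller and redials
// at most once when the current connection is found closed.
type SharedConn struct {
	mu   sync.Mutex
	dial func() (Conn, error) // assumed to wrap the real dialing logic
	conn Conn
}

// Get returns the live shared connection, dialing if necessary.
func (s *SharedConn) Get() (Conn, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.conn != nil && !s.conn.IsClosed() {
		return s.conn, nil
	}
	conn, err := s.dial()
	if err != nil {
		return nil, err
	}
	s.conn = conn
	return conn, nil
}

// fakeConn lets the sketch run without a broker.
type fakeConn struct{ closed bool }

func (c *fakeConn) IsClosed() bool { return c.closed }

func main() {
	dials := 0
	shared := &SharedConn{dial: func() (Conn, error) {
		dials++ // only ever called while SharedConn.mu is held
		return &fakeConn{}, nil
	}}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ { // ten "subscribers" asking for a connection
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := shared.Get(); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()
	fmt.Println("dials:", dials) // dials: 1
}
```

Each subscriber would then open its own channel on the returned connection. That part is omitted here because it depends on the AMQP client in use.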

Redis base MQ

Redis 5.0 released a new data type: Stream.
That makes it a really powerful MQ now.
Besides, Redis versions below 5 could use the "list" data type to implement an MQ (I think it should fit a software framework that does not have many third-party dependencies).

Add context to the message

It might be a good idea to add a context.Context to the message (just like in case of http.Request) to allow passing information through API boundaries. This could be a way to implement distributed tracing for example.
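
A small sketch of what this could look like, modeled on how `http.Request` exposes `Context()`. The `Message` type and the trace ID helpers below are hypothetical and only illustrate the shape of the API being proposed.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for a Pub/Sub message.
type Message struct {
	UUID    string
	Payload []byte
	ctx     context.Context
}

// Context returns the message context, defaulting to context.Background().
func (m *Message) Context() context.Context {
	if m.ctx != nil {
		return m.ctx
	}
	return context.Background()
}

// SetContext attaches ctx to the message.
func (m *Message) SetContext(ctx context.Context) {
	m.ctx = ctx
}

// traceIDKey is an unexported key type, so other packages cannot collide with it.
type traceIDKey struct{}

func withTraceID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, traceIDKey{}, id)
}

// traceIDFrom returns the trace ID stored in ctx, or "" if there is none.
func traceIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(traceIDKey{}).(string)
	return id
}

func main() {
	msg := &Message{UUID: "1", Payload: []byte("hello")}
	msg.SetContext(withTraceID(msg.Context(), "trace-42"))
	fmt.Println(traceIDFrom(msg.Context())) // trace-42
}
```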

HTTP Publisher

An HTTP publisher implementation would work like webhooks: an HTTP request is sent for each produced message.

[watermill-kafka] Stress test fails in CI

Because the load is too high, the Watermill Kafka tests are failing in CI. The problem also occurs locally unless I raise the ulimit.

There are probably multiple solutions:

  • find a way to set ulimit in CI (I tried to do it in docker-compose config, but didn't help)
  • find, if we can fix something in the code in order to limit the load
  • reduce the number of parallel tests (not preferred)

Example failed build: https://circleci.com/gh/ThreeDotsLabs/watermill-kafka/235?utm_campaign=vcs-integration-link&utm_medium=referral&utm_source=github-build-link

Cannot install latest master

Trying to install latest master using go modules and replace:

replace github.com/ThreeDotsLabs/watermill v0.4.0 => github.com/ThreeDotsLabs/watermill v0.4.1-0.20190601181058-54fc7f5042f7

I receive the following error:

go: github.com/nats-io/[email protected]: unknown revision v0.4.5
go: error loading module requirements

Looks like the repo has been archived and there is no such revision.

YugabyteDB

Have you seen YugabyteDB? It's a 100% open equivalent to Cockroach.
https://github.com/yugabyte/yugabyte-db

It exposes a PostgreSQL API and has Redis and Cassandra APIs too, but it's all done with the underlying Yugabyte engine.
Pretty cool, and its basis is event driven. It calls the READ ONLY instances Tablets (cute name).

So I noticed you only have a MySQL driver at the moment.

EDIT: My mistake. Looks like you have a PostgreSQL driver happening here: https://github.com/ThreeDotsLabs/watermill-sql/blob/master/pkg/sql/schema_adapter_postgresql.go
So I think this might be perfect for using the DB as a message queue.
Yugabyte has a proper CDC engine, so it's possible to listen to changes and then populate the "PUB SUB" tables as needed. At the moment I think the engine is Java based (OMG :)).
So I raised this issue and it looks like they are sort of going to make it work with golang: yugabyte/yugabyte-db#2513

Logging error when produced Messages and returned error

When a handler returns an error, we discard the messages it produced.

	producedMessages, err := handler(msg)
	if err != nil {
		h.logger.Error("Handler returned error", err, nil)
		msg.Nack()
		return
	}

	if err := h.publishProducedMessages(producedMessages, msgFields); err != nil {
		h.logger.Error("Publishing produced messages failed", err, nil)
		msg.Nack()
		return
	}

We should log an error in this situation.
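
One possible shape for that extra log line is sketched below. The `Message` type, `handlerFunc`, and `runHandler` are simplified, hypothetical stand-ins for the router internals quoted above, so treat this as an illustration of the behavior rather than a patch.

```go
package main

import (
	"errors"
	"fmt"
)

// Message and handlerFunc are hypothetical simplifications of the router's types.
type Message struct{ UUID string }

type handlerFunc func(msg *Message) ([]*Message, error)

var errBoom = errors.New("boom")

// runHandler runs h and, when h fails after producing messages,
// logs how many produced messages are being thrown away.
func runHandler(h handlerFunc, msg *Message, logf func(format string, args ...interface{})) []*Message {
	produced, err := h(msg)
	if err != nil {
		logf("handler returned error: %v", err)
		if len(produced) > 0 {
			logf("discarding %d produced message(s) because the handler returned an error", len(produced))
		}
		return nil
	}
	return produced
}

func main() {
	logf := func(format string, args ...interface{}) {
		fmt.Printf(format+"\n", args...)
	}
	failing := func(msg *Message) ([]*Message, error) {
		return []*Message{{UUID: "out-1"}}, errBoom
	}
	runHandler(failing, &Message{UUID: "in-1"}, logf)
}
```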

Define interface for infrastructure component

I think it is a good idea to define an interface for the different infrastructure components to unify the message processing logic. It would also help when we need to write unit test cases.

Here we would define an interface for Config.

The end user may need to get configuration from the command line. There are also a few utilities that bind command flags to config files and environment variables, such as github.com/spf13/pflag. There is no need to handle type conversion etc. if the config struct works with such utilities.

reuse nats streaming connection

I think it would be a good idea to reuse the NATS connection when I need to run a publisher and a subscriber in one program.

The scenario would be as follows:

I need to subscribe to a queue to get commands from another agent, and send replies by issuing events on another queue. So I need to set up a publisher and a subscriber in one program, but currently I have to provide two different client IDs.

Retrieve Kafka topic partitions offset

I have an API service which needs to consume a Kafka compacted topic before it can be considered ready to handle any request traffic. How can I determine its readiness?

In this case, it would be best to know how many messages are still left to process before the consumer has caught up with the latest messages on the topic. It seems there is currently no way to get this information.

I suggest retrieving the partition offsets at the time the consumer subscribes to a topic (for all partitions consumed). An alternative would be to add a field to messages, for example an IsLatest flag, to let the consumer know that a message was the last one at the time of retrieval. Of course the offsets keep growing, so it should not be considered an absolute indication; it's time sensitive.

The best I can do as a workaround for now is to infer it. I can use the throughput and assume I've reached the end of the topic once the number of messages per second drops significantly (during the catch-up phase the consumer processes as many messages as it can; afterwards, only as many as the "live" events currently produced on the topic).
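
To illustrate the offset-based suggestion, here is a sketch of the arithmetic only. It assumes the end offsets per partition were captured at subscribe time and that the consumer tracks the next offset it will read; how to obtain either from a Kafka client is deliberately left out, and the function names are made up.

```go
package main

import "fmt"

// remaining returns how many messages are left before the consumer reaches
// the end offsets captured at subscribe time.
//
// endOffsets:  partition -> offset one past the last message at snapshot time
// nextOffsets: partition -> offset of the next message the consumer will read
func remaining(endOffsets, nextOffsets map[int32]int64) int64 {
	var left int64
	for partition, end := range endOffsets {
		// A partition missing from nextOffsets counts as not started (offset 0).
		if next := nextOffsets[partition]; next < end {
			left += end - next
		}
	}
	return left
}

// caughtUp reports whether every partition has reached its snapshot end offset.
func caughtUp(endOffsets, nextOffsets map[int32]int64) bool {
	return remaining(endOffsets, nextOffsets) == 0
}

func main() {
	end := map[int32]int64{0: 100, 1: 50}
	next := map[int32]int64{0: 100, 1: 42}
	fmt.Println(remaining(end, next), caughtUp(end, next)) // 8 false
}
```

As noted above, the snapshot is time sensitive: being caught up only means the consumer has reached the offsets that existed when it subscribed.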

I'll be happy to know other alternatives people might have come up with. Thanks.

Middleware per handler

Currently, you are not able to use middleware per handler in a clean way (you can still manually wrap the handler function, or use a separate router).

It would be nice to be able to add middleware per handler.

As a good reference point, you can use chi router: https://github.com/go-chi/chi
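
The manual wrapping mentioned above could look roughly like this. `HandlerFunc`, `Middleware`, and `wrap` are simplified, hypothetical types working on strings rather than Watermill messages; the point is only the composition order.

```go
package main

import (
	"fmt"
	"strings"
)

// HandlerFunc and Middleware are hypothetical, simplified signatures.
type HandlerFunc func(payload string) (string, error)

type Middleware func(next HandlerFunc) HandlerFunc

// wrap applies middlewares to a single handler.
// The first middleware listed becomes the outermost one.
func wrap(h HandlerFunc, middlewares ...Middleware) HandlerFunc {
	for i := len(middlewares) - 1; i >= 0; i-- {
		h = middlewares[i](h)
	}
	return h
}

// tag returns a middleware that appends its name to the payload
// before calling the next handler, which makes the call order visible.
func tag(name string) Middleware {
	return func(next HandlerFunc) HandlerFunc {
		return func(payload string) (string, error) {
			return next(payload + ">" + name)
		}
	}
}

func main() {
	upper := func(payload string) (string, error) {
		return strings.ToUpper(payload), nil
	}
	handler := wrap(upper, tag("a"), tag("b"))

	out, err := handler("x")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // X>A>B
}
```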

[watermill-amqp] What is the mechanism of exception handling?

If my RabbitMQ restarts, how do I re-establish the channel?

[watermill] 2020/03/13 08:59:33.510953 subscriber.go:166: level=INFO msg="Starting consuming from AMQP channel" amqp_exchange_name= amqp_queue_name=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f topic=listen_result_b111ad19-e4c0-4730-805c-3a44b5b7727f

Retry/Middlewares + CQRS busted?

Hey gang, I am opening this as a quick sanity check, I am using CQRS facade and ran into an interesting issue:

If a handler returns an error (or panics) it will retry that handler until the message is successfully processed. I tried using Retry middleware, but it does not work. The reason being, this line

returns an err (which is the err out of the handler) up the stack, instead of processing it inside of the retryLoop

simply changing it to

return nil, nil

fixes the issue.

What's odd (but perhaps separate) is that the Recoverer middleware has no effect on actual panics in CQRS.

In addition, I observed some weird behavior with backoff, and taking a look at the code I don't see how it accounts for backoff time.

I am happy to submit a PR to change that line if it's wrong, but I am afraid I don't quite understand how messages are handled and it could easily break something somewhere else.

PostgreSQL LISTEN/NOTIFY Subscriber

The same as #5, but for PostgreSQL :)

This implementation should be added to the https://github.com/ThreeDotsLabs/watermill-sql/ repository. It needs to be compatible with the already existing SQL Subscriber.

That means you should use the SQL publisher for publishing messages, and the LISTEN/NOTIFY subscriber to read them.
We would, of course, keep the already existing SQL-based subscriber.

I'm not an expert in the PostgreSQL area, so any comments and ideas are welcome! 😉

It probably depends on #127.

Found data race on Close() (in Router and GoChannel). Version v1.0.0-rc.1

Hello! I found some data races (probably).
Am I doing something wrong maybe (wrong closing)?

Code:

type Server struct {
	serverPubSub *gochannel.GoChannel
	clientPubSub *gochannel.GoChannel
}
...

logger := log.NewWatermillLogAdapter(lggr)
ServerPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger)
ClientPubSub = gochannel.NewGoChannel(gochannel.Config{}, logger)

...
defer s.Stop()

func (s *Server) Stop() {
	err := s.clientPubSub.Close()
	if err != nil {
		panic(err)
	}
	err = s.serverPubSub.Close()
	if err != nil {
		panic(err)
	}
}
...

Logs for gochannel.GoChannel.Close() (for message.Publisher)
go test -failfast -count 1 -race ./ledger/light/integration -run '^Test_BootstrapCalls$'

==================
WARNING: DATA RACE
Write at 0x00c0000fa288 by goroutine 7:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel.(*GoChannel).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel/pubsub.go:272 +0xad
  github.com/insolar/insolar/ledger/light/integration_test.(*Server).Stop()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/server_test.go:483 +0x122
  github.com/insolar/insolar/ledger/light/integration_test.Test_BootstrapCalls()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/light_test.go:64 +0x499
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163

Previous read at 0x00c0000fa288 by goroutine 81:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel.(*GoChannel).Publish()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/pubsub/gochannel/pubsub.go:83 +0x61
  github.com/insolar/insolar/insolar/bus.(*Bus).sendTarget()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/bus/bus.go:261 +0xf81
  github.com/insolar/insolar/insolar/bus.(*Bus).SendRole()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/bus/bus.go:179 +0x5ba
  github.com/insolar/insolar/ledger/light/proc.(*HotObjects).sendConfirmationToHeavy()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/proc/hot_data.go:175 +0x64f
  github.com/insolar/insolar/ledger/light/proc.(*HotObjects).Proceed()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/proc/hot_data.go:157 +0x1368
  github.com/insolar/insolar/insolar/flow/internal/thread.(*Thread).procedure.func1()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/insolar/flow/internal/thread/thread.go:143 +0x56

Goroutine 7 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:916 +0x65a
  testing.runTests.func1()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1157 +0xa8
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163
  testing.runTests()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1155 +0x523
  testing.(*M).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1072 +0x2eb
  main.main()
      _testmain.go:76 +0x222

And this - data race for message.Router.Close():

inRouter, _ := message.NewRouter(message.RouterConfig{}, logger)
outRouter, _ := message.NewRouter(message.RouterConfig{}, logger)
...

defer s.Stop()
...

func (s *Server) Stop() {
	err := s.inRouter.Close()
	if err != nil {
		panic(err)
	}
	err = s.outRouter.Close()
	if err != nil {
		panic(err)
	}
}

Logs for message.Router.Close() (for message.Publisher)
go test -failfast -count 1 -race ./ledger/light/integration -run '^Test_BasicOperations$'

==================
WARNING: DATA RACE
Read at 0x00c00058e5a0 by goroutine 141:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:350 +0x4f
  github.com/insolar/insolar/ledger/light/integration_test.(*Server).Stop()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/server_test.go:499 +0x122
  github.com/insolar/insolar/ledger/light/integration_test.Test_BasicOperations()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/ledger/light/integration/light_test.go:195 +0x4cf
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163

Previous write at 0x00c00058e5a0 by goroutine 229:
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).Close()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:353 +0x6c
  github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message.(*Router).closeWhenAllHandlersStopped()
      /Users/ilyamarkin/go/src/github.com/insolar/insolar/vendor/github.com/ThreeDotsLabs/watermill/message/router.go:334 +0x168

Goroutine 141 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:916 +0x65a
  testing.runTests.func1()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1157 +0xa8
  testing.tRunner()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:865 +0x163
  testing.runTests()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1155 +0x523
  testing.(*M).Run()
      /usr/local/Cellar/go/1.12.7/libexec/src/testing/testing.go:1072 +0x2eb
  main.main()
      _testmain.go:76 +0x222
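
For context on the first report, where Close() writes and Publish() reads the same address: a common way to avoid that kind of race is to guard the shared state with a mutex. The sketch below assumes the contended field is a "closed" flag, which the trace does not confirm, and `PubSub` is a hypothetical type, not Watermill's GoChannel.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("pub/sub closed")

// PubSub is a hypothetical in-memory publisher used only to show the locking.
type PubSub struct {
	mu     sync.RWMutex
	closed bool
}

// Publish takes a read lock, so publishers can run concurrently
// with each other but never concurrently with Close.
func (p *PubSub) Publish(payload string) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errClosed
	}
	_ = payload // delivery to subscribers is omitted in this sketch
	return nil
}

// Close takes the write lock before flipping the flag.
func (p *PubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	return nil
}

func main() {
	p := &PubSub{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 100; j++ {
				_ = p.Publish("ping") // may return errClosed once Close has run
			}
		}()
	}
	if err := p.Close(); err != nil {
		panic(err)
	}
	wg.Wait()
	fmt.Println(errors.Is(p.Publish("late"), errClosed)) // true
}
```

Running this with `go run -race` should stay quiet, since every access to the flag happens under the lock.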

amqp message rejected if header is not string

I've got a message with a header that is not a string, and it gets rejected by Watermill:
[watermill] 2019/04/17 15:48:14.371452 subscriber.go:245: level=ERROR msg="Processing message failed, sending nack" amqp_exchange_name=x amqp_queue_name=y err="metadata x-death is not a string, but []interface {}....

The x-death header is added by dead lettering, and I would like to process this message again after some time in the DL queue.

My flow looks like this:

  1. try to process the message,
  2. after a few NACKs the message gets moved to the dead letter queue so it does not block the main queue,
  3. there's a policy on the DL queue that moves the message back to the main queue after a few minutes to try to process it again.

The simplest solution is to ignore headers that are not strings and not add them to Metadata. This would work for me.

I'm willing to make fix/PR, but please advise if that's good solution or should I do it in some other way (maybe flattening
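
The "ignore non-string headers" option could be sketched like this. `stringHeaders` is a hypothetical helper, and the header table is modeled as a plain `map[string]interface{}`, which is an assumption about how the AMQP client exposes it.

```go
package main

import "fmt"

// stringHeaders copies only string-valued headers into the metadata map
// and returns the names of the headers it skipped.
// Hypothetical helper; not the actual Watermill unmarshalling code.
func stringHeaders(headers map[string]interface{}) (map[string]string, []string) {
	metadata := make(map[string]string, len(headers))
	var skipped []string
	for key, value := range headers {
		if s, ok := value.(string); ok {
			metadata[key] = s
			continue
		}
		skipped = append(skipped, key) // e.g. x-death, which is a list
	}
	return metadata, skipped
}

func main() {
	headers := map[string]interface{}{
		"_watermill_message_uuid": "abc",
		"x-death":                 []interface{}{},
	}
	metadata, skipped := stringHeaders(headers)
	fmt.Println(metadata, skipped) // map[_watermill_message_uuid:abc] [x-death]
}
```

Returning the skipped names lets the caller log them instead of rejecting the whole message.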

Clarification for reading multiple events

In the docs, BookingsFinancialReport only listens for RoomBooked. But what if it needed to listen to multiple event types to form the read model?

My guess is you'd need a composite of structs, each listening for a single event type. I wanted to avoid the boilerplate, but I'm just making sure before moving forward.
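
One way to keep that boilerplate small, independent of how the handlers are registered, is to put the read model logic behind a single method with a type switch and have each thin per-event handler delegate to it. This is only a sketch: `BookingCancelled`, `FinancialReport`, and the field names are invented for illustration, and it makes no claim about what the CQRS component itself supports.

```go
package main

import "fmt"

// RoomBooked is named in the docs mentioned above; its field here is assumed.
type RoomBooked struct{ Price int64 }

// BookingCancelled is a hypothetical second event type.
type BookingCancelled struct{ Refund int64 }

// FinancialReport is a hypothetical read model fed by several event types.
type FinancialReport struct{ total int64 }

// Handle updates the read model from any supported event
// and reports whether the event type was recognized.
func (r *FinancialReport) Handle(event interface{}) bool {
	switch e := event.(type) {
	case RoomBooked:
		r.total += e.Price
	case BookingCancelled:
		r.total -= e.Refund
	default:
		return false
	}
	return true
}

func main() {
	report := &FinancialReport{}
	events := []interface{}{
		RoomBooked{Price: 100},
		BookingCancelled{Refund: 30},
		"unknown",
	}
	for _, e := range events {
		report.Handle(e)
	}
	fmt.Println(report.total) // 70
}
```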
