Elixir RabbitMQ Client
def deps do
[
{:lapin, "~> 1.0.0"}
]
end
Documentation can be be found at https://hexdocs.pm/lapin.
Elixir RabbitMQ Client
License: MIT License
Elixir RabbitMQ Client
def deps do
[
{:lapin, "~> 1.0.0"}
]
end
Documentation can be be found at https://hexdocs.pm/lapin.
๐ Thank you for great library, it's very easy to use.
With a time we started getting the connection.close
exception. It restores the connection very quickly, but it spams Sentry with exceptions.
Is it possible to handle this and log the errors more gracefully?
GenServer MyApp.Workers.Lapin.AMQPConsumerWorker terminating
** (stop) exited in: :gen_server.call(#PID<0.4984.0>, {:command, {:close, {:"connection.close", 200, "Goodbye", 0, 0}, 70000}}, 70000)
** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
(stdlib 5.2.2) gen_server.erl:419: :gen_server.call/3
(amqp 3.3.0) lib/amqp/connection.ex:258: AMQP.Connection.close/1
(stdlib 5.2.2) gen_server.erl:1143: :gen_server.try_terminate/3
(stdlib 5.2.2) gen_server.erl:1339: :gen_server.terminate/10
(stdlib 5.2.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.1575501252.1150025729.99474>, :process, #PID<0.4984.0>, :socket_closed_unexpectedly}
mix.lock
lapin: 1.0.6
amqp: 3.3.0
amqp_client: 3.12.13
Thank you.
Make sure all RabbitMQ tutorial patterns are correctly implemented. RPC is missing, others might need some polishing.
Hi,
I'm using Lapin as a producer only, with the following config:
config :lapin, :connections, [
[
module: TestLapin.Worker,
channels: [
[
role: :producer,
exchange: "my_exchange",
exchange_type: :fanout,
queue: "my_queue"
]
]
]
]
And TestLapin.Worker
is just a dummy module:
defmodule TestLapin.Worker do
use Lapin.Connection
end
I can see on my rabbit that a queue my_queue
is created and bound with my_exchange
and an empty routing key. I don't think Lapin should do this, because AFAIK we publish messages on rabbit straight to a exchange and don't need a queue for that. This causes the following problem:
If I publish a message without a routing key:
:ok = TestLapin.Worker.publish("my_exchange", "", "payload")
It is routed to my_queue
and stays there forever, waiting to be consumed. If I use any routing key this doesn't happen because the queue is bound with an empty routing key.
So, I think Lapin should not create and bind a queue when configuring a producer.
Thanks.
I face compilation error type heartbeaters() undefined
I searched Lapin documentation and github issues for Lapin and could not find anything related to heartbeat
It seems if I wish to make a worker a publisher and a consumer it fails when trying to publish.
Hi, the documentation has errors and typos whereby the code is not workable.
In config missing a comma, no end on some defs, etc.
Also it returns the following error:
Lapin.Worker is not loaded and could not be found
Am trying this in latest Elixir in Phoenix 1.4
Hey the publishing with this library seems a bit odd.
In the config we specify which queue, exchange, and worker will be publishing but the publish function doesn't use any of this context.
An example config:
config :lapin, :connections, [
[
handle: :myhandle,
channels: [
[
role: :producer,
worker: MyApp.SomeWorker,
exchange: "some_exchange",
queue: "some_queue"
]
]
]
]
An example of how a SomeWorker should not need to specify "exchange" or the "handle".
defmodule MyApp.SomeWorker do
use Lapin.Worker
def publish do
Worker.Publish("routing_key", %Lapin.Message{}, [])
end
end
Some AMQP attributes like exclusive bindings are currently missing from Lapin.Pattern
and therefore not supported via either static or dynamic configuration.
I've started to work a lot with elixir and I've noticed the best frameworks are the ones that provide some sort of sandbox for a developer to substitute in when they write test cases. Currently, to test against lapin, I need to have an actual rabbitmq host running. It would be nice to have an in-memory sandbox that I could substitute in to test against.
The scope would look something like this:
Hi @lucacorti, first of all thanks for the library. Appreciate it a lot open-sourcing the library.
I am wondering how I can test consumer modules without having to run Rabbit MQ in my test environment.
Given I have a module like this one below, I'd like to verify the behavior when receiving the message with routing key foo.bar
.
defmodule MyProducer do
use Lapin.Connection
def handle_deliver(_channel, %Lapin.Message{
meta: %{routing_key: "foo.bar"},
payload: payload
}) do
# here be dragons
end
def handle_deliver(_channel, _message) do
:ok
end
end
Calling the function in a test context with a struct created manually for channel and message fails with an argument error.
Thanks in advance!
It seems Lapin currently doesn't support wildcards in routing keys correctly.
Given this configuration
config :lapin, :connections, [
[
uri: "amqp://mq",
module: MessagePublisher,
channels: [
[
role: :producer,
exchange: "events",
exchange_type: :topic,
queue: "lapin_test",
queue_durable: true,
publisher_persistent: true,
routing_key: "*"
]
]
]
]
And publisher:
defmodule MessagePublisher do
use Lapin.Connection
end
Sending a message like this
MessagePublisher.publish("events", "foo", %{bar: :baz} |> Poison.encode!)
results in an error message:
{:error, "Error publishing message: no channel for exchange 'events' with routing key 'foo'"}
Message publishing should have occurred without any problem as the routing key set when publishing message should have matched the wildcard used in the binding.
For example, the callback handle_deliver
is specified as the following:
@callback handle_deliver(Channel.t(), Message.t()) :: on_deliver
But if we look at the code where it is invoked:
Lines 315 to 329 in 9b7d69a
We will find that its first parameter should be a Lapin.Consumer.t()
.
I guess that all types of the Lapin.Connection
callback parameters that are now AMQP.Channel.t()
need to be corrected.
When a connection occurs, Lapin outputs all fields in its configuration, even username and password.
Sensitive data like these should be obfuscated
[error] Connection error: unknown_host for [port: 5672, uri: :pop, module: EventStore.MessagePublisher,
channels: [
[
exchange_type: :topic,
exchange: "events",
queue: "event_store",
queue_durable: true,
routing_key: "*",
publisher_persistent: true,
consumer_ack: true,
role: :producer
],
[
exchange_type: :topic,
exchange: "events",
queue: "event_store",
queue_durable: true,
routing_key: "*",
publisher_persistent: true,
consumer_ack: true,
role: :consumer]
],
password: "G4UyiJx6",
username: "8014da67eeb9b027c7a79f473c2a5098",
virtual_host: "/",
host: 'mq'
], backing off for 1000
(Note: output is idented for readability)
Publisher confirms with prefetch_count
> 1 need pma/amqp#41 to be solved before they can be supported properly, another option could be to switch to amqp_client
.
Currently connection establishment is synchronous and badly handled. If a broker is unavailable the startup process just hangs.
Possibly investigate the use of the Connection library to implement a robust async connection with backoff mechanism.
Hello everyone,
I'm trying to get a message from RabbitMQ with Lapin, but he still rejecting my messages.
Trace
16:46:04.174 [error] Rejected message 1: %Poison.ParseError{pos: 0, rest: nil, value:
%Lapin.Message{meta: %{app_id: :undefined, cluster_id: :undefined, consumer_tag:
"amq.ctag-CRhlrygmoOi-AnGydwdCXw", content_encoding: :undefined, content_type: nil,
correlation_id: :undefined, delivery_tag: 1, exchange: "", expiration: :undefined, headers: [],
message_id: :undefined, persistent: false, priority: :undefined, redelivered: false, reply_to:
:undefined, routing_key: "credit", timestamp: :undefined, type: :undefined, user_id:
:undefined}, payload: "{\"userId\": \"844e9ac6-3e39-45d1-bf57-84fc3395a479\", \"planId\":
\"af6253b4-f4bb-4f77-b731-ba7502a39d92\"}"}}
My config
config :lapin, :connections, [
[
module: Worker,
uri: "amqp://guest:guest@queue",
channels: [
[
role: :consumer,
queue: "credit",
exchange: "credit",
routing_key: "credit"
]
]
]
]
And finally, my file with the handle_deliver
defmodule Worker do
use Lapin.Connection
use Application
require Logger
def start(_type, _args) do
Worker.Repo.start_link
end
def handle_deliver(_channel, message) do
json = message |> Poison.decode!
plan = Worker.Repo.get Worker.Schema.Plan, json["planId"]
user = Worker.Repo.get Worker.Schema.User, json["userId"]
for val <- 0..plan.duration do
initial_date = Date.utc_today
end_date = get_end_date(initial_date, val, plan.duration, plan.credits_periodicity)
credit = %Worker.Schema.Credit{
id: Ecto.UUID.generate(),
used: false,
start_date: initial_date,
finish_date: end_date,
user_id: user.id,
created_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
}
Worker.Repo.insert! credit
:ok
end
end
defp get_end_date(initial_date, val, duration, periodicity) when duration != periodicity do
initial_date
|> Date.add(periodicity+val)
end
defp get_end_date(initial_date, _val, duration, periodicity) when duration == periodicity do
initial_date
|> Date.add(periodicity)
end
end
Compiling leads to no warnings using the workaround for OTP 27 but handling a message while the app is running leads to the following warning
warning: using module.function() notation (with parentheses) to fetch map field :payload is deprecated, you must remove the parentheses: map.field
(myapp 0.1.0) lib/myapp/worker.ex:7: Myapp.Worker.handle_deliver/2
(lapin 1.0.7) lib/lapin/connection.ex:326: Lapin.Connection.consume/3
This is my configuration
config :lapin, :connections, [
[
module: consumer,
uri: System.get_env("AMQP_URL"),
channels: [
[
role: :consumer,
pattern: Lapin.Pattern.Config,
exchange: exchange,
queue: queue,
routing_key: queue,
exchange_type: :topic,
consumer_ack: true,
consumer_prefetch: 1,
queue_arguments: [
{"x-dead-letter-exchange", exchange_dlx},
{"x-dead-letter-routing-key", queue_dlx}
]
],
[
role: :producer,
pattern: Lapin.Pattern.Config,
exchange: exchange,
queue: queue,
routing_key: queue,
exchange_type: :topic,
consumer_ack: true,
consumer_prefetch: 1,
queue_arguments: [
{"x-dead-letter-exchange", exchange_dlx},
{"x-dead-letter-routing-key", queue_dlx}
]
]
]
],
[
module: another_consumer,
uri: System.get_env("AMQP_URL"),
channels: [
[
role: :consumer,
pattern: Lapin.Pattern.Config,
exchange: exchange,
queue: queue,
routing_key: queue,
exchange_type: :topic,
consumer_ack: true,
consumer_prefetch: 1,
queue_arguments: [
{"x-dead-letter-exchange", exchange_dlx},
{"x-dead-letter-routing-key", queue_dlx}
]
]
]
]
]
This one comes with the error duplicated id Lapin.Connection found in the supervisor specification, please explicitly pass the :id option when defining this worker/supervisor
.
After some digging in, I found the issue may be this. It should provide id for each worker
We only have the ability to set the amqp url in the configuration. This means that we can't specify the host, port, virtual_host, username and password but just a simple uri. It would be nice to be able to set a uri as part of configuration.
I've also opened pma/amqp#77 to make it easy to pass in the full configuration along with a uri.
Message payload could be handled with a protocol which could be used to encode/decode the data and set relevant message properties (like content-type and headers).
This way users could provide custom payload structs and protocol implementations.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.