lapin's Introduction

Lapin

Elixir RabbitMQ Client

Installation

def deps do
  [
    {:lapin, "~> 1.0.0"}
  ]
end

Documentation can be found at https://hexdocs.pm/lapin.

lapin's People

Contributors

bdusauso, declaneugeneleekennedy, dependabot-preview[bot], dependabot[bot], lswith, lucacorti, ntraum, pasikok

lapin's Issues

Gracefully shutdown connection on connection.close

👋 Thank you for the great library; it's very easy to use.

Over time we started getting the connection.close exception. The connection is restored very quickly, but the exceptions spam Sentry.

Is it possible to handle this and log the errors more gracefully?

GenServer MyApp.Workers.Lapin.AMQPConsumerWorker terminating
** (stop) exited in: :gen_server.call(#PID<0.4984.0>, {:command, {:close, {:"connection.close", 200, "Goodbye", 0, 0}, 70000}}, 70000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib 5.2.2) gen_server.erl:419: :gen_server.call/3
    (amqp 3.3.0) lib/amqp/connection.ex:258: AMQP.Connection.close/1
    (stdlib 5.2.2) gen_server.erl:1143: :gen_server.try_terminate/3
    (stdlib 5.2.2) gen_server.erl:1339: :gen_server.terminate/10
    (stdlib 5.2.2) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.1575501252.1150025729.99474>, :process, #PID<0.4984.0>, :socket_closed_unexpectedly}

mix.lock

lapin: 1.0.6
amqp: 3.3.0
amqp_client: 3.12.13

Thank you.
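
One way to keep a shutdown path like the one in this trace quiet is to catch the exit raised when the connection process is already gone. The following is a minimal sketch, not Lapin's implementation; `SafeClose` and `safely/1` are hypothetical names introduced for illustration.

```elixir
defmodule SafeClose do
  @moduledoc """
  Hypothetical helper: runs a function and converts an exit (for example,
  calling a process that is no longer alive) into an error tuple.
  """

  # `fun` is any zero-arity function, such as one that closes a connection.
  def safely(fun) when is_function(fun, 0) do
    fun.()
  catch
    # GenServer.call/3 exits when the target process is dead.
    :exit, reason -> {:error, {:exit, reason}}
  end
end
```

In a `terminate/2` callback this could wrap the close call, e.g. `SafeClose.safely(fn -> AMQP.Connection.close(conn) end)`, where `conn` is assumed to be the connection held in the process state. The caller can then log the error tuple at a lower level instead of crashing.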

Problem with messages staying on the producer queue

Hi,

I'm using Lapin as a producer only, with the following config:

config :lapin, :connections, [
  [
    module: TestLapin.Worker,
    channels: [
      [
        role: :producer,
        exchange: "my_exchange",
        exchange_type: :fanout,
        queue: "my_queue"
      ]
    ]
  ]
]

And TestLapin.Worker is just a dummy module:

defmodule TestLapin.Worker do
  use Lapin.Connection
end

I can see on my rabbit that a queue my_queue is created and bound to my_exchange with an empty routing key. I don't think Lapin should do this because, AFAIK, we publish messages on rabbit straight to an exchange and don't need a queue for that. This causes the following problem:

If I publish a message without a routing key:

:ok = TestLapin.Worker.publish("my_exchange", "", "payload")

It is routed to my_queue and stays there forever, waiting to be consumed. If I use any routing key this doesn't happen because the queue is bound with an empty routing key.

So, I think Lapin should not create and bind a queue when configuring a producer.

Thanks.

type heartbeaters() undefined

I get the compilation error type heartbeaters() undefined.

I searched the Lapin documentation and the GitHub issues for Lapin and could not find anything related to heartbeat.

Documentation has a lot of errors

Hi, the documentation has errors and typos that make the code unusable.

The config is missing a comma, some defs have no end, etc.

It also returns the following error:

Lapin.Worker is not loaded and could not be found

I am trying this with the latest Elixir and Phoenix 1.4.

Publishing

Hey, publishing with this library seems a bit odd.

In the config we specify which queue, exchange, and worker will be publishing, but the publish function doesn't use any of this context.

An example config:

config :lapin, :connections, [
  [
    handle: :myhandle,
    channels: [
      [
        role: :producer,
        worker: MyApp.SomeWorker,
        exchange: "some_exchange",
        queue: "some_queue"
      ]
    ]
  ]
]

An example of how SomeWorker should not need to specify the exchange or the handle:

defmodule MyApp.SomeWorker do
  use Lapin.Worker

  def publish do
    Worker.Publish("routing_key", %Lapin.Message{}, [])
  end
end

Lapin Sandbox

I've started to work a lot with Elixir and I've noticed the best frameworks are the ones that provide some sort of sandbox for a developer to substitute in when writing test cases. Currently, to test against Lapin, I need to have an actual RabbitMQ host running. It would be nice to have an in-memory sandbox that I could substitute in to test against.

The scope would look something like this:

  • doesn't require an amqp server
  • allows the control of messages through test harnesses (I can determine when I wish to send a message)
  • allows messages to be prepopulated on the queue
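
The scope above can be sketched with a small in-memory queue. This is only an illustration of the idea under stated assumptions, not part of Lapin: `SandboxQueue` and its functions are hypothetical names, and the handler is any one-argument function standing in for a consumer callback.

```elixir
defmodule SandboxQueue do
  @moduledoc "Hypothetical in-memory stand-in for a broker queue, for tests only."
  use Agent

  # Start the queue, optionally prepopulated with messages.
  def start_link(messages \\ []) do
    Agent.start_link(fn -> :queue.from_list(messages) end)
  end

  # Enqueue a message; nothing is delivered until the test asks for it.
  def publish(pid, message) do
    Agent.update(pid, fn queue -> :queue.in(message, queue) end)
  end

  # Deliver the oldest message to `handler`, under the test's control.
  def deliver_next(pid, handler) when is_function(handler, 1) do
    case Agent.get_and_update(pid, &pop/1) do
      {:ok, message} -> {:delivered, handler.(message)}
      :empty -> :empty
    end
  end

  # Returns {value_for_caller, new_state} as Agent.get_and_update/2 expects.
  defp pop(queue) do
    case :queue.out(queue) do
      {{:value, message}, rest} -> {{:ok, message}, rest}
      {:empty, rest} -> {:empty, rest}
    end
  end
end
```

A test would start the queue with the messages it needs, then call `deliver_next/2` at the point where it wants the consumer to see a message.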

Test consumers without RabbitMQ running

Hi @lucacorti, first of all thanks for the library. I really appreciate you open-sourcing it.

I am wondering how I can test consumer modules without having to run RabbitMQ in my test environment.

Given I have a module like this one below, I'd like to verify the behavior when receiving the message with routing key foo.bar.

defmodule MyProducer do
  use Lapin.Connection

  def handle_deliver(_channel, %Lapin.Message{
        meta: %{routing_key: "foo.bar"},
        payload: payload
      }) do
    # here be dragons
  end

  def handle_deliver(_channel, _message) do
    :ok
  end
end

Calling the function in a test context with manually created structs for the channel and message fails with an argument error.
Thanks in advance!

Connection handling enhancements

  • Allow connections to request sync connect behaviour and error out on connection error.
  • Implement a configurable exponential backoff on reconnect
  • Support configurable connection timeout for both sync and async connect

Improper support for wildcards in routing keys

It seems Lapin currently doesn't support wildcards in routing keys correctly.

Environment

  • Elixir 1.5.3
  • Lapin 0.3.1

Steps to reproduce bug

Given this configuration

config :lapin, :connections, [
  [
    uri: "amqp://mq",
    module: MessagePublisher,
    channels: [
      [
        role: :producer,
        exchange: "events",
        exchange_type: :topic,
        queue: "lapin_test",
        queue_durable: true,
        publisher_persistent: true,
        routing_key: "*"
      ]
    ]
  ]
]

And publisher:

defmodule MessagePublisher do
  use Lapin.Connection
end

Sending a message like this

MessagePublisher.publish("events", "foo", %{bar: :baz} |> Poison.encode!)

results in an error message:

{:error, "Error publishing message: no channel for exchange 'events' with routing key 'foo'"}

Result expected

Message publishing should have occurred without any problem as the routing key set when publishing message should have matched the wildcard used in the binding.
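
For reference, on an AMQP topic exchange `*` stands for exactly one dot-separated word and `#` for zero or more words, so a key like `foo` does match the binding `*`. The sketch below illustrates those matching rules only; `TopicMatch` is a hypothetical module and says nothing about how Lapin looks up channels.

```elixir
defmodule TopicMatch do
  @moduledoc "Hypothetical illustration of AMQP topic binding-key matching."

  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  defp do_match([], []), do: true
  # A trailing "#" matches whatever words remain, including none.
  defp do_match(["#"], _words), do: true

  defp do_match(["#" | rest] = pattern, words) do
    # "#" either matches zero words, or consumes one word and tries again.
    do_match(rest, words) or (words != [] and do_match(pattern, tl(words)))
  end

  # "*" consumes exactly one word.
  defp do_match(["*" | rest], [_word | words]), do: do_match(rest, words)
  # A literal word must be identical in pattern and key.
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)
  defp do_match(_pattern, _words), do: false
end
```

With these rules, `TopicMatch.matches?("*", "foo")` is true while `TopicMatch.matches?("*", "foo.bar")` is false, because `*` does not span multiple words.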

The typespecs of some callbacks of `Lapin.Connection` may be improper

For example, the callback handle_deliver is specified as the following:

@callback handle_deliver(Channel.t(), Message.t()) :: on_deliver

But if we look at the code where it is invoked:

defp consume(
       module,
       %Consumer{pattern: pattern} = consumer,
       %Message{
         meta: %{delivery_tag: delivery_tag, redelivered: redelivered} = meta,
         payload: payload
       } = message
     ) do
  with ack <- pattern.ack(consumer),
       payload_for <- module.payload_for(consumer, message),
       content_type <- Payload.content_type(payload_for),
       meta <- Map.put(meta, :content_type, content_type),
       {:ok, payload} <- Payload.decode_into(payload_for, payload),
       message <- %Message{message | meta: meta, payload: payload},
       :ok <- module.handle_deliver(consumer, message) do

We will find that its first parameter should be a Lapin.Consumer.t().

I guess that all types of the Lapin.Connection callback parameters that are now AMQP.Channel.t() need to be corrected.

Sensitive data are logged by default

When a connection error occurs, Lapin logs every field in its configuration, including the username and password.
Sensitive data like these should be obfuscated.

[error] Connection error: unknown_host for [port: 5672, uri: :pop, module: EventStore.MessagePublisher, 
channels: [
[
  exchange_type: :topic, 
  exchange: "events", 
  queue: "event_store", 
  queue_durable: true, 
  routing_key: "*", 
  publisher_persistent: true, 
  consumer_ack: true, 
  role: :producer
], 
[
  exchange_type: :topic, 
  exchange: "events", 
  queue: "event_store", 
  queue_durable: true, 
  routing_key: "*", 
  publisher_persistent: true, 
  consumer_ack: true, 
  role: :consumer]
], 
password: "G4UyiJx6", 
username: "8014da67eeb9b027c7a79f473c2a5098", 
virtual_host: "/", 
host: 'mq'
], backing off for 1000

(Note: output is indented for readability)
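
A minimal sketch of the kind of obfuscation requested, assuming the configuration is a keyword list like the one in the log above. `Redact` and its list of sensitive keys are hypothetical; a `uri` option with embedded credentials would need separate handling and is not covered here.

```elixir
defmodule Redact do
  @moduledoc "Hypothetical helper masking credentials before a config is logged."

  # Keys treated as sensitive (an assumption based on the log output above).
  @sensitive [:username, :password]

  def config(config) when is_list(config) do
    Enum.map(config, fn
      {key, _value} when key in @sensitive -> {key, "[REDACTED]"}
      other -> other
    end)
  end
end
```

Logging `inspect(Redact.config(config))` instead of the raw configuration keeps the host, port, and channel settings visible while hiding the credentials.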

Connection handling

Currently connection establishment is synchronous and badly handled. If a broker is unavailable the startup process just hangs.

Possibly investigate the use of the Connection library to implement a robust async connection with backoff mechanism.

Lapin rejecting messages

Hello everyone,

I'm trying to get a message from RabbitMQ with Lapin, but it keeps rejecting my messages.

Trace

16:46:04.174 [error] Rejected message 1: %Poison.ParseError{pos: 0, rest: nil, value: 
%Lapin.Message{meta: %{app_id: :undefined, cluster_id: :undefined, consumer_tag: 
"amq.ctag-CRhlrygmoOi-AnGydwdCXw", content_encoding: :undefined, content_type: nil, 
correlation_id: :undefined, delivery_tag: 1, exchange: "", expiration: :undefined, headers: [], 
message_id: :undefined, persistent: false, priority: :undefined, redelivered: false, reply_to: 
:undefined, routing_key: "credit", timestamp: :undefined, type: :undefined, user_id: 
:undefined}, payload: "{\"userId\": \"844e9ac6-3e39-45d1-bf57-84fc3395a479\", \"planId\": 
\"af6253b4-f4bb-4f77-b731-ba7502a39d92\"}"}}

My config

config :lapin, :connections, [
  [
    module: Worker,
    uri: "amqp://guest:guest@queue",
    channels: [
      [
        role: :consumer,
        queue: "credit",
        exchange: "credit",
        routing_key: "credit"
      ]
    ]
  ]
]

And finally, my file with the handle_deliver

defmodule Worker do
  use Lapin.Connection
  use Application
  require Logger

  def start(_type, _args) do
    Worker.Repo.start_link
  end
  
  def handle_deliver(_channel, message) do
    json = message |> Poison.decode!

    plan = Worker.Repo.get Worker.Schema.Plan, json["planId"]
    user = Worker.Repo.get Worker.Schema.User, json["userId"]
  

    for val <- 0..plan.duration do
      initial_date = Date.utc_today
      end_date = get_end_date(initial_date, val, plan.duration, plan.credits_periodicity)
      
      credit = %Worker.Schema.Credit{
        id: Ecto.UUID.generate(),
        used: false,
        start_date: initial_date,
        finish_date: end_date,
        user_id: user.id,
        created_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
        updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
      }

      Worker.Repo.insert! credit

      :ok
    end
  end

  defp get_end_date(initial_date, val, duration, periodicity) when duration != periodicity do
    initial_date 
    |> Date.add(periodicity+val)
  end
  
  defp get_end_date(initial_date, _val, duration, periodicity) when duration == periodicity do
    initial_date 
    |> Date.add(periodicity)
  end
end

Warning on Lapin.Connection.consume/3 on Elixir 1.17.2 Erlang 27.0.1

Compiling leads to no warnings when using the workaround for OTP 27, but handling a message while the app is running leads to the following warning:

warning: using module.function() notation (with parentheses) to fetch map field :payload is deprecated, you must remove the parentheses: map.field
  (myapp 0.1.0) lib/myapp/worker.ex:7: Myapp.Worker.handle_deliver/2
  (lapin 1.0.7) lib/lapin/connection.ex:326: Lapin.Connection.consume/3
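
The warning refers to reading a map or struct field with call syntax, such as `message.payload()`. The sketch below shows the two non-deprecated ways to read the field; `FieldAccessDemo` is a hypothetical module, and a plain map stands in for the message struct.

```elixir
defmodule FieldAccessDemo do
  # Deprecated form that triggers the warning: message.payload()

  # Field access without parentheses.
  def payload_via_dot(message), do: message.payload

  # Pattern matching on the field in the function head.
  def payload_via_match(%{payload: payload}), do: payload
end
```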

Can't spawn up multiple connections

This is my configuration

config :lapin, :connections, [
  [
    module: consumer,
    uri: System.get_env("AMQP_URL"),
    channels: [
      [
        role: :consumer,
        pattern: Lapin.Pattern.Config,
        exchange: exchange,
        queue: queue,
        routing_key: queue,
        exchange_type: :topic,
        consumer_ack: true,
        consumer_prefetch: 1,
        queue_arguments: [
          {"x-dead-letter-exchange", exchange_dlx},
          {"x-dead-letter-routing-key", queue_dlx}
        ]
      ],
      [
        role: :producer,
        pattern: Lapin.Pattern.Config,
        exchange: exchange,
        queue: queue,
        routing_key: queue,
        exchange_type: :topic,
        consumer_ack: true,
        consumer_prefetch: 1,
        queue_arguments: [
          {"x-dead-letter-exchange", exchange_dlx},
          {"x-dead-letter-routing-key", queue_dlx}
        ]
      ]
    ]
  ],
  [
    module: another_consumer,
    uri: System.get_env("AMQP_URL"),
    channels: [
      [
        role: :consumer,
        pattern: Lapin.Pattern.Config,
        exchange: exchange,
        queue: queue,
        routing_key: queue,
        exchange_type: :topic,
        consumer_ack: true,
        consumer_prefetch: 1,
        queue_arguments: [
          {"x-dead-letter-exchange", exchange_dlx},
          {"x-dead-letter-routing-key", queue_dlx}
        ]
      ]
    ]
  ]
]

This configuration fails with the error duplicated id Lapin.Connection found in the supervisor specification, please explicitly pass the :id option when defining this worker/supervisor.

After some digging, I found what may be the issue: an id should be provided for each worker.
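
The general fix for this supervisor error is to give each child a distinct `:id` via `Supervisor.child_spec/2`. The sketch below demonstrates that with stand-in modules; `DemoWorker` and `DemoSupervisor` are hypothetical and do not reflect how Lapin builds its children.

```elixir
defmodule DemoWorker do
  @moduledoc "Hypothetical worker standing in for a connection process."
  use GenServer

  def start_link(config), do: GenServer.start_link(__MODULE__, config)

  @impl true
  def init(config), do: {:ok, config}
end

defmodule DemoSupervisor do
  @moduledoc "Hypothetical supervisor starting one worker per configuration."

  def start_link(configs) when is_list(configs) do
    children =
      configs
      |> Enum.with_index()
      |> Enum.map(fn {config, index} ->
        # Override the default id (the module name) so ids do not collide.
        Supervisor.child_spec({DemoWorker, config}, id: {DemoWorker, index})
      end)

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

Using the index keeps the sketch short; an id derived from each configuration's `module` option would be another reasonable choice.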

AMQP URI in configuration

We only have the ability to set the amqp url in the configuration. This means that we can't specify the host, port, virtual_host, username and password but just a simple uri. It would be nice to be able to set a uri as part of configuration.

I've also opened pma/amqp#77 to make it easy to pass in the full configuration along with a uri.

Add protocol for payload handling

Message payload could be handled with a protocol which could be used to encode/decode the data and set relevant message properties (like content-type and headers).

This way users could provide custom payload structs and protocol implementations.
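
A rough sketch of what such a protocol could look like. All names here (`DemoPayload`, `Greeting`, the function names and return shapes) are hypothetical and chosen for illustration; they are not Lapin's API.

```elixir
defprotocol DemoPayload do
  @doc "Returns the content type to set on the message properties."
  def content_type(payload)

  @doc "Encodes the payload into a binary to put on the wire."
  def encode(payload)
end

# Plain binaries pass through unchanged.
defimpl DemoPayload, for: BitString do
  def content_type(_binary), do: "application/octet-stream"
  def encode(binary), do: {:ok, binary}
end

defmodule Greeting do
  @moduledoc "Hypothetical user-defined payload struct."
  defstruct [:name]
end

# Users supply their own implementation for custom structs.
defimpl DemoPayload, for: Greeting do
  def content_type(_greeting), do: "text/plain"
  def encode(%{name: name}), do: {:ok, "hello " <> name}
end
```

A publisher could then call `DemoPayload.encode/1` and `DemoPayload.content_type/1` on whatever value it is given, without knowing the concrete payload type.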
