Erlbus

Message/Event Bus written in Erlang.

The PubSub core is a clone of the original, remarkable, and proven Phoenix PubSub Layer, but re-written in Erlang.

A new way to build soft real-time, highly scalable messaging-based applications that are distributed rather than centralized!

See the online documentation.

Introduction

Erlbus is a simple and lightweight library/tool to build messaging-based applications.

The Erlbus PubSub implementation was taken from the Phoenix Framework, which provides a scalable and proven PubSub solution. On top of this implementation, Erlbus provides a simpler, more usable interface.

You can read more about the PubSub implementation HERE.

Installation

Erlang

In your rebar.config:

{deps, [
  {ebus, "0.3.0", {pkg, erlbus}}
]}.

Elixir

In your mix.exs:

def deps do
  [
    {:ebus, "~> 0.3", hex: :erlbus}
  ]
end

Getting Started

Assuming you have a working Erlang installation (18 or later), building erlbus should be as simple as:

$ git clone https://github.com/cabol/erlbus.git
$ cd erlbus
$ make

Quick Start Example

Start an Erlang console with erlbus running:

make shell

Once in the Erlang console:

% subscribe the current shell process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% publish a message
ebus:pub("foo", {foo, "hi"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"}]

% unsubscribe self
ebus:unsub(self(), "foo").
ok

% publish other message
ebus:pub("foo", {foo, "hello"}).
ok

% check received message for Pid
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self (last message didn't arrive)
ebus_proc:messages(self()).
[{foo,"hi"}]

% check subscribers (only Pid should be in the returned list)
ebus:subscribers("foo").
[<0.57.0>]

% check topics
ebus:topics().
[<<"foo">>]

% subscribe self to other topic
ebus:sub(self(), "bar").
ok

% check topics
ebus:topics().
[<<"bar">>,<<"foo">>]

% publish other message
ebus:pub("bar", {bar, "hi bar"}).
ok

% check received message for Pid (last message didn't arrive)
ebus_proc:messages(Pid).
[{foo,"hi"},{foo,"hello"}]

% check received message for self
ebus_proc:messages(self()).
[{foo,"hi"},{bar,"hi bar"}]

Note:

You may have noticed that no additional steps or calls are needed to create or delete a topic. ebus handles this automatically, so you don't have to worry about it!
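
Since the session above shows published messages sitting in each subscriber's mailbox, a subscriber can presumably consume them with an ordinary receive. The following is a minimal sketch under that assumption; the module name mailbox_sketch and the function collect/2 are hypothetical and not part of ebus.

```erlang
-module(mailbox_sketch).
-export([collect/2]).

%% Hypothetical helper, not part of ebus.
%% Receives up to N messages from the calling process's mailbox,
%% waiting at most Timeout milliseconds for each one.
%% Returns the messages in arrival order.
collect(N, Timeout) when is_integer(N), N >= 0 ->
  collect(N, Timeout, []).

collect(0, _Timeout, Acc) ->
  lists:reverse(Acc);
collect(N, Timeout, Acc) ->
  receive
    Msg -> collect(N - 1, Timeout, [Msg | Acc])
  after Timeout ->
    %% Nothing arrived in time: return what we have so far.
    lists:reverse(Acc)
  end.
```

After ebus:sub(self(), "foo") and ebus:pub("foo", {foo, "hi"}), calling mailbox_sketch:collect(1, 1000) from the same process should hand back the published message. Note that receive removes messages from the mailbox, whereas ebus_proc:messages/1 in the session above appears to leave them in place.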

Now, let's make it more fun. Start two Erlang consoles. The first one:

erl -name node1@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

The second one:

erl -name node2@127.0.0.1 -setcookie ebus -pa _build/default/lib/*/ebin -s ebus -config test/test.config

Next, we need to connect these Erlang nodes into a cluster, so from either of them send a ping to the other:

% From node1 ping node2
net_adm:ping('node2@127.0.0.1').
pong

Excellent, both nodes are now in a cluster, thanks to the beauty of Distributed Erlang. So, let's repeat the exercise above, but now across two nodes.

On node1, create a handler and subscribe it to a topic:

% create a callback fun to use ebus_proc utility
CB1 = fun(Msg) ->
  io:format("CB1: ~p~n", [Msg])
end.
#Fun<erl_eval.6.54118792>

% other callback but receiving additional arguments,
% which may be used when message arrives
CB2 = fun(Msg, Args) ->
  io:format("CB2: Msg: ~p, Args: ~p~n", [Msg, Args])
end.
#Fun<erl_eval.12.54118792>

% use ebus_proc utility to spawn a handler
H1 = ebus_proc:spawn_handler(CB1).
<0.70.0>
H2 = ebus_proc:spawn_handler(CB2, ["any_ctx"]).
<0.72.0>

% subscribe handlers
ebus:sub(H1, "foo").
ok
ebus:sub(H2, "foo").
ok

Repeat the same steps on node2.
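
To make the handler idea more concrete, here is a minimal sketch of what a callback-driven handler process could look like. It is not the ebus_proc implementation; the module name handler_sketch is hypothetical, and prepending the message to the extra arguments is an assumption inferred from the CB2 output shown in this README.

```erlang
-module(handler_sketch).
-export([spawn_handler/1, spawn_handler/2]).

%% Hypothetical sketch, NOT the actual ebus_proc code.
%% Spawns a process that calls Fun(Msg) for every message it receives.
spawn_handler(Fun) ->
  spawn_handler(Fun, []).

%% Same, but calls Fun(Msg, Arg1, ..., ArgN) with the extra Args appended
%% (assumption: the message goes first, followed by the extra arguments).
spawn_handler(Fun, Args) when is_function(Fun), is_list(Args) ->
  spawn(fun() -> loop(Fun, Args) end).

loop(Fun, Args) ->
  receive
    Msg ->
      _ = apply(Fun, [Msg | Args]),
      loop(Fun, Args)
  end.
```

A process spawned this way is an ordinary pid, so it could be passed to ebus:sub/2 just like H1 and H2 above.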

Once you have handlers subscribed to the same channel in both nodes, publish some messages from any node:

% publish message
ebus:pub("foo", {foo, "again"}).
CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"
ok

And in the other node you will see those messages have arrived too:

CB1: {foo,"again"}
CB2: Msg: {foo,"again"}, Args: "any_ctx"

Let's check subscribers, so from any Erlang console:

% returns local and remote subscribers
ebus:subscribers("foo").
[<7023.67.0>,<7023.69.0>,<0.70.0>,<0.72.0>]

You can also check the tests for more examples about using ebus.

So far, so good! Let's continue!

Point-To-Point Example

The great thing here is that you don't need anything special to implement point-to-point behavior. It is as simple as this:

ebus:dispatch("topic1", #{payload => "M1"}).

The dispatch function gets the subscribers and then picks one of them to send the message to. You can provide your own function to pick a subscriber; otherwise, a default function is used, which picks a subscriber at random.

The dispatch function comes in 3 different flavors:

  • ebus:dispatch/2: receives the topic and the message.
  • ebus:dispatch/3: receives the topic, message and a list of options.
  • ebus:dispatch/4: same as the previous one, but takes the name of the server as its 1st argument (the other functions use the default server name).

Dispatch options are:

  • {scope, local | global}: lets you choose whether to pick a local subscriber or any subscriber. Default value: local.
  • {dispatch_fun, fun(([term()]) -> term())}: function to pick a subscriber. If it isn't provided, a default random function is used.
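
As an illustration of the dispatch_fun option, the sketch below shows a few selection strategies that match the fun(([term()]) -> term()) shape described above. The module name dispatch_funs and its functions are hypothetical, and all of them assume the subscriber list they receive is non-empty.

```erlang
-module(dispatch_funs).
-export([first/1, random/1, by_key/1]).

%% Hypothetical helpers, not part of ebus.
%% Each one takes a non-empty list of subscribers and returns one of them.

%% Always pick the first subscriber in the list.
first([H | _]) -> H.

%% Pick a subscriber uniformly at random.
random(Subscribers) ->
  lists:nth(rand:uniform(length(Subscribers)), Subscribers).

%% Build a fun that always picks the same position for a given Key,
%% as long as the subscriber list keeps the same length and order.
by_key(Key) ->
  fun(Subscribers) ->
    Index = erlang:phash2(Key, length(Subscribers)) + 1,
    lists:nth(Index, Subscribers)
  end.
```

Any of these could then be passed in the options list, for example ebus:dispatch("foo", Msg, [{dispatch_fun, fun dispatch_funs:first/1}]).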

To see how this function is implemented go HERE.

Let's see an example:

% subscribe local process
ebus:sub(self(), "foo").
ok

% spawn a process
Pid = spawn_link(fun() -> timer:sleep(infinity) end).
<0.57.0>

% subscribe spawned PID
ebus:sub(Pid, "foo").
ok

% check that we have two subscribers
ebus:subscribers("foo").
[<0.57.0>,<0.38.0>]

% now dispatch a message (default dispatch fun and scope)
ebus:dispatch("foo", #{payload => foo}).
ok

% check that only one subscriber received the message
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[]

% dispatch with options
Fun = fun([H | _]) -> H end.
#Fun<erl_eval.6.54118792>
ebus:dispatch("foo", <<"M1">>, [{scope, global}, {dispatch_fun, Fun}]).
ok

% check again
ebus_proc:messages(self()).
[#{payload => foo}]
ebus_proc:messages(Pid).
[<<"M1">>]

Extremely easy, isn't it?

Distributed Erlbus

Erlbus is distributed by nature; it doesn't require anything additional or magical.

Once you have an Erlang cluster, messages are broadcast using PG2, which is the default PubSub adapter. Remember, it's a Phoenix PubSub clone, so the architecture and design are the same.

Phoenix Channels are supported on PubSub layer, which is the core. Take a look at this blog post.
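
Forming the Erlang cluster itself is left to Distributed Erlang. As a rough sketch, the manual net_adm:ping/1 step from the Quick Start could be wrapped in a small helper; the module name cluster_sketch and the function join/1 are hypothetical.

```erlang
-module(cluster_sketch).
-export([join/1]).

%% Hypothetical helper, not part of ebus.
%% Pings every node in Nodes and returns {Connected, Failed}.
%% net_adm:ping/1 returns pong when the node is reachable, pang otherwise.
join(Nodes) ->
  Results = [{Node, net_adm:ping(Node)} || Node <- Nodes],
  Connected = [Node || {Node, pong} <- Results],
  Failed = [Node || {Node, pang} <- Results],
  {Connected, Failed}.
```

Each node would call this at startup with the names of its peers; nodes that connect this way then show up in nodes().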

Running Tests

make test

Building docs

make docs

Note: Once you run the previous command, a new folder named doc is created, containing the generated HTML documentation.

Change Log

All notable changes to this project will be documented in the CHANGELOG.md.

Copyright and License

Original work Copyright (c) 2014 Chris McCord

Modified work Copyright (c) 2016 Carlos Andres Bolaños

Erlbus source code is licensed under the MIT License.

NOTE: Pub/Sub implementation was taken from Phoenix Framework.

erlbus's Issues

Add scalability tests

Add scalability tests:

  • Default: ebus with pg2
  • ebus with riak_core and gproc local

Implement distributed tests

  • Spawn a set of slave nodes, and put them in cluster
  • Start erlbus on all nodes
  • Start a set of test cases across all of them

Compatibility with OTP 24

I am not able to compile erlbus with OTP 24, since the pg2 module was deprecated in OTP 23, removed in OTP 24, and replaced by the pg module...
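
For reference, the sketch below shows the basic shape of the pg API (OTP 23 and later) that would stand in for pg2. It is not a patch for erlbus; the module name pg_sketch and its functions are hypothetical, and it only uses the default pg scope.

```erlang
-module(pg_sketch).
-export([start/0, subscribe/1, publish/2]).

%% Hypothetical sketch, not erlbus code. Requires OTP 23 or later.

%% Start the default pg scope if it is not already running.
%% In a real application pg would sit under a supervisor instead.
start() ->
  case pg:start_link() of
    {ok, _Pid} -> ok;
    {error, {already_started, _Pid}} -> ok
  end.

%% Add the calling process to the process group named Topic.
subscribe(Topic) ->
  pg:join(Topic, self()).

%% Send Msg to every member of the group (local and remote).
%% Returns the number of processes the message was sent to.
publish(Topic, Msg) ->
  Members = pg:get_members(Topic),
  _ = [Pid ! Msg || Pid <- Members],
  length(Members).
```

Unlike pg2, pg has no explicit create or delete step for groups; a group exists for as long as it has members.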

erlang:get_stacktrace() stop working on v21

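The use of erlang:get_stacktrace() here:

  error:Value ->
    Reason = {Value, erlang:get_stacktrace()},
    exit(Info, MFA, Reason, Reason);
  throw:Value ->
    Reason = {{nocatch, Value}, erlang:get_stacktrace()},
    exit(Info, MFA, Reason, Reason);
  exit:Value ->
    exit(Info, MFA, {Value, erlang:get_stacktrace()}, Value)

is failing. It should be modified to bind the stacktrace in the catch pattern (the Class:Reason:Stacktrace syntax available since OTP 21):

try
  apply(Module, Fun, Args)
catch
  %% exit/4 is the module's own helper, as in the original code
  error:Value:Stacktrace ->
    Reason = {Value, Stacktrace},
    exit(Info, MFA, Reason, Reason);
  throw:Value:Stacktrace ->
    Reason = {{nocatch, Value}, Stacktrace},
    exit(Info, MFA, Reason, Reason);
  exit:Value:Stacktrace ->
    exit(Info, MFA, {Value, Stacktrace}, Value)
end.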

function for deleting channel

I looked around the code base but couldn't find any function for deleting a channel. Did I miss it, or is it not there?

Fix compatibility with Erlang/OTP 18

I am using Erlang/OTP 18. When I run make from the /erlbus folder, I get this error:
...
andler.erl ebus_dist_ring_event_handler.erl ebus_global_sup.erl ebus_gproc.erl ebus_handler.erl ebus_pg2.erl ebus_sup.erl ebus_util.erl ebus_worker.erl
compile: warnings being treated as errors
src/ebus_dist.erl:139: erlang:now/0: Deprecated BIF. See the "Time and Time Correction in Erlang" chapter of the ERTS User's Guide for more information.
make[1]: *** [ebin/ebus.app] Error 1
make: *** [all] Error 2
...

However, it works fine if I build an app with erlbus as a dependency.
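
The warning comes from erlang:now/0, which has been deprecated since OTP 18. Which replacement fits depends on what line 139 of ebus_dist.erl uses the value for, which is not shown here. The sketch below lists the usual substitutes; the module name time_sketch and its functions are hypothetical.

```erlang
-module(time_sketch).
-export([wall_clock/0, elapsed_ms/1, unique_id/0]).

%% Hypothetical helpers showing common replacements for erlang:now/0.

%% Wall-clock time in the same {MegaSecs, Secs, MicroSecs} format as now/0.
wall_clock() ->
  erlang:timestamp().

%% Run Fun and return {ElapsedMilliseconds, Result}.
%% Monotonic time is the recommended clock for measuring durations.
elapsed_ms(Fun) ->
  T0 = erlang:monotonic_time(),
  Result = Fun(),
  T1 = erlang:monotonic_time(),
  %% 1000 parts per second, i.e. milliseconds
  {erlang:convert_time_unit(T1 - T0, native, 1000), Result}.

%% A strictly increasing integer, unique on this node, for code that
%% relied on now/0 never returning the same value twice.
unique_id() ->
  erlang:unique_integer([monotonic, positive]).
```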
