msgflo / msgflo

Distributed Flow-Based Programming via message queues

Home Page: https://msgflo.org

License: MIT License

JavaScript 0.38% CoffeeScript 89.86% Ruby 0.05% HTML 3.51% CSS 6.19%
mqtt fbp flowhub dataflow amqp pubsub distributed fbp-runtime iot-platform

msgflo's Introduction

MsgFlo - Flow-Based Programming with Message Queues

An implementation of Flow-Based Programming (FBP) using message queues as the communication layer between different processes. The initial message queue transports targeted are AMQP and MQTT.

MsgFlo lets you build robust polyglot FBP systems spanning multiple computers/devices. A node can be implemented in any language, to reuse existing code, libraries and developer know-how.

In FBP each component is a black-box that processes and produces data, without knowledge about where the input data comes from, or where the output data goes. This ensures that a service is easy to change, and facilitates automated testing.

MsgFlo is designed to enable partial and gradual integration into existing systems by using standard brokers/transports, not placing restrictions on message payloads, allowing the use of existing queue names, and integrating non-MsgFlo nodes seamlessly.

Status

In Production

  • Used in production at TheGrid website builder, with AMQP/RabbitMQ. 20 roles, 1'000'000 jobs/weekly+
  • Used in production in imgflo image processing server. 4 roles, 200'000 jobs/weekly+
  • Used for IoT networks at hackerspaces c-base and Bitraf, using MQTT/Mosquitto.

Client support

Tooling

  • msgflo executable implements the FBP runtime protocol.
  • Initial support for automated testing using fbp-spec
  • Experimental support for visually building networks using Flowhub
  • guv provides autoscaling of workers when using Heroku/AMQP.

Licence

MIT, see ./LICENSE

Documentation

Please refer to https://msgflo.org

Support

MsgFlo is a part of Flowhub, a platform for building robust IoT systems and web services.
We offer an Integrated Development Environment and consulting services.

Debugging

The msgflo executable, as well as the transport/participant library, use the debug NPM module. You can enable (all) logging using:

export DEBUG=msgflo*

msgflo's People

Contributors

bergie, chadrik, greenkeeper[bot], jonnor

msgflo's Issues

FBP participant declarations for foreign nodes

In order to facilitate use of foreign MQTT nodes (and similar systems) which don't have native support for the msgflo FBP discovery messages, the Coordinator should support providing the FBP participant declarations on behalf of nodes.

The definitions would basically be the same as for msgflo.Participant, but would contain a matching rule containing a queue/topic. It should be possible to write these as YAML/JSON, and to specify a group of them at once.

The Coordinator (using a helper?) will then listen for data on topics, check against registered rules, and emit the corresponding FBP participant declaration on matches. Since most MQTT nodes send updates periodically, this should work pretty well.
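
The matching step described above could look roughly like the following. This is only a sketch: the rule shape (`topic` plus `declaration`), the example topic name, and the helper names `declarationForTopic` and `makeAnnouncer` are all assumptions made for illustration, and matching is done by exact topic comparison rather than MQTT wildcards.

```javascript
// Hypothetical rule format: each rule pairs a topic with the participant
// declaration that should be emitted on behalf of the foreign node.
const rules = [
  {
    topic: 'bitraf/temperature/1', // illustrative topic name
    declaration: {
      id: 'temp-sensor-1',
      role: 'temperature',
      component: 'TemperatureSensor',
      inports: [],
      outports: [{ id: 'out', type: 'number', queue: 'bitraf/temperature/1' }],
    },
  },
];

// Return the declaration of the first rule whose topic matches, or null.
function declarationForTopic(rules, topic) {
  const rule = rules.find((r) => r.topic === topic);
  return rule ? rule.declaration : null;
}

// Remember which foreign participants were already announced, so periodic
// updates from the same node do not produce duplicate declarations.
function makeAnnouncer(rules, emit) {
  const announced = new Set();
  return function onMessage(topic) {
    const decl = declarationForTopic(rules, topic);
    if (decl && !announced.has(decl.id)) {
      announced.add(decl.id);
      emit(decl);
    }
  };
}
```

In a real Coordinator, `emit` would publish the declaration on the discovery queue, and `onMessage` would be called from the MQTT subscription callback.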

Coordinator: Crashes when --graph contains participants without components

The graph loading code expects that it must, and can, set up all the participants based on components available in the Library. However for participants running on other devices (but which there is no foreign participant registration for) this is not the case.
There is no component available, and the coordinator does not influence whether it runs or not...

       throw new Error("No component " + component + " defined for role " + role);
        ^

Error: No component Prusa I3 3D printer defined for role 3D-printer
  at Library.componentCommand (/home/msgflo/bitraf-iot/node_modules/msgflo/src/library.coffee:169:15)
  at participantCommands (/home/msgflo/bitraf-iot/node_modules/msgflo/src/setup.coffee:74:19)
  at /home/msgflo/bitraf-iot/node_modules/msgflo/src/setup.coffee:286:18
  at /home/msgflo/bitraf-iot/node_modules/msgflo/src/library.coffee:124:14
  at /home/msgflo/bitraf-iot/node_modules/msgflo/src/library.coffee:79:14
  at FSReqWrap.oncomplete (fs.js:82:15)

Bridges to other protocols

Should be documented somewhere proper, but here is good for now. Feel free to add comments!

Existing for MQTT transport

  • mqttwarn has tons of protocol bridges, including many mobile and desktop notification systems, as well as data stores, from InfluxDB, Postgres,... to Google Docs. Also HTTP, AMQP etc
  • mqtt-launcher allows. Similar to #50
  • DBus. dbus-mqtt-bridge
  • Philips Hue 1 2 3
  • OSC (Open Sound Control).
    osc2mqtt. Especially nice due to mobile apps like TouchOSC, which provides a range of nice generic touch-interfaces.
  • SmartThings
  • mqtt-bridge. Several databases, including AWS Kinesis, Graphite
  • HTTP GET/POST mqtt messages, mqtt-http-bridge
  • serialport. mqtt-serial

Existing for AMQP

  • ?

User interface tools

  • MQTT Widget, Android app providing a set of generic widgets which send on MQTT. Proprietary.

Wanted

  • 433 MHz, for clocks/weatherstations/powercontrols
  • MPD (Music Player Daemon), for controlling music
  • HTTP Webhook -> MQTT/AMQP message
  • HID (Human Interface Device). For mouse/keyboard/joysticks etc. Supported on USB/BT/nrf24
  • Bluetooth Media Controls
  • MIDI (over USB or serial). Both directions: using keyboards/sequencers as input devices, and using synthesizers/samplers as sound generators.
  • ? add yours

Other frameworks

Allow introspecting number of messages in queue

Useful to detect stalls, rough estimate of system load and bottlenecks.

Requires new APIs in Transport. Acceptable that they are polling based?

Would be provided as edge metadata in Flowhub, so it can be visualized there?

register-foreign: Allow to dynamically replace #ROLE

For NoFlo and other MsgFlo participants, we have one entry per 'component'. Each of them can then be instantiated multiple times (different 'roles', or node/process in .FBP), and we substitute #ROLE in the command that is actually used to start the participant.

To enable the same for foreign participants, we need to be able to replace #ROLE in the declared queue. msgflo-register-foreign should take a --name foo or --role foo and do this replacement in the config file before sending the MsgFlo definitions.
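
A minimal sketch of that substitution, assuming a definition object with `inports`/`outports` arrays whose entries carry a `queue` string. The function name `substituteRole` and the queue names in the usage note are hypothetical.

```javascript
// Replace every '#ROLE' placeholder in the queue names of a foreign
// participant definition with the given role name. The input definition
// is left unmodified; a new object is returned.
function substituteRole(definition, role) {
  const replace = (port) => ({
    ...port,
    queue: port.queue.split('#ROLE').join(role),
  });
  return {
    ...definition,
    role,
    inports: (definition.inports || []).map(replace),
    outports: (definition.outports || []).map(replace),
  };
}
```

For example, a definition with an inport queue of `/home/#ROLE/set` and a role of `lamp` would come out with the queue `/home/lamp/set`.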

Supporting different delivery modes for connections

Right now messages will be correctly sent between Participants when they have queues/topics which are matching. However, the msgflo Coordinator currently does not consider this to be a "connected" edge...

On AMQP one can create a fanout exchange for every outport, and then set up bindings from these to the inports the outport is connected to.
Such an exchange will likely be needed for the Coordinator to be able to observe data going through the network, for debugging in Flowhub (using FBP runtime network:data messages). This is because with a direct exchange, like the default, messages are only delivered to one consumer (in round-robin fashion).

The problem is that MQTT brokers do not support "binding" topics to each other, so something else needs to perform that operation. The msgflo Coordinator could perform this, but this makes it required for normal functioning of the system (and not just debug/reprogramming).
In practical scenarios this might be a non-issue though, as the Coordinator can probably run on the same machine as the Broker.

In general there are two ways to change the connections between participants,
complicating the architecture a bit:

    1. ask the Coordinator to re-bind the output->input bindings
    2. request the Participant to change its queues/topic. (not implemented, messages specced)

From the perspective of the running system, these should be equivalent.

One could say that for participants to know where they are sending messages is somewhat breaking the "loose binding" concept, but for MQTT that is the only possibility which does not require more than clients+broker at runtime.

msgflo-procfile: Fail if there is no web role

web is special in Heroku, it is the HTTP frontend which a service must have.
The procfile generation should fail if this is not present, as such a service will be broken (not run). This would allow detecting it at CI/integration time instead of after being deployed...
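
A sketch of such a check, assuming the generator works from a plain map of role name to start command. The function name `generateProcfile` and the error message are illustrative and not taken from msgflo-procfile.

```javascript
// Generate Procfile text from a map of role -> command, refusing to
// produce output when the mandatory 'web' role is missing.
function generateProcfile(commands) {
  if (!Object.prototype.hasOwnProperty.call(commands, 'web')) {
    throw new Error("No 'web' role found; Heroku requires one");
  }
  const lines = Object.keys(commands).map((role) => `${role}: ${commands[role]}`);
  return lines.join('\n') + '\n';
}
```

A command-line wrapper would catch the error, print it, and exit with a non-zero code so that CI fails.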

Support IIPs

Since IIPs are a network/graph level construct the individual participants should not need to know about it. This raises some challenges. Participants can come and go (crashing, intended stop), and each new instance must receive the IIP(s).
Meaning, it is not enough to send it to a queue shared by all participants in a role...
The FBP graph format also has an order of IIPs. This becomes very hard to preserve when lifetimes of participants vary this much. Probably acceptable to drop this requirement though.

There is some code in Coordinator with some IIP handling, but it should probably move into setup somehow because one might need it during normal execution of a network.
Probably needs to continuously listen for FBP participant changes, and when a new participant is discovered it must (re)-send the IIP.
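
The (re)-send behaviour could be sketched as below. The IIP shape (`role`, `port`, `data`), the participant shape, and the `send(queue, data)` callback are assumptions for illustration; ordering between IIPs is not preserved beyond their order in the list.

```javascript
// graphIips: list of { role, port, data } entries taken from the graph.
// send(queue, data): whatever the transport uses to publish a message.
function makeIipSender(graphIips, send) {
  // Call the returned function for every discovered participant,
  // including new instances that appear after a crash or restart.
  return function onParticipant(participant) {
    for (const iip of graphIips) {
      if (iip.role !== participant.role) continue;
      const port = participant.inports.find((p) => p.id === iip.port);
      if (port) send(port.queue, iip.data);
    }
  };
}
```

Note that this sends to the queue declared by the participant. If that queue is shared by all instances in a role, a different per-instance delivery mechanism would still be needed, as discussed above.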

Support binding deadletter queue in FBP graph

When deadlettering is enabled, synthesize an outport name 'dead_$inport' with the deadletter queue set.
Will require a little bit of work in msgflo-nodejs and noflo-runtime-msgflo.
Then it can be connected using the normal syntax DEAD_IN -> IN recover(BeefyWorker).

systemd .service unit file generator

We should be able to generate .service unit files for systemd, that can start MsgFlo participant(s).

Similar to how we can generate a Procfile using msgflo-procfile for starting participants on Heroku.

Useful when hosting participants directly on a Linux machine, be it an embedded device or a VM.
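
As a rough sketch of what such a generator might emit, the function below builds the text of one unit file per participant. The function name, the `options.broker` parameter, and the choice of directives are assumptions; passing the broker through a `MSGFLO_BROKER` environment variable is also an assumption about how the participant is configured.

```javascript
// Build the contents of a systemd .service unit for one participant.
// `command` should use an absolute path, as systemd requires for ExecStart.
function generateUnit(role, command, options = {}) {
  const lines = [
    '[Unit]',
    `Description=MsgFlo participant ${role}`,
    'After=network.target',
    '',
    '[Service]',
    `ExecStart=${command}`,
    'Restart=always',
  ];
  if (options.broker) {
    // Assumed convention: the participant reads the broker URL from here.
    lines.push(`Environment=MSGFLO_BROKER=${options.broker}`);
  }
  lines.push('', '[Install]', 'WantedBy=multi-user.target', '');
  return lines.join('\n');
}
```

The result would be written to a file such as `msgflo-<role>.service`, a naming scheme that is again only a suggestion.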

Docker compose file generator

Similarly to the msgflo-procfile tool we have for generating Heroku Procfiles from a graph, we should have a tool to generate/update a docker-compose.yml file.
Format reference https://docs.docker.com/compose/compose-file/#/command

Requirements

Since the file is YAML and with a well-defined format, we should modify the file, instead of generating it. That way people can use all the fancy features without this tool having to care. If the file does not exist, we can create the basic one.
The file to use should be specifiable, for instance with --file mycompose.yml.

Initially we can assume that each participant in the graph uses the same base Docker image. --image.

For each of the participants defined in the graph we should ensure there exists a service in the compose file, with the name matching the participant role, the command set to the command obtained from the MsgFlo library, and the image set to the specified base image.

There should be an --ignore option for specifying roles for which no service should be generated. Non-participant nodes, like msgflo/PubSub, should always be ignored.

There should be a --keep option for telling the tool about existing services which should be kept as-is. Other services which are neither in keep nor defined in FBP graph should be removed (they are probably leftover from an older graph definition).
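
The requirements above can be sketched as a pure function over the `services` section of an already-parsed compose file. YAML parsing and writing are left out, and the function name and the participant shape (`role`, `component`, `command`) are assumptions.

```javascript
// existing: the current 'services' mapping from the compose file (or {}).
// participants: list of { role, component, command } derived from the graph.
// options: { image, ignore, keep } mirroring the proposed command-line flags.
function updateServices(existing, participants, options) {
  const { image, ignore = [], keep = [] } = options;
  const services = {};
  // Services listed in --keep survive untouched.
  for (const name of keep) {
    if (existing[name]) services[name] = existing[name];
  }
  for (const { role, component, command } of participants) {
    if (component === 'msgflo/PubSub') continue; // non-participant node
    if (ignore.includes(role) || keep.includes(role)) continue;
    // Preserve extra settings the user added to an existing service.
    services[role] = { ...(existing[role] || {}), image, command };
  }
  // Anything else in `existing` is dropped as a leftover from an older graph.
  return services;
}
```

Because existing entries are merged rather than replaced, settings such as `environment` or `volumes` that the user added by hand stay in place.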

Upstart config file generator

Upstart is still in use on, for instance, RHELv5 and Ubuntu LTS systems, as well as the official Debian used on Raspberry Pi and Beaglebone.

Related: systemd generator #20

msgflo-setup: Respect queue mappings in FBP participant discovery messages

Currently we assume the msgflo.Participant convention of role.PORT which is not generally true, especially with dummy participants, or where one cannot change the queue names for compatibility reasons, or not having access to the existing system.

To make this work msgflo-setup needs to start up, read the graph, check existing and/or wait for participant discovery messages, look up the mapping between node/port names and queue names, and only then do the actual queue bindings.

We have some code in the Coordinator that does this kind of setup. This code should in general move down into 'setup'. To be able to be used from Flowhub, the coordinator should reuse the same code.

Make NoFlo graphs/components show up as components automatically

Using the handlers concept, the files under participants/ (with supported extensions) show up automatically as MsgFlo components, which can then be instantiated as participants.

However NoFlo components and graphs are by convention stored in components/*.{js,coffee} and graphs/*.{fbp,json} respectively, so they are not picked up.
Perhaps we should extend the handler concept to allow rules to match not just on extension, but path with wildcards?

"components/*.js": "noflo-runtime-msgflo ...."
"components/*.coffee": "noflo-runtime-msgflo ...."
"graphs/*.fbp": "noflo-runtime-msgflo ...."
"graphs/*.json": "noflo-runtime-msgflo ...."

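Matching handler rules on paths with wildcards could be sketched as follows. The helper names are hypothetical, `*` is assumed to match within a single path segment only, and the handler commands are kept as elided in the proposal.

```javascript
// Convert a simple wildcard pattern into an anchored regular expression.
// '*' matches any run of characters except '/'.
function patternToRegExp(pattern) {
  const escaped = pattern.replace(/[.+^${}()|[\]\\?]/g, '\\$&');
  return new RegExp('^' + escaped.replace(/\*/g, '[^/]*') + '$');
}

// Return the command of the first handler whose pattern matches the path.
function handlerFor(handlers, filePath) {
  for (const [pattern, command] of Object.entries(handlers)) {
    if (patternToRegExp(pattern).test(filePath)) return command;
  }
  return null;
}

const handlers = {
  'components/*.js': 'noflo-runtime-msgflo ....',
  'graphs/*.fbp': 'noflo-runtime-msgflo ....',
};
```

With these handlers, `components/Foo.js` and `graphs/main.fbp` would be picked up, while `components/sub/Foo.js` and `README.md` would not.
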
Shell script support

For quick & dirty work, for instance when prototyping, it can be very useful to have a MsgFlo participant based on shell scripts, and mostly calling existing programs.

This can be done fairly simply in Python or Node.js, but one always needs to write some custom code, and this "host" wrapper code obscures the real meat; the commandline/script that is being run.
One could imagine a generic msgflo-script which would take the following on the command line:

  1. A shell script snippet
  2. A string description of the component
  3. Standard configuration like role/broker

It would then always expose an in port, and have outports out and error. The input message payload is provided as stdin. If the script (all commands in it) passes, then anything produced on stdout is sent on out. Otherwise, stderr is sent on error.

If the input is JSON, the script can for instance use jq (https://stedolan.github.io/jq/) to query it.
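
The core of such a wrapper could be sketched with Node's `child_process` module. The function name `runScript` and the returned `{ port, data }` shape are assumptions; connecting it to a broker is left out.

```javascript
const { spawnSync } = require('child_process');

// Run a shell snippet with the message payload on stdin.
// Returns which port to send on, and the data to send.
function runScript(script, payload) {
  // 'set -e' stops the snippet at the first failing command
  // (for pipelines, only the last command's status counts).
  const result = spawnSync('sh', ['-c', `set -e\n${script}`], {
    input: payload,
    encoding: 'utf8',
  });
  if (result.error || result.status !== 0) {
    const reason = result.error ? String(result.error) : result.stderr;
    return { port: 'error', data: reason };
  }
  return { port: 'out', data: result.stdout };
}
```

For instance, `runScript('tr a-z A-Z', 'hello')` would send the upper-cased payload on `out`, while a snippet that exits non-zero would have its stderr sent on `error`.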

Setup needs to detect already-bound connections

There are situations where MsgFlo participants are already bound to each other. The setup should not touch those, but know that the connection exists.

Example:

Foreign MQTT participants that are directly connected to each other, as is the case with many things at c-base. We want to show these connections in the Flowhub graph, but not make any changes to how the participants communicate.

Send number of participants as FBP message

Should be done by monitoring participants on the msgflo discovery queue. Requires participants to send discovery messages periodically (#30). Coordinator should then send some FBP protocol messages out.

Maybe keep this as graph node metadata?

Support access control on FBP protocol

Right now anyone who can find the address/port of the service can connect. Which is obviously unsafe, especially once supporting adding components, as these may execute arbitrary code.

When access control is enabled, we must check and enforce the secret which is passed on each message of the protocol.

Should be able to set a default access level (when there is no secret provided).
For instance all access, no access, read-only access.
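
One way the check could look, as a sketch only. The config shape, the three level names, and the assumption that the secret travels in `message.payload.secret` are illustrative. Here an unknown secret is treated as no access, which is a design choice not stated above.

```javascript
const LEVELS = { none: 0, read: 1, all: 2 };

// Resolve the access level for a message's secret.
// No secret: fall back to the configured default level.
// Unknown secret: no access at all.
function accessLevel(config, secret) {
  if (secret === undefined) return config.defaultLevel;
  const known = typeof secret === 'string'
    && Object.prototype.hasOwnProperty.call(config.secrets, secret);
  return known ? config.secrets[secret] : 'none';
}

// `required` is the level the caller considers necessary for this message.
function isAllowed(config, message, required) {
  const secret = message.payload ? message.payload.secret : undefined;
  return LEVELS[accessLevel(config, secret)] >= LEVELS[required];
}
```

Which protocol messages count as read-only and which need full access would have to be decided per message type.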

Ability to start/stop participant remotely?

Would be needed for live-coding, adding/removing nodes in Flowhub.

Would need to handle both case of spawning something locally, and on a remote service like Heroku, which are quite different...
For local case: Could be a shell command to run?
For remote case: Could be an HTTP endpoint to call?

Authentication also a major issue...

Do we need to template in some parameters, like the role name, or broker URL?

Java support

Would be nice to have good support for Java, and modern languages that build on top of Java/JVM like Scala, Kotlin and Clojure.

This is a tracking issue for interest & efforts in that area.

Allow coordinator to set up participants

In order to get all the available participant types and their FBP data up, so that the component information can be conveyed to Flowhub over the FBP runtime protocol.

Support "reason" when dead-lettering

When messages are nacked in order to dead-letter them, it would be useful to provide a "reason" together with the rejection (for example, message of Error objects sent to NoFlo error ports).

From RabbitMQ DLX docs:

The dead-lettering process adds an array to the header of each dead-lettered message named x-death. This array contains an entry for each dead lettering event, identified by a pair of {queue, reason}.

AMQP: FBP discovery messages should be sent on fanout exchange

That way the messages will not crop up in an unused queue when there is no coordinator,
and there can be multiple consumers of the messages instead of just one.

Requires changes in all the participant libraries, the coordinator and to the documentation.

  • msgflo-nodejs
  • msgflo-python
  • noflo-runtime-msgflo
  • msgflo-cpp
  • msgflo-rust

amqp: Don't automatically create queues for outports

The problem is that no one reads from them, and then messages accumulate indefinitely.

If we can run .bindQueue() without the target queues existing then it should be just a matter of removing the queue creation in amqp.Client::createQueue. This is however not clear from the API docs, http://www.squaremobius.net/amqp.node/doc/channel_api.html
Might need to get creative in case that does not work.

Alternatively we drop the feature that in/outports with queues of matching names are automatically bound to each other (would then have to use msgflo.setup)

Support for fbp-spec

https://github.com/flowbased/fbp-spec is a testing tool for FBP-runtimes. It would be great if MsgFlo supported it, so one can test multi-participant systems same way we now test individual FBP participants.

Should primarily require some more work on the FBP runtime protocol implementation in the 'coordinator'.

Coordinator: Component information not updated when changed

To reproduce:

  • Start a participant with some ports defined
  • Check which ports exist in component on FBP protocol
  • Stop the participant
  • Start participant anew with other ports defined

Expected: Component has the new port information

Actual: Component has the original port information

Coordinator: Cache discovery information

Should persist the information to disk, so that it can be recovered. This is especially useful as a stop-gap until we enforce periodic heartbeats (#30). But also afterwards, as the heartbeats may come very seldom for some devices.

Persistence should probably be handled by Library?

AMQP bindings debugging tool

It would be useful to have a tool that shows bindings between queues, including possibly unconnected ones, and ones no longer connected in the graph.

Proxy FBP runtime messages from participants

noflo-runtime-msgflo now sends/receives some FBP protocol messages (trace:start and trace:dump). But hopefully soon it will provide the whole protocol (noflo/noflo-runtime-msgflo#30).

If these were relayed via the MsgFlo coordinator and over WebSocket, then we should be able to go inside the MsgFlo participant and see/manipulate the internal NoFlo runtime from Flowhub - as if we were connected directly.

We probably need to add some runtime identification (probably UUID?) for each packet to the protocol, to be able to multiplex multiple runtimes across the same WebSocket connection.

Monitoring tool for queue traffic

If any of the participants in a MsgFlo graph deadlocks, or for other reasons* stops processing/sending messages, the system will generally fail to perform its intended function.
By monitoring the queues that represent the output of the system, we could detect this.

msgflo-monitor --queue sensor.NEWVALUE --expect-periodic 1s --fail-missed 3
The process should probably exit (with a non-zero code) on failure. Then one can, for instance, use OnFailure=myservice-restart.service in systemd to try to recover.
Alternatively it may be useful if it writes a single line on stdout, or can execute some command.

For systems that don't produce data periodically by nature, it is recommended that there is a process generating test messages periodically.

To have a chain of trust, the msgflo-monitor program should ideally also be monitored. For instance by using the watchdog feature of systemd.

* a crashing process, and restarting it, is usually well handled by the service management already, be it Heroku or systemd (#20).
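
The detection logic behind `--expect-periodic` and `--fail-missed` could be sketched as below. The class name and its methods are hypothetical, and timestamps are passed in explicitly (in milliseconds) so the logic can be exercised without a broker or real timers.

```javascript
class PeriodicMonitor {
  // periodMs: expected interval between messages (--expect-periodic).
  // failMissed: number of missed periods that counts as failure (--fail-missed).
  // startTime: when monitoring began, used until the first message arrives.
  constructor(periodMs, failMissed, startTime) {
    this.periodMs = periodMs;
    this.failMissed = failMissed;
    this.lastSeen = startTime;
  }

  // Record that a message arrived on the monitored queue at time `now`.
  message(now) {
    this.lastSeen = now;
  }

  // Number of whole periods that have elapsed without a message.
  missed(now) {
    return Math.floor((now - this.lastSeen) / this.periodMs);
  }

  failed(now) {
    return this.missed(now) >= this.failMissed;
  }
}
```

A command-line tool would call `message(Date.now())` from the queue subscription, check `failed(Date.now())` on a timer, and exit with a non-zero code when it returns true.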

Allow to get and visualize queue length/data

Should allow replacing usage of the RabbitMQ dashboard for monitoring (which is tedious for a full-scale system because it just shows queues/exchanges, without any relation to the FBP graph/program).
In addition to visualization of the current number of messages in a queue, some indicators for 'abnormal' rates (especially too high) are desirable.

FBP runtime protocol requirements

One alternative is to split the current messages for connection data (network:data) into two messages, one for data going into the connection/queue and one for data going out of the queue. The data in the queue is then the difference between these. This models the queue length/capacity directly in the core model. Right now, FBP runtimes with queue capacities != 1 have to decide whether to send network:data on enqueuing or dequeuing/delivering, and an FBP client (like Flowhub) has no way of knowing which one of these conventions is used...
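
On the client side, the first alternative amounts to keeping two counters per connection. A minimal sketch, with hypothetical class and method names:

```javascript
// Tracks one connection (edge). The current queue length is the number of
// messages that went in minus the number that came out.
class ConnectionCounter {
  constructor() {
    this.enqueued = 0;
    this.dequeued = 0;
  }

  onEnqueue() {
    this.enqueued += 1;
  }

  onDequeue() {
    this.dequeued += 1;
  }

  get length() {
    return this.enqueued - this.dequeued;
  }
}
```

A client that connects after the network has started would not see earlier messages, so it would need an initial queue length from the runtime to make the difference meaningful.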

Alternative is to have an extra, more metadata-like, message network:queuelength

Implementation

In msgflo-nodejs we should probably add a method to the transport.Broker interface for getting this kind of information. On AMQP/RabbitMQ this can be done using the RabbitMQ HTTP admin API. Example code in guv.
msgflo.Coordinator would read this data periodically and send FBP runtime protocol messages.

Detect bound queues in MQTT

With MQTT the setup/coordinator process is the one that binds queues/topics together, as the broker cannot provide this functionality. To show what is connected correctly in live mode, we need to detect this.

There are two main approaches here:

  1. make Coordinator do all the setup, such that the same mqtt.Broker instance is used. Then we can just call .listBindings and make it.
  2. make mqtt.Broker (or rather routing.BinderMixin) send all its routing changes to a dedicated topic like /fbp/routing. It would also need to listen to changes to these, and 'listBindings' should return these. Would be useful also to introduce a bindings-changed event.

Coordinator/Runtime then need to subscribe to these changes and notify UI over the FBP runtime protocol.

Quotation issue with #IIPS when not using shell

If not using a shell, we get one quote too many around the JSON data so it becomes '{}' instead of {}.
In general, the non-shell codepath in setup.coffee is a bit crack, as it parses arguments based on (space) alone - which is bound to fuck things up if there are embedded spaces in JSON data, or component/role names etc.
Should use something like the parser from https://github.com/substack/node-shell-quote
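
To illustrate the kind of parsing that is needed, here is a deliberately small tokenizer that honours single and double quotes. It is a sketch and not a replacement for a real shell-quoting parser: backslash escapes, variable expansion and operators are not handled, and the function name is hypothetical.

```javascript
// Split a command string into arguments, treating quoted sections as part
// of one argument and dropping the quote characters themselves.
function splitArgs(command) {
  const args = [];
  let current = '';
  let quote = null; // the active quote character, if any
  let started = false; // whether the current argument has begun
  for (const ch of command) {
    if (quote) {
      if (ch === quote) quote = null;
      else current += ch;
    } else if (ch === '"' || ch === "'") {
      quote = ch;
      started = true;
    } else if (ch === ' ') {
      if (started) {
        args.push(current);
        current = '';
        started = false;
      }
    } else {
      current += ch;
      started = true;
    }
  }
  if (quote) throw new Error('Unterminated quote');
  if (started) args.push(current);
  return args;
}
```

With this, a quoted `'{}'` argument arrives as `{}`, and JSON containing spaces stays in one argument instead of being split.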

Support schemas on participant input/output data

The primary case is JSON schemas. The schema to use should be declared on the ports (ex: type: 'object', datatype: 'http://schemas.myservice/foo.json') and then the Participant should enforce this schema.

The information should also bubble up to Flowhub via the MsgFlo discovery protocol, and then the FBP runtime protocol.
