Conduit

A messaging library designed to:

  • enable the creation of worker components that are isolated from the underlying messaging library
  • enable reliable message publishing

Requirements

  • Java 1.8

pre-commit

  • Install: https://pre-commit.com/
  • Run locally: the hooks run automatically before each commit. To run them manually, use pre-commit run --all-files

Usage

project.clj

[com.farmlogs.conduit "0.1.0"]

Reliable Publishing

Create a reliable channel:

(require '[com.farmlogs.conduit.connection :as conn])
(require '[com.stuartsierra.component :as component])

(def system
  (component/start-system
   (component/system-map
    :rmq (conn/connection "amqp://guest:guest@localhost"))))
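;; ->reliable-chan must be referred from the conduit namespace that defines it
;; (that require is not shown in this README).
(def reliable-chan (-> system :rmq :conn (->reliable-chan 1000)))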

Publish using the channel:

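(require '[clojure.core.async :as a])
;; p is an alias for the conduit namespace that provides publish!
;; (that require is not shown in this README).

(a/<!! (p/publish! reliable-chan "hi!" {:exchange ""
                                        :routing-key "test"}))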

Close the channel with .close:

(.close reliable-chan)

Reliable Consumers

Production

(require '[com.farmlogs.conduit.connection :as conn])
(require '[com.farmlogs.conduit.subscription :refer [subscription]])
(require '[com.stuartsierra.component :as component])
(require '[clojure.core.async :as a])

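(defrecord LoggingWorker [subscription]
  component/Lifecycle
  (start [this]
    (let [ctrl-chan (a/chan)]
      (assoc this
             :ctrl-chan ctrl-chan
             :process
             (a/go
               (loop []
                 ;; Each subscription event is a [result-chan msg] pair.
                 ;; event is nil once ctrl-chan is closed, which ends the loop.
                 (let [[[result-chan msg :as event]] (a/alts! [ctrl-chan subscription])]
                   (when-not (nil? event)
                     (println "msg:" msg)
                     ;; Report the outcome for this message on its result channel.
                     (a/put! result-chan :ack)
                     (recur))))))))
  (stop [{:keys [ctrl-chan process] :as this}]
    ;; Closing ctrl-chan tells the loop to exit; then wait for it to finish.
    (a/close! ctrl-chan)
    (a/<!! process)
    (dissoc this :ctrl-chan :process)))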

(def system
    (-> (component/system-map
         :rmq (conn/connection "amqp://guest:guest@localhost")
         :subscription (component/using (subscription {:exchange-name "foo"
                                                       :queue-name "foo"
                                                       :exchange-type "topic"
                                                       :routing-key "*"}
                                                      1024)
                                        {:rmq-connection :rmq})
         :worker (component/using (->LoggingWorker nil)
                                  [:subscription]))
        (component/start-system)))

Testing Your Workers Without RMQ

(extend-protocol component/Lifecycle
  clojure.core.async.impl.channels.ManyToManyChannel
  (start [this] this)
  (stop [this] this))

(def system
  (-> (component/system-map
        :subscription (a/chan)
        :worker (component/using (->LoggingWorker nil)
                                 [:subscription]))
      (component/start-system)))

(let [result-chan (a/chan 1)]
  (a/put! (:subscription system) [result-chan "Heya!"])
  (= (a/<!! result-chan) :ack))

(component/stop-system system)
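
The check above blocks indefinitely if the worker never replies. A minimal sketch of a timeout-guarded variant follows, using only clojure.core.async. The names deliver-and-await and ::timeout are hypothetical helpers for illustration, not part of conduit, and the stand-in worker simply acks every message in the same way LoggingWorker does.

```clojure
(require '[clojure.core.async :as a])

(defn deliver-and-await
  "Puts a [result-chan msg] event on `subscription`, then blocks until the
  worker replies or `timeout-ms` elapses. Returns the reply, or ::timeout.
  Hypothetical test helper, not part of conduit."
  [subscription msg timeout-ms]
  (let [result-chan (a/chan 1)
        deadline    (a/timeout timeout-ms)]
    (a/put! subscription [result-chan msg])
    (let [[reply port] (a/alts!! [result-chan deadline])]
      (if (= port result-chan)
        reply
        ::timeout))))

;; Stand-in worker so the sketch runs on its own: it acks every event it
;; receives and stops when the subscription channel is closed.
(def subscription (a/chan))

(a/go-loop []
  (when-let [[result-chan _msg] (a/<! subscription)]
    (a/>! result-chan :ack)
    (recur)))

(deliver-and-await subscription "Heya!" 1000) ; => :ack
```

In a real test, pass (:subscription system) from the test system above instead of the stand-in channel, so that a stuck worker fails the test with ::timeout rather than hanging it.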

License

Copyright © 2015 AgriSight, Inc

Contributors

briprowe, fraburnham, derick-hess

conduit's Issues

Connection recovery

java.net.SocketException: Connection reset
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113) ~[na:1.8.0_66-internal]
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153) ~[na:1.8.0_66-internal]
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) ~[na:1.8.0_66-internal]
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) ~[na:1.8.0_66-internal]
    at java.io.DataOutputStream.flush(DataOutputStream.java:123) ~[na:1.8.0_66-internal]
    at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:150) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:518) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:125) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:334) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:310) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:303) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicReject(ChannelN.java:1057) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN.basicReject(RecoveryAwareChannelN.java:72) ~[uberjar-standalone.jar:na]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicReject(AutorecoveringChannel.java:362) ~[uberjar-standalone.jar:na]
    at langohr.basic$reject.invokePrim(basic.clj:214) ~[uberjar-standalone.jar:na]
    at com.farmlogs.conduit.subscription.ack_process$fn__17270.invoke(ack_process.clj:21) ~[uberjar-standalone.jar:na]
    at com.farmlogs.conduit.protocols$fn__7948$G__7943__7957.invoke(protocols.clj:3) ~[uberjar-standalone.jar:na]
    at com.farmlogs.conduit.subscription.ack_process$__GT_responder$fn__17274$fn__17279.invoke(ack_process.clj:32) ~[uberjar-standalone.jar:na]
    at com.farmlogs.conduit.subscription.ack_process$__GT_responder$fn__17274.invoke(ack_process.clj:32) [uberjar-standalone.jar:na]
    at clojure.core.async$thread_call$fn__5602.invoke(async.clj:434) [uberjar-standalone.jar:na]
    at clojure.lang.AFn.run(AFn.java:22) [uberjar-standalone.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66-internal]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66-internal]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66-internal]

That is the first exception observed. After this point the queue is deleted from RMQ (auto-delete is true), work is lost, and restarting the worker appears to be the only way to recover. The expectation is that the RMQ connection will recover without needing to restart the worker system.
