squeedo's Introduction

Squeedo

Squeedo: The sexiest message consumer ever (™)

This library allows you to quickly spin up a message consumer and worker pool using core.async. The message is returned in its raw format, so it's up to the caller to determine the proper reader for their purposes.

Latest version

Clojars Project

Changes

Version 1.0.0 features breaking changes to queue connection and configuration. See CHANGELOG for release notes.

Inspiration

Squeedo's inspiration came from our continual need to quickly process lots of messages from SQS. We found that the code to support these processes was quite similar in that it often involved a lot of plumbing of listening to SQS, pulling messages, processing them with some kind of threadpool and then acking them back. The goal was to make this easier by somehow simply passing a compute function that would handle all of the plumbing and allow us to focus on the compute logic of what to do with the message.

After several iterations of basing this plumbing code on threadpools, we quickly found that we couldn't get the kind of throughput we wanted simply by tuning only the number of threads. We needed something more dynamic, that would adapt better to the number of cores it ran on and would squeeze every last bit of CPU from our EC2 instances. After reading this blog post http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io we were inspired to use core.async and Squeedo was born.

Why use Squeedo?

Where Squeedo shines is in its ability to push CPU utilization to the max without managing a threadpool. It is especially good when combined with a non-blocking I/O web client library like http-kit, as mentioned in the blog post above.

Simple quick start

In its simplest form, Squeedo is composed of only two parts: a compute function and a consumer.

(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[clojure.core.async :refer [put!]])

;;the compute function that takes a message and a channel to ack or nack on when done with the message
(defn compute
  [message done-channel]
  (println message)
  ;; avoid blocking I/O calls here; use a non-blocking client such as http-kit for those calls
  (put! done-channel message))

(def consumer (start-consumer "my-sqs-queue" compute))

;;when done listening
(stop-consumer consumer)

The compute function must post to the done-channel even for exceptions. This will ack/nack each message. Squeedo listens to the done-channel to know when work is complete so it can pass your compute function another message to process.

To nack a message (i.e., update the visibility timeout for the message in SQS so that it will be reprocessed later, up to the configured MaxReceives for your queue), put the message back on the done-channel with a :nack key. You can specify a value of true to use the default visibility timeout of 0 (retry this message as soon as possible), or you can give an integer value in seconds.

;; retry as soon as possible
(put! done-channel (assoc message :nack true))
;; retry in about 10 seconds
(put! done-channel (assoc message :nack 10))
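
Because the compute function has to post to the done-channel on every path, including failures, one option is to wrap the real work in a try/catch that nacks when something throws. The following is a minimal sketch rather than part of Squeedo: `with-ack` and `handler` are hypothetical names, and the only things assumed are the `:nack` convention described above and `put!` from core.async.

```clojure
(require '[clojure.core.async :refer [put!]])

(defn with-ack
  "Wraps `handler` (a hypothetical one-argument function of the message)
  into a compute function that always reports back on done-channel."
  [handler]
  (fn [message done-channel]
    (try
      (handler message)
      ;; success: hand the message back so it gets acked
      (put! done-channel message)
      (catch Exception e
        (println "processing failed:" (.getMessage e))
        ;; failure: mark the message for an immediate retry
        (put! done-channel (assoc message :nack true))))))
```

A compute function built this way, for example `(with-ack println)`, could be passed to start-consumer in place of `compute` from the quick start. The handler itself should still avoid blocking calls.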

http-kit example with non-blocking IO

(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[org.httpkit.client]
         '[clojure.core.async :refer [go >!]])

(defn- eat-some-cpu
  [how-much]
  (reduce + (range 1 how-much)))

(defn- async-get
  [url message channel]
  (org.httpkit.client/get url (fn [r] (go
                                        ; do some more processing with the response
                                        (eat-some-cpu 1000000)
                                        (>! channel message)))))

(defn compute
  [message done-channel]
  ; do something expensive
  (eat-some-cpu 1000000)
  ; do this if you will have I/O
  (async-get "http://google.com" message done-channel))

(def consumer (start-consumer "my-sqs-queue" compute :num-listeners 10 :max-concurrent-work 50))

;;when done listening
;; (stop-consumer consumer)

Usage in Jetty based Ring app

Typical project.clj configuration to set up the servlet context listener hooks:

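(defproject com.awesome/microservice "0.0.1"
  ;; other project settings omitted
  :ring
    {:init initialize!
     :destroy destroy!})

(require '[com.climate.squeedo.sqs-consumer :refer [start-consumer stop-consumer]]
         '[clojure.core.async :refer [put!]])

(defonce consumer (atom {}))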

(defn- compute-fn
  [message done-channel]
  ;; do something
  (put! done-channel message))

(defn initialize!
  "Call `initialize!` once to initialize global state before serving. This fn is
   invoked on servlet initialization with zero arguments"
  []
  (swap! consumer
         merge
         (start-consumer queue-name compute-fn :max-concurrent-work 10)))

(defn destroy!
  "Destroy the Jetty context and stop the SQS consumer"
  []
  (stop-consumer @consumer)
  (reset! consumer {}))

Advanced configuration options

One of the great things about Squeedo is the advanced configuration options that can be used to tune the consumer to your workflow beyond what the very reasonable defaults do out of the box.

  • :message-channel-size - the number of messages to prefetch from SQS; default 20 * num-listeners. Prefetching messages allows us to keep the compute function continuously busy without having to wait for more to be pulled from the remote SQS queue first. Make sure to set the timeout appropriately when you create the queue.
  • :num-workers - the number of workers processing messages concurrently. This controls how many workers actually process messages at any one time; defaults to the number of CPUs - 1, or 1 if single core. Squeedo works best with 2 or more CPUs. This is not the amount of work that can be outstanding at any one time; that is controlled below with :max-concurrent-work.
  • :num-listeners - the number of listeners polling from SQS; default is (num-workers / dequeue-limit), since each listener dequeues up to dequeue-limit messages at a time. If you have a really fast process, you can actually starve the compute function of messages and thus need more listeners pulling from SQS.
  • :dequeue-limit - the number of messages to dequeue at a time; default 10.
  • :max-concurrent-work - the maximum number of total messages processed concurrently; default num-workers. This is mainly for async workflows where you can have work started and are waiting for parked IO threads to complete. This allows you to always keep the CPUs busy by having data returned by IO ready to be processed. It's really a memory game at this point: how much data you can buffer that's ready to be processed by your asynchronous http clients.
  • :client - the SQS client used to start the consumer. By default an SQS client is created internally using instance credentials, but a client can be passed in to be used. This allows you to use a client you may have already created with some specific properties (e.g. manually overridden AWS credentials).
  • :exceptional-poll-delay-ms - the length of the delay when a listener gets an exception while polling SQS; default is 10 seconds.
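
To see how the documented defaults relate to one another, the arithmetic can be sketched as a small pure function. This is an illustration, not Squeedo's own code: `default-options` is a hypothetical name, and rounding the listener count up with a minimum of one is an assumption, since the list above only gives the ratio num-workers / dequeue-limit.

```clojure
(defn default-options
  "Derives the documented defaults from a CPU count. Rounding up and the
  minimum of one listener are assumptions of this sketch."
  [cpu-count & {:keys [dequeue-limit] :or {dequeue-limit 10}}]
  (let [;; number of CPUs - 1, or 1 on a single core
        num-workers   (max 1 (dec cpu-count))
        ;; num-workers / dequeue-limit, rounded up, never below 1
        num-listeners (max 1 (long (Math/ceil (/ num-workers (double dequeue-limit)))))]
    {:num-workers          num-workers
     :dequeue-limit        dequeue-limit
     :num-listeners        num-listeners
     :message-channel-size (* 20 num-listeners)
     :max-concurrent-work  num-workers}))
```

For example, `(default-options 8)` gives seven workers, one listener and a message channel of 20 under these assumptions. Raising :num-listeners or :max-concurrent-work beyond such values is the kind of tuning the options above are meant for.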

Additional goodies

Check out the com.climate.squeedo.sqs namespace for extra goodies like configuring and connecting to queues, enqueuing and dequeuing messages, and acking and nacking.

Example of setting up a queue with custom attributes and a dead letter queue:

(require '[com.climate.squeedo.sqs :as sqs])

(defn initialize-my-queue
  []
  (sqs/configure-queue "my-queue"
                       :queue-attributes {"VisibilityTimeout" "60"}
                       :dead-letter "my-queue-failed"
                       :dead-letter-queue-attributes {"VisibilityTimeout" "120"})
  (sqs/mk-connection "my-queue"))

(def my-queue (initialize-my-queue))

Acknowledgments

License

Copyright (C) 2018 The Climate Corporation. Distributed under the Apache License, Version 2.0. You may not use this library except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

See the NOTICE file distributed with this work for additional information regarding copyright ownership. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

squeedo's People

Contributors

arcdragper, billga, danieltdt, ericturcotte, jmelching, jngbng, lainiewright, mtkp, paraseba, positron, stephencrampton, tommy

squeedo's Issues

Same Messages keep coming even after ack

Using the exact example from the README. Almost immediately after running

(put! done-channel message)

the same message shows back up, with its ApproximateReceiveCount increasing (e.g. ApproximateReceiveCount=8).

Create Queue Question

Hi.

I had a problem starting a project using this library on AWS, caused by the fact that the role on the EC2 instance didn't have the sqs:CreateQueue permission. The queues already existed and are managed with Terraform, so I don't want to give this permission out to the roles. At the moment this seems to be because Squeedo relies on the Bandalore library's create-queue, which returns true if the queue already exists (but still requires the CreateQueue permission).

What are your thoughts on this? Would you be open to a PR that checks the queue's existence first? Same for the dead-letter queue.

Cheers

Unable to execute HTTP request: peer not authenticated

We are getting the following randomly:

[async-dispatch-6] INFO  com.amazonaws.http.AmazonHttpClient - Unable to execute HTTP request: peer not authenticated
javax.net.ssl.SSLPeerUnverifiedException: peer not authenticated
        at sun.security.ssl.SSLSessionImpl.getPeerCertificates(SSLSessionImpl.java:431)
        at org.apache.http.conn.ssl.AbstractVerifier.verify(AbstractVerifier.java:128)
        at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:572)
        at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:180)
        at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:294)
        at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:641)
        at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:480)
        at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:906)
        at org.apache.http.impl.client.AbstractHttpClient.execute(AbstractHttpClient.java:805)
        at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:402)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:245)
        at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2221)
        at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1270)
        at cemerick.bandalore$delete.invoke(bandalore.clj:186)
        at cemerick.bandalore$delete.invoke(bandalore.clj:182)
        at com.climate.squeedo.sqs$ack.invoke(sqs.clj:251)
        at com.climate.squeedo.sqs_consumer$create_workers$fn__6056$state_machine__3971__auto____6057$fn__6059$inst_6044__6087$state_machine__3971__auto____6088$fn__6090.invoke(sqs_consumer.clj:66)
        at com.climate.squeedo.sqs_consumer$create_workers$fn__6056$state_machine__3971__auto____6057$fn__6059$inst_6044__6087$state_machine__3971__auto____6088.invoke(sqs_consumer.clj:58)
        at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:1011)
        at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:1015)
        at com.climate.squeedo.sqs_consumer$create_workers$fn__6056$state_machine__3971__auto____6057$fn__6059$inst_6044__6087.invoke(sqs_consumer.clj:58)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Our code:

(defn -main []
  (let [environment (keyword (env :profile))
        client (sqs/create-client (config environment :amazon :amazonAccessKey) (config environment :amazon :amazonSecretKey))
        queue (config environment :audit)]
    (print-banner environment)
    (createdb environment)
    (let [consumer (start-consumer queue handle-message! :client client :max-concurrent-work 100 :num-listeners 80 )]
      (infinite-loop #(do (Thread/sleep 10000))))))

Are we using the lib correctly or are we missing something?

Allow queueing of messages with SQS attributes

The AWS SDK has support for attributes for SQS messages. This exists in com.amazonaws.services.sqs.model.SendMessageRequest/withMessageAttributes. These would be useful for fields that might normally be in an HTTP header, such as content-type, accept-types, api keys, etc... This in turn would allow us to do things like utilize a single queue for various serialization formats.

Unfortunately, Bandalore doesn't appear to support this directly, as there is no way to inject these attributes there either, so this would either require Squeedo to implement its own message send functionality, or we would need to submit a PR to Bandalore to support this before implementing in Squeedo.

Example of how this might be used:

(sqs/enqueue my-connection my-message-body :message-attrs {:content-type "application/json"})

Google Pubsub support?

Is it possible to use Squeedo with Google Cloud's PubSub service? It seems like Squeedo could be used for any sort of queue consumer. Is there anything that Squeedo depends on that is SQS specific that would limit its use across other queue providers?

Add support for middleware when consuming messages

For each message I receive, I want to perform the following

  • Make a logging statement
  • Send a notification to monitoring tools like Newrelic
  • Set some ThreadLocal context information
  • Deserialize
  • Validate input satisfies some schema
  • Perform my actual computation
  • Validate computation results, putting certain error types back onto the queue for retry, while sending others straight to the DLQ.
  • Log how long this process took.
  • Clear my ThreadLocal context.

Clojure Ring middleware provides great hooks for reducing the boilerplate required in order to follow this kind of workflow. It would be nice to be able to provide similar middleware for SQS message handling.
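
To make the request concrete, here is a sketch of what Ring-style middleware could look like for compute functions of the shape (fn [message done-channel]). Nothing here exists in Squeedo: `wrap-logging`, `wrap-timing` and `wrap-compute` are hypothetical names, and the sketch only relies on plain function composition.

```clojure
(defn wrap-logging
  "Logs each message before passing it on."
  [compute]
  (fn [message done-channel]
    (println "received message:" message)
    (compute message done-channel)))

(defn wrap-timing
  "Reports, in milliseconds, how long the synchronous part of compute took.
  Work that finishes later on another thread is not included."
  [compute report-ms]
  (fn [message done-channel]
    (let [start (System/nanoTime)]
      (try
        (compute message done-channel)
        (finally
          (report-ms (/ (- (System/nanoTime) start) 1e6)))))))

(defn wrap-compute
  "Composes the middleware; the outermost wrapper (logging) runs first."
  [compute report-ms]
  (-> compute
      (wrap-timing report-ms)
      wrap-logging))
```

The result of `wrap-compute` is itself a compute function, so it could be handed to start-consumer unchanged. Steps such as deserialization or validation would follow the same pattern, each wrapper transforming the message or deciding what to put on the done-channel.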

Workers > Cpus Warning?

"Work is done asynchronously by workers controlled by the size of the work buffer (currently
hardcoded to number of cpus minus 1) Bad things happen if you have workers > number of cpus."

The readme is missing this comment found in the source code. Is this accurate? Can you explain what this means in a bit more detail?

sqs/enqueue of json messages causes quotes to be escaped

This is due to the fact that pr-str is being used in enqueue for some reason. So when you want to queue

{"a": 7, "b": 6}

What will end up on the queue will be

 "{\"b\":6,\"a\":7}"

This seems like a bug to me, as anyone reading that message on the other end will not be receiving valid json.
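
The escaping described here can be reproduced with clojure.core alone, independent of Squeedo. The sketch below assumes the body is already a JSON string and shows what pr-str does to it; the var names are illustrative only.

```clojure
(require '[clojure.edn :as edn])

;; A JSON document that is already serialized to a string.
(def json-body "{\"a\":7,\"b\":6}")

;; pr-str produces a readable Clojure literal, so it wraps the string in
;; quotes and escapes the quotes that were already there.
(def printed (pr-str json-body))

(println json-body) ; prints {"a":7,"b":6}
(println printed)   ; prints "{\"a\":7,\"b\":6}"

;; An EDN reader on the consuming side recovers the original string.
(def round-tripped (edn/read-string printed))
```

So a consumer that reads the body with an EDN reader gets the JSON back intact, while a consumer that parses the raw body as JSON sees a JSON string rather than an object.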

Update core.async deps

The latest version of core.async is 1.3.610, but Squeedo requires the ancient 0.4.474.

Updating may fix errors like

Syntax error macroexpanding clojure.core.async/go at (com/climate/squeedo/sqs_consumer.clj:34:7).
	at clojure.lang.Compiler.macroexpand1(Compiler.java:7010)
	at clojure.lang.Compiler.analyzeSeq(Compiler.java:7093)
...
Caused by: java.lang.IllegalArgumentException: No implementation of method: :has? of protocol: #'clojure.core.cache/CacheProtocol found for class: clojure.core.memoize.PluggableMemoization
	at clojure.core$_cache_protocol_fn.invokeStatic(core_deftype.clj:583)
	at clojure.core$_cache_protocol_fn.invoke(core_deftype.clj:575)

Filename too long creating uberjar

Hello, I'm trying to use Squeedo on a project in my current work, but if I require com.climate.squeedo.sqs-consumer, it fails to create an uberjar with lein uberjar:

java.io.IOException: File name too long, compiling:(com/climate/squeedo/sqs_consumer.clj:73:5)
Exception in thread "main" java.io.IOException: File name too long, compiling:(com/climate/squeedo/sqs_consumer.clj:73:5)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7010)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$InvokeExpr.parse(Compiler.java:3813)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7005)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$TryExpr$Parser.parse(Compiler.java:2278)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$CaseExpr$Parser.parse(Compiler.java:8925)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6420)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6420)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$TryExpr$Parser.parse(Compiler.java:2307)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$InvokeExpr.parse(Compiler.java:3813)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7005)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$TryExpr$Parser.parse(Compiler.java:2278)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6368)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$InvokeExpr.parse(Compiler.java:3881)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7005)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6420)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$LetExpr$Parser.parse(Compiler.java:6420)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler$BodyExpr$Parser.parse(Compiler.java:6100)
        at clojure.lang.Compiler$FnMethod.parse(Compiler.java:5460)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4022)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:6991)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.access$300(Compiler.java:38)
        at clojure.lang.Compiler$DefExpr$Parser.parse(Compiler.java:595)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7003)
        at clojure.lang.Compiler.analyze(Compiler.java:6773)
        at clojure.lang.Compiler.analyze(Compiler.java:6729)
        at clojure.lang.Compiler.compile1(Compiler.java:7604)
        at clojure.lang.Compiler.compile(Compiler.java:7676)
        at clojure.lang.RT.compile(RT.java:413)
        at clojure.lang.RT.load(RT.java:458)
        at clojure.lang.RT.load(RT.java:426)
        at clojure.core$load$fn__6548.invoke(core.clj:6046)
        at clojure.core$load.invokeStatic(core.clj:6045)
        at clojure.core$load.doInvoke(core.clj:6029)
        at clojure.lang.RestFn.invoke(RestFn.java:408)
        at clojure.core$load_one.invokeStatic(core.clj:5848)
        at clojure.core$load_one.invoke(core.clj:5843)
        at clojure.core$load_lib$fn__6493.invoke(core.clj:5888)
        at clojure.core$load_lib.invokeStatic(core.clj:5887)
        at clojure.core$load_lib.doInvoke(core.clj:5868)
        at clojure.lang.RestFn.applyTo(RestFn.java:142)
        at clojure.core$apply.invokeStatic(core.clj:659)
        at clojure.core$load_libs.invokeStatic(core.clj:5925)
        at clojure.core$load_libs.doInvoke(core.clj:5909)
        at clojure.lang.RestFn.applyTo(RestFn.java:137)
        at clojure.core$apply.invokeStatic(core.clj:659)
        at clojure.core$require.invokeStatic(core.clj:5947)
        at clojure.core$require.doInvoke(core.clj:5947)
        at clojure.lang.RestFn.invoke(RestFn.java:619)
        at events_reports.core$loading__6434__auto____9068.invoke(core.clj:1)
        at clojure.lang.AFn.applyToHelper(AFn.java:152)
        at clojure.lang.AFn.applyTo(AFn.java:144)
        at clojure.lang.Compiler$InvokeExpr.eval(Compiler.java:3695)
        at clojure.lang.Compiler.compile1(Compiler.java:7609)
        at clojure.lang.Compiler.compile1(Compiler.java:7599)
        at clojure.lang.Compiler.compile(Compiler.java:7676)
        at clojure.lang.RT.compile(RT.java:413)
        at clojure.lang.RT.load(RT.java:458)
        at clojure.lang.RT.load(RT.java:426)
        at clojure.core$load$fn__6548.invoke(core.clj:6046)
        at clojure.core$load.invokeStatic(core.clj:6045)
        at clojure.core$load.doInvoke(core.clj:6029)
        at clojure.lang.RestFn.invoke(RestFn.java:408)
        at clojure.core$load_one.invokeStatic(core.clj:5848)
        at clojure.core$compile$fn__6553.invoke(core.clj:6056)
        at clojure.core$compile.invokeStatic(core.clj:6056)
        at clojure.core$compile.invoke(core.clj:6048)
        at user$eval164$fn__173.invoke(form-init3023777514780631851.clj:1)
        at user$eval164.invokeStatic(form-init3023777514780631851.clj:1)
        at user$eval164.invoke(form-init3023777514780631851.clj:1)
        at clojure.lang.Compiler.eval(Compiler.java:7062)
        at clojure.lang.Compiler.eval(Compiler.java:7052)
        at clojure.lang.Compiler.eval(Compiler.java:7052)
        at clojure.lang.Compiler.load(Compiler.java:7514)
        at clojure.lang.Compiler.loadFile(Compiler.java:7452)
        at clojure.main$load_script.invokeStatic(main.clj:278)
        at clojure.main$init_opt.invokeStatic(main.clj:280)
        at clojure.main$init_opt.invoke(main.clj:280)
        at clojure.main$initialize.invokeStatic(main.clj:311)
        at clojure.main$null_opt.invokeStatic(main.clj:345)
        at clojure.main$null_opt.invoke(main.clj:342)
        at clojure.main$main.invokeStatic(main.clj:424)
        at clojure.main$main.doInvoke(main.clj:387)
        at clojure.lang.RestFn.applyTo(RestFn.java:137)
        at clojure.lang.Var.applyTo(Var.java:702)
        at clojure.main.main(main.java:37)
Caused by: java.io.IOException: File name too long
        at java.io.UnixFileSystem.createFileExclusively(Native Method)
        at java.io.File.createNewFile(File.java:1012)
        at clojure.lang.Compiler.writeClassFile(Compiler.java:7550)
        at clojure.lang.Compiler$ObjExpr.compile(Compiler.java:4663)
        at clojure.lang.Compiler$FnExpr.parse(Compiler.java:4099)
        at clojure.lang.Compiler.analyzeSeq(Compiler.java:7001)
        ... 207 more
Compilation failed: Subprocess failed

Binary message attributes

Haven't looked into this much, but message attributes should check what the data type of the MessageAttributeValue is and call the correct getter.

(defn- clojurify-message-attributes [^Message msg]
  (let [javafied-message-attributes (.getMessageAttributes msg)]
    (->> javafied-message-attributes
         (map (fn [[k ^MessageAttributeValue mav]] [(keyword k) (.getStringValue mav)]))
         (into {}))))

Untested but possible solution instead of (.getStringValue mav):

(case (.getDataType mav)
  "String" (.getStringValue mav)
  "Number" (try
             (BigDecimal. (.getStringValue mav))
             (catch NumberFormatException e
               {:error e :original-value (.getStringValue mav)}))
  "Binary" (.getBinaryValue mav))

nack inside catch clause causes issue in case of failure

Hello,

we recently stumbled upon an issue with the worker processing messages in a go-loop:

https://github.com/TheClimateCorporation/squeedo/blob/master/src/com/climate/squeedo/sqs_consumer.clj#L68-L77

as the worker is working through messages, the compute is wrapped in a try/catch block. In case of an error, the catch clause attempts to nack the message.

The problem occurs when the nack fails and throws as well (e.g. due to the message being expired or something, an AmazonSQSException is thrown). As this error is uncaught, it bubbles outside the go-loop. This results in the worker getting stuck and no more messages are processed.
I was wondering, is this the expected behaviour?

Thank you!
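
One way to keep a failing nack from escaping the worker loop is to give the nack its own try/catch. This is a sketch under assumptions, not the code in sqs_consumer.clj: `try-nack` is a hypothetical helper and `nack-fn` stands in for whatever performs the real nack.

```clojure
(defn try-nack
  "Calls nack-fn on message, returning true on success. nack-fn is a
  hypothetical stand-in for whatever performs the real nack."
  [nack-fn message]
  (try
    (nack-fn message)
    true
    (catch Exception e
      ;; Swallow and log so the surrounding worker loop keeps running.
      (println "nack failed, leaving message to its visibility timeout:" (.getMessage e))
      false)))
```

If the nack cannot be delivered, SQS makes the message visible again once its visibility timeout expires, so logging and moving on loses no messages.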

enqueue double escapes json from Cheshire

Not sure this is even Squeedo, there are so many parts here. But basically, when sending a JSON-encoded string to the SQS queue, it shows up with all the quotes in the JSON escaped with \", i.e. {\"foo\":\"bar\"}

Generally, this is not an issue, but when Python does a json.dumps and pushes the same message, it shows up as {"foo":"bar"}

It's further complicated by AWS Lambda functions double-escaping the value for some reason.

Either way, I wonder if there is something in Squeedo that isn't specifying the format, or whether there is another way to send it so that the JSON is not escaped?

Listeners dequeue sqs msgs inside a go block, although dequeue can block the caller thread

I have a scenario with several consumers; the queues are empty except for one that has thousands of messages. I noticed that the listener for the non-empty queue blocks even when the message-channel is not full.
After some investigation, I saw in the SQS docs [http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html] that the SQS receiveMessage method blocks the calling thread until the poll time elapses if the queue is empty. The docs add: "If you decide to implement long polling with multiple queues, we recommend using one thread for each queue instead of trying to use a single thread for polling all of the queues."
Because the create-queue-listener function calls sqs/dequeue (which calls receiveMessage with 10 seconds of long polling) inside an async/go block, which should not block its thread, it seems that the listeners of empty queues are slowing down the consumers of non-empty queues.
Let me know if my conclusion makes sense.
Thanks for the great library.
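
For blocking calls like this, core.async provides `thread`, which runs its body on a separate thread instead of the shared go-block pool. A minimal sketch of a listener built that way follows; `start-listener` and `dequeue-fn` are hypothetical names, with `dequeue-fn` standing in for a blocking call such as a long-polling dequeue.

```clojure
(require '[clojure.core.async :as a])

(defn start-listener
  "Polls dequeue-fn on a dedicated thread and forwards each message to
  message-chan. Stops once message-chan has been closed."
  [dequeue-fn message-chan]
  (a/thread
    (loop []
      ;; dequeue-fn may block (e.g. on a long poll); that is fine on a real thread
      (let [messages (dequeue-fn)
            ;; >!! returns false once message-chan is closed
            open?    (every? #(a/>!! message-chan %) messages)]
        (when open?
          (recur))))))
```

With one such thread per listener, a long poll against an empty queue ties up only its own thread, which matches the one-thread-per-queue recommendation quoted above.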
