
thurber's People

Contributors

atdixon, hden


thurber's Issues

Exception using fn*, possibly related to use of closure?

Code:

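(require '[thurber :as th])
(import '(java.util.concurrent ArrayBlockingQueue)
        '(org.apache.beam.sdk.io.gcp.pubsub PubsubIO))

(defn pubsub-subscription->queue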
  [subscription-name buffer-length]
  (let [q (ArrayBlockingQueue. buffer-length true)
        pubsub-source (.. (PubsubIO/readStrings)
                          (fromSubscription subscription-name))]
    (-> (th/create-pipeline {:streaming true :block-on-run false})
        (doto
         (th/apply!
          pubsub-source
          (th/fn* ->c [v] (.put q v) v))))))

Evaluating this gives

"ck.beam_direct_runner_pubsub_fix$eval8276$pubsub_subscription__GT_queue__8277$eval8279__8280.()"

The stack trace is mostly Clojure eval frames; the first mention of thurber is thurber.clj line 133. Removing the use of q fixes this. The workaround for now is to pull that code into a var and use th/partial.
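The trailing `()` in the error looks like the shape of a Java `NoSuchMethodException` for a zero-argument constructor, whose message has the form `<class>.<init>()`; the `<init>` part may have been lost when this page was scraped (an assumption). If `th/fn*` recreates the function by instantiating its compiled class with no arguments (also an assumption, not confirmed from thurber's source), a closure cannot work, because Clojure compiles each closed-over local, here `q`, into a constructor parameter. The pure-Clojure sketch below checks this without Beam; `no-arg-ctor?`, `plain-fn`, and `closing-fn` are hypothetical names used only for illustration.

```clojure
(ns example.closure-ctor
  (:import (java.util.concurrent ArrayBlockingQueue)))

(defn no-arg-ctor?
  "True when the class of fn `f` has a public zero-argument constructor,
  i.e. when it could be re-created from its class alone via reflection."
  [f]
  (try
    (.getConstructor (class f) (make-array Class 0))
    true
    (catch NoSuchMethodException _ false)))

;; A fn that closes over nothing: its class gets a no-arg constructor.
(def plain-fn (fn [v] v))

;; A fn that closes over the local `q`: Clojure stores `q` in a field that is
;; set through a constructor parameter, so there is no no-arg constructor.
(def closing-fn
  (let [q (ArrayBlockingQueue. 10 true)]
    (fn [v] (.put q v) v)))
```

This would also be consistent with the th/partial workaround helping: a function defined with `defn` at the top level closes over nothing, and the extra argument is supplied separately rather than captured.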

NotSerializableException when using spec instrumentation

This should have been obvious in retrospect: I tried to use spec to instrument my functions and got java.io.NotSerializableException : "clojure.spec.alpha$fspec_impl$reify__2524" when calling apply!. I'm not sure this is fixable (or worth bothering with). It would be nice to have for testing, I suppose, and is probably worth a note in the docs.
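Beam ships user functions to workers with Java serialization, and `stest/instrument` replaces an instrumented var's root with a wrapper fn that closes over the function's spec; that spec is a `reify` object (the `fspec_impl$reify` in the error) and does not implement `Serializable`. The pure-Clojure sketch below reproduces the exception outside Beam and shows that `stest/unstrument` restores a serializable fn. That suggests, untested against thurber, instrumenting only in tests that do not build a pipeline, or unstrumenting before calling apply!. `add-one` and `java-serializable?` are hypothetical names.

```clojure
(ns example.spec-serialization
  (:require [clojure.spec.alpha :as s]
            [clojure.spec.test.alpha :as stest])
  (:import (java.io ByteArrayOutputStream NotSerializableException
                    ObjectOutputStream)))

;; Hypothetical function with an fdef, standing in for a pipeline fn.
(defn add-one [x] (inc x))
(s/fdef add-one :args (s/cat :x int?) :ret int?)

(defn java-serializable?
  "Attempts plain Java serialization of `obj`; returns false if some object
  reachable from it does not implement java.io.Serializable."
  [obj]
  (try
    (with-open [out (ObjectOutputStream. (ByteArrayOutputStream.))]
      (.writeObject out obj))
    true
    (catch NotSerializableException _ false)))
```

Usage: `(java-serializable? @#'add-one)` is expected to be true before `(stest/instrument `add-one)`, false while instrumented, and true again after `(stest/unstrument `add-one)`.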

Partial application for combiner functions?

This does not work:

(defn map-stats-combiner
  [ks]
  (th/combiner
   (th/fn* extract-map-stats [acc]
           (reduce (fn [x [k v]] (assoc x k (extract-stats v))) {} acc))
   (th/partial #'combine-stats-by-keys ks)
   (th/partial #'reduce-map-stats ks)))

Error: java.lang.AssertionError : "Assert failed: (var? combinef)".

Exception using BigQueryIO

This is a long shot; I'm not sure this issue has anything to do with thurber, but it seems rare judging by Google results. I'm seeing sporadic exceptions like this when writing to BigQuery:

May 21, 2021 5:04:10 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=9, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
	at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:93)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
	at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
	at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
	at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:261)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:327)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1700(InstantiatingGrpcChannelProvider.java:74)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel(InstantiatingGrpcChannelProvider.java:220)
	at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:227)
	at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:210)
	at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
	at com.google.cloud.bigquery.storage.v1beta2.stub.GrpcBigQueryWriteStub.create(GrpcBigQueryWriteStub.java:138)

There's a lot more stack trace. I'm on the latest versions of thurber and the Beam SDKs. I'm not even sure this is actually a problem, because the pipeline keeps running, but it's not clear whether it's dropping data because of this. I've seen it with both the direct runner and the Dataflow runner. I don't suppose this rings a bell?

I'll also try to put together a minimal example.

Tangent: is there a better place to discuss thurber without filing an issue here? Thanks.

Dramatically simplify and empower timer usage via Beam's new dynamic timers

https://beam.apache.org/documentation/programming-guide/#dynamic-timer-tags

Beam's new dynamic timers could be used to set ad hoc timers from Clojure, with any Clojure function provided as the callback. Something like this, from within a fn:

(th/set-timer #'timer-callback <instant>)

This would remove the need to explicitly set a single timer function via th/with-timer, or even to declare anything about timers in the pipeline. (set-timer would be invoked if/as needed from within a DoFn.)

(Implementation note: th/set-timer would use the full name of the callback var to invoke TimerMap/set; and the TDoFn/onTimer would load, bind and dispatch to this timer callback using the full name via @TimerId.)
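A minimal pure-Clojure sketch of the name-based dispatch in the implementation note, assuming the dynamic timer tag is simply the callback var's fully qualified name. The Beam side (`TimerMap`, `@TimerId`, the DoFn) is deliberately not shown; `var->timer-id`, `dispatch-timer`, and `timer-callback` are hypothetical names, and `requiring-resolve` needs Clojure 1.10 or later.

```clojure
(ns example.timer-dispatch)

(defn var->timer-id
  "Turns a callback var into its fully qualified name, the kind of string
  that could be used as a dynamic timer tag."
  [v]
  (let [m (meta v)]
    (str (ns-name (:ns m)) "/" (:name m))))

(defn dispatch-timer
  "Resolves the callback from its fully qualified name, loading its
  namespace if needed, and invokes it with `args`."
  [timer-id & args]
  (apply (requiring-resolve (symbol timer-id)) args))

;; Hypothetical timer callback used only for this illustration.
(defn timer-callback [instant]
  {:fired-at instant})
```

Because only the name travels with the timer, the callback has to be a top-level var rather than a closure, which also avoids serializing the function object itself.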

Unbounded source and grouping

I have a feeling this is an issue with DirectRunner (or my noobidity), not thurber, but I'm giving it a shot here anyway. The pipeline below produces no output; if I remove the GroupByKey, it does. The same thing happens when writing to a file.

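(require '[thurber :as th])
(import '(org.apache.beam.sdk.transforms GroupByKey WithTimestamps)
        '(org.apache.beam.sdk.transforms.windowing Sessions Window)
        '(org.joda.time Duration))

(def pipeline
  (-> (th/create-pipeline {:streaming true :block-on-run false})
      (doto
       (th/apply!
        (read-pubsub-subscription "projects/my-project/my-sub")
        #'decode-json
        (WithTimestamps/of (th/ser-fn #'timestamp))
        (th/partial #'th/->kv :traceId)
        (-> (Window/into
             (Sessions/withGapDuration (Duration/standardSeconds 1))))
        (GroupByKey/create)
        #'println))))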
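One way to reason about the missing output: with `Sessions/withGapDuration`, each element gets the window `[t, t + gap)`, overlapping windows for the same key merge, and with the default trigger `GroupByKey` emits a session only once the watermark passes the merged window's end. Elements without `GroupByKey` flow straight through, so if the watermark never advances past that end, nothing reaches `println`. The pure-Clojure model below is a simplification of Beam's merging and triggering, not Beam itself; `session-windows` and `ready-to-emit` are hypothetical names, and timestamps are plain milliseconds.

```clojure
(ns example.sessions)

(defn session-windows
  "Gives each timestamp (ms) the window [t, t + gap-ms) and merges overlapping
  windows, as session windowing does per key. Returns [start end] pairs
  with `end` exclusive, ordered by start."
  [gap-ms timestamps]
  (reduce (fn [windows t]
            (let [[start end] (peek windows)]
              (if (and end (< t end))
                ;; t falls inside the current session: extend its end
                (conj (pop windows) [start (max end (+ t gap-ms))])
                ;; no element within the gap: start a new session
                (conj windows [t (+ t gap-ms)]))))
          []
          (sort timestamps)))

(defn ready-to-emit
  "Simplified default trigger: a session's pane fires once the watermark
  has reached the session's end."
  [watermark-ms windows]
  (filterv (fn [[_ end]] (<= end watermark-ms)) windows))
```

For example, `(session-windows 1000 [1400 0 3000 500])` yields two sessions, `[0 2400]` and `[3000 4000]`; with the watermark at 3500 only the first would have fired.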
