Sample Scala Event Producer for Amazon Kinesis

Introduction

This is an example event producer for Amazon Kinesis, written in Scala and packaged as an SBT project.

This was built by the Snowplow Analytics team as part of a proof of concept for porting our event collection and enrichment processes to run on Kinesis.

It is designed to run in conjunction with kinesis-example-scala-consumer.

Pre-requisites

This project requires Java 7 or 8, SBT 0.13.0 and Thrift:

  • On Mac OS X, install Thrift with brew install thrift.
  • On Debian/Ubuntu Linux, install it with sudo apt-get install thrift-compiler libthrift-java.

All dependencies are handled for you if you use Vagrant (see next section).

Building

Assuming you have git, Vagrant and VirtualBox installed:

 host> git clone https://github.com/snowplow/kinesis-example-scala-producer
 host> cd kinesis-example-scala-producer
 host> vagrant up && vagrant ssh
guest> cd /vagrant
guest> sbt compile

Unit testing

To come.

Usage

The event producer has the following command-line interface:

kinesis-example-scala-producer: Version 0.1.2. Copyright (c) 2013, Snowplow
Analytics Ltd.

Usage: kinesis-example-scala-producer [OPTIONS]

OPTIONS
--config filename  Configuration file. Defaults to "resources/default.conf"
                   (within .jar) if not set
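For readers adapting the producer, the option handling above can be sketched in plain Scala as follows. This is a minimal illustration, not the project's actual argument parser: `ProducerArgs` and `parse` are hypothetical names, and only the `--config` option and its documented default are modelled.

```scala
// Hypothetical sketch of the producer's command-line handling; the real
// project may use a different parser. Only --config is modelled here.
object ProducerArgs {
  // Default named in the usage text: resources/default.conf inside the .jar
  val DefaultConfig: String = "resources/default.conf"

  /** Right(path) for the config file to load, Left(message) for bad input. */
  def parse(args: List[String]): Either[String, String] = args match {
    case Nil                       => Right(DefaultConfig)
    case "--config" :: file :: Nil => Right(file)
    case "--config" :: Nil         => Left("--config requires a filename")
    case other                     => Left(s"Unrecognised arguments: ${other.mkString(" ")}")
  }
}
```

With `sbt "run --config ./my.conf"`, the program receives `--config` and `./my.conf` as its arguments, so `ProducerArgs.parse(args.toList)` would yield `Right("./my.conf")`.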

Running

Create your own config file:

$ cp src/main/resources/default.conf my.conf

Now edit it and update the AWS credentials:

aws {
  access-key: "cpf"
  secret-key: "cpf"
}

Make sure that the AWS credentials you use have the permissions required to:

  1. Create and write to the Kinesis stream specified in the config file
  2. Create tables in DynamoDB

You can leave the rest of the settings for now.

Next, run the event producer, making sure to specify your new config file. The stream will be created if it doesn't already exist.

$ sbt "run --config ./my.conf"
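The create-if-missing behaviour described above can be sketched as follows, assuming a hypothetical `StreamAdmin` trait that wraps the Kinesis calls for describing and creating a stream. A real implementation would delegate to the AWS SDK; nothing here is the project's actual code. Because a newly created Kinesis stream only accepts writes once its status is ACTIVE, the sketch polls with a bounded number of attempts.

```scala
// Only the statuses this sketch needs; Kinesis also reports DELETING and UPDATING.
sealed trait StreamStatus
object StreamStatus {
  case object Creating extends StreamStatus
  case object Active extends StreamStatus
}

// Hypothetical abstraction over the Kinesis "describe" and "create" calls.
trait StreamAdmin {
  /** Current status of the stream, or None if it does not exist. */
  def status(name: String): Option[StreamStatus]
  def create(name: String, shardCount: Int): Unit
}

object StreamSetup {
  /** Creates `name` if it is missing, then polls until it is ACTIVE.
    * Returns false if it is still not active after `maxPolls` retries. */
  def ensureActive(admin: StreamAdmin, name: String, shardCount: Int,
                   maxPolls: Int, sleep: () => Unit): Boolean = {
    if (admin.status(name).isEmpty) admin.create(name, shardCount)

    @annotation.tailrec
    def poll(remaining: Int): Boolean =
      if (admin.status(name).contains(StreamStatus.Active)) true
      else if (remaining <= 0) false
      else { sleep(); poll(remaining - 1) }

    poll(maxPolls)
  }
}
```

Passing `sleep` as a function keeps the polling loop testable without real delays; in production it might be something like `() => Thread.sleep(1000)`.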

Finally, verify that events are being sent to your stream, either by running snowplow/kinesis-example-scala-consumer or by checking the Kinesis Management Console.

Next steps

Fork this project and adapt it into your own custom Kinesis event producer.

FAQ

Is a Kinesis event producer the right place to put stream setup code?

Probably not. Best practice is to handle stream setup as part of your standard AWS DevOps workflow, with appropriately locked-down IAM permissions. This project includes stream setup only to simplify getting started.

What about an example Kinesis event consumer aka "Kinesis application" in Scala?

See snowplow/kinesis-example-scala-consumer.

Roadmap

  1. Add support for ordered events in the stream [#1]
  2. Add logging output to the producer [#2]
  3. Add live Kinesis integration tests to the test suite [#3]
  4. Add ability to send events as Avro [#5]
  5. Rebase to run on top of Akka (maybe) [#6]

If you would like to help with any of these, feel free to submit a pull request.

Copyright and license

Copyright 2013 Snowplow Analytics Ltd, with portions copyright 2013 Amazon.com, Inc or its affiliates.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License.

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Contributors

  • alexanderdean
  • petervandenabeele

kinesis-example-scala-producer's Issues

Error compiling project

Hello Snowplow,

I am encountering an error building this project.

Specs:

  • Mac OSX High Sierra 10.13
  • sbt 1.0.4
  • Thrift Version: 0.11.0

Steps to repro

  1. brew install thrift
  2. Clone repo
  3. cd kinesis-example-scala-producer
  4. sbt compile

Stack
[info] Loading project definition from /Users/unknown/unknown/kinesis-example-scala-producer/project
[info] Updating {file:/Users/unknown/unknown/kinesis-example-scala-producer/project/}kinesis-example-scala-producer-build...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 3 Scala sources to /Users/unknown/unknown/kinesis-example-scala-producer/project/target/scala-2.10/sbt-0.13/classes...
[info] Set current project to kinesis-example-scala-producer (in build file:/Users/unknown/unknown/kinesis-example-scala-producer/)
[info] Updating {file:/Users/unknown/unknown/kinesis-example-scala-producer/}kinesis-example-scala-producer...
[info] Compiling schema with command: thrift -gen java:hashcode -o /Users/unknown/unknown/kinesis-example-scala-producer/target/scala-2.10/src_managed/main /Users/unknown/unknown/kinesis-example-scala-producer/src/main/thrift/thrift_data.thrift
[info] Resolving org.apache.httpcomponents#httpclient;4.3.1 ...
[info]
[error] [FAILURE:generation:1] Error: unknown option java:hashcode
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 3 Scala sources to /Users/unknown/unknown/kinesis-example-scala-producer/target/scala-2.10/classes...
[error] /Users/unknown/unknown/kinesis-example-scala-producer/src/main/scala/com/snowplowanalytics/kinesis/producer/StreamProducer.scala:218: type StreamData is not a member of package com.snowplowanalytics.kinesis.producer.generated
[error]     val streamData = new generated.StreamData(dataName, dataTimestamp)
[error]                                    ^
[error] one error found
[error] (compile:compile) Compilation failed
[error] Total time: 6 s, completed Jan 16, 2018 9:26:13 PM

Update README

Can remove:

Please note: Amazon Kinesis is currently in private beta. Being on the Kinesis private beta is a pre-requisite to building and running this project.

And:

Now manually copy the relevant jar from your Amazon Kinesis SDK Preview:

$ cd kinesis-example-scala-producer
$ cp ~/downloads/AmazonKinesisSDK-preview/aws-java-sdk-1.6.4/lib/aws-java-sdk-1.6.4.jar lib/
$ sbt assembly

Build Fail

When I ran sbt package, it failed with the following stack trace. It looks like I need to have thrift on my $PATH?

java.io.IOException: Cannot run program "thrift": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1047)
    at sbt.SimpleProcessBuilder.run(ProcessImpl.scala:384)
    at sbt.AbstractProcessBuilder.run(ProcessImpl.scala:136)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:167)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:167)
    at sbt.BufferedLogger.buffer(BufferedLogger.scala:25)
    at sbt.AbstractProcessBuilder.runBuffered(ProcessImpl.scala:167)
    at sbt.AbstractProcessBuilder.$bang(ProcessImpl.scala:164)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1$$anonfun$apply$1.apply(ThriftPlugin.scala:125)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1$$anonfun$apply$1.apply(ThriftPlugin.scala:120)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1.apply(ThriftPlugin.scala:120)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1.apply(ThriftPlugin.scala:117)
    at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:188)
    at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:188)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:202)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:198)
    at sbt.Difference.apply(Tracked.scala:177)
    at sbt.Difference.apply(Tracked.scala:158)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:198)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:197)
    at sbt.Difference.apply(Tracked.scala:177)
    at sbt.Difference.apply(Tracked.scala:152)
    at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:197)
    at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:195)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$.compileThrift(ThriftPlugin.scala:132)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$thriftSettings$9.apply(ThriftPlugin.scala:51)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$thriftSettings$9.apply(ThriftPlugin.scala:49)
    at scala.Function7$$anonfun$tupled$1.apply(Function7.scala:35)
    at scala.Function7$$anonfun$tupled$1.apply(Function7.scala:34)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
    at sbt.std.Transform$$anon$4.work(System.scala:64)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
    at sbt.Execute.work(Execute.scala:244)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:186)
    at java.lang.ProcessImpl.start(ProcessImpl.java:130)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1028)
    at sbt.SimpleProcessBuilder.run(ProcessImpl.scala:384)
    at sbt.AbstractProcessBuilder.run(ProcessImpl.scala:136)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:167)
    at sbt.AbstractProcessBuilder$$anonfun$runBuffered$1.apply(ProcessImpl.scala:167)
    at sbt.BufferedLogger.buffer(BufferedLogger.scala:25)
    at sbt.AbstractProcessBuilder.runBuffered(ProcessImpl.scala:167)
    at sbt.AbstractProcessBuilder.$bang(ProcessImpl.scala:164)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1$$anonfun$apply$1.apply(ThriftPlugin.scala:125)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1$$anonfun$apply$1.apply(ThriftPlugin.scala:120)
    at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1.apply(ThriftPlugin.scala:120)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$1.apply(ThriftPlugin.scala:117)
    at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:188)
    at sbt.FileFunction$$anonfun$cached$1.apply(Tracked.scala:188)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:202)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3$$anonfun$apply$4.apply(Tracked.scala:198)
    at sbt.Difference.apply(Tracked.scala:177)
    at sbt.Difference.apply(Tracked.scala:158)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:198)
    at sbt.FileFunction$$anonfun$cached$2$$anonfun$apply$3.apply(Tracked.scala:197)
    at sbt.Difference.apply(Tracked.scala:177)
    at sbt.Difference.apply(Tracked.scala:152)
    at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:197)
    at sbt.FileFunction$$anonfun$cached$2.apply(Tracked.scala:195)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$.compileThrift(ThriftPlugin.scala:132)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$thriftSettings$9.apply(ThriftPlugin.scala:51)
    at com.github.bigtoast.sbtthrift.ThriftPlugin$$anonfun$thriftSettings$9.apply(ThriftPlugin.scala:49)
    at scala.Function7$$anonfun$tupled$1.apply(Function7.scala:35)
    at scala.Function7$$anonfun$tupled$1.apply(Function7.scala:34)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
    at sbt.std.Transform$$anon$4.work(System.scala:64)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
    at sbt.Execute.work(Execute.scala:244)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
[error] (thrift:generateJava) java.io.IOException: Cannot run program "thrift": error=2, No such file or directory
[error] Total time: 0 s, completed 12 Nov, 2014 5:54:28 PM
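The `error=2, No such file or directory` above means the JVM running sbt could not find a `thrift` executable on its `PATH`, so the Thrift plugin could not generate the Java classes. A quick way to check this from Scala is sketched below; `ThriftCheck` is a hypothetical helper, not part of the project, and it assumes a Unix-style executable name (on Windows the binary would be `thrift.exe`).

```scala
import java.io.File

// Hypothetical pre-flight helper: scans each directory listed in a
// PATH-style string for an executable file with the given name.
object ThriftCheck {
  def findOnPath(program: String, pathVar: String): Option[File] =
    pathVar
      .split(File.pathSeparator)
      .iterator
      .filter(_.nonEmpty)                   // skip empty entries (some shells treat them as ".")
      .map(dir => new File(dir, program))
      .find(f => f.isFile && f.canExecute)  // first match wins, as in a shell lookup
}
```

For example, `ThriftCheck.findOnPath("thrift", sys.env.getOrElse("PATH", ""))` would presumably return `None` in the environment that produced this error; installing Thrift (for example with `brew install thrift`) or adding its directory to `PATH` before launching sbt should resolve it.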
