Coder Social home page Coder Social logo

commons-aws's People

Contributors

alexandrnikitin avatar atamborrino avatar dmnpignaud avatar joearasin avatar malcolmgreaves avatar mandubian avatar ostronom avatar pocman avatar timshel avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

commons-aws's Issues

Update to latest akka-stream-extensions

Hello!

Is there any work being done to bring this library up to date with the latest, stable version of akka? (2.4.x) Currently, this project depends on akka_streams_experimental 1.0 -- akka streams is long out of the experimental stage. This dependency is quite old and has several API inconsistencies with the 2.4.x.

I see that akka-stream-extensions version 0.10.0 depends on this updated akka streams version. However, 0.10.0 is not published (there's an issue for this: MfgLabs/akka-stream-extensions#20).

Is there an ETA that the maintainers have for updating to Akka streams 2.4.x ?

Thank you for your work!

-m

Akka Streams 2.0

The Akka Streams 2.0 release is out, and I was just wondering if you intend to update this project to use it. I understand this project relies on a bunch of custom stages, and migrating them to the new GraphStage will be a bit time consuming. If you're open to outside help maybe I can lend a hand.

MessageAttributes is missing.

Hi,
I'm trying to use the receiveMessageAsStream.

  val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), global)
  val builder = SQSStreamBuilder(sqs)
  val producer_topic = actorSystem.settings.config.getString("sqs.producer_topic")

  val receiver = builder
    .receiveMessageAsStream(producer_topic, autoAck = false)
    .runForeach{ message => println(s"${message.toString}")}

My scala app is receiving :
{ MessageId: e3bf62f1-1bc2-4a1b-be6b-ab1fab563ea7, ReceiptHandle:AQEBLk3OMnrtDs7kRnbxgq4nLHQeJQB1pOsnSkDzwZxBXSBNVvJtbW/Lwk8OL0W+Xg5Zxk+SU59rWNkxBykiyEXA5jWqc9f8EKHMirYMFeLOa77boUUH52bTO6lJHkSz8Y+GgJb+0PvqhCD0D7DQ5EBz7ldjZ1Mrm/W5ilsmitQBLmnIYzeXeB29LR0G+MaFv5TXPnvoKN2ki4x0gG9DY+YC70OCL9LQERj0aqsi2PZlAwEp8QClzkzvFQUSC86QpwVaycjudQNkPxkVOKSCnkvww8/fJbR3kE5L5IP+OTba7S3V+7EE32x3ayH+caBGj3c6ExLDtvrsfkT4Qxpns8+ohO6EYrTu6R5DjGngk6eVXvNOTkdDbwsik8i9+tAQxB3W, MD5OfBody: a391271abf507c9cd1f7068a35010845, Body: pheromone, Attributes: {}, MessageAttributes: {} }

And in the SQS console I have :

Message ID: e3bf62f1-1bc2-4a1b-be6b-ab1fab563ea7
Size: 9 bytes
MD5 of Body: a391271abf507c9cd1f7068a35010845
Sender Account ID: AIDAJP22F6U2PFQ4GLZXY
Sent: 2015-06-16 12:19:50.507 GMT+02:00
First Received: 2015-06-16 12:19:50.507 GMT+02:00
Receive Count: 16
Message Attribute Count: 1

Name Type Value
t Binary DCQWAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
Message Attributes Size: 7 bytes
MD5 of Message Attributes: d42512910e23b3aabe049ea19f0029c7

DynamoDB Support

Any chance you guys might be adding DynamoDB support to this library soon?

AmazonSQSClient receiveMessageAsStreamWithRetryExpBackoff pulling more messages than it should

Creating a very basic stream and limiting it to 1 mapAsync operation immediately pulls 15 messages from SQS for some reason.

def callback: (String) => Future[Unit]

val source = sqs.receiveMessageAsStreamWithRetryExpBackoff(queueUrl = queueUrl, autoAck = false)
val flow = Flow[Message].mapAsync(1) { message => callback(message.getBody) }
source.via(flow).runWith(Sink.ignore)(materializer)

The problem is that if the callback function is long running then the other 14 messages can outlive their visibility window without ever actually being processed and if this happens a few times they end up in a dead letter queue. The stream shouldn't pull more messages than it can concurrently process.

The weird thing is that I see the following in the receiveMessageAsStreamWithRetryExpBackoff, so not sure why 15 messages are pulled?

msg.setMaxNumberOfMessages(Math.min(currentDemand, 10)) // 10 is SQS limit

Expose ObjectMetadata and ACL in uploadStreamAsFile

Locally, I modified the definition of upload to this, which is not correct in general but solves my immediate issue:

  def uploadStreamAsFile(bucket: String, key: String, ctype: String, chunkUploadConcurrency: Int = 1): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {
    import scala.collection.JavaConversions._

    val uploadChunkSize = 8 * 1024 * 1024 // recommended by AWS

    def initiateUpload(bucket: String, key: String, ctype: String): Future[String] = {
      val meta = new ObjectMetadata()
      meta.setContentType(ctype)
      val req =
        new InitiateMultipartUploadRequest(bucket, key, meta)
          .withCannedACL(CannedAccessControlList.PublicRead)
      client.initiateMultipartUpload(req).map(_.getUploadId)
    }

Basically, I wanted to set the ctype of the upload, which was done by creating a new ObjectMetadata object, as well as change the default ACL used when uploading the file. Ideally, both of these things would be exposed in the signature of uploadStreamAsFile although im not quite sure what would be the best way - for the object meta data it should be pretty easy, something the following should work:

def uploadStreamAsFile(bucket: String, key: String, ctype: String, chunkUploadConcurrency: Int = 1, metadata = new ObjectMetadata()): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {

Exposing the ACL though seems more complicated. Perhaps the most general thing to do would be instead of doing the above, have a variant of uploadStreamAsFile simply take a "InitiateMultipartUploadRequest" object as it signature:

def uploadStreamAsFile(upload: InitiateMultipartUploadRequest, chunkUploadConcurrency: Int = 1): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {

How to handle errors?

I am trying to work with this library, and it has been awesome by the way, but I am a bit lost with how to handle errors. I was perfectly able to read S3 files using getFileAsStream. But now I am trying to handle errors. For instance, when a file is non-existent. I am writing an ActorSubscriber to work as a sink to the file stream. The problem is that I don't get an exception when the stream is created with builder.getFileAsStream(...), I can't catch the exception in the stream runWith, and there is also no OnError message arriving at the actor sink. The exception is being thrown by some low-level AWS library and then catched by some actor, and all I get is a message like:

[ERROR] [07/22/2015 17:00:55.446] [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/$a/flow-1-7-singleSource-actorSubscriberSink] com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: BB8A57CBE63E1019E), S3 Extended Request ID: ibmciwv90sG0jqdj27s557NJGgBGci+oSsM1mfYkAnKJwaLgq+2pfd9c5WLhWhdk68su5i7Kx40= WARNING arguments left: 1

How could I treat this and other errors in my program?

Question on implementing back pressure

Hi, I see that in s3stream, getting the list of files in a buucket gets all the values and sends it as a stream to the downstream component. Suppose if the downstream component is not ready to accept the data, how the back pressure is handled here?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.