Streaming / asynchronous Scala client for common AWS services
Hello!
Is there any work being done to bring this library up to date with the latest stable version of Akka (2.4.x)? Currently, this project depends on akka-stream-experimental 1.0; Akka Streams is long out of the experimental stage, and this dependency is quite old with several API inconsistencies relative to 2.4.x.
I see that akka-stream-extensions version 0.10.0 depends on this updated akka streams version. However, 0.10.0 is not published (there's an issue for this: MfgLabs/akka-stream-extensions#20).
Is there an ETA the maintainers have in mind for updating to Akka Streams 2.4.x?
Thank you for your work!
-m
The Akka Streams 2.0 release is out, and I was just wondering if you intend to update this project to use it. I understand this project relies on a bunch of custom stages, and migrating them to the new GraphStage will be a bit time-consuming. If you're open to outside help, maybe I can lend a hand.
I would like to see an overload of S3StreamBuilder.uploadStreamAsMultipartFile() that lets me set the file name based not only on a prefix but on my own requirements, e.g. the current date or any other custom value.
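For illustration, a key-generating function that such an overload might accept could look like the sketch below. Everything here (`makeKey`, its parameters, the key layout) is hypothetical and not part of the library's API:

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper: builds an S3 key from a prefix, the current date, and
// a part index. Neither `makeKey` nor its shape exists in the library; it
// only sketches what a caller-supplied naming function could do.
def makeKey(prefix: String, partIndex: Int, date: LocalDate = LocalDate.now()): String = {
  val day = date.format(DateTimeFormatter.ISO_LOCAL_DATE) // e.g. "2015-06-16"
  s"$prefix/$day/part-$partIndex"
}
```

The overload itself could then accept something like `key: Int => String` instead of a fixed prefix, calling it for each file the stream rotates to.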
It probably doesn't make sense for uploadFile to default to the PublicReadWrite ACL with no way to change it; and without that ACL behavior, the underlying library already provides an identical method: https://github.com/dwhjames/aws-wrap/blob/master/src/main/scala/s3/s3.scala#L518
Hi,
I'm trying to use the receiveMessageAsStream.
val sqs = new AmazonSQSScalaClient(new AmazonSQSAsyncClient(), global)
val builder = SQSStreamBuilder(sqs)
val producer_topic = actorSystem.settings.config.getString("sqs.producer_topic")
val receiver = builder
  .receiveMessageAsStream(producer_topic, autoAck = false)
  .runForeach { message => println(message.toString) }
My Scala app receives:
{ MessageId: e3bf62f1-1bc2-4a1b-be6b-ab1fab563ea7, ReceiptHandle:AQEBLk3OMnrtDs7kRnbxgq4nLHQeJQB1pOsnSkDzwZxBXSBNVvJtbW/Lwk8OL0W+Xg5Zxk+SU59rWNkxBykiyEXA5jWqc9f8EKHMirYMFeLOa77boUUH52bTO6lJHkSz8Y+GgJb+0PvqhCD0D7DQ5EBz7ldjZ1Mrm/W5ilsmitQBLmnIYzeXeB29LR0G+MaFv5TXPnvoKN2ki4x0gG9DY+YC70OCL9LQERj0aqsi2PZlAwEp8QClzkzvFQUSC86QpwVaycjudQNkPxkVOKSCnkvww8/fJbR3kE5L5IP+OTba7S3V+7EE32x3ayH+caBGj3c6ExLDtvrsfkT4Qxpns8+ohO6EYrTu6R5DjGngk6eVXvNOTkdDbwsik8i9+tAQxB3W, MD5OfBody: a391271abf507c9cd1f7068a35010845, Body: pheromone, Attributes: {}, MessageAttributes: {} }
And in the SQS console I have :
Message ID: e3bf62f1-1bc2-4a1b-be6b-ab1fab563ea7
Size: 9 bytes
MD5 of Body: a391271abf507c9cd1f7068a35010845
Sender Account ID: AIDAJP22F6U2PFQ4GLZXY
Sent: 2015-06-16 12:19:50.507 GMT+02:00
First Received: 2015-06-16 12:19:50.507 GMT+02:00
Receive Count: 16
Message Attribute Count: 1
Name: t, Type: Binary, Value: DCQWAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
Message Attributes Size: 7 bytes
MD5 of Message Attributes: d42512910e23b3aabe049ea19f0029c7
Any plans to add SNS support?
It is not possible to provide your own region programmatically (or even an endpoint). Can you make AmazonS3AsyncClient into a trait, so we can extend it?
My stream is running, and completing, but then something is spinning for a minute after I shut down the ActorSystem before the process terminates. I'm not sure what is keeping the process alive, though.
Any chance you guys might be adding DynamoDB support to this library soon?
Creating a very basic stream and limiting it to 1 mapAsync operation immediately pulls 15 messages from SQS for some reason.
def callback: (String) => Future[Unit]
val source = sqs.receiveMessageAsStreamWithRetryExpBackoff(queueUrl = queueUrl, autoAck = false)
val flow = Flow[Message].mapAsync(1) { message => callback(message.getBody) }
source.via(flow).runWith(Sink.ignore)(materializer)
The problem is that if the callback function is long-running, the other 14 messages can outlive their visibility window without ever being processed; if this happens a few times, they end up in a dead-letter queue. The stream shouldn't pull more messages than it can concurrently process.
The weird thing is that I see the following in receiveMessageAsStreamWithRetryExpBackoff, so I'm not sure why 15 messages are pulled:
msg.setMaxNumberOfMessages(Math.min(currentDemand, 10)) // 10 is SQS limit
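One plausible explanation (an assumption, not confirmed from the library's source): mapAsync(1) plus the stage-internal input buffers each signal demand ahead of actual processing, so `currentDemand` can be well above 1 when the request is built. Whatever the cause, the back-of-the-envelope sketch below shows why prefetched messages expire; the numbers are illustrative, and 30s is SQS's default visibility timeout:

```scala
// Back-of-the-envelope sketch (illustrative numbers, not measured from the
// library): if a batch of messages is prefetched and processed one at a
// time, the i-th message waits i * processingSec before its turn.
val prefetched    = 15  // messages observed to be pulled up front
val processingSec = 60  // a long-running callback, per message
val visibilitySec = 30  // SQS default visibility timeout

// Messages whose wait stays under the visibility timeout are processed
// before SQS makes them visible (and redeliverable) again.
val handledInTime = (0 until prefetched).count(i => i * processingSec < visibilitySec)
// With these numbers only the first message is handled in time; the other
// 14 become visible again and, after enough receive attempts, land in the
// dead-letter queue.
```

If internal buffering is indeed the cause, one thing to try (speculative for this particular source) is shrinking the input buffer on the consuming flow with `.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))`, which limits how far ahead a stage's demand can run.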
Locally, I modified the definition of upload to this, which is not correct in general but solves my immediate issue:
def uploadStreamAsFile(bucket: String, key: String, ctype: String, chunkUploadConcurrency: Int = 1): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {
  import scala.collection.JavaConversions._
  val uploadChunkSize = 8 * 1024 * 1024 // recommended by AWS

  def initiateUpload(bucket: String, key: String, ctype: String): Future[String] = {
    val meta = new ObjectMetadata()
    meta.setContentType(ctype)
    val req =
      new InitiateMultipartUploadRequest(bucket, key, meta)
        .withCannedACL(CannedAccessControlList.PublicRead)
    client.initiateMultipartUpload(req).map(_.getUploadId)
  }
Basically, I wanted to set the ctype of the upload, which was done by creating a new ObjectMetadata object, as well as change the default ACL used when uploading the file. Ideally, both of these would be exposed in the signature of uploadStreamAsFile, although I'm not quite sure of the best way. For the object metadata it should be pretty easy; something like the following should work:
def uploadStreamAsFile(bucket: String, key: String, ctype: String, chunkUploadConcurrency: Int = 1, metadata: ObjectMetadata = new ObjectMetadata()): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {
Exposing the ACL seems more complicated. Perhaps the most general approach would be a variant of uploadStreamAsFile that simply takes an InitiateMultipartUploadRequest object as its argument:
def uploadStreamAsFile(upload: InitiateMultipartUploadRequest, chunkUploadConcurrency: Int = 1): Flow[ByteString, CompleteMultipartUploadResult, Unit] = {
I am trying to work with this library (it has been awesome, by the way), but I am a bit lost with how to handle errors. I was perfectly able to read S3 files using getFileAsStream. But now I am trying to handle errors, for instance when a file is non-existent. I am writing an ActorSubscriber to act as a sink for the file stream. The problem is that I don't get an exception when the stream is created with builder.getFileAsStream(...), I can't catch the exception around the stream's runWith, and no OnError message arrives at the actor sink. The exception is thrown by some low-level AWS library and then caught by some actor, and all I get is a message like:
[ERROR] [07/22/2015 17:00:55.446] [Main-akka.actor.default-dispatcher-5] [akka://Main/user/app/$a/flow-1-7-singleSource-actorSubscriberSink] com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: BB8A57CBE63E1019E), S3 Extended Request ID: ibmciwv90sG0jqdj27s557NJGgBGci+oSsM1mfYkAnKJwaLgq+2pfd9c5WLhWhdk68su5i7Kx40= WARNING arguments left: 1
How could I treat this and other errors in my program?
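A minimal sketch of the general recovery pattern, using plain scala.concurrent Futures (the exception text below just mimics the S3 404; it is not produced by the library). Akka Streams offers an analogous `recover` combinator on Source/Flow, which turns an upstream failure into a final element before the stream completes, so the sink sees data instead of a dead letter:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// A failed Future standing in for the stream hitting a missing S3 key.
val failed: Future[String] =
  Future.failed(new RuntimeException("NoSuchKey: The specified key does not exist"))

// `recover` converts the failure into a fallback value. In a stream you
// would place `.recover { case e => fallbackElement }` before runWith, or
// install a supervision strategy on the materializer, so the failure is
// handled inside the pipeline instead of only being logged.
val handled: Future[String] = failed.recover {
  case e: RuntimeException => s"fallback after: ${e.getMessage}"
}

val result = Await.result(handled, 5.seconds)
```

The key point is that a materialized stream surfaces failures asynchronously, through the pipeline, never as an exception at the call site that built it.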
Hi, I see that in s3stream, getting the list of files in a bucket fetches all the values and sends them as a stream to the downstream component. Suppose the downstream component is not ready to accept the data; how is backpressure handled here?
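Whether this particular listing source is fully demand-driven would need checking in the s3stream code, but the general Akka Streams model is pull-based: a source only produces (and, for a paged API like S3 listings, only issues the next request) when downstream signals demand. A plain-Scala lazy iterator sketches the idea; `fetchPage` is a stand-in for one ListObjects call, not the library's code:

```scala
// Stand-in for one paged ListObjects request; counts how often it is called.
var pagesFetched = 0
def fetchPage(n: Int): Seq[String] = {
  pagesFetched += 1
  Seq(s"key-${n}a", s"key-${n}b")
}

// A lazy iterator over up to 1000 pages: pages are fetched only as the
// consumer pulls elements, never all up front.
val lazyListing: Iterator[String] =
  Iterator.from(0).take(1000).flatMap(n => fetchPage(n))

// A slow (or small-demand) consumer asks for just 4 keys...
val firstFour = lazyListing.take(4).toList
// ...so only the few pages needed to satisfy that demand get fetched, not
// all 1000. Backpressure in Akka Streams works the same way: no downstream
// demand, no further upstream requests.
```

If the listing source is built this way, a slow consumer simply delays the next S3 request rather than causing the whole bucket listing to be buffered in memory.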