
zio-s3's Introduction

ZIO S3

ZIO S3 is a thin wrapper over S3 async client for ZIO.


Introduction

ZIO-S3 is a thin wrapper over the S3 async Java client. It exposes the main operations of the client.

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.model.S3Exception
import zio.s3._
import zio.stream.ZStream

  // list all available buckets
  val l1 = listBuckets.provideLayer(
     live(Region.US_EAST_1, AwsBasicCredentials.create("accessKeyId", "secretAccessKey"))
  )

  // list all objects of all buckets
  val l2: ZStream[Any, S3Exception, String] = (for {
     bucket <- ZStream.fromIterableZIO(listBuckets)
     obj    <- listAllObjects(bucket.name)
  } yield obj.bucketName + "/" + obj.key).provideLayer(
     live(Region.US_EAST_1, AwsBasicCredentials.create("accessKeyId", "secretAccessKey"))
  )

All available S3 combinators and operations live in the package object zio.s3; you only need to import zio.s3._.

Installation

To use this library, add the following line to your build.sbt file:

libraryDependencies += "dev.zio" %% "zio-s3" % "0.4.2.4" 

Example 1

Let's try an example of creating a bucket and adding an object to it. To run this example, we need a running instance of MinIO, an S3-compatible object storage:

docker run -p 9000:9000 -e MINIO_ACCESS_KEY=MyKey -e MINIO_SECRET_KEY=MySecret minio/minio server --compat /data

In this example we create a bucket, add a JSON object to it, and then read it back:

import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.regions.Region
import zio._
import zio.s3._
import zio.stream.{ZStream, ZPipeline}

import java.net.URI

object ZIOS3Example extends ZIOAppDefault {

  val myApp = for {
    _ <- createBucket("docs")
    json = Chunk.fromArray("""{  "id" : 1 , "name" : "A1" }""".getBytes)
    _ <- putObject(
      bucketName = "docs",
      key = "doc1",
      contentLength = json.length,
      content = ZStream.fromChunk(json),
      options = UploadOptions.fromContentType("application/json")
    )
    _ <- getObject("docs", "doc1")
      .via(ZPipeline.utf8Decode)
      .foreach(Console.printLine(_))
  } yield ()

  def run =
    myApp
      .provide(
        live(
          Region.CA_CENTRAL_1,
          AwsBasicCredentials.create("MyKey", "MySecret"),
          Some(URI.create("http://localhost:9000"))
        )
      )
}

Example 2

The following snippets show a simple upload, a multipart upload, and a streaming download:

import software.amazon.awssdk.services.s3.model.S3Exception
import zio._
import zio.stream.{ ZSink, ZStream }
import zio.s3._

// upload
val json: Chunk[Byte] = Chunk.fromArray("""{  "id" : 1 , "name" : "A1" }""".getBytes)
val up: ZIO[S3, S3Exception, Unit] = putObject(
  "bucket-1",
  "user.json",
  json.length,
  ZStream.fromChunk(json),
  UploadOptions.fromContentType("application/json")
)

// multipartUpload 
import java.io.FileInputStream
import java.nio.file.Paths

val is = ZStream.fromInputStream(new FileInputStream(Paths.get("/my/path/to/myfile.zip").toFile))
val proc2: ZIO[S3, S3Exception, Unit] =
  multipartUpload(
    "bucket-1",
    "upload/myfile.zip",
    is,
    MultipartUploadOptions.fromUploadOptions(UploadOptions.fromContentType("application/zip"))
  )(4) // parallelism: upload up to 4 parts concurrently

// download
import java.io.OutputStream

val os: OutputStream = ???
val proc3: ZIO[S3, Exception, Long] = getObject("bucket-1", "upload/myfile.zip").run(ZSink.fromOutputStream(os))

Support any commands?

If you need a method that is not wrapped by the library, you can access the underlying S3 client in a safe manner by using:

import java.util.concurrent.CompletableFuture
import zio.s3._
import software.amazon.awssdk.services.s3.S3AsyncClient
 
def execute[T](f: S3AsyncClient => CompletableFuture[T]): ZIO[S3, S3Exception, T]
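
For example, a sketch of calling an SDK operation that zio-s3 does not wrap (the request type is from the AWS SDK v2; the bucket name is illustrative):

import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest

val location = execute(client =>
  client.getBucketLocation(GetBucketLocationRequest.builder().bucket("bucket-1").build())
)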

Documentation

Learn more on the ZIO S3 homepage!

Contributing

For the general guidelines, see ZIO contributor's guide.

Code of Conduct

See the Code of Conduct

Support

Come chat with us on Discord.

License

See the LICENSE file.

zio-s3's People

Contributors

adamgfraser, ghostdogpr, github-actions[bot], hbibel, jdegoes, jorokr21, khajavi, melgenek, mschuwalow, msvaljek, pavulonx, pshemass, regis-leray, renovate-bot, renovate[bot], scala-steward, sirocchj, softinio, tpl0ch, yurikpanic


zio-s3's Issues

Remove region check

Hi,

What do you think about allowing the use of any region, instead of restricting it to a limited list of options? The current check

case r if Region.regions().contains(r) => Right(new S3Region(r))

makes it impossible to use other (than AWS) implementations of S3.

putObject does not work when a stream contains multiple chunks

It tries to upload every chunk with a separate putObject call and fails with the following error, because the chunk size is not equal to the content length:

software.amazon.awssdk.services.s3.model.S3Exception: Your socket connection to the server was not read from or written to within the timeout period. Idle connections will be closed.

This seems to have been introduced in #251, when .toPublisher was removed. Version 0.3.5 works fine in this regard.
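
A minimal repro sketch of the failure mode described above (bucket, key, and sizes are illustrative):

import zio.Chunk
import zio.s3._
import zio.stream.ZStream

val part = Chunk.fromArray(new Array[Byte](16 * 1024))
// contentLength covers both chunks, but each chunk is sent
// as if it were the whole putObject body
val multiChunk = ZStream.fromChunk(part) ++ ZStream.fromChunk(part)

val upload = putObject("bucket-1", "big-object", part.length * 2L, multiChunk)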

`fromWebIdentity` based authentication does not work

When S3Credentials.fromWebIdentity is used (either directly or via fromAll, when all other methods in the chain have failed), it calls WebIdentityTokenFileCredentialsProvider.create().resolveCredentials() under the hood and populates the S3Credentials case class with values from the AwsCredentials interface returned by this resolveCredentials method.

But it is not enough to copy just the key and secret in this case. The instance returned by resolveCredentials is an AwsSessionCredentials, which holds an accessKey, a secretKey, and a sessionToken. So passing a StaticCredentialsProvider built only from the key and secret copied from the resolveCredentials result is not enough, and authorization fails.

It does work, however, if we create the S3 client by passing the provider created by WebIdentityTokenFileCredentialsProvider.create() directly to the .credentialsProvider builder method.
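
A sketch of the working approach described above, using the AWS SDK v2 builder directly:

import software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider
import software.amazon.awssdk.services.s3.S3AsyncClient

// the provider resolves and refreshes the session credentials itself,
// so the session token is never dropped
val client = S3AsyncClient
  .builder()
  .credentialsProvider(WebIdentityTokenFileCredentialsProvider.create())
  .build()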

Add a possibility to handle session credentials in `live`.

There is currently a method def live(region: Region, credentials: AwsCredentials, uriEndpoint: Option[URI] = None) which only uses the accessKeyId and secretAccessKey when authenticating. This makes it impossible to use with e.g. AwsSessionCredentials, which results in cryptic errors about keys not being present in the registry.

We could either add special handling for AwsSessionCredentials or add a note in the documentation about this behavior.
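
A hedged sketch of such special handling; clientFor is a hypothetical helper, and StaticCredentialsProvider.create preserves the session token when given an AwsSessionCredentials instance:

import software.amazon.awssdk.auth.credentials.{ AwsCredentials, StaticCredentialsProvider }
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3AsyncClient

// passing the original credentials instance through, instead of copying
// only the key and secret, keeps the session token intact
def clientFor(region: Region, credentials: AwsCredentials): S3AsyncClient =
  S3AsyncClient
    .builder()
    .credentialsProvider(StaticCredentialsProvider.create(credentials))
    .region(region)
    .build()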

Fails to import build with Scala 3

When I compile with

val scalaVersion = "3.2.2"

And dependency

"dev.zio"     %% "zio-s3"                 % "0.4.2.1",

I'm getting an error

2023.01.27 10:40:18 INFO  [error] Modules were resolved with conflicting cross-version suffixes in ProjectRef(uri("file:/..."), "..."):
2023.01.27 10:40:18 INFO  [error]    org.scala-lang.modules:scala-collection-compat _3, _2.13

I am not referencing that package in any other dependency that I'm aware of, and if I remove the zio-s3 dependency my code compiles and runs just fine.

I'm new to Scala and ZIO, but from what I can see, the problem line appears to be this one https://github.com/zio/zio-s3/blob/main/build.sbt#L46 together with this one:

val ScalaDotty = "3.1.2"

Stub fails on MS Windows

The test instance uses POSIX file attributes, which are not supported on the Windows operating system.

code:

import zio._
import zio.blocking.Blocking
import zio.nio.file.Files
object ToRemove extends zio.App {

  val stub =
    ZLayer.fromManaged(
      for {
        temp     <- Files.createTempDirectoryManaged(prefix = Some("s3-data"), fileAttributes = Seq.empty)
        blocking <- ZManaged.environment[Blocking]
      } yield zio.s3.Test.connect(temp)(blocking)
    )

  val program = zio.s3.createBucket("blah") *> zio.s3.listAllObjects("blah").runDrain

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    program.provideCustomLayer(stub).exitCode
  }
}

error:

Fiber failed.
An unchecked error was produced.
java.lang.UnsupportedOperationException
	at java.base/sun.nio.fs.WindowsFileSystemProvider.readAttributes(WindowsFileSystemProvider.java:193)
	at java.base/java.nio.file.Files.readAttributes(Files.java:1763)
	at zio.nio.file.Files$.$anonfun$readAttributes$1(Files.scala:176)
	at zio.internal.FiberContext.evaluateNow(FiberContext.scala:490)
	at zio.internal.FiberContext.$anonfun$evaluateLater$1(FiberContext.scala:776)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

No support for EKS IAM

Our team had a problem using this library and pinpointed it to this issue:

aws/aws-sdk-java-v2#1470

The solution was to add the sts dependency in order to enable WebIdentity, and then pass the DefaultCredentialsProvider:

val builder = S3AsyncClient
  .builder()
  .credentialsProvider(DefaultCredentialsProvider.create())
  .region(region)
uriEndpoint.foreach(builder.endpointOverride)
builder.build()

So the solution would be to allow creating a layer from a CredentialsProvider, or to allow a constructor that only takes a region and calls DefaultCredentialsProvider.create(). There is also the question of whether or not to add the sts dependency, which it seems we should.

Please let me know what you decide and I can help with a PR.

The use of heap ByteBuffers in putObject can lead to subtle OOM issues

The usage of HeapByteBuffer in the putObject method can lead to a pretty subtle OOM issue that is tricky to figure out. If you are using many threads to do IO, in this case putting an object in S3, the JVM is going to cache one or more of these ByteBuffers per thread, and by default there is no limit on the size or number of these buffers. As a result, if you create many threads for IO and these buffers are large, the app can use a lot of additional native memory that looks like a leak.

For us, when running in production, this presented itself as the java process consuming several more GB of memory than we had allocated for the heap and thus getting killed when it ran out of memory.

This issue can be mitigated by defining jdk.nio.maxCachedBufferSize so that large buffers are not cached, but maybe it makes sense to consider using a direct buffer or something else.
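
A sketch of that mitigation as sbt settings; the 256 KiB threshold is illustrative:

// in build.sbt; javaOptions only applies to forked runs
fork := true
javaOptions += "-Djdk.nio.maxCachedBufferSize=262144"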

Normal stream errors unexpectedly kill the fiber in multipartUpload

The signature for multipartUpload is:

def multipartUpload[R](
  bucketName: String,
  key: String,
  content: ZStream[R, Throwable, Byte],
  options: MultipartUploadOptions
)(parallelism: Int): ZIO[R, S3Exception, Unit] =

content, the stream passed in by the library user, has the error channel type Throwable, indicating that it is allowed to fail with a Throwable.

However, multipartUpload does the following:

.refineOrDie {
  case e: S3Exception => e
}

Now the Throwable we were allowed to fail with causes an unexpected fiber death. This caused a production issue for us, as without looking into the code we would not have known that something allowed by the method signature would be treated as a fiber-killing code defect.

This behavior was introduced in #170 and I would argue it is incorrect. A stream failing with a normal error is not a defect that cannot be recovered from. I would suggest that, instead of dying or wrapping content stream errors in a custom type, multipartUpload simply keep Throwable as its error channel type, as sketched below.
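
The suggested signature, sketched (unchanged except for the widened error channel):

def multipartUpload[R](
  bucketName: String,
  key: String,
  content: ZStream[R, Throwable, Byte],
  options: MultipartUploadOptions
)(parallelism: Int): ZIO[R, Throwable, Unit]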

I would be happy to put together a PR for this change.

How to call `liveZIO`?

This is more of a question than an issue.
The docs show us the following way of building credentials:

// build S3 Layer from default available credentials providers
val s3: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, default)

However, this doesn't result in a Layer[S3Exception, S3]; it returns a ZLayer[zio.Scope, S3Exception, S3].

I get the following error, for example:

type mismatch;
 found   : zio.ZLayer[zio.Scope,software.amazon.awssdk.services.s3.model.S3Exception,zio.s3.S3]
 required: zio.Layer[software.amazon.awssdk.services.s3.model.S3Exception,zio.s3.S3]
    (which expands to)  zio.ZLayer[Any,software.amazon.awssdk.services.s3.model.S3Exception,zio.s3.S3]
Error occurred in an application involving default arguments.
  def live2: Layer[S3Exception, S3] = liveZIO(Region.AF_SOUTH_1, default)

What should I do? Call liveZIO[Any] (which is the only thing I can think of that lets me compile this code)? What do I need to provide here?
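
For reference, a sketch of one way to eliminate the Scope requirement in ZIO 2, assuming the default scope is acceptable (other imports as in the snippet above):

import zio.{ Layer, Scope }

// Scope.default satisfies the layer's Scope requirement, leaving Any
val s3: Layer[S3Exception, S3] = Scope.default >>> liveZIO(Region.AF_SOUTH_1, default)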

no commonPrefixes property that exists in native aws s3 client listObjectsV2 response

Given S3 files such as:

client_1/file_1
client_1/file_n
client_2/file_1
client_n/file_n

With zio-s3 there is no way to get the list of clients without fetching all files.
This can be achieved with the native S3 client by passing a delimiter and reading the commonPrefixes property from the response.

The execute method can be used, but it lacks pagination, so maybe it makes sense to improve zio.s3 a little; see the sketch below.
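
For illustration, a hedged sketch of that workaround via execute (request and response types come from the AWS SDK v2; no pagination, bucket name illustrative):

import software.amazon.awssdk.services.s3.model.ListObjectsV2Request
import scala.jdk.CollectionConverters._

// one page of common prefixes, i.e. the "client_n/" directories
val clients = execute { client =>
  client.listObjectsV2(
    ListObjectsV2Request.builder().bucket("bucket-1").delimiter("/").build()
  )
}.map(_.commonPrefixes().asScala.map(_.prefix()).toList)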
Thanks

Allow extending the client

Currently the environment methods are in the package object, not in a trait that can be extended. Similarly, the live implementation is final, so cannot be extended.

The use case is adding more user-friendly methods, e.g.

def putObject(
  bucketName: String,
  key: String,
  content: String,
  options: UploadOptions = UploadOptions.default
): Task[Unit] = {
  val bytes = content.getBytes("UTF-8")
  wrappedClient.putObject(bucketName, key, bytes.length, ZStream.fromChunk(Chunk.fromArray(bytes)), options)
}

So the options are:

  1. Put the package object's methods in a trait
  2. Make the Live and stub implementations non-final (less ideal, as they would both need to be extended)
  3. Both?

Scala 3 solves this via export, which makes proxy classes trivial, but, of course, most people are on Scala 2.

release snapshot

dmijic, today at 4:10 PM:
@here Just released snapshots for the following projects:

  • zio-interop-cats
  • zio-interop-scalaz
  • zio-interop-monix
  • zio-interop-twitter
  • zio-interop-reactivestreams
  • zio-interop-java

As there are quite a few projects in the organization nowadays, I'd like to ask maintainers of projects that have the release process enabled to help @softinio and me by aligning their projects with the latest changes. The following PR can be used as an example: https://github.com/zio/interop-twitter/pull/75/files.

S3 getObject exception can't return the correct status code when the object key does not exist

version: 0.4.2.1

import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model._
import zio._
import zio.s3.getObject
import zio.stream.ZStream

object Test1 extends ZIOAppDefault {

  override def run: ZIO[Any with ZIOAppArgs with Scope, Throwable, Unit] = {
    val bucketName = "cassandra-dev"
    val key = "not-exist-key"

    def awsGetObject() = {

      val region = Region.CN_NORTHWEST_1
      val client = S3Client.builder.region(region).build

      val request = GetObjectRequest
        .builder()
        .bucket(bucketName)
        .key(key)
        .build()

      try {
        client.getObject(request)
      } catch {
        case e: S3Exception =>
          println("aws sdk return code:" + e.statusCode())
      }
    }
    awsGetObject()

    getObject(bucketName, key).catchAll { e =>
      println("zio s3 return code:" + e.statusCode())
      ZStream.fromIterable(Array[Byte]())
    }.runDrain.provideLayer(layer.all)
  }

}

output:

aws sdk return code:404
zio s3 return code:0

It should return 404 rather than 0.

It seems that the exception is not built correctly? The mapError below constructs a fresh S3Exception, so the original status code (404) is lost and statusCode() falls back to its default of 0:

.flatMap(identity)
.flattenChunks
.mapError(e => S3Exception.builder().message(e.getMessage).cause(e).build())
.refineOrDie {
  case e: S3Exception => e
}

Stub does not list files if prefix is a directory

Hi all,

I found an inconsistency between the stub and the live implementations of the S3 service with regard to listing objects with a prefix. This small app highlights the issue:

package zio.s3

import zio._
import zio.s3._
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import java.net.URI
import zio.nio.file.Path

object ListingObjectsWithPrefix extends ZIOAppDefault {
  // Run in the root of this repo
  // Run docker-compose up before running this app

  override def run =
    for {
      _ <- listInMinio
      _ <- listWithStub
    } yield ()

  def listInMinio =
    list("minio").provide(
      zio.s3
        .live(
          Region.CA_CENTRAL_1,
          AwsBasicCredentials.create("TESTKEY", "TESTSECRET"),
          Some(URI.create("http://127.0.0.1:9000"))
        )
    )

  def listWithStub =
    list("stub").provide(
      stub(Path("./test-data"))
    )

  def list(label: String) =
    for {
      objs <- listObjects(
                "bucket-1",
                ListObjectOptions(prefix = Some("dir1"), maxKeys = 10, delimiter = None, starAfter = None)
              )
      _    <- Console.printLine(s"Objects using $label: " + objs.objectSummaries.map(_.key).mkString(", "))
    } yield ()

}

This will print:

Objects using minio: dir1/hello.txt, dir1/user.csv
Objects using stub:

From my understanding of the AWS API, the minio output is correct. I could also reproduce it using an actual S3 bucket.

I've looked into the code and believe I found the cause of the issue. I'll create a PR shortly.

fix publishing doc build

https://github.com/zio/zio-s3/runs/7165967902?check_suite_focus=true

No deploy key found. Attempting to auth with ssh key saved in ssh-agent. To use a deploy key instead, set the GIT_DEPLOY_KEY environment variable.
yarn install v1.22.19
warning package.json: No license field
info No lockfile found.
warning No license field
[1/4] Resolving packages...
warning docusaurus > @babel/[email protected]: 🚨 This package has been deprecated in favor of separate inclusion of a polyfill and regenerator-runtime (when needed). See the @babel/polyfill docs (https://babeljs.io/docs/en/babel-polyfill) for more information.
warning docusaurus > [email protected]: Support has ended for 9.x series. Upgrade to @latest
warning docusaurus > [email protected]: request has been deprecated, see https://github.com/request/request/issues/3142
warning docusaurus > crowdin-cli > [email protected]: request has been deprecated, see https://github.com/request/request/issues/3142
warning docusaurus > @babel/polyfill > [email protected]: core-js@<3.23.3 is no longer maintained and not recommended for usage due to the number of issues. Because of the V8 engine whims, feature detection in old core-js versions could cause a slowdown up to 100x even if nothing is polyfilled. Some versions have web compatibility issues. Please, upgrade your dependencies to the actual version of core-js.
warning docusaurus > imagemin-svgo > [email protected]: This SVGO version is no longer supported. Upgrade to v2.x.x.
warning docusaurus > request > [email protected]: this library is no longer supported
warning docusaurus > request > [email protected]: Please upgrade  to version 7 or higher.  Older versions may use Math.random() in certain circumstances, which is known to be problematic.  See https://v8.dev/blog/math-random for details.
warning docusaurus > cssnano > cssnano-preset-default > postcss-svgo > [email protected]: This SVGO version is no longer supported. Upgrade to v2.x.x.
warning docusaurus > imagemin-optipng > exec-buffer > tempfile > [email protected]: Please upgrade  to version 7 or higher.  Older versions may use Math.random() in certain circumstances, which is known to be problematic.  See https://v8.dev/blog/math-random for details.
warning docusaurus > imagemin-svgo > svgo > [email protected]: Modern JS already guarantees Array#sort() is a stable sort, so this library is deprecated. See the compatibility table on MDN: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/sort#browser_compatibility
warning docusaurus > markdown-toc > gray-matter > [email protected]: CoffeeScript on NPM has moved to "coffeescript" (no hyphen)
warning docusaurus > markdown-toc > remarkable > autolinker > [email protected]: Removed event-stream from gulp-header
warning docusaurus > react-dev-utils > fork-ts-checker-webpack-plugin > micromatch > snapdragon > [email protected]: See https://github.com/lydell/source-map-resolve#deprecated
warning docusaurus > react-dev-utils > fork-ts-checker-webpack-plugin > micromatch > snapdragon > source-map-resolve > [email protected]: See https://github.com/lydell/source-map-url#deprecated
warning docusaurus > react-dev-utils > fork-ts-checker-webpack-plugin > micromatch > snapdragon > source-map-resolve > [email protected]: https://github.com/lydell/resolve-url#deprecated
warning docusaurus > react-dev-utils > fork-ts-checker-webpack-plugin > micromatch > snapdragon > source-map-resolve > [email protected]: Please see https://github.com/lydell/urix#deprecated
[2/4] Fetching packages...
[3/4] Linking dependencies...
warning " > [email protected]" has unmet peer dependency "react@>= 0.11.2 < 16.0.0".
[4/4] Building fresh packages...
success Saved lockfile.
Done in 22.11s.
yarn run v1.22.19
warning package.json: No license field
$ docusaurus-publish
HEAD
https://github.com/zio/zio-s3
generate.js triggered...
Version 9 of Highlight.js has reached EOL and is no longer supported.
Please upgrade or ask whatever dependency you are using to upgrade.
https://github.com/highlightjs/highlight.js/issues/2877
sitemap.js triggered...
Site built successfully. Generated files in 'build' folder.
79b75240166c818ad3ac0e4a3b1b8f59aeb3373c
Cloning into 'zio-s3-gh-pages'...
Warning: Permanently added the ECDSA host key for IP address '192.30.255.112' to the list of known hosts.
git@github.com: Permission denied (publickey).
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.
Error: git clone failed
error Command failed with exit code 1.
info Visit https://yarnpkg.com/en/docs/cli/run for documentation about this command.
[error] java.lang.AssertionError: assertion failed: command returned 1: [/tmp/docusaurus2955178817840105059install_ssh.sh]
[error] 	at scala.Predef$.assert(Predef.scala:223)
[error] 	at mdoc.DocusaurusPlugin$XtensionProcess.execute(DocusaurusPlugin.scala:167)
[error] 	at mdoc.DocusaurusPlugin$.$anonfun$projectSettings$5(DocusaurusPlugin.scala:119)
[error] 	at mdoc.DocusaurusPlugin$.$anonfun$projectSettings$5$adapted(DocusaurusPlugin.scala:100)
[error] 	at scala.Function1.$anonfun$compose$1(Function1.scala:49)
[error] 	at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:62)
[error] 	at sbt.std.Transform$$anon$4.work(Transform.scala:68)
[error] 	at sbt.Execute.$anonfun$submit$2(Execute.scala:282)
[error] 	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:23)
[error] 	at sbt.Execute.work(Execute.scala:291)
[error] 	at sbt.Execute.$anonfun$submit$1(Execute.scala:282)
[error] 	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:265)
[error] 	at sbt.CompletionService$$anon$2.call(CompletionService.scala:64)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
[error] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
[error] 	at java.lang.Thread.run(Thread.java:748)
[error] (docs / docusaurusPublishGhpages) java.lang.AssertionError: assertion failed: command returned 1: [/tmp/docusaurus2955178817840105059install_ssh.sh]
[error] Total time: 67 s (01:07), completed Jul 3, 2022 2:52:54 AM

multipartUpload throws software.amazon.awssdk.services.s3.model.S3Exception

Hello
I have found that multipartUpload can throw:

A checked error was not handled.
software.amazon.awssdk.services.s3.model.S3Exception: Unable to execute HTTP request: null

for parallel uploads (parallelism > 1). File to reproduce:

  val ak = ???
  val sk = ???
  val uri = ???
  val filePath = ???
  val stream = ZStream.fromFile(filePath).provideLayer(Blocking.live)
  val s3Layer = s3.live(Region.EU_CENTRAL_1, AwsBasicCredentials.create(ak, sk), Some(URI.create(uri)))
  val app = multipartUpload("temp", "test", stream)(2)

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = app.provideLayer(s3Layer).exitCode

Version:
"dev.zio" %% "zio-s3" % "0.3.7"

Expose S3AsyncClientBuilder

So I managed to figure out why my issue was happening. It seems I need to add this line of code when creating the S3AsyncClient:

.serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())

Sadly, zio.s3 does not allow me to specify this configuration, so I had to manually create a client and instantiate Live.

Connection timeout when streaming large files?

I am seeing slow performance and connection timeouts for "large" files (only tested with 10 MB or above).
Example:
Streaming the contents of all the files in a folder.
The folder consists of 2 files which are filled with json lines.
I was able to reproduce the issues on AWS with the following code:

    s3.listAllObjects(s3Config.extractedDataBucket, ListObjectOptions.default.copy(prefix = Some(s"${dataFolder}")))
      .flatMap(file => s3
        .streamLines(s3Config.extractedDataBucket, file.key)
        .map(line => jawn.decode[T](line))
      )
      .collectRight
      .groupedWithin(1000, 10.seconds)
      .runCount

The equivalent code runs quickly and to completion when run locally with minio.
Does anyone have ideas on how to fix this, or at least a way to increase the timeout?

Version:
"dev.zio" %% "zio-s3" % "0.4.2.1"

Stacktrace:

java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-4 [ACTIVE]
	at org.opensearch.client.RestClient.extractAndWrapCause(RestClient.java:936)
	at org.opensearch.client.RestClient.performRequest(RestClient.java:332)
	at org.opensearch.client.RestClient.performRequest(RestClient.java:320)
	at org.opensearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1911)
	at org.opensearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1877)
	at org.opensearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1845)
	at org.opensearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:364)
	at org.******.ingestion.opensearch.OpensearchClient.putDocs(OpensearchClient.scala:233)
	at org.*******.ingestion.load.Repository.$anonfun$dataStream$2(Repository.scala:26)
	at zio.stream.ZChannel.$anonfun$mapOutZIOPar$26(ZChannel.scala:647)
	at zio.ZIO$InterruptibilityRestorer$MakeInterruptible$.apply(ZIO.scala:5864)
	at zio.stream.ZChannel.$anonfun$mapOutZIOPar$25(ZChannel.scala:647)
	at zio.ZIO.$anonfun$raceFirst$1(ZIO.scala:1368)
	at zio.ZIO.raceWith(ZIO.scala:1457)
	at zio.ZIO.raceWith$(ZIO.scala:1453)
	at zio.ZIO$OnSuccessAndFailure.raceWith(ZIO.scala:5788)
	at zio.ZIO.$anonfun$race$1(ZIO.scala:1290)
	at zio.ZIO$.$anonfun$fiberIdWith$1(ZIO.scala:3175)
	at zio.ZIO$.$anonfun$descriptorWith$1(ZIO.scala:3075)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1067)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:890)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1024)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:967)
	at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:381)
	at zio.internal.FiberRuntime.evaluateMessageWhileSuspended(FiberRuntime.scala:504)
	at zio.internal.FiberRuntime.drainQueueOnCurrentThread(FiberRuntime.scala:220)
	at zio.internal.FiberRuntime.run(FiberRuntime.scala:139)
	at zio.internal.ZScheduler$anon$4.run(ZScheduler.scala:476)
Caused by: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection http-outgoing-4 [ACTIVE]
	at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.timeout(HttpAsyncRequestExecutor.java:387)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:98)
	at org.apache.http.impl.nio.client.InternalIODispatch.onTimeout(InternalIODispatch.java:40)
	at org.apache.http.impl.nio.reactor.AbstractIODispatch.timeout(AbstractIODispatch.java:175)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.sessionTimedOut(BaseIOReactor.java:261)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.timeoutCheck(AbstractIOReactor.java:506)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.validate(BaseIOReactor.java:211)
	at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:280)
	at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
	at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
	at java.base/java.lang.Thread.run(Thread.java:833)

Test stub should use correct `NoSuchKeyException` when file is not found.

Currently the test stubs for getObject and getObjectMetadata just throw S3Exception when a file for a specific key cannot be found. This makes it hard to implement use cases that check for the existence of an object first, e.g. to avoid transferring a lot of data.

The correct behavior is to throw a NoSuchKeyException.
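
A sketch of such an existence check, assuming the stub raised NoSuchKeyException as requested (exists is a hypothetical helper):

import software.amazon.awssdk.services.s3.model.{ NoSuchKeyException, S3Exception }
import zio.ZIO
import zio.s3._

def exists(bucket: String, key: String): ZIO[S3, S3Exception, Boolean] =
  getObjectMetadata(bucket, key).as(true).catchSome {
    case _: NoSuchKeyException => ZIO.succeed(false)
  }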

0.3.3 not in Central

Hey, when trying to upgrade to 0.3.3 I noticed that the release went all through into sonatype releases (see https://oss.sonatype.org/service/local/repositories/releases/content/dev/zio/zio-s3_2.13/0.3.3/zio-s3_2.13-0.3.3.pom) but it's not yet in Maven Central https://repo1.maven.org/maven2/dev/zio/zio-s3_2.13/ (and it's more than 18hrs now)

I noticed that sonatype had a few issues over yesterday and today (see https://status.maven.org/)... so you may want to keep an eye on this and maybe even consider releasing again

listAllObjects with prefix specified returns unrelated objects when pagination is triggered.

Looks like we need to send .prefix(...) with each request when paginating object listings.

Otherwise, if there are more than maxKeys objects with a certain prefix, we get all objects up to the end of the bucket. If there are fewer than maxKeys objects, everything works fine, because the first request is sent with the correct prefix parameter. But the requests sent by getNextObjects are sent without a prefix.

Requirements :P

I have no idea what you are trying to accomplish here, but I have a small ZIO-AWS wrapper myself, so let me list off what I have needed :)

  • The AWS SDK reads implicit credentials, which is essentially never what you want. Make sure a client is passed credentials
  • You can fetch S3 files onto disk or as an OutputStream. The latter is very useful for larger files
  • There is a Region class but you can make invalid instances of it, which will blow things up with runtime exceptions

I solved the Region issue like so:

def region(r: String): Option[Region] =
    Option(Region.of(r)).filter(Region.regions.contains(_))

It would be sweet if the S3 wrapper could stream me file contents line-by-line (e.g. for CSV or newline-delimited JSON).
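
A sketch of that line-by-line streaming built from zio-s3's getObject and zio-streams pipelines:

import zio.s3._
import zio.stream.ZPipeline

// decode bytes as UTF-8, then split on line breaks
val lines = getObject("bucket-1", "data.csv")
  .via(ZPipeline.utf8Decode >>> ZPipeline.splitLines)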

I am getting bitten with a Graalvm-native image

Question:

I am doing this right?

Working with a Graalvm native image and after adding the dependency

      "dev.zio"                     %% "zio-s3"                   % ZioS3Version,

I started to notice reflection issues at build time, therefore I had to set graalVMNativeImageOptions in my build.sbt:

graalVMNativeImageOptions ++= Seq(
  "-H:ClassInitialization=io.netty.util.internal.logging.Log4JLogger:run_time,org.apache.log4j.Logger:build_time,org.slf4j.simple.SimpleLogger:build_time,org.slf4j.impl.StaticLoggerBinder:build_time,org.slf4j.LoggerFactory:build_time,io.netty.handler.ssl.ReferenceCountedOpenSslClientContext:run_time,io.netty.handler.ssl.ReferenceCountedOpenSslServerContext:run_time,io.netty.handler.ssl.ReferenceCountedOpenSslContext:run_time,io.netty.handler.ssl.JdkNpnApplicationProtocolNegotiator:run_time,io.netty.handler.ssl.ConscryptAlpnSslEngine:run_time,io.netty.handler.ssl.JettyAlpnSslEngine$ClientEngine:run_time,io.netty.handler.ssl.JettyAlpnSslEngine$ServerEngine:run_time,io.netty.handler.ssl.JettyNpnSslEngine:run_time,io.netty.handler.ssl.ReferenceCountedOpenSslEngine:run_time,io.netty.util.AbstractReferenceCounted:run_time",

Still unsuccessful 😣

2021-03-26 10:29:29  #8 159.9 [error] Error: Classes that should be initialized at run time got initialized during image building:
2021-03-26 10:29:29  #8 159.9 [error]  org.slf4j.LoggerFactory was unintentionally initialized at build time. To see why org.slf4j.LoggerFactory got initialized use --trace-class-initialization=org.slf4j.LoggerFactory
2021-03-26 10:29:29  #8 159.9 [error] com.oracle.svm.core.util.UserError$UserException: Classes that should be initialized at run time got initialized during image building:
2021-03-26 10:29:29  #8 159.9 [error]  org.slf4j.LoggerFactory was unintentionally initialized at build time. To see why org.slf4j.LoggerFactory got initialized use --trace-class-initialization=org.slf4j.LoggerFactory

My question is: am I doing this right? It is starting to feel too painful to be the normal process. It feels like I need to configure a less-than-obvious side effect, to me at least, to make it work.

Additional resources

Would appreciate any hint, cheers!

Stub layer's Any type for E is unexpected

While using this library's stub layer for writing specs, I noticed something odd.

def stub(path: ZPath): ZLayer[Blocking, Any, S3] =
    ZLayer.fromFunction(Test.connect(path))

The E type is Any, which is a widening from the E of ZLayer.fromFunction, which is Nothing (the function provided to fromFunction is meant to be total). The consequence is that it is impossible to eliminate the error type of the stub layer with .orDie (as we do for our other test layers). I see that in your tests you use .mapError to transform the Any into TestFailure[Any], but that is not ergonomic for our code. I'm just curious about the motivation behind this design decision; maybe I'm just confused.

Thanks!
