st-h / vertx-mongo-streams

Helpers to stream data between vert.x ReadStream (e.g. HttpServerFileUpload), WriteStream (e.g. HttpServerResponse) and MongoDB AsyncInputStream and AsyncOutputStream.

Java 100.00%
gridfs mongodb writestream readstream java groovy mongodb-async-driver vertx

vertx-mongo-streams's Introduction

GridFS stream helper for Vert.x 3.x and MongoDB async driver in Java and Groovy


Important

This library is no longer maintained and will not receive any updates, for the following reasons:

  • the MongoDB driver will remove support for callback-style invocations
  • we no longer use GridFS to store binary data

Prerequisites

  • Java 8
  • mongodb async driver >= 3.10.0
  • vert.x >= 3.4.0

Install

mvn:

 <dependency>
   <groupId>com.github.st-h</groupId>
   <artifactId>vertx-mongo-streams</artifactId>
   <version>2.1.0</version>
 </dependency>

gradle:

com.github.st-h:vertx-mongo-streams:2.1.0

Upgrading from 1.x

The GridFSInputStream factory method GridFSInputStream.create(vertx) now requires the vertx instance as an argument. This ensures that the drainHandler of the vert.x WriteStream is called within the correct context. When the instance is created, the current vert.x context is stored and is restored when needed, so you should not cache the GridFSInputStream instance. Please see this blog post for details about the vert.x context.
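
The idea of capturing a context at creation time can be illustrated without vert.x. The following is a minimal sketch using only the JDK, in which a single-threaded executor stands in for the vert.x context. The names ContextBoundStream, signalDrained and ContextCaptureDemo are hypothetical and are not part of this library, and the library's actual implementation may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a stream that remembers the "context" it was created with.
final class ContextBoundStream {
  private final Executor creationContext; // captured once, at construction time
  private volatile Runnable drainHandler;

  ContextBoundStream(Executor creationContext) {
    this.creationContext = creationContext;
  }

  void drainHandler(Runnable handler) {
    this.drainHandler = handler;
  }

  // May be called from any thread; the handler is always dispatched on the
  // captured context, never run directly on the calling thread.
  void signalDrained() {
    Runnable handler = drainHandler;
    if (handler != null) {
      creationContext.execute(handler);
    }
  }
}

final class ContextCaptureDemo {
  // Returns the name of the thread on which the drain handler actually ran.
  static String handlerThreadName() {
    // Assumption: one named thread plays the role of the event loop.
    ExecutorService eventLoop =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
    try {
      ContextBoundStream stream = new ContextBoundStream(eventLoop);
      CompletableFuture<String> ranOn = new CompletableFuture<>();
      stream.drainHandler(() -> ranOn.complete(Thread.currentThread().getName()));

      // Signal from a thread other than the event loop, as a driver thread would.
      CompletableFuture.runAsync(stream::signalDrained).join();
      return ranOn.join();
    } finally {
      eventLoop.shutdown();
    }
  }
}
```

Calling ContextCaptureDemo.handlerThreadName() shows that the handler runs on the thread owned by the captured executor even though the signal came from elsewhere. This is also why an instance created in one context is a poor candidate for caching and reuse in another.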

Usage

Since vert.x 3.4.0, usage in Java and Groovy is identical.

Upload

The GridFSInputStream lets you pump data directly from a vert.x ReadStream (e.g. HttpServerFileUpload) into a MongoDB AsyncInputStream.

Create a new instance using the GridFSInputStream.create(vertx) method and use a Pump to transfer the data. Call the end() method once all data has been made available. The internal queue size can be changed using the setWriteQueueMaxSize() method.
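
How the queue size interacts with a Pump can be sketched in plain Java. The class below is a hypothetical, single-threaded model of the WriteStream-style contract (write, writeQueueFull, setWriteQueueMaxSize, drainHandler). It is not the library's GridFSInputStream, which also has to hand data to the MongoDB driver from another thread. The default size and the rule for when the drain handler fires are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded model of a bounded write queue with back-pressure.
final class BoundedWriteQueue {
  private final Queue<byte[]> chunks = new ArrayDeque<>();
  private int queuedBytes = 0;
  private int maxSize = 8192; // assumed default, not taken from the library
  private Runnable drainHandler;

  BoundedWriteQueue setWriteQueueMaxSize(int maxSize) {
    this.maxSize = maxSize;
    return this;
  }

  BoundedWriteQueue drainHandler(Runnable handler) {
    this.drainHandler = handler;
    return this;
  }

  // Producer side: a pump would pause its source once writeQueueFull() is true.
  void write(byte[] chunk) {
    chunks.add(chunk);
    queuedBytes += chunk.length;
  }

  boolean writeQueueFull() {
    return queuedBytes >= maxSize;
  }

  // Consumer side: removes one chunk (or returns null if empty) and fires the
  // drain handler when the queue goes from full to not full.
  byte[] poll() {
    boolean wasFull = writeQueueFull();
    byte[] chunk = chunks.poll();
    if (chunk != null) {
      queuedBytes -= chunk.length;
    }
    if (wasFull && !writeQueueFull() && drainHandler != null) {
      drainHandler.run();
    }
    return chunk;
  }
}
```

In this model, a smaller maximum makes the producer pause sooner and bounds memory use more tightly, while a larger one allows more data to be buffered before back-pressure applies.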

This snippet creates a fully working HTTP server that persists a file to GridFS:

import com.github.sth.vertx.mongo.streams.GridFSInputStream;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoDatabase;
import com.mongodb.async.client.gridfs.GridFSBucket;
import com.mongodb.async.client.gridfs.GridFSBuckets;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.http.HttpServer;
import io.vertx.core.streams.Pump;

public class UploadVerticle extends AbstractVerticle {

  private HttpServer httpServer;
  
  @Override
  public void start(Future<Void> fut) {

    // setup GridFS
    MongoDatabase db = MongoClients.create().getDatabase("test");
    GridFSBucket gridFSBucket = GridFSBuckets.create(db, "test-bucket");

    // setup the HttpServer
    httpServer = vertx.createHttpServer().requestHandler(request -> {

      request.setExpectMultipart(true);

      request.uploadHandler(fileUpload -> {

        // create a GridFSInputStream
        GridFSInputStream gridFSInputStream = GridFSInputStream.create(vertx);

        // when the upload has finished, notify the GridFSInputStream
        fileUpload.endHandler(endHandler -> gridFSInputStream.end());

        // just use a Pump to stream all the data
        Pump.pump(fileUpload, gridFSInputStream).start();

        gridFSBucket.uploadFromStream(fileUpload.filename(), gridFSInputStream, (id, t) -> {
          if (t != null) {
            // failed to persist
            request.response().setStatusCode(500).end();
          } else {
            // successfully persisted with ID: id
            request.response().end("uploaded: " + id);
          }
        });
      });
    }).listen(8080, res -> {
        if (res.succeeded()) {
          fut.complete();
        } else {
          fut.fail(res.cause());
        }
    });

  }

  @Override
  public void stop(Future<Void> fut) {
    httpServer.close( res -> fut.complete());
  }
}

Download

The GridFSOutputStream lets you write to a vert.x WriteStream via the MongoDB driver's downloadToStream() method:

GridFSOutputStream outputStream = GridFSOutputStream.create(httpServerResponse);
gridFS.downloadToStream(objectId, outputStream, (bytesRead, t) -> {

    ...

});

Build

Integration Test

If you want to build this library yourself, the integration test requires a running MongoDB instance on the default port 27017. If you are using Docker, you can use the commands from .travis.yml to launch a container running MongoDB and expose the default port:

docker pull mongo
docker run -d -p 127.0.0.1:27017:27017 mongo

Acknowledgments

Thanks to antofar for contributing an improved implementation of GridFSInputStream.


vertx-mongo-streams's People

Contributors

antofar, st-h, steefaaaaan

vertx-mongo-streams's Issues

GridFSInput/OutputStream

Steve, would you be willing to donate your GridFSInputStream and GridFSOutputStream classes to the official vertx3 mongo project? I've added gridfs support to the official project using your classes. If you are willing, I'll add attribution to you and this project in the official code. If you don't want to donate, I'll reimplement in a different way or think of an alternative. No pressure, just a request. Thanks for your consideration.

David Bush

GridFSOutputStream

Do you also have an implementation for reading files from GridFS into a vert.x ReadStream? Thanks.

Current implementation seems to write incorrect data in a multithreaded context

I wrote a concurrency test to validate issues with the old implementation, and it also shows that the current implementation seems to write incorrect data. I added the old implementation, with an added set of synchronized blocks, together with the concurrency test to the branch concurrency. The old implementation passes the test, whereas the new one fails.

Visibility issue for internal queue

The mongo thread could see an inconsistent state of the internal queue while the writer thread is filling the queue.

@st-h
Hello, I have a PR ready for this. The refactoring is fairly intense as I fixed the concurrency issue as well as the boxing/unboxing garbage generation for the queue.
When working with large files that's a lot of objects created.
Do you want to have a look?

GridFSOutputStreamImpl can copy bytes beyond file boundary

When a file is larger than the default mongo buffer size (4194304 bytes), the buffer size is limited to the default.
Unless the file size is aligned to that value, the last segment will be shorter than the byte[] wrapped by the buffer. The current implementation copies the whole array, which contains invalid bytes past the file boundary.
Reproducer
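
The pitfall described in this issue can be illustrated with plain java.nio.ByteBuffer. The sketch below is not the reproducer linked above and is not the library's actual code or fix. The class and method names are hypothetical, and it assumes a buffer backed by an accessible array.

```java
import java.nio.ByteBuffer;

// Hypothetical helpers contrasting the two ways of copying a wrapped buffer.
final class BufferCopy {
  // Problematic variant: returns a copy of the whole backing array, including
  // any bytes that lie past the buffer's limit (i.e. past the valid data).
  static byte[] copyWholeArray(ByteBuffer buffer) {
    return buffer.array().clone();
  }

  // Safer variant: copies only the bytes between position and limit.
  static byte[] copyRemaining(ByteBuffer buffer) {
    byte[] out = new byte[buffer.remaining()];
    // duplicate() shares the content but has its own position, so the
    // caller's buffer is left untouched.
    buffer.duplicate().get(out);
    return out;
  }
}
```

For a buffer that wraps an 8-byte array but holds only 3 valid bytes, copyRemaining yields 3 bytes while copyWholeArray yields all 8, which is the kind of overrun the issue reports for the last segment of a file.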

Callback is nullified incorrectly on recursive callback in GridFSInputStream

The stored callback can be nullified when the recursive stack unwinds.
For example, in a sequence of reads resulting in a call stack like:

storeCallback
doCallback
doCallback

When the recursion ends the bottom of the stack will nullify the callback stored by the top of the stack.
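
The ordering problem can be sketched in isolation. The class below is a hypothetical model, not the library's GridFSInputStream. It contrasts clearing the stored callback after invoking it with clearing it before, which is one common way to avoid this kind of re-entrancy bug and not necessarily the fix that was applied here.

```java
import java.util.function.IntConsumer;

// Hypothetical holder for a single pending callback.
final class CallbackSlot {
  private IntConsumer stored;

  void store(IntConsumer callback) {
    stored = callback;
  }

  boolean hasCallback() {
    return stored != null;
  }

  // Problematic ordering: the field is cleared after the callback returns, so
  // a callback stored re-entrantly during accept() is wiped when this frame
  // unwinds.
  void fireThenClear(int value) {
    if (stored != null) {
      stored.accept(value);
      stored = null;
    }
  }

  // Safer ordering: move the callback into a local and clear the field first,
  // so anything stored during accept() survives.
  void clearThenFire(int value) {
    IntConsumer callback = stored;
    stored = null;
    if (callback != null) {
      callback.accept(value);
    }
  }
}
```

If a callback stores a new callback while it runs, fireThenClear leaves the slot empty afterwards, whereas clearThenFire keeps the newly stored callback.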

@st-h
Sorry about this, I had an integration test with mongo running in my codebase but probably I didn't run maven install on the final revision before the PR.

A fix is on the way.
