
actor_backpressure

Loading streaming data into a database. A backpressure story.

Laboratory work No. 2 at Real-Time Programming
University: Technical University of Moldova
Faculty: Software Engineering
Teacher: Burlacu Alexandru
Group: FAF-182
Student: Vizant Beatrice
Task: Loading streaming data into a database with Backpressure


Requirements

  • Copy the Dynamic Supervisor + Workers that compute the sentiment score and adapt this copy of the system to compute the Engagement Ratio per Tweet. Notice that some tweets are actually retweets and contain a special field retweet_status. You will have to extract it and treat it as a separate tweet. The Engagement Ratio will be computed as: (#favorites + #retweets) / #followers.
  • Workers currently print sentiment scores; now they will have to send them to a dedicated aggregator actor, where the sentiment score, the engagement ratio, and the original tweet will be merged together. Hint: you will need special ids to recombine everything properly, because synchronous communication is forbidden.
  • Load everything into the database and implement a backpressure mechanism called adaptive batching. Adaptive batching means that you write/send data in batches when the maximum batch size is reached (for example, 128 elements) or when the provided time window expires (for example, 200 ms), whichever occurs first. This will be the responsibility of the sink actor(s).
  • Then split the tweet JSON into users and tweets and keep them in separate collections/tables in the DB.
  • Of course, don't forget about using actors and supervisors for your system to keep it running.

OPTIONAL:

  • Reactive pull backpressure mechanism between the aggregator actor and the sink.
  • Create an actor that would compute the Engagement Ratio per User.
  • Resumable/pausable transmission between the aggregator and the sink, that is, if the sink or the DB is unavailable, the aggregator will buffer the messages until they can be sent again.
  • Have a metrics endpoint to monitor the stats of the sink/DB controller service on ingested messages, average execution time, 75th, 90th, 95th percentile execution time, number of crashes per given time window, etc.
  • Anything else interesting and challenging you can think of.

Output example

(output screenshots)

Explanation

First I optimized the previous actor system to work faster, by extracting and reworking inner loops and rethinking the behaviour of several functions. Then I created two classes: DataWithAnalytics and DataWithId. DataWithId contains the primary information needed to start processing. Since this information is sufficiently typed and is used by multiple actors, it serves as the primary transmission type, for example to the emotion-score and ratio actors. Those actors each receive only one kind of data, but the aggregator has the problem of receiving data of different kinds: from one side it receives data about the tweet, from another the ratio, the emotion score, or even the user. The information arriving at the aggregator is therefore heterogeneous.

There are two possible ways of solving this problem. The first is to create a "master" class, for example RecordWithId, and extend multiple children from it (e.g. TweetWithId, NameWithId, UserWithId, etc.). This variant is not bad, but it complicates the structure of the program and makes the code harder to understand for another programmer who wants to work with it. I think it is unneeded over-engineering in this case: solving a simple problem with a complex approach.

So I decided to make a class called DataWithAnalytics which contains all the information the aggregator might need. Different actors send messages that are not full-fledged, i.e. objects that do not carry all the fields; instead of transmitting a fully complete object, each actor transmits only a fragment of it, and the aggregator combines these fragments into one integral record. The result is a simplified program without a big accumulation of classes, and without overriding large parts of code just to adapt to specific situations: instead of adapting my view to the program, I adapted the program to my view.
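
To make this concrete, here is a minimal sketch of what DataWithAnalytics looks like under this design; the exact field list and the userRatio name are assumptions, but the nullable-fields-plus-integrity-check idea is what the aggregator below relies on.

// Sketch of the fragment/record class. Every field is nullable so an actor can
// fill in only the part it knows about; checkForIntegrity() tells the aggregator
// when all fragments for a given id have been merged.
public class DataWithAnalytics {
    private String id;
    private String tweet;
    private String user;
    private Double emotionScore; // from the sentiment-score actor
    private Double emotionRatio; // tweet engagement ratio
    private Double userRatio;    // user engagement ratio (assumed field name)

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getTweet() { return tweet; }
    public void setTweet(String tweet) { this.tweet = tweet; }
    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public Double getEmotionScore() { return emotionScore; }
    public void setEmotionScore(Double emotionScore) { this.emotionScore = emotionScore; }
    public Double getEmotionRatio() { return emotionRatio; }
    public void setEmotionRatio(Double emotionRatio) { this.emotionRatio = emotionRatio; }
    public Double getUserRatio() { return userRatio; }
    public void setUserRatio(Double userRatio) { this.userRatio = userRatio; }

    // the record is complete once every fragment has arrived
    public boolean checkForIntegrity() {
        return id != null && tweet != null && user != null
                && emotionScore != null && emotionRatio != null && userRatio != null;
    }
}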

In JsonBehaviour I had to extract more data from the JSON nodes for this laboratory work, for example the fields used in the ratio calculations. All the data extracted from the JSON is inserted into a DataWithId and sent to the actors that process it further: the ones that calculate the emotion score, the tweet engagement ratio, and the user engagement ratio. This class also produces data that is ready to be sent straight to the aggregator, namely the tweet and the user:

// from this class we are already ready to transmit 2 fragments of data
// regarding the tweet and the user directly to the aggregator, because they
// don't need any additional processing
DataWithAnalytics transmittableFragment = new DataWithAnalytics();
transmittableFragment.setId(dataWithId.getId());
// set the tweet which will be transmitted
transmittableFragment.setTweet(dataWithId.getTweet());
// set the user which will be transmitted
transmittableFragment.setUser(dataWithId.getUser());

// send the composed fragment to the aggregator
Supervisor.sendMessage("aggregator", transmittableFragment);

After this I added TweetEngagementRatio, which receives the DataWithId described earlier and calculates the tweet's engagement ratio with the formula (favorites + retweets) / followers. After the calculation, the result is put into a fragment and sent to the aggregator as follows:

// default value in case the ratio cannot be computed
double tweetEngagementRatio = 0;
try {
    int followers = dataWithId.getRetweetFollowersCount();
    if (followers == 0) {
        // the author has 0 followers, so the ratio is undefined
        System.err.println("Can't calculate ratio -> 0 followers");
    } else {
        // calculate the engagement ratio: (favorites + retweets) / followers
        tweetEngagementRatio = (dataWithId.getFavouritesCount()
                + dataWithId.getRetweetsCount()) / (double) followers;
    }
} catch (NullPointerException e) {
    // one of the counts is missing from the tweet JSON
    System.err.println("Can't calculate ratio -> missing counts");
}
// initialize a new fragment and insert the engagement ratio
DataWithAnalytics transmittableFragment = new DataWithAnalytics();
// reuse the incoming data's id so the aggregator can match the fragments
transmittableFragment.setId(dataWithId.getId());
// append the calculated ratio
transmittableFragment.setEmotionRatio(tweetEngagementRatio);
// send the fragment with the tweet's ratio to the aggregator
Supervisor.sendMessage("aggregator", transmittableFragment);
return true;

Then I added the UserEngagementRatio, which follows the same principles as the tweet engagement ratio but uses other parameters and the formula (followers - friends) / statuses. I chose this formula because it first shows how many "real" followers the user has, excluding friends, and then divides by the number of statuses to show the real impact and popularity per post. After the calculation, a little fragment with only the user's ratio is sent to the aggregator, roughly as sketched below.
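
A minimal sketch of that calculation, mirroring the tweet-ratio actor above; the getFollowersCount, getFriendsCount, and getStatusesCount getter names are assumptions:

// default value in case the ratio cannot be computed
double userEngagementRatio = 0;
try {
    int statuses = dataWithId.getStatusesCount();
    if (statuses == 0) {
        // the user has no statuses, so the ratio is undefined
        System.err.println("Can't calculate user ratio -> 0 statuses");
    } else {
        // user engagement ratio = (followers - friends) / statuses
        userEngagementRatio = (dataWithId.getFollowersCount()
                - dataWithId.getFriendsCount()) / (double) statuses;
    }
} catch (NullPointerException e) {
    // one of the counts is missing from the user JSON
    System.err.println("Can't calculate user ratio -> missing counts");
}
// send a fragment carrying only the user's ratio to the aggregator
DataWithAnalytics transmittableFragment = new DataWithAnalytics();
transmittableFragment.setId(dataWithId.getId());
transmittableFragment.setUserRatio(userEngagementRatio);
Supervisor.sendMessage("aggregator", transmittableFragment);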
Now, about the Aggregator: it receives the fragments and combines all those that arrive with the same id, merging them into one chunk which is then sent to the sink. The code for this is commented and self-explanatory:

public boolean onReceive(Actor<DataWithAnalytics> self, DataWithAnalytics dataAnalyticsFragment) throws Exception {
    // look up an existing partial record for this id in the local hashmap
    DataWithAnalytics record = localHashMap.get(dataAnalyticsFragment.getId());
    if (record != null) {
        // transfer whatever data the incoming fragment carries into the stored record
        checkData(dataAnalyticsFragment, record);
        // then check the data for integrity; if it passes the check it can be sent
        // to the sink and removed from the local map
        if (record.checkForIntegrity()) {
            Supervisor.sendMessage("sink", record);
            localHashMap.remove(record.getId());
        }
    } else {
        // first fragment for this id: create a new record and store the incoming data
        DataWithAnalytics newRecord = new DataWithAnalytics();
        newRecord.setId(dataAnalyticsFragment.getId());
        checkData(dataAnalyticsFragment, newRecord);
        localHashMap.put(dataAnalyticsFragment.getId(), newRecord);
    }
    return true;
}
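
The checkData helper is not shown above; a plausible minimal implementation (assuming the field set sketched earlier) simply copies every non-null field from the incoming fragment into the stored record:

// copy whatever the fragment carries into the accumulated record,
// leaving already-merged fields untouched
private void checkData(DataWithAnalytics fragment, DataWithAnalytics record) {
    if (fragment.getTweet() != null) record.setTweet(fragment.getTweet());
    if (fragment.getUser() != null) record.setUser(fragment.getUser());
    if (fragment.getEmotionScore() != null) record.setEmotionScore(fragment.getEmotionScore());
    if (fragment.getEmotionRatio() != null) record.setEmotionRatio(fragment.getEmotionRatio());
    if (fragment.getUserRatio() != null) record.setUserRatio(fragment.getUserRatio());
}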

Before creating the Sink I first had to think about the database connection with MongoDB, so I created a class MongoUtility which establishes all the needed connections to the database and its collections. It also contains a method called insertDataToDB() which updates elements if they are present, or inserts them if no such records exist in the DB: first it writes the desired fields to the "tweets" collection, and then the user-related fields (id, user, and user ratio) to the "users" collection. An example can be seen below (this method will later be called inside the Sink):

// write the tweet-related fields of each record to the "tweets" collection
establishConnectionToCollection("tweets");
for (int i = 0; i < size; i++) {
    DataWithAnalytics currentRecord = dataRecords.get(i);
    // build a document with the fields destined for the "tweets" collection
    Document tweetDoc = new Document();
    tweetDoc.put("id", currentRecord.getId());
    tweetDoc.put("tweet", currentRecord.getTweet());
    tweetDoc.put("emotionRatio", currentRecord.getEmotionRatio());
    tweetDoc.put("score", currentRecord.getEmotionScore());
    collection.insertOne(tweetDoc);
}
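
The connection-establishment side of MongoUtility is not shown; here is a minimal sketch using the MongoDB Java sync driver, where the connection string and database name are placeholders:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

public class MongoUtility {
    // connection string and database name are placeholder values
    private final MongoClient client = MongoClients.create("mongodb://localhost:27017");
    private final MongoDatabase database = client.getDatabase("tweets_db");
    private MongoCollection<Document> collection;

    // point `collection` at the requested collection ("tweets" or "users")
    public void establishConnectionToCollection(String name) {
        collection = database.getCollection(name);
    }
}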

Now about the Sink and the backpressure mechanism. It was required to send data in batches of a specific size (128), or when the time is up (200 ms), whichever occurs first. My first idea was to put the time-checking condition in a while loop and nest the batch-size condition inside it, but that busy-waiting approach was very resource-demanding and not the best one. So I introduced a flag variable, isSent. Now I have two separate, cheap if-conditions: the first restarts the 200 ms timer after each flush, and the second checks whether the time is up or the maximum batch size is reached.

if (isSent) {
    // a batch was just flushed: restart the timer
    start = System.currentTimeMillis();
    // the window closes 200 ms after the timer starts
    end = start + 200;
    // mark the timer as running
    isSent = false;
}
// if the time is up or the maximum batch size is reached (whichever occurs first)
if (System.currentTimeMillis() >= end || recordsToDB.size() >= BATCH_SIZE) {
    // flush the accumulated records to the DB
    mongoUtility.insertDataToDB(recordsToDB);
    // start collecting the next batch in a fresh list
    recordsToDB = new ArrayList<>();
    // allow the timer to restart on the next incoming record
    isSent = true;
}
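
For context, this fragment lives in the sink's receive handler; a sketch of the surrounding method, assuming the same onReceive signature as the other actors:

public boolean onReceive(Actor<DataWithAnalytics> self, DataWithAnalytics record) throws Exception {
    // buffer the incoming record into the current batch
    recordsToDB.add(record);
    if (isSent) {
        // a batch was just flushed: restart the 200 ms window
        start = System.currentTimeMillis();
        end = start + 200;
        isSent = false;
    }
    // flush when the window expires or the batch is full
    if (System.currentTimeMillis() >= end || recordsToDB.size() >= BATCH_SIZE) {
        mongoUtility.insertDataToDB(recordsToDB);
        recordsToDB = new ArrayList<>();
        isSent = true;
    }
    return true;
}

Note that the clock is only checked when a message arrives, so during a quiet period a partial batch can wait longer than 200 ms; a scheduled tick message to the sink would flush it eagerly.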

Technologies

Java 11, with Maven for the Jackson and MongoDB driver dependencies

Status

Project status: finished
