
Comments (6)

BenASchwartz commented on August 23, 2024

The ITransformer, IBuffer, IFilter, and IEmitter interfaces are designed to allow for custom record processing methods.

If the records are different data types, one approach is to wrap all records in a class that can identify the record type before putting them into the Kinesis stream. In the ITransformer, the records will be transformed to this superclass and passed through the IFilter. The IFilter can filter records based on the record type field in the class. Next, the IBuffer can store these records and even perform aggregate operations on them as they are stored. Finally, the IEmitter can take your implementation of IBuffer and perform separate operations on each of the record types.

A simple example would be if you have two types of records coming through your Kinesis stream: A and B. We want to store records of type A in a DynamoDB table TableA and store the records of type B in DynamoDB table TableB. First, write an implementation of IBuffer that stores A and B records in two separate buffers. Then override DynamoDBEmitter's emit method to access the separate buffers and perform a BatchRequest on both tables.
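
The two-buffer idea above can be sketched without any library code. This is only an illustration: `SplitBuffer`, `TypedRecord`, and `RecordType` are hypothetical names, not part of the connector library, and a real implementation would implement IBuffer and also track byte counts and sequence numbers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

public class SplitBufferSketch {
    // Hypothetical record types; the thread only calls them A and B.
    enum RecordType { A, B }

    // Hypothetical wrapper that carries the type next to the payload.
    static final class TypedRecord {
        final RecordType type;
        final String payload;

        TypedRecord(RecordType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    // Keeps one list per record type so an emitter can read them separately.
    static final class SplitBuffer {
        private final Map<RecordType, List<TypedRecord>> buffers =
                new EnumMap<>(RecordType.class);

        void consume(TypedRecord record) {
            buffers.computeIfAbsent(record.type, t -> new ArrayList<>()).add(record);
        }

        List<TypedRecord> recordsOf(RecordType type) {
            List<TypedRecord> records = buffers.get(type);
            return records == null
                    ? Collections.<TypedRecord>emptyList()
                    : Collections.unmodifiableList(records);
        }

        void clear() {
            buffers.clear();
        }
    }

    public static void main(String[] args) {
        SplitBuffer buffer = new SplitBuffer();
        buffer.consume(new TypedRecord(RecordType.A, "a1"));
        buffer.consume(new TypedRecord(RecordType.B, "b1"));
        buffer.consume(new TypedRecord(RecordType.A, "a2"));

        // An emitter would issue one batch write per table from these lists.
        System.out.println("TableA batch size: " + buffer.recordsOf(RecordType.A).size());
        System.out.println("TableB batch size: " + buffer.recordsOf(RecordType.B).size());
    }
}
```

An overridden emit method could then read `recordsOf(RecordType.A)` and `recordsOf(RecordType.B)` and send one batch request per table.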

from amazon-kinesis-connectors.

harlow commented on August 23, 2024

@BenASchwartz thanks for the detailed write up.

If the records are different data types, one approach is to wrap all records in a class that can identify the record type before putting them into the Kinesis stream. In the ITransformer, the records will be transformed to this superclass and passed through the IFilter.

Can you give an example of what that JSON would look like?

Would you suggest a node in the JSON structure:

{
   "employee" : { 
      "name" : "Bob Smith",
      "id": 1,
      "phone" : "613-555-5555"
    }
}

Or having a structure with type and attributes

{
  "type": "employee",
  "attributes" : { 
     "name" : "Bob Smith",
     "id": 1,
     "phone" : "613-555-5555"
   }
}

Then in the Java pipeline we could do something like this?

@Override
public ITransformer<KinesisMessageModel, byte[]> getTransformer(KinesisConnectorConfiguration configuration) {
  // somehow grab message type
  // or is there a way to infer the Class from the JSON directly?
  val className = jsonNode.get("type").asText()
  return Class.forName(className)
}

Any help would be appreciated.

BenASchwartz commented on August 23, 2024

Every incoming record must have the same wrapping class so that it can be processed by the same ITransformer. For example, if you have two types of records, employees and customers, you could wrap them in a Person object or something even more general. Since the ITransformer implementation must be aware of the data types, it makes sense to serialize the objects as JSON blobs directly from the class.

{
    "person" : {
        "customer" : {
           "name" : "Bob Smith",
           "customerId" : 1,
           "phone" : "613-555-5555"
        }
    }  
}

{
    "person" : {
        "employee" : {
           "name" : "Jane Johnson",
           "employeeId" : 1,
           "phone" : "613-555-5555"
        }
    }  
}

That way every message can be deserialized as a Person object. The Person class could look like this:

public class Person{
  private Employee emp;
  private Customer cust;

  // Getters and Setters for emp and cust
  public Employee getEmployee(){ return emp; }
  public Customer getCustomer(){ return cust; }
  public void setEmployee(Employee emp) { this.emp = emp; }
  public void setCustomer(Customer cust) { this.cust = cust; }

  // useful for processing
  public boolean isEmployee() { return emp != null; }
  public boolean isCustomer() { return cust != null; }
}

And you can process the messages like this:
in ITransformer:

public Person toClass(Record record) {
    Person person = deserialize(record); // deserialize can be implemented with any JSON library
    if (person.isEmployee()) {
        // Do something for employees
    }
    if (person.isCustomer()) {
        // Do something for customers
    }
    return person; // the transformed object is passed on to the IFilter
}

in IEmitter:

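public List<Person> emit(UnmodifiableBuffer<Person> buffer) throws IOException {
    List<Person> failed = new ArrayList<Person>();
    for (Person p : buffer.getRecords()) {
        if (p.isEmployee()) {
            // Do something for employees
        }
        if (p.isCustomer()) {
            // Do something for customers
        }
    }
    return failed; // records that could not be emitted, so they can be retried
}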
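
To make the emit step more concrete, here is a minimal self-contained sketch of routing by type and handing back whatever could not be written. It deliberately avoids the connector library: the `Sink` interface, the simplified `Person` with plain name fields, and the static `emit` helper are all assumptions made for illustration, and the in-memory lists stand in for real tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PersonRoutingSketch {
    // Simplified stand-in for the Person wrapper: exactly one field is expected to be set.
    static final class Person {
        final String employeeName;
        final String customerName;

        Person(String employeeName, String customerName) {
            this.employeeName = employeeName;
            this.customerName = customerName;
        }

        boolean isEmployee() { return employeeName != null; }
        boolean isCustomer() { return customerName != null; }
    }

    // Hypothetical destination, e.g. a table writer; write may fail.
    interface Sink {
        void write(String name) throws Exception;
    }

    // Routes each record to the sink for its type and returns the records
    // that were not written, so the caller can retry or log them.
    static List<Person> emit(List<Person> records, Sink employees, Sink customers) {
        List<Person> failed = new ArrayList<>();
        for (Person p : records) {
            try {
                if (p.isEmployee()) {
                    employees.write(p.employeeName);
                } else if (p.isCustomer()) {
                    customers.write(p.customerName);
                } else {
                    failed.add(p); // neither type is set, so there is nowhere to route it
                }
            } catch (Exception e) {
                failed.add(p);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        List<String> employeeTable = new ArrayList<>();
        List<String> customerTable = new ArrayList<>();
        List<Person> batch = Arrays.asList(
                new Person("Jane Johnson", null),
                new Person(null, "Bob Smith"),
                new Person(null, null));

        List<Person> failed = emit(batch, employeeTable::add, customerTable::add);
        System.out.println("employees=" + employeeTable
                + " customers=" + customerTable
                + " failed=" + failed.size());
    }
}
```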

harlow commented on August 23, 2024

This is great. 👍 thanks for explaining in such detail.

Diamonds0a commented on August 23, 2024

Hi, sorry for posting on an old thread.

I am looking through the redshiftbasic sample, and I wanted to ask how this applies to that sample.

Going with what you described, I want to put a record into either tableA or tableB.

Because s3 is used as an intermediary, I need to sort records into two files. How would I partition my buffer to do that?

harlow commented on August 23, 2024

@Diamonds0a I spiked out a prototype of splitting JSON events into several files on S3.

Here is the core of it:
https://github.com/harlow/event-stream/blob/master/src/com/eventstream/S3EventEmitter.java#L48-L97

The README has a few more details on how the JSON payloads should be structured:
https://github.com/harlow/event-stream

I haven't written the Redshift Load part of it yet.
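
For anyone who just wants the general shape of the partitioning step, here is a rough sketch. It is not the code in the linked repository: the `Event` class, the `toFileBodies` helper, and the `type/batchId.json` key layout are all assumptions, and the actual S3 upload is left out. It groups buffered events by type and builds one newline-delimited body per type, so each body can be uploaded as its own file.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class S3PartitionSketch {
    // Hypothetical buffered event: a type name plus the raw JSON line.
    static final class Event {
        final String type;
        final String json;

        Event(String type, String json) {
            this.type = type;
            this.json = json;
        }
    }

    // Returns a map from object key to file contents, one entry per event type.
    // The key layout "<type>/<batchId>.json" is an assumption for illustration.
    static Map<String, String> toFileBodies(List<Event> buffer, String batchId) {
        Map<String, StringBuilder> bodies = new LinkedHashMap<>();
        for (Event e : buffer) {
            String key = e.type + "/" + batchId + ".json";
            bodies.computeIfAbsent(key, k -> new StringBuilder())
                  .append(e.json)
                  .append('\n');
        }
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, StringBuilder> entry : bodies.entrySet()) {
            result.put(entry.getKey(), entry.getValue().toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> buffer = Arrays.asList(
                new Event("tableA", "{\"id\":1}"),
                new Event("tableB", "{\"id\":2}"),
                new Event("tableA", "{\"id\":3}"));

        // Each entry would become one S3 object, loaded into its own table afterwards.
        for (Map.Entry<String, String> file : toFileBodies(buffer, "batch-0001").entrySet()) {
            System.out.println(file.getKey() + " -> " + file.getValue().length() + " chars");
        }
    }
}
```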
