alibaba / table-computing

Table-Computing (abbreviated TC) is a high-performance, low-latency computing framework. It is distributed and lightweight, built on relational operations, and simple to use: write less and do more. The authors report it to be 10x faster than Flink for complicated use cases.

License: Apache License 2.0

Java 100.00%
java big-data tc table-computing stream-processing data-analysis

Table-Computing

Welcome to the Table-Computing GitHub.

Table-Computing (abbreviated TC) is a distributed, lightweight, high-performance, low-latency framework for stream processing and data analysis. It is built on relational operations and is simple to use: write less and do more. In our experience TC achieves millisecond latency and runs more than 10 times faster than Flink for complicated use cases. For the same streaming task, using TC cut our computing resource usage by a factor of more than 10.

Why we develop this framework

Relational operations are an effective tool for processing and analyzing data, and SQL is a widely used implementation of them. But SQL is not Turing-complete: complicated business scenarios require UDFs, stored procedures, UDAFs, UDTFs, and so on. Complicated WHERE or JOIN criteria, or a new scalar, transform, aggregation, or window function, are not easy to express in SQL. SQL is also not very efficient for complicated cases. Whether a query executes efficiently depends on whether the plan optimizer has been tuned for that specific use case, and the more complicated the scenario, the harder it is to guarantee that every use case has been optimized.

Besides SQL, we can use Flink DataStream/DataSet, but a complex data processing task then takes very long code, and we also have to design the execution graph. This is a complex art: we chain operators together or split them apart, then observe whether the adjusted graph is more efficient and whether the task delay is acceptable. If not, we have to find the bottleneck in the execution graph and resolve it. A complex task usually includes dozens of operators with many possible combinations, so trying every potentially efficient combination is heavy work.

Example

Compute the top 100 commodities by sales volume over the last hour, refreshed every half hour.

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>table-computing</artifactId>
    <version>1.0.0</version>
</dependency>

MysqlDimensionTable mysqlDimensionTable = new MysqlDimensionTable("jdbc:mysql://localhost:3306/e-commerce",
        "commodity",
        "userName",
        "password",
        Duration.ofHours(1),
        new ColumnTypeBuilder()
        .column("id", Type.INT)
        .column("name", Type.VARCHAR)
        .column("price", Type.INT)
        .build(),
        "id"
        );

Map<String, Type> columnTypeMap = new ColumnTypeBuilder()
        .column("__time__", Type.BIGINT)
        .column("id", Type.BIGINT)
        .column("commodity_id", Type.INT)
        .column("count", Type.INT)
        .build();

KafkaStreamTable kafkaStreamTable = new KafkaStreamTable(bootstrapServers,
        "consumerGroupId",
        topic,
        0,
        columnTypeMap);
kafkaStreamTable.start();

StreamProcessing sp = new StreamProcessing();
String[] hashBy = new String[]{"commodity_id"};
Rehash rehashForSlideWindow = sp.rehash("uniqueNameForSlideWindow", hashBy);
String[] returnedColumns = new String[]{"commodity_id",
        "sales_volume",
        "saleroom",
        "window_start"};
SlideWindow slideWindow = new SlideWindow(Duration.ofHours(1),
        Duration.ofMinutes(30),
        hashBy,
        "__time__",
        new AggTimeWindowFunction() {
            @Override
            public Comparable[] agg(List<Comparable> partitionByColumns, List<Row> rows, long windowStart, long windowEnd) {
                return new Comparable[]{
                        partitionByColumns.get(0),
                        AggregationUtil.sumInt(rows, "count"),
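                        // Assumes each row carries a "total_price" column (for example count * commodity_price);
                        // the snippet as shown does not derive it.
                        AggregationUtil.sumInt(rows, "total_price"),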
                        windowStart
                };
            }
        }, returnedColumns);
slideWindow.setWatermark(Duration.ofSeconds(2));

hashBy = new String[]{"window_start"};
Rehash rehashForSessionWindow = sp.rehash("uniqueNameForSessionWindow", hashBy);
SessionWindow sessionWindow = new SessionWindow(Duration.ofSeconds(1),
        hashBy,
        "window_start",
        new TimeWindowFunction() {
            @Override
            public List<Comparable[]> transform(List<Comparable> partitionByColumns, List<Row> rows, long windowStart, long windowEnd) {
                int[] top100 = WindowUtil.topN(rows, "sales_volume", 100);
                List<Comparable[]> ret = new ArrayList<>(100);
                for (int i = 0; i < top100.length; i++) {
                    ret.add(rows.get(top100[i]).getAll());
                }
                return ret;
            }
        }, returnedColumns);
sessionWindow.setWatermark(Duration.ofSeconds(3));

sp.compute(new Compute() {
    @Override
    public void compute(int myThreadIndex) throws InterruptedException {
        Table table = kafkaStreamTable.consume();
        TableIndex tableIndex = mysqlDimensionTable.curTable();
        table = table.leftJoin(tableIndex.getTable(), new JoinCriteria() {
            @Override
            public List<Integer> theOtherRows(Row thisRow) {
                // Use tableIndex.getRows(), not mysqlDimensionTable.curTable().getRows(). A second call to
                // mysqlDimensionTable.curTable() may return a newly reloaded dimension table that is
                // inconsistent with the first curTable() result and with tableIndex.getTable().
                return tableIndex.getRows(thisRow.getInteger("commodity_id"));
            }},
            new As().
                as("id", "order_id").
                build(),
            new As().
                as("name", "commodity_name").
                as("price", "commodity_price").
                build());
        List<Table> tables = rehashForSlideWindow.rehash(table, myThreadIndex);
        table = slideWindow.slide(tables);
        tables = rehashForSessionWindow.rehash(table, myThreadIndex);
        table = sessionWindow.session(tables);
        if (table.size() > 0) {
            table.print();
            // You can finish the streaming task gracefully once a termination condition is satisfied.
            Thread.currentThread().interrupt();
        }
    }
});
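
The example relies on a one-hour window that slides every 30 minutes, so each record falls into two overlapping windows. The following is a minimal plain-Java sketch of that assignment, not TC's implementation. The class name `SlidingWindowSketch`, the method `windowStarts`, and the choice to align window starts to multiples of the slide interval are assumptions made for illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of table-computing.
final class SlidingWindowSketch {

    /** Returns the start (epoch ms) of every window [start, start + size) that contains ts. */
    static List<Long> windowStarts(long ts, Duration size, Duration slide) {
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        // Latest window start at or before ts, aligned to the slide interval (assumed alignment).
        long lastStart = Math.floorDiv(ts, slideMs) * slideMs;
        List<Long> starts = new ArrayList<>();
        // Walk backwards while the window still covers ts, i.e. start + size > ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = Duration.ofMinutes(100).toMillis(); // a record 100 minutes after the epoch
        // prints [5400000, 3600000]: the windows starting at minute 90 and minute 60
        System.out.println(windowStarts(ts, Duration.ofHours(1), Duration.ofMinutes(30)));
    }
}
```

With a window twice as long as the slide, every record is aggregated twice, which is why the ranking can be refreshed every half hour while still covering a full hour.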

Deploy your table-computing task in distributed mode:

java -Xmx100g -XX:MaxDirectMemorySize=500g -Dself=localhost:8888 -Dall=localhost:8888,localhost:9999 -jar my_task.jar

java -Xmx100g -XX:MaxDirectMemorySize=500g -Dself=localhost:9999 -Dall=localhost:8888,localhost:9999 -jar my_task.jar
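
Both commands pass the same `-Dall` list and differ only in `-Dself`. Any `-D` flag is visible inside the JVM as a system property, so a task can inspect these values itself. The sketch below shows one way to do that. It assumes that `self` names this node and `all` lists every node, which is inferred from the flag names. `ClusterArgsSketch` and `selfIndex` are hypothetical names, and this is not how TC itself necessarily reads the flags.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of table-computing.
final class ClusterArgsSketch {

    /** Returns the position of self within the comma-separated node list. */
    static int selfIndex(String self, String all) {
        List<String> nodes = new ArrayList<>();
        for (String node : all.split(",")) {
            nodes.add(node.trim());
        }
        int index = nodes.indexOf(self.trim());
        if (index < 0) {
            throw new IllegalArgumentException("self (" + self + ") is not listed in all (" + all + ")");
        }
        return index;
    }

    public static void main(String[] args) {
        // The defaults mirror the first command above; -Dself / -Dall override them.
        String self = System.getProperty("self", "localhost:8888");
        String all = System.getProperty("all", "localhost:8888,localhost:9999");
        System.out.println("node " + selfIndex(self, all) + " of " + all.split(",").length);
    }
}
```

A check like this at startup catches a mistyped `-Dself` before the task begins consuming data.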

Optimize:

  1. Run with a concurrency of 1 to measure single-thread throughput, then divide the upstream data volume by that throughput to get the number of StreamProcessing threads. Do not set the thread count too high: thread contention wastes resources and can lead to OOM (not enough CPU time is left to release unused memory).
  2. Choose the -Xmx value carefully. Table data is stored in off-heap memory to improve performance, so an -Xmx that is too large delays GC and with it the release of memory, which may cause OOM, while an -Xmx that is too small triggers GC too frequently and reduces throughput.
  3. Both old-generation GC and young-generation GC stop the world, the latter only briefly. More threads generate more garbage, which means more GC and more frequent stop-the-world pauses, so per-thread CPU usage cannot rise. If adding StreamProcessing threads no longer increases throughput, start a new JVM instead (it can run on the same machine, using localhost:anotherPort). N threads do not give N times the throughput, so an extra JVM may be needed. To decide, run top -H -p pid and check whether the CPU usage of the threads named compute-X is close to 100%.
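
The rule in item 1 is a ceiling division. The sketch below shows it with hypothetical figures: the class name `ThreadCountEstimate`, the method `threadsNeeded`, and the rates in `main` are illustrative and not measurements from TC.

```java
// Hypothetical helper, not part of table-computing.
final class ThreadCountEstimate {

    /** Smallest thread count whose combined throughput covers the upstream rate (at least 1). */
    static int threadsNeeded(long upstreamRowsPerSec, long singleThreadRowsPerSec) {
        if (upstreamRowsPerSec < 0 || singleThreadRowsPerSec <= 0) {
            throw new IllegalArgumentException("rates must be non-negative and throughput positive");
        }
        // Ceiling division without floating point.
        long threads = (upstreamRowsPerSec + singleThreadRowsPerSec - 1) / singleThreadRowsPerSec;
        return Math.toIntExact(Math.max(1L, threads));
    }

    public static void main(String[] args) {
        // Assumed figures: 250,000 rows/s upstream, 60,000 rows/s measured on one thread.
        System.out.println(threadsNeeded(250_000L, 60_000L)); // prints 5
    }
}
```

Treat the result as a starting point. As item 3 notes, throughput does not scale linearly with threads, so the measured figure may call for an additional JVM rather than more threads.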

Notice:

  1. When no data is arriving, AbstractStreamTable sleeps for 100 ms (the default) and then returns an empty table to trigger computing. Otherwise watermark data, window data, and data rehashed or rebalanced to other servers or threads would never be computed.
  2. A thread that reads a dimension table blocks until the dimension table has finished loading.
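
The behaviour in item 1 can be pictured as a timed poll: wait briefly for data and hand back an empty batch if none arrives, so that downstream windows still get a chance to fire. The sketch below uses only `java.util.concurrent` and is an illustration of the idea, not the code of AbstractStreamTable. `IdleTickSketch` and `consume` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of table-computing.
final class IdleTickSketch {

    /** Returns the queued rows, or an empty list if nothing arrives within idleMillis. */
    static List<String> consume(BlockingQueue<String> queue, long idleMillis) {
        try {
            String first = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
            if (first == null) {
                return Collections.emptyList(); // idle tick: lets the caller advance watermarks
            }
            List<String> batch = new ArrayList<>();
            batch.add(first);
            queue.drainTo(batch); // take whatever else is already waiting
            return batch;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(consume(queue, 100)); // prints [] after about 100 ms
        queue.add("row-1");
        queue.add("row-2");
        System.out.println(consume(queue, 100)); // prints [row-1, row-2]
    }
}
```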

Copyright and License

Table-Computing is provided under the Apache-2.0 license.
