Delta Plus

A library based on delta for Spark and MLSQL.

Requirements

This library requires Spark 2.4+ (tested) and Delta 0.4.0.

Linking

You can link against this library in your program at the following coordinates:

Scala 2.11

groupId: tech.mlsql
artifactId: delta-plus_2.11
version: 0.2.0-SNAPSHOT
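
For example, with sbt the dependency can be declared as follows (a sketch; the snapshot version assumes you have a snapshot resolver such as Sonatype snapshots configured):

// build.sbt (sketch based on the coordinates above)
libraryDependencies += "tech.mlsql" % "delta-plus_2.11" % "0.2.0-SNAPSHOT"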

Limitation

  1. Compaction cannot be applied to a Delta table that is also the target of upsert/delete operations.

Binlog Replay Support

To incrementally sync a MySQL table to Delta Lake, combine delta-plus with the spark-binlog project.

DataFrame:

val spark: SparkSession = ???

val df = spark.readStream.
format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
option("host","127.0.0.1").
option("port","3306").
option("userName","xxxxx").
option("password","xxxxx").
option("databaseNamePattern","mlsql_console").
option("tableNamePattern","script_file").
option("bingLogNamePrefix","mysql-bin")
optioin("binlogIndex","4").
optioin("binlogFileOffset","4").
load()


df.writeStream.
format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").  
option("__path__","/tmp/sync/tables").
option("mode","Append").
option("idCols","id").
option("duration","5").
option("syncType","binlog").
option("checkpointLocation","/tmp/cpl-binlog2").
option("path","{db}/{table}").
outputMode(OutputMode.Append)...
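
The trailing ... above elides the end of the call chain. A minimal sketch of how such a query is typically started is shown below; the trigger interval is an assumption chosen to mirror duration="5" and is not taken from the delta-plus README:

import org.apache.spark.sql.streaming.{OutputMode, Trigger}

val query = df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("__path__", "/tmp/sync/tables").
  option("mode", "Append").
  option("idCols", "id").
  option("duration", "5").
  option("syncType", "binlog").
  option("checkpointLocation", "/tmp/cpl-binlog2").
  option("path", "{db}/{table}").
  outputMode(OutputMode.Append).
  trigger(Trigger.ProcessingTime("5 seconds")). // assumed; mirrors duration="5"
  start()

query.awaitTermination()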

MLSQL Code:

set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxx"
and password="xxxxxx"
and bingLogNamePrefix="mysql-bin"
and binlogIndex="4"
and binlogFileOffset="4"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

Before you run the streaming application, make sure you have fully synced the table:

connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;
 
load jdbc.`db_cool.script_file`  as script_file;

run script_file as TableRepartition.`` where partitionNum="2" and partitionType="range" and partitionCols="id"
as rep_script_file;

save overwrite rep_script_file as delta.`mysql_mlsql_console.script_file` ;

load delta.`mysql_mlsql_console.script_file`  as output;

Upsert/Delete Support

DataFrame:

df.writeStream.
format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
option("idCols","id"). // setting idCols means an upsert will be executed
option("operation","delete"). // setting operation to delete means the records in df will be deleted from the table
outputMode(OutputMode.Append).start("/tmp/delta-table1")

spark.readStream.format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").load("/tmp/delta-table1")

When neither idCols nor operation is configured, a normal Append/Overwrite operation is executed. If idCols is set, an upsert is executed instead. If both idCols and operation are set and operation equals delete, the records contained in df are deleted from the table.

Note that if the data being written to the Delta table contains duplicate records, delta-plus throws an exception by default. If you want the duplicates dropped instead, set dropDuplicate to true.
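
A sketch of how that option might be passed, assuming it is supplied as a string option like the others in the example above:

df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("idCols", "id").          // upsert by id
  option("dropDuplicate", "true"). // assumption: drop duplicate records instead of throwing
  outputMode(OutputMode.Append).
  start("/tmp/delta-table1")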

MLSQL:

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

CompactionSupport

DataFrame:

val optimizeTableInDelta = CompactTableInDelta(log,
            new DeltaOptions(Map[String, String](), df.sparkSession.sessionState.conf), Seq(), Map(
              CompactTableInDelta.COMPACT_VERSION_OPTION -> "8",
              CompactTableInDelta.COMPACT_NUM_FILE_PER_DIR -> "1",
              CompactTableInDelta.COMPACT_RETRY_TIMES_FOR_LOCK -> "60"
            ))
val items = optimizeTableInDelta.run(df.sparkSession)
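
The snippet above assumes a DeltaLog handle named log and a DataFrame df are already in scope. A sketch of how log might be obtained with the standard Delta Lake API (this step is not shown in the README; the table path is taken from the MLSQL example below):

import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}

// Assumption: open the transaction log of the Delta table to be compacted.
val log = DeltaLog.forTable(df.sparkSession, "/delta/table1")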

MLSQL:

-- compact table1 files before version 10, and make 
-- sure every partition has only one file
!delta compact /delta/table1 10 1;

You can use !delta history /delta/table1; to get the history of the table.
