wepay / waltz

Waltz is a quorum-based distributed write-ahead log for replicating transactions

Home Page: https://wepay.github.io/waltz/

License: Apache License 2.0

Languages: Shell 0.86%, XSLT 0.33%, Java 92.86%, Python 5.68%, JavaScript 0.25%, CSS 0.01%, HTML 0.02%
Topics: database, write-ahead-log, log, distributed-systems, transactional, multi-region, quorum, distributed-ledger

waltz's Introduction

Waltz

Waltz is a distributed/replicated write-ahead log for transactions.

Documentation

The documentation is available here.

Building

To build Waltz:

./gradlew clean build

Testing

To run a single test class:

./gradlew clean :waltz-storage:test -Dtest.single=StorageClientTest

To run a single test method:

./gradlew clean :waltz-storage:test --tests com.wepay.waltz.storage.client.StorageClientTest.testBasicReadWrite

Smoke test

Waltz also comes with a smoke test that starts:

  • 1 ZooKeeper server
  • 2 Waltz clients
  • 3 Waltz servers
  • 3 Waltz storage nodes

It then sends 1 million transactions while turning the servers on and off at random. Afterwards, it validates that all transactions were received, and that the checksums of the data are identical. It also logs throughput and latency.

To run smoke tests:

bin/smoketest.sh

The smoke test output looks like:

0001[... .*. *]
0002[... .** *]
0003[... *** *]
0004[..* *** *]
0005[*.* *** *]
0006[*** *** *] --..-..-..-..-+..-..-+..-..-..-..-+..-..-..-..-..-..-+..+
0007[.** *** *] +++-..-..-..-
0008[*** *** *] +..-..-
0009[*** .** *]
0010[*.* .** *] +++..-..-..-..-..-..+-
0011[*.* *** *] ..
0012[*** *** *] +-..-..-..-+..-+..-..-..-..+-..-

Each state change results in a new line. The first four digits indicate the number of server/storage start/stops that have been triggered so far. The [... ... .] portion indicates the state of the server nodes, storage nodes and the zookeeper server, where * means that the process is up, and . means that it's down. Lastly, the trailing part of the line indicates 1000 writes (-), 1000 reads (.), and retries (+).

A log file (smoketest.log) can be found in your home directory.

Demo app

Waltz comes with a demo app that shows an example account balance database built on top of Waltz.

Start a test Waltz cluster in a Docker environment.

bin/test-cluster.sh start

Start a MySQL instance in a Docker environment.

bin/demo-bank-db.sh start

Run the demo application, which provides simple, self-explanatory commands.

bin/demo-bank-app.sh

Run Waltz in Docker

We provide shell scripts to run a Waltz cluster in local Docker containers for testing purposes.

Creating the Docker images

./gradlew distDocker

This builds the Docker images.

Starting test cluster

bin/test-cluster.sh start <- Start the default cluster
bin/test-cluster.sh start <cluster_name> <- Start a stopped cluster that has already been created
bin/test-cluster.sh start <cluster_name> <base_server_port> <base_storage_port> <- Start a new cluster with one storage node and one server node running on the provided ports

This creates a user-defined Docker network waltz-network and starts three containers in waltz-network: a ZooKeeper server, a Waltz storage node, and a Waltz server node.

The ZooKeeper port is 2181 inside waltz-network and is exposed to the host machine at 42181. So, if you want to run a Waltz application outside of waltz-network, use yourHostName:42181 for zookeeper.connectString. The cluster's root ZNode is /waltz_cluster, so specify this as cluster.root.
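
For example, a client running on the host machine might be configured like this (a minimal sketch using java.util.Properties; the key names are the ones described above, but check your Waltz client's configuration class for the exact keys):

import java.util.Properties;

Properties props = new Properties();
props.setProperty("zookeeper.connectString", "yourHostName:42181"); // ZooKeeper as exposed to the host
props.setProperty("cluster.root", "/waltz_cluster");                // the cluster's root ZNode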

If the Docker images are not built yet, this script builds them. However, it doesn't automatically rebuild the images when the source code is modified. You must rebuild the images using the distDocker Gradle task.

Stopping waltz test cluster

bin/test-cluster.sh stop <- Stop all created clusters
bin/test-cluster.sh stop <cluster_name> <- Stop the cluster with the given name

This stops the Waltz containers. You can resume a cluster using the start <cluster_name> command. All data in ZooKeeper and the storage nodes is preserved.

Destroying the test cluster

bin/test-cluster.sh clean <- Remove all Waltz containers, including ZooKeeper, and thus all data.
bin/test-cluster.sh clean <cluster_name> <- Remove the two containers belonging to the given cluster. ZooKeeper stays intact.

Restarting the test cluster

bin/test-cluster.sh restart <- Restart all Waltz containers and regenerate config files (equivalent to stop, then start).
bin/test-cluster.sh restart <cluster_name> <- Restart the two containers belonging to the given cluster and regenerate their config files.

Setting up the test cluster with multiple partitions

The test cluster is set up with a single partition by default. If you want to create the test cluster with multiple partitions, set the environment variable WALTZ_TEST_CLUSTER_NUM_PARTITIONS to the desired number of partitions before creating the test cluster. For example, to create the test cluster with five partitions, do the following.

export WALTZ_TEST_CLUSTER_NUM_PARTITIONS=5
bin/test-cluster.sh start

Running DemoBankApp

First, create a database. The following command will create a MySQL container and the database.

bin/demo-bank-db.sh start

Then, start the demo application:

bin/demo-bank-app.sh

To stop the MySQL instance:

bin/demo-bank-db.sh stop

To remove the database:

bin/demo-bank-app.sh clean

Publishing Waltz Docs

See website/README.md.

waltz's People

Contributors

abhipardhu, biplap-sarkar, criccomini, hans3q, hrdlotom, kination, patrickghannage, smithakoduri, stoynov96, whynick1, wrp, ymatsuda, zzlbuaa


waltz's Issues

Waltz IntegrationHelper should be more generic

Waltz IntegrationHelper currently makes some assumptions about a cluster setup.

For example:

this.storageGroupMap = Utils.map(host + ":" + storagePort, STORAGE_GROUP_ID);

ZooKeeperCli.Create.createCluster(zkClient, root, "test cluster", numPartitions);
ZooKeeperCli.Create.createStores(zkClient, root, numPartitions, storageGroupMap);

The helper instance, upon starting ZK, creates znode metadata that implies there is only one storage node. A few other helper methods also assume there is only one Waltz server/storage node (e.g. getServerPort(), getStoragePort()). The helper should be made more configurable so that the number of server/storage nodes is not presumed.
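
A hedged sketch of what a more configurable setup might look like (numStorageNodes and baseStoragePort are illustrative names, not part of the current IntegrationHelper API):

// Build a storage group map covering N storage nodes instead of presuming one.
Map<String, Integer> storageGroupMap = new HashMap<>();
for (int i = 0; i < numStorageNodes; i++) {
    storageGroupMap.put(host + ":" + (baseStoragePort + i), STORAGE_GROUP_ID);
}

ZooKeeperCli.Create.createCluster(zkClient, root, "test cluster", numPartitions);
ZooKeeperCli.Create.createStores(zkClient, root, numPartitions, storageGroupMap);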

Fix a bug caused by race condition in Server's Partition.java code

Server's Partition -> Partition.java in waltz-server code.

Gist: If a Mount/Flush request comes to WaltzServer.Partition that is being closed, it is possible that the RequestThread gets stuck in a loop with no progress.

This is caused if the order of execution of RequestThread and ClosingThread is the following:

  1. A Server.Partition is being closed but is not yet closed (Thread1). Thread1 is switched out before it can start the close operation.
  2. At around the same time, a MountRequest/FlushRequest is sent to WaltzServer for that partition. (Thread2)
  3. Thread2 will execute this code and see that the Partition is still running; all the other relevant checks will pass.
  4. Thread2 will go ahead and call appendTask#flush.
  5. At this point, Thread2 is switched out and Thread1 starts executing.
  6. It will close the partition, the appendTask, and the underlying appendQueue, among other things.
  7. Thread1 has completed its work.
  8. Thread2 will start executing now and executes this code.
  9. Since the underlying queue is closed, the call will return false.
  10. Thread2 will keep looping without making any progress.

Implement consumer groups in Waltz

Waltz does not currently support consumer groups. Every application service receives every transaction ID, and they have to divvy up who processes which message (if they all share the same backing DB).

It would be nice to provide a convenience mechanism somewhere to allow consumers to filter out who gets what transaction IDs.

Should start with a spike to decide how to implement this.

[Question] Testing speed

Hello,
I'm running the build with tests, but it seems to take a very long time:

$ ./gradlew clean build
Welcome to Gradle 5.6.1!
...
...
Starting a Gradle Daemon (subsequent builds will be faster)
<===========--> 86% EXECUTING [45m 41s]
> IDLE
> IDLE
> IDLE
> :waltz-tools:test > Executing test com.wepay.waltz.tools.storage.StorageCliTest
> IDLE
> IDLE
> IDLE
> :waltz-tools:test > 32 tests completed

Is it okay?

Enable ClientCli to read partition's high-watermark

To validate whether a cluster can recover from a chaotic environment, we need a way to validate the cluster before and after torture testing.

Currently, this is achievable by using --high-watermark on the ClientCli.Validate command. Considering a single-partition scenario, this is how we do it:

  1. run the validate command with a total of 100 transactions
  2. torture the service nodes
  3. run the validate command again with a total of 100 transactions and --high-watermark=99, which indicates the client is expecting 100+100=200 callbacks

Obviously, this is not ideal. First, we need to calculate the cumulative number of past transactions every time we want to use --high-watermark. Second, if validation fails in the middle, we cannot tell what the current high watermark is (for the next run).

After talking to Chris Riccomini: as an alternative, we can read the "current high watermark" (the server's, not the storage node's) from the server or its metrics. This way, the validate command would always work without asking the user for the current high watermark.

Now, the question is how ClientCli is supposed to learn the "current high watermark".
We can add a HighWaterMarkRequest to allow WaltzClient to retrieve the current (committed) high watermark of a given partition, which would simplify the implementation of ClientCli.Validate.

Waltz performance tests in Ducktape

We need to add a benchmark test for Waltz with Ducktape, so that we can:

  1. test Waltz producer/consumer performance with different deployments (multi-region)
  2. detect performance degradation

We expect a performance report like:
####################### PERFORMANCE REPORT #######################
Appended 1000000 transactions, with total 488.2813 MB, in 1569.85 secs
Transaction/sec: 637.0052
MB/sec: 0.3110
Retry/Transaction: 0.0000
milliSec/Transaction: 93.6697
##################################################################

Questions about the blog post

I just read this blog and enjoyed reading it! The length and the technical detail was just right.

I have a few questions:

In Stream Oriented Processing you mention using kafka to let down-stream consumers observe state changes. This enables producers of data and consumers of data to be logically separate. Service X creates data and publishes on kafka, Service Y consumes on kafka and writes to materialized view.

The diagram in the blog shows an application that writes to Waltz and then later writes to a materialized view. Does Waltz allow those responsibilities to be separate like in my example with kafka above? E.g. Service X writes to the log, Service Y consumes changes(??) from the log and updates a materialized view.


In the Limitations and Requirements:

We require an application to have a transactional data storage like an SQL database. The database acts as a materialized view of Waltz transaction log.

Is the transactional storage an actual requirement? I thought that Waltz does not care about the views (that's the application's job). Why could I not create a materialized view with MongoDB or ElasticSearch?


Distributed KV-stores are all the buzz lately. Does Waltz take an existing KV db off-the-shelf, or did your team roll your own? I initially thought you were using ZooKeeper, but you say it's only used for leader election and cluster stuff.


I would love to see more technical posts on this technology. Will Waltz get a Jepsen test?

Add ability to bootstrap messages based on message type

When a Waltz log is shared amongst several applications, it might be useful for some applications to ignore messages that they don't care about. One way to ignore irrelevant message types would be to do server-side filtering, where the Waltz server or storage node drops unwanted messages. This is useful in a case where the application cares about messages across the whole timeline of the log.

A second use case is when the application cares about only a subset of messages, which may or may not extend all the way back to the beginning of the log. In such a case, it would be useful to have the application request the lowest watermark for the union of all the message types, so that the application can start reading from that point.

For example, suppose we have a Waltz log that's 2 years old. Four months ago, a new message type was added. A new application is being written that pays attention only to the new events. Having it fetch the low watermark for the new message type means that it can start reading from exactly the point in the log where the new message type first occurs, which allows the application to skip the vast majority of the irrelevant messages.

Note that this will have to be per-partition.
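
A hedged sketch of how a client might compute its starting point under this proposal (minOffsetFor is a hypothetical storage API, not an existing Waltz call):

// Start from the smallest per-type low watermark within the partition,
// i.e. the low watermark of the union of the interesting message types.
long startOffset = Long.MAX_VALUE;
for (int messageType : interestingTypes) {
    startOffset = Math.min(startOffset, storage.minOffsetFor(partitionId, messageType));
}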

SPIKE: Waltz admin UI

Waltz needs an admin UI. The UI should initially be read-only, and should allow us to visualize the state of the cluster: storage nodes, partitions, latency, offline/online reads, etc.

We should do this work once the admin CLI tools are completed.

This ticket is to spec out what an M0 Waltz admin UI would look like.

Add Waltz metrics documentation to wiki

We should agree on and document a naming convention for metrics before proceeding further.

I think using a class name as part of a metric name is excessively long and not so useful for monitoring. I'd rather have names like "waltz-server" and "waltz-storage" as top-level names. We can map these to domain names in JMX. The second level may be "partition-<id>", which may be mapped to a type name in JMX. I am not sure if it is useful to add the metric type name ("counter", "meter", "gauge") at the end of the metric name.

So, my preference is something like "waltz-server.partition-1.high-water-mark". It appears in the console like this:

·waltz-server
        ·partition-1
                ·high-water-mark

Waltz commands should be idempotent

Verifying cluster does not already exist...
Cluster already exists. Aborting.
[20:43:44 17560 williamp@poc-waltz01]$ java com.wepay.waltz.tools.zk.ZooKeeperCli add-storage-node --zookeeper 127.0.0.1:2181 --root /wrptest --storage 127.0.0.1:13000 --group  0
[success] Storage node added!

[20:47:57 17560 williamp@poc-waltz01]$ java com.wepay.waltz.tools.zk.ZooKeeperCli add-storage-node --zookeeper 127.0.0.1:2181 --root /wrptest --storage 127.0.0.1:13000 --group  0
[failed] Storage node already exists.

[20:48:11 17560 williamp@poc-waltz01]$ echo $?
0

If a cluster already exists, an attempt to create it should succeed.
Similarly, an attempt to add a storage node that already exists should succeed. (In both cases, this assumes that the existing cluster and the existing storage node are in the same state they would be in if they were being created now. I believe both of these commands just manipulate znodes, so I think this is the case.)

In the example above, we see the message '[failed] Storage node already exists', but a return value of 0 is given. This is inconsistent: if a command returns 0, then by definition it has succeeded, so a message indicating failure is just confusing.

Add onApplication callback method to TransactionContext

TransactionContext has an onCompletion method, which is invoked when the transaction is persisted to Waltz. This doesn't mean the transaction has been applied to the application database. If the application is submitting a series of transactions that use the same write lock, it is better for the application to wait for the transaction to be applied to the database rather than for the completion of the write to Waltz; otherwise, lock conflicts force retries. Therefore, we want an onApplication callback which is invoked only after the transaction is applied to the application database.
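
A hedged sketch of how the proposed callback might look from the application's side (the class shape is illustrative, not the actual Waltz TransactionContext API, which has additional methods):

public class TransferContext extends TransactionContext {
    @Override
    public void onCompletion() {
        // Existing callback: the transaction is persisted to Waltz,
        // but not necessarily applied to the application database yet.
    }

    public void onApplication() {
        // Proposed callback: invoked only after the transaction is applied
        // to the application database. An application holding the same write
        // lock can wait for this before submitting its next transaction,
        // avoiding lock-conflict retries.
    }
}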

Sub-task: Update Waltz storage to store min offset for every message type

In order to support #13, the Waltz storage node will need to keep the minimum offset for each message type on disk. The storage node needs to be updated to support this.

One place to store this might be the control file. I'm not sure if this is the best approach, as the control file is shared across all partitions, and this will be a partition-level setting. Still, the control file has all of the features that we need.

Eliminate Waltz admin protocol

Because we decided to add a 32- (or 64-) bit Flags field to distinguish a regular request from an admin request, like:

public HelloRequest(String greeting, Flags flags) {
    this.greeting = greeting;
    this.flags = flags;
}

the admin protocol is not needed anymore. So, we want to:

  1. eliminate the Waltz admin handler and admin port
  2. use Flags to replace the existing admin-related requests instead

Add a prefetch option to Waltz client

Based on the PerformanceCli test-consumers performance test, we believe most of the latency in inter-region performance tests comes from the network hop between regions. The idea is to add a shouldPrefetch function to Waltz client requests that will allow the Waltz client internals to evaluate whether a record should be fetched immediately, rather than waiting for the application to call getTransactionData(). Then, when the application does call getTransactionData(), the request is short-circuited and the cached result is returned.
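
A hedged sketch of the idea (shouldPrefetch is the proposed hook; the interface shape below is illustrative, not the actual Waltz client API):

// Hypothetical prefetch hook evaluated by the Waltz client internals.
public interface PrefetchPolicy {
    // Return true if the transaction's data should be fetched eagerly,
    // before the application calls getTransactionData(). When the
    // application later calls getTransactionData(), a prefetched record
    // is served from the client-side cache instead of crossing regions.
    boolean shouldPrefetch(long transactionId, int partitionId);
}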

Update zktools/Waltz to assign server partition ownership

zktools needs to be updated to add a new CLI command to set server:partition assignments. Currently, the PartitionAssignmentPolicy dictates what the partition assignments for each server are. We need to add a way to force the assignment in a specific way.

This ticket also requires us to add a new command in ZooKeeperCli in Waltz. The new command should be:

assign --partition=[partition_id] --server=[server_id]
unassign --partition=[partition_id] --server=[server_id]

Add MAX_TRANSACTION_ID_REQUEST to Waltz admin channel

A StorageCli.HighWaterMark command would be helpful for integration tests, e.g. to check whether a replica has caught up with the others.

We can add MAX_TRANSACTION_ID_REQUEST to the admin channel, and then create a CLI command to make the request.

waltz uber jar contains SNAPSHOT version of jetty

Using waltz-0.4.2, when I start a server I see logs like:

[2019-10-12 06:45:49,812] 838 [main] INFO org.eclipse.jetty.server.Server - jetty-9.4.z-SNAPSHOT; built: 2018-08-30T13:59:14.071Z; git: 27208684755d94a92186989f695db2d7b21ebc51; jvm 1.8.0_171-b11

This suggests that the build is using a non-release version of Jetty.

Add integration test to cover offline recovery

We should add an integration test to cover offline recovery, including these scenarios:

  1. scaling up a new replica with the recovery command
  2. recovering an offline replica should succeed
  3. an offline replica brought back online should recover

To make the test easier, we need a way to read a replica's max transaction ID. So, let's do #3 first.

SPIKE: Design doc for auditing tool for Waltz

We will need an auditing tool for Waltz to validate that no transactions are ever lost. Ideally, this tool will run continuously, and immediately alert us to any lost transactions.

@ymatsuda, have you thought about any strategies for this? It seems loosely similar to some of the auditing work for Kafka.

I think that there are two things that need to be audited: 1) that a transaction wasn't lost before it was even persisted to Waltz (i.e. the client thinks it was persisted, but it wasn't), and 2) that a transaction was persisted to Waltz but later disappeared for some reason.

Expose highWatermark API call in Waltz server protocol

The Waltz server has a high watermark for each partition. This is currently not exposed via the API, though. It would be useful to expose it, so that applications can decide to consume not from the beginning of a Waltz log but from the end, if they know they don't care about the older messages.

This came up when running Waltz performance tests and noticing that every execution was taking longer than the one before it. This is because the test had to re-read every message during mount, since it's always mounting from the beginning of the log. It would be better to make it start from the end so it can run its tests immediately, since it doesn't care about past messages, but this can't be done because the Waltz server doesn't expose the high watermark that the test client would want to use.

Make Waltz server StoreSessionImpl.BATCH_SIZE configurable

Based on some ad hoc performance tests, Yasuhiro Matsuda suggested that we might be able to increase producer throughput by increasing StoreSessionImpl.BATCH_SIZE. This is currently hard-coded, and we should make it configurable.
