reair's Introduction

ReAir

ReAir is a collection of easy-to-use tools for replicating tables and partitions between Hive data warehouses. These tools are targeted at developers that already have some familiarity with operating warehouses based on Hadoop and Hive.

Overview

The replication features in ReAir are useful for the following use cases:

  • Migration of a Hive warehouse
  • Incremental replication between two warehouses
  • Disaster recovery

When migrating a Hive warehouse, ReAir can be used to copy over existing data to the new warehouse. Because ReAir copies both data and metadata, datasets are ready to query as soon as the copy completes.

While many organizations start out with a single Hive warehouse, they often want better isolation between production and ad hoc workloads. Two isolated Hive warehouses accommodate this need well, and with two warehouses, there is a need to replicate evolving datasets. ReAir can be used to replicate data from one warehouse to another and propagate updates incrementally as they occur.

Lastly, ReAir can be used to replicate datasets to a hot-standby warehouse for fast failover in disaster recovery scenarios.

To accommodate these use cases, ReAir includes both batch and incremental replication tools. Batch replication executes a one-time copy of a list of tables. Incremental replication is a long-running process that copies objects as they are created or changed on the source warehouse.

Additional Documentation

Batch Replication

Prerequisites:

  • Hadoop (most versions should work, but tested with 2.5.0)
  • Hive (most versions should work, but tested with 0.13)

Run Batch Replication

  • Read through and fill out the configuration from the template.
  • Switch to the repo directory and build the JAR.
cd reair
./gradlew shadowjar -p main -x test
  • Create a local text file containing the tables that you want to copy. A row in the text file should consist of the DB name and the table name separated by a period. e.g.
my_db1.my_table1
my_db2.my_table2
  • Launch the job using the hadoop jar command on the destination, specifying the config file and the list of tables to copy. A larger heap for the client may be needed for large batches, so set HADOOP_HEAPSIZE appropriately. Also, depending on how the warehouse is set up, you may need to run the process as a different user (e.g. hive).
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
export HADOOP_HEAPSIZE=8096
sudo -u hive hadoop jar main/build/libs/airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.batch.hive.MetastoreReplicationJob --config-files my_config_file.xml --table-list my_tables_to_copy.txt
  • Additional CLI options: --step and --override-input. These arguments are useful if you want to run one of the three MR jobs individually for faster failure recovery. --step indicates which step to run, and --override-input provides the input path when running the second and third stage MR jobs; this will usually be the output of the first stage MR job. See the example below.
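For example, a sketch of re-running only the second stage and reusing the first stage's output (the --step value and the input path are illustrative placeholders):
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
sudo -u hive hadoop jar main/build/libs/airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.batch.hive.MetastoreReplicationJob --config-files my_config_file.xml --table-list my_tables_to_copy.txt --step 2 --override-input <output path of the first stage job>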

Incremental Replication

Prerequisites:

  • Hadoop (most versions should work, but tested with 2.5.0)
  • Hive (most versions should work, but tested with 0.13)

Audit Log Hook Setup

Incremental replication relies on recording changes in the source Hive warehouse to figure out what needs to be replicated. These changes can be recorded in two different ways. In the first method, the hook is added to the Hive CLI and runs after a query is successful. In the other method, the hook is added as a listener in the Hive remote metastore server. This method requires that you have the metastore server deployed and used by Hive, but it will work when systems other than Hive (e.g. Spark) make calls to the metastore server to create tables. The steps to deploy either hook are similar:

Build and deploy the JAR containing the audit log hook

  • Switch to the repository directory and build the JAR.
cd reair
./gradlew shadowjar -p hive-hooks -x test
  • Once built, the JAR for the audit log hook can be found under hive-hooks/build/libs/airbnb-reair-hive-hooks-1.0.0-all.jar.

  • Copy the JAR to the Hive auxiliary library path. The specifics of the path depend on your setup. Generally, the auxiliary library path can be configured using the configuration parameter hive.aux.jars.path. If you're deploying the hook for the CLI, you only have to deploy the JAR on the hosts where the CLI will be run, and likewise, if you're deploying the hook for the metastore server, you only have to deploy the JAR on the server host.
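As a rough sketch (the destination directory is an illustrative placeholder and depends on your installation), a CLI-side deployment might look like:
# copy the hook JAR to each host where the Hive CLI runs (path is a placeholder)
cp hive-hooks/build/libs/airbnb-reair-hive-hooks-1.0.0-all.jar /usr/lib/hive/aux/
# then reference it via hive.aux.jars.path, e.g. in hive-site.xml:
#   <property>
#     <name>hive.aux.jars.path</name>
#     <value>file:///usr/lib/hive/aux/airbnb-reair-hive-hooks-1.0.0-all.jar</value>
#   </property>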

  • Create and set up the MySQL tables required for the audit log. You can create the tables by running the create table commands in all of the .sql files here. If you're planning to use the same DB to store the tables for incremental replication, also run the create table commands here.
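For instance, loading the DDL with the mysql client could look like the following (host, user, database, and file names are placeholders; use the actual .sql files referenced above):
# host, user, database, and file names below are placeholders
mysql -h my_mysql_host -u my_user -p my_audit_db < create_audit_log_tables.sql
mysql -h my_mysql_host -u my_user -p my_audit_db < create_incremental_replication_tables.sql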

  • If you want to add the hook for the Hive CLI, change the configuration for the Hive CLI (in the source warehouse) to use the audit log hook by adding the following sections to hive-site.xml from the audit log configuration template after replacing with appropriate values.

  • If you want to add the hook for the metastore server, change the configuration for the Hive metastore server (in the source warehouse) to use the hook by adding the following sections to hive-site.xml from the metastore audit log configuration template after replacing with appropriate values.

  • Run a test query and verify that you see the appropriate rows in the audit_log and audit_objects tables.
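A quick, hedged way to check, assuming the tables were created in a database called my_audit_db (host, user, and database names are placeholders):
# host, user, and database names are placeholders
mysql -h my_mysql_host -u my_user -p my_audit_db -e "SELECT COUNT(*) FROM audit_log; SELECT COUNT(*) FROM audit_objects;"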

Process Setup

  • If the MySQL tables for incremental replication were not set up while setting up the audit log, create the state tables for incremental replication on the desired MySQL instance by running the create table commands listed here.

  • Read through and fill out the configuration from the template. You might want to deploy the file to a widely accessible location.

  • Switch to the repo directory and build the JAR. You can skip the unit tests if no changes have been made (via the '-x test' flag).

cd reair
./gradlew shadowjar -p main -x test

Once the build finishes, the JAR to run the incremental replication process can be found under main/build/libs/airbnb-reair-main-1.0.0-all.jar.

  • To start replicating, set options to point to the appropriate logging configuration and kick off the replication launcher by using the hadoop jar command on the destination cluster. An example log4j.properties file is provided here. Be sure to specify the configuration file that was filled out in the prior step. As with batch replication, you may need to run the process as a different user.
export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
sudo -u hive hadoop jar airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.incremental.deploy.ReplicationLauncher --config-files my_config_file.xml

If you use the recommended log4j.properties file that is shipped with the tool, messages at the INFO level will be printed to stderr, while more detailed messages (DEBUG level and above) will be recorded to a log file in the current working directory.
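If you need to write such a file yourself, a minimal sketch with that behavior (not the file shipped with the tool; the appender names and log file name are arbitrary) could be:
cat > log4j.properties <<'EOF'
# INFO and above to stderr; DEBUG and above to a file in the working directory
# (appender names and the file name below are arbitrary)
log4j.rootLogger=DEBUG, console, file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.err
log4j.appender.console.Threshold=INFO
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}: %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=reair.log
log4j.appender.file.Threshold=DEBUG
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{ISO8601} %-5p %c{2}: %m%n
EOF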

When the incremental replication process is launched for the first time, it will start replicating entries after the highest numbered ID in the audit log. Because the process periodically checkpoints progress to the DB, it can be killed and will resume from where it left off when restarted. To override this behavior, please see the additional options section.

  • Verify that entries are replicated properly by creating a test table on the source warehouse and checking to see if it appears on the destination warehouse.

For production deployment, an external process should monitor and restart the replication process if it exits. The replication process will exit if the number of consecutive failures while making RPCs or DB queries exceeds the configured number of retries.
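A minimal supervision sketch (illustrative only; a process supervisor such as your init system would work just as well), using the launch command from above:
# illustrative supervision loop: restart the launcher whenever it exits
while true; do
  sudo -u hive hadoop jar airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.incremental.deploy.ReplicationLauncher --config-files my_config_file.xml
  echo "Replication process exited with status $?; restarting in 60s" >&2
  sleep 60
done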

Additional CLI options:

To force the process to start replicating entries after a particular audit log ID, you can pass the --start-after-id parameter:

export HADOOP_OPTS="-Dlog4j.configuration=file://<path to log4j.properties>"
hadoop jar main/build/libs/airbnb-reair-main-1.0.0-all.jar com.airbnb.reair.incremental.deploy.ReplicationLauncher --config-files my_config_file.xml --start-after-id 123456

Replication entries that were started but not completed on the last invocation will be marked as aborted when you use --start-after-id to restart the process.

Monitoring / Web UI:

The incremental replication process starts a Thrift server that can be used to get metrics and view progress. The Thrift definition is provided here. A simple web server that displays progress has been included in the web-server module. To run the web server:

  • Switch to the repo directory and build the JARs. You can skip the unit tests if no changes have been made.
cd reair
./gradlew shadowjar -p web-server -x test
  • The JAR containing the web server can be found at
web-server/build/libs/airbnb-reair-web-server-1.0.0-all.jar
  • Start the web server, specifying the appropriate Thrift host and port where the incremental replication process is running.
java -jar airbnb-reair-web-server-1.0.0-all.jar --thrift-host localhost --thrift-port 9996 --http-port 8080
  • Point your browser to the appropriate URL, e.g. http://localhost:8080, to view the active and retired replication jobs.

Discussion Group

A discussion group is available here.

In the wild

If you find ReAir useful, please list yourself on this page!

reair's People

Contributors

ahojman, aoen, dkrapohl, fermich, jameadows, jingweilu1974, karentycoon, ljharb, plypaul, ronbak, saguziel, sungjuly, uzshao, zshao


reair's Issues

Migration of Hive Warehouse -- How ?

I need an understanding of ReAir: how does it help on top of a Hadoop/Hive warehouse, given that Hadoop is already a big platform for exploring big data?

New Feature: Metastore Database Prefix

Allow a database prefix to be added to the destination cluster. This can be used for production testing. For example, add "test_" to all database names on the destination cluster.

Hive warehouse replication and Hive batch metastore replication

Hi,
When I try to replicate the Hive warehouse from one cluster to another, it is not successful.
Can you tell me which parameters need to be changed and which user to run as? How do I use the blacklist parameter with databases and tables?

Also, for batch metastore replication, which configuration file should be edited?
One is with MySQL and the second one is without MySQL.

change in org.apache.thrift version not working properly

Changing the org.apache.thrift version from 0.9.1 to 0.12.0 gives an issue while trying to build:
com.airbnb.reair.incremental.thrift.TReplicationService.AsyncClient.pause_call is not abstract and does not override abstract method getResult() in org.apache.thrift.async.TAsyncMethodCall

MetastoreScanInputFormat uses a single ThriftClient in many threads

See the code that passes a single ThriftClient to many threads.

I don't think the ThriftClient is thread-safe. We have many databases, so many threads (16 by default as defined in the source code) will be running on a single ThriftClient, which causes unexpected failures on the server side.

Suggested fixes:
A. Create a new ThriftClient for each database, or
B. Just get rid of the multi-threaded case here since it won't take too much time to list through tens or even hundreds of databases.

Add Kerberos Support -Thrift Based

Does ReAir support replicating tables to a secured/Kerberized cluster?

I am facing the exception below:

2017-07-07 01:52:06,930 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2017-07-07 01:52:06,951 WARN [main] org.apache.hadoop.mapred.YarnChild: Exception running child : java.io.IOException: database 0, table some_hive_db.some_table_in_hive got exception
	at com.airbnb.reair.batch.hive.MetastoreReplicationJob$Stage1ProcessTableMapperWithTextInput.map(MetastoreReplicationJob.java:622)
	at com.airbnb.reair.batch.hive.MetastoreReplicationJob$Stage1ProcessTableMapperWithTextInput.map(MetastoreReplicationJob.java:593)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: com.airbnb.reair.common.HiveMetastoreException: org.apache.thrift.transport.TTransportException
	at com.airbnb.reair.common.ThriftHiveMetastoreClient.getTable(ThriftHiveMetastoreClient.java:126)
	at com.airbnb.reair.incremental.primitives.TaskEstimator.analyzeTableSpec(TaskEstimator.java:84)
	at com.airbnb.reair.incremental.primitives.TaskEstimator.analyze(TaskEstimator.java:68)
	at com.airbnb.reair.batch.hive.TableCompareWorker.processTable(TableCompareWorker.java:136)
	at com.airbnb.reair.batch.hive.MetastoreReplicationJob$Stage1ProcessTableMapperWithTextInput.map(MetastoreReplicationJob.java:614)
	... 9 more
Caused by: org.apache.thrift.transport.TTransportException
	at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
	at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84)
	at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:340)
	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:202)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_table(ThriftHiveMetastore.java:1263)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_table(ThriftHiveMetastore.java:1249)
	at com.airbnb.reair.common.ThriftHiveMetastoreClient.getTable(ThriftHiveMetastoreClient.java:121)
	... 13 more

I saw code in TableCompareWorker.java#L90 calling getMetastoreClient() of HardCodedCluster.java#L62. I don't see any implementation which checks whether the connection needs to be in secure mode or not. Based on a security flag in the config, we could switch which type of TTransport to use.

Any thoughts on how to implement it?

I was trying to connect to Thrift from a standalone program based on https://github.com/joshelser/krb-thrift, but no luck. Now I get:

Exception in thread "main" org.apache.thrift.transport.TTransportException: SASL authentication not complete

Standalone Java program snippet:

TTransport transport = new TSocket(host, port);
Map<String, String> saslProperties = new HashMap<String, String>();
// Use authorization and confidentiality
saslProperties.put(Sasl.QOP, "auth-conf");
saslProperties.put(Sasl.SERVER_AUTH, "true");
System.out.println("Security is enabled: " + UserGroupInformation.isSecurityEnabled());
// Log in via UGI, ensures we have logged in with our KRB credentials
UserGroupInformation.loginUserFromKeytab("someuser", "/etc/security/keytabs/someuser.headless.keytab");
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
System.out.println("Current user: " + currentUser);
// SASL client transport -- does the Kerberos lifting for us
TSaslClientTransport saslTransport = new TSaslClientTransport(
    "GSSAPI",        // tell SASL to use GSSAPI, which supports Kerberos
    null,            // authorizationid - null
    args[0],         // kerberos primary for server - "myprincipal" in myprincipal/[email protected]
    args[1],         // kerberos instance for server - "my.server.com" in myprincipal/[email protected]
    saslProperties,  // Properties set, above
    null,            // callback handler - null
    transport);      // underlying transport
// Make sure the transport is opened as the user we logged in as
TUGIAssumingTransport ugiTransport = new TUGIAssumingTransport(saslTransport, currentUser);
ThriftHiveMetastore.Client client = new ThriftHiveMetastore.Client(new TBinaryProtocol(ugiTransport));
transport.open();

Any guidance would help me contribute back a patch enabling support for Kerberos.

Have you ever tried Spark integration?

Thanks for open sourcing this project. I'm having an issue integrating a Spark app to record audit logs using the hive-hooks module. Just curious, have you ever tried Spark integration? It seems that Spark supports Hive hooks in theory, but it doesn't work when initializing the remote HiveMetastore client.

File checksums with different block sizes may be invalid

If the clusters have different block sizes, stage 3 will fail because directoryCopier.equalDirs returns false: the checksums differ whenever a file is larger than one block. For now, I compare files using their bytes.
Below are the differing checksums:
/test/123.lzo MD5-of-0MD5-of-512CRC32C
/test/123.lzo MD5-of-0MD5-of-256CRC32C
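For reference, the composite checksum in question can be printed with the standard HDFS shell; its value depends on the block size used when the file was written, which is why the two clusters disagree here:
hdfs dfs -checksum /test/123.lzo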

Copying metastore only

Is it possible to replicate metastore only?
Is it possible to copy metadata between different Hive metastore versions?

TaskEstimator does not cache MetastoreClient connections

Here are the 4 places:

  • analyzeTableSpec (1, 2)
  • analyzePartitionSpec (1, 2).

The result is that we recreate many connections to the Hive metastore in a very short period of time, with hundreds of mappers/reducers running concurrently.

We should cache the MetastoreClient connections in a thread-safe way, using ThreadLocal<>.

Incremental Replication

  1. Can you give an "Incremental Replication" example?
  2. Have you used "Incremental Replication" in a production environment?

java.util.NoSuchElementException

I started the ReAir jar to replicate the source warehouse to the destination warehouse, but I hit the following exception and I don't know why:
17/05/23 17:05:33 INFO mapreduce.Job: The url to track the job: http://cdh.master.linesum:8088/proxy/application_1495527145657_0006/
17/05/23 17:05:33 INFO mapreduce.Job: Running job: job_1495527145657_0006
17/05/23 17:05:38 INFO mapreduce.Job: Job job_1495527145657_0006 running in uber mode : false
17/05/23 17:05:38 INFO mapreduce.Job: map 0% reduce 0%
17/05/23 17:05:41 INFO mapreduce.Job: Task Id : attempt_1495527145657_0006_m_000001_0, Status : FAILED
Error: java.util.NoSuchElementException
	at java.util.AbstractList$Itr.next(AbstractList.java:364)
	at com.google.common.collect.Iterators.any(Iterators.java:684)
	at com.google.common.collect.Iterables.any(Iterables.java:620)
	at com.airbnb.reair.batch.hive.TableCompareWorker.processTable(TableCompareWorker.java:123)
	at com.airbnb.reair.batch.hive.MetastoreReplicationJob$Stage1ProcessTableMapper.map(MetastoreReplicationJob.java:570)
	at com.airbnb.reair.batch.hive.MetastoreReplicationJob$Stage1ProcessTableMapper.map(MetastoreReplicationJob.java:556)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

cp: ... No such file or directory

private static void copyFile(Path srcFile, Path destFile) throws IOException {
    String[] copyArgs = { "-cp", srcFile.toString(), destFile.toString() };

    FsShell shell = new FsShell();

1. Should it be like this instead?

String[] copyArgs = { "-put", srcFile.toString(), destFile.toString() };

2. I also ask whether the project is still in development.
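For reference, the FsShell semantics in question (paths are illustrative):
hadoop fs -cp hdfs://src/path/file hdfs://dest/path/file    # copy between filesystem URIs
hadoop fs -put /local/path/file hdfs://dest/path/file       # copy from the local filesystem into HDFS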

nameserver support

This version of ReAir doesn't support two nameservices; the following configuration should be added, for example:

dfs.nameservices = srcCluster,destCluster
dfs.ha.namenodes.srcCluster = nn1,nn2
dfs.ha.namenodes.destCluster = namenode317,namenode319
dfs.namenode.rpc-address.srcCluster.nn1 = a1-ops-hdnamenode01.hz:8020
dfs.namenode.rpc-address.srcCluster.nn2 = a1-ops-hdnamenode02.hz:8020
dfs.namenode.rpc-address.destCluster.namenode317 = a2-prod-sh-namenode-XX-124.sh:8020
dfs.namenode.rpc-address.destCluster.namenode319 = a2-prod-sh-namenode-XX-123.sh:8020
dfs.client.failover.proxy.provider.srcCluster = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.client.failover.proxy.provider.destCluster = org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
