cockroachdb / replicator

replicator is a toolkit for ingesting logical replication feeds into a CockroachDB cluster

License: Apache License 2.0

Languages: Go 98.80%, Shell 0.66%, Dockerfile 0.30%, TypeScript 0.24%


replicator's People

Contributors: bobvawter, bramgruneir, chriscasano, dependabot[bot], duskeagle, glennfawcett, otan, robert-s-lee, sravotto


replicator's Issues

ERROR: cannot write directly to computed column

Steps to reproduce:

Open terminal for the SOURCE

cockroach start-single-node --listen-addr :30000 --http-addr :30001 --store cockroach-data/30000 --insecure --background

cockroach sql --insecure --port 30000 <<EOF
create table t (id int, uuid uuid default gen_random_uuid(), ts timestamp default now(), primary key (id));
SET experimental_enable_hash_sharded_indexes = ON;
CREATE INDEX tidx ON t (uuid) USING hash WITH bucket_count = 2;
-- add enterprise license
SET CLUSTER SETTING cluster.organization = 'Workshop';
SET CLUSTER SETTING enterprise.license = 'crl-0-xxx';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
CREATE CHANGEFEED FOR TABLE defaultdb.t INTO 'experimental-http://127.0.0.1:30004/crdbusertable' WITH updated,resolved='10s';
EOF

Open terminal for TARGET

cockroach start-single-node --listen-addr :30002 --http-addr :30003 --store cockroach-data/30002 --insecure --background

cockroach sql --insecure --port 30002 <<EOF
create table t (id int, uuid uuid default gen_random_uuid(), ts timestamp default now(), primary key (id));
SET experimental_enable_hash_sharded_indexes = ON;
CREATE INDEX tidx ON t (uuid) USING hash WITH bucket_count = 2;
EOF

Open terminal for CDC-SINK

./cdc-sink --port 30004 --conn postgresql://root@localhost:30002/defaultdb?sslmode=disable --config '[{"endpoint":"crdbusertable", "source_table":"t", "destination_database":"defaultdb", "destination_table":"t"}]'

On source

cockroach sql --insecure --port 30000 -e "insert into t (id) values (1);"

ERROR on CDC-SINK

2021/08/20 14:00:48 crdbusertable: resolved - timestamp 1629482444863757000.0
2021/08/20 14:00:48 crdbusertable: _CDC_SINK.defaultdb_t executed 1 operations
2021/08/20 14:00:48 ERROR: cannot write directly to computed column "crdb_internal_uuid_shard_2" (SQLSTATE 55000)
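A plausible direction for a fix: the apply logic could discover and skip generated columns when building its UPSERT statements. A sketch of the lookup against the repro schema above (the hidden shard column comes from the hash-sharded index):

-- Find generated columns to exclude from generated UPSERTs.
SELECT column_name
FROM [SHOW COLUMNS FROM defaultdb.t]
WHERE generation_expression != '';
-- Returns crdb_internal_uuid_shard_2 for the table above.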

Support backfilling pglogical connections

For simple use cases, we should support at least a basic way of copying table data from the source db to the target cluster. If we use pg_export_snapshot(), it would be possible to select data concurrently from all source tables and feed concurrent COPY operations into the destination cluster.
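For reference, the snapshot-sharing flow on the source PostgreSQL looks roughly like this (a sketch; the snapshot ID and table name are illustrative):

-- Session 1: export a snapshot and keep this transaction open.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT pg_export_snapshot();  -- returns an ID such as '00000003-0000001B-1'

-- Sessions 2..N: adopt the same snapshot, then read source tables concurrently.
BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
SELECT * FROM some_source_table;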

Apply resolved timestamps asynchronously from webhook call

We currently apply mutations within the resolved-timestamp HTTP request. This does allow us to report errors via the changefeed job, but creates "hiccups" in overall resolved-timestamp changefeed throughput if there are large batches of mutations.

Alternatively, we could stage the resolved timestamps into a table and have an asynchronous process apply the relevant mutations. This would imply the need for some kind of leasing mechanism, to avoid unnecessary contention on the resolved-timestamp and staged-mutation tables.

One thing that we would lose, however, is a way to immediately report failures to apply mutations (e.g. incompatible schema changes) via the resolved-timestamp HTTP request. This would imply that we need to persist the outcome of the most recent attempt to apply staged mutations and that the resolved-timestamp endpoints would only be able to report failure on subsequent resolved-timestamp messages from the source cluster.
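A minimal sketch of the staged state this implies, with one row per resolved timestamp plus leasing and last-outcome columns (all names hypothetical):

CREATE TABLE _cdc_sink.resolved_timestamps (
  endpoint       STRING NOT NULL,
  source_nanos   INT8 NOT NULL,   -- resolved timestamp from the changefeed
  source_logical INT8 NOT NULL,
  lease_owner    UUID,            -- cdc-sink instance currently applying
  lease_expires  TIMESTAMPTZ,     -- lets another instance take over
  applied_at     TIMESTAMPTZ,     -- NULL until mutations are applied
  last_error     STRING,          -- reported on a subsequent resolved message
  PRIMARY KEY (endpoint, source_nanos, source_logical)
);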

logging: capability to write logs to a file

Currently the process writes its log to stdout/stderr. It would be great to add the capability to write to a file, ideally using the same logging framework that CockroachDB uses so we can specify channels, formats, sinks, etc.

Support https endpoint

Did the following on the source:

CREATE CHANGEFEED FOR TABLE me INTO 'experimental-https://172.31.2.194:26258/me.sql' WITH resolved, updated;

The log from the source machine showed:

error: retryable changefeed error: retryable http error: Put https://172.31.2.194:26258/me.sql/2020-04-19/202004191704475286656650000000001-bcfd19dba28ea94a-3-5017-00000000-me-1.ndjson: http: server gave HTTP response to HTTPS client

Single-statement promotion from staging to target tables

It should be possible to construct SQL statements that promote from the staging table to the target tables with sufficiently complicated query generation. This would eliminate another round-trip of the synchronized data through the cdc-sink binary.

Steps:

  • Ensure that mutation-ordering and deduplication within a range of resolved timestamps can be performed server-side.
  • Change the storage of staged mutations to JSONB or to a table schema which allows querying of individual row/col/value tuples.
  • Generate UPSERT statements which use the structured representation to mutate the target rows directly.

A possible complication is that it could be necessary for cdc-sink to perform some metadata queries to be able to generate expressions to perform the necessary type coercions.
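As a sketch of the idea, if staged mutations were stored as JSONB, the generated promotion statement might look like the following (the staging table and its columns are assumptions, not the actual schema):

-- Promote all mutations up to a resolved timestamp in one statement,
-- deduplicating server-side so only the newest mutation per key wins.
UPSERT INTO defaultdb.t (id, uuid, ts)
SELECT DISTINCT ON (mut->>'id')
       (mut->>'id')::INT8,
       (mut->>'uuid')::UUID,
       (mut->>'ts')::TIMESTAMP
FROM _cdc_sink.staged_t
WHERE nanos <= 1629482444863757000
ORDER BY mut->>'id', nanos DESC;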

cdc-sink: support reading from s3

The current HTTP sink requires cdc-sink itself to be highly available. It also writes to the target as a staging area. Some drawbacks:

  • hard to audit what data was applied
  • the cdc-sink process needs to be up to accept the CDC feed
  • writing to the target as staging creates stress on the target (especially for a large table and high volume)

An alternative is to read from an S3 changefeed sink, which would address the above issues.
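For reference, the source side can already write a changefeed to cloud storage (credentials elided; older CRDB versions require the experimental- scheme prefix):

CREATE CHANGEFEED FOR TABLE defaultdb.t
  INTO 's3://bucket/path?AWS_ACCESS_KEY_ID=...&AWS_SECRET_ACCESS_KEY=...'
  WITH updated, resolved = '10s';

cdc-sink would then poll the bucket for new .ndjson files and resolved-timestamp markers instead of accepting inbound HTTP.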

Unify logical-loop CLI code

There's a bit of duplication between the pg and mysql logical-replication CLI commands, related to configuring the logical.Config and whatnot. This would evolve better if we had an intermediate logical command that provided common flag configuration.

cdc-sink pglogical

I am trying to start cdc-sink to replicate data from a PostgreSQL db to CRDB. I am using the command below:

cdc-sink pglogical --publicationName <mc_publication> --sourceConn <source_conn_string> --targetConn <target_conn_string> --targetDB <application_db>

I am getting the below error:

error="no target db was configured"

I have the database configured in the target cluster.

Let me know what I am missing.

Think about detecting schema drift

This may be a separate tool, given that cdc-sink does not connect directly to the source database. May also be a monitoring metric to alert on if we see cases with unknown / mis-typed columns.

regression: in mysql/mariadb we are not using the defaultGTIDSet anymore.

ERROR [Jul 28 09:17:00] error from replication source; continuing
  detail="missing gtidset
  github.com/cockroachdb/cdc-sink/internal/source/mylogical.(*conn).ReadInto
      /Users/sravotto/go/src/github.com/cockroachdb/cdc-sink/internal/source/mylogical/conn.go:198
  github.com/cockroachdb/cdc-sink/internal/source/logical.(*loop).runOnce.func1
      /Users/sravotto/go/src/github.com/cockroachdb/cdc-sink/internal/source/logical/loop.go:200
  golang.org/x/sync/errgroup.(*Group).Go.func1
      /Users/sravotto/go/pkg/mod/golang.org/x/sync@.../errgroup/errgroup.go:75
  runtime.goexit
      /usr/local/opt/go/libexec/src/runtime/asm_amd64.s:1581"
  error="missing gtidset"

Build automation

cdc-sink does not have any automation around build, test, and packaging. The use-case is pretty simple, so GitHub Actions are probably the lowest-friction way forward.

pglogical: Add test case for DELETE in no-PK table

Add a test to document the behavior in the source db when there's no replication identity for a source row that's updated or deleted. If updates/deletes are permitted, revise the pglogical code as necessary.
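For context, the source-side behavior hinges on the table's replica identity; a sketch of the setup the test should exercise:

-- Source PostgreSQL: a table without a primary key has no replica identity
-- by default, so UPDATE/DELETE on a published table are rejected.
CREATE TABLE no_pk (v INT);
-- One way to permit updates/deletes is to log the entire old row:
ALTER TABLE no_pk REPLICA IDENTITY FULL;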

Support core changefeed as a logical loop

We should support consuming a CRDB core changefeed, to provide a "pull"-based model for cases where the source cluster cannot create outgoing network connections to a cdc-sink deployment. This won't be as performant as a proper enterprise changefeed setup, but it provides another driver for the logical package, helping to ensure it's generally sufficient.
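Unlike an enterprise changefeed, a core changefeed streams over an ordinary SQL connection that cdc-sink would hold open; a sketch (syntax varies by CRDB version):

-- Run from the consuming SQL session against the source cluster;
-- rows stream back as (table, key, value) tuples until the session ends.
EXPERIMENTAL CHANGEFEED FOR defaultdb.t WITH updated, resolved = '10s';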

cdc: Out of memory w/ 10-warehouse TPCC sink

5, "ol_number": 8, "ol_o_id": 1236, "ol_quantity": 5, "ol_supply_w_id": 4, "ol_w_id": 4}, "key": [4, 7, 1236, 8], "updated": "1587320776859132078.0000000000"}
2020/04/19 23:39:56 lineRaw: {"after": {"c_balance": -10.00, "c_city": "wsmd68P2bElAgr", "c_credit": "GC", "c_credit_lim": 50000.00, "c_d_id": 10, "c_data": "np8ueWNXJpBB0ObpVWo1BahdejZrKB2O3Hzk13xWSP8P9fwb2ZjtZAs3NbYdihFxFime6B6Adnt5jrXvRR7OGYhlpdljbDvShaRF4E9zNHsJ7ZvyiJ3n2X1f4fJoMgn5buTDyUmQupcYMoPylHqYo89SqHqQ4HFVNpmnIWHyIowzQN2r4uSQJ8PYVLLLZk9Epp6cNEnaVrN3JXcrBCOuRRSlC0zvh9lctkhRvAvE5H6TtiDNPEJrcjAUOegvQ1Ol7SuF7jPf275wNDlEbdC58hrunlPfhoY1dORoIgb0VnxqkqbEWTXujHUOvCRfqCdVyc8gRGMfAd4nWB1rXYANQ0fa6ZQJJI2uTeFFazaVwxnN13XunKGV6AwCKxhJQVgXWaljKLJ7r175FAuGYFLyx", "c_delivery_cnt": 0, "c_discount": 0.3930, "c_first": "raPxxELo5B1fcW", "c_id": 251, "c_last": "ABLEESEBAR", "c_middle": "OE", "c_payment_cnt": 1, "c_phone": "4732942322014469", "c_since": "2006-01-02T15:04:05", "c_state": "VW", "c_street_1": "8RsaCXoEzmssaF9", "c_street_2": "m9cdLXe0YhgLRr", "c_w_id": 7, "c_ytd_payment": 10.00, "c_zip": "082911111"}, "key": [7, 10, 251], "updated": "1587320776788760053.0000000000"}
fatal error: runtime: out of memory

runtime stack:
runtime.throw(0x798d32, 0x16)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/panic.go:617 +0x72
runtime.sysMap(0xc058000000, 0x4000000, 0xa68378)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mem_linux.go:170 +0xc7
runtime.(*mheap).sysAlloc(0xa500e0, 0x211c000, 0xa500f0, 0x108e)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/malloc.go:633 +0x1cd
runtime.(*mheap).grow(0xa500e0, 0x108e, 0x0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mheap.go:1222 +0x42
runtime.(*mheap).allocSpanLocked(0xa500e0, 0x108e, 0xa68388, 0x7fb165258fd0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mheap.go:1150 +0x37f
runtime.(*mheap).alloc_m(0xa500e0, 0x108e, 0x7fb166460100, 0xa500f0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mheap.go:977 +0xc2
runtime.(*mheap).alloc.func1()
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mheap.go:1048 +0x4c
runtime.(*mheap).alloc(0xa500e0, 0x108e, 0xc000010100, 0x7fb1664659e0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/mheap.go:1047 +0x8a
runtime.largeAlloc(0x211c000, 0x7fb166610001, 0x7fb1664659e0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/malloc.go:1055 +0x99
runtime.mallocgc.func1()
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/malloc.go:950 +0x46
runtime.systemstack(0x0)
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/asm_amd64.s:351 +0x66
runtime.mstart()
/usr/local/Cellar/go/1.12.6/libexec/src/runtime/proc.go:1153

Support pglogical replication

This is a tracking issue to be able to consume a logical replication feed from PostgreSQL to act as a source of row data.

Much of the necessary wire-protocol is already present in jackc/pglogrepl.

In the desired end state, cdc-sink would support multiple frontends for row data, with a common backend that supports transactionally-consistent staging and reification of source data.

It's likely that a mapping/transform layer will exist in cdc-sink at some point in the future to perform on-the-fly schema or data-type adjustments. This should be accommodated as a desired end-state, but will not be added in the initial implementation.

Plan needs (revise as necessary):

  • Add automated integration testing with CRDB
  • Source-code reorganization to clearly identify front- and back-half concerns
  • Clean up existing configuration ergonomics (pglogical is now a separate subcommand)
  • Extend test-rig to also provide a matrix of postgres versions
  • Register and de-register a replication slot in a source database
  • Backfill and ingest
  • Checkpointing and error-recovery
  • Lease the feed across multiple cdc-sink instances (pglogical is now a separate subcommand)
  • Fail usefully in the presence of incompatible upstream schema changes

Sanity-check persistent resolved timestamp

Consider adding a sanity-check which ensures that any data being written into the staging table is not stale with respect to any previously-received and -flushed resolved timestamp. This would indicate a possible desynchronization of the changefeed and should result in a permanent failure condition.
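A minimal sketch of the check, assuming a memo of the last flushed resolved timestamp per endpoint (names hypothetical):

-- Before staging an incoming mutation, compare its MVCC timestamp with the
-- last flushed resolved timestamp for the endpoint.
SELECT last_nanos FROM _cdc_sink.resolved_memo WHERE endpoint = 'crdbusertable';
-- If the mutation's updated timestamp is <= last_nanos, the changefeed has
-- desynchronized: record a permanent failure instead of staging the write.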

Authorize incoming feed connections

In v21.2, the webhook_auth_header parameter allows an auth header to be added to the HTTP requests made by cockroach.

The header value is arbitrary, so we can use a JWT for stateless authorization. It would be straightforward to embed a list of allowable target db/schema pairs. Code for HMAC-signed JWTs can be lifted from cacheroach. The signing keys could be loaded either from the filesystem or stored in the target database.

An alternative would be to use a basic-auth header in combination with a bcrypted user table in the target database. This could also have a column to limit the allowable target schemas.

Auth support for older versions of cockroach could be added by allowing the auth token to be provided as a query parameter. That's not ideal, though, since query parameters are often logged by load balancers.
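A sketch of the basic-auth variant, assuming a credentials table in the target database (schema hypothetical):

CREATE TABLE _cdc_sink.feed_users (
  username        STRING PRIMARY KEY,
  bcrypt_hash     BYTES NOT NULL,    -- bcrypt of the basic-auth password
  allowed_schemas STRING[] NOT NULL  -- limits the writable target db/schema pairs
);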

Sporadic unexpected EOF messages

Seeing sporadic unexpected EOF messages while running a TPCC workload on Postgres and replicating it to CRDB:

WARNING[Apr  6 19:12:01] error while flushing mutations; will retry    
error="UPSERT INTO \"benchbase\".\"public\".\"history\"
(\n\"h_c_id\",\"h_c_d_id\",\"h_c_w_id\",\"h_d_id\",\"h_w_id\",\"h_date\",\"h_amount\",\"h_data\"\n) 
VALUES\n($1::INT8,$2::INT8,$3::INT8,$4::INT8,$5::INT8,$6::TIMESTAMP,$7::DECIMAL(6,2),$8::VARCHAR(24)): unexpected EOF"

cdc_sink too verbose... need flags to adjust level

Every upsert and change is logged. This is causing delays in processing.

2020/04/20 16:28:48 Upsert Statement: UPSERT INTO tpcc_sink.item (i_price, i_data, i_id, i_im_id, i_name) VALUES ($1, $2, $3, $4, $5)
2020/04/20 16:28:48 line to update: {After:map[] Key:[] Updated: nanos:1587396586358642714 logical:0 after:{"i_data":"Mask","i_id":96305,"i_im_id":9290,"i_name":"hy3v1U5yraPxxELo5B1fcW8R","i_price":34.37} key:[96305]}
2020/04/20 16:28:48 Type: string, value: Mask
2020/04/20 16:28:48 Type: string, value: hy3v1U5yraPxxELo5B1fcW8R

schemawatch: Primary constraints aren't always named primary

For example:

CREATE TABLE public.new_order (
	no_w_id INT8 NOT NULL, no_d_id INT8 NOT NULL, no_o_id INT8 NOT NULL,
	CONSTRAINT new_order_pkey PRIMARY KEY (no_w_id ASC, no_d_id ASC, no_o_id ASC),
	FAMILY "primary" (no_w_id, no_d_id, no_o_id)
);
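Rather than assuming the constraint is named "primary", schemawatch could look it up by type; a sketch using information_schema:

SELECT constraint_name
FROM information_schema.table_constraints
WHERE table_schema = 'public'
  AND table_name = 'new_order'
  AND constraint_type = 'PRIMARY KEY';
-- Returns new_order_pkey for the table above.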

Non-transactional backfill path

There are related problems to consider:

  • initial_scan over large data sets
  • High update rates on the source cluster that overwhelm the target db due to the 3x write amplification (i.e., staging, then flushing on a resolved timestamp)
  • High update rates or very large amounts of incoming data which exceed the maximum transaction size in the target database during resolved-timestamp flush operations.

These would seem to call for some kind of direct-feed approach, where cdc-sink bypasses the staging table and populates the target tables directly. For the initial_scan case, this should be relatively safe, as there is no expectation that the target database would be usable until the backfill is complete. For the other cases, we may want to allow the operator to place cdc-sink into a non-transactional, "catch up" mode to accommodate operational issues that may be encountered.

Add option to capture unknown source columns in an "extras" column

The apply package requires all incoming properties in a mutation to map onto a destination column. To support document-store use cases with variable schemas, it would be useful to be able to store all unmapped mutation properties in a JSONB column in the destination table. This would provide operators with the option to incrementally adjust their CRDB schema by backfilling new columns with a DEFAULT value that extracts values from the extras column.

This behavior could also accept a mode in which the entire json mutation is stored instead of just the unmapped properties.
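A sketch of the workflow this would enable (names illustrative; shown here with a stored computed column):

-- Destination table with a JSONB catch-all for unmapped source properties.
CREATE TABLE dest (
  id     INT8 PRIMARY KEY,
  extras JSONB
);
-- Later, promote a property from the extras blob into a real column.
ALTER TABLE dest ADD COLUMN city STRING AS (extras->>'city') STORED;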

Support for webhook CDC scheme

The experimental-http schemes are being deprecated in favor of a webhook scheme. Investigate any necessary changes resulting from payload format, resolved timestamps, etc.
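For comparison, the webhook form of the changefeeds used elsewhere in these issues would be (parameters illustrative):

CREATE CHANGEFEED FOR TABLE defaultdb.t
  INTO 'webhook-https://127.0.0.1:30004/crdbusertable?insecure_tls_skip_verify=true'
  WITH updated, resolved = '10s';
-- Payloads arrive as POSTs with a JSON envelope: {"payload": [...], "length": N}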

DELETE of a row triggered an error (pq: at or near "2": syntax error)

Running against a v20.1.5 cluster, three nodes each in two different GCP zones, a DELETE of one row failed to replicate and triggered the following output in the cdc-sink process stdout/stderr:

2020/09/09 19:17:21 osm.sql: resolved - timestamp 1599679037607728867.0
2020/09/09 19:17:21 osm.sql: _CDC_SINK.defaultdb_osm executed 1 operations
2020/09/09 19:17:21 pq: at or near "2": syntax error
2020/09/09 19:17:37 osm.sql: resolved - timestamp 1599679048411984860.0

The sink was running on one of the CockroachDB nodes:

$ ./cdc-sink --port 30004 --conn postgresql://root@localhost:26257/defaultdb?sslmode=disable --config='[{"endpoint": "osm.sql", "source_table": "osm", "destination_database": "defaultdb", "destination_table": "osm"}]'

And the changefeed was created as follows, on the other cluster:

CREATE CHANGEFEED FOR TABLE osm INTO 'experimental-http://10.142.0.101:30004/osm.sql' WITH updated,resolved;

Table DDL:

CREATE TABLE osm
(
  id BIGINT
  , date_time TIMESTAMP WITH TIME ZONE
  , uid TEXT
  , name TEXT
  , key_value TEXT[]
  , lat FLOAT8
  , lon FLOAT8
  , geohash4 TEXT
  , CONSTRAINT "primary" PRIMARY KEY (geohash4 ASC, id ASC)
);

After loading the data (done via INSERTs; a dump is available at https://storage.googleapis.com/crl-goddard-gis/osm_dump.sql.gz), the following query yielded identical results on both source and sink clusters:

SELECT name, geohash4, id, date_time
FROM osm
ORDER BY geohash4, id
LIMIT 10;

Taking the geohash4 and id values from one of these rows, an UPDATE was done:

UPDATE osm
SET name = 'Chez Mike'
WHERE geohash4 = 'eykn' AND id = 10917230;

and this was replicated to the sink cluster.

Next, I ran the DELETE which caused the issue:

DELETE FROM osm
WHERE geohash4 = 'eykr' AND id = 13841437;

The effect of the delete was observed on the source end, but never on the sink end. This is when the log entry, above, was seen.

cdc-sink: print table name of the operations

Including the name of the table on which each operation is performed would provide more insight:

2020/06/13 13:39:51 crdbtpcc: added 1000 operations
2020/06/13 13:39:51 crdbtpcc: added 0 operations
2020/06/13 13:39:54 crdbtpcc: added 1 operations
2020/06/13 13:39:55 crdbtpcc: added 1000 operations
2020/06/13 13:39:55 crdbtpcc: added 1000 operations
2020/06/13 13:39:56 crdbtpcc: added 1000 operations
2020/06/13 13:39:56 crdbtpcc: added 1000 operations

Allow transactional mode to optionally degrade to a batch mode

The changefeed-processing code only has two states, immediate and transactionally-consistent.

Consider adding a third option to the consistency/performance spectrum:

  • Transactional: Dequeue mutations and apply them in a single transaction.
  • Batching: Stage mutations, but dequeue and apply them in batches.
  • Immediate: Apply mutations as they are received.

If we add a knob that allows the transactional mode to degrade to a batching mode when there are too many pending mutations (or perhaps too many staged bytes of data), we can allow operators of cdc-sink to trade absolute transactional-boundary-preservation for being able to catch up if the target cluster becomes overwhelmed.

ERROR: read binary tuple: partial copy data row (SQLSTATE XXUUU)

# Open terminal for the source
cockroach start-single-node --listen-addr :30000 --http-addr :30001 --store cockroach-data/30000 --insecure --background

cockroach sql --insecure --port 30000 <<EOF
CREATE SEQUENCE customer_seq;
create table t (id uuid, clob string, ts timestamp, primary key (id));
-- add enterprise license
SET CLUSTER SETTING cluster.organization = 'Workshop';
SET CLUSTER SETTING enterprise.license = 'crl-0-';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
CREATE CHANGEFEED FOR TABLE defaultdb.t INTO 'experimental-http://127.0.0.1:30004/crdbusertable' WITH updated,resolved='10s';
EOF



# Open terminal for target
cockroach start-single-node --listen-addr :30002 --http-addr :30003 --store cockroach-data/30002 --insecure --background

cockroach sql --insecure --port 30002 <<EOF
create table t (id uuid, clob string, ts timestamp, primary key (id));
EOF


# Open terminal for cdc-sink
cdc-sink --port 30004 --conn postgresql://root@localhost:30002/defaultdb?sslmode=disable --config '[{"endpoint":"crdbusertable", "source_table":"t", "destination_database":"defaultdb", "destination_table":"t"}]'



# On source
cockroach sql --insecure --port 30000

insert into t values (
gen_random_uuid(),
'WERSFDBMDNZZGAXFSYDSYSVIBVULDXZEKSWBINCFVGZWSKSTZUTLHTCIKUMRQJASBVMTHBLRADNFXVUMPSINMZZFKIAGBOMZFCWXKHLRMQXAIIDEKBABDUTVKXIGJNZQHTPZSDHCZIGFJZCSGYIGPZMEPYHMJVCVPVIDUYVYOBVXUREJUBXGGEGQKBLXEMTNXTWQIAFUDREEQIDRBAHFQPXVMPKKTCGQMZRNLPMYTYLODKLSPWSOCAZLIEZGVLQMHXHZIFHJLNCYYROKZZJSWLWMQYIJRALBNMFHUGPCQOEACRDFFHQCDCNZRJCDQRNBZDQDYRZOXVVZITUNEKDCKTAXOKSPGFTKNYRAIMTNBGRBZQPFKTGWRTOOLBJQUUUJSWJGXTLDWTXSCIZEOJKZKBGARGYREBHNPARCCZJHJWQPGBPCEQBOGDBUBSVFJPOUTBXGAAVQIVYCQXSVRSOHLXJAUJXBONKOQOTLJCRBVXNKDSQGYTEFJYCPAPQALQTHCKFISGBEBFCTASSCOSQGQRSSABLUMZHDXXPDWGOFJZDPKUCTRMAKNUDVXCKOUSQGULTSVTCOQAWCFGZGTBNAYROWKBJQTLTMWONQDAXYNUCGFEJMYXIYEPDRAAURECEEFPZRCXFTGUATVYTQBLXMHYYLWLOAQVBIOBVZENWTLGEWUEHUVDYELUSYDDLLCVCRJZLAYQCOODLIYRKZLXZZRGRUQLQKOTTVWGIDDBWOEOEVCELPHORTJBAOIRVYMBDPUWIBGNXUVZMJYLERYFKMRLQRYQDNYNUKAMJSMSZDVYHJRHTUSPLDUPLURGJRBKHHLTOWPDYSFYRVOVIHZVSZDNCPTZYFZWZKJAJZJMVHWAUXPDYOWHFMBBTDUEWOQTTQJACGHXYOGIYLOURAWKGRXUFUXDYJXHNSUWMHAZTMIGPJJETXDVEWGPEJADRPYAWLKBFUBKYQMZYBKHOZOHOZZSABDGGENGUQHRTBLSBACFZWIDMUMEMLPVYVIVXUXCPLEUADLYNARKZOTCCLXRIQAARLYXCCWUPSWQUAVYEECBBEKXQKXWXHVUCQAMTSYVKRSJJJLXZKYNUCHYZEKXDMTQLEXJPMEULUKTNGHTSOQJZMPDWFUMETRNGWASQQEZYCXFGIIERVBJEAHEABDANBBQCWWBPILNJDEFESSTTIRCLCPJXPAULGVCWBGAAARBLJDBJMIBRDAYLYUTQMERFYEQENASNIMBJZURLOHIUFEGSTMNWJPNKAKZLPOEQQFXKXYVTXTSZGATZXSRUXDDJTVMWEMXYRPOCTWDEVSOPHZHYXESUYRCOEQIRODZFOSETHACSAZRGOSIILGGCYTRGXEXASKKTCBKBYIUTUDYWEKNHDFDOPDJTJARJCOCDNHIWXWQNWPPBSXFCAKIHSSLLDPFXSNIXXSLFTKCHVDYOHERQHZNVPDZBENCEUDFDPGQTUKGGKUMGWOVSRUTRYDVESVVBJZXKUQLYYSCTEJEDDWOJGGTNEHHAPAUYDPUAAUTOLLQCLSOYWLVIISWOIDPDTUIUMSIATXWIWGJEZDEAZMVTCBOZHCOMHNVEBJCQUDPGFDGSFJYSRFTJIMLQNKUGHHLKXLPCQOQSAHCSITDECMRQXHNPAXDTCSCXMXFJDIDLNXPFXQEWXYCCKNPXNSCGACINSMMSTKWYPRGIRRDUTJZNVIYXAWZGIAFVECQXSNKPBWCWSCWOFKQMQFASCQZEGWWRBMBJVLRMSFEXUDYOEOZBOPOFKUFXYEZLTGRIGFGQFOLUCZYVVEYKILRBCXGFZQQWTXGVEEMZIMLKZOWSIBVGATFKLJBGNXXXNKXMIXXKEAQHVXLHIXUOBKKNDOBIZXSJSJCEIAEQFUVBJGJWTJKXJFUIYFZFMIHRIAIQKXLPOFUBMSOMAKRSTIPZFKMHBRJOOZNLGGCOYOFIFMWIVOLOFZPFSXYDVVPKBKZVABUJTQSMGTSSEQCROCBSUPMIBYMBROMBDFPJVHPWDZPRXFEGQGCSMUYDCLEGYLFAOVCYKAFKZGFHGIKHHDELIUWPPVYTEKUMUZHIYMTIIZDPSHIIAYSUURPJPBDHZLZPMZKURRFZAMGKUBLZYJDXZEHAFCXHMANWRGTJDDZKSAGSHHCHLEPGKIPMUVUMMPIGASCZVEHLNMPNVNWDRVXLISXADQSXLOBESATDPPNHMIJDPFGYOBIKMLUGVZCTRPFIWJBZHKDFIROIKICMTLWCYJJBHFQLKZZVWKFCLYNXDAMVTAZMSXOKKZTGGXGWXHOAPFETXOALLMZSCZCHZIBNWJKOUXQAIGJXXOFETMMNWSRNOUSKGTVSLRJJTCHVGXELIIHXVBVQAGDUEDJENLADRBCHRZZNUPNQDKWOXJBQUFBNDSWVVNBKORUGVUGUTRFNEWSSARGFWUWMUIHODHOXYGRCSVWNJGJNKWSDGXKXNJHPFSDOBGJHTHTOZIUDFZJGMPAUVZPDROQBTNZQRUQFNYDMWQPNVRVFNTRAHUVTXOXXOHHBZQAIQVXIKPYECSNADYKVDILAGWFHYSRILTPAMHHBAEIMPDRAYRGWVETTIQFAIUNGESNPHEXWWZMZVVGGJALTIUHNESIUBISNZDDPBLUEFQBRJUTHOLWGWMZBPRBSKSAYTKJIYVXEJFIKHYBZUIRAGKMUPSOYQSGWVEFKXDLKNRAIQUWUIFARKONUCHRLQYSYBQWQZLNTOCTTHAJZUPYJKGRDNOORKCSMQKNLPGQEEOPMBCJHHLOPGKIIGVYPZGWRNTIDSGUPEMQGTQCNLOTUTIHIDIDXMHVTVGIFWJMSOQIQDDRPFGYZGTUJPVPJRIUFDVZUZCJJJVSHRJFUFKUQVYRTYHGKNZPXFOFQOOVTXMZPFDNAAUGVIYNBNVVAEJXTLCJLGSIXIDKWNSKPGGOUVUGEVUACXQOXJIDTBBHCONQOWFUOWAGTCALCOCFGQVOVMFTKUSPHRUKMMHEEGWURPEUSVFRIDGOIEDFZMKFFHFTIWGHFOOQPRPMVZHILCWTXJRFRSPEAZFFRNOFKTGBOQZNQGVAXHNDHXNFDZEHQSJUJBSELHAOWCXMHBSRDIOEESQTGXUNWOERYLIYZZIYWFLKODGGCWWBIVGQWDAHTUYTIPPFGLXBWCLEHYTXRVELSVYOOOSRTODXDVAWYWQVYBOSCEVIPUBJRMOFFBTEZGUZGBKRSRDCYESYAJXHZGQVOGWVTVTFEURURGTRDZCKYIIKGXYDGFFQIPRWXCICEPXQISKRDZNCOIBLMJRHWNSIJUFYVROXNRCYSNYUEUFZDTVWZNSHXGCSRFDQYVXMKFFLXXQVHMXONOOTCDZYDHEGGOTDMWEOVSXFWXAWWSIBTHBGLVZESEBSTIEKGPAWEYKOJNXJQFIUTGEZDZLQFHBTRUWXRWHFASMZQRIFECJKREIZELZFDMBSNXKNMNSZSFPIUYNPQCVYLPWNMSBIWFKXYDGFBWHXQGKADDFRHUWPPVSXPUORPBARCQCXQTGFQEFLVYGZUKUGLALK
HQXXZAGNTJLIXLRMGESYSADQBFMOURSEHZPPJBNMDXZCPUVQUDXYYBDTMGUXMCOATZASHUIHFRLUBIQOAJIQNYPFMKHCYLSJAEARZLZSKQEMLVYIGGAWRCXXWTXBKAQKBCIEDGHGDZHSBPWOHTMPSBYWICTDHLJZOIMGUQLCCCOIRWFMEKBEASPQCPJTAGPDLHILSVHBTFMOALHYSBGNRJHSHWLEOKFQZWFRYSNTATZDQKDQXCDTTOXPWJUYDWNCAUHVNISBTMYRMLMKMUFSOBFRMXVRVUCKSAKLYUONCXWJDZCDTVQNEUJRNQMGIJYWHDFIWLTRTDNNRGBJOYKTWYVVNTXHNVCBFSLYXINQEOMZHHVYVRQTOFZFDVMLAAYODRXKEAJGICLKJMZWUXDRUWSYFJCUJOEGDGDZBPEJMOVZGYSBIEWLBIVHGNDUEDPJAWEUOPQSYCQQAJYNZOFPXOKKJEZMRIVQTDDHVHHRBRIAXTTWGCUPMYWFJWGZLHEYKGGKBVMRNYGNFJKMGPQKTPUXZGDXHGPKOYEUKRMQUKJKWALQGZMJSNBRNOADCLUALNNKBIVWGJZVUBWPEUXBGNJAGYDCMXIKEGPFDEQXMCNEZDODJLUBAHEJAGDIIXRZMRQPBPZXFVCVXSZGKRFOBIEFHKNBVDPDRMKSJZPEJRMKQJTPGMRVTAWFWLCINEINAVUZVMEGQDNSHVPZKYVASFADNYYLBXJGTWEQTUCXDRLVNGWABCEFQDZOPENIGTNMNJAUPINBHWSJJPHYQIHDCVJLGFALZPIMEDRBWJCLJOPRCVLRDTDUEYSBEUOSXUXQCOSVRPICQQUJVSUAJYCNLYCIKXEHDKARKRJZZJDWZOFAPMTVIRNKXQXJYMRDTMWXRTWBRYJUTFEQXWVACUIGJLKDMMOVYDMFTGEKWYGYILEZEGAGOTGOSIMOEUNQAMXUKRTRVNZRBYKAPMSJNQXMQONOKFLAAUOHTTKEHYMDDVWDKRHUCUEFBDFUZRVVZNFFZJUKJPMCYFQPPHOHVCYFGDVSTCLEHHTFEEBSDOMBUPLCGGLBYOBTOXFUIJEWRRZJSCAAUNTPFMRPCQZEUJLFDLRWYFOQQBAQVTGEENGMELYIELZHHZMFANKQXGELNTSTHTNPREYMBZKSTUXRXYJKYFAHVQGIWSEWUGVDTWGZNBUNRUMMJXTCCPOKWDMVNHUTNKNEVWMNZKRHLGFSJVEBBUDWTSODMKPDVJWMBGBXAPWIJOICWQNHGFPQVALWGJCKDBVAOACDTDGVJMMXGMSIJWTEUCEKKYGGWRDIODVMMOICFPONSARDUEFELCMJWNQVCNYUFBTBKUTPHJEACWBWKNGGVMHGVEYJITPTWYXAGQAMWEQOBFZKUEAWWAJRKOGMCVZBGVCWACUKUIKLXEQWXPMZIYVYSCIKURJRSDUPKAJIBYIOOJKZFADKIGCRJTTSNPXNMQPRFZJXDZAMDMNYTQFLHKRDBIYETEVRYYJXTYLLQDVYHEHHRWVEQCXFAYKJAPSHRWQMBULEGLIOHSGVALGYTOOBMNDPIIQTMDAHZKXXJNLWYSTOEPGFSBVRZNDJGBTOISOWWABQIFWJDGHHVPCMSIPVWCQJITTHWLSIYYEICSKOQVAHWZLLGUVJMHIOCWFCMNGLSFPTMLTXIGUPZSAENQQFBWBOKEMDWOICZZJECUYYIXRJSMUNSWYYANKWHOQIRLLCZTVCEYECUDOFLVVXLAZRYPKKZNPLBDEDBDAVUFEQNUXQFBSKMBQBFGLWWYLKXXSJNYXYXXOTBHTDSCOSLWGEIYMFFIROFZKQPMUFXPEONVBVWSDZOPBRTEFTJBRRNUIWCWLDVHIUDDHUZIEXDSZIXZOQNVNTLAWLTUWXZLMTJGUDDMYFTAPQPVATLKMHBVTWKWHLNBZPBKCGGJRLSMHVPRKDZNGVIKBBIFQESUZMOOCJRTBGYUTYJHKMHFPLAGWQPXZQBPRKRIAMCUACUPURUESGNVSOPVSGWYBTBXGAZUJWJMBXIMBZIHSGJIOIEGVSMAJHPZNDXIYWKMBLWRWIGRBMCNMIUXAGUGZLMPROUPWQNQWVVOWUNUDQWFBQRBYIMLKSCXFXOLITJWOYNKAUUXJCFEBIPSTTFKBGFTYARCLAVGJZJVNXADIQASDWQXDEYLMJMCIDCTMTCWCJRVJSZWXSLEONXEUFCTBWQGIAUVSQMTVRWIEZYKMGPOYYFEGAWRFHXLSSWEQTIGKOALUNHYXZRDOYVHMRVOHNBCMKKLOBDLKCQQMUPHQIOFAPZYAZFRDCVYHQSLJNSDLBQXJOKERCHGDQWFDVVVVWWUGYOGOIUWXHDRXHIIBRKXUEQSYEKKDFRLEFATKPWIQKCGCWUSBCSMFSBYTOWMDKHGUENNCSXYXXDOSKYJZLZYOJWBKNQHEIGN',
now()
);


# Repeat the statement slowly and cdc-sink works. Repeat it faster and it breaks; see the output below.

# ...from cdcsink
2021/08/06 16:17:35 crdbusertable: added 8 operations
2021/08/06 16:17:45 ERROR: read binary tuple: partial copy data row (SQLSTATE XXUUU)

# ...from source
root@:30000/defaultdb> show job 682263940606296065;

  job_id:             682263940606296065
  job_type:           CHANGEFEED
  description:        CREATE CHANGEFEED FOR TABLE defaultdb.t INTO 'experimental-http://127.0.0.1:30004/crdbusertable' WITH resolved = '10s', updated
  statement:
  user_name:          root
  status:             running
  running_status:     retryable error: retryable changefeed error: retryable http error: Put "http://127.0.0.1:30004/crdbusertable/2021-08-06/202108062017314865210000000000001-2603f532e206235a-1-12-00000000-t-1.ndjson": dial tcp 127.0.0.1:30004: connect: connection refused
  created:            2021-08-06 20:13:51.093247
  started:            2021-08-06 20:13:51.376963
  finished:           NULL
  modified:           2021-08-06 20:18:16.100691
  fraction_completed: NULL
  error:
  coordinator_id:     NULL
(1 row)


# ...from target
root@:30002/defaultdb> select count(*) from _cdc_sink.defaultdb_t;
  count
---------
      8
(1 row)

Rename the Project to Brooklyn Bridge

The original goal of cdc-sink was just to take in a CockroachDB cdc feed and upsert it into another CockroachDB.

As this project has evolved to take in other types of feeds, I feel it's time that its name be changed to something less exacting.

I think Brooklyn Bridge is a good and catchy name for it. Cockroach Labs is located (primarily) in NYC and this tool acts as a bridge to help get data into the db.

Improve documentation for postgres logical replication

While these steps are available in the Postgres documentation, it would be useful to explicitly mention them:

  • wal_level should be set to "logical" (see the sketch after this list)
  • During bulk migration, "SELECT pg_export_snapshot()" needs to be executed in one session, which must be left open while running pg_dump --snapshot
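A sketch of the first item on the source PostgreSQL (the setting takes effect after a server restart):

ALTER SYSTEM SET wal_level = logical;
-- after restarting, verify:
SHOW wal_level;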

Sequence value is not synced

The sequence created on the target database is not in sync with the source.

# Open terminal for the source
cockroach start-single-node --listen-addr :30000 --http-addr :30001 --store cockroach-data/30000 --insecure --background

cockroach sql --insecure --port 30000 <<EOF
CREATE SEQUENCE customer_seq;
create table t (id int, s string, rownum INT DEFAULT nextval('customer_seq'), ts timestamp, primary key (id, s));
-- add enterprise license
SET CLUSTER SETTING cluster.organization = 'Workshop';
SET CLUSTER SETTING enterprise.license = 'crl-0-ELfszokGGAIiCFdvcmtzaG9w';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
CREATE CHANGEFEED FOR TABLE defaultdb.t INTO 'experimental-http://127.0.0.1:30004/crdbusertable' WITH updated,resolved='10s';
EOF



# Open terminal for target
cockroach start-single-node --listen-addr :30002 --http-addr :30003 --store cockroach-data/30002 --insecure --background

cockroach sql --insecure --port 30002 <<EOF
CREATE SEQUENCE customer_seq;
create table t (id int, s string, rownum INT DEFAULT nextval('customer_seq'), ts timestamp, primary key (id, s));
EOF


# Open terminal for cdc-sink
cdc-sink --port 30004 --conn postgresql://root@localhost:30002/defaultdb?sslmode=disable --config '[{"endpoint":"crdbusertable", "source_table":"t", "destination_database":"defaultdb", "destination_table":"t"}]'



# On source
cockroach sql --insecure --port 30000

insert into t (id, s, ts) values (1, 'hello', now());
insert into t (id, s, ts) values (2, 'hello', now());

SELECT * FROM customer_seq;
  last_value | log_cnt | is_called
-------------+---------+------------
           2 |       0 |   true

# on target
cockroach sql --port 30002 --insecure

SELECT * FROM customer_seq;
  last_value | log_cnt | is_called
-------------+---------+------------
           0 |       0 |   true
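Until sequence syncing is supported, the target sequence can be advanced manually; a sketch using the schema above:

-- On the target, after the rows have been replicated:
SELECT setval('customer_seq', (SELECT coalesce(max(rownum), 1) FROM t));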

Use flag groups

A new feature in the flags library (from #160) allows groups of flags to be required as a group. This would be useful for some of the TLS flags, where we need the key and certificate as a pair.

Think about bi-directional sync

Discussion issue for thinking about how two clusters could be kept in eventual-sync:

  • Source/destination cluster tracking column to avoid looping cdc-sink'ed mutations (see the sketch after this list)
  • MVCC timestamp comparison to drop too-old events from source cluster
  • Conflict resolution detection / logic?
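A minimal sketch of the tracking-column idea (names hypothetical):

-- Each row records which cluster last wrote it; 'local' means an ordinary
-- application write rather than a replicated one.
ALTER TABLE t ADD COLUMN _origin STRING NOT NULL DEFAULT 'local';
-- cdc-sink on cluster B stamps incoming rows with _origin = 'cluster_a' and
-- drops any mutation whose _origin already names the destination cluster.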
