
pg2ch's Introduction

PostgreSQL to ClickHouse

Continuous data transfer from PostgreSQL to ClickHouse using the logical replication mechanism.

Status of the project

The pg2ch tool is currently in an active testing stage; for now it is not for production use.

Getting and running

Get:

    go get -u github.com/mkabilov/pg2ch

Run:

    pg2ch --config {path to the config file (default config.yaml)}

Config file

tables:
    {postgresql table name}:
        main_table: {clickhouse table name}
        buffer_table: {clickhouse buffer table name} # optional; if not specified, rows are inserted directly into the main table
        buffer_row_id: {clickhouse buffer table column name for the row id}
        init_sync_skip: {skip the initial copy of the data}
        init_sync_skip_buffer_table: {if true, bypass the buffer_table and write directly to the main_table during the initial sync copy}
                                     # makes sense for huge tables
        init_sync_skip_truncate: {skip truncating the main_table during the initial sync}
        engine: {clickhouse table engine: MergeTree, ReplacingMergeTree or CollapsingMergeTree}
        max_buffer_length: {number of DML (insert/update/delete) commands to keep in memory before flushing to the buffer/main table}
        merge_threshold: {if a buffer table is specified, the number of buffer flushes before moving data from the buffer to the main table}
        columns: # postgres -> clickhouse column name mapping;
                 # if not present, all columns are expected to exist on the clickhouse side with exactly the same names
            {postgresql column name}: {clickhouse column name}
        is_deleted_column: # for ReplacingMergeTree: 1 will be stored in the {is_deleted_column} to mark deleted rows
        sign_column: {clickhouse sign column name, for the CollapsingMergeTree engine only, default "sign"}
        ver_column: {clickhouse version column name, for the ReplacingMergeTree engine only, default "ver"}

inactivity_merge_timeout: {interval, default 1 min} # merge buffered data after this period of inactivity

clickhouse: # clickhouse tcp protocol connection params
    host: {clickhouse host, default 127.0.0.1}
    port: {tcp port, default 9000}
    database: {database name}
    username: {username}
    password: {password}
    params:
        {extra param name}:{extra param value}
        ...

postgres: # postgresql connection params
    host: {host name, default 127.0.0.1}
    port: {port, default 5432}
    database: {database name}
    user: {user}
    replication_slot_name: {logical replication slot name}
    publication_name: {postgresql publication name}
    
db_path: {path to the persistent storage dir where table lsn positions will be stored}

Sample setup:

  • make sure you have a PostgreSQL server running on localhost:5432
    • set wal_level in the postgresql config file to logical
    • set max_replication_slots to at least 2 (a SQL snippet for both settings follows this list)
  • make sure you have a ClickHouse server running on localhost:9000, e.g. in Docker
  • create database pg2ch_test in PostgreSQL: CREATE DATABASE pg2ch_test;
  • create a set of tables using pgbench command: pgbench -U postgres -d pg2ch_test -i
  • change the replica identity of the pgbench_accounts table to FULL, so that we will receive the old values of updated rows: ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;
  • create PostgreSQL publication for the pgbench_accounts table: CREATE PUBLICATION pg2ch_pub FOR TABLE pgbench_accounts;
  • create PostgreSQL logical replication slot: SELECT * FROM pg_create_logical_replication_slot('pg2ch_slot', 'pgoutput');
  • create tables on the ClickHouse side:
-- our target table
CREATE TABLE pgbench_accounts (aid Int32, abalance Int32, sign Int8) ENGINE = CollapsingMergeTree(sign) ORDER BY aid

-- will be used as a buffer table
CREATE TABLE pgbench_accounts_buf (aid Int32, abalance Int32, sign Int8, row_id UInt64) ENGINE = Memory()
  • create a config.yaml file with the following content:
tables:
    pgbench_accounts:
        main_table: pgbench_accounts
        buffer_table: pgbench_accounts_buf
        buffer_row_id: row_id
        engine: CollapsingMergeTree
        max_buffer_length: 1000
        merge_threshold: 4
        columns:
            aid: aid
            abalance: abalance
        sign_column: sign

inactivity_merge_timeout: '10s'

clickhouse:
    host: localhost
    port: 9000
    database: default
    username: default
postgres:
    host: localhost
    port: 5432
    database: pg2ch_test
    user: postgres
    replication_slot_name: pg2ch_slot
    publication_name: pg2ch_pub
    
db_path: db
  • run pg2ch to start replication:
    pg2ch --config config.yaml
  • run pgbench to generate some test load:
    pgbench -U postgres -d pg2ch_test --time 30 --client 10 
  • wait for the inactivity_merge_timeout period (10 seconds in our case) so that the data in memory gets flushed to the table in ClickHouse
  • check the sums of the abalance column both on ClickHouse and PostgreSQL:
    • ClickHouse: SELECT SUM(abalance * sign), SUM(sign) FROM pgbench_accounts (see the note on the sign column after this list)
    • PostgreSQL: SELECT SUM(abalance), COUNT(*) FROM pgbench_accounts
  • the numbers must match; if they don't, please open an issue.
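
A note on the PostgreSQL settings from the first step: a minimal way to apply them via SQL, assuming superuser access (editing postgresql.conf directly works just as well; changing wal_level requires a server restart):

ALTER SYSTEM SET wal_level = 'logical';
-- at least 2: pg2ch uses a temporary slot for the initial sync in addition to the main slot
ALTER SYSTEM SET max_replication_slots = '2';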
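
Why multiply by the sign column? With the CollapsingMergeTree engine, every row version is written with sign = 1, and a superseded version is cancelled by writing the same row again with sign = -1; until ClickHouse collapses such pairs in the background, aggregates must weight each row by sign so that cancelled versions net out. A worked example for a single account that is inserted and then updated once:

-- rows as written to ClickHouse for aid = 1:
--   (aid = 1, abalance = 10, sign = +1)   initial insert
--   (aid = 1, abalance = 10, sign = -1)   cancels the old version on update
--   (aid = 1, abalance = 25, sign = +1)   new version
-- SUM(abalance * sign) = 10 - 10 + 25 = 25   (the current balance)
-- SUM(sign)            =  1 -  1 +  1 = 1    (the logical row count)
SELECT SUM(abalance * sign), SUM(sign) FROM pgbench_accounts;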

pg2ch's People

Contributors

michaelhood, mkabilov


pg2ch's Issues

ClickHouse table doesn't have the same counts as the Postgres table (following the readme example)

After following the readme example, I end up with:

In ClickHouse:

SELECT SUM(abalance * sign), SUM(sign), count(0) FROM pgbench_accounts;

SELECT
    SUM(abalance * sign),
    SUM(sign),
    count(0)
FROM pgbench_accounts

┌─SUM(multiply(abalance, sign))─┬─SUM(sign)─┬─count(0)─┐
│                       -931490 │         0 │    49744 │
└───────────────────────────────┴───────────┴──────────┘

And in Postgres:

pg2ch_test=# SELECT SUM(abalance), COUNT(*) FROM pgbench_accounts;
   sum   | count  
---------+--------
 -931490 | 100000
(1 row)

I see that the sums are the same but the counts are different. Is this because only changes are replicated and, thus, only rows that underwent a change are represented in ClickHouse?
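
A likely explanation, given the CollapsingMergeTree semantics described in the readme: count(0) counts physical rows, including +1/-1 pairs that have not been collapsed yet, whereas SUM(sign) gives the logical row count. To compare against PostgreSQL's COUNT(*), collapsing can be forced at query time with ClickHouse's standard FINAL modifier (slow on large tables, but fine for a spot check):

SELECT count() FROM pgbench_accounts FINAL;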

CollapsingSortedBlockInputStream: Incorrect data: number of rows with sign = 1

Hello.

We're using a 2-node ClickHouse cluster with CollapsingMergeTree tables.

We're just testing out the pg2ch tool and are running into problems with updates for this engine. ClickHouse sometimes generates warnings about a wrong number of rows with signs 1 and -1. As far as we can tell, this happens quite often when pg2ch receives a whole bunch of DML operations for the same unique primary id.

The error looks like:

2019.09.24 13:42:14.484495 [ 17 ] {} <Warning> CollapsingSortedBlockInputStream: Incorrect data: number of rows with sign = 1 (4) differs with number of rows with sign = -1 (1) by more than one (for key: 1219488).

The key from the log above had 12 rows corresponding to it right after the warning, and, strangely, they actually collapsed to 2 rows after a while.

Can you help with this issue?

Mistake in readme

There is a little mistake in the readme guide: in the description of the configuration file, the key for the postgres connection settings is given as 'pg'. The following error results:

root@9529452bd859:/home/go/pg2ch# go run main.go --config config.yaml
could not load config: publication name is not specified
exit status 1

If you look in the source file (pkg/config/config.go), the key is actually 'postgres'.
Replacing 'pg' with 'postgres' makes it work correctly.
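
For reference, a corrected fragment (values taken from the sample setup in the readme) matching what pkg/config/config.go expects:

postgres:   # the key must be "postgres", not "pg"
    host: localhost
    port: 5432
    database: pg2ch_test
    user: postgres
    replication_slot_name: pg2ch_slot
    publication_name: pg2ch_pub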

partitioned tables in config

When setting up replication for the child tables of a partitioned table, it would be useful to be able to specify a pattern for the table names, i.e. instead of:

    partitions.report_2017_03_01:
        main_table: report
        engine: CollapsingMergeTree
        init_sync_skip_truncate: true
    partitions.report_2017_04_01:
        main_table: report
        engine: CollapsingMergeTree
        init_sync_skip_truncate: true
    partitions.report_2017_05_01:
        main_table: report
        engine: CollapsingMergeTree
        init_sync_skip_truncate: true
    partitions.report_2017_01_01:
        main_table: report
        engine: CollapsingMergeTree
        init_sync_skip_truncate: true

allow writing something like this:

    partitions.report_*:
        main_table: report
        engine: CollapsingMergeTree
        init_sync_skip_truncate: true

TBD

Roadmap to production

Hello Murat,

According to the readme, the project is not ready for production.
What tasks are yet to be done?
And what possible caveats should be kept in mind while working with it?

Thank you!

Could Not Sync Tables

I am getting this error when I run the pg2ch command.

The script starts working fine and the synchronization begins, but then I get this error:

could not start: could not sync tables: could not sync $TABLE_NAME: could not commit transaction: driver: bad connection

diskv s.storage.ReadString undefined

When I try

go get -u github.com/mkabilov/pg2ch

I get:

# github.com/mkabilov/pg2ch/pkg/utils/kvstorage
root/go/pkg/mod/github.com/mkabilov/[email protected]/pkg/utils/kvstorage/diskv.go:52:41: s.storage.ReadString undefined (type *diskv.Diskv has no field or method ReadString)
root/go/pkg/mod/github.com/mkabilov/[email protected]/pkg/utils/kvstorage/diskv.go:60:18: s.storage.WriteString undefined (type *diskv.Diskv has no field or method WriteString)

Add some tests

The project needs to be covered with tests, both integration and unit.

skip new columns

Skip columns that are added on the PostgreSQL side after the initial sync.
(This will involve tracking the RELATION message of the logical replication protocol.)

Replication Failed Timeout 10s

@mkabilov Hi Murat, thanks for creating a useful tool! It runs successfully (replicates the rows from PG to CH) but then times out (stops listening for new inserts) after 10s. See the log below. Any ideas why this is happening or how it can be fixed?

I'm using PG v14.2 and CH v22.2.3.5.

pg2ch $ go run main.go --config config.yaml
2022/02/28 17:57:12 consuming changes for table testtable starting from 0/1C7B250 lsn position
2022/02/28 17:57:12 generation_id: 5
2022/02/28 17:57:12 Starting from 0/1C7B250 lsn
2022/02/28 17:57:12 Primary Keepalive Message => ServerWALEnd: 0/1C7A520 ServerTime: 2022-02-28 17:57:12.383815 -0500 EST ReplyRequested: false
2022/02/28 17:57:12 XLogData => WALStart 0/1C7A670 ServerWALEnd 0/1C7A670 ServerTime: 2022-02-28 17:57:12.414696 -0500 EST WALData size 21
2022/02/28 17:57:12 XLogData => WALStart 0/0 ServerWALEnd 0/0 ServerTime: 2022-02-28 17:57:12.415047 -0500 EST WALData size 326
2022/02/28 17:57:12 XLogData => WALStart 0/1C7A670 ServerWALEnd 0/1C7A670 ServerTime: 2022-02-28 17:57:12.415365 -0500 EST WALData size 325
2022/02/28 17:57:12 XLogData => WALStart 0/1C7B280 ServerWALEnd 0/1C7B280 ServerTime: 2022-02-28 17:57:12.4154 -0500 EST WALData size 26
2022/02/28 17:57:12 Primary Keepalive Message => ServerWALEnd: 0/1C7B2B8 ServerTime: 2022-02-28 17:57:12.419801 -0500 EST ReplyRequested: false
2022/02/28 17:57:22 replication failed: receive message failed: read tcp [::1]:64963->[::1]:5432: i/o timeout

It looks like the error is generated at:

c.close(fmt.Errorf("replication failed: %v", err))

Working with big DBs

Since this project uses logical replication, and we have a limit on the number of slots in our PostgreSQL instances, what should we do about databases with lots of tables inside?
I think using physical replication could handle this problem by requiring just a single slot.
Physical slots are also more reliable.

Please support 'block_size' in clickhouse connection configuration

The initial sync for a very large and wide (500 columns) table fails with:
DB::Exception: Memory limit (for query) exceeded: would use 9.31 GiB (attempt to allocate chunk of 8388608 bytes)
Investigation showed it is caused by the clickhouse driver's default block_size=1000000.
When the table is too wide, 1 million rows exceed the 10 GiB memory limit.
Adding a setting for 'block_size' would solve this.
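
A hypothetical sketch of what this could look like, assuming the extra params map from the config-file section is (or would be) passed through to the ClickHouse connection; block_size and its value here are illustrative, since the setting is exactly what this issue requests:

clickhouse:
    host: 127.0.0.1
    port: 9000
    database: default
    params:
        block_size: 10000   # hypothetical: smaller insert blocks to bound memory on very wide tables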

multiple tables in a single tx

If multiple tables are updated in a single transaction, the merge won't be synchronized among them,
so the state on the ClickHouse side will be inconsistent.

could not create temporary replication slot

Hello! I have a problem while sending data from a PG partitioned table to a CH MergeTree table via a buffer table with the Memory engine.
Here is the full body of the error:
could not start: could not sync tables: could not create temporary replication slot: could not scan: ERROR: syntax error (SQLSTATE 42601)
The PG logs only show ERROR: syntax error.
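
As a cross-check (a hedged suggestion; whether this matches the statement pg2ch issues internally is an assumption), a temporary logical slot can be created manually on PostgreSQL 10+ to confirm that the server accepts temporary slots at all:

-- the third argument marks the slot as temporary: it is dropped when the session ends
SELECT * FROM pg_create_logical_replication_slot('pg2ch_tmp_test', 'pgoutput', true);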

How to safely stop replication?

Is there a way to safely stop pg2ch replication?

I need to run pg2ch for a few minutes each hour and then stop it, to avoid keeping connections open on PostgreSQL. Is there a way to do that other than the kill command?
