116davinder / apache-kafka-backup-and-restore

It takes a backup of a given topic and stores it in either the local filesystem or S3. It can also restore from the given local filesystem or S3.

License: Other

Python 93.10% Mustache 6.90%
backup restore kafka apache-kafka python3 linux s3 aws-backup threading consumer producer producer-consumer gcp google-cloud-storage kafka-backup partition topic

apache-kafka-backup-and-restore's Introduction

Apache Kafka Backup and Restore

Production Kafka Deployment Using Ansible

General Notes

Requirements

  • confluent_kafka
  • boto3
  • google-cloud-storage
  • pendulum
  • azure-storage-blob
  • minio
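
These dependencies can typically be installed with pip; for example (the PyPI package for confluent_kafka is published as confluent-kafka):

    pip3 install confluent-kafka boto3 google-cloud-storage pendulum azure-storage-blob minio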

Kafka Backup Application

  • It takes a backup of the given topic and stores it in either the local filesystem, S3, or Azure.
  • It automatically resumes from the point where it stopped, provided the same consumer group name is used before and after the crash.
  • The current.bin file, which contains messages up to NUMBER_OF_MESSAGE_PER_BACKUP_FILE, is uploaded to S3 only together with the other backup files.
  • RETRY_UPLOAD_SECONDS controls how often uploads to cloud storage are attempted.
  • NUMBER_OF_KAFKA_THREADS is used to parallelise reading from the Kafka topic. It should not exceed the number of partitions.
  • NUMBER_OF_MESSAGE_PER_BACKUP_FILE is the target number of messages per backup file; if the application is restarted, the first backup file after the restart may contain a different number of messages. See the example configuration after this list.
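
For reference, the backup application is configured with a JSON file using the parameters above. The example below mirrors the backup.json reported in the issues section further down; values are illustrative, and the cloud-specific readmes document the full set of keys.

    {
      "BOOTSTRAP_SERVERS": "kafka01:9092,kafka02:9092,kafka03:9092",
      "TOPIC_NAMES": ["davinder.test"],
      "GROUP_ID": "Kafka-BackUp-Consumer-Group",
      "FILESYSTEM_TYPE": "S3",
      "BUCKET_NAME": "kafka-backup",
      "FILESYSTEM_BACKUP_DIR": "/tmp/",
      "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 5000000,
      "RETRY_UPLOAD_SECONDS": 100,
      "NUMBER_OF_KAFKA_THREADS": 2,
      "LOG_LEVEL": 20
    }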

Kafka Restore Application

  • It restores messages from the backup directory into the given topic.
  • RETRY_SECONDS controls how often FILESYSTEM_BACKUP_DIR is re-read for new files.
  • RESTORE_PARTITION_STRATEGY controls which partition messages are restored to: with same, messages go back into the same topic partition they came from; with random, messages are restored across all partitions randomly. See the example configuration after this list.
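
A hypothetical restore configuration, assuming it follows the same JSON style as backup.json, is sketched below. Only RETRY_SECONDS, FILESYSTEM_BACKUP_DIR, and RESTORE_PARTITION_STRATEGY come from the notes above; every other key name is an assumption, so consult example-jsons/restore-s3.json and the cloud-specific readmes for the actual format.

    {
      "BOOTSTRAP_SERVERS": "kafka01:9092,kafka02:9092,kafka03:9092",
      "TOPIC_NAMES": ["davinder.test"],
      "FILESYSTEM_TYPE": "S3",
      "BUCKET_NAME": "kafka-backup",
      "FILESYSTEM_BACKUP_DIR": "/tmp/davinder.test",
      "RETRY_SECONDS": 100,
      "RESTORE_PARTITION_STRATEGY": "same",
      "LOG_LEVEL": 20
    }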

Known Issues

  • NA

How to Run Applications

Azure Cloud Readme

AWS Cloud Readme

Minio Readme

Local Readme

apache-kafka-backup-and-restore's People

Contributors

116davinder grglzrv


Forkers

vligade grglzrv

apache-kafka-backup-and-restore's Issues

aws s3 restore doesn't work

davinderpal@DESKTOP-07TAJVL:~/projects/apache-kafka-backup-and-restore$ python3 restore.py example-jsons/restore-s3.json
{ "@timestamp": "2022-10-17 23:07:48,509","level": "INFO","thread": "Kafka Restore Thread","name": "root","message": "retry for more files in /tmp/davinder.test after 100" }
{ "@timestamp": "2022-10-17 23:07:48,513","level": "INFO","thread": "S3 Download","name": "botocore.credentials","message": "Found credentials in environment variables." }
Exception in thread S3 Download:
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.10/threading.py", line 946, in run
    self._target(*self._args, **self._kwargs)
  File "/mnt/c/Users/Davinder/Documents/apache-kafka-backup-and-restore/cloud/aws.py", line 120, in s3_download
    s3_client = boto3.client('s3', endpoint_url="http://localhost:4566", use_ssl=False)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/__init__.py", line 92, in client
    return _get_default_session().client(*args, **kwargs)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/session.py", line 299, in client
    return self._session.create_client(
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/session.py", line 976, in create_client
    client = client_creator.create_client(
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/client.py", line 115, in create_client
    cls = self._create_client_class(service_name, service_model)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/client.py", line 172, in _create_client_class
    self._event_emitter.emit(
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/hooks.py", line 412, in emit
    return self._emitter.emit(aliased_event_name, **kwargs)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/hooks.py", line 256, in emit
    return self._emit(event_name, kwargs)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/botocore/hooks.py", line 239, in _emit
    response = handler(**kwargs)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/utils.py", line 65, in _handler
    module = import_module(module)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/utils.py", line 56, in import_module
    __import__(name)
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/s3/inject.py", line 16, in <module>
    from boto3.s3.transfer import (
  File "/home/davinderpal/.local/lib/python3.10/site-packages/boto3/s3/transfer.py", line 126, in <module>
    from s3transfer.exceptions import (
  File "/home/davinderpal/.local/lib/python3.10/site-packages/s3transfer/__init__.py", line 373, in <module>
    class MultipartUploader:
  File "/home/davinderpal/.local/lib/python3.10/site-packages/s3transfer/__init__.py", line 388, in MultipartUploader
    executor_cls=concurrent.futures.ThreadPoolExecutor,
  File "/usr/lib/python3.10/concurrent/futures/__init__.py", line 49, in __getattr__
    from .thread import ThreadPoolExecutor as te
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 37, in <module>
    threading._register_atexit(_python_exit)
  File "/usr/lib/python3.10/threading.py", line 1497, in _register_atexit
    raise RuntimeError("can't register atexit after shutdown")
RuntimeError: can't register atexit after shutdown
^CException ignored in: <module 'threading' from '/usr/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.10/threading.py", line 1560, in _shutdown
    lock.acquire()
KeyboardInterrupt:

restore script logic issue

The function below only returns a count of objects, and later we run a for loop over that count to fetch the backup files.

    def s3_count_partitions(s3_client,bucket,topic):
        """It will return number of objects in a given s3 bucket and s3 bucket path."""

        try:
            return s3_client.list_objects_v2(
                Bucket=bucket,
                Prefix=topic + "/",
                Delimiter='/'
            )['KeyCount']
        except NoCredentialsError as e:
            logging.error(e)
            exit(1)

Edge case:
Initial partition count: 10
Backup script copies only partitions 0, 1, 4, 6, 7, 8
The restore script, i.e. the above-mentioned function, will return count: 6

Now the for loop for p in range(_pc): will only consider partitions 0, 1, 2, 3, 4, 5, and some of them won't even
exist in S3, so it will keep failing for them.

    def s3_download(bucket,topic,tmp_dir,retry_download_seconds=60):
        s3_client = boto3.client('s3')
        while True:
            _pc = Download.s3_count_partitions(s3_client,bucket,topic)
            # create temp. topic directory
            for p in range(_pc):
                os.makedirs(os.path.join(tmp_dir,topic,str(p)),exist_ok=True)

            for _pt in range(_pc):

                _ck = checkpoint.read_checkpoint_partition(tmp_dir,topic,str(_pt))
                _partition_path = os.path.join(topic,str(_pt))
                _s3_partition_files = Download.s3_list_files(s3_client,bucket,_partition_path)

Error

davinderpal@DESKTOP-07TAJVL:~/projects/apache-kafka-backup-and-restore$ python3 restore.py example-jsons/restore-s3.json
{ "@timestamp": "2022-10-17 23:30:34,750","level": "INFO","thread": "Kafka Restore Thread","name": "root","message": "retry for more files in /tmp/davinder.test after 100" }
{ "@timestamp": "2022-10-17 23:30:34,853","level": "INFO","thread": "MainThread","name": "root","message": "Test messeage" }
{ "@timestamp": "2022-10-17 23:30:34,861","level": "INFO","thread": "MainThread","name": "botocore.credentials","message": "Found credentials in environment variables." }
{ "@timestamp": "2022-10-17 23:30:34,909","level": "WARNING","thread": "MainThread","name": "root","message": "[Errno 2] No such file or directory: '/tmp/davinder.test/0/checkpoint'" }
{ "@timestamp": "2022-10-17 23:30:34,931","level": "WARNING","thread": "MainThread","name": "root","message": "[Errno 2] No such file or directory: '/tmp/davinder.test/2/checkpoint'" }
{ "@timestamp": "2022-10-17 23:30:34,938","level": "WARNING","thread": "MainThread","name": "root","message": "[Errno 2] No such file or directory: '/tmp/davinder.test/3/checkpoint'" }
{ "@timestamp": "2022-10-17 23:30:34,946","level": "WARNING","thread": "MainThread","name": "root","message": "[Errno 2] No such file or directory: '/tmp/davinder.test/4/checkpoint'" }
{ "@timestamp": "2022-10-17 23:30:34,985","level": "INFO","thread": "MainThread","name": "root","message": "download success for /tmp/davinder.test/4/20221017-230228.tar.gz and its sha256 file " }
{ "@timestamp": "2022-10-17 23:30:34,985","level": "WARNING","thread": "MainThread","name": "root","message": "[Errno 2] No such file or directory: '/tmp/davinder.test/5/checkpoint'" }
{ "@timestamp": "2022-10-17 23:30:34,993","level": "INFO","thread": "MainThread","name": "root","message": "retry for new file after 100s in s3://kafka-backup/davinder.test" }

Potential solution:
Instead of returning the count of partitions, we can return the actual partition numbers, extracted with a regex or a split on the
list_objects_v2 response.
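
A minimal sketch of that fix is shown below; the function name and exact parsing are illustrative, not the project's actual implementation.

    # Illustrative sketch only -- not the project's actual code.
    import logging

    from botocore.exceptions import NoCredentialsError

    def s3_list_partitions(s3_client, bucket, topic):
        """Return the partition numbers that actually exist under <topic>/ in the bucket,
        instead of just the object count."""
        try:
            response = s3_client.list_objects_v2(
                Bucket=bucket,
                Prefix=topic + "/",
                Delimiter='/'
            )
        except NoCredentialsError as e:
            logging.error(e)
            exit(1)
        # With Delimiter='/', each CommonPrefix looks like "<topic>/<partition>/";
        # split out the numeric partition directory name.
        return [
            int(prefix['Prefix'].rstrip('/').split('/')[-1])
            for prefix in response.get('CommonPrefixes', [])
        ]

The download loop could then iterate over these real partition numbers (for _pt in s3_list_partitions(...)) instead of for _pt in range(_pc), so partitions that were never backed up are simply skipped.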

"[Errno 2] No such file or directory: '/tmp/<topic name>/0/20221013-133145.tar.gz

Getting the following error

{ "@timestamp": "2022-10-13 11:48:40,328","level": "INFO","thread": "Kafka Consumer 1","name": "root","message": "started polling on <topic name>" }                                                                                                                                               │
│ { "@timestamp": "2022-10-13 11:48:40,328","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "started polling on <topic name>" }                                                                                                                                               │
│ { "@timestamp": "2022-10-13 11:48:40,623","level": "INFO","thread": "MainThread","name": "botocore.credentials","message": "Found credentials from IAM Role: <role name>" }                                                                                                             │
│ { "@timestamp": "2022-10-13 11:48:40,726","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 11:50:20,825","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 11:52:00,931","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 11:52:01,587","level": "INFO","thread": "S3 Upload Threads","name": "root","message": "upload successful at s3://<s3 bucket name>/<<topic name>>/0/20221013-115127.tar.gz" }                                                                                    │
│ { "@timestamp": "2022-10-13 11:52:58,904","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backupfile /tmp/<topic name>/0/20221013-115127.tar.gz" }                                                                                                      │
│ { "@timestamp": "2022-10-13 11:52:58,904","level": "ERROR","thread": "Kafka Consumer 0","name": "root","message": "[Errno 2] No such file or directory: '/tmp/<topic name>/0/20221013-115127.tar.gz'" }                                                                                            │
│ { "@timestamp": "2022-10-13 11:53:41,031","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }
{ "@timestamp": "2022-10-13 12:50:24,321","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 12:52:04,422","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 12:53:44,522","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 12:55:24,614","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 12:57:04,714","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 12:58:44,813","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:00:24,913","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:02:05,005","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:03:45,105","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:05:25,205","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:07:05,304","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:08:45,404","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:10:25,501","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:12:05,601","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:13:45,701","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:15:25,801","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:17:05,902","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:18:45,961","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:20:26,061","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:22:06,161","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:23:46,261","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:25:26,358","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:27:06,457","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:28:46,557","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:30:26,652","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:32:06,753","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }                                                                                                                                                           │
│ { "@timestamp": "2022-10-13 13:32:07,312","level": "INFO","thread": "S3 Upload Threads","name": "root","message": "upload successful at s3://<s3 bucket name>/ <topic name>/0/20221013-133145.tar.gz" }                                                                                    │
│ { "@timestamp": "2022-10-13 13:32:28,669","level": "INFO","thread": "Kafka Consumer 0","name": "root","message": "Created Successful Backupfile /tmp/ <topic name>/0/20221013-133145.tar.gz" }                                                                                                      │
│ { "@timestamp": "2022-10-13 13:32:28,669","level": "ERROR","thread": "Kafka Consumer 0","name": "root","message": "[Errno 2] No such file or directory: '/tmp/ <topic name>/0/20221013-133145.tar.gz'" }                                                                                            │
│ { "@timestamp": "2022-10-13 13:33:46,825","level": "INFO","thread": "MainThread","name": "root","message": "s3 upload retry for new files in 100 seconds" }

backup.json file

{
  "BOOTSTRAP_SERVERS": "kafka01:9092,kafka02:9092,kafka03:9092",
  "TOPIC_NAMES": ["davinder.test"],
  "GROUP_ID": "Kafka-BackUp-Consumer-Group",
  "FILESYSTEM_TYPE": "S3",
  "BUCKET_NAME": "kafka-backup",
  "FILESYSTEM_BACKUP_DIR": "/tmp/",
  "NUMBER_OF_MESSAGE_PER_BACKUP_FILE": 5000000,
  "RETRY_UPLOAD_SECONDS": 100,
  "NUMBER_OF_KAFKA_THREADS": 2,
  "LOG_LEVEL": 20
}
