radovankavicky / pykafka

This project forked from parsely/pykafka

Apache Kafka client for Python; high-level & low-level consumer/producer, with great performance.

Home Page: http://pykafka.readthedocs.org/

License: Apache License 2.0

pykafka's Introduction

PyKafka

PyKafka is a cluster-aware Kafka>=0.8.2 client for Python. It includes Python implementations of Kafka producers and consumers, which are optionally backed by a C extension built on librdkafka, and runs under Python 2.7+, Python 3.4+, and PyPy.

PyKafka's primary goal is to provide a similar level of abstraction to the JVM Kafka client using idioms familiar to Python programmers and exposing the most Pythonic API possible.

You can install PyKafka from PyPI with

$ pip install pykafka

Full documentation and usage examples for PyKafka can be found on Read the Docs (http://pykafka.readthedocs.org/).

You can install PyKafka for local development and testing by cloning this repository and running

$ python setup.py develop

Getting Started

Assuming you have at least one Kafka instance running on localhost, you can use PyKafka to connect to it.

>>> from pykafka import KafkaClient
>>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")

Or, for a TLS connection, you might write (and also see SslConfig docs for further details):

>>> from pykafka import KafkaClient, SslConfig
>>> config = SslConfig(cafile='/your/ca.cert',
...                    certfile='/your/client.cert',  # optional
...                    keyfile='/your/client.key',  # optional
...                    password='unlock my client key please')  # optional
>>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
...                      ssl_config=config)
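The hosts argument in the examples above is a comma-separated string of host:port pairs. If your broker addresses live in a list (for instance, loaded from a config file), a small helper can build that string. This is a minimal sketch; format_hosts and the broker list are hypothetical names used for illustration and are not part of PyKafka.

```python
def format_hosts(brokers):
    """Join (host, port) pairs into a comma-separated "host:port" string."""
    return ",".join("{}:{}".format(host, port) for host, port in brokers)


# Hypothetical broker list for illustration.
brokers = [("127.0.0.1", 9092), ("127.0.0.1", 9093)]
hosts = format_hosts(brokers)
print(hosts)  # 127.0.0.1:9092,127.0.0.1:9093

# With PyKafka installed and the brokers reachable, this string can be
# passed straight to the client:
# from pykafka import KafkaClient
# client = KafkaClient(hosts=hosts)
```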

If the cluster you've connected to has any topics defined on it, you can list them with:

>>> client.topics
{'my.test': <pykafka.topic.Topic at 0x19bc8c0 (name=my.test)>}
>>> topic = client.topics['my.test']

Once you've got a Topic, you can create a Producer for it and start producing messages.

>>> with topic.get_sync_producer() as producer:
...     for i in range(4):
...         # messages must be bytes on Python 3
...         producer.produce(('test message ' + str(i ** 2)).encode('utf-8'))

The example above produces to Kafka synchronously: each produce() call returns only after the cluster has confirmed that it received the message.

To achieve higher throughput, we recommend using the Producer in asynchronous mode, so that produce() calls will return immediately and the producer may opt to send messages in larger batches. You can still obtain delivery confirmation for messages, through a queue interface which can be enabled by setting delivery_reports=True. Here's a rough usage example:

>>> try:
...     from queue import Empty  # Python 3
... except ImportError:
...     from Queue import Empty  # Python 2
>>> with topic.get_producer(delivery_reports=True) as producer:
...     count = 0
...     while True:
...         count += 1
...         producer.produce(b'test msg', partition_key=str(count).encode('ascii'))
...         if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
...             while True:  # drain every report queued so far
...                 try:
...                     msg, exc = producer.get_delivery_report(block=False)
...                     if exc is not None:
...                         print('Failed to deliver msg {}: {}'.format(
...                             msg.partition_key, repr(exc)))
...                     else:
...                         print('Successfully delivered msg {}'.format(
...                             msg.partition_key))
...                 except Empty:
...                     break

Note that the delivery report queue is thread-local: it will only serve reports for messages which were produced from the current thread. Also, if you're using delivery_reports=True, failing to consume the delivery report queue will cause PyKafka's memory usage to grow unbounded.
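Because an unread report queue grows without bound, it can help to factor the draining loop into a function that you call periodically from the producing thread. The following is a rough sketch rather than a PyKafka API: drain_delivery_reports and FakeProducer are hypothetical names, and FakeProducer only imitates the get_delivery_report(block=False) call shown above so that the example runs without a broker.

```python
try:
    from queue import Empty, Queue  # Python 3
except ImportError:
    from Queue import Empty, Queue  # Python 2


def drain_delivery_reports(producer):
    """Collect every delivery report currently queued, without blocking.

    Returns (delivered, failed): delivered is a list of messages and
    failed is a list of (message, exception) pairs.
    """
    delivered, failed = [], []
    while True:
        try:
            msg, exc = producer.get_delivery_report(block=False)
        except Empty:
            break  # nothing left to read for now
        if exc is None:
            delivered.append(msg)
        else:
            failed.append((msg, exc))
    return delivered, failed


class FakeProducer(object):
    """Hypothetical stand-in for a producer created with delivery_reports=True."""

    def __init__(self, reports):
        self._reports = Queue()
        for report in reports:
            self._reports.put(report)

    def get_delivery_report(self, block=True, timeout=None):
        # Raises Empty when block=False and no report is waiting.
        return self._reports.get(block, timeout)


producer = FakeProducer([("msg-1", None), ("msg-2", RuntimeError("boom"))])
delivered, failed = drain_delivery_reports(producer)
print(len(delivered), len(failed))
```

With a real producer, the same function would be called from the thread that produced the messages, since the report queue is thread-local.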

You can also consume messages from this topic using a Consumer instance.

>>> consumer = topic.get_simple_consumer()
>>> for message in consumer:
...     if message is not None:
...         print('{} {}'.format(message.offset, message.value.decode('utf-8')))
0 test message 0
1 test message 1
2 test message 4
3 test message 9
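By default the loop above keeps waiting for new messages. If you would rather read what is available and then stop, the consumer_timeout_ms option of get_simple_consumer lets iteration end after the topic has been idle for that long. The sketch below assumes that behavior; read_available, StubTopic, StubConsumer, and Message are hypothetical names, and the stubs exist only so the example runs without a broker.

```python
from collections import namedtuple


def read_available(topic, idle_timeout_ms=1000):
    """Return (offset, value) pairs, stopping once the topic has been
    idle for idle_timeout_ms milliseconds."""
    consumer = topic.get_simple_consumer(consumer_timeout_ms=idle_timeout_ms)
    try:
        return [(m.offset, m.value) for m in consumer if m is not None]
    finally:
        consumer.stop()  # always shut the consumer down when finished


# Hypothetical stand-ins, not PyKafka classes.
Message = namedtuple("Message", ["offset", "value"])


class StubConsumer(object):
    def __init__(self, messages):
        self._messages = messages
        self.stopped = False

    def __iter__(self):
        return iter(self._messages)

    def stop(self):
        self.stopped = True


class StubTopic(object):
    def __init__(self, messages):
        self._messages = messages
        self.last_kwargs = None

    def get_simple_consumer(self, **kwargs):
        self.last_kwargs = kwargs  # remember the options that were requested
        return StubConsumer(self._messages)


topic = StubTopic([Message(0, b"test message 0"), None, Message(1, b"test message 1")])
pairs = read_available(topic, idle_timeout_ms=500)
print(pairs)
```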

This SimpleConsumer doesn't scale: two SimpleConsumers consuming the same topic will each receive every message, so your application sees duplicates. To get around this, you can use the BalancedConsumer.

>>> balanced_consumer = topic.get_balanced_consumer(
...     consumer_group='testgroup',
...     auto_commit_enable=True,
...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
... )

You can have as many BalancedConsumer instances consuming a topic as that topic has partitions. If they are all connected to the same ZooKeeper instance, they will communicate with it to automatically balance the partitions among themselves.
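To see why the number of useful consumers is capped by the number of partitions, consider a range-style assignment in which each consumer receives a contiguous slice of the sorted partition list. The function below is an illustration of that idea only; assign_partitions is a hypothetical name and this is not PyKafka's actual balancing code.

```python
def assign_partitions(consumer_ids, partition_ids):
    """Split partitions into contiguous ranges, one range per consumer.

    Illustrative range-style assignment, not PyKafka's implementation.
    """
    consumers = sorted(consumer_ids)
    partitions = sorted(partition_ids)
    if not consumers:
        return {}
    base, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for index, consumer in enumerate(consumers):
        # The first `extra` consumers each take one additional partition.
        size = base + (1 if index < extra else 0)
        assignment[consumer] = partitions[start:start + size]
        start += size
    return assignment


two_consumers = assign_partitions(["c1", "c2"], range(5))
three_consumers = assign_partitions(["c1", "c2", "c3"], range(2))
print(two_consumers)
print(three_consumers)
```

In the second call there are more consumers than partitions, so one consumer ends up with an empty list and sits idle.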

You can also use the Kafka 0.9 Group Membership API by passing managed=True to get_balanced_consumer.
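One way to keep both coordination styles available is to build the keyword arguments in one place and choose between them based on whether a ZooKeeper connection string is configured. This is a sketch under that assumption; balanced_consumer_kwargs is a hypothetical helper, and the argument names are the ones used in the examples above.

```python
def balanced_consumer_kwargs(consumer_group, zookeeper_connect=None):
    """Build keyword arguments for topic.get_balanced_consumer().

    Without a ZooKeeper connection string, ask for the Kafka 0.9 group
    membership API via managed=True; otherwise coordinate through ZooKeeper.
    """
    kwargs = {"consumer_group": consumer_group, "auto_commit_enable": True}
    if zookeeper_connect is None:
        kwargs["managed"] = True
    else:
        kwargs["zookeeper_connect"] = zookeeper_connect
    return kwargs


managed_kwargs = balanced_consumer_kwargs("testgroup")
zk_kwargs = balanced_consumer_kwargs("testgroup", "myZkClusterNode1.com:2181")
print(sorted(managed_kwargs))
print(sorted(zk_kwargs))

# With a real topic object:
# balanced_consumer = topic.get_balanced_consumer(**managed_kwargs)
```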

Using the librdkafka extension

PyKafka includes a C extension that uses librdkafka to speed up producer and consumer operation. To use the librdkafka extension, make sure the header files and shared library are somewhere Python can find them, both when you build the extension (which setup.py develop takes care of) and at run time. Typically, this means either installing librdkafka in a location conventional for your system, or setting C_INCLUDE_PATH, LIBRARY_PATH, and LD_LIBRARY_PATH in your shell environment to point to the installation location of the librdkafka headers and shared objects. You can find this location with locate librdkafka.so.
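As a quick sanity check from Python, the standard library's ctypes.util.find_library can report whether a library named rdkafka is resolvable on the current system. Treat the result as a hint only: its search strategy is platform-dependent and may not match what the dynamic linker does at run time. find_rdkafka is a hypothetical helper name.

```python
import ctypes.util


def find_rdkafka():
    """Return the name find_library resolves for librdkafka, or None."""
    return ctypes.util.find_library("rdkafka")


found = find_rdkafka()
if found is None:
    print("librdkafka not found; check LIBRARY_PATH / LD_LIBRARY_PATH")
else:
    print("librdkafka resolved as " + found)
```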

After that, pass the extra parameter use_rdkafka=True to topic.get_producer(), topic.get_simple_consumer(), or topic.get_balanced_consumer(). Note that some configuration options may have different optimal values; it may be worthwhile to consult librdkafka's configuration notes for this.
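If the same code has to run on machines with and without the extension, one option is to request the librdkafka-backed producer first and fall back to the pure-Python one. The sketch below assumes that get_producer raises ImportError when use_rdkafka=True is requested but the extension is unavailable; verify this against your PyKafka version. get_fast_producer and StubTopic are hypothetical names, and the stub stands in for a real topic so the example runs without a broker.

```python
def get_fast_producer(topic, **kwargs):
    """Prefer the librdkafka-backed producer; fall back to pure Python."""
    try:
        return topic.get_producer(use_rdkafka=True, **kwargs)
    except ImportError:
        # Assumption: a missing extension surfaces as ImportError.
        return topic.get_producer(use_rdkafka=False, **kwargs)


class StubTopic(object):
    """Hypothetical stand-in that behaves as if the extension were not built."""

    def get_producer(self, use_rdkafka=False, **kwargs):
        if use_rdkafka:
            raise ImportError("use_rdkafka requires rdkafka to be installed")
        return ("pure-python producer", kwargs)


producer = get_fast_producer(StubTopic(), delivery_reports=True)
print(producer[0])
```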

We currently test against librdkafka 0.9.1 only. Note that use on PyPy is not recommended at this time; the producer is certainly expected to crash.

Operational Tools

PyKafka includes a small collection of CLI tools that can help with common tasks related to the administration of a Kafka cluster, including offset and lag monitoring and topic inspection. The full, up-to-date interface for these tools can be found by running

$ python cli/kafka_tools.py --help

or after installing PyKafka via setuptools or pip:

$ kafka-tools --help

What happened to Samsa?

This project used to be called samsa. It has been renamed PyKafka and has been fully overhauled to support Kafka 0.8.2. We chose to target 0.8.2 because the offset Commit/Fetch API stabilized on that release.

The Samsa PyPI package will stay up for the foreseeable future and tags for previous versions will always be available in this repo.

PyKafka or kafka-python?

These are two different projects. See the discussion here for comparisons between the two projects.

Contributing

If you're interested in contributing code to PyKafka, a good place to start is the "help wanted" issue tag. We also recommend taking a look at the contribution guide.

Support

If you need help using PyKafka, there are a number of resources available. For usage questions or common recipes, check out the StackOverflow tag. The Google Group can be useful for more in-depth questions or inquiries you'd like to send directly to the PyKafka maintainers. If you believe you've found a bug in PyKafka, please open a GitHub issue after reading the contribution guide.

pykafka's People

Contributors

amontalenti, dan-blanchard, dirn, dodysw, emmettbutler, erickt, fxsjy, jeffwidman, johnistan, jonsource, kbourgoin, maparent, messense, mlaprise, mwhooker, nk412, northisup, ottomata, pdex, rduplain, sontek, steffann, tanaysoni, thedrow, tiriplicamihai, tkaemming, vortec, yungchin, zoidbergwill, zware
