
pykafka's Introduction

pykafka

pykafka allows you to produce and consume messages with the Kafka distributed publish/subscribe messaging service.

Requirements

You need access to a Kafka instance and the ability to connect to it over TCP. You can obtain a copy of Kafka, along with setup instructions, at https://github.com/kafka-dev/kafka
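
As a quick sanity check before using the client, you can verify that the broker is reachable over TCP. A minimal sketch; the host and port below are assumptions about your setup (9092 is Kafka's default port):

import socket

# Host and port are assumptions for your setup; 9092 is Kafka's default.
sock = socket.create_connection(('localhost', 9092), timeout=5)
sock.close()
print "Kafka is reachable"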

Installation

pip install pykafka

Usage

Sending a simple message

import kafka

producer = kafka.producer.Producer('test')  # 'test' is the topic name
message  = kafka.message.Message("Foo!")
producer.send(message)

Sending a sequence of messages

import kafka

producer = kafka.producer.Producer('test')
message1 = kafka.message.Message("Foo!")
message2 = kafka.message.Message("Bar!")
producer.send([message1, message2])

Batching multiple messages using a context manager

import kafka
producer = kafka.producer.Producer('test')

with producer.batch() as messages:
  print "Batching a send of multiple messages.."
  messages.append(kafka.message.Message("first message to send")
  messages.append(kafka.message.Message("second message to send")
  • they will be sent all at once, after the context manager execution.

Consuming messages one by one

import kafka
consumer = kafka.consumer.Consumer('test')
messages = consumer.consume()
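
The call returns the currently available messages; assuming consume() returns them as a list (an assumption about this API), you can iterate over the result:

for message in messages:  # assumes consume() returned a list of Message objects
  print message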

Consuming messages using a generator loop

import kafka

consumer = kafka.consumer.Consumer('test')

for message in consumer.loop():
  print message

Contact:

Please use the GitHub issues: https://github.com/dsully/pykafka/issues

pykafka's People

Contributors

colindickson, michielbaird, premal, rancerbeta, sholsapp, williame


pykafka's Issues

Kafka Socket Error

Hi,
I am running tail2kafka, but I only see flush messages, and when I check the server I see the error below:
ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:46)
at kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42)
at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:39)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:39)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
Labels: kafka, tail2kafka

Kafka 0.8 Support

Kafka 0.8 includes a breaking update to the protocol. Should there be a development branch of pykafka targeting the new protocol?

Add zookeeper support

Hi, maybe I just couldn't find it, but a very interesting feature of Kafka is its use of ZooKeeper as a coordination and discovery engine. Adding ZooKeeper support to the Python client would be a worthwhile feature.

Thanks.
Gabriel
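
A minimal sketch of what broker discovery through ZooKeeper could look like, using the kazoo library; the ZooKeeper address and the "creator:host:port" registration format (as used by Kafka 0.7) are both assumptions:

from kazoo.client import KazooClient

zk = KazooClient(hosts='localhost:2181')  # assumed ZooKeeper address
zk.start()

brokers = []
for broker_id in zk.get_children('/brokers/ids'):
    data, _ = zk.get('/brokers/ids/%s' % broker_id)
    # Kafka 0.7 registers each broker as "creator:host:port" (assumption).
    creator, host, port = data.rsplit(':', 2)
    brokers.append((host, int(port)))

zk.stop()
print brokers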

Installation through pip fails

Hi,

I'm trying to install pykafka through pip on Ubuntu 11.10, and I'm getting an error about a missing README.md file. Running pip install directly on the git source tree works properly. It seems the pip release is not identical to the current GitHub release.

I'm trying to write a utility which relies on pykafka and this issue makes it harder to use.

Thanks
RL

Full error:

Downloading/unpacking pykafka
Downloading pykafka-0.1.tar.gz
Running setup.py egg_info for package pykafka
Traceback (most recent call last):
File "", line 14, in
File "/var/tmp/ppp/q/build/pykafka/setup.py", line 12, in
description = open('README.md').read(),
IOError: [Errno 2] No such file or directory: 'README.md'
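
The usual fix for this class of failure is to ship README.md in the sdist (via a MANIFEST.in entry) or to guard the read in setup.py. A minimal sketch of the defensive read, assuming the setup.py layout quoted in the traceback:

import os

here = os.path.dirname(os.path.abspath(__file__))
readme = os.path.join(here, 'README.md')
# Fall back to an empty description when the sdist lacks README.md.
description = open(readme).read() if os.path.exists(readme) else ''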

Add support for new message format

Kafka uses a new message format in its most recent versions. It is denoted by the magic byte being set to 1, which is followed by an attributes byte indicating whether compression was used and, if so, which codec. See the Kafka design guide for more details.

FYI: the Ruby client appears to have been updated for this already.
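
A minimal sketch of parsing both header layouts, based on my reading of the Kafka 0.7 wire format (magic 0 has no attributes byte; magic 1 adds one whose low two bits name the compression codec); treat the field offsets and codec values as assumptions to verify against the design guide:

import struct

def parse_message(buf):
    # buf starts at the 4-byte length field of one on-wire message.
    (length,) = struct.unpack('>i', buf[0:4])
    magic = ord(buf[4])
    if magic == 1:
        attributes = ord(buf[5])
        codec = attributes & 0x03           # 0 = none, 1 = gzip, 2 = snappy
        (crc,) = struct.unpack('>I', buf[6:10])
        payload = buf[10:4 + length]
    else:                                   # magic == 0, the old format
        codec = 0
        (crc,) = struct.unpack('>I', buf[5:9])
        payload = buf[9:4 + length]
    return codec, crc, payload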

After old messages are removed from the server, consuming from offset 0 fails

To reproduce:

  1. Set log.retention.hours=1 in config/server.properties on a test Kafka server.
  2. Produce a batch of messages into a new topic.
  3. Consume the batch of messages from step 2, and print out the value of consumer.offset.
  4. Wait an hour for those messages to be deleted. Make some tea, and/or browse reddit while you wait.
  5. Produce another batch of messages into the same topic and attempt to consume them starting at offset 0. The consumer.consume() loop will not return anything; only if you instantiate the consumer with the offset recorded at the end of step 3 will you be able to retrieve the new messages.

But what if you don't know that offset? Keeping track of it is not easy in a production environment where older messages are deleted every hour.

Otherwise, there's this ugly O(N) hack for finding the offset where the messages begin:

import kafka

topic = 'test'   # the topic to probe

# Brute-force O(N) scan: probe successive offsets until one of them
# yields a non-empty message.
offset = -1
found = False
while not found:
    offset += 1
    consumer = kafka.consumer.Consumer(topic, offset=offset)
    for message in consumer.consume():
        if len(str(message)) > 0:
            print offset
            found = True
        break   # only inspect the first message at each probed offset

(Again, this code is awful and not recommended; it's just something I wrote to get an answer for myself. It would be a disaster on an empty topic, or on a topic where the starting offset is a huge number.)

Can you think of a prettier solution to put into consume.py which could allow us to consume all the messages in a topic, even after some have been deleted by the server?
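
One cleaner option, if someone wants to wire it into the consumer, is Kafka's offsets request (request id 4 in the 0.7 wire protocol), which returns the earliest offset still on disk when asked for time -2. A raw-socket sketch; the wire layout here is my reading of the 0.7 protocol and should be treated as an assumption:

import socket
import struct

def _read_exact(sock, n):
    # recv() may return short reads, so loop until n bytes arrive.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise IOError('socket closed while reading response')
        chunks.append(chunk)
        n -= len(chunk)
    return ''.join(chunks)

def earliest_offset(host, port, topic, partition=0):
    body = struct.pack('>h', 4)                    # request id 4: OFFSETS
    body += struct.pack('>h', len(topic)) + topic  # topic
    body += struct.pack('>i', partition)           # partition
    body += struct.pack('>q', -2)                  # time: -2 = earliest offset
    body += struct.pack('>i', 1)                   # max number of offsets
    sock = socket.create_connection((host, port))
    try:
        sock.sendall(struct.pack('>i', len(body)) + body)
        (size,) = struct.unpack('>i', _read_exact(sock, 4))
        data = _read_exact(sock, size)
        (error_code,) = struct.unpack('>h', data[0:2])
        (num_offsets,) = struct.unpack('>i', data[2:6])
        (offset,) = struct.unpack('>q', data[6:14])
        return offset
    finally:
        sock.close()

print earliest_offset('localhost', 9092, 'test')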
