I'm running latest PyPi aiokafka 0.2.2. with kafka-python 1.3.1, and when running the following code I get the following mysterious error, which disappears when I downgrade to 0.2.1 - can you please take a look?
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
import asyncio, random
topic = 'test_topic_' + str(random.randint(1, 100000))
async def produce():
# Just adds message to sending queue
future = await producer.send(topic, b'some_message_bytes')
resp = await future
print("Message produced: partition {}; offset {}".format(
resp.partition, resp.offset))
async def consume_task(consumer):
try:
msg = await consumer.getone()
print("consumed: ", msg.topic, msg.partition, msg.offset,
msg.key, msg.value, msg.timestamp)
except KafkaError as err:
print("error while consuming message: ", err)
loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce())
loop.run_until_complete(producer.stop())
consumer = AIOKafkaConsumer(topic, loop=loop, bootstrap_servers='192.168.99.100:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
loop.run_until_complete(consume_task(consumer))
# Will gracefully leave consumer group; perform autocommit if enabled
loop.run_until_complete(consumer.stop())
Task exception was never retrieved
future: <Task finished coro=<AIOKafkaConsumer._update_fetch_positions() done, defined at C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py:576> exception=ValueError(<class 'struct.error'>,)>
Traceback (most recent call last):
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 10, in _pack
return pack(f, value)
struct.error: required argument is not an integer
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
result = coro.send(None)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\consumer.py", line 595, in _update_fetch_positions
yield from self._fetcher.update_fetch_positions(partitions)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 514, in update_fetch_positions
x.result()
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\futures.py", line 274, in result
raise self._exception
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\asyncio\tasks.py", line 239, in _step
result = coro.send(None)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 535, in _reset_offset
offset = yield from self._offset(partition, timestamp)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 560, in _offset
partition, timestamp)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\fetcher.py", line 598, in _proc_offset_request
response = yield from self._client.send(node_id, request)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\client.py", line 375, in send
request, expect_response=expect_response)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\aiokafka\conn.py", line 141, in send
message = header.encode() + request.encode()
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\struct.py", line 34, in _encode_self
[self.__dict__[name] for name in self.SCHEMA.names]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in encode
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 162, in <listcomp>
[self.array_of.encode(item) for item in items]
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in encode
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 124, in <listcomp>
for i, field in enumerate(self.fields)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 56, in encode
return _pack('>q', value)
File "C:\Users\Egor\Anaconda2\envs\py3k\lib\site-packages\kafka\protocol\types.py", line 12, in _pack
raise ValueError(error)
ValueError: <class 'struct.error'>
Message produced: partition 0; offset 0