joowani / kq

Kafka-based Job Queue for Python

Home Page: http://kq.readthedocs.io

License: MIT License

Python 100.00%
apache-kafka job-queue kafka kafka-client kafka-consumer kafka-producer producer-consumer python python3 queueing worker-queue

kq's People

Contributors

joowani

kq's Issues

kq doesn't start on MacOS 11.1

I could probably switch to Docker in my development environment to get around this but figured I would report this.

Here's my basic anaconda environment:
environment.txt

I've tested both in bash and zsh. My worker setup is:

if __name__ == '__main__' and hasattr(args, "group") and args.group:
    # Set up logging.
    formatter = logging.Formatter('[%(levelname)s] %(message)s')
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)

    logger = logging.getLogger('kq.worker')
    logger.setLevel(logging.DEBUG)
    logger.addHandler(stream_handler)

    # Set up a Kafka consumer.
    print('Attempting to connect to kafka @: ', os.environ['bootstrap_server'])
    consumer = KafkaConsumer(
        bootstrap_servers = os.environ['bootstrap_server'],
        auto_offset_reset  = 'latest',
        group_id  = f"biddocs-{args.group}"
    )

    print("Group is: ", args.group) 

    worker = Worker(topic = 'bid_docs', consumer = consumer)
    worker.start()

The output is just a single line then it seems to die:

[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=bid_docs, group=biddocs-1) ...

The Kafka logs don't show even an attempt to connect. When I find a solution to this problem, I'll post an update. Any help appreciated!

Contributing preparation with py.test

I wanted to contribute some code.
Running py.test --cov=kq --cov-report=html did not work for me: the command was not found. Is there something I missed? This is my first attempt, and all the previous steps described worked fine.

kafka.errors.CommitFailedError: / max_poll_records

I have a vanilla Kafka setup in Docker, and a really basic worker connected to it. The problem is that after 6-8 messages are processed, this error appears in the logs:

[INFO] Job 5874a4627987449e85d6e974ac4b7a7f returned: None
Traceback (most recent call last):
  File "worker.py", line 39, in <module>
    worker.start()
  File "/opt/conda/envs/bidinterpreter/lib/python3.8/site-packages/kq/worker.py", line 260, in start
    self._consumer.commit()
  File "/opt/conda/envs/bidinterpreter/lib/python3.8/site-packages/kafka/consumer/group.py", line 526, in commit
    self._coordinator.commit_offsets_sync(offsets)
  File "/opt/conda/envs/bidinterpreter/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 518, in commit_offsets_sync
    raise future.exception # pylint: disable-msg=raising-bad-type
kafka.errors.CommitFailedError: CommitFailedError: Commit cannot be completed since the group has already
            rebalanced and assigned the partitions to another member.
            This means that the time between subsequent calls to poll()
            was longer than the configured max_poll_interval_ms, which
            typically implies that the poll loop is spending too much
            time message processing. You can address this either by
            increasing the rebalance timeout with max_poll_interval_ms,
            or by reducing the maximum size of batches returned in poll()
            with max_poll_records.

I've played with max_poll_interval_ms and max_poll_records but it seems like no matter how I set these, the problem always comes back. Any help appreciated!
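
The error text above relates two consumer settings to the time spent per batch. As a rough sketch (not a diagnosis of this particular setup), the check below estimates whether a worst-case batch can be processed within the poll interval. max_poll_records and max_poll_interval_ms are kafka-python KafkaConsumer options; the helper poll_budget_ok and the 5-second job duration are hypothetical, chosen only for illustration.

```python
def poll_budget_ok(max_poll_records, seconds_per_job, max_poll_interval_ms):
    """Return True if one full batch fits inside the poll interval."""
    worst_case_ms = max_poll_records * seconds_per_job * 1000
    return worst_case_ms < max_poll_interval_ms

# kafka-python defaults: 500 records per poll, 300000 ms between polls.
default_fits = poll_budget_ok(500, 5, 300_000)

# Smaller batches leave headroom for slow jobs (assumed to take 5 s each).
consumer_kwargs = {"max_poll_records": 10, "max_poll_interval_ms": 300_000}
tuned_fits = poll_budget_ok(
    consumer_kwargs["max_poll_records"], 5, consumer_kwargs["max_poll_interval_ms"]
)
```

If the estimate holds for a given workload, these keyword arguments could be passed along as KafkaConsumer(..., **consumer_kwargs).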

Job processing not happening.

worker.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='kq_topic', consumer=consumer)
worker.start()

producer.py

import requests

from kafka import KafkaProducer
from kq import Queue, Job

# Set up a Kafka producer.
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

# Set up a queue.
queue = Queue(topic='kq_topic', producer=producer)

def add(a, b):
  return a + b

# Enqueue a function call.
job = Job(func=add, args=[10,20], timeout=5)
queue.enqueue(job)  # timeout is still 5

When I run worker.py, it hangs on:

[INFO] Starting Worker(hosts=127.0.0.1:9092, topic=kq_topic, group=group) 

When I run producer.py, nothing happens.
Can you please help me get started?

Thanks.

Empty start method params will stop worker running

Hi, and thank you for your work. I found a problem running the documented example.
I am using Python 3.8.10 (default, Nov 26 2021, 20:14:08) [GCC 9.3.0] on linux (Ubuntu 20.04 LTS)

When I start the Worker example, it exits immediately without fetching any messages and without printing an error. If I pass any positive integer to the start() method, it works and continues as described in the docs until it reaches the given number.

I isolated the line at:

kq/kq/worker.py

Line 250 in 0e07957

while max_messages is not None and messages_processed < max_messages:

I added a logging statement to the start method to print the value, and it isn't None but -1. Of course, the while loop then exits immediately. How can this be solved?

The problem looks Python 3 related. It's not possible to compare NoneType to anything other than None; only that comparison returns True. For the while loop, this means the condition is always false, so the loop is never entered.

Quick and Dirty - Hack:


        if max_messages is not None:
            while messages_processed < max_messages:
                self._logger.info("max_messages in loop " + str(max_messages) + " ...")
                record = next(self._consumer)

                message = Message(
                    topic=record.topic,
                    partition=record.partition,
                    offset=record.offset,
                    key=record.key,
                    value=record.value,
                )
                self._process_message(message)

                if commit_offsets:
                    self._consumer.commit()

                messages_processed += 1
            return messages_processed
        else:
            while True:
                record = next(self._consumer)

                message = Message(
                    topic=record.topic,
                    partition=record.partition,
                    offset=record.offset,
                    key=record.key,
                    value=record.value,
                )
                self._process_message(message)

                if commit_offsets:
                    self._consumer.commit()
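
The two branches in the hack above differ only in the loop condition, so the same behaviour can be sketched with a single condition. This assumes that max_messages=None is meant to mean "no limit". The helper names below are hypothetical; they only evaluate the two conditions and are not kq code.

```python
def should_continue_reported(max_messages, messages_processed):
    # Condition as quoted in the issue: a None limit short-circuits to False,
    # so the loop body never runs when no limit is given.
    return max_messages is not None and messages_processed < max_messages


def should_continue_sketch(max_messages, messages_processed):
    # Alternative: None means unlimited, otherwise stop once the limit is hit.
    return max_messages is None or messages_processed < max_messages
```

With this form, a single while loop covers both the limited and the unlimited case, which avoids duplicating the message-handling body.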

Using KQ with Flask

System
OS: macOS Catalina 10.15.6
Kafka : stable 2.6.0
zookeeper: stable 3.6.1

I am trying to use kq with my Flask application; the code is below.
When the application starts, the worker also starts with the topic provided.
When I hit the /add endpoint, it should execute the function add, but this does not happen and print("add executed") is never called.

Also, kafka-console-consumer --bootstrap-server localhost:9092 --topic my_topic prints some gibberish output when I hit the API:

�cdill._dill
_create_namedtuple
qXJobq(XidqX    timestampqXtopicqXfuncqXargsqXkwargsqXtimeoutXkeyq     Xpartitionq
tq
  Xkq.jobq
�Ctmy_topicqcmain40f88fcc519f99e3d1d9q�
add
q]q(K
KKe}qKNNtq�q.
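
The cdill._dill marker in this output suggests that the message body is a job serialized with dill, so kafka-console-consumer is printing raw binary rather than text. The sketch below uses the standard library's pickle as a stand-in for dill (an assumption made to keep the example dependency-free) and a hypothetical dict in place of kq's job object, to show that such bytes look unreadable on a console but round-trip cleanly.

```python
import pickle

# Hypothetical stand-in for an enqueued job; not kq's actual Job type.
job = {"topic": "my_topic", "func": "add", "args": [10, 20, 30], "timeout": 5}

payload = pickle.dumps(job)       # binary bytes, not human-readable text
restored = pickle.loads(payload)  # the original structure comes back intact
```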

main.py

import os
from worker_app import start_worker
from queue import enqueue
from flask import Flask

def add(a, b, c):
    print("add executed")
    return a + b + c

app = Flask(__name__)

@app.route("/add")
def hello():
  enqueue("my_topic", add, [10, 20, 30])
  return "success"

if __name__ == '__main__':
    start_worker("my_topic")
    app.run(debug=True, use_reloader=False)

worker_app.py

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

def start_worker(topic):
  # Set up a Kafka consumer.
  consumer = KafkaConsumer(
      bootstrap_servers='127.0.0.1:9092',
      group_id='my_group',
      auto_offset_reset='latest'
  )

  # Set up a worker.
  worker = Worker(topic=topic, consumer=consumer)
  worker.start()

queue.py

from kafka import KafkaProducer
from kq import Queue, Job

def enqueue(topic, func, args):

  # Set up a Kafka producer.
  producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')

  # Set up a queue.
  queue = Queue(topic=topic, producer=producer)

  # Enqueue a function call.
  job = Job(func=func, args=args, timeout=5)
  queue.enqueue(job)  # timeout is still 5

Messages lost on offset out of range

If an offset is lost completely due to an out of range error, KafkaConsumer has an attribute auto_offset_reset which controls what to do in that scenario. By default this is set to latest. This would mean that all enqueued tasks (both processed and unprocessed) before that point would be lost.

Usually this should not be a problem, but while testing a completely new topic, I started the producer but I did not start a consumer. After a couple of tasks were queued, the consumer was started. On start, the consumer remained idle and did not process the tasks already queued. But I could see the tasks queued in kafka through kafkacat.

Once I queued a task again, while consumer was running, the consumer started processing but only processed the task just queued. Thereafter, tasks enqueued while the consumer was down were not lost and 'normal' functioning resumed.

Would it make sense to set auto_offset_reset to earliest? There are obvious drawbacks. The queue might be too long for the offset to be reset to earliest. The tasks written by the developer might not be properly idempotent. It might be preferable to lose a few tasks in case of offset errors (I don't know of any other cases where they might appear, but there might be some).

In either case I wanted to document here that worker must be started before producer for any new deployment (or any new topic) for the offset to come in sync. Otherwise tasks queued before the worker boots up will not be processed, if not lost.
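
The behaviour described above can be illustrated with a toy model of how a consumer chooses its starting offset. This is a simplified sketch, not Kafka's implementation: the function starting_offset and its parameters are hypothetical, and only the 'latest' and 'earliest' policy names come from the discussion.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Pick the offset a consumer starts reading from (toy model)."""
    if committed is not None and log_start <= committed <= log_end:
        return committed  # a valid committed offset always wins
    if auto_offset_reset == "earliest":
        return log_start  # replay everything still in the log
    if auto_offset_reset == "latest":
        return log_end    # skip whatever was written before
    raise ValueError(f"unsupported policy: {auto_offset_reset}")


# Three jobs (offsets 0..2) were enqueued before the worker first started.
skipped_start = starting_offset(None, log_start=0, log_end=3)
replay_start = starting_offset(None, 0, 3, auto_offset_reset="earliest")
resumed_start = starting_offset(2, 0, 3)
```

In this model, a group with no committed offset and the 'latest' policy starts at offset 3, so the three jobs already in the log are skipped, while 'earliest' starts at offset 0 and replays them.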

Synchronising multiple workers with the same queue

Hello,

I am trying to solve the following use case using 2 workers and a job queue.

For simplicity, let's call our workers 'worker A' and 'worker B'.

  • Both workers are passed a string, and they store the string in their local list.
  • When the size of the list reaches 5, they print the list to the console and empty it.
  • In a nutshell, they collect messages from the Kafka queue, and once they have received 5 messages they print everything to the console.
  • After printing, the worker sleeps for 5 seconds.

My problem is that when 'worker A' sleeps after printing the messages to the console, I expect 'worker B' to take over, which is not happening.

queue.enqueue( custom_func, '1')
queue.enqueue( custom_func, '2')
queue.enqueue( custom_func, '3')
queue.enqueue( custom_func, '4')
queue.enqueue( custom_func, '5') # after execution of this step 'worker A' prints and then sleeps
queue.enqueue( custom_func, '6')
queue.enqueue( custom_func, '7')
queue.enqueue( custom_func, '8')
queue.enqueue( custom_func, '9')
queue.enqueue( custom_func, '10')

I see 'worker A' wake up after 5 seconds and process the remaining messages.

My setup:

My workers are executing the following code

import logging

from kafka import KafkaConsumer
from kq import Worker

# Set up logging.
formatter = logging.Formatter('[%(levelname)s] %(message)s')
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger('kq.worker')
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Set up a Kafka consumer.
consumer = KafkaConsumer(
    bootstrap_servers='127.0.0.1:9092',
    group_id='group',
    auto_offset_reset='latest'
)

# Set up a worker.
worker = Worker(topic='topic', consumer=consumer)
worker.start()

I have two terminals running two processes, which I assume play the roles of 'worker A' and 'worker B'.
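
One thing worth checking here is the partition count of the topic: Kafka assigns each partition to at most one consumer within a group, so a topic with a single partition is served by only one of the two workers. The toy round-robin assignment below illustrates this. It is not Kafka's actual assignor, and the function and member names are hypothetical.

```python
def assign_partitions(partitions, members):
    """Distribute partitions over group members round-robin (toy model)."""
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


# Single-partition topic: the second worker has nothing to consume.
single = assign_partitions([0], ["worker-a", "worker-b"])

# Two partitions: each worker receives one.
double = assign_partitions([0, 1], ["worker-a", "worker-b"])
```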

production ready

Is this thing well tested and production ready?

Is anyone using this in production?

Question: Parallel working on Jobs

I have the following use case:
Read from an arbitrary Kafka topic and enqueue each message into your queue. The callback passed to the Queue.enqueue method will invoke a separate class, which makes a web service call. The web service will eventually return and I'll have a result. The time needed may be between 1 and 10 seconds.

Does kq do threaded work automatically to increase the frequency of web service calls? Or do I have to do something additional?
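
Independently of what kq does internally, slow I/O-bound calls can be overlapped with a thread pool from the standard library. The sketch below is a generic illustration only: call_web_service is a hypothetical stand-in for the web service call, simulated here with a short sleep.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def call_web_service(payload):
    # Hypothetical stand-in for a slow web service call.
    time.sleep(0.05)
    return payload.upper()


payloads = ["a", "b", "c", "d"]

# Run up to four calls at once; map() returns results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(call_web_service, payloads))
```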

Retry Logic

Hi,

Nice project, I just came here from the hackernews discussion and noticed this comment you made.

"I've also thought about reserving a topic + consumer group specifically for failed jobs and bake the retry logic into KQ itself. But that's an area I must explore more."

At my work, we have made a less sophisticated version of kq -- and this retry logic has been baked in by us, so I thought I would describe it, as it works out quite well for us.

  1. Message is consumed.

  2. If an exception happens, send the message to a retry topic named {original-topic-name}.{consumer-group}.retry. We include the consumer group in the retry topic name because there are usually multiple consumers consuming off the same topic. Also add a retry_count to the message.

  3. At an interval defined by the operator, read all the messages back off the retry topic. If they succeed, that's great; if they fail, send them back to the retry topic with retry_count incremented by one. (I should note that all of our messages are JSON encoded.)

  4. Keep reading messages, but if a message has over N retry_counts, instead of sending it back to the retry topic, we send it to a dead letter topic {original-topic-name}.{consumer-group}.dlt, which usually gets picked up by an operator.

This was super simple to implement, and has worked out great for us so far (we originally came up with a much more complicated solution, but it sucked)
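
The routing decision in steps 2 to 4 can be sketched as a small function. This is a minimal illustration under stated assumptions, not the implementation described above: route_failed_message and max_retries are hypothetical names, and the exact threshold (a message goes to the dead letter topic once its count would exceed max_retries) is one reading of "over N".

```python
import json


def route_failed_message(raw, topic, group, max_retries):
    """Return (destination_topic, updated_payload) for a failed message."""
    message = json.loads(raw)
    # A message that has never failed before carries no retry_count yet.
    retry_count = message.get("retry_count", 0) + 1
    message["retry_count"] = retry_count
    suffix = "dlt" if retry_count > max_retries else "retry"
    return f"{topic}.{group}.{suffix}", json.dumps(message)


first = route_failed_message('{"id": 1}', "jobs", "billing", max_retries=2)
last = route_failed_message(
    '{"id": 1, "retry_count": 2}', "jobs", "billing", max_retries=2
)
```

Here the first failure is routed to jobs.billing.retry with retry_count set to 1, while a message that has already been retried twice is routed to jobs.billing.dlt.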

Thanks,

Ben
