mateusjunges / laravel-kafka

Use Kafka producers and consumers in your Laravel app with ease!

Home Page: https://junges.dev/documentation/laravel-kafka

License: MIT License

PHP 100.00%
kafka php message laravel

laravel-kafka's Introduction

Laravel Kafka

art/laravel-kafka.png


Do you use Kafka in your Laravel projects? All the packages I've seen so far, including some I built myself, either don't provide a nice usage syntax or, if they do, make testing very painful.

This package provides a nice way of producing and consuming Kafka messages in your Laravel projects.

Sponsor my work!

If you think this package helped you in any way, you can sponsor me on GitHub!

Sponsor Me

Documentation

You can find the documentation for this package here

Testing

Run composer test to test this package.

Contributing

Thank you for considering contributing to the Laravel Kafka package! The contribution guide can be found here.

Credits

License

The Laravel Kafka package is open-sourced software licensed under the MIT License. Please see the License File for more information.

laravel-kafka's People

Contributors

bbonnet22, behzadev, cragonnyunt, cvairlis, ebrahimradi, elnadrion, gajosu, isaacdarcilla, lentex, lukecurtis93, mateusjunges, mihaileu, mosharaf13, nmfzone, panaiteandreisilviu, remarkusable, rtuin, sergkeim, shanginn, slushpuppy, smortexa, stounhandj, tavsec, thetomcake, vitorhugoro1, vsvp21


laravel-kafka's Issues

[QUESTION] Artisan Consume Command

Thank you for creating this, until now the Laravel ecosystem was lacking a solid Kafka package.

I was curious how the published config file was actually used within the library.

I see there is a Kafka Consume command that is not mentioned in the documentation. The way it is written now, this command does not know how to handle the messages it is consuming.

How do we pass a message handler into this command?

It seems like we could add a config item for the handler and then update the command in this library to read it and call withHandler, for example:

<?php

'handler' => \My\Namespace\MyNewHandler::class

Look forward to hearing your thoughts, and happy to submit a PR.
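
A minimal sketch of the idea, assuming a hypothetical handler key in the published config file and that the command builds its consumer through the Kafka facade (the key name and the command wiring here are illustrative, not the package's actual implementation):

// config/kafka.php (hypothetical addition)
'handler' => \App\Handlers\MyNewHandler::class,

// Inside the consume command (sketch; config('kafka.topics') is also assumed)
$consumer = Kafka::createConsumer(config('kafka.topics'), config('kafka.consumer_group_id'), config('kafka.brokers'))
    ->withHandler(app(config('kafka.handler')))
    ->build();

$consumer->consume();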

[BUG] CONFWARN on consumer with auto.offset.reset

Describe the bug
CONFWARN occurs when starting $consumer->consume();

To Reproduce
Steps to reproduce the behavior:

  1. Start consumer with option auto.offset.reset
 $consumer = Kafka::createConsumer(['topic', 'topic2'], 'group1', $broker_addr)
            ->withOptions([
                'auto.offset.reset'                     => 'earliest'
            ])
            ->withHandler(function (ConsumedMessage $message) { })
            ->build();
 $consumer->consume();
  2. We get the warning "Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance". But we did not launch a producer, only a consumer
  3. We get the warning "Configuration property compression.codec is a producer property and will be ignored by this consumer instance". But we did not specify the option compression.codec

Why is the option compression.codec passed to getConsumerOptions, when it is relevant only for the producer?

'compression.codec' => config('kafka.compression', 'snappy'),

Why is a producer created when calling consume()?

$this->producer = app(KafkaProducer::class, [

Expected behavior
The consumer will start without errors or warnings.

Additional context
Logs:

%4|1645017413.959|CONFWARN|rdkafka#consumer-1| [thrd:app]: Configuration property compression.codec is a producer property and will be ignored by this consumer instance
%4|1645017413.963|CONFWARN|rdkafka#producer-2| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance

[QUESTION] Problem in producer

I need to produce a message to Kafka, but I need to use SASL OAuth and I need to pass a certificate.

I tried to use it, but it always returns an error.

SSL handshake failed: error:1416F086:SSL routines:tls_process_server_certificate:certificate verify failed: broker certificate could not be verified, verify that ssl.ca.location is correctly configured or root CA certificates are installed (brew install openssl) (after 28ms in state SSL_HANDSHAKE, 1 identical error(s) suppressed)
Sent messages may not be completed yet.12%

Could you help me?

[QUESTION] Is possible to publish and consume messages in Aws MSK with SASL enabled?

Hi, I'm trying to publish a message to an AWS MSK broker using SASL authentication with the SCRAM mechanism, but I'm getting the message "Sent messages may not be completed yet." as the response. Below you can see my code:

$message = new Message(
    body: $request->get('message'),
);

$sasl = new Sasl(
    username: 'myuser',
    password: 'mypass',
    mechanisms: 'SCRAM-SHA-256'
);

try {
    Kafka::publishOn('test')
        ->withSasl($sasl)
        ->withMessage($message)
        ->send();
} catch (CouldNotPublishMessage $exception) {
    return back()->with('error', $exception->getMessage());
}

Am I missing something, or is some new feature needed to publish to AWS MSK?

[QUESTION] Compatible with lumen 8.+?

Hi,

Is the laravel-kafka package also compatible with Lumen 8.+? If so, what adjustments do I have to make so that the package also runs under Lumen?

Best regards
3xc3ption

[BUG] Facade custom options not being passed to Producer object

I'm trying to instantiate a producer with a remote Kafka broker (Confluent), but the custom options that I'm setting on the Kafka facade are not reaching the final Producer class.

$producer = Kafka::publishOn('[REMOTE_ADDRESS]', 'cleaners')
        ->withConfigOptions([
            'bootstrap.servers' => '[REMOTE_ADDRESS]',
            'metadata.broker.list' => '[REMOTE_ADDRESS]',
            'security.protocol' => 'SASL_SSL',
            'sasl.mechanisms' => 'PLAIN',
            'sasl.username' => '[API_KEY]',
            'sasl.password' => '[API_KEY]',
        ])
        ->withBodyKey("test", "test")
        ->send();

This throws an exception because these configOptions are not being passed to the Conf object from rdkafka. Probably because of this line:

Junges\Kafka\Producers\Producer.php - Line 22

        $this->producer = app(KafkaProducer::class, [
            'conf' => $this->setConf($this->config->getProducerOptions()),
        ]);

Which limits the config passed to KafkaProducer::class to be only the ones returned from getProducerOptions().

Am I doing something wrong?
It works when I try to use php-rdkafka with the same configs.

[QUESTION] Broker is defined in many places rather than using the config by default

I noticed that the two core facade methods, publishOn and createConsumer, both have a required broker/brokers parameter. However, the config file has a brokers key, and the only place it is used is in the KafkaConsumerCommand.

This doesn't seem very laravely. I would suggest that removing that broker/brokers parameter requirement and using the brokers defined in the config would make the library easier to use.

Obviously specific brokers/broker can be defined by using a setBroker(s)/withBroker(s) method on the class or by an optional constructor parameter.
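
As a sketch of what that proposal could look like (withBrokers() here is a hypothetical setter, not an existing method; publishOn and withBodyKey are used as in other examples in this thread):

// Broker resolved from config('kafka.brokers') by default (proposed behaviour)
Kafka::publishOn('orders')
    ->withBodyKey('foo', 'bar')
    ->send();

// Optional per-call override via the hypothetical setter
Kafka::publishOn('orders')
    ->withBrokers('other-broker:9092')
    ->withBodyKey('foo', 'bar')
    ->send();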

Can you please confirm whether there is reasoning behind this design decision? Happy to assist if possible.

[QUESTION] Offset Commit - Clarification

I'm just getting started with this package. I've gone through the documentation and the parts of the code surrounding consuming a message and using committers. From other research into librdkafka, I know that under the covers the offset commit process is controlled by a background thread, so there is a chance that an offset could be committed to Kafka before the message was successfully consumed. In other words, Kafka could think a message was received and move on when the message actually failed, so you lose 'at least once' processing. (librdkafka doc reference)

My question is about the offset commit functionality in your package. When I was looking through the default DefaultCommitterFactory and the consumer code, it looked like by default the consumer will wait until the message is successfully processed before attempting a commit. If processing fails, it will commit to the dead letter queue, whereas if it was successful it will commit the offset AFTER PROCESSING to the original topic. Is my understanding here correct?

If not, can you explain more what is happening when commitMessage runs or when you set up a topic using ->withAutoCommit()?

[QUESTION] Failed to create thread: Operation not permitted (1)

[2022-03-10 11:27:29] server.ERROR: Failed to create thread: Operation not permitted (1) {"request-id":"request-62297021531bd","user":"","exception":"[object] (RdKafka\Exception(code: 0): Failed to create thread: Operation not permitted (1) at /app/server/vendor/laravel/framework/src/Illuminate/Container/Container.php:917)
[2022-03-10 11:27:29] server.ERROR: exception happened {"request-id":"request-62297021531bd","user":"","message":"Failed to create thread: Operation not permitted (1)","code":0,"line":917}

[BUG] - Consumer not getting messages with php rdkafka extension version 6.0.0

There was a new release of rdkafka on 2022-01-07, and since then the consumer can't process messages, producing the following error:

"throwable":{
    "class":"Error",
    "message":"Typed property RdKafka\\Message::$headers must not be accessed before initialization",
    "code":0,
    "file":"/var/www/html/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php:282"
}

For everyone having the same error, you should install rdkafka-5.0.2.

[QUESTION] Consumer not Running in Laravel

<?php

use Illuminate\Support\Facades\Route;
use Junges\Kafka\Facades\Kafka;

Route::get('/', function () {
    $consumer = Kafka::createConsumer('0',['warning'])->withConsumerGroupId('foo')->build();
    $consumer->consume();
});
?>

What do I need to add to get the consumed message in this code?

[QUESTION] Problem accessing protected properties of Consumed Message

Hi Mateus, thanks for your work.
I successfully installed your package and I'm able to produce and send a message in this way:

$message = new Message(
    'topicname',
    0,
    ['header-key' => 'header-value'],
    ['key' => 'value']
);

$producer =  Kafka::publishOn('....:9092', 'topicname')->withMessage($message);

$producer->send();

I successfully consume the message, but I cannot access the properties because they are protected:

$consumer = Kafka::createConsumer('....:9092')
    ->withAutoCommit()
    ->withConsumerGroupId('group')->subscribe('topicname')
    ->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) {
        // Handle your message here
        print_r($message);
    })->build();
        
$consumer->consume();

What am I missing?

Thanks in advance

P.S. This is an example of the message I consume:

Object
(
    [topicName:protected] => topicname
    [partition:protected] => 0
    [headers:protected] => Array
        (
            [header-key] => header-value
        )

    [body:protected] => Array
        (
            [key] => value
        )

    [key:protected] => 
    [offset:protected] => 164
    [timestamp:protected] => 1635866426370
)
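
For what it's worth, other examples in this thread read the consumed message through getter methods on the KafkaConsumerMessage contract rather than the protected properties. A minimal sketch along those lines, reusing the consumer setup above (broker and topic placeholders as in your example):

use Junges\Kafka\Contracts\KafkaConsumerMessage;
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer('....:9092')
    ->withAutoCommit()
    ->withConsumerGroupId('group')
    ->subscribe('topicname')
    ->withHandler(function (KafkaConsumerMessage $message) {
        // Read the message through the contract's getters instead of the protected properties.
        print_r($message->getBody());
        print_r($message->getHeaders());
        echo $message->getKey();
        echo $message->getTopicName();
        echo $message->getPartition();
    })
    ->build();

$consumer->consume();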

[QUESTION] Batch messages consumer

Hi, I couldn't figure out what was going on and therefore I want to ask directly. Thank you in advance for the answer.

  1. I decided to test the new batch consumption feature, but for some reason the release interval between batches does not work.
    As you can see, 10 seconds is configured, yet the actual delay is between 1 and 4 seconds:
$consumer = Kafka::createConsumer(['topic'], 'check-consumer')
            ->enableBatching()
            ->withBatchSizeLimit(2)
            ->withBatchReleaseInterval(10000)
            ->withHandler(function (Collection $collection) {
                $this->info(Carbon::now()->second." ".$collection->count());
            })
            ->build();
10 2
10 2
10 2
10 2
11 2
11 2
11 2
11 2
15 1
  2. The second question: if I do this, I expect the received message to be treated as failed and its offset not to be committed in Kafka, so that it can be re-processed in the future, but that is not what happens.
$consumer = Kafka::createConsumer(['topic'], 'check-consumer', config('trafficMetrics.kafka_brokers'))
            ->enableBatching()
            ->withBatchSizeLimit(2)
            ->withBatchReleaseInterval(10000)
            ->withHandler(function (Collection $collection) {
                throw new Exception();
            })
            ->build();

Looking at Junges\Kafka\Consumers\Consumer::handleException, you can see that false should be returned for RdKafka\Message\Committer::commitMessage.

private function handleException(Throwable $exception, Message|KafkaConsumerMessage $message): bool
    {
        try {
            $this->config->getConsumer()->failed(
                $message->payload,
                $this->config->getTopics()[0],
                $exception
            );

            return true;
        } catch (Throwable $throwable) {
            if ($exception !== $throwable) {
                $this->logger->error($message, $throwable, 'HANDLER_EXCEPTION');
            }

            return false;
        }
    }

Maybe I don't understand how the offset handling works; if so, I'd be glad to hear how it really works.

[BUG] incorrect configuration setting in KafkaConsumerCommand

Describe the bug
The KafkaConsumerCommand class uses the value kafka.group_id, although it is defined as kafka.consumer_group_id in the configuration file kafka.php.

How to fix
Replace this part of the code in KafkaConsumerCommand:

'groupId' => config('kafka.group_id'),

with this line:

'groupId' => config('kafka.consumer_group_id'),

[QUESTION] Messages are not been sent

Hello there,

I am using a docker-compose file to have a local Kafka container.

docker-compose:

version: "2"

services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    ports:
      - "9000:9000"
    environment:
      - KAFKA_BROKERCONNECT=kafka:9092
    depends_on:
      - kafka

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

On my Laravel project I set in the .env the KAFKA_BROKERS as following:

KAFKA_BROKERS="localhost:9092"

I went to Kafdrop at http://localhost:9000 and created a new topic: test

This is the code in my controller to do some tests:

$message = new Message(
    body: 'This is a new message'
);

Kafka::publishOn('test', 'localhost:9092')
    ->withMessage($message)
    ->withDebugEnabled()
    ->send();

It seems that I always get an Exception with the following error:

Sent messages may not be completed yet.

There aren't any messages on the topic.

Am I doing anything wrong?

Thank you in advance

[QUESTION] Getting "Unsupported value "snappy" for configuration property "compression.codec" while creating a consumer

Hi @mateusjunges, I have the same error as #105, but when creating a consumer:

$builder = Kafka::createConsumer($this->topics, $this->group, $this->host.':'.$this->port);
$builder->withHandler(function(KafkaConsumerMessage $message) {
	// Handle your message here
	echo $message->getBody();
	echo $message->getTopicName();
})
->stopAfterLastMessage()
->withMaxMessages(100);

$consumer = $builder->build();

$consumer->consume();

I'm using the following versions (on Windows 11):

laravel-kafka: 1.8.1
rdkafka: 6.0.1
librdkafka version (runtime): 1.6.2
librdkafka version (build): 1.6.2.255

I tried to change the KAFKA_COMPRESSION_TYPE option to "none" or "gzip" without any success.

Note that by following the high-level consumer example from rdkafka, I receive messages just fine.

Any idea what I am missing?

Thanks

When creating a producer:

Unsupported value "snappy" for configuration property "compression.codec": snappy not enabled at build time

[QUESTION] How to consume a message?

@mateusjunges Here is my producer code; it returns success, but I am not able to consume the message on the consumer end.

$message = new Message(
    headers: ['apiKey' => '12345'],
    body: ['foo' => 'bar'],
);

$producer = Kafka::publishOn('test-topic', 'localhost:9092')->withMessage($message)->send();

Here is my consumer code but it is not working:

class TestController extends Controller
{
    public function test_consumer()
    {
        test();
    }
}

class Handler
{
    public function __invoke(Message $message)
    {
        // Handle your message here
    }
}

function test()
{
    $consumer = Kafka::createConsumer()->subscribe(['test-topic'])
        ->withHandler(Handler::class)
        ->build();

    $consumer->consume();

    return $consumer;
}

[QUESTION] What are we missing in the consumer code?

Hi @mateusjunges, we are trying to consume messages from Confluent Cloud's Kafka cluster, but we are unable to consume them. We are using the same credentials to publish messages, which works fine.

This is our consumer code; please let us know what we are missing here.

<?php

namespace App\Console\Commands;

use App\Handlers\TestHandler;
use Illuminate\Console\Command;
use Junges\Kafka\Facades\Kafka;

class TestTopicConsumer extends Command
{
    protected $signature = 'kafka:test-consume';

    protected $description = 'Command description';

    public function handle(): void
    {
        $consumer = Kafka::createConsumer(['test_topic'], 'test-consumer', 'broker')
            ->subscribe('test_topic')
            ->withHandler(new TestHandler)
            ->withAutoCommit()
            ->withConsumerGroupId('test-consumer')
            ->withSasl(new \Junges\Kafka\Config\Sasl(
                password: 'kafka-api-secret',
                username: 'kafka-api-key',
                mechanisms: 'PLAIN',
                securityProtocol: 'SASL_SSL',
            ))
            ->build();

        $consumer->consume();
    }
}

[BUG] Mockery\Exception while creating producer

Hello,

I have created a producer using the code below, and I am getting a "Mockery\Exception".

I am using Laravel 8 and my OS is Windows 10.

Below is the code for creating the producer:

public function store(Request $request)
{
    $producer = Kafka::publishOn('dev-signup')
        ->withKafkaKey('1qaz2wsx')
        ->withHeaders(['test' => 'test'])
        ->withBodyKey('type', $request->type);

    $producer->send();

    return response()->json(['Details published successfully !!']);
}

(screenshot omitted)

[QUESTION] How to save a consumed message in the database?

@mateusjunges Hi,
I am trying to save data to the database, but it is not being saved.

<?php

namespace App\Handlers;

use Illuminate\Support\Facades\Log;
use Junges\Kafka\Contracts\KafkaConsumerMessage;
use App\Http\Controllers\TermsController;
use App\Model\Aquib;
use DB;

class TestHandler
{
    public function __invoke(KafkaConsumerMessage $message)
    {
        $values = array('ab' => 1);
        DB::table('test')->insert($values);

        $var = $message->getBody();
        Log::debug('Message received!', [
            'body' => $var,
            'headers' => $message->getHeaders(),
            'partition' => $message->getPartition(),
            'key' => $message->getKey(),
            'topic' => $message->getTopicName()
        ]);
    }
}

[QUESTION] - Custom Offset storage

Quick question: is it possible to save the offset per consumer in, for example, Redis or a database?
And, when the broker shuts down and comes back up, is it possible to tell the consumer to start consuming messages from that specific offset?

Let's say I have a Laravel Artisan command that builds the following consumer:

public function handle()
{
    $topics = [
        'fake-topic-1',
        'fake-topic-2',
        'fake-topic-3'
    ];

    $cachedRegistry = new CachedRegistry(
        new BlockingRegistry(
            new PromisingRegistry(
                new Client(['base_uri' => 'https://fake-schema-registry.com'])
            )
        ),
        new AvroObjectCacheAdapter()
    );        

    $registry = new \Junges\Kafka\Message\Registry\AvroSchemaRegistry($cachedRegistry);
    $recordSerializer = new RecordSerializer($cachedRegistry);

    foreach ($topics as $topic) 
    {
        $registry->addKeySchemaMappingForTopic(
            $topic,
            new \Junges\Kafka\Message\KafkaAvroSchema($topic . '-key')
        );
        $registry->addBodySchemaMappingForTopic(
            $topic,
            new \Junges\Kafka\Message\KafkaAvroSchema($topic . '-value')
        );
    }

    $deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer);

    $consumer = \Junges\Kafka\Facades\Kafka::createConsumer(
        $topics, 'fake-test-group', 'fake-broker.com:9999')
    ->withOptions([
        'security.protocol' => 'SSL',
        'ssl.ca.location' => storage_path() . '/client.keystore.crt',
        'ssl.keystore.location' => storage_path() . '/client.keystore.p12',
        'ssl.keystore.password' => 'fakePassword',
        'ssl.key.password' => 'fakePassword',
    ])
    ->withAutoCommit()
    ->usingDeserializer($deserializer)
    ->withHandler(function(\Junges\Kafka\Contracts\KafkaConsumerMessage $message) {

        KafkaMessagesJob::dispatch($message)->onQueue('kafka_messages_queue');

    }) 
    ->build();    
    
    $consumer->consume();
}

My problem now is that, from time to time, the "fake-broker.com:9999" shuts down and when it comes up again, it misses a few messages...

  • offset_reset is set to latest;
  • The option auto.commit.interval.ms is not set on the ->withOptions() method, so it is using the default value (5 seconds, I believe);
  • auto_commit is set to true and the consumer is built with the option ->withAutoCommit() as well.
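
One way to record offsets externally, reusing the handler above, is to store the last processed offset per topic/partition from inside the handler. This sketch assumes the message contract exposes a getOffset() accessor alongside the getters used elsewhere in this thread; note that storing the offset is only half the answer, since resuming from it still needs support when the consumer is built.

use Illuminate\Support\Facades\Cache;
use Junges\Kafka\Contracts\KafkaConsumerMessage;

$handler = function (KafkaConsumerMessage $message) {
    KafkaMessagesJob::dispatch($message)->onQueue('kafka_messages_queue');

    // Illustrative only: track the last processed offset per topic/partition in the cache (e.g. Redis).
    // getOffset() is assumed here, mirroring the other accessors on the contract.
    Cache::put(
        sprintf('kafka:offset:%s:%d', $message->getTopicName(), $message->getPartition()),
        $message->getOffset()
    );
};

// ...then pass $handler to ->withHandler($handler) in the builder above.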

[FEATURE REQUEST] support for php 7.4

Is your feature request related to a problem? Please describe.
Run the application in an environment with PHP 7.4.

Describe the solution you'd like
Support for PHP 7.4.

Describe alternatives you've considered
Run a transpiler on your code to adjust it to PHP 7.4.

Additional context
You can do this easily with the rector/rector package.
I forked your repo; you can take a look at my config for the Rector compiler (my-forked-repo).
I got only one failed test, something with one of the options, so not so bad.

Please consider this.

[QUESTION] Example Project

Hello, I wonder if someone can share an example project showing how to use this package.

I tried to install it on my own, but I always got so many errors inside this project, variables or otherwise.

[QUESTION] Topic required on KafkaProducerMessage duplicated in CanProduceMessages

Hi Guys,

From the documentation here I constructed the following code:

use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;

$topic = 'testtopic';
$headers = ['example' => 'header'];
$body = ['example' => 'body'];

Kafka::publishOn(config('kafka.brokers'), $topic)
    ->withMessage(new Message(headers: $headers, body: $body))
    ->send();

This errors with TypeError: Junges\Kafka\AbstractMessage::__construct(): Argument #1 ($topicName) not passed, since $topicName is a required constructor parameter on \Junges\Kafka\AbstractMessage.

I can just add the topicName constructor parameter, but it seems a little weird given that the topic is defined in the publishOn method on the previous line. Looking at the implementation of \Junges\Kafka\Contracts\CanProduceMessages, it does not seem to actually use the topicName on the Message interface (for producers, anyway).
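
For reference, the workaround mentioned above would look like this (simply passing the topic name to the Message constructor as well, duplicating what publishOn already receives):

Kafka::publishOn(config('kafka.brokers'), $topic)
    ->withMessage(new Message(topicName: $topic, headers: $headers, body: $body))
    ->send();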

Can you clarify:

  1. If I am misunderstanding this library entirely
  2. If the documentation is incorrect
  3. What should be happening here.

Happy to assist with PRs but wanted to ensure I understood the situation.

[BUG] Sasl not working due to error in code

Describe the bug

Can't send a message if the listener uses SSL. The log message contains the hint "connecting to a SSL listener?":

%7|1645035197.078|FAIL|rdkafka#producer-1| [thrd:msk-as01kafka-t:9093/bootstrap]: msk-as01kafka-t:9093/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 70ms in state APIVERSION_QUERY) (_TRANSPORT)

To Reproduce
Steps to reproduce the behavior:

  1. Run PHPUnit test with SSL Kafka listener:
<?php

namespace Tests\Feature\Kafka;

use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Message\Message;
use Tests\TestCase;

class KafkaProtocolTest extends TestCase
{
    public function testKafkaProtocol(): void
    {
        $topic = env('TOPIC_NAME', 'ORDERS');
        $user = env('KAFKA_USER');
        $password = env('KAFKA_PASSWORD');

        $message = new Message(
            topicName: $topic,
            body: ['key' => 'value'],
        );

        $sasl = new Sasl(
            username: $user,
            password: $password,
            mechanisms: 'PLAIN',
            securityProtocol: 'SASL_SSL'
        );

        $producer = Kafka::publishOn($topic)
            ->withSasl($sasl)
            ->withMessage($message);
        $producer->withDebugEnabled(true);

        // return false
        $this->assertTrue($producer->send());
    }
}

This test fails because SASL authentication failed.

Expected behavior
The test passes.

How to fix it

Patch ProducerBuilder:

private function build(): Producer
{
    $conf = new Config(
        broker: $this->broker,
        topics: [$this->getTopic()],
        // This line has been added:
        securityProtocol: $this->saslConfig ? $this->saslConfig->getSecurityProtocol() : null,
        sasl: $this->saslConfig,
        customOptions: $this->options,
    );

    return app(Producer::class, [
        'config' => $conf,
        'topic' => $this->topic,
        'serializer' => $this->serializer,
    ]);
}

Desktop (please complete the following information):

  • OS: Debian 11 in Docker
  • mateusjunges/laravel-kafka: v1.6.1
  • laravel/framework: v8.83.1

[QUESTION] Exception No such configuration property: "key"

@mateusjunges, can you please help me with this error? It occurs when I am trying to send a message to Kafka.

(screenshot omitted)

Here is my code:

/** @var \Junges\Kafka\Producers\ProducerBuilder $producer */
 $producer = Kafka::publishOn('test-topic', 'localhost:9092')
        ->withBodyKey('foo', 'bar')
        ->withConfigOptions(['key' => 'value'])
        ->withKafkaKey('your-kafka-key')
        ->withKafkaKey('kafka-key')
        ->withHeaders(['header-key' => 'header-value'])
        ->send(); // Also to disable debug mode

return response($producer);

Installation error ext-rdkafka

Hello, I have a problem with ext-rdkafka during installation.

I use PHP 8.1.

I've installed librdkafka via Homebrew; here is my brew info:

librdkafka: stable 1.8.2 (bottled), HEAD
Apache Kafka C/C++ library
https://github.com/edenhill/librdkafka
/opt/homebrew/Cellar/librdkafka/1.8.2 (36 files, 4.9MB) *
Poured from bottle on 2022-01-06 at 13:00:56
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
License: BSD-2-Clause
==> Dependencies
Build: pkg-config ✘, [email protected]
Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
==> Options
--HEAD
Install HEAD version
==> Analytics
install: 5,617 (30 days), 28,011 (90 days), 101,246 (365 days)
install-on-request: 3,609 (30 days), 19,617 (90 days), 66,506 (365 days)
build-error: 0 (30 days)

and when I run composer require mateusjunges/laravel-kafka, I get this error:

[InvalidArgumentException]
Package mateusjunges/laravel-kafka has requirements incompatible with your PHP version, PHP extension
s and Composer version:

  • mateusjunges/laravel-kafka v1.5.3 requires ext-rdkafka ^5.0|^4.0 but it is not present.

Can someone help me figure out how to fix this error?
Thanks

[QUESTION] How to consume with offset?

Hello, I want to know how to consume old messages from the broker.

I need to do this because I want to store data inside broker messages, so the data itself is kept in Kafka, and I need to be able to re-consume that data as many times as needed.

[BUG] Missing SASL Config Due to String Casing

Describe the bug
I was trying to debug why my SASL configs were not being loaded. Everything looked correct, except the usingSasl() function below was still returning false. This was because it checks specifically for uppercase strings, whereas our config was in lowercase.

private function usingSasl(): bool
{
    return $this->securityProtocol === 'SASL_PLAINTEXT' || $this->securityProtocol === 'SASL_SSL';
}

Because Kafka itself seems to understand the lowercase sasl_plaintext values, I suggest we change the function to ignore case by throwing a strtolower in there.
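
A minimal sketch of that suggested change, normalizing the configured value before comparing (strtoupper here is equivalent to the strtolower idea; the rest of the class is assumed untouched):

private function usingSasl(): bool
{
    // Accept 'sasl_plaintext' / 'sasl_ssl' regardless of the casing used in the config.
    return in_array(strtoupper((string) $this->securityProtocol), ['SASL_PLAINTEXT', 'SASL_SSL'], true);
}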

To Reproduce
Steps to reproduce the behavior:

  1. Setup SASL, but use lowercase sasl_plaintext as the security protocol.
  2. See that SASL configs are not loaded via getConsumerOptions()

Expected behavior
I would expect the SASL configs to be loaded properly regardless of the casing of their values.

[BUG] withDlq without specifying a name raises an error

If we're configuring the DLQ with the syntax below (from the docs):

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withDlq();

it raises the following error:

In ConsumerBuilder.php line 221:
                         
  Undefined array key 0 

Package version 1.7.3
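
A possible workaround until this is fixed, assuming withDlq() accepts an explicit dead-letter topic name (as the issue title implies), is to pass the name yourself:

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
    ->withDlq('my-topic-dlq')
    ->build();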

[QUESTION] Long term

Hi,

Can this library read many messages per second with the same consumer over the long term?
I recently used Supervisor to keep consumer connections alive long-term.

Best,

[QUESTION] Error in Docs + Other Considerations for withHandler()

I believe the docs are showing some incorrect syntax when passing around invokable classes as callables.

Ex: This will bomb because withHandler expects a callable, not a string

$consumer = Kafka::createConsumer()->withHandler(MyMessageHandler::class)

In order to work, this should be:

$consumer = Kafka::createConsumer()->withHandler(new MyMessageHandler)

Suggestions/Thoughts

However, the correct syntax is not super useful as designed. For instance, if our handler class requires any dependencies in the constructor, we have to know those dependencies ahead of time and provide them. It would be great if the handler could work with dependency injection and resolve its dependencies automatically from the container.

Right now the best alternative is to create a closure that calls our processor class. Not a show-stopper but not as nice.
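
As a sketch of that closure workaround, resolving the invokable handler out of the container so its constructor dependencies are injected (MyMessageHandler is the illustrative class from above):

use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer()
    ->withHandler(fn ($message) => app(MyMessageHandler::class)($message))
    ->build();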

Additional Notes

I would also suggest making the $broker parameter in the createConsumer() signature non-nullable and therefore required. Currently there is no other way to set up the broker config without setting it there, as we are missing a withBrokers() setter method.

With that being said, I think the withAutoCommit() method should accept a boolean that defaults to true. This would allow us to just pass config vars that automatically set things up without requiring the logic on our end.

Ex. instead of

$consumer = Kafka::createConsumer(brokers: $config['brokers']);

if ($config['auto_commit']) {
   $consumer->withAutoCommit();
}

We can simply do

$consumer = Kafka::createConsumer(brokers: $config['brokers'])->withAutoCommit($config['auto_commit']);

I feel strongly that other config should be set up the same way, e.g. withSasl should accept nullable strings.

[QUESTION] Sent messages may not be completed yet.

I have this configuration:

Library versions:
OS: Ubuntu 20.0
PHP: 8.0.13
rdkafka: v5.0.2
librdkafka (runtime): 1.2.1
librdkafka (build): 1.2.1.255
mateusjunges/laravel-kafka: ^1.5

and my producer code

 $producer = Kafka::publishOn('topic')
    ->withKafkaKey('your-kafka-key')
    ->withKafkaKey('kafka-key')
    ->withHeaders(['header-key' => 'header-value']);

$producer->send();

and I get this error:

Junges\Kafka\Exceptions\CouldNotPublishMessage Sent messages may not be completed yet.
