silviucpp / erlkaf
Erlang kafka driver based on librdkafka
License: MIT License
Hi, I've recently started testing erlkaf after a year of using brod, and I'm very impressed by how well it works. Congratulations!
Has anyone tried to implement a consumer -> producer (stream processing) job with a transactional_id to ensure exactly-once delivery?
What would that look like? Would we need a new type of consumer/producer group?
It would be useful for cleaning up broker state when using erlkaf in tests
that produce to random topics we don't want to be long-lived.
For example:
setup_all do
  :erlkaf.create_producer(TestProducer, [])
  topic = "random_test_topic_#{Ecto.UUID.generate()}"
  :ok = :erlkaf.create_topic(TestProducer, topic)

  on_exit(fn ->
    :erlkaf.delete_topic(TestProducer, topic)
    :erlkaf.stop_client(TestProducer)
  end)
end
Pardon my Elixir.
Getting this error when I try to start multiple consumers while connecting to Confluent Cloud:
06:16:30.023 [error] rdkafka#consumer-1 FAIL [thrd:GroupCoordinator]: GroupCoordinator: b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092: Failed to resolve 'b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092': nodename nor servname provided, or not known (after 19ms in state CONNECT, 4 identical error(s) suppressed)
06:16:30.024 [error] rdkafka#consumer-1 ERROR [thrd:app]: rdkafka#consumer-1: GroupCoordinator: b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092: Failed to resolve 'b30-pkc-2396y.us-east-1.aws.confluent.cloud:9092': nodename nor servname provided, or not known (after 19ms in state CONNECT, 4 identical error(s) suppressed)
Everything is working fine connecting to a local Kafka server (without sasl), and connecting one consumer to Confluent Cloud is working perfectly.
Is there any configuration or something I can do to get it working with more than one consumer?
Thanks in advance.
The default of 1000 ms seems too long for some consumers that require lower latency. It would be awesome to have this configurable.
Great lib, btw. Thanks.
Hi,
I've been asked to add a timestamp to messages when producing. I'm currently using brod but would like to migrate to erlkaf eventually.
I'd like to know whether erlkaf supports this, or whether support is planned.
Thank you!
Upon starting erlkaf, I sometimes get random segmentation faults.
When I run the BEAM VM under lldb, this is where it stops, over and over:
* thread #5, name = '1_scheduler', stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
frame #0: 0x000000010033ad80 beam.debug.smp`erts_dlc_create_lock(dlc=0x000000014383e078, name=0x0000000000000000) at erl_dyn_lock_check.c:170:5
167 erts_aint_t i, n = erts_atomic_read_nob(&n_lock_types);
168 int name_len;
169
-> 170 for (i = 0; name[i]; i++) {
171 if (name[i] == '[')
172 break;
173 }
Target 0: (beam.debug.smp) stopped.
When I do a backtrace:
(lldb) bt
* thread #5, name = '1_scheduler', stop reason = EXC_BAD_ACCESS (code=1, address=0x0)
* frame #0: 0x000000010033ad80 beam.debug.smp`erts_dlc_create_lock(dlc=0x000000014383e078, name=0x0000000000000000) at erl_dyn_lock_check.c:170:5
frame #1: 0x0000000100384549 beam.debug.smp`erl_drv_mutex_create(name=0x0000000000000000) at erl_drv_thread.c:171:5
frame #2: 0x00000001003a6325 beam.debug.smp`enif_mutex_create(name=0x0000000000000000) at erl_nif.c:2131:53
frame #3: 0x00000001484615c7 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher() [inlined] CriticalSection::CriticalSection(this=0x0000000100d18880) at critical_section.h:11:34 [opt]
frame #4: 0x00000001484615c0 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher() [inlined] CriticalSection::CriticalSection(this=0x0000000100d18880) at critical_section.h:11 [opt]
frame #5: 0x00000001484615c0 erlkaf_nif.so`QueueCallbacksDispatcher::QueueCallbacksDispatcher(this=0x0000000100d18880) at queuecallbacksdispatcher.cc:11 [opt]
frame #6: 0x0000000148464407 erlkaf_nif.so`on_nif_load(env=<unavailable>, priv_data=0x000000014383c4e0, load_info=<unavailable>) at erlkaf_nif.cc:56:27 [opt]
frame #7: 0x00000001003acb6f beam.debug.smp`erts_load_nif(c_p=0x00000001438b0268, I=0x000000014981e8b0, filename=0x14364c169 "/Users/y/sportening/we-api-user-account/_build/test/lib/erlkaf/priv/erlkaf_nif", args=0xf 0) at erl_nif.c:4653:16
frame #8: 0x0000000100032056 beam.debug.smp`beam_jit_load_nif(c_p=0x00000001438b0268, I=0x000000014981e8b0, reg=0x0000700008932dc0) at beam_jit_common.c:154:18
I do not understand how this can be correct: passing NULL to the enif_mutex_create function:
# critical_section.h
CriticalSection() { mutex_ = enif_mutex_create(NULL); }
name becomes a NULL pointer (name=0x0000000000000000), and accessing name[i] obviously fails.
erlkaf version: 2.0.8
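To make the failure mode concrete, here is a small standalone sketch (a hypothetical helper for illustration, not OTP or erlkaf code) of the loop in erl_dyn_lock_check.c that dereferences name, showing why a NULL mutex name faults in debug builds and why passing a non-NULL string to enif_mutex_create would avoid the crash:

```cpp
#include <cassert>
#include <cstddef>

// Mimics the loop at erl_dyn_lock_check.c:170, which scans the lock name
// up to '[' or NUL. The debug VM has no NULL guard, so name == NULL faults;
// this sketch adds the guard to show exactly where the dereference happens.
static size_t lock_name_prefix_len(const char* name) {
    if (name == NULL)
        return 0;                 // the check the debug build is missing
    size_t i = 0;
    for (; name[i]; i++)          // crashes here when name is NULL
        if (name[i] == '[')
            break;
    return i;
}
```

So a plausible fix on erlkaf's side would be to pass a non-NULL name string to enif_mutex_create instead of NULL.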
Hi Silviu,
Thanks for your work.
We are currently using erlkaf on our server. However, I've noticed that sometimes when a partition rebalance is triggered, some partition consumers get stuck. This happens when a partition consumer is doing heavy work that takes a long time.
After digging deeper into erlkaf, I found that when :revoke_partition
is received by the consumer group, it tries to stop all partition consumers. It does so by sending a stop message to each partition consumer and waiting 5 s. Since we have a heavy consumer whose task runs longer than 5 s, it then gets force-killed. When this happens, consumer_queue_cleanup
is not run. I suspect this may be related to the stuck-partition issue.
I'm not sure whether my understanding is correct.
Is it possible to support this feature here? Is there a road map for other features?
Hello there,
I am getting this error while building the NIF on Manjaro Linux:
HEAD is now at cff1001 Fixed compile error introduced in previous commit
~/projects/antecedentes_api/deps/erlkaf/deps ~/projects/antecedentes_api/deps/erlkaf
~/projects/antecedentes_api/deps/erlkaf
make[1]: Entering directory '/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src'
CPP erlkaf_producer.cc
CPP erlkaf_config.cc
CPP topicmanager.cc
CPP nif_utils.cc
CPP queuemanager.cc
CPP erlkaf_logger.cc
CPP erlkaf_nif.cc
CPP queuecallbacksdispatcher.cc
CPP erlkaf_consumer.cc
/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.cc: In member function ‘void QueueCallbacksDispatcher::watch(rd_kafka_t*, bool)’:
/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.cc:47:75: error: ‘stoull’ is not a member of ‘std’; did you mean ‘wcstoull’?
47 | objects_[instance] = item(is_consumer, static_cast<uint64_t>(std::stoull(buffer)/2), now_ms());
| ^~~~~~
| wcstoull
make[1]: *** [nif.mk:78: /home/rafa/projects/antecedentes_api/deps/erlkaf/c_src/queuecallbacksdispatcher.o] Error 1
make[1]: *** Waiting for unfinished jobs....
make[1]: Leaving directory '/home/rafa/projects/antecedentes_api/deps/erlkaf/c_src'
make: *** [Makefile:7: compile_nif] Error 2
===> Hook for compile failed!
After some tests I got it to compile by adding
#include <string>
to https://github.com/silviucpp/erlkaf/blob/master/c_src/queuecallbacksdispatcher.cc#L6
I'm not sending a PR yet because I don't know if this is the correct solution; maybe you could give me some feedback, @silviucpp?
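For what it's worth, std::stoull is indeed declared in <string>, so the include is the right kind of fix. A minimal standalone reproduction of the failing expression (with a made-up buffer value) compiles and runs once the header is present:

```cpp
#include <string>    // declares std::stoull; without it, GCC only suggests wcstoull
#include <cstdint>

// Same shape as queuecallbacksdispatcher.cc:47: parse a numeric string, halve it.
static uint64_t parse_and_halve(const std::string& buffer) {
    return static_cast<uint64_t>(std::stoull(buffer) / 2);
}
```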
Inside of erlkaf_consumer_group's init there is a call to erlkaf_nif:consumer_new(GroupId, TopicsNames, RdkClientConfig, RdkTopicConfig).
This appears to return {error, badarg} when I call erlkaf:create_consumer_group.
So I printed the args being passed to consumer_new:
erlang:display(GroupId),
erlang:display(TopicsNames),
erlang:display(RdkClientConfig),
erlang:display(RdkTopicConfig),
Then, from a console (I'm using Elixir, so iex), I tried calling consumer_new with those args (some info redacted):
:erlkaf_nif.consumer_new(
'lifecycle-consumer-local',
[<<"lifecycle_events">>],
[{<<"security.protocol">>,<<"ssl">>},{<<"bootstrap.servers">>,<<"blah-blah-blah.kafka.amazonaws.com:9094">>}],
[{<<"auto.offset.reset">>,<<"latest">>}]
)
And that call works, returning {ok, Ref} … no badarg 🙃
Any ideas why it would return badarg once and then work when I call it myself?
When I use Hex and add a dependency on the latest release like this
{:erlkaf, "~> 2.1.0"}
fetching the deps results in the following error:
Error evaluating Rebar config script ./rebar.config.script:22: evaluation failed with reason error:{badmatch,{error,enoent}} and stacktrace [{erl_eval,expr,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,496}]},{erl_eval,exprs,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,136}]},{erl_eval,expr_list,7,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,961}]},{erl_eval,expr,6,[{file,[101,114,108,95,101,118,97,108,46,101,114,108]},{line,472}]},{file,eval_stream2,6,[{file,[102,105,108,101,46,101,114,108]},{line,1504}]},{file,script,2,[{file,[102,105,108,101,46,101,114,108]},{line,1142}]},{'Elixir.File','cd!',2,[{file,[108,105,98,47,102,105,108,101,46,101,120]},{line,1607}]},{'Elixir.Mix.Rebar',eval_script,2,[{file,[108,105,98,47,109,105,120,47,114,101,98,97,114,46,101,120]},{line,191}]}]
Any dependencies defined in the script won't be available unless you add them to your Mix project
This does not happen if we add the dependency directly from GitHub like this:
{:erlkaf, github: "silviucpp/erlkaf"}
even though no changes have happened to the rebar config on master since the last release.
I'm getting an error the first time I try to build the NIF using the latest version of Elixir.
The last log lines are as follows:
~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/deps ~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf
~/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf
make[1]: Entering directory '/root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/c_src'
CPP erlkaf_consumer.cc
CPP queuemanager.cc
CPP topicmanager.cc
CPP erlkaf_nif.cc
CPP queuecallbacksdispatcher.cc
CPP erlkaf_logger.cc
CPP nif_utils.cc
CPP erlkaf_message.cc
CPP erlkaf_producer.cc
CPP erlkaf_config.cc
LD erlkaf_nif.so
make[1]: Leaving directory '/root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/deps/erlkaf/c_src'
===> Analyzing applications...
===> Compiling erlkaf
===> Missing artifact priv/erlkaf_nif.so
** (Mix.Error) Could not compile dependency :erlkaf, "/root/.mix/rebar3 bare compile --paths /root/.cache/mix/installs/elixir-1.13.1-erts-12.2/febfcb37852cfa2dd1caf0e08cdda1de/_build/dev/lib/*/ebin" command failed. Errors may have been logged above. You may run Mix.install/2 to try again or change the arguments to Mix.install/2 to try another version
Steps to reproduce
I created a simplified flow using docker, but it is happening on my local installation as well.
Using the following Dockerfile to get the latest Elixir version and erlkaf build dependencies:
FROM elixir:1.13.1-slim AS app_builder

RUN apt-get update && \
    apt-get install -y --no-install-recommends git build-essential libssl-dev libcrypto++-dev zlib1g-dev libsasl2-dev \
    liblz4-dev libzstd-dev ca-certificates
Then just build, start the container and test the installation:
docker build -t erlkaf_test .
docker run -it --rm erlkaf_test
iex(1)> Mix.install([{:erlkaf, github: "silviucpp/erlkaf"}])
# there will be a prompt to install rebar
Then the error occurs; after that, if you try again on the same image, it installs just fine. Maybe it is related to elixir-lang/elixir#11530.
I recently had issues while building erlkaf on macOS due to its attempt to install its dependencies through a forced brew install.
During the process, Homebrew ran an auto-update (brew update --auto-update) which caused some unrelated packages to break. I had to spend some minutes figuring out what had happened, and how a mix deps.get could potentially break my Homebrew – and then I found this snippet:
case $1 in
  $LIBRDKAFKA_DESTINATION)
    case $OS in
      Darwin)
        brew install [email protected] lz4 zstd curl
        OPENSSL_ROOT_DIR=$(brew --prefix [email protected])
        export CPPFLAGS=-I$OPENSSL_ROOT_DIR/include/
        export LDFLAGS=-L$OPENSSL_ROOT_DIR/lib
        ;;
    esac
    # (...)
esac
See: https://github.com/silviucpp/erlkaf/blob/master/build_deps.sh#L67-L72
It's pretty bad to touch system packages during the build of a library, since it can affect other packages, as in the scenario I ran into. So I would recommend removing this snippet from the build scripts, replacing it with a clear docs section about system requirements (on macOS specifically, suggest Homebrew packages as a recommendation, though people could also use alternatives like nixpkgs, etc.), and possibly failing the build with informative error messages stating that the system requirements were not met.
I think we could accomplish this by checking if some binaries and/or environment variables are set. I do similar things in my .zshrc
to configure my shell if/when it's on a Homebrew-powered macOS (eg.: https://github.com/joeljuca/cli/blob/main/config/zsh/zshrc.sh#L70-L77).
if ! which openssl >/dev/null 2>&1; then
  echo "You must have OpenSSL installed in your system."
  echo "See: https://(...)"
  exit 1
fi

if [ "${LDFLAGS}" = "" ]; then
  echo "Required config LDFLAGS is not set"
  echo "See: https://(...)"
  exit 1
fi
# (...)
# if everything looks good, proceed with the build...
PS: I'm not a great shell programmer, but I imagine a couple of if blocks could do the trick.
/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc: In function ‘ERL_NIF_TERM enif_consumer_cleanup(ErlNifEnv*, int, const ERL_NIF_TERM*)’:
/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc:456:5: error: this ‘if’ clause does not guard... [-Werror=misleading-indentation]
if(!enif_get_resource(env, argv[0], data->res_consumer, reinterpret_cast<void**>(&consumer)))
^~
/home/admin/data/new_ejjabered/deps/erlkaf/c_src/erlkaf_consumer.cc:459:2: note: ...this statement, but the latter is misleadingly indented as if it is guarded by the ‘if’
consumer->running = false;
^~~~~~~~
CPP erlkaf_nif.cc
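The warning itself is straightforward to silence: -Werror=misleading-indentation fires because consumer->running = false; is indented as if it were guarded by the if above it. A braced sketch (with a stub type standing in for the real NIF resource) shows the shape of the fix:

```cpp
// Stub standing in for erlkaf's consumer resource (illustration only).
struct Consumer {
    bool running = true;
};

// Bracing the `if` body makes explicit that the assignment is unconditional,
// which is what -Wmisleading-indentation wants at erlkaf_consumer.cc:456.
static bool stop_consumer(Consumer* consumer, bool resource_ok) {
    if (!resource_ok) {
        return false;              // the only statement the `if` guards
    }
    consumer->running = false;     // clearly outside the guard
    return true;
}
```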
undefined here:
https://github.com/silviucpp/erlkaf/blob/master/src/erlkaf_app.erl#L20
appears to always cause the other branch to run.
When we start our application, it gets "stuck" or "freezes" before doing anything we can observe. It doesn't crash, it doesn't time out, and it never seems to start our application code.
I'm not sure of a better way to describe what's happening. An application (it happens to be Elixir) that was working on OTP 24 (I haven't tried 25 yet) no longer works when we try to start the release on OTP 26.
I've currently got an application built that only uses erlkaf and does nothing else but try to load. I've gone so far as to modify the release's start_clean.script file to try to find out what is happening, or narrow it down.
Before running mix release, we can run an iex console with erlkaf operating correctly. We've compiled with the prod environment.
We're just trying to narrow it down at this point. Do you have any ideas what it could be, or perhaps a simple erlkaf Erlang example we can try (since maybe it's an Elixir issue in the release)? Perhaps you've encountered something like this before.
We'll keep digging!
Hi,
I found here that you are comparing erlkaf:produce/4 to brod:produce_sync/5. But if I understand correctly, erlkaf does not produce synchronously; rather, it returns immediately and sends a delivery report back.
Please correct me if that is wrong. If it is correct, is there special support for blocking until the message is delivered? Or should I just wait for the delivery report?
Thank you.
Hi! 👋
I was poking around trying to see if there is a way to manually commit offsets to the kafka broker, and found this commit function in librdkafka docs - but as far as I can tell, this doesn't seem to be exposed by the erlkaf wrapper?
Is it possible to make this call in user-land, or does this require changes from erlkaf to support?
Cheers 🙌
When using the Hex package, version 1.1.7, it fails during init of erlkaf_logger because of an undef erl_nif:set_log_process.
I looked at the code for the module in the package and it seems fine.
When using the master branch as the dep in rebar3 it works fine, so it seems to be either a problem with the package itself, or it just needs a new version published off current master.
I am looking at using this for something I am working on, and I was wondering whether there is a way to seek to an offset in a consumer, or if it must start from the beginning / current time?
I'm having issues compiling erlkaf on macOS Ventura 13.6.1:
$ mix deps.compile erlkaf
=ERROR REPORT==== 28-Feb-2024::17:31:13.767352 ===
beam/beam_load.c(190): Error loading module rebar3_hex_owner:
This BEAM file was compiled for a later version of the runtime system than the current (Erlang/OTP 25).
To fix this, please re-compile this module with an Erlang/OTP 25 compiler.
(Use of opcode 182; this emulator supports only up to 180.)
=ERROR REPORT==== 28-Feb-2024::17:31:13.767370 ===
Loading of $HOME/.cache/rebar3/plugins/rebar3_hex/ebin/rebar3_hex_owner.beam failed: badfile
===> Errors loading plugin {rebar_cmd,"0.2.6"}. Run rebar3 with DEBUG=1 set to see errors.
librdkafka fork already exist. delete _build/deps/librdkafka for a fresh checkout ...
concurrentqueue fork already exist. delete _build/deps/concurrentqueue for a fresh checkout ...
LD erlkaf_nif.so
ld: Undefined symbols:
_SSL_get_peer_certificate, referenced from:
_rd_kafka_transport_ssl_handshake in librdkafka.a[70](rdkafka_ssl.o)
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[1]: *** [_build/dev/lib/erlkaf/priv/erlkaf_nif.so] Error 1
make: *** [compile_nif] Error 2
===> Hook for compile failed!
** (Mix) Could not compile dependency :erlkaf, "$HOME/.asdf/installs/elixir/1.15.4-otp-25/.mix/elixir/1-15/rebar3 bare compile --paths _build/dev/lib/*/ebin" command failed. Errors may have been logged above. You can recompile this dependency with "mix deps.compile erlkaf --force", update it with "mix deps.update erlkaf" or clean it with "mix deps.clean erlkaf"
I'm compiling it through Elixir's Mix. It breaks, and I don't have much of a clue how to tackle this. Any help is highly appreciated.
Versions:
I want to implement some retry logic based on the message headers, but I noticed that the delivery report doesn't include headers. Is it possible to add them?
Hello,
after upgrading to erlkaf 2.0.0 (commit 1d706c6) we started experiencing consumer stop timeouts on consumer group rebalance.
Our consumer group has only one topic with 36 partitions, and originally we had 2 application instances consuming 18 partitions each. When there is more load, we scale horizontally up to 9 instances and the consumer group rebalances.
Sometimes when a rebalance occurs, a consumer stop timeout happens (erlkaf_consumer.erl:50) and the gen_server crashes. That alone wouldn't be a problem, but the consumer group rebalance then continues in a loop and our app never recovers. The consumer just keeps starting and crashing in a loop, and a stable group rebalance never happens.
This never happened with erlkaf 1.1.9, which we used for quite a while. What is different?
As far as I can tell, pause/resume of consumers and producers is currently not supported by erlkaf. I would like to know whether this is at all a possibility. My main use case is controlling the fetching of messages in cases where downstream data storage is, e.g., offline. I imagine that for consumers it could be done by returning a {pause, #state{}}
tuple or some such.
Would it be possible to implement this in erlkaf?
I'm looking at switching from Elsa to erlkaf. My use case is consuming entire Kafka topics to populate in-memory caches on application boot. This requires two things from my consumers:
I think I could accomplish this with randomly generated unique consumer groups, but that feels like a hack, so I'm hoping there's a better way. I didn't see one looking through the code, but I might have missed it.
It looks like headers aren't exposed but are supported by the current version of librdkafka, correct?
I can take a whack at it if it's simply a matter of adding a field to the message record, setting it on decode, and using it when calling librdkafka's produce function.
Greetings,
we have problems with erlkaf's dependency jsone v1.4.5 on OTP 21 in our Elixir project. I know jsone is a pure Erlang implementation, and it worked great with our Elixir projects until Elixir 1.7.x, which runs on OTP 21.
When we run mix compile, we get this error:
mix compile ✔ 10073 19:12:35
===> Compiling jsone
===> Compiling src/jsone.erl failed
src/jsone.erl:297: erlang:get_stacktrace/0: deprecated; use the new try/catch syntax for retrieving the stack backtrace
src/jsone.erl:346: erlang:get_stacktrace/0: deprecated; use the new try/catch syntax for retrieving the stack backtrace
** (Mix) Could not compile dependency :jsone, "/Users/trajakovic/.mix/rebar3 bare compile --paths "/Users/trajakovic/IdeaProjects/scorealarm/pretty-kafka-client/_build/dev/lib/*/ebin"" command failed. You can recompile this dependency with "mix deps.compile jsone", update it with "mix deps.update jsone" or clean it with "mix deps.clean jsone"
The simplest solution is to use jsone tag 1.4.7, so we're wondering whether it's possible to update the dependency directly in the erlkaf project?
Thank you
Due to the async nature of the librdkafka API, I understand that the only way to guarantee that the producer is sync would be handling the delivery reports and blocking the producer while waiting for the callback to return.
My question is: do we have other retry mechanisms, like some kind of auto-retry, or is the only way to handle errors for the user to implement such a callback?
macOS Monterey 12.6.1
I'm trying to use erlkaf in an Elixir project. When I go to compile, however, it fails, and there are a number of "interesting" entries in the output pointing at why:
Creating static library librdkafka.a
...
ld: warning: ignoring file /opt/homebrew/opt/[email protected]/lib/libssl.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
ld: warning: ignoring file /opt/homebrew/opt/[email protected]/lib/libcrypto.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
ld: warning: ignoring file /opt/homebrew/Cellar/lz4/1.9.4/lib/liblz4.dylib, building for macOS-x86_64 but attempting to link with file built for macOS-arm64
cp librdkafka.a librdkafka-dbg.a
Undefined symbols for architecture x86_64:
...
And then further down:
WARNING: librdkafka-static.a: No static libraries available/enabled for inclusion in self-contained static library librdkafka-static.a: this library will be identical to librdkafka.a
And:
WARNING: librdkafka-static.a: The following libraries were not available as static libraries and need to be linked dynamically: -llz4 -lm -lcurl -lsasl2 -lssl -lcrypto -lz -ldl -lpthread
And finally:
ld: symbol(s) not found for architecture x86_64
cp librdkafka-static.a librdkafka-static-dbg.a
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[2]: *** [librdkafka.1.dylib] Error 1
make[2]: *** Waiting for unfinished jobs....
make[1]: *** [libs] Error 2
error with make
make: *** [get_deps] Error 1
===> Hook for compile failed!
When I look at the configuration that's generated, I indeed see ARCH x86_64, and I've been unable to determine why it's not set to arm64 as expected. A coworker has a similar setup to mine and his config is set correctly.
Some info:
m1-mbp:payment-processor $ brew info librdkafka
==> librdkafka: stable 1.9.2 (bottled), HEAD
Apache Kafka C/C++ library
https://github.com/edenhill/librdkafka
/opt/homebrew/Cellar/librdkafka/1.9.2 (38 files, 7.6MB) *
Poured from bottle on 2023-01-11 at 15:31:59
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/librdkafka.rb
License: BSD-2-Clause
==> Dependencies
Build: pkg-config ✔, [email protected] ✔
Required: lz4 ✔, lzlib ✔, [email protected] ✔, zstd ✔
==> Options
--HEAD
Install HEAD version
m1-mbp:payment-processor $ openssl version
LibreSSL 2.8.3
I admit that I am a bit out of my element here; however, I have spent hours digging through GitHub issues, blog posts, StackOverflow, etc. trying to figure this out, and I am out of ideas. I am happy to provide any more info that might be helpful. Thank you
Do you have any plans to make a new release? Could I help by starting a PR? We're currently pinned to a SHA on master. I'd love to get to a release artifact.
Thanks!
I am trying to use the library on a Mac. The esq dependency is failing to compile with
src/esq_queue.erl: undefined parse transform 'category'
If I download the esq repo, it compiles fine by itself. Could you please check?
These 4 default configurations are injected, when not specified, by the apply_kafka_default_config/1 routine from erlkaf_config.cc.
bool appy_kafka_default_config(rd_kafka_conf_t* config)
{
    if(rd_kafka_conf_set(config, "enable.auto.commit", "true", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "enable.auto.offset.store", "false", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "enable.partition.eof", "false", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;

    if(rd_kafka_conf_set(config, "allow.auto.create.topics", "true", NULL, 0) != RD_KAFKA_CONF_OK)
        return false;
    ...
It produces warnings every time a producer starts:
CONFWARN [thrd:app]: Configuration property enable.auto.commit is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property enable.auto.offset.store is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property enable.partition.eof is a consumer property and will be ignored by this producer instance
CONFWARN [thrd:app]: Configuration property allow.auto.create.topics is a consumer property and will be ignored by this producer instance
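One way to avoid these warnings would be to inject the consumer-only defaults only when the config is being built for a consumer instance. The sketch below models the idea with a plain std::map and a hypothetical is_consumer flag, rather than erlkaf's actual rd_kafka_conf_set calls:

```cpp
#include <map>
#include <string>

// Hypothetical sketch: skip consumer-only properties for producer configs so
// librdkafka does not emit CONFWARN about ignored properties.
static void apply_kafka_default_config(std::map<std::string, std::string>& conf,
                                       bool is_consumer) {
    if (is_consumer) {
        conf.emplace("enable.auto.commit", "true");
        conf.emplace("enable.auto.offset.store", "false");
        conf.emplace("enable.partition.eof", "false");
        conf.emplace("allow.auto.create.topics", "true");
    }
    // shared or producer-only defaults would go here
}
```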
There isn't an erlkaf:flush function, so I'm wondering whether rdkafka or erlkaf internally has some logic to flush during shutdown of the node.
I called the erlkaf:produce function and got {:error, 29}, which is not very helpful. The function's spec says that the reason can be anything, but there is already an internal mapping in erlkaf_private.hrl; is there no way that erlkaf could attempt to translate the error before returning it?
The contract for the create_consumer_group function says:
-spec create_consumer_group(client_id(), binary(), [binary()], [client_option()], [topic_option()]) -> ok | {error, reason()}.
create_consumer_group(ClientId, GroupId, Topics, ClientConfig0, DefaultTopicsConfig)
It seems that Topics is expected to be a [binary()], but later in the code there is the following validation:
valid_consumer_topics([H|T]) ->
    case H of
        {K, V} when is_binary(K) and is_list(V) ->
            Mod = erlkaf_utils:lookup(callback_module, V),
            case Mod =/= undefined andalso is_atom(Mod) of
                true ->
                    valid_consumer_topics(T);
                _ ->
                    {error, {invalid_topic, H}}
            end;
        _ ->
            {error, {invalid_topic, H}}
    end;
That seems to force Topics to be a list of pairs {binary(), [{callback_module, atom()}]} or similar.
Dialyzer says the contract is broken for a valid Topics parameter:
lib/broadway_erlkaf/v1/api/topic.ex:0: The call erlkaf:create_consumer_group
(_worker_name@1 :: atom(),
_group_name@1 :: any(),
[{_,
[{'callback_args', [any(), ...]} |
{'callback_module', 'Elixir.BroadwayErlkaf.V1.Api.Topic'} |
{'dispatch_mode', {'batch', _}},
...]},
...],
_client@1 :: any(),
_topics_opts@1 :: [{'auto_offset_reset', 'smallest'}, ...]) breaks the contract
(client_id(),
binary(),
[binary()],
[client_option()],
[topic_option()]) ->
'ok' | {'error', reason()}
I believe we only need to properly define the spec for the Topics parameter of create_consumer_group, but I don't really know the proper type for it.
I am building on macOS Catalina 10.15.1. Logs:
make[3]: Entering directory `/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src'
CPP erlkaf_producer.cc
CPP queuemanager.cc
CPP topicmanager.cc
CPP erlkaf_logger.cc
CPP erlkaf_config.cc
CPP erlkaf_nif.cc
CPP erlkaf_consumer.cc
CPP nif_utils.cc
LD erlkaf_nif.so
Undefined symbols for architecture x86_64:
"_OPENSSL_sk_pop_free", referenced from:
_rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
"_OpenSSL_version", referenced from:
_rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
"_OpenSSL_version_num", referenced from:
_rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
"_SSL_CTX_set_options", referenced from:
_rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
"_TLS_client_method", referenced from:
_rd_kafka_transport_ssl_ctx_init in librdkafka.a(rdkafka_transport.o)
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[3]: *** [/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src/../priv/erlkaf_nif.so] Error 1
make[3]: Leaving directory `/Users/pankajsoni/Documents/Git/Rivers/ejabberd/master/deps/erlkaf/c_src'
I can't build erlkaf_nif.so.
My OS is Debian GNU/Linux 10 (buster), and here is the log:
$ make compile_nif
librdkafka fork already exist. delete deps/librdkafka for a fresh checkout ...
concurrentqueue fork already exist. delete deps/concurrentqueue for a fresh checkout ...
make[1]: Entering directory '/home/nava/src/tst/erlkaf/c_src'
CPP topicmanager.cc
CPP queuemanager.cc
CPP erlkaf_logger.cc
CPP nif_utils.cc
CPP erlkaf_config.cc
CPP erlkaf_nif.cc
CPP queuecallbacksdispatcher.cc
CPP erlkaf_producer.cc
CPP erlkaf_consumer.cc
LD erlkaf_nif.so
/usr/bin/ld: cannot find -lz
collect2: error: ld returned 1 exit status
make[1]: *** [nif.mk:72: /home/ali/src/tst/erlkaf/c_src/../priv/erlkaf_nif.so] Error 1
make[1]: Leaving directory '/home/ali/src/tst/erlkaf/c_src'
make: *** [Makefile:7: compile_nif] Error 2
I haven't been able to reproduce this outside the test suite of an internal project, so I'm opening this in hopes you might have seen it before.
In the test suite, I have a consumer start and stop with the suite that needs it. A producer is configured in sys.config and started when the application is booted during the start of the test suites.
The segfault happens when attempting to stop the consumer during test suite cleanup, erlkaf:stop_client(client_consumer)
. Logs look like:
22:03:18.453 [info] stop consumer for: <<"events">> partition: 0
22:03:18.453 [info] wait for consumer client client_consumer to stop...
22:03:18.529 [info] client client_consumer stopped
Segmentation fault (core dumped)
I tried reproducing by manually starting and stopping a consumer, but it works fine.
Do you have any theories on possible causes of the segfault that might help me make this reproducible?
Hi,
I'm unable to produce tombstones (null messages) to Kafka for a specified key in a compacted topic. I think the problem is that the producer always expects a value with binary content, and when the undefined atom arrives, the make_badarg
method is invoked.
I would like to know whether this is unsupported or I'm missing something.
Thank you