Coder Social home page Coder Social logo

brod's Introduction

NOTICE

This product includes software developed by Klarna Bank AB (publ)

Brod - Apache Kafka Client for Erlang/Elixir

brod

Brod is an Erlang implementation of the Apache Kafka protocol, providing support for both producers and consumers.

Why "brod"? http://en.wikipedia.org/wiki/Max_Brod

Features

  • Supports Apache Kafka v0.8+
  • Robust producer implementation supporting in-flight requests and asynchronous acknowledgements
  • Both consumer and producer handle leader re-election and other cluster disturbances internally
  • Opens max 1 tcp connection to a broker per brod_client, one can create more clients if needed
  • Producer: will start to batch automatically when number of unacknowledged (in flight) requests exceeds configurable maximum
  • Producer: will try to re-send buffered messages on common errors like "Not a leader for partition", errors are resolved automatically by refreshing metadata
  • Simple consumer: The poller, has a configurable "prefetch count" - it will continue sending fetch requests as long as total number of unprocessed messages (not message-sets) is less than "prefetch count"
  • Group subscriber: Support for consumer groups with options to have Kafka as offset storage or a custom one
  • Topic subscriber: Subscribe on messages from all or selected topic partitions without using consumer groups
  • Pick latest supported version when sending requests to kafka.
  • Direct APIs for message send/fetch and cluster inspection/management without having to start clients/producers/consumers.
  • A escriptized command-line tool for message send/fetch and cluster inspection/management.
  • Configurable compression library. snappy compression is supported by default. For more compression options, see kafka_protocol/README

Building and testing

NOTE: Min Erlang/OTP version 22

make compile
make test-env t # requires docker-compose in place

Working With Kafka 0.9.x or Earlier

Make sure {query_api_versions, false} exists in client config. This is because ApiVersionRequest was introduced in kafka 0.10, sending such request to older version brokers will cause connection failure.

e.g. in sys.config:

[{brod,
   [ { clients
     , [ { brod_client_1 %% registered name
         , [ { endpoints, [{"localhost", 9092}]}
           , { query_api_versions, false} %% <---------- here
           ]}]}]}]

Quick Demo

Assuming kafka is running at localhost:9092 and there is a topic named test-topic.

Start Erlang shell by make compile; erl -pa _build/default/lib/*/ebin, then paste lines below into shell:

rr(brod),
{ok, _} = application:ensure_all_started(brod),
KafkaBootstrapEndpoints = [{"localhost", 9092}],
Topic = <<"test-topic">>,
Partition = 0,
ok = brod:start_client(KafkaBootstrapEndpoints, client1),
ok = brod:start_producer(client1, Topic, _ProducerConfig = []),
{ok, FirstOffset} = brod:produce_sync_offset(client1, Topic, Partition, <<"key1">>, <<"value1">>),
ok = brod:produce_sync(client1, Topic, Partition, <<"key2">>, <<"value2">>),
SubscriberCallbackFun = fun(Partition, Msg, ShellPid = CallbackState) -> ShellPid ! Msg, {ok, ack, CallbackState} end,
Receive = fun() -> receive Msg -> Msg after 1000 -> timeout end end,
brod_topic_subscriber:start_link(client1, Topic, Partitions=[Partition],
                                 _ConsumerConfig=[{begin_offset, FirstOffset}],
                                 _CommittedOffsets=[], message, SubscriberCallbackFun,
                                 _CallbackState=self()),
AckCb = fun(Partition, BaseOffset) -> io:format(user, "\nProduced to partition ~p at base-offset ~p\n", [Partition, BaseOffset]) end,
ok = brod:produce_cb(client1, Topic, Partition, <<>>, [{<<"key3">>, <<"value3">>}], AckCb).
Receive().
Receive().
{ok, {_, [Msg]}} = brod:fetch(KafkaBootstrapEndpoints, Topic, Partition, FirstOffset + 2), Msg.

Example outputs:

#kafka_message{offset = 0,key = <<"key1">>,
               value = <<"value1">>,ts_type = create,ts = 1531995555085,
               headers = []}
#kafka_message{offset = 1,key = <<"key2">>,
               value = <<"value2">>,ts_type = create,ts = 1531995555107,
               headers = []}
Produced to partition 0 at base-offset 406
#kafka_message{offset = 2,key = <<"key3">>,
               value = <<"value3">>,ts_type = create,ts = 1531995555129,
               headers = []}

Overview

Brod supervision (and process link) tree.

brod supervision architecture

Clients

A brod_client in brod is a gen_server responsible for establishing and maintaining tcp sockets connecting to kafka brokers. It also manages per-topic-partition producer and consumer processes under two-level supervision trees.

To use producers or consumers, you have to start at least one client that will manage them.

Start clients by default

You may include client configs in sys.config have them started by default (by application controller)

Example of configuration (for sys.config):

[{brod,
   [ { clients
     , [ { brod_client_1 %% registered name
         , [ { endpoints, [{"localhost", 9092}]}
           , { reconnect_cool_down_seconds, 10} %% socket error recovery
           ]
         }
       ]
     }
     %% start another client for another kafka cluster
     %% or if you think it's necessary to start another set of tcp connections
   ]
}]

Example of configuration in Elixir (for config/dev.exs or config/prod.exs, etc.):

config :brod,
  clients: [
    # :brod_client_1 is the registered name of the client
    brod_client_1: [
      endpoints: [{"localhost", 9092}],
      reconnect_cool_down_seconds: 10
    ]
  ]

Start brod client on demand

You may also call brod:start_client/1,2,3 to start a client on demand, which will be added to brod supervision tree.

ClientConfig = [{reconnect_cool_down_seconds, 10}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, ClientConfig).

Extra socket options could be passed as {extra_sock_opts, ExtraSockOpts}, e.g.

ExtraSockOpts = [{sndbuf, 1024*1024}],
ok = brod:start_client([{"localhost", 9092}], brod_client_1, [{extra_sock_opts, ExtraSockOpts}]).

Producers

A brod_producer is a gen_server that is responsible for producing messages to a given partition of a given topic.

Producers may be started either manually or automatically in the moment you call brod:produce but did not call brod:start_producer beforehand.

Auto start producer with default producer config

Put below configs to client config in sys.config or app env if you start client statically:

{auto_start_producers, true}
{default_producer_config, []}

Or pass the {auto_start_producers, true} option to brod:start_client if you start the client dynamically.

Start a Producer on Demand

brod:start_producer(_Client         = brod_client_1,
                    _Topic          = <<"brod-test-topic-1">>,
                    _ProducerConfig = []).

Supported Message Input Format

Brod supports below produce APIs:

The Value arg in these APIs can be:

  • binary(): One single message
  • {brod:msg_ts(), binary()}: One single message with its create-time timestamp
  • #{ts => brod:msg_ts(), value => binary(), headers => [{_, _}]}: One single message. If this map does not have a key field, the Key argument is used.
  • [{K, V} | {T, K, V}]: A batch, where V could be a nested list of such representation.
  • [#{key => K, value => V, ts => T, headers => [{_, _}]}]: A batch.

When Value is a batch, the Key argument is only used as partitioner input and all messages are written on the same partition. All messages are unified into a batch format of below spec: [#{key => K, value => V, ts => T, headers => [{_, _}]}]. ts field is dropped for kafka prior to version 0.10 (produce API version 0, magic version 0). headers field is dropped for kafka prior to version 0.11 (produce API version 0-2, magic version 0-1).

Synchronized Produce API

brod:produce_sync(_Client    = brod_client_1,
                  _Topic     = <<"brod-test-topic-1">>,
                  _Partition = 0,
                  _Key       = <<"some-key">>,
                  _Value     = <<"some-value">>).

Or block calling process until Kafka confirmed the message:

{ok, CallRef} =
  brod:produce(_Client    = brod_client_1,
               _Topic     = <<"brod-test-topic-1">>,
               _Partition = 0,
               _Key       = <<"some-key">>,
               _Value     = <<"some-value">>),
brod:sync_produce_request(CallRef).

Produce One Message and Receive Its Offset in Kafka

Client = brod_client_1,
Topic  = <<"brod-test-topic-1">>,
{ok, Offset} = brod:produce_sync_offset(Client, Topic, 0, <<>>, <<"value">>).

Produce with Random Partitioner

Client = brod_client_1,
Topic  = <<"brod-test-topic-1">>,
ok = brod:produce_sync(Client, Topic, random, Key, Value).

Produce a Batch

brod:produce(_Client    = brod_client_1,
             _Topic     = <<"brod-test-topic-1">>,
             _Partition = MyPartitionerFun,
             _Key       = KeyUsedForPartitioning,
             _Value     = [ #{key => "k1", value => "v1", headers => [{"foo", "bar"}]}
                          , #{key => "k2", value => "v2"}
                          ]).

Handle Acks from Kafka as Messages

For async produce APIs brod:produce/3 and brod:produce/5, the caller should expect a message of below pattern for each produce call.

#brod_produce_reply{ call_ref = CallRef %% returned from brod:produce
                   , result   = brod_produce_req_acked
                   }

Add -include_lib("brod/include/brod.hrl"). to use the record.

In case the brod:produce caller is a process like gen_server which receives ALL messages, the callers should keep the call references in its looping state and match the replies against them when received. Otherwise brod:sync_produce_request/1 can be used to block-wait for acks.

NOTE: If required_acks is set to none in producer config, kafka will NOT ack the requests, and the reply message is sent back to caller immediately after the message has been sent to the socket process.

NOTE: The replies are only strictly ordered per-partition. i.e. if the caller is producing to two or more partitions, it may receive replies ordered differently than in which order brod:produce API was called.

Handle Acks from Kafka in Callback Function

Async APIs brod:produce_cb/4 and brod:produce_cb/6 allow callers to provided a callback function to handle acknowledgements from kafka. In this case, the caller may want to monitor the producer process because then they know that the callbacks will not be evaluated if the producer is 'DOWN', and there is perhaps a need for retry.

Consumers

Kafka consumers work in poll mode. In brod, brod_consumer is the poller, which is constantly asking for more data from the kafka node which is a leader for the given partition.

By subscribing to brod_consumer a process should receive the polled message sets (not individual messages) into its mailbox.

In brod, we have so far implemented two different subscribers (brod_topic_subscriber and brod_group_subscriber), hopefully covered most of the common use cases.

For maximum flexibility, applications may implement their own per-partition subscriber.

Below diagrams illustrate 3 examples of how subscriber processes may work with brod_consumer.

Partition subscriber

partition subscriber architecture

This gives the best flexibility as the per-partition subscribers work directly with per-partition pollers (brod_consumers).

The messages are delivered to subscribers in message sets (batches), not individual messages, (however the subscribers are allowed to ack individual offsets).

Example:

ok = brod:start_client([{"localhost", 9092}], my_client). % one client per application is enough
ok = brod:start_consumer(my_client, <<"my_topic">>, []).

% Now in a separate process for each partition of my_topic call:
{ok, ConsumerPid} = brod:subscribe(my_client, self(), <<"my_topic">>, Partition, []).
% The process should now receive messages sets as regular messages

Topic subscriber (brod_topic_subscriber)

topic subscribe flow

A topic subscriber provides the easiest way to receive and process messages from ALL partitions of a given topic. See brod_demo_cg_collector and brod_demo_topic_subscriber for example.

Users may choose to implement the brod_topic_subscriber behaviour callbacks in a module, or simply provide an anonymous callback function to have the individual messages processed.

Group subscriber (brod_group_subscriber)

group subscriber flow

Similar to topic subscriber, the brod_group_subscriber behaviour callbacks are to be implemented to process individual messages. See brod_demo_group_subscriber_koc and brod_demo_group_subscriber_loc for example.

A group subscriber is started by giving a set of topics, some (maybe none, or maybe all) of the partitions in the topic set will be assigned to it, then the subscriber should subscribe to ALL the assigned partitions.

Users may also choose to implement the brod_group_member behaviour (callbacks for brod_group_coordinator) for a different group subscriber (e.g. spawn one subscriber per partition), see brucke for example.

Example of group consumer which commits offsets to Kafka

-module(my_subscriber).
-include_lib("brod/include/brod.hrl"). %% needed for the #kafka_message record definition

-export([start/1]).
-export([init/2, handle_message/4]). %% callback api

%% brod_group_subscriber behaviour callback
init(_GroupId, _Arg) -> {ok, []}.

%% brod_group_subscriber behaviour callback
handle_message(_Topic, Partition, Message, State) ->
  #kafka_message{ offset = Offset
                , key   = Key
                , value = Value
                } = Message,
  error_logger:info_msg("~p ~p: offset:~w key:~s value:~s\n",
                        [self(), Partition, Offset, Key, Value]),
  {ok, ack, State}.

%% @doc The brod client identified ClientId should have been started
%% either by configured in sys.config and started as a part of brod application
%% or started by brod:start_client/3
%% @end
-spec start(brod:client_id()) -> {ok, pid()}.
start(ClientId) ->
  Topic  = <<"brod-test-topic-1">>,
  %% commit offsets to kafka every 5 seconds
  GroupConfig = [{offset_commit_policy, commit_to_kafka_v2},
                 {offset_commit_interval_seconds, 5}
                ],
  GroupId = <<"my-unique-group-id-shared-by-all-members">>,
  ConsumerConfig = [{begin_offset, earliest}],
  brod:start_link_group_subscriber(ClientId, GroupId, [Topic],
                                   GroupConfig, ConsumerConfig,
                                   _CallbackModule  = ?MODULE,
                                   _CallbackInitArg = []).

Authentication support

brod supports SASL PLAIN, SCRAM-SHA-256 and SCRAM-SHA-512 authentication mechanisms out of the box. To use it, add {sasl, {Mechanism, Username, Password}} or {sasl, {Mechanism, File}} to client config. Where Mechanism is plain | scram_sha_256 | scram_sha_512, and File is the path to a text file which contains two lines, first line for username and second line for password

Also, brod has authentication plugins support with {sasl, {callback, Module, Opts}} in client config. Authentication callback module should implement kpro_auth_backend behaviour. Auth function spec:

auth(Host :: string(), Sock :: gen_tcp:socket() | ssl:sslsocket(),
     Mod :: gen_tcp | ssl, ClientId :: binary(),
     Timeout :: pos_integer(), SaslOpts :: term()) ->
        ok | {error, Reason :: term()}

If authentication is successful - callback function should return an atom ok, otherwise - error tuple with reason description. For example, you can use brod_gssapi plugin for SASL GSSAPI authentication. To use it - add it as dependency to your top level project that uses brod. Then add {sasl, {callback, brod_gssapi, {gssapi, Keytab, Principal}}} to client config. Keytab should be the keytab file path, and Principal should be a byte-list or binary string.

See also: https://github.com/klarna/brod/wiki/SASL-gssapi-(kerberos)-authentication

Other API to play with/inspect kafka

These functions open a connection to kafka cluster, send a request, await response and then close the connection.

Hosts = [{"localhost", 9092}].
Topic = <<"topic">>.
Partition = 0.
Timeout = 1000.
TopicConfigs = [
  #{
    configs => [ #{name  => <<"cleanup.policy">>, value => "compact"}],
    num_partitions => 1,
    assignments => [],
    replication_factor => 1,
    name => Topic
  }
].
brod:get_metadata(Hosts).
brod:create_topics(Hosts, TopicConfigs, #{timeout => Timeout}).
brod:get_metadata(Hosts, [Topic]).
brod:resolve_offset(Hosts, Topic, Partition).
brod:delete_topics(Hosts, [Topic], Timeout).

Caution the above delete_topics can fail if you do not have delete.topic.enable set to true in your kafka config

brod-cli: A command line tool to interact with Kafka

This will build a self-contained binary with brod application

make brod-cli
_build/brod_cli/rel/brod/bin/brod -h

Disclaimer: This script is NOT designed for use cases where fault-tolerance is a hard requirement. As it may crash when e.g. kafka cluster is temporarily unreachable, or (for fetch command) when the partition leader migrates to another broker in the cluster.

brod-cli examples (with alias brod=_build/brod_cli/rel/brod/bin/brod):

Fetch and print metadata

brod meta -b localhost

Produce a Message

brod send -b localhost -t test-topic -p 0 -k "key" -v "value"

Fetch a Message

brod fetch -b localhost -t test-topic -p 0 --fmt 'io:format("offset=~p, ts=~p, key=~s, value=~s\n", [Offset, Ts, Key, Value])'

Bound variables to be used in --fmt expression:

  • Offset: Message offset
  • Key: Kafka key
  • Value: Kafka Value
  • TsType: Timestamp type either create or append
  • Ts: Timestamp, -1 as no value

Stream Messages to Kafka

Send README.md to kafka one line per kafka message

brod pipe -b localhost:9092 -t test-topic -p 0 -s @./README.md

Resolve Offset

brod offset -b localhost:9092 -t test-topic -p 0

List or Describe Groups

# List all groups
brod groups -b localhost:9092

# Describe groups
brod groups -b localhost:9092 --ids group-1,group-2

Display Committed Offsets

# all topics
brod commits -b localhost:9092 --id the-group-id --describe

# a specific topic
brod commits -b localhost:9092 --id the-group-id --describe --topic topic-name

Commit Offsets

NOTE: This feature is designed for force overwriting commits, not for regular use of offset commit.

# Commit 'latest' offsets of all partitions with 2 days retention
brod commits -b localhost:9092 --id the-group-id --topic topic-name --offsets latest --retention 2d

# Commit offset=100 for partition 0 and 200 for partition 1
brod commits -b localhost:9092 --id the-group-id --topic topic-name --offsets "0:100,1:200"

# Use --retention 0 to delete commits (may linger in kafka before cleaner does its job)
brod commits -b localhost:9092 --id the-group-id --topic topic-name --offsets latest --retention 0

# Try join an active consumer group using 'range' protocol and steal one partition assignment then commit offset=10000
brod commits -b localhost:9092 -i the-group-id -t topic-name -o "0:10000" --protocol range

TODOs

  • Support scram-sasl in brod-cli
  • Transactional produce APIs

brod's People

Contributors

ates avatar axs-mvd avatar bjosv avatar brigadier avatar chsukivra avatar chulkilee avatar dianaolympos avatar dszoboszlay avatar epsylonix avatar gonzalobf avatar id avatar jesperes avatar juise avatar k32 avatar kianmeng avatar kjellwinblad avatar mikpe avatar onno-vos-dev avatar qzhuyan avatar robertoaloi avatar robsonpeixoto avatar serikdm avatar spencerdcarlson avatar ssepml avatar taavi avatar thalesmg avatar tpitale avatar v0idpwn avatar xxdavid avatar zmstone avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

brod's Issues

be lazy on establishing payload connections

in current (2.0-dev) implementation: if an idle payload connection is killed by kafka (or some load balancer etc.), brod_consumer will try to setup a new one after a (configurable) timeout.
this is not necessary when there is currently no subscriber listening.

it should allow subscribers to subscribe when there is no alive brod_sock (instead of return {error, no_connection}), and try to connect to the partition leader after the subscriber checks in.

error in brod_group_subscriber:handle_ack and else

brod: 2.2.3

after started server, i got this error:

2016-10-13 21:05:29.654 [info] <0.572.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.572.0>-51c3b7e6-a8ab-426a-8159-e1b3c25eff7d,generation=2,pid=<0.572.0>):
assignments received:
flume_hive_table_v1:
    partition=2 begin_offset=160291
2016-10-13 21:05:29.664 [error] <0.571.0> gen_server <0.571.0> terminated with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398
2016-10-13 21:05:29.670 [error] <0.571.0> CRASH REPORT Process <0.571.0> with 0 neighbours exited with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398 in gen_server:terminate/7 line 826
2016-10-13 21:05:29.670 [info] <0.572.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.572.0>-51c3b7e6-a8ab-426a-8159-e1b3c25eff7d,generation=2,pid=<0.572.0>):
leaving group, reason {{badmatch,{error,consumer_down}},[{brod_group_subscriber,handle_ack,2,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,398}]},{brod_group_subscriber,handle_cast,2,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,317}]},{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},{gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}
2016-10-13 21:05:29.670 [error] <0.572.0> gen_server <0.572.0> terminated with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398
2016-10-13 21:05:29.672 [error] <0.572.0> CRASH REPORT Process <0.572.0> with 0 neighbours exited with reason: no match of right hand value {error,consumer_down} in brod_group_subscriber:handle_ack/2 line 398 in gen_server:terminate/7 line 826

several mins later , i got this:

2016-10-13 21:24:01.360 [info] <0.582.0> group coordinator (groupId=flume_hive_table_v1-mstream_source_kafka-group-id,memberId='[email protected]'/<0.582.0>-edf2c54d-0722-448e-a321-920aac7f4d87,generation=5,pid=<0.582.0>):
leaving group, reason {{{function_clause,[{gen,do_for_proc,[{down,"2016-10-13:13:23:52.214585",{already_subscribed_by,<0.573.0>}},#Fun<gen.0.132519590>],[{file,"gen.erl"},{line,252}]},{gen_server,call,3,[{file,"gen_server.erl"},{line,208}]},{brod_consumer,safe_gen_call,3,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_consumer.erl"},{line,594}]},{brod_group_subscriber,'-handle_call/3-fun-0-',1,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,306}]},{lists,foreach,2,[{file,"lists.erl"},{line,1337}]},{brod_group_subscriber,handle_call,3,[{file,"/media/psf/AllFiles/code/git/mstream/_build/default/lib/brod/src/brod_group_subscriber.erl"},{line,302}]},{gen_server,try_handle_call,4,[{file,"gen_server.erl"},{line,629}]},{gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,661}]}]},{gen_server,call,[{down,"2016-10-13:13:23:52.214585",{already_subscribed_by,<0.573.0>}},{unsubscribe,<0.581.0>},infinity]}},{gen_server,call,[<0.581.0>,unsubscribe_all_partitions,infinity]}}
2016-10-13 21:24:01.361 [error] <0.582.0> gen_server <0.582.0> terminated with reason: no match of right hand value {error,{sock_down,noproc}} in brod_group_coordinator:stop_socket/1 line 505

during the error time, it seems that all subscibe cannt receive messages.

Group subscriber supervision

I implemented a pool of group subscribers as described in the Readme (and the demo module) to consume a topic in kafka. I didn't use the topic subscribers as we can have several agents running who share the load.

What I've been seeing however is that my group subscribers are crashing because of timeouts writing the offsets to kafka (not necessarily all at once). The subscribers are never restarted, meaning that after a few timeouts I don't have any subscribers left.

I've been working with brod 2.1.6, and I saw that in brod 2.1.9 subscribers should no longer crash if they can't write their offsets to kafka. However that doesn't solve the subscribers not being restarted if they crash some other way.

As such, how should I be supervising the group subscribers to ensure they don't die permanently?

gen_tcp:send return value bad match

2016-12-12 17:39:43.028 [error] <0.6283.0> CRASH REPORT Process <0.6283.0> with 0 neighbours exited with reason: no match of right hand value {error,closed} in brod_sock:handle_msg/3 line 272 in brod_sock:init/5 line 185
2016-12-12 17:39:43.030 [info] <0.2596.0> client mstream_source_kafka: payload socket down hd4.mingchao.com:9092
reason:{{badmatch,{error,closed}},[{brod_sock,handle_msg,3,[{file,"/code/git/mstream/_build/default/lib/brod/src/brod_sock.erl"},{line,272}]},{brod_sock,init,5,[{file,"/code/git/mstream/_build/default/lib/brod/src/brod_sock.erl"},{line,182}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

maybe we need case clause

calling brod:start_client(Hosts, undefined) returns already_started

1> H = [{"localhost",9092}].
[{"localhost",9092}]
2> brod:start_client(H, undefined).
{error,{{already_started,undefined},
{child,undefined,undefined,
{brod_client,start_link,[[{"localhost",9092}],undefined,[]]},
{permanent,10},
5000,worker,
[brod_client]}}}
3> brod:start_client(H, anyclient).
ok

required_acks=0 only processes first message

Hi,

I've just started exploring the driver. I notice that when required_acks = 0 (which if I'm not mistaken basically tells kafka to not send any acknowledgments), only the first message gets processed by the consumer. All succeeding messages are not consumed.

Upon further investigation, it seems as if the driver is stuck on expecting a response or something? I'm not 100% sure as I have to delve into abit of code. But just incase you guys have any idea?

I'm basically just calling

brod:produce(brod1, ?SESSION_TOPIC, ?RANDOM_PARTITION, <<>>, J).

sync_produce will result in permanent waiting of response.

Please let me know.

Thanks!
Byron

client topic producer relationship

Hello
   I asked a framework on the issue, a client, in the same topic, there can be only one producer. But in many production processes, a topic may require multiple producers, may I ask what is your good idea?

Problem compiling Snappyer in v2.2.1 in Mac OS X

Hello, I'm trying to update brod to v2.2.1 and I've got error when trying to compile snappyer on Mac OS X (El Capitan). I'm using Hex in an Elixir project.

A co-worker that also uses Mac OS X has the same problem. Meanwhile, it seems that compiles perfectly in another co-worker's machine that uses Ubuntu (but our acceptance tests stop working with message parse errors ...)

With version 2.1.8 everything works perfectly.

This is the output that I got:

==> snappyer (compile)
DEPEND snappyer.d
ERLC snappyer.erl
APP snappyer.app.src
CPP snappy-sinksource.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappy-stubs-internal.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappy.cc
clang: warning: optimization flag '-finline-functions' is not supported
CPP snappyer.cc
clang: warning: optimization flag '-finline-functions' is not supported
LD snappyer.so
Undefined symbols for architecture x86_64:
"enif_alloc_binary", referenced from:
SnappyNifSink::SnappyNifSink(enif_environment_t
) in snappyer.o
SnappyNifSink::SnappyNifSink(enif_environment_t
) in snappyer.o
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
"_enif_inspect_iolist_as_binary", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_atom", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_badarg", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_binary", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
"_enif_make_existing_atom", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_tuple", referenced from:
_snappy_compress_erl in snappyer.o
_snappy_decompress_erl in snappyer.o
_snappy_uncompressed_length_erl in snappyer.o
_snappy_is_valid in snappyer.o
"_enif_make_ulong", referenced from:
_snappy_uncompressed_length_erl in snappyer.o
"enif_realloc_binary", referenced from:
SnappyNifSink::GetAppendBuffer(unsigned long, char
) in snappyer.o
SnappyNifSink::getBin() in snappyer.o
_snappy_compress_erl in snappyer.o
"_enif_release_binary", referenced from:
SnappyNifSink::~SnappyNifSink() in snappyer.o
SnappyNifSink::~SnappyNifSink() in snappyer.o
SnappyNifSink::~SnappyNifSink() in snappyer.o
snappy_compress_erl in snappyer.o
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make: *
* [/Users/samuel/cfadapter/deps/snappyer/priv/snappyer.so] Error 1
ERROR: Command [compile] failed!
==> cfadapter
** (Mix) Could not compile dependency :snappyer, "/Users/samuel/.mix/rebar compile skip_deps=true deps_dir="/Users/samuel/cfadapter/build/dev/lib"" command failed. You can recompile this dependency with "mix deps.compile snappyer", update it with "mix deps.update snappyer" or clean it with "mix deps.clean snappyer"

I'm not used to NIFs in Erlang/Elixir, so I'm not sure where is the problem. Any idea???

Thank you.

consumer offset reset policy

when OffsetOutOfRange exception is received by the consumer, the current implementation will send #kafka_fetch_error{} to the subscriber and expect the subscriber to re-subscribe with valid offsets.

a offset_reset_policy should allow brod_consumer to reset to a valid offset without taking it to the subscriber.

Metadata socket down

Hi,

From what I understand, there is an occasional check for kafka metadata. However, I am getting these in the logs.

2016-05-30 08:33:49.887 [error] <0.282.0> CRASH REPORT Process <0.282.0> with 0 neighbours exited with reason: tcp_closed in brod_sock:handle_msg/3 line 214
2016-05-30 08:33:49.887 [error] <0.278.0> client brod1 metadata socket down 10.11.5.41:9092

If I'm not mistaken, tcp_closed is expected from the metadata socket after retrieval of inforight? Is this error expected as well? If this is expected, the only reason this error is appearing is because it's not properly handled?

Please enlighten me.

Thanks!

Batching happens only opportunistically

It looks like brod will try to send produce requests as quickly as possible. Notably, in the case where requiredAcks = 0 it will just send everything individually. Many other kafka libraries seem to offer something like lingerMS which lets the library batch things more effectively. Then a produce request is only set after lingerMS milliseconds have passed or the buffer gets over some predetermined size. The benefits of this are reduced network traffic and load on the brokers, and it makes the compression options much more effective.

Is this be something brod might support? It's something I could potentially work on if it turns out to be helpful for my company, but it's a slightly larger feature so I thought I'd check first that it wasn't being worked on and would potentially be accepted.

brod_demo_group_subscriber_loc has error in kafka 0.8.2

Hey, when i use brod (new, 2.0) as a consumer client to connect kafka server(0.8.2) as brod_demo_group_subscriber_loc.erl/brod_demo_group_subscriber_koc.erl

the kafka server has error :

[2016-04-25 16:01:38,744] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)

and the client has error:

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23225.2>):
re-joining group, reason:{sock_down,tcp_closed}** at node [email protected] **

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23223.2>):
connected to group coordinator localhost:9092** at node [email protected] **

=INFO REPORT==== 25-Apr-2016::16:01:36 ===
group controller (groupId=brod-demo-group-subscriber-loc-group-id,memberId=,generation=0,pid=<5384.23223.2>):
failed to join group
reason:{sock_down,tcp_closed}** at node [email protected] **

i dont know the kafka protocol details so i dont know what mistake i make.
sorry for my poor english , if have some thing confusion we can communicate by chinese @ zaiming

Thanks

consumer may receive stale acks from subscriber

in some cases brod_consumer may receive stale 'ack' messages.
e.g. when a subscriber delegates 'ack's to other processes, and itself re-subscribe with a new begin_offset (which cause a buffer reset in brod_consumer).

example:
when pending-acks = [4,5,6]
the subscriber delegate the messages to another process, while the messages are being processed,
the subscriber does a new subscribe call with new begin_offset, which causes buffer reset in brod_consumer.
brod_consumer quickly consumed messages [1, 2, 3] and put to pending-acks.
if the 'ack' message for 4, 5 or 6 is received after the reset,
all newly consumed messages will be removed from pending list.

Topic creation/partitions

get_metadata will create a topic if it doesn't already exist, but are we missing something about how to create a certain number of partitions on this topic? Producing to a nonzero partition number generates errors.

Is there a set way to declare a partition and topic as we produce to it?

Bad Hex Package

The package on hex does not contain the dependencies: https://hex.pm/packages/brod

Dependencies must also be hex packages. Without a fix for this anyone using the hex package must manually include the 2 dependencies required by brod in their top level config.

Best way to reliably connect clients for producing?

Hello!

If I create a bunch of separate processes using brod (in a riak post-commit hook specifically) I'm finding the start_link_client calls affect each other and I start getting {error,{already_started, <PID>}} responses.

I've tried simply using the PID from the error for the calls to produce, but then I get {error,{producer_down,shutdown}} when trying to produce to the PID. Presumably because the other process that started that connection is wrapping up. I get the same behavior if I give start_link_client an atom name and use that instead of the PID.

In this case I'm using only producing to Kafka. From what I see in the code it looks like the already_started response is from preparing for consumers. If that's right, is there a better way that I should be prepping a link for producing?

If not, should I be watching for the producer_down responses and trying to start a new client and producer in response?

How to track the acknowledged offsets from a brod topic subscriber

Hello! I'm making a thin Elixir wrapper around brod for local Kafka work. I've got a Consumer module using brod_topic_subscriber to consume messages just fine. Right now I'm using the {:ok, :ack, state} flow so messages are acknowledged immediately after processing.

My expectation was that if something goes wrong during message processing then the given message(s) would be unacknowledged and redelivered.

That seems to be the case while my application is running. Messages fail to process and new messages add to the message set that's pending processing.

But if I restart the Elixir application then those unacknowledged messages are not reprocessed.

I've traced it to my setting [] as the uncommitted offsets in the init function of my brod topic subscriber module. Is there a way to have the topic subscriber track the offsets that have been acknowledged? Or is a topic subscribe expected to maintain its own acknowledged offsets? Do I really want to be using a group subscriber here?

Very little high-level documentation

I'm evaluating whether I can use a range of Kafka clients for a project I'm developing, and README.md doesn't answer the following basic questions about Brod:

  • Are these bindings to a Java client or an independent implementation? (answered by looking at all of the code)
  • Does it require ZooKeeper (locally or at all)?
  • Does it support the high-level consumer or it's an equivalent of the Simple Comsumer? (Turns out simple only because there is an issue for that)
  • Does it support message batching? (If not I could probably contribute it, sounds simple to implement)

inconsistent behaviour of brod:producer_sync/5

When no client is created:
1>brod:produce_sync(noclient, <<"Topic">>, 0, <<"Key">>, <<"Data">>).
{error,client_down}
2> brod:produce_sync(noclient, <<"Topic">>, fun(_) -> 0 end, <<"Key">>, <<"Data">>).
** exception error: bad argument
in function ets:lookup/2
called as ets:lookup(noclient,{topic_metadata,<<"Topic">>})
in call from brod_client:lookup_partitions_count_cache/2 (src/brod_client.erl, line 742)
in call from brod_client:get_partitions_count/3 (src/brod_client.erl, line 722)
in call from brod:produce/5 (src/brod.erl, line 279)
in call from brod:produce_sync/5 (src/brod.erl, line 317)

Can we have either returning {error, client_down} or crashing both cases please?
Thanks in advance

filter already consumed messages based on begin_offset

Compressed message sets are delivered in compressed batches to consumers.
Requesting to fetch from an offset in the middle of a compressed sets will get the whole set delivered.
This should be transparent to Brod subscribers: brod_consumer should filter the already consumed messages based on begin_offset after each new subscription.

"metadata socket down" in the 2.0-dev branch

I'm trying to prototype against Kafka 0.9 using the 2.0-dev branch and I'm running into the following in erl:

> {ok, ClientPid} = brod:start_link_client([{"docker", 9092}]).
{ok,<0.35.0>}
> brod:start_producer(ClientPid, <<"test">>, []).

=ERROR REPORT==== 13-Apr-2016::16:46:40 ===
client brod_default_client metadata socket down docker:9092
Reason:{sock_down,
           {undef,
               [{kpro,next_corr_id,[0],[]},
                {brod_kafka_requests,add,2,
                    [{file,"src/brod_kafka_requests.erl"},{line,66}]},
                {brod_sock,handle_msg,3,
                    [{file,"src/brod_sock.erl"},{line,217}]},
                {brod_sock,init,5,[{file,"src/brod_sock.erl"},{line,138}]},
                {proc_lib,init_p_do_apply,3,
                    [{file,"proc_lib.erl"},{line,239}]}]}}

Just trying to fetch metadata:

> Hosts = [{"docker", 9092}].
> brod:get_metadata(Hosts).
** exception error: no match of right hand side value {error,{sock_down,noproc}}
     in function  brod_utils:get_metadata/2 (src/brod_utils.erl, line 49)

On master:

> Hosts = [{"docker", 9092}].
> brod:get_metadata(Hosts).
{ok,{metadata_response,[{broker_metadata,1001,"docker",
                                         9092}],
                       [{topic_metadata,no_error,<<"test">>,
                                        [{partition_metadata,no_error,0,1001,[1001],[1001]}]}]}}

Is there some configuration piece or intermediate command for the 2.0 version I'm missing?

Compression of messages

Hey,

Turns out brod doesnt support gzip/snappy which are part of the kafka wire protocol. I know you are heading towards 2.0. Will this be a feature in there? Do you want the code I wrote to support this?

I am looking forward to 2.0 and the high level consumer, I ended up writing quite a bit of code to do a simple consumer with the current version.

brod 2.0-dev can't produce to an under-replicated topic

Setup:

  • brod: 2.0-dev / 697bbe8
  • Kafka: kafka_2.11-0.9.0.0 (2 brokers)
  • Zookeeper: 3.4.6 (3 servers)

Topic: "brod", 6 partitions, replication factor of 2.

If both Kafka brokers are up I can produce messages to the topic. However, if one of them goes down I can't:

Erlang/OTP 18 [erts-7.1] [source-2882b0c] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.1  (abort with ^G)
1> f().
ok
2> Hosts = [{"kafka1.vagrant", 9092},{"kafka2.vagrant", 9092}].
[{"kafka1.vagrant",9092},{"kafka2.vagrant",9092}]
3> {ok, ClientPid} =
3>   brod:start_link_client(_ClientId  = brod_client_1,
3>                          _Endpoints = Hosts,
3>                          _Config    = [{restart_delay_seconds, 10}],
3>                          _Producers = [ { <<"brod">>
3>                                         , [ {topic_restart_delay_seconds, 10}
3>                                           , {partition_restart_delay_seconds, 2}
3>                                           , {required_acks, -1}
3>                                           ]
3>                                         }
3>                                       ]).
{ok,<0.38.0>}
4> 
=ERROR REPORT==== 2-Feb-2016::21:53:08 ===
** Generic server <0.44.0> terminating 
** Last message in was {post_init,self,brod_producers_sup,
                           {brod_producers_sup2,brod_client_1,<<"brod">>,
                               [{partition_restart_delay_seconds,2},
                                {required_acks,-1}]}}
** When Server state == {state,undefined,undefined,[],undefined,undefined,
                               undefined,[],undefined,undefined}
** Reason for termination == 
** {{badmatch,{error,9}},
    [{brod_producers_sup,post_init,1,
                         [{file,"src/brod_producers_sup.erl"},{line,93}]},
     {supervisor3,handle_info,2,[{file,"src/supervisor3.erl"},{line,717}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},
     {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

Trying the same with the "new-consumer-api" branch (4edd0ec) gives me a "ReplicaNotAvailable" error:

Erlang/OTP 18 [erts-7.1] [source-2882b0c] [64-bit] [smp:8:8] [async-threads:10] [hipe] [kernel-poll:false]

Eshell V7.1  (abort with ^G)
1> f().
ok
2> Hosts = [{"kafka1.vagrant", 9092},{"kafka2.vagrant", 9092}].
[{"kafka1.vagrant",9092},{"kafka2.vagrant",9092}]
3> {ok, ClientPid} =
3>   brod:start_link_client(_ClientId  = brod_client_1,
3>                          _Endpoints = Hosts,
3>                          _Config    = [{restart_delay_seconds, 10}],
3>                          _Producers = [ { <<"brod">>
3>                                         , [ {topic_restart_delay_seconds, 10}
3>                                           , {partition_restart_delay_seconds, 2}
3>                                           , {required_acks, -1}
3>                                           ]
3>                                         }
3>                                       ]).
{ok,<0.38.0>}
4> 
=ERROR REPORT==== 2-Feb-2016::21:54:50 ===
** Generic server <0.44.0> terminating 
** Last message in was {post_init,self,brod_producers_sup,
                           {brod_producers_sup2,brod_client_1,<<"brod">>,
                               [{partition_restart_delay_seconds,2},
                                {required_acks,-1}]}}
** When Server state == {state,undefined,undefined,[],undefined,undefined,
                               undefined,[],undefined,undefined}
** Reason for termination == 
** {{badmatch,{error,'ReplicaNotAvailable'}},
    [{brod_producers_sup,post_init,1,
                         [{file,"src/brod_producers_sup.erl"},{line,93}]},
     {supervisor3,handle_info,2,[{file,"src/supervisor3.erl"},{line,717}]},
     {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,615}]},
     {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,681}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,240}]}]}

brod 1.5.3 (742ef99) works.

Terminal session recording: https://asciinema.org/a/0prwuld7v6lwclpizbnn03y6n

Am I not using brod 2.0-dev correctly or is this a bug?

create start_client return PID

Hi
Whether it can support the start_client without parameters, start_client function returns the client PID, which will help pre-access connection pool

Consumer offset in kafka topic

Hi!

Would be amazing to have something like the high-level consumer that can use Kafka itself to manage offsets. Is it planned for the future?

Producer leader caching

The way brod producer caches leaders leads to creating way too many sockets.

The Producer creates a proplist of [{{Topic, Partition}, brod_sock Pid} | Leaders ]

For a fairly typical case of having say 10 topics with 15 partitions each you would create 150 socket connections. Even if you have a typical 3 node kafka this ends up being 50 connections to each kafka.

We discovered this through testing brod producing to this setup

  • single kafka
  • 1 topic
  • 11 partitions
  • 10 poolboy workers each with a producer

After messing with our worker size we discovered that we can easily flood kafka with connections . After adding some debug to brod we saw that we are creating 11 sockets per producer when 1 would do.

Any thoughts?

Handle error in brod_group_coordinator commit_offsets

In brod_group_coordinator in handle_info(?LO_CMD_COMMIT_OFFSETS and in handle_call(commit_offsets you try to catch throw and stabilize.
But in do_commit_offsets_ you raise exception with erlang:error(EC), but not throw.
As a result this error is not catched and server crashes without attempt to stabilize.
Is it correct/expected behavior or we should catch this kafka error?

feature-request: reset committed offsets for consumer groups

It would make a lot of consumer's lives easier if brod can support a offset-reset feature.
a easy way is stop all other clients, and start ONE single member to commit the prepared (CLI or file) offsets.
Or a member that keeps re-joining the group until it gets elected as group leader then do not assign any partition to other members before committing the prepared offsets.

Why I can not compile???

$ rebar3 compile
===> Verifying dependencies...
===> Fetching kafka_protocol ({pkg,<<"kafka_protocol">>,<<"0.7.3">>})
===> Download error, using cached file at c:/Users/Net/.cache/rebar3/hex/default/packages/kafka_protocol-0.7.3.tar
===> Fetching supervisor3 ({pkg,<<"supervisor3">>,<<"1.1.3">>})
===> Downloaded package, caching at c:/Users/Net/.cache/rebar3/hex/default/packages/supervisor3-1.1.3.tar
===> Fetching snappyer ({pkg,<<"snappyer">>,<<"1.1.3-1.0.4">>})
===> Downloaded package, caching at c:/Users/Net/.cache/rebar3/hex/default/packages/snappyer-1.1.3-1.0.4.tar
===> Compiling supervisor3
**===> Compiling snappyer
MAKE Version 5.2 Copyright (c) 1987, 2000 Borland
Error makefile 15: Command syntax error
Error makefile 18: Command syntax error
Error makefile 20: Command syntax error
* 3 errors during make ***
===> Hook for compile failed!

Add an option to subscribe on a topic

Right now we only have brod:subscribe/5 which allows users to subscribe on specific partition. And user application must also monitor brod_consumer process.

  • User's process should be able to subscribe on a topic in one call
  • User's process should not be required to monitor brod internal processes in order to work reliable

track consumer lag?

Is there way to track the consumer lag? Consumer responses tell you the offset high watermark, so you can subtract the that by the current offset you've seen to determine how far behind you are.

maybe we need flow control of messages ?

i print process_info of one brod_sock process:

([email protected])133> erlang:process_info(list_to_pid("<0.16546.12>")).
[{current_function,{kpro,do_decode_fields,4}},
 {initial_call,{proc_lib,init_p,5}},
 {status,running},
 {message_queue_len,9672},
 {messages,[{tcp,#Port<0.1512>,
                 <<"16686B05A27C0\t93294\t-1.00\t-1.00\t60.00\t19.54\t1135087616.00\t763130112.00"...>>},
            {tcp,#Port<0.1512>,
                 <<"qqgame\tD649ADF73C9E2A681324C0A85FA2DEB8\t93294\t-1.00\t-1.00\t111.53\t2.4"...>>},
            {tcp,#Port<0.1512>,
                 <<"t\tgame_name\tagent_name\tpf_name\tpf\tstep\tmtime\ttime_diff\tstep_name"...>>},
            {tcp,#Port<0.1512>,<<"table&t_log_client_device"...>>},
            {tcp,#Port<0.1512>,
                 <<"or      \t4\t3194\t1000201977856\t505531793408\tAMD Radeon HD"...>>},
            {tcp,#Port<0.1512>,
                 <<3,31,15,0,0,255,255,255,255,0,0,1,140,...>>},
            {tcp,#Port<0.1512>,
                 <<"\tpf\tstep\tmtime\ttime_diff\tstep_name\tcversion\tfirs"...>>},
            {tcp,#Port<0.1512>,
                 <<"r_level\tmtime\tpf\taccount_name\tversion\tmin_fp"...>>},
            {tcp,#Port<0.1512>,
                 <<"81DAC5E4\t93294\t-1.00\t-1.00\t784.31\t63.67\t"...>>},
            {tcp,#Port<0.1512>,
                 <<"_id=0/dt=2016-10-11\ntable\"t_log_clie"...>>},
            {tcp,#Port<0.1512>,
                 <<"mac\tos\tdevice_name\tmemory\tremain"...>>}

the message queue is too long

the default socket options is:

SockOpts = [{active, true}, {packet, raw}, binary, {nodelay, true}],

maybe {active, false} or {active, once} is more suitable ?

Batch producing doesn't work with compression

Having {compression, gzip} and trying to use the new batch producing API causes a badarg:


** Reason for termination == 
** {badarg,[{erlang,size,
                    [[{<<"KEY">>,
                       [<<"VALUE">>,
                        <<"VALUE">>,
                        <<"VALUE">>]}]],
                    []},
            {brod_producer,batch_size,1,
                           [{file,"src/brod_producer.erl"},{line,376}]},
            {brod_producer,'-init/1-fun-0-',3,
                           [{file,"src/brod_producer.erl"},{line,195}]},
            {brod_producer,'-init/1-fun-1-',7,
                           [{file,"src/brod_producer.erl"},{line,204}]},
            {brod_producer_buffer,do_send,3,
                                  [{file,"src/brod_producer_buffer.erl"},
                                   {line,264}]},
            {brod_producer,maybe_produce,1,
                           [{file,"src/brod_producer.erl"},{line,337}]},
            {brod_producer,handle_produce,4,
                           [{file,"src/brod_producer.erl"},{line,317}]},
            {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,599}]}]}.

I will try to fix this this week if you all don't beat me to it.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.