
libkafka-asio's Introduction

libkafka-asio

C++ Kafka Client Library using Boost Asio


Introduction

libkafka-asio is a header-only C++ library implementing the Kafka client protocol. All Kafka APIs, including offset commit/fetch, are implemented.

It supports GZIP and Snappy message compression and therefore relies on the zlib and Snappy libraries. Message compression can optionally be turned off.

The library has been tested on Linux (GCC 4.9, clang 3.5.1) and Windows (MSVC 10, 11, 12). A set of unit tests is available in the test directory.

Usage

Add the lib directory to your include paths and:

#include <libkafka_asio/libkafka_asio.h>

Please see the examples directory for examples of how to use the library. A minimal produce sketch follows below.
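For orientation, here is a minimal produce sketch pieced together from the examples directory and the issue reports quoted further down this page. The class and member names (Client, Client::Configuration, AddBrokerFromString, ProduceRequest::AddValue, AsyncRequest) and the handler signature are taken from those sources and should be verified against the current headers; treat this as a sketch, not authoritative documentation.

#include <iostream>
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <boost/system/error_code.hpp>
#include <libkafka_asio/libkafka_asio.h>

using libkafka_asio::Client;
using libkafka_asio::ProduceRequest;
using libkafka_asio::ProduceResponse;

// Completion handler: called once the produce request succeeds or fails.
void HandleRequest(const boost::system::error_code& error,
                   const boost::optional<ProduceResponse>& response)
{
  if (error)
  {
    std::cerr << "Produce failed: " << error.message() << std::endl;
    return;
  }
  std::cout << "Message produced" << std::endl;
}

int main()
{
  Client::Configuration configuration;
  configuration.auto_connect = true;
  configuration.client_id = "libkafka_asio_example";
  configuration.socket_timeout = 10000;
  configuration.AddBrokerFromString("localhost:9092");

  boost::asio::io_service ios;
  Client client(ios, configuration);

  ProduceRequest request;
  request.AddValue("Hello world", "mytopic", 0);  // value, topic, partition

  client.AsyncRequest(request, &HandleRequest);
  ios.run();
  return 0;
}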

Also consult the documentation: libkafka-asio Reference

Dependencies

libkafka-asio depends on the Boost C++ libraries, especially Boost Asio. Boost Thread and Boost System are the sub-libraries used explicitly in the project, so you need to link against boost_thread and boost_system.

Installing your distribution's Boost package should therefore do the trick (e.g. apt-get install libboost-dev on Ubuntu, or pacman -S boost on Arch Linux).

libkafka-asio's People

Contributors

danieljoos, devil0000, jpicht, sgso


libkafka-asio's Issues

improvement: allow setting a Message `Key` as `AddValue` parameter

A message key can be set by constructing a Message and passing it to AddMessage, but there is no way to set a key through AddValue.
This means there are two different APIs for the same task, and one of them does not support all options/features.
Maybe there should be only one simple-to-use API; I would prefer the AddValue API. The user should not need to know about the Message class at all, since there are only a few fields to set. Constructing a Message on the outside also means creating and destroying an object just for the call (an internal copy is needed until the request is delivered), which is overhead.

Using Message and Message Set like KeyedMessage

On the official Kafka wiki page https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example , consider the following producer example:

    Properties props = new Properties();
    props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("partitioner.class", "example.producer.SimplePartitioner");
    props.put("request.required.acks", "1");

    ProducerConfig config = new ProducerConfig(props);

    Producer<String, String> producer = new Producer<String, String>(config);

    for (long nEvents = 0; nEvents < events; nEvents++) { 
           long runtime = new Date().getTime();  
           String ip = "192.168.2." + rnd.nextInt(255);
           String msg = runtime + ",www.example.com," + ip;
           KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
           producer.send(data);
    }

I tried to translate this piece of code into its equivalent C++, based on your example code. I have tried:

Client::Configuration configuration;
configuration.auto_connect = true;
configuration.client_id = "libkafka_asio_example";
configuration.socket_timeout = 10000;
configuration.AddBrokerFromString("192.168.2.60:9092");

boost::asio::io_service ios;
Client client(ios, configuration);

ProduceRequest request;

int i = 0;
while (i < 100)
{
  // "Hello world " + i would be pointer arithmetic; convert the number explicitly
  // (requires <boost/lexical_cast.hpp>).
  std::string msg = "Hello world " + boost::lexical_cast<std::string>(i);
  libkafka_asio::Message message;  // stack object, so no delete is needed
  // How to construct the message like KeyedMessage here and send it to Kafka?
  client.AsyncRequest(request, &HandleRequest);
  ios.run();
  ios.reset();  // io_service must be reset before the next run()
  ++i;
}

I am using Windows VS2010.
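For reference, here is a rough sketch of how a keyed message might be built and produced with this library, roughly mirroring Java's KeyedMessage. It assumes that Message exposes mutable_key()/mutable_value() returning Bytes (a shared pointer to a byte vector) and that ProduceRequest::AddMessage takes the message, topic name and partition; those names are assumptions to check against message.h and produce_request.h.

#include <sstream>
#include <string>
#include <vector>
#include <boost/make_shared.hpp>
#include <libkafka_asio/libkafka_asio.h>

// Helper: copy a std::string into the library's Bytes type (assumed to be a
// shared pointer to a vector of bytes).
libkafka_asio::Bytes ToBytes(const std::string& s)
{
  return boost::make_shared<libkafka_asio::Bytes::element_type>(s.begin(), s.end());
}

// Build a keyed message, similar in spirit to Java's KeyedMessage(topic, key, msg).
libkafka_asio::Message MakeKeyedMessage(const std::string& key, const std::string& value)
{
  libkafka_asio::Message message;
  message.mutable_key() = ToBytes(key);      // assumed accessor
  message.mutable_value() = ToBytes(value);  // assumed accessor
  return message;
}

// In the loop above, one message per iteration could then be added like this:
//   std::ostringstream msg;
//   msg << "Hello world " << i;
//   request.AddMessage(MakeKeyedMessage("192.168.2.60", msg.str()), "page_visits", 0);
// and the whole request sent once after the loop with a single AsyncRequest call.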

kind of TopicPartitionBlock with maps

For requests it makes perfect sense to use vectors, because you normally just append data and never search it.
For responses, however, it would be useful to store the TopicPartitionBlocks in a map structure, because they will most likely be searched.
The functions in detail/functional currently build a kind of simple search-and-access layer. If the underlying structure were a map, access would be faster and simpler, and the data could be used directly in the Client (or anywhere else) without reorganising it. A rough sketch of such a layout follows below.
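For illustration, a rough sketch of the proposed layout; these typedefs are hypothetical and not part of the current API:

#include <map>
#include <string>

// Hypothetical layout for response data, keyed by topic name and then by
// partition id, so lookups become map searches instead of linear scans.
struct PartitionProperties
{
  // error code, offsets, messages, ... (whatever the response type carries)
};
typedef std::map<int, PartitionProperties> PartitionMap;        // partition id -> data
typedef std::map<std::string, PartitionMap> TopicPartitionMap;  // topic name -> partitions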

Do you agree?

Edit:
Why is OptionalType used there?

typedef boost::optional<Partition> OptionalType;

Subclassing the struct should be enough, right?

struct Partition : public TPartitionProperties
{
};

Edit 2:
This may not be true for all response types, but I think it is for the metadata request.

CXX_STANDARD requires CMake 3.1+

The CXX_STANDARD property is set when building the C++11 examples is enabled, but that property first appeared in CMake 3.1, while this project only requires CMake 2.6.

commit history correction

If you want to correct the history, we should do it now rather than later.
I have tested the following script on my system:

#!/bin/sh

git filter-branch --env-filter '

OLD_EMAIL="root@75dd17ad782e.(none)"
OLD_EMAIL2="[email protected]"
CORRECT_NAME="DEvil0000"
CORRECT_EMAIL="[email protected]"

if [ "$GIT_COMMITTER_EMAIL" = "$OLD_EMAIL" ] || [ "$GIT_COMMITTER_EMAIL" = "$OLD_EMAIL2" ]
then
    export GIT_COMMITTER_NAME="$CORRECT_NAME"
    export GIT_COMMITTER_EMAIL="$CORRECT_EMAIL"
fi
if [ "$GIT_AUTHOR_EMAIL" = "$OLD_EMAIL" ] || [ "$GIT_AUTHOR_EMAIL" = "$OLD_EMAIL2" ]
then
    export GIT_AUTHOR_NAME="$CORRECT_NAME"
    export GIT_AUTHOR_EMAIL="$CORRECT_EMAIL"
fi
' --tag-name-filter cat -- --branches --tags 

After executing this script, all your branches have a corrected history and you can push --force them to GitHub.

compiler errors - help!

/usr/local/include/libkafka_asio/detail/connection_service.h: In member function ‘void libkafka_asio::detail::BasicConnectionService::construct(libkafka_asio::detail::BasicConnectionService::implementation_type&)’:
[build] /usr/local/include/libkafka_asio/detail/connection_service.h:52:28: error: there are no arguments to ‘get_io_service’ that depend on a template parameter, so a declaration of ‘get_io_service’ must be available [-fpermissive]
[build] 52 | impl.reset(new Service(get_io_service()));
[build] | ^~~~~~~~~~~~~~
[build] /usr/local/include/libkafka_asio/detail/connection_service.h:52:28: note: (if you use ‘-fpermissive’, G++ will accept your code, but allowing the use of an undeclared name is deprecated)
[build] In file included from /usr/local/include/boost/detail/endian.hpp:9,
[build] from /usr/local/include/libkafka_asio/detail/endian.h:15,
[build] from /usr/local/include/libkafka_asio/detail/impl/request_write.h:13,
[build] from /usr/local/include/libkafka_asio/detail/request_write.h:104,
[build] from /usr/local/include/libkafka_asio/detail/impl/connection_service.h:16,
[build] from /usr/local/include/libkafka_asio/detail/connection_service.h:250,
[build] from /usr/local/include/libkafka_asio/connection.h:14,
[build] from /usr/local/include/libkafka_asio/libkafka_asio.h:17,

[build] /usr/local/include/boost/predef/detail/endian_compat.h: At global scope:
[build] /usr/local/include/boost/predef/detail/endian_compat.h:11:161: note: #pragma message: The use of BOOST_ENDIAN and BOOST_BYTE_ORDER is deprecated. Please include <boost/predef/other/endian.h> and use BOOST_ENDIAN_BYTE instead
[build] 11 | #pragma message("The use of BOOST_ENDIAN and BOOST_BYTE_ORDER is deprecated. Please include <boost/predef/other/endian.h> and use BOOST_ENDIAN_BYTE instead")
[build] |

It will not reload metadata?

We found that it continually reports the error "topic not found". Does the client not automatically reload its metadata?

try to build examples in win10 mingw

I am trying to build libkafka-asio on:

Windows 10 (64-bit) / Code::Blocks / CMake for Windows / MinGW from https://nuwen.net/mingw.html,
after adding C:\libkafka-asio-master\lib to the include path,
and with boost_thread and boost_system added to the project's link settings.

from cmake-win:
C:\libkafka-asio-master\examples\cpp11
C:\libkafka-asio-master\examples\cpp11\build

The C compiler identification is GNU 8.1.0
The CXX compiler identification is GNU 8.1.0
Check for working C compiler: C:/MinGW/bin/gcc.exe
Check for working C compiler: C:/MinGW/bin/gcc.exe -- works
Detecting C compiler ABI info
Detecting C compiler ABI info - done
Detecting C compile features
Detecting C compile features - done
Check for working CXX compiler: C:/MinGW/bin/g++.exe
Check for working CXX compiler: C:/MinGW/bin/g++.exe -- works
Detecting CXX compiler ABI info
Detecting CXX compiler ABI info - done
Detecting CXX compile features
Detecting CXX compile features - done
CMake Warning (dev) at CMakeLists.txt:6 (find_package):
Policy CMP0074 is not set: find_package uses PackageName_ROOT variables.
Run "cmake --help-policy CMP0074" for policy details. Use the cmake_policy
command to set the policy and suppress this warning.

Environment variable Boost_ROOT is set to:

C:\MinGW\lib

For compatibility, CMake is ignoring the variable.
This warning is for project developers. Use -Wno-dev to suppress it.

Looking for pthread.h
Looking for pthread.h - found
Looking for pthread_create
Looking for pthread_create - found
Found Threads: TRUE
Boost version: 1.67.0
Found the following Boost libraries:
thread
system
chrono
date_time
atomic
Configuring done
Generating done

from inside codeblocks linker:

C:\MinGW\lib\libboost_thread.a
C:\MinGW\lib\libboost_system.a

-------------- Build: all in libkafka_asio_examples_cxx11 (compiler: GNU GCC Compiler)---------------

Checking if target is up-to-date: mingw32-make.exe -q -f Makefile all
Running command: C:/MinGW/bin/mingw32-make.exe -f "C:/libkafka-asio-master/examples/cpp11/build/Makefile" VERBOSE=1 all
"C:\Program Files\CMake\bin\cmake.exe" -HC:\libkafka-asio-master\examples\cpp11 -BC:\libkafka-asio-master\examples\cpp11\build --check-build-system CMakeFiles\Makefile.cmake 0
"C:\Program Files\CMake\bin\cmake.exe" -E cmake_progress_start C:\libkafka-asio-master\examples\cpp11\build\CMakeFiles C:\libkafka-asio-master\examples\cpp11\build\CMakeFiles\progress.marks
C:/MinGW/bin/mingw32-make.exe -f CMakeFiles\Makefile2 all
mingw32-make.exe[1]: Entering directory 'C:/libkafka-asio-master/examples/cpp11/build'
C:/MinGW/bin/mingw32-make.exe -f CMakeFiles\fetch_cxx11.dir\build.make CMakeFiles/fetch_cxx11.dir/depend
mingw32-make.exe[2]: Entering directory 'C:/libkafka-asio-master/examples/cpp11/build'
"C:\Program Files\CMake\bin\cmake.exe" -E cmake_depends "MinGW Makefiles" C:\libkafka-asio-master\examples\cpp11 C:\libkafka-asio-master\examples\cpp11 C:\libkafka-asio-master\examples\cpp11\build C:\libkafka-asio-master\examples\cpp11\build C:\libkafka-asio-master\examples\cpp11\build\CMakeFiles\fetch_cxx11.dir\DependInfo.cmake --color=
Dependee "C:\libkafka-asio-master\examples\cpp11\build\CMakeFiles\fetch_cxx11.dir\DependInfo.cmake" is newer than depender "C:/libkafka-asio-master/examples/cpp11/build/CMakeFiles/fetch_cxx11.dir/depend.internal".
Dependee "C:/libkafka-asio-master/examples/cpp11/build/CMakeFiles/CMakeDirectoryInformation.cmake" is newer than depender "C:/libkafka-asio-master/examples/cpp11/build/CMakeFiles/fetch_cxx11.dir/depend.internal".
Scanning dependencies of target fetch_cxx11
mingw32-make.exe[2]: Leaving directory 'C:/libkafka-asio-master/examples/cpp11/build'
C:/MinGW/bin/mingw32-make.exe -f CMakeFiles\fetch_cxx11.dir\build.make CMakeFiles/fetch_cxx11.dir/build
mingw32-make.exe[2]: Entering directory 'C:/libkafka-asio-master/examples/cpp11/build'
[ 10%] Building CXX object CMakeFiles/fetch_cxx11.dir/fetch.cpp.obj
C:\MinGW\bin\g++.exe -DLIBKAFKAASIO_NO_COMPRESSION @CMakeFiles/fetch_cxx11.dir/includes_CXX.rsp -std=gnu++11 -o CMakeFiles\fetch_cxx11.dir\fetch.cpp.obj -c C:\libkafka-asio-master\examples\cpp11\fetch.cpp
[ 20%] Linking CXX executable fetch_cxx11.exe
"C:\Program Files\CMake\bin\cmake.exe" -E cmake_link_script CMakeFiles\fetch_cxx11.dir\link.txt --verbose=1
"C:\Program Files\CMake\bin\cmake.exe" -E remove -f CMakeFiles\fetch_cxx11.dir/objects.a
C:\MinGW\bin\ar.exe cr CMakeFiles\fetch_cxx11.dir/objects.a @CMakeFiles\fetch_cxx11.dir\objects1.rsp
C:\MinGW\bin\g++.exe -Wl,--whole-archive CMakeFiles\fetch_cxx11.dir/objects.a -Wl,--no-whole-archive -o fetch_cxx11.exe -Wl,--out-implib,libfetch_cxx11.dll.a -Wl,--major-image-version,0,--minor-image-version,0 @CMakeFiles\fetch_cxx11.dir\linklibs.rsp
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail17winsock_init_base7startupERNS2_4dataEhh[_ZN5boost4asio6detail17winsock_init_base7startupERNS2_4dataEhh]+0x73): undefined reference to __imp_WSAStartup' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail17winsock_init_base7cleanupERNS2_4dataE[_ZN5boost4asio6detail17winsock_init_base7cleanupERNS2_4dataE]+0x35): undefined reference to __imp_WSACleanup'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZNK5boost4asio6detail18win_fd_set_adapter6is_setEy[_ZNK5boost4asio6detail18win_fd_set_adapter6is_setEy]+0x1f): undefined reference to __WSAFDIsSet' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops16clear_last_errorEv[_ZN5boost4asio6detail10socket_ops16clear_last_errorEv]+0x10): undefined reference to __imp_WSASetLastError'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE]+0x9d): undefined reference to __imp_closesocket' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE]+0x134): undefined reference to __imp_ioctlsocket'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops5closeEyRhbRNS_6system10error_codeE]+0x157): undefined reference to __imp_closesocket' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops25set_internal_non_blockingEyRhbRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops25set_internal_non_blockingEyRhbRNS_6system10error_codeE]+0x98): undefined reference to __imp_ioctlsocket'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops8shutdownEyiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops8shutdownEyiRNS_6system10error_codeE]+0x43): undefined reference to __imp_shutdown' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops20non_blocking_connectEyRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops20non_blocking_connectEyRNS_6system10error_codeE]+0x141): undefined reference to __imp_select'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops6listenEyiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops6listenEyiRNS_6system10error_codeE]+0x43): undefined reference to __imp_listen' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops4recvEyP7_WSABUFyiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops4recvEyP7_WSABUFyiRNS_6system10error_codeE]+0x62): undefined reference to __imp_WSARecv'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops4sendEyPK7_WSABUFyiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops4sendEyPK7_WSABUFyiRNS_6system10error_codeE]+0x63): undefined reference to __imp_WSASend' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops6socketEiiiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops6socketEiiiRNS_6system10error_codeE]+0x42): undefined reference to __imp_WSASocketW'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops6socketEiiiRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops6socketEiiiRNS_6system10error_codeE]+0x99): undefined reference to __imp_setsockopt' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops5ioctlEyRhiPmRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops5ioctlEyRhiPmRNS_6system10error_codeE]+0x52): undefined reference to __imp_ioctlsocket'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops6selectEiP6fd_setS4_S4_P7timevalRNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops6selectEiP6fd_setS4_S4_P7timevalRNS_6system10error_codeE]+0x10c): undefined reference to __imp_select' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops24translate_addrinfo_errorEi[_ZN5boost4asio6detail10socket_ops24translate_addrinfo_errorEi]+0x183): undefined reference to __imp_WSAGetLastError'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops11getaddrinfoEPKcS4_RK8addrinfoPPS5_RNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops11getaddrinfoEPKcS4_RK8addrinfoPPS5_RNS_6system10error_codeE]+0x7f): undefined reference to __imp_getaddrinfo' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops12freeaddrinfoEP8addrinfo[_ZN5boost4asio6detail10socket_ops12freeaddrinfoEP8addrinfo]+0x13): undefined reference to __imp_freeaddrinfo'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops20host_to_network_longEm[_ZN5boost4asio6detail10socket_ops20host_to_network_longEm]+0x11): undefined reference to __imp_htonl' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base13start_send_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE[_ZN5boost4asio6detail28win_iocp_socket_service_base13start_send_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE]+0xe9): undefined reference to __imp_WSASend'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base13start_send_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE[_ZN5boost4asio6detail28win_iocp_socket_service_base13start_send_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE]+0xf5): undefined reference to __imp_WSAGetLastError' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base16start_receive_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE[_ZN5boost4asio6detail28win_iocp_socket_service_base16start_receive_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE]+0xec): undefined reference to __imp_WSARecv'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base16start_receive_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE[_ZN5boost4asio6detail28win_iocp_socket_service_base16start_receive_opERNS2_24base_implementation_typeEP7_WSABUFyibPNS1_18win_iocp_operationE]+0xf8): undefined reference to __imp_WSAGetLastError' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base16start_connect_opERNS2_24base_implementation_typeEiiPK8sockaddryPNS1_31win_iocp_socket_connect_op_baseE[_ZN5boost4asio6detail28win_iocp_socket_service_base16start_connect_opERNS2_24base_implementation_typeEiiPK8sockaddryPNS1_31win_iocp_socket_connect_op_baseE]+0x172): undefined reference to __imp_WSAGetLastError'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail28win_iocp_socket_service_base14get_connect_exERNS2_24base_implementation_typeEi[_ZN5boost4asio6detail28win_iocp_socket_service_base14get_connect_exERNS2_24base_implementation_typeEi]+0xdd): undefined reference to __imp_WSAIoctl' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail13be_to_host_64Ex[_ZN13libkafka_asio6detail13be_to_host_64Ex]+0x23): undefined reference to __imp_htonl'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail13be_to_host_64Ex[_ZN13libkafka_asio6detail13be_to_host_64Ex]+0x3f): undefined reference to __imp_htonl' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail10WriteInt16EsRSo[_ZN13libkafka_asio6detail10WriteInt16EsRSo]+0x1e): undefined reference to __imp_htons'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail10WriteInt32EiRSo[_ZN13libkafka_asio6detail10WriteInt32EiRSo]+0x17): undefined reference to __imp_htonl' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail9ReadInt16ERSi[_ZN13libkafka_asio6detail9ReadInt16ERSi]+0x34): undefined reference to __imp_ntohs'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN13libkafka_asio6detail9ReadInt32ERSi[_ZN13libkafka_asio6detail9ReadInt32ERSi]+0x31): undefined reference to __imp_ntohl' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops11call_acceptIiEEyMNS2_6msghdrET_yP8sockaddrPy[_ZN5boost4asio6detail10socket_ops11call_acceptIiEEyMNS2_6msghdrET_yP8sockaddrPy]+0x50): undefined reference to __imp_accept'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops13error_wrapperIyEET_S4_RNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops13error_wrapperIyEET_S4_RNS_6system10error_codeE]+0x21): undefined reference to __imp_WSAGetLastError' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops9call_bindIiEEiMNS2_6msghdrET_yPK8sockaddry[_ZN5boost4asio6detail10socket_ops9call_bindIiEEiMNS2_6msghdrET_yPK8sockaddry]+0x2f): undefined reference to __imp_bind'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops13error_wrapperIiEET_S4_RNS_6system10error_codeE[_ZN5boost4asio6detail10socket_ops13error_wrapperIiEET_S4_RNS_6system10error_codeE]+0x20): undefined reference to __imp_WSAGetLastError' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops12call_connectIiEEiMNS2_6msghdrET_yPK8sockaddry[_ZN5boost4asio6detail10socket_ops12call_connectIiEEiMNS2_6msghdrET_yPK8sockaddry]+0x2f): undefined reference to __imp_connect'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops15call_setsockoptIiEEiMNS2_6msghdrET_yiiPKvy[_ZN5boost4asio6detail10socket_ops15call_setsockoptIiEEiMNS2_6msghdrET_yiiPKvy]+0x3e): undefined reference to __imp_setsockopt' CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops15call_getsockoptIiEEiMNS2_6msghdrET_yiiPvPy[_ZN5boost4asio6detail10socket_ops15call_getsockoptIiEEiMNS2_6msghdrET_yiiPvPy]+0x3d): undefined reference to __imp_getsockopt'
CMakeFiles\fetch_cxx11.dir/objects.a(fetch.cpp.obj):fetch.cpp:(.text$_ZN5boost4asio6detail10socket_ops16call_getsocknameIiEEiMNS2_6msghdrET_yP8sockaddrPy[_ZN5boost4asio6detail10socket_ops16call_getsocknameIiEEiMNS2_6msghdrET_yP8sockaddrPy]+0x37): undefined reference to `__imp_getsockname'
collect2.exe: error: ld returned 1 exit status
mingw32-make.exe[2]: *** [CMakeFiles\fetch_cxx11.dir\build.make:91: fetch_cxx11.exe] Error 1
mingw32-make.exe[1]: *** [CMakeFiles\Makefile2:72: CMakeFiles/fetch_cxx11.dir/all] Error 2
mingw32-make.exe: *** [C:/libkafka-asio-master/examples/cpp11/build/Makefile:83: all] Error 2
mingw32-make.exe[2]: Leaving directory 'C:/libkafka-asio-master/examples/cpp11/build'
mingw32-make.exe[1]: Leaving directory 'C:/libkafka-asio-master/examples/cpp11/build'
Process terminated with status 2 (0 minute(s), 41 second(s))
45 error(s), 0 warning(s) (0 minute(s), 41 second(s))

Extract ip/name resolver to a service

I think it would make sense to extract the resolver code into some kind of resolver/lookup service.
At the moment it is only used to establish the connection, but it could also be useful for mapping a host:port string to a Connection or broker UID.
In that case the resolution needs to happen in the Client and not in the Connection. A rough interface sketch follows below.

Do you see a problem with this?
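For illustration, a rough interface sketch of what such an extracted service could look like, built directly on the Boost.Asio resolver; none of these names exist in the library:

#include <string>
#include <boost/asio.hpp>
#include <boost/function.hpp>

// Hypothetical stand-alone resolver/lookup service: resolves host/port pairs
// and hands the resulting endpoints to the caller, which can then map them to
// a Connection or broker UID.
class ResolverService
{
 public:
  typedef boost::function<
    void(const boost::system::error_code&,
         boost::asio::ip::tcp::resolver::iterator)> Handler;

  explicit ResolverService(boost::asio::io_service& ios)
    : resolver_(ios)
  {
  }

  void AsyncResolve(const std::string& host,
                    const std::string& port,
                    const Handler& handler)
  {
    boost::asio::ip::tcp::resolver::query query(host, port);
    resolver_.async_resolve(query, handler);
  }

 private:
  boost::asio::ip::tcp::resolver resolver_;
};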

connect failed

When the socket connection fails, there is no API to help reconnect after a given delay.
If we reconnect immediately after the request handler reports the failure, this generates many sockets in the TIME_WAIT state. A sketch of a delayed retry follows below.
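A sketch of such a delayed retry using a plain Boost.Asio deadline timer; Reconnect() stands in for whatever application code re-opens the connection or re-issues the request and is not a library API:

#include <boost/asio.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

void Reconnect();  // hypothetical application callback that re-opens the connection

// Called when the retry timer expires: only reconnect if the wait was not cancelled.
void HandleRetryTimer(const boost::system::error_code& error)
{
  if (!error)
    Reconnect();
}

// Called from the failure handler: wait a few seconds before reconnecting,
// instead of reconnecting immediately and piling up sockets in TIME_WAIT.
void ScheduleReconnect(boost::asio::deadline_timer& retry_timer)
{
  retry_timer.expires_from_now(boost::posix_time::seconds(5));
  retry_timer.async_wait(&HandleRetryTimer);
}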

deadline handling in `ConnectionService`

If we hit a deadline for one of the operations, we close the service no matter which deadline was hit.
I think the different deadlines should be handled differently:
connect_deadline -> retry connecting (at least as an option)
write_deadline -> retry sending (at least as an option)
read_deadline -> retry sending if the ordering still allows it (at least as an option)

We need to be careful in the read case: for some requests (produce/fetch) the order is important, so maybe those should be handled differently from requests such as metadata ones.

Or do you plan to handle retries at a different level?

By the way:
are there any plans to implement the full client/producer/consumer logic?

What if I want to use client to connect multiple brokers?

Hi guys,

You really did great work here, and I appreciate the time and talent you contribute to the open-source world. I recently played with the client and found that there is no way to use multiple brokers with the producer. What happens if the broker service is unavailable and needs to be taken out of operation? I am thinking of adding this support to the code. Let me know if I am wrong and there actually is a way to use multiple brokers.

Thanks,
Zhongqi

[Client] Sending requests very frequently results in the error "Another operation is still in progress"

I was trying to send MessageSets with 1000 messages each at a high rate, which is how I ran into this.
It seems the Client returns an error in the callback because it is in kStateWriting; the callback is triggered from AsyncRequest.
I would not expect a callback in this case; I would expect a return value indicating it.
Or, more precisely, I would expect a Client that can send more than one request at a time (or queue them, as sketched below). I do not want to care about Client states and whether it is busy.

Such a MessageSet takes about 10 to 20 ms before I can use the Client again. Compared to rdkafka performance this is pretty slow.

I guess it is somehow network/socket/connection/buffer related.
Should the Client open more connections to the broker?
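As a workaround on the application side, one option is to queue requests and only start the next AsyncRequest from the completion handler of the previous one. The sketch below assumes the Client/ProduceRequest names used elsewhere on this page and that the handler parameter accepts any compatible callable such as the result of boost::bind (see also the member-callback issue further down); it is not part of the library.

#include <deque>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <libkafka_asio/libkafka_asio.h>

// Serializes produce requests: only one AsyncRequest is in flight at a time;
// the next one is started from the previous request's completion handler.
class SerializedProducer
{
 public:
  explicit SerializedProducer(libkafka_asio::Client& client)
    : client_(client), busy_(false)
  {
  }

  void Enqueue(const libkafka_asio::ProduceRequest& request)
  {
    queue_.push_back(request);
    MaybeSend();
  }

 private:
  void MaybeSend()
  {
    if (busy_ || queue_.empty())
      return;
    busy_ = true;
    libkafka_asio::ProduceRequest request = queue_.front();
    queue_.pop_front();
    client_.AsyncRequest(
      request,
      boost::bind(&SerializedProducer::HandleResponse, this, _1, _2));
  }

  void HandleResponse(
    const boost::system::error_code& error,
    const boost::optional<libkafka_asio::ProduceResponse>& response)
  {
    busy_ = false;
    // Error handling/retry omitted in this sketch.
    MaybeSend();
  }

  libkafka_asio::Client& client_;
  std::deque<libkafka_asio::ProduceRequest> queue_;
  bool busy_;
};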

Try to build libkafka-asio with Code::Blocks/TDM-GCC on Windows 10: CMake cannot find BOOST_ROOT

I am trying to build libkafka-asio with Code::Blocks, CMake and TDM-GCC on Windows 10, but CMake cannot find BOOST_ROOT,
even though %BOOST_ROOT% is declared and on the path.
In CMake I defined the location of Boost.

CMake Error at C:/Program Files/CMake/share/cmake-3.11/Modules/FindBoost.cmake:2044 (message):
Unable to find the requested Boost libraries.

Boost version: 1.67.0

Boost include path: C:/boost_1_67_0_GCC

Could not find the following Boost libraries:

      boost_thread
      boost_system

Some (but not all) of the required Boost libraries were found. You may
need to install these additional Boost libraries. Alternatively, set
BOOST_LIBRARYDIR to the directory containing Boost libraries or BOOST_ROOT
to the location of Boost.
Call Stack (most recent call first):
examples/cpp03/CMakeLists.txt:6 (find_package)

CMake Error at C:/Program Files/CMake/share/cmake-3.11/Modules/FindBoost.cmake:2044 (message):
Unable to find the requested Boost libraries.

Boost version: 1.67.0

Boost include path: C:/boost_1_67_0_GCC

Could not find the following Boost libraries:

      boost_thread
      boost_system

Some (but not all) of the required Boost libraries were found. You may
need to install these additional Boost libraries. Alternatively, set
BOOST_LIBRARYDIR to the directory containing Boost libraries or BOOST_ROOT
to the location of Boost.
Call Stack (most recent call first):
examples/cpp11/CMakeLists.txt:6 (find_package)

CMake Error at C:/Program Files/CMake/share/cmake-3.11/Modules/FindPackageHandleStandardArgs.cmake:137 (message):
Could NOT find GTest (missing: GTEST_LIBRARY GTEST_INCLUDE_DIR
GTEST_MAIN_LIBRARY)
Call Stack (most recent call first):
C:/Program Files/CMake/share/cmake-3.11/Modules/FindPackageHandleStandardArgs.cmake:378 (_FPHSA_FAILURE_MESSAGE)
C:/Program Files/CMake/share/cmake-3.11/Modules/FindGTest.cmake:196 (FIND_PACKAGE_HANDLE_STANDARD_ARGS)
test/CMakeLists.txt:6 (find_package)

Configuring incomplete, errors occurred!
See also "C:/libkafka-asio/build/CMakeFiles/CMakeOutput.log".

Support fetching messages according to offsets, not bytes

I'm using libkafka-asio in my project. When using FetchRequest::FetchTopic,
void FetchTopic(const String& topic_name, Int32 partition, Int64 fetch_offset, Int32 max_bytes)
I can only fetch messages whose total size is less than max_bytes; the problem is that sometimes I cannot get the complete last message.

For example, suppose there are three messages:
200
300
400
When I set max_bytes=8, the fetch result will be:
200
300
40
The third message is incomplete. So could this fetch API support fetching messages according to offsets, e.g. FetchTopic(topic_name, partition, fetch_offset, max_offsets), where max_offsets means the maximum number of messages that can be fetched at once? A client-side workaround is sketched below.
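Until something like max_offsets exists, the usual workaround is to limit the message count on the client side: request a generous max_bytes, stop after N messages, and remember the next offset so a message truncated by max_bytes simply gets fetched again in full on the next round. The iterator and accessor names below (FetchResponse::const_iterator, MessageAndOffset, offset()) are taken from the example code and compiler output quoted elsewhere on this page and should be double-checked:

#include <boost/optional.hpp>
#include <boost/system/error_code.hpp>
#include <libkafka_asio/libkafka_asio.h>

static const int kMaxMessages = 100;          // "max_offsets" enforced client-side
static libkafka_asio::Int64 next_offset = 0;  // where the next FetchRequest should start

void ConsumeMessage(const libkafka_asio::MessageAndOffset& message);  // application-defined

void HandleFetch(const boost::system::error_code& error,
                 const boost::optional<libkafka_asio::FetchResponse>& response)
{
  if (error || !response)
    return;
  int count = 0;
  for (libkafka_asio::FetchResponse::const_iterator it = response->begin();
       it != response->end() && count < kMaxMessages;
       ++it, ++count)
  {
    next_offset = it->offset() + 1;  // assumed accessor on MessageAndOffset
    ConsumeMessage(*it);
  }
  // Issue the next FetchRequest starting at next_offset; a trailing message
  // that was cut off by max_bytes is re-fetched in one piece next time.
}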

produce succeeds without producing (create topic related)

Precondition: the topic "mytopic" does not exist on the broker.

Produce test:
Run the produce test. Result: success is reported, but the topic still does not exist and no message was produced.

Running a MetadataRequest for "mytopic" before producing:
The first MetadataRequest creates the topic. After that the topic exists and we can produce to it and request metadata. A sketch of this workaround follows below.
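A sketch of that workaround: request metadata for the topic first (on brokers with auto.create.topics.enable=true this triggers topic creation), and only produce once the metadata handler has fired. The MetadataRequest/MetadataResponse names and the AddTopicName member are assumptions to verify against the headers.

#include <boost/optional.hpp>
#include <boost/system/error_code.hpp>
#include <libkafka_asio/libkafka_asio.h>

void ProduceToMyTopic();  // issues the ProduceRequest as usual, defined elsewhere

void HandleMetadata(const boost::system::error_code& error,
                    const boost::optional<libkafka_asio::MetadataResponse>& response)
{
  if (error || !response)
    return;
  // The topic now exists on the broker; producing to it will actually store data.
  ProduceToMyTopic();
}

void RequestMetadataThenProduce(libkafka_asio::Client& client)
{
  libkafka_asio::MetadataRequest request;
  request.AddTopicName("mytopic");  // assumed member name
  client.AsyncRequest(request, &HandleMetadata);
}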

Fails to compile on Ubuntu vivid

ubuntu@localhost:~/libkafka-asio$ make
[  3%] Building CXX object examples/CMakeFiles/fetch_cxx03.dir/fetch_cxx03.cpp.o
Linking CXX executable fetch_cxx03
[  3%] Built target fetch_cxx03
[  6%] Building CXX object examples/CMakeFiles/fetch_cxx11.dir/fetch_cxx11.cpp.o
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp: In function ‘int main(int, char**)’:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:43:8: error: ‘BytesToString’ does not name a type
   auto BytesToString = [](const libkafka_asio::Bytes& bytes) -> std::string
        ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp: In lambda function:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:73:49: error: ‘BytesToString’ was not declared in this scope
       std::cout << BytesToString(message.value()) << std::endl;
                                                 ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp: In lambda function:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:5: warning: lambda expressions only available with -std=c++11 or -std=gnu++11
     });
     ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:6: error: no matching function for call to ‘for_each(libkafka_asio::FetchResponse::const_iterator, libkafka_asio::FetchResponse::const_iterator, main(int, char**)::<lambda(const ErrorCodeType&, const OptionalType&)>::<lambda(const libkafka_asio::MessageAndOffset&)>)’
     });
      ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:6: note: candidate is:
In file included from /usr/include/c++/4.9/algorithm:62:0,
                 from /usr/include/boost/smart_ptr/shared_ptr.hpp:42,
                 from /usr/include/boost/shared_ptr.hpp:17,
                 from /usr/include/boost/asio/detail/shared_ptr.hpp:23,
                 from /usr/include/boost/asio/detail/socket_ops.hpp:21,
                 from /usr/include/boost/asio/detail/socket_holder.hpp:20,
                 from /usr/include/boost/asio/detail/reactive_socket_accept_op.hpp:24,
                 from /usr/include/boost/asio/detail/reactive_socket_service.hpp:30,
                 from /usr/include/boost/asio/datagram_socket_service.hpp:30,
                 from /usr/include/boost/asio/basic_datagram_socket.hpp:21,
                 from /usr/include/boost/asio.hpp:21,
                 from /home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:18:
/usr/include/c++/4.9/bits/stl_algo.h:3750:5: note: template<class _IIter, class _Funct> _Funct std::for_each(_IIter, _IIter, _Funct)
     for_each(_InputIterator __first, _InputIterator __last, _Function __f)
     ^
/usr/include/c++/4.9/bits/stl_algo.h:3750:5: note:   template argument deduction/substitution failed:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp: In substitution of ‘template<class _IIter, class _Funct> _Funct std::for_each(_IIter, _IIter, _Funct) [with _IIter = libkafka_asio::detail::FetchResponseIterator<std::map<std::basic_string<char>, libkafka_asio::detail::TopicPartitionMap<libkafka_asio::FetchResponse::TopicPartitionProperties>, std::less<std::basic_string<char> >, std::allocator<std::pair<const std::basic_string<char>, libkafka_asio::detail::TopicPartitionMap<libkafka_asio::FetchResponse::TopicPartitionProperties> > > > >; _Funct = main(int, char**)::<lambda(const ErrorCodeType&, const OptionalType&)>::<lambda(const libkafka_asio::MessageAndOffset&)>]’:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:6:   required from here
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:6: error: template argument for ‘template<class _IIter, class _Funct> _Funct std::for_each(_IIter, _IIter, _Funct)’ uses local type ‘main(int, char**)::<lambda(const ErrorCodeType&, const OptionalType&)>::<lambda(const libkafka_asio::MessageAndOffset&)>’
     });
      ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:74:6: error:   trying to instantiate ‘template<class _IIter, class _Funct> _Funct std::for_each(_IIter, _IIter, _Funct)’
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp: In function ‘int main(int, char**)’:
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:75:3: warning: lambda expressions only available with -std=c++11 or -std=gnu++11
   });
   ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:75:4: error: no matching function for call to ‘libkafka_asio::detail::BasicConnection<libkafka_asio::detail::BasicConnectionService<libkafka_asio::detail::ConnectionServiceImpl> >::AsyncRequest(libkafka_asio::FetchRequest&, main(int, char**)::<lambda(const ErrorCodeType&, const OptionalType&)>)’
   });
    ^
/home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:75:4: note: candidate is:
In file included from /home/ubuntu/libkafka-asio/examples/../lib/libkafka_asio/connection.h:13:0,
                 from /home/ubuntu/libkafka-asio/examples/../lib/libkafka_asio/libkafka_asio.h:17,
                 from /home/ubuntu/libkafka-asio/examples/fetch_cxx11.cpp:19:
/home/ubuntu/libkafka-asio/examples/../lib/libkafka_asio/detail/basic_connection.h:146:8: note: void libkafka_asio::detail::BasicConnection<Service>::AsyncRequest(const TRequest&, const typename libkafka_asio::detail::BasicConnection<Service>::Handler<TRequest>::Type&) [with TRequest = libkafka_asio::FetchRequest; Service = libkafka_asio::detail::BasicConnectionService<libkafka_asio::detail::ConnectionServiceImpl>; typename libkafka_asio::detail::BasicConnection<Service>::Handler<TRequest>::Type = boost::function<void(const boost::system::error_code&, const boost::optional<libkafka_asio::FetchResponse>&)>]
   void AsyncRequest(const TRequest& request,
        ^
/home/ubuntu/libkafka-asio/examples/../lib/libkafka_asio/detail/basic_connection.h:146:8: note:   no known conversion for argument 2 from ‘main(int, char**)::<lambda(const ErrorCodeType&, const OptionalType&)>’ to ‘const Type& {aka const boost::function<void(const boost::system::error_code&, const boost::optional<libkafka_asio::FetchResponse>&)>&}’
examples/CMakeFiles/fetch_cxx11.dir/build.make:54: recipe for target 'examples/CMakeFiles/fetch_cxx11.dir/fetch_cxx11.cpp.o' failed
make[2]: *** [examples/CMakeFiles/fetch_cxx11.dir/fetch_cxx11.cpp.o] Error 1
CMakeFiles/Makefile2:128: recipe for target 'examples/CMakeFiles/fetch_cxx11.dir/all' failed
make[1]: *** [examples/CMakeFiles/fetch_cxx11.dir/all] Error 2
Makefile:86: recipe for target 'all' failed
make: *** [all] Error 2
ubuntu@localhost:~/libkafka-asio$ gcc -v
Using built-in specs.
COLLECT_GCC=gcc
COLLECT_LTO_WRAPPER=/usr/lib/gcc/x86_64-linux-gnu/4.9/lto-wrapper
Target: x86_64-linux-gnu
Configured with: ../src/configure -v --with-pkgversion='Ubuntu 4.9.2-10ubuntu13' --with-bugurl=file:///usr/share/doc/gcc-4.9/README.Bugs --enable-languages=c,c++,java,go,d,fortran,objc,obj-c++ --prefix=/usr --program-suffix=-4.9 --enable-shared --enable-linker-build-id --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --with-gxx-include-dir=/usr/include/c++/4.9 --libdir=/usr/lib --enable-nls --with-sysroot=/ --enable-clocale=gnu --enable-libstdcxx-debug --enable-libstdcxx-time=yes --enable-gnu-unique-object --disable-vtable-verify --enable-plugin --with-system-zlib --disable-browser-plugin --enable-java-awt=gtk --enable-gtk-cairo --with-java-home=/usr/lib/jvm/java-1.5.0-gcj-4.9-amd64/jre --enable-java-home --with-jvm-root-dir=/usr/lib/jvm/java-1.5.0-gcj-4.9-amd64 --with-jvm-jar-dir=/usr/lib/jvm-exports/java-1.5.0-gcj-4.9-amd64 --with-arch-directory=amd64 --with-ecj-jar=/usr/share/java/eclipse-ecj.jar --enable-objc-gc --enable-multiarch --disable-werror --with-arch-32=i686 --with-abi=m64 --with-multilib-list=m32,m64,mx32 --enable-multilib --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
Thread model: posix
gcc version 4.9.2 (Ubuntu 4.9.2-10ubuntu13)
ubuntu@localhost:~/libkafka-asio$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 15.04
Release:        15.04
Codename:       vivid

"Topic or partition does not exist on this broker" error

Hi - We've been testing libkafka-asio for a while and it has been great. However, we run into this error once in a while and haven't been able to figure out what is causing it:

Topic or partition does not exist on this broker

The topic in question clearly exists on the broker, and sometimes restarting the Kafka service resolves the issue. Has anyone else run into this problem? I'm not sure if it is a client or server issue.

Thanks!
Mike

The callbacks in Client do not support members

The Client callbacks do not support member functions as a target; they only support plain functions. A possible workaround using boost::bind is sketched below the example.

Expected API usage example:

client.AsyncRequest(metaRequest, &MyClass::HandleMetaRequest, this);
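If the handler parameter accepts any callable compatible with the expected signature (the compiler output quoted in the build issues above suggests it is a boost::function), the usual Boost workaround is boost::bind. The sketch below is that generic idiom, not a documented library feature, and the MetadataRequest/MetadataResponse names are assumptions:

#include <boost/bind.hpp>
#include <boost/optional.hpp>
#include <boost/system/error_code.hpp>
#include <libkafka_asio/libkafka_asio.h>

class MyClass
{
 public:
  void SendMetadataRequest(libkafka_asio::Client& client,
                           const libkafka_asio::MetadataRequest& metaRequest)
  {
    // Bind the member function and the object pointer into a callable that
    // matches the handler signature.
    client.AsyncRequest(
      metaRequest,
      boost::bind(&MyClass::HandleMetaRequest, this, _1, _2));
  }

  void HandleMetaRequest(const boost::system::error_code& error,
                         const boost::optional<libkafka_asio::MetadataResponse>& response)
  {
    // ...
  }
};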
