This project is a fork of tarantool/kafka.


License: Apache License 2.0


Tarantool kafka

A full-featured, high-performance Kafka library for Tarantool, based on librdkafka.

It can produce more than 150k messages per second and consume more than 140k messages per second.

Features

  • Kafka producer and consumer implementations.
  • Fiber friendly.
  • Mostly errorless functions and methods. Error handling in the Tarantool ecosystem is quite a mess: some libraries throw native Lua errors while others throw box.error. kafka returns non-critical errors as strings, which lets you decide how to handle them.
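
The error-as-string convention pairs naturally with a small helper; below is a minimal sketch in plain Lua (the `must` name and the commented call are illustrative, not part of the library):

```lua
-- Illustrative helper, not part of kafka: escalate a returned error
-- string into a hard Lua error, otherwise pass the result through.
local function must(result, err)
    if err ~= nil then
        error(err)
    end
    return result
end

-- Hypothetical usage:
-- local consumer = must(tnt_kafka.Consumer.create({ brokers = "localhost:9092" }))
```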

Requirements

  • Tarantool >= 1.10.2
  • Tarantool development headers
  • librdkafka >= 0.11.5
  • librdkafka development headers
  • openssl-libs
  • openssl development headers
  • make
  • cmake
  • gcc

Installation

    tarantoolctl rocks install kafka

Build module with statically linked librdkafka

To install the kafka module with a built-in librdkafka dependency, use the STATIC_BUILD option:

    tarantoolctl rocks STATIC_BUILD=ON install kafka

Be aware that this approach does not bundle a static OpenSSL. Instead, it assumes that Tarantool exports the OpenSSL symbols, which means the static kafka build is only usable with a static Tarantool build.

For a successful static build, compile kafka against the same OpenSSL version that Tarantool was built with.

Usage

Consumer

local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

-- Create a consumer connected to the given brokers
local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

-- Subscribe to the topics to consume
local err = consumer:subscribe({ "some_topic" })
if err ~= nil then
    print(err)
    os.exit(1)
end

-- Obtain the output channel that delivers consumed messages
local out, err = consumer:output()
if err ~= nil then
    print(string.format("got fatal error '%s'", err))
    os.exit(1)
end

while true do
    if out:is_closed() then
        os.exit(1)
    end

    local msg = out:get() -- take the next message from the channel
    if msg ~= nil then
        print(string.format(
            "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'",
            msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
        ))
    end
end

-- from another fiber on app shutdown
consumer:close()

Producer

local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

-- Create a producer connected to the given brokers
local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

for i = 1, 1000 do
    local message = "test_value " .. tostring(i)
    local err = producer:produce({
        topic = "test_topic",
        key = "test_key",
        value =  message
    })
    if err ~= nil then
        print(string.format("got error '%s' while sending value '%s'", err, message))
    else
        print(string.format("successfully sent value '%s'", message))
    end
end

producer:close()

You can pass additional librdkafka configuration parameters (see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) in the special options table on client creation:

tnt_kafka.Producer.create({
    options = {
        ["some.key"] = "some_value",
    },
})

tnt_kafka.Consumer.create({
    options = {
        ["some.key"] = "some_value",
    },
})
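
For example, a few commonly used librdkafka keys; a sketch with illustrative values, not recommendations:

```lua
local tnt_kafka = require('kafka')

-- Illustrative librdkafka settings; see CONFIGURATION.md for the full list
local consumer, err = tnt_kafka.Consumer.create({
    brokers = "localhost:9092",
    options = {
        ["group.id"] = "my_group",           -- consumer group id
        ["auto.offset.reset"] = "earliest",  -- where to start without a stored offset
        ["enable.partition.eof"] = "false",  -- do not emit partition-EOF events
    },
})
```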

More examples can be found in the examples folder.

Using SSL

SSL connections to brokers are supported by librdkafka itself, so you only need to configure the brokers properly by following this guide: https://github.com/edenhill/librdkafka/wiki/Using-SSL-with-librdkafka

After that, pass the following configuration parameters on client creation:

tnt_kafka.Producer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate.
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})

tnt_kafka.Consumer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate.
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})

Known issues

TODO

  • Ordered storage for offsets to prevent committing unprocessed messages
  • More examples
  • Better documentation

Benchmarks

Before running any commands, initialize and update the git submodules:

    git submodule init
    git submodule update

Producer

Async

Result: over 160,000 produced messages per second on a MacBook Pro (2016)

Local run in docker:

    make docker-run-environment
    make docker-create-benchmark-async-producer-topic
    make docker-run-benchmark-async-producer-interactive

Sync

Result: over 90,000 produced messages per second on a MacBook Pro (2016)

Local run in docker:

    make docker-run-environment
    make docker-create-benchmark-sync-producer-topic
    make docker-run-benchmark-sync-producer-interactive

Consumer

Auto offset store enabled

Result: over 190,000 consumed messages per second on a MacBook Pro (2016)

Local run in docker:

    make docker-run-environment
    make docker-create-benchmark-auto-offset-store-consumer-topic
    make docker-run-benchmark-auto-offset-store-consumer-interactive

Manual offset store

Result: over 190,000 consumed messages per second on a MacBook Pro (2016)

Local run in docker:

    make docker-run-environment
    make docker-create-benchmark-manual-commit-consumer-topic
    make docker-run-benchmark-manual-commit-consumer-interactive

Developing

Tests

Before running any tests, add the following entry to /etc/hosts:

127.0.0.1 kafka
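
A shell sketch for adding that entry idempotently; `HOSTS_FILE` is a parameter introduced here for illustration, and it defaults to a scratch file so the snippet is safe to try:

```shell
# Sketch: add the kafka host alias only if it is not already present.
# HOSTS_FILE defaults to a scratch file here; in practice point it at
# /etc/hosts and run with sudo.
HOSTS_FILE="${HOSTS_FILE:-$(mktemp)}"
if ! grep -qF '127.0.0.1 kafka' "$HOSTS_FILE"; then
    printf '127.0.0.1 kafka\n' >> "$HOSTS_FILE"
fi
```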

You can run Docker-based integration tests via the Makefile target:

    make test-run-with-docker

Contributors

darthunix, dimoffon, dsamirov-mrg, filonenko-mikhail, hackallcode, olegrok, repentantgopher, rosik, runsfor, totktonada
