mre / kafka-influxdb

High performance Kafka consumer for InfluxDB. Supports collectd message formats.

License: Apache License 2.0

Python 91.85% Shell 2.93% Makefile 2.41% Dockerfile 2.81%
kafka-influxdb kafka-consumer logstash influxdb producer consumer performance

kafka-influxdb's Introduction

Kafka-InfluxDB


A Kafka consumer for InfluxDB written in Python.
Supports InfluxDB 0.9.x and up. For InfluxDB 0.8.x support, check out the 0.3.0 tag.

⚠️ The project should work as expected and bug fixes are very welcome, but activity on new functionality is quite low. For newer projects I recommend vector instead, which is both faster and more versatile.

Use cases

Kafka will serve as a buffer for your metric data during high load.
Also it's useful for sending metrics from offshore data centers with unreliable connections to your monitoring backend.

Quickstart

For a quick test, run kafka-influxdb inside a container alongside Kafka and InfluxDB. Some sample messages are generated automatically on startup (using kafkacat).

Python 2:

make
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

Python 3:

make RUNTIME=py3
docker exec -it kafkainfluxdb
python -m kafka_influxdb -c config_example.yaml -s

PyPy 5.x:

make RUNTIME=pypy
docker exec -it kafkainfluxdb
pypy3 -m kafka_influxdb -c config_example.yaml -s --kafka_reader=kafka_influxdb.reader.kafka_python

(Note the additional flag --kafka_reader=kafka_influxdb.reader.kafka_python. PyPy is incompatible with the confluent-kafka consumer, which is a C extension built on librdkafka. We therefore use the kafka_python library here, which is compatible with PyPy but a bit slower.)

Docker:

docker run mre0/kafka-influxdb

or simply

make run

Installation

pip install kafka_influxdb
kafka_influxdb -c config_example.yaml

Contributing

If you would like to contribute, please create a pull request with your change.
Please run the tests with make test before you submit the pull request.
If you're unsure whether a change will be accepted, you can also create an issue first to discuss it.
Or look at the existing issues for inspiration.

Thanks for contributing!

Performance

The following graph shows the number of messages/s read from Kafka for various Python versions and Kafka consumer plugins.
This is testing against a Kafka topic with 10 partitions and five message brokers. As you can see the best performance is achieved on Python 3 using the -O flag for bytecode optimization in combination with the confluent-kafka reader (default setup). Note that encoding and sending the data to InfluxDB might lower this maximum performance although you should still see a significant performance boost compared to logstash.

Benchmark results

Benchmark

For a quick benchmark, you can start a complete kafkacat -> Kafka -> kafka_influxdb -> InfluxDB setup with the following command:

make

This will immediately start reading messages from Kafka and writing them into InfluxDB. To see the output, you can use the InfluxDB CLI.

docker exec -it docker_influxdb_1 bash # Double check your container name
influx
use metrics
show measurements

Supported formats

You can write a custom encoder to support any input and output format (even fancy things like Protobuf). Look at the examples inside the encoder directory to get started. The following formats are officially supported:

Input formats

mydatacenter.myhost.load.load.shortterm 0.45 1436357630
[{
    "values":[
       0.6
    ],
    "dstypes":[
       "gauge"
    ],
    "dsnames":[
       "value"
    ],
    "time":1444745144.824,
    "interval":10.000,
    "host":"xx.example.internal",
    "plugin":"cpu",
    "plugin_instance":"1",
    "type":"percent",
    "type_instance":"system"
 }]

Output formats

load_load_shortterm,datacenter=mydatacenter,host=myhost value="0.45" 1436357630
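As a rough illustration of what a custom encoder might look like, here is a minimal sketch that turns the Graphite-style input line shown above into the output line shown above. The class name Encoder, the encode method, and the convention of returning a list of strings are assumptions inferred from the call self.buffer.extend(self.encoder.encode(raw_message)) that appears in a traceback further down this page; the delimiter and prefix_tags parameters are hypothetical. The examples in the encoder directory remain the authoritative reference.

```python
class Encoder(object):
    """Hypothetical encoder: Graphite-style lines -> InfluxDB line protocol."""

    def encode(self, msg, delimiter=".", prefix_tags=("datacenter", "host")):
        # Kafka payloads usually arrive as bytes; accept text as well.
        if isinstance(msg, bytes):
            msg = msg.decode("utf-8")

        measurements = []
        # One message may carry several newline-separated metrics.
        for line in msg.splitlines():
            parts = line.split()
            if len(parts) != 3:
                continue  # not of the form "<path> <value> <timestamp>"
            path, value, timestamp = parts

            segments = path.split(delimiter)
            if len(segments) <= len(prefix_tags):
                continue  # nothing left over to form a measurement name

            # The leading path segments become tags, the rest the name.
            tags = ",".join(
                "{}={}".format(key, val) for key, val in zip(prefix_tags, segments)
            )
            name = "_".join(segments[len(prefix_tags):])
            measurements.append(
                '{},{} value="{}" {}'.format(name, tags, value, timestamp)
            )
        return measurements


print(Encoder().encode(b"mydatacenter.myhost.load.load.shortterm 0.45 1436357630"))
```

Lines that do not match the expected shape are skipped here rather than raising, which is a design choice of this sketch and not necessarily what the bundled encoders do.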

Custom encoders

If you are writing your custom encoder and you want to run it using the official docker image, you can simply mount it in the container:

docker run -v `pwd`/config.yaml:/usr/src/app/config.yaml -v `pwd`/myencoder.py:/usr/src/app/myencoder.py mre0/kafka-influxdb --encoder=myencoder

Another possibility is to create a custom Docker image that contains your encoder, for example:

FROM mre0/kafka-influxdb

ADD myencoder.py /usr/src/app/myencoder.py
ADD config.yaml /usr/src/app/

CMD python -m kafka_influxdb -c config.yaml -v --encoder=myencoder

Configuration

Take a look at config_example.yaml to find out how to create a config file. You can override the settings from the command line. The following parameters are allowed:

Option                                                 Description
-h, --help                                             Show help message and exit
--kafka_host KAFKA_HOST                                Hostname or IP of Kafka message broker (default: localhost)
--kafka_port KAFKA_PORT                                Port of Kafka message broker (default: 9092)
--kafka_topic KAFKA_TOPIC                              Topic for metrics (default: my_topic)
--kafka_group KAFKA_GROUP                              Kafka consumer group (default: my_group)
--kafka_reader KAFKA_READER                            Kafka client library to use (kafka_python or confluent) (default: kafka_influxdb.reader.confluent)
--influxdb_host INFLUXDB_HOST                          InfluxDB hostname or IP (default: localhost)
--influxdb_port INFLUXDB_PORT                          InfluxDB API port (default: 8086)
--influxdb_user INFLUXDB_USER                          InfluxDB username (default: root)
--influxdb_password INFLUXDB_PASSWORD                  InfluxDB password (default: root)
--influxdb_dbname INFLUXDB_DBNAME                      InfluxDB database to write metrics into (default: metrics)
--influxdb_use_ssl                                     Use SSL connection for InfluxDB (default: False)
--influxdb_verify_ssl                                  Verify the SSL certificate before connecting (default: False)
--influxdb_timeout INFLUXDB_TIMEOUT                    Max number of seconds to establish a connection to InfluxDB (default: 5)
--influxdb_use_udp                                     Use UDP connection for InfluxDB (default: False)
--influxdb_retention_policy INFLUXDB_RETENTION_POLICY  Retention policy for incoming metrics (default: autogen)
--influxdb_time_precision INFLUXDB_TIME_PRECISION      Precision of incoming metrics. Can be one of 's', 'm', 'ms', 'u' (default: s)
--encoder ENCODER                                      Input encoder which converts an incoming message to dictionary (default: collectd_graphite_encoder)
--buffer_size BUFFER_SIZE                              Maximum number of messages that will be collected before flushing to the backend (default: 1000)
-c CONFIGFILE, --configfile CONFIGFILE                 Configfile path (default: None)
-s, --statistics                                       Show performance statistics (default: True)
-v, --verbose                                          Set verbosity level. Increase verbosity by adding a v: -v -vv -vvv (default: 0)
--version                                              Show version

Comparison with other tools

There is a Kafka input plugin and an InfluxDB output plugin for logstash. It supports InfluxDB 0.9+. We've achieved a throughput of around 5000 messages/second with that setup. Check out the configuration at docker/logstash/config.conf. You can run the benchmark yourself:

make RUNTIME=logstash
docker exec -it logstash
logstash -f config.conf

Please send a Pull Request if you know of other tools that can be mentioned here.

kafka-influxdb's People

Contributors

c0by, danielnelson, dependabot-preview[bot], dependabot[bot], di, ingafeick, japudcret, kaffeiflex, mre, quantifiedcode-bot, teone


kafka-influxdb's Issues

Add SSL params

Please add the following parameters to the implementation and to the config file:
ssl (bool) – use https instead of http to connect to InfluxDB, defaults to False
verify_ssl (bool) – verify SSL certificates for HTTPS requests, defaults to False

Thanks a lot!
D.

ERROR:root:Error while creating InfluxDB datbase: database already exists

Hello - I'm a huge fan of kafka-influxdb. I believe it solves a major issue when dealing with large volumes of metrics traffic. Thank you!

I did notice one issue after deploying, though: if I create the InfluxDB database before sending the metrics, I receive the following error: "ERROR:root:Error while creating InfluxDB datbase: database already exists".

I'm able to avoid the error by having the DB created by kafka-influxdb. This is a concern because if an outage interrupts the kafka-influxdb connection, I am unable to continue writing to my existing database.

What's the recommended way to run kafka-influxdb? I have multiple Kafka nodes and attempted to run the kafka-influxdb consumer on each node, but realized they cannot write to the same database. Any recommendations for configuring this with some resiliency?

JSON encoder for CollectD events

Hi @mre

Please find below several JSON examples of collectd events.
If you have an estimate of when, or whether, you will build this encoder, please share it; this would be a very helpful feature.

[{"values":[1.13385872826296],"dstypes":["gauge"],"dsnames":["value"],"time":1444745136.182,"interval":10.000,"host":"xx.example.internal","plugin":"memory","plugin_instance":"","type":"percent","type_instance":"slab_recl"}]
[{"values":[0.907258064516129],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"user"}]
[{"values":[0.705645161290323],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"system"}]
[{"values":[0],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"wait"}]
[{"values":[37.7016129032258],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"nice"}]
[{"values":[0],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"interrupt"}]
[{"values":[0.30241935483871],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"softirq"}]
[{"values":[0],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"steal"}]
[{"values":[60.383064516129],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"0","type":"percent","type_instance":"idle"}]
[{"values":[0.709939148073022],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"1","type":"percent","type_instance":"user"}]
[{"values":[0.608519269776876],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"1","type":"percent","type_instance":"system"}]
[{"values":[0],"dstypes":["gauge"],"dsnames":["value"],"time":1444745144.824,"interval":10.000,"host":"xx.example.internal","plugin":"cpu","plugin_instance":"1","type":"percent","type_instance":"wait"}]

Thanks
D.

Installation Error

I get the following error when trying to install with pip:

  Downloading kafka_influxdb-0.8.0.tar.gz
  Running setup.py (path:/tmp/pip-build-9OWeIW/kafka-influxdb/setup.py) egg_info for package kafka-influxdb
    Traceback (most recent call last):
      File "<string>", line 17, in <module>
      File "/tmp/pip-build-9OWeIW/kafka-influxdb/setup.py", line 15, in <module>
        long_description = open('README.md').read()
    IOError: [Errno 2] No such file or directory: 'README.md'
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):

  File "<string>", line 17, in <module>

  File "/tmp/pip-build-9OWeIW/kafka-influxdb/setup.py", line 15, in <module>

    long_description = open('README.md').read()

IOError: [Errno 2] No such file or directory: 'README.md'

----------------------------------------
Cleaning up...
Command python setup.py egg_info failed with error code 1 in /tmp/pip-build-9OWeIW/kafka-influxdb
Storing debug log for failure in /root/.pip/pip.log
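The traceback above shows setup.py failing because README.md is not present in the downloaded archive. One common way to make such a setup script tolerant of a missing file is sketched below; the helper name read_long_description and its default argument are hypothetical, and this is not necessarily how the project resolved the issue. Listing the file in MANIFEST.in so that it ships with the source distribution is the other usual remedy.

```python
import io


def read_long_description(path="README.md", default=""):
    """Return the file's text, or `default` if the file cannot be read."""
    try:
        with io.open(path, encoding="utf-8") as handle:
            return handle.read()
    except IOError:
        # The file is not part of the archive; fall back instead of
        # aborting the whole installation.
        return default


# In setup.py this could be passed as setup(long_description=...).
long_description = read_long_description("README.md", default="Kafka consumer for InfluxDB")
```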

Add version flag

It's useful to know which version of kafka-influxdb is currently running. We should add an option to show the current version, e.g. kafka_influxdb --version.

Warnings and Errors on running kafka-influxdb

Hi there. I am new to Kafka and I'm trying to use kafka-influxdb to store data coming in from my kafka environment. This is my config-example.yaml file

kafka:
  host: "kafka_environment.net"
  port: 9092
  topic: "topicname"
  group: "kafka-influxdb"
influxdb:
  host: "influxdb"
  port: 8086
  user: "root"
  password: "root"
  dbname: "mydb"
  use_ssl: false
  verify_ssl: False
  timeout: 5
  use_udp: False
  retention_policy: "default"
  time_precision: "s"

encoder: "collectd_json_encoder"
benchmark: false
buffer_size: 1000
statistics: true

When I try to run it, I get the following error:

Traceback (most recent call last):
  File "/usr/bin/kafka_influxdb", line 11, in <module>
    sys.exit(main())
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/kafka_influxdb.py", line 102, in main
    start_consumer(config)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/kafka_influxdb.py", line 138, in start_consumer
    client.consume()
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/kafka_influxdb.py", line 35, in consume
    self.buffer.extend(self.encoder.encode(raw_message))
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/encoder/collectd_json_encoder.py", line 27, in encode
    raise NotImplemented
TypeError: exceptions must be old-style classes or derived from BaseException, not NotImplementedType

I would really appreciate some input toward the right direction regarding this issue.
Thanks.
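The TypeError in the traceback above comes from the statement raise NotImplemented: NotImplemented is a special constant used by comparison methods, not an exception, so Python refuses to raise it. The exception class intended for unfinished methods is NotImplementedError. The sketch below shows the difference; StubEncoder and try_encode are hypothetical names used only for illustration and are not taken from the project.

```python
class StubEncoder(object):
    """Placeholder encoder whose encode() has not been written yet."""

    def encode(self, msg):
        # NotImplementedError is a real exception class and can be raised.
        raise NotImplementedError("encode() is not implemented for this format")


def try_encode(encoder, msg):
    """Call encoder.encode() and report an unfinished encoder cleanly."""
    try:
        return encoder.encode(msg)
    except NotImplementedError as error:
        return "unsupported: {}".format(error)


def raise_constant():
    # Reproduces the bug from the traceback: this line itself fails
    # with TypeError because NotImplemented is not an exception.
    raise NotImplemented


print(try_encode(StubEncoder(), b"{}"))
```

With the exception class in place, callers see a clear "not implemented" failure rather than a confusing TypeError.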

Unmatched data kills the process

Hello @mre, thank you for the brilliant code. I was able to get it running for testing with no effort.

I wanted to know your thoughts on input that does not match the encoder settings. Should the process die like it does now, or possibly just print something?

there isnt a docker image for this project

Can you please create a stable public Docker image for this project? There isn't one that seems to be maintained at the moment.

This is not a bug or issue, but a request to create and publish a Docker image.

Kerberos Support

Can this package authenticate via Kerberos?

Thanks for your answers!

Cannot find encoder

root@c358466be7e1:/home/kafka-influxdb# /usr/local/bin/kafka_influxdb -c config.yaml
Reading config file config.yaml
Traceback (most recent call last):
  File "/usr/local/bin/kafka_influxdb", line 9, in <module>
    load_entry_point('kafka-influxdb==0.7.2', 'console_scripts', 'kafka_influxdb')()
  File "/usr/local/lib/python2.7/dist-packages/kafka_influxdb/kafka_influxdb.py", line 102, in main
    start_consumer(config)
  File "/usr/local/lib/python2.7/dist-packages/kafka_influxdb/kafka_influxdb.py", line 118, in start_consumer
    encoder = load_encoder(config.encoder)
  File "/usr/local/lib/python2.7/dist-packages/kafka_influxdb/encoder/__init__.py", line 9, in load_encoder
    encoder_module = importlib.import_module(modname)
  File "/usr/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named kafka_influxdb.encoder.collectd_json_encoder
root@c358466be7e1:/home/kafka-influxdb# find / -name collectd_json_encoder*
/usr/local/lib/python2.7/dist-packages/kafka_influxdb/encoder/collectd_json_encoder.py
/usr/local/lib/python2.7/dist-packages/kafka_influxdb/encoder/collectd_json_encoder.pyc

Boolean Properties Changed To Strings

screen shot 2015-10-20 at 17 02 11

Hi,

When I try to run kafka-influxdb without a config.yaml file and pass settings on the command line instead, I have a problem with boolean values.
I'm working with SSL, so I want to pass --influxdb_use_ssl=True, but the log shows that this parameter is received as the string 'True' rather than the boolean True. The relevant screenshot is attached.
As a result, it tries to connect over HTTP although I want to connect over HTTPS.

If I use config.yaml, everything works.

For now, I just changed the default of this property to True, and it works OK.

Thanks,

Nitz
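The behaviour described above is typical of command-line parsers that keep option values as text: the string 'True' is not the boolean True, and argparse's type=bool does not help either, because bool("False") is also True. A minimal sketch of one way to parse such a value explicitly follows. The helper str_to_bool is hypothetical, and how kafka-influxdb actually defines this option is not shown in this page; the option table above lists --influxdb_use_ssl without a value, which suggests a plain flag.

```python
import argparse


def str_to_bool(value):
    """Convert common textual booleans to bool; reject anything else."""
    normalized = value.strip().lower()
    if normalized in ("true", "1", "yes", "on"):
        return True
    if normalized in ("false", "0", "no", "off"):
        return False
    raise argparse.ArgumentTypeError("expected a boolean, got {!r}".format(value))


parser = argparse.ArgumentParser()
# The converter runs on the raw string before it is stored.
parser.add_argument("--influxdb_use_ssl", type=str_to_bool, default=False)

args = parser.parse_args(["--influxdb_use_ssl=True"])
print(type(args.influxdb_use_ssl), args.influxdb_use_ssl)
```

An alternative design is action="store_true", where the option takes no value at all and its presence alone means True.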

Exception when running kafka-influxdb

Issue

  • Writes to influxdb fail. I find no information when I query influxdb.
  • Running into exceptions when using kafka-influxdb (see log excerpts below).

Log Excerpt
Exception caught while flushing: write_points_with_precision() takes at most 3 arguments (5 given)
Flush 17. Buffer size 1000. Parsed 17000 kafka mes. into 17000 datapoints. Speed: 10077 kafka mes./sec
Exception caught while flushing: write_points_with_precision() takes at most 3 arguments (5 given)
Flush 18. Buffer size 1000. Parsed 18000 kafka mes. into 18000 datapoints. Speed: 10095 kafka mes./sec
Exception caught while flushing: write_points_with_precision() takes at most 3 arguments (5 given)
Flush 19. Buffer size 1000. Parsed 19000 kafka mes. into 19000 datapoints. Speed: 10144 kafka mes./sec
Exception caught while flushing: write_points_with_precision() takes at most 3 arguments (5 given)
Flush 20. Buffer size 1000. Parsed 20000 kafka mes. into 20000 datapoints. Speed: 10209 kafka mes./sec

Code Reference
https://github.com/mre/kafka-influxdb/blob/master/kafka_influxdb.py#L119

Producer
Using kafka-influxdb, I am consuming from kafka topic written by the collectd write_kafka plugin.

More Info
I get an exception about write_points_with_precision() (which is deprecated), but I noticed that you are using write_points, so I'm not sure where the issue is yet. I will have to dig through the code further to see what I am missing. Let me know if any other info is required.

Values encoded as Strings into Influxdb 0.9x

On my setup using CentOS 6, the Graphite encoder encodes all values as strings into InfluxDB.

As a temporary workaround, I edited collectd_graphite_encoder.py and commented out the lines that add the quotes:

    def escape_value(self, value):
        value = self.escape_measurement(value)
        # if isinstance(value, text_type):
        #    return "\"{}\"".format(value.replace(
        #                "\"", "\\\""
        #    ))
        # else:
        return str(value)
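In InfluxDB line protocol, a field value wrapped in double quotes is stored as a string, while an unquoted number is stored as a float, which is why removing the quoting changes the stored type. A gentler variant of the workaround above would quote only values that are not numeric. The sketch below is an illustration under that assumption; format_field_value is a hypothetical helper and not the project's escape_value.

```python
import math


def format_field_value(value):
    """Render a field value: finite numbers unquoted, everything else quoted."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        number = None

    # NaN and infinity are kept out of the numeric branch and quoted instead.
    if number is not None and not (math.isnan(number) or math.isinf(number)):
        return repr(number)

    # Escape backslashes first, then double quotes, before wrapping.
    escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
    return '"{}"'.format(escaped)


print(format_field_value("0.45"))
print(format_field_value("idle"))
```

Note that a measurement whose field was first written as a string cannot later receive floats for the same field in the same shard, so existing data may need attention after such a change.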

multiple consumer support

hi @mre

Wondering if this supports multiple processes. I'm not able to use kafka-influx with 2 processes using 2 configs.
i have the same topic in kafka that i'm trying to consume and send data to two databases in influxdb.

/usr/local/bin/kafka_influxdb -c /etc/kafka-influxdb/config_example.yaml < !! this works
/usr/local/bin/kafka_influxdb -c /etc/kafka-influxdb/lab.yaml < !! this doesn’t work

root@:~# more /etc/kafka-influxdb/config_example.yaml

kafka:
..
topic: "acct1"
group: "kafka-influxdb"
influxdb:
host: "localhost"
port: 8086
...
encoder: "kafka_influxdb.encoder.pmacct_encoder"
benchmark: false
buffer_size: 1000
statistics: true

root@:~# more /etc/kafka-influxdb/lab.yaml

kafka:
topic: "acct1"
group: "1"
influxdb:
host: "localhost"
port: 8086
..
encoder: "kafka_influxdb.encoder.pmacct_encoder"
benchmark: false
buffer_size: 10000
statistics: true

I can see that the reader gets the message from Kafka (I print the measurements in the encoder), but it never gets passed to the writer (a print msg in the influxdb writer's write method shows that write never gets called).

Thx.
Catalin

Unexpected behaviour with influxdb 0.13

To reproduce, set up an influxdb 0.13 server and run kafka_influxdb in benchmark mode.
kafka-influxdb output:

WARNING:root:Cannot write data points: 400: {"error":"unable to parse '2': missing fields\nunable to parse '6': missing fields\nunable to parse 'f': missing fields\nunable to parse '2': missing fields\nunable to parse 'f': missing fields\nunable to parse 'c': missing fields\nunable to parse '9': missing fields\nunable to parse '1': missing fields\nunable to parse '8': missing fields\nunable to parse 'f': missing fields\nunable to parse '5': missing fields\nunable to parse '0': missing fields\nunable to parse '.': missing fields\nunable to parse 'l': missing fields\nunable to parse 'o': missing fields\nunable to parse 'a': missing fields\nunable to parse 'd': missing fields\nunable to parse '.': missing fields\nunable to parse 'l': missing fields\nunable to parse 'o': missing fields\nunable to parse 'a': missing fields\nunable to parse 'd': missing fields\nunable to parse '.': missing fields\nunable to parse 's': missing fields\nunable to parse 'h': missing fields\nunable to parse 'o': missing fields\nunable to parse 'r': missing fields\nunable to parse 't': missing fields\nunable to parse 't': missing fields\nunable to parse 'e': missing fields\nunable to parse 'r': missing fields\nunable to parse 'm': missing fields\nunable to parse '0': missing fields\nunable to parse '.': missing fields\nunable to parse '0': missing fields\nunable to parse '5': missing fields\nunable to parse '1': missing fields\nunable to parse '4': missing fields\nunable to parse '3': missing fields\nunable to parse '6': missing fields\nunable to parse '3': missing fields\nunable to parse '5': missing fields\nunable to parse '7': missing fields\nunable to parse '6': missing fields\nunable to parse '3': missing fields\nunable to parse '0': missing fields"}

Influxdb log excerpt:


2
6
f
2
f
c
9
1
8
f
5
0
.
l
o
a
d
.
l
o
a
d
.
l
o
n
g
t
e
r
m

0
.
0
5

1
4
3
6
3
5
7
6
3
0
...

kafka-simple-consumer:

26f2fc918f50.load.load.shortterm 0.05 1436357630
26f2fc918f50.load.load.midterm 0.05 1436357630
26f2fc918f50.load.load.longterm 0.05 1436357630

Note: upgraded the python influxdb client (3.0.0), same issue.

Greetings,

Alexis
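The error above reports one "missing fields" failure per character, and the InfluxDB log shows one character per line. That pattern is what you would see if a single string were handed to code that expects a list of lines, since iterating over a string yields its characters. The sketch below reproduces that symptom in isolation. It is a guess at the mechanism, not a diagnosis confirmed by this thread, and join_points and as_point_list are hypothetical helpers rather than functions from kafka-influxdb or the influxdb client.

```python
def join_points(points):
    """Join data points into a newline-separated request body."""
    return "\n".join(points)


def as_point_list(points):
    """Wrap a lone string in a list so it is treated as one data point."""
    if isinstance(points, str):
        return [points]
    return list(points)


line = "26f2fc918f50.load.load.shortterm 0.05 1436357630"

# Passing the string itself iterates over its characters.
broken_body = join_points(line)
# Passing a list keeps the line intact.
correct_body = join_points(as_point_list(line))

print(broken_body.split("\n")[:4])
print(correct_body)
```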

Run kafka-influxdb as daemon by default

As of now, daemonizing kafka-influxdb is tricky. It can be done with kafka_influxdb &
We should also add a -d flag that puts the program into the background.

Performance for write to influxDB with kafka connector

Hi,

I started using the Kafka connector to write streaming data to InfluxDB. I deployed a single InfluxDB node, and as soon as the connector starts, all messages are written to InfluxDB. Any query I then execute crashes because there is too much data.

Is there a parameter in the Kafka connector that limits the number of messages coming from Kafka, and therefore the number of inserts into my database?

Thanks in advance

Installation with pip3

Hi,

I installed kafka-influxdb successfully on my own VMs with Python 2. But when I tried to install it in another environment, I downloaded the master branch, unzipped it, and tried to run "pip3 install kafka-influxdb", which reports:

[root@dhcp-10-173-66-167 kafka-influxdb-master]# pip3 install kafka_influxdb
Collecting kafka_influxdb
  Could not find a version that satisfies the requirement kafka_influxdb (from versions: )
No matching distribution found for kafka_influxdb

I checked with my pip3 version:
[root@dhcp-10-173-66-167 kafka_influxdb]# pip3 install --upgrade pip
Requirement already up-to-date: pip in /usr/local/lib/python3.6/site-packages
You have mail in /var/spool/mail/root
[root@dhcp-10-173-66-167 kafka_influxdb]# pip3 -V
pip 9.0.1 from /usr/local/lib/python3.6/site-packages (python 3.6)

And I'm running on CentOS Linux release 7.4.1708 and Linux 3.10.0-693.11.6.el7.x86_64.

Can you help me out of this?
Thanks in advance!

Shawn.

typo in README

There is a typo error in the README under the installation section,

Installation
pip install kafka_influxdb
kafka_influxdb -c config-example.yaml

the file name is config_example.yaml , note the filename has _ , readme uses -

Installation issue

➜  kafka-influxdb git:(master) ✗ make RUNTIME=py3
python setup.py test
usage: setup.py [global_opts] cmd1 [cmd1_opts] [cmd2 [cmd2_opts] ...]
   or: setup.py --help [cmd1 cmd2 ...]
   or: setup.py --help-commands
   or: setup.py cmd --help

error: invalid command 'pytest'
make: *** [test] Error 1

Then I tried pip3 install -r requirements.txt and got the error below:
....
.....

Collecting futures==3.2.0 (from -r requirements.txt (line 12))
  Could not find a version that satisfies the requirement futures==3.2.0 (from -r requirements.txt (line 12)) (from versions: 0.2.python3, 0.1, 0.2, 1.0, 2.0, 2.1, 2.1.1, 2.1.2, 2.1.3, 2.1.4, 2.1.5, 2.1.6, 2.2.0, 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.0.4, 3.0.5, 3.1.0, 3.1.1)
No matching distribution found for futures==3.2.0 (from -r requirements.txt (line 12))

pip install kafka_influxdb on CentOS7 failed

I tried installing kafka-influxdb using pip, and it failed. But when I downloaded the code and ran python setup.py install, it worked.

[root@centos-agent ~]# pip install kafka_influxdb
Collecting kafka_influxdb
Downloading kafka_influxdb-0.7.2.tar.gz
Collecting certifi==2015.9.6.2 (from kafka_influxdb)
Downloading certifi-2015.9.6.2-py2.py3-none-any.whl (371kB)
100% |████████████████████████████████| 378kB 184kB/s
Collecting funcsigs==0.4 (from kafka_influxdb)
Downloading funcsigs-0.4-py2.py3-none-any.whl
Collecting influxdb==2.9.2 (from kafka_influxdb)
Downloading influxdb-2.9.2-py2.py3-none-any.whl (256kB)
100% |████████████████████████████████| 266kB 259kB/s
Collecting kafka-python==0.9.4 (from kafka_influxdb)
Downloading kafka-python-0.9.4.tar.gz (63kB)
100% |████████████████████████████████| 71kB 47kB/s
Collecting mock==1.3.0 (from kafka_influxdb)
Downloading mock-1.3.0-py2.py3-none-any.whl (56kB)
100% |████████████████████████████████| 61kB 162kB/s
Collecting nose==1.3.7 (from kafka_influxdb)
Downloading nose-1.3.7-py2-none-any.whl (154kB)
100% |████████████████████████████████| 163kB 88kB/s
Collecting pbr==1.8.0 (from kafka_influxdb)
Downloading pbr-1.8.0-py2.py3-none-any.whl (87kB)
100% |████████████████████████████████| 92kB 132kB/s
Collecting python-dateutil==2.4.2 (from kafka_influxdb)
Downloading python_dateutil-2.4.2-py2.py3-none-any.whl (188kB)
100% |████████████████████████████████| 194kB 70kB/s
Collecting pytz==2015.6 (from kafka_influxdb)
Downloading pytz-2015.6-py2.py3-none-any.whl (475kB)
100% |████████████████████████████████| 481kB 61kB/s
Collecting PyYAML==3.11 (from kafka_influxdb)
Downloading PyYAML-3.11.zip (371kB)
100% |████████████████████████████████| 378kB 64kB/s
Collecting requests==2.8.0 (from kafka_influxdb)
Downloading requests-2.8.0-py2.py3-none-any.whl (476kB)
100% |████████████████████████████████| 481kB 61kB/s
Requirement already satisfied (use --upgrade to upgrade): six>=1.9.0 in /usr/lib/python2.7/site-packages/six-1.10.0-py2.7.egg (from kafka_influxdb)
Collecting virtualenv==13.1.2 (from kafka_influxdb)
Downloading virtualenv-13.1.2-py2.py3-none-any.whl (1.7MB)
100% |████████████████████████████████| 1.7MB 63kB/s
Collecting wheel==0.24.0 (from kafka_influxdb)
Downloading wheel-0.24.0-py2.py3-none-any.whl (63kB)
100% |████████████████████████████████| 71kB 105kB/s
Building wheels for collected packages: kafka-influxdb, kafka-python, PyYAML
Running setup.py bdist_wheel for kafka-influxdb ... done
Stored in directory: /root/.cache/pip/wheels/d0/ab/21/e55a9338b9fd3809c5e919d9763edc60225c08e5e0a41c2351
Running setup.py bdist_wheel for kafka-python ... done
Stored in directory: /root/.cache/pip/wheels/1f/13/49/c9e3e163f95d492cd2835a975b0733b98a943dbad800621d13
Running setup.py bdist_wheel for PyYAML ... done
Stored in directory: /root/.cache/pip/wheels/4a/bf/14/d79994d19a59d4f73efdafb8682961f582d45ed6b459420346
Successfully built kafka-influxdb kafka-python PyYAML
Installing collected packages: certifi, funcsigs, requests, pytz, python-dateutil, influxdb, kafka-python, pbr, mock, nose, PyYAML, virtualenv, wheel, kafka-influxdb
Found existing installation: wheel 0.29.0
Uninstalling wheel-0.29.0:
Successfully uninstalled wheel-0.29.0
Successfully installed PyYAML-3.11 certifi-2015.9.6.2 funcsigs-0.4 influxdb-2.9.2 kafka-influxdb-0.7.2 kafka-python-0.9.4 mock-1.3.0 nose-1.3.7 pbr-1.8.0 python-dateutil-2.4.2 pytz-2015.6 requests-2.8.0 virtualenv-13.1.2 wheel-0.24.0

[root@centos-agent kafka_influxdb]# kafka_influxdb -c kafka_influxdb_conf.yaml
Reading config file kafka_influxdb_conf.yaml
Traceback (most recent call last):
  File "/usr/bin/kafka_influxdb", line 11, in <module>
    sys.exit(main())
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/kafka_influxdb.py", line 102, in main
    start_consumer(config)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/kafka_influxdb.py", line 118, in start_consumer
    encoder = load_encoder(config.encoder)
  File "/usr/lib/python2.7/site-packages/kafka_influxdb/encoder/__init__.py", line 9, in load_encoder
    encoder_module = importlib.import_module(modname)
  File "/usr/lib64/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named kafka_influxdb.encoder.collectd_graphite_encoder

Integrating kafka influxdb with my own collector and kafka environment

I have my own collector that sends data to Kafka in one environment, and I want to integrate Kafka-InfluxDB with it. I just want to verify the usage of Kafka-InfluxDB.
I have created the config_example.yaml file and set the parameters there to point to my Kafka environment. However, I am not sure what I should put in the encoder field.
In order to consume the data from Kafka and push it into InfluxDB, do I just need to run this command? - kafka_influxdb -c config-example.yaml

I am asking because I am aware that there is a Docker container that runs collectd, Kafka, kafka-influxdb and InfluxDB together. However, I don't need this, as I already have my own collector in a separate environment.

installation issue

Installing via easy_install won't look at my local packages set via PYTHONPATH. It keeps trying to download the package, which is not allowed from the server I am using.

easy_install-2.7 --prefix='/data/mktlocal/log/LIBRARY' kafka_influxdb-0.9.3
Processing kafka_influxdb-0.9.3
Writing /data/mktlocal/log/PACKAGES/kafka_influxdb-0.9.3/setup.cfg
Running setup.py -q bdist_egg --dist-dir /data/mktlocal/log//PACKAGES/kafka_influxdb-0.9.3/egg-dist-tmp-j5ljyZ
removing '/data/mktlocal/log/LIBRARY/lib/python2.7/site-packages/kafka_influxdb-0.9.3-py2.7.egg' (and everything under it)
creating /data/mktlocal/log/LIBRARY/lib/python2.7/site-packages/kafka_influxdb-0.9.3-py2.7.egg
Extracting kafka_influxdb-0.9.3-py2.7.egg to /data/mktlocal/log/LIBRARY/lib/python2.7/site-packages
kafka-influxdb 0.9.3 is already the active version in easy-install.pth
Installing kafka_influxdb script to /data/mktlocal/log/LIBRARY/bin

Installed /data/mktlocal/log/LIBRARY/lib/python2.7/site-packages/kafka_influxdb-0.9.3-py2.7.egg
Processing dependencies for kafka-influxdb==0.9.3
Searching for ujson
Reading https://pypi.python.org/simple/ujson/
Download error on https://pypi.python.org/simple/ujson/: [Errno -2] Name or service not known -- Some packages may not be found!
Couldn't find index page for 'ujson' (maybe misspelled?)
Scanning index of all packages (this may take a while)
Reading https://pypi.python.org/simple/
Download error on https://pypi.python.org/simple/: [Errno -2] Name or service not known -- Some packages may not be found!
No local packages or download links found for ujson
error: Could not find suitable distribution for Requirement.parse('ujson')

echo $PYTHONPATH
:/python/Jinja2/2.7.3/common/2.7/lib/python/Jinja2-2.7.3-py2.7.egg/:/python/MarkupSafe/0.23/common/MarkupSafe-0.23-py2.7-linux_x86_64.egg/:/python/matplotlib/1.2.1/exec/matplotlib-1.2.1-py2.6-linux-x86_64.egg/:/python/pandas/0.17.1/common/pandas-0.17.1-py2.7-linux_x86_64.egg/:/python/numpy/1.9.2/exec/numpy-1.9.2-py2.7-linux-x86_64.egg/:/python/six/1.10.0/common/six-1.10.0-py2.7.egg:/python/requests/2.10.0/common/requests-2.10.0-py2.7.egg:/python/bokeh/0.10.0/common/bokeh-0.10.0-py2.7.egg:/python/setuptools/12.4/common/setuptools-12.4-py2.7.egg:/python/pycrypto/2.6.1_qz1/common/pycrypto-2.6.1+qz1-py2.7-linux_x86_64.egg:/python/ecdsa/0.13/common/ecdsa-0.13-py2.7.egg:/python/paramiko/1.15.2/common/paramiko-1.15.2-py2.7.egg:/python/lxml/3.5.0/exec/2.7/lib/python/lxml-3.5.0-py2.7-linux-x86_64.egg:/python/beautifulsoup4/4.4.1/common/beautifulsoup4-4.4.1-py2.7.egg:/python/django-tagging/0.4.5/common/2.7/lib/python/django_tagging-0.4.5-py2.7.egg::/python/urllib3/1.15.1/common/urllib3-1.15.1-py2.7.egg:/python/scandir/1.5/exec/2.7/lib/python/scandir-1.5-py2.7-linux-x86_64.egg:/python/pyparsing/2.2.0/common/2.7/lib/python/pyparsing-2.2.0-py2.7.egg-info:/python/ujson/1.35/exec/ujson-1.35-py2.7-linux_x86_64.egg:/data/mktlocal/log/LIBRARY/lib/python2.7/site-packages

[Question] PyPy compatibility?

Anyone know if kafka-influxdb is compatible with PyPy?
I was doing a load test the other day and was seeing 30-35% cpu use (single cpu VM), so I'm thinking that if PyPy was supported then that might give me some "better" performance under heavy load.

Throttling Messages - kafka_influxdb connector > Influxdb

Hello,

I know it was previously mentioned that throttling messages is outside the scope of this project in #50 but I really think it's closely related. Kafka as a whole is utilized heavily to reduce / buffer load due to high traffic volume. I believe adding those parameters to the kafka_influxdb connector only completes the process.

Telegraf (love this tool) > Kafka > Kafka_influxdb connector > LB (nginx) > Influxdb.

The folks at Influx decided to close source InfluxDB and stopped offering clustering since version 0.9.6.1. I need clustering, so I am using the previously mentioned version of InfluxDB, which limits the number of nodes within the cluster to 4.

I'm running the cluster in OpenStack: 4 nodes, 8 vCPUs, 32GB of RAM. Due to the volume of traffic from various nodes (I haven't been able to calculate the exact amount of traffic) being sent directly to InfluxDB, the system became unresponsive. I added Kafka to reduce the load and to act as a buffer for the metrics if I had an outage or an excessive number of metrics at once.

At the moment I am attempting to use the following parameters from Telegraf (see below). The goal is to send 700 lines of data to Kafka, which will then be written to InfluxDB, in conjunction with the following line from the kafka_influxdb connector: buffer_size: 700.

What's the purpose of the buffer_size parameter if it's not for throttling?

telegraf_metric_batch_size: 700
telegraf_metric_buffer_limit: 701
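
To make the distinction in the question concrete, here is a minimal sketch contrasting batching with throttling. It assumes that buffer_size means "number of messages collected before one write to InfluxDB"; the class names BatchBuffer and Throttle are hypothetical and are not part of kafka_influxdb. Batching reduces the number of write requests, but it does not cap how fast batches are sent; a throttle does.

```python
import time


class BatchBuffer:
    """Collects messages and hands them over in batches of `buffer_size`."""

    def __init__(self, buffer_size, flush):
        self.buffer_size = buffer_size
        self.flush = flush  # callable that writes one batch
        self.messages = []

    def add(self, message):
        self.messages.append(message)
        if len(self.messages) >= self.buffer_size:
            self.flush(self.messages)
            self.messages = []  # start a fresh batch


class Throttle:
    """Enforces a minimum delay between calls, which caps the write rate."""

    def __init__(self, min_interval_s, clock=time.monotonic, sleep=time.sleep):
        self.min_interval_s = min_interval_s
        self.clock = clock
        self.sleep = sleep
        self.last_call = None

    def wait(self):
        if self.last_call is not None:
            remaining = self.min_interval_s - (self.clock() - self.last_call)
            if remaining > 0:
                self.sleep(remaining)
        self.last_call = self.clock()
```

In this sketch a writer would call Throttle.wait() inside the flush callable if a rate limit is wanted on top of batching.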

Add option to write to logfile

All output is currently written to stdout. There should be a way to specify a logfile in the config and on the command line.
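
One way this could look, as a rough sketch using only the standard library. The --logfile option is hypothetical and not an existing kafka_influxdb flag; the force argument of logging.basicConfig needs Python 3.8 or newer.

```python
import argparse
import logging


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # Hypothetical option; not an existing kafka_influxdb flag.
    parser.add_argument("--logfile", default=None,
                        help="write log output to this file instead of the console")
    return parser.parse_args(argv)


def configure_logging(logfile=None, level=logging.INFO):
    # With filename=None, basicConfig attaches a console handler instead.
    # force=True replaces handlers installed by an earlier basicConfig call.
    logging.basicConfig(filename=logfile, level=level, force=True)
```

The same value could also be read from the YAML config and passed to configure_logging, with the command-line option taking precedence.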

DEBUG:root:None?

I started an InfluxDB using Docker as described at:
https://hub.docker.com/_/influxdb/
but I'm unable to load anything from a Kafka topic "connect-test".
This is the verbose output:

# kafka_influxdb -vvv  -c config.yaml
Reading config file config.yaml
DEBUG:root:Initializing Kafka Consumer
DEBUG:root:Initializing connection to InfluxDB at localhost:8086
INFO:root:Connecting to InfluxDB at localhost:8086 (SSL: False, UDP: False)
DEBUG:root:Initializing message encoder: kafka_influxdb.encoder.collectd_graphite_encoder
INFO:root:Creating InfluxDB database if not exists: metrics
DEBUG:urllib3.connectionpool:Starting new HTTP connection (1): localhost
DEBUG:urllib3.connectionpool:http://localhost:8086 "GET /query?q=CREATE+DATABASE+%22metrics%22&db=metrics HTTP/1.1" 200 None
INFO:root:Listening for messages on Kafka topic connect-test...
INFO:root:Connecting to Kafka with the following settings:
 {'default.topic.config': {'auto.offset.reset': 'largest'}, 'bootstrap.servers': 'localhost:9092', 'offset.store.method': 'broker', 'group.id': 'kafka-influxdb'}...
DEBUG:root:<cimpl.Message object at 0x2268210>
INFO:root:connect-test [0] reached end at offset 524608 with key None

DEBUG:root:None
DEBUG:root:None
DEBUG:root:None
DEBUG:root:None
...

The database is created, but then I only get the "DEBUG:root:None" message endlessly.
I also tried resetting the offsets of my Kafka topic, but with no result.
Any help?

cannot query the ingested data

I'm trying to configure collectd->kafka->kafka-influxdb->influxdb->grafana

collectd is correctly configured to report to graphite but I wish to use kafka/influxdb instead.

I publish to Kafka, pick that up in kafka-influxdb, and something shows up in InfluxDB, but I cannot figure out what is wrong since I cannot even query the data. (On the other hand, I am new to InfluxDB...)

this is (some of) what the kafka console consumer shows

f014-520-kafka.internal.machines.smart-sda.smart_powercycles 35 1447706286
f014-520-kafka.internal.machines.thermal-cooling_device1.gauge 0 1447706286
f014-520-kafka.internal.machines.thermal-cooling_device3.gauge 0 1447706286
f014-520-kafka.internal.machines.GenericJMX.gauge-kafka-X-UnderReplicatedPartitions- 0 1447706286

from collectd.conf

<Plugin write_kafka>
   Property "metadata.broker.list" "{{ collectd_kafka_broker_list }}"
<Topic "collectd.graphite">
   Format Graphite
</Topic>
</Plugin>
#kafka-influxdb.config
kafka:
  host: "f013-520-kafka.internal.machines"
  port: 9092
  topic: "collectd.graphite"
  group: "kafka-influxdb.{{ inventory_hostname }}"
influxdb:
  host: "localhost"
  port: 8086
  user: "root"
  password: "root"
  dbname: "metrics"
  use_ssl: false
  verify_ssl: False
  timeout: 5
  use_udp: False
  retention_policy: "default"
  time_precision: "s"
benchmark: false
buffer_size: 1000
statistics: true

and finally in influx tool

use metrics
show measurements;

internal_machines_thermal-cooling_device7_gauge
internal_machines_thermal-cooling_device8_gauge
internal_machines_thermal-cooling_device9_gauge

select * from internal_machines_thermal-cooling_device9_gauge
ERR: error parsing query: found -, expected SELECT, DELETE, SHOW, CREATE, DROP, GRANT, REVOKE, ALTER, SET at line 1, char 40

Do I have to do anything else?

any hints appreciated...
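
The parse error above points at the hyphen in the measurement name. In InfluxQL, identifiers that contain characters such as "-" have to be wrapped in double quotes. A small sketch of building such a query in Python; quote_identifier is a hypothetical helper, not part of any InfluxDB client library.

```python
def quote_identifier(name):
    """Wrap an InfluxQL identifier in double quotes, escaping backslashes and quotes."""
    escaped = name.replace("\\", "\\\\").replace('"', '\\"')
    return '"{}"'.format(escaped)


measurement = "internal_machines_thermal-cooling_device9_gauge"
query = "SELECT * FROM {}".format(quote_identifier(measurement))
print(query)
```

Typing the quoted form directly in the influx tool should work the same way.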

Need path argument for InfluxDB to connect

In most cases, the host and port alone aren't enough to connect to InfluxDB, because the URL contains a path that is needed to reach the InfluxDB instance. Please introduce a path argument; it can be optional.

a sample influx url is

curl -G "http://<ipaddress>:<port>/api/v1/proxy/somefolder/anotherfolder/services/influxdb:influxdb/query?pretty=true" --data-urlencode "q=show databases"
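
A sketch of how an optional path prefix could be folded into the request URL, using only the standard library. The function name build_influxdb_url and its parameters are hypothetical; this is not how kafka_influxdb builds its URLs today.

```python
from urllib.parse import quote, urlencode


def build_influxdb_url(host, port, endpoint, path="", use_ssl=False, **params):
    """Join scheme, host, port, an optional path prefix and the endpoint."""
    scheme = "https" if use_ssl else "http"
    # Normalise the prefix so leading and trailing slashes do not matter.
    prefix = "/".join(quote(part, safe=":")
                      for part in path.strip("/").split("/") if part)
    parts = [p for p in (prefix, endpoint.strip("/")) if p]
    url = "{}://{}:{}/{}".format(scheme, host, port, "/".join(parts))
    if params:
        url += "?" + urlencode(params)
    return url
```

With an empty path the result is the usual http://host:port/query form, so the argument can stay optional.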

python3 compatibility of echo_encoder

Using "kafka_influxdb.encoder.echo_encoder" with python3 gives the following error:
WARNING:root:Cannot write data points: sequence item 0: expected str instance, bytes found
Decoding msg solves the issue.
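
A minimal sketch of the fix described above. It assumes the encoder interface is a class named Encoder with an encode(msg) method that returns a list of line-protocol strings; check the existing encoders in the repository for the exact interface.

```python
class Encoder(object):
    """Pass line-protocol messages through unchanged, as text."""

    def encode(self, msg):
        # Kafka payloads usually arrive as bytes. On Python 3, bytes and str
        # cannot be joined together, so decode before returning.
        if isinstance(msg, bytes):
            msg = msg.decode("utf-8")
        return [msg]
```

The isinstance check means text input passes through unchanged, so the encoder accepts either type.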

Support for json format

Hello,

It would be nice to have an encoder that supports JSON and lets you choose which keys are tags, which are values, and which is the time field.

For example, in the JSON below:

{
 "interface_id": 6939,
 "bytes": 13780000,
 "interface_name": "eth0",
 "time": "1470920344.173186"
}

we can choose interface_id and interface_name as tags and bytes as the value, resulting in:
example_measurement,interface_id=6939,interface_name=eth0 bytes=13780000 1470920344.173186
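
A rough sketch of what such an encoder could do. All names here (json_to_line, tag_keys, field_keys, time_key) are hypothetical. Two assumptions are made: integer values are written as integer fields (with the "i" suffix), and the time value is epoch seconds, which is converted to integer nanoseconds because line-protocol timestamps must be integers.

```python
import json
from decimal import Decimal


def escape_tag(value):
    """Escape the characters that are special in tag values."""
    return str(value).replace(",", "\\,").replace(" ", "\\ ").replace("=", "\\=")


def format_field(value):
    if isinstance(value, bool):  # check bool first, it is a subclass of int
        return "true" if value else "false"
    if isinstance(value, int):
        return "{}i".format(value)
    if isinstance(value, float):
        return repr(value)
    return '"{}"'.format(str(value).replace("\\", "\\\\").replace('"', '\\"'))


def json_to_line(measurement, payload, tag_keys, field_keys, time_key="time"):
    data = json.loads(payload)
    tags = "".join(",{}={}".format(k, escape_tag(data[k])) for k in tag_keys)
    fields = ",".join("{}={}".format(k, format_field(data[k])) for k in field_keys)
    # Epoch seconds to integer nanoseconds; Decimal avoids float rounding.
    timestamp = int(Decimal(str(data[time_key]).strip()) * 10**9)
    return "{}{} {} {}".format(measurement, tags, fields, timestamp)


payload = ('{"interface_id": 6939, "bytes": 13780000, '
           '"interface_name": "eth0", "time": "1470920344.173186"}')
print(json_to_line("example_measurement", payload,
                   tag_keys=["interface_id", "interface_name"],
                   field_keys=["bytes"]))
# example_measurement,interface_id=6939,interface_name=eth0 bytes=13780000i 1470920344173186000
```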

Add comparison with Telegraf to README

As it is possible to use Telegraf with its kafka_consumer input to transfer from Kafka to InfluxDB, it would be helpful if Telegraf was added to the comparison section with information about how it compares in terms of performance, feature set, and durability to this project.

I'm the current maintainer of https://github.com/influxdata/telegraf so I'm happy to help with any questions about Telegraf.

Support Python 3.x

Currently kafka-influxdb only supports Python 2.7.
We should also support Python 3.x using six.

No messages read from kafka

Kafka jar : kafka_2.11-0.8.2.1.jar
Python mods :
kafka-influxdb==0.5.0
kafka-python==0.9.4
config.yaml :

kafka:
  host: "localhost"
  port: 9092
  topic: "mytopic"
influxdb:
  host: "localhost"
  port: 8086
  user: "myuser"
  password: "mypwd"
  dbname: "mydb"
  retention_policy: "default"
encoder: "collectd_graphite_encoder"
benchmark: false
buffer_size: 1000
verbose: false
statistics: true

On a simple local configuration, the Kafka broker is on localhost:9092, and InfluxDB and collectd are running as well.
collectd is sending messages to Kafka (checked with kafka-console-consumer.sh).
InfluxDB is accepting requests.
However, when starting kafka_influxdb as follows (foreground mode), I get this output:
kafka_influxdb -c ./config.yaml -vvv

Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 100
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 118
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 130
INFO:root:Connecting to InfluxDB at localhost:8086...
INFO:root:Creating database mydb if not exists
INFO:root:Creating InfluxDB database if not exists: mydb
INFO:urllib3.connectionpool:Starting new HTTP connection (1): localhost
DEBUG:urllib3.connectionpool:Setting read timeout to None
DEBUG:urllib3.connectionpool:"GET /query?q=CREATE+DATABASE+mydb&db=mydb HTTP/1.1" 200 72
INFO:root:database already exists
INFO:root:Listening for messages on Kafka topic mytopic...

lsof -nn on the process gives the following output :

COMMAND     PID USER   FD   TYPE    DEVICE SIZE/OFF    NODE NAME
kafka_inf 23950 root  txt    REG       8,2  3345416  131084 /usr/bin/python2.7
kafka_inf 23950 root  mem    REG       8,2    47712 4722756 /lib/x86_64-linux-gnu/libnss_files-2.19.so
kafka_inf 23950 root  mem    REG       8,2   194174 1449890 /usr/local/lib/python2.7/dist-packages/pandas/_testing.so
kafka_inf 23950 root  mem    REG       8,2   589629 1449892 /usr/local/lib/python2.7/dist-packages/pandas/msgpack.so
kafka_inf 23950 root  mem    REG       8,2   304666 1450056 /usr/local/lib/python2.7/dist-packages/pandas/json.so
kafka_inf 23950 root  mem    REG       8,2  2005176 1449891 /usr/local/lib/python2.7/dist-packages/pandas/parser.so
kafka_inf 23950 root  mem    REG       8,2  1437878 1450065 /usr/local/lib/python2.7/dist-packages/pandas/_sparse.so
kafka_inf 23950 root  mem    REG       8,2  1653285 1450058 /usr/local/lib/python2.7/dist-packages/pandas/_period.so
kafka_inf 23950 root  mem    REG       8,2  1054219 1450055 /usr/local/lib/python2.7/dist-packages/pandas/index.so
kafka_inf 23950 root  mem    REG       8,2 12434880 1450136 /usr/local/lib/python2.7/dist-packages/pandas/algos.so
kafka_inf 23950 root  mem    REG       8,2    34256  272801 /usr/lib/python2.7/lib-dynload/_csv.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  4801036 1450057 /usr/local/lib/python2.7/dist-packages/pandas/lib.so
kafka_inf 23950 root  mem    REG       8,2  5514790 1450137 /usr/local/lib/python2.7/dist-packages/pandas/tslib.so
kafka_inf 23950 root  mem    REG       8,2  2128770  923406 /usr/local/lib/python2.7/dist-packages/numpy/random/mtrand.so
kafka_inf 23950 root  mem    REG       8,2   135687 1063785 /usr/local/lib/python2.7/dist-packages/numpy/fft/fftpack_lite.so
kafka_inf 23950 root  mem    REG       8,2     7632  272779 /usr/lib/python2.7/lib-dynload/future_builtins.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   673678  923385 /usr/local/lib/python2.7/dist-packages/numpy/linalg/_umath_linalg.so
kafka_inf 23950 root  mem    REG       8,2   244856  135734 /usr/lib/x86_64-linux-gnu/libquadmath.so.0.0.0
kafka_inf 23950 root  mem    REG       8,2    90080 4722708 /lib/x86_64-linux-gnu/libgcc_s.so.1
kafka_inf 23950 root  mem    REG       8,2  1153392  135335 /usr/lib/x86_64-linux-gnu/libgfortran.so.3.0.0
kafka_inf 23950 root  mem    REG       8,2   511344  789219 /usr/lib/libblas/libblas.so.3.0
kafka_inf 23950 root  mem    REG       8,2  5882272  792640 /usr/lib/lapack/liblapack.so.3.0
kafka_inf 23950 root  mem    REG       8,2    51744  923391 /usr/local/lib/python2.7/dist-packages/numpy/linalg/lapack_lite.so
kafka_inf 23950 root  mem    REG       8,2   102644 1207007 /usr/local/lib/python2.7/dist-packages/numpy/lib/_compiled_base.so
kafka_inf 23950 root  mem    REG       8,2   625265 1063910 /usr/local/lib/python2.7/dist-packages/numpy/core/scalarmath.so
kafka_inf 23950 root  mem    REG       8,2  2234567 1063884 /usr/local/lib/python2.7/dist-packages/numpy/core/umath.so
kafka_inf 23950 root  mem    REG       8,2  7452726 1063879 /usr/local/lib/python2.7/dist-packages/numpy/core/multiarray.so
kafka_inf 23950 root  mem    REG       8,2  1270596 1450138 /usr/local/lib/python2.7/dist-packages/pandas/hashtable.so
kafka_inf 23950 root  mem    REG       8,2    18936 4722839 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
kafka_inf 23950 root  mem    REG       8,2    30944  139649 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.1
kafka_inf 23950 root  mem    REG       8,2   136232  272793 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    54064  272798 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    33448  272783 /usr/lib/python2.7/lib-dynload/_multiprocessing.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    77752  271801 /usr/lib/python2.7/lib-dynload/parser.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   387256 4718682 /lib/x86_64-linux-gnu/libssl.so.1.0.0
kafka_inf 23950 root  mem    REG       8,2    38480  272785 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  1930528 4718681 /lib/x86_64-linux-gnu/libcrypto.so.1.0.0
kafka_inf 23950 root  mem    REG       8,2    20664  271802 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   109232  272794 /usr/lib/python2.7/lib-dynload/datetime.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  7476720  138263 /usr/lib/locale/locale-archive
kafka_inf 23950 root  mem    REG       8,2  1071552 4722731 /lib/x86_64-linux-gnu/libm-2.19.so
kafka_inf 23950 root  mem    REG       8,2   100728 4722844 /lib/x86_64-linux-gnu/libz.so.1.2.8
kafka_inf 23950 root  mem    REG       8,2    10680 4722836 /lib/x86_64-linux-gnu/libutil-2.19.so
kafka_inf 23950 root  mem    REG       8,2    14664 4722698 /lib/x86_64-linux-gnu/libdl-2.19.so
kafka_inf 23950 root  mem    REG       8,2  1845024 4722681 /lib/x86_64-linux-gnu/libc-2.19.so
kafka_inf 23950 root  mem    REG       8,2   141574 4722801 /lib/x86_64-linux-gnu/libpthread-2.19.so
kafka_inf 23950 root  mem    REG       8,2   149120 4722657 /lib/x86_64-linux-gnu/ld-2.19.so
kafka_inf 23950 root    0u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    1u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    2u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    3u  IPv4 261900078      0t0     TCP 127.0.0.1:37828->127.0.0.1:8086 (EST

It doesn't seem to be talking to Kafka or consuming any messages.

Any help would be appreciated.

Regards,

Alexis
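
The repeated "TypeError: not all arguments converted during string formatting" in the output of this report is what Python's logging module raises when a log call receives extra arguments but the message has no matching %-style placeholders. The actual calls at lines 100, 118 and 130 of kafka_influxdb.py are not shown here, so the message text below is hypothetical. The sketch reproduces the formatting step that logging performs.

```python
import logging


def render(msg, *args):
    """Format a message the same way the logging module does internally."""
    record = logging.LogRecord("root", logging.INFO, "demo.py", 0, msg, args, None)
    return record.getMessage()


# Correct: one placeholder per argument.
print(render("Connecting to Kafka at %s:%s", "localhost", 9092))

# Broken: arguments without placeholders raise the TypeError seen in the log.
try:
    render("Connecting to Kafka at", "localhost", 9092)
except TypeError as exc:
    print("TypeError:", exc)
```

These tracebacks come from the log calls themselves and are probably unrelated to whether messages are consumed.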

Database created successfully, but with no measurements or content.

Hi,

I'm trying to build a collectd-Kafka-InfluxDB-Grafana setup with collectd-5.8.0, kafka-2.11-1.1.0, influxdb-1.6.0, grafana-5.2.2-1, and today's master branch of kafka-influxdb on CentOS 7.

I have successfully installed kafka-influxdb with "pip install kafka_influxdb" and modified the config file as below:


kafka:
  host: "localhost"
  port: 9092
  topic: "collectd"
  group: "my_group"
  reader: "kafka_influxdb.reader.confluent"
  #reader: "kafka_influxdb.reader.kafka_python" # Legacy consumer
influxdb:
  host: "localhost"
  port: 8086
  user: "root"
  password: "root"
  dbname: "collectd"
  use_ssl: false
  verify_ssl: False
  timeout: 5
  use_udp: False
  retention_policy: "autogen"
  time_precision: "s"
encoder: "kafka_influxdb.encoder.collectd_graphite_encoder"
buffer_size: 1000
statistics: true

When I run "kafka_influxdb -c config_example.yaml -v", the output is:

[root@master kafka-influxdb-master]# kafka_influxdb -c config_example.yaml -v
Reading config file config_example.yaml
INFO:root:Connecting to InfluxDB at localhost:8086 (SSL: False, UDP: False)
INFO:root:Creating InfluxDB database if not exists: collectd
INFO:requests.packages.urllib3.connectionpool:Starting new HTTP connection (1): localhost
INFO:root:Listening for messages on Kafka topic collectd...
INFO:root:Connecting to Kafka with the following settings:
{'default.topic.config': {'auto.offset.reset': 'largest'}, 'bootstrap.servers': 'localhost:9092', 'offset.store.method': 'broker', 'group.id': 'my_group'}...
INFO:root:collectd [0] reached end at offset 18683 with key None
INFO:root:collectd [0] reached end at offset 18706 with key None
INFO:root:collectd [0] reached end at offset 18729 with key None
...

In InfluxDB, I can see that the database "collectd" was created when I use "show databases". However, when I run "use collectd" and "show measurements" in influx, nothing appears.

However, if I consume from Kafka directly, the output looks all right:

[root@master kafka_2.11-1.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic collectd
[2018-07-26 17:08:57,060] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-19358 with old generation 0 (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
[2018-07-26 17:08:57,061] INFO [GroupCoordinator 0]: Stabilized group console-consumer-19358 generation 1 (__consumer_offsets-27) (kafka.coordinator.group.GroupCoordinator)
[2018-07-26 17:08:57,065] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-19358 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2018-07-26 17:08:57,066] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: __consumer_offsets-27. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[{"values":[6.69987148753071],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"user"}]
[{"values":[2.09994854710406],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"system"}]
[{"values":[8.6997867683244],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"user"}]
[{"values":[0.299992589351314],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"wait"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"nice"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"nice"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"interrupt"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"wait"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"interrupt"}]
[{"values":[0.0999975409963425],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"softirq"}]
[{"values":[0.09999754230013],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"softirq"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"0","type":"cpu","type_instance":"steal"}]
[{"values":[0],"dstypes":["derive"],"dsnames":["value"],"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu","plugin_instance":"1","type":"cpu","type_instance":"steal"}]

So I don't know what's going on with writing to InfluxDB.

Thanks a lot for your help!

Shawn
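
One thing that stands out in this report: the console consumer shows JSON arrays, while the config selects collectd_graphite_encoder. A possible explanation, stated here only as an assumption, is that a Graphite-style encoder expects "path value timestamp" lines and cannot use JSON messages. The sketch below uses two hypothetical helper functions to tell the two formats apart, so the format on a topic can be checked before choosing an encoder.

```python
import json


def looks_like_graphite(line):
    """True for lines of the form 'metric.path value timestamp'."""
    parts = line.split()
    if len(parts) != 3:
        return False
    try:
        float(parts[1])
        float(parts[2])
    except ValueError:
        return False
    return True


def looks_like_collectd_json(line):
    """True for a JSON array of objects that each carry a 'values' key."""
    try:
        data = json.loads(line)
    except ValueError:
        return False
    return isinstance(data, list) and all(
        isinstance(entry, dict) and "values" in entry for entry in data)


graphite_sample = "f014-520-kafka.internal.machines.smart-sda.smart_powercycles 35 1447706286"
json_sample = ('[{"values":[6.69987148753071],"dstypes":["derive"],"dsnames":["value"],'
               '"time":1532596144.384,"interval":10.000,"host":"master","plugin":"cpu",'
               '"plugin_instance":"0","type":"cpu","type_instance":"user"}]')

print(looks_like_graphite(graphite_sample), looks_like_collectd_json(graphite_sample))
print(looks_like_graphite(json_sample), looks_like_collectd_json(json_sample))
```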

error while trying to write data in to influxdb

I get this error while trying to write to InfluxDB (I am using a custom encoder which produces an array of InfluxDB expressions).

INFO:root:Shutdown. Flushing remaining messages from buffer.
DEBUG:root:Writing message: ['cpu_load_short,host=server02 value=0.67
cpu_load_short,host=server02,region=us-west value=0.55 1422568543702900257
cpu_load_short,direction=in,host=server01,region=us-west value=2.0 1422568543702900257]
DEBUG:urllib3.connectionpool:http://127.0.0.1:8001 "POST /write?db=metrics&precision=s&rp=autogen HTTP/1.1" 404 1287
WARNING:root:Cannot write data points: 404: {
  "paths": [
    "/api",
    "/api/v1",
    "/apis",
    "/apis/apps",
    "/apis/apps/v1beta1",
    "/apis/authentication.k8s.io",
    "/apis/authentication.k8s.io/v1",
    "/apis/authentication.k8s.io/v1beta1",
    "/apis/authorization.k8s.io",
    "/apis/authorization.k8s.io/v1",
    "/apis/authorization.k8s.io/v1beta1",
    "/apis/autoscaling",
    "/apis/autoscaling/v1",
    "/apis/autoscaling/v2alpha1",
    "/apis/batch",
    "/apis/batch/v1",
    "/apis/batch/v2alpha1",
    "/apis/certificates.k8s.io",
    "/apis/certificates.k8s.io/v1beta1",
    "/apis/extensions",
    "/apis/extensions/v1beta1",
    "/apis/policy",
    "/apis/policy/v1beta1",
    "/apis/rbac.authorization.k8s.io",
    "/apis/rbac.authorization.k8s.io/v1alpha1",
    "/apis/rbac.authorization.k8s.io/v1beta1",
    "/apis/settings.k8s.io",
    "/apis/settings.k8s.io/v1alpha1",
    "/apis/storage.k8s.io",
    "/apis/storage.k8s.io/v1",
    "/apis/storage.k8s.io/v1beta1",
    "/healthz",
    "/healthz/ping",
    "/healthz/poststarthook/bootstrap-controller",
    "/healthz/poststarthook/ca-registration",
    "/healthz/poststarthook/extensions/third-party-resources",
    "/healthz/poststarthook/rbac/bootstrap-roles",
    "/logs",
    "/metrics",
    "/swaggerapi/",
    "/ui/",
    "/version"
  ]
}
Flushing output buffer. 18.59 messages/s

Can this project lose data?

After viewing the code, I can't tell whether the project protects the user from data loss or not.
Specifically, if the reader reads a message from a Kafka topic and can't deliver it to InfluxDB because it is down, then in the (rare) event that the process dies, we will lose that information too.
Right?

Thank you very much for this project!
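
For reference, the usual way to avoid this kind of loss is at-least-once delivery: commit the Kafka offset only after the write to the backend has succeeded, so that unwritten messages are read again after a restart. The sketch below illustrates the idea with an in-memory stand-in for the consumer. It does not describe what kafka_influxdb actually does, and all names are hypothetical.

```python
class InMemoryConsumer:
    """Stand-in for a Kafka consumer that tracks a committed offset."""

    def __init__(self, messages):
        self.messages = messages
        self.committed = 0  # offset of the next message to read after a restart

    def read_batch(self, size):
        start = self.committed
        return start, self.messages[start:start + size]

    def commit(self, offset):
        self.committed = offset


def forward(consumer, write, batch_size):
    """Write one batch and commit only if the write succeeded."""
    start, batch = consumer.read_batch(batch_size)
    if not batch:
        return False
    write(batch)                         # may raise if the backend is down
    consumer.commit(start + len(batch))  # reached only after a successful write
    return True
```

The trade-off is that a crash between the write and the commit leads to duplicates instead of loss.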

Telegraf and echo encoder

Hi,
I'm trying to use kafka-influxdb with the echo encoder, since I have Telegraf on the producer side with its output set to InfluxDB line protocol, so I would just like to pass the messages from Kafka to InfluxDB.

But I'm getting this error:

WARNING:root:Cannot write data points: 400: {"error":"unable to parse 'swap,ncgroup=client01,host=test01.local total=1719660544i,used=0i,used_percent=0 1524041460000000000': time outside range -9223372036854775806 - 9223372036854775806\nunable to parse 'mem,host=test01.local,ncgroup=client01 used_percent=9.597548106254225,used=381460480i,total=3974561792i 1524041460000000000': time outside range -9223372036854775806 - 9223372036854775806\ [...]

The timestamp is generated by Telegraf, and the complete line is OK.
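
A plausible cause, stated as an assumption: the timestamps in these lines are in nanoseconds, while the writer may be sending them with a coarser precision such as time_precision: "s". A nanosecond value read as seconds falls far outside the valid range. The sketch below uses a hypothetical helper to rescale the trailing timestamp of a line-protocol line. The alternative is to make the write precision match what the producer emits.

```python
# Nanoseconds per unit, keyed by hypothetical precision labels.
NANOSECONDS_PER = {"ns": 1, "u": 10**3, "ms": 10**6, "s": 10**9}


def rescale_timestamp(line, target_precision="s"):
    """Convert a trailing nanosecond timestamp to the target precision."""
    body, sep, timestamp = line.rpartition(" ")
    if not sep or not timestamp.isdigit():
        return line  # no trailing timestamp found; leave the line untouched
    scaled = int(timestamp) // NANOSECONDS_PER[target_precision]
    return "{} {}".format(body, scaled)


line = ("swap,ncgroup=client01,host=test01.local "
        "total=1719660544i,used=0i,used_percent=0 1524041460000000000")
print(rescale_timestamp(line))
```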
