kafka-utils's Introduction

Deprecation Warning

Please note that this repo is not maintained in the open source community. The code and examples contained in this repository are for demonstration purposes only.

You can read the latest from Yelp Engineering on our tech blog.

Kafka-Utils

A suite of Python tools to interact with and manage Apache Kafka clusters. Kafka-Utils runs on Python 3.7+.

Configuration

Kafka-Utils reads the cluster configuration needed to access Kafka clusters from yaml files. Each cluster is identified by type and name. Multiple clusters of the same type should be listed in the same type.yaml file. The yaml files are read from $KAFKA_DISCOVERY_DIR, $HOME/.kafka_discovery and /etc/kafka_discovery, in that order of precedence: a location earlier in the list overrides the ones after it.

Sample configuration for a sample_type cluster at /etc/kafka_discovery/sample_type.yaml:

---
  clusters:
    cluster-1:
      broker_list:
        - "cluster-elb-1:9092"
      zookeeper: "11.11.11.111:2181,11.11.11.112:2181,11.11.11.113:2181/kafka-1"
    cluster-2:
      broker_list:
        - "cluster-elb-2:9092"
      zookeeper: "11.11.11.211:2181,11.11.11.212:2181,11.11.11.213:2181/kafka-2"
  local_config:
    cluster: cluster-1

Install

From PyPI:

    $ pip install kafka-utils

Kafka-Utils command-line interface

List all clusters

    $ kafka-utils
    cluster-type sample_type:
        cluster-name: cluster-1
        broker-list: cluster-elb-1:9092
        zookeeper: 11.11.11.111:2181,11.11.11.112:2181,11.11.11.113:2181/kafka-1
        cluster-name: cluster-2
        broker-list: cluster-elb-2:9092
        zookeeper: 11.11.11.211:2181,11.11.11.212:2181,11.11.11.213:2181/kafka-2

Get consumer offsets

    $ kafka-consumer-manager --cluster-type sample_type offset_get sample_consumer

Get topic watermarks

    $ kafka-consumer-manager --cluster-type sample_type get_topic_watermark sample.topic

Rebalance cluster cluster1 of type sample_type

    $ kafka-cluster-manager --cluster-type sample_type --cluster-name cluster1 \
    --apply rebalance --brokers --leaders --max-partition-movements 10 \
    --max-leader-changes 15

Rolling-restart a cluster

    $ kafka-rolling-restart --cluster-type sample_type

Check in-sync replicas

    $ kafka-check --cluster-type sample_type min_isr

Check number of unavailable replicas

    $ kafka-check --cluster-type sample_type replica_unavailability

Documentation

Read the documentation at Read the Docs.

License

Kafka-Utils is licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0

Contributing

Everyone is encouraged to contribute to Kafka-Utils by forking the GitHub repository and making a pull request or opening an issue.

kafka-utils's People

Contributors

88manpreet, amribr, anthonysandrin, ashsingh21, asottile, baisang, benbariteau, billyevans, bobuss, dbgrigsby, dependabot[bot], dinegri, ecanzonieri, fede1024, flavray, jayh5, kdparker, kpid, lennrt, lsheng, lsterk, mango-j, mborst, mhaseebmlk, neoeahit, ny2ko, poros, simplesteph, yzernik, yzg112358

kafka-utils's Issues

Rolling restart unable to restart broker

Hi,
I am trying to use the latest rolling restart script along with Jolokia (jolokia-jvm-1.6.2-agent.jar), which is loaded by the Kafka service script running on the broker nodes (passed via KAFKA_OPTS).

KAFKA_OPTS="-javaagent:/home/kafka/prometheus/jmx_prometheus_javaagent-0.3.1.jar=8080:/home/kafka/prometheus/kafka-0-8-2.yml -javaagent:/home/kafka/jolokia/jolokia-agent.jar=host=*"

I am able to get Jolokia metrics from the remote broker nodes using the following curl command:

curl bro1:8778/jolokia/read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value | jq

When I run the rolling restart script, it detects all the brokers and, after confirmation, stops the first broker. Then it waits forever for broker 1 to restart, with the following messages:

[kfk@admin-node ~]$ kafka-rolling-restart --cluster-type kafka --start-command "/home/kfk/bin/kafka-server-start -daemon /home/kfk/etc/kafka/server.properties " --stop-command "/home/kfk/bin/kafka-server-stop" --check-count 3
Will restart the following brokers in cluster-1:
  1: bro1
  2: bro2
  3: bro3
Do you want to restart these brokers? y
Execute restart
Under replicated partitions: 0, missing brokers: 0 (1/1)
The cluster is stable
Stopping bro1 (1/3)
Starting bro1 (1/3)
Cannot find the key, Kafka is probably still starting up
Under replicated partitions: 40, missing brokers: 1 (0/3)
Broker bro1 is down: HTTPConnectionPool(host='bro1', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6de814bb50>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up
Under replicated partitions: 68, missing brokers: 1 (0/3)
Broker bro1 is down: HTTPConnectionPool(host='bro1', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6de80d8890>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up
Under replicated partitions: 68, missing brokers: 1 (0/3)
Broker bro1 is down: HTTPConnectionPool(host='bro1', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6de80d8a50>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up
Under replicated partitions: 68, missing brokers: 1 (0/3)
Broker bro1 is down: HTTPConnectionPool(host='bro1', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6de80e9810>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up
Under replicated partitions: 68, missing brokers: 1 (0/3)
Broker bro1 is down: HTTPConnectionPool(host='bro1', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f6de8974b90>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up

Tried with the following command as well:

[kfk@admin-node ~]$ kafka-rolling-restart --cluster-type kafka --start-command "sudo service kafka start " --stop-command "sudo service kafka stop" --check-count 3

On inspecting broker node 1, I found that Kafka was stopped. After I manually restarted broker 1, the rolling restart script stopped the second broker and again waited forever for broker 2 to come up. I have tested all the Kafka service commands (start, stop, restart) manually on the broker node and all of them work.

It looks like the rolling restart script is able to stop the Kafka broker but unable to start it again.
What could be the issue?

Kafka version: confluent-5.2.1-2.12
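
To narrow down a report like this one, the Jolokia check from the curl command above can be reproduced outside the restart script. The following is a rough sketch, not the code kafka-rolling-restart runs: the helper names are hypothetical, the port 8778 and the MBean path are taken from the log output, and the response shape (a JSON object with `status` and `value` fields) is the standard Jolokia read response.

```python
import json
from urllib.request import urlopen

MBEAN = "kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager"


def jolokia_read_url(host, port=8778, prefix="jolokia"):
    """Build the Jolokia read URL for the under-replicated partitions gauge."""
    return "http://{}:{}/{}/read/{}/Value".format(
        host, port, prefix.strip("/"), MBEAN
    )


def parse_under_replicated(body):
    """Extract the gauge value from a Jolokia JSON response body.

    Returns None when the response carries no usable value, for example
    while the broker is still starting and the MBean is not registered.
    """
    data = json.loads(body)
    if data.get("status") != 200:
        return None
    return data.get("value")


def under_replicated(host, port=8778, timeout=5):
    """Return the gauge value, or None if the broker cannot be queried."""
    try:
        with urlopen(jolokia_read_url(host, port), timeout=timeout) as resp:
            return parse_under_replicated(resp.read().decode("utf-8"))
    except (OSError, ValueError):
        # Connection refused, timeout, or a non-JSON body.
        return None
```

If `under_replicated("bro1")` keeps returning `None` after the start command has run, the broker (and therefore its Jolokia agent) most likely never came back up, which points at the start command rather than at the metric check.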

how does delete_consumer work?

Hi,

I have tried to delete a consumer group from kafka with kafka-consumer-manager:

./kafka-consumer-manager --cluster-type pro  delete_group console-consumer-68578
Cluster name: global-pro, consumer group: console-consumer-68578
Offsets of all topics and partitions listed below shall be modified:
Topic: scmspain.ms-ij-notifications.notification, Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]

Is this what you really intend? (y/n) y

I can still see my console-consumer-68578 in kafka-manager.
Shouldn't the script clean up all references?
How does it actually work?

Regards

Remove the strict requirements

Looking at the list of requirements, they are all pinned to fixed versions. We should have the flexibility to use newer versions. What made you force specific versions? This is a nightmare for rpm packaging.

pip package updates

pip still points to 2.3.0 as its latest version.
Is there any reason why new releases haven't been published to PyPI?

How to pass argument to command

When I run the command below, it works:

`ubuntu@mynode:~$ kafka-rolling-restart --cluster-type qa_kafka_test --broker-ids

Will restart the following brokers in cluster-1:
101: mynode1
102: mynode2
Do you want to restart these brokers? yes
`
However, when I try to pass that "yes" as an argument, it doesn't work:

`ubuntu@mynode:~$ kafka-rolling-restart --cluster-type qa_kafka_test --task-args yes

Will restart the following brokers in cluster-1:
101: mynode1
102: mynode2
Do you want to restart these brokers?
`
It still asks for confirmation.

What can be done to get this into one command?

Support for checking in sync replicas with SSL enabled Kafka.

We have an SSL-enabled Kafka cluster, where we are trying to use kafka-utils to check min_isr. This feature seems to work with plaintext Kafka but not with SSL-enabled Kafka. Is there a way to check the in-sync replicas in SSL-enabled Kafka? I see the client is still SimpleClient [deprecated] instead of KafkaClient. Can someone help with this?

I/O gets to 100%

Hi,
every time I use this amazing tool, I/O goes mad on the brokers that are part of the process (decommission / rebalance).
Is there an easy way to throttle partition movement? This also happens if I move only 1 partition at a time.
Thx!

Unable to list consumer groups from kafka 0.9.1

list_topics works, but list_groups hangs forever.
ubuntu@loadts5 ~>kafka-consumer-manager --cluster-type kafkaclusters --cluster-name cluster-1 list_topics --storage kafka "10-0-4-34"
Cluster name: cluster-1, consumer group: 10-0-4-34
Consumer Group ID: 10-0-4-34
Topic: uid_cap
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Topic: uid_segment
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Topic: ip_segment
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Topic: geo_segment
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
Topic: device_segment
Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
ubuntu@loadts5 ~>kafka-consumer-manager --cluster-type kafkaclusters --cluster-name cluster-1 list_groups --storage kafka

Switch to using the new `KafkaAdminClient` where appropriate

Some of the code in this utility could be simplified by leveraging kafka-python's new KafkaAdminClient, for example the code for fetching consumer group offsets is very straightforward if you use https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafka.admin.KafkaAdminClient.list_consumer_group_offsets.

Note that this is a new class so the interfaces are a little unstable, particularly the ones that still return raw structs... but the ones that return more pythonically formatted data should generally be stable.
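
As a rough idea of what the proposal could look like, here is a small sketch built on kafka-python's `KafkaAdminClient.list_consumer_group_offsets`, which returns a mapping of `TopicPartition` to `OffsetAndMetadata`. The helper names `connect` and `group_offsets` are hypothetical and are not part of kafka-utils; the sketch assumes kafka-python is installed and a broker is reachable.

```python
def connect(bootstrap_servers):
    """Create an admin client; requires the kafka-python package."""
    # Imported lazily so that group_offsets() can be exercised without a broker.
    from kafka import KafkaAdminClient

    return KafkaAdminClient(bootstrap_servers=bootstrap_servers)


def group_offsets(admin, group_id):
    """Return {(topic, partition): committed_offset} for one consumer group."""
    offsets = admin.list_consumer_group_offsets(group_id)
    return {
        (tp.topic, tp.partition): meta.offset
        for tp, meta in offsets.items()
    }
```

With a reachable cluster, `group_offsets(connect("cluster-elb-1:9092"), "sample_consumer")` would yield a plain dictionary that is easy to print or compare against watermarks.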

Support other commands to start and stop kafka

For my setup, I have kafka running within Docker, so stopping and starting kafka isn't as straightforward as sudo service kafka stop.

I'm going to submit a PR to support other options to start and stop kafka. This is not dangerous in my opinion, because the power user already has ssh access to the Kafka host.

list_groups returns 0 groups found

I recently started using kafka-utils v2.3.0 against a 3-node Kafka 1.0 cluster. I am trying to do a basic operation of listing all the consumer groups in my cluster. The kafka-consumer-groups.sh utility returns my groups, but:
kafka-consumer-manager --cluster-type=internal --cluster-name=prelease --vvverbose list_groups returns 0 groups with the message:

Consumer Groups:
0 groups found for cluster prelease of type internal

My initial thought was that I had mis-configured something, but I could not figure it out myself.

The get_topic_watermark command on the other hand works perfectly with my configs:

kafka-consumer-manager --cluster-type=internal --cluster-name=prelease get_topic_watermark test_topic
Topic Name: test_topic
	Partition ID: 0
	High Watermark: 565
	Low Watermark: 555

	Partition ID: 1
	High Watermark: 566
	Low Watermark: 554

	Partition ID: 2
	High Watermark: 574
	Low Watermark: 555

Appreciate inputs.

Rolling restart optimisation

Right now the rolling restart script goes from broker 1, to 2, to 3, etc.
It should keep the active controller as the last broker to restart.

In a worst-case scenario, broker 1 is the active controller, then 2 becomes the active controller, then 3, etc.
In that case, we always reboot the active controller, which triggers leader election at every single node reboot.

Thoughts? I might try to put something together someday, or you can jump on it :)
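
The ordering idea can be sketched as below. This is an illustration only, not code from kafka-rolling-restart; both function names are hypothetical. It assumes a ZooKeeper-based cluster, where the /controller znode holds a JSON document whose `brokerid` field names the active controller.

```python
import json


def controller_from_znode(data):
    """Return the active controller's broker id from /controller znode data."""
    return json.loads(data)["brokerid"]


def restart_order(broker_ids, controller_id):
    """Return broker ids in ascending order with the active controller last."""
    order = sorted(b for b in broker_ids if b != controller_id)
    if controller_id in broker_ids:
        order.append(controller_id)
    return order
```

Because the controller can still move while other brokers restart (for example after a session timeout), a real implementation would probably want to re-read the controller id before each restart rather than compute the order once.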

Host property in Zookeeper is null

I am testing kafka-utils with our Kafka cluster and the "host" property in Zookeeper brokers/ids/BROKER_ID is null

In our Kafka configuration we use the listener configuration parameters, which override the host.name parameter (host.name has in fact been deprecated since Kafka 0.10.0).
In our testing environment, the broker registration in Zookeeper looks like this:

[zk: localhost:2181(CONNECTED) 16] get /brokers/ids/1
{"listener_security_protocol_map":{"INTERNAL":"SSL","EXTERNAL":"SSL"},"endpoints":["INTERNAL://MY_ENDPOINT:9091","EXTERNAL://MY_ENDPOINT:9092"],"rack":"MY_RACK","jmx_port":19999,"host":null,"timestamp":"1522162718367","port":-1,"version":4}
cZxid = 0x100000641
ctime = Tue Mar 27 14:58:38 UTC 2018
mZxid = 0x100000641
mtime = Tue Mar 27 14:58:38 UTC 2018
pZxid = 0x100000641
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x162669c09dc0303
dataLength = 235
numChildren = 0
[zk: localhost:2181(CONNECTED) 17]

When I remove the listener parameters and set the host.name it works as expected.
Are you planning to fix this, by probably reading the "endpoints" instead of the "host"? Is there any misconfiguration on my side that I'm not aware of?

Thanks in advance
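
The fallback suggested in this issue (reading "endpoints" when "host" is null) could look roughly like the sketch below. It is not the kafka-utils implementation; `broker_address` and its `listener` parameter are hypothetical names, and the sketch simply takes the first endpoint unless a listener name is given.

```python
import json


def broker_address(znode_json, listener=None):
    """Return (host, port) for a broker registration.

    Uses "host"/"port" when set, otherwise falls back to "endpoints".
    `listener` optionally selects an endpoint by its listener name.
    """
    data = json.loads(znode_json)
    if data.get("host"):
        return data["host"], data["port"]
    for endpoint in data.get("endpoints", []):
        name, _, address = endpoint.partition("://")
        if listener is not None and name != listener:
            continue
        # Split on the last colon so the port is separated from the host.
        host, _, port = address.rpartition(":")
        return host, int(port)
    raise ValueError("no usable host or endpoint in broker registration")
```

Applied to the registration shown in this issue, the function would pick the INTERNAL endpoint by default and the EXTERNAL one when called with `listener="EXTERNAL"`.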

Upgrade pyopenssl to version 17.5.0 or later (CVE-2018-1000807, CVE-2018-1000808)

GitHub flagged the following security vulnerabilities affecting this repository.

https://nvd.nist.gov/vuln/detail/CVE-2018-1000807

CVE-2018-1000807
high severity
Vulnerable versions: < 17.5.0
Patched version: 17.5.0
Python Cryptographic Authority pyOpenSSL versions prior to 17.5.0 contain a CWE-416 (Use After Free) vulnerability in X509 object handling that can lead to denial of service or possibly remote code execution. Whether the attack is exploitable depends on the calling application and on whether it retains a reference to the memory. This vulnerability appears to have been fixed in 17.5.0.

https://nvd.nist.gov/vuln/detail/CVE-2018-1000808

CVE-2018-1000808
moderate severity
Vulnerable versions: < 17.5.0
Patched version: 17.5.0
Python Cryptographic Authority pyOpenSSL versions before 17.5.0 contain a CWE-401 (Failure to Release Memory Before Removing Last Reference) vulnerability in the PKCS #12 store that can result in denial of service if memory runs low or is exhausted. Exploitability depends on the calling application, but it could be as simple as initiating a TLS connection, or anything else that causes the calling application to reload certificates from a PKCS #12 store. This vulnerability appears to have been fixed in 17.5.0.

Guidance on how to use task?

There's a concept of prestart task and prestop task for the rolling restart.
Can we see an example of how to use those? I couldn't see it being documented anywhere.

Thanks!

Upgrade paramiko to version 2.0.9 or later (CVE-2018-1000805, CVE-2018-7750)

GitHub flagged the following security vulnerabilities affecting this repository.

https://nvd.nist.gov/vuln/detail/CVE-2018-1000805

CVE-2018-1000805
high severity
Vulnerable versions: < 2.0.9
Patched version: 2.0.9
Paramiko versions 2.4.1, 2.3.2, 2.2.3, 2.1.5, 2.0.8, 1.18.5, and 1.17.6 contain an Incorrect Access Control vulnerability in the SSH server that can result in RCE. This attack appears to be exploitable via network connectivity.

https://nvd.nist.gov/vuln/detail/CVE-2018-7750

CVE-2018-7750
high severity
Vulnerable versions: < 1.17.6
Patched version: 1.17.6
transport.py in the SSH server implementation of Paramiko before 1.17.6, 1.18.x before 1.18.5, 2.0.x before 2.0.8, 2.1.x before 2.1.5, 2.2.x before 2.2.3, 2.3.x before 2.3.2, and 2.4.x before 2.4.1 does not properly check whether authentication is completed before processing other requests, as demonstrated by channel-open. A customized SSH client can simply skip the authentication step.

Rolling restart without Jolokia

Is it possible to use the rolling restart functionality without Jolokia? We use the built in jmxremote functionality, is it possible to use that?

Thanks,
Eric Garcia

Can't manipulate offsets in kafka storage

`kafka-consumer-manager -t production --cluster-name kafka-1 offset_get --storage kafka test
Cluster name: kafka-1, consumer group: test

Traceback (most recent call last):
File "/usr/bin/kafka-consumer-manager", line 6, in <module>
run()
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/main.py", line 90, in run
args.command(args, conf)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_get.py", line 92, in run
storage=args.storage,
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 73, in preprocess_args
fail_on_error=fail_on_error,
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 39, in get_topics_from_consumer_group_id
return cls.get_topics_for_group_from_kafka(cluster_config, groupid)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 133, in get_topics_for_group_from_kafka
return kafka_group_reader.read_group(groupid)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 168, in read_group
return self.read_groups(partition).get(group_id, [])
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 183, in read_groups
consumer_timeout_ms=3000,
File "/usr/lib/python2.7/site-packages/kafka/consumer/kafka.py", line 60, in __init__
self.set_topic_partitions(*topics)
File "/usr/lib/python2.7/site-packages/kafka/consumer/kafka.py", line 224, in set_topic_partitions
self._consume_topic_partition(topic, partition)
File "/usr/lib/python2.7/site-packages/kafka/consumer/kafka.py", line 588, in _consume_topic_partition
"in broker metadata" % (partition, topic))
kafka.common.UnknownTopicOrPartitionError: Partition 35 not found in Topic __consumer_offsets in broker metadata`

I only have 20 partitions in the __consumer_offsets topic. How on earth does it find at least 35 of them? :)
Tested on 0.9.0.1 and 0.10.1.0 clusters.

Leveraging OpenSSH ~/.ssh/config

Are there any plans on leveraging ~/.ssh/config ?
Basically, my issue is that the SSH connection fails here https://github.com/Yelp/kafka-utils/blob/master/kafka_utils/util/ssh.py#L118 with a cryptic error message if I don't add username=ubuntu (or whatever my username is).

How does Yelp specify specific username to ssh into their kafka brokers, or ssh configuration overall (tunnels, etc)? Or is it something I need to do a PR on?

My ~/.ssh/config looks like this, and the User field doesn't get picked up:

Host 10.13.80.*
    IdentityFile ~/.ssh/my-identity.pem
    User ubuntu
    UserKnownHostsFile /dev/null
    StrictHostKeyChecking no
    ServerAliveInterval 60

Feature: Allow for Hosts to change IP (kafka-rolling-restart)

I have my Kafka cluster in the cloud, and I have two kinds of reboot:

  • A soft reboot, which is just restarting my brokers
  • A hard reboot, which terminates EC2 instances and waits for new ones to come up (using a task, thanks for the doc!)

With the hard reboot, because the IP changes and the connection is terminated, the ssh Connection object will fail on the start_kafka command.

So we need a new SSH connection to be instantiated before start_kafka command is executed to ensure we execute it against the right host.

Genetic Rebalance should raise RebalanceError when there are inactive brokers

Right now Genetic Rebalance fails with ValueError when there are any inactive brokers.

Traceback (most recent call last):
  File "/usr/bin/kafka-cluster-manager", line 21, in <module>
    main.run()
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/main.py", line 240, in run
    args,
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/cmds/command.py", line 99, in run
    self.run_command(ct, cluster_balancer(ct, args))
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/cmds/rebalance.py", line 184, in run_command
    base_score = cluster_balancer.score()
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/cluster_info/genetic_balancer.py", line 458, in score
    return self._score(_State(self.cluster_topology), score_movement=False)
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/cluster_info/genetic_balancer.py", line 731, in __init__
    self.rgs.index(broker.replication_group) for broker in self.brokers
  File "/opt/venvs/yelp-kafka-tool/local/lib/python2.7/site-packages/kafka_utils/kafka_cluster_manager/cluster_info/genetic_balancer.py", line 731, in <genexpr>
    self.rgs.index(broker.replication_group) for broker in self.brokers
ValueError: tuple.index(x): x not in tuple

We should check for inactive brokers and raise RebalanceError when they exist, as we do in PartitionCountBalancer.
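
A guard of the kind proposed here might look like the following sketch. The `Broker` tuple and the locally defined `RebalanceError` are stand-ins written for this illustration; they are assumptions and do not mirror the real kafka_utils classes or the check in PartitionCountBalancer.

```python
from collections import namedtuple

# Hypothetical stand-in for illustration; not the kafka_utils Broker class.
Broker = namedtuple("Broker", ["id", "inactive"])


class RebalanceError(Exception):
    """Raised when the cluster is not in a state that can be rebalanced."""


def check_no_inactive_brokers(brokers):
    """Raise RebalanceError naming the inactive brokers, if there are any."""
    inactive = sorted(b.id for b in brokers if b.inactive)
    if inactive:
        raise RebalanceError(
            "Cannot rebalance: inactive brokers {}".format(inactive)
        )
```

Running such a check before scoring would replace the opaque `ValueError: tuple.index(x): x not in tuple` with an error that names the offending brokers.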

Fails to "make test" locally

When I try to run "make test" locally, I get the following error:

Error: version conflict: setuptools 28.8.0 (.tox/py36-unittest/lib/python3.6/site-packages) <-> setuptools>=30.0.0 (from tox>=2.7->tox-pip-extensions==1.2.1->-r requirements-dev.txt (line 5))

sudo without NOPASSWD into sudoers

Hi,

I tried to configure kafka-utils for rolling restart, but it got stuck because when it executes sudo there is an (internal) password prompt. I was only able to get it working by adding NOPASSWD for systemctl to sudoers.

Is there any way to work without adding NOPASSWD into sudoers?

AttributeError: 'int' object has no attribute 'leader'

I am trying to use corruption check with valid parameters:

kafka-corruption-check --cluster-type sample_type --cluster-name $MY_CLUSTER_NAME --data-path $KAFKA_LOGS_PATH --minutes 30

But get this error:

Filtering leaders
Traceback (most recent call last):
File "/opt/rh/python27/root/usr/bin/kafka-corruption-check", line 6, in <module>
run()
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/kafka_utils/kafka_corruption_check/main.py", line 535, in run
args.end_time,
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/kafka_utils/kafka_corruption_check/main.py", line 447, in check_cluster
broker_files = filter_leader_files(cluster_config, broker_files)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/kafka_utils/kafka_corruption_check/main.py", line 395, in filter_leader_files
leader_of = get_partition_leaders(cluster_config)
File "/opt/rh/python27/root/usr/lib/python2.7/site-packages/kafka_utils/kafka_corruption_check/main.py", line 364, in get_partition_leaders
result[topic_partition] = p_data.leader

SSL support

Support SSL connections using kafka-utils

Question: Using the tool where topics may have periods in them

We have a use case where some of our topics are defined with periods in them. Potentially not a good practice, but that is the case nonetheless.

For running commands such as the following:

kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_set my_group topic1.0.38531

This poses a problem because of the way the command-line syntax is set up.

Can you recommend a work-around potentially for this or would you be open to a patch that allows topics with periods to be used somehow?

Thanks for the help in advance!

@deckarep
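
One possible approach to the question above is to split the argument from the right, since the partition and offset are always the last two fields. The sketch below is hypothetical and is not how kafka-consumer-manager parses its arguments; it accepts both the dotted form shown above and a `<topic>.<partition>=<offset>` form.

```python
def parse_offset_spec(spec):
    """Parse "<topic>.<partition>.<offset>" or "<topic>.<partition>=<offset>".

    Splitting from the right keeps any periods inside the topic name intact.
    """
    if "=" in spec:
        left, _, offset = spec.rpartition("=")
        topic, _, partition = left.rpartition(".")
    else:
        parts = spec.rsplit(".", 2)
        if len(parts) != 3:
            raise ValueError("badly formatted input: {!r}".format(spec))
        topic, partition, offset = parts
    if not topic or not partition.isdigit() or not offset.isdigit():
        raise ValueError("badly formatted input: {!r}".format(spec))
    return topic, int(partition), int(offset)
```

With this approach, an argument such as `my.dotted.topic.0.38531` resolves to topic `my.dotted.topic`, partition 0, offset 38531.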

Plan not being executed, no error msg.

Dear,

When using the kafka-cluster-manager tool, I get the following output:

kafka-cluster-manager --cluster-type prod --cluster-name cluster-1 set_replication_factor --topic GPS_00 2
INFO:kazoo.client:Connecting to SNIP:2181
INFO:kazoo.client:Zookeeper connection established, state: CONNECTED
INFO:kafka-zookeeper-manager:Fetching current cluster-topology from Zookeeper...
INFO:SetReplicationFactorCmd:Decreasing topic GPS_00 replication factor from 4 to 2.
INFO:SetReplicationFactorCmd:Total number of actions before reduction: 16.
INFO:SetReplicationFactorCmd:Number of partition changes: 16. Number of leader-only changes: 0
INFO:SetReplicationFactorCmd:Proposed plan assignment {'version': 1, 'partitions': [{'topic': u'GPS_00', 'partition': 9, 'replicas': [2, 6]}, {'topic': u'GPS_00', 'partition': 4, 'replicas': [4, 6]}, {'topic': u'GPS_00', 'partition': 10, 'replicas': [0, 5]}, {'topic': u'GPS_00', 'partition': 5, 'replicas': [0, 5]}, {'topic': u'GPS_00', 'partition': 11, 'replicas': [4, 0]}, {'topic': u'GPS_00', 'partition': 6, 'replicas': [2, 6]}, {'topic': u'GPS_00', 'partition': 12, 'replicas': [0, 5]}, {'topic': u'GPS_00', 'partition': 7, 'replicas': [3, 2]}, {'topic': u'GPS_00', 'partition': 13, 'replicas': [1, 2]}, {'topic': u'GPS_00', 'partition': 14, 'replicas': [3, 5]}, {'topic': u'GPS_00', 'partition': 15, 'replicas': [1, 5]}, {'topic': u'GPS_00', 'partition': 0, 'replicas': [1, 2]}, {'topic': u'GPS_00', 'partition': 1, 'replicas': [3, 4]}, {'topic': u'GPS_00', 'partition': 2, 'replicas': [3, 4]}, {'topic': u'GPS_00', 'partition': 8, 'replicas': [3, 1]}, {'topic': u'GPS_00', 'partition': 3, 'replicas': [4, 6]}]}
INFO:SetReplicationFactorCmd:Proposed-plan actions count: 16
INFO:SetReplicationFactorCmd:Proposed plan won't be executed.
INFO:kazoo.client:Closing connection to SNIP:2181
INFO:kazoo.client:Zookeeper session lost, state: CLOSED

What could be the reason? Am I missing a flag which is not documented maybe?

Thanks!
Jarko

Unable to unsubscribe topic from consumer group

Offsets stored in kafka. Kafka 0.10.1. Kafka-utils 1.5.0. Kafka-python 1.3.3. Exception:

`kafka-consumer-manager -t devel --cluster-name test-1 unsubscribe_topics --topic test-topic --storage kafka mirror-maker-1
Cluster name: test-1, consumer group: mirror-maker-1
Offsets of all topics and partitions listed below shall be modified:
Topic: test-topic, Partitions: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Is this what you really intend? (y/n) y
Traceback (most recent call last):
File "/usr/bin/kafka-consumer-manager", line 6, in <module>
run()
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/main.py", line 90, in run
args.command(args, conf)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 127, in run
topics_dict,
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 150, in unsubscribe_topics
self.delete_topic(group, topic)
File "/usr/lib/python2.7/site-packages/kafka_utils/kafka_consumer_manager/commands/unsubscribe_topics.py", line 221, in delete_topic
offset_storage='kafka',
File "/usr/lib/python2.7/site-packages/kafka_utils/util/offsets.py", line 545, in set_consumer_offsets
callback=_check_commit_response_error
File "/usr/lib/python2.7/site-packages/retrying.py", line 49, in wrapped_f
return Retrying(*dargs, **dkw).call(f, *args, **kw)
File "/usr/lib/python2.7/site-packages/retrying.py", line 206, in call
return attempt.get(self._wrap_exception)
File "/usr/lib/python2.7/site-packages/retrying.py", line 247, in get
six.reraise(self.value[0], self.value[1], self.value[2])
File "/usr/lib/python2.7/site-packages/retrying.py", line 200, in call
attempt = Attempt(fn(*args, **kwargs), attempt_number, False)
File "/usr/lib/python2.7/site-packages/kafka_utils/util/client.py", line 69, in send_offset_commit_request_kafka
if not fail_on_error or not self._raise_on_response_error(resp)]
File "/usr/lib/python2.7/site-packages/kafka/client.py", line 413, in _raise_on_response_error
kafka.errors.check_error(resp)
File "/usr/lib/python2.7/site-packages/kafka/errors.py", line 496, in check_error
raise error_class(response)
kafka.errors.UnknownMemberIdError: [Error 25] UnknownMemberIdError: OffsetCommitResponsePayload(topic=u'test-topic', partition=0, error=25)
`

Plans for Python3 support?

kafka-utils is a great tool, but our stack is all python3, and I'd love to integrate it into our existing tooling.

Are there any plans for python3 support? Would you mind if I submitted a PR (if there hasn't been any work on this already)?

Using Offset Set

The notation in the docs says the usage for Offset Set is the following:

This command takes a group id, and a set of topics, partitions, and offsets.
kafka-consumer-manager --cluster-type test --cluster-name my_cluster offset_set my_group topic1.0.38531

Running that I get the following error:

Error: Badly formatted input, please re-run command with --help option.

When running the --help option, the usage statement says it should be

kafka-consumer-manager offset_set groupid <topic>.<partition>=<offset>

Once again running that I get an error:

Error: Unable to commit consumer offsets for:
  Topic: my_topic Partition: 0 Error: 0

The usage statements seem to differ and neither works for me. Could someone help me with my confusion?
Thanks

kafka-rolling-restart error

Broker 172.26.0.250 is down: HTTPConnectionPool(host='172.26.0.250', port=8778): Max retries exceeded with url: /jolokia//read/kafka.server:name=UnderReplicatedPartitions,type=ReplicaManager/Value (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7f3e90c30850>: Failed to establish a new connection: [Errno 111] Connection refused',)).This maybe because it is starting up

help in running kafka-utils from command line

Hi Everyone,

I apologize in advance for the silly question. :) I am still coming up to speed on Python and am only familiar with running Python scripts either within the Python CLI or via python <script name>.py.

I did the pip install of kafka-utils, but I don't understand how I can execute kafka-utils directly from the shell prompt. Any ideas?

Thanks

--John

Not able to do a rolling bounce

Hey guys,
I recently set kafka-utils up and was able to run a bunch of commands to check ISR etc. However, I get a lot of errors when trying to run the rolling restart. Here is the output of the command: ./kafka-rolling-restart --cluster-type alpha-cluster

Will restart the following brokers in alpha-cluster:
6001: 10.2.90.177
6002: 10.2.68.139
6003: 10.2.69.188
Do you want to restart these brokers? yes
Execute restart
Traceback (most recent call last):
  File "./kafka-rolling-restart", line 6, in <module>
    run()
  File "build/bdist.linux-x86_64/egg/kafka_utils/kafka_rolling_restart/main.py", line 501, in run
  File "build/bdist.linux-x86_64/egg/kafka_utils/kafka_rolling_restart/main.py", line 384, in execute_rolling_restart
  File "/usr/lib64/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "build/bdist.linux-x86_64/egg/kafka_utils/util/ssh.py", line 145, in ssh
  File "/usr/lib/python2.7/dist-packages/paramiko/client.py", line 307, in connect
    look_for_keys, gss_auth, gss_kex, gss_deleg_creds, gss_host)
  File "/usr/lib/python2.7/dist-packages/paramiko/client.py", line 520, in _auth
    raise SSHException('No authentication methods available')
paramiko.ssh_exception.SSHException: No authentication methods available

Here are the contents of my alpha-cluster.yaml file:

clusters:
  alpha-cluster:
    broker_list:
      - "broker1-IP:9092,broker2-IP:9092,broker2-IP:9092"
    zookeeper: "ZK-ELB:80"
local_config:
  cluster: alpha-cluster

Is there any obvious config I'm missing?
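
The traceback ends in paramiko's `_auth`, which suggests the SSH client had no credentials to try, rather than a problem in the YAML file. As a rough diagnostic, the sketch below lists the usual credential sources on the machine running the restart. It is an assumption-laden sketch: `ssh_auth_sources` is a hypothetical helper, and the key file names are the conventional OpenSSH defaults, not a list taken from kafka-utils.

```python
from pathlib import Path
from typing import List, Mapping


def ssh_auth_sources(env: Mapping[str, str], ssh_dir: Path) -> List[str]:
    """List SSH credential sources that appear to be available."""
    found = []
    # A running ssh-agent advertises its socket through this variable.
    if env.get("SSH_AUTH_SOCK"):
        found.append("agent socket: " + env["SSH_AUTH_SOCK"])
    # Conventional default private key names (assumed, not exhaustive).
    for name in ("id_rsa", "id_ecdsa", "id_ed25519"):
        key = ssh_dir / name
        if key.is_file():
            found.append("key file: " + str(key))
    return found
```

Calling `ssh_auth_sources(os.environ, Path.home() / ".ssh")` (after `import os`) and getting an empty list would be consistent with the exception above.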

Help with configuration file (SSL enabled kafka cluster)

I am having a problem connecting to our cluster. Our Kafka cluster is SSL-configured and I am getting a NoBrokersAvailable exception. I assume it has something to do with the SSL configuration, but I can't find a sample config file showing how to point to the SSL certs. Any help on this one?
Also, our ZooKeeper and JMX ports are not SSL-configured (at least not yet), so I had no problem getting stats using kafka-cluster-manager.
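
For reference, the kafka-python client library accepts SSL settings as keyword arguments (`security_protocol`, `ssl_cafile`, `ssl_certfile`, `ssl_keyfile`). The sketch below only assembles those arguments. Whether kafka-utils can pass them through from its YAML configuration is not confirmed anywhere in this thread, so treat this as an illustration of the client-side options; `ssl_client_kwargs` is a hypothetical helper.

```python
from typing import Dict, Optional


def ssl_client_kwargs(
    cafile: str,
    certfile: Optional[str] = None,
    keyfile: Optional[str] = None,
) -> Dict[str, str]:
    """Build kafka-python keyword arguments for an SSL listener."""
    kwargs = {"security_protocol": "SSL", "ssl_cafile": cafile}
    # Client certificate and key are only needed for mutual TLS.
    if certfile:
        kwargs["ssl_certfile"] = certfile
    if keyfile:
        kwargs["ssl_keyfile"] = keyfile
    return kwargs
```

With kafka-python installed, the result could be used as `KafkaConsumer(bootstrap_servers="broker:9093", **ssl_client_kwargs("/path/to/ca.pem"))`, where the host, port, and path are placeholders.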

running kafka-utils in python3 virtual env gives error.

Hi Guys,
I have installed kafka-utils using pip. After activating the virtual environment, I created $HOME/.kafka_discovery/cluster.yaml. I tried to run kafka-utils and got the errors below:

(py3venv) osboxes@osboxes:~/kafka/bin$ kafka-utils
Traceback (most recent call last):
  File "/home/osboxes/py3venv/bin/kafka-utils", line 6, in <module>
    run()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/kafka_utils/main.py", line 55, in run
    for config in iter_configurations(args.discovery_base_path):
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/kafka_utils/util/config.py", line 274, in iter_configurations
    config_dir,
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/kafka_utils/util/config.py", line 111, in __init__
    self.load_topology_config()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/kafka_utils/util/config.py", line 133, in load_topology_config
    topology_config = load_yaml_config(config_path)
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/kafka_utils/util/config.py", line 69, in load_yaml_config
    return yaml.safe_load(config_file)
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/__init__.py", line 162, in safe_load
    return load(stream, SafeLoader)
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/__init__.py", line 114, in load
    return loader.get_single_data()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/constructor.py", line 41, in get_single_data
    node = self.get_single_node()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/composer.py", line 36, in get_single_node
    document = self.compose_document()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/composer.py", line 55, in compose_document
    node = self.compose_node(None, None)
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/composer.py", line 84, in compose_node
    node = self.compose_mapping_node(anchor)
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/composer.py", line 127, in compose_mapping_node
    while not self.check_event(MappingEndEvent):
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/parser.py", line 98, in check_event
    self.current_event = self.state()
  File "/home/osboxes/py3venv/lib/python3.6/site-packages/yaml/parser.py", line 439, in parse_block_mapping_key
    "expected <block end>, but found %r" % token.id, token.start_mark)
yaml.parser.ParserError: while parsing a block mapping
  in "/home/osboxes/.kafka_discovery/cluster.yaml", line 1, column 1
expected <block end>, but found ''
  in "/home/osboxes/.kafka_discovery/cluster.yaml", line 6, column 3

Any thoughts?
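
The traceback fails inside `yaml.safe_load`, so the YAML file itself does not parse, and the reported position (line 6, column 3) usually points at an indentation problem. One way to check a file outside kafka-utils is to load it with PyYAML directly and print the error position. This is a minimal sketch; `yaml_error` is a hypothetical helper and PyYAML is assumed to be installed, as it is in the traceback above.

```python
import yaml  # PyYAML


def yaml_error(text: str):
    """Return None if `text` parses, else a one-line error description."""
    try:
        yaml.safe_load(text)
    except yaml.YAMLError as exc:
        mark = getattr(exc, "problem_mark", None)
        if mark is None:
            return str(exc)
        # Marks are zero-based; add 1 to match editor line/column numbers.
        return f"line {mark.line + 1}, column {mark.column + 1}: {exc.problem}"
    return None
```

Reading the file with `open(path).read()` and passing the text to `yaml_error` shows whether the problem is in the file or elsewhere.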

paramiko dependency check?

Hi. I'm a bit of a noob with Python, so excuse me if this doesn't belong here.

On an Ubuntu 12 system (yah!) I installed kafka-utils through pip and got 1.3.1, with Python 2.7. When I tried the rolling restart, it gave this error:

Traceback (most recent call last):
  File "/usr/local/bin/kafka-rolling-restart", line 3, in <module>
    from kafka_utils.kafka_rolling_restart.main import run
  File "/usr/local/lib/python2.7/dist-packages/kafka_utils/kafka_rolling_restart/main.py", line 36, in <module>
    from kafka_utils.util.ssh import report_stderr
  File "/usr/local/lib/python2.7/dist-packages/kafka_utils/util/ssh.py", line 25, in <module>
    from paramiko.agent import AgentRequestHandler
ImportError: cannot import name AgentRequestHandler

The error was resolved by uninstalling and reinstalling paramiko:

Uninstalling paramiko-1.7.7.1:
  /usr/lib/python2.7/dist-packages/paramiko-1.7.7.1.egg-info
Proceed (y/n)? y
  Downloading paramiko-2.4.0-py2.py3-none-any.whl (192kB)
Requirement already satisfied: cryptography>=1.5 in /usr/local/lib/python2.7/dist-packages (from paramiko)
Collecting pynacl>=1.0.1 (from paramiko)
  Downloading PyNaCl-1.2.0-cp27-cp27mu-manylinux1_x86_64.whl (696kB)
Collecting pyasn1>=0.1.7 (from paramiko)
  Downloading pyasn1-0.4.2-py2.py3-none-any.whl (71kB)
Collecting bcrypt>=3.1.3 (from paramiko)
  Downloading bcrypt-3.1.4-cp27-cp27mu-manylinux1_x86_64.whl (57kB)

Despite this being an old ubuntu, I'd have thought kafka-utils would handle the correct dependency?
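
A quick way to tell whether an installed package is too old for an import like the one in the traceback is to probe for the name directly. The sketch below is generic; `can_import_name` is a hypothetical helper, not part of kafka-utils or paramiko.

```python
import importlib


def can_import_name(module: str, name: str) -> bool:
    """Return True if `from module import name` would succeed."""
    try:
        mod = importlib.import_module(module)
    except ImportError:
        return False
    return hasattr(mod, name)
```

On the affected system, `can_import_name("paramiko.agent", "AgentRequestHandler")` would presumably have returned False before the reinstall and True afterwards.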

LZ4 decompression unsupported

We're trying to use your utils to reset a consumer offset for a topic that is using LZ4 compression. When we run the command we get the following response.

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/kafka/future.py", line 79, in _call_backs
    f(value)
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 760, in _handle_fetch_response
    unpacked = list(self._unpack_message_set(tp, messages))
  File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/fetcher.py", line 482, in _unpack_message_set
    inner_mset = msg.decompress()
  File "/usr/local/lib/python2.7/dist-packages/kafka/protocol/message.py", line 129, in decompress
    assert has_lz4(), 'LZ4 decompression unsupported'
AssertionError: LZ4 decompression unsupported
ERROR:kafka.future:Error processing callback
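
The assertion comes from kafka-python's `has_lz4()` check, which fails when no LZ4 binding can be imported. The sketch below reports which candidate modules are installed. The candidate names (`lz4`, `lz4f`, `lz4framed`) are my understanding of what kafka-python looks for and should be treated as an assumption; `installed_modules` is a hypothetical helper.

```python
import importlib.util
from typing import Iterable, List


def installed_modules(candidates: Iterable[str]) -> List[str]:
    """Return the top-level module names from `candidates` that are importable."""
    return [
        name
        for name in candidates
        if importlib.util.find_spec(name) is not None
    ]
```

An empty result from `installed_modules(["lz4", "lz4f", "lz4framed"])` would be consistent with the error, and installing an LZ4 package for the same interpreter would be the likely fix.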

Rebalancing Multiple Log Dirs

Hi, it currently doesn't look like rebalancing across multiple log directories is supported (with disk size taken into consideration). Do you know if this is planned, or if it's already supported?

BufferUnderflowError: Not enough data left

Every time I try to execute commands from the Consumer Manager set, I get the following error:

Traceback (most recent call last):
  File "/usr/local/bin/kafka-consumer-manager", line 6, in <module>
    run()
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/main.py", line 121, in run
    args.command(args, conf)
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/commands/list_topics.py", line 57, in run
    fail_on_error=False,
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 82, in preprocess_args
    groupid,
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 37, in get_topics_from_consumer_group_id
    return cls.get_topics_for_group_from_kafka(cluster_config, groupid)
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/commands/offset_manager.py", line 147, in get_topics_for_group_from_kafka
    group_topics = kafka_group_reader.read_group(groupid)
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 270, in read_group
    return self.read_groups(partition).get(group_id, [])
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 322, in read_groups
    self.process_consumer_offset_message(message)
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 382, in process_consumer_offset_message
    group, topic, partition, offset = self.parse_consumer_offset_message(message)
  File "/usr/local/lib/python3.6/site-packages/kafka_utils/kafka_consumer_manager/util.py", line 367, in parse_consumer_offset_message
    (group, cur) = read_short_string(key, cur)
  File "/usr/local/lib/python3.6/site-packages/kafka/util.py", line 51, in read_short_string
    raise BufferUnderflowError("Not enough data left")
kafka.errors.BufferUnderflowError: BufferUnderflowError: Not enough data left

Any suggestions why this error occurs?
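
To make the failure mode concrete: the traceback shows the parser reading length-prefixed strings out of a `__consumer_offsets` message key, and the error means the length prefix promised more bytes than the key contains. The sketch below reimplements that read and adds a version check. It assumes the Kafka key layout in which versions 0 and 1 are offset-commit keys (group, topic, partition) and other versions, such as 2 for group metadata, carry no topic. Whether such keys are the actual cause here is not confirmed, and all names are hypothetical.

```python
import struct
from typing import Optional, Tuple


class BufferUnderflow(Exception):
    """Raised when fewer bytes remain than the length prefix promises."""


def read_short_string(data: bytes, cur: int) -> Tuple[Optional[str], int]:
    """Read a string with a 2-byte big-endian length prefix at offset `cur`."""
    if len(data) < cur + 2:
        raise BufferUnderflow("Not enough data left")
    (length,) = struct.unpack_from(">h", data, cur)
    cur += 2
    if length == -1:  # a length of -1 encodes a null string
        return None, cur
    if len(data) < cur + length:
        raise BufferUnderflow("Not enough data left")
    return data[cur:cur + length].decode("utf-8"), cur + length


def parse_offset_key(key: bytes) -> Optional[Tuple[str, str, int]]:
    """Return (group, topic, partition), or None for non-offset keys."""
    (version,) = struct.unpack_from(">h", key, 0)
    if version not in (0, 1):
        return None  # assumed: e.g. version 2 group-metadata keys
    group, cur = read_short_string(key, 2)
    topic, cur = read_short_string(key, cur)
    (partition,) = struct.unpack_from(">i", key, cur)
    return group, topic, partition
```

Skipping keys whose version is not an offset-commit version avoids reading a topic that is not there, which is one way an underflow like the one above can arise.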

Upgrade requests to version 2.20.0 or later (CVE-2018-18074)

GitHub flagged the following security vulnerability affecting this repository.

https://nvd.nist.gov/vuln/detail/CVE-2018-18074

CVE-2018-18074
moderate severity
Vulnerable versions: <= 2.19.1
Patched version: 2.20.0
The Requests package through 2.19.1 before 2018-09-14 for Python sends an HTTP Authorization header to an http URI upon receiving a same-hostname https-to-http redirect, which makes it easier for remote attackers to discover credentials by sniffing the network.
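
To check an environment against the patched version, a simple numeric comparison is enough for plain release numbers like the ones quoted above. This is a minimal sketch; the helper names are hypothetical and it does not handle pre-release or other non-numeric version strings.

```python
from typing import Tuple


def version_tuple(version: str) -> Tuple[int, ...]:
    """Turn a dotted numeric version such as '2.19.1' into (2, 19, 1)."""
    return tuple(int(part) for part in version.split("."))


def is_patched(installed: str, patched: str = "2.20.0") -> bool:
    """Return True if `installed` is at or above the patched version."""
    return version_tuple(installed) >= version_tuple(patched)
```

On Python 3.8 or later, the installed version string can be obtained with `importlib.metadata.version("requests")`.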
