terraform-provider-kafka

A Terraform plugin for managing Apache Kafka.


Installation

terraform-provider-kafka is available on the Terraform Registry. To install it, add the block below to your main.tf and run terraform init:

terraform {
  required_providers {
    kafka = {
      source = "Mongey/kafka"
    }
  }
}

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  ca_cert           = file("../secrets/ca.crt")
  client_cert       = file("../secrets/terraform-cert.pem")
  client_key        = file("../secrets/terraform.pem")
  tls_enabled       = true
}

Otherwise, install it by downloading and extracting the latest release into your Terraform plugin directory (typically ~/.terraform.d/plugins/).
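When installing from the registry, you can also pin the provider to a version range so that terraform init resolves a predictable release. A sketch using Terraform's standard version constraint syntax; the constraint shown is illustrative, not a recommendation:

```hcl
terraform {
  required_providers {
    kafka = {
      source  = "Mongey/kafka"
      # Example constraint only -- pick the release range you have tested.
      version = "~> 0.5"
    }
  }
}
```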

Developing

  1. Install Go
  2. Clone the repository to $GOPATH/src/github.com/Mongey/terraform-provider-kafka:
    mkdir -p $GOPATH/src/github.com/Mongey/terraform-provider-kafka; cd $GOPATH/src/github.com/Mongey/
    git clone https://github.com/Mongey/terraform-provider-kafka.git
    cd terraform-provider-kafka
  3. Build the provider: make build
  4. Run the tests: make test
  5. Start a TLS-enabled Kafka cluster: docker-compose up
  6. Run the acceptance tests: make testacc

Provider Configuration

Example

Example provider with TLS client authentication.

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  ca_cert           = file("../secrets/ca.crt")
  client_cert       = file("../secrets/terraform-cert.pem")
  client_key        = file("../secrets/terraform.pem")
  tls_enabled       = true
}

Example provider with aws-iam (assume role) client authentication.

provider "kafka" {
  bootstrap_servers = ["localhost:9098"]
  tls_enabled       = true
  sasl_mechanism    = "aws-iam"
  sasl_aws_region   = "us-east-1"
  sasl_aws_role_arn = "arn:aws:iam::account:role/role-name"
}

Example provider with aws-iam (AWS profile) client authentication.

provider "kafka" {
  bootstrap_servers = ["localhost:9098"]
  tls_enabled       = true
  sasl_mechanism    = "aws-iam"
  sasl_aws_region   = "us-east-1"
  sasl_aws_profile  = "dev"
}

Example provider with aws-iam (static credentials) client authentication. You must export AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, plus AWS_SESSION_TOKEN if you are using temporary credentials.

provider "kafka" {
  bootstrap_servers = ["localhost:9098"]
  tls_enabled       = true
  sasl_mechanism    = "aws-iam"
  sasl_aws_region   = "us-east-1"
}
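With the static-credentials variant the credentials come entirely from the environment. A minimal shell sketch; the values are placeholders, and AWS_SESSION_TOKEN is only needed for temporary credentials:

```shell
export AWS_ACCESS_KEY_ID="<access-key-id>"
export AWS_SECRET_ACCESS_KEY="<secret-access-key>"
# Only required when using temporary credentials:
export AWS_SESSION_TOKEN="<session-token>"
```

Run terraform plan in the same shell session so the provider can pick the credentials up.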

Compatibility with Redpanda

Because Redpanda does not implement some of the Metadata APIs, you must force the Kafka protocol version the provider should assume:

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  kafka_version     = "2.1.0"
}

| Property | Description | Default |
|---|---|---|
| bootstrap_servers | A list of host:port addresses used to discover the full set of alive brokers | Required |
| ca_cert | The CA certificate, or path to a CA certificate file in PEM format, used to validate the server's certificate. | "" |
| client_cert | The client certificate, or path to a file containing the client certificate in PEM format, used for client authentication to Kafka. If you have intermediate CA certificate(s), append them to client_cert. | "" |
| client_key | The private key, or path to a file containing the private key, that the client certificate was issued for. | "" |
| client_key_passphrase | The passphrase for the private key that the certificate was issued for. | "" |
| kafka_version | The version of the Kafka protocol to use, in $MAJOR.$MINOR.$PATCH format. Some features may not be available on older versions. | "" |
| tls_enabled | Enable communication with the Kafka cluster over TLS. | true |
| skip_tls_verify | Skip TLS verification. | false |
| sasl_username | Username for SASL authentication. | "" |
| sasl_password | Password for SASL authentication. | "" |
| sasl_mechanism | Mechanism for SASL authentication. Allowed values are plain, aws-iam, scram-sha256, scram-sha512, or oauthbearer. | plain |
| sasl_aws_region | AWS region for IAM authentication. | "" |
| sasl_aws_role_arn | ARN of the AWS IAM role to assume for IAM authentication. | "" |
| sasl_aws_profile | AWS profile to use for IAM authentication. | "" |
| sasl_aws_creds_debug | Enable debug logging for AWS authentication. | false |
| sasl_token_url | The URL to retrieve OAuth2 tokens from when using the oauthbearer SASL mechanism. | "" |
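Putting several of these settings together, a provider block for a SASL/SCRAM-secured cluster might look like this; the host, username, and password variable are placeholders for illustration:

```hcl
provider "kafka" {
  bootstrap_servers = ["broker-1.example.com:9096"]
  tls_enabled       = true
  sasl_mechanism    = "scram-sha512"
  sasl_username     = "terraform"
  sasl_password     = var.kafka_password
}
```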

Resources

kafka_topic

A resource for managing Kafka topics. The partition count can be increased without destroying the topic.

Example

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
}

resource "kafka_topic" "logs" {
  name               = "systemd_logs"
  replication_factor = 2
  partitions         = 100

  config = {
    "segment.ms"     = "20000"
    "cleanup.policy" = "compact"
  }
}

Properties

| Property | Description |
|---|---|
| name | The name of the topic |
| partitions | The number of partitions the topic should have |
| replication_factor | The number of replicas the topic should have |
| config | A map of string key/value config attributes |
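Because Kafka only allows partition counts to grow, raising partitions on an existing topic is applied in place rather than by recreating the topic. A sketch based on the logs example above; the new count is arbitrary:

```hcl
resource "kafka_topic" "logs" {
  name               = "systemd_logs"
  replication_factor = 2
  partitions         = 200 # was 100; the increase is applied without destroying the topic

  config = {
    "segment.ms"     = "20000"
    "cleanup.policy" = "compact"
  }
}
```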

Importing Existing Topics

You can import existing topics with:

terraform import kafka_topic.logs systemd_logs

kafka_acl

A resource for managing Kafka ACLs.

Example

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  ca_cert           = file("../secrets/ca.crt")
  client_cert       = file("../secrets/terraform-cert.pem")
  client_key        = file("../secrets/terraform.pem")
}

resource "kafka_acl" "test" {
  resource_name       = "syslog"
  resource_type       = "Topic"
  acl_principal       = "User:Alice"
  acl_host            = "*"
  acl_operation       = "Write"
  acl_permission_type = "Deny"
}

Properties

| Property | Description | Valid values |
|---|---|---|
| acl_principal | Principal that is being allowed or denied | * |
| acl_host | Host from which the principal listed in acl_principal will have access | * |
| acl_operation | Operation that is being allowed or denied | Unknown, Any, All, Read, Write, Create, Delete, Alter, Describe, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite |
| acl_permission_type | Type of permission | Unknown, Any, Allow, Deny |
| resource_name | The name of the resource | * |
| resource_type | The type of resource | Unknown, Any, Topic, Group, Cluster, TransactionalID |
| resource_pattern_type_filter | The pattern type of the resource | Prefixed, Any, Match, Literal |
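For instance, a Prefixed resource_pattern_type_filter grants access to every topic sharing a name prefix. A sketch; the principal, host, and topic prefix are illustrative:

```hcl
resource "kafka_acl" "metrics_writers" {
  resource_name                = "metrics."
  resource_type                = "Topic"
  resource_pattern_type_filter = "Prefixed"
  acl_principal                = "User:telegraf"
  acl_host                     = "*"
  acl_operation                = "Write"
  acl_permission_type          = "Allow"
}
```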

Importing Existing ACLs

To import an ACL, pass the fields separated by the | character as the import ID. Quote the ID to avoid shell expansion.

# Fields in shell notation are
# ${acl_principal}|${acl_host}|${acl_operation}|${acl_permission_type}|${resource_type}|${resource_name}|${resource_pattern_type_filter}
terraform import kafka_acl.admin 'User:12345|*|Describe|Allow|Topic|experimental-topic|Prefixed'

kafka_quota

A resource for managing Kafka Quotas.

Example

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  ca_cert           = file("../secrets/ca.crt")
  client_cert       = file("../secrets/terraform-cert.pem")
  client_key        = file("../secrets/terraform.pem")
}

resource "kafka_quota" "test" {
  entity_name       = "client1"
  entity_type       = "client-id"
  config = {
    "consumer_byte_rate" = "4000000"
    "producer_byte_rate" = "3500000"
  }
}

Properties

| Property | Description |
|---|---|
| entity_name | The name of the entity |
| entity_type | The entity type (client-id, user, ip) |
| config | A map of string attributes for the entity |
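Quotas can also target the user entity type. A sketch mirroring the client-id example above; the entity name and byte rates are illustrative:

```hcl
resource "kafka_quota" "user_quota" {
  entity_name = "alice"
  entity_type = "user"
  config = {
    "consumer_byte_rate" = "2000000"
    "producer_byte_rate" = "1500000"
  }
}
```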

kafka_user_scram_credential

A resource for managing Kafka SCRAM user credentials.

Example

provider "kafka" {
  bootstrap_servers = ["localhost:9092"]
  ca_cert           = file("../secrets/ca.crt")
  client_cert       = file("../secrets/terraform-cert.pem")
  client_key        = file("../secrets/terraform.pem")
}

resource "kafka_user_scram_credential" "test" {
  username               = "user1"
  scram_mechanism        = "SCRAM-SHA-256"
  scram_iterations       = "8192"
  password               = "password"
}

Importing Existing SCRAM user credentials

To import, pass the fields separated by the | character as the import ID. Quote it to avoid shell expansion.

# Fields in shell notation are
# ${username}|${scram_mechanism}|${password}
terraform import kafka_user_scram_credential.test 'user1|SCRAM-SHA-256|password'

Properties

| Property | Description |
|---|---|
| username | The username |
| scram_mechanism | The SCRAM mechanism (SCRAM-SHA-256 or SCRAM-SHA-512) |
| scram_iterations | The number of SCRAM iterations (must be >= 4096). Default: 4096 |
| password | The password for the user |



terraform-provider-kafka's Issues

Topic deleted from state store when ACL on topic is deleted

It seems that when both a topic and a related ACL are defined in Terraform, subsequently deleting the Terraform ACL definition forces the deletion of the topic from the state store, followed by an attempt to create the same topic, which already exists. Example scenario below:

~/kafka_acl_issue $ ls -l
total 20
-rw-r--r-- 1 mat mat 233 May 28 21:07 acl.tf
-rw-r--r-- 1 mat mat 109 May 28 21:12 providers.tf
-rw-r--r-- 1 mat mat 156 May 28 21:39 terraform.tfstate
-rw-r--r-- 1 mat mat 564 May 28 21:39 terraform.tfstate.backup
-rw-r--r-- 1 mat mat 126 May 28 21:06 topic.tf

~/kafka_acl_issue $ cat topic.tf 
resource "kafka_topic" "test_topic" {
  name               = "test.topic"
  replication_factor = 1
  partitions         = 1
}

~/kafka_acl_issue $ cat acl.tf 
resource "kafka_acl" "test_acl" {
  resource_name       = "test.topic"
  resource_type       = "Topic"
  acl_principal       = "User:Alice"
  acl_host            = "*"
  acl_operation       = "Write"
  acl_permission_type = "Deny"
}

~/kafka_acl_issue $ terraform apply

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  # kafka_acl.test_acl will be created
  + resource "kafka_acl" "test_acl" {
      + acl_host                     = "*"
      + acl_operation                = "Write"
      + acl_permission_type          = "Deny"
      + acl_principal                = "User:Alice"
      + id                           = (known after apply)
      + resource_name                = "test.topic"
      + resource_pattern_type_filter = "Literal"
      + resource_type                = "Topic"
    }

  # kafka_topic.test_topic will be created
  + resource "kafka_topic" "test_topic" {
      + id                 = (known after apply)
      + name               = "test.topic"
      + partitions         = 1
      + replication_factor = 1
    }

Plan: 2 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_topic.test_topic: Creating...
kafka_acl.test_acl: Creating...
kafka_topic.test_topic: Creation complete after 0s [id=test.topic]
kafka_acl.test_acl: Creation complete after 0s [id=User:Alice|*|Write|Deny|Topic|test.topic|Literal]

Apply complete! Resources: 2 added, 0 changed, 0 destroyed.

~/kafka_acl_issue $ mv acl.tf acl.tf.disabled

~/kafka_acl_issue $ terraform apply
kafka_acl.test_acl: Refreshing state... [id=User:Alice|*|Write|Deny|Topic|test.topic|Literal]
kafka_topic.test_topic: Refreshing state... [id=test.topic]

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  + create
  - destroy

Terraform will perform the following actions:

  # kafka_acl.test_acl will be destroyed
  - resource "kafka_acl" "test_acl" {
      - acl_host                     = "*" -> null
      - acl_operation                = "Write" -> null
      - acl_permission_type          = "Deny" -> null
      - acl_principal                = "User:Alice" -> null
      - id                           = "User:Alice|*|Write|Deny|Topic|test.topic|Literal" -> null
      - resource_name                = "test.topic" -> null
      - resource_pattern_type_filter = "Literal" -> null
      - resource_type                = "Topic" -> null
    }

  # kafka_topic.test_topic will be created
  + resource "kafka_topic" "test_topic" {
      + id                 = (known after apply)
      + name               = "test.topic"
      + partitions         = 1
      + replication_factor = 1
    }

Plan: 1 to add, 0 to change, 1 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_acl.test_acl: Destroying... [id=User:Alice|*|Write|Deny|Topic|test.topic|Literal]
kafka_topic.test_topic: Creating...
kafka_acl.test_acl: Destruction complete after 0s

Error: kafka server: Topic with this name already exists.

  on topic.tf line 1, in resource "kafka_topic" "test_topic":
   1: resource "kafka_topic" "test_topic" {


~/kafka_acl_issue $ cat terraform.tfstate
{
  "version": 4,
  "terraform_version": "0.12.0",
  "serial": 11,
  "lineage": "9fa0063f-476c-8bfb-0002-9ebc7478dd46",
  "outputs": {},
  "resources": []
}
~/kafka_acl_issue $ 

Error when running `terraform apply` a second time on a topic without config

First of all: creating a topic works fine, importing a topic also works, and deleting the topic works fine as well.

Here is my configuration:

provider "kafka" {
  alias             = "kafk00"
  bootstrap_servers = ["kafk00:9092"]
  timeout           = 90000
}

resource "kafka_topic" "topic00" {
  provider           = "kafka.kafk00"

  name               = "topic00"
  replication_factor = 3
  partitions         = 1
}

I can run terraform apply with success:

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

  + kafka_topic.topic00
      id:                 <computed>
      name:               "topic00"
      partitions:         "1"
      replication_factor: "3"


Plan: 1 to add, 0 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_topic.topic00: Creating...
  name:               "" => "topic00"
  partitions:         "" => "1"
  replication_factor: "" => "3"
kafka_topic.topic00: Creation complete after 0s (ID: topic00)

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

Logs on the Kafka server:

2018-11-28T19:21:19.904533+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,904] INFO Topic creation Map(topic00-0 -> ArrayBuffer(3, 1, 2)) (kafka.zk.AdminZkClient)
2018-11-28T19:21:19.925127+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,924] INFO Replica loaded for partition topic00-0 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:21:19.927577+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,927] INFO [Log partition=topic00-0, dir=/var/log/kafka/main] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
2018-11-28T19:21:19.928153+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,927] INFO [Log partition=topic00-0, dir=/var/log/kafka/main] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
2018-11-28T19:21:19.931738+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,928] INFO Created log for partition topic00-0 in /var/log/kafka/main with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 536870912, retention.ms -> 172800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
2018-11-28T19:21:19.932316+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,928] INFO [Partition topic00-0 broker=1] No checkpointed highwatermark is found for partition topic00-0 (kafka.cluster.Partition)
2018-11-28T19:21:19.932915+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,928] INFO Replica loaded for partition topic00-0 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:21:19.933426+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,928] INFO Replica loaded for partition topic00-0 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:21:19.933932+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,929] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(topic00-0) (kafka.server.ReplicaFetcherManager)
2018-11-28T19:21:19.934465+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:19,929] INFO [ReplicaFetcherManager on broker 1] Added fetcher to broker BrokerEndPoint(id=3, host=de-00-kafk00-0002.node.consul:9092) for partitions Map(topic00-0 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
2018-11-28T19:21:20.203266+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:20,202] WARN [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Based on replica's leader epoch, leader replied with an unknown offset in topic00-0. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
2018-11-28T19:21:20.204245+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:21:20,204] INFO [Log partition=topic00-0, dir=/var/log/kafka/main] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)

But if I run terraform apply again, it not only tries to change the topic but also fails:

kafka_topic.topic00: Refreshing state... (ID: topic00)

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  ~ update in-place

Terraform will perform the following actions:

  ~ kafka_topic.topic00
      config.%:             "1" => "0"
      config.segment.bytes: "536870912" => ""


Plan: 0 to add, 1 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_topic.topic00: Modifying... (ID: topic00)
  config.%:             "1" => "0"
  config.segment.bytes: "536870912" => ""
kafka_topic.topic00: Still modifying... (ID: topic00, 10s elapsed)
kafka_topic.topic00: Still modifying... (ID: topic00, 20s elapsed)
kafka_topic.topic00: Still modifying... (ID: topic00, 30s elapsed)
kafka_topic.topic00: Still modifying... (ID: topic00, 40s elapsed)

Error: Error applying plan:

1 error(s) occurred:

* kafka_topic.topic00: 1 error(s) occurred:

* kafka_topic.topic00: Error waiting for topic (topic00) to become ready: couldn't find resource (21 retries)

Terraform does not automatically rollback in the face of errors.
Instead, your Terraform state file has been partially updated with
any resources that successfully completed. Please address the error
above and apply again to incrementally change your infrastructure.

Logs on the Kafka server:

2018-11-28T19:22:40.501062+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:22:40,500] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
2018-11-28T19:22:40.503525+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:22:40,503] INFO Processing override for entityPath: topics/topic00 with config: Map() (kafka.server.DynamicConfigManager)
2018-11-28T19:22:40.522534+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:22:40,522] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)

Finally: if I try to change the number of partitions or add some configuration, it works flawlessly.

New config:

resource "kafka_topic" "topic00" {
  provider           = "kafka.kafk00"

  name               = "topic00"
  replication_factor = 3
  partitions         = 3
  config = {
    cleanup.policy     = "compact"
    segment.bytes      = "104857600"
    compression.type   = "producer"
  }

}

Result:

kafka_topic.topic00: Refreshing state... (ID: topic00)

An execution plan has been generated and is shown below.
Resource actions are indicated with the following symbols:
  ~ update in-place

Terraform will perform the following actions:

  ~ kafka_topic.topic00
      config.%:                "1" => "3"
      config.cleanup.policy:   "" => "compact"
      config.compression.type: "" => "producer"
      config.segment.bytes:    "536870912" => "104857600"
      partitions:              "1" => "3"


Plan: 0 to add, 1 to change, 0 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_topic.topic00: Modifying... (ID: topic00)
  config.%:                "1" => "3"
  config.cleanup.policy:   "" => "compact"
  config.compression.type: "" => "producer"
  config.segment.bytes:    "536870912" => "104857600"
  partitions:              "1" => "3"
kafka_topic.topic00: Modifications complete after 3s (ID: topic00)

Apply complete! Resources: 0 added, 1 changed, 0 destroyed.

Kafka logs:

2018-11-28T19:26:53.976255+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:53,976] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
2018-11-28T19:26:53.978289+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:53,977] INFO Processing override for entityPath: topics/topic00 with config: Map(cleanup.policy -> compact, segment.bytes -> 104857600, compression.type -> producer) (kafka.server.DynamicConfigManager)
2018-11-28T19:26:54.038541+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,038] INFO Creating 2 partitions for 'topic00' with the following replica assignment: Map(2 -> ArrayBuffer(2, 3, 1), 1 -> ArrayBuffer(1, 2, 3)). (kafka.zk.AdminZkClient)
2018-11-28T19:26:54.039314+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,038] INFO Topic update Map(topic00-0 -> Vector(3, 1, 2), topic00-2 -> ArrayBuffer(2, 3, 1), topic00-1 -> ArrayBuffer(1, 2, 3)) (kafka.zk.AdminZkClient)
2018-11-28T19:26:54.065498+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,065] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(topic00-1) (kafka.server.ReplicaFetcherManager)
2018-11-28T19:26:54.069196+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,069] INFO [Log partition=topic00-1, dir=/var/log/kafka/main] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
2018-11-28T19:26:54.070156+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,069] INFO [Log partition=topic00-1, dir=/var/log/kafka/main] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 1 ms (kafka.log.Log)
2018-11-28T19:26:54.070961+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,070] INFO Created log for partition topic00-1 in /var/log/kafka/main with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 172800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
2018-11-28T19:26:54.071654+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,070] INFO [Partition topic00-1 broker=1] No checkpointed highwatermark is found for partition topic00-1 (kafka.cluster.Partition)
2018-11-28T19:26:54.072341+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,070] INFO Replica loaded for partition topic00-1 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.073007+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,070] INFO Replica loaded for partition topic00-1 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.073666+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,070] INFO Replica loaded for partition topic00-1 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.074291+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,071] INFO [Partition topic00-1 broker=1] topic00-1 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
2018-11-28T19:26:54.082191+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,081] INFO Replica loaded for partition topic00-2 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.082967+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,081] INFO Replica loaded for partition topic00-2 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.084792+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,084] INFO [Log partition=topic00-2, dir=/var/log/kafka/main] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
2018-11-28T19:26:54.085374+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,085] INFO [Log partition=topic00-2, dir=/var/log/kafka/main] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 2 ms (kafka.log.Log)
2018-11-28T19:26:54.086114+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,086] INFO Created log for partition topic00-2 in /var/log/kafka/main with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> compact, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 104857600, retention.ms -> 172800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
2018-11-28T19:26:54.086842+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,086] INFO [Partition topic00-2 broker=1] No checkpointed highwatermark is found for partition topic00-2 (kafka.cluster.Partition)
2018-11-28T19:26:54.087375+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,086] INFO Replica loaded for partition topic00-2 with initial high watermark 0 (kafka.cluster.Replica)
2018-11-28T19:26:54.088184+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,087] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(topic00-2) (kafka.server.ReplicaFetcherManager)
2018-11-28T19:26:54.089003+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,087] INFO [ReplicaFetcherManager on broker 1] Added fetcher to broker BrokerEndPoint(id=2, host=de-00-kafk00-0001.node.consul:9092) for partitions Map(topic00-2 -> (offset=0, leaderEpoch=0)) (kafka.server.ReplicaFetcherManager)
2018-11-28T19:26:54.388024+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,387] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Based on replica's leader epoch, leader replied with an unknown offset in topic00-2. The initial fetch offset 0 will be used for truncation. (kafka.server.ReplicaFetcherThread)
2018-11-28T19:26:54.389008+00:00 kafk00 daemon info kafka-server-start.sh[3819]: [2018-11-28 19:26:54,388] INFO [Log partition=topic00-2, dir=/var/log/kafka/main] Truncating to 0 has no effect as the largest offset in the log is -1 (kafka.log.Log)

I believe this is not the intended behavior.

kafka_topic resource idempotency error

Using the example here: https://github.com/Mongey/terraform-provider-kafka/tree/master/examples, it always attempts to create the kafka_topic.syslog resource, which results in Error: kafka server: Topic with this name already exists.

To reproduce:

  • start Kafka and ZooKeeper via the docker-compose file provided in the project
  • apply the Terraform configuration provided in the examples folder
  • run terraform plan: it shows that Terraform intends to create the topic again, and applying would result in the error quoted above

v0.2.3 Error: kafka: client has run out of available brokers to talk to

Hi,

We've been using v0.2.2 to create Kafka topics without any issues, but I'm getting this error after upgrading to v0.2.3:

Error: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

I'm using the terraform-provider-kafka_0.2.3_darwin_amd64 release on macOS 10.15.2

Our provider config is very basic, such as:

provider "kafka" {
  bootstrap_servers = ["kafka.example.com:9092"]
}

Unable to auth via SASL over SSL

I can't seem to get this to work when a listener is configured to use SASL_SSL.
For example, let's say your broker has the following config:

listeners=SASL_SSL://somehost:9092

With the TF config as follows:

provider "kafka" {
  bootstrap_servers = ["somehost:9092"]
  tls_enabled = true
  sasl_username = "username"
  sasl_password = "password"
  skip_tls_verify   = true
}

I just get the following error

Error running plan: 1 error(s) occurred:

* provider.kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

I can connect via a java client, using the following jaas config:

KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="username"
        password="password";
    };

(I tested changing the password to something incorrect, and then I can't connect, so authentication does seem to be working.)

I'm using the following opts in the working java code:

        PROPS.put("security.protocol","SASL_SSL");
        PROPS.put("sasl.mechanism", "PLAIN");

Any idea what I might be doing wrong?
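For reference, later provider versions document a sasl_mechanism option (defaulting to plain); whether your version supports it is an assumption here, but a SASL_SSL configuration would look roughly like:

```hcl
provider "kafka" {
  bootstrap_servers = ["somehost:9092"]
  tls_enabled       = true
  skip_tls_verify   = true

  sasl_username  = "username"
  sasl_password  = "password"
  sasl_mechanism = "plain" # match the broker's sasl.mechanism=PLAIN
}
```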

kafka_acl acl_operation to take a list

To mimic a producer, one needs the Describe, Write, and Create operations on a topic. It would be nice if the kafka_acl resource could take a list of operations; currently one has to replicate the resource for each operation.
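Until such a list is supported, the duplication can be reduced with for_each (available since Terraform 0.12.6); a sketch, with resource and topic names illustrative:

```hcl
resource "kafka_acl" "producer" {
  for_each = toset(["Describe", "Write", "Create"])

  resource_name       = "my-topic"
  resource_type       = "Topic"
  acl_principal       = "User:producer"
  acl_host            = "*"
  acl_operation       = each.value
  acl_permission_type = "Allow"
}
```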

Is your cluster reachable

I'm trying to use this plugin with an SSL auth config, but I keep getting the above error. I've even tried it directly on the broker, but it still doesn't work. Am I missing something obvious?

Error: Error running plan: 1 error occurred:
* provider.kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

I'm using kafka 2.1.1 on centos 7 if that's at all helpful.

Export newKafkaConfig()

Hello,

I'm working on a terraformer provider for kafka using your kafka provider.
Is it possible to export the function newKafkaConfig() so we can use it from terraformer?

Regards

Error getting request for apiKey: DESCRIBE_CONFIGS

With TF v0.11.11 and Kafka 1.0.2 we get the following error after successfully creating a topic:

Feb 28 15:47:43 ip-10-20-0-4 docker[13089]: org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: DESCRIBE_CONFIGS, apiVersion: 1, connectionId: 10.20.0.4:9093-172.19.202.139:49218-4, listenerName: ListenerName(SSL), principal: User:CN=provision,OU=SRE,O=XXX,L=Amsterdam,ST=Noord-Holland,C=NL

import issues

Import is unable to use a list generated from locals

provider "consul" {
  address    = var.consul_address
  datacenter = "aws-ue1"
}

data "consul_service" "kafka_cluster" {
    name = "kafka-cluster"
    datacenter = "aws-ue1"
}

locals {
  bootstrap_servers = [
    for i in data.consul_service.kafka_cluster.service : {
      bootstrap = "${i.node_address}:${i.port}"
    }
  ]
}

provider "kafka" {
  bootstrap_servers = formatlist("%s",[for node in local.bootstrap_servers: node.bootstrap])
  tls_enabled = false
  skip_tls_verify = true
}

When I attempt to apply this, I get bootstrap_servers was not set:

2020-02-04T10:19:32.625-0600 [INFO]  plugin.terraform-provider-kafka_v0.2.3: configuring server automatic mTLS: timestamp=2020-02-04T10:19:32.625-0600
2020-02-04T10:19:32.653-0600 [DEBUG] plugin: using plugin: version=5
2020-02-04T10:19:32.653-0600 [DEBUG] plugin.terraform-provider-kafka_v0.2.3: plugin address: address=/var/folders/3n/kcphbjs52992xcll2ywhjjb00000gp/T/plugin381724773 network=unix timestamp=2020-02-04T10:19:32.653-0600
2020-02-04T10:19:32.708-0600 [DEBUG] plugin.terraform-provider-kafka_v0.2.3: 2020/02/04 10:19:32 [DEBUG] configuring provider with Brokers @ <nil>
2020/02/04 10:19:32 [ERROR] <root>: eval: *terraform.EvalConfigProvider, err: bootstrap_servers was not set
2020/02/04 10:19:32 [ERROR] <root>: eval: *terraform.EvalSequence, err: bootstrap_servers was not set
2020/02/04 10:19:32 [ERROR] <root>: eval: *terraform.EvalOpFilter, err: bootstrap_servers was not set
2020/02/04 10:19:32 [ERROR] <root>: eval: *terraform.EvalSequence, err: bootstrap_servers was not set

Error: bootstrap_servers was not set

Error upon go build

Hello,

Trying to build this provider so I can use it in terraform.

Upon building the provider, the following error appears

# _/terraform-provider-kafka
./main.go:9:45: cannot use kafka.Provider (type func() "github.com/Mongey/terraform-provider-kafka/vendor/github.com/hashicorp/terraform/terraform".ResourceProvider) as type "github.com/hashicorp/terraform/plugin".ProviderFunc in field value
go build  50.23s user 9.76s system 465% cpu 12.888 total

License?

Hello,
this project looks exactly what I was looking for to better configure our Kafka cluster. Sadly, my company is a little reluctant to use open source projects that don't have a license specified. Do you mind adding a LICENSE file to this repo? It would help me out a lot, thanks!

Similarly it would be good for your kafka-connect provider (https://github.com/Mongey/terraform-provider-kafka-connect)

Pick highest supported API based on broker version

See #43 & #39

From http://kafka.apache.org/protocol.html#api_versions

In order to work against multiple broker versions, clients need to know what versions of various APIs a broker supports. The broker exposes this information since 0.10.0.0 as described in KIP-35. Clients should use the supported API versions information to choose the highest API version supported by both client and broker. If no such version exists, an error should be reported to the user.

The following sequence may be used by a client to obtain supported API versions from a broker.

Client sends ApiVersionsRequest to a broker after connection has been established with the broker. If SSL is enabled, this happens after SSL connection has been established.
On receiving ApiVersionsRequest, a broker returns its full list of supported ApiKeys and versions regardless of current authentication state (e.g., before SASL authentication on an SASL listener, do note that no Kafka protocol requests may take place on a SSL listener before the SSL handshake is finished). If this is considered to leak information about the broker version a workaround is to use SSL with client authentication which is performed at an earlier stage of the connection where the ApiVersionRequest is not available. Also, note that broker versions older than 0.10.0.0 do not support this API and will either ignore the request or close connection in response to the request.
If multiple versions of an API are supported by broker and client, clients are recommended to use the latest version supported by the broker and itself.
Deprecation of a protocol version is done by marking an API version as deprecated in the protocol documentation.
Supported API versions obtained from a broker are only valid for the connection on which that information is obtained. In the event of disconnection, the client should obtain the information from the broker again, as the broker might have been upgraded/downgraded in the mean time.
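The negotiation rule quoted above reduces to intersecting the client's and the broker's [min, max] version ranges and taking the top of the intersection; a minimal sketch (the function name is illustrative, not from the provider):

```go
package main

import "fmt"

// pickAPIVersion returns the highest API version supported by both client
// and broker, per KIP-35: intersect the two [min, max] ranges and take the
// upper bound. ok is false when no common version exists, in which case an
// error should be reported to the user.
func pickAPIVersion(clientMin, clientMax, brokerMin, brokerMax int16) (version int16, ok bool) {
	lo := clientMin
	if brokerMin > lo {
		lo = brokerMin
	}
	hi := clientMax
	if brokerMax < hi {
		hi = brokerMax
	}
	if lo > hi {
		return 0, false // ranges do not overlap
	}
	return hi, true
}

func main() {
	v, ok := pickAPIVersion(0, 2, 1, 5) // client supports 0..2, broker 1..5
	fmt.Println(v, ok)                  // highest common version is 2: prints "2 true"
}
```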

ACL created without resource_pattern_type_filter can't be removed

Putting this to good use with a Confluent Cloud setup. Many thanks for writing the provider!

Given an ACL resource like this:

resource "kafka_acl" "allow_idempotent_write_in_cluster" {
  resource_name       = "kafka-cluster"
  resource_type       = "Cluster"
  acl_principal       = var.principal
  acl_host            = "*"
  acl_operation       = "IdempotentWrite"
  acl_permission_type = "Allow"
}

When that gets removed from the terraform config, applying the ACL deletion will fail with:

2019/11/20 14:06:42 [DEBUG] module.topic_write_acl.kafka_acl.allow_idempotent_write_in_cluster: apply errored, but we're indicating that via the Error pointer rather than returning it: There were no acls matching this filter
2019/11/20 14:06:42 [TRACE] module.topic_write_acl: eval: *terraform.EvalWriteState
2019/11/20 14:06:42 [TRACE] EvalWriteState: writing current state object for module.topic_write_acl.kafka_acl.allow_idempotent_write_in_cluster
2019/11/20 14:06:42 [ERROR] module.topic_write_acl: eval: *terraform.EvalApplyPost, err: There were no acls matching this filter
2019/11/20 14:06:42 [ERROR] module.topic_write_acl: eval: *terraform.EvalSequence, err: There were no acls matching this filter
2019/11/20 14:06:42 [ERROR] module.topic_write_acl: eval: *terraform.EvalOpFilter, err: There were no acls matching this filter
2019/11/20 14:06:42 [TRACE] [walkApply] Exiting eval tree: module.topic_write_acl.kafka_acl.allow_idempotent_write_in_cluster (destroy)

A related error during terraform plan may be:

2019/11/20 14:05:57 [WARN] Provider "kafka" produced an invalid plan for module.topic_write_acl.kafka_acl.allow_idempotent_write_in_cluster, but we are tolerating it because it is using the legacy plugin SDK.
    The following problems may be the cause of any confusing errors from downstream operations:
      - .resource_pattern_type_filter: planned value cty.StringVal("Literal") does not match config value cty.NullVal(cty.String)

It seems the resource_pattern_type_filter variable is not required and defaults to "Literal" which could cause this discrepancy between the planned value and config?

https://github.com/Mongey/terraform-provider-kafka/blob/master/kafka/resource_kafka_acl.go#L35
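A workaround reported for this class of plan noise is to state the optional attribute explicitly, so the config matches the provider's planned default; a sketch of the resource above with the default made explicit:

```hcl
resource "kafka_acl" "allow_idempotent_write_in_cluster" {
  resource_name                = "kafka-cluster"
  resource_type                = "Cluster"
  resource_pattern_type_filter = "Literal" # make the provider default explicit
  acl_principal                = var.principal
  acl_host                     = "*"
  acl_operation                = "IdempotentWrite"
  acl_permission_type          = "Allow"
}
```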

Documentation

Hi @Mongey

Would you accept a PR that also added the docs in a website folder structure to mirror that of other Terraform providers?

This would help us generate a Pulumi provider based off the structure :)

Paul

Provider versioning support

Terraform provides a way to constrain provider versions, but this requires the plugin filename to end with a version number in the format "_vx.y.z". Usage is explained in detail in https://www.terraform.io/docs/configuration/providers.html#plugin-names-and-versions. The feature can then be used within modules by applying constraints such as:

terraform {
  required_version = ">= 0.12"

  required_providers {
    aws = ">= 2.14.0"
  }
}

See https://www.terraform.io/docs/configuration/providers.html#version-provider-versions for more details

This is especially useful in cases where Terraform versions are upgraded within organizations and where custom providers require manual installations, as old plugin versions are not necessarily compatible with the new Terraform release.

Currently the Kafka provider version is displayed as follows, which prevents the use of version constraints:

$ terraform version
Terraform v0.12.1
+ provider.aws v2.13.0
+ provider.kafka (unversioned)

Kindly requesting that versioning support be added to the provider.

provider for windows

Is there a plan to release the provider for Windows as well? I can only see the Linux version in the releases.

Chicken and Egg

Got a tricky little issue when using this on an experimental development cluster. If you build a development Kafka cluster on Terraform-managed EC2 instances, add a bunch of topics using this provider, then decide you've misconfigured the EC2 instances running Kafka, tweak your EC2 Terraform, and terminate all the cluster instances, then running terraform apply to re-create them fails: this provider cannot connect to the cluster, so the apply fails and you can't recreate the cluster. You can't even apply any unrelated changes. The only way I could find around this was to manually remove all the Kafka resources from the state file.

I've never written a custom provider, so this could be a load of rubbish, but is there a way to let all the other TF resources apply (EC2 create) before even attempting to check the Kafka topics?

Adding partitions recreates the topic

-/+ kafka_topic.test (new resource required)
      id:                     "test" => <computed> (forces new resource)
      config.%:               "0" => "2"
      config.retention.bytes: "" => "107374182400"
      config.retention.ms:    "" => "604800001"
      name:                   "test" => "test"
      partitions:             "50" => "55" (forces new resource)
      replication_factor:     "3" => "3"

It seems that adding partitions recreates the topic. Is there any way in Sarama to perform this operation in place?

Bug with MTLS connections

Thanks for the Kafka provider!

You have a major bug in the mutual TLS provider settings. The problem: I can provide ANY client certificate to a broker, even one not trusted by that broker's PKI. Nevertheless, the provider does NOT give a connection error on terraform apply (it should bail out, but it does not!). Instead, it cannot read topics from the broker, and the provider decides it wants to recreate topics that already exist!

Using a truststore location and password

Does the library have support for ssl.truststore.location and ssl.truststore.password?

Our Kafka implementation uses those, and we would like to use this Terraform provider. If it doesn't, could you point us to where we could contribute to this library in order to add this feature?

Thanks

SASL/GSSAPI (Kerberos) Support

Has there been any thought of adding support for Kerberos? I see the sarama Go client has support for it; I'm wondering if there would be any hesitation about me creating a PR for this.

Resource Kafka scram sasl user

Hello, I realized that creation of SASL users is not supported. I'm considering creating this as a resource and possibly making a pull request in the near future. Unfortunately, creating a user in Kafka via the command-line script requires connecting to Zookeeper and altering some configs. There is an open KIP to allow setting SCRAM passwords via the Kafka admin interface.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-506%3A+Allow+setting+SCRAM+password+via+Admin+interface

Currently it is only possible to create SCRAM credentials for users using the kafka-configs.sh script; there is no equivalent functionality in the Admin interface. But the script makes use of a direct ZooKeeper connection in order to create the znodes needed for storing user credentials. Since KIP-4 there has been a push to remove the need for ZooKeeper connections from scripts and replace them by the AdminClient API.

To implement this functionality we need to connect to Zookeeper. Here is a first sketch of the provider schema:

"zookeeper": &schema.Schema{
    Optional: true,
    Elem: &schema.Resource{
        Schema: map[string]*schema.Schema{
            "host": {
                Type:     schema.TypeString,
                Required: true,
            },
            "sasl_username": &schema.Schema{
                Type:        schema.TypeString,
                Required:    false,
                DefaultFunc: schema.EnvDefaultFunc("ZOOKEEPER_SASL_USERNAME", ""),
                Description: "Username for SASL authentication.",
            },
            "sasl_password": &schema.Schema{
                Type:        schema.TypeString,
                Required:    false,
                DefaultFunc: schema.EnvDefaultFunc("ZOOKEEPER_SASL_PASSWORD", ""),
                Description: "Password for SASL authentication.",
            },
        },
    },
},

I propose a resource containing the username, password and scram sasl mechanisms as today is in Scala:

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConfigCommand.scala

This is a "temporary" solution: once the KIP is eventually approved, we could remove the Zookeeper dependency. I'm wondering why Zookeeper isn't included in the provider to improve support for functionality like this.

Thank you.

Kafka_acl consumer and producer options

Hello,

First of all, thank you for this amazing provider.

I've been using it and it's really nice to use. However, I found that the ACL resource lacks the equivalent of the --consumer and --producer options at creation time.
I may have missed those options when checking the code and trying to use the provider correctly.
Are those options already implemented, or will they be implemented in the near future?

Regards.

tls: failed to parse password protected private key

Does this plugin support TLS certificate passwords? Probably not.
My ca_cert_file, client_cert_file and client_key_file were generated using AWS Certificate Manager. Those certificates are protected by a password.
I then get the error tls: failed to parse private key when running terraform plan.

$ cat kafka.tf 
provider "kafka" {
  bootstrap_servers = ["my-broker-host:19092"]
  ca_cert_file      = "./Certificate_chain.txt"
  client_cert_file  = "./Certificate.txt"
  client_key_file   = "./private_key.txt"
  tls_enabled       = true
  skip_tls_verify   = false
}

resource "kafka_topic" "logs" {
  name               = "acl-admin-test"
  replication_factor = 3
  partitions         = 5
}
$ TF_LOG=debug terraform plan
2019/07/23 16:13:36 [INFO] Terraform version: 0.12.3  
2019/07/23 16:13:36 [INFO] Go runtime version: go1.12.6
2019/07/23 16:13:36 [INFO] CLI args: []string{"/usr/local/bin/terraform", "plan"}
2019/07/23 16:13:36 [DEBUG] Attempting to open CLI config file: /Users/01087872/.terraformrc
2019/07/23 16:13:36 [DEBUG] File doesn't exist, but doesn't need to. Ignoring.
2019/07/23 16:13:36 [INFO] CLI command args: []string{"plan"}
2019/07/23 16:13:36 [DEBUG] New state was assigned lineage "681c36a3-5568-f97a-df76-362a578ad749"
2019/07/23 16:13:36 [DEBUG] checking for provider in "."
2019/07/23 16:13:36 [DEBUG] checking for provider in "/usr/local/bin"
2019/07/23 16:13:36 [DEBUG] checking for provider in ".terraform/plugins/darwin_amd64"
2019/07/23 16:13:36 [DEBUG] checking for provider in "/Users/01087872/.terraform.d/plugins"
2019/07/23 16:13:36 [WARN] found legacy provider "terraform-provider-kafka"
2019/07/23 16:13:36 [DEBUG] found valid plugin: "kafka", "0.0.0", "/Users/01087872/.terraform.d/plugins/terraform-provider-kafka"
2019/07/23 16:13:36 [DEBUG] checking for provisioner in "."
2019/07/23 16:13:36 [DEBUG] checking for provisioner in "/usr/local/bin"
2019/07/23 16:13:36 [DEBUG] checking for provisioner in ".terraform/plugins/darwin_amd64"
2019/07/23 16:13:36 [DEBUG] checking for provisioner in "/Users/01087872/.terraform.d/plugins"
2019/07/23 16:13:36 [INFO] backend/local: starting Plan operation
2019-07-23T16:13:36.317+0800 [INFO]  plugin: configuring client automatic mTLS
2019-07-23T16:13:36.342+0800 [DEBUG] plugin: starting plugin: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka args=[/Users/01087872/.terraform.d/plugins/terraform-provider-kafka]
2019-07-23T16:13:36.346+0800 [DEBUG] plugin: plugin started: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=13999
2019-07-23T16:13:36.347+0800 [DEBUG] plugin: waiting for RPC address: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka
2019-07-23T16:13:36.375+0800 [INFO]  plugin.terraform-provider-kafka: configuring server automatic mTLS: timestamp=2019-07-23T16:13:36.375+0800
2019-07-23T16:13:36.399+0800 [DEBUG] plugin: using plugin: version=5
2019-07-23T16:13:36.400+0800 [DEBUG] plugin.terraform-provider-kafka: plugin address: address=/var/folders/w1/9cq510fj2719932_y9wpyhynh55kwm/T/plugin875569410 network=unix timestamp=2019-07-23T16:13:36.399+0800
2019-07-23T16:13:36.457+0800 [DEBUG] plugin: plugin process exited: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=13999
2019-07-23T16:13:36.457+0800 [DEBUG] plugin: plugin exited
2019/07/23 16:13:36 [TRACE] terraform.NewContext: complete
2019/07/23 16:13:36 [INFO] terraform: building graph: GraphTypeValidate
2019/07/23 16:13:36 [DEBUG] ProviderTransformer: "kafka_topic.acl-admin-test-topic" (*terraform.NodeValidatableResource) needs provider.kafka
2019/07/23 16:13:36 [TRACE] (graphTransformerMulti) Completed graph transform *terraform.ProviderTransformer with new graph:
kafka_topic.acl-admin-test-topic - *terraform.NodeValidatableResource
  provider.kafka - *terraform.NodeApplyableProvider
provider.kafka - *terraform.NodeApplyableProvider
------
2019/07/23 16:13:36 [TRACE] (graphTransformerMulti) Executing graph transform *terraform.PruneProviderTransformer
2019/07/23 16:13:36 [DEBUG] ReferenceTransformer: "kafka_topic.acl-admin-test-topic" references: []
2019/07/23 16:13:36 [DEBUG] ReferenceTransformer: "provider.kafka" references: []
2019/07/23 16:13:36 [DEBUG] Starting graph walk: walkValidate
2019/07/23 16:13:36 [TRACE] dag/walk: updating graph
2019-07-23T16:13:36.458+0800 [INFO]  plugin: configuring client automatic mTLS
2019-07-23T16:13:36.482+0800 [DEBUG] plugin: starting plugin: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka args=[/Users/01087872/.terraform.d/plugins/terraform-provider-kafka]
2019-07-23T16:13:36.486+0800 [DEBUG] plugin: plugin started: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=14000
2019-07-23T16:13:36.486+0800 [DEBUG] plugin: waiting for RPC address: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka
2019-07-23T16:13:36.504+0800 [INFO]  plugin.terraform-provider-kafka: configuring server automatic mTLS: timestamp=2019-07-23T16:13:36.504+0800
2019-07-23T16:13:36.529+0800 [DEBUG] plugin: using plugin: version=5
2019-07-23T16:13:36.529+0800 [DEBUG] plugin.terraform-provider-kafka: plugin address: network=unix address=/var/folders/w1/9cq510fj2719932_y9wpyhynh55kwm/T/plugin344576815 timestamp=2019-07-23T16:13:36.529+0800
2019-07-23T16:13:36.587+0800 [DEBUG] plugin: plugin process exited: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=14000
2019-07-23T16:13:36.587+0800 [DEBUG] plugin: plugin exited
2019/07/23 16:13:36 [TRACE] [walkValidate] Exiting eval tree: provider.kafka (close)
2019/07/23 16:13:36 [TRACE] vertex "provider.kafka (close)": visit complete
2019/07/23 16:13:36 [INFO] backend/local: plan calling Refresh
2019/07/23 16:13:36 [INFO] terraform: building graph: GraphTypeRefresh
Refreshing Terraform state in-memory prior to plan...
The refreshed state will be used to calculate this plan, but will not be
2019/07/23 16:13:36 [DEBUG] Starting graph walk: walkRefresh
persisted to local or remote state storage.

2019/07/23 16:13:36 [INFO] backend/local: plan calling Plan

2019/07/23 16:13:36 [INFO] terraform: building graph: GraphTypePlan
2019/07/23 16:13:36 [TRACE] Executing graph transform *terraform.ConfigTransformer
2019/07/23 16:13:36 [TRACE] ConfigTransformer: Starting for path: 
------------------------------------------------------------------------
2019/07/23 16:13:36 [DEBUG] ProviderTransformer: "kafka_topic.acl-admin-test-topic" (*terraform.NodePlannableResource) needs provider.kafka
2019/07/23 16:13:36 [DEBUG] ReferenceTransformer: "kafka_topic.acl-admin-test-topic" references: []
2019/07/23 16:13:36 [DEBUG] ReferenceTransformer: "provider.kafka" references: []
2019/07/23 16:13:36 [DEBUG] Starting graph walk: walkPlan
2019-07-23T16:13:36.589+0800 [INFO]  plugin: configuring client automatic mTLS
2019-07-23T16:13:36.613+0800 [DEBUG] plugin: starting plugin: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka args=[/Users/01087872/.terraform.d/plugins/terraform-provider-kafka]
2019-07-23T16:13:36.616+0800 [DEBUG] plugin: plugin started: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=14001
2019-07-23T16:13:36.616+0800 [DEBUG] plugin: waiting for RPC address: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka
2019-07-23T16:13:36.633+0800 [INFO]  plugin.terraform-provider-kafka: configuring server automatic mTLS: timestamp=2019-07-23T16:13:36.633+0800
2019-07-23T16:13:36.662+0800 [DEBUG] plugin: using plugin: version=5
2019-07-23T16:13:36.662+0800 [DEBUG] plugin.terraform-provider-kafka: plugin address: address=/var/folders/w1/9cq510fj2719932_y9wpyhynh55kwm/T/plugin972972548 network=unix timestamp=2019-07-23T16:13:36.662+0800
2019-07-23T16:13:36.715+0800 [DEBUG] plugin.terraform-provider-kafka: 2019/07/23 16:13:36 [DEBUG] 0:Converting kafka-test-0.stg.data.fastretailing.io:19092 to string
2019-07-23T16:13:36.715+0800 [DEBUG] plugin.terraform-provider-kafka: 2019/07/23 16:13:36 [DEBUG] configuring provider with Brokers @ &[my-broker-host:19092]
2019-07-23T16:13:36.715+0800 [DEBUG] plugin.terraform-provider-kafka: 2019/07/23 16:13:36 [DEBUG] Config @ &{0xc0005f7800 120 ./Certificate_chain.txt ./Certificate.txt ./private_key.txt true false   plain}
2019-07-23T16:13:36.715+0800 [DEBUG] plugin.terraform-provider-kafka: 2019/07/23 16:13:36 [INFO] configuring bootstrap_servers &{0xc0005f7800 120 ./Certificate_chain.txt ./Certificate.txt ./private_key.txt true false   plain}
2019-07-23T16:13:36.716+0800 [DEBUG] plugin.terraform-provider-kafka: 2019/07/23 16:13:36 [ERROR] Error creating kafka client
2019/07/23 16:13:36 [ERROR] <root>: eval: *terraform.EvalConfigProvider, err: tls: failed to parse private key
2019/07/23 16:13:36 [ERROR] <root>: eval: *terraform.EvalSequence, err: tls: failed to parse private key
2019/07/23 16:13:36 [ERROR] <root>: eval: *terraform.EvalOpFilter, err: tls: failed to parse private key
2019/07/23 16:13:36 [ERROR] <root>: eval: *terraform.EvalSequence, err: tls: failed to parse private key
2019/07/23 16:13:36 [TRACE] [walkPlan] Exiting eval tree: provider.kafka
2019/07/23 16:13:36 [TRACE] vertex "provider.kafka": visit complete
2019/07/23 16:13:36 [INFO] backend/local: plan operation completed

Error: tls: failed to parse private key

  on kafka.tf line 1, in provider "kafka":
   1: provider "kafka" {


2019-07-23T16:13:36.720+0800 [DEBUG] plugin: plugin process exited: path=/Users/01087872/.terraform.d/plugins/terraform-provider-kafka pid=14001
2019-07-23T16:13:36.720+0800 [DEBUG] plugin: plugin exited

Externally deleted ACLs are not detected as missing

I would like the ACL resource to recreate any missing ACLs, if they are deleted externally.

Currently it appears that the func aclRead(d *schema.ResourceData, meta interface{}) error implementation relies only on the Terraform state, rather than updating state based on the result of ListACLs. So missing ACLs are currently not detected.

Confluent Cloud topic creation?

I'm trying to create a topic in a Confluent Cloud Kafka cluster. My configuration is as follows:

provider "kafka" {
  bootstrap_servers = ["redacted:9092"]
  tls_enabled       = true
  sasl_username     = "redacted confluent cloud api key id"
  sasl_password     = "redacted confluent cloud api key secret"
}

resource "kafka_topic" "test_terraform" {
  name               = "test_terraform"
  replication_factor = 2
  partitions         = 10

  config = {
  }
}

Then I issue a terraform apply and get:

Error: Error applying plan:

1 error(s) occurred:

* kafka_topic.test_terraform: 1 error(s) occurred:

* kafka_topic.test_terraform: kafka server: Request parameters do not satisfy the configured policy.

This looks to be related to issue #25 but I can't go fix the keystore in Confluent's SaaS Kafka cluster.

Any ideas? Should it be possible to specify ciphers somehow, or maybe the defaults need fixing?

Thanks!

Unable to run against ACLs created with previous version

When using version 0.1.4 and Terraform 0.11, I was able to create topics and ACLs. Attempting to upgrade to 0.2.0 and Terraform 0.12, I'm in a state where my remote state has been upgraded to the 0.12 format, but terraform apply fails oddly. Using TRACE-level debug, I see this:
2019/06/26 09:04:17 [DEBUG] kafka_acl.Sales43Outbound-create: apply errored, but we're indicating that via the Error pointer rather than returning it: Unknown pattern type filter: ''
2019/06/26 09:04:17 [TRACE] : eval: *terraform.EvalWriteState
2019/06/26 09:04:17 [TRACE] EvalWriteState: writing current state object for kafka_acl.Sales43Outbound-create
2019/06/26 09:04:17 [TRACE] : eval: *terraform.EvalApplyPost
2019/06/26 09:04:17 [ERROR] : eval: *terraform.EvalApplyPost, err: Unknown pattern type filter: ''
2019/06/26 09:04:17 [ERROR] : eval: *terraform.EvalSequence, err: Unknown pattern type filter: ''
2019/06/26 09:04:17 [ERROR] : eval: *terraform.EvalOpFilter, err: Unknown pattern type filter: ''

Now, on a plan, it says it's going to destroy every ACL I have because it needs to add resource_pattern_type_filter = "Literal" to them all. That's fine, except it fails on the destroy and never gets to the create. Originally, I did not have that pattern type filter in my Terraform at all. So now I'm stuck: can't go forward and can't go back.

Recreating a topic results in an error

When increasing the partitions in examples/main.tf

-/+ kafka_topic.foo2 (new resource required)
      id:                 "foo" => <computed> (forces new resource)
      config.%:           "1" => "1"
      config.segment.ms:  "20000" => "20000"
      name:               "foo" => "foo"
      partitions:         "1" => "2" (forces new resource)
      replication_factor: "1" => "1"


Plan: 1 to add, 0 to change, 1 to destroy.

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

kafka_topic.foo2: Destroying... (ID: foo)
kafka_topic.foo2: Destruction complete after 0s
kafka_topic.foo2: Creating...
  config.%:           "" => "1"
  config.segment.ms:  "" => "20000"
  name:               "" => "foo"
  partitions:         "" => "2"
  replication_factor: "" => "1"

Error: Error applying plan:

1 error(s) occurred:

* kafka_topic.foo2: 1 error(s) occurred:

* kafka_topic.foo2: kafka server: Topic with this name already exists.

This is probably because a successful topic deletion actually means the topic is marked for deletion, not immediately removed. We could poll Kafka to confirm the topic has actually been deleted.

Allow resource pattern type to be set when managing ACLs

Hi! Thanks for the awesome provider!
One thing I'm missing in the ACL resource is an option to set the resource-pattern-type. This would allow defining ACLs with topic prefixes, for example: instead of creating the ACL with a full topic name, I could set just the topic prefix. This is described in the Kafka documentation: https://kafka.apache.org/documentation/#security_authz_cli (see the --resource-pattern-type option).

It looks like implementing this would be tricky, as sarama does not support it yet; am I right?

Terraform 0.11 or 0.12?

Just wondering what the state of Terraform 0.12 compatibility is?

Thank you very much for the wonderful provider!

Provision topics with a Zookeeper provider

The tooling only supports connecting to an array of brokers. We use Zookeeper with JAAS auth to create topics (internally, I think kafka-acls.sh resolves a Kafka broker directly to create the topic?).

It would be nice if this TF provider could use Zookeeper instead to provision topics and ACLs, replacing our commands:

kafka-acls.sh --authorizer-properties zookeeper.connect=$ZOOKEEPER --list
kafka-topics.sh --create --zookeeper $ZOOKEEPER --replication-factor 3 --partitions 1 --topic my-topic
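For comparison, the second command maps to a kafka_topic resource (over the broker protocol rather than Zookeeper); a sketch with an illustrative topic name:

```hcl
resource "kafka_topic" "my_topic" {
  name               = "my-topic"
  replication_factor = 3
  partitions         = 1
}
```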

Increasing replication factor recreates the topic

Hi @Mongey

Thanks for this amazing provider!
We started to implement it and noticed that increasing the replication factor forces recreation of the topic. Although this is a very uncommon operation, we want to be able to change it easily in case we made a mistake or business priorities change.

Can you help?
Thanks
D.

kafka version

Hi,

In the readme file it is written that the Requirement is "Kafka 1.0.0".

Is this provider supposed to work with Kafka 2.x? Is the requirement Kafka >= 1.0.0?

Error during "go build" with tag v0.2.1

Hello,

Trying to build this provider using the version tag v0.2.1 raises an error (detailed steps below). There is no problem with the master branch or version tag v0.2.0. I tested on another laptop, on Mac and Linux, with exactly the same error message.

> git status
HEAD detached at v0.2.1
nothing to commit, working tree clean

> make build
go build .

The error message is:
github.com/Mongey/terraform-provider-kafka/kafka
kafka/config.go:33:24: kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc undefined (type struct { Enable bool; Mechanism sarama.SASLMechanism; Handshake bool; User string; Password string; SCRAMAuthzID string; SCRAMClient sarama.SCRAMClient; TokenProvider sarama.AccessTokenProvider } has no field or method SCRAMClientGeneratorFunc)
kafka/config.go:36:24: kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc undefined (type struct { Enable bool; Mechanism sarama.SASLMechanism; Handshake bool; User string; Password string; SCRAMAuthzID string; SCRAMClient sarama.SCRAMClient; TokenProvider sarama.AccessTokenProvider } has no field or method SCRAMClientGeneratorFunc)
GNUmakefile:6: recipe for target 'build' failed
make: *** [build] Error 2

`terraform import` with the `--provider=` option doesn't work?

Hello again !

It seems that importing a kafka topic completely ignores the --provider= option.

Here is the configuration I use:

provider "kafka" {
  alias             = "kafk00"
  bootstrap_servers = ["kafka00-0000:9092"]
  timeout           = 3000
}

resource "kafka_topic" "topic00" {
  provider           = "kafka.kafk00"
  name               = "topic00"
  replication_factor = 2
  partitions         = 4
}

The command used to import is: terraform import --provider=kafka.kafk00 kafka_topic.topic00 topic00

kafka_topic.topic00: Importing from ID "topic00"...
kafka_topic.topic00: Import complete!
  Imported kafka_topic (ID: topic00)
kafka_topic.topic00: Refreshing state... (ID: topic00)

Error: provider.kafka: bootstrap_servers was not set

If I add a provider without an alias and a non-existent bootstrap_server, it also fails, but with a different message. To me this proves that the import command does not take --provider=kafka.kafk00 into account:

provider "kafka" {
  alias             = "kafk00"
  bootstrap_servers = ["pp-li-kafk00-0001:9092"]
  timeout           = 3000
}

provider "kafka" {
  bootstrap_servers = ["nope:9092"]
  timeout           = 3000
}

resource "kafka_topic" "topic00" {
  provider           = "kafka.kafk00"
  name               = "topic00"
  replication_factor = 2
  partitions         = 4
}

Output :

terraform import --provider=kafka.kafk00 kafka_topic.topic00 topic00
kafka_topic.topic00: Importing from ID "topic00"...
kafka_topic.topic00: Import complete!
  Imported kafka_topic (ID: topic00)
kafka_topic.topic00: Refreshing state... (ID: topic00)

Error: provider.kafka: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Is this a bug, a missing feature, or am I doing something wrong?

Regards.

Leo
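One workaround suggested by the second experiment above (where the un-aliased provider was the one actually contacted): temporarily point a default, un-aliased provider block at the same cluster for the duration of the import, then remove it afterwards. A sketch:

```hcl
# Temporary default provider used only while running `terraform import`;
# the resource keeps provider = "kafka.kafk00" for normal plans/applies.
provider "kafka" {
  bootstrap_servers = ["kafka00-0000:9092"]
  timeout           = 3000
}
```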

Adding cluster-level ACLs

I'm trying to add ACLs similar to the ones created by running a command:

kafka-acls --add --allow-principal User:sample --operation DESCRIBE --group '*' --cluster

This principal needs DESCRIBE on group and cluster level.

The group level works without issues, however, when I try to add the same permissions on the cluster level:

resource "kafka_acl" "sample-cluster-describe" {
  acl_host                     = "*"
  acl_operation                = "Describe"
  acl_permission_type          = "Allow"
  acl_principal                = "User:sample"
  resource_name                = "*"
  resource_pattern_type_filter = "Literal"
  resource_type                = "Cluster"
}

I'm getting an error:

Error: kafka server: This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details.

  on acls.tf line 42, in resource "kafka_acl" "sample-cluster-describe":
  42: resource "kafka_acl" "sample-cluster-describe" {

I suspect this is because cluster-level permissions do not need resource_name; however, it is a required argument and I can't omit it.

Error: Missing required argument

  on acls.tf line 42, in resource "kafka_acl" "sample-cluster-describe":
  42: resource "kafka_acl" "sample-cluster-describe" {

The argument "resource_name" is required, but no definition was found.
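If it helps: Kafka brokers only recognize a single cluster resource, whose name is the literal `kafka-cluster`, so a wildcard resource_name is rejected for resource_type = "Cluster". A sketch of the config with that fixed name (untested against this provider):

```hcl
resource "kafka_acl" "sample-cluster-describe" {
  acl_host            = "*"
  acl_operation       = "Describe"
  acl_permission_type = "Allow"
  acl_principal       = "User:sample"

  # The cluster resource always has the fixed name "kafka-cluster";
  # a wildcard here is rejected by the broker.
  resource_name                = "kafka-cluster"
  resource_pattern_type_filter = "Literal"
  resource_type                = "Cluster"
}
```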

Error Opening Certificates for Provider - file name too long

I am hitting an error when trying to open the certificates needed to connect to my clusters. It looks as if the provider is trying to open the contents of the file as a file path.

Error: open -----BEGIN CERTIFICATE-----
<certificate redacted>
-----END CERTIFICATE-----
: file name too long

My provider config is:

provider "kafka" {
  version           = "~> 0.2"
  bootstrap_servers = ["production-kafka.<redacted>:9093"]
  ca_cert           = file("/home/ca.crt")
  client_cert       = file("/home/client.crt")
  client_key        = file("/home/private.key")
}
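If the installed plugin build is an older one that still uses the path-style arguments (the deprecated `ca_cert_file` / `client_cert_file` / `client_key_file`), it will try to open whatever it receives as a path, which matches this error. Under that assumption, passing paths instead of `file()` contents may work:

```hcl
provider "kafka" {
  version           = "~> 0.2"
  bootstrap_servers = ["production-kafka.<redacted>:9093"]

  # Older builds expect file paths, not PEM contents:
  ca_cert_file     = "/home/ca.crt"
  client_cert_file = "/home/client.crt"
  client_key_file  = "/home/private.key"
}
```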

Raw string for certs

I noticed that ca_cert, client_cert, and client_key need to be read via the file() function. I would like to skip writing this data out to files and instead pull it from HashiCorp Vault:

provider "kafka" {
  bootstrap_servers      = "${var.bootstrap_servers}"
  ca_cert           = "${vault_pki_secret_backend_cert.ca.ca_chain}"
  client_cert       = "${vault_pki_secret_backend_cert.ca.certificate}"
  client_key        = "${vault_pki_secret_backend_cert.ca.private_key}"
  skip_tls_verify        = false
  tls_enabled            = true
}

I did a fresh build of the plugin but have been unable to get it to work. Any thoughts or suggestions? Does it work this way?

Removing a config does not delete the config

If a topic has a config and you remove that config from the resource, the config is not deleted from the topic.

$ kafka-topics --zookeeper localhost:2181 --describe --topic foo
Topic:foo       PartitionCount:1        ReplicationFactor:1     Configs:segment.ms=10000

terraform apply with this

diff --git a/examples/main.tf b/examples/main.tf
index 9712870..e4ae4f8 100644
--- a/examples/main.tf
+++ b/examples/main.tf
@@ -12,8 +12,4 @@ resource "kafka_topic" "foo2" {
   name               = "foo"
   replication_factor = 1
   partitions         = 1
-
-  config = {
-    "segment.ms" = "20000"
-  }
 }

This is because sarama didn't implement nullableStrings for values of ConfigEntries.

Validate config values are all strings

The provider only supports string values (for now?) in the config map.

Add a ValidateFunc to the config schema to ensure all values are strings (and/or validate against the broker).

This will help with issues such as the one described in #40 (comment) where bools are supplied but should be strings.

Alternatively support non-string types.
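For example, an unquoted bool in the config map is the kind of value such validation would catch; quoting every value keeps it a string:

```hcl
resource "kafka_topic" "example" {
  name               = "example"
  replication_factor = 1
  partitions         = 1

  config = {
    # quoted so the value arrives as the string "false", not a bool
    "unclean.leader.election.enable" = "false"
    "segment.ms"                     = "20000"
  }
}
```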

Error when connecting to cluster with many brokers

When executing an action such as updating a topic, it fails if the first available broker is not the controller:

kafka_topic.topic1: topic1: kafka server: This is not the correct controller for this cluster.

Using client.Controller() to fetch the controller, instead of picking any available broker, would solve this.

I have given it a try: sysco-middleware@038d4b7. Happy to create a PR if it makes sense.
