Coder Social home page Coder Social logo

eventing-kafka's People

Contributors

aavarghese avatar adrcunha avatar aliok avatar chaodaig avatar chizhg avatar devguyio avatar dprotaso avatar dubee avatar eric-sap avatar evankanderson avatar grantr avatar harwayne avatar knative-automation avatar lberk avatar lionelvillard avatar markusthoemmes avatar mattmoor avatar mattmoor-sockpuppet avatar matzew avatar n3wscott avatar nachocano avatar nicolaferraro avatar pierdipi avatar slinkydeveloper avatar srvaroa avatar steven0711dong avatar syedriko avatar travis-minke-sap avatar tzununbekov avatar vaikas avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

eventing-kafka's Issues

[KafkaSource] Conversion webhook drops conditions

Describe the bug
The conversion webhook from v1beta1 to v1alpha1 (stored version) does not preserve conditions, leading to infinite reconciling loop

Expected behavior
A clear and concise description of what you expected to happen.
All conditions should be carried over.

To Reproduce
Steps to reproduce the behavior.
Install a v1beta1 KafkaSource and dump the logs

Knative release version
head

Additional context
Add any other context about the problem here such as proposed priority

Upgrading v0.18.1 fails if a KafkaChannel already exists

Describe the bug
Given:

  1. An existing knative cluster
  2. An existing Knative Kafka channel with an older version (e.g. v0.17.5)

Applying the v0.18.1 Kafka channel yaml fails with following error and the upgrade fails

The Deployment "kafka-ch-dispatcher" is invalid: spec.template.spec.containers[0].env[0].valueFrom: Invalid value: "": may not be specified when `value` is not empty

Expected behavior
Applying the yaml should succeed and upgrade should complete successfully

To Reproduce

  1. Install Knative Eventing and Knative Kafka Channel v0.17.5
  2. Create a KafkaChannel
  3. Upgrade Eventing: Apply the Knative Eventing yaml v0.18.3
  4. Apply the Knative Kafka Channel yaml v0.18.1

Knative release version
v0.18.3

Additional context
Pre v0.18 Kafka Channel controller created the kafka-dispatcher deployment, while starting v0.18 it seems the deployment is precreated by default in the release yaml but it doesn't expect that another deployment exists. Hence the failure.

Observability Configuration Follow-Up

This issue is a placeholder to look further into the config-observability settings and determine if they are feasible to add to the eventing-kafka controller and dispatcher.

Background

The main function of observability seems to be the enabling of profiling content on individual pods in the knative-eventing namespace. More specifically, on any pod that uses the sharedmain.Main() implementation from knative.dev/pkg/injection/sharedmain/main.go

This is the (abbreviated) code that appears to enable the observability server in sharedmain.Main():

profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
go func() {
    <-egCtx.Done()
    profilingServer.Shutdown(context.Background())
    eg.Wait();
}()
cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar())
sharedmain.WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger.Sugar(), environment.ServiceName)

However, putting only this block of code into the controller or dispatcher's non-shared main() function does not accomplish the intented effect. You can browse to a server on port 8008 after the controller/dispatcher executes the go function above, but it returns 404s for all paths.

Research Question: What else is necessary to start a profiling server that actually serves the /debug/pprof path?

Also, we start our own Prometheus server via prometheus.NewMetricsServer() in the channel and dispatcher non-shared main() functions. It is possible that the observability code requires the use of Prometheus in a different manner.

Research Question: Do we need to use Prometheus differently in order to leverage the knative observability machanism?

Instructions

Starting a profiling server on the controller pod:

  • Modify the config-observability configmap in the knative-eventing namespace and add these lines to the "data" section:

    metrics.backend-destination: prometheus
    profiling.enable: "true"
    
  • Wait for the watcher to update the configuration, or restart the eventing-kafka-channel-controller pod manually

  • Run a port-forwarding command to the controller pod, for example

    kubectl -n knative-eventing port-forward eventing-kafka-channel-controller-xxxxxxxxxx-xxxxx 8008
    
  • Browse to the local forwarded address

    http://localhost:8008/debug/pprof/
    

References

# Original Tracing and Observability issue
https://github.com/knative-sandbox/eventing-kafka/issues/22

# KNative information related to observability (mainly for knative-serving, but generic profiling seems common to eventing and service)
https://knative.dev/docs/serving/installing-logging-metrics-traces/

# Information about various profiling URLs
https://github.com/knative/serving/blob/master/test/performance/profiling.md

Reevaluate confluent-kafka-go decision

confluent-kafka-go was originally chosen for this channel implementation over sarama for several reasons (this was more than a year ago)

  • Ordering was not guaranteed in error cases by sarama producer
  • consumer group distribution with sarama-cluster over multiple processes elastically had issues
  • manual offset commit support lacking in sarama
  • only supported older kafka versions and did not have full kafka feature support
  • confluent backing of confluent-kafka-go and librdkafka is more or less guaranteed and kept up to date

The choice of confluent-kafka-go does introduce some challenges though, primarily around cgo builds. This causes a divergence in developer experience here vs all other knative components. We should revisit the initial reasons for the choice as a lot has changed since then, and see if the decision is still justified. Performance, and open source community / governance model should also be taken into consideration and discussion.

Dispatcher Retry & Dead Letter Queues

Currently the implementation has custom retry logic which is preventing the use of the eventing MessageDispatcher.DispatchMessage()'s ability to send to the Dead Letter Queue. Doing so would result in a separate Dead Letter Queue message for every retry attempt. Need to look at retry implementations in conjunction with the Knative Eventing implementations to design a more flexible solution.

[consolidated] Dispatcher deployment readonly

Problem
The dispatcher deployment updates the status of the KafkaChannel and every update needs to be done by the leader and replicas don't receive informer updates, since we use only ReconcileKind.
This behavior breaks dispatcher horizontal scalability and if there are multiple replicas of a dispatcher the information that is in the status isn't really accurate since it reflects the status of the leader only.

In knative/eventing-contrib#1446 we had a discussion on how we can solve this problem.

There are other problems related to updating the status in the dispatcher like in knative/eventing#4280.

Persona:
Which persona is this feature for?
*

Exit Criteria
One of the solutions outlined in the linked discussion.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?
1

Additional context (optional)

KafkaChannel Status Alignment

The architecture of the eventing-sandbox/eventing-kafka implementation is inherently different from that of eventing-contrib/kafka. The former is, however, based on the same KafkaChannel CRD as the latter. This means that the Status tracking is not exactly aligned for tracking the resources READY states. Further we've never tracked K8S Endpoints directly and might want to do so. This likely cannot be worked until plans for merging the two implementations are better defined.

HA Controller Follow-Up

This issue is a placeholder to look a bit deeper into the new HA Controller support being introduced in Knative once things have settled out a bit more. Research as of the time of writing is captured below for reference, but is of course subject to change...

Research...

The Knative Operator appears to be driving this use case. It currently/previously had a list of hardcoded Controllers from Knative Serving that could be made HA with the specified number of replicas. This is being expanded into Knative Eventing and being made a bit more dynamic via the use of the knative.dev/high-availability: "true" label. Some of the core Controllers in the eventing repo have been labelled as such. Nothing in the eventing-contrib repo has been labelled as of yet.

We would only want to run the main eventing-kafka controller in this HA mode if even that. The Channel / Dispatcher (Kafka Producer/Consumer) Deployments will be scaled/managed in other ways.

The eventing-contrib/kafka implementation seems to propagate theCONFIG_LEADERELECTION_NAME="config-leader-election-kafka" Environment Variable around but it's not clear how this is used (if at all?). The config YAML includes the corresponding ConfigMap 400-leader-election-config.yaml.

There is some leader-election logic in the eventing webhook - but that just seems to validate/block creation of configmaps with unsupported controller lists?

Open Questions...

  • Is this HA Controller concept intended to include the individual Channel Controllers?
  • Does having an HA Controller buy us anything useful in production? K8S will restart any failed controller Pods which will likely also just fail. I suppose might protect against a "stuck / hung" Pod?
  • Is this label only used by the Knative Operator?
  • Does the Knative Operator deal with individual Channel Controllers like ours?
  • What would be required of our controller implementation (if anything) in order to support leader-election?

References...

# Original Knative Operator Issue (Open)
https://github.com/knative/operator/issues/73

# Eventing PRs Which Added The HA Label To Ko YAML
https://github.com/knative/eventing/pull/3311
https://github.com/knative/eventing/pull/3324

# Knative Operator Docs On HA (probably not up to date with in-flight changes)
https://github.com/knative/operator/blob/master/docs/configuration.md#spechigh-availability

# HA Leader Election Docs From May
https://github.com/knative/docs/issues/2454
https://github.com/knative/docs/pull/2455/files

Missing / Invalid Kafka Secret Handling

When configured for azure KAFKA_PROVIDER usage, the current implementation handles the case where there are no Kafka Secrets (ie - those labelled with eventing-kafka.knative.dev/kafka-secret: "true"). It does not do so when using standard kafka KAFKA_PROVIDER.

In general we should have the controller be stable when there are no Kafka Secrets, or when the Kafka Secrets are malformed or include the dummy/default data from the yaml.

It would also be good to document the current handling of edge cases (e.g. deleting secret when KafkaChannels exist, etc) so that we can determine requirements for them in a future issue.

Support Custom Kafka AdminClient

/non-trivial
/area eventing-delivery
/kind feature-proposal

Problem Statement

It seems that some Kafka (or kafka-like) implementations provide support for the standard Kafka Producer / Consumer APIs, but not the Admin API (Topic management, etc). Azure EventHubs is such an example, and while the current implementation supports that specific use case, it is possible that third-party users might want to provide their own AdminClient without contributing to the open-source knative-sandbox/eventing-kafka implementation directly. Therefore, it would be nice to support a mechanism by which a third party AdminClient could be used by the eventing-kafka implementation at runtime.

Proposal

Proposal

CloudEvent Tracing Compliance

The Knative Channel Specification specifies the expected behavior of handling W3C Tracecontext headers. The current implementation does NOT meet these requirements in that the traceparent and tracestate headers are lost in transmission. A completely new traceparent value is instead sent to the subscriber.

Currently, the Knative In-Memory Channel reference implementation DOES adhere to the specification, but the eventing-contrib/kafka implementation does not. Similar to this implementation the latter has the intermediate step of persisting to/from a Kafka Topic. There is currently confusion and a debate around the best way to handle this persistence, how it relates to the potential use of ce-traceparent and ce-tracestate, and where in the Knative / CloudEvent stack to handle this. Further, it is unclear whether the CloudEvents DTE Extension is useful or not.

See the following Github Issues/PRs for the history...

Knative / CloudEvents
knative/eventing#2918 & cloudevents/spec#603 & cloudevents/spec#607
eventing/in-memory
knative/eventing#1757
eventing-contrib/kafka
knative/eventing-contrib#1139 & knative/eventing-contrib#1155
knative/eventing-contrib#563
Previous Knative Implementation Removal
knative/eventing#2873
Therefore, we are going to wait until this is resolved before attempting to implement a solution that complies with the expected specification.

There are also larger tracing issue to be dealt with within the Knative Eventing ecosystem such as how to integrate Istio B3 Header usage with the current OpenTelemetry Headers. See knative/eventing#2581.

W3C Tracecontext Notes
The traceparent field is composed of the following values...

- - -

...where the (usually 00) specifies the format of the following data fields, and the is the end-to-end ID of the entire trace through a distributed system. The is also referred to as the , and can/should change with each new HTTP Client Request being made. Therefore the expectation is that when the Channel receives an HTTP request with a traceparent header, that it is forwarded to the Subscriber with the same traceparent header, albeit with a new .

Uplift to latest eventing-contrib

In preparation for the pending repository consolidation (moving eventing-contrib/kafka -> knative-sandbox/eventing-kafka) we need to get our implementation up to the same dependency versions as eventing-contrib. This was recently done (week or two ago) and should be pretty painless. Need to move forward to 0.17.1 / master and associated dependencies.

Refactor ConfigMap Loading/Comparison

The recently introduced ConfigMap support could be improved in a few ways and we'd like to refactor to reduce the complexity / maintenance. The following areas need some love...

NOTE - We need to keep the new custom extraction logic which custom loads the Sarama.Version and TLS.Config info from the YAML string. The overall lifecycle of the ConfigMap watchers is reasonable as well and doesn't necessarily require attention at this point.

Lifecycle

There are a lot of different exposed (and package private) lifecycle functions related to the ConfigMap handling including...

  • LoadSettings
  • LoadConfigFromMap
  • NewSaramaConfig
  • MergeSaramaSettings
  • UpdateSaramaConfig
  • enforceSaramaConfig

..which have a tangled call stack and expose too many different usage patterns. We do need the ability to support a couple of slightly different usage patterns between the SharedMain Controller and non-SharedMain Channel/Dispatcher but it should be possible to reduce the complexity here so that it's easier to reason about. Essentially just create a single entry point constructor and maybe another merge function or something along those lines.

Comparison

Because the original requirements weren't super well defined the comparison of before/after Sarama Config structs was made more complicated than is necessary. We don't need to Marshall the struct out to JSON String, and can ignore the function pointer fields, etc. Currently we have a mirror struct with JSON advice as a means of excluding things from comparison but a more straightforward approach would be to use the cmp package with ignores and some manual comparision similar to this unit test...

func TestCmp(t *testing.T) {

	usernameA := "TestUsernameA"
	usernameB := "TestUsernameB"

	config1 := sarama.NewConfig()
	config1.Net.SASL.User = usernameA

	config2 := sarama.NewConfig()
	config2.Net.SASL.User = usernameA

	config3 := sarama.NewConfig()
	config3.Net.SASL.User = usernameB

	ignoredTypes := cmpopts.IgnoreTypes(config1.Consumer.Group.Rebalance.Strategy, config1.MetricRegistry, config1.Producer.Partitioner)
	ignoredUnexported := cmpopts.IgnoreUnexported(config1.Version)

	diff := cmp.Diff(config1, config2, ignoredTypes, ignoredUnexported)
	t.Log("Diff = " + diff)

	assert.True(t, cmp.Equal(config1, config2, ignoredTypes, ignoredUnexported))
	assert.Equal(t, config1.Version, config2.Version)
	assert.Equal(t, config1.Consumer.Group.Rebalance.Strategy, config2.Consumer.Group.Rebalance.Strategy)

	if config1.Version != config2.Version {
		t.Errorf("Version Inequality")
	}

	assert.False(t, cmp.Equal(config1, config3, ignoredTypes, ignoredUnexported))
}

Convert To Sarama Kafka API

Currently the implementation is based on the Confluent / Librdkafka libraries. Originally (many moons ago) this library was deemed superior but Sarama has made strides in the meantime and is much easier to deal with from a build/deployment perspective. Therefore, based on a recent re-evaluation (see #8) of the libraries we've decided to move to Sarama. At the end of this story there should be no confluent/librdkafka dependencies and everything should work pretty-much as it does today ; )

eventing-contrib Version Updates & v1beta1 KafkaChannels

We've been working on e2e tests (still pending) and the major Sarama conversion and have fallen behind on our use of eventing and eventing-contrib/kafka and need to catch up ; ). Aside from moving to the latest master version of eventing-contrib we also want to move to v1beta1 kafkachannels.

Upgrade tests for KafkaSource

We need to implement upgrade tests for KafkaSource and KafkaBinding so that in the future we can change the storage version to v1beta1 from v1alpha1.

Best idea is to keep eventing at latest release and only changing the eventing-kafka version. We can in the future run the tests again with eventing head too.

Part of umbrella issue, #65

Consistent Support For Multiple Kafka Auth Secrets

Currently the Azure EventHubs AdminClient supports multiple Kafka Authorization Secrets and the "normal" Kafka AdminClient does not. It would be good to step back and define requirements around the consistent support for multiple Kafka instances.

Background
Azure EventHubs (Kafka compatibility / emulation) limit the number of EventHubs (Kafka Topics) can exist in a specific EventHub Namespace to 10. The EventHub Namespaces each have distinct Authorization. The current implementation supporting multiple Kafka Authorizations for EventHubs was put in place to work around this limitation by load-balancing across as many EventHub Namespaces (Kafka Auths) as a user might need.

This is obviously an edge use case in the Kafka community, and the more important issue is whether (and how) to support multiple Kafka instances going forward for multi-tenant type use cases.

Define Dynamic Spec Config Requirements For Channel/Dispatcher

This issue is a placeholder reminder to circle back and define specific requirements around the handling of various configuration values for the Channel (Producer) and Dispatcher (Consumer) K8S Deployments.

The eventing-kafka ConfigMap exposes values related to the runtime characteristics of the Channel / Dispatcher deployments created by the Controller (i.e. replicas, cpu_request, cpu_limit, number of consumers, etc...). These values are used when creating new Deployments for the Channel and Dispatcher. Currently, however, changing them in the ConfigMap will only apply the new values to new Deployments and will not update existing Deployments.

It is unclear whether the current behavior is desirable or not, as there are pros and cons for either approach. For instance, making the ConfigMap always update existing Deployments removes the ability to fine-tune or manually change existing deployments (since the Controller would re-reconcile them back to their ConfigMap values).

So... we need to think through the needs of an administrator responsible for running an eventing-kafka cluster and what level of automation vs customizability is appropriate.

Upgrade tests for KafkaChannel

We need to implement upgrade tests for KafkaChannel. So that, when in the future we change the storage version to v1beta1 from v1alpha1, we will have things tested.

Best idea is to keep eventing at latest release and only changing the eventing-kafka version. We can in the future run the tests again with eventing head too.

Part of umbrella issue, #70

Improve Offset Commit Handling

Currently we are manually committing offsets in a synchronous fashion inside the event receiving loop for a given consumer. There is no reason that this should be done synchronously and it can have a significant performance impact with some kafka providers (Azure EventHubs being one).

This issue is to redesign how we are handling offset commits. Either we need to change the code to perform offset commit in an asynchronous fashion, or perhaps re-evaluate if we can use the built in offset commit functionality in librdkafka. At the time we thought it was important to manually handle the commit but rather the important thing seems to be manually handling the offset store rather than the commit itself.

CloudEvents SDK V2 & Eventing Channel Updates

Currently the implementation is based on CloudEvents SDK V1 and we need to catch up with the rest of knative eventing by moving the SDK to V2. Additionally, and probably more importantly, the implementation needs to move to the new/latest knative-eventing channel implementations for sending/receiving CloudEvents.

This is expected to be the final set of changes carried over from the kyma-incubator/knative-kafka project and is a patch from Issue #108 in that repo.

Improve Dispatcher Naming To Avoid Conflicts

Currently the "dispatcher" deployments are named by convention based on the KafkaChannel's Namespace/Name values. Since we're also limited to 63 char DNS safe K8S names we have truncation logic in place. Therefore, IF there are multiple namespaces with long names which only differ after the first 20 chars, AND there are multiple KafkaChannels in those namespaces with long names which only differ after the first 30 chars, THEN we will have a conflict and the second Dispatcher Deployment will not be created. Therefore we should improve the naming logic so that it is less likely to have a conflict while still maintaining some readability via kubectl queries. We do not have to guarantee there will never be a conflict as we don't want to use generated names and have to track their affiliation with labels or some other mechanism. A simple hashing should provide a reasonable implementation whereby the risk is effectively minimized.

Note - This was discovered by virtue of the e2e tests creating long namespace/kafkachannel names which only differ in their suffix.

v0.17.0 / master uplift

Now that eventing-contrib has released v0.17.0 it would be good for us to move forward to the latest on master/. Should be a smallish upgrade ; )

Performance Testing

Need to implement performance tests, once we have integration tests working (e2e/conformance - see #4)!

Establish release support

A /hack/release.sh script must be added to support nightly builds. An early poc shows that ko and yaml can be used for this purpose. This was done in the poc by adding and using the librdkafka c libraries in the dependency tree (now supported by current versions of confluent-kafka-go).

However, a local developer experience will need to be somewhat different as cross compilation has some special requirements with cgo builds. An option could be running the ko build from inside of a debian container, driven by a makefile, and connecting to the docker daemon and kubectl running on the local host. Otherwise, it seems that there would be quite a few initial setup steps (or script) which would need to be written for each os.

If these approaches are taken or something similar, helm charts and helm dependency can be removed at the same time

Adequate developer documentation should be added for whichever solution is determined here

Add support for a ConfigMap containing all the Sarama Kafka configuration

Currently the Sarama configuration is set (in pkg/common/kafka/util/util.go) via some parameters and constants. We would like to use a configmap for these settings where possible and reasonable. The eventing-contrib/kafka implementation already has support for such a map, though with minimal content. We’d like to reuse whatever makes sense (such as map watching) but with our own defined content and parsing. This configmap may also include any other custom eventing-kafka config as well (i.e. it will not be Sarama-specific).

Details

Watching the Configmap

This is what eventing-contrib/kafka does, which seems like a good starting-point for eventing-kafka work toward a similar end:

kafka/channel/pkg/reconciler/controller/controller.go::NewController (passed in from sharedmain) uses the passed-in configmap.Watcher to call cmw.Watch("config-kafka", updateKafkaConfig), where "updateKafkaConfig" is a function that loads the configmap data and replaces the Reconciler.kafkaConfig with the one parsed out of the configmap.

The configmap parser itself uses code from knative.dev/pkg/configmap/parse.go that might be useful for our purposes as well (common way to parse out durations, ints, floats, etc).

If we don't want to switch to using sharedmain, we will need to create the watcher ourselves via sharedmain.SetupConfigMapWatchOrDie, which is what the pkg/common/k8s/observability.go code already does (not sure if there's a benefit to sharing a single watcher or not)

Loading Sarama Settings from the Configmap

Whether we need the huge block of all of the sarama settings or not, putting them as JSON into a configmap is a relatively ordinary procedure and results in something that looks like this:

apiVersion: v1
data:
  sarama: '{"Admin":{"Retry":{"Max":5,"Backoff":100000000},"Timeout":3000000000},"Net":{"MaxOpenRequests":5,"DialTimeout":30000000000,"ReadTimeout":30000000000,"WriteTimeout":30000000000,"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"Handshake":true,"AuthIdentity":"","User":"TestUsername","Password":"TestPassword","SCRAMAuthzID":"","TokenProvider":null,"GSSAPI":{"AuthType":0,"KeyTabPath":"","KerberosConfigPath":"","ServiceName":"","Username":"","Password":"","Realm":""}},"KeepAlive":30000000000,"LocalAddr":null,"Proxy":{"Enable":false,"Dialer":null}},"Metadata":{"Retry":{"Max":3,"Backoff":250000000},"RefreshFrequency":300000000000,"Full":true,"Timeout":0},"Producer":{"MaxMessageBytes":1000000,"RequiredAcks":1,"Timeout":10000000000,"Compression":0,"CompressionLevel":-1000,"Idempotent":false,"Return":{"Successes":false,"Errors":true},"Flush":{"Bytes":0,"Messages":0,"Frequency":0,"MaxMessages":0},"Retry":{"Max":3,"Backoff":100000000}},"Consumer":{"Group":{"Session":{"Timeout":10000000000},"Heartbeat":{"Interval":3000000000},"Rebalance":{"Timeout":60000000000,"Retry":{"Max":4,"Backoff":2000000000}},"Member":{"UserData":null}},"Retry":{"Backoff":2000000000},"Fetch":{"Min":1,"Default":1048576,"Max":0},"MaxWaitTime":250000000,"MaxProcessingTime":100000000,"Return":{"Errors":false},"Offsets":{"CommitInterval":0,"AutoCommit":{"Enable":true,"Interval":1000000000},"Initial":-1,"Retention":0,"Retry":{"Max":3}},"IsolationLevel":0},"ClientID":"TestClientId","RackID":"","ChannelBufferSize":256,"Version":{}}'
kind: ConfigMap
metadata:
  name: test
  namespace: test

Of course, the point here is that you can omit any entries from the "sarama" JSON that you want to leave as the defaults, so you could just have something that looks considerably simpler:

apiVersion: v1
data:
  sarama: '{"Net":{"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"User":"TestUsername","Password":"TestPassword"}},"Metadata":{"RefreshFrequency":300000000000},"ClientID":"TestClientId"}'
kind: ConfigMap
metadata:
  name: test
  namespace: test

Merging the JSON with the default sarama.Config is also not complex:

config := NewSaramaConfig(...)
json.Unmarshal([]byte(configMap.Data["sarama"]), &config)

If we need to save our current settings back to a configmap, that gets complicated (and will likely require our own in-sync copy of the sarama.Config struct with custom tags to ignore some JSON-unfriendly fields). This is possible and has been POC'd but will hopefully not be necessary for this issue (writing to the configmap will only be done manually when an administrator wants to make changes).

Implementing Config Changes

Regarding the procedures necessary to rebuild/restart sarama infrastructure when the config map changes, there are three main components of note:

  • The admin client (only one, recreated every time)

The ReconcileKind function calls SetKafkaAdminClient(), which calls kafkaadmin.CreateAdminClient() every time, so we don't have an "old admin client with old settings" to worry about for the moment. This will change if we decide to keep the admin client around in the future but for now this appears to be a non-issue (the only change would be to load the settings from whatever the current configmap is instead of using constants and/or environment variables).

  • The ConsumerGroup (potentially several; the dispatcher is horizontally scalable, so each pod would make ConsumerGroups for a collection of subscriberSpecs)

The implementation for the ConsumerGroup is all in one place, but there could be several pods, which would need to be reconfigured (or recreated). The UpdateSubscriptions() function loops through the subscribers and calls CreateConsumerGroup whenever it finds no subscriber wrapper for a particular subscriber UID. The ConsumerGroup goes into the SubscriberWrapper and remains as part of the DispatcherImpl (created in the dispatcher's main.go and remaining for the life of the pod).
If the ConfigMap watcher is only going to run on the controller, it might be easiest to restart all of the dispatcher pods to pick up any new configuration data (the controller could parse the configmap and see if the changes are actually relevant to the ConsumerGroup first if desired)

Risk of trouble from restarting dispatcher pods is low, since the critical data is all stored in a kafka topic, not the eventing-kafka pods themselves, but unnecessary pod churn is, of course, preferably avoided. We could easily start with just having the controller restart the dispatcher pods when the configmap changes (which will presumably not be particularly often). Adjustments to a working system are likely easier than trying to work through all of the potential logic of changing the ConsumerGroups on the fly (either by having the dispatchers also watch the configmap or having a different IPC mechanism from the controller).

  • The Producer (only one at the moment)

The Producer is similar to the ConsumerGroup in that it is created (once, in the NewProducer() function called by the channel's main.go code) and then used repeatedly (during handleMessage() which is called by when a message is received (due to the eventingchannel.NewMessageReceiver() call in the channel's main.go).

If we handle configmap changes by restarting the producer pod, we must be certain that we do not encounter any situation in which an event has been received (i.e. handleMessage has been called) and a non-error returned, but the message not persisted in the Kafka topic.

I don't see any real way for this to happen, as the ProduceKafkaMessage waits for the delivery report via the SyncProducer's SendMessage function before giving handleMessage the go-ahead to return no-error. If the process is terminated anywhere in that execution path, the caller of handleMessage (which is the ServeHTTP function in eventing's message_receiver.go file) will get an error (network or otherwise) that will be passed back to the caller (that is, the customer). Errors are acceptable for a brief time during reconfiguration; if less downtime is desired, the producer could watch the configmap directly and reconfigure the SyncProducer itself instead of letting the controller bounce it.

As with the ConsumerGroup it is probably best to start with a naïve restarting of pods and see if the required performance during configuration changes demands a modification to that approach.

Convert to Go Mod

Get rid of dep and switch over! Lots of examples in eventing and eventing-contrib to reference here.

[consolidated] KafkaSource: expose Kafka configuration

Problem
The Kafka configuration is currently being hardcoded in the KafkaSource adapter. For instance, the commit policy is currently (assuming #100 is merged) set to autocommit every 1s, compromising speed vs duplicate events. Some applications might want the opposite compromise.

Persona:
Which persona is this feature for?
Application developer

Exit Criteria
A measurable (binary) test that would indicate that the problem has been resolved.
Check the Kafka configuration varies upon user-input.

Time Estimate (optional):
How many developer-days do you think this may take to resolve?
1

Additional context (optional)
Add any other context about the feature request here.

Pwd Logging Bug & Cleanup

Recent refactoring of environment variable loading in channel/dispatcher resulted in passwords being logged - need to fix that. Also need to remove the local-dev.sh script which is no longer used now that we're on Sarama and don't need to build inside linux docker container.

/kind bug

[consolidated] The channel dispatcher does not inherit the main context

Describe the bug
The channel dispatcher does not inherit the main context when creating consumer group, preventing subscribers to gracefully shut down

Expected behavior
The channel dispatcher should inherit the main context

To Reproduce

Knative release version
HEAD

Additional context
Add any other context about the problem here such as proposed priority

Support Eventing Tracing & Observability Configuration

In order to improve our alignment with the core Knative Eventing implementations we should support the config-tracing and config-observability global ConfigMaps in the knative-eventing namespace. This is similar to the work we did to align on the config-logging implementation.

Add Native Observability (Profiling and Metrics)

Background

In the eventing-kafka code (specifically in pkg/common/prometheus/metrics.go) we create a Prometheus server manually, and use the Observe function in channel/producer/producer.go and dispatcher/dispatcher.go to record metrics.

We want to take out this explicit Prometheus server and switch to using the knative-eventing built-in observability functionality (which will by default create a Prometheus server, but should be able to be changed to use anything that opencensus supports).

Steps

Code changes

  • We need to add something very similar to the pkg/broker/ingress/stats_reporter.go to eventing-kafka (probably in common/metrics or similar)
    • Alter this new StatsReporter to use knative_kafka_produced_msg_count and knative_kafka_consumed_msg_count instead of the ingress broker metrics
    • Change the "metrics *prometheus.MetricsServer" field in the Producer struct to a StatsReporter instead
    • Add a call to NewStatsReporter() in the channel and dispatcher main(), pass the resulting structure into the producer.NewProducer() and dispatch.NewDispatcher() calls instead of the NewMetricsServer() result we currently use
    • Change Producer->processProducerEvents and Dispatcher->handleKafkaMessages() to create a ReportArgs structure and call the statsReporter.ReportEventCount() instead of the current metrics.Observe() calls
  • Add an observability.go to common/k8s with an InitializeObservability function to be called from the channel and dispatcher's main() functions (see attached files for examples). The main purpose of this function is to start the profiling server and update the metrics exporter (which starts it, in the case of Prometheus).
  • Need to add METRICS_DOMAIN (which should NOT have a slash in it, unlike the tests in pkg/reconciler/apiserversource/controller_test.go) that should probably just be "eventing-kafka"
  • METRICS_DOMAIN needs to be set in the controller and passed as a variable to the channel and dispatcher, much like any of a number of other environment variables.
  • We might need to pass some other pieces of information like the container name and pod name, if those are not already readily available, as the NewStatsReporter() function needs them.
  • Remove the old code from pkg/common/prometheus/metrics.go (possibly the entire file or directory; needs to be determined if anything in there needs to be kept)

When this is all complete there should be (if possible) no references to Prometheus explicitly in the eventing-kafka codebase (barring vendor code).

Examples
There are a few examples of how to use the StatsReporter.ReportEventCount function in the eventing code, such as pkg/broker/ingress/ingress_handler.go

The github.com/census-ecosystem/opencensus-go-exporter-prometheus project has examples of how to use opencensus to create views and verify that metrics are being produced and exported properly.

InitializeObservability sample code:
observability.zip

References

Original Observability research issue: #29

eventing.knative.dev/scope: namespace does not work with distributed kafka channel

Describe the bug
In the previous version eventing.knative.dev/scope: namespace would allow me to connect to different Kafka clusters at least at the namespace level.

Expected behavior
The ability to connect to more than one kafka cluster with knative.

To Reproduce
Add the eventing.knative.dev/scope: namespace to a kafka channel (distributed) and nothing happens.

Knative release version
nightly

Additional context
We don't absolutely need eventing.knative.dev/scope: namespace to work, we just need a way to connect to multiple kafka clusters, like maybe even specifying it at the KafkaChannel level similar to how it is in KafkaSource.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.