knative-extensions / eventing-kafka
Kafka integrations with Knative Eventing.
License: Apache License 2.0
Once we're up to date with the latest eventing-contrib/eventing we should refactor the Dispatcher's custom retry mechanism to make use of the newly added DispatchMessageWithRetries() functionality!
Describe the bug
The conversion webhook from v1beta1 to v1alpha1 (stored version) does not preserve conditions, leading to an infinite reconciliation loop.
Expected behavior
All conditions should be carried over.
To Reproduce
Install a v1beta1 KafkaSource and dump the logs
Knative release version
head
Additional context
Add any other context about the problem here such as proposed priority
Describe the bug
Given an installation at v0.17.5, applying the v0.18.1 Kafka channel YAML fails with the following error, and the upgrade fails:
The Deployment "kafka-ch-dispatcher" is invalid: spec.template.spec.containers[0].env[0].valueFrom: Invalid value: "": may not be specified when `value` is not empty
Expected behavior
Applying the yaml should succeed and upgrade should complete successfully
To Reproduce
v0.17.5
KafkaChannel
v0.18.3
v0.18.1
Knative release version
v0.18.3
Additional context
Before v0.18, the Kafka Channel controller created the kafka-dispatcher deployment. Starting with v0.18, the deployment seems to be pre-created by default in the release YAML, which does not expect that another deployment already exists. Hence the failure.
This issue is a placeholder to look further into the config-observability settings and determine if they are feasible to add to the eventing-kafka controller and dispatcher.
The main function of observability seems to be the enabling of profiling content on individual pods in the knative-eventing namespace. More specifically, on any pod that uses the sharedmain.Main() implementation from knative.dev/pkg/injection/sharedmain/main.go
This is the (abbreviated) code that appears to enable the observability server in sharedmain.Main():
// Create the profiling handler and serve it from an error group.
profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
// Shut the profiling server down once the group's context is done.
go func() {
	<-egCtx.Done()
	profilingServer.Shutdown(context.Background())
	eg.Wait()
}()
// Watch the observability ConfigMap and pass updates to the profiling handler.
cmw := sharedmain.SetupConfigMapWatchOrDie(ctx, logger.Sugar())
sharedmain.WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger.Sugar(), environment.ServiceName)
However, putting only this block of code into the controller or dispatcher's non-shared main() function does not accomplish the intended effect. You can browse to a server on port 8008 after the controller/dispatcher executes the go function above, but it returns 404s for all paths.
Also, we start our own Prometheus server via prometheus.NewMetricsServer() in the channel and dispatcher non-shared main() functions. It is possible that the observability code requires the use of Prometheus in a different manner.
Modify the config-observability configmap in the knative-eventing namespace and add these lines to the "data" section:
metrics.backend-destination: prometheus
profiling.enable: "true"
Wait for the watcher to update the configuration, or restart the eventing-kafka-channel-controller pod manually
Run a port-forwarding command to the controller pod, for example
kubectl -n knative-eventing port-forward eventing-kafka-channel-controller-xxxxxxxxxx-xxxxx 8008
Browse to the local forwarded address
http://localhost:8008/debug/pprof/
# Original Tracing and Observability issue
https://github.com/knative-sandbox/eventing-kafka/issues/22
# Knative information related to observability (mainly for knative-serving, but generic profiling seems common to eventing and serving)
https://knative.dev/docs/serving/installing-logging-metrics-traces/
# Information about various profiling URLs
https://github.com/knative/serving/blob/master/test/performance/profiling.md
Remove KafkaSource and KafkaBinding v1alpha1 API shape as written here: knative/eventing-contrib#1504 (review)
Part of umbrella issue, #65
confluent-kafka-go was originally chosen for this channel implementation over sarama for several reasons (this was more than a year ago)
The choice of confluent-kafka-go does introduce some challenges though, primarily around cgo builds. This causes a divergence in developer experience here vs all other knative components. We should revisit the initial reasons for the choice as a lot has changed since then, and see if the decision is still justified. Performance, and open source community / governance model should also be taken into consideration and discussion.
Currently the implementation has custom retry logic which is preventing the use of the eventing MessageDispatcher.DispatchMessage()'s ability to send to the Dead Letter Queue. Doing so would result in a separate Dead Letter Queue message for every retry attempt. Need to look at retry implementations in conjunction with the Knative Eventing implementations to design a more flexible solution.
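As a rough illustration of the behavior being asked for, the sketch below retries delivery to the subscriber and sends exactly one dead-letter message, only after the last attempt has failed. The names dispatchWithRetries, sendFunc and errUnavailable are hypothetical and are not part of the eventing MessageDispatcher API; the real design would need to be aligned with the Knative Eventing retry implementation.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendFunc is a hypothetical delivery function; a non-nil error means failure.
type sendFunc func(msg string) error

// errUnavailable is a sample error used by the demo in main.
var errUnavailable = errors.New("subscriber unavailable")

// dispatchWithRetries tries the subscriber up to maxAttempts times with
// exponential backoff. The dead-letter destination is called once, and only
// after the final attempt has failed, so retries never produce extra
// dead-letter messages.
func dispatchWithRetries(msg string, subscriber, deadLetter sendFunc, maxAttempts int, initialBackoff time.Duration) error {
	backoff := initialBackoff
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = subscriber(msg); lastErr == nil {
			return nil
		}
		if attempt < maxAttempts {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	if dlqErr := deadLetter(msg); dlqErr != nil {
		return fmt.Errorf("dead-letter delivery failed: %w (last subscriber error: %v)", dlqErr, lastErr)
	}
	return lastErr
}

func main() {
	attempts, deadLettered := 0, 0
	subscriber := func(string) error { attempts++; return errUnavailable }
	deadLetter := func(string) error { deadLettered++; return nil }

	err := dispatchWithRetries("event-1", subscriber, deadLetter, 3, time.Millisecond)
	fmt.Println(attempts, deadLettered, err) // prints: 3 1 subscriber unavailable
}
```

Keeping the retry loop and the dead-letter call in one place is what prevents the "one dead-letter message per retry" problem described above.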
Problem
The dispatcher deployment updates the status of the KafkaChannel. Every update needs to be done by the leader, and replicas don't receive informer updates, since we use only ReconcileKind.
This behavior breaks dispatcher horizontal scalability: if there are multiple replicas of a dispatcher, the information in the status isn't really accurate, since it reflects the status of the leader only.
In knative/eventing-contrib#1446 we had a discussion on how we can solve this problem.
There are other problems related to updating the status in the dispatcher like in knative/eventing#4280.
Persona:
Which persona is this feature for?
*
Exit Criteria
One of the solutions outlined in the linked discussion.
Time Estimate (optional):
How many developer-days do you think this may take to resolve?
1
Additional context (optional)
The architecture of the eventing-sandbox/eventing-kafka implementation is inherently different from that of eventing-contrib/kafka. The former is, however, based on the same KafkaChannel CRD as the latter. This means that the Status tracking is not exactly aligned for tracking the resources' READY states. Further, we've never tracked K8S Endpoints directly and might want to do so. This likely cannot be worked until plans for merging the two implementations are better defined.
Make v1beta1 storage version for KafkaSource and KafkaBinding
Existing WIP PR: knative/eventing-contrib#1504
Part of umbrella issue, #65
Problem
Given the eventing contrib KafkaChannel, with no subscriber based offset, it is impossible to provide at least once delivery guarantees. Retries and dead letter queue are the user's best options.
Race conditions and randomly failing health check tests.
This issue is a placeholder to look a bit deeper into the new HA Controller support being introduced in Knative once things have settled out a bit more. Research as of the time of writing is captured below for reference, but is of course subject to change...
The Knative Operator appears to be driving this use case. It currently/previously had a list of hardcoded Controllers from Knative Serving that could be made HA with the specified number of replicas. This is being expanded into Knative Eventing and being made a bit more dynamic via the use of the knative.dev/high-availability: "true" label. Some of the core Controllers in the eventing repo have been labelled as such. Nothing in the eventing-contrib repo has been labelled as of yet.
We would only want to run the main eventing-kafka controller in this HA mode if even that. The Channel / Dispatcher (Kafka Producer/Consumer) Deployments will be scaled/managed in other ways.
The eventing-contrib/kafka implementation seems to propagate the CONFIG_LEADERELECTION_NAME="config-leader-election-kafka" Environment Variable around but it's not clear how this is used (if at all?). The config YAML includes the corresponding ConfigMap 400-leader-election-config.yaml.
There is some leader-election logic in the eventing webhook - but that just seems to validate/block creation of configmaps with unsupported controller lists?
# Original Knative Operator Issue (Open)
https://github.com/knative/operator/issues/73
# Eventing PRs Which Added The HA Label To Ko YAML
https://github.com/knative/eventing/pull/3311
https://github.com/knative/eventing/pull/3324
# Knative Operator Docs On HA (probably not up to date with in-flight changes)
https://github.com/knative/operator/blob/master/docs/configuration.md#spechigh-availability
# HA Leader Election Docs From May
https://github.com/knative/docs/issues/2454
https://github.com/knative/docs/pull/2455/files
When configured for azure KAFKA_PROVIDER usage, the current implementation handles the case where there are no Kafka Secrets (i.e. those labelled with eventing-kafka.knative.dev/kafka-secret: "true"). It does not do so when using the standard kafka KAFKA_PROVIDER.
In general we should have the controller be stable when there are no Kafka Secrets, or when the Kafka Secrets are malformed or include the dummy/default data from the yaml.
It would also be good to document the current handling of edge cases (e.g. deleting secret when KafkaChannels exist, etc) so that we can determine requirements for them in a future issue.
/non-trivial
/area eventing-delivery
/kind feature-proposal
It seems that some Kafka (or kafka-like) implementations provide support for the standard Kafka Producer / Consumer APIs, but not the Admin API (Topic management, etc). Azure EventHubs is such an example, and while the current implementation supports that specific use case, it is possible that third-party users might want to provide their own AdminClient without contributing to the open-source knative-sandbox/eventing-kafka implementation directly. Therefore, it would be nice to support a mechanism by which a third party AdminClient could be used by the eventing-kafka implementation at runtime.
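One possible shape for such a mechanism is a small interface plus a registry keyed by provider name. The sketch below is only an assumption about how that could look: AdminClient, Factory, Register, New and memoryAdmin are hypothetical names, not existing eventing-kafka types. A registry like this still requires the third-party package to be linked into the binary; loading an implementation truly at runtime (Go plugins, a sidecar, etc.) would need a different approach.

```go
package main

import "fmt"

// AdminClient is a hypothetical, minimal topic-management interface.
type AdminClient interface {
	CreateTopic(name string, partitions int) error
	DeleteTopic(name string) error
}

// Factory builds an AdminClient from opaque key/value configuration.
type Factory func(config map[string]string) (AdminClient, error)

// factories maps a provider name to the factory registered for it.
var factories = map[string]Factory{}

// Register makes a factory available under a provider name.
func Register(provider string, f Factory) { factories[provider] = f }

// New returns an AdminClient for the provider, or an error if none is registered.
func New(provider string, config map[string]string) (AdminClient, error) {
	f, ok := factories[provider]
	if !ok {
		return nil, fmt.Errorf("no AdminClient registered for provider %q", provider)
	}
	return f(config)
}

// memoryAdmin stands in for a third-party implementation.
type memoryAdmin struct{ topics map[string]int }

func (m *memoryAdmin) CreateTopic(name string, partitions int) error {
	if _, exists := m.topics[name]; exists {
		return fmt.Errorf("topic %q already exists", name)
	}
	m.topics[name] = partitions
	return nil
}

func (m *memoryAdmin) DeleteTopic(name string) error {
	if _, exists := m.topics[name]; !exists {
		return fmt.Errorf("topic %q does not exist", name)
	}
	delete(m.topics, name)
	return nil
}

// A third-party package would register itself from its own init function.
func init() {
	Register("memory", func(map[string]string) (AdminClient, error) {
		return &memoryAdmin{topics: map[string]int{}}, nil
	})
}

func main() {
	client, err := New("memory", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(client.CreateTopic("orders", 4)) // first creation succeeds
	fmt.Println(client.CreateTopic("orders", 4)) // second creation reports a conflict
}
```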
Refactor to standard context usage instead of shutdown channels managing lifecycle
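A minimal sketch of what context-driven lifecycle could look like, assuming goroutines that today listen on a dedicated shutdown channel. The functions consume, startConsumers and demo are hypothetical and only illustrate the pattern: every worker selects on ctx.Done(), so cancelling the parent context stops all of them.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// consume handles messages until the context is cancelled.
func consume(ctx context.Context, messages <-chan string, handle func(string)) {
	for {
		select {
		case <-ctx.Done():
			return
		case msg := <-messages:
			handle(msg)
		}
	}
}

// startConsumers launches n consumers bound to ctx and returns a channel
// that is closed once all of them have returned.
func startConsumers(ctx context.Context, n int, messages <-chan string, handle func(string)) <-chan struct{} {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			consume(ctx, messages, handle)
		}()
	}
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	return done
}

// demo sends five messages, cancels the context, and reports how many
// messages were handled and whether every consumer stopped in time.
func demo() (handled int, stopped bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var mu sync.Mutex
	count := 0
	messages := make(chan string)
	done := startConsumers(ctx, 3, messages, func(string) {
		mu.Lock()
		count++
		mu.Unlock()
	})

	for i := 0; i < 5; i++ {
		messages <- fmt.Sprintf("event-%d", i)
	}

	cancel() // a single cancellation replaces per-component shutdown channels
	select {
	case <-done:
		stopped = true
	case <-time.After(time.Second):
	}

	mu.Lock()
	defer mu.Unlock()
	return count, stopped
}

func main() {
	handled, stopped := demo()
	fmt.Println(handled, stopped) // prints: 5 true
}
```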
The Knative Channel Specification specifies the expected behavior of handling W3C Tracecontext headers. The current implementation does NOT meet these requirements in that the traceparent and tracestate headers are lost in transmission. A completely new traceparent value is instead sent to the subscriber.
Currently, the Knative In-Memory Channel reference implementation DOES adhere to the specification, but the eventing-contrib/kafka implementation does not. Similar to this implementation, the latter has the intermediate step of persisting to/from a Kafka Topic. There is currently confusion and a debate around the best way to handle this persistence, how it relates to the potential use of ce-traceparent and ce-tracestate, and where in the Knative / CloudEvent stack to handle this. Further, it is unclear whether the CloudEvents DTE Extension is useful or not.
See the following Github Issues/PRs for the history...
Knative / CloudEvents
knative/eventing#2918 & cloudevents/spec#603 & cloudevents/spec#607
eventing/in-memory
knative/eventing#1757
eventing-contrib/kafka
knative/eventing-contrib#1139 & knative/eventing-contrib#1155
knative/eventing-contrib#563
Previous Knative Implementation Removal
knative/eventing#2873
Therefore, we are going to wait until this is resolved before attempting to implement a solution that complies with the expected specification.
There are also larger tracing issues to be dealt with within the Knative Eventing ecosystem, such as how to integrate Istio B3 Header usage with the current OpenTelemetry Headers. See knative/eventing#2581.
W3C Tracecontext Notes
The traceparent field is composed of the following values...
{version}-{trace-id}-{parent-id}-{trace-flags}
...where the {version} (usually 00) specifies the format of the following data fields, and the {trace-id} is the end-to-end ID of the entire trace through a distributed system. The {parent-id} is also referred to as the {span-id}, and can/should change with each new HTTP Client Request being made. Therefore the expectation is that when the Channel receives an HTTP request with a traceparent header, it is forwarded to the Subscriber with the same traceparent header, albeit with a new {parent-id}.
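To make the expected forwarding behavior concrete, here is a small sketch that parses a version 00 traceparent value (four dash-separated hex fields: version, trace-id, parent-id, trace-flags) and produces the value to send downstream, keeping the trace-id and replacing only the parent-id. The type and function names are hypothetical, and the parsing is deliberately loose; it is not a full validator for the W3C specification.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// traceParent holds the four dash-separated fields of a traceparent header.
type traceParent struct {
	Version  string // 2 hex chars, usually "00"
	TraceID  string // 32 hex chars, constant for the whole trace
	ParentID string // 16 hex chars, changes with every outgoing request
	Flags    string // 2 hex chars
}

// parseTraceParent splits and loosely checks a version 00 traceparent value.
func parseTraceParent(header string) (traceParent, error) {
	parts := strings.Split(header, "-")
	if len(parts) != 4 {
		return traceParent{}, fmt.Errorf("traceparent %q: want 4 fields, got %d", header, len(parts))
	}
	wantLen := []int{2, 32, 16, 2}
	for i, part := range parts {
		if len(part) != wantLen[i] {
			return traceParent{}, fmt.Errorf("traceparent %q: field %d has length %d, want %d", header, i, len(part), wantLen[i])
		}
		if _, err := hex.DecodeString(part); err != nil {
			return traceParent{}, fmt.Errorf("traceparent %q: field %d is not hex: %w", header, i, err)
		}
	}
	return traceParent{Version: parts[0], TraceID: parts[1], ParentID: parts[2], Flags: parts[3]}, nil
}

// withNewParentID keeps version, trace-id and flags, and generates a fresh
// random parent-id for the next hop.
func (tp traceParent) withNewParentID() (traceParent, error) {
	buf := make([]byte, 8)
	if _, err := rand.Read(buf); err != nil {
		return traceParent{}, err
	}
	tp.ParentID = hex.EncodeToString(buf)
	return tp, nil
}

// String renders the header value.
func (tp traceParent) String() string {
	return strings.Join([]string{tp.Version, tp.TraceID, tp.ParentID, tp.Flags}, "-")
}

func main() {
	incoming := "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
	tp, err := parseTraceParent(incoming)
	if err != nil {
		panic(err)
	}
	outgoing, err := tp.withNewParentID()
	if err != nil {
		panic(err)
	}
	fmt.Println("received: ", tp)
	fmt.Println("forwarded:", outgoing) // same trace-id, new parent-id
}
```

Because the channel persists events to a Kafka Topic in between, the received value would also have to survive that hop (for example as a record header) before the dispatcher could forward it; the sketch does not cover that part.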
In preparation for the pending repository consolidation (moving eventing-contrib/kafka -> knative-sandbox/eventing-kafka) we need to get our implementation up to the same dependency versions as eventing-contrib. This was recently done (week or two ago) and should be pretty painless. Need to move forward to 0.17.1 / master and associated dependencies.
Problem
Need to update KafkaChannel storage version to v1beta1.
Moved from knative/eventing-contrib#1396
Relevant work is tracked here.
Make v1beta1 storage version for KafkaChannel
Example WIP PR: knative/eventing-contrib#1504
Part of umbrella issue, #70
The recently introduced ConfigMap support could be improved in a few ways and we'd like to refactor to reduce the complexity / maintenance. The following areas need some love...
NOTE - We need to keep the new custom extraction logic which custom loads the Sarama.Version and TLS.Config info from the YAML string. The overall lifecycle of the ConfigMap watchers is reasonable as well and doesn't necessarily require attention at this point.
There are a lot of different exposed (and package private) lifecycle functions related to the ConfigMap handling including...
...which have a tangled call stack and expose too many different usage patterns. We do need the ability to support a couple of slightly different usage patterns between the SharedMain Controller and non-SharedMain Channel/Dispatcher, but it should be possible to reduce the complexity here so that it's easier to reason about. Essentially, just create a single entry point constructor and maybe another merge function or something along those lines.
Because the original requirements weren't super well defined, the comparison of before/after Sarama Config structs was made more complicated than is necessary. We don't need to marshal the struct out to a JSON string, and can ignore the function pointer fields, etc. Currently we have a mirror struct with JSON advice as a means of excluding things from comparison, but a more straightforward approach would be to use the cmp package with ignores and some manual comparison similar to this unit test...
import (
	"testing"

	"github.com/Shopify/sarama"
	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/stretchr/testify/assert"
)

func TestCmp(t *testing.T) {
	usernameA := "TestUsernameA"
	usernameB := "TestUsernameB"

	// config1 and config2 are equivalent; config3 differs in the SASL user.
	config1 := sarama.NewConfig()
	config1.Net.SASL.User = usernameA
	config2 := sarama.NewConfig()
	config2.Net.SASL.User = usernameA
	config3 := sarama.NewConfig()
	config3.Net.SASL.User = usernameB

	// Skip fields that cmp cannot compare meaningfully (interfaces, functions, unexported state).
	ignoredTypes := cmpopts.IgnoreTypes(config1.Consumer.Group.Rebalance.Strategy, config1.MetricRegistry, config1.Producer.Partitioner)
	ignoredUnexported := cmpopts.IgnoreUnexported(config1.Version)

	diff := cmp.Diff(config1, config2, ignoredTypes, ignoredUnexported)
	t.Log("Diff = " + diff)
	assert.True(t, cmp.Equal(config1, config2, ignoredTypes, ignoredUnexported))

	// Compare the ignored fields manually.
	assert.Equal(t, config1.Version, config2.Version)
	assert.Equal(t, config1.Consumer.Group.Rebalance.Strategy, config2.Consumer.Group.Rebalance.Strategy)
	if config1.Version != config2.Version {
		t.Errorf("Version Inequality")
	}

	assert.False(t, cmp.Equal(config1, config3, ignoredTypes, ignoredUnexported))
}
Problem
There is no automation in this repo.
Persona:
Which persona is this feature for?
Developer
Currently the implementation is based on the Confluent / Librdkafka libraries. Originally (many moons ago) this library was deemed superior but Sarama has made strides in the meantime and is much easier to deal with from a build/deployment perspective. Therefore, based on a recent re-evaluation (see #8) of the libraries we've decided to move to Sarama. At the end of this story there should be no confluent/librdkafka dependencies and everything should work pretty-much as it does today ; )
knative/eventing#2884
/assign
We've been working on e2e tests (still pending) and the major Sarama conversion and have fallen behind on our use of eventing and eventing-contrib/kafka and need to catch up ; ). Aside from moving to the latest master version of eventing-contrib we also want to move to v1beta1 kafkachannels.
E2E tests verifying end to end kafka channel functionality should be added following the paradigm of https://github.com/knative/eventing-contrib/tree/master/test/e2e
We need to implement upgrade tests for KafkaSource and KafkaBinding so that in the future we can change the storage version to v1beta1 from v1alpha1.
The best idea is to keep eventing at the latest release and change only the eventing-kafka version. In the future we can run the tests again with eventing head too.
Part of umbrella issue, #65
Currently the Azure EventHubs AdminClient supports multiple Kafka Authorization Secrets and the "normal" Kafka AdminClient does not. It would be good to step back and define requirements around the consistent support for multiple Kafka instances.
Background
Azure EventHubs (Kafka compatibility / emulation) limits the number of EventHubs (Kafka Topics) that can exist in a specific EventHub Namespace to 10. The EventHub Namespaces each have distinct Authorization. The current implementation supporting multiple Kafka Authorizations for EventHubs was put in place to work around this limitation by load-balancing across as many EventHub Namespaces (Kafka Auths) as a user might need.
This is obviously an edge use case in the Kafka community, and the more important issue is whether (and how) to support multiple Kafka instances going forward for multi-tenant type use cases.
This issue is a placeholder reminder to circle back and define specific requirements around the handling of various configuration values for the Channel (Producer) and Dispatcher (Consumer) K8S Deployments.
The eventing-kafka ConfigMap exposes values related to the runtime characteristics of the Channel / Dispatcher deployments created by the Controller (i.e. replicas, cpu_request, cpu_limit, number of consumers, etc...). These values are used when creating new Deployments for the Channel and Dispatcher. Currently, however, changing them in the ConfigMap will only apply the new values to new Deployments and will not update existing Deployments.
It is unclear whether the current behavior is desirable or not, as there are pros and cons for either approach. For instance, making the ConfigMap always update existing Deployments removes the ability to fine-tune or manually change existing deployments (since the Controller would re-reconcile them back to their ConfigMap values).
So... we need to think through the needs of an administrator responsible for running an eventing-kafka cluster and what level of automation vs customizability is appropriate.
We need to implement upgrade tests for KafkaChannel so that, when we change the storage version from v1alpha1 to v1beta1 in the future, we will have things tested.
The best idea is to keep eventing at the latest release and change only the eventing-kafka version. In the future we can run the tests again with eventing head too.
Part of umbrella issue, #70
Problem
Need to update KafkaSource storage version to v1beta1.
Moved from knative/eventing-contrib#1394
Relevant work is tracked here.
Problem
As suggested here: knative/eventing-contrib#1332 (comment)
Currently we are manually committing offsets in a synchronous fashion inside the event receiving loop for a given consumer. There is no reason that this should be done synchronously and it can have a significant performance impact with some kafka providers (Azure EventHubs being one).
This issue is to redesign how we are handling offset commits. Either we need to change the code to perform the offset commit in an asynchronous fashion, or perhaps re-evaluate whether we can use the built-in offset commit functionality in librdkafka. At the time we thought it was important to handle the commit manually, but the important thing seems to be manually handling the offset store rather than the commit itself.
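The split between "storing" an offset in the receive loop and "committing" it elsewhere can be sketched as follows. This is an illustration only: offsetTracker, Store and Flush are hypothetical names, the commit callback stands in for whatever the Kafka client provides, and in a real consumer Flush would be driven by a ticker goroutine rather than called by hand.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker remembers the highest processed offset so that the commit
// can happen outside the receive loop.
type offsetTracker struct {
	mu     sync.Mutex
	latest int64
	dirty  bool
}

func newOffsetTracker() *offsetTracker { return &offsetTracker{latest: -1} }

// Store is called from the receive loop. It only updates memory and never
// waits on the broker.
func (t *offsetTracker) Store(offset int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset > t.latest {
		t.latest = offset
		t.dirty = true
	}
}

// Flush commits the latest stored offset if it changed since the previous
// flush. On failure the offset stays pending and is retried by the next flush.
func (t *offsetTracker) Flush(commit func(offset int64) error) error {
	t.mu.Lock()
	offset, dirty := t.latest, t.dirty
	t.dirty = false
	t.mu.Unlock()

	if !dirty {
		return nil
	}
	if err := commit(offset); err != nil {
		t.mu.Lock()
		t.dirty = true
		t.mu.Unlock()
		return err
	}
	return nil
}

func main() {
	tracker := newOffsetTracker()
	var committed []int64
	commit := func(offset int64) error {
		committed = append(committed, offset)
		return nil
	}

	// The receive loop stores 100 offsets without a single broker round trip.
	for offset := int64(0); offset < 100; offset++ {
		tracker.Store(offset)
	}

	// Two flushes: the first commits offset 99, the second has nothing to do.
	_ = tracker.Flush(commit)
	_ = tracker.Flush(commit)
	fmt.Println(committed) // prints: [99]
}
```

The trade-off is the usual one: events processed after the last flush may be redelivered after a crash, so the flush interval bounds the number of duplicates.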
Remove KafkaChannel v1alpha1 API shape as written here: knative/eventing-contrib#1504 (review)
Part of umbrella issue, #70
Currently the implementation is based on CloudEvents SDK V1 and we need to catch up with the rest of knative eventing by moving the SDK to V2. Additionally, and probably more importantly, the implementation needs to move to the new/latest knative-eventing channel implementations for sending/receiving CloudEvents.
This is expected to be the final set of changes carried over from the kyma-incubator/knative-kafka project and is a patch from Issue #108 in that repo.
Currently the "dispatcher" deployments are named by convention based on the KafkaChannel's Namespace/Name values. Since we're also limited to 63 char DNS safe K8S names we have truncation logic in place. Therefore, IF there are multiple namespaces with long names which only differ after the first 20 chars, AND there are multiple KafkaChannels in those namespaces with long names which only differ after the first 30 chars, THEN we will have a conflict and the second Dispatcher Deployment will not be created. Therefore we should improve the naming logic so that it is less likely to have a conflict while still maintaining some readability via kubectl queries. We do not have to guarantee there will never be a conflict as we don't want to use generated names and have to track their affiliation with labels or some other mechanism. A simple hashing should provide a reasonable implementation whereby the risk is effectively minimized.
Note - This was discovered by virtue of the e2e tests creating long namespace/kafkachannel names which only differ in their suffix.
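A simple hashing scheme along these lines might look like the sketch below: keep a readable, truncated namespace-name prefix and append a short hash of the full namespace/name pair. The function name dispatcherName, the 8-character hash length and the use of SHA-256 are assumptions for illustration, and the inputs are assumed to be ASCII, DNS-safe Kubernetes names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"strings"
)

const (
	maxNameLength = 63 // limit for DNS-safe K8S names mentioned above
	hashLength    = 8  // hex characters of the hash kept in the name
)

// dispatcherName returns a readable prefix followed by a short hash of the
// full namespace/name, so long names that share a prefix still differ.
func dispatcherName(namespace, name string) string {
	sum := sha256.Sum256([]byte(namespace + "/" + name))
	suffix := hex.EncodeToString(sum[:])[:hashLength]

	prefix := namespace + "-" + name
	if maxPrefix := maxNameLength - hashLength - 1; len(prefix) > maxPrefix {
		prefix = prefix[:maxPrefix]
	}
	// Avoid a doubled separator when the cut lands on '-' or '.'.
	prefix = strings.TrimRight(prefix, "-.")
	return prefix + "-" + suffix
}

func main() {
	base := "kafkachannel-with-a-long-name-that-only-differs-in-its-suffix-"
	first := dispatcherName("e2e-test-namespace", base+"one")
	second := dispatcherName("e2e-test-namespace", base+"two")

	fmt.Println(first)
	fmt.Println(second)
	fmt.Println(len(first) <= maxNameLength, first != second)
}
```

Collisions remain possible in principle, which matches the stated goal of minimizing rather than eliminating the risk.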
Now that eventing-contrib has released v0.17.0 it would be good for us to move forward to the latest on master. Should be a smallish upgrade ; )
Need to implement performance tests, once we have integration tests working (e2e/conformance - see #4)!
A /hack/release.sh script must be added to support nightly builds. An early poc shows that ko and yaml can be used for this purpose. This was done in the poc by adding and using the librdkafka c libraries in the dependency tree (now supported by current versions of confluent-kafka-go).
However, a local developer experience will need to be somewhat different as cross compilation has some special requirements with cgo builds. An option could be running the ko build from inside of a debian container, driven by a makefile, and connecting to the docker daemon and kubectl running on the local host. Otherwise, it seems that there would be quite a few initial setup steps (or script) which would need to be written for each os.
If these approaches are taken or something similar, helm charts and helm dependency can be removed at the same time
Adequate developer documentation should be added for whichever solution is determined here
Currently the Sarama configuration is set (in pkg/common/kafka/util/util.go) via some parameters and constants. We would like to use a configmap for these settings where possible and reasonable. The eventing-contrib/kafka implementation already has support for such a map, though with minimal content. We’d like to reuse whatever makes sense (such as map watching) but with our own defined content and parsing. This configmap may also include any other custom eventing-kafka config as well (i.e. it will not be Sarama-specific).
This is what eventing-contrib/kafka does, which seems like a good starting-point for eventing-kafka work toward a similar end:
kafka/channel/pkg/reconciler/controller/controller.go::NewController (passed in from sharedmain) uses the passed-in configmap.Watcher to call cmw.Watch("config-kafka", updateKafkaConfig), where "updateKafkaConfig" is a function that loads the configmap data and replaces the Reconciler.kafkaConfig with the one parsed out of the configmap.
The configmap parser itself uses code from knative.dev/pkg/configmap/parse.go that might be useful for our purposes as well (common way to parse out durations, ints, floats, etc).
If we don't want to switch to using sharedmain, we will need to create the watcher ourselves via sharedmain.SetupConfigMapWatchOrDie, which is what the pkg/common/k8s/observability.go code already does (not sure if there's a benefit to sharing a single watcher or not)
Whether we need the huge block of all of the sarama settings or not, putting them as JSON into a configmap is a relatively ordinary procedure and results in something that looks like this:
apiVersion: v1
data:
sarama: '{"Admin":{"Retry":{"Max":5,"Backoff":100000000},"Timeout":3000000000},"Net":{"MaxOpenRequests":5,"DialTimeout":30000000000,"ReadTimeout":30000000000,"WriteTimeout":30000000000,"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"Handshake":true,"AuthIdentity":"","User":"TestUsername","Password":"TestPassword","SCRAMAuthzID":"","TokenProvider":null,"GSSAPI":{"AuthType":0,"KeyTabPath":"","KerberosConfigPath":"","ServiceName":"","Username":"","Password":"","Realm":""}},"KeepAlive":30000000000,"LocalAddr":null,"Proxy":{"Enable":false,"Dialer":null}},"Metadata":{"Retry":{"Max":3,"Backoff":250000000},"RefreshFrequency":300000000000,"Full":true,"Timeout":0},"Producer":{"MaxMessageBytes":1000000,"RequiredAcks":1,"Timeout":10000000000,"Compression":0,"CompressionLevel":-1000,"Idempotent":false,"Return":{"Successes":false,"Errors":true},"Flush":{"Bytes":0,"Messages":0,"Frequency":0,"MaxMessages":0},"Retry":{"Max":3,"Backoff":100000000}},"Consumer":{"Group":{"Session":{"Timeout":10000000000},"Heartbeat":{"Interval":3000000000},"Rebalance":{"Timeout":60000000000,"Retry":{"Max":4,"Backoff":2000000000}},"Member":{"UserData":null}},"Retry":{"Backoff":2000000000},"Fetch":{"Min":1,"Default":1048576,"Max":0},"MaxWaitTime":250000000,"MaxProcessingTime":100000000,"Return":{"Errors":false},"Offsets":{"CommitInterval":0,"AutoCommit":{"Enable":true,"Interval":1000000000},"Initial":-1,"Retention":0,"Retry":{"Max":3}},"IsolationLevel":0},"ClientID":"TestClientId","RackID":"","ChannelBufferSize":256,"Version":{}}'
kind: ConfigMap
metadata:
name: test
namespace: test
Of course, the point here is that you can omit any entries from the "sarama" JSON that you want to leave as the defaults, so you could just have something that looks considerably simpler:
apiVersion: v1
data:
sarama: '{"Net":{"TLS":{"Enable":true},"SASL":{"Enable":true,"Mechanism":"PLAIN","Version":1,"User":"TestUsername","Password":"TestPassword"}},"Metadata":{"RefreshFrequency":300000000000},"ClientID":"TestClientId"}'
kind: ConfigMap
metadata:
name: test
namespace: test
Merging the JSON with the default sarama.Config is also not complex:
config := NewSaramaConfig(...)
json.Unmarshal([]byte(configMap.Data["sarama"]), &config)
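The merge works because json.Unmarshal only sets fields that are present in the input, so anything omitted from the JSON keeps its default. The sketch below demonstrates this with a small stand-in struct; kafkaSettings, defaultSettings and mergeSettings are hypothetical names and the default values are invented for the example, since the real sarama.Config is much larger.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// kafkaSettings is a hypothetical, much smaller stand-in for sarama.Config.
type kafkaSettings struct {
	ClientID string
	Net      struct {
		DialTimeout time.Duration
		TLS         struct{ Enable bool }
	}
	Metadata struct {
		RefreshFrequency time.Duration
	}
}

// defaultSettings plays the role of the code-defined defaults.
func defaultSettings() *kafkaSettings {
	s := &kafkaSettings{ClientID: "default-client"}
	s.Net.DialTimeout = 30 * time.Second
	s.Metadata.RefreshFrequency = 10 * time.Minute
	return s
}

// mergeSettings overlays the JSON stored under key onto the defaults.
// Fields missing from the JSON keep their default values.
func mergeSettings(configMapData map[string]string, key string) (*kafkaSettings, error) {
	settings := defaultSettings()
	raw, ok := configMapData[key]
	if !ok || raw == "" {
		return settings, nil
	}
	if err := json.Unmarshal([]byte(raw), settings); err != nil {
		return nil, fmt.Errorf("parsing %q: %w", key, err)
	}
	return settings, nil
}

func main() {
	data := map[string]string{
		"sarama": `{"Net":{"TLS":{"Enable":true}},"Metadata":{"RefreshFrequency":300000000000},"ClientID":"TestClientId"}`,
	}
	s, err := mergeSettings(data, "sarama")
	if err != nil {
		panic(err)
	}
	// DialTimeout is not in the JSON, so the 30s default survives the merge.
	fmt.Println(s.ClientID, s.Net.DialTimeout, s.Net.TLS.Enable, s.Metadata.RefreshFrequency)
	// prints: TestClientId 30s true 5m0s
}
```

Checking the error from json.Unmarshal matters here, because a malformed ConfigMap edit would otherwise silently leave a half-applied configuration.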
If we need to save our current settings back to a configmap, that gets complicated (and will likely require our own in-sync copy of the sarama.Config struct with custom tags to ignore some JSON-unfriendly fields). This is possible and has been POC'd but will hopefully not be necessary for this issue (writing to the configmap will only be done manually when an administrator wants to make changes).
Regarding the procedures necessary to rebuild/restart sarama infrastructure when the config map changes, there are three main components of note:
The ReconcileKind function calls SetKafkaAdminClient(), which calls kafkaadmin.CreateAdminClient() every time, so we don't have an "old admin client with old settings" to worry about for the moment. This will change if we decide to keep the admin client around in the future but for now this appears to be a non-issue (the only change would be to load the settings from whatever the current configmap is instead of using constants and/or environment variables).
The implementation for the ConsumerGroup is all in one place, but there could be several pods, which would need to be reconfigured (or recreated). The UpdateSubscriptions() function loops through the subscribers and calls CreateConsumerGroup whenever it finds no subscriber wrapper for a particular subscriber UID. The ConsumerGroup goes into the SubscriberWrapper and remains as part of the DispatcherImpl (created in the dispatcher's main.go and remaining for the life of the pod).
If the ConfigMap watcher is only going to run on the controller, it might be easiest to restart all of the dispatcher pods to pick up any new configuration data (the controller could parse the configmap and see if the changes are actually relevant to the ConsumerGroup first if desired)
Risk of trouble from restarting dispatcher pods is low, since the critical data is all stored in a kafka topic, not the eventing-kafka pods themselves, but unnecessary pod churn is, of course, preferably avoided. We could easily start with just having the controller restart the dispatcher pods when the configmap changes (which will presumably not be particularly often). Adjustments to a working system are likely easier than trying to work through all of the potential logic of changing the ConsumerGroups on the fly (either by having the dispatchers also watch the configmap or having a different IPC mechanism from the controller).
The Producer is similar to the ConsumerGroup in that it is created once (in the NewProducer() function called by the channel's main.go code) and then used repeatedly during handleMessage(), which is called when a message is received (due to the eventingchannel.NewMessageReceiver() call in the channel's main.go).
If we handle configmap changes by restarting the producer pod, we must be certain that we do not encounter any situation in which an event has been received (i.e. handleMessage has been called) and a non-error returned, but the message not persisted in the Kafka topic.
I don't see any real way for this to happen, as the ProduceKafkaMessage waits for the delivery report via the SyncProducer's SendMessage function before giving handleMessage the go-ahead to return no-error. If the process is terminated anywhere in that execution path, the caller of handleMessage (which is the ServeHTTP function in eventing's message_receiver.go file) will get an error (network or otherwise) that will be passed back to the caller (that is, the customer). Errors are acceptable for a brief time during reconfiguration; if less downtime is desired, the producer could watch the configmap directly and reconfigure the SyncProducer itself instead of letting the controller bounce it.
As with the ConsumerGroup it is probably best to start with a naïve restarting of pods and see if the required performance during configuration changes demands a modification to that approach.
Get rid of dep and switch over! Lots of examples in eventing and eventing-contrib to reference here.
Problem
The Kafka configuration is currently being hardcoded in the KafkaSource adapter. For instance, the commit policy is currently (assuming #100 is merged) set to autocommit every 1s, a compromise between speed and duplicate events. Some applications might want the opposite compromise.
Persona:
Which persona is this feature for?
Application developer
Exit Criteria
Check that the Kafka configuration varies with user input.
Time Estimate (optional):
How many developer-days do you think this may take to resolve?
1
Additional context (optional)
I would like to test this without having to build from source
Recent refactoring of environment variable loading in channel/dispatcher resulted in passwords being logged - need to fix that. Also need to remove the local-dev.sh script which is no longer used now that we're on Sarama and don't need to build inside linux docker container.
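One common way to keep secrets out of logs is to give the sensitive field its own type whose formatting methods return a placeholder. The sketch below assumes a hypothetical envConfig struct; the field names are illustrative and do not mirror the actual channel/dispatcher environment structs.

```go
package main

import "fmt"

// secret is a string type that never prints its contents.
type secret string

// String is used by the %v, %+v and %s verbs.
func (secret) String() string { return "[REDACTED]" }

// GoString is used by the %#v verb.
func (secret) GoString() string { return `"[REDACTED]"` }

// envConfig is a hypothetical environment configuration struct.
type envConfig struct {
	Brokers  string
	Username string
	Password secret
}

// describe renders the config the way a logger typically would.
func describe(cfg envConfig) string {
	return fmt.Sprintf("%+v", cfg)
}

func main() {
	cfg := envConfig{Brokers: "kafka:9092", Username: "user", Password: "s3cr3t"}
	fmt.Println(describe(cfg))
	// prints: {Brokers:kafka:9092 Username:user Password:[REDACTED]}

	// The real value is still available where it is actually needed.
	_ = string(cfg.Password)
}
```

This only covers fmt-style formatting. Encoders such as encoding/json would still emit the raw value unless the type also implements the relevant marshalling interface.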
/kind bug
Describe the bug
The channel dispatcher does not inherit the main context when creating a consumer group, preventing subscribers from shutting down gracefully
Expected behavior
The channel dispatcher should inherit the main context
To Reproduce
Knative release version
HEAD
Additional context
Add any other context about the problem here such as proposed priority
In order to improve our alignment with the core Knative Eventing implementations we should support the config-tracing and config-observability global ConfigMaps in the knative-eventing namespace. This is similar to the work we did to align on the config-logging implementation.
Problem
With knative/pkg#1714, pod names passed as env vars are used in logging.
Persona:
Which persona is this feature for?
Exit Criteria
Pod names are passed properly as env vars to pods in deployments.
Time Estimate (optional):
0.125
Additional context (optional)
Ref impl: knative/eventing#4072
adoption of Knative Kafka
Let's have a document on the repo that shows users of the Knative Kafka component:
KafkaChannel
KafkaSource
In the eventing-kafka code (specifically in pkg/common/prometheus/metrics.go) we create a Prometheus server manually, and use the Observe function in channel/producer/producer.go and dispatcher/dispatcher.go to record metrics.
We want to take out this explicit Prometheus server and switch to using the knative-eventing built-in observability functionality (which will by default create a Prometheus server, but should be able to be changed to use anything that opencensus supports).
Code changes
When this is all complete there should be (if possible) no references to Prometheus explicitly in the eventing-kafka codebase (barring vendor code).
Examples
There are a few examples of how to use the StatsReporter.ReportEventCount function in the eventing code, such as pkg/broker/ingress/ingress_handler.go
The github.com/census-ecosystem/opencensus-go-exporter-prometheus project has examples of how to use opencensus to create views and verify that metrics are being produced and exported properly.
InitializeObservability sample code:
observability.zip
Original Observability research issue: #29
Describe the bug
In the previous version eventing.knative.dev/scope: namespace would allow me to connect to different Kafka clusters at least at the namespace level.
Expected behavior
The ability to connect to more than one kafka cluster with knative.
To Reproduce
Add the eventing.knative.dev/scope: namespace to a kafka channel (distributed) and nothing happens.
Knative release version
nightly
Additional context
We don't absolutely need eventing.knative.dev/scope: namespace to work, we just need a way to connect to multiple kafka clusters, like maybe even specifying it at the KafkaChannel level similar to how it is in KafkaSource.