hpgrahsl / kryptonite-for-kafka

Kryptonite for Kafka is a client-side 🔒 field level 🔓 cryptography library for Apache Kafka® offering a Kafka Connect SMT, ksqlDB UDFs, and a standalone HTTP API service. It's an ! UNOFFICIAL ! community project

Java 100.00%
apache-kafka kafka kafka-connect smt transformations cryptography encryption-decryption end-to-end-encryption field-level-encryption ksqldb

kryptonite-for-kafka's Introduction

Kryptonite for Kafka: Client-Side 🔒 Field-Level 🔓 Cryptography for Apache Kafka®


Disclaimer: This is an UNOFFICIAL community project!

Overview

Kryptonite for Kafka is a library for client-side, field-level cryptography of records on their way into and out of Apache Kafka®. Currently, it targets three main use cases:

  1. data integration scenarios based on Kafka Connect, by means of a turn-key ready single message transformation (SMT) to run encryption / decryption operations on selected fields of records, with or without schema
  2. stream processing scenarios based on ksqlDB, by providing custom user-defined functions (UDFs) to encrypt / decrypt selected data columns in STREAMs and TABLEs respectively
  3. cross-language/runtime scenarios, by running a co-located Quarkus Funqy service that exposes a lightweight web API to encrypt / decrypt payloads, or fields thereof, from any client application that talks HTTP.

Build, Installation and Deployment

Either build this project from source with Maven or download pre-built, self-contained packages of the latest Kryptonite for Kafka artefacts.

Kafka Connect SMT

Starting with Kryptonite for Kafka 0.4.0, the pre-built Kafka Connect SMT can be downloaded directly from the release pages.

In order to deploy this custom SMT, put the root folder of the extracted archive into your 'connect plugin path', which is configured to be scanned during bootstrap of the Kafka Connect worker node(s).

After that, configure Kryptonite's CipherField transformation for any of your source / sink connectors. Read here about the configuration options and how to apply the SMT based on simple examples.
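
For orientation, here is a minimal sketch of a value-encrypting configuration for the CipherField SMT. It is assembled from the example connector configurations quoted in the issues further down this page; the field name, key identifier, and the secrets file path are placeholders you would replace to match your own setup:

    "transforms": "cipher",
    "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
    "transforms.cipher.cipher_mode": "ENCRYPT",
    "transforms.cipher.field_config": "[{\"name\":\"myField\"}]",
    "transforms.cipher.cipher_data_key_identifier": "my-key",
    "transforms.cipher.cipher_data_keys": "${file:/path/to/classified.properties:cipher_data_keys}"

For decryption on the sink side, the same transformation is configured with "cipher_mode": "DECRYPT", as shown in the JDBC sink example in the issues below.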

ksqlDB UDFs

Starting with Kryptonite for Kafka 0.4.0, the pre-built ksqlDB UDFs can be downloaded directly from the release pages.

In order to deploy the UDFs, put the jar into your 'ksql extension directory', which is configured to be scanned during bootstrap of the ksqlDB server process(es).

After that, start using the UDFs, namely K4KENCRYPT and K4KDECRYPT, to selectively encrypt and decrypt column values in ksqlDB rows of TABLES and STREAMS respectively. Read here about the configuration options and how to apply the UDFs based on simple examples.

Quarkus Funqy HTTP API service

Starting with Kryptonite for Kafka 0.4.0, the pre-built Quarkus application can be downloaded directly from the release pages.

When building from sources and before running the Quarkus application in dev mode (./mvnw quarkus:dev or quarkus dev), make sure to specify your individual configuration options in application.properties. If you run the pre-built binaries in prod mode (java -jar target/quarkus-app/quarkus-run.jar), you have to properly override any of the mandatory/default settings when starting the application. Read here about the configuration options and how to use the HTTP API based on example requests.
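
As a quick smoke test against a locally running instance, an encryption request can be issued with curl. The port (9080) and the /encrypt/value endpoint below are taken from the example requests quoted in the issues further down this page and assume a default local setup; note that the payload currently has to be valid JSON, which is why the string is wrapped in escaped quotes:

    $ curl --data "\"test\"" http://localhost:9080/encrypt/value

The response is the Base64-encoded ciphertext, returned as a JSON string.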

Cipher Algorithm Specifics

The project uses authenticated encryption with associated data (AEAD) and in particular applies AES in GCM mode for probabilistic encryption (default) or in SIV mode for use cases which either require or at least benefit from deterministic encryption. The preferred and new default way is to configure Kryptonite to use Google's Tink multi-language, cross-platform open-source cryptography library. Kryptonite for Kafka version 0.4.0+ provides the following cipher algorithms:

  • AEAD using AES in GCM mode for probabilistic encryption based on Tink's implementation
  • DAEAD using AES in SIV mode for deterministic encryption based on Tink's implementation

These cryptographic primitives offer support for authenticated encryption with associated data (AEAD). This basically means that besides the ciphertext, an encrypted field additionally contains unencrypted but authenticated meta-data. In order to keep the storage overhead per encrypted field relatively low, the implementation currently incorporates only a version identifier for Kryptonite itself, a short identifier representing the algorithm, and the identifier of the keyset that was used to encrypt the field in question. Future versions might benefit from additional meta-data.

By design, every application of AEAD in probabilistic mode to a specific record field results in different ciphertexts for one and the same plaintext. This is in general not only desirable but very important to make attacks harder. However, in the context of Kafka records this has an unfavorable consequence for producing clients, e.g. a source connector: applying Kryptonite with AEAD in probabilistic mode to a source record's key would result in a 'partition mix-up', because records with the same original plaintext key would end up in different topic partitions. In other words, if you plan to use Kryptonite for source record keys, make sure to configure it to apply deterministic AEAD, i.e. AES in SIV mode. Doing so safely supports the encryption of record keys and keeps topic partitioning and record ordering intact.
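
To make the difference tangible, here is a small, self-contained Java sketch that uses Google Tink directly rather than Kryptonite's own API, so it illustrates the underlying primitives, not Kryptonite's wire format. It assumes Tink is on the classpath and generates throwaway keysets; it merely demonstrates that AES-GCM yields a different ciphertext on every call while AES-SIV yields identical ciphertexts for identical inputs:

    import com.google.crypto.tink.Aead;
    import com.google.crypto.tink.DeterministicAead;
    import com.google.crypto.tink.KeyTemplates;
    import com.google.crypto.tink.KeysetHandle;
    import com.google.crypto.tink.aead.AeadConfig;
    import com.google.crypto.tink.daead.DeterministicAeadConfig;

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class ProbabilisticVsDeterministic {
        public static void main(String[] args) throws Exception {
            // Register the AEAD (AES-GCM) and deterministic AEAD (AES-SIV) primitives.
            AeadConfig.register();
            DeterministicAeadConfig.register();

            byte[] plaintext = "some-record-key".getBytes(StandardCharsets.UTF_8);
            byte[] associatedData = "demo".getBytes(StandardCharsets.UTF_8);

            // Probabilistic AEAD: repeated encryptions of the same plaintext differ.
            Aead aead = KeysetHandle.generateNew(KeyTemplates.get("AES256_GCM"))
                    .getPrimitive(Aead.class);
            System.out.println(Arrays.equals(
                    aead.encrypt(plaintext, associatedData),
                    aead.encrypt(plaintext, associatedData))); // false

            // Deterministic AEAD: identical inputs produce identical ciphertexts,
            // which keeps the partitioning of encrypted record keys stable.
            DeterministicAead daead = KeysetHandle.generateNew(KeyTemplates.get("AES256_SIV"))
                    .getPrimitive(DeterministicAead.class);
            System.out.println(Arrays.equals(
                    daead.encryptDeterministically(plaintext, associatedData),
                    daead.encryptDeterministically(plaintext, associatedData))); // true
        }
    }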

Donate

If you like this project and want to support its future development and maintenance, we are happy about your PayPal donation.

License Information

This project is licensed under the Apache License, Version 2.0.

Copyright (c) 2021 - present. Hans-Peter Grahsl ([email protected])

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

kryptonite-for-kafka's People

Contributors

hpgrahsl


kryptonite-for-kafka's Issues

Option to use funqy-http-kryptonite with single field values without requiring strings to be wrapped in quotes

In the end this is a bit of a "nice to have", but it would be quite helpful and less confusing for us if there were a way to use funqy-http-kryptonite with single string values without having to wrap the value in quotes, and to receive the value back without it being wrapped in quotes.

E.g. if I run an instance of the server and try to encrypt a value like this:

$ curl --data "test" http://localhost:9080/encrypt/value

Then the response is an empty HTTP 400 error and there is an exception logged in the container:

2024-03-07 15:43:55,364 ERROR [io.qua.funqy] (vert.x-eventloop-thread-3) Failed to unmarshal input: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'test': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (io.netty.buffer.ByteBufInputStream); line: 1, column: 5]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2418)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:759)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3693)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3666)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchToken2(UTF8StreamJsonParser.java:3007)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._matchTrue(UTF8StreamJsonParser.java:2941)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:878)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:793)
        at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357)
        at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:2095)
        at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1481)
        at io.quarkus.funqy.runtime.bindings.http.VertxRequestHandler.lambda$handle$2(VertxRequestHandler.java:107)
        at io.vertx.core.impl.future.FutureImpl$1.onSuccess(FutureImpl.java:91)
        at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:86)
        at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:163)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:79)
        at io.vertx.core.http.impl.Http1xServerRequest.onEnd(Http1xServerRequest.java:581)
        at io.vertx.core.http.impl.Http1xServerRequest.lambda$pendingQueue$1(Http1xServerRequest.java:130)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
        at io.vertx.core.http.impl.Http1xServerRequest.handleEnd(Http1xServerRequest.java:562)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:76)
        at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:153)
        at io.vertx.core.http.impl.Http1xServerConnection.onEnd(Http1xServerConnection.java:191)
        at io.vertx.core.http.impl.Http1xServerConnection.onContent(Http1xServerConnection.java:181)
        at io.vertx.core.http.impl.Http1xServerConnection.handleOther(Http1xServerConnection.java:161)
        at io.vertx.core.http.impl.Http1xServerConnection.handleMessage(Http1xServerConnection.java:149)
        at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at io.netty.handler.codec.http.websocketx.extensions.WebSocketServerExtensionHandler.channelRead(WebSocketServerExtensionHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.vertx.core.http.impl.Http1xOrH2CHandler.end(Http1xOrH2CHandler.java:61)
        at io.vertx.core.http.impl.Http1xOrH2CHandler.channelRead(Http1xOrH2CHandler.java:38)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:829)

But if we instead wrap the value in quotes then it works (though the value we receive back is also wrapped in quotes):

$ curl --data "\"test\"" http://localhost:9080/encrypt/value
"JwFp/j8gmvvsp15jTZOGdSp89FQzUmevY0ERBzBIYNvw3UG8MXYaDDCybXktdGVzdC1rZXktdrFrsQ=="

I can understand that it seems to expect "valid JSON" as the input and to return "valid JSON" as the output, but honestly, if we had a way to avoid wrapping the values in quotes, and the value we received back were also not wrapped in quotes, it would remove the pre-processing and post-processing that we currently need to do every time we use this service 😎

Maybe it could be a separate endpoint to specify that we are sending a string, e.g. /encrypt/string or /encrypt/value/string, which could handle the pre- and post-processing logic for us? Any other idea would be welcome, of course!

NPE when encrypting fields for SQLServer databases using debezium

I am using Debezium to do CDC from SQL Server to Kafka, and as per the business needs, some of the columns must be encrypted.

Here is the snippet of the connector json file

{
"name": "live.sql.users",
...
        "transforms.unwrap.delete.handling.mode": "drop",
        "transforms": "unwrap,cipher",
        "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
        "transforms.unwrap.drop.tombstones": "false",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.cipher.predicate": "isTombstone",
        "transforms.cipher.negate": "true",
        "transforms.cipher.cipher_data_keys": "[ { \"identifier\": \"my-key\", \"material\": { \"primaryKeyId\": 1000000001, \"key\": [ { \"keyData\": { \"typeUrl\": \"type.googleapis.com/google.crypto.tink.AesGcmKey\", \"value\": \"GhDLeulEJRDC8/19NMUXqw2jK\", \"keyMaterialType\": \"SYMMETRIC\" }, \"status\": \"ENABLED\", \"keyId\": 2000000002, \"outputPrefixType\": \"TINK\" } ] } } ]",
        "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
        "transforms.cipher.cipher_mode": "ENCRYPT",
        "predicates": "isTombstone",
        "transforms.cipher.field_config": "[{\"name\":\"Password\"},{\"name\":\"MobNumber\"}, {\"name\":\"UserName\"}]",
        "transforms.cipher.cipher_data_key_identifier": "my-key"
...
}

and when I applied it, after a few seconds I got the below error when calling the /connectors/<connector_name>/status API

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\t
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:346)\n\t
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)\n\t
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)\n\t
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\t
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\t
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\t
at java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: error: ENCRYPT of field path 'UserName' having data 'deleted605' failed unexpectedly\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:90)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.lambda$matchFields$0(SchemaawareRecordHandler.java:73)\n\t
at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)\n\t
at java.base/java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaawareRecordHandler.matchFields(SchemaawareRecordHandler.java:50)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:163)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:140)\n\t
at org.apache.kafka.connect.runtime.PredicatedTransformation.apply(PredicatedTransformation.java:56)\n\t
at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\t
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t
... 11 more\nCaused by: java.lang.NullPointerException\n\t
at com.esotericsoftware.kryo.util.DefaultGenerics.nextGenericTypes(DefaultGenerics.java:77)\n\t
at com.esotericsoftware.kryo.serializers.FieldSerializer.pushTypeVariables(FieldSerializer.java:144)\n\t
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:102)\n\t
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:627)\n\t
at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.RecordHandler.processField(RecordHandler.java:75)\n\t
... 21 more\n

Note that the same configs are working with other connectors with no problems.

Java 17 seems to be not yet supported

Hi @hpgrahsl,

Thanks for your great library. I have tried it using Java 11 and it's working fine. But when I use a different environment with Java 17, it throws an error as below:

Unable to make field final java.util.Collection java.util.Collections$UnmodifiableCollection.c accessible: module java.base does not "opens java.util" to unnamed module @30ed8e6f

Caused by: java.lang.RuntimeException: Could not access source collection field in java.util.Collections$UnmodifiableCollection.
at de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer.<init>(UnmodifiableCollectionsSerializer.java:62)
... 30 more
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field final java.util.Collection java.util.Collections$UnmodifiableCollection.c accessible: module java.base does not "opens java.util" to unnamed module @30ed8e6f
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer.<init>(UnmodifiableCollectionsSerializer.java:55)
... 30 more

I searched the internet and found out that it's a Java incompatibility issue.
When you have time (I know you are busy), we can work together to fix this.

Appreciate your help,
Vincent Trinh.

READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE exception thrown when using Kryptonite together with Strimzi's kafka-kubernetes-config-provider v 1.1.2

If we try to use Kryptonite and Strimzi's kafka-kubernetes-config-provider v1.1.2 or higher together in the same Connect instance (more specifically, we are using Confluent's "confluent-security" plugin, which needs to load the Strimzi classes upon startup and for the initial connection to the Kafka brokers), then we get an exception when creating connectors that use the Kryptonite SMT. I think this might be due to an incompatibility between the Jackson 2.15.x+ dependency from the Strimzi package and the older Jackson version used with Kryptonite.

Here is an example stack trace; I think the line with com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.FieldConfigValidator.ensureValid(FieldConfigValidator.java:34) is specifically where Kryptonite comes into play:

javax.servlet.ServletException: org.glassfish.jersey.server.ContainerException: java.lang.NoSuchFieldError: READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:410)
	at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:346)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:358)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:311)
	at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:205)
	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:554)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
	at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:560)
	at io.confluent.common.security.jetty.initializer.ConnectConstraintSecurityHandler.handle(ConnectConstraintSecurityHandler.java:35)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)
	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)
	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
	at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)
	at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)
	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
	at org.eclipse.jetty.server.Server.handle(Server.java:516)
	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)
	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)
	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)
	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ssl.SslConnection$DecryptedEndPoint.onFillable(SslConnection.java:555)
	at org.eclipse.jetty.io.ssl.SslConnection.onFillable(SslConnection.java:410)
	at org.eclipse.jetty.io.ssl.SslConnection$2.succeeded(SslConnection.java:164)
	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:409)
	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)
	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.glassfish.jersey.server.ContainerException: java.lang.NoSuchFieldError: READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE
	at org.glassfish.jersey.servlet.internal.ResponseWriter.rethrow(ResponseWriter.java:255)
	at org.glassfish.jersey.servlet.internal.ResponseWriter.failure(ResponseWriter.java:237)
	at org.glassfish.jersey.server.ServerRuntime$Responder.process(ServerRuntime.java:438)
	at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:263)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
	at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
	at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
	at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
	at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)
	at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)
	at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
	... 43 more
Caused by: java.lang.NoSuchFieldError: READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE
	at com.fasterxml.jackson.databind.deser.std.EnumDeserializer.createContextual(EnumDeserializer.java:211)
	at com.fasterxml.jackson.databind.DeserializationContext.handlePrimaryContextualization(DeserializationContext.java:836)
	at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.resolve(BeanDeserializerBase.java:550)
	at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:294)
	at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244)
	at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142)
	at com.fasterxml.jackson.databind.DeserializationContext.findContextualValueDeserializer(DeserializationContext.java:621)
	at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.createContextual(CollectionDeserializer.java:188)
	at com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.createContextual(CollectionDeserializer.java:28)
	at com.fasterxml.jackson.databind.DeserializationContext.handleSecondaryContextualization(DeserializationContext.java:867)
	at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:659)
	at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:4956)
	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4826)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3772)
	at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3755)
	at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.validators.FieldConfigValidator.ensureValid(FieldConfigValidator.java:34)
	at org.apache.kafka.common.config.ConfigDef$1.ensureValid(ConfigDef.java:1578)
	at org.apache.kafka.common.config.ConfigDef.parseForValidate(ConfigDef.java:625)
	at org.apache.kafka.common.config.ConfigDef.parseForValidate(ConfigDef.java:559)
	at org.apache.kafka.common.config.ConfigDef.validateAll(ConfigDef.java:550)
	at org.apache.kafka.connect.runtime.AbstractHerder.validateSourceConnectorConfig(AbstractHerder.java:371)
	at org.apache.kafka.connect.runtime.distributed.DistributedHerder.validateSourceConnectorConfig(DistributedHerder.java:925)
	at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:532)
	at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$4(AbstractHerder.java:442)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more

If I roll back kafka-kubernetes-config-provider to 1.1.1 (uses Jackson 2.14.2) then it seems to work ok.

Support for array of objects

Any plan to support ELEMENT encryption for an array of objects?
Example data:

{
  "_id":"gabd9a39-9856-38b6-b983-94513f746f34",
  "currency":"ABC",
  "fields":[
    {
      "fieldOne":"someValueOne",
      "fieldTwo":"someValueTwo"
    },
    {
      "fieldOne":"anotherValueOne",
      "fieldTwo":"anotherValueTwo"
    }
  ],
  "nested":{
    "value": 123,
    "valueString":"abc"
  }
}

I would like to encrypt all `fields[*].fieldOne`. I have tried that by setting:

        "transforms.cipher.field_config": "[{\"name\":\"fields\"},{\"name\":\"fields.fieldOne\"}]",
        "transforms.cipher.field_mode": "ELEMENT"

But that fails with exception:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:395)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:361)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)
Caused by: java.util.NoSuchElementException: no default type mapping found for type STRUCT (optional true) and cipher mode ENCRYPT
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.lambda$getSchemaForPrimitiveType$0(TypeSchemaMapper.java:63)
    at java.util.Optional.orElseThrow(Optional.java:408)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.TypeSchemaMapper.getSchemaForPrimitiveType(TypeSchemaMapper.java:62)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptArraySchema(SchemaRewriter.java:110)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptField(SchemaRewriter.java:87)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.adaptSchema(SchemaRewriter.java:74)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.processWithSchema(CipherField.java:184)
    at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField.apply(CipherField.java:165)
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:50)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 13 more

AWS KMS support

From what I understood, currently only Azure Key Vault and Google Cloud KMS are supported. Would it be possible to also add support for AWS KMS?
Is there another way to use AWS KMS currently?

Decryption using python and aws kms

Hello @hpgrahsl, we are planning to create an encryption/decryption architecture using kryptonite-for-kafka in a Debezium source connector but are facing some issues, mentioned below:

  1. I have successfully produced encrypted data in Kafka using the kryptonite transformation package, but I want to decrypt this data using Python/PySpark. How can I achieve it? As per my understanding, in your code, you have used Kryo serialization, but this is not available in Python. Can you please help me with this or provide me a sample Python script for decryption?

  2. How can we pass AWS KMS key payload directly in the source connector?

  transforms.cipher.cipher_data_keys: {
      "KeyMetadata": {
          "AWSAccountId": "123456789012",
          "KeyId": "arn:aws:kms:us-east-1:123456789012:key/abcd1234-a123-456a-a12b-a123b4cd56ef",
          "Arn": "arn:aws:kms:us-east-1:123456789012:key/abcd1234-a123-456a-a12b-a123b4cd56ef",
          "CreationDate": 1642604273.418,
          "Enabled": true,
          "Description": "",
          "KeyUsage": "ENCRYPT_DECRYPT",
          "KeyState": "Enabled",
          "Origin": "AWS_KMS",
          "KeyManager": "CUSTOMER",
          "CustomerMasterKeySpec": "SYMMETRIC_DEFAULT",
          "EncryptionAlgorithms": [
              "SYMMETRIC_DEFAULT"
          ],
          "SigningAlgorithms": [
              "RSASSA_PSS_SHA_512"
          ]
      }
  }
  3. How can we use field-level keys (different keys for different fields)? For example: table1 has three columns c1, c2 and c3, and I want to encrypt those columns with three different keys.

  4. I have a single source connector for multiple fact tables; how do I configure the transforms.cipher.field_config parameter for different tables with different fields?

table.include.list: 'dbo.table1,dbo.table2,dbo.table3,...dbo.tableN'
encrypt.fields.table1: 'mobile'
encrypt.fields.table2: 'userid'

Hope you will provide a response with sample examples.

Invalid value com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value, for configuration transforms.cipher.type: Class com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value, could not be found.

As per the suggestion, I tried to configure it, but it showed that it was not able to find the value. I ensured I have the jar files inside my path (kryptonite-0.4.1.jar and connect-transform-kryptonite-0.4.1.jar) and I see that they are loaded. Any reason why com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value is not working?

Decryption issue when using JDBC Sink Connector

I'm using the JDBC source/sink connectors to transfer data between different MSSQL servers, and I want to encrypt field values in Kafka, so I'm trying to use "kryptonite-for-kafka".

This is my sample source table schema.

NAME TYPE COMMENT
SEQ BIGINT PRIMARY_KEY
ITEM_NO BIGINT
ITEM_NAME VARCHAR(20) Encryption target
CHANGE_DATE DATETIME2

So, I set up the JDBC SourceConnector using AvroConverter:

{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://{schema}:8081",
    "value.converter.schema.registry.url": "http://{schema}:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",   
    "transforms": "cipher",
    "transforms.cipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
    "transforms.cipher.cipher_mode": "ENCRYPT",
    "transforms.cipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
    "transforms.cipher.cipher_data_key_identifier": "my-demo-secret-key-123",
    "transforms.cipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
    "transforms.cipher.predicate":"isTombstone",
    "transforms.cipher.negate":true,
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "connection.url": "jdbc:sqlserver://{MSSQL}:1433",
    "connection.user": "user",
    "connection.password": "pass",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "SEQ",
    "timestamp.column.name": "CHANGE_DATE",
    "validate.non.null": "true",
    "query": "SELECT SEQ, ITEM_NO, ITEM_NAME, CHANGE_DATE FROM source_table WHERE ITEM_NO = 1111",   
    "poll.interval.ms": "5000",
    "topic.prefix": "test-topic"
  }

And I successfully encrypted and produced to the Kafka topic.
Here are the message and schema:

VALUE
{  
  "ITEM_NO": 1111,
  "ITEM_NAME": "LQE7msoBMszuPICzYwhPzeKbzDYPzMkAS8gG2wyeZZFDoLeAnRPGfRrTImHfDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux",
  "CHANGE_DATE": {
    "long": 1686219238466
  },
  "SEQ": 44
}


VALUE SCHEMA

{
  "fields": [    
    {
      "name": "ITEM_NO",
      "type": "long"
    },
    {
      "name": "ITEM_NAME",
      "type": "string"
    },
    {
      "default": null,
      "name": "CHANGE_DATE",
      "type": [
        "null",
        {
          "[connect.name](http://connect.name/)": "org.apache.kafka.connect.data.Timestamp",
          "connect.version": 1,
          "logicalType": "timestamp-millis",
          "type": "long"
        }
      ]
    },
    {
      "name": "SEQ",
      "type": "long"
    }
  ],
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "type": "record"
}

And I tried to decrypt the message and insert it into another MSSQL instance using the JDBC Sink Connector. But a problem occurred here (the table schema is the same).

I set up the JDBC Sink Connector like this:


{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://{schema}:8081",
    "value.converter.schema.registry.url": "http://{schema}:8081",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true",
    "topics": "test-topic",
    "transforms": "decipher",   
    "transforms.decipher.type": "com.github.hpgrahsl.kafka.connect.transforms.kryptonite.CipherField$Value",
    "transforms.decipher.cipher_mode": "DECRYPT",
    "transforms.decipher.field_config": "[{\"name\":\"ITEM_NAME\"}]",
    "transforms.decipher.cipher_data_keys": "${file:/etc/kafka/connect-classified.properties:cipher_data_keys}",
    "transforms.decipher.cipher_data_key_identifier": "my-demo-secret-key-123",
    "transforms.decipher.predicate":"isTombstone",
    "transforms.decipher.negate":true,
    "predicates": "isTombstone",
    "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone",
    "connection.url": "jdbc:sqlserver://{MSSQL2}:1433",
    "connection.user": "user",
    "connection.password": "pass",
    "table.name.format": "test_sink_table",
    "delete.on.null.values": true
  }

I got these error messages

[2023-06-08 10:07:16,577] ERROR WorkerSinkTask{id=jaeshim_jdbc_sink_connector_cipher-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:208)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:237)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:159)
        at org.apache.kafka.connect.runtime.TransformationChain.transformRecord(TransformationChain.java:70)
        at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:50)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:543)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:494)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: schema-aware data needs schema spec for DECRYPT but none was given for field path 'ITEM_NAME'
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewriter.lambda$derivePrimaryType$0(SchemaRewriter.java:170)
        at java.base/java.util.Optional.orElseThrow(Optional.java:408)
        at com.github.hpgrahsl.kafka.connect.transforms.kryptonite.SchemaRewri

I think the sink connector is using "DECRYPT" but there isn't any schema spec related to DECRYPT for 'ITEM_NAME'.
I tried using JsonSchemaConverter or JsonConverter with schema, but it's the same problem.

Could you tell me what could be the problem with these errors?

Separate function for Decryption not working

Hi @hpgrahsl ,

I'm trying to write a function to decrypt the encrypted values from the output of this plugin, but currently it's not able to decrypt. Here is the code (it reads the keyset from key.json):

import com.google.crypto.tink.Aead;
import com.google.crypto.tink.CleartextKeysetHandle;
import com.google.crypto.tink.JsonKeysetReader;
import com.google.crypto.tink.KeysetHandle;
import com.google.crypto.tink.aead.AeadConfig;
import com.google.crypto.tink.aead.AeadFactory;
import com.google.crypto.tink.aead.AeadKeyTemplates;
import com.google.crypto.tink.config.TinkConfig;

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;

public class TinkEncryption {

    private static final byte[] EMPTY_ASSOCIATED_DATA = new byte[0];

    public static byte[] asBytes() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.writeBytes("k1".getBytes(StandardCharsets.UTF_8));
        baos.writeBytes("02".getBytes(StandardCharsets.UTF_8));
        baos.writeBytes("584420223".getBytes(StandardCharsets.UTF_8));
        return baos.toByteArray();
    }

    public static void main(String[] args) throws Exception {
        AeadConfig.register();

        KeysetHandle handle = null;
        try (FileInputStream inputStream = new FileInputStream("key.json")) {
            handle = CleartextKeysetHandle.read(JsonKeysetReader.withInputStream(inputStream));
        } catch (GeneralSecurityException | IOException ex) {
            System.err.println("Error reading key: " + ex);
            System.exit(1);
        }

        Aead aead = null;
        try {
            aead = handle.getPrimitive(Aead.class);
        } catch (GeneralSecurityException ex) {
            System.err.println("Error creating primitive: %s " + ex);
            System.exit(1);
        }

        System.out.println("Read key file successfully");

        String plainText = "This is a plain text which needs to be encrypted!";
        String aad = "k102584420223";
        byte[] ciphertext = aead.encrypt(plainText.getBytes(), aad.getBytes());
        System.out.println(Base64.getEncoder().encodeToString(ciphertext));
        byte[] decrypted = aead.decrypt(ciphertext, aad.getBytes());
        String decrypted_text = new String(decrypted);
        System.out.println(decrypted_text);
        System.out.println("Done...");

        //String base64 = "ASLVi39Pz9uqvEa9WDTFTBfqMKZ8VhzlJ4iRxgmCXDcQMy3GPMaYzmFo1P6Q7qEOzzPPTaWLboJ60f00NuPmbkrj6mCxxbRogSy1eCW6+abxaQ==";
        //String base64 = "LgEi1Yt/HVcZLhbLctOOFDjVkmI079znSMFq8U33SIIlr6s9iIPDcTNd24ULswwwsm15LWRlbW8tc2VjcmV0LWtleS0xMrNrsQ==";
        String base64 = "OQEi1Yt/1e0OpZh4ru/ouSYU2WOErLlA56rxHGw3zryCxBviUxXzPduGpwZ/JJE0WwRwGUFR/sImDDCybXktZGVtby1zZWNyZXQta2V5LTEys2ux";
        ciphertext = Base64.getDecoder().decode(base64);
        decrypted = aead.decrypt(ciphertext, asBytes());
        decrypted_text = new String(decrypted);
        System.out.println(decrypted_text);
    }
}

Here is the output

Exception in thread "main" java.security.GeneralSecurityException: decryption failed at com.google.crypto.tink.aead.AeadWrapper$WrappedAead.decrypt(AeadWrapper.java:112) at TinkEncryption.main(TinkEncryption.java:90)

Want to check with you if the asBytes() function here (for associatedData) is correct or not.
Really appreciate your help.

Thank you so much,
Vincent Trinh.

Cannot decrypt int64 (long) fields

Hi, @hpgrahsl

I encountered an error when trying to decrypt fields of type long (INT64). I believe there is a bug in TypeSchemaMapper.java

Map<Type, Supplier<SchemaBuilder>> DEFAULT_MAPPINGS_DECRYPT =
      Map.of(
          Type.BOOLEAN, SchemaBuilder::bool,
          Type.INT8, SchemaBuilder::int8,
          Type.INT16, SchemaBuilder::int16,
          Type.INT32, SchemaBuilder::int32,
          Type.INT64, SchemaBuilder::int16,
          Type.FLOAT32, SchemaBuilder::float32,
          Type.FLOAT64, SchemaBuilder::float64,
          Type.STRING, SchemaBuilder::string,
          Type.BYTES, SchemaBuilder::bytes
      );

INT64 is mapped to int16 - shouldn't it be int64?
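If so, the fix would presumably be a one-line change to that map (everything else staying as-is):

Map<Type, Supplier<SchemaBuilder>> DEFAULT_MAPPINGS_DECRYPT =
      Map.of(
          Type.BOOLEAN, SchemaBuilder::bool,
          Type.INT8, SchemaBuilder::int8,
          Type.INT16, SchemaBuilder::int16,
          Type.INT32, SchemaBuilder::int32,
          Type.INT64, SchemaBuilder::int64,   // previously mapped to SchemaBuilder::int16
          Type.FLOAT32, SchemaBuilder::float32,
          Type.FLOAT64, SchemaBuilder::float64,
          Type.STRING, SchemaBuilder::string,
          Type.BYTES, SchemaBuilder::bytes
      );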

keyId and primaryKeyId generated by Tinkey fail with Kryptonite v0.4.1 if the number is greater than 2147483647

We are working on an implementation of some Connectors using v0.4.1 of this Kryptonite SMT (really cool project!) but ran into a problem.

When using Google's Tinkey CLI's create-keyset function to generate our key material, it seems that it creates some kind of random number for the keyId and primaryKeyId fields (not 100% sure if there is any logic behind it or not).

(for example like this:)

tinkey create-keyset --key-template AES128_GCM --out-format json --out some_example_keyset.json

Then we ran into an issue just using the value from Tinkey straight away if the ID number was higher than 2147483647. Some errors popped up in the Connect server like this:

org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Invalid value com.fasterxml.jackson.databind.JsonMappingException: Numeric value (3015563227) out of range of int (-2147483648 - 2147483647)
at [Source: (String)"[{"identifier":"test.20231204","material":{"key":[{"keyData":{"keyMaterialType":"SYMMETRIC","typeUrl":"type.googleapis.com/google.crypto.tink.AesGcmKey","value":"***"},"keyId":3015563227,"outputPrefixType":"TINK","status":"ENABLED"}],"primaryKeyId":3015563227}}]"; line: 1, column: 216] (through reference chain: java.util.HashSet[0]->com.github.hpgrahsl.kryptonite.config.DataKeyConfig["material"]->com.github.hpgrahsl.kryptonite.config.TinkKeyConfig["key"]->java.util.HashSet[0]->com.github.hpgrahsl.kryptonite.config.TinkKeyConfig$KeyConfig["keyId"]) for configuration Numeric value (3015563227) out of range of int (-2147483648 - 2147483647)
at [Source: (String)"[{"identifier":"test.20231204","material":{"key":[{"keyData":{"keyMaterialType":"SYMMETRIC","typeUrl":"type.googleapis.com/google.crypto.tink.AesGcmKey","value":"***"},"keyId":3015563227,"outputPrefixType":"TINK","status":"ENABLED"}],"primaryKeyId":3015563227}}]"; line: 1, column: 216] (through reference chain: java.util.HashSet[0]->com.github.hpgrahsl.kryptonite.config.DataKeyConfig["material"]->com.github.hpgrahsl.kryptonite.config.TinkKeyConfig["key"]->java.util.HashSet[0]->com.github.hpgrahsl.kryptonite.config.TinkKeyConfig$KeyConfig["keyId"])

But if we re-generate a new keyset which happens to have an ID less than 2147483647, then it seems to work OK.

My assumption/hope is that if the data type for these TinkKeyConfig properties can be swapped from int to something else, this would be an easy fix? 👼
