
Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.

Home Page: https://castorm.github.io/kafka-connect-http/

License: Apache License 2.0


kafka-connect-http's Introduction

Kafka Connect HTTP Connector


Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.

This connector is for you if

  • You want to (live) replicate a dataset exposed through a JSON/HTTP API
  • You want to do so efficiently
  • You want to capture only changes, not full snapshots
  • You want to do so via configuration, with no custom coding
  • You want to be able to extend the connector if it comes to that

Examples

See the examples for sample connector configurations, e.g. the provided Jira example.

Getting Started

If your Kafka Connect deployment is automated and packaged with Maven, you can unpack the artifact into the Kafka Connect plugins folder.

<plugin>
    <artifactId>maven-dependency-plugin</artifactId>
    <executions>
        <execution>
            <id>copy-kafka-connect-plugins</id>
            <phase>prepare-package</phase>
            <goals>
                <goal>unpack</goal>
            </goals>
            <configuration>
                <outputDirectory>${project.build.directory}/docker-build/plugins</outputDirectory>
                <artifactItems>
                    <artifactItem>
                        <groupId>com.github.castorm</groupId>
                        <artifactId>kafka-connect-http</artifactId>
                        <version>0.8.11</version>
                        <type>tar.gz</type>
                        <classifier>plugin</classifier>
                    </artifactItem>
                </artifactItems>
            </configuration>
        </execution>
    </executions>
</plugin>

Otherwise, you'll have to do it manually by downloading the package from the Releases Page.

More details on how to install connectors can be found in the Kafka Connect documentation.

Source Connector

com.github.castorm.kafka.connect.http.HttpSourceConnector
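
A minimal configuration, shown here in the property-file format used by standalone workers, could look like the following sketch. The URL, pointers and topic are illustrative placeholders, not defaults; each property is described in the sections below.

name=my-api.http.source
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.request.url=https://example.com/api/items
http.response.list.pointer=/items
http.response.record.offset.pointer=key=/id, timestamp=/updatedAt
kafka.topic=my-topic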

Extension points

The connector can be easily extended by implementing your own version of any of the components below.

These are better understood by looking at the source task implementation:

public List<SourceRecord> poll() throws InterruptedException {

    throttler.throttle(offset.getTimestamp().orElseGet(Instant::now));

    HttpRequest request = requestFactory.createRequest(offset);

    HttpResponse response = requestExecutor.execute(request);

    List<SourceRecord> records = responseParser.parse(response);

    List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
            .filter(recordFilterFactory.create(offset))
            .collect(toList());

    confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));

    return unseenRecords;
}

public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    confirmationWindow.confirm(record.sourceOffset());
}

public void commit() {
    offset = confirmationWindow.getLowWatermarkOffset()
            .map(Offset::of)
            .orElse(offset);
}

Controls the rate at which HTTP requests are performed by informing the task how long to wait until the next execution is due.

http.timer

public interface Timer extends Configurable {

    Long getRemainingMillis();

    default void reset(Instant lastZero) {
        // Do nothing
    }
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
  • Available implementations:
    • com.github.castorm.kafka.connect.timer.FixedIntervalTimer
    • com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
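
As an illustration of the extension mechanism, below is a hypothetical Timer that always enforces a fixed 10-second wait. It is only a sketch: it assumes Configurable exposes a configure(Map<String, ?> settings) method, as Kafka's org.apache.kafka.common.Configurable does, so the exact signature in this code base may differ. It would be registered by setting http.timer to its fully qualified class name.

import java.time.Instant;
import java.util.Map;

// Hypothetical custom Timer: always waits a fixed 10 seconds between requests.
// Assumes Configurable declares configure(Map<String, ?>); verify against the actual interface.
public class FixedTenSecondsTimer implements Timer {

    private static final long INTERVAL_MILLIS = 10_000L;

    private Instant lastZero = Instant.now();

    @Override
    public void configure(Map<String, ?> settings) {
        // No settings needed for this fixed-interval sketch
    }

    @Override
    public Long getRemainingMillis() {
        // Time left until the next request is due, never negative
        long elapsedMillis = Instant.now().toEpochMilli() - lastZero.toEpochMilli();
        return Math.max(INTERVAL_MILLIS - elapsedMillis, 0L);
    }

    @Override
    public void reset(Instant lastZero) {
        // Called when the countdown should restart from the given instant
        this.lastZero = lastZero;
    }
}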

Throttling HttpRequests with FixedIntervalTimer

Throttles rate of requests based on a fixed interval.

http.timer.interval.millis

Interval in between requests

  • Type: Long
  • Default: 60000

Throttling HttpRequests with AdaptableIntervalTimer

Throttles the rate of requests based on a fixed interval. It has, however, two modes of operation, with two different intervals:

  • Up to date The last poll returned no new records, or it returned new records that were created "recently" (more recently than the interval)
  • Catching up The last poll returned new records that were created "long ago" (further back than the interval)
http.timer.interval.millis

Interval in between requests when up-to-date

  • Type: Long
  • Default: 60000
http.timer.catchup.interval.millis

Interval in between requests when catching up

  • Type: Long
  • Default: 30000
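
For example, to poll every 5 minutes while up to date but every 10 seconds while catching up (the values are illustrative, not recommendations):

http.timer=com.github.castorm.kafka.connect.timer.AdaptableIntervalTimer
http.timer.interval.millis=300000
http.timer.catchup.interval.millis=10000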

The first thing our connector needs to do is create an HttpRequest.

http.request.factory

public interface HttpRequestFactory extends Configurable {

    HttpRequest createRequest(Offset offset);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory
  • Available implementations:
    • com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory

http.offset.initial

Initial offset, comma separated list of pairs.

  • Example: property1=value1, property2=value2
  • Type: String
  • Default: ""

Creating a HttpRequest with TemplateHttpRequestFactory

This HttpRequestFactory is based on template resolution.

http.request.method

Http method to use in the request.

  • Type: String
  • Default: GET
http.request.url

Http url to use in the request.

  • Required
  • Type: String
http.request.headers

Http headers to use in the request, comma-separated list of colon-separated pairs.

  • Example: Name: Value, Name2: Value2
  • Type: String
  • Default: ""
http.request.params

Http query parameters to use in the request, & separated list of = separated pairs.

  • Example: name=value & name2=value2
  • Type: String
  • Default: ""
http.request.body

Http body to use in the request.

  • Type: String
  • Default: ""
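
Putting these properties together, a hypothetical request against an endpoint that accepts an updatedAfter query parameter could be configured as below. The URL and parameter names are assumptions for illustration, not connector defaults.

http.offset.initial=timestamp=2020-01-01T00:00:00Z
http.request.method=GET
http.request.url=https://example.com/api/items
http.request.headers=Accept: application/json
http.request.params=updatedAfter=${offset.timestamp}&limit=100
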
http.request.template.factory
public interface TemplateFactory {

    Template create(String template);
}

public interface Template {

    String apply(Offset offset);
}

Class responsible for creating the templates that will be used on every request.

  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory
  • Available implementations:
    • com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory Implementation based on FreeMarker which accepts offset properties without offset namespace (Deprecated)
    • com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory Implementation based on FreeMarker
    • com.github.castorm.kafka.connect.http.request.template.NoTemplateFactory
Creating a HttpRequest with FreeMarkerTemplateFactory

FreeMarker templates will have the following data model available:

  • offset
    • key
    • timestamp (as ISO8601 string, e.g.: 2020-01-01T00:00:00Z)
    • ... (custom offset properties)

Accessing any of the above within a template can be achieved like this:

http.request.params=after=${offset.timestamp}

For an epoch-millis representation of the same timestamp, FreeMarker built-ins should be used:

http.request.params=after=${offset.timestamp?datetime.iso?long}

For a complete understanding of the features provided by FreeMarker, please, refer to the User Manual


Once our HttpRequest is ready, we have to execute it to get some results out of it. That's the purpose of the HttpClient.

http.client

public interface HttpClient extends Configurable {

    HttpResponse execute(HttpRequest request) throws IOException;
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.client.okhttp.OkHttpClient
  • Available implementations:
    • com.github.castorm.kafka.connect.http.client.okhttp.OkHttpClient

Executing a HttpRequest with OkHttpClient

Uses an OkHttp client.

http.client.connection.timeout.millis

Timeout for opening a connection

  • Type: Long
  • Default: 2000
http.client.read.timeout.millis

Timeout for reading a response

  • Type: Long
  • Default: 2000
http.client.connection.ttl.millis

Time to live for the connection

  • Type: Long
  • Default: 300000
http.client.proxy.host

Hostname of the HTTP Proxy

  • Type: String
  • Default: ``
http.client.proxy.port

Port of the HTTP Proxy

  • Type: Integer
  • Default: 3128
http.client.proxy.username

Username of the HTTP Proxy

  • Type: String
  • Default: ``
http.client.proxy.password

Password of the HTTP Proxy

  • Type: String
  • Default: ``
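
For example, a configuration that tolerates slower responses and routes requests through a corporate proxy could look like this (host and credentials are placeholders):

http.client.connection.timeout.millis=5000
http.client.read.timeout.millis=10000
http.client.proxy.host=proxy.example.com
http.client.proxy.port=3128
http.client.proxy.username=proxyuser
http.client.proxy.password=proxypassword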

When executing the request, authentication might be required. The HttpAuthenticator is responsible for resolving the Authorization header to be included in the HttpRequest.

http.auth

public interface HttpAuthenticator extends Configurable {

    Optional<String> getAuthorizationHeader();
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator
  • Available implementations:
    • com.github.castorm.kafka.connect.http.auth.ConfigurableHttpAuthenticator
    • com.github.castorm.kafka.connect.http.auth.NoneHttpAuthenticator
    • com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator

Authenticating with ConfigurableHttpAuthenticator

Allows selecting the authentication type via configuration property

http.auth.type

Type of authentication

  • Type: Enum { None, Basic }
  • Default: None

Authenticating with BasicHttpAuthenticator

Performs HTTP Basic authentication with the credentials configured below.

http.auth.user
  • Type: String
  • Default: ""
http.auth.password
  • Type: String
  • Default: ""
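
For example, to enable Basic authentication through the default ConfigurableHttpAuthenticator (the credentials are placeholders):

http.auth.type=Basic
http.auth.user=myuser
http.auth.password=mypassword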

Once our HttpRequest has been executed, we'll have to deal with the resulting HttpResponse and translate it into the list of SourceRecords expected by Kafka Connect.

http.response.parser

public interface HttpResponseParser extends Configurable {

    List<SourceRecord> parse(HttpResponse response);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.PolicyHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.PolicyHttpResponseParser
    • com.github.castorm.kafka.connect.http.response.KvHttpResponseParser

Parsing with PolicyHttpResponseParser

Vets the HTTP response, deciding whether it should be processed, skipped or failed. This decision is delegated to an HttpResponsePolicy. When the decision is to process the response, this processing is delegated to a secondary HttpResponseParser.

HttpResponsePolicy: Vetting a HttpResponse
http.response.policy
public interface HttpResponsePolicy extends Configurable {

    HttpResponseOutcome resolve(HttpResponse response);

    enum HttpResponseOutcome {
        PROCESS, SKIP, FAIL
    }
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.StatusCodeHttpResponsePolicy
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.StatusCodeHttpResponsePolicy
http.response.policy.parser
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.KvHttpResponseParser
Vetting with StatusCodeHttpResponsePolicy

Does response vetting based on HTTP status codes in the response and the configuration below.

http.response.policy.codes.process

Comma separated list of code ranges that will result in the parser processing the response

  • Example: 200..205, 207..210
  • Type: String
  • Default: 200..299
http.response.policy.codes.skip

Comma separated list of code ranges that will result in the parser skipping the response

  • Example: 300..305, 307..310
  • Type: String
  • Default: 300..399
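
For example, to narrow processing to the 200..205 range while skipping 300..310, mirroring the range syntax above:

http.response.policy.codes.process=200..205
http.response.policy.codes.skip=300..310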

Parsing with KvHttpResponseParser

Parses the HTTP response into a key-value SourceRecord. This process is decomposed in two steps:

  • Parsing the HttpResponse into a KvRecord
  • Mapping the KvRecord into a SourceRecord
http.response.record.parser
public interface KvRecordHttpResponseParser extends Configurable {

    List<KvRecord> parse(HttpResponse response);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.jackson.JacksonKvRecordHttpResponseParser
http.response.record.mapper
public interface KvSourceRecordMapper extends Configurable {

    SourceRecord map(KvRecord record);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.SchemedKvSourceRecordMapper Maps key to a Struct schema with a single property key, and value to a Struct schema with a single property value
    • com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper Maps both key and value to a String schema
Parsing with JacksonKvRecordHttpResponseParser

Uses Jackson to look for the records in the response.

http.response.list.pointer

JsonPointer to the property in the response body containing an array of records

  • Example: /items
  • Type: String
  • Default: /
http.response.record.pointer

JsonPointer to the individual record to be used as the Kafka record body. Useful when the object we are interested in is under a nested structure

  • Type: String
  • Default: /
http.response.record.offset.pointer

Comma separated list of key=/value pairs where the key is the name of the property in the offset, and the value is the JsonPointer to the value being used as offset for future requests. This is the mechanism that enables sharing state in between HttpRequests. HttpRequestFactory implementations receive this Offset.

Special properties:

  • key is used as record's identifier, used for de-duplication and topic partition routing
  • timestamp is used as record's timestamp, used for de-duplication and ordering

One of the roles of the offset, even when it is not required for preparing the next request, is to help de-duplicate already-seen records by providing a sense of progress, assuming consistent ordering. For example, even if the response repeats some results between requests because they share the same timestamp, anything prior to the last seen offset will be ignored. See OffsetRecordFilterFactory below.

  • Example: id=/itemId
  • Type: String
  • Default: ""
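
To make these pointers concrete, assume a hypothetical response body like the following:

{
  "items": [
    { "id": "42", "updatedAt": "2020-01-01T00:00:00Z", "name": "First item" }
  ]
}

The records, their keys and their offsets could then be captured with:

http.response.list.pointer=/items
http.response.record.offset.pointer=key=/id, timestamp=/updatedAt
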
http.response.record.timestamp.parser

Class responsible for converting the timestamp property captured above into a java.time.Instant.

  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser
  • Available implementations:
    • com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisTimestampParser Implementation that captures the timestamp as an epoch millis long
    • com.github.castorm.kafka.connect.http.response.timestamp.EpochMillisOrDelegateTimestampParser Implementation that tries to capture as epoch millis or delegates to another parser in case of failure
    • com.github.castorm.kafka.connect.http.response.timestamp.DateTimeFormatterTimestampParser Implementation based on a DateTimeFormatter
    • com.github.castorm.kafka.connect.http.response.timestamp.NattyTimestampParser Implementation based on Natty parser
    • com.github.castorm.kafka.connect.http.response.timestamp.RegexTimestampParser Implementation that extracts a substring from the timestamp field via a regex and parses it
http.response.record.timestamp.parser.pattern

When using DateTimeFormatterTimestampParser, a custom pattern can be specified

  • Type: String
  • Default: yyyy-MM-dd'T'HH:mm:ss[.SSS]X
http.response.record.timestamp.parser.zone

Timezone of the timestamp. Accepts ZoneId valid identifiers

  • Type: String
  • Default: UTC
http.response.record.timestamp.parser.regex

When using RegexTimestampParser, a custom regex pattern can be specified

  • Type: String
  • Default: .*
http.response.record.timestamp.parser.regex.delegate

When using RegexTimestampParser, a delegate class to parse timestamp

  • Type: Class
  • Default: DateTimeFormatterTimestampParser
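
For instance, if the timestamp field arrived as 2020/01/01 10:30:00 in the Europe/Paris timezone, a possible (illustrative) configuration would be:

http.response.record.timestamp.parser=com.github.castorm.kafka.connect.http.response.timestamp.DateTimeFormatterTimestampParser
http.response.record.timestamp.parser.pattern=yyyy/MM/dd HH:mm:ss
http.response.record.timestamp.parser.zone=Europe/Paris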

Once we have our KvRecord, we have to translate it into what Kafka Connect is expecting: SourceRecords.

Embeds the record properties into a common simple envelope to enable schema evolution. This envelope simply contains key and value properties with customizable field names.

This is also where we tell Kafka Connect which topic and partition the record should be sent to.

It's worth noting there are projects out there that allow you to infer the schema from your JSON document (e.g. expandjsonsmt).

kafka.topic

Name of the topic where the record will be sent to

  • Required
  • Type: String
  • Default: ""
http.record.schema.key.property.name

Name of the key property in the key-value envelope

  • Type: String
  • Default: key
http.record.schema.value.property.name

Name of the value property in the key-value envelope

  • Type: String
  • Default: value
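
For example, to route records to a topic named items and rename the envelope fields (all values illustrative):

kafka.topic=items
http.record.schema.key.property.name=id
http.record.schema.value.property.name=payload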

Some HTTP resources not designed for CDC return snapshots with the most recent records first. In these cases de-duplication is especially important, as subsequent requests are likely to produce similar results. The de-duplication mechanisms offered by this connector are order-dependent, as they are usually based on timestamps.

To enable de-duplication in cases like this, we can instruct the connector to assume a specific order direction: either ASC, DESC, or IMPLICIT, where IMPLICIT figures it out based on the records' timestamps.

http.record.sorter

public interface SourceRecordSorter extends Configurable {

    List<SourceRecord> sort(List<SourceRecord> records);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.OrderDirectionSourceRecordSorter

http.response.list.order.direction

Order direction of the results in the response list.

  • Type: Enum { ASC, DESC, IMPLICIT }
  • Default: IMPLICIT
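
For instance, for an API that always returns the newest records first, the direction can be declared explicitly instead of relying on IMPLICIT detection:

http.response.list.order.direction=DESC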

There are cases when we'll be interested in filtering out certain records. One of these would be de-duplication.

http.record.filter.factory

public interface SourceRecordFilterFactory extends Configurable {

    Predicate<SourceRecord> create(Offset offset);
}
  • Type: Class
  • Default: com.github.castorm.kafka.connect.http.record.OffsetRecordFilterFactory
  • Available implementations:
    • com.github.castorm.kafka.connect.http.record.OffsetRecordFilterFactory
    • com.github.castorm.kafka.connect.http.record.OffsetTimestampRecordFilterFactory
    • com.github.castorm.kafka.connect.http.record.PassthroughRecordFilterFactory
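
For example, to disable filtering entirely (accepting possible duplicates), the passthrough implementation can be selected:

http.record.filter.factory=com.github.castorm.kafka.connect.http.record.PassthroughRecordFilterFactory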

Filtering out SourceRecord with OffsetTimestampRecordFilterFactory

De-duplicates based on Offset's timestamp, filtering out records with earlier or the same timestamp. Useful when timestamp is used to filter the HTTP resource, but the filter does not have full timestamp precision. Assumptions:

  • Records are ordered by timestamp
  • No two records can contain the same timestamp (to whatever precision the HTTP resource uses)

If the latter assumption cannot be satisfied, check OffsetRecordFilterFactory to try and prevent data loss.

Filtering out SourceRecord with OffsetRecordFilterFactory

De-duplicates based on the Offset's timestamp, key and any other custom property present in the Offset, filtering out records with earlier timestamps and, within the same timestamp, those up to and including the last seen Offset properties. Useful when the timestamp alone is not unique but together with some other Offset property it is. Assumptions:

  • Records are ordered by timestamp
  • There is an Offset property that uniquely identifies records (e.g. key)
  • There won't be new items preceding already seen ones

Development

Building

mvn package

Debugging

Using Pre-configured docker setup

You can easily run a Kafka Connect cluster with kafka-connect-http pre-installed by executing:

mvn verify -Pdebug -DskipTests

It'll run dockerized versions of Kafka and Kafka Connect, which you can access via the REST API or attach debuggers to at the addresses printed in the console:

Kafka Connect testcontainers infra is ready
  Rest API: http://localhost:33216
  Debug agent: localhost:33217

Right after, it'll allow you to specify the file path to your connector's JSON configuration:

Introduce the path to your connector JSON configuration file:

It'll then subscribe to the corresponding Kafka topic, printing every message going through the output topic of your connector.

Using Kafka Connect standalone

These instructions are phrased in terms of the steps needed when using IntelliJ, but other integrated development environments are likely to be similar.

Point the Kafka stand-alone plugin.path at the module compile Output path. Assuming you are using the default Maven project import, this is the ./target directory, so the config/connect-standalone.properties file would contain the line

plugin.path=<directory where git clone was executed>/kafka-connect-http/kafka-connect-http/target

In the Run/Debug Configurations dialog, create a new Remote JVM Debug configuration with the mode Attach to remote JVM. When remote debugging, some Java parameters need to be specified when the program is executed. Fortunately there are hooks in the Kafka shell scripts to accommodate this. The Remote JVM Debug configuration specifies the needed Command line arguments for remote JVM. In the terminal console where you execute the connect command line, define KAFKA_DEBUG and JAVA_DEBUG_OPTS as:

export KAFKA_DEBUG=true
export JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005

Place a suitable breakpoint in the kafka-connect-http code, e.g. in HttpSourceTask.start(), and launch the standalone connect program:

bin/connect-standalone.sh config/connect-standalone.properties plugins/<kafka-connect-http properties file>

Click the Debug icon in IntelliJ and ensure the debugger console says Connected to the target VM, address: 'localhost:5005', transport: 'socket' and the breakpoint you placed becomes checked. The program should now break when the breakpoint is hit.

Running the tests

mvn test

Releasing

mvn release:clean release:prepare

Contributing

Contributions are welcome via pull requests. Pending the definition of a code of conduct, please just follow existing conventions.

Versioning

We use SemVer for versioning.

License

This project is licensed under the Apache 2.0 License - see the LICENSE.txt file for details

Built With

Acknowledgments

kafka-connect-http's People

Contributors

akshatjindal1, castorm, codacy-badger, dependabot-preview[bot], dependabot[bot], hackedd, lzuchowska, poulinjulien, ueisele


kafka-connect-http's Issues

DepShield encountered errors while building your project

The project could not be analyzed because of build errors. Please review the error messages here. Another build will be scheduled when a change to a manifest file* occurs. If the build is successful this issue will be closed, otherwise the error message will be updated.

This is an automated GitHub Issue created by Sonatype DepShield. GitHub Apps, including DepShield, can be managed from the Developer settings of the repository administrators.

* Supported manifest files are: pom.xml, package.json, package-lock.json, npm-shrinkwrap.json, Cargo.lock, Cargo.toml, main.rs, lib.rs, build.gradle, build.gradle.kts, settings.gradle, settings.gradle.kts, gradle.properties, gradle-wrapper.properties, go.mod, go.sum

Maven build failed

Describe the bug
mvn clean install failed

To Reproduce
simply run : mvn clean install

Expected behavior
Expected the build to succeed, but it failed; mvn compile succeeds.

Screenshots
[INFO] Building zip: /Users/che/develop/kafka-connect-http/kafka-connect-http/target/components/packages/castorm-kafka-connect-http-0.8.12-SNAPSHOT.zip
[INFO]
[INFO] --- jar:3.3.0:jar (default-jar) @ kafka-connect-http ---
[INFO] Building jar: /Users/che/develop/kafka-connect-http/kafka-connect-http/target/kafka-connect-http-0.8.12-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 3.894 s
[INFO] Finished at: 2023-09-14T15:34:44-07:00
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-jar-plugin:3.3.0:jar (default-jar) on project kafka-connect-http: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them. -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-jar-plugin:3.3.0:jar (default-jar) on project kafka-connect-http: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them.
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:333)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
at jdk.internal.reflect.DirectMethodHandleAccessor.invoke (DirectMethodHandleAccessor.java:104)
at java.lang.reflect.Method.invoke (Method.java:578)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
Caused by: org.apache.maven.plugin.MojoExecutionException: You have to use a classifier to attach supplemental artifacts to the project instead of replacing them.
at org.apache.maven.plugins.jar.AbstractJarMojo.execute (AbstractJarMojo.java:315)
at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo (DefaultBuildPluginManager.java:126)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute2 (MojoExecutor.java:328)
at org.apache.maven.lifecycle.internal.MojoExecutor.doExecute (MojoExecutor.java:316)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:212)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:174)
at org.apache.maven.lifecycle.internal.MojoExecutor.access$000 (MojoExecutor.java:75)
at org.apache.maven.lifecycle.internal.MojoExecutor$1.run (MojoExecutor.java:162)
at org.apache.maven.plugin.DefaultMojosExecutionStrategy.execute (DefaultMojosExecutionStrategy.java:39)
at org.apache.maven.lifecycle.internal.MojoExecutor.execute (MojoExecutor.java:159)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:105)
at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject (LifecycleModuleBuilder.java:73)
at org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build (SingleThreadedBuilder.java:53)
at org.apache.maven.lifecycle.internal.LifecycleStarter.execute (LifecycleStarter.java:118)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:261)
at org.apache.maven.DefaultMaven.doExecute (DefaultMaven.java:173)
at org.apache.maven.DefaultMaven.execute (DefaultMaven.java:101)
at org.apache.maven.cli.MavenCli.execute (MavenCli.java:906)
at org.apache.maven.cli.MavenCli.doMain (MavenCli.java:283)
at org.apache.maven.cli.MavenCli.main (MavenCli.java:206)
at jdk.internal.reflect.DirectMethodHandleAccessor.invoke (DirectMethodHandleAccessor.java:104)
at java.lang.reflect.Method.invoke (Method.java:578)
at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced (Launcher.java:283)
at org.codehaus.plexus.classworlds.launcher.Launcher.launch (Launcher.java:226)
at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode (Launcher.java:407)
at org.codehaus.plexus.classworlds.launcher.Launcher.main (Launcher.java:348)
[ERROR]
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
Kafka Connect:

  • Version [e.g. 22]

Plugin:

  • Version [e.g. 22]

Additional context
Add any other context about the problem here.

Does the connector support HTTPS connection

Hey @castorm,

I was looking in the docs and I don't see an entry regarding HTTPS support. I was wondering: does the connector support TLS/SSL requests?
If you never tested it, I can do it on my side and update the documentation accordingly.

The reason for the question: we want to poll ES with your connector and we are wondering if this would be feasible with TLS/SSL support without updating the HTTP client.

Regards,
Gil

version values are not loaded

Describe the bug
Error loading version.properties (com.github.castorm.kafka.connect.common.VersionUtils)

To Reproduce
Download the zip and copy it to a folder, then use the folder path as the plugin path for Kafka Connect. Start the connector after setting the necessary properties in the config files.

Expected behavior
Connection should be established to the Kafka topic mentioned in the properties file
castorm_version_error

Offset isn't committed directly

Describe the bug
For some reason, offsets aren't committed immediately, which lets the connector use the initial timestamp forever. Looking further at the logs, I found that the (overridden) commit method is never called.

My logs show that the timestamp is successfully extracted from the data, but it is never replaced, so the initial timestamp is always used.

[2021-05-19 13:16:39,116] INFO Record timestamp: 2021-05-19T00:00:00Z (com.github.castorm.kafka.connect.http.HttpSourceTask)
[2021-05-19 13:16:39,116] INFO Request for offset {timestamp=2020-01-01T00:00:01Z} yields 1/1 new records (com.github.castorm.kafka.connect.http.HttpSourceTask)
[2021-05-19 13:16:39,119] INFO Commit record timestamp: 2021-05-19T00:00:00Z (com.github.castorm.kafka.connect.http.HttpSourceTask)
...
[2021-05-19 13:16:39,116] INFO Request for offset {timestamp=2020-01-01T00:00:01Z} yields 1/1 new records (com.github.castorm.kafka.connect.http.HttpSourceTask)

Expected behavior
Offset committed after each record.

How to transform with REST API

Hello,
I've been struggling with applying the ExtractField transform to a simple REST API response.
This is the REST API response:

    "d": {
        "results": [
            {
                "__metadata": {
                    "uri": "https://some_uri.com",
                    "type": "c4codata.ServiceRequest",
                    "etag": "W/\"datetimeoffset'2021-10-26T10%3A21%3A19.3991180Z'\""
                },
                "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
                "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000013597241",

With the configuration file containing this property:
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
When I try to extract a field, I get an error: Unknown field: fieldname or Expected Struct, found String.
How should I work with transforms on fields, when the REST API returns such a JSON?
Is there a way to get into deeper fields?
I've looked at issue #19 , however this solution didn't work for me either.
What I want to extract is the whole value list, without the __metadata field, and to flatten some specific fields.
Expected result:

{ "ChangedByCustomerIndicator" : false,
  "ResponseByProcessorDateTimeContent" : "", ... }

The 'base' output, without any transforms, value converters or response mappers:

{
  "value": "{\"__metadata\":{\"uri\":\"https://some_uri.com\",\"type\":\"c4codata.ServiceRequest\",\"etag\":\"W/\\\"datetimeoffset'2021-10-19T10%3A11%3A42.3716190Z'\\\"\"},\"ObjectID\":\"00163EADAC081EEC8C988DA861144CDB\",\"ProductRecipientPartyID\":\"000000000000000000000000000000000000000000000000000014555600\", .... }}",
  "key": {
    "string": "00163EADAC081EEC8C988DA861144CDB"
  },
  "timestamp": {
    "long": 1634638091225
  }
}

The result I have managed to achieve by adding:

"http.response.record.mapper": "com.github.castorm.kafka.connect.http.record.StringKvSourceRecordMapper",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
{
  "__metadata": {
    "uri": "https://some_uri.com",
    "type": "c4codata.ServiceRequest",
    "etag": "W/\"datetimeoffset'2021-10-28T10%3A06%3A09.6248630Z'\""
  },
  "ObjectID": "00163E0A47311ED89AC41CD2E1EF8932",
  "ProductRecipientPartyID": "000000000000000000000000000000000000000000000000000010005577",
  "ProductRecipientPartyUUID": "888888-111111-000000-222222",
  "ProductRecipientPartyName": "DFJSOWJRFNLDARI02URJE",
  "IncidentServiceIssueCategoryID": "VAPROGIJAL",

Duplicated records with low `http.timer.interval.millis`

Describe the bug
When the http.timer.interval.millis value is lower than the offset.flush.interval.ms Kafka Connect worker setting, the records in the topic are duplicated every http.timer.interval.millis until the offset is committed.

To Reproduce
Set http.timer.interval.millis connector config to 1000 ms.
Use the default offset.flush.interval.ms worker setting of 60000 ms.

Expected behavior
Records should appear once in the target topic.
For the poll, the offset from the last committed record should be used.
Updating the offset value should be done at the start of each poll instead of during commit.

Kafka Connect:

  • Version 3.4.0

Plugin:

  • Version 0.8.11

Additional context
Add any other context about the problem here.

Broken link: The "See examples" link on https://castorm.github.io/kafka-connect-http/ is broken

Describe the bug
On https://castorm.github.io/kafka-connect-http, the example section says

Examples

See [examples](https://castorm.github.io/kafka-connect-http/examples), e.g.

Clicking on https://castorm.github.io/kafka-connect-http/examples brings me to a 404.

To Reproduce
On https://castorm.github.io/kafka-connect-http, click on the word "examples".

Expected behavior
I expected that it would bring me to a top-level examples page.

Screenshots
click
404

Additional context
Documentation broken link.

Maven javadoc build fails when using JDK 11

Describe the bug
Uninstall JDK 1.8. Install JDK 11.0.7. Perform a "maven package" or "maven install". Javadoc build fails.

Found fix
Add:
<configuration> <source>8</source> </configuration>

To the <artifactId>maven-javadoc-plugin</artifactId> section of the pom.xml file.

Project now builds successfully using JDK 11.

Plugin:
Version 0.7.4

Enable HttpSourceConnector partitioning

Use case

As a user
I want to perform several requests with a different set of parameters (partitions) against the same API from the same connector
So that the number of running connectors doesn't grow linearly with respect to the number of partitions

Is pre-authentication against a second url supported?

Hi,
As per our architecture, each of our APIs needs an access token to access the API data. This means that I have to first hit the security API, which gives me an access token, and then the main API, which gives me the data back. Does this connector support this?

For eg:

  1. Hit https:///security-api/api (This gives me back access token)
  2. Hit the Resource API https:///customers (Have to pass access token from the first API into header of this API)

Confluent Hub Update

Describe the bug
I notice that the version published on Confluent Hub is 0.6.1 and not the latest version

Could you publish the latest version of the plugin? Using Confluent Hub to install connectors enables a simpler workflow for Docker image generation.

Great connector BTW.

How to map the individual JSON fields to topic fields?

I may be missing something in the documentation, and if so, my apologies, but when I get the data and put it into the topic, the message has all of the JSON in a VALUE field. I would like to map the individual JSON fields to their own topic message fields. Is that possible with this connector?

Node not found - parse error - flat array

The data payload coming from the HTTP Endpoint does not have a "root" level key. The JSON array is flat.

[{key:value, key:value, key:value} , {key:value, key:value, key:value} ]

Error thrown is :

[2024-02-02 13:29:29,257] ERROR WorkerSourceTask{id=netreo-beta-test-2.http.source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:191)
java.lang.IllegalArgumentException: No node at '/create_time' (unmatched part: '/create_time')

Here is my config:

{
"name": "netreo-beta-test-2.http.source",
"config": {
"connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
"tasks.max": "1",
"http.offset.initial": "timestamp=2020-05-08T07:55:44Z",
"http.request.params": "jql=create_time>=${offset.timestamp?datetime.iso?long}",
"http.request.url": "https://qa-system/fw/index.php?r=restful/devices/list",
"http.request.headers": "Accept: application/json",
"http.auth.type": "Basic",
"http.auth.password": "PASSWORD",
"http.response.record.offset.pointer": "key=/, timestamp=/create_time",
"http.timer.interval.millis": "30000",
"http.timer.catchup.interval.millis": "1000",
"kafka.topic": "beta-test-2"
}
}

Make timestamp parser able to use optional property?

In our datasource, we have two timestamps, "createdAt" and "updatedAt". The problem is that "updatedAt" is not available on new objects. They only get that property once they have been saved at least once after their creation.

So, using the following offset pointer will break on new objects, but work fine otherwise.
"http.response.record.offset.pointer": "key=/id, timestamp=/updatedAt"

Would it be possible override some of the classes to come around this?

Debugging in Intellij

A quick question: how do I debug this in IntelliJ without exporting a jar to the libs folder of Kafka every time? I am using Apache Kafka. I want to check the code flow in IntelliJ. Is there any way to do this?

Http source or sink not taking HTTP PROXY define in system level in REDHAT 8

Is your feature request related to a problem? Please describe.
In staging with open internet this configuration is working, but in pre-prod with limited internet and an http_proxy IP/port defined at the system level, it's not picking it up. Due to this, it's not able to connect to the right destination/source.
Note: with a cURL command I'm able to get the expected response, but with the HTTP source/sink connector I'm getting the below error:
java.net.SocketTimeoutException: connect time out

Describe the solution you'd like
Could the below configuration be added?
#################################
http.proxy.host=localhost
http.proxy.port=3128
http.proxy.user=proxyuser
http.proxy.password=proxypassword
###################################

Describe alternatives you've considered
With a cURL command it's working

http_source_error

HTTP Streams

Is your feature request related to a problem? Please describe.
I haven't seen an option on this connector for infinitely streaming data. An example could be found here on the Twitter API, that sends a gzipped stream of tweets: https://developer.twitter.com/en/docs/twitter-api/enterprise/powertrack-api/api-reference/powertrack-stream#Stream

Describe the solution you'd like
Perhaps a boolean option upon creation of the connector to say that we're connecting to an infinite stream of data. Additionally, maybe another option to support converting data formats such as gzip.

Describe alternatives you've considered
This appears to be an unmaintained version, only supporting an older Twitter streaming API: https://github.com/jcustenborder/kafka-connect-twitter

Additional context
I can see a lot of people using this connector if it's able to support HTTP streaming, such as the Twitter APIs

JIRA example with propertie file

Describe the bug
I tried to use the JIRA example with a standalone connect worker and a locally hosted JIRA. During start-up, the templating seems to fail (the following has evaluated to null or missing: ==> timestamp).

[2020-07-07 17:36:46,617] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:214)
[2020-07-07 17:36:47,256] ERROR Error executing FreeMarker template (freemarker.runtime:60)
FreeMarker template error:
The following has evaluated to null or missing:
==> timestamp  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 17]

----
Tip: If the failing expression is known to legally refer to something that's sometimes null or missing, either specify a default value like myOptionalVar!myDefault, or use <#if myOptionalVar??>when-present<#else>when-missing</#if>. (These only cover the last step of the expression; to cover the whole expression, use parenthesis: (myOptionalVar.foo)!myDefault, (myOptionalVar.foo)??
----

----
FTL stack trace ("~" means nesting-related):
        - Failed at: ${timestamp?datetime.iso?string["yyyy...  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 15]
----

Java stack trace (for programmers):
----
freemarker.core.InvalidReferenceException: [... Exception message was already printed; see it above ...]
        at freemarker.core.InvalidReferenceException.getInstance(InvalidReferenceException.java:134)
        at freemarker.core.EvalUtil.coerceModelToTextualCommon(EvalUtil.java:481)
        at freemarker.core.EvalUtil.coerceModelToPlainText(EvalUtil.java:455)
        at freemarker.core.Expression.evalAndCoerceToPlainText(Expression.java:117)
        at freemarker.core.BuiltInsForMultipleTypes$dateBI._eval(BuiltInsForMultipleTypes.java:253)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.Dot._eval(Dot.java:41)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.BuiltInsForMultipleTypes$stringBI._eval(BuiltInsForMultipleTypes.java:765)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.DynamicKeyName._eval(DynamicKeyName.java:61)
        at freemarker.core.Expression.eval(Expression.java:101)
        at freemarker.core.DollarVariable.calculateInterpolatedStringOrMarkup(DollarVariable.java:100)
        at freemarker.core.DollarVariable.accept(DollarVariable.java:63)
        at freemarker.core.Environment.visit(Environment.java:334)
        at freemarker.core.Environment.visit(Environment.java:340)
        at freemarker.core.Environment.process(Environment.java:313)
        at freemarker.template.Template.process(Template.java:383)
        at com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory.apply(FreeMarkerTemplateFactory.java:55)
        at com.github.castorm.kafka.connect.http.request.template.freemarker.FreeMarkerTemplateFactory.lambda$create$0(FreeMarkerTemplateFactory.java:44)
        at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory.createRequest(TemplateHttpRequestFactory.java:64)
        at com.github.castorm.kafka.connect.http.HttpSourceTask.poll(HttpSourceTask.java:97)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2020-07-07 17:36:47,263] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:424)
[2020-07-07 17:36:47,263] INFO WorkerSourceTask{id=sample-search-issues.jira.http.source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:441)
[2020-07-07 17:36:47,263] ERROR WorkerSourceTask{id=sample-search-issues.jira.http.source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
FreeMarker template error:
The following has evaluated to null or missing:
==> timestamp  [in template "a5a5f524-92d3-4366-8836-200eeaa80848" at line 1, column 17]

To Reproduce
I used the provided JIRA example (JSON) to derive the following property file:

name=sample-search-issues.jira.http.source
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
tasks.max=1
http.offset.initial="timestamp=2020-05-08T07:55:44Z"
http.request.url=http://localhost:8080/rest/api/2/search
http.request.headers="Authorization: Basic xyz, Accept: application/json, Accept-Encoding: *"
http.request.params="jql=updated>=${timestamp?datetime.iso?string['yyyy/MM/dd HH:mm']} ORDER BY updated ASC&maxResults=100"
http.response.list.pointer=/issues
http.response.record.key.pointer=/id
http.response.record.timestamp.pointer=/fields/updated
http.throttler.interval.millis=30000
http.throttler.catchup.interval.millis=1000
kafka.topic=jira

Expected behavior
"CDC-like" polling of JIRA API

Kafka Connect:

  • Version 2.5

Plugin

  • castorm-kafka-connect-http-0.7.6

Authorization header

When setting an Authorization header:

http.request.headers=Authorization=Bearer xxxxxxxx

I got this error:

java.lang.IllegalStateException: Incomplete pair: Authorization=Bearer xxxxxxxx

I tried also enclosing the value in " or '.
The HTTP service I'm scraping also allows passing the authorization token as an HTTP argument, and it's working that way.
Thanks!

Each JSON response is packed into "payload.value" as an invalid JSON string

Describe the bug
The JSON documents returned by the REST API call are recorded to the Kafka topic in a single field of "payload.value" with extra backslashes and double quote characters. This behavior renders the string invalid JSON when passed to Elasticsearch and proper document field:value pairs are not created.

To Reproduce
I didn't track down the same exact record through the process. Examples are attached.

Use this source config:
nytimes_source_connector_config.txt

Getting this JSON response:
nytimes_response.txt

An example entry in the Kafka topic is created:
kafka_topic_entry.txt

Use this sink config:
nytime_sink_connector_config.txt

An example Elasticsearch document is created that is unparsed:
elasticsearch_document.txt

Expected behavior
The JSON responses are stored in the Kafka topic as valid JSON without the extra double quotes and backslashes that turn the response JSON into a very long string.

Kafka Connect:
Using the confluent-platform-2.12. All components are at version 5.5.0

Plugin:
Self-compiled 0.7.1 using Java JDK 1.8.0_251

Additional context
Perhaps I am simply misunderstanding how to properly configure the connector. I've tried many variations of configuration options and SMT's to try and work around this behavior.

Is it possible to deal with pagination?

Hello,
I started to extract data from different APIs and I ran into an endpoint that use pagination in this way:

JSON response:

{
  "success":true,
  "data":[
    {},
    {},
    ...
  ],
  "additional_data":{
    "pagination":{
      "start":0,
      "limit":100,
      "more_items_in_collection":true,
      "next_start":100
    }
  }
}

The endpoint allows to set the 'start' and the 'limit' in the GET request, but I can't set these elements as the offset in the connector conf because my list pointer is 'data'. Is there a way to deal with pagination like this? If not, I'll be grateful if there are some suggestions to implement this feature.

Thanks.

Add Bearer Token support for HTTP authentication

Is your feature request related to a problem? Please describe.
Currently it only supports Basic authentication; it does not support Bearer tokens.

Describe the solution you'd like
Add the BearerHttpAuthenticator and its config

Describe alternatives you've considered

Additional context

Setting up the http.request.body field

Hello!

First, thank you for the library, that's really great to have a free version of HTTP connector (compared to the one from Confluent which is paid). My first question is, how can I put the message's payload into the body of the request. I'm expecting to have something like:

"http.request.body": "${message.payload}"

or similar (assuming that the messages from a topic are in JSON). But this doesn't work.

The more general question is, how can I know all the possible fields that I can reference in the template? I know you use FreeMarker as a template engine, but what is the context for it?

Thank you,
Ivan

Repeated API calls upon data stream update exceeding API quota

Describe the bug
Am now getting locked out of the NY Times API with a 429 response code and "Rate limit quota violation. Quota limit exceeded" error.

To Reproduce
Use the following source connector config
nytimes_connector_quota_problem_config.txt

Expected behavior
To not exceed the API's rate limit.

Kafka Connect:

  • Version 5.5.0

Plugin:

  • Version 0.7.5

Additional context
The throttle limit for the NY Times is up to 10 times per minute or 4000 times per day. I created a new app to get a new API key and waited a full day to ensure I hadn't hit the daily limit for some reason. I also turned up the throttling to poll only once every 5 min. I think that when the NY Times updates its data stream, for some reason the connector is hitting the API endpoint more than 10 times in a minute.

Unable to add kafka-connect-http as a Gradle dependency

I am unable to add kafka-connect-http as a Gradle dependency as follows:

dependencies {
    implementation(
            [group: "com.github.castorm", name: "kafka-connect-http", version: kafka_connect_http_version]
    )
}

The problem appears to be that kafka-connect-http-0.8.9.pom refers to its parent project using a relative path (../pom.xml) which is not available in the Maven Central repository:

<parent>
    <artifactId>kafka-connect-http-parent</artifactId>
    <groupId>com.github.castorm</groupId>
    <version>0.8.9</version>
    <relativePath>../pom.xml</relativePath>
</parent>

The solution is (probably?) to publish the parent POM along with the rest of the artifacts, assuming there's nothing "secret" in it.
Until/unless this is done, the only way to express this dependency is to include the kafka-connect-http.jar in the project and depend on it as a file, which is not ideal for obvious reasons. It does not appear to be possible to avoid this by excluding transitive dependencies.
The raw build output follows.

$ ./gradlew clean build
> Task :modules:lib-kafka-plugins:compileTestJava FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':modules:lib-kafka-plugins:compileTestJava'.
> Could not resolve all files for configuration ':modules:lib-kafka-plugins:testCompileClasspath'.
   > Could not resolve com.github.castorm:kafka-connect-http:0.8.9.
     Required by:
         project :modules:lib-kafka-plugins
      > Could not resolve com.github.castorm:kafka-connect-http:0.8.9.
         > Could not parse POM https://repo.maven.apache.org/maven2/com/github/castorm/kafka-connect-http/0.8.9/kafka-connect-http-0.8.9.pom
            > Could not find com.github.castorm:kafka-connect-http-parent:0.8.9.
              Searched in the following locations:
                - https://repo.maven.apache.org/maven2/com/github/castorm/kafka-connect-http-parent/0.8.9/kafka-connect-http-parent-0.8.9.pom
                - https://pkgs.dev.azure.com/mjensen65816/dsds/_packaging/dsds/maven/v1/com/github/castorm/kafka-connect-http-parent/0.8.9/kafka-connect-http-parent-0.8.9.pom

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output. Run with --scan to get full insights.

* Get more help at https://help.gradle.org

BUILD FAILED in 2s
4 actionable tasks: 3 executed, 1 up-to-date

Header values cannot contain commas

It would appear to be impossible to use http.request.headers to build a header which contains a comma in its value, since TemplateHttpRequestFactory uses ConfigUtils.breakDownHeaders() to parse comma-separated headers without any provision for allowing commas to be escaped. It ultimately uses String.split(",") to do this.

This presents a problem, for example, if one wants to include an If-Modified-Since header in the request. The recommended date format for this header (see RFC 2616, 3.3.1 Full Date) somewhat stupidly includes a comma between the abbreviated day-of-week name and the date/time parts. It's kind of silly that the day-of-week is in there, but it is.

If-Modified-Since is a very convenient header to include in order to minimize network traffic. It would be nice if it were supported in its "preferred" form.

It seems that the only change necessary would be to change the itemSplitter regex in ConfigUtils.breakDownHeaders() to only split on commas that do not follow some escape character. I am not familiar enough with the code base and typical usage patterns, however, to know whether this would present risk elsewhere.

Handle records in reverse chronological order

Problem

Some APIs don't offer control over the ordering of results.

This might be especially common on APIs that offer a snapshot of the N most recent records.

The source connector bases its offset advancement on chronologically ordered results, so latest committed offset would be that of the oldest record instead of the newest. Ignoring this would result in undesired consequences:

  • when offset is used in the query, offset would advance one record at a time and it would require as many queries as records in the response.
  • inability to deduplicate, which would only make the problem above worse.

Potential Solutions

  • automatically sorting records in chronological order; this would be the simplest from the user's perspective, as it doesn't even require knowing about this concern, however it would penalize performance, especially when records are reversed
  • allowing the user to specify the sorting direction of records in the response.

Using a source topic to create the body request and receive the response

Hello, I am looking to:

  1. Use my "source" topic to feed a json payload into the body of the HTTP request.

  2. Receive and store the response in an output topic. (input = request, output = response)

I believe the Confluent HTTP connector does this, but unfortunately it is a paid plugin.

I don't see a way in the documentation to use a topic's value as the request body. Any suggestions here?

Thank you,

Matt G.

URL encoding of query parameters

Is your feature request related to a problem? Please describe.
I need to implement CDC, but the data field I need to pass into the query params is not URL encoded, and so I get a 400 error from the server.

Describe the solution you'd like
URL encode the query parameters within the connector.

Describe alternatives you've considered
None I could think of.
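For what it's worth, the encoding step itself is small; a minimal Java sketch using java.net.URLEncoder on a hypothetical timestamp value (not connector code):

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class QueryParamEncoding {

    public static void main(String[] args) {
        // Encode a parameter value before placing it in the query string, so
        // reserved characters such as ':' don't trigger a 400 from the server.
        String raw = "2021-09-01T07:55:44Z";
        String encoded = URLEncoder.encode(raw, StandardCharsets.UTF_8); // Java 10+ overload
        System.out.println(encoded); // 2021-09-01T07%3A55%3A44Z
    }
}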

Support for Microsoft's JSON date formats

I am working with an OData API that presents timestamp values such as lastModified in a format similar to /Date(1530144000000+0530)/. Would it be possible to get such date formats supported? If so, I'd be happy to contribute this functionality; a steer in the right direction would certainly be appreciated.
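For reference, extracting the epoch millis from that format is straightforward; a rough sketch (the regex and class name are illustrative, not existing connector code):

import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class MsJsonDateParser {

    // Matches values like "/Date(1530144000000+0530)/"; the millis are epoch-based
    // (UTC), so the trailing offset is ignored here.
    private static final Pattern MS_DATE = Pattern.compile("/Date\\((-?\\d+)(?:[+-]\\d{4})?\\)/");

    static Instant parse(String value) {
        Matcher matcher = MS_DATE.matcher(value);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a Microsoft JSON date: " + value);
        }
        return Instant.ofEpochMilli(Long.parseLong(matcher.group(1)));
    }
}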

Handling multi-line JSON response

Is there a way to handle a response that contains multiple JSON objects, one per line? This would not be a valid JSON document, and is not an array.
Example:

{"foo": "bar", "id": 1}
{"foo": "bar", "id": 2}
{"foo": "bar", "id": 3}

It appears the default behavior is that only the first JSON object is parsed into a record.

Thanks for any ideas.
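A rough sketch of how such a newline-delimited (NDJSON) body could be split into per-line documents with Jackson, purely as an illustration rather than the connector's current behavior:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

class NdJsonSplitter {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses each non-blank line of the body as its own JSON document; each
    // resulting node could then be mapped to a separate record.
    static List<JsonNode> parseLines(String body) throws Exception {
        List<JsonNode> nodes = new ArrayList<>();
        for (String line : body.split("\\R")) {
            if (!line.trim().isEmpty()) {
                nodes.add(MAPPER.readTree(line));
            }
        }
        return nodes;
    }
}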

Maven package build fails on JDK 11.0.11

Describe the bug
It's the same bug as #22, only with a newer JDK version.

See that issue for further details.

Additional context
JDK version 11.0.11 (current Ubuntu 20.04LTS default)

Quickfix:
Changing ${java.version} to 11 resolves the issue. Perhaps it would be possible to use just the major Java version instead of the full version string including the bugfix number?

Support for paginated response from API

Is your feature request related to a problem? Please describe.
Recently I have been working on a project that requires the HTTP source connector, but the API returns only a fixed number of results as an array and provides a URL at the end to access the next set of results. Having a way to keep calling the next-page URL until there are no new records would solve the problem.

Describe the solution you'd like
I would like to work on the problem by adding a way to extract the next-page URL while it exists, or until no new records are present, and keep pushing each page's records to Kafka.

Describe alternatives you've considered
I have also tried collecting all the messages and pushing them at once when no next-page URL is present, but that leads to serious memory issues if the number of pages is very large.
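To illustrate the idea, a rough sketch of a follow-the-next-link loop; the "items" and "next" field names and the use of java.net.http are assumptions, not connector code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class PaginationSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final HttpClient CLIENT = HttpClient.newHttpClient();

    // Keeps requesting the next page while the response advertises one,
    // emitting the records of each page as it goes instead of buffering
    // every page in memory.
    static void fetchAllPages(String firstPageUrl) throws Exception {
        String url = firstPageUrl;
        while (url != null) {
            HttpResponse<String> response = CLIENT.send(
                    HttpRequest.newBuilder(URI.create(url)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            JsonNode page = MAPPER.readTree(response.body());
            page.path("items").forEach(System.out::println); // push each record here
            JsonNode next = page.path("next");
            url = next.isTextual() ? next.asText() : null;
        }
    }
}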

[DepShield] (CVSS 7.5) Vulnerability due to usage of org.apache.commons:commons-compress:1.20

Vulnerabilities

DepShield reports that this application's usage of org.apache.commons:commons-compress:1.20 results in the following vulnerability(s):


Occurrences

org.apache.commons:commons-compress:1.20 is a transitive dependency introduced by the following direct dependency(s):

org.testcontainers:testcontainers:1.16.0
        └─ org.apache.commons:commons-compress:1.20

com.github.castorm:kafka-connect-http-infra:0.8.12-SNAPSHOT
        └─ org.testcontainers:testcontainers:1.16.0
              └─ org.apache.commons:commons-compress:1.20

This is an automated GitHub Issue created by Sonatype DepShield. Details on managing GitHub Apps, including DepShield, are available for personal and organization accounts. Please submit questions or feedback about DepShield to the Sonatype DepShield Community.

java.lang.NoClassDefFoundError: freemarker/template/Configuration

I am getting the following error while starting my source connector:

[2022-03-14 09:48:10,669] INFO [http-source|task-0] TemplateHttpRequestFactoryConfig values:
http.request.body =
http.request.headers = Accept:application/json
http.request.method = GET
http.request.params =
http.request.template.factory = class com.github.castorm.kafka.connect.http.request.template.freemarker.BackwardsCompatibleFreeMarkerTemplateFactory
http.request.url = http://localhost:8080/api/messages
(com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig:376)
[2022-03-14 09:48:10,671] ERROR [http-source|task-0] WorkerSourceTask{id=http-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:195)
java.lang.NoClassDefFoundError: freemarker/template/Configuration
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.getDeclaredConstructor(Class.java:2178)
at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:390)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactoryConfig.<init>(TemplateHttpRequestFactoryConfig.java:66)
at com.github.castorm.kafka.connect.http.request.template.TemplateHttpRequestFactory.configure(TemplateHttpRequestFactory.java:48)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:405)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:434)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:419)
at com.github.castorm.kafka.connect.http.HttpSourceConnectorConfig.<init>(HttpSourceConnectorConfig.java:71)
at com.github.castorm.kafka.connect.http.HttpSourceTask.start(HttpSourceTask.java:86)
at org.apache.kafka.connect.runtime.WorkerSourceTask.initializeAndStart(WorkerSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: freemarker.template.Configuration
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:103)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 23 more
[2022-03-14 09:48:10,675] INFO [http-source|task-0] [Producer clientId=connector-producer-http-source-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer:1208)

My Source connector properties are:

name=http-source
kafka.topic=http-messages
connector.class=com.github.castorm.kafka.connect.http.HttpSourceConnector
http.request.url=http://localhost:8080/api/messages
http.request.headers=Accept:application/json
http.timer.interval.millis=30000

I am using the following version:

castorm-kafka-connect-http-0.8.11

I can also see freemarker-2.3.31.jar in my libraries with others:
[screenshot of the plugin's lib directory]

Environment Details:
OS - Windows10
Java 1.8
Confluent version - 7.0.1 (standalone)
Http connector version - v0.8.11

[DepShield] (CVSS 5.5) Vulnerability due to usage of junit:junit:4.12

Vulnerabilities

DepShield reports that this application's usage of junit:junit:4.12 results in the following vulnerability(s):


Occurrences

junit:junit:4.12 is a transitive dependency introduced by the following direct dependency(s):

org.testcontainers:testcontainers:1.15.1
        └─ junit:junit:4.12

com.github.castorm:kafka-connect-http-infra:0.8.9-SNAPSHOT
        └─ org.testcontainers:testcontainers:1.15.1
              └─ junit:junit:4.12

This is an automated GitHub Issue created by Sonatype DepShield. Details on managing GitHub Apps, including DepShield, are available for personal and organization accounts. Please submit questions or feedback about DepShield to the Sonatype DepShield Community.

AWS Signv4 authentication

Describe the solution you'd like
I would like to connect to Elasticsearch, but the authentication needs to be AWS SigV4.

Describe alternatives you've considered
I have a custom Python script to fetch data from Elasticsearch.

Releases later than 0.3.5 do not contain the connector

Hello,

First of all, thanks for the connector and the work on it. While testing it, I noticed that the latest releases (after 0.3.5) do not contain the jar, only the source code.

According to the README, this is where the latest version should be retrieved. Did you change the site where you upload the connector?
If yes, where can we find it? If not, what is needed to automate the release so it contains the jar?

Add support for unordered results

Is your feature request related to a problem? Please describe.
I haven't seen an option to order the results the API returns. I am working with an API where the results are not ordered, and none of the options in the http.response.list.order.direction config allows them to be sorted correctly.

Describe the solution you'd like
Maybe another option like ASC_FORCED_BY_TIMESTAMP, or perhaps ASC_FORCED_BY_KEY, could be added to sort the results based on that property (see the sketch below).

Describe alternatives you've considered
I have not found any alternatives yet.

Additional context
I saw your answer to issue #126; maybe you don't see a fit for this in the project. I have already worked on a solution and will add a pull request very soon.
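Purely as an illustration of what such options might look like (the names come from the suggestion above and are not existing connector configuration values):

import org.apache.kafka.connect.source.SourceRecord;

import java.util.Comparator;

// Hypothetical ordering options; each one carries the comparator that would be
// used to force-sort the parsed records before offset tracking.
enum ForcedOrderDirection {

    ASC_FORCED_BY_TIMESTAMP(Comparator.comparing(SourceRecord::timestamp)),
    ASC_FORCED_BY_KEY(Comparator.comparing((SourceRecord record) -> String.valueOf(record.key())));

    final Comparator<SourceRecord> comparator;

    ForcedOrderDirection(Comparator<SourceRecord> comparator) {
        this.comparator = comparator;
    }
}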
