Conduit streams data between data stores. Kafka Connect replacement. No JVM required.

Home Page: https://conduit.io

License: Apache License 2.0


conduit's Introduction

Conduit

Data Integration for Production Data Stores. 💫

Overview

Conduit is a data streaming tool written in Go. It aims to provide the best user experience for building and running real-time data pipelines. Conduit comes with batteries included: it provides a UI, common connectors, processors and observability data out of the box.

Conduit pipelines are built out of simple building blocks which run in their own goroutines and are connected using Go channels. This makes Conduit pipelines performant on multi-core machines. Conduit guarantees that the order of received records won't change. It also takes care of consistency by propagating acknowledgments to the start of the pipeline only when a record is successfully processed on all destinations.
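To make the description above more concrete, here is a minimal sketch of stages running in their own goroutines and connected by channels. It is not Conduit's actual implementation: the record type, the stage functions and the acknowledgment channel are hypothetical names chosen for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// record is a hypothetical stand-in for a Conduit record.
type record struct {
	id      int
	payload string
}

// source emits n records in order from its own goroutine, then closes the channel.
func source(n int) <-chan record {
	out := make(chan record)
	go func() {
		defer close(out)
		for i := 1; i <= n; i++ {
			out <- record{id: i, payload: fmt.Sprintf("record-%d", i)}
		}
	}()
	return out
}

// uppercase is a processing stage running in its own goroutine.
// A single goroutine per stage keeps the record order unchanged.
func uppercase(in <-chan record) <-chan record {
	out := make(chan record)
	go func() {
		defer close(out)
		for r := range in {
			r.payload = strings.ToUpper(r.payload)
			out <- r
		}
	}()
	return out
}

// destination "writes" each record and only then reports its id on acks.
func destination(in <-chan record, acks chan<- int) []string {
	var written []string
	for r := range in {
		written = append(written, r.payload)
		acks <- r.id
	}
	close(acks)
	return written
}

// runPipeline wires the stages together and returns written payloads and acked ids.
func runPipeline(n int) ([]string, []int) {
	acks := make(chan int, n) // buffered so the destination never blocks on acks
	written := destination(uppercase(source(n)), acks)
	var acked []int
	for id := range acks {
		acked = append(acked, id)
	}
	return written, acked
}

func main() {
	written, acked := runPipeline(3)
	fmt.Println(written) // [RECORD-1 RECORD-2 RECORD-3]
	fmt.Println(acked)   // [1 2 3]
}
```

Because every stage reads from exactly one channel and writes to exactly one channel, records leave the pipeline in the order they entered it, and an acknowledgment is produced only after the destination has handled the record.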

Conduit connectors are plugins that communicate with Conduit via a gRPC interface. This means that plugins can be written in any language as long as they conform to the required interface.

Conduit was created and open-sourced by Meroxa.

Quick start

  1. Download and extract the latest release.

  2. Download the example pipeline and put it in the directory named pipelines in the same directory as the Conduit binary.

  3. Run Conduit (./conduit). The example pipeline will start automatically.

  4. Write something to file example.in in the same directory as the Conduit binary.

    echo "hello conduit" >> example.in
  5. Read the contents of example.out and notice an OpenCDC record:

    $ cat example.out
    {"position":"MTQ=","operation":"create","metadata":{"file.path":"./example.in","opencdc.readAt":"1663858188836816000","opencdc.version":"v1"},"key":"MQ==","payload":{"before":null,"after":"aGVsbG8gY29uZHVpdA=="}}
  6. The string hello conduit is stored base64-encoded in the field payload.after. Let's decode it:

    $ cat example.out | jq ".payload.after | @base64d"
    "hello conduit"
  7. Explore the UI by opening http://localhost:8080 and build your own pipeline!
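The decoding in step 6 can also be done in code. The following is a small sketch that parses the record shown in step 5 and base64-decodes payload.after. The struct models only the field needed here, and the type and function names are assumptions made for this example rather than types from Conduit.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// exampleLine is the record from the quick start, copied verbatim.
const exampleLine = `{"position":"MTQ=","operation":"create","metadata":{"file.path":"./example.in","opencdc.readAt":"1663858188836816000","opencdc.version":"v1"},"key":"MQ==","payload":{"before":null,"after":"aGVsbG8gY29uZHVpdA=="}}`

// openCDCRecord models only payload.after; all other fields are ignored.
type openCDCRecord struct {
	Payload struct {
		After string `json:"after"`
	} `json:"payload"`
}

// decodeAfter parses one JSON line and returns the base64-decoded payload.after.
func decodeAfter(line string) (string, error) {
	var rec openCDCRecord
	if err := json.Unmarshal([]byte(line), &rec); err != nil {
		return "", fmt.Errorf("parse record: %w", err)
	}
	raw, err := base64.StdEncoding.DecodeString(rec.Payload.After)
	if err != nil {
		return "", fmt.Errorf("decode payload.after: %w", err)
	}
	return string(raw), nil
}

func main() {
	text, err := decodeAfter(exampleLine)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(text) // hello conduit
}
```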

Installation guide

Download binary and run

Download a pre-built binary from the latest release and simply run it!

./conduit

Once you see that the service is running you may access a user-friendly web interface at http://localhost:8080. You can also interact with the Conduit API directly. We recommend navigating to http://localhost:8080/openapi and exploring the HTTP API through Swagger UI.

Conduit can be configured through command line parameters. To view the full list of available options, run ./conduit --help or see configuring Conduit.

Homebrew

Make sure you have homebrew installed on your machine, then run:

brew update
brew install conduit

Debian

Download the right .deb file for your machine architecture from the latest release, then run:

dpkg -i conduit_0.8.0_Linux_x86_64.deb

RPM

Download the right .rpm file for your machine architecture from the latest release, then run:

rpm -i conduit_0.8.0_Linux_x86_64.rpm

Build from source

Requirements:

git clone git@github.com:ConduitIO/conduit.git
cd conduit
make
./conduit

Note that you can also build Conduit with make build-server, which only compiles the server and skips the UI. This command requires only Go and builds the binary much faster. That makes it useful for development purposes or for running Conduit as a simple backend service.

Docker

Our Docker images are hosted on GitHub's Container Registry. To run the latest Conduit version, you should run the following command:

docker run -p 8080:8080 conduit.docker.scarf.sh/conduitio/conduit:latest

The Docker image includes the UI. You can access it by navigating to http://localhost:8080.

Configuring Conduit

Conduit accepts CLI flags, environment variables and a configuration file to configure its behavior. Each CLI flag has a corresponding environment variable and a corresponding field in the configuration file. Conduit uses the value for each configuration option based on the following priorities:

  • CLI flags (highest priority) - if a CLI flag is provided it will always be respected, regardless of the environment variable or configuration file. To see a full list of available flags run conduit --help.

  • Environment variables (lower priority) - an environment variable is only used if no CLI flag is provided for the same option. Environment variables have the prefix CONDUIT and contain underscores instead of dots and hyphens (e.g. the flag -db.postgres.connection-string corresponds to CONDUIT_DB_POSTGRES_CONNECTION_STRING).

  • Configuration file (lowest priority) - Conduit by default loads the file conduit.yaml placed in the same folder as Conduit. The path to the file can be customized using the CLI flag -config. It is not required to provide a configuration file and any value in the configuration file can be overridden by an environment variable or a flag. The file content should be a YAML document where keys can be hierarchically split on the dot character (.). For example:

    db:
      type: postgres # corresponds to flag -db.type and env variable CONDUIT_DB_TYPE
      postgres:
        connection-string: postgres://localhost:5432/conduitdb # -db.postgres.connection-string or CONDUIT_DB_POSTGRES_CONNECTION_STRING
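The naming rule and the priority order described above can be sketched in a few lines. This is an illustration only, not Conduit's configuration code: envName and resolve are hypothetical helpers, and the three maps stand in for parsed flags, the process environment and the flattened configuration file.

```go
package main

import (
	"fmt"
	"strings"
)

// envName converts a flag name such as "-db.postgres.connection-string"
// into the matching environment variable name: the prefix CONDUIT is added
// and dots and hyphens become underscores.
func envName(flagName string) string {
	name := strings.TrimLeft(flagName, "-")
	name = strings.NewReplacer(".", "_", "-", "_").Replace(name)
	return "CONDUIT_" + strings.ToUpper(name)
}

// resolve returns the value of one option using the priority
// CLI flag > environment variable > configuration file.
func resolve(name string, flags, env, file map[string]string) (string, bool) {
	if v, ok := flags[name]; ok {
		return v, true
	}
	if v, ok := env[envName(name)]; ok {
		return v, true
	}
	v, ok := file[name]
	return v, ok
}

func main() {
	const opt = "db.postgres.connection-string"
	fmt.Println(envName(opt)) // CONDUIT_DB_POSTGRES_CONNECTION_STRING

	flags := map[string]string{} // no flag given for this option
	env := map[string]string{envName(opt): "postgres://from-env"}
	file := map[string]string{opt: "postgres://from-file"}

	v, _ := resolve(opt, flags, env, file)
	fmt.Println(v) // postgres://from-env
}
```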

Connectors

For the full list of available connectors, see the Connector List. If there's a connector that you're looking for that isn't available in Conduit, please file an issue.

Conduit loads standalone connectors at startup. The connector binaries need to be placed in the connectors directory relative to the Conduit binary so Conduit can find them. Alternatively, the path to the standalone connectors can be adjusted using the CLI flag -connectors.path.

Conduit ships with a number of built-in connectors:

  • File connector provides a source/destination to read/write a local file (useful for quickly trying out Conduit without additional setup).
  • Kafka connector provides a source/destination for Apache Kafka.
  • Postgres connector provides a source/destination for PostgreSQL.
  • S3 connector provides a source/destination for AWS S3.
  • Generator connector provides a source which generates random data (useful for testing).

Additionally, we have prepared a Kafka Connect wrapper that allows you to run any Apache Kafka Connect connector as part of a Conduit pipeline.

If you are interested in writing a connector yourself, have a look at our Go Connector SDK. Since standalone connectors communicate with Conduit through gRPC they can be written in virtually any programming language, as long as the connector follows the Conduit Connector Protocol.

Processors

A processor is a component that operates on a single record that flows through a pipeline. It can either change the record (i.e. transform it) or filter it out based on some criteria.

Conduit provides a number of builtin processors, which can be used to manipulate fields, send requests to HTTP endpoints, and more. Check Builtin processors for the list of builtin processors and their documentation.

Conduit also provides the ability to write your own Standalone Processor, or you can use the builtin processor custom.javascript to write custom processors in JavaScript.

More detailed information as well as examples can be found in the Processors documentation.

API

Conduit exposes a gRPC API and an HTTP API.

The gRPC API is by default running on port 8084. You can define a custom address using the CLI flag -grpc.address. To learn more about the gRPC API please have a look at the protobuf file.

The HTTP API is by default running on port 8080. You can define a custom address using the CLI flag -http.address. It is generated using gRPC gateway and is thus providing the same functionality as the gRPC API. To learn more about the HTTP API please have a look at the API documentation, OpenAPI definition or run Conduit and navigate to http://localhost:8080/openapi to open a Swagger UI which makes it easy to try it out.

UI

Conduit comes with a web UI that makes building data pipelines a breeze. You can access it at http://localhost:8080. See the installation guide for instructions on how to build Conduit with the UI.

For more information about the UI refer to the Readme in /ui.

Documentation

To learn more about how to use Conduit visit docs.conduit.io.

If you are interested in internals of Conduit we have prepared some technical documentation:

Contributing

For a complete guide to contributing to Conduit, see the Contribution Guide.

We welcome you to join the community and contribute to Conduit to make it better! When something does not work as intended please check if there is already an issue that describes your problem, otherwise please open an issue and let us know. When you are not sure how to do something please open a discussion or hit us up on Discord.

We also value contributions in form of pull requests. When opening a PR please ensure:

  • You have followed the Code Guidelines.
  • There is no other pull request for the same update/change.
  • You have written unit tests.
  • You have made sure that the PR is of reasonable size and can be easily reviewed.

conduit's People

Contributors

adamhaffar, dependabot[bot], devarismeroxa, dylanlott, hariso, jayjayjpg, jmar910, lovromazgon, lyuboxa, maha-hajja, nassor, neovintage, raulb, samirketema, uchennakevinm1, zeina1i

conduit's Issues

Plugin documentation workflow

Figure out how we get documentation for plugins from repositories to the docs repository. (Sidenote: try to do it with pull, not push, then we can include 3rd party plugins in the future)

End-to-end tests

We need to create end-to-end tests that treat Conduit as a black box, trigger requests to the API and make sure they produce the expected result. For now they should cover at least the main paths (e.g. creating a pipeline, starting it, stopping it, deleting it). These tests should be easy to run locally without setting up a specific environment (ideally with one make target). Additionally, we should trigger the tests in the CI as regression tests before merging any PR.

Cleanup and release v0.1.0

Checklist before releasing:

  • Readme
    • Logo is displayed correctly
    • Links point to the correct pages
      • docs.conduit.io works?
      • conduit.io works?
    • Slogan is updated
  • Repository description is set (same as slogan probably)
  • Repository topics are set
  • Rewrite history by squashing all commits
  • Clean up test releases and Docker images
  • Create tag v0.1.0
  • Make repository public

Checklist after repo is public:

  • Readme
    • Badges are displayed correctly
  • Issue templates display correctly and link to correct places (e.g. ask questions, open documentation issues)
  • Make sure godocs is accessible
  • Make sure forking is enabled
  • Social preview image
  • Make sure goreport is accessible and badge shows up correctly (the page is down on the day of open-sourcing)

Use mock connector in tests

Our tests are currently (ab)using the file connector plugin to run pipeline tests. We should rather mock the connector instance to better isolate unit tests and either remove current tests or tag them as integration tests.

Stream Inspector

The goal is to give folks the ability to see what's happening inside the stream. This is not like tail. This is more like peek. Pull out some information from the stream so that a developer can see the data and data types.

Readme section for sharp edges

We should add a section where we describe pitfalls / sharp edges / future work / known limitations of Conduit to manage expectations. Conduit is still in its infancy and not really meant to be used in production yet, and we owe it to our users to be open about this. At the same time, we can use this section to clarify that we know there is more work to do and we are on it.

This section should talk about Conduit and not plugins, as the limitations of plugins are listed in the readme of each plugin (we can mention this though).

Some things we already identified (not an exhaustive list):

  • no clustering support
  • no open CDC record format support
  • can't provide firm delivery guarantees (we are missing tests to prove it)
  • can't provide performance guarantees (again, no benchmarks)
  • no plugin management (Conduit does not have a list of available plugins, UI hardcodes plugins)

Terraform Provider

Because conduit is software that is meant to be run as a service, developers will want ways to easily repeat their setup process. Conduit needs to be able to be set up via terraform.

Extract built-in plugins

The conduit repository currently contains built-in plugins. We should rather move them to separate repositories and make sure they are included in Conduit when it's compiled. This way we can easily add more built-in plugins in the future, even ones that are not built and maintained in-house. The easiest way to include them at compile time is to just import them and add them to a global variable containing built-in plugins. The drawback of this approach is that we can only include plugins written in Go (probably not even a problem in the short term).

Depends on #37.

Once a plugin is extracted, we need to adjust the code so it references the extracted SDK instead of Conduit.

  • S3 plugin
  • Kafka plugin
  • Postgres plugin
  • File plugin
  • Generator plugin

Transform: Metadata extractor

We should create a transform that allows us to extract a value from a record key or payload and insert it into the record metadata. The original field should stay untouched. The metadata field name needs to be configurable as well as the payload/key field.

Example: a metadata extractor configured to extract the value of field foo and insert it as metadata into field bar applied on the following record:

metadata: {"baz":"irrelevant"}
payload: {"foo":"some value","bar":123}

Should produce:

metadata: {"bar":"some value","baz":"irrelevant"}
payload: {"foo":"some value","bar":123}
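A rough sketch of the requested behavior, using the example above, could look as follows. The record type and the function name are hypothetical and only model structured payloads; the actual transform API may look different.

```go
package main

import "fmt"

// record is a simplified, hypothetical record with a structured payload.
type record struct {
	Metadata map[string]string
	Payload  map[string]any
}

// extractToMetadata copies the value of payloadField into the metadata under
// metadataField. The payload is left untouched and the input metadata map is
// not modified; a new map is returned instead.
func extractToMetadata(r record, payloadField, metadataField string) (record, error) {
	v, ok := r.Payload[payloadField]
	if !ok {
		return r, fmt.Errorf("field %q not found in payload", payloadField)
	}
	out := record{
		Metadata: make(map[string]string, len(r.Metadata)+1),
		Payload:  r.Payload,
	}
	for k, val := range r.Metadata {
		out.Metadata[k] = val
	}
	out.Metadata[metadataField] = fmt.Sprint(v)
	return out, nil
}

func main() {
	in := record{
		Metadata: map[string]string{"baz": "irrelevant"},
		Payload:  map[string]any{"foo": "some value", "bar": 123},
	}
	out, err := extractToMetadata(in, "foo", "bar")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(out.Metadata) // map[bar:some value baz:irrelevant]
	fmt.Println(out.Payload)  // map[bar:123 foo:some value]
}
```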

Extract Connector plugin SDK

Extract all functionality needed for creating a plugin into a separate repository. This means most of the code in package pkg/plugin/sdk should be moved. The newly created repository should not have Conduit as a dependency but should define everything it needs locally (e.g. structures for records, logging etc.). The reason is that Conduit will import the SDK repository and use it to communicate with plugins. The goal is to create a minimal repository that can be imported by plugins and contains everything that's needed to implement a plugin.

Replace confluent-kafka-go with Segment's kafka-go client

The Kafka client we use now is confluent-kafka-go. We initially chose it because it's one of the most used Go clients for Kafka, it's quite configurable and it has a simpler API (it's possible to read messages from a whole topic, whereas for most other clients messages need to be explicitly read from partitions).

However, because it has a dependency on CGo (under the hood it uses librdkafka), we couldn't find a way to build Conduit for a number of platforms and architectures (Windows and Darwin, AMD64, ARM64).

  • Replace the producer (destination connector)
  • Replace the consumer (source connector) #105
  • Handle case where new partitions are added #105

CLI configuration

The goal is to make the Conduit CLI configurable. We should support the combination of parsing of flags, environment variables and/or a config file. We need to decide what things need to be configurable (e.g. log level, API ports, path to config file, path to pipeline configs) and the corresponding flag/env var/config field names. We already have some simple CLI flags, the goal of this issue is to implement it in a way that makes it easy to add new options in the future.

Postgres CDC unexpected behavior

The test for Postgres CDC behavior is not expecting the correct behavior.

  1. If we call Source.Read on an empty table that never had any rows in it we should receive a recoverable error.
  2. The first call to Source.Read should be with the position nil.
  3. All following calls to Source.Read should be with the position of the record that was last returned.
  4. Once we add a row to the table Source.Read should return that record.
  5. If we call Source.Read again it should return a recoverable error, since no change happened in the database so no change should be detected.
  6. Only after updating the record (or deleting it, or inserting a new one) the Source.Read function should return another record.

Would be great to have tests for making sure we detect inserts, updates and deletes.
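The expected call sequence from the list above can be sketched with an in-memory stand-in for the source. Everything here is hypothetical: fakeSource, the integer position and errEndData only mimic the described behavior and are not the Postgres connector's code.

```go
package main

import (
	"errors"
	"fmt"
)

// errEndData is a hypothetical recoverable error meaning "no new change yet".
var errEndData = errors.New("end of data")

// change is one detected change; pos is its position in the change log.
type change struct {
	pos int
	op  string
}

// fakeSource keeps an in-memory change log instead of talking to Postgres.
type fakeSource struct {
	changes []change
}

// apply records an insert, update or delete as a new change.
func (s *fakeSource) apply(op string) {
	s.changes = append(s.changes, change{pos: len(s.changes), op: op})
}

// Read returns the first change after pos. A nil position means "start from
// the beginning". If nothing changed since pos it returns errEndData.
func (s *fakeSource) Read(pos *int) (change, error) {
	next := 0
	if pos != nil {
		next = *pos + 1
	}
	if next >= len(s.changes) {
		return change{}, errEndData
	}
	return s.changes[next], nil
}

// walkthrough replays the steps from the list and collects what Read returned.
func walkthrough() []string {
	src := &fakeSource{}
	var out []string
	var pos *int // first call uses a nil position
	read := func() {
		c, err := src.Read(pos)
		if errors.Is(err, errEndData) {
			out = append(out, "no data")
			return
		}
		out = append(out, c.op)
		p := c.pos
		pos = &p // following calls use the last returned position
	}
	read()              // empty table: recoverable error
	src.apply("insert") // add a row
	read()              // returns the insert
	read()              // nothing changed: recoverable error
	src.apply("update") // update the row
	read()              // returns the update
	return out
}

func main() {
	fmt.Println(walkthrough()) // [no data insert no data update]
}
```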

Design acceptance tests for plugins

The plugin interface is not only structural but also behavioral (e.g. Open is guaranteed to be called before Read or Write, certain errors have special meaning). We should provide utilities to run an acceptance test on any plugin to figure out if it is behaving as expected. The goal is to write a design doc for creating acceptance tests that determines if a plugin is generally going to work with Conduit.

Docker Getting Started

In terms of installation instructions, we need to be able to give developers the ability to install via docker if need be.

Kafka sink: Asynchronous sends

Currently, when sending messages in the Kafka sink connector, we use synchronous sends. While this is fine in the first version of the connector, we want to have asynchronous sends, which will increase performance.

Acceptance Tests

We need to get our acceptance testing up to an acceptable level. Looking for a particular % of code coverage.

Performance Benchmarks

While everyone should do their own benchmarking, we need to give developers somewhere to start. This will anchor expectations and then we can help folks tune their installations as they progress.

Entity locking in orchestration layer

Initially, each service (pipeline, connector, processor service) had its own instance lock. The problem was twofold - the services locked the instances only for the duration of the operation inside the service and they did not lock any related instance. Let's take the creation of a processor as an example: it requires us to get the pipeline, lock it in place so it's not modified in the meantime, create the processor, add it to the pipeline, commit the transaction to the DB and only then release the lock.

Now that we have the orchestration layer it should be the responsibility of this layer to lock any entities that will be modified in an operation before making changes. The goal is to make Conduit safe for concurrent use, i.e. multiple requests changing the same resource at the same time.
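One possible shape for this, shown only as a sketch, is a set of per-entity mutexes that the orchestration layer acquires before running an operation. The names entityLocks and withLocked are assumptions made for this example. Locks are taken in sorted order so that two operations touching the same entities cannot deadlock each other.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// entityLocks hands out one mutex per entity ID (hypothetical helper).
type entityLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newEntityLocks() *entityLocks {
	return &entityLocks{locks: make(map[string]*sync.Mutex)}
}

// get returns the mutex for id, creating it on first use.
func (e *entityLocks) get(id string) *sync.Mutex {
	e.mu.Lock()
	defer e.mu.Unlock()
	l, ok := e.locks[id]
	if !ok {
		l = &sync.Mutex{}
		e.locks[id] = l
	}
	return l
}

// withLocked locks all given entities in sorted order, runs fn while holding
// every lock, and releases the locks when fn returns.
func (e *entityLocks) withLocked(ids []string, fn func() error) error {
	sorted := append([]string(nil), ids...)
	sort.Strings(sorted)
	for i, id := range sorted {
		if i > 0 && id == sorted[i-1] {
			continue // skip duplicates so no mutex is locked twice
		}
		l := e.get(id)
		l.Lock()
		defer l.Unlock()
	}
	return fn()
}

// addProcessorsConcurrently simulates n concurrent "create processor" requests
// against the same pipeline and returns how many processors were attached.
func addProcessorsConcurrently(n int) int {
	locks := newEntityLocks()
	var processorIDs []string // state of the hypothetical pipeline "pipeline-1"
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			procID := fmt.Sprintf("processor-%d", i)
			_ = locks.withLocked([]string{"pipeline-1", procID}, func() error {
				// create the processor, attach it, commit: all under the locks
				processorIDs = append(processorIDs, procID)
				return nil
			})
		}(i)
	}
	wg.Wait()
	return len(processorIDs)
}

func main() {
	fmt.Println(addProcessorsConcurrently(100)) // 100
}
```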

Plugins technical documentation

We should create technical documentation for plugins targeted at developers. This should contain information like how to create a new connector plugin, how to debug it, how Conduit calls plugin functions (call order guarantees), what errors should be returned, how logging should be handled etc.

Make sure the link in the readme points to the correct file after creating this doc.

Plugin management service

Create a service for managing plugins. For now this means that we need a list of pre-defined paths to plugins (built-in plugins), these plugins should be loaded up on startup to fetch their specifications and indexed in memory. When a new connector is created the plugin should be fetched by its name from this plugin manager, and not by the path to the plugin as is the case right now. In the future we can add the functionality for adding plugins on the fly, but this is not needed for now.
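As a starting point for discussion, here is a minimal sketch of such a service: specifications are loaded once at startup from a list of paths and then looked up by name. The types, the loader function and the example paths and versions are all placeholders invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// specification is a hypothetical, trimmed-down plugin specification.
type specification struct {
	Name    string
	Version string
}

var errPluginNotFound = errors.New("plugin not found")

// registry indexes plugin specifications in memory by plugin name.
type registry struct {
	byName map[string]specification
}

// newRegistry loads the specification of every plugin path at startup.
// load abstracts "start the plugin and ask for its specification".
func newRegistry(paths []string, load func(path string) (specification, error)) (*registry, error) {
	r := &registry{byName: make(map[string]specification, len(paths))}
	for _, p := range paths {
		spec, err := load(p)
		if err != nil {
			return nil, fmt.Errorf("load plugin %s: %w", p, err)
		}
		r.byName[spec.Name] = spec
	}
	return r, nil
}

// get fetches a plugin by name instead of by path.
func (r *registry) get(name string) (specification, error) {
	spec, ok := r.byName[name]
	if !ok {
		return specification{}, fmt.Errorf("%w: %s", errPluginNotFound, name)
	}
	return spec, nil
}

// fakeLoad stands in for a real loader; paths and versions are placeholders.
func fakeLoad(path string) (specification, error) {
	specs := map[string]specification{
		"plugins/file":      {Name: "file", Version: "v0.0.0"},
		"plugins/generator": {Name: "generator", Version: "v0.0.0"},
	}
	spec, ok := specs[path]
	if !ok {
		return specification{}, fmt.Errorf("no plugin at %s", path)
	}
	return spec, nil
}

func main() {
	reg, err := newRegistry([]string{"plugins/file", "plugins/generator"}, fakeLoad)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	spec, err := reg.get("file")
	fmt.Println(spec.Name, err) // file <nil>

	_, err = reg.get("kafka")
	fmt.Println(errors.Is(err, errPluginNotFound)) // true
}
```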

Postgres Source fails when handling bytea primary keys

Steps to Reproduce

  • Have a Postgres Source configured to read a table that has a primary key column of type bytea.
  • Configure a pipeline to run that plugin.

Expected Behavior

The Postgres Source should handle this functionality in the same way it would handle any other key column.

Actual Behavior

The Postgres connector cannot handle this query correctly and instead of returning the correct result or a descriptive error it returns an ErrEndData which implies that the plugin operated correctly and is now at its end.

Add CI action for generated code

We should add a Github action that runs make generate and succeeds if there is no diff. It should run on each push to a PR.

We could do the same with proto files (generate them and make sure there's no diff). Proto files aren't generated in the repo anymore, as we use Buf remote code generation now.

Health check

We have a health check endpoint that always returns a healthy status. We need to implement an actual health check that makes sure Conduit is running correctly. Right now the only thing to check is if the DB is working correctly (e.g. ping the DB). Once we tackle clustering this check should (probably) be improved to definitively indicate if a node in the cluster is healthy.
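A health check that pings the DB could be sketched like this. The pinger interface, the handler name, the /healthz path and the two-second timeout are assumptions for this example and not the existing Conduit endpoint.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// pinger is a hypothetical interface the DB layer would need to satisfy.
type pinger interface {
	Ping(ctx context.Context) error
}

// healthHandler reports 200 when the DB answers a ping and 503 otherwise.
func healthHandler(db pinger) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
		defer cancel()
		if err := db.Ping(ctx); err != nil {
			http.Error(w, "database unavailable", http.StatusServiceUnavailable)
			return
		}
		w.WriteHeader(http.StatusOK)
		fmt.Fprintln(w, "ok")
	}
}

// fakeDB lets us simulate a healthy or a failing database.
type fakeDB struct{ err error }

func (f fakeDB) Ping(context.Context) error { return f.err }

var errDown = errors.New("database down")

// check calls the handler once and returns the HTTP status code.
func check(db pinger) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/healthz", nil)
	healthHandler(db)(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(check(fakeDB{}))             // 200
	fmt.Println(check(fakeDB{err: errDown})) // 503
}
```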

Error handling: Return proper status codes in API

The gRPC and HTTP APIs should return the proper status codes. For example, GET /pipelines/{id} should return a 404 if a pipeline is not found, and the corresponding gRPC endpoint should return status code 5 (NOT_FOUND).

This should probably be done with a middleware that contains a mapping between globally defined error values (see ConduitIO/conduit-old#262) and gRPC/HTTP status codes.

Additionally, we need to make sure to document this in protobuf annotations so that it shows up in the swagger docs.
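The mapping such a middleware would need can be sketched without any gRPC dependency. The sentinel errors below are hypothetical placeholders for the globally defined error values. The numeric gRPC codes (0 OK, 2 UNKNOWN, 3 INVALID_ARGUMENT, 5 NOT_FOUND) are the standard ones.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Hypothetical globally defined error values.
var (
	errNotFound        = errors.New("instance not found")
	errInvalidArgument = errors.New("invalid argument")
)

// Numeric values of the standard gRPC status codes used in this sketch.
const (
	grpcOK              = 0
	grpcUnknown         = 2
	grpcInvalidArgument = 3
	grpcNotFound        = 5
)

// statusFor maps an error to a gRPC status code and an HTTP status code.
// errors.Is is used so that wrapped errors are matched as well.
func statusFor(err error) (grpcCode int, httpStatus int) {
	switch {
	case err == nil:
		return grpcOK, http.StatusOK
	case errors.Is(err, errNotFound):
		return grpcNotFound, http.StatusNotFound
	case errors.Is(err, errInvalidArgument):
		return grpcInvalidArgument, http.StatusBadRequest
	default:
		return grpcUnknown, http.StatusInternalServerError
	}
}

func main() {
	err := fmt.Errorf("get pipeline %q: %w", "abc", errNotFound)
	g, h := statusFor(err)
	fmt.Println(g, h) // 5 404
}
```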

Export Pipeline to Meroxa

If a developer wants to go to the managed platform, they'll have the option of exporting their pipeline to Meroxa.

Connector Directory Listing

Once the connector SDK has been developed, we need to be able to help other developers search and discover new connectors that have been created for conduit.

Pipeline started after being degraded still has the error

Description

A pipeline that is started again after it has been degraded still shows the previous error. To reproduce:

  1. Create a pipeline, e.g. with a Kafka source.
  2. Stop the Kafka broker
  3. Wait for a bit
  4. Pipeline is shown as degraded -- this is expected and actual behavior.
  5. Start pipeline
  6. Pipeline is shown as running, but with an error -- the expected behavior is that no error is shown.

Example:

[
    {
        "id": "65eb595b-57a6-45d7-85ff-21a721664855",
        "state": {
            "status": "STATUS_RUNNING",
            "error": "node cf6a452e-9261-4b64-b2f3-9b7cca6534ef stopped with error:\n    github.com/conduitio/conduit/pkg/pipeline.(*Service).runPipeline.func1\n        /Users/haris/projects/conduit/pkg/pipeline/lifecycle.go:376\n  - error reading from source:\n    github.com/conduitio/conduit/pkg/pipeline/stream.(*SourceNode).Run.func2\n        /Users/haris/projects/conduit/pkg/pipeline/stream/source.go:94\n  - source client read:\n    github.com/conduitio/conduit/pkg/plugins.(*sourceClient).Read\n        /Users/haris/projects/conduit/pkg/plugins/source.go:136\n  - rpc error: code = Unknown desc = source server read: failed getting a message received error from client localhost:9092/1: 1 request(s) timed out: disconnect (after 772261ms in state UP): received error from client localhost:9092/1: 1 request(s) timed out: disconnect (after 772261ms in state UP)"
        },
        "config": {
            "name": "pipeline-name-712442dd-64f3-43de-9483-5844b4f6649c",
            "description": "My new pipeline"
        },
        "connectorIds": [
            "cf6a452e-9261-4b64-b2f3-9b7cca6534ef",
            "7741daaa-6386-45ba-824a-6a5051993570"
        ],
        "processorIds": []
    }
]

Propagate connector persister errors

Right now connector persister errors are only logged but not propagated back to the Conduit runtime. We need to make sure that an error in the connector persister is sent back to the Conduit runtime which then initiates a shutdown.

GRPC connector state endpoint

Currently we have an endpoint for updating a connector config. We need to either allow updating the state through that endpoint or add a separate one specifically for updating the state (e.g. setting the position). The assignee should figure out which approach to take and implement it.

Provide pre-built binaries for Windows and Macs with M1 chips

This is a follow-up on ConduitIO/conduit-old#438.

Due to reasons mentioned in the PR, we currently do not have pre-built binaries of Conduit and plugins for Windows and also Macs with M1 chips.

We should investigate what our options are and what needs to be done so we get them as well.

Conduit Project Landing Site

A site for the project. Requirements include:

  • Where to have conversations with community
  • What the project is about
  • How to get started
  • Project Goals
  • Links to the documentation

We may decide to include the documentation as part of the site itself. We would need to determine the information architecture.

Clustered Conduit

The problem to solve is High Availability. Should one node go down, another will need to take its place. The goal is not disaster recovery. We will need to develop a separate solution for that.

Load pipeline config files on startup

Conduit should by default load all YAML files in folder pipelines (relative to the conduit binary) and provision pipelines when it starts. The folder location should be configurable through a CLI flag.
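The discovery part of this could be sketched as below. Parsing the files and provisioning pipelines is out of scope here; the function names and the accepted extensions (.yml and .yaml) are assumptions made for this example.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// findPipelineFiles returns all YAML files directly inside dir.
// os.ReadDir already returns entries sorted by file name.
func findPipelineFiles(dir string) ([]string, error) {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, fmt.Errorf("read pipelines folder: %w", err)
	}
	var files []string
	for _, e := range entries {
		if e.IsDir() {
			continue
		}
		ext := strings.ToLower(filepath.Ext(e.Name()))
		if ext == ".yml" || ext == ".yaml" {
			files = append(files, filepath.Join(dir, e.Name()))
		}
	}
	return files, nil
}

// demo creates a temporary folder with a few files and returns the base
// names of the files that would be treated as pipeline configs.
func demo() ([]string, error) {
	dir, err := os.MkdirTemp("", "pipelines")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	for _, name := range []string{"b.yaml", "a.yml", "notes.txt"} {
		path := filepath.Join(dir, name)
		if err := os.WriteFile(path, []byte("# placeholder\n"), 0o600); err != nil {
			return nil, err
		}
	}
	files, err := findPipelineFiles(dir)
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(files))
	for _, f := range files {
		names = append(names, filepath.Base(f))
	}
	return names, nil
}

func main() {
	names, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(names) // [a.yml b.yaml]
}
```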

Depends on #493.

Postgres source: records duplicated in certain cases

If the last saved position in a Pg source is M, and a newly inserted row has ID N, then the newly created record will be returned N-M times by the source. For example, if the last row returned by a Pg source had ID 10, and then we insert a row with the ID 30, then the new row will be returned 20 times.

Build 0ec0aa6

Steps to reproduce

  1. Set up a table in Pg with one row, where the ID is 100 (for example):

    insert into persons (firstname, personid) values ('foo', 100);
  2. Create a pipeline with a Pg source and any destination (can be a file destination). The Pg source configuration I used:

    "settings": {
    	"table": "persons",
    	"url": "postgresql://meroxauser:meroxapass@localhost/conduit-test-db?sslmode=disable"
    }
  3. Run the pipeline.
  4. Expected (and actual) behavior: you see the following in the file destination:

    {"firstname":"foo","personid":100}
  5. Insert the following row into Pg:

    insert into persons (address, city, firstname, lastname, personid) values ('wall street', 'new york', 'foo', 'bar', 105);

    Run the pipeline again.
  6. Expected: you see the following in the file destination:

    {"firstname":"foo","personid":105}

Actual behavior:

    {"firstname":"foo","personid":105}
    {"firstname":"foo","personid":105}
    {"firstname":"foo","personid":105}
    {"firstname":"foo","personid":105}
    {"firstname":"foo","personid":105}

Leveled logger for plugins

Add utility functions for plugins to create a logger with support for leveled output. Log output from plugins is already captured by Conduit and included in its own log stream, but it's currently logged without any level. The goal is to allow the plugin to decide which log level will be used for a message and possibly add structured fields.
