Coder Social home page Coder Social logo

ksqldb-php's Introduction

ksqlDB PHP client

Currently under development. API stability is not guaranteed until v1.

Requires PHP 8

composer require dev-this/ksqldb-php

Features

Usage

Create a client

There is a factory available for client creation

DevThis\KsqlDB\ClientFactory::create(string $hostname): DevThis\KsqlDB\Client

No HTTP connection will be established until a client command has been called.

Usage:

$hostname = 'http://localhost:8088';

$client = (new DevThis\KsqlDB\Factory\ClientFactory())->create($hostname);

Streaming callbacks

Streaming a query requires a callback class that implements a callback interface. Establishing a stream is purposefully blocking until the header has been received (along with query ID).

DevThis\KsqlDB\Factory\ClientFactory::stream(Statement $statement, StreamCallback $callback): Amp\Promise

Callback class must implement StreamCallback

interface StreamCallback {
    // Invoked once, at the start of the stream
    // StreamHeader has getters for the query ID, and column names and their data types.
    public function onHeader(StreamHeader $header): void;
    
    // OnEvent will be invoked on each new event
    // StreamEvent is an \ArrayObject
    public function onEvent(StreamEvent $event): void;
}

Usage:

use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement

$transactionStatement = new Statement("SELECT * FROM transactions EMIT CHANGES;");

$transactionHandler = new class implements StreamCallback {
    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        // do something with $event...
    }
}

$stream = $client->stream($transactionStatement, $transactionHandler);
// Query ID
echo $stream->getQueryId();

// Terminate the query
$client->terminate($stream);

// wait indefinitely
\Amp\Promise\wait($promise);

Executing a statement

Executing a statement works similarly to Streaming a statement. The main difference is that executed statements are not continous operations.

DevThis\KsqlDB\Client::execute(Statement $statement): ArrayObject

ArrayObject will contain the response.

Functional example

Asynchronous application that will eat its own dogfood. Consuming the very events it created:

use DevThis\KsqlDB\Interfaces\StreamCallback;
use DevThis\KsqlDB\Statement;
use DevThis\KsqlDB\Factory\ClientFactory;
use DevThis\KsqlDB\StreamEvent;
use DevThis\KsqlDB\StreamHeader;

$client = (new ClientFactory())->create('http://localhost:8088');

$createStatement = new Statement("CREATE STREAM cool_data (
    id VARCHAR KEY,
    message VARCHAR,
    timestamp VARCHAR,
) WITH (
    kafka_topic = 'cool_data',
    partitions = 1,
    value_format = 'avro',
    timestamp = 'timestamp',
    timestamp_format = 'yyyy-MM-dd''T''HH:mm:ss'
);");
$streamStatement = new Statement("SELECT * FROM cool_data EMIT CHANGES;");
$coolDataCallback = new class implements \DevThis\KsqlDB\Interfaces\StreamCallback {
    private const SCHEMA_ID = 0;
    private const SCHEMA_MESSAGE = 1;
    private const SCHEMA_TIMESTAMP = 2;

    public function onHeader(StreamHeader $header): void
    {
        echo sprintf(">Query ID: %s\n", $header->getQueryId());
        echo sprintf(">Columns: %s", print_r($header->getColumns(), true));
        echo "--------------------\n";
    }

    public function onEvent(StreamEvent $event): void
    {
        echo "Processing new transaction\n";
        echo sprintf(">ID: %s\n", $event[static::SCHEMA_ID]);
        echo sprintf(">Message: %s\n", $event[static::SCHEMA_MESSAGE]);
        echo sprintf(">Timestamp: %s\n", $event[static::SCHEMA_TIMESTAMP]);
    }
};

$stream = $client->execute($createStatement);

// Run event loop
// https://amphp.org/amp/event-loop/
\Amp\Loop::run(function () use ($client) {
    $stream = $client->streamAsync($streamStatement, $coolDataCallback);

    Loop::repeat(1000, static function() {
        // insert into stream example.
    });
    
    // Terminate stream after 100 seconds.
    Loop::delay(1000 * 100, static function () use ($client, $stream) {
        $client->terminateStream($stream->getQueryId());
    });
});

Alternatives

ksqldb-php's People

Contributors

verystrongfingers avatar

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.