Coder Social home page Coder Social logo

kotlin-coworker's Introduction

Community badge: Incubating Community extension badge Compatible with: Camunda Platform 8

Kotlin-coworker

This project aims to provide a neat Kotlin Coroutines API to Zeebe Gateway. Right now there is just a worker coroutine API.

Motivation

I decided to create it in trying to gain all performance that I can gain using Kotlin Coroutines stack. So, if you replace blocking calls with Coroutines suspension you can take more jobs in parallel, instead of one (in Zeebe Client Java default settings).

You can see the performance comparison test for yourself, but in my machine, the numbers are next:

For Zeebe Client it took 41.186149961s to process 40
...
For Coworker it took 1.522231647s to process 40
So, Zeebe Client Java duration / Coworker duration = 27.056427346106805

So, the same worker with delay, but in the reactive stack takes in 27 times less time to complete process instances.

Usage

  • Add the dependency
<dependency>
    <groupId>org.camunda.community.extension.kotlin.coworker</groupId>
    <artifactId>coworker-core</artifactId>
    <version>x.y.z</version>
</dependency>
  • Obtain a ZeebeClient instance, for example, ZeebeClient.newClient()
  • Use the extension function to obtain a Cozeebe instance zeebeClient.toCozeebe()
  • Create a new Coworker instance:
val coworker = cozeebe.newCoWorker(jobType, object : JobHandler {
    override suspend fun handle(client: JobClient, job: ActivatedJob) {
        val variables = job.variablesAsMap
        val aVar = variables["a"] as Int
        val bVar = variables["b"] as Int
        variables["c"] = aVar + bVar

        client.newCompleteCommand(job).variables(variables).send().await()
    }
})
  • Open it, like Zeebe's Java Worker: coworker.open()

Features

  • Coroutine native implementation (you can use suspend functions inside JobHandler methods)
  • Easily combine with existing Zeebe Client Java libs.
  • Because of using coroutines Coworker could activate more jobs containing blocking logic (Database queries, HTTP REST calls, etc.) if they adopted coroutines (a.k.a non-blocking API) than a classic Zeebe Java worker. You can see results for yourself in the benchmark module.
  • Spring Boot Starter

Spring Boot Starter

It requires:

  • Spring Boot 2.7.+ (should work with Spring Boot 3.0.x but haven't tested properly).
  • JDK 11

First, you need to add dependency:

<dependency>
    <groupId>org.camunda.community.extension.kotlin.coworker</groupId>
    <artifactId>coworker-spring-boot-starter</artifactId>
    <version>x.y.z</version>
</dependency>

Then, if you need to define Zeebe Worker with coroutines, like this:

@Coworker(type = "test")
suspend fun testWorker(jobClient: JobClient, job: ActivatedJob) {
  someService.callSomeSuspendMethod(job.variables)
  jobClient.newCompleteCommand(activatedJob.key).send().await()
}

Note:

  1. Method should be suspend
  2. Method should be annotated with @Coworker
  3. Method should not call thread-blocking functions. Use Kotlin's .await() instead of .join() in the example upward.
  4. It hasn't had all the features from Spring Zeebe, but it seems that some features will be ported eventually. Create an issue or PR with the feature that you need :)

Override coroutine context for each coworker execution

Sometimes you need to provide some data in a coroutine context (an MDC map, for example) based on the job. To do so, you have to override additionalCoroutineContextProvider from JobCoworkerBuilder. Something, like this:

client.toCozeebe().newCoWorker(jobType) { client, job ->
            // your worker logic
            client.newCompleteCommand(job).send().await()
        }
            // override additionalCoroutineContextProvider
            .also { it.additionalCoroutineContextProvider = JobCoroutineContextProvider { testCoroutineContext } }
            // open worker
            .open().use {
                // logic to keep the worker running
            }

If you are using the Spring Boot Starter, you need just to create a bean with the type JobCoroutineContextProvider in your Spring context. Like this:

    @Bean
    fun loggingJobCoroutineContextProvider(): JobCoroutineContextProvider {
        return JobCoroutineContextProvider {
          MDCContext()
        }
    }

Custom error handling

Sometimes, you want to override the default error handling mechanism. To do so, you need to customize your worker like this:

        client.toCozeebe().newCoWorker(jobType) { job: ActivatedJob, jobClient: JobClient ->
            // worker's logic
        }
            .also {
                // override job error handler
                it.jobErrorHandler = JobErrorHandler { e, activatedJob, jobClient ->
                    if (e is IgnorableException) {
                        jobClient.newCompleteCommand(activatedJob).variables(mapOf("ignored" to true)).send().await()
                    } else {
                        jobClient.newFailCommand(activatedJob).retries(activatedJob.retries - 1).send().await()
                    }
                }
            }

Error handling in Spring Boot

If you are using the Spring Boot Starter, you need to define a JobErrorHandler bean in your context:

        @Bean
        open fun customErrorHandler(): JobErrorHandler {
            val defaultErrorHandler = DefaultSpringZeebeErrorHandler()
            return JobErrorHandler { e, activatedJob, jobClient ->
                logger.error(e) { "Got error: ${e.message}, on job: $activatedJob" }
                defaultErrorHandler.handleError(e, activatedJob, jobClient)
            }
        }

Warning: It is highly recommend to use the DefaultSpringZeebeErrorHandler wrapper to wrap your error handling logic. More info in: #54

Override annotation parameters via configuration properties

This works basically the same as in the Spring Zeebe project. So, you can override values in the @Coworker annotation with type foo like this:

zeebe.client.worker.override.foo.enabled=false

Note: you can't use the SpEL and properties placeholders in this value. You should return the same type in the @Coworker annotation. The exception is Duration. You should return Long values in milliseconds.

Annotation parameters

If you want to redefine org.camunda.community.extension.coworker.spring.annotation.Coworker parameters, you should use SPeL to define annotation values. See the property's JavaDoc for the type that should be resolved. Also, you may use property placeholders (${}) in the annotation parameters to replace them with configuration properties if needed.

As an example you may refer to the test.

Metrics

If you want to observe your coworkers, there is a port of some metrics from the Spring Zeebe project in the Coworker's Spring Boot Starter:

  1. camunda.job.invocations
    1. It supports the following tags:
      1. action - what happens to a job
        1. activated - The job was activated and started to process an item
        2. failed - The processing failed with some exception
      2. type - job's type

Missing Features

  • Coroutines native JobClient

kotlin-coworker's People

Contributors

aivinog1 avatar dependabot[bot] avatar v-b-a avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar

kotlin-coworker's Issues

Get rid of `io.camunda.zeebe.spring.client.bean.MethodInfo#invoke`

We need to get rid of io.camunda.zeebe.spring.client.bean.MethodInfo#invoke call because coworkers can produce exceptions that are not subclasses of java.lang.RuntimeException. To mitigate this we create the org.camunda.community.extension.coworker.spring.error.DefaultSpringZeebeErrorHandler wrapper that strips off java.lang.RuntimeException.

It will be not a blocker since it will be done.

Add support for a nullable variables via `@Variable`

We can't just use:

@Coworker
suspend fun someMethod(jobClient: JobClient, job: ActivatedJob, @Variable someValue: SomeValue?)

Right now it isn't supported. To avoid NPE we need to rewrite the whole reflection part of methods calling. Could be difficult.

Coworker doesn't work on zeebe client 8.4.1

Hi,

I am trying to connect to zeebe version 8.4.1 with coworker and upload DMN resource.
Connection works, but when I try to upload resource like this

test("should upload resources") {
ZeebeClientSingleton.zeebeClient
.newDeployResourceCommand()
.addResourceFromClasspath("/gsm.dmn")
.send()
.join()
}

it returns this error

'io.camunda.zeebe.gateway.protocol.GatewayOuterClass$DeployResourceRequest$Builder io.camunda.zeebe.gateway.protocol.GatewayOuterClass$DeployResourceRequest$Builder.setTenantId(java.lang.String)' java.lang.NoSuchMethodError: 'io.camunda.zeebe.gateway.protocol.GatewayOuterClass$DeployResourceRequest$Builder io.camunda.zeebe.gateway.protocol.GatewayOuterClass$DeployResourceRequest$Builder.setTenantId(java.lang.String)' at io.camunda.zeebe.client.impl.command.DeployResourceCommandImpl.tenantId(DeployResourceCommandImpl.java:197) at io.camunda.zeebe.client.impl.command.DeployResourceCommandImpl.<init>(DeployResourceCommandImpl.java:65) at io.camunda.zeebe.client.impl.ZeebeClientImpl.newDeployResourceCommand(ZeebeClientImpl.java:291) at hr.ht.rnd.sandbox.zeebe.UploadResources$1$1.invokeSuspend(UploadResources.kt:14)

In my build.gradle I am forcing zeebe.client to use version 8.4.1

Problem is that zeebe-coworker uses version 8.2.7 and in version 8.4.1 there is no longer method setTenantId().

I pulled locally your main branch, updated zeebe.version in

<zeebe.version>8.2.7</zeebe.version>
to 8.4.1 and all tests are passing.

Can you please update zeebe.version and publish new version of library (0.5.1 presumably)?

Kind regards,
Ivan Sokol

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.