kafka-go's Introduction

A concurrent Kafka consumer for Go.

Go versions

kafka-go requires Go version 1.15 or later.

Producer

p := kafka.NewProducer(kafka.ProducerConfig{
    Version: "",
    Brokers: []string{broker},
})

_, _, err := p.Publish(context.Background(), topic, "key", []byte("val"))
if err != nil {
    // handle the publish failure, e.g. log it or retry
}

Consumer

// config
singleConf := kafka.ConsumerConfig{
    Version:        "",
    Brokers:        []string{"127.0.0.1:9092"},
    Group:          "test-group",
    Topic:          "test",
    CacheCapacity:  100,
    ConnectTimeout: 5 * time.Second,
}
batchConf := kafka.BatchConsumerConf{
    CacheCapacity: 100,
    Consumers:     4,
    Processors:    4,
}

consumer := kafka.NewConsumer(singleConf, nil)
bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
    log.Info("receive msg:", "value", string(data))
    time.Sleep(time.Millisecond * 500)
    return nil
}))

bc.Start()

Trace interceptor

import (
    "context"
    "time"

    // log below is the application's own logger; substitute the one you use.
    kafka "github.com/oofpgDLD/kafka-go"
    "github.com/oofpgDLD/kafka-go/trace"
)

// newConsumer builds a batch consumer with the trace interceptor attached and starts it.
func newConsumer(singleConf kafka.ConsumerConfig, batchConf kafka.BatchConsumerConf) {
    consumer := kafka.NewConsumer(singleConf, nil)
    bc := kafka.NewBatchConsumer(batchConf, consumer, kafka.WithHandle(func(ctx context.Context, key string, data []byte) error {
        log.Info("receive msg:", "value", string(data))
        time.Sleep(time.Millisecond * 500)
        return nil
    }), kafka.WithBatchConsumerInterceptors(trace.ConsumeInterceptor))

    bc.Start()
}

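// newProducer builds a producer with the trace interceptor attached.
func newProducer(broker string) {
    p := kafka.NewProducer(kafka.ProducerConfig{
        Version: "",
        Brokers: []string{broker},
    }, kafka.WithProducerInterceptors(trace.ProducerInterceptor))
    _ = p // publish with p.Publish as shown in the Producer section
}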

Test

Kafka producer and consumer test

License

kafka-go is under the MIT license. See the LICENSE file for details.

Contributors

danielhookx
