go-queue's Introduction

go-queue

dq

Highly available delayed queue based on beanstalkd.

consumer example

consumer := dq.NewConsumer(dq.DqConf{
	Beanstalks: []dq.Beanstalk{
		{
			Endpoint: "localhost:11300",
			Tube:     "tube",
		},
		{
			Endpoint: "localhost:11301",
			Tube:     "tube",
		},
	},
	Redis: redis.RedisConf{
		Host: "localhost:6379",
		Type: redis.NodeType,
	},
})
consumer.Consume(func(body []byte) {
	fmt.Println(string(body))
})

producer example

producer := dq.NewProducer([]dq.Beanstalk{
	{
		Endpoint: "localhost:11300",
		Tube:     "tube",
	},
	{
		Endpoint: "localhost:11301",
		Tube:     "tube",
	},
})

for i := 1000; i < 1005; i++ {
	_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*5)
	if err != nil {
		fmt.Println(err)
	}
}

kq

Kafka Pub/Sub framework

consumer example

config.json

Name: kq
Brokers:
- 127.0.0.1:19092
- 127.0.0.1:19092
- 127.0.0.1:19092
Group: adhoc
Topic: kq
Offset: first
Consumers: 1

example code

var c kq.KqConf
conf.MustLoad("config.json", &c)

q := kq.MustNewQueue(c, kq.WithHandle(func(k, v string) error {
	fmt.Printf("=> %s\n", v)
	return nil
}))
defer q.Stop()
q.Start()

producer example

type message struct {
	Key     string `json:"key"`
	Value   string `json:"value"`
	Payload string `json:"message"`
}


pusher := kq.NewPusher([]string{
	"127.0.0.1:19092",
	"127.0.0.1:19092",
	"127.0.0.1:19092",
}, "kq")

ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
for round := 0; round < 3; round++ {
	<-ticker.C
	count := rand.Intn(100)
	m := message{
		Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
		Value:   fmt.Sprintf("%d,%d", round, count),
		Payload: fmt.Sprintf("%d,%d", round, count),
	}
	body, err := json.Marshal(m)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(string(body))
	if err := pusher.Push(string(body)); err != nil {
		log.Fatal(err)
	}
}
cmdline.EnterToContinue()

go-queue's People

Contributors

algobot76, atlaschan000, bittoy, ch3nnn, dylannew, iwctwbai, jiang4869, juneezee, kevwan, m4n5ter, sgilson, sjatsh, zhangzy2345


go-queue's Issues

customized kafka message key

(screenshot omitted)

I don't quite understand why a custom key is not allowed; this is very unfriendly for scenarios that must guarantee message ordering. Please consider supporting custom keys.

How can kq subscribe to multiple topics?

conf.MustLoad("config.json", &c) can only configure one topic. To subscribe to multiple topics, is the only option to define multiple config files?
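One common workaround (a sketch, not an official feature; the wrapper field names below are made up) is to put one KqConf block per topic in a single file and load it into a wrapper struct:

```yaml
# one file, two queue configs; UserQueue/OrderQueue are hypothetical names
UserQueue:
  Name: user-kq
  Brokers:
    - 127.0.0.1:19092
  Group: adhoc
  Topic: users
  Offset: first
  Consumers: 1
OrderQueue:
  Name: order-kq
  Brokers:
    - 127.0.0.1:19092
  Group: adhoc
  Topic: orders
  Offset: first
  Consumers: 1
```

Declare a wrapper such as `type Config struct { UserQueue, OrderQueue kq.KqConf }`, load it with `conf.MustLoad`, then call `kq.MustNewQueue` once per field and start each queue in its own goroutine.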

kq.KqConf.Consumers seems pointless

In kafkaQueue.startProducers, Consumers goroutines are started to fetch messages and write them into a single channel, which seems pointless. FetchMessage itself reads from kafka-go's internal buffered channel; the actual fetching is done asynchronously and in batches by kafka-go. Reading concurrently from one buffered channel only to write into another buffered channel; why not skip this step?

Questions about distributed scheduling

Hi, I have a few questions and hope you can help:
1. Does "distributed scheduling" mean the messages are stored in a distributed way?
2. My understanding of this component is that you schedule the writes into the message queue yourself, and you also manage the concurrency of those scheduled writes; the component does not intervene. Is that correct?
3. Judging from the example, only a single producer process can ensure a message is written to the queue once at the scheduled time. If there are multiple producer nodes, will the message be written multiple times? Does that mean this component cannot fully match the functionality of Java's Quartz?

My actual need: in a distributed service architecture with many independent services, implementing scheduled tasks inside each service quickly becomes hard to manage, and any change to a scheduled task forces a service restart. Is this library unable to address that need?

RabbitMQ

Any plans to integrate RabbitMQ?

Some questions about using kq

In go-queue, if producing a Kafka message fails, there currently seems to be nothing but a failure log. How do you handle the failure logic when producing a Kafka message fails?

Questions about the beanstalkd source code

(screenshot omitted)
The above is part of the code in the insert function. Is p.Revoke(jointId) called there to handle the case where only one tube exists? This part confuses me. (For reference: tubes are created on first use, and a tube is deleted once it becomes empty, with no ready, delayed, or buried jobs, and no client connection referring to it.)

A question about using dq

Consider the following scenario:
I query three records: A, B, and C. I loop over them, calling Delay() to store each one in beanstalkd, and then run the update logic for A, B, and C. Suppose A and B update successfully but C fails; I then want to delete C from beanstalkd so that it will not be consumed. How can I delete it? Looking at the exposed API, there does not seem to be such an operation.

RabbitListener does not support manual ack

In RabbitListener I see the following:

go func() {
	for d := range msg {
		if err := q.handler.Consume(string(d.Body)); err != nil {
			logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err)
		}
	}
}()

Is it possible to ack manually when AutoAck is disabled, like this:

go func() {
	for d := range msg {
		if err := q.handler.Consume(string(d.Body)); err != nil {
			logx.Errorf("Error on consuming: %s, error: %v", string(d.Body), err)
		} else if !que.AutoAck {
			d.Ack(que.AutoAck)
		}
	}
}()

Question about the dq example

The NewProducer function requires at least two endpoints, all pointing at different instances. Does that mean a beanstalkd cluster is needed?

func NewProducer(beanstalks []Beanstalk) Producer {
	if len(beanstalks) < minWrittenNodes {
		log.Fatalf("nodes must be equal or greater than %d", minWrittenNodes)
	}
	var nodes []Producer
	producers := make(map[string]lang.PlaceholderType)
	for _, node := range beanstalks {
		if _, ok := producers[node.Endpoint]; ok {
			log.Fatal("all node endpoints must be different")
		}

		producers[node.Endpoint] = lang.Placeholder
		nodes = append(nodes, NewProducerNode(node.Endpoint, node.Tube))
	}

	return &producerCluster{nodes: nodes}
}

Looking at the code, there does not seem to be a beanstalkd cluster deployment mode; these are just multiple independent beanstalkd instances. If one instance goes down, won't its data be lost?
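The availability model implied by NewProducer above can be sketched without beanstalkd: write every job to every node, and have the consumer drop duplicate ids (dq tracks these in Redis; a plain map stands in below). Losing a single node then loses no jobs. This is a toy illustration, not dq's actual code:

```go
package main

import "fmt"

// consumeWithDedup reads jobs from the surviving nodes, skipping any id
// already seen; the map stands in for dq's Redis seen-set.
func consumeWithDedup(nodes [][]string) []string {
	seen := make(map[string]bool)
	var consumed []string
	for _, node := range nodes {
		for _, job := range node {
			if !seen[job] {
				seen[job] = true
				consumed = append(consumed, job)
			}
		}
	}
	return consumed
}

func main() {
	jobs := []string{"job-1", "job-2", "job-3"}

	// Produce: replicate each job to all three nodes.
	nodes := make([][]string, 3)
	for _, j := range jobs {
		for i := range nodes {
			nodes[i] = append(nodes[i], j)
		}
	}

	// Node 0 "crashes"; consuming from the survivors still yields
	// every job exactly once.
	fmt.Println(consumeWithDedup(nodes[1:])) // [job-1 job-2 job-3]
}
```

This also explains why duplicate endpoints are rejected: replicas on the same instance add no redundancy.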

Offset commits with multiple Processors in kq

Thanks for wrapping kafka-go as a service; it really is simpler to use!

I have a question about the consuming code and hope you can clarify:

for i := 0; i < q.c.Processors; i++ {
	q.consumerRoutines.Run(func() {
		for msg := range q.channel {
			if err := q.consumeOne(string(msg.Key), string(msg.Value)); err != nil {
				logx.Errorf("Error on consuming: %s, error: %v", string(msg.Value), err)
			}
			q.consumer.CommitMessages(context.Background(), msg)
		}
	})
}

Is it safe for multiple goroutines to commit offsets in parallel? For example, a larger offset committed early could cause message loss, while a smaller offset committed late could cause duplicate consumption.
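For context on the concern: Kafka offset commits are cumulative, so committing offset N acknowledges everything up to and including N. A stdlib-only simulation of workers committing in completion order shows how a message that failed can still end up covered by a later commit:

```go
package main

import "fmt"

// highestCommitted models what the broker ends up with when parallel
// workers commit offsets in the order they happen to finish: commits are
// cumulative, so only the largest committed offset matters.
func highestCommitted(completionOrder []int) int {
	committed := 0
	for _, off := range completionOrder {
		if off > committed {
			committed = off
		}
	}
	return committed
}

func main() {
	// Offsets 1..5 fan out to workers; the worker for offset 3 fails
	// and never commits, but the worker for offset 5 does.
	final := highestCommitted([]int{1, 2, 4, 5})
	fmt.Println("highest committed offset:", final)

	// Offset 3 was never processed, yet a restart resumes after 5,
	// so message 3 is silently skipped.
	fmt.Println("offset 3 covered by commit:", final >= 3)
}
```

So yes, out-of-order commits can silently skip a failed message; committing only offsets whose predecessors have all completed avoids this.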

kq's error reporting seems incomplete?

Take the case where the Kafka broker goes down: is there any way for callers to get the corresponding error? Because of the producer client's buffering, Push does not actually send the message, so no error is available there; that much is understandable. But when the buffered Flush runs, a failed send also seems to be merely logged, and the error is not surfaced in any way. Does that mean kq callers currently have no way to observe errors from Flush? Could something like an eventBus object be added so that callers can attach listeners and receive kq's error state? Thanks.

Messages are lost when the program exits after push.Close()

go version go1.19.2 linux/amd64
  • Example
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/zeromicro/go-queue/kq"
)

func main() {

	push := kq.NewPusher([]string{"localhost:9092"}, "pushtest",
		kq.WithChunkSize(5000000),
		kq.WithFlushInterval(5 * time.Second))
	defer push.Close()

	for i := 0; i < 10; i++ {
		err := push.Push(fmt.Sprintf("Hello world: %d", i))
		if err != nil {
			log.Panic(err)
		}
	}

}
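The behavior reported above is what you get from any buffered producer whose Close does not drain the buffer first. A stdlib-only toy model (not kq's actual internals) of the chunk/interval buffering: Push only appends to an in-memory chunk, which is sent when it reaches chunkSize or when the flush timer fires. With chunkSize at 5000000 and the program exiting immediately, neither trigger ever fires, so everything hinges on whether the buffer is drained before exit.

```go
package main

import "fmt"

// toyPusher buffers messages and only delivers them when the chunk
// fills; a real pusher would also flush on a timer.
type toyPusher struct {
	chunkSize int
	buffer    []string // messages waiting to be sent
	sent      []string // messages actually delivered
}

func (p *toyPusher) Push(msg string) {
	p.buffer = append(p.buffer, msg)
	if len(p.buffer) >= p.chunkSize {
		p.flush()
	}
}

// flush delivers everything currently buffered.
func (p *toyPusher) flush() {
	p.sent = append(p.sent, p.buffer...)
	p.buffer = nil
}

func main() {
	p := &toyPusher{chunkSize: 5000000}
	for i := 0; i < 10; i++ {
		p.Push(fmt.Sprintf("Hello world: %d", i))
	}

	// Exiting here without a drain loses all 10 messages:
	fmt.Println("sent without drain:", len(p.sent))

	// A Close that flushes first would deliver them:
	p.flush()
	fmt.Println("sent with drain:", len(p.sent))
}
```

Caller-side workarounds are to push enough data to fill a chunk, or wait past the flush interval before exiting, until Close itself drains the buffer.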
