killme2008 / metamorphosis Goto Github PK
View Code? Open in Web Editor NEWA high available,high performance distributed messaging system.
License: Apache License 2.0
A high available,high performance distributed messaging system.
License: Apache License 2.0
Added a new stats item "reset" to reset realtime statistics.
"SinkRunner-PollingRunner-DefaultSinkProcessor" prio=10 tid=0x00002aaab0653000 nid=0x4141 runnable [0x0000000044169000]
java.lang.Thread.State: RUNNABLE
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.advance(LinkedTransferQueue.java:714)
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.(LinkedTransferQueue.java:691)
at com.taobao.gecko.core.util.LinkedTransferQueue.iterator(LinkedTransferQueue.java:673)
at com.taobao.gecko.core.core.impl.AbstractSession.close0(AbstractSession.java:350)
at com.taobao.gecko.core.nio.impl.AbstractNioSession.close0(AbstractNioSession.java:92)
at com.taobao.gecko.core.core.impl.AbstractController.stop(AbstractController.java:506)
at com.taobao.gecko.service.impl.BaseRemotingController.stop(BaseRemotingController.java:196)
- locked <0x00000007b153ac18> (a com.taobao.gecko.service.impl.DefaultRemotingClient)
at com.taobao.metamorphosis.client.RemotingClientWrapper.stop(RemotingClientWrapper.java:339)
at com.taobao.metamorphosis.client.MetaMessageSessionFactory.shutdown(MetaMessageSessionFactory.java:304)
at org.apache.flume.sink.MetaSink.destroyConnection(MetaSink.java:295)
at org.apache.flume.sink.MetaSink.process(MetaSink.java:496)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)
Locked ownable synchronizers:
- None
这个函数运行几个星期了。
class Itr implements Iterator {
QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node
E nextItem; // Cache of next item, once commited to in next
Itr() {
this.nextNode = traversalHead();
advance();
}
E advance() {
this.prevNode = this.currentNode;
this.currentNode = this.nextNode;
E x = this.nextItem;
QNode p = this.nextNode.next;
for (;;) {
if (p == null || !p.isData) {
this.nextNode = null;
this.nextItem = null;
return x;
}
Object item = p.get();
if (item != p && item != null) {
this.nextNode = p;
this.nextItem = cast(item);
return x;
}
this.prevNode = p;
p = p.next;
}
}
public boolean hasNext() {
return this.nextNode != null;
}
public E next() {
if (this.nextNode == null) {
throw new NoSuchElementException();
}
return advance();
}
public void remove() {
QNode p = this.currentNode;
QNode prev = this.prevNode;
if (prev == null || p == null) {
throw new IllegalStateException();
}
Object x = p.get();
if (x != null && x != p && p.compareAndSet(x, p)) {
clean(prev, p);
}
}
}
这个for函数应该死循环了
哪位大侠帮忙查一下
public class LinkedTransferQueue extends AbstractQueue implements BlockingQueue {
package com.taobao.gecko.core.util;
LinkedTransferQueue 这个类在 com.taobao.gecko.core.util;包里面.
不容易也容易(183167601) 14:57:17
714行 是循环那里?
小规模(245885697) 14:58:15
对应for语句的 }
觉得一直在那个地方for
整合tools和server-wrapper工程,提供统一的脚本用于管理broker。
获取所有可用的分区列表,获取所有分区列表两个新的API,consumer和producer都应当可用。
客户端提交消息到broker端的时候,偶尔会收到类似这样的异常信息"Clock moved backwards. Refusing to generate id for xxxxx milliseconds"。我们这边会在每天晚上0点进行服务器时间同步调整,但这些异常是在晚上1点多报的。这之间是一直有消息在发送,如果是0点左右报错那可以解释,主要是到了1点多才报错。这个请确认一下。
在brokerl列表变动的时候,减少客户端异常的产生。
1.替换新的列表
2.关闭不存在的broker
减少这个时间差内发送失败的消息。
com.taobao.metamorphosis.example.AsyncProducer
Type a message to send:
Send message failed,error message:等待响应超时
[2012-12-21 00:21:00,919] INFO 远端连接192.168.1.12:8123关闭,启动重连任务 (com.taobao.gecko.service.impl.GeckoHandler)
[2012-12-21 00:21:01,967] INFO 尝试重新连接192.168.1.12:8123 (com.taobao.gecko.service.impl.ReconnectManager)
metamophoisis的日志:
[ERROR] [notify-remoting-ReadEvent-2-thread-8] 12-21 00:20:57,597 [AbstractSession] - Decode error
java.lang.IllegalArgumentException: Illegal transaction key:1756872259
at com.taobao.metamorphosis.transaction.TransactionId.valueOf(TransactionId.java:62)
at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.getTransactionId(MetamorphosisWireFormatType.java:299)
at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decodePut(MetamorphosisWireFormatType.java:345)
at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decode(MetamorphosisWireFormatType.java:145)
at com.taobao.gecko.core.nio.impl.NioTCPSession.decode(NioTCPSession.java:328)
at com.taobao.gecko.core.nio.impl.NioTCPSession.readFromBuffer(NioTCPSession.java:247)
at com.taobao.gecko.core.nio.impl.AbstractNioSession.onRead(AbstractNioSession.java:309)
at com.taobao.gecko.core.nio.impl.AbstractNioSession.onEvent(AbstractNioSession.java:508)
at com.taobao.gecko.core.nio.impl.SocketChannelController.dispatchReadEvent(SocketChannelController.java:80)
at com.taobao.gecko.core.nio.impl.NioController$ReadTask.run(NioController.java:139)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
hi dennis
broker中,updateConsumerOffsets=false
我在consumer设置ConsumerConfig.setConsumeFromMaxOffset
然后启动consumer后,再关闭此consumer
关闭consumer后,product一条信息
此时再设置ConsumerConfig.setConsumeFromMaxOffset后启动consumer,会收到刚才发的那条信息
感觉这个setConsumeFromMaxOffset没起作用
不太确定这是否是setConsumeFromMaxOffset的一个bug
请问python client的consumer部分有时间表吗?
为协议添加版本信息,为未来添加更多协议做准备,做好向前兼容。
一个broker的磁盘空间满后,客户端不能够感知;消息仍然会往该broker写;集群性能下降。需要智能剔除该broker。
Added a new stats item "help" to print help menu,just like:
stats help
=>
valid items:
realtime -- Realtime statistics
topics -- Topics statistics
config -- server.ini content
As caption, 在metamorphosis-server-wrapper中出现引用错误,请问这个以github.killme2008的包是哪个开源包还是未公布的库?
看到介绍说,meta支持事务
动手配置的时候也看到system.ini部分有事务参数的配置
实际编写java的producer部分,看到有事务支持的api
consumer部分没有相关事务支持或者ack相关的api,是否consumer层面不支持事务或者ack相关的特性?
btwl:consumer消费单条message,有没有超时的概念?
用户可指定备份间隔。
服务端配置参数校验,尽量在启动的时候发现问题。
可设置全局的实时统计
stat=true
目前topic没有状态,都是可接收可发送的,需要可控制,比如不可以发送,不可以接收这样的
类似JMS规范中TopicBrowser
metaServer.sh新增命令slave-status,打印异步复制状态是否正常。
包括:
如题
如题
When slave crash,producer must not keep seeding message to master when using sync replications
有一种情况是product,本身就在监听一个端口获取信息后存入broker
为了不让这部分数据再在product和broker中传输占用网络带宽
是否可以
在重载PartitionSelector的getPartition方法后,选择一个本机brokerid的分区(product肯定有办法知道在本机运行的brokerid),让product通过localhost来连接brokered,而不是通过ip来连接,减少网络带宽占用
在消息少量的时候,做到更好的负载均衡。
补充一些高级主题:
更友好地支持spring框架。
异步复制,在运行一段时间后出现offset错乱的问题,需测试重现下。
Move meta.get.tellMaxOffset
to server.ini
如题
com.taobao.gecko:gecko:jar:1.1.1-SNAPSHOT
消除不能使用broker集群的缺陷。
设置
deletePolicy=delete,168
系统运行一段时间后,新加入一个consumer group进行消费
此时新加入的group是从168小时前的那个message store文件开始消费
有什么办法可以在group第一次接入的时候,不从那个168小时前的文件进行消费?,而从当前最新message store的offset开始消费
提供一个新的消息接收监听器,可设置rollback替代抛出异常来回滚消息。
防止在PartitionSelector内修改message的任何属性。
Consumer实现中,用到的MessageListener,其中,public Executor getExecutor(),返回处理消息的线程池。比如,我采用Executors.newFixedThreadPool(8)返回一个简单的线程池。可是通过计算消息的平均处理时间,发现性能并没有提高多少?这个地方,是否我理解有误?不能这样实现?
sorry一共有6个问题,标题没办法都写下来
1
现在是否支持一个topic有n个消费者分组,这n个消费分组得到的消息是一样的。
2
消费者在拉取数据后实在无法完成消费(比如依赖的消费下线宕机),此时消费者是否有什么api,来主动暂停一会拉取的操作。
3
数据删除策略,是否是说系统根据删除策略,查找所有最后修改时间大于配置中规定的最长存活时间的数据文件,并且消费的offset不存在于这些文件中就进行删除?
4
手动调整了zookeeper上offset的值,如果把此值改小是否就可以做到,让某些消息被重复消费。如果把此值改大了是否可以做到,让某些信息跳过消费,然后手工删除跳过部分的数据文件(Broker是否会缓存offset)。
5
如果某个broker硬件损坏了(非磁盘问题),最好的补救方法是否是,用此broker的数据文件和配置的brokerid来clone此broker?如果我想把此数据文件拷到到其他broker,然后通过一些设置来实现救灾,是否是个好方案呢?
6
两台正常的broker,配置一个slaveGroup,这么这一个slaveGroup是否是备份2台的数据?
Added checksum to put command for server validation.
broker 目前的Stop方法,不够优雅,不能做到处理掉进来的请求,不接受新的请求,至少很突然的就全部关掉
Run meta broker on windows machine.Maybe we have to modify some config options and scripts.
现在的group commit实现是利用单独的一个线程,考虑在写的线程上做group commit,减少线程数。
功能应该包括:
Start metaq in standalone mode that embed a zookeeper server.
如题
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.