Coder Social home page Coder Social logo

killme2008 / metamorphosis Goto Github PK

View Code? Open in Web Editor NEW
1.3K 266.0 696.0 3.21 MB

A high available,high performance distributed messaging system.

License: Apache License 2.0

C# 0.51% Python 1.30% Shell 0.37% Ruby 0.59% Java 95.33% Clojure 0.75% CSS 1.06% JavaScript 0.01% Protocol Buffer 0.03% Batchfile 0.05%

metamorphosis's Issues

Meta 客户端关闭连接时,进入死循环。

"SinkRunner-PollingRunner-DefaultSinkProcessor" prio=10 tid=0x00002aaab0653000 nid=0x4141 runnable [0x0000000044169000]
java.lang.Thread.State: RUNNABLE
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.advance(LinkedTransferQueue.java:714)
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.(LinkedTransferQueue.java:691)
at com.taobao.gecko.core.util.LinkedTransferQueue.iterator(LinkedTransferQueue.java:673)
at com.taobao.gecko.core.core.impl.AbstractSession.close0(AbstractSession.java:350)
at com.taobao.gecko.core.nio.impl.AbstractNioSession.close0(AbstractNioSession.java:92)
at com.taobao.gecko.core.core.impl.AbstractController.stop(AbstractController.java:506)
at com.taobao.gecko.service.impl.BaseRemotingController.stop(BaseRemotingController.java:196)
- locked <0x00000007b153ac18> (a com.taobao.gecko.service.impl.DefaultRemotingClient)
at com.taobao.metamorphosis.client.RemotingClientWrapper.stop(RemotingClientWrapper.java:339)
at com.taobao.metamorphosis.client.MetaMessageSessionFactory.shutdown(MetaMessageSessionFactory.java:304)
at org.apache.flume.sink.MetaSink.destroyConnection(MetaSink.java:295)
at org.apache.flume.sink.MetaSink.process(MetaSink.java:496)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)

Locked ownable synchronizers:
- None
这个函数运行几个星期了。
class Itr implements Iterator {
QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node
E nextItem; // Cache of next item, once commited to in next

    Itr() {
        this.nextNode = traversalHead();
        advance();
    }

    E advance() {
        this.prevNode = this.currentNode;
        this.currentNode = this.nextNode;
        E x = this.nextItem;

        QNode p = this.nextNode.next;
        for (;;) {
            if (p == null || !p.isData) {
                this.nextNode = null;
                this.nextItem = null;
                return x;
            }
            Object item = p.get();
            if (item != p && item != null) {
                this.nextNode = p;
                this.nextItem = cast(item);
                return x;
            }
            this.prevNode = p;
            p = p.next;
        }
    }

    public boolean hasNext() {
        return this.nextNode != null;
    }

    public E next() {
        if (this.nextNode == null) {
            throw new NoSuchElementException();
        }
        return advance();
    }

    public void remove() {
        QNode p = this.currentNode;
        QNode prev = this.prevNode;
        if (prev == null || p == null) {
            throw new IllegalStateException();
        }
        Object x = p.get();
        if (x != null && x != p && p.compareAndSet(x, p)) {
            clean(prev, p);
        }
    }
}

这个for函数应该死循环了
哪位大侠帮忙查一下

public class LinkedTransferQueue extends AbstractQueue implements BlockingQueue {
package com.taobao.gecko.core.util;
LinkedTransferQueue 这个类在 com.taobao.gecko.core.util;包里面.

不容易也容易(183167601) 14:57:17
714行 是循环那里?
小规模(245885697) 14:58:15
对应for语句的 }
觉得一直在那个地方for

偶尔收到异常信息"Clock moved backwards. Refusing to generate id for xxxxx milliseconds"

客户端提交消息到broker端的时候,偶尔会收到类似这样的异常信息"Clock moved backwards. Refusing to generate id for xxxxx milliseconds"。我们这边会在每天晚上0点进行服务器时间同步调整,但这些异常是在晚上1点多报的。这之间是一直有消息在发送,如果是0点左右报错那可以解释,主要是到了1点多才报错。这个请确认一下。

metamorphosis例子出错

com.taobao.metamorphosis.example.AsyncProducer

Type a message to send:
Send message failed,error message:等待响应超时
[2012-12-21 00:21:00,919] INFO 远端连接192.168.1.12:8123关闭,启动重连任务 (com.taobao.gecko.service.impl.GeckoHandler)
[2012-12-21 00:21:01,967] INFO 尝试重新连接192.168.1.12:8123 (com.taobao.gecko.service.impl.ReconnectManager)

metamophoisis的日志:

[ERROR] [notify-remoting-ReadEvent-2-thread-8] 12-21 00:20:57,597 [AbstractSession] - Decode error
java.lang.IllegalArgumentException: Illegal transaction key:1756872259
    at com.taobao.metamorphosis.transaction.TransactionId.valueOf(TransactionId.java:62)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.getTransactionId(MetamorphosisWireFormatType.java:299)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decodePut(MetamorphosisWireFormatType.java:345)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decode(MetamorphosisWireFormatType.java:145)
    at com.taobao.gecko.core.nio.impl.NioTCPSession.decode(NioTCPSession.java:328)
    at com.taobao.gecko.core.nio.impl.NioTCPSession.readFromBuffer(NioTCPSession.java:247)
    at com.taobao.gecko.core.nio.impl.AbstractNioSession.onRead(AbstractNioSession.java:309)
    at com.taobao.gecko.core.nio.impl.AbstractNioSession.onEvent(AbstractNioSession.java:508)
    at com.taobao.gecko.core.nio.impl.SocketChannelController.dispatchReadEvent(SocketChannelController.java:80)
    at com.taobao.gecko.core.nio.impl.NioController$ReadTask.run(NioController.java:139)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

不太确定这是否是setConsumeFromMaxOffset的一个bug

hi dennis
broker中,updateConsumerOffsets=false

我在consumer设置ConsumerConfig.setConsumeFromMaxOffset
然后启动consumer后,再关闭此consumer

关闭consumer后,product一条信息

此时再设置ConsumerConfig.setConsumeFromMaxOffset后启动consumer,会收到刚才发的那条信息

感觉这个setConsumeFromMaxOffset没起作用

不太确定这是否是setConsumeFromMaxOffset的一个bug

版本化协议

为协议添加版本信息,为未来添加更多协议做准备,做好向前兼容。

Metamorphosis事务支持疑问Consumer

看到介绍说,meta支持事务
动手配置的时候也看到system.ini部分有事务参数的配置

实际编写java的producer部分,看到有事务支持的api

consumer部分没有相关事务支持或者ack相关的api,是否consumer层面不支持事务或者ack相关的特性?

btwl:consumer消费单条message,有没有超时的概念?

topic状态问题

目前topic没有状态,都是可接收可发送的,需要可控制,比如不可以发送,不可以接收这样的

改进同步复制

  • 当slave写入失败的时候,将master标识为不可用,从zookeeper移除
  • 被标识为不可用的master-slave组,自行探测恢复,如果可以正常恢复,即自动加入到zookeeper
  • 如果不能恢复,需通知到相关人员。

如果product和broker都部署在一台机器上,是否可以通过localhost连接broker,不占用网络资源

有一种情况是product,本身就在监听一个端口获取信息后存入broker
为了不让这部分数据再在product和broker中传输占用网络带宽

是否可以
在重载PartitionSelector的getPartition方法后,选择一个本机brokerid的分区(product肯定有办法知道在本机运行的brokerid),让product通过localhost来连接brokered,而不是通过ip来连接,减少网络带宽占用

提供Java客户端使用详细指南

补充一些高级主题:

  • 自定义消息发送选择器
  • 订阅参数控制
  • 拒绝消费处理器
  • 中断处理
  • 消息回滚
  • storm集成
    等等。

异步复制bug查找

异步复制,在运行一段时间后出现offset错乱的问题,需测试重现下。

新加入的consumer group如何进行offset指定

设置
deletePolicy=delete,168
系统运行一段时间后,新加入一个consumer group进行消费
此时新加入的group是从168小时前的那个message store文件开始消费
有什么办法可以在group第一次接入的时候,不从那个168小时前的文件进行消费?,而从当前最新message store的offset开始消费

Consumer实现中关于线程池的疑问

Consumer实现中,用到的MessageListener,其中,public Executor getExecutor(),返回处理消息的线程池。比如,我采用Executors.newFixedThreadPool(8)返回一个简单的线程池。可是通过计算消息的平均处理时间,发现性能并没有提高多少?这个地方,是否我理解有误?不能这样实现?

消费者是否可以主动停止拉取?

sorry一共有6个问题,标题没办法都写下来

1
现在是否支持一个topic有n个消费者分组,这n个消费分组得到的消息是一样的。

2
消费者在拉取数据后实在无法完成消费(比如依赖的消费下线宕机),此时消费者是否有什么api,来主动暂停一会拉取的操作。

3
数据删除策略,是否是说系统根据删除策略,查找所有最后修改时间大于配置中规定的最长存活时间的数据文件,并且消费的offset不存在于这些文件中就进行删除?

4
手动调整了zookeeper上offset的值,如果把此值改小是否就可以做到,让某些消息被重复消费。如果把此值改大了是否可以做到,让某些信息跳过消费,然后手工删除跳过部分的数据文件(Broker是否会缓存offset)。

5
如果某个broker硬件损坏了(非磁盘问题),最好的补救方法是否是,用此broker的数据文件和配置的brokerid来clone此broker?如果我想把此数据文件拷到到其他broker,然后通过一些设置来实现救灾,是否是个好方案呢?

6
两台正常的broker,配置一个slaveGroup,这么这一个slaveGroup是否是备份2台的数据?

broker 关闭不够优雅

broker 目前的Stop方法,不够优雅,不能做到处理掉进来的请求,不接受新的请求,至少很突然的就全部关掉

提供强大友好的管理平台。

功能应该包括:

  • 集群状况查看
  • 单个broker的信息,包括统计等
  • consumer的队列长度查询
  • 在线修改参数并reload。
  • etc...

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.