Coder Social home page Coder Social logo

killme2008 / metamorphosis Goto Github PK

View Code? Open in Web Editor NEW
1.3K 267.0 694.0 3.21 MB

A high available,high performance distributed messaging system.

License: Apache License 2.0

C# 0.51% Python 1.30% Shell 0.37% Ruby 0.59% Java 95.33% Clojure 0.75% CSS 1.06% JavaScript 0.01% Protocol Buffer 0.03% Batchfile 0.05%

metamorphosis's Introduction

Logo

#新闻

#介绍

Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。

这是MetaQ的一个github分支,我做了部分优化和改进,并将meta发布到maven central repository,还做了一些wiki文档,我能承诺的是我个人会持续维护本项目。

MetaQ的淘宝版本已经发展到2.0乃至3.0版本,设计原则发生了变更,有兴趣的可以上它的官方网站看。

本分支应用在京东,支付宝,腾讯等公司的产品上。

#关于我

我在淘宝中间件部门工作了近3年,主要负责分布式消息中间件,高性能网络框架,分布式配置产品等的设计和开发,Metamorphosis是我在淘宝发起和实现的项目。目前我在AVOS.com工作。我的个人主页

#协议

基于Apache License 2.0

#下载

因为github不再提供下载服务,因此请前往googlecode下载。

或者从我的个人网站下载,请下载后自行对照md5checksum:

http://fnil.net/downloads/index.html

#更多

参见wiki

metamorphosis's People

Contributors

allwmh avatar gongweijun86 avatar hengyunabc avatar killme2008 avatar kimmking avatar losingle avatar wq163 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

metamorphosis's Issues

异步复制bug查找

异步复制,在运行一段时间后出现offset错乱的问题,需测试重现下。

Metamorphosis事务支持疑问Consumer

看到介绍说,meta支持事务
动手配置的时候也看到system.ini部分有事务参数的配置

实际编写java的producer部分,看到有事务支持的api

consumer部分没有相关事务支持或者ack相关的api,是否consumer层面不支持事务或者ack相关的特性?

btwl:consumer消费单条message,有没有超时的概念?

改进同步复制

  • 当slave写入失败的时候,将master标识为不可用,从zookeeper移除
  • 被标识为不可用的master-slave组,自行探测恢复,如果可以正常恢复,即自动加入到zookeeper
  • 如果不能恢复,需通知到相关人员。

如果product和broker都部署在一台机器上,是否可以通过localhost连接broker,不占用网络资源

有一种情况是product,本身就在监听一个端口获取信息后存入broker
为了不让这部分数据再在product和broker中传输占用网络带宽

是否可以
在重载PartitionSelector的getPartition方法后,选择一个本机brokerid的分区(product肯定有办法知道在本机运行的brokerid),让product通过localhost来连接brokered,而不是通过ip来连接,减少网络带宽占用

提供Java客户端使用详细指南

补充一些高级主题:

  • 自定义消息发送选择器
  • 订阅参数控制
  • 拒绝消费处理器
  • 中断处理
  • 消息回滚
  • storm集成
    等等。

提供强大友好的管理平台。

功能应该包括:

  • 集群状况查看
  • 单个broker的信息,包括统计等
  • consumer的队列长度查询
  • 在线修改参数并reload。
  • etc...

metamorphosis例子出错

com.taobao.metamorphosis.example.AsyncProducer

Type a message to send:
Send message failed,error message:等待响应超时
[2012-12-21 00:21:00,919] INFO 远端连接192.168.1.12:8123关闭,启动重连任务 (com.taobao.gecko.service.impl.GeckoHandler)
[2012-12-21 00:21:01,967] INFO 尝试重新连接192.168.1.12:8123 (com.taobao.gecko.service.impl.ReconnectManager)

metamophoisis的日志:

[ERROR] [notify-remoting-ReadEvent-2-thread-8] 12-21 00:20:57,597 [AbstractSession] - Decode error
java.lang.IllegalArgumentException: Illegal transaction key:1756872259
    at com.taobao.metamorphosis.transaction.TransactionId.valueOf(TransactionId.java:62)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.getTransactionId(MetamorphosisWireFormatType.java:299)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decodePut(MetamorphosisWireFormatType.java:345)
    at com.taobao.metamorphosis.network.MetamorphosisWireFormatType$MetaCodecFactory$1.decode(MetamorphosisWireFormatType.java:145)
    at com.taobao.gecko.core.nio.impl.NioTCPSession.decode(NioTCPSession.java:328)
    at com.taobao.gecko.core.nio.impl.NioTCPSession.readFromBuffer(NioTCPSession.java:247)
    at com.taobao.gecko.core.nio.impl.AbstractNioSession.onRead(AbstractNioSession.java:309)
    at com.taobao.gecko.core.nio.impl.AbstractNioSession.onEvent(AbstractNioSession.java:508)
    at com.taobao.gecko.core.nio.impl.SocketChannelController.dispatchReadEvent(SocketChannelController.java:80)
    at com.taobao.gecko.core.nio.impl.NioController$ReadTask.run(NioController.java:139)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)

偶尔收到异常信息"Clock moved backwards. Refusing to generate id for xxxxx milliseconds"

客户端提交消息到broker端的时候,偶尔会收到类似这样的异常信息"Clock moved backwards. Refusing to generate id for xxxxx milliseconds"。我们这边会在每天晚上0点进行服务器时间同步调整,但这些异常是在晚上1点多报的。这之间是一直有消息在发送,如果是0点左右报错那可以解释,主要是到了1点多才报错。这个请确认一下。

新加入的consumer group如何进行offset指定

设置
deletePolicy=delete,168
系统运行一段时间后,新加入一个consumer group进行消费
此时新加入的group是从168小时前的那个message store文件开始消费
有什么办法可以在group第一次接入的时候,不从那个168小时前的文件进行消费?,而从当前最新message store的offset开始消费

消费者是否可以主动停止拉取?

sorry一共有6个问题,标题没办法都写下来

1
现在是否支持一个topic有n个消费者分组,这n个消费分组得到的消息是一样的。

2
消费者在拉取数据后实在无法完成消费(比如依赖的消费下线宕机),此时消费者是否有什么api,来主动暂停一会拉取的操作。

3
数据删除策略,是否是说系统根据删除策略,查找所有最后修改时间大于配置中规定的最长存活时间的数据文件,并且消费的offset不存在于这些文件中就进行删除?

4
手动调整了zookeeper上offset的值,如果把此值改小是否就可以做到,让某些消息被重复消费。如果把此值改大了是否可以做到,让某些信息跳过消费,然后手工删除跳过部分的数据文件(Broker是否会缓存offset)。

5
如果某个broker硬件损坏了(非磁盘问题),最好的补救方法是否是,用此broker的数据文件和配置的brokerid来clone此broker?如果我想把此数据文件拷到到其他broker,然后通过一些设置来实现救灾,是否是个好方案呢?

6
两台正常的broker,配置一个slaveGroup,这么这一个slaveGroup是否是备份2台的数据?

Meta 客户端关闭连接时,进入死循环。

"SinkRunner-PollingRunner-DefaultSinkProcessor" prio=10 tid=0x00002aaab0653000 nid=0x4141 runnable [0x0000000044169000]
java.lang.Thread.State: RUNNABLE
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.advance(LinkedTransferQueue.java:714)
at com.taobao.gecko.core.util.LinkedTransferQueue$Itr.(LinkedTransferQueue.java:691)
at com.taobao.gecko.core.util.LinkedTransferQueue.iterator(LinkedTransferQueue.java:673)
at com.taobao.gecko.core.core.impl.AbstractSession.close0(AbstractSession.java:350)
at com.taobao.gecko.core.nio.impl.AbstractNioSession.close0(AbstractNioSession.java:92)
at com.taobao.gecko.core.core.impl.AbstractController.stop(AbstractController.java:506)
at com.taobao.gecko.service.impl.BaseRemotingController.stop(BaseRemotingController.java:196)
- locked <0x00000007b153ac18> (a com.taobao.gecko.service.impl.DefaultRemotingClient)
at com.taobao.metamorphosis.client.RemotingClientWrapper.stop(RemotingClientWrapper.java:339)
at com.taobao.metamorphosis.client.MetaMessageSessionFactory.shutdown(MetaMessageSessionFactory.java:304)
at org.apache.flume.sink.MetaSink.destroyConnection(MetaSink.java:295)
at org.apache.flume.sink.MetaSink.process(MetaSink.java:496)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)

Locked ownable synchronizers:
- None
这个函数运行几个星期了。
class Itr implements Iterator {
QNode nextNode; // Next node to return next
QNode currentNode; // last returned node, for remove()
QNode prevNode; // predecessor of last returned node
E nextItem; // Cache of next item, once commited to in next

    Itr() {
        this.nextNode = traversalHead();
        advance();
    }

    E advance() {
        this.prevNode = this.currentNode;
        this.currentNode = this.nextNode;
        E x = this.nextItem;

        QNode p = this.nextNode.next;
        for (;;) {
            if (p == null || !p.isData) {
                this.nextNode = null;
                this.nextItem = null;
                return x;
            }
            Object item = p.get();
            if (item != p && item != null) {
                this.nextNode = p;
                this.nextItem = cast(item);
                return x;
            }
            this.prevNode = p;
            p = p.next;
        }
    }

    public boolean hasNext() {
        return this.nextNode != null;
    }

    public E next() {
        if (this.nextNode == null) {
            throw new NoSuchElementException();
        }
        return advance();
    }

    public void remove() {
        QNode p = this.currentNode;
        QNode prev = this.prevNode;
        if (prev == null || p == null) {
            throw new IllegalStateException();
        }
        Object x = p.get();
        if (x != null && x != p && p.compareAndSet(x, p)) {
            clean(prev, p);
        }
    }
}

这个for函数应该死循环了
哪位大侠帮忙查一下

public class LinkedTransferQueue extends AbstractQueue implements BlockingQueue {
package com.taobao.gecko.core.util;
LinkedTransferQueue 这个类在 com.taobao.gecko.core.util;包里面.

不容易也容易(183167601) 14:57:17
714行 是循环那里?
小规模(245885697) 14:58:15
对应for语句的 }
觉得一直在那个地方for

topic状态问题

目前topic没有状态,都是可接收可发送的,需要可控制,比如不可以发送,不可以接收这样的

不太确定这是否是setConsumeFromMaxOffset的一个bug

hi dennis
broker中,updateConsumerOffsets=false

我在consumer设置ConsumerConfig.setConsumeFromMaxOffset
然后启动consumer后,再关闭此consumer

关闭consumer后,product一条信息

此时再设置ConsumerConfig.setConsumeFromMaxOffset后启动consumer,会收到刚才发的那条信息

感觉这个setConsumeFromMaxOffset没起作用

不太确定这是否是setConsumeFromMaxOffset的一个bug

broker 关闭不够优雅

broker 目前的Stop方法,不够优雅,不能做到处理掉进来的请求,不接受新的请求,至少很突然的就全部关掉

版本化协议

为协议添加版本信息,为未来添加更多协议做准备,做好向前兼容。

Consumer实现中关于线程池的疑问

Consumer实现中,用到的MessageListener,其中,public Executor getExecutor(),返回处理消息的线程池。比如,我采用Executors.newFixedThreadPool(8)返回一个简单的线程池。可是通过计算消息的平均处理时间,发现性能并没有提高多少?这个地方,是否我理解有误?不能这样实现?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.