Coder Social home page Coder Social logo

adyliu / jafka Goto Github PK

View Code? Open in Web Editor NEW
718.0 124.0 322.0 2.87 MB

a fast and simple distributed publish-subscribe messaging system (mq)

Home Page: https://github.com/adyliu/jafka/wiki

License: Apache License 2.0

Shell 10.36% Python 2.39% Java 87.25%
jafka-mq kafka mq message-queue jms

jafka's Introduction

#A fast distributed messaging system (MQ)

License Build Status Maven Central

Jafka mq is a distributed publish-subscribe messaging system cloned from Apache Kafka.

So it has the following features:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware single broker can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over broker servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Simple message format for many language clients.
  • Pure Java work

If you are interested in scala, please use the origin kafka at apache. Also it has a git repository at github.

News

[2019-05-13] released v3.0.6

Document & Wiki

Wiki: https://github.com/adyliu/jafka/wiki

Download

You can download the full package from Google Drive:

Maven & Gradle Dependencies

Maven

<dependency>
    <groupId>io.jafka</groupId>
    <artifactId>jafka</artifactId>
    <version>3.0.6</version>
</dependency>

Gradle

'io.jafka:jafka:3.0.6'

Contributor

  • @rockybean
  • @tiny657

License

Apache License 2.0 => https://github.com/adyliu/jafka/blob/master/LICENSE


[Keywords: jafka, kafka, messaging system, mq, jafka mq, sohu]

jafka's People

Contributors

adyliu avatar dependabot[bot] avatar javadevvictoria avatar rockybean avatar youngwan657 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

jafka's Issues

Error Exception

Exception(com.sohu.jafka.common.UnavaliableProducerException) is error and duplicates with 'com.sohu.jafka.common.UnavailableProducerException'.

Delete the file:

/jafka/src/main/java/com/sohu/jafka/common/UnavaliableProducerException.java

Jafka#start(Properties, Properties, Properties) 方法判断有误

此方法如下:

public void start(Properties mainProperties, Properties consumerProperties, Properties producerProperties) {
        final ServerConfig config = new ServerConfig(mainProperties);
        final ConsumerConfig consumerConfig = consumerProperties == null ? null : new ConsumerConfig(consumerProperties);
        final ProducerConfig producerConfig = consumerConfig == null ? null : new ProducerConfig(producerProperties);
        start(config, consumerConfig, producerConfig);
}

ProducerConfig producerConfig ... 这行代码判断是否有误,应该是:

final ProducerConfig producerConfig = producerProperties == null ? null : new ProducerConfig(producerProperties);

是否应该是判断 producerProperties == null 而不是 consumerConfig == null.

http client of producer

This fetures of http client:

  • send text message
  • send binary message
  • create topic
  • delete topic

关于历史message的问题

还是以前开发碰到的,如果一个消息已经上线很久,比如积累了100w条记录,一个新的消费者刚刚上线,这时有两种需求,一种是从头收取所有的消息,逐一处理(支持的很棒),而还有种需求,是不处理老数据,直接从新上线时处理才收到的msg。 这个好像还是不支持把,或者得用很复杂的方式支持?

Connection Refused error when try to run sample producer program remotely

I keep getting Connection refused error when try to run sample producer program from remote system as explained in Quick Tutorial demo.

The exception as follows:
Caused by: java.lang.RuntimeException: Connection refused: connect
at com.sohu.jafka.producer.SyncProducer.connect(SyncProducer.java:148)
at com.sohu.jafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.java:125)
at com.sohu.jafka.producer.SyncProducer.send(SyncProducer.java:97)

The following observations have been found in Server logs:

2012-09-13 12:52:48.783 INFO NIOServerCnxn$Factory Accepted socket connection from /10.94.22.50:30660
2012-09-13 12:52:48.790 INFO NIOServerCnxn Client attempting to establish new session at /10.94.22.50:30660
2012-09-13 12:52:48.795 INFO NIOServerCnxn Established session 0x139be612cff0006 with negotiated timeout 6000 for clien t /10.94.22.50:30660
2012-09-13 12:52:53.042 INFO PrepRequestProcessor Processed session termination for sessionid: 0x139be612cff0006
2012-09-13 12:52:53.047 INFO NIOServerCnxn Closed socket connection for client /10.94.22.50:30660 which had sessionid 0 x139be612cff0006

If I run the same program locally in server where Jafka process is running; I see no issues and able to send messages as expected. But If I run the same program remotely; I see connection issues.

I still debugging the issue; appreciate any comments or help in this regard.

Thank you,
Vish

高吞吐量的实现机制是什么?

对jafka的说明中有这样一条:高吞吐量:即使是低配制的硬件条件,单个Broker也能支持每秒数十万的消息吞吐。
请问高吞吐量的技术支撑是什么?nio吗?

Can not shutdown jvm

If the cache in the thread is full then the FetcherRunner thread is pause and never give up.

com.sohu.jafka.consumer.FetcherRunnable.fetchOnce()

fixed to :

        try {
            read += processMessages(messages, info);
        } catch (IOException e) {
            throw e;
        } catch (InterruptedException e) {
            if (!stopped) {
                logger.error("error in FetcherRunnable for " + info, e);
                info.enqueueError(e, info.getFetchedOffset());
            }
            throw e;
        }catch (RuntimeException e) {
            if (!stopped) {
                logger.error("error in FetcherRunnable for " + info, e);
                info.enqueueError(e, info.getFetchedOffset());
            }
            throw e;
        }

你好,有一个疑问想问下

我用的ZK3.5 ,jafka是1.4 .用demo里面的测试代码 发送消息 在获取brokerList 节点时 格式化错误,"partitions" 分区什么,的

一次 Fetch 只能获取一个 LogSegment 的内容吗?

在 Log.read(long offset, int length) 方法中看到一次 Fetch 操作只会二分寻找 offset 所在的那个 LogSegment 并最多只读到该日志段的末尾,如果 offset + length 横跨多个 LogSegment 的情况下该怎么处理?

关于 offsets 问题

消费者消费topic 中的一些消息 为什么zookeeper 中的offsets 是空的啊
ls /consumers
[test_group1, test_group123, test_group, test_group2, test_group1235]
ls /consumers/test_group
[ids, owners, offsets]
ls /consumers/test_group/offsets
[test1234, ly1234567, test123456, lin123, test, demo123, ly123456, ssss, MJ]
ls /consumers/test_group/offsets/lin123
[1-0]
ls /consumers/test_group/offsets/lin123/1-0
[]

The newly broker would not accept the old topic messages

If a newly broker starts up which means there is no file in its data directory, it cannot accept the previous topic messages because it could not register itself to /brokers/topics/[topic]/[broker_id]. As a result, it can only accept new topic messages. One solution is to create topic-partition directory in the data directory manually. Or is this the actual usage??

LogManager registers wrong partition Number

LogManager registers wrong partition number to zookeeper when it starts up in some cases. For example, a user enlarges the partition number ,such as 4 ,of a topic by the admin script. Then everything goes on well. But if the broker restarts, the partition number in zookeeper turns out to be 3. Problem occurs!

rebalance fail

zookeeperconsumerconnector$zkrebalancerlistener - [adyliu_tc_156_44-1340242264959-60554954] rebalanced failed. try #1, cost 12 ms

Question

Hello Team,

I was just thinking do you accept code contribution from new people or this project is open for reading but closed for contribution. If it is open I would like to explore this project. Waiting for your reply.

Thanks,
Siddharth

Producer cannot receive new topic

Producer sends some messages with new topic(not registed at broker).

The producer processes the partitions changing incorrectly if the broker creates this topic more than one partition.

See the code(https://github.com/adyliu/jafka/blob/v1.0/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java):

class BrokerTopicsListener implements IZkChildListener {

    private Map<String, SortedSet<Partition>> originalBrokerTopicsParitions;

    private Map<Integer, Broker> originBrokerIds;

    public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions, Map<Integer, Broker> originBrokerIds) {
        super();
        this.originalBrokerTopicsParitions = originalBrokerTopicsParitions;
        this.originBrokerIds = originBrokerIds;

We need a new map before the producer creating a default partition for new topic.

    public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions,
            Map<Integer, Broker> originBrokerIds) {
        super();
        this.originalBrokerTopicsParitions = new HashMap<String, SortedSet<Partition>>(
                originalBrokerTopicsParitions);
        this.originBrokerIds = new HashMap<Integer, Broker>(originBrokerIds);

A simple message dumper

A simple message dumper is useful while debuging message content.

Dumping binary message maybe is difficult. But we can dump the string (utf-8) message.

topic creation

We need the function of topic creation.

Sending messages to the broker first time, we need to create the topic with given partition.

Though this operation can be finished in the broker configuration. But this need restarting.

This tools can enlarge the partition of exist topic also.

add topic deleter operation

Deleter operation can delete the topic who is never used any more.

The messages of this topic will be deleted also.

watcher tools

Creating a tool to watching the consumer status is amusive.

It can watch the consumer status of all topic with groups.

consumer的问题

[hadoop@ZK1 bin]$ ./consumer-console.sh --zookeeper 192.168.1.97:2181,192.168.1.96:2181,192.168.1.98:2181 --topic ly --from-beginning
log4j:WARN No appenders could be found for logger (com.sohu.jafka.consumer.ZookeeperConsumerConnector).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at com.github.zkclient.ZkClient.deleteRecursive(ZkClient.java:442)
at com.sohu.jafka.console.ConsoleConsumer.tryCleanupZookeeper(ConsoleConsumer.java:174)
at com.sohu.jafka.console.ConsoleConsumer.main(ConsoleConsumer.java:106)

消费者接受消息为什么会报错啊,我用给的java demo 是可以接收的 ,但是这个却不可以。求解

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.