Comments (8)
肯定可以的。因为zk记录了各个消费者的偏移量,消费者再启动时会从上次的偏移量开始消费消息。
如果有疑问,可贴出代码来分析下。
from jafka.
Properties props = new Properties();
props.put("zk.connect", zkHosts);
props.put("groupid", groupId.groupID());
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector connector = Consumer.create(config);
Map<String, List<MessageStream>> topicStreams = connector.createMessageStreams(ImmutableMap.of(groupId.topic().topicName(), 3), new StringDecoder());
List<MessageStream> streams = topicStreams.get(groupId.topic().topicName());
ExecutorService executor = Executors.newFixedThreadPool(3);
for (final MessageStream stream : streams) {
executor.submit(new Runnable() {
@Override
public void run() {
for (String msg : stream) {
System.err.println(msg);
}
}
});
}
这是我根据gi thub例子写的,麻烦给分析分析
from jafka.
如果把组ID换个新的,就能全部收到!
from jafka.
broker对应的topic有几个分区partition?
ExecutorService executor = Executors.newFixedThreadPool(3);
全部消费端的线程数加起来和 要比 broker的所有partition加起来和要多。
另外,有几个消费端?每个消费端都不消费消息么?
日志有什么WARN 提示?
from jafka.
broker 启动了三个,每个broker partition默认1个,消费端1,2个都试了! 但是我生产端发送过消息之后应用就停了。
from jafka.
broker和consumer 日志中是否有ERROR/WARN 可供分析?
from jafka.
2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-0 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-1 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-4 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-5 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-2 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-3 for topic test
[2016-01-28 18/:51/:38]WARN com.sohu.j
from jafka.
看起来像是zookeeper的配置问题,consumer连接的zk和broker发送的zk不是同一个。QQ ④①〇⑨④③②〇〇 临时沟通下
from jafka.
Related Issues (20)
- jafka的zk中broker信息存储的hostname和port,为什么不是ip:port HOT 2
- jafka实现了主备高可用机制了吗?
- 一次 Fetch 只能获取一个 LogSegment 的内容吗? HOT 1
- 是否支持分区备份及选 leader HOT 2
- Jafka#start(Properties, Properties, Properties) 方法判断有误 HOT 1
- 高吞吐量的实现机制是什么? HOT 2
- 关于 offsets 问题 HOT 1
- consumer的问题 HOT 13
- producer到broker消息质量级别仅仅支持‘At most once’这种级别吗
- EmbeddedConsumer 构造函数中 blackListTopics 取值错误 HOT 2
- Processor#run() 方法的改进的一个建议
- jafka 消息的存储地址 问题 HOT 1
- 不同组的Consumer订阅相同的一个或者多个topic问题 HOT 4
- 还在维护吗?目前版本兼容于kafka的哪个版本? HOT 1
- 升级到Java 8+
- Is there any aarch64 config file under bin/ ? (jafka aarch64 配置文件支持)
- 支持集群模式吗,支持Topic副本吗
- 请问支持windows64位系统吗?如果不支持,该怎么做? HOT 1
- 测试用例的运行 HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from jafka.