Comments (9)
According to #19798, there is no need to specify the persistent:// or non-persistent:// prefix in the topicsPattern:
After discussion, we don't need this PIP; we just need to:
- Add a warn log when the user-configured pattern contains a domain ('persistent://public/default/topic.*').
- Enhance the documentation: patternTopics cannot contain domains.
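A minimal sketch of the check described above, assuming the pattern is available as a plain string before it is compiled. The class `TopicsPatternCheck` and its methods are hypothetical helpers for illustration, not part of the Pulsar client API:

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the Pulsar client API. */
final class TopicsPatternCheck {
    private static final String[] DOMAIN_PREFIXES = {"persistent://", "non-persistent://"};

    /** Returns true if the pattern string starts with a topic domain prefix. */
    static boolean hasDomainPrefix(String pattern) {
        for (String prefix : DOMAIN_PREFIXES) {
            if (pattern.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the pattern without a leading domain prefix, or unchanged if there is none. */
    static String stripDomainPrefix(String pattern) {
        for (String prefix : DOMAIN_PREFIXES) {
            if (pattern.startsWith(prefix)) {
                return pattern.substring(prefix.length());
            }
        }
        return pattern;
    }

    public static void main(String[] args) {
        String configured = "persistent://public/default/topic.*";
        if (hasDomainPrefix(configured)) {
            // Assumption: a plain stderr line stands in for a real warn log.
            System.err.println("WARN: topics pattern '" + configured
                    + "' contains a domain; use tenant/namespace/regex instead");
        }
        Pattern pattern = Pattern.compile(stripDomainPrefix(configured));
        System.out.println(pattern.pattern()); // prints "public/default/topic.*"
    }
}
```

The resulting pattern has the tenant/namespace/regex form used in the examples in this thread; which domains are matched is then chosen with subscriptionTopicsMode.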
from pulsar.
OK, thank you @visortelle.
But for non-persistent topics I was not able to do a multi-topic subscription, and as you said, "Once in about ~5 runs I see some messages from non-persistent topic", which means the pattern subscription is not working properly for non-persistent topics.
from pulsar.
@ragaur-tibco I completely agree and don't argue with that.
from pulsar.
@ragaur-tibco the current behavior is correct. See my comment here: #22527 (comment)
from pulsar.
@ragaur-tibco I fixed your code.
When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.
Source: https://pulsar.apache.org/docs/next/cookbooks-non-persistent/#overview
In your code, the subscriber was created after messages were sent.
Code:
package b;

import org.apache.pulsar.client.api.*;
import java.util.regex.Pattern;

public class App {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Producer<String> producerA = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();
        Producer<String> producerB = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();

        // Subscribe BEFORE sending: a non-persistent topic does not retain
        // messages for consumers that connect later.
        Pattern allTopicsPattern = Pattern.compile("my-tenant/new-name/.*");
        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        producerA.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        producerB.send("=========from topic persistent://my-tenant/new-name/topic-pers-1 ");

        // Receive and acknowledge messages from all matched topics.
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}
Logs:
a mvn exec:java <aws:aws-superadmin> <region:us-east-2>
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------< c:a >---------------------------------
[INFO] Building a 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.2.0:java (default-cli) @ a ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received message from topic non-persistent://my-tenant/new-name/topic-non-1: =========from topic non-persistent://my-tenant/new-name/topic-non-1
Received message from topic persistent://my-tenant/new-name/topic-pers-1: =========from topic persistent://my-tenant/new-name/topic-pers-1
from pulsar.
@ragaur-tibco please check and let me know if it resolves the issue.
from pulsar.
Hi @visortelle
I tried creating the subscriber before sending the messages:
package Pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription-1";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        // Pattern allTopicsPattern = Pattern.compile("tenant-1/name/topic.*");
        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern("tenant-1/name/topic.*").subscriptionType(SubscriptionType.Shared)
                .subscriptionName(SUBSCRIPTION_NAME).subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://tenant-1/name/topic-1")
                .enableBatching(false).create();
        System.out.println("new producer");
        producer.send("=========from topic non-persistent://tenant-1/name/topic-1 ");

        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://tenant-1/name/topic-8")
                .enableBatching(false).create();
        producer1.send("=======from topic persistent://tenant-1/name/topic-8 ");

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}
Result: I only receive messages from the persistent topic, not from the non-persistent one.
from pulsar.
Interesting.
My observation is that after we create a pattern consumer, for an existing non-persistent topic it doesn't "immediately" create the underlying subscription and consumers if there are no connected producers at this moment. But it will eventually be created after a short time.
TIP: you can display the list of the underlying consumers by casting your consumer to PatternMultiTopicsConsumerImpl and calling the .getConsumers() method.
List<ConsumerImpl<byte[]>> consumers =
        ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers();
for (ConsumerImpl<byte[]> consumer : consumers) {
    System.out.println("consumer: " + consumer.getTopic());
}
If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time.
for (int i = 0; i < 100000; i++) {
    producer.sendAsync("=========from topic non-persistent://tenant-1/name/topic-1 " + i);
}
...
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10732
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10733
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10734
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10735
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10736
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10737
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10738
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10739
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10740
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10741
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10742
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10743
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10744
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10745
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10746
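Because the underlying consumer for the non-persistent topic shows up only after a delay, one option is to poll for it before sending the first message. The sketch below is a generic polling helper in plain Java; the class `AwaitCondition` and its `await` method are hypothetical names, and the timeout and poll interval values are arbitrary assumptions:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper for illustration; not part of the Pulsar client API. */
final class AwaitCondition {

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean await(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true roughly 50 ms after start.
        boolean met = await(() -> System.currentTimeMillis() - start >= 50, 2_000, 10);
        System.out.println("condition met: " + met);
    }
}
```

With the cast shown in the tip above, the condition could check whether getConsumers() already contains a consumer whose getTopic() is the non-persistent topic, and the producer would send only once that returns true. This narrows the window for the loss described here but does not make a non-persistent topic reliable.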
I don't know if it can qualify as a bug. cc @lhotari
@ragaur-tibco do you have a real use-case in mind? I wouldn't rely on non-persistent topics if losing a non-significant amount of messages could affect my application.
from pulsar.
Here is the reason. Before adding a topic to the topics list, it checks that the topic isActive(), which evaluates !subscriptions.isEmpty() || hasLocalProducers().
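To make the effect of that predicate concrete, here is a simplified toy model in plain Java. It is not the broker's code: `ToyTopic` and `listActive` are hypothetical names, and the counters stand in for the real subscription and producer collections:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model for illustration only; not the Pulsar broker implementation. */
final class ToyTopicListing {

    /** Simplified stand-in for a broker-side non-persistent topic. */
    static final class ToyTopic {
        final String name;
        final int subscriptions;
        final int localProducers;

        ToyTopic(String name, int subscriptions, int localProducers) {
            this.name = name;
            this.subscriptions = subscriptions;
            this.localProducers = localProducers;
        }

        /** Same shape as the quoted check: has subscriptions OR has local producers. */
        boolean isActive() {
            return subscriptions > 0 || localProducers > 0;
        }
    }

    /** Returns the names of the topics that pass the isActive() filter. */
    static List<String> listActive(List<ToyTopic> topics) {
        List<String> names = new ArrayList<>();
        for (ToyTopic topic : topics) {
            if (topic.isActive()) {
                names.add(topic.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<ToyTopic> topics = Arrays.asList(
                new ToyTopic("topic-1", 0, 0),   // no subscriptions, no producers
                new ToyTopic("topic-2", 0, 1),   // one connected producer
                new ToyTopic("topic-3", 1, 0));  // one subscription
        System.out.println(listActive(topics)); // prints [topic-2, topic-3]
    }
}
```

In this model, a topic with neither subscriptions nor producers is left out of the list, which matches the observation that the pattern consumer does not pick up an existing non-persistent topic until a producer is connected.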
from pulsar.