Comments (9)

visortelle commented on June 22, 2024

According to #19798, there is no need to specify the persistent:// or non-persistent:// prefix in the topicsPattern:

After discussion, we don't need this PIP; we just need to:

  • Add a warn log when the user-configured pattern contains a domain ('persistent://public/default/topic.*')
  • Enhance the documentation: patternTopics cannot contain domains.
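
The warning proposed in the first bullet could look roughly like the sketch below. This is not Pulsar's implementation: the class `TopicsPatternCheck` and the method `warnIfDomainPresent` are hypothetical names, and the sketch assumes that a "domain" means one of the two prefixes mentioned above, persistent:// or non-persistent://.

```java
import java.util.logging.Logger;

// Hypothetical helper for illustration only; not part of the Pulsar client API.
final class TopicsPatternCheck {
    private static final Logger LOG = Logger.getLogger(TopicsPatternCheck.class.getName());

    // The two topic domains discussed in this thread.
    private static final String[] DOMAIN_PREFIXES = {"persistent://", "non-persistent://"};

    // Logs a warning and returns true if the pattern text starts with a topic domain.
    static boolean warnIfDomainPresent(String topicsPattern) {
        for (String prefix : DOMAIN_PREFIXES) {
            if (topicsPattern.startsWith(prefix)) {
                LOG.warning("topicsPattern '" + topicsPattern
                        + "' contains the domain '" + prefix + "'; patterns should not contain domains");
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(warnIfDomainPresent("persistent://public/default/topic.*")); // prints true
        System.out.println(warnIfDomainPresent("public/default/topic.*"));              // prints false
    }
}
```

A plain prefix check is enough here because both domains are fixed literal strings, so there is no need to parse the pattern as a regular expression first.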

from pulsar.

ragaur-tibco commented on June 22, 2024

OK, thank you @visortelle.
But for non-persistent topics I was not able to do a multi-topic subscription, and as you said, "Once in about ~5 runs I see some messages from non-persistent topic".

That means the pattern subscription is not working properly for non-persistent topics.

visortelle commented on June 22, 2024

@ragaur-tibco I completely agree and don't argue with that.

visortelle commented on June 22, 2024

@ragaur-tibco the current behavior is correct. See my comment here: #22527 (comment)

visortelle commented on June 22, 2024

@ragaur-tibco I fixed your code.

When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.

Source: https://pulsar.apache.org/docs/next/cookbooks-non-persistent/#overview

In your code, the subscriber was created after messages were sent.

Code:

package b;

import org.apache.pulsar.client.api.*;

import java.util.regex.Pattern;

public class App {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription";

    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();

        Producer<String> producerA = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();

        Producer<String> producerB = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();

        Pattern allTopicsPattern = Pattern.compile("my-tenant/new-name/.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();

        producerA.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        producerB.send("=========from topic persistent://my-tenant/new-name/topic-pers-1 ");

        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Logs:

$ mvn exec:java
[INFO] Scanning for projects...
[INFO] 
[INFO] --------------------------------< c:a >---------------------------------
[INFO] Building a 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- exec-maven-plugin:3.2.0:java (default-cli) @ a ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received message from topic non-persistent://my-tenant/new-name/topic-non-1: =========from topic non-persistent://my-tenant/new-name/topic-non-1 
Received message from topic persistent://my-tenant/new-name/topic-pers-1: =========from topic persistent://my-tenant/new-name/topic-pers-1 

visortelle commented on June 22, 2024

@ragaur-tibco please check and let me know if it resolves the issue.

ragaur-tibco commented on June 22, 2024

Hi @visortelle

I tried creating the subscriber before sending the messages:

package Pulsar;

import org.apache.pulsar.client.api.*;

public class AllTopicsConsumerExample {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription-1";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        
        // Pattern allTopicsPattern = Pattern.compile("tenant-1/name/topic.*");

        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern("tenant-1/name/topic.*").subscriptionType(SubscriptionType.Shared)
                .subscriptionName(SUBSCRIPTION_NAME).subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();
        
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://tenant-1/name/topic-1")
                .enableBatching(false).create();
       
        System.out.println("new producer");
        producer.send("=========from topic non-persistent://tenant-1/name/topic-1 ");
    
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://tenant-1/name/topic-8")
                .enableBatching(false).create();
        producer1.send("=======from topic  persistent://tenant-1/name/topic-8 ");
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}

Result: I only receive the message from the persistent topic, not the one from the non-persistent topic.

visortelle commented on June 22, 2024

Interesting.

My observation is that after we create a pattern consumer, for an existing non-persistent topic it doesn't "immediately" create the underlying subscription and consumers if there are no connected producers at this moment. But it will eventually be created after a short time.

TIP: you can display the list of the underlying consumers by casting your consumer to PatternMultiTopicsConsumerImpl and calling the .getConsumers() method.

List<ConsumerImpl<byte[]>> consumers = 
    ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers();

for (ConsumerImpl<byte[]> consumer : consumers) {
    System.out.println("consumer: " + consumer.getTopic());
}

If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time.

for (int i = 0; i < 100000; i++) {
    producer.sendAsync("=========from topic non-persistent://tenant-1/name/topic-1 " + i);
}
...
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10732
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10733
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10734
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10735
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10736
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10737
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10738
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10739
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10740
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10741
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10742
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10743
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10744
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10745
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10746

I don't know if it can qualify as a bug. cc @lhotari

@ragaur-tibco do you have a real use case in mind? I wouldn't rely on non-persistent topics if losing even a small number of messages could affect my application.

visortelle commented on June 22, 2024

Here is the reason: before adding a topic to the topics list, it checks that the topic isActive(), which in turn checks !subscriptions.isEmpty() || hasLocalProducers().
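
To make the effect of that check concrete, here is a small self-contained model of it. It is a sketch rather than broker code: `TopicModel`, its fields, `listActive`, and the topic name `topic-2` are hypothetical stand-ins, and only the `isActive()` condition is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a non-persistent topic; not Pulsar code.
final class TopicModel {
    final String name;
    final List<String> subscriptions = new ArrayList<>();
    int localProducers = 0;

    TopicModel(String name) {
        this.name = name;
    }

    boolean hasLocalProducers() {
        return localProducers > 0;
    }

    // Same condition as quoted in the comment above.
    boolean isActive() {
        return !subscriptions.isEmpty() || hasLocalProducers();
    }

    // Keeps only the names of topics that pass isActive(), mimicking the filtering step.
    static List<String> listActive(List<TopicModel> topics) {
        List<String> names = new ArrayList<>();
        for (TopicModel topic : topics) {
            if (topic.isActive()) {
                names.add(topic.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        TopicModel idle = new TopicModel("non-persistent://tenant-1/name/topic-1");
        TopicModel busy = new TopicModel("non-persistent://tenant-1/name/topic-2");
        busy.localProducers = 1; // a producer is connected to this topic

        System.out.println(listActive(List.of(idle, busy)));
        // prints [non-persistent://tenant-1/name/topic-2]
    }
}
```

In this model, a topic with no subscriptions and no connected producers is left out of the list, which is consistent with the pattern consumer not picking up an idle non-persistent topic right away.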
