
apache / pulsar-client-cpp

Apache Pulsar C++ client library

Home Page: https://pulsar.apache.org/

License: Apache License 2.0

CMake 1.62% Shell 1.75% Python 0.27% Dockerfile 0.40% C++ 92.65% C 3.30%
event-streaming messaging pubsub pulsar queuing streaming

pulsar-client-cpp's Introduction

Pulsar C++ client library

Pulsar C++ clients support a variety of Pulsar features to enable building applications connecting to your Pulsar cluster.

For the supported Pulsar features, see Client Feature Matrix.

For how to use APIs to publish and consume messages, see examples.

Import the library into your project

CMake with vcpkg integration

Navigate to vcpkg-example for how to import the pulsar-client-cpp into your project via vcpkg.

Download pre-built binaries

For non-vcpkg projects, you can download pre-built binaries from the official release page.

Generate the API documents

Pulsar C++ client uses doxygen to build API documents. After installing doxygen, run doxygen to generate the API documents; the main page is at doxygen/html/index.html.

Build with vcpkg

Since it's integrated with vcpkg, see vcpkg#README for the requirements. See LEGACY_BUILD if you want to manage dependencies by yourself or you cannot install vcpkg in your own environment.

How to build from source

The simplest way is to clone this project with the vcpkg submodule.

git clone https://github.com/apache/pulsar-client-cpp.git
cd pulsar-client-cpp
git submodule update --init --recursive
cmake -B build -DINTEGRATE_VCPKG=ON
cmake --build build -j8

The 1st step will download vcpkg and then install all dependencies according to the version description in vcpkg.json. The 2nd step will build the Pulsar C++ libraries under ./build/lib/, where ./build is the CMake build directory.

You can also add the CMAKE_TOOLCHAIN_FILE option if your system already has vcpkg installed.

git clone https://github.com/apache/pulsar-client-cpp.git
cd pulsar-client-cpp
# For example, you can install vcpkg in /tmp/vcpkg
cd /tmp && git clone https://github.com/microsoft/vcpkg.git && cd -
cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_TOOLCHAIN_FILE="/tmp/vcpkg/scripts/buildsystems/vcpkg.cmake"
cmake --build build -j8

After the build, the hierarchy of the build directory will be:

build/
  include/   -- extra C++ headers
  lib/       -- libraries
  tests/     -- test executables
  examples/  -- example executables
  generated/
    lib/     -- protobuf source files for PulsarApi.proto
    tests/   -- protobuf source files for *.proto used in tests

How to install

To install the C++ headers and libraries into a specific path, e.g. /tmp/pulsar, run the following commands:

cmake -B build -DINTEGRATE_VCPKG=ON -DCMAKE_INSTALL_PREFIX=/tmp/pulsar
cmake --build build -j8 --target install

For example, on macOS you will see:

/tmp/pulsar/
  include/pulsar     -- C/C++ headers
  lib/
    libpulsar.a      -- Static library
    libpulsar.dylib  -- Dynamic library

Tests

Tests are built by default. You should execute run-unit-tests.sh to run tests locally.

If you don't want to build the tests, disable the BUILD_TESTS option:

cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_TESTS=OFF
cmake --build build -j8

Build perf tools

If you want to build the perf tools, enable the BUILD_PERF_TOOLS option:

cmake -B build -DINTEGRATE_VCPKG=ON -DBUILD_PERF_TOOLS=ON
cmake --build build -j8

Then the perf tools will be built under ./build/perf/.

Platforms

Pulsar C++ Client Library has been tested on:

  • Linux
  • Mac OS X
  • Windows x64

Wireshark Dissector

See the wireshark directory for details.

Requirements for Contributors

It's required to install LLVM for clang-tidy and clang-format. Pulsar C++ client uses clang-format 11 to format files. make format automatically formats the files.

For Ubuntu users, you can install clang-format-11 via apt install clang-format-11. For other users, run ./build-support/docker-format.sh if you have Docker installed.

We welcome contributions from the open source community. Kindly make sure your changes are backward compatible with GCC 4.8 and Boost 1.53.

If your contribution adds Pulsar features for C++ clients, you need to update both the Pulsar docs and the Client Feature Matrix. See Contribution Guide for more details.

pulsar-client-cpp's People

Contributors

aahmed-se, bewaremypower, candlerb, coderzc, congbobo184, demogorgon314, erobot, gaoran10, hrsakai, ivankelly, jai1, jerrypeng, jiazhai, k2la, licht-t, lovelle, lucperkins, merlimat, michaeljmarshall, rdhabalia, robertindie, saandrews, saosir, shibd, sijie, srkukarni, tuteng, wolfstudy, zhaijack, zymap


pulsar-client-cpp's Issues

[question] python JsonSchema can't handle inheritance

schema

class BaseTaskOperateMsg(Record):
    task_ids = Array(Integer())


class TaskOffline2OnlineMsg(BaseTaskOperateMsg):
    pass

producer

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer(
    topic="taskOffline2Online1",
    schema=JsonSchema(TaskOffline2OnlineMsg),
)
msg = TaskOffline2OnlineMsg(task_ids=[1, 2])
producer.send(msg)
client.close()

consumer

client = pulsar.Client("pulsar://localhost:6650")


consumer = client.subscribe("taskOffline2Online1", "sub1", schema=JsonSchema(TaskOffline2OnlineMsg))

while True:
    msg = consumer.receive()
    try:
        print("Received message '{}' id='{}'".format(msg.data(), msg.message_id()))
        print(msg.value())
        # Acknowledge successful processing of the message
        consumer.acknowledge(msg)
    except Exception:
        # Message failed to be processed
        consumer.negative_acknowledge(msg)
    break

When using TaskOffline2OnlineMsg, I can't get task_ids.

Question: consumer seek by timestamp for C?

Hi, I notice that there is a member function of Consumer

    Result seek(uint64_t timestamp);

at branch-2.5

Question:
Is consumer seek by timestamp supported for C? If not, is there any plan to add this feature?

[Bug] Memory keeps increasing if consumer does not acknowledge

Search before asking

  • I searched in the issues and found nothing similar.

Version

  • OS: Ubuntu 20.04
  • Pulsar: 2.10.2
  • Client: 85b1b53

Minimal reproduce step

Modify the SampleProducer.cc:

#include <pulsar/Client.h>

#include <chrono>
#include <iostream>
#include <thread>
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");

    const auto topic = "my-topic";
    Producer producer;
    if (ResultOk !=
        client.createProducer(
            topic, ProducerConfiguration().setBatchingEnabled(true).setBatchingMaxMessages(2), producer)) {
        return 1;
    }
    Consumer consumer;
    if (ResultOk != client.subscribe(topic, "sub", consumer)) {
        return 2;
    }

    constexpr int numMessages = 1000000;
    std::thread backgroudWorker{[&producer] {
        for (int i = 0; i < numMessages; i++) {
            producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), nullptr);
        }
    }};

    Message msg;
    for (int i = 0; i < numMessages; i++) {
        consumer.receive(msg);
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        if (i % 1000 == 999) {
            std::cout << "received " << msg.getDataAsString() << " from " << msg.getMessageId() << std::endl;
        }
    }

    backgroudWorker.join();
    client.close();
}

Then run ./example/SampleProducer and use the following script to monitor the growth in memory usage.

#!/bin/bash
set -e

LOOP_CNT=0
for i in {1..1000}; do
    PROC_ID=$(ps aux | grep SampleProducer | grep -v grep | awk '{print $2}')
    if [[ $PROC_ID ]]; then
        echo "LOOP_CNT: $LOOP_CNT"
        set -x
        cat /proc/$PROC_ID/status | grep VmData
        set +x
        LOOP_CNT=$((LOOP_CNT+1))
        sleep 1
    else
        echo "No process running SampleProducer is found"
    fi
    sleep 1
done

What did you expect to see?

The memory usage should keep stable.

What did you see instead?

VmData:    35900 kB
VmData:    35908 kB
VmData:    35912 kB
VmData:    36036 kB
...
VmData:    53844 kB

We can see the VmData is increasing.

Anything else?

The root cause is that we use a BatchAcknowledgementTracker to maintain a map from the MessageId (without batch index) to a bit set that represents which messages are already acknowledged in a batch. The entries are only removed in

this->batchAcknowledgementTracker_.deleteAckedMessage(messageId, proto::CommandAck::Cumulative);
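
The growth pattern described above can be illustrated with a minimal, self-contained sketch. This is not the client's actual BatchAcknowledgementTracker: TrackerSketch, BatchKey, onBatchReceived, ackIndividual and size are hypothetical names chosen for illustration. It only assumes what the report states, namely that an entry is added per received batch and erased on the acknowledgment path, so a consumer that never acknowledges keeps every entry alive.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <utility>
#include <vector>

// Hypothetical stand-in for a batch's message id without the batch index:
// (ledger id, entry id).
using BatchKey = std::pair<std::int64_t, std::int64_t>;

class TrackerSketch {
   public:
    // Called when a batch is received: one "not yet acked" bit per message.
    void onBatchReceived(const BatchKey& key, std::size_t batchSize) {
        assert(batchSize > 0);
        pending_.emplace(key, std::vector<bool>(batchSize, true));
    }

    // Clears one bit; the entry is erased only once the whole batch is acked.
    // Returns true if the entry was erased.
    bool ackIndividual(const BatchKey& key, std::size_t batchIndex) {
        auto it = pending_.find(key);
        if (it == pending_.end() || batchIndex >= it->second.size()) {
            return false;
        }
        it->second[batchIndex] = false;
        for (bool stillPending : it->second) {
            if (stillPending) return false;
        }
        pending_.erase(it);
        return true;
    }

    // Number of batches that still occupy memory in the tracker.
    std::size_t size() const { return pending_.size(); }

   private:
    std::map<BatchKey, std::vector<bool>> pending_;
};
```

In this sketch, receiving 1000 batches without calling ackIndividual leaves 1000 entries in the map, which mirrors the steadily increasing VmData in the reproduction.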

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[python client] cannot close consumer after an unsubscribe

Describe the bug
Currently, calling close() after unsubscribe() with the latest python client crashes. The connection with pulsar is not fully released if close() is not called, causing a file descriptor leak.

To Reproduce
Steps to reproduce the behavior:

from pulsar import Client
client = Client(service_url='pulsar://localhost:6650')
consumer = client.subscribe("test_topic", subscription_name="Consumer 1")
consumer.unsubscribe()
consumer.close()
  • The consumer.close() call fails and raises an AlreadyClosed: Pulsar error exception,
  • Several file descriptors are left open because the connection wasn't fully closed.

full log:

2022-06-10 14:40:35.586 INFO  [139972470286144] Client:88 | Subscribing on Topic :test_topic
2022-06-10 14:40:35.586 INFO  [139972470286144] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-06-10 14:40:35.586 INFO  [139972470286144] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-06-10 14:40:35.587 INFO  [139972139529984] ClientConnection:375 | [[::1]:51540 -> [::1]:6650] Connected to broker
2022-06-10 14:40:35.589 INFO  [139972139529984] HandlerBase:64 | [persistent://public/default/test_topic, Consumer 1, 0] Getting connection from pool
2022-06-10 14:40:35.609 INFO  [139972139529984] ConsumerImpl:224 | [persistent://public/default/test_topic, Consumer 1, 0] Created consumer on broker [[::1]:51540 -> [::1]:6650] 
2022-06-10 14:40:35.609 INFO  [139972470286144] ConsumerImpl:275 | [persistent://public/default/test_topic, Consumer 1, 0] Unsubscribing
2022-06-10 14:40:35.612 INFO  [139972139529984] ConsumerImpl:308 | [persistent://public/default/test_topic, Consumer 1, 0] Unsubscribed successfully
---------------------------------------------------------------------------
AlreadyClosed                             Traceback (most recent call last)
Input In [12], in <cell line: 5>()
      3 consumer = client.subscribe("test_topic", subscription_name="Consumer 1")
      4 consumer.unsubscribe()
----> 5 consumer.close()

File ~/.virtualenvs/api_converter/lib/python3.8/site-packages/pulsar/__init__.py:1239, in Consumer.close(self)
   1235 def close(self):
   1236     """
   1237     Close the consumer.
   1238     """
-> 1239     self._consumer.close()
   1240     self._client._consumers.remove(self)

AlreadyClosed: Pulsar error: AlreadyClosed

Expected behavior
We can close the consumer after unsubscribing, releasing the file descriptors. This is the current normal behaviour in the Java client.

Desktop (please complete the following information):

  • OS: Ubuntu 20.04
  • python 3.8
  • pulsar-client 2.10.0

Additional context
This issue follows a discussion with @BewareMyPower in apache/pulsar#14714

Thanks!

Flaky test ProducerTest.testMaxMessageSize

Search before asking

  • I searched in the issues and found nothing similar.

Version

main

Minimal reproduce step

What did you expect to see?

What did you see instead?

[----------] 1 test from Pulsar/ProducerTest
[ RUN      ] Pulsar/ProducerTest.testMaxMessageSize/0
2022-10-07 16:43:40.303 INFO  [139995075075712] Client:88 | Subscribing on Topic :ProducerTest-NoBatchMaxMessageSize-1665161020
2022-10-07 16:43:40.303 INFO  [139995075075712] ClientConnection:191 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-10-07 16:43:40.303 INFO  [139995075075712] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-10-07 16:43:40.306 INFO  [139995071952640] ClientConnection:381 | [[::1]:56366 -> [::1]:6650] Connected to broker
2022-10-07 16:43:40.311 INFO  [139995071952640] HandlerBase:61 | [persistent://public/default/ProducerTest-NoBatchMaxMessageSize-1665161020, sub, 0] Getting connection from pool
2022-10-07 16:43:40.356 WARN  [139995071952640] ClientConnection:1094 | [[::1]:56366 -> [::1]:6650] Received error response from server: ConsumerBusy (Exclusive consumer is already connected) -- req_id: 0
2022-10-07 16:43:40.356 ERROR [139995071952640] ConsumerImpl:267 | [persistent://public/default/ProducerTest-NoBatchMaxMessageSize-1665161020, sub, 0] Failed to create consumer: ConsumerBusy
/home/runner/work/pulsar-client-cpp/pulsar-client-cpp/tests/ProducerTest.cc:225: Failure
Expected equality of these values:
  ResultOk
    Which is: Ok
  client.subscribe(topic, "sub", consumer)
    Which is: ConsumerBusy
2022-10-07 16:43:40.356 INFO  [139995075075712] ClientConnection:1582 | [[::1]:56366 -> [::1]:6650] Connection closed with ConnectError
[  FAILED  ] Pulsar/ProducerTest.testMaxMessageSize/0, where GetParam() = true (55 ms)
[----------] 1 test from Pulsar/ProducerTest (55 ms total)
[----------] Global test environment tear-down
[==========] 1 test from 1 test suite ran. (55 ms total)
[  PASSED  ] 0 tests.
[  FAILED  ] 1 test, listed below:
[  FAILED  ] Pulsar/ProducerTest.testMaxMessageSize/0, where GetParam() = true
 1 FAILED TEST

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Bug] Pulsar client can't have static storage duration

Search before asking

  • I searched in the issues and found nothing similar.

Version

  • pulsar client 2.9.0
  • ubuntu linux

Minimal reproduce step

int main(int argc, char** argv)
{
static ::pulsar::Client client{"pulsar://0.0.0.0:6650"};
// don't call client.close();
return 0;
}

What did you expect to see?

The program should terminate gracefully, instead it seg faults. If I call client.close(), there is no issue.

What did you see instead?

The program seg faults with the following trace:

In gdb:

Program received signal SIGSEGV, Segmentation fault.
0x000000000000001e in ?? ()
(gdb) bt
#0  0x000000000000001e in ?? ()
#1  0x00007fb5adff4bef in ?? () from /lib/libpulsar.so.2.9.2.13
#2  0x00007fb5adff76e4 in ?? () from /lib/libpulsar.so.2.9.2.13
#3  0x00000000002e0873 in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release (this=0xb1e400) at /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/shared_ptr_base.h:155
#4  0x00000000002e082a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count (this=0x51b370 <main::_client+8>) at /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/shared_ptr_base.h:730
#5  0x00000000002e0f29 in std::__shared_ptr<pulsar::ClientImpl, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr (this=0x51b368 <main::_client>) at /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/shared_ptr_base.h:1169
#6  0x00000000002e0f05 in std::shared_ptr<pulsar::ClientImpl>::~shared_ptr (this=0x51b368 <main::_client>) at /usr/bin/../lib/gcc/x86_64-linux-gnu/9/../../../../include/c++/9/bits/shared_ptr.h:103
#7  0x00000000002dfde5 in pulsar::Client::~Client (this=0x51b368 <main::_client>) at /usr/include/pulsar/Client.h:46
#8  0x00007fb5ad93f8a7 in __run_exit_handlers (status=0, listp=0x7fb5adae5718 <__exit_funcs>, run_list_atexit=run_list_atexit@entry=true, run_dtors=run_dtors@entry=true) at exit.c:108
#9  0x00007fb5ad93fa60 in __GI_exit (status=<optimized out>) at exit.c:139
#10 0x00007fb5ad91d08a in __libc_start_main (main=0x2df330 <main(int, char**)>, argc=3, argv=0x7ffd9fae0c18, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7ffd9fae0c08) at ../csu/libc-start.c:342
#11 0x00000000002df26e in _start ()
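
The report notes that calling client.close() before main returns avoids the crash. One way to make that call hard to forget is a small scope guard, sketched below. CloseGuard, FakeClient and closesAtScopeExit are hypothetical names that are not part of the client library; FakeClient only stands in for any type with a close() method so that the sketch compiles on its own.

```cpp
#include <cassert>
#include <utility>

// Hypothetical RAII helper: calls close() on the wrapped object when the
// guard goes out of scope, i.e. before static destructors run at exit.
template <typename Closable>
class CloseGuard {
   public:
    explicit CloseGuard(Closable& target) : target_(&target) {}
    ~CloseGuard() { target_->close(); }
    CloseGuard(const CloseGuard&) = delete;
    CloseGuard& operator=(const CloseGuard&) = delete;

   private:
    Closable* target_;
};

// Stand-in for a client type with a close() method (assumption for this sketch).
struct FakeClient {
    bool closed = false;
    void close() { closed = true; }
};

// Returns true if the guard closed the client when its scope ended.
inline bool closesAtScopeExit() {
    static FakeClient client;  // static storage duration, as in the report
    client.closed = false;
    {
        CloseGuard<FakeClient> guard(client);
        assert(!client.closed);  // still open while the guard is alive
    }
    return client.closed;
}
```

Applied to the reproduction, the equivalent would presumably be a CloseGuard<pulsar::Client> declared in main right after the static client. This works around the symptom only; it does not address why the destructor crashes.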

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Bug] TEST_P related tests are very flaky with parallel tests

Search before asking

  • I searched in the issues and found nothing similar.

Version

master after #54

Minimal reproduce step

Run ShutdownTest in parallel

python3 ./gtest_parallel.py --dump_json_test_results=/tmp/gtest_parallel_results.json \
  --workers=4 --retry_failed=2 -d /tmp \
  ./tests/pulsar-tests --gtest_filter='*ShutdownTest*'

What did you expect to see?

All tests passed.

What did you see instead?

FAILED TESTS (8/6):
     192 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testDestructor/1 (try #1)
     190 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testDestructor/2 (try #1)
     190 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testClose/2 (try #1)
     129 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testDestructor/2 (try #2)
     128 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testClose/2 (try #2)
     134 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testDestructor/1 (try #2)
     106 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testDestructor/2 (try #3)
     117 ms: ./tests/pulsar-tests Pulsar/ShutdownTest.testClose/2 (try #3)

Anything else?

It's because gtest-parallel runs tests in different processes, not threads. So the topic name could be the same even if it has the timestamp suffix. We can see it from the logs:

2022-10-21 10:42:06.749 ERROR [140341161174784] MultiTopicsConsumerImpl:449 | Closing the consumer failed for partition - persistent://public/default/shutdown-test-0-1666320126-partition-0 with error - AlreadyClosed
2022-10-21 10:42:06.849 ERROR [140219768280832] MultiTopicsConsumerImpl:449 | Closing the consumer failed for partition - persistent://public/default/shutdown-test-0-1666320126-partition-1 with error - AlreadyClosed
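
Because the suffix is a timestamp in seconds, two test processes started within the same second can produce the same topic name. A minimal sketch of one possible mitigation is to append a random component. uniqueTopicName is a hypothetical helper, not a function from the test suite, and this is not necessarily the fix that was adopted.

```cpp
#include <cassert>
#include <chrono>
#include <random>
#include <string>

// Hypothetical helper: builds "<prefix>-<unix seconds>-<random 64-bit number>".
inline std::string uniqueTopicName(const std::string& prefix) {
    assert(!prefix.empty());
    const auto now = std::chrono::system_clock::now().time_since_epoch();
    const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(now).count();
    // One engine per thread, seeded from std::random_device. On common
    // platforms the seed differs between processes; that is an assumption,
    // since the standard allows a deterministic random_device.
    thread_local std::mt19937_64 engine(std::random_device{}());
    return prefix + "-" + std::to_string(seconds) + "-" + std::to_string(engine());
}
```

With a helper like this, processes launched by gtest-parallel in the same second would be very unlikely to share a topic such as shutdown-test-0-1666320126.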

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[pulsar-client-cpp] Partition key from producer does not arrive in broker

Describe the bug
When using the Java client producer, the setting of Key in MessageBuilder correctly conveys this metadata to the broker. C++ client consumers can then successfully use Key_Shared subscription on this topic and we're seeing the expected keys-based consumption behavior.

However, when we use a C++ client producer, we do not get a "shared key" consumption behavior over the subscribed consumers.

To Reproduce
Steps to reproduce the behavior:

  1. Initiate a Java client producer. Add a Key to the MessageBuilder.
  2. Send messages using the MessageBuilder
  3. Initiate two C++ consumers using the "Key_Shared" subscription
  4. See key-based distributed messages over the consumers.
  5. Now stop the Java client producer
  6. Start a CPP client producer that sets the same metadata
  7. Now only one of the C++ client consumers retrieves all messages

Expected behavior
We'd expect to see the same "key-shared" message distribution over the consumers with a C++ client producer as with its Java equivalent.

Additional context
We suspect a difference in the protobuf schema between the Java and C++ client producers for setting the partition or ordering key and conveying this metadata to the broker. It seems that the broker does not receive the message metadata from the C++ client producer correctly.

NB: We are using the same version of C++ client and broker (both from Pulsar v2.5.0).

Improvements on building the project

Currently the build time is long and the binary size is large. For example, building the C++ client with the default CMake options on my laptop with 8 threads gives:

$ cmake -B build
$ time cmake --build build -j8
real    4m36.426s
user    31m50.682s
sys     1m24.831s
$ ls -lhS build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/*.o | head -n 10
-rw-r--r-- 1 xyz xyz   25M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/ClientConnection.cc.o
-rw-r--r-- 1 xyz xyz   19M Oct 21 14:08 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/ClientImpl.cc.o
-rw-r--r-- 1 xyz xyz   13M Oct 21 14:08 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/ConsumerImpl.cc.o
-rw-r--r-- 1 xyz xyz   12M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/PatternMultiTopicsConsumerImpl.cc.o
-rw-r--r-- 1 xyz xyz   11M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/MultiTopicsConsumerImpl.cc.o
-rw-r--r-- 1 xyz xyz   11M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/ProducerImpl.cc.o
-rw-r--r-- 1 xyz xyz  7.4M Oct 21 14:08 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/HTTPLookupService.cc.o
-rw-r--r-- 1 xyz xyz  6.6M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/Url.cc.o
-rw-r--r-- 1 xyz xyz  6.5M Oct 21 14:08 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/BinaryProtoLookupService.cc.o
-rw-r--r-- 1 xyz xyz  6.0M Oct 21 14:09 build/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/PartitionedProducerImpl.cc.o

The top 10 largest objects all include PulsarApi.pb.h.

The main cause is that the protobuf-generated files are too large and the header is unnecessarily included in many headers.

$ wc -l generated/lib/PulsarApi.pb.h
29866 generated/lib/PulsarApi.pb.h
$ wc -l generated/lib/PulsarApi.pb.cc
29393 generated/lib/PulsarApi.pb.cc

We should avoid directly including PulsarApi.pb.h as much as possible, since doing so increases the binary size and compilation time.

Another problem is that forward declarations are not commonly used in the project, and this applies to more than just PulsarApi.pb.h. When some header files are modified, many source files need to be recompiled. This really harms the development experience.
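
As a rough illustration of the forward-declaration approach, the single-file sketch below separates what would go in a header from what would go in a source file. Handler and proto::HeavyMessage are hypothetical names standing in for a client class and a protobuf-generated type; they are not taken from the project.

```cpp
#include <cassert>
#include <memory>
#include <string>

// ---- What would live in a header (e.g. a hypothetical Handler.h) ----
// Forward declaration instead of including the large generated header.
namespace proto {
class HeavyMessage;
}

class Handler {
   public:
    Handler();
    ~Handler();  // defined where proto::HeavyMessage is a complete type
    void setName(const std::string& name);
    std::string name() const;

   private:
    // A smart pointer to an incomplete type is fine in the header.
    std::unique_ptr<proto::HeavyMessage> message_;
};

// ---- What would live in the source file (e.g. Handler.cc) ----
// Only here is the full definition needed; in a real project this is where
// the generated header would be included.
namespace proto {
class HeavyMessage {
   public:
    std::string name;
};
}  // namespace proto

Handler::Handler() : message_(new proto::HeavyMessage) {}
Handler::~Handler() = default;

void Handler::setName(const std::string& name) {
    assert(message_ != nullptr);
    message_->name = name;
}

std::string Handler::name() const { return message_->name; }
```

With this layout, files that include only the header part do not need to be recompiled when the generated type changes, and they never parse the roughly 30,000-line generated header.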

Add license headers and related checks

The LICENSE file is currently missing.

Before moving the C++ client from Pulsar main repo here, the license check was performed by mvn license:check. After adding the LICENSE file, we should add a license verification workflow.

[feature request] Support ACK response

Motivation

Catch up with apache/pulsar#8996. The acknowledge and acknowledgeCumulative methods should be synchronous APIs, but they don't wait until the ACK response is received. #8161 added the ACK response on the broker side, available since 2.8.0 and protocol v17.
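
To make the idea of waiting for the ACK response concrete, here is a minimal sketch of the bookkeeping such a synchronous call might need. PendingAcks, AckResult, add and complete are hypothetical names, not the client's implementation: a synchronous acknowledge would register a request id, send the command, and then block on the returned future until the network thread reports the response.

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <map>
#include <mutex>
#include <utility>

enum class AckResult { Ok, Error };

// Hypothetical bookkeeping for in-flight ACK requests keyed by request id.
class PendingAcks {
   public:
    // Registers a request and returns a future that becomes ready when the
    // matching response arrives.
    std::future<AckResult> add(std::uint64_t requestId) {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(pending_.count(requestId) == 0);
        return pending_[requestId].get_future();
    }

    // Called when the response for requestId arrives (in a real client,
    // presumably from the network thread). Returns false if no request
    // with this id is pending.
    bool complete(std::uint64_t requestId, AckResult result) {
        std::promise<AckResult> promise;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = pending_.find(requestId);
            if (it == pending_.end()) return false;
            promise = std::move(it->second);
            pending_.erase(it);
        }
        // Fulfil the promise outside the lock so waiters never contend on it.
        promise.set_value(result);
        return true;
    }

   private:
    std::mutex mutex_;
    std::map<std::uint64_t, std::promise<AckResult>> pending_;
};
```

A caller would hold the future returned by add and call get() on it, which blocks until complete is invoked for the same request id. A real implementation would also need a timeout and handling for connection loss, which this sketch omits.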

Specify the dependency version in the vcpkg

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Currently, we don't specify the version of every dependency in the cpp client: https://github.com/apache/pulsar/blob/374b3a18155cba33c4b4cfdb18c8119487be565e/pulsar-client-cpp/vcpkg.json
This may make packaging behavior unpredictable. We need to specify the dependency versions in vcpkg.

Solution

Specify the dependency version in the vcpkg

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Shared consumers do not receive messages in C++ and acknowledged messages get redelivered

When using shared subscriptions with the C++ client we see the following issues:

  • The consumers stop consuming silently after a period of time, without any obvious reasons (e.g. connection failure) or any unusual log messages. The issue seems to happen randomly. Recreating the consumers with the same parameters seems to fix this issue for a while. We are experiencing this issue both using the synchronous and the async APIs. The topic is a non-partitioned persistent topic. The server is a standalone server. We only see this problem with shared consumers; it doesn't seem to happen if we change to exclusive.

  • Acknowledged messages get redelivered to shared consumers in C++ after subscription. It seems to happen both with synchronous and async acknowledgment. The API docs on acknowledge() state that acknowledged messages should not be redelivered.

To Reproduce
Create shared consumers in C++ for a topic and call receive. The problem seems to happen randomly but relatively frequently. It does not occur with exclusive consumers.

Expected behavior

  • At least one of the shared consumers should receive each message.
  • Acknowledged messages should not be redelivered.

Desktop (please complete the following information):

  • RedHat 7.x

[Feature] Support producers automatically discovering new partitions

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.7.3

Minimal reproduce step

Create a producer and don't close it. When the server adjusts the number of partitions, the producer still writes data to the old partitions.

What did you expect to see?

Without being closed, the producer should write data to the new partitions.

What did you see instead?

The Kafka client picks up new partitions automatically.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Bug] Producer is not cleaned up after being closed

Search before asking

  • I searched in the issues and found nothing similar.

Version

  • OS: Ubuntu 20.04
  • Pulsar: 2.10.1
  • C++ client: master 7f7653b

Minimal reproduce step

#include <pulsar/Client.h>
#include <chrono>
#include <thread>
using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");
    while (true) {
        Producer producer;
        auto result = client.createProducer("my-topic", producer);
        if (result != ResultOk) {
            break;
        }
        producer.close();
        std::this_thread::sleep_for(std::chrono::milliseconds(500));
    }
    client.close();
}

What did you expect to see?

The memory usage should be stable.

What did you see instead?

The program above just creates producers in a loop. However, the memory usage keeps increasing after some time (in my local env, after the 27th producer is created, memory usage increases by 16 KB each time a producer is created and closed).

Here is a simple script to detect memory usage on Linux (assuming the program name is SampleProducer):

#!/bin/bash
set -ex
PROC_ID=$(ps aux | grep "SampleProducer" | grep -v grep | awk '{print $2}')
if [[ $PROC_ID ]]; then
    for i in {1..100}; do
        cat /proc/$PROC_ID/status | grep VmData
        sleep 1
    done
fi


Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Bug] Message::getDataAsString() will crash with Windows debug library

Search before asking

  • I searched in the issues and found nothing similar.

Version

The issue reproduces on the master branch, but you can also use the x64-windows-static-Debug release artifact.

Uncompress it to a directory, e.g. D:\pulsar-cpp-debug.

D:\pulsar-cpp-debug
    bin/pulsar.dll
    lib/pulsar.lib
    include/

Minimal reproduce step

CMakeLists.txt

cmake_minimum_required(VERSION 3.15)

project("PulsarApp" CXX)

set(PULSAR_ROOT "D:\\pulsar-cpp-debug")
add_executable(PulsarApp main.cc)
include_directories("${PULSAR_ROOT}/include")
target_link_libraries(PulsarApp PRIVATE "${PULSAR_ROOT}/lib/pulsar.lib")
set_property(TARGET PulsarApp PROPERTY MSVC_RUNTIME_LIBRARY "MultiThreaded$<$<CONFIG:Debug>:Debug>DLL")

main.cc

#include <pulsar/Client.h>

#include <iostream>
using namespace pulsar;

int main() {
  auto msg = MessageBuilder().setContent("hello").build();
  std::cout << msg.getDataAsString() << std::endl;
  return 0;
}

Then run the following commands in PowerShell

cmake -B build
cmake --build build --config Debug
cp D:\pulsar-cpp-debug\bin\pulsar.dll .
.\build\Debug\PulsarApp.exe

What did you expect to see?

"hello" is printed.

What did you see instead?

(Screenshot not preserved.)

Anything else?

It appears to be caused by the combination of static linking of 3rd party dependencies (LINK_STATIC=ON) and the std::string API compatibility.

For example, the following example works well.

#include <assert.h>
#include <pulsar/Client.h>

#include <iostream>
using namespace pulsar;

int main() {
  auto topic = "topic";
  Client client("pulsar://172.24.101.226:6650");
  Producer producer;
  assert(ResultOk == client.createProducer(topic, producer));
  Consumer consumer;
  assert(ResultOk == client.subscribe(topic, "sub", consumer));
  producer.send(MessageBuilder().setContent("msg").build());
  Message msg;
  assert(ResultOk == consumer.receive(msg));
  // NOTE: it's the same with the `getDataAsString()` implementation
  std::cout << std::string(static_cast<const char*>(msg.getData()), msg.getLength()) << std::endl;
  client.close();
}

Are you willing to submit a PR?

  • I'm willing to submit a PR!

ConnectError with Python Client when importing other CPP library first

Hello,

I have an issue with the pyproj library using Python>=3.9.

When I import pyproj first, I have the exact same issue with Pulsar.
If I import pulsar first, I have a bug in pyproj when I try to do this for example:

import pulsar
import pyproj

pyproj.Transformer.from_crs('epsg:4326', 'epsg:3035')
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.9/site-packages/pyproj/transformer.py", line 600, in from_crs
    cstrencode(CRS.from_user_input(crs_from).srs),
  File "/usr/local/lib/python3.9/site-packages/pyproj/crs/crs.py", line 501, in from_user_input
    return cls(value, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/pyproj/crs/crs.py", line 348, in __init__
    self._local.crs = _CRS(self.srs)
  File "pyproj/_crs.pyx", line 2352, in pyproj._crs._CRS.__init__
pyproj.exceptions.CRSError: Invalid projection: epsg:4326: (Internal Proj Error: proj_create: cannot build geodeticCRS 4326: cannot build unit of measure 9122: non double value)

So there seems to be some kind of conflict between cpp libraries on python 3.9+, since pyproj is also a wrapper for the PROJ cpp library.
I do not need to install any version of grpcio for this to happen though.

Thank you for looking into this because we are stuck at python 3.8 for now...

Also posted in pyproj here

Originally posted by @laurent-chriqui in apache/pulsar-client-python#53

Attempting to connect a consumer to a very recently created topic/subscription sometimes fails with UnknownError on the client

Describe the bug
If I create a topic/subscription via the management API and then immediately attempt to connect a consumer to that subscription via the C++ client, the subscribe call sometimes (rarely; less than half of the time) fails with UnknownError.

This error does not coincide with any obvious issues in the broker or proxy log; sorry for the light detail on this bug.

To Reproduce
Run reproduction plan for apache/pulsar#12551; sometimes no error will occur, sometimes the error described in that issue or others will occur, and sometimes this error will occur during the last step (connecting a consumer).

Expected behavior

  1. Consumer connection either succeeds or fails with an informative error indicating what action is needed to correct this condition.
  2. This is probably more important: interactions with Pulsar (e.g. creating consumers) that happen a very short time after the entities being interacted with (topics/subscriptions in this case) are created should succeed. This bug and the other similar ones I filed (see github links below) all seem to arise from CRUD operations with the management API being asynchronous: i.e. when I create/delete a tenant/topic/namespace, the actual side effects of that creation or deletion (e.g. adding/removing ledgers in BookKeeper, updating metadata in ZK) occur later, not during the API post. Not only is that bound to cause bugs like this, but it's also not what users expect; I would be happy to wait seconds or minutes for management API operations to complete in exchange for knowing that, when they successfully complete, the thing I requested has actually been done.

Client logs

[persistent://blt6/chariot_ns_test/chariot_topic_test-partition-1, blt, 1] Failed to create consumer: UnknownError
Closing the consumer failed for partition - 1
Unable to create Consumer for partition - 1 Error - UnknownError
Closing the consumer failed for partition - 0
[persistent://blt6/chariot_ns_test/chariot_topic_test-partition-3, blt, 3] Failed to close consumer: ConnectError

This error is surfaced in Python (client version 2.8.1, on EKS amazon linux) as UnknownError.

[Bug] The Windows artifacts build from GA is incompatible with MSVC 19.29

Search before asking

  • I searched in the issues and found nothing similar.

Version

Microsoft Visual Studio Community 2019
Version 16.11.19

See the compiler info from the CMake outputs:

-- Building for: Visual Studio 16 2019
-- Selecting Windows SDK version 10.0.19041.0 to target Windows 10.0.19044.
-- The C compiler identification is MSVC 19.29.30146.0
-- The CXX compiler identification is MSVC 19.29.30146.0

Minimal reproduce step

There are some artifacts in my own fork that are built from the workflow added in #73.

https://github.com/BewareMyPower/pulsar-client-cpp/releases/tag/v3.1.0-rc-20221028

Download x64-windows-static.zip and uncompress it to a directory, e.g. D:\pulsar-cpp.

CMakeLists.txt:

cmake_minimum_required(VERSION 3.15)

project(pulsar-cpp)

set(CMAKE_PREFIX_PATH "D:\\pulsar-cpp")
find_path(PULSAR_INCLUDE_DIR NAMES "pulsar/Client.h")
find_library(PULSAR_LIBRARY NAMES "pulsarWithDeps.lib")
message(STATUS "PULSAR_INCLUDE_DIR: " ${PULSAR_INCLUDE_DIR})
message(STATUS "PULSAR_LIBRARY: " ${PULSAR_LIBRARY})

add_executable(PulsarApp PulsarApp.cc)
set_target_properties(PulsarApp PROPERTIES MSVC_RUNTIME_LIBRARY MultiThreaded)
target_compile_options(PulsarApp PRIVATE "-DPULSAR_STATIC")
target_include_directories(PulsarApp PRIVATE ${PULSAR_INCLUDE_DIR})
target_link_libraries(PulsarApp PRIVATE ${PULSAR_LIBRARY})

PulsarApp.cc:

#include <pulsar/Client.h>

int main() {
  pulsar::Client client("pulsar://localhost:6650");
  client.close();
}

Run the following commands:

cmake -B build
cmake --build build --config Release

What did you expect to see?

The build should succeed.

What did you see instead?

pulsarWithDeps.lib(PatternMultiTopicsConsumerImpl.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D
:\VS2019\pulsar-cpp-demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(Url.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D:\VS2019\pulsar-cpp-demo\bu
ild\PulsarApp.vcxproj]
pulsarWithDeps.lib(AuthOauth2.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D:\VS2019\pulsar-cpp-
demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(ZTSClient.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D:\VS2019\pulsar-cpp-d
emo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(Schema.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D:\VS2019\pulsar-cpp-demo
\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(HTTPLookupService.obj) : error LNK2001: unresolved external symbol __std_find_trivial_1 [D:\VS2019\puls
ar-cpp-demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(AuthOauth2.obj) : error LNK2019: unresolved external symbol __std_init_once_link_alternate_names_and_ab
ort referenced in function "void __cdecl std::call_once<void (__cdecl pulsar::ClientCredentialFlow::*)(void),class pulsar:
:ClientCredentialFlow *>(struct std::once_flag &,void (__cdecl pulsar::ClientCredentialFlow::*&&)(void),class pulsar::Clie
ntCredentialFlow * &&)" (??$call_once@P8ClientCredentialFlow@pulsar@@EAAXXZPEAV12@@std@@YAXAEAUonce_flag@0@$$QEAP8ClientCr
edentialFlow@pulsar@@EAAXXZ$$QEAPEAV23@@Z) [D:\VS2019\pulsar-cpp-demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(message_lite.cc.obj) : error LNK2001: unresolved external symbol __std_init_once_link_alternate_names_a
nd_abort [D:\VS2019\pulsar-cpp-demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(arenastring.cc.obj) : error LNK2001: unresolved external symbol __std_init_once_link_alternate_names_an
d_abort [D:\VS2019\pulsar-cpp-demo\build\PulsarApp.vcxproj]
pulsarWithDeps.lib(crc32c_sw.obj) : error LNK2001: unresolved external symbol __std_init_once_link_alternate_names_and_abo
rt [D:\VS2019\pulsar-cpp-demo\build\PulsarApp.vcxproj]
D:\VS2019\pulsar-cpp-demo\build\Release\PulsarApp.exe : fatal error LNK1120: 2 unresolved externals [D:\VS2019\pulsar-cpp-
demo\build\PulsarApp.vcxproj]

Anything else?

The artifacts were built on Windows Server 2022.

From here we can see that the version of the MSVC 2022 runtime is 14.32.31332, which might be incompatible with MSVC 19.29.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Improve] Show Invalid Token Error Message

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.9.3.11+

Minimal reproduce step

Pass an invalid token and try to create a producer.

What did you expect to see?

User error messages that specify the input token is invalid.

Error Checking/Getting Partition Metadata while creating producer on {topicName} -- InvalidToken

What did you see instead?

Error Checking/Getting Partition Metadata while creating producer on {topicName} -- ConnectError

Anything else?

Similar stack-overflow questions:

https://stackoverflow.com/questions/62227305/apache-pulsar-error-checking-getting-partition-metadata-while-subscribing

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[feature request] C++ client support producer access mode.

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The current Java client supports four access modes: Shared, Exclusive, ExclusiveWithFencing, and WaitForExclusive.

The C++ and Python clients also need to support them.

This issue tracks the overall work; specific tasks can be split out from it.

Solution

Refer to the PIPs:

  1. PIP 68: Exclusive Producer
  2. PIP 161: ExclusiveWithFencing
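To make the four modes concrete, here is a toy, self-contained model of how a topic might arbitrate between producers. It is only a sketch of the semantics as I understand them from the Pulsar documentation: `TopicModel`, `AttachResult` and every other name are hypothetical and are not part of the C++ client or the broker. The model also ignores disconnects, so a waiting producer is never promoted.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical model types; none of these names come from the Pulsar client API.
enum class AccessMode { Shared, Exclusive, ExclusiveWithFencing, WaitForExclusive };
enum class AttachResult { Connected, Rejected, Pending };

class TopicModel {
public:
    AttachResult attach(const std::string& name, AccessMode mode) {
        switch (mode) {
            case AccessMode::Shared:
                // Shared producers coexist, but not with an exclusive owner.
                if (exclusiveOwner_) return AttachResult::Rejected;
                active_.push_back(name);
                return AttachResult::Connected;
            case AccessMode::Exclusive:
                // Fails immediately if anyone is already connected.
                if (!active_.empty()) return AttachResult::Rejected;
                takeOver(name);
                return AttachResult::Connected;
            case AccessMode::ExclusiveWithFencing:
                // Existing producers are removed and the newcomer takes over.
                takeOver(name);
                return AttachResult::Connected;
            case AccessMode::WaitForExclusive:
                // Creation stays pending until exclusive access is available.
                if (active_.empty()) {
                    takeOver(name);
                    return AttachResult::Connected;
                }
                waiting_.push_back(name);
                return AttachResult::Pending;
        }
        return AttachResult::Rejected;  // not reached; keeps compilers quiet
    }

    const std::vector<std::string>& active() const { return active_; }
    std::size_t waitingCount() const { return waiting_.size(); }

private:
    // Replaces whoever is connected with a single exclusive owner.
    void takeOver(const std::string& name) {
        active_.assign(1, name);
        exclusiveOwner_ = true;
    }

    std::vector<std::string> active_;
    std::vector<std::string> waiting_;
    bool exclusiveOwner_ = false;
};
```

With an Exclusive producer attached, a later Shared or Exclusive attach is rejected, a WaitForExclusive attach stays pending, and an ExclusiveWithFencing attach replaces the current owner.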

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[feature request] C++ and Python client support consumer memory limits.

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

PIP-74

C++ client consumers have not implemented memory limits yet.

Solution

There are several key points to achieving this feature.

  1. Reconstructing the consumer structure.
  2. Refactoring the transport type between the single-topic consumer and MultiTopicConsumer.
    • In the current C++ implementation, messages are delivered to multi-topic consumers through a blocking queue plus a listener, which is a legacy design. To support memory limits, the C++ client needs to do what the current Java implementation does and let MultiTopicConsumer actively control message flow.

Alternatives

Here's an imperfect draft: apache/pulsar#16933

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Producer#flush doesn't respect non-batched messages

Search before asking

  • I searched in the issues and found nothing similar.

Version

main( ba1a7e1 )

Minimal reproduce step

  1. Create a producer with batchingEnabled=false
  2. Send some messages asynchronously
  3. Call Producer#flush or Producer#flushAsync
  4. (Close a producer)
  • to get AlreadyClosed exception
    // ensure any remaining send callbacks are called before calling the close callback
    failPendingMessages(ResultAlreadyClosed, false);

What did you expect to see?

When Producer#flush or Producer#flushAsync is called, it should wait until all pending messages have been sent, even when the messages are not batched.

What did you see instead?

Producer#flush and Producer#flushAsync don't guarantee the completion of sending non-batched messages.

Anything else?

Currently, ProducerImpl#flushAsync does nothing if batchMessageContainer_ is not initialized.

} else {
callback(ResultOk);
}

The Java client waits for the last sent messages to complete. The C++ client should be implemented the same way.
apache/pulsar#2103
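One possible shape for this, as a self-contained sketch rather than a patch: track the sequence number of the last message handed to the connection and of the last one acknowledged, and complete a flush only once everything sent before it has been acknowledged. `PendingSendTracker` and its methods are hypothetical names, not ProducerImpl members, and the sketch assumes acknowledgements arrive in send order.

```cpp
#include <functional>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper; it models the bookkeeping only, not ProducerImpl itself.
class PendingSendTracker {
public:
    using FlushCallback = std::function<void()>;

    // Called when a message is handed to the connection; returns its sequence id.
    long onSend() {
        std::lock_guard<std::mutex> lock(mutex_);
        return ++lastSent_;
    }

    // Called when the broker acknowledges `seq` (assumed to arrive in order).
    void onAck(long seq) {
        std::vector<FlushCallback> ready;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (seq > lastAcked_) lastAcked_ = seq;
            auto it = waiters_.begin();
            while (it != waiters_.end() && it->first <= lastAcked_) {
                ready.push_back(std::move(it->second));
                it = waiters_.erase(it);
            }
        }
        // Run callbacks outside the lock so they may call back into the tracker.
        for (auto& callback : ready) callback();
    }

    // Completes once every message sent before this call has been acknowledged.
    void flushAsync(FlushCallback callback) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (lastAcked_ < lastSent_) {
                waiters_.emplace(lastSent_, std::move(callback));
                return;
            }
        }
        callback();  // nothing in flight, complete immediately
    }

private:
    std::mutex mutex_;
    long lastSent_ = 0;
    long lastAcked_ = 0;
    std::multimap<long, FlushCallback> waiters_;
};
```

Messages sent after the flush call do not delay it, because each waiter is keyed by the last sequence id that existed when flushAsync was called.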

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[C++ client] Deadletter queue functionality

Is anyone working on porting DLQ from the Java client to Cpp apache/pulsar#2508? I'd like to help/contribute.

I'm thinking of the following API:

Client client("pulsar://localhost:6650");

Consumer consumer;
ConsumerConfiguration config;
config.setMaxRedeliverCount(10);
config.setDeadLetterTopic("my-dlq-topic");

Result result = client.subscribe("persistent://prop/r1/ns1/my-topic", "consumer-1", config, consumer);

Key-based batching and hashingScheme parameter for Python client

Short intro
We're running a platform that's effectively just a bunch of microservices written in Python, Java and Go which are communicating through Kafka. Currently we're considering moving from Kafka to Pulsar.
However, one of the main features we need is key-based message routing for solving concurrency problems. Pulsar has that option with KeyShared subscription mode.

The problem
In Python, a KeyShared subscription works only if the producer disables batching. I ran some tests in my local environment, and disabling batching results in a large drop in throughput (~25k/s with batching vs ~9k/s without).

Features that would've solved the problem

  • Key-based batching for Python client
  • HashingScheme parameter when creating producer so all of our services (Java, Python and Go) could have the same hashing scheme
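For readers unfamiliar with the first request, the idea behind key-based batching can be sketched as follows: instead of filling one batch with whatever messages arrive, the producer keeps one batch per key, so a batch never mixes keys and can be routed as a unit. This is a toy illustration under that assumption; `KeyedMessage` and `batchByKey` are made-up names and not the client's batcher.

```cpp
#include <map>
#include <string>
#include <vector>

// Hypothetical message type: only the ordering key and the payload matter here.
struct KeyedMessage {
    std::string key;
    std::string payload;
};

// Groups payloads so that every batch holds messages of exactly one key,
// preserving the send order within each key.
std::map<std::string, std::vector<std::string>> batchByKey(
    const std::vector<KeyedMessage>& messages) {
    std::map<std::string, std::vector<std::string>> batches;
    for (const auto& message : messages) {
        batches[message.key].push_back(message.payload);
    }
    return batches;
}
```

A real batcher would also flush each per-key batch on size and time limits; the sketch leaves that out.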

[pulsar-client-cpp] Excessive locking cause significant performance degradation

Describe the bug
The statistics implementation in the C++ client has two concurrency issues.

  1. The ProducerStatsImpl (and ConsumerStatsImpl) classes use a single shared lock to protect access to internal data. The lock is taken on each sent or received message. Under high load this shared lock causes significant contention and performance degradation.
    The profiler shows that sending and receiving threads block each other.

original-profiling

Since the sending and receiving functions access different subsets of members, they should be protected by different mutexes, or another approach should be chosen.
As an example, after patching this issue I got about a 1/3 throughput improvement. As you can see in the screenshot below, threads are waiting on I/O rather than on mutexes.
patched-profiling

  2. The ProducerStatsImpl implementation has a race between the destructor and the DeadlineTimer callback. Consider the following scenario:

    1. The ProducerStatsImpl destructor acquires the mutex
    2. DeadlineTimer calls the callback flushAndReset, which blocks on the mutex
    3. ProducerStatsImpl calls timer.cancel, which cancels any pending operation but cannot cancel the callback already executing at step 2
    4. The ProducerStatsImpl destructor releases the mutex
    5. The DeadlineTimer callback acquires the mutex
    6. The ProducerStatsImpl destructor destroys the object
    7. The DeadlineTimer callback accesses deallocated memory

Are you willing to accept a PR for issue number one, or for both?
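For the second issue, one common way to rule out the use-after-free is to let the timer callback hold only a weak reference to the stats object, so a callback that fires after destruction becomes a no-op. The sketch below is self-contained and uses made-up names (`StatsModel`, `makeTimerCallback`); it is not the actual ProducerStatsImpl code and assumes the object is owned by a `std::shared_ptr`.

```cpp
#include <functional>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a stats object that is flushed by a periodic timer.
class StatsModel : public std::enable_shared_from_this<StatsModel> {
public:
    void recordSend() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++numMsgsSent_;
    }

    unsigned long sentSinceLastFlush() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return numMsgsSent_;
    }

    // Builds the callback a timer would invoke. It captures a weak_ptr, so it
    // never touches the object once the last owner has released it.
    std::function<bool()> makeTimerCallback() {
        std::weak_ptr<StatsModel> weakSelf = shared_from_this();
        return [weakSelf]() -> bool {
            std::shared_ptr<StatsModel> self = weakSelf.lock();
            if (!self) {
                return false;  // owner is gone, skip the flush
            }
            self->flushAndReset();
            return true;
        };
    }

private:
    void flushAndReset() {
        std::lock_guard<std::mutex> lock(mutex_);
        numMsgsSent_ = 0;
    }

    mutable std::mutex mutex_;
    unsigned long numMsgsSent_ = 0;
};
```

While the callback runs, the locked `shared_ptr` keeps the object alive, so the destructor cannot run in the middle of flushAndReset.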

disableReplication in MessageBuilder does not work in PulsarClient C++ and Java

Describe the bug
Data is still replicated when disableReplication is enabled.

To Reproduce
Steps to reproduce the behavior:

  1. Set up geo-replication with 3 clusters (A, B, C)
  2. Set disableReplication = true in MessageBuilder
Message msg = MessageBuilder().setContent("repl").disableReplication(true).build();
  3. Create a producer connected to cluster C and send Message msg
producer.sendAsync(msg, notifySendMessage);

Actual consumer output:

  1. Cluster A
Received message: repl
  2. Cluster A
Received message: repl
  3. Cluster C
Received message: repl

Expected output:

  1. Cluster A
  2. Cluster A
  3. Cluster C
Received message: repl

Pulsar version: 2.3.0

[Build] Can't use it as a submodule

Search before asking

  • I searched in the issues and found nothing similar.

Version

Ubuntu 22.04, cmake 3.22, Pulsar Repo: master

Minimal reproduce step

files structure:
project_dir
-> source/main.cpp
-> thirdparty/pulsar-client (<- this is submodule 'git submodule add https://github.com/apache/pulsar-client-cpp.git thirdparty/pulsar-client')
-> CMakeLists.txt

CMakeLists.txt:
cmake_minimum_required(VERSION 3.14)

project(
  ppp
  VERSION 0.1.0
  DESCRIPTION "Short description"
  HOMEPAGE_URL "https://example.com/"
  LANGUAGES CXX
)

set(BUILD_STATIC_LIB ON CACHE BOOL "" FORCE)
set(BUILD_DYNAMIC_LIB OFF CACHE BOOL "" FORCE)
set(BUILD_TESTS OFF CACHE BOOL "" FORCE)
add_subdirectory(thirdparty/pulsar-client)

add_executable(ppp_exe source/main.cpp)
set_property(TARGET ppp_exe PROPERTY OUTPUT_NAME ppp)
target_compile_features(ppp_exe PRIVATE cxx_std_17)

target_link_libraries(ppp_exe PRIVATE pulsarStatic)

What did you expect to see?

Compiles and links without error

What did you see instead?

The minimal example does not compile; it fails with errors like:

In file included from /p/thirdparty/pulsar-client/lib/Backoff.cc:19:
/p/thirdparty/pulsar-client/lib/Backoff.h:21:10: fatal error: pulsar/defines.h: No such file or directory
21 | #include <pulsar/defines.h>

and so on - all include paths for pulsar client are wrong.

Verbose output:
usr/bin/c++ -DBOOST_ALLOW_DEPRECATED_HEADERS -DBOOST_ALL_NO_LIB -DBUILDING_PULSAR -DHAS_SNAPPY=0 -DHAS_ZSTD=0 -DLOG_CATEGORY_NAME="pulsar." -I/p -I/p/include -I/_build/p/include -I/_build/p/generated -I/_build/p/generated/lib -g -fPIC -Wall -Wformat-security -Wvla -Werror -Wno-sign-compare -Wno-deprecated-declarations -Wno-error=cpp -msse4.2 -mpclmul -Wno-stringop-truncation -fdiagnostics-show-option -fdiagnostics-color -fvisibility=hidden -Wl,--exclude-libs,ALL -std=gnu++11 -MD -MT thirdparty/pulsar-client/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/Backoff.cc.o -MF thirdparty/pulsar-client/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/Backoff.cc.o.d -o thirdparty/pulsar-client/lib/CMakeFiles/PULSAR_OBJECT_LIB.dir/Backoff.cc.o -c /p/thirdparty/pulsar-client/lib/Backoff.cc

Anything else?

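I looked at the main CMakeLists.txt in pulsar-client-cpp and I think there is an error: it should use PROJECT_<SOURCE/BINARY>_DIR instead of CMAKE_<SOURCE/BINARY>_DIR, because the CMAKE_ variants point at the top-level project when the client is added via add_subdirectory. With that change everything compiles and links.

The CMake files for the tests also use CMAKE_SOURCE_DIR to locate some include directories.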

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[C++] pulsar-client-cpp stability enhancements and code cleanup

Howdy! My company is investing heavily into event driven workflows, and we're utilizing Pulsar to accomplish this end. We're primarily a C++ shop, and I'm a Principal Engineer and in charge of the integration of Pulsar into our system, and experienced in C++. I've made a few bug fixes to the C++ client already, and in doing so I've noticed that it could use a bit of love to bring greater consistency to the design and fix up some low-hanging fruit. We've integrated Pulsar into our own build system, and it required a lot of changes to pass our build (which uses many more warnings and runs tests with ASAN). We've found that enforcing a more strict set of warnings helps to keep a c++ codebase healthy and stable, and I'd like to upstream these changes for the benefit of everyone, and to make continued maintenance easier.

Would anyone be opposed to me undertaking some general cleanup of the C++ client?

My proposed TODO list:

  • Enable many more warnings (-Wall, -Wvla, -Wformat-security, etc)
  • Enforce warnings as errors to prevent changes being submitted with warnings
  • Fix all the subsequent warnings that will be generated
  • Fix compatibility with boost 1.41 (the documentation claims it should be compatible, but it's not, and that happens to be the version we're stuck on).
  • Use consistent format for #includes of local files (some have a /lib prefix, others don't)
  • Run the tests via ASAN or Valgrind, if available (not sure how easy this will be yet...)
  • Update syntax to take advantage of more C++11 features (like std::make_shared, move semantics, etc)

I'll obviously split this stuff up into a series of smaller changes. I'm hopeful that by doing this I'll gain more experience working with Pulsar and could potentially contribute more significant improvements in the future, i.e. performance enhancements and helping to keep feature parity between the C++ and Java clients.

Thanks!

[Improve] Add interfaces in `class Message` to avoid unnecessary memory copy.

Motivation

Currently class Message provides two groups of functions to access the message's payload:

  1. Message::getData and Message::getLength.
  2. getDataAsString: this is a wrapper of the two above.

The second one constructs a std::string, which involves an expensive memory copy. Class Message could instead provide a new function that gives ownership of the payload to the caller, like this:

// In SharedBuffer.h add new function:
bool SharedBuffer::pop(std::string &target) {
    // Swap *data_ into target, then reset ptr_, readIdx_ etc.
}

// Message.cc
void Message::moveDataIntoString(std::string &data) {
    // Move payload into data via SharedBuffer::pop.
}

In high-throughput scenarios, these new functions could bring a significant performance improvement when consuming data.
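The difference between copying and handing over the payload can be shown with a small self-contained model. `PayloadBuffer` below is a hypothetical stand-in that simply owns a `std::string`; the real SharedBuffer is laid out differently, so this only illustrates the intended ownership transfer, not the actual implementation.

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical buffer that owns its payload as a std::string.
class PayloadBuffer {
public:
    explicit PayloadBuffer(std::string data) : data_(std::move(data)) {}

    // Copying accessor, comparable in cost to getDataAsString().
    std::string copyAsString() const { return data_; }

    // Moves the payload into `target` without copying the bytes.
    // Returns false and leaves `target` untouched if there is nothing to hand over.
    bool pop(std::string& target) {
        if (data_.empty()) {
            return false;
        }
        target = std::move(data_);
        data_.clear();  // a moved-from string is unspecified, so reset it explicitly
        return true;
    }

    std::size_t size() const { return data_.size(); }

private:
    std::string data_;
};
```

After a successful pop the buffer is empty, so a message could hand over its payload only once; callers that still need the data afterwards would have to keep using the copying accessor.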

[Bug] Consumer acked the wrong message when pending chunked messages exceed maxPendingChunkMessages

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) {
    if (toString(GetParam()) != "None") {
        return;
    }
    const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) +
                              std::to_string(time(nullptr));
    Consumer consumer;
    ConsumerConfiguration consumerConf;
    consumerConf.setMaxPendingChunkedMessage(1);
    consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
    createConsumer(topic, consumer, consumerConf);
    Producer producer;
    createProducer(topic, producer);

    auto msg = MessageBuilder().setContent("chunk-0").build();
    auto& metadata = PulsarFriend::getMessageMetadata(msg);
    metadata.set_num_chunks_from_msg(2);
    metadata.set_chunk_id(0);
    metadata.set_uuid("0");
    metadata.set_total_chunk_msg_size(100);

    producer.send(msg);

    auto msg2 = MessageBuilder().setContent("chunk-1").build();
    auto& metadata2 = PulsarFriend::getMessageMetadata(msg2);
    metadata2.set_num_chunks_from_msg(2);
    metadata2.set_uuid("1");
    metadata2.set_chunk_id(0);
    metadata2.set_total_chunk_msg_size(100);

    producer.send(msg2);

    auto msg3 = MessageBuilder().setContent("chunk-1").build();
    auto& metadata3 = PulsarFriend::getMessageMetadata(msg3);
    metadata3.set_num_chunks_from_msg(2);
    metadata3.set_uuid("1");
    metadata3.set_chunk_id(1);
    metadata3.set_total_chunk_msg_size(100);

    producer.send(msg3);

    Message receivedMsg;
    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000));
    ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1chunk-1");

    consumer.redeliverUnacknowledgedMessages();

    Message receivedMsg2;
    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000));
    ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1chunk-1"); // Should pass, but failed

    producer.close();
    consumer.close();
}

The test should work.

What did you expect to see?

The test should pass.

What did you see instead?

The consumer acks the last message when the pending chunked messages exceed maxPendingChunkMessages.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Flaky tests: ShutdownTest.testDestructor

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

Run the following script in CMake build directory. (Download

#!/bin/bash
set -e
cd `dirname $0`

curl -L -o gtest_parallel.py https://raw.githubusercontent.com/google/gtest-parallel/master/gtest_parallel.py
for i in {1..20}; do
    echo "TEST $i"
    python3 ./gtest_parallel.py --dump_json_test_results=/tmp/gtest_parallel_results.json   --workers=4 --retry_failed=2 -d /tmp   ./tests/pulsar-tests --gtest_filter='*ShutdownTest*'
done

What did you expect to see?

It should never fail.

What did you see instead?

At least 1 failure happened.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Enhancement] Do not use custom written Optional<T> class

Search before asking

  • I searched in the issues and found nothing similar.

Version

repo/master

Minimal reproduce step

template <typename T> class Optional;

What did you expect to see?

The optional class from Boost or the standard library is used.

What did you see instead?

A custom-written Optional class is used.

Anything else?

At the moment pulsar-client-cpp makes heavy use of Boost and the standard library. I suggest using boost::optional instead of the custom Optional class.
boost::optional is well tested and has an API that is mostly similar to std::optional.
In this issue I do not suggest raising the C++ standard from C++11 to C++17, where std::optional is available, but that can easily be done in the future.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

[Bug] create producer and close producer in send callback will dead lock

Search before asking

  • I searched in the issues and found nothing similar.

Version

2.10.1

Minimal reproduce step

  1. Create a producer
  2. Call pulsar_producer_send_async
  3. Close the producer inside pulsar_send_callback; the thread then deadlocks

What did you expect to see?

The thread deadlocks:

Thread 3 (Thread 0x7f3f45341700 (LWP 30627)):
#0  0x00007f4548f63a35 in pthread_cond_wait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0
#1  0x00007f454943baec in std::condition_variable::wait(std::unique_lock<std::mutex>&) () from /lib64/libstdc++.so.6
#2  0x0000000000c5f423 in pulsar::Future<bool, pulsar::Result>::get (this=this@entry=0x7f3f4533d700, result=@0x7f3f4533d6f0: pulsar::ResultOk) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/Future.h:70
#3  0x0000000000cbc438 in pulsar::Producer::close (this=0xd0f6ef0) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/Producer.cc:82
#4  0x0000000000b2b2de in PulsarProducerObject::~PulsarProducerObject (this=0xd12c810, __in_chrg=<optimized out>) at source/gse-data/server_data/exporter/pulsar/pulsar_producer.h:74
#5  0x0000000000b2a296 in PulsarProducer::handleSendCallback (result=pulsar_result_Ok, msgId=0xd0f79a0, ctx=0xd12c810) at source/gse-data/server_data/exporter/pulsar/pulsar_producer.cpp:169
#6  0x0000000000c2586d in __call<void, pulsar::Result&&, const pulsar::MessageId&, 0ul, 1ul, 2ul, 3ul> (__args=<optimized out>, this=<optimized out>) at /usr/include/c++/4.8.2/functional:1296
#7  operator()<pulsar::Result, const pulsar::MessageId&, void> (this=<optimized out>) at /usr/include/c++/4.8.2/functional:1355
#8  std::_Function_handler<void (pulsar::Result, pulsar::MessageId const&), std::_Bind<void (*(std::_Placeholder<1>, std::_Placeholder<2>, void (*)(pulsar_result, _pulsar_message_id*, void*), void*))(pulsar::Result, pulsar::MessageId, void (*)(pulsar_result, _pulsar_message_id*, void*), void*)> >::_M_invoke(std::_Any_data const&, pulsar::Result, pulsar::MessageId const&) (__functor=..., __args#0=<optimized out>, __args#1=...) at /usr/include/c++/4.8.2/functional:2071
#9  0x0000000000d6b0a9 in operator() (__args#1=..., __args#0=pulsar::ResultOk, this=<optimized out>) at /usr/include/c++/4.8.2/functional:2471
#10 pulsar::completeSendCallbacks(const std::vector<std::function<void(pulsar::Result, const pulsar::MessageId&)>, std::allocator<std::function<void(pulsar::Result, const pulsar::MessageId&)> > > &, pulsar::Result, const pulsar::MessageId &) (callbacks=std::vector of length 1, capacity 1 = {...}, result=pulsar::ResultOk, id=...) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/MessageAndCallbackBatch.cc:57
#11 0x0000000000cc1313 in complete (messageId=..., result=pulsar::ResultOk, this=0x7f3f4533dbd0) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/OpSendMsg.h:58
#12 pulsar::ProducerImpl::ackReceived (this=0xd3f0018, sequenceId=sequenceId@entry=0, rawMessageId=...) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/ProducerImpl.cc:834
#13 0x0000000000d3af27 in pulsar::ClientConnection::handleIncomingCommand (this=this@entry=0xd3fb000) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/ClientConnection.cc:833
#14 0x0000000000d413da in pulsar::ClientConnection::processIncomingBuffer (this=this@entry=0xd3fb000) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/ClientConnection.cc:686
#15 0x0000000000d41ef8 in pulsar::ClientConnection::handleRead (this=0xd3fb000, err=..., bytesTransferred=<optimized out>, minReadSize=4) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/ClientConnection.cc:610
#16 0x0000000000d30cc8 in _M_call<std::shared_ptr<pulsar::ClientConnection>&, boost::system::error_code&, unsigned long&, unsigned int&> (__ptr=..., this=<optimized out>) at /usr/include/c++/4.8.2/functional:558
#17 operator()<std::shared_ptr<pulsar::ClientConnection>&, boost::system::error_code&, long unsigned int&, unsigned int&, void> (__object=..., this=<optimized out>) at /usr/include/c++/4.8.2/functional:610
#18 __call<void, boost::system::error_code&, unsigned long&, 0ul, 1ul, 2ul, 3ul> (__args=<optimized out>, this=<optimized out>) at /usr/include/c++/4.8.2/functional:1296
#19 operator()<boost::system::error_code&, long unsigned int&, void> (this=<optimized out>) at /usr/include/c++/4.8.2/functional:1355
#20 operator()<boost::system::error_code, long unsigned int> (arg2=<optimized out>, arg1=..., this=<optimized out>) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/UtilAllocator.h:63
#21 operator() (this=<optimized out>) at /usr/local/include/boost/asio/detail/bind_handler.hpp:127
#22 asio_handler_invoke<boost::asio::detail::binder2<AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(const boost::system::error_code&, long unsigned int, unsigned int)>(std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> >, boost::system::error_code, long unsigned int> > (function=...) at /usr/local/include/boost/asio/handler_invoke_hook.hpp:64
#23 boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder2<AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(const boost::system::error_code&, long unsigned int, unsigned int)>(std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> >, boost::system::error_code, long unsigned int>, AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(const boost::system::error_code&, long unsigned int, unsigned int)>(std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> > >(boost::asio::detail::binder2<AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(const boost::system::error_code&, long unsigned int, unsigned int)>(std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> >, boost::system::error_code, long unsigned int> &, AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(const boost::system::error_code&, long unsigned int, unsigned int)>(std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> > &) (function=..., context=...) at /usr/local/include/boost/asio/detail/handler_invoke_helpers.hpp:37
#24 0x0000000000d4c6c8 in boost::asio::detail::reactive_socket_recv_op<boost::asio::mutable_buffers_1, AllocHandler<std::_Bind<std::_Mem_fn<void (pulsar::ClientConnection::*)(boost::system::error_code const&, unsigned long, unsigned int)> (std::shared_ptr<pulsar::ClientConnection>, std::_Placeholder<1>, std::_Placeholder<2>, unsigned int)> > >::do_complete(boost::asio::detail::task_io_service*, boost::asio::detail::task_io_service_operation*, boost::system::error_code const&, unsigned long) (owner=0xce020b0, base=<optimized out>) at /usr/local/include/boost/asio/detail/reactive_socket_recv_op.hpp:110
#25 0x0000000000c7bcdf in complete (bytes_transferred=<optimized out>, ec=..., owner=..., this=0xd0f50e0) at /usr/local/include/boost/asio/detail/task_io_service_operation.hpp:37
#26 do_run_one (ec=..., this_thread=..., lock=..., this=0xce020b0) at /usr/local/include/boost/asio/detail/impl/task_io_service.ipp:384
#27 boost::asio::detail::task_io_service::run (this=0xce020b0, ec=...) at /usr/local/include/boost/asio/detail/impl/task_io_service.ipp:153
#28 0x0000000000c77b2a in run (this=<optimized out>, ec=...) at /usr/local/include/boost/asio/impl/io_service.ipp:66
#29 pulsar::ExecutorService::__lambda0::operator() (__closure=0xd10efb0) at /data/workspace/third-party-lib/pulsar-2.10.1/pulsar-client-cpp/lib/ExecutorService.cc:43
#30 0x00007f454943f340 in ?? () from /lib64/libstdc++.so.6
#31 0x00007f4548f5fea5 in start_thread () from /lib64/libpthread.so.0
#32 0x00007f454857a9fd in clone () from /lib64/libc.so.6

What did you see instead?

The producer closes and no thread deadlocks.
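The backtrace suggests why this hangs: the send callback runs on the client's I/O thread, and the blocking close then waits on that same thread for a completion that can no longer be processed. The single-threaded model below illustrates the mechanism and one way around it, deferring the close instead of blocking inside the callback. `EventLoopModel` and `ProducerModel` are hypothetical names; nothing here is the client's real code. The real client also exposes an asynchronous close (`Producer::closeAsync`), which may be the simpler way out, though I have not verified its behavior inside a callback on 2.10.1.

```cpp
#include <deque>
#include <functional>
#include <utility>

// Single-threaded stand-in for the I/O thread: tasks run one at a time.
class EventLoopModel {
public:
    void post(std::function<void()> task) { tasks_.push_back(std::move(task)); }

    void runAll() {
        while (!tasks_.empty()) {
            std::function<void()> task = std::move(tasks_.front());
            tasks_.pop_front();
            inTask_ = true;
            task();
            inTask_ = false;
        }
    }

    bool inTask() const { return inTask_; }

private:
    std::deque<std::function<void()>> tasks_;
    bool inTask_ = false;
};

// Hypothetical producer whose close completes through a task on the loop.
class ProducerModel {
public:
    explicit ProducerModel(EventLoopModel& loop) : loop_(loop) {}

    // Returns false where a real blocking close would hang: the loop is busy
    // running the caller, so the completion it waits for cannot be processed.
    bool closeBlocking() {
        if (loop_.inTask()) {
            return false;
        }
        closed_ = true;
        return true;
    }

    // Non-blocking alternative: queue the close and return immediately.
    void closeDeferred() {
        loop_.post([this] { closed_ = true; });
    }

    bool closed() const { return closed_; }

private:
    EventLoopModel& loop_;
    bool closed_ = false;
};
```

In this model a send callback that calls closeBlocking gets `false` back, while closeDeferred lets the callback return first and the close completes on the next loop iteration.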

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!

linker error for curl when using pulsar lib

Describe the bug
Can anyone help me here? I built pulsar-client-cpp from source, but when I try to compile my project with the libpulsar.a files, I get linker errors all over.

I was able to resolve it by adding the curl library as a target_link in my own CMake, but I've been told this is a cheat and I need to resolve the dependency between pulsar and curl.

The linker errors I see are as below:

Undefined symbols for architecture x86_64:
  "_curl_easy_cleanup", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_easy_escape", referenced from:
      pulsar::TopicName::getEncodedName(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libpulsar.a(TopicName.cc.o)
  "_curl_easy_getinfo", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_easy_init", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::TopicName::getCurlHandle() in libpulsar.a(TopicName.cc.o)
      pulsar::TopicName::getEncodedName(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libpulsar.a(TopicName.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_easy_perform", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_easy_setopt", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_free", referenced from:
      pulsar::TopicName::getEncodedName(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libpulsar.a(TopicName.cc.o)
  "_curl_global_cleanup", referenced from:
      pulsar::HTTPLookupService::CurlInitializer::~CurlInitializer() in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::HTTPLookupService::CurlInitializer::~CurlInitializer() in libpulsar.a(HTTPLookupService.cc.o)
  "_curl_global_init", referenced from:
      pulsar::HTTPLookupService::CurlInitializer::CurlInitializer() in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::HTTPLookupService::CurlInitializer::CurlInitializer() in libpulsar.a(HTTPLookupService.cc.o)
      __GLOBAL__sub_I_HTTPLookupService.cc in libpulsar.a(HTTPLookupService.cc.o)
  "_curl_slist_append", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)
  "_curl_slist_free_all", referenced from:
      pulsar::ClientCredentialFlow::initialize() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::ClientCredentialFlow::authenticate() in libpulsar.a(AuthOauth2.cc.o)
      pulsar::HTTPLookupService::sendHTTPRequest(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >&) in libpulsar.a(HTTPLookupService.cc.o)
      pulsar::ZTSClient::getRoleToken() const in libpulsar.a(ZTSClient.cc.o)

I would very much appreciate help with this and can provide more information as necessary. I am running macOS Big Sur 11.4.

Expected behavior
The project should compile and link normally.

Desktop (please complete the following information):

  • OS: macOS Big Sur 11.4

[feature request] C++ and Python client support Dead Letter Topic

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The current C++ and Python clients do not support dead letter topics.

A dead letter topic works together with message redelivery, which is triggered by an acknowledgement timeout, a negative acknowledgement, or a retry letter topic.

The C++ and Python clients also do not support acknowledgement timeout or retry letter topics.

Solution

  1. Refer to the Java implementation and PIP-22.
  2. PIP-58 describes how the retry letter topic triggers sending a message to the dead letter topic.
  3. PIP-124: support creating the initial subscription before sending messages to the DLQ.

Alternatives

As a first step, only negative acknowledgement could trigger sending a message to the dead letter topic; acknowledgement timeout and retry letter topic support could be implemented afterwards.
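To make the negative-acknowledgement-first approach concrete, here is a minimal sketch of the bookkeeping it implies. It uses only the C++ standard library and is not the pulsar-client-cpp API: `Message`, `NackDeadLetterRouter`, and `maxRedeliverCount` are hypothetical names chosen for illustration, and the rule "move to the dead letter queue once the redelivery count has reached the limit" is an assumption about the intended semantics, not something taken from the Java implementation.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a received message (not pulsar::Message).
struct Message {
    std::uint64_t id;
    std::string payload;
};

// Hypothetical helper that counts negative acknowledgements per message and
// decides whether to redeliver or to route to a dead letter queue.
class NackDeadLetterRouter {
public:
    explicit NackDeadLetterRouter(std::uint32_t maxRedeliverCount)
        : maxRedeliverCount_(maxRedeliverCount) {}

    // Records one negative acknowledgement.
    // Returns true if the message was moved to the dead letter queue,
    // false if it should be redelivered to the consumer.
    bool negativeAcknowledge(const Message& msg) {
        std::uint32_t& count = redeliveryCount_[msg.id];
        if (count >= maxRedeliverCount_) {
            // Assumed rule: the redelivery budget is used up, so dead-letter it.
            deadLetters_.push_back(msg);
            redeliveryCount_.erase(msg.id);
            return true;
        }
        ++count;  // One more redelivery is allowed.
        return false;
    }

    // Messages that exhausted their redelivery budget, in arrival order.
    const std::vector<Message>& deadLetters() const { return deadLetters_; }

private:
    std::uint32_t maxRedeliverCount_;
    std::unordered_map<std::uint64_t, std::uint32_t> redeliveryCount_;
    std::vector<Message> deadLetters_;
};
```

With `maxRedeliverCount` set to 2, the first two negative acknowledgements of a message request redelivery and the third moves it to the dead letter queue. A real client would publish the message to the dead letter topic with a producer instead of storing it in a vector, and acknowledgement timeout could later feed the same counter.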

Anything else?

Reference Java implementation PR:

Are you willing to submit a PR?

  • I'm willing to submit a PR!
