Coder Social home page Coder Social logo

reactivex / rxcpp Goto Github PK

View Code? Open in Web Editor NEW
3.0K 163.0 387.0 61.62 MB

Reactive Extensions for C++

License: Apache License 2.0

C++ 98.28% Makefile 0.03% CMake 1.55% Shell 0.09% HTML 0.05%
reactivex c-plus-plus rxcpp algorithms virtuous-procrastination values-distributed-in-time

rxcpp's Introduction

The Reactive Extensions for C++ (RxCpp) is a library of algorithms for values-distributed-in-time. The Range-v3 library does the same for values-distributed-in-space.

Task Status
rxcpp CI rxcpp CI
Source Badges
Github GitHub license
GitHub release
GitHub commits
Gitter.im Join in on gitter.im
Packages NuGet version vcpkg port
Documentation rxcpp doxygen documentation
reactivex intro rx marble diagrams

Usage

RxCpp is a header-only C++ library that only depends on the standard library. The CMake build generates documentation and unit tests. The unit tests depend on a git submodule for the Catch library.

Example

Add Rx/v2/src to the include paths

lines from bytes

#include "rxcpp/rx.hpp"
namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;

#include <regex>
#include <random>
using namespace std;
using namespace std::chrono;

int main()
{
    random_device rd;   // non-deterministic generator
    mt19937 gen(rd());
    uniform_int_distribution<> dist(4, 18);

    // for testing purposes, produce byte stream that from lines of text
    auto bytes = range(0, 10) |
        flat_map([&](int i){
            auto body = from((uint8_t)('A' + i)) |
                repeat(dist(gen)) |
                as_dynamic();
            auto delim = from((uint8_t)'\r');
            return from(body, delim) | concat();
        }) |
        window(17) |
        flat_map([](observable<uint8_t> w){
            return w |
                reduce(
                    vector<uint8_t>(),
                    [](vector<uint8_t> v, uint8_t b){
                        v.push_back(b);
                        return v;
                    }) |
                as_dynamic();
        }) |
        tap([](const vector<uint8_t>& v){
            // print input packet of bytes
            copy(v.begin(), v.end(), ostream_iterator<long>(cout, " "));
            cout << endl;
        });

    //
    // recover lines of text from byte stream
    //
    
    auto removespaces = [](string s){
        s.erase(remove_if(s.begin(), s.end(), ::isspace), s.end());
        return s;
    };

    // create strings split on \r
    auto strings = bytes |
        concat_map([](vector<uint8_t> v){
            string s(v.begin(), v.end());
            regex delim(R"/(\r)/");
            cregex_token_iterator cursor(&s[0], &s[0] + s.size(), delim, {-1, 0});
            cregex_token_iterator end;
            vector<string> splits(cursor, end);
            return iterate(move(splits));
        }) |
        filter([](const string& s){
            return !s.empty();
        }) |
        publish() |
        ref_count();

    // filter to last string in each line
    auto closes = strings |
        filter(
            [](const string& s){
                return s.back() == '\r';
            }) |
        Rx::map([](const string&){return 0;});

    // group strings by line
    auto linewindows = strings |
        window_toggle(closes | start_with(0), [=](int){return closes;});

    // reduce the strings for a line into one string
    auto lines = linewindows |
        flat_map([&](observable<string> w) {
            return w | start_with<string>("") | sum() | Rx::map(removespaces);
        });

    // print result
    lines |
        subscribe<string>(println(cout));

    return 0;
}

Reactive Extensions

The ReactiveX Observable model allows you to treat streams of asynchronous events with the same sort of simple, composable operations that you use for collections of data items like arrays. It frees you from tangled webs of callbacks, and thereby makes your code more readable and less prone to bugs.

Credit ReactiveX.io

Other language implementations

Resources

Cloning RxCpp

RxCpp uses a git submodule (in ext/catch) for the excellent Catch library. The easiest way to ensure that the submodules are included in the clone is to add --recursive in the clone command.

git clone --recursive https://github.com/ReactiveX/RxCpp.git
cd RxCpp

Installing

To install RxCpp into your OS you need to follow standart procedure:

mkdir build
cd build
cmake ..
make install 

If you're using the vcpkg dependency manager, you can install RxCpp using a single one-line command:

vcpkg install rxcpp

Vcpkg will acquire RxCpp, build it from source in your computer, and provide CMake integration support for your projects.

See the vcpkg repository for more information.

Importing

After you have successfully installed RxCpp you can import it into any project by simply adding to your CMakeLists.txt:

find_package(rxcpp CONFIG)

Building RxCpp Unit Tests

  • RxCpp is regularly tested on OSX and Windows.
  • RxCpp is regularly built with Clang, Gcc and VC
  • RxCpp depends on the latest compiler releases.

RxCpp uses CMake to create build files for several platforms and IDE's

ide builds

XCode

mkdir projects/build
cd projects/build
cmake -G"Xcode" -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 ../CMake -B.

Visual Studio 2017

mkdir projects\build
cd projects\build
cmake -G "Visual Studio 15" -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 ..\CMake\
msbuild Project.sln

makefile builds

OSX

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 -B. ../CMake
make

Linux --- Clang

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=clang -DCMAKE_CXX_COMPILER=clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DCMAKE_EXE_LINKER_FLAGS="-stdlib=libc++" -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 -B. ../CMake
make

Linux --- GCC

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 -B. ../CMake
make

Windows

mkdir projects\build
cd projects\build
cmake -G"NMake Makefiles" -DCMAKE_BUILD_TYPE=RelWithDebInfo -DRXCPP_DISABLE_TESTS_AND_EXAMPLES=0 -B. ..\CMake
nmake

The build only produces test and example binaries.

Running tests

  • You can use the CMake test runner ctest
  • You can run the test binaries directly rxcpp_test_*
  • Tests can be selected by name or tag Example of by-tag

rxcpp_test_subscription [perf]

Documentation

RxCpp uses Doxygen to generate project documentation.

When Doxygen+Graphviz is installed, CMake creates a special build task named doc. It creates actual documentation and puts it to projects/doxygen/html/ folder, which can be published to the gh-pages branch. Each merged pull request will build the docs and publish them.

Developers Material

Contributing Code

Before submitting a feature or substantial code contribution please discuss it with the team and ensure it follows the product roadmap. Note that all code submissions will be rigorously reviewed and tested by the Rx Team, and only those that meet an extremely high bar for both quality and design/roadmap appropriateness will be merged into the source.

Microsoft Open Source Code of Conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.

rxcpp's People

Contributors

aargor avatar alexeymarkov avatar besser82 avatar briangru avatar daixtrose avatar diorcety avatar elelel avatar ericniebler avatar exctues avatar freezestudio avatar furuholm avatar gchudnov avatar georgis avatar guhwanbae avatar iam avatar ivan-cukic avatar kirkshoop avatar lebdron avatar mattpd avatar mattpodwysocki avatar nitrram avatar petoknm avatar ralphsteinhagen avatar ravirael avatar sblom avatar shivashankarp avatar thepont avatar tinkerbeast avatar valerykopylov avatar victimsnino avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rxcpp's Issues

Memory leak in skip & take with make_hot_observable

This has a memory leak:

#include "rxcpp/rx.hpp"
namespace rx=rxcpp;
namespace rxu=rxcpp::util;
namespace rxsc=rxcpp::schedulers;

#include "rxcpp/rx-test.hpp"

int main()
{
   auto sc = rxsc::make_test();
   auto w = sc.create_worker();
   const rxsc::test::messages<int> on;

   auto xs = sc.make_hot_observable({
      on.next(240, 5),
      on.completed(250)
   });

   auto res = w.start(
         [xs]() {
            return xs
               .take(1);
//               .as_dynamic();
         }
   );
}

I can't work out why. It's fine with make_cold_observable and with buffer.
clang++-3.6 -std=c++14 -O0 -g -stdlib=libc++ -fno-omit-frame-pointer -fPIC -fsanitize=address -lc++abi -fsanitize=address -pie test_rx_leak.cpp -Irxcpp/Rx/v2/src

These are the leaks:

=================================================================
==22438==ERROR: LeakSanitizer: detected memory leaks

Indirect leak of 1152 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608848e09 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608848e09 in std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608848e09 in std::__1::allocator_traits<std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::allocate(std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608848e09 in std::__1::__split_buffer<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608848c43 in void std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::__push_back_slow_path<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f96088482e8 in std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::push_back(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f96088482e8 in std::__1::priority_queue<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >, rxcpp::schedulers::detail::schedulable_queue<long>::compare_elem>::push(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/queue:659
    #8 0x7f96088482e8 in rxcpp::schedulers::detail::schedulable_queue<long>::push(rxcpp::schedulers::detail::time_schedulable<long>&&) /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:870
    #9 0x7f9608847cc9 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:219:9
    #10 0x7f9608822aa1 in _ZNK5rxcpp10schedulers4test11test_worker17schedule_absoluteIZNKS2_5startIiZ4mainE3$_0EENS_10subscriberIT_NS_4test17testable_observerIS7_EEEET0_lllEUlRKNS0_11schedulableEE1_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionIS7_EE5valuesr15is_subscriptionIS7_EE5valuentsr14is_schedulableIS7_EE5valueEvE4typeElOS7_DpOT0_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:426:13
    #11 0x7f96088223c6 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:464:13
    #12 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #13 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #14 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 184 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088401e4 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088401e4 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::hot_observable<int>, std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088401e4 in std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883fadb in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail14hot_observableIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEENS2_6workerENS_6vectorINS1_13notifications8recordedINS6_INSD_6detail17notification_baseIiEEEEEENS_9allocatorISJ_EEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISO_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883fadb in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336
    #6 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #7 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 136 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608830431 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608830431 in std::__1::allocator<std::__1::__shared_ptr_emplace<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type, std::__1::allocator<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608830431 in std::__1::shared_ptr<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type> std::__1::shared_ptr<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type>::make_shared<rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::values const&, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&>(rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::values const&, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960882c0c9 in _ZNSt3__111make_sharedIZNK5rxcpp9operators6detail4takeIiNS1_10observableIiNS1_4test6detail11test_sourceIiEEEEiE12on_subscribeINS1_10subscriberIiNS6_17testable_observerIiEEEEEEvRKT_E10state_typeJRKNSB_6valuesERKSG_EEENS_9enable_ifIXntsr8is_arrayISH_EE5valueENS_10shared_ptrISH_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960882c0c9 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:66
    #6 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #7 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #8 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #9 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #10 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #11 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #12 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #13 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #14 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #17 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #18 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #19 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #20 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #21 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #22 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #23 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960882eb35 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960882eb35 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >, std::__1::allocator<{lambda()#1}> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960882eb35 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1} std::__1::shared_ptr<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >::make_shared<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}>(void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960882e978 in _ZNSt3__111make_sharedIN5rxcpp16dynamic_observerIiE17specific_observerINS1_15static_observerIiZNKS1_9operators6detail4takeIiNS1_10observableIiNS1_4test6detail11test_sourceIiEEEEiE12on_subscribeINS1_10subscriberIiNSA_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSG_ISK_EEvSN_EUlSt13exception_ptrE_ZNKSG_ISK_EEvSN_EUlvE_EEEEJSS_EEENS_9enable_ifIXntsr8is_arrayISL_EE5valueENS_10shared_ptrISL_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960882e978 in std::__1::shared_ptr<rxcpp::dynamic_observer<int>::virtual_observer> rxcpp::dynamic_observer<int>::make_destination<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >(rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:176
    #6 0x7f960882e853 in rxcpp::dynamic_observer<int>::dynamic_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >(rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:194:23
    #7 0x7f960882e67e in rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >::as_dynamic() const /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:263:9
    #8 0x7f960882e4cd in rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >::as_dynamic() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:157:44
    #9 0x7f960882dc68 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:26
    #10 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #11 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #12 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #13 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #14 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #15 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #16 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #17 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #18 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #19 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #20 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #21 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #22 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #23 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #24 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #25 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #26 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #27 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #28 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #29 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #30 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #31 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960882c0d1 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:68:32
    #8 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #9 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #10 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #11 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #12 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #13 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #14 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #15 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #16 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #19 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #20 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #21 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #22 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #23 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #24 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #25 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960883fab4 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #8 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #9 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f9608836a4b in _ZN5rxcpp15make_subscriberIiNS_4test17testable_observerIiEEEENSt3__19enable_ifIXsr11is_observerIT0_EE5valueENS_10subscriberIT_S6_EEE4typeERKS6_ /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:241:5
    #8 0x7f96088367f8 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:12
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 104 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608847697 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608847697 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type::test_type_state, std::__1::allocator<rxcpp::schedulers::detail::test_type::test_type_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608847697 in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f96088475e5 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_type15test_type_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f96088475e5 in rxcpp::schedulers::detail::test_type::test_type() /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:127
    #6 0x7f9608846fad in std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::test_type>, rxcpp::schedulers::detail::test_type, 1u>::__libcpp_compressed_pair_imp(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2047:40
    #7 0x7f9608846fad in std::__1::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::test_type>, rxcpp::schedulers::detail::test_type>::__compressed_pair(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2317
    #8 0x7f9608846fad in std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type, std::__1::allocator<rxcpp::schedulers::detail::test_type> >::__shared_ptr_emplace(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:3699
    #9 0x7f9608846fad in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4280
    #10 0x7f9608825720 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_typeEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #11 0x7f9608825720 in rxcpp::schedulers::make_test() /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:574
    #12 0x7f9608821d0f in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:10:14
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 96 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883755f in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883755f in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883755f in rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> std::__1::shared_ptr<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > > >::make_shared<{lambda()#1}>({lambda()#1}&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837358 in _ZNSt3__111make_sharedIN5rxcpp16dynamic_observerIiE17specific_observerINS1_8observerIiNS1_15static_observerIiZNKS1_10schedulers6detail9test_type16test_type_worker15make_subscriberIiEENS1_10subscriberIT_NS1_4test17testable_observerISD_EEEEvEUliE_ZNKSB_IiEESH_vEUlSt13exception_ptrE_ZNKSB_IiEESH_vEUlvE_EEEEEEJSN_EEENS_9enable_ifIXntsr8is_arrayISD_EE5valueENS_10shared_ptrISD_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837358 in std::__1::shared_ptr<rxcpp::dynamic_observer<int>::virtual_observer> rxcpp::dynamic_observer<int>::make_destination<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >(rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> >) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:176
    #6 0x7f9608837225 in rxcpp::dynamic_observer<int>::dynamic_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >(rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> >) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:194:23
    #7 0x7f9608836c0c in _ZN5rxcpp21make_observer_dynamicIiZNKS_10schedulers6detail9test_type16test_type_worker15make_subscriberIiEENS_10subscriberIT_NS_4test17testable_observerIS7_EEEEvEUliE_ZNKS5_IiEESB_vEUlSt13exception_ptrE_ZNKS5_IiEESB_vEUlvE_EENSt3__19enable_ifIXaaaasr6detail13is_on_next_ofIS7_T0_EE5valuesr6detail11is_on_errorIT1_EE5valuesr6detail15is_on_completedIT2_EE5valueENS_8observerIS7_NS_16dynamic_observerIS7_EEEEE4typeEOSI_OSJ_OSK_ /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:380:5
    #8 0x7f96088367d6 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:68
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 88 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883eed5 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883eed5 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::mock_observer<int>, std::__1::allocator<rxcpp::schedulers::detail::mock_observer<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883eed5 in std::__1::shared_ptr<rxcpp::schedulers::detail::mock_observer<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::mock_observer<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&>(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f96088365c4 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail13mock_observerIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISC_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f96088365c4 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:189
    #6 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #7 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #8 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #9 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960883fab4 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #8 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #9 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f9608836a4b in _ZN5rxcpp15make_subscriberIiNS_4test17testable_observerIiEEEENSt3__19enable_ifIXsr11is_observerIT0_EE5valueENS_10subscriberIT_S6_EEE4typeERKS6_ /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:241:5
    #8 0x7f96088367f8 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:12
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960882c0d1 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:68:32
    #8 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #9 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #10 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #11 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #12 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #13 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #14 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #15 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #16 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #19 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #20 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #21 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #22 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #23 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #24 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #25 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608846575 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608846575 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type::test_type_worker, std::__1::allocator<rxcpp::schedulers::detail::test_type::test_type_worker> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608846575 in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_worker> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_worker>::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&>(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608847454 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_type16test_type_workerEJRNS_10shared_ptrINS4_15test_type_stateEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISB_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608847454 in rxcpp::schedulers::detail::test_type::create_worker(rxcpp::composite_subscription) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:136
    #6 0x7f960883fac0 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #7 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #8 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #9 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608842ef2 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608842ef2 in std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608842ef2 in std::__1::allocator_traits<std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::allocate(std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608842ef2 in std::__1::__split_buffer<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608842d92 in void std::__1::vector<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::__push_back_slow_path<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&>(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608840a4b in std::__1::vector<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::push_back(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1600:9
    #7 0x7f9608840a4b in rxcpp::schedulers::detail::hot_observable<int>::on_subscribe(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:313
    #8 0x7f960882dc71 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:9
    #9 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #10 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #11 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #12 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #13 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #14 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #17 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #18 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #19 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #20 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #21 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #22 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #23 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #24 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #25 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #26 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #27 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #28 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #29 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #30 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883d31a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883d31a in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_next_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_next_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883d31a in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification>::make_shared<int>(int&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883d153 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE20on_next_notificationEJiEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883d153 in std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > rxcpp::notifications::notification<int>::on_next<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:211
    #6 0x7f960883cf19 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:196:62
    #7 0x7f96088379bc in rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >::on_next(int) const /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:161:13
    #8 0x7f960882f750 in void rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #9 0x7f960882f5d8 in void rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_next<int&>(int&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #10 0x7f960882f427 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:82:25
    #11 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #12 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #13 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #14 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #17 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #18 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #19 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #20 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #21 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #22 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #23 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883d31a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883d31a in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_next_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_next_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883d31a in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification>::make_shared<int>(int&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883d153 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE20on_next_notificationEJiEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883d153 in std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > rxcpp::notifications::notification<int>::on_next<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:211
    #6 0x7f9608825c41 in rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > rxcpp::schedulers::test::messages<int>::next<int>(long, int) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:381:41
    #7 0x7f9608821d54 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:15:7
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088380f7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088380f7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_completed_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_completed_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088380f7 in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837e84 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE25on_completed_notificationEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837e84 in rxcpp::notifications::notification<int>::on_completed() /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:215
    #6 0x7f9608837c40 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:208:62
    #7 0x7f960882f147 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_completed() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:182:9
    #8 0x7f960882f44a in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:85:25
    #9 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #10 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #11 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #12 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #13 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #14 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #15 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #16 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #17 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #18 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #19 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #20 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #21 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088380f7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088380f7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_completed_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_completed_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088380f7 in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837e84 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE25on_completed_notificationEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837e84 in rxcpp::notifications::notification<int>::on_completed() /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:215
    #6 0x7f9608825d29 in rxcpp::schedulers::test::messages<int>::completed(long) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:385:41
    #7 0x7f9608821d65 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:16:7
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883f56f in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883f56f in std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883f56f in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f960883f56f in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:936
    #5 0x7f960883f4da in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::vector(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1214:9
    #6 0x7f96088436ce in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:294:11
    #7 0x7f9608840385 in std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, rxcpp::schedulers::detail::hot_observable<int>, 1u>::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&, 0ul, 0ul, 1ul, 2ul>(std::__1::piecewise_construct_t, std::__1::tuple<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&>, std::__1::tuple<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>, std::__1::__tuple_indices<0ul>, std::__1::__tuple_indices<0ul, 1ul, 2ul>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2100:15
    #8 0x7f9608840385 in std::__1::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, rxcpp::schedulers::detail::hot_observable<int> >::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>(std::__1::piecewise_construct_t, std::__1::tuple<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&>, std::__1::tuple<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2366
    #9 0x7f9608840385 in std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::hot_observable<int>, std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> > >::__shared_ptr_emplace<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:3704
    #10 0x7f9608840385 in std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4280
    #11 0x7f960883fadb in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail14hot_observableIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEENS2_6workerENS_6vectorINS1_13notifications8recordedINS6_INSD_6detail17notification_baseIiEEEEEENS_9allocatorISJ_EEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISO_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #12 0x7f960883fadb in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336
    #13 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #14 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #15 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883a51a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883a51a in std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883a51a in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f960883a51a in std::__1::__split_buffer<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f960883a3b2 in void std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::__push_back_slow_path<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >(rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608837cfd in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::push_back(rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f9608837cfd in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:207
    #8 0x7f960882f147 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_completed() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:182:9
    #9 0x7f960882f44a in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:85:25
    #10 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #11 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #12 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #13 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #14 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #15 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #16 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #19 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #20 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #21 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #22 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 16 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608842675 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608842675 in std::__1::allocator<rxcpp::notifications::subscription>::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608842675 in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::subscription> >::allocate(std::__1::allocator<rxcpp::notifications::subscription>&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608842675 in std::__1::__split_buffer<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription>&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::notifications::subscription>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608842500 in void std::__1::vector<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription> >::__push_back_slow_path<rxcpp::notifications::subscription>(rxcpp::notifications::subscription&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608840b39 in std::__1::vector<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription> >::push_back(rxcpp::notifications::subscription&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f9608840b39 in rxcpp::schedulers::detail::hot_observable<int>::on_subscribe(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:314
    #8 0x7f960882dc71 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:9
    #9 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #10 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #11 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #12 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #13 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #14 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #17 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #18 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #19 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #20 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #21 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #22 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #23 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #24 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #25 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #26 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #27 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #28 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #29 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #30 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

SUMMARY: AddressSanitizer: 2880 byte(s) leaked in 22 allocation(s).

linq_selectmany.hpp:34:26: error: use of undeclared identifier 'from'

Hi,
I am a newbie and I have something like:

#include <vector>

#include <cpplinq/linq.hpp>

void ReadEventData(std::vector<std::vector<Particle>>& v);

std::vector<std::vector<Particle>> ED;

void ROOTLinq() {
   ReadEventData(ED);

   using namespace cpplinq;

   auto n = from(ED).select_many([](std::vector<Particle> v) {return 1;}).
      where([](const Particle&p){return p.fCharge > 0;}).count();
...

I think select_many doesn't have the definition of the template from. I compile with:
clang -v
Apple LLVM version 5.1 (clang-503.0.40) (based on LLVM 3.4svn)
Target: x86_64-apple-darwin13.3.0
Thread model: posix
Cheers,
Vassil

Access Violation Exception during parallel concat calls at startup

Workaround:

spin up rxcpp ahead of time using this code.

rxcpp::sources::from(1).subscribe([](int i) {});

Issue:

Consider the following code.

#include "stdafx.h"
#include "rx.hpp"

int _tmain(int argc, _TCHAR* argv[])
{
    std::thread t1([]()
    {
        std::vector<int> ints1 = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        rxcpp::sources::iterate(ints1)
            .map([](int i) { return rxcpp::sources::range(i, i); })
            .concat()
            .subscribe([](int i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << i << std::endl; });
    });

    std::thread t2([]()
    {
        std::vector<int> ints2 = { 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 };
        rxcpp::sources::iterate(ints2)
            .map([](int i) { return rxcpp::sources::range(i, i); })
            .concat()
            .subscribe([](int i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << i << std::endl; });
    });

    t1.join();
    t2.join();

    return 0;
}

The following error is thrown a fair amount of the time. It appears to be a library initialization issue, as I've only seen this when the first calls into rxcpp are done using the concat operator in parallel.

Unhandled exception at 0x003356E5 in RxBug.exe: 0xC0000005: Access violation reading location 0x00000000.

RxBug.exe!rxcpp::schedulers::scheduler::create_worker(rxcpp::composite_subscription cs) Line 374 C++
RxBug.exe!rxcpp::identity_one_worker::create_coordinator(rxcpp::composite_subscription cs) Line 159 C++
RxBug.exe!rxcpp::operators::detail::concatrxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker>::on_subscribe<rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > >(rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > scbr) Line 131 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l11::() Line 330 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l15::(const rxcpp::schedulers::schedulable & scbl) Line 346 C++
RxBug.exe!rxcpp::schedulers::make_action::__l11::(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 661 C++
RxBug.exe!std::_Callable_obj<void (const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &),0>::_ApplyX<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 284 C++
RxBug.exe!std::_Func_impl<std::_Callable_obj<void (const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &),0>,std::allocator<std::_Func_class<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &> >,void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>::_Do_call(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 229 C++
RxBug.exe!std::_Func_class<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>::operator()(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 315 C++
RxBug.exe!rxcpp::schedulers::detail::action_type::operator()(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 631 C++
RxBug.exe!rxcpp::schedulers::action::operator()(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 638 C++
RxBug.exe!rxcpp::schedulers::schedulable::operator()(const rxcpp::schedulers::recurse & r) Line 597 C++
RxBug.exe!rxcpp::schedulers::current_thread::current_worker::schedule(std::chrono::time_pointstd::chrono::system_clock,std::chrono::duration<__int64,std::ratio<1,10000000 > > when, const rxcpp::schedulers::schedulable & scbl) Line 202 C++
RxBug.exe!rxcpp::schedulers::current_thread::current_worker::schedule(const rxcpp::schedulers::schedulable & scbl) Line 173 C++
RxBug.exe!rxcpp::schedulers::worker::schedule<void (const rxcpp::schedulers::schedulable &) >(rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l15::void (const rxcpp::schedulers::schedulable &) && a0) Line 740 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe<rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > >(rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > o) Line 346 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::subscribe<void (int) >(wmain::__l8::void (void)::__l12::void (int) && <an_0>) Line 448 C++
RxBug.exe!wmain::__l8::() Line 21 C++
RxBug.exe!std::_Bind<0,void,void (void) >::_Do_call<>(std::tuple<> _Myfargs, std::_Arg_idx<> __formal) Line 1150 C++
RxBug.exe!std::_Bind<0,void,void (void) >::operator()<>() Line 1138 C++
RxBug.exe!std::_LaunchPad<std::_Bind<0,void,void (void) > >::_Run(std::_LaunchPad<std::_Bind<0,void,void (void) > > * _Ln) Line 196 C++
RxBug.exe!std::_LaunchPad<std::_Bind<0,void,void (void) > >::_Go() Line 187 C++
msvcp120d.dll!_Call_func(void * _Data) Line 28 C++
msvcr120d.dll!_callthreadstartex() Line 376 C
msvcr120d.dll!_threadstartex(void * ptd) Line 359 C
kernel32.dll!@BaseThreadInitThunk@12๏ฟฝ() Unknown
ntdll.dll!___RtlUserThreadStart@8๏ฟฝ() Unknown
ntdll.dll!__RtlUserThreadStart@8๏ฟฝ() Unknown

heap-use-after-free in rx-subject.hpp

When I was testing with Thread Sanitizer, it kept complaining about this piece of code in rx-subject.hpp and it took a while for me to realize the problem here.

    if (b->current_generation != b->state->generation) {
        std::unique_lock<std::mutex> guard(b->state->lock);
        b->current_generation = b->state->generation;
        b->current_completer = b->completer;
    }

    auto current_completer = b->current_completer;

What happens if Thread-A updates b->current_completer just before Thread-B assigns b->current_completer to the local variable? Assuming no one else is holding reference to b->current_completer, the instance will be destroyed by Thread-A and Thread-B may start operating on the destructed heap space. Can you confirm?

The obvious solution is to not depend on generation numbers and always acquire the mutex and assign b->completer to a local variable. I can submit a patch if we agree on the issue and a possible solution.

lifetime issue

This is similar to #127.

consider this:

auto published_observable =
    rxcpp::sources::create<int>([](const rxcpp::subscriber<int>& s)
    {
        for (int i = 0; i < 1000; i++)
        {
            if (!s.is_subscribed())
                return;
            s.on_next(i);
        }
    })
    .filter([](int i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << i << std::endl;
        return true;
    })
    .subscribe_on(rxcpp::observe_on_new_thread())
    .publish()
    ;

auto subscription = published_observable.connect();
std::this_thread::sleep_for(std::chrono::seconds(1));
subscription.unsubscribe();
std::cout << "unsubscribed" << std::endl;

I'd expect this to print out 10ish integers followed by "unsubscribed". However all 1000 integers are printed out.

In my specific use case, I am actually implementing rxcpp::sources::source_base and creating an observable from it.

    class av_source
        : public rxcpp::sources::source_base<std::shared_ptr<av_sample>>
    {
    public:
        av_source(ipc::media::clip clip);
        void on_subscribe(rxcpp::subscriber<std::shared_ptr<av_sample>> destination) const;

    private:
        const ipc::media::clip clip_;
    };

    inline rxcpp::observable<std::shared_ptr<av_sample>> demux(clip clip)
    {
        return rxcpp::observable<std::shared_ptr<av_sample>, av_source>(av_source(std::move(clip)));
    }

So really, I need to know how to quit processing inside of my source_base implementation. However the issue also presents itself in rxcpp::sources::create.

Thanks,

Jeff

[Question] Does the return value of observable.subscribe() disposable ? and How?

Q1) Does "rx::observable<>::create(on_subscribe)" store return value of on_subscribe?
I want "return value of on_subscribe == on_dispose"
Q2) How can I manually dispose subscription and register on_dispose ?
...
auto subscription = observable.subscribe(handler_next, handler_error, handler_completed);
?? subscription.dispose() ??
...

Reducing of empty observable

When Reduce, Average, First, or Last operator is applied to an empty sequence, on Rx.NET and RxJava it causes OnError().

On RxCpp we have different behavior:
reduce returns result_selector(seed)
average returns 0 (T() to be correct)
first calls abort()
last calls abort()
I propose to call OnError() for all operators. @kirkshoop, please let me know your opinion.

Can't create observable

Hello,

I can't create an observable like this "cep" example :
https://github.com/Reactive-Extensions/RxCpp/blob/bd97cedb5a11c0666c14efa9d5abb9769480a884/Rx/v2/examples/cep/main.cpp#L19

IntelliSense: no instance of function template "rxcpp::observable<void, void>::create" matches the argument list
    argument types are: (lambda []void (rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int>>> dest)->void) ...  

Is the example broken or did I do something wrong?

Backpressure operators

From what I saw, in RxCpp operators like onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest described here are not implemented.
Is there a plan to implement them in the near future?

Master branch vs. release/v1.0.2 branch

I am looking to write code that uses this library and am wondering what the difference is between branch master and branch release/v1.0.2 - it seems like master is currently being worked on, as it was updated earlier this week and the other was last updated in February.

However, the master branch doesn't seem to define a method for CreateObservable to create an observable from a function. The other branch does define such a method. Is there a reason for this lack of functionality in the master branch (ie, it's still being worked on and isn't ready for use yet)? Which branch would you recommend using?

Observer / observable as member of a class

Hello,

I created an observer by applying the onNext and onError functions.
After I get an observer from the subscriber (by creating an observable on same data type).
This observers, get from 2 differents sources, but handling the same data type, seams to have different signatures.

How to align the signature of this 2 observers get from different sources, and copy to a member of a class? (No "auto" allowed here).

EDIT:
Similar problem with the observable.
I create it by setting to it a lambda. I need to copy this observable in a member of a class.
Here is the error displayed by vc++

Error C2440: 'return' : cannot convert from 'rxcpp::observable<T,rxcpp::sources::detail::create<T,Scene::onMessage::<lambda_4e1311247554cfa7219cec4f9e2964dd>>>' to 'rxcpp::observable<int,rxcpp::dynamic_observable<T>>' ...

The signature I use is rx::observable<int>

How do avoid racing conditions with is_subscribed

Hi!

I feel somewhat uncomfortable with is_subscribed. There is a sequence point after calling is_subscribed. So another thread might unsubscribe the subscriber and then s.on_next() gets called on an unsubscribed object? Is this safe? And if yes, why do we call is_subscribed anyway?

auto published_observable =
    rxcpp::sources::create<int>([](const rxcpp::subscriber<int>& s)
    {
        for (int i = 0; i < 1000; i++)
        {
            if (!s.is_subscribed())
                return;
            s.on_next(i);
        }
    })

Create observable with disposable

In some implementation of Rx, when observable is manually created, api allow to optionally provide a manual disposable cleanup.

Here is the example from RxJs

var o = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();
    return function () {
        console.log('disposed');
    };
});

I expect something along this line, but it doesn't seem to be supported.

auto o = rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
  s.on_next(42);
  s.on_complete();
  return []() { std::cout << "manual disposed" << std::endl; };
});

How can we achieve the same thing with RxCpp?

compilation error when rxcpp/rx.hpp is included after boost/asio.hpp on linux

simple program below causes compile error on linux:

#include <boost/asio.hpp>
#include <rxcpp/rx.hpp>

int main(int ,char**){
return 0;
}

The problem is that asio on linux include termios.h which in turn include bits/termios.h

bits/termios.h defindes B0 which conflicts with
rx-util.hpp
all_true struct.

Solution would be change B0 to another name in rx-util.hpp

Make_event_loop::inner is empty

I encounter a bug when I create two workers using this code :
rxcpp::schedulers::make_event_loop().create_worker()

Not everytime, but sometimes it crash because inner member is empty (in create_worker)
Do you know this issue?

I encounter this issue sometimes on a standalone app, and most of the time with the Unreal Engine 4 (when I call it several times).

Default coordination on iterate

Is there a reason why 'iterate' operator has 'identity_current_thread' as a default coordination, while for example 'from' uses 'identity_immediate' as default? I happend to miss this and assumed immediate and got results I did not expect.
Thanks for a great library!
/Mattias

New schedulers?

I'm working on integration for RxCPP and Boost.ASIO as I think they are perfect match, a test scheduler using boost::asio::io_service can be found at https://gist.github.com/windoze/a6e684143833d5d65ba6 and more complete version is coming.
Before submitting PR, I want to know:

  1. Does RxCPP accept 3rd-party dependencies?
  2. Where to put 3rd party dependent components?

Any suggestions?

Not all source code available in VS2012 solution?

Maybe I missed something, but AFAICS the header files are not included with the VS solution/project generated by CMake-3.2.2, are they? I can only access these via external dependencies.
I do not see any files from the Rx/v2/src/rxcpp directory included directly. Is this by purpose and can we add them to the solution?

image

building rxcpp on windows

IDE build also the makefile build ends up in errors:

Error [some error number] error C3203: 'value_type_t' : unspecialized alias template can't be used as a template argument for template parameter 'T', expected a real type [some files]

Seems the VC++ compiler not fully support these C++11 features?
Any solution for that?

Observing on main thread

In RxAndroid, there's .observeOn(AndroidSchedulers.mainThread()). How do you achieve that in RxCpp?

For example, how do I ensure that the call to printf below is executed on the main thread?

auto numbers = rxcpp::observable<>::range(1, 100).subscribe_on(rxcpp::observe_on_new_thread());
numbers
    .map([](int n) { return n * 2; })
    .map([](int n) { return n + 1; })
    .as_blocking()
    .subscribe([](int n)
    {
        printf("Number: %d\n", n);
    });

operator `distinct`

It looks only operator distinct_until_changed is implemented. But there is no distinct. Any reasons for that?

I can implement it (e.g. using unordered_set) and issue a PR if you wish..

Memory leak in multicast_observer

I haven't looked at it in much detail, I suspect it's a shared_ptr cycle.

#include <rxcpp/rx.hpp>
int main()
{
   rxcpp::composite_subscription cs;
   rxcpp::subjects::detail::multicast_observer<int> mco(cs);
}
g++-5 -std=c++14 -fsanitize=address rx.cpp -I RxCpp/Rx/v2/src/ && ./a.out

=================================================================
==4642==ERROR: LeakSanitizer: detected memory leaks

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x408ef2 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x408ef2)
    #2 0x4084f6 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x4084f6)
    #3 0x407883 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x407883)
    #4 0x4071fd in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, rxcpp::detail::composite_subscription_inner::composite_subscription_state*, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x4071fd)
    #5 0x406a9d in std::__shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x406a9d)
    #6 0x40619b in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::shared_ptr<std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x40619b)
    #7 0x404d87 in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::allocate_shared<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x404d87)
    #8 0x403b00 in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::make_shared<rxcpp::detail::composite_subscription_inner::composite_subscription_state>() (/home/ben/development/test/a.out+0x403b00)
    #9 0x402ccc in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() (/home/ben/development/test/a.out+0x402ccc)
    #10 0x402f73 in rxcpp::composite_subscription::composite_subscription() (/home/ben/development/test/a.out+0x402f73)
    #11 0x401999 in main (/home/ben/development/test/a.out+0x401999)
    #12 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x40b060 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x40b060)
    #2 0x40ae54 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x40ae54)
    #3 0x40ab4f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x40ab4f)
    #4 0x40a8ee in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::state_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a8ee)
    #5 0x40a750 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a750)
    #6 0x40a65f in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a65f)
    #7 0x40a537 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a537)
    #8 0x40a36b in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::state_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a36b)
    #9 0x40a0cc in rxcpp::subjects::detail::multicast_observer<int>::binder_type::binder_type(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40a0cc)
    #10 0x409b69 in _ZN9__gnu_cxx13new_allocatorIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEE9constructIS6_IRNS1_22composite_subscriptionEEEEvPT_DpOT0_ (/home/ben/development/test/a.out+0x409b69)
    #11 0x4096bd in _ZNSt16allocator_traitsISaIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEEE12_S_constructIS5_IRNS0_22composite_subscriptionEEEENSt9enable_ifIXsrSt6__and_IINS7_18__construct_helperIT_IDpT0_EE4typeEEE5valueEvE4typeERS6_PSE_DpOSF_ (/home/ben/development/test/a.out+0x4096bd)
    #12 0x4092df in _ZNSt16allocator_traitsISaIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEEE9constructIS5_IRNS0_22composite_subscriptionEEEEDTcl12_S_constructfp_fp0_spcl7forwardIT0_Efp1_EEERS6_PT_DpOSB_ (/home/ben/development/test/a.out+0x4092df)
    #13 0x408c48 in std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x408c48)
    #14 0x40801f in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::binder_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40801f)
    #15 0x407550 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407550)
    #16 0x406bbf in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406bbf)
    #17 0x406383 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406383)
    #18 0x404f95 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x404f95)
    #19 0x404008 in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x404008)
    #20 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #21 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 96 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x409621 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x409621)
    #2 0x40916a in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x40916a)
    #3 0x408a3f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x408a3f)
    #4 0x407f92 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::binder_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407f92)
    #5 0x407550 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407550)
    #6 0x406bbf in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406bbf)
    #7 0x406383 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406383)
    #8 0x404f95 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x404f95)
    #9 0x404008 in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x404008)
    #10 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #11 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x409532 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x409532)
    #2 0x408fbe in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x408fbe)
    #3 0x408711 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x408711)
    #4 0x407c60 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>*, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x407c60)
    #5 0x407456 in std::__shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x407456)
    #6 0x406b5f in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::shared_ptr<std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x406b5f)
    #7 0x406225 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::allocate_shared<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x406225)
    #8 0x404ebe in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::make_shared<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x404ebe)
    #9 0x403c82 in rxcpp::subscription::subscription<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner, std::enable_if<!rxcpp::is_subscription<rxcpp::detail::composite_subscription_inner>::value, void**>::type) (/home/ben/development/test/a.out+0x403c82)
    #10 0x402fa2 in rxcpp::composite_subscription::composite_subscription() (/home/ben/development/test/a.out+0x402fa2)
    #11 0x401999 in main (/home/ben/development/test/a.out+0x401999)
    #12 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x40a414 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x40a414)
    #2 0x40a1ae in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2>&, unsigned long) (/home/ben/development/test/a.out+0x40a1ae)
    #3 0x409c1b in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > >(std::__allocated_ptr&) (/home/ben/development/test/a.out+0x409c1b)
    #4 0x4097ee in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >*, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x4097ee)
    #5 0x4093ae in std::__shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x4093ae)
    #6 0x408d59 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > >::shared_ptr<std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x408d59)
    #7 0x408219 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > > std::allocate_shared<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x408219)
    #8 0x407620 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > > std::make_shared<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x407620)
    #9 0x406ec2 in rxcpp::subscription::subscription<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >(rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>, std::enable_if<!rxcpp::is_subscription<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >::value, void**>::type) (/home/ben/development/test/a.out+0x406ec2)
    #10 0x406533 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, rxcpp::subscription>::type rxcpp::make_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function&&) (/home/ben/development/test/a.out+0x406533)
    #11 0x4051f9 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, std::weak_ptr<rxcpp::subscription::base_subscription_state> >::type rxcpp::composite_subscription::add<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function) const (/home/ben/development/test/a.out+0x4051f9)
    #12 0x40406c in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40406c)
    #13 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #14 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x408e71 in __gnu_cxx::new_allocator<std::_Rb_tree_node<rxcpp::subscription> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x408e71)
    #2 0x40847c in std::allocator_traits<std::allocator<std::_Rb_tree_node<rxcpp::subscription> > >::allocate(std::allocator<std::_Rb_tree_node<rxcpp::subscription> >&, unsigned long) (/home/ben/development/test/a.out+0x40847c)
    #3 0x407730 in std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_get_node() (/home/ben/development/test/a.out+0x407730)
    #4 0x40708d in std::_Rb_tree_node<rxcpp::subscription>* std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_create_node<rxcpp::subscription const&>(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x40708d)
    #5 0x4069d9 in std::_Rb_tree_node<rxcpp::subscription>* std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node::operator()<rxcpp::subscription const&>(rxcpp::subscription const&) const (/home/ben/development/test/a.out+0x4069d9)
    #6 0x405c11 in std::_Rb_tree_iterator<rxcpp::subscription> std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_insert_<rxcpp::subscription const&, std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node>(std::_Rb_tree_node_base*, std::_Rb_tree_node_base*, rxcpp::subscription const&, std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node&) (/home/ben/development/test/a.out+0x405c11)
    #7 0x404930 in std::pair<std::_Rb_tree_iterator<rxcpp::subscription>, bool> std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_insert_unique<rxcpp::subscription const&>(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x404930)
    #8 0x403871 in std::set<rxcpp::subscription, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::insert(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x403871)
    #9 0x40293c in rxcpp::detail::composite_subscription_inner::composite_subscription_state::add(rxcpp::subscription) (/home/ben/development/test/a.out+0x40293c)
    #10 0x402ea8 in rxcpp::detail::composite_subscription_inner::add(rxcpp::subscription) const (/home/ben/development/test/a.out+0x402ea8)
    #11 0x40310e in rxcpp::composite_subscription::add(rxcpp::subscription) const (/home/ben/development/test/a.out+0x40310e)
    #12 0x405210 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, std::weak_ptr<rxcpp::subscription::base_subscription_state> >::type rxcpp::composite_subscription::add<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function) const (/home/ben/development/test/a.out+0x405210)
    #13 0x40406c in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40406c)
    #14 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #15 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

SUMMARY: AddressSanitizer: 528 byte(s) leaked in 6 allocation(s).

compile error when scan is followed by other operator.

It seems to be omitted const qualifier in scan::on_subscribe function.

this is my test code:

#include <iostream>
#include <chrono>
#include "rxcpp/rx.hpp"

namespace rx = rxcpp;
namespace rxsc = rxcpp::schedulers;

int main()
{
  auto sc = rxsc::make_current_thread();
  auto so = rx::synchronize_in_one_worker(sc);
  auto start = sc.now() + std::chrono::seconds(2);
  auto period = std::chrono::seconds(1);

  rx::observable<>::interval(start, period, so)
    .scan(0, [] (int a, int i) { return a + i; })
    .map([] (int i) { return i * i; })
    .subscribe([] (int i) { std::cout << i << std::endl; });
}

g++ gives this error messages :

.../ext/rxcpp/Rx/v2/src/rxcpp/operators/rx-lift.hpp:65:16: error: no matching member function for call to 'on_subscribe'
        source.on_subscribe(std::move(lifted));
        ~~~~~~~^~~~~~~~~~~~
...
.../ext/rxcpp/Rx/v2/src/rxcpp/operators/rx-scan.hpp:48:10: note: candidate function not viable: 'this' argument has type 'const source_operator_type' (aka 'const rxcpp::operators::detail::scan<long, rxcpp::observable<long, rxcpp::sources::detail::interval<rxcpp::synchronize_in_one_worker> >, <lambda at
      main.cpp:17:14>, int>'), but method is not marked const
    void on_subscribe(Subscriber o) {
         ^

async_subject

Here's my first pass at adding 'async_subject' and 'to_async(f)':

#192

Please let me know what you think...

memory leak when using repeat(void)

after running snippet below memory increase infinitely

#include <rxcpp/rx.hpp>
#include <iostream>

int main(int argc,char** argv)
{
    rxcpp::sources::range(1,10)
    .repeat()
    .subscribe([](auto i)
    {
        std::cout << i << std::endl;
    });
    return 0;
}

unqualified `ptrdiff_t` & `size_t` use

Is this intentional?

As of GCC 4.9 this is no longer supported (and it was never std. compliant) -- see "Header changes": https://gcc.gnu.org/gcc-4.9/porting_to.html
// Edit: this appears to refer to the header inclusion missing entirely.

Currently #include <cstddef> is used in two places:

A fix would be to either:

  • fully qualify each use of std::ptrdiff_t and std::size_t (there aren't that many)
  • replace #include <cstddef> with #include <stddef.h>

I presume the choice may very well be a matter of preference :-)

zip tests fail with gcc-4.9.2 on linux

I've been having some problems with zip and gcc-4.9, but couldn't run the tests due to the bug in libstdc++ with uncaught_exception.

I've build libstdc++ with the fix for exception_ptr:
https://gcc.gnu.org/ml/gcc-patches/2015-02/msg00027.html

Here is the test output:

Scenario: zip return/return
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.1.cpp:578
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.1.cpp:618: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @220-on_next( 5), @240-on_completed() }
  ==
  { @220-on_next( 65534), @240-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip left completes first
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:132
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:172: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @215-on_next( 6), @225-on_completed() }
  ==
  { @215-on_next( 65534), @225-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip right completes first
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:194
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:234: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @215-on_next( 6), @225-on_completed() }
  ==
  { @215-on_next( 65534), @225-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip typical N
     Given: N hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:377
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:420: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @360-on_next( 136), @560-on_next( 392), @800-on_completed() }
  ==
  { @360-on_next( 524272), @560-on_next( 524272), @800-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip interleaved with tail
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:437
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:482: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @220-on_next( 5), @230-on_next( 9), @250-on_completed() }
  ==
  { @220-on_next( 65534), @230-on_next( 65534), @250-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip consecutive
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:504
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:547: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @235-on_next( 8), @240-on_next( 11), @250-on_completed() }
  ==
  { @235-on_next( 65534), @240-on_next( 65534), @250-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip consecutive ends with error right
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints followed by an error
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:634
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:679: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @235-on_next( 8), @240-on_next( 11), @245-on_error(zip on_error from
  source) }
  ==
  { @235-on_next( 65534), @240-on_next( 65534), @245-on_error(zip on_error from
  source) }

===============================================================================
test cases: 204 | 197 passed | 7 failed
assertions: 657 | 650 passed | 7 failed`

I haven't really looked into it yet, but I can tell you that with clang-3.5 address sanitizer does not complain. Unfortunately, I cannot link the tests using clang-3.6 (there's a problem with __attribute__((weak)))

Consider renaming of root namespace (rxcpp)

The cpp in the rxcpp namespace seems superfluous. I therefore suggest that a switch of the base namespace should be considered.

Rx.Net uses the namespace reactive and RxJava uses the namespace rx. So the two natural suggestions would be "reactive" and "rx".

I would prefer "rx" since it reads much better (in the test code @kirkshoop has aliased the rxcpp namespace to rx). I know however that @kirkshoop considers it too short and that it therefore could clash with the name of classes or a namespaces in existing codebases. However, I would argue that an existing codebase could just as well have a class called reactive as a class called rx (don't have any data whatsoever to back that up though :). It seems less likely that a user have class called rxcpp, so I guess that this comes down to a tradeoff between "grade of uniqueness" vs. "nice and readable".

interval(TimePoint when) ambiguity

Actual implementation of interval(TimePoint when) waits for the specified time point and then starts emitting integers infinitely:

    auto scheduler = rxcpp::identity_current_thread();
    auto start = scheduler.now();
    auto values = rxcpp::observable<>::interval(start, scheduler);
    auto s = values.
        map([&](int prime) { 
            return std::make_tuple(std::chrono::duration_cast<std::chrono::milliseconds>(scheduler.now() - start).count(), prime);
        });
    s.
        take(3).
        subscribe(
            rxcpp::util::apply_to(
                [](__int64 t, int v){printf("OnNext: %d. %ld ms after the first event\n", v, t);}),
            [](){printf("OnCompleted\n");});
OnNext: 1. 0 ms after the first event
OnNext: 2. 0 ms after the first event
OnNext: 3. 0 ms after the first event
OnCompleted

It happens because the default value of period is rxsc::scheduler::clock_type::duration::max(), and that causes integer overflow on the next occurence time calculating: *target += period;. So we schedule the next integer in deep past and emit it immediately.

Is this the intended behavior or an issue?

Test suite fails on ARM GCC

Hi, we are having some problems with RxCpp on ARM platform. RxCpp test suite is not passing, however on x86 and x86_64 platforms it runs just fine.

Target platform and toolchain details:
ARM Cortex-A8 VFPv3-D16
glibc 2.21
gcc-5.2.0
same errors were also generated on toolchain based on gcc-4.9.2 and uClibc (same hardware).

Test suite output:
http://hastebin.com/cugojuvexe.coffee

Coverting observable<T> to observable<list<T>>

What's the equivalent of RxJava's toList in RxCpp?

auto numbers = rxcpp::observable<>::range(1, 100);
numbers
    ./* convert rxcpp::observable<int> to rxcpp::observable<std::list<int>>> */
    .subscribe([](std::list<int> numbers)
    {
        /* do stuff */
    });

Memory Leak related to concurrency

There is a fairly easy way to leak memory in rxcpp. It only occurs once you introduced concurrency.

Here are a few contrived examples which create a leak:

while (true)
{
    rxcpp::sources::interval(std::chrono::milliseconds(100))
        .subscribe_on(rx::observe_on_new_thread())
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

this will do it too:

while (true)
{
    rxcpp::sources::range(1, 1000000000)
        .subscribe_on(rx::observe_on_new_thread())
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

and this one:

while (true)
{
    auto pipeline =
        rxcpp::sources::range(1, 1000000000)
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        ;

    auto subscription = pipeline.subscribe_on(rx::observe_on_new_thread()).publish().connect();

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    subscription.unsubscribe();
}

In my real world use case, I have my own source which produces multimedia samples and is implemented using the rxcpp scheduling abstractions (i.e. not just a loop, refer to issue #129). The examples above appear to be leaking an individual value in the pipeline (educated guess). In my case, my individual value is a multimedia sample and is much bigger than an int! So the memory leak is very noticeable.

Hopefully I've provided enough examples where someone much smarter than myself can chase down the cause.

Thanks,

Jeff

Implementation of catch operators

Are there any plans for implementing the catch operators? I wanted to use onErrorResumeNext, but I realized it's not implemented yet for RxCpp. I'm using RxCpp to write a web api client for my application. Basically, I want to be able to do something like this:

apiClient.fetchSomeResource(token)
   .on_error_resume_next([apiClient](std::exception_ptr e) {
      try {
         std::rethrow_exception(e);
      } catch (const UnauthorizedHttpException &e) {

         // When server returns 401, call authenticate() to get
         // a new token, and then recall the same api
         return apiClient.authenticate()
            .flat_map([](const AuthToken &token) {
               return apiClient.fetchSomeResource(token);
            }, [](const AuthToken &token, Resource resource) {
               return resource;
            });

      } catch (const std::exception &e) {
         std::rethrow_exception(e);
      }
   })
   .subscribe([](const Resource &resource) {
      //...
   });

[Question] rxcpp protects on_next. Is this intentional?

Q1 : rxcpp protects on_next. Is this intentional?

...
  auto handler_next      = [ ] (evt_t i) {throw "aaa";};
  auto handler_error     = [ ] (auto  e) { console::error("on_error: "s + e); };
...
  auto observable   = rx::observable<>::create(on_subscribe);
  auto subscription = observable.subscribe(handler_next,
                                           handler_error,
                                           handler_completed);
...
output>
[11:47:53] [LOG]   ----- test start -----
[11:47:53] [ERROR] on_error: aaa
[11:47:53] [LOG]   on_dispose
[11:47:53] [LOG]   ----- test end    -----

But according to

https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines#53-protect-calls-to-user-code-from-within-an-operator

Note: do not protect calls to subscribe, dispose, onNext, onError and onCompleted methods. 
These calls are on the edge of the monad. 
Calling the onError method from these places will lead to unexpected behavior.

Q2 : Does Rx Specification exist ? where?

Builderror on linux 64 bit gcc

I pasted this code in my terminal while being in the root of RxCpp:

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make```
I got the following output / error message: 

~/dvl/cpp/RxCpp/projects/build$ cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
-- The C compiler identification is GNU 4.9.2
-- The CXX compiler identification is GNU 4.9.2
-- Check for working C compiler: /usr/bin/gcc
-- Check for working C compiler: /usr/bin/gcc -- works
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Check for working CXX compiler: /usr/bin/g++
-- Check for working CXX compiler: /usr/bin/g++ -- works
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Looking for include file pthread.h
-- Looking for include file pthread.h - found
-- Looking for pthread_create
-- Looking for pthread_create - not found
-- Looking for pthread_create in pthreads
-- Looking for pthread_create in pthreads - not found
-- Looking for pthread_create in pthread
-- Looking for pthread_create in pthread - found
-- Found Threads: TRUE
-- CMAKE_CXX_COMPILER_ID: GNU
-- using gnu settings
-- RXCPP_DIR: /home/dhaeb/dvl/cpp/RxCpp
-- Found Doxygen: /usr/bin/doxygen (found version "1.8.6")
-- Configuring done
-- Generating done
-- Build files have been written to: /home/dhaeb/dvl/cpp/RxCpp/projects/build
~/dvl/cpp/RxCpp/projects/build$ make
Scanning dependencies of target one_test
[ 2%] Building CXX object CMakeFiles/one_test.dir/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp.o
/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp:11:21: fatal error: catch.hpp: No such file or directory
#include "catch.hpp"
^
compilation terminated.
make[2]: *** [CMakeFiles/one_test.dir/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp.o] Error 1
make[1]: *** [CMakeFiles/one_test.dir/all] Error 2
make: *** [all] Error 2

Where is the catch.hpp file? A ```find . -name "catch.hpp"``` couldn't find it ether... 

CReate a dynamic stream (Observable)

Hello,

I would like to implement a simple example of CEP using your library. My example is the following : I have two streams (Observables). When I press the 'a' key, an event/message is created in one stream and when I press 'g' an event is created in the other stream. Then I would like to join this two streams and do something.

However, I don't know how to create an 'infinite' observable with your library. I think that I have to use the method rxcpp::observable<>::create(). But, I have no idea about which parameter I should use.

I have already implemented this example in JAVA : the parameter is an "Action1".

Is it the same thing in C++? Do you have any idea, or example, to help me?

Thank you in advance for your help,

Best regards,

Compiling for iOS

Clang gives this error when compiling for iOS (arm64):

In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx.hpp:8:
In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx-includes.hpp:124:
In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx-scheduler.hpp:861:
../deps/rxcpp/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp:35:16: error: thread-local storage is unsupported for the current target
        static RXCPP_THREAD_LOCAL current_thread_queue_type* queue;

Version info (Xcode 5.1.1):

Apple LLVM version 5.1 (clang-503.0.40) (based on LLVM 3.4svn)
Target: x86_64-apple-darwin13.2.0
Thread model: posix

I get this for both device and simulator builds. Apologies if Rx isn't supported on iOS yet. I'm curious to check it out!

connect/publish threading issue

consider this:

auto published_observable =
    rx::sources::range(1, 100)
    .filter([](int i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << i << std::endl;
        return true;
    })
    .subscribe_on(rx::observe_on_new_thread())
    .publish();

auto subscription = published_observable.connect();
std::this_thread::sleep_for(std::chrono::seconds(1));
subscription.unsubscribe();
std::cout << "unsubscribed" << std::endl;

I expect this to print 10ish numbers then "unsubscribed". However it prints the entire range up to 100. Am I missing something? I looked around at the source for publish and connect but I couldn't figure out a solution to my problem. I need to be able to unsubscribe the connect()ed subscription.

Thanks for your help,

Jeff

feedback on coordination

The goal of coordination is to remove all synchronization from the 'join' and time operators. Re-implementing an optimized serialization of calls to multiple subscribers in each operator obscures the functionality, duplicates the same work over and over again and slows down the cases where all the sources are already serialized (UI events etc..).

An operator passes all the sources through the optional coordination and uses the returned observables instead. These are expected to never overlap calls across all the observers subscribed to them.

This results in multiple implementations of coordination that accomplish the goal in different ways with different tradeoffs, but the operators do not care. The default impl of coordination passes the original source through unchanged which results in the default having no synch overhead.

Code does not compile

The rx libraries are hardcoded to be unicode (L"foo") everywhere, and yet in some places you're using ambiguous notation that can be compiled under ANSI. This leads to errors and inability to compile. Would be much better if you either used the right macros (e.g., _T) or wrote everything in Unicode (e.g. CreateWindowExW).

And on the subject, why on earth are Windows involved in all of this? I was kind of hoping that this library would be usable on Linux. This now looks unlikely.

Is Ix supported?

I couldn't found a NuGet package with Ix.
I found more TODOs in linq.hpp:
// TODO: groupby(keyfn, eq)
// TODO: join...
// TODO: average
// TODO: concat
// TODO: default_if_empty
// TODO: distinct()
// TODO: distinct(cmp)
// TODO: except(second)
// TODO: except(second, eq)
// TODO: intersect(second)
// TODO: intersect(second, eq)
// TODO: order_by(sel)
// TODO: order_by(sel, less)
// TODO: order_by_descending(sel)
// TODO: order_by_descending(sel, less)
// TODO: sequence_equal(second)
// TODO: sequence_equal(second, eq)
// TODO: single / single_or_default
// TODO: skip_while(pred)
// TODO: sum
// TODO: take_while
// TODO: then_by / then_by_descending ?
// TODO: to_...
// TODO: union(second)
// TODO: union(eq)
// TODO: zip

Did you stop develop this very important project?
What is state of Ix part?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.