Comments (11)

Neverlord commented on June 22, 2024

I guess I need some more information about your use case. "Cannot access 0x2" looks like a dereferenced nullptr. However, line 78 of unicast_network.cpp accesses an io_stream_ptr that cannot be nullptr, since cppa throws an exception beforehand if an error occurs. I cannot reproduce the bug from the snippet above.

Are you calling remote_actor from several threads/actors in parallel? Or is there anything else in your setup that might affect the call to remote_actor?

from actor-framework.

ArtemGr commented on June 22, 2024

It seems to me that this could only happen if new returned NULL. Maybe it is an Electric Fence artefact. Sorry, I hadn't looked at the trace site first.

ArtemGr commented on June 22, 2024

On further investigation, it is not an Electric Fence fault, but I can't make much sense of it yet.

ArtemGr commented on June 22, 2024

Here it is from a different vantage point (Valgrind):

==32344== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==32344==    at 0x8DDA54C: recv (recv.c:34)
==32344==    by 0x801BE30: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:68)
==32344==    by 0x8065845: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:77)
==32344==    by 0x8065BC0: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:104)
==32344==    by 0x5149703: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==32344==    by 0x514A870: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==32344==    by 0x514B62C: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==32344==    by 0x5406203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==32344==    by 0x5406D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==32344==    by 0x5407886: tnt::Worker::run() (worker.cpp:139)
==32344==    by 0x51540F1: cxxtools::DetachedThread::exec() (thread.h:315)
==32344==    by 0x5703AFE: thread_entry (callable.tpp:314)
==32344==  Address 0x2 is not stack'd, malloc'd or (recently) free'd

ArtemGr commented on June 22, 2024

So far, I've instrumented your code:

void ipv4_io_stream::read(void* vbuf, size_t len) {
  if ((intptr_t) vbuf < 10) VALGRIND_PRINTF ("trace11, vbuf < 10\n");
    auto buf = reinterpret_cast<char*>(vbuf);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace12, buf < 10\n");
    size_t rd = 0;
    while (rd < len) {
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace13, buf < 10\n");
  if (rd < 0) VALGRIND_PRINTF ("trace14, rd < 0: %i", (int) rd);
  if (rd > 99111222) VALGRIND_PRINTF ("trace14, rd > 99111222: %i", (int) rd);
        auto recv_result = ::recv(m_fd, buf + rd, len - rd, 0);
        handle_read_result(recv_result, true);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace15, buf < 10\n");
        if (recv_result > 0) {
            rd += static_cast<size_t>(recv_result);
        }
        if (rd < len) {
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace21, buf < 10\n");
            fd_set rdset;
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace22, buf < 10\n");
            FD_ZERO(&rdset);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace23, buf < 10\n");
            FD_SET(m_fd, &rdset);
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace24, buf < 10\n");
            if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
                throw network_error("select() failed");
            }
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace25, buf < 10\n");
        }
  if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace16, buf < 10\n");
    }
}

and found that "buf" is first corrupted at "trace25":

**19461** trace25, buf < 10
**19461** trace16, buf < 10
**19461** trace13, buf < 10
==19461== Thread 17:
==19461== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==19461==    at 0x8DDC54C: recv (recv.c:34)
==19461==    by 0x801D0D7: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:73)
==19461==    by 0x8066EAF: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:81)
==19461==    by 0x8067236: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:108)
==19461==    by 0x514A093: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==19461==    by 0x514B210: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==19461==    by 0x514BFBC: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==19461==    by 0x5407203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==19461==    by 0x5407D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==19461==    by 0x5408886: tnt::Worker::run() (worker.cpp:139)
==19461==    by 0x5154E71: cxxtools::DetachedThread::exec() (thread.h:315)
==19461==    by 0x5704AFE: thread_entry (callable.tpp:314)
==19461==  Address 0x2 is not stack'd, malloc'd or (recently) free'd

ArtemGr commented on June 22, 2024

Looks like a bug in select or select-related macros (or GCC optimisations of them), because when I replace select with poll, cppa::detail::ipv4_io_stream::read no longer crashes and the application becomes more stable.

diff --git a/src/ipv4_io_stream.cpp b/src/ipv4_io_stream.cpp
index 8271026..36f06f8 100644
--- a/src/ipv4_io_stream.cpp
+++ b/src/ipv4_io_stream.cpp
@@ -45,6 +45,7 @@
 #   include <sys/socket.h>
 #   include <netinet/in.h>
 #   include <netinet/tcp.h>
+#   include <poll.h>
 #endif

 namespace cppa { namespace detail {
@@ -71,12 +72,9 @@ void ipv4_io_stream::read(void* vbuf, size_t len) {
             rd += static_cast<size_t>(recv_result);
         }
         if (rd < len) {
-            fd_set rdset;
-            FD_ZERO(&rdset);
-            FD_SET(m_fd, &rdset);
-            if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
-                throw network_error("select() failed");
-            }
+            struct pollfd fds;
+            fds.fd = m_fd; fds.events = POLLIN; fds.revents = 0;
+            if (poll (&fds, 1, -1) < 0) throw network_error("poll() failed");
         }
     }
 }

The app now crashes less often, and the remaining crashes are in middleman.cpp:787. I'll try to change that to poll as well.
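
For anyone who wants to try the poll-based wait outside libcppa, here is a minimal standalone sketch. The helper names wait_until_readable and read_exact are hypothetical and not part of libcppa, and the sketch uses read() rather than recv() so that it works on any descriptor, not only sockets. It also retries on EINTR, which the patch above does not do.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <stdexcept>
#include <poll.h>
#include <unistd.h>

// Blocks until `fd` is readable (or has hung up / errored).
// poll() receives the descriptor by value in a pollfd, so there is no
// fixed-size bitmap that a large descriptor number could overrun.
inline void wait_until_readable(int fd) {
    assert(fd >= 0);
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    for (;;) {
        int rc = ::poll(&pfd, 1, -1);
        if (rc > 0) return;
        // Retry if interrupted by a signal; any other error is fatal here.
        if (rc < 0 && errno != EINTR) throw std::runtime_error("poll() failed");
    }
}

// Hypothetical helper: reads exactly `len` bytes from `fd` into `vbuf`.
inline void read_exact(int fd, void* vbuf, std::size_t len) {
    assert(vbuf != nullptr || len == 0);
    auto buf = static_cast<char*>(vbuf);
    std::size_t rd = 0;
    while (rd < len) {
        ssize_t n = ::read(fd, buf + rd, len - rd);
        if (n > 0) {
            rd += static_cast<std::size_t>(n);
        } else if (n == 0) {
            throw std::runtime_error("connection closed");
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            wait_until_readable(fd);  // non-blocking fd with no data yet
        } else if (errno != EINTR) {
            throw std::runtime_error("read() failed");
        }
    }
}
```

Calling read_exact(fd, buffer, n) either fills the buffer completely or throws, which roughly matches the contract of ipv4_io_stream::read in the patch.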

ArtemGr commented on June 22, 2024

Here is my attempt at patching middleman to use poll:

diff --git a/src/middleman.cpp b/src/middleman.cpp
index 825fbf6..537a8a6 100644
--- a/src/middleman.cpp
+++ b/src/middleman.cpp
@@ -35,6 +35,11 @@
 #include <sstream>
 #include <iostream>

+#ifdef CPPA_WINDOWS
+#else
+#   include <poll.h>
+#endif
+
 #include "cppa/on.hpp"
 #include "cppa/actor.hpp"
 #include "cppa/match.hpp"
@@ -58,7 +63,7 @@

 using namespace std;

-//#define VERBOSE_MIDDLEMAN
+#define VERBOSE_MIDDLEMAN

 #ifdef VERBOSE_MIDDLEMAN
 #define DEBUG(arg) {                                                           \
@@ -441,7 +446,7 @@ bool peer_connection::continue_reading() {
                 binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size());
                 m_meta_msg->deserialize(&msg, &bd);
                 auto& content = msg.content();
-                DEBUG("<-- " << to_string(msg));
+                //DEBUG("<-- " << to_string(msg));
                 match(content) (
                     // monitor messages are sent automatically whenever
                     // actor_proxy_cache creates a new proxy
@@ -670,7 +675,7 @@ class middleman_overseer : public network_channel {
                         DEBUG("message to an unknown peer: " << to_string(out_msg));
                         break;
                     }
-                    DEBUG("--> " << to_string(out_msg));
+                    //DEBUG("--> " << to_string(out_msg));
                     auto had_unwritten_data = peer->has_unwritten_data();
                     try {
                         peer->write(out_msg);
@@ -702,19 +707,17 @@ class middleman_overseer : public network_channel {

 void middleman::operator()(int pipe_fd, middleman_queue& queue) {
     DEBUG("pself: " << to_string(*m_pself));
-    int maxfd = 0;
-    fd_set rdset;
-    fd_set wrset;
-    fd_set* wrset_ptr = nullptr;
+    std::vector<struct pollfd> fdset;
+    bool wrset = false;
     m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue));
     auto update_fd_sets = [&] {
-        FD_ZERO(&rdset);
-        maxfd = 0;
+        fdset.clear();
         CPPA_REQUIRE(m_channels.size() > 0);
         for (auto& channel : m_channels) {
-            auto fd = channel->read_handle();
-            maxfd = max(maxfd, fd);
-            FD_SET(fd, &rdset);
+            struct pollfd pfd;
+            pfd.fd = channel->read_handle();
+            pfd.events = POLLIN; pfd.revents = 0;
+            fdset.push_back (pfd);
         }
         // check consistency of m_peers_with_unwritten_data
         if (!m_peers_with_unwritten_data.empty()) {
@@ -728,17 +731,17 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
             }
         }
         if (m_peers_with_unwritten_data.empty()) {
-            if (wrset_ptr) wrset_ptr = nullptr;
+            wrset = false;
         }
         else {
             for (auto& peer : m_peers_with_unwritten_data) {
-                auto fd = peer->write_handle();
-                maxfd = max(maxfd, fd);
-                FD_SET(fd, &wrset);
+                struct pollfd pfd;
+                pfd.fd = peer->write_handle();
+                pfd.events = POLLOUT; pfd.revents = 0;
+                fdset.push_back (pfd);
             }
-            wrset_ptr = &wrset;
+            wrset = true;
         }
-        CPPA_REQUIRE(maxfd > 0);
     };
     auto continue_reading = [&](const network_channel_ptr& ch) {
         bool erase_channel = false;
@@ -799,14 +802,13 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
     };
     do {
         update_fd_sets();
-        //DEBUG("select()");
         int sresult;
         do {
-            DEBUG("select() on "
+            DEBUG("poll() on "
                   << (m_peers_with_unwritten_data.size() + m_channels.size())
                   << " sockets");
-            sresult = select(maxfd + 1, &rdset, wrset_ptr, nullptr, nullptr);
-            DEBUG("select() returned " << sresult);
+            sresult = poll (fdset.data(), fdset.size(), -1);
+            DEBUG("poll() returned " << sresult);
             if (sresult < 0) {
                 // try again or die hard
                 sresult = 0;
@@ -819,9 +821,7 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
                     // nfds is negative or the value
                     // contained within timeout is invalid
                     case EINVAL: {
-                        if ((maxfd + 1) < 0) {
-                            CPPA_CRITICAL("overflow: maxfd + 1 > 0");
-                        }
+                        CPPA_CRITICAL("poll EINVAL");
                         break;
                     }
                     case ENOMEM: {
@@ -855,17 +855,27 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
         //DEBUG("continue reading ...");
         { // iterate over all channels and remove channels as needed
             for (auto& ch : m_channels) {
-                if (FD_ISSET(ch->read_handle(), &rdset)) {
-                    continue_reading(ch);
-                }
+                for (auto& pfd: fdset)
+                    if (pfd.fd == ch->read_handle()) {
+                        if (pfd.revents) DEBUG("fd " << pfd.fd << "; read revents: " << pfd.revents);
+                        if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+                            m_erased_channels.insert (ch);
+                        else if (pfd.revents != 0)
+                            continue_reading(ch);
+                    }
             }
         }
-        if (wrset_ptr) { // iterate over peers with unwritten data
+        if (wrset) { // iterate over peers with unwritten data
             DEBUG("continue writing ...");
             for (auto& peer : m_peers_with_unwritten_data) {
-                if (FD_ISSET(peer->write_handle(), &wrset)) {
-                    continue_writing(peer);
-                }
+                for (auto& pfd: fdset)
+                    if (pfd.fd == peer->write_handle()) {
+                        if (pfd.revents) DEBUG("fd " << pfd.fd << "; write revents: " << pfd.revents);
+                        if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+                            m_peers_with_unwritten_data.erase (peer);
+                        else if (pfd.revents != 0)
+                            continue_writing(peer);
+                    }
             }
         }
         insert_new_handlers();

In the debug output I see poll returning POLLIN and POLLOUT events only.
The number of open sockets being polled grows without bound.

The client code that leads to this growth in open sockets is, as I mentioned earlier:

    using namespace cppa; using std::chrono::seconds;
    actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
    sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
      on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
      },
      after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
    );

If I change it to

    using namespace cppa; using std::chrono::seconds;
    static actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
    sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
      on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
      },
      after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
    );

then the number of sockets being polled doesn't grow (it stays at 2).
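
The effect of the static handle can be sketched without libcppa. In the sketch below, Connection, open_connection and shared_connection are hypothetical stand-ins (for actor_ptr and remote_actor); the point is only that a function-local static is initialized once, on first use, and since C++11 that initialization is thread-safe.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for a connection handle such as actor_ptr.
struct Connection {
    std::string host;
    unsigned short port;
};

// Counts how many times a connection was actually opened.
inline int& connect_count() {
    static int count = 0;
    return count;
}

// Hypothetical stand-in for an expensive call such as remote_actor().
inline Connection open_connection(const std::string& host, unsigned short port) {
    assert(!host.empty());
    ++connect_count();
    return Connection{host, port};
}

// Opens the connection on first use and returns the same handle afterwards.
// Initialization of a function-local static is thread-safe since C++11,
// so concurrent first calls still open exactly one connection.
inline const Connection& shared_connection() {
    static const Connection conn = open_connection("127.0.0.1", 1100);
    return conn;
}
```

One trade-off of this approach: the handle is never re-created, so if the remote side goes away, the cached handle stays stale for the lifetime of the process.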

I THINK that the select memory corruption mentioned earlier might be related to that growing number of sockets (reaching some internal limits in select).

IMO, libcppa fails to automatically close the remote_actor sockets when it should.
libcppa should also guard against reaching the limits of select.
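
One concrete select limit is FD_SETSIZE: POSIX leaves the behaviour of FD_SET undefined for descriptors below 0 or at or above FD_SETSIZE, and on common implementations an out-of-range descriptor writes past the end of the fd_set. Whether that is what corrupted buf here was never confirmed in this thread. A guard could look like the following sketch; fits_in_fd_set and checked_fd_set are hypothetical names, not libcppa functions.

```cpp
#include <cassert>
#include <stdexcept>
#include <sys/select.h>

// True if `fd` can be stored in an fd_set without going out of bounds.
inline bool fits_in_fd_set(int fd) {
    return fd >= 0 && fd < FD_SETSIZE;
}

// Hypothetical wrapper around FD_SET that rejects out-of-range descriptors
// instead of silently writing outside the fd_set.
inline void checked_fd_set(int fd, fd_set* set) {
    assert(set != nullptr);
    if (!fits_in_fd_set(fd)) {
        throw std::out_of_range("fd does not fit into fd_set");
    }
    FD_SET(fd, set);
}
```

With such a check, a process that leaks sockets would fail with an exception once a descriptor reaches FD_SETSIZE rather than corrupting nearby stack variables.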

P.S. In retrospect, I should have used epoll...

Neverlord commented on June 22, 2024

Thanks a lot for your effort! I think I'll keep your changes (I'm going to review your patch next week). Using poll() is just fine for me, because epoll() isn't available on all platforms (e.g., MacOS).

You're right about the automatic disposal of unused remote actors. Right now, the middleman holds its own reference to the proxy, since it needs to be able to deserialize it. However, this obviously keeps the reference count above zero, even if no other actor has a reference to the proxy any longer. I'll see how to fix that. Again, thanks a lot for pointing this out.
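
The general pattern behind this problem can be illustrated with the standard smart pointers: a registry that stores owning pointers keeps every entry alive, while one that stores weak pointers can still look entries up without extending their lifetime. The sketch below uses std::shared_ptr and std::weak_ptr purely for illustration; libcppa uses its own intrusive_ptr, and Proxy and ProxyRegistry are hypothetical names, not the fix that was eventually applied.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>

// Hypothetical stand-in for an actor proxy.
struct Proxy {
    explicit Proxy(int proxy_id) : id(proxy_id) {}
    int id;
};

// Registry that can find live proxies without keeping them alive.
class ProxyRegistry {
public:
    void add(const std::shared_ptr<Proxy>& p) {
        assert(p != nullptr);
        m_entries[p->id] = p;  // stored as weak_ptr: no ownership is taken
    }

    // Returns the proxy if it is still referenced elsewhere, else nullptr.
    // Expired entries are dropped on lookup.
    std::shared_ptr<Proxy> find(int id) {
        auto it = m_entries.find(id);
        if (it == m_entries.end()) return nullptr;
        auto sp = it->second.lock();
        if (!sp) m_entries.erase(it);
        return sp;
    }

    std::size_t size() const { return m_entries.size(); }

private:
    std::map<int, std::weak_ptr<Proxy>> m_entries;
};
```

Once the last owning pointer outside the registry is released, the proxy is destroyed and its destructor could close the underlying socket.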

Btw, sorry for the late answer, but I was ill the whole week.

ArtemGr commented on June 22, 2024

NP!
If the semantics of my patch are okay, then I'd like to do a Linux version with epoll later (with #ifdef __linux__).
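
A rough sketch of what such an #ifdef __linux__ split might look like, assuming a hypothetical helper wait_readable that is not part of libcppa. It uses epoll on Linux and falls back to poll elsewhere. For brevity it creates and closes an epoll instance on every call; a real event loop would create the instance once and register descriptors as they come and go.

```cpp
#include <cassert>
#include <cerrno>
#include <unistd.h>
#ifdef __linux__
#   include <sys/epoll.h>
#else
#   include <poll.h>
#endif

// Waits up to `timeout_ms` milliseconds (-1 = forever) for `fd` to report
// an event. Returns true if an event was reported (readable, hang-up or
// error), false on timeout or failure.
inline bool wait_readable(int fd, int timeout_ms) {
    assert(fd >= 0);
#ifdef __linux__
    int ep = ::epoll_create1(0);
    if (ep < 0) return false;
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    int rc = -1;
    if (::epoll_ctl(ep, EPOLL_CTL_ADD, fd, &ev) == 0) {
        epoll_event out{};
        do {  // retry if interrupted by a signal
            rc = ::epoll_wait(ep, &out, 1, timeout_ms);
        } while (rc < 0 && errno == EINTR);
    }
    ::close(ep);
    return rc > 0;
#else
    pollfd pfd{};
    pfd.fd = fd;
    pfd.events = POLLIN;
    int rc;
    do {  // retry if interrupted by a signal
        rc = ::poll(&pfd, 1, timeout_ms);
    } while (rc < 0 && errno == EINTR);
    return rc > 0;
#endif
}
```

Keeping both branches behind one function signature means the calling code does not need to know which mechanism is in use.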

Neverlord commented on June 22, 2024

Based on your patch, I have updated the middleman to use poll rather than select. However, I haven't fixed the reference-counting problem yet.

Neverlord commented on June 22, 2024

I have opened a new issue for the reference-counting problem: #75. I'll close this one, since it is getting too big.
