I guess I need some more information about your use case. "Cannot access 0x2" looks like a dereferenced nullptr. However, at unicast_network.cpp line 78 an io_stream_ptr is accessed, which cannot be nullptr, since cppa throws an exception beforehand in case of errors. I cannot reproduce the bug from the snippet above.
Are you calling remote_actor from several threads/actors in parallel? Or is there anything else in your setup that might affect the call to remote_actor?
from actor-framework.
Seems to me that could only happen if new returned NULL. Maybe an Electric Fence artefact. Sorry, I haven't looked at the trace site first.
On further investigation, it is not an Electric Fence fault, but I can't make much sense of it yet.
Here it is from a different vantage point (Valgrind):
==32344== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==32344== at 0x8DDA54C: recv (recv.c:34)
==32344== by 0x801BE30: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:68)
==32344== by 0x8065845: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:77)
==32344== by 0x8065BC0: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:104)
==32344== by 0x5149703: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==32344== by 0x514A870: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==32344== by 0x514B62C: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==32344== by 0x5406203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==32344== by 0x5406D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==32344== by 0x5407886: tnt::Worker::run() (worker.cpp:139)
==32344== by 0x51540F1: cxxtools::DetachedThread::exec() (thread.h:315)
==32344== by 0x5703AFE: thread_entry (callable.tpp:314)
==32344== Address 0x2 is not stack'd, malloc'd or (recently) free'd
So far, I've instrumented your code:
void ipv4_io_stream::read(void* vbuf, size_t len) {
    if ((intptr_t) vbuf < 10) VALGRIND_PRINTF ("trace11, vbuf < 10\n");
    auto buf = reinterpret_cast<char*>(vbuf);
    if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace12, buf < 10\n");
    size_t rd = 0;
    while (rd < len) {
        if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace13, buf < 10\n");
        if (rd < 0) VALGRIND_PRINTF ("trace14, rd < 0: %i", (int) rd);
        if (rd > 99111222) VALGRIND_PRINTF ("trace14, rd > 99111222: %i", (int) rd);
        auto recv_result = ::recv(m_fd, buf + rd, len - rd, 0);
        handle_read_result(recv_result, true);
        if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace15, buf < 10\n");
        if (recv_result > 0) {
            rd += static_cast<size_t>(recv_result);
        }
        if (rd < len) {
            if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace21, buf < 10\n");
            fd_set rdset;
            if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace22, buf < 10\n");
            FD_ZERO(&rdset);
            if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace23, buf < 10\n");
            FD_SET(m_fd, &rdset);
            if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace24, buf < 10\n");
            if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
                throw network_error("select() failed");
            }
            if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace25, buf < 10\n");
        }
        if ((intptr_t) buf < 10) VALGRIND_PRINTF ("trace16, buf < 10\n");
    }
}
and found that "buf" is first corrupted at "trace25":
**19461** trace25, buf < 10
**19461** trace16, buf < 10
**19461** trace13, buf < 10
==19461== Thread 17:
==19461== Syscall param socketcall.recvfrom(buf) points to unaddressable byte(s)
==19461== at 0x8DDC54C: recv (recv.c:34)
==19461== by 0x801D0D7: cppa::detail::ipv4_io_stream::read(void*, unsigned long) (ipv4_io_stream.cpp:73)
==19461== by 0x8066EAF: cppa::remote_actor(std::pair<cppa::intrusive_ptr<cppa::util::input_stream>, cppa::intrusive_ptr<cppa::util::output_stream> >) (unicast_network.cpp:81)
==19461== by 0x8067236: cppa::remote_actor(char const*, unsigned short) (unicast_network.cpp:108)
==19461== by 0x514A093: img2::Service::scaleRpc(img2::ScaleReply&, std::string const&, std::string const&, std::string const&, int, int, int) (service.cpp:190)
==19461== by 0x514B210: img2::Service::serve(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:327)
==19461== by 0x514BFBC: img2::TntService::operator()(tnt::HttpRequest&, tnt::HttpReply&, tnt::QueryParams&) (service.cpp:80)
==19461== by 0x5407203: tnt::Worker::dispatch(tnt::HttpRequest&, tnt::HttpReply&) (worker.cpp:431)
==19461== by 0x5407D6E: tnt::Worker::processRequest(tnt::HttpRequest&, std::iostream&, unsigned int) (worker.cpp:240)
==19461== by 0x5408886: tnt::Worker::run() (worker.cpp:139)
==19461== by 0x5154E71: cxxtools::DetachedThread::exec() (thread.h:315)
==19461== by 0x5704AFE: thread_entry (callable.tpp:314)
==19461== Address 0x2 is not stack'd, malloc'd or (recently) free'd
Looks like a bug in select or select-related macros (or GCC optimisations of them), because when I replace select with poll, cppa::detail::ipv4_io_stream::read no longer crashes and the application becomes more stable.
diff --git a/src/ipv4_io_stream.cpp b/src/ipv4_io_stream.cpp
index 8271026..36f06f8 100644
--- a/src/ipv4_io_stream.cpp
+++ b/src/ipv4_io_stream.cpp
@@ -45,6 +45,7 @@
# include <sys/socket.h>
# include <netinet/in.h>
# include <netinet/tcp.h>
+# include <poll.h>
#endif
namespace cppa { namespace detail {
@@ -71,12 +72,9 @@ void ipv4_io_stream::read(void* vbuf, size_t len) {
rd += static_cast<size_t>(recv_result);
}
if (rd < len) {
- fd_set rdset;
- FD_ZERO(&rdset);
- FD_SET(m_fd, &rdset);
- if (select(m_fd + 1, &rdset, nullptr, nullptr, nullptr) < 0) {
- throw network_error("select() failed");
- }
+ struct pollfd fds;
+ fds.fd = m_fd; fds.events = POLLIN; fds.revents = 0;
+ if (poll (&fds, 1, -1) < 0) throw network_error("poll() failed");
}
}
}
The app now crashes less often; the remaining crash is at middleman.cpp:787. I'll try changing that to poll as well.
Here is my attempt patching middleman to use poll:
diff --git a/src/middleman.cpp b/src/middleman.cpp
index 825fbf6..537a8a6 100644
--- a/src/middleman.cpp
+++ b/src/middleman.cpp
@@ -35,6 +35,11 @@
#include <sstream>
#include <iostream>
+#ifdef CPPA_WINDOWS
+#else
+# include <poll.h>
+#endif
+
#include "cppa/on.hpp"
#include "cppa/actor.hpp"
#include "cppa/match.hpp"
@@ -58,7 +63,7 @@
using namespace std;
-//#define VERBOSE_MIDDLEMAN
+#define VERBOSE_MIDDLEMAN
#ifdef VERBOSE_MIDDLEMAN
#define DEBUG(arg) { \
@@ -441,7 +446,7 @@ bool peer_connection::continue_reading() {
binary_deserializer bd(m_rd_buf.data(), m_rd_buf.size());
m_meta_msg->deserialize(&msg, &bd);
auto& content = msg.content();
- DEBUG("<-- " << to_string(msg));
+ //DEBUG("<-- " << to_string(msg));
match(content) (
// monitor messages are sent automatically whenever
// actor_proxy_cache creates a new proxy
@@ -670,7 +675,7 @@ class middleman_overseer : public network_channel {
DEBUG("message to an unknown peer: " << to_string(out_msg));
break;
}
- DEBUG("--> " << to_string(out_msg));
+ //DEBUG("--> " << to_string(out_msg));
auto had_unwritten_data = peer->has_unwritten_data();
try {
peer->write(out_msg);
@@ -702,19 +707,17 @@ class middleman_overseer : public network_channel {
void middleman::operator()(int pipe_fd, middleman_queue& queue) {
DEBUG("pself: " << to_string(*m_pself));
- int maxfd = 0;
- fd_set rdset;
- fd_set wrset;
- fd_set* wrset_ptr = nullptr;
+ std::vector<struct pollfd> fdset;
+ bool wrset = false;
m_channels.emplace_back(new middleman_overseer(this, pipe_fd, queue));
auto update_fd_sets = [&] {
- FD_ZERO(&rdset);
- maxfd = 0;
+ fdset.clear();
CPPA_REQUIRE(m_channels.size() > 0);
for (auto& channel : m_channels) {
- auto fd = channel->read_handle();
- maxfd = max(maxfd, fd);
- FD_SET(fd, &rdset);
+ struct pollfd pfd;
+ pfd.fd = channel->read_handle();
+ pfd.events = POLLIN; pfd.revents = 0;
+ fdset.push_back (pfd);
}
// check consistency of m_peers_with_unwritten_data
if (!m_peers_with_unwritten_data.empty()) {
@@ -728,17 +731,17 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
}
}
if (m_peers_with_unwritten_data.empty()) {
- if (wrset_ptr) wrset_ptr = nullptr;
+ wrset = false;
}
else {
for (auto& peer : m_peers_with_unwritten_data) {
- auto fd = peer->write_handle();
- maxfd = max(maxfd, fd);
- FD_SET(fd, &wrset);
+ struct pollfd pfd;
+ pfd.fd = peer->write_handle();
+ pfd.events = POLLOUT; pfd.revents = 0;
+ fdset.push_back (pfd);
}
- wrset_ptr = &wrset;
+ wrset = true;
}
- CPPA_REQUIRE(maxfd > 0);
};
auto continue_reading = [&](const network_channel_ptr& ch) {
bool erase_channel = false;
@@ -799,14 +802,13 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
};
do {
update_fd_sets();
- //DEBUG("select()");
int sresult;
do {
- DEBUG("select() on "
+ DEBUG("poll() on "
<< (m_peers_with_unwritten_data.size() + m_channels.size())
<< " sockets");
- sresult = select(maxfd + 1, &rdset, wrset_ptr, nullptr, nullptr);
- DEBUG("select() returned " << sresult);
+ sresult = poll (fdset.data(), fdset.size(), -1);
+ DEBUG("poll() returned " << sresult);
if (sresult < 0) {
// try again or die hard
sresult = 0;
@@ -819,9 +821,7 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
// nfds is negative or the value
// contained within timeout is invalid
case EINVAL: {
- if ((maxfd + 1) < 0) {
- CPPA_CRITICAL("overflow: maxfd + 1 > 0");
- }
+ CPPA_CRITICAL("poll EINVAL");
break;
}
case ENOMEM: {
@@ -855,17 +855,27 @@ void middleman::operator()(int pipe_fd, middleman_queue& queue) {
//DEBUG("continue reading ...");
{ // iterate over all channels and remove channels as needed
for (auto& ch : m_channels) {
- if (FD_ISSET(ch->read_handle(), &rdset)) {
- continue_reading(ch);
- }
+ for (auto& pfd: fdset)
+ if (pfd.fd == ch->read_handle()) {
+ if (pfd.revents) DEBUG("fd " << pfd.fd << "; read revents: " << pfd.revents);
+ if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+ m_erased_channels.insert (ch);
+ else if (pfd.revents != 0)
+ continue_reading(ch);
+ }
}
}
- if (wrset_ptr) { // iterate over peers with unwritten data
+ if (wrset) { // iterate over peers with unwritten data
DEBUG("continue writing ...");
for (auto& peer : m_peers_with_unwritten_data) {
- if (FD_ISSET(peer->write_handle(), &wrset)) {
- continue_writing(peer);
- }
+ for (auto& pfd: fdset)
+ if (pfd.fd == peer->write_handle()) {
+ if (pfd.revents) DEBUG("fd " << pfd.fd << "; write revents: " << pfd.revents);
+ if (pfd.revents & (POLLRDHUP | POLLERR | POLLHUP | POLLNVAL))
+ m_peers_with_unwritten_data.erase (peer);
+ else if (pfd.revents != 0)
+ continue_writing(peer);
+ }
}
}
insert_new_handlers();
In debug output I see poll returning POLLIN and POLLOUT events only.
The number of open sockets being polled grows indefinitely.
The client code that leads to this growth of open sockets is, as I've mentioned earlier:
using namespace cppa; using std::chrono::seconds;
actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
    on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
    },
    after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
);
If I change it to
using namespace cppa; using std::chrono::seconds;
static actor_ptr img2actor = remote_actor ("127.0.0.1", 1100); // Calling into "img2.scaleService".
sync_send (img2actor, atom("scale"), id, original, watermark, maxWidth, maxHeight, version) .await (
    on_arg_match >> [&scaled](ScaleReply reply){
        scaled = reply;
    },
    after (seconds (3)) >> [this]{log_error ("scaleRpc: timeout waiting for reply");}
);
then the number of sockets being polled doesn't grow (it remains at 2).
I THINK that the select memory corruption mentioned earlier might be related to that growing number of sockets (reaching some internal limit in select).
IMO, libcppa fails to automatically close the remote_actor sockets where it should.
libcppa should also guard against reaching select limits.
P.S. In retrospect, I should have used epoll...
Thanks a lot for your effort! I think I'll keep your changes (I'm going to review your patch next week). Using poll() is just fine for me, because epoll() isn't available on all platforms (e.g., MacOS).
You're right about the automatic disposal of unused remote actors. Right now, the middleman holds a reference to the proxy of its own, since it needs to be able to deserialize it. However, this obviously keeps the reference count above zero, even if no other actor holds a reference to the proxy any longer. I'll see how to fix that. Again, thanks a lot for pointing this out.
Btw, sorry for the late answer, but I was ill the whole week.
NP!
If the semantics of my patch are okay, then I'd like to do a Linux version with epoll later (with #ifdef __linux__).
Based on your patch, I have updated the middleman to use poll rather than select. However, I haven't fixed the reference-counting problem yet.
I have opened a new issue for the reference-counting problem: #75. I'll close this one, since it starts getting too big.