Coder Social home page Coder Social logo

Program exited with code -11 about elembuf HOT 3 CLOSED

cyroxin avatar cyroxin commented on August 29, 2024
Program exited with code -11

from elembuf.

Comments (3)

Cyroxin avatar Cyroxin commented on August 29, 2024

@mingwugmail

Here is something I worked up. It may, and probably does, have a race condition but the buffer is working well. The issue you had is that you did not construct the buffer. It is also good to remember that you cannot fill the buffer as much as you want. What ended up happening was that the producer filled memory that the buffer did not own as it did not check for how much room is available (buf.avail).

You also seemed to skip the expert -level documentation explaining how to share a buffer. It should not be passed by parameter as it ends up de-allocating the buffer due to RAII. I changed it into a module -level shared variable instead so it wouldn't be deallocated by the producer before the consumer is done. This will be removed in 1.2.X with explicit de-initiation, which will also allow implicit conversion to shared removing all those ugly casts.

I chose not to touch on performance aspects of your code to keep it as close as possible to what you intended to do. I did fix your incorrect sum issue and some syncronisation issues with atomics. There are indepth comments explaining why I did the changes I did, so that it can be learned from.

All in all, while it works, it does not offer any performance whatsoever. I did some testing and the biggest issue is the synchronized mutex that D offers, as expected, but also the repeated single item fill instead of filling using the whole array. You'll get more performance out of it if you try to do as much as possible in the mutex instead of hopping in and out of it, like my version currently does, otherwise it reserves and unreserves the synchronized buffer continuously for every small task.

The new version is many times less performant than the experimental threaded version. But it serves as a great study for how things work and how the buffer should be used. If you do end up creating a version with only atomics, do not let the producer touch the buffer at all, instead give it a pointer or a non-buffer slice to the available space and let it fill the memory the consumer does not see.

code
#!/usr/bin/env dub
/+ dub.sdl:
name "app"
dependency "elembuf" version="~>1.1.5"
dflags "-release" "-m64" "-boundscheck=off" "-O" platform="dmd"
dflags "-O4" "--release" "--boundscheck=off" platform="ldc2"
+/

import std.stdio;
import std.concurrency : receiveOnly, send, spawn, Tid, thisTid;
import std.datetime;
import core.atomic : atomicOp, atomicLoad, MemoryOrder;

import buffer;

alias T = int;

/*
Queue that can be used safely among
different threads. All access to an
instance is automatically locked thanks to
synchronized keyword.
*/
synchronized class SafeQueue(T) // Syncronised is SLOW as it is a mutex on everything! Use atomics with a shared or __gshared variable.
{
    // Note: must be private in synchronized
    // classes otherwise D complains.
    //private T[] elements;
    alias BufferT = Buffer!T;  // (T, true);
    private BufferT elements;

    alias elements this;

    this(bool uselessVariable) // Constructor must not have 0 parameters.
	{
        // We cannot pass a buffer as it will deallocate when it exits scope. Thus we construct it here locally in the class and pass it on as a non-destructable global.
        elements = cast (shared) BufferT(); 
	}

    auto length()
	{
        return (cast(BufferT)elements).length;
	}

    void push(T value) 
	{

        // auto arr = [value];

        (cast(BufferT)elements) << [value]; //.fill(cast(T[])arr);

    }

    /// Return T.init if queue empty
    void pop() {
        //import std.array : empty;

		//if (elements.empty)
        cast(BufferT)elements = (cast(BufferT)elements)[1 .. $];
    }
}

//This must be a module variable so that once it exits producer scope, 
//it does not deallocate before the consumer is done
//this is due to RAII, but will be removed in 1.2.X with explicit deinit function calls.
shared SafeQueue!(T) queue; 

/*
Safely print messages independent of
number of concurrent threads.
Note that variadic parameters 
are used
for args! That is args might be 0 .. N
parameters.
*/
void safePrint(T...)(T args)
{
    // Just executed by one concurrently
    synchronized {
        import std.stdio : writeln;
        writeln(args);
    }
}

const n=100_000_000;

shared int global_tls = 0;

void threadProducer(shared(int)* queueCounter)
{
    // Push values 0 to n-1
    foreach (i; 0..n) {

        while(queue.length == queue.max){} // You cannot fill forever, only fill when there is space available, i.e. buf.avail > 0
		queue.push(i); // Push will write element to available space. If pushed without available space, thread will error.

        //if(i % 1000 == 0)
        //safePrint("Pushed ", i);
        atomicOp!"+="(*queueCounter,1);
        atomicOp!"+="(global_tls,1);
    }
	safePrint("threadProducer:", atomicLoad(global_tls));
}

void threadConsumer(Tid owner,
					shared(int)* queueCounter)
{

	StopWatch sw;
	sw.start();  
	long s = 0;

    int popped = 0;
    while (popped != n) // IMPORTANT! This was popped = n, but it should be < n, because we start from 0, so the last pop should be n-1
	{
        if(queue.length == 0)
            continue;
  
		s += queue[0];
        // if (i == int.init) continue; //If you wish sum to be correct, this should not be done as the first item should be 0.
        //safePrint("Popped ", queue[0], " (Consumer pushed ", atomicLoad(*queueCounter), ")");
        queue.pop;
        ++popped;
        // safely fetch current value of
        // queueCounter using atomicLoad
		atomicOp!"-="(global_tls,1);
    }

	sw.stop();
	safePrint("threadConsumer:", atomicLoad(global_tls));
	safePrint("finished receiving");
	safePrint("received ",n," messages in ",sw.peek().msecs," msec sum=",s," speed=",n/sw.peek().msecs," msg/msec");

    // I'm done!
    owner.send(true); // Only send this after you are actually done and not printing.
}

void main()
{
    queue = new shared(SafeQueue!T)(true);
    // Buffer would deallocate when it exits the first spawn function scope if it were constructed here.
    // Thus we construct it inside the class. See EXPERT -level docs.
    shared int counter = 0;

    spawn(&threadProducer, &counter);
	auto consumer = spawn(&threadConsumer, thisTid, &counter);
	
	auto stopped = receiveOnly!bool;
    assert(stopped);

    //readln; // Had to add this as it goes away too fast for me. REMOVE THIS LINE!

}


from elembuf.

Cyroxin avatar Cyroxin commented on August 29, 2024

I assume this issue is closed with the optimized version sent to you privately instead of this?

from elembuf.

mw66 avatar mw66 commented on August 29, 2024

Yes, no longer code -11.

from elembuf.

Related Issues (4)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.