Coder Social home page Coder Social logo

ko-process's Introduction

ko-process

Build Status Latest Stable Version Code Coverage Scrutinizer Code Quality Total Downloads Latest Unstable Version License

Ko-Process allows for easy callable forking. It is object-oriented wrapper around fork part of PCNTL PHP's extension. Background process, detaching process from the controlling terminal, signals and exit codes and simple IPC via shared memory. This is well tested library used in real world high load projects.

Installation

Requirements

PHP >= 5.4
pcntl extension installed
posix extension installed

Via Composer

The recommended way to install library is composer. You can see package information on Packagist.

{
	"require": {
		"misterion/ko-process": "*"
	}
}

Do not use composer?

Just clone the repository and care about autoload for namespace Ko.

Usage

Basic usage looks like this:

$manager = new Ko\ProcessManager();
$process = $manager->fork(function(Ko\Process $p) {
    echo 'Hello from ' . $p->getPid();
})->onSuccess(function() {
    echo 'Success finish!';
})->wait();

If should wait for all forked process

$manager = new Ko\ProcessManager();
for ($i = 0; $i < 10; $i++) {
    $manager->fork(function(Ko\Process $p) {
        echo 'Hello from ' . $p->getPid();
        sleep(1);
    });
}
$manager->wait();

Process title?

Yes, both ProcessManager and Process can change process title with setProcessTitle function. Or you may use trait Ko\Mixin\ProcessTitle to add this to any class you want. Take attention about ProcessManager::onShutdown - use can set callable which would be called if ProcessManager catch SIGTERM. The handler would be called before child process would be shutdown. We use demonize to detach from terminal. Run sample with code

$manager = new Ko\ProcessManager();
$manager->demonize();
$manager->setProcessTitle('I_am_a_master!');
$manager->onShutdown(function() use ($manager) {
    echo 'Catch sigterm.Quiting...' . PHP_EOL;
    exit();
});

echo 'Execute `kill ' . getmypid() . '` from console to stop script' . PHP_EOL;
while(true) {
    $manager->dispatch();
    sleep(1);
}

and ps aux|grep I_am_a_master or top to see you process title in linux process list.

Spawn

Making master - child process pattern application you should care about child process be alive. The spawn function will help you with that - once spawn will keep forked process alive after he exit with some error code.

$manager = new Ko\ProcessManager();
for ($i = 0; $i < 10; $i++) {
    $manager->spawn(function(Ko\Process $p) {
        echo 'Hello from ' . $p->getPid();
        sleep(1);
        exit(1); //exit with non 0 exit code
    });
}
$manager->wait(); //we have auto respawn for 10 forks

Let`s explain you are writing something like queue worker based on PhpAmqpLib\AMPQ. So you can write something like this

use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$manager = new Ko\ProcessManager();
$manager->setProcessTitle('Master:working...');
$manager->spawn(function(Ko\Process $p) {
    $connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();

    $channel->queue_declare('hello', false, true, false, false);

    $callback = function($msg) use (&$p) {
        $p->setProcessTitle('Worker:processJob ' . $msg->body);

        //will execute our job in separate process
        $m = new Ko\ProcessManager();
        $m->fork(function(Ko\Process $jobProcess) use ($msg) {
            $jobProcess->setProcessTitle('Job:processing ' . $msg->body);

            echo " [x] Received ", $msg->body, "\n";
            sleep(2);
            echo " [x] Done", "\n";
        })->onSuccess(function() use ($msg){
            //Ack on success
            $msg->delivery_info['channel']
                ->basic_ack($msg->delivery_info['delivery_tag']);
        })->wait();

        $p->setProcessTitle('Worker:waiting for job... ');

        //IMPORTANT! You should call dispatch() them self to process pending signals.
        $p->dispatch();

        if ($p->isShouldShutdown()) {
            exit();
        }
    };

    $channel->basic_qos(null, 1, null);
    $channel->basic_consume('hello', '', false, false, false, false, $callback);

    while(count($channel->callbacks)) {
        $channel->wait();
    }

    $channel->close();
    $connection->close();
});
$manager->wait();

Shared memory and Semaphore

The Ko\SharedMemory used Semaphore for internal locks so can be safely used for inter process communications. SharedMemory implements \ArrayAccess and \Countable interface so accessible like an array:

$sm = new SharedMemory(5000); //allocate 5000 bytes
$sm['key1'] = 'value';

echo 'Total keys is' . count($sm) . PHP_EOL;
echo 'The key with name `key1` exists: ' . isset($sm['key1'] . PHP_EOL;
echo 'The value of key1 is ' . $sm['key1'] . PHP_EOL;

unset($sm['key1']);
echo 'The key with name `key1` after unset exists: ' . isset($sm['key1'] . PHP_EOL;

You can use Semaphore for inter process locking:

$s = new Semaphore();
$s->acquire();
//do some job
$s->release();

//or
$s->tryExecute(function() {
    //do some job
});

Credits

Ko-process written as a part of GameNet project by Nikolay Bondarenko (misterionkell at gmail.com).

License

Released under the MIT license.

Links

ko-process's People

Contributors

damoclark avatar duncan3dc avatar jmt33 avatar misterion avatar ovr avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

ko-process's Issues

SIGTERM and SIGINT handling

I am trying to get advantage of onShutdown handler:

$processManager = new Ko\ProcessManager()
$processManager->fork(function(Process $process) {
});

$processManager->fork(function(Process $process) {
});

$processManager->onShutdown(function () {
    echo 'Shutting down';
});

$processManager->wait();

But it is never called.

When I interrupt my script with Ctrl + C SIGINT signal is sent.

SIGTERM is expected in:

protected function setupSignalHandlers()
{
    $this->signalHandler = new SignalHandler();
    $this->signalHandler->registerHandler(SIGCHLD, function() {
        $this->handleSigChild();
    });

    $this->signalHandler->registerHandler(SIGTERM, function() {
        $this->handleSigTerm();
    });
}

I have tried adding:

$this->signalHandler->registerHandler(SIGINT, function() {
    $this->handleSigTerm();
});

But then the script is never interrupted.

Ko\Process should implements ArrayAccess

Motivation is make more easy working with shared memory,

Now

$m->fork(function(Process $p) {
     $sm= $p->getSharedMemory();
     $sm['field'] ='value';
});

Should be

$m->fork(function(Process $p) {
     $p['field'] ='value';
});

Unable to create the semaphore (0)

Hi!

I get this error:

[RuntimeException] 
Unable to create the semaphore (0)
/htdocs/vendor/misterion/ko-process/src/Ko/Semaphore.php:81
#0: Ko\Semaphore-&gt;__construct(integer)
        /htdocs/vendor/misterion/ko-process/src/Ko/SharedMemory.php:90
#1: Ko\SharedMemory-&gt;__construct()
        /htdocs/vendor/misterion/ko-process/src/Ko/ProcessManager.php:196
#2: Ko\ProcessManager-&gt;createProcess(object)
        /htdocs/vendor/misterion/ko-process/src/Ko/ProcessManager.php:190
#3: Ko\ProcessManager-&gt;fork(object)

What could be the problem?

Could not fork more than 300. `Kernel.sem` limit.

Test code:

$manager = new Ko\ProcessManager();
for ($i = 0; $i < 300; $i++) {
    $manager->fork(function(Ko\Process $p) {
        echo 'Hello from ' . $p->getPid() . "\n";
        sleep(1);
        exit(1); //exit with non 0 exit code
    });
}
$manager->wait();

Result:

[root@host-192-168-113-41 demo]# php pcntl.php 
Hello from 9167
......
Hello from 9290

Warning: sem_get(): failed for key 0x61003ddc: No space left on device in /var/www/html/queue/vendor/misterion/ko-process/src/Ko/Semaphore.php on line 79

Fatal error: Uncaught RuntimeException: Unable to create the semaphore in /var/www/html/queue/vendor/misterion/ko-process/src/Ko/Semaphore.php:81
Stack trace:
#0 /var/www/html/queue/vendor/misterion/ko-process/src/Ko/SharedMemory.php(90): Ko\Semaphore->__construct(1627405788)
#1 /var/www/html/queue/vendor/misterion/ko-process/src/Ko/ProcessManager.php(195): Ko\SharedMemory->__construct()
#2 /var/www/html/queue/vendor/misterion/ko-process/src/Ko/ProcessManager.php(189): Ko\ProcessManager->createProcess(Object(Closure))
#3 /var/www/html/queue/demo/pcntl.php(34): Ko\ProcessManager->fork(Object(Closure))
#4 {main}
  thrown in /var/www/html/queue/vendor/misterion/ko-process/src/Ko/Semaphore.php on line 81

bug

PHP Fatal error: Call to undefined function Ko\shm_attach() in /Paidata/www/process/vendor/misterion/ko-process/src/Ko/SharedMemory.php on line 83

Fatal error: Call to undefined function Ko\shm_attach() in /Paidata/www/process/vendor/misterion/ko-process/src/Ko/SharedMemory.php on line 83

Unable to create the shared memory segment

Unable to create the shared memory segment 
Warning: shm_attach(): failed for key 0x61019b9c: Too many open files in /Users/Elfet/Sites/.../vendor/misterion/ko-process/src/Ko/SharedMemory.php on line 55

I got this. What to do?

Additional Parameter

I needed some additional parameter when starting thread.
So I added it in Process class and fork and spawn method in ProcessManager class.
I don' have enough time to PR, now. So I just leave an issue here.

$param = "whatever";

$process = $manager->fork(function(Ko\Process $p, $param) {
    // do something
}, $param);

source.zip

Share Memory Requirement

Hi, the composer packages suggests ext-sysvshm, but is this actually a requires? Without ext-sysvshm installed executing code from 'Usage' in README.md fails with an fatal error:

PHP Fatal error: Call to undefined function Ko\shm_attach() in /srv/movember.com/releases/20120817072736/vendor/misterion/ko-process/src/Ko/SharedMemory.php on line 83

Thanks.

Unclean code

/**
 * Return count of currently running child process.
 *
 * @return int
 */
public function getProcessCount()
{
    return count($this);
}
/**
 * Return count of currently running child process.
 *
 * @return int
 */
public function count()
{
    return count($this->children);
}

Should be in first function?
return $this->count($this);

Sending SIGKILL to the child

Hi,
I'd like to suggest some improvement to ko-process:
In normal situation when we need to shutdown worker process we are sending to it SIGTERM signal.
The child worker with "waiting next envelope" status cann't to be closed cause need to process some message before it (and if message rate is slow, we can wait worker shutdown for a long time) . Could you send to the child SIGKILL signal in that situation?

Basic publish - consume from child process

First of all @misterion, thank you for this awesome library, I really like it.

I am playing with videlalvaro/php-amqplib.
What I wanted to do is have parallel publisher and consumer running in child processes:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('q1');
$processManager = new ProcessManager();

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Publishing from '.$process->getPid().PHP_EOL;

    $channel->basic_publish(new AMQPMessage('Hello from '.$process->getPid()), null, 'q1');
    $channel->close();
});

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Consuming from '.$process->getPid().PHP_EOL;

    $channel->basic_consume('q1', '', false, false, false, false, function ($message) {
        echo 'Consuming message: '.$message->body.PHP_EOL;
    });

    while (count($channel->callbacks)) {
        $channel->wait();
    }
});

$processManager->wait();

Output:

Publishing from 9830
Consuming from 9831

Messages are published, but never consumed. Do you have any idea what can be the reason?

Full example is available on https://github.com/umpirsky/Extraload/blob/feature/queue-playground/test.php, can be easily checkout and run.

handle exit single process in pool

$manager = new \Ko\ProcessManager();
        for ($i = 0; $i < 10; $i++) {
            $manager->fork(function(\Ko\Process $p) use ($i) {
                echo 'Hello from ' . $p->getPid();
                sleep(1);
            })->onSuccess(function() {
                echo 'end';
            })->onError(function() {
                echo 'fail end';
            });
        }
$manager->wait();

OnSuccess / onError callbacks don't work if call wait() on process manager. If call wait () for each process handles call correct, but not parallel

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.