Coder Social home page Coder Social logo

channel's Introduction

Channel

基于订阅的多进程通讯组件,用于workerman进程间通讯或者服务器集群通讯,类似redis订阅发布机制。基于workerman开发。

Channel 提供两种通讯形式,分别是发布订阅的事件机制和消息队列机制。

它们的主要区别是:

  • 事件机制是消息发出后,所有订阅该事件的客户端都能收到消息。
  • 消息队列机制是消息发出后,所有订阅该消息的客户端只有一个会收到消息,如果客户端忙消息会进行排队直到有客户端闲置后重新取到消息。
  • 需要注意的是 Channel 只是提供一种通讯方式,本身并不提供消息确认、重试、延迟、持久化等功能,请根据实际情况合理使用。

手册地址

Channel手册

服务端

use Workerman\Worker;

//Tcp 通讯方式
$channel_server = new Channel\Server('0.0.0.0', 2206);

//Unix Domain Socket 通讯方式
//$channel_server = new Channel\Server('unix:///tmp/workerman-channel.sock');

if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

客户端

use Workerman\Worker;

$worker = new Worker();
$worker->onWorkerStart = function()
{
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('<Channel服务端ip>', 2206);

    // 使用 Unix Domain Socket 通讯
    //Channel\Client::connect('unix:///tmp/workerman-channel.sock');

    // 要订阅的事件名称(名称可以为任意的数字和字符串组合)
    $event_name = 'event_xxxx';
    // 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
    Channel\Client::on($event_name, function($event_data){
        var_dump($event_data);
    });
};
$worker->onMessage = function($connection, $data)
{
    // 要发布的事件名称
    $event_name = 'event_xxxx';
    // 事件数据(数据格式可以为数字、字符串、数组),会传递给客户端回调函数作为参数
    $event_data = array('some data.', 'some data..');
    // 发布某个自定义事件,订阅这个事件的客户端会收到事件数据,并触发客户端对应的事件回调
    Channel\Client::publish($event_name, $event_data);
};

if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}

消息队列示例

use Workerman\Worker;
use Workerman\Timer;

$worker = new Worker();
$worker->name = 'Producer';
$worker->onWorkerStart = function()
{
    Client::connect();

    $count = 0;
    Timer::add(1, function() {
        Client::enqueue('queue', 'Hello World '.time());
    });
};

$mq = new Worker();
$mq->name = 'Consumer';
$mq->count = 4;
$mq->onWorkerStart = function($worker) {
    Client::connect();

    //订阅消息 queue
    Client::watch('queue', function($data) use ($worker) {
        echo "Worker {$worker->id} get queue: $data\n";
    });

    //10 秒后取消订阅该消息
    Timer::add(10, function() {
        Client::unwatch('queue');
    }, [], false);
};

Worker::runAll();

channel's People

Contributors

detain avatar mgzhenhong avatar walkor avatar xeath avatar xeden3 avatar xpader avatar zgh419566 avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

channel's Issues

Can we use multiple channel?

Hi walkor,

We are using channel into gateway worker project using multiple gateway and business worker. We are using channel for sharing some data into multiple business worker. So my question is if the channel is down or fail then the data will not reach into the other business worker.
Is there any way to run multiple channel to avoid single point of failure

Thanks
Abhinav

Change Channel port

Hi,

I'm trying to change the port of Channel as follow:

$channel = new \Channel\Server('127.0.0.1', 7596);

[...]

$io->on('workerStart', function()use($io) {
    echo "This message is show and then shows the error";
    $io->adapter('\PHPSocketIO\ChannelAdapter');
});

But I get the next error:

Waring channel connection closed and try to reconnect

When I trying to start the Workerman server.

关于 Channel Server 的一个较严重的问题

近期项目中的 Channel Server 有严重的内存泄露问题,百思不得解,Channel Server 只负责消息的传递,本身不运行什么业务,怎么会内存泄露,而且之前运行一两个月也没什么异常。

后经过好多天的排查,发现是通过 Channel Server 传递闭包导致的内存泄露,闭包的序列化是使用 https://github.com/opis/closure 实现的,而 opis/closure 存在某些情况下的内存泄露。

而 Channel Server 也仅是传递,为什么会内存泄露呢?这就是 Channel Server 设计上的一个较严重的问题,Channel Server 会反序列化接收到的信息,从中拿到 channel 或者 queue 的名称等一些关联信息,然后再序列化后发送给其它进程,重点就是这里,因为整个信息是在一个大的数组中传递的,反序列化再序列化并不是只涉及到 Channel 组件自己的信息,也会包含业务传递的信息,但实际上 Channel 完全不关心业务数据,可却同时也会对业务的数据进行了反序列化和序列化,造成了完全不必要的性能浪费和其它隐患。而恰恰是这里执行了 opis/closure 的反序列化和序列化,导致内存的泄露发生在了 Channel Server 进程中。

改进思路是,把传递的消息进行拆分,让业务的数据序列化和反序列化只发生在调用方和接收方的进程中,Channel Server 仅对自己需要的关联信息进行相应处理,而业务数据则完全原样直传,避免业务造成的性能浪费、内存泄露等隐患发生在 Channel Server 中。

Waring channel connection closed and try to reconnect

hello
on start get this:
Waring channel connection closed and try to reconnect

 
require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;

$channel_server = new Channel\Server('0.0.0.0', 2206);

if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}
require_once __DIR__ . '/vendor/autoload.php';

use Workerman\Worker;

$ws_worker = new Worker("websocket://0.0.0.0:2021");
$ws_worker->onWorkerStart = function()
{
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('0.0.0.0', 2206);
    // 要订阅的事件名称(名称可以为任意的数字和字符串组合)
    $event_name = 'event_xxxx';
    // 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
    Channel\Client::on($event_name, function($event_data){
        var_dump($event_data);

    });

};
$ws_worker->onMessage = function($connection, $data)
{
    // 要发布的事件名称
    $event_name = 'event_xxxx';
    // 事件数据(数据格式可以为数字、字符串、数组),会传递给客户端回调函数作为参数
    $event_data = array('some data.', 'some data..');
    // 发布某个自定义事件,订阅这个事件的客户端会收到事件数据,并触发客户端对应的事件回调
     Channel\Client::publish($event_name, $event_data);
};

if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}
{
    "description": "demoapp",
    "name": "dalong/app",
    "authors": [
        {
            "name": "rongfengliang",
            "email": "[email protected]"
        }
    ],
    "require": {
        "workerman/workerman": "3.5.22",
        "workerman/channel": "^1.0"
    }
}

Event has been triggered 4 times (one for each process)

When I create a worker that has 4 process the onWorkerStart closure has been called 4 times. Therefore, I created 4 clients listening the event so when the event occurs it will be triggered 4 times. How can I trigger it just once??

$worker = new Worker(....);
// 4 processes
$worker->count = 4;
$worker->onWorkerStart = function()
{
    // Channel客户端连接到Channel服务端
    Channel\Client::connect('127.0.0.1', 2206);
    // 要订阅的事件名称(名称可以为任意的数字和字符串组合)
    $event_name = 'my_event';
    // 订阅某个自定义事件并注册回调,收到事件后会自动触发此回调
    Channel\Client::on($event_name, function($event_data){
        print_r($event_data);
    });
};

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.