Coder Social home page Coder Social logo

sogou / workflow Goto Github PK

View Code? Open in Web Editor NEW
12.5K 12.5K 2.3K 4.57 MB

C++ Parallel Computing and Asynchronous Networking Framework

License: Apache License 2.0

CMake 1.39% Makefile 0.30% C++ 76.15% C 20.81% Starlark 0.56% Lua 0.80%
consul dag http kafka mysql redis tasking

workflow's Issues

如何汇总一个ParallelWork的结果

背景

我打算实现一个http服务,提供一个接口用来并行获取若干个http url的响应结果并汇总返回。基本上算是合并了tutorial-06-parallel_wgettutorial-04-http_echo_server的功能。

存在的一点区别是tutorial-06-parallel_wget只是在ParallelWorkcallback中往标准输出中打印了每个请求的结果,我的需求是想要处理一下这些抓取结果并汇总作为http 接口的返回。

我想到的一个方案是在ParallelWork的callback里获取到当前http server task(不是很确认正确的获取方式),并设置响应。

void callback(const ParallelWork *pwork)
{
	tutorial_series_context *ctx;
	const void *body;
	size_t size;
	size_t i;
        // 获取到当前http server task 
        HttpResponse *resp = task->get_resp();

	for (i = 0; i < pwork->size(); i++)
	{
		ctx = (tutorial_series_context *)pwork->series_at(i)->get_context();
		printf("%s\n", ctx->url.c_str());
		if (ctx->state == WFT_STATE_SUCCESS)
		{
			ctx->resp.get_parsed_body(&body, &size);
			resp->append_output_body_nocopy(body, size);
		}
		else
			printf("ERROR! state = %d, error = %d\n", ctx->state, ctx->error);

		delete ctx;
	}
}
int process(WFHttpTask *server_task)
{
	ParallelWork *pwork = Workflow::create_parallel_work(callback);
	SeriesWork *series;
	WFHttpTask *task;
	HttpRequest *req;
	tutorial_series_context *ctx;
	int i;

	for (i = 1; i < 100; i++)
	{
		std::string url = "http://localhost:8100/worker";

		task = WFTaskFactory::create_http_task(
                                   url, 
                                   REDIRECT_MAX, RETRY_MAX,
				   [](WFHttpTask *task) {
						tutorial_series_context *ctx =
						 (tutorial_series_context *)series_of(task)->get_context();
						ctx->state = task->get_state();
						ctx->error = task->get_error();
						ctx->resp = std::move(*task->get_resp());
						});

		req = task->get_req();
		req->add_header_pair("Accept", "*/*");
		req->add_header_pair("User-Agent", "Wget/1.14 (linux-gnu)");
		req->add_header_pair("Connection", "close");

		ctx = new tutorial_series_context;
		ctx->url = std::move(url);
		series = Workflow::create_series_work(task, nullptr);
		series->set_context(ctx);
		pwork->add_series(series);
	}

	WFFacilities::WaitGroup wait_group(1);

	Workflow::start_series_work(pwork, [&wait_group](const SeriesWork *) {
		wait_group.done();
	});

	wait_group.wait();
	return 0;
}

问题

想知道如何在ParallelWork里获取到当前请求的Http server task,或者把http server task设置到ParallelWork的上下文中

workflow并行串行处理的机制

请教下,SubTask.cc中任务分发及任务处理中红色框框中大概的含义是什么那?dispath与done均调用subtask_done,看到if条件,但不清楚具体的条件是什么?能否解答下(不论是并行还是串行,任务的分发都是顺序的,只不过分发到不同的request,如dns dispatch到WFRouterTask::dispatch,http task dispatch到WFComplexClientTask::dispatch,进而Communicator::request,但这个过程是通过ParallelTask::dispatch()顺序依次分发的);另外从服务端接收到数据,放进msgqueue,网络线程池触发handle,每次都必须调用subtask_done,最终调用用户注册的回掉函数,如果是基于此,并行处理的主要机制其实就是利用queue的特性,多个线程调用各自的注册的回掉函数;
image
另外,附加问一个小问题,类似这样的硬编码为1 (node == (struct __poller_node *)1)是什么含义那?
image

MySql client issue

1.创建mysql任务执行数据库的插入,插入记录成功,但是会出现Abnormal packet_type=1。想请教下一般可能的原因是什么,控制台输出来自

else
{
fprintf(stderr, "Abnormal packet_type=%d\n", resp->get_packet_type());
}

2.对于需要插入多条记录的任务,是分别每条记录建立一个task比较好呢还是建立一个task执行多条insert语句好呢。

数据库小白,问的问题可能比较zz ==!

about the benchmark

workflow 内部线程池的设计方式本身是有瓶颈的,其是一种常规的线程池设计模型,但当线程池中线程数较大时会因 pthread_cond_signal 的设计导致惊群问题(具体原因可以 man 一下 pthread_cond_signal 或 https://blog.csdn.net/zsxxsz/article/details/88388425 );
另外,与 nginx 的性能对比可能也存在问题,你们不妨把 nginx 的日志关闭再对比测试一下(我对比测试后发现nginx性能还是远高于workflow的)。nginx 因为其单线程非阻塞的设计方式,其CPU亲和性要远好于线程池的设计方式,其性能基本是随着CPU核数和进程数增加呈线性增长的,而 workflow 中的 demo 在线程数(其中的 pollers 和 handlers)到一定数时就无法再提升了,应该还是线程池中的锁碰撞和 CPU 亲和性等原因);
还有就是nginx的 HTTP 协议解析过程是规范的(从其对url的解析便可以看出),并且功能逻辑也是较为复杂的。

C++ Workflow 增加kafka client支持kafka协议

C++ Workflow增加对kafka协议的支持。

具有以下功能特点:

  • 基于C++ Workflow内部的任务流机制实现,高效简洁。

  • 支持kafka常用的消息生产、获取和消费者组等特性。

  • 以插件形式发布,编译时通过make KAFKA=y,生成独立的类库。

  • 搜狗实际业务锤炼,稳定可靠。

Installation on MacOS

Workflow需要依赖OpenSSL(推荐1.1及以上版本)和Cmake(要求3.6以上版本),以下为安装步骤:

  • 安装 OpenSSL

    brew install openssl
    
  • 安装 CMake

    brew install cmake
    
  • 指定 OpenSSL 环境变量
    由于MacOS下默认有LibreSSL,因此在brew安装后,并不会自动建软链,我们需要手动把执行路径、编译路径、cmake时的find_package路径都配置到bash的环境变量中。用户可以执行brew info openssl查看相关信息,也可以如下配置:

    echo 'export PATH="/usr/local/opt/[email protected]/bin:$PATH"' >> ~/.bash_profile
    echo 'export LDFLAGS="-L/usr/local/opt/[email protected]/lib"' >> ~/.bash_profile
    echo 'export CPPFLAGS="-I/usr/local/opt/[email protected]/include"' >> ~/.bash_profile
    echo 'export PKG_CONFIG_PATH="/usr/local/opt/[email protected]/lib/pkgconfig"' >> ~/.bash_profile
    echo 'export OPENSSL_ROOT_DIR=/usr/local/opt/openssl' >> ~/.bash_profile
    echo 'export OPENSSL_LIBRARIES=/usr/local/opt/openssl/lib' >> ~/.bash_profile
    

    如果使用zsh,则还需要以下一步,把bash的配置加载一下:

    echo 'test -f ~/.bash_profile  && source ~/.bash_profile' >> ~/.zshrc
    source ~/.zshrc
    

How to compile to dynamic library?

Hi, guys,

I just get a 76-byte libworkflow.so and 11.7M libworkflow.a after I run the make command in the project root folder. How to generate a real dynamic shared library?

Even using the libworkflow.a library, I can't compile my code. The error is below:
XXXXX@XXXXX-HPWS:~/personal/gitee/dl4cpp$ make
g++ -g -std=c++11 -Wall -I./sogou-workflow/include -L./sogou-workflow/lib -l:libworkflow.a src/main.cc -o src/main.o
/tmp/cc1vPAdt.o: In function protocol::HttpMessage::append_output_body(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/HttpMessage.h:159: undefined reference to protocol::HttpMessage::append_output_body(void const*, unsigned long)'
/tmp/cc1vPAdt.o: In function EncodeStream::~EncodeStream()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/EncodeStream.h:101: undefined reference to EncodeStream::clear_buffer()'
/tmp/cc1vPAdt.o: In function protocol::RedisValue::~RedisValue()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/RedisMessage.h:228: undefined reference to protocol::RedisValue::free_data()'
/tmp/cc1vPAdt.o: In function protocol::RedisMessage::~RedisMessage()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/RedisMessage.h:324: undefined reference to redis_parser_deinit'
/tmp/cc1vPAdt.o: In function WFServerBase::WFServerBase(WFServerParams const*)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:56: undefined reference to vtable for WFServerBase'
/tmp/cc1vPAdt.o: In function WFServerBase::start(unsigned short)': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:69: undefined reference to WFServerBase::start(int, char const*, unsigned short, char const*, char const*)'
/tmp/cc1vPAdt.o: In function WFServerBase::stop()': /home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:133: undefined reference to WFServerBase::shutdown()'
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:134: undefined reference to WFServerBase::wait_finish()' /tmp/cc1vPAdt.o: In function WFServerBase::~WFServerBase()':
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFServer.h:52: undefined reference to vtable for WFServerBase' /tmp/cc1vPAdt.o: In function WFServer<protocol::HttpRequest, protocol::HttpResponse>::new_session(long long, CommConnection*)':
/home/XXXXX/personal/gitee/dl4cpp/./sogou-workflow/include/WFHttpServer.h:53: undefined reference to WFServerTaskFactory::create_http_task(std::function<void (WFNetworkTask<protocol::HttpRequest, protocol::HttpResponse>*)>&)' /tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x20): undefined reference to WFServerBase::handle_unbound()'
/tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x28): undefined reference to WFServerBase::create_listen_fd()' /tmp/cc1vPAdt.o:(.rodata._ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTV8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x30): undefined reference to WFServerBase::new_connection(int)'
/tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x10): undefined reference to protocol::RedisMessage::encode(iovec*, int)' /tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x28): undefined reference to protocol::RedisMessage::append(void const*, unsigned long*)'
/tmp/cc1vPAdt.o:(.rodata._ZTVN8protocol12RedisMessageE[_ZTVN8protocol12RedisMessageE]+0x48): undefined reference to non-virtual thunk to protocol::RedisMessage::append(void const*, unsigned long*)' /tmp/cc1vPAdt.o:(.rodata._ZTI8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE[_ZTI8WFServerIN8protocol11HttpRequestENS0_12HttpResponseEE]+0x10): undefined reference to typeinfo for WFServerBase'
collect2: error: ld returned 1 exit status
GNUmakefile:15: recipe for target 'src/main.o' failed
make: *** [src/main.o] Error 1

有相关的user_guide文档么

是否有一个user_guide文档,整体上介绍workflow的整体架构,以及核心的数据结构和API。
现在只看到一些不同场景的tutorial例子,了解起来比较分散

SRPC项目正式GitHub开源了!

作为C++ Workflow生态的最重要一个项目,SRPC已经在GitHub开源。

项目地址:https://github.com/sogou/srpc
SRPC是一个工程学角度设计完美的rpc项目,具有以下功能特点:

  • 支持pbthrift格式的接口描述文件,一键迁移基于thrift的项目,性能更优。
  • 支持baidu-std协议,支持thrift framed协议,与brpcapache thrift互通。支持sogou-std协议,这个协议同时支持pbthrift格式的接口描述。
  • 支持binaryhttp/jsonhttp/binary的传输,对业务无感。
  • 性能极其优越,实测使用baidu-std协议时,性能大多数场景大幅超越原生brpc。而且更为易用。
  • 支持Windows。你可以在Windows上运行brpc了。相同配置的机器,Windows性能优于Linux(iocp厉害)。
  • 与C++ Workflow无缝结合,各种任务流,计算调度,upstream,负载均衡,各种workflow内置协议无缝融合。说白了,SRPC是workflow上的一套协议。
  • 搜狗搜索实际业务锤炼,稳定可靠。

强烈推荐。SRPC优秀的表现也印证了Sogou C++ Workflow的强大功能。

一个已知的网络相关问题

通讯引擎在handler线程被占满,或者用户callback执行时间过长,可能会出现对方断开连接而本地没有及时关闭socket fd,从而导致本地出现CLOSE_WAIT状态的TCP连接。这个问题不会造成严重后果,一般也不可见,但可能影响短连接服务的性能。
出现这个问题的原因是,通讯器里,连接上下文与socket fd的生命周期是一致的,在Communicator::realase_conn()里一起被释放:

void Communicator::release_conn(struct CommConnEntry *entry)
{
    delete entry->conn;
    if (!entry->service)
        pthread_mutex_destroy(&entry->mutex);

    if (entry->ssl)
        SSL_free(entry->ssl);

    close(entry->sockfd);
    free(entry);
}

连接上下文类似一个shared_ptr,用户在callback过程中可以引用连接上下文,于是这个上下文至少需要保持到callback结束。在此过程中,连接被对方断开,本地socket fd无法立即关闭。同样,如果系统过于繁忙,没有handler线程处理结果,也会出现这个问题。
解决方案非常明确,我们需要在处理网络事件的poller线程里就关闭socket fd,而不是先把结果放进消息队列等待handler处理。于是我们需要把消息队列从poller里独立出来,poller通过callback的方式返回结果。这么做我们可能还可以减少消息队列的进出次数,对性能有一定帮助。此外,callback方式的poller对于我们实现streaming通讯引擎至关重要。

关于异步写操作的实现以及poller的几个操作

咨询一下,在wget wget_to_redis http_echo_server等tutorial测试中发现,向对等方发送消息均是通过Communicator::send_message_sync中的writev(entry->sockfd, vectors, cnt <= IOV_MAX ? cnt : IOV_MAX)来实现,那epoll.c中的__poller_handle_write的功能是?向对方发送消息时为何不用这个epoll.c中__poller_handle_write?__poller_handle_event和__poller_handle_notify具体的应用场景是什么那?(可以基于具体的场景或者业务谈一下设计时的考虑)
image
image

keepalive、idle状态对应alive_list、idle_list处理机制

第一个问题:当CommConnEntry处于CONN_STATE_KEEPALIVE状态时,add entry alive_list;当CommConnEntry处于CONN_STATE_IDLE状态时,add entry idle_list;alive_list与idle_list有何区别?CONN_STATE_KEEPALIVE状态与CONN_STATE_IDLE状态有何区别?alive_list与idle_list释放的entry时机在什么情况下发生?
第二个小问题:ref的主要功能是什么那?CommService中ref与CommConnEntry中ref区别?(entry->ref handle前加1,handle后减1)
第三个问题:以下宏中CONN_STATE_RECEIVING的含义是?(不知为何没有CONN_STATE_SEND状态);
#define CONN_STATE_CONNECTING 0
#define CONN_STATE_CONNECTED 1
#define CONN_STATE_RECEIVING 2
#define CONN_STATE_SUCCESS 3
#define CONN_STATE_IDLE 4
#define CONN_STATE_KEEPALIVE 5
#define CONN_STATE_CLOSING 6
#define CONN_STATE_ERROR 7

基于counter实现多入边节点(node为什么基于WFCounterTask实现)

第一类问题:WFGraphNode是DAG 其中的一个节点(node),但为什么是基于WFCounterTask类实现的(node和WFCounterTask难道有什么关系吗);WFContainerTask也是基于WFCounterTask实现的,能否也解释下?多入边的节点都需要一个counter实现,这是有什么历史背景吗?还是有和渊源?我理解一个DAG 包含多个node(每个node其实就是一个任务),但是不清楚workflow中基于WFCounterTask类实现的node是基于何种场景考虑的?(node为什么基于WFCounterTask实现?)
第二类问题(仅仅是个人建议):在具体的task实现中,FileIOTask等类似的任务实现是在WFTaskFactory.cc中实现的,能否可以像HttpTaskInpl.cc似的,单独弄一个文件完成FileIOTask的实现,其他任务实现也类似这么做(目前实现了http,redis等常规的)(这样的好处是比较容易理解,通过文件了解整体框架脉络(一层一层继承),比较清楚);

How to get multi-part form file from the http request?

Hi, guys,

I want to use the workflow to build a RESTful server which can receiver image file from client and do some AI works. As the title says, how to get multi-part form file from the http request? Can you share a code example?

Thanks!

tcp/udp server

看例子只有http的,可以作为tcp,udp服务器吗?

有关windows编译问题

在linux下轻松的编译过了, 但是...

我切换到windows分之后, 编译无法通过, 希望有相关的文档可以参考,

修改代码 添加openssl库
[src/CMakeLists.txt :2]

  • list(APPEND CMAKE_PREFIX_PATH "D:/workflow/openssl-v1.0.0-x86")

cmake配置: Visual Studio 12 x86 Release

报错如下
[main] 正在生成文件夹: workflow
[build] 正在启动生成
...
...
[build] D:\workflow\src\util\URIParser.cc(351): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]

[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(235): error C2011: “sockaddr”:“struct”类型重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]

[build] D:\workflow\src\client\WFMySQLConnection.cc(41): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\client\client.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] D:\workflow\src\util\URIParser.cc(358): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(437): error C2059: 语法错误:“常量” (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(437): error C3805: “常量”: 意外标记,应输入“}”或者“,” (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\HttpTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(572): warning C4005: “IN_CLASSA”: 宏重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(284): note: 参见“IN_CLASSA”的前一个定义 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\ucrt\string.h(143): note: 参见“_strdup”的声明
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\MySQLTaskImpl.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\RedisMessage.h(329): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(578): warning C4005: “IN_CLASSB”: 宏重定义 (编译源文件 D:\workflow\src\server\WFServer.cc) [D:\workflow\build\src\server\server.vcxproj]
[build] D:\workflow\src\util\URIParser.cc(365): warning C4996: '_strdup': The POSIX name for this item is deprecated. Instead, use the ISO C and C++ conformant name: _strdup. See online help for details. [D:\workflow\build\src\util\util.vcxproj]
[build] D:\workflow_include\workflow\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\factory\WFTaskFactory.cc) [D:\workflow\build\src\factory\factory.vcxproj]
[build] D:\workflow\src\factory\WFTaskFactory.cc(55): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) [D:\workflow\build\src\factory\factory.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(290): note: 参见“IN_CLASSB”的前一个定义 (编译源文件 D:\workflow\src\server\WFServer.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(457): note: 参见“AF_IPX”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(147): warning C4005: “AF_MAX”: 宏重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(476): note: 参见“AF_MAX”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(185): warning C4005: “SO_DONTLINGER”: 宏重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(399): note: 参见“SO_DONTLINGER”的前一个定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc)
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\shared\ws2def.h(235): error C2011: “sockaddr”:“struct”类型重定义 (编译源文件 D:\workflow\src\manager\RouteManager.cc) [D:\workflow\build\src\manager\manager.vcxproj]
[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源
.
.
.
.
.

[build] C:\Program Files (x86)\Windows Kits\10\Include\10.0.14393.0\um\winsock.h(1007): note: 参见“sockaddr”的声明 (编译源文件 D:\workflow\src\protocol\HttpUtil.cc)
[build] D:\workflow_include\workflow\Communicator.h(204): error C2227: “->sa_family”的左边必须指向类/结构/联合/泛型类型 (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] d:\workflow\src\protocol\HttpMessage.h(62): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] d:\workflow\src\protocol\HttpMessage.h(67): warning C4800: “int”: 将值强制为布尔值“true”或“false”(性能警告) (编译源文件 D:\workflow\src\protocol\HttpUtil.cc) [D:\workflow\build\src\protocol\protocol.vcxproj]
[build] 生成已完成,退出代码为 1

Got no_reply error for my customized protocol

Hi, I'm playing with workflow + protobuf, here's my example:

the server

#include <workflow/ProtocolMessage.h>
#include <workflow/WFTask.h>
#include <workflow/WFServer.h>
#include <workflow/WFFacilities.h>
#include "proto/common.pb.h"
using namespace std;
namespace protocol {
    class OrderMessage : public ProtocolMessage {
    public:
        trade::Order order;
    private:
        int encode(struct iovec *vectors, int max) override {
            int size = order.ByteSizeLong();
            void *buffer = new char[size];
            order.SerializePartialToArray(buffer, size);
            vectors[0].iov_base = buffer;
            vectors[0].iov_len = size;
            return 1;
        }
        int append(const void *buf, size_t *size) override {
            order.ParsePartialFromArray(buf, size[0]);
            return 1;
        }
    };
}
using WFTestTask = WFNetworkTask<protocol::OrderMessage, protocol::OrderMessage>;
void process(WFTestTask *task)
{
    protocol::OrderMessage *req = task->get_req();
    protocol::OrderMessage *resp = task->get_resp();
    cout << req->order.price() << endl;
    resp->order.set_price("123");
}

static WFFacilities::WaitGroup wait_group(1);

int main()
{
    using WFTestServer = WFServer<protocol::OrderMessage, protocol::OrderMessage>;
    WFServerParams params = SERVER_PARAMS_DEFAULT;
    WFTestServer server(&params, process);
    if (server.start(AF_INET, 8080) == 0) {
        cout << "success!" << endl;
        wait_group.wait();
        server.stop();
    } else {

    }
    return 0;
}

the client

using WFTestTask = WFNetworkTask<protocol::OrderMessage, protocol::OrderMessage>;
using tutorial_callback_t = std::function<void (WFTestTask *)>;
class MyFactory : public WFTaskFactory
{
public:
    static WFTestTask *create_tutorial_task(const std::string& host,
                                                unsigned short port,
                                                int retry_max,
                                                tutorial_callback_t callback)
    {
        using NTF = WFNetworkTaskFactory<protocol::OrderMessage, protocol::OrderMessage>;
        WFTestTask *task = NTF::create_client_task(TT_TCP, host, port,
                                                       retry_max,
                                                       std::move(callback));
        task->set_keep_alive(30 * 1000);
        return task;
    }
};

int main()
{
    std::string host = "0.0.0.0";
    int port = 8080;
    std::function<void (WFTestTask *task)> callback =
    [&host, port, &callback](WFTestTask *task) {
        int state = task->get_state();
        int error = task->get_error();
        cout << state << " " << error << endl;
        if (state != WFT_STATE_SUCCESS)
        {
            if (state == WFT_STATE_SYS_ERROR)
                fprintf(stderr, "SYS error: %s\n", strerror(error));
            else if (state == WFT_STATE_DNS_ERROR)
                fprintf(stderr, "DNS error: %s\n", gai_strerror(error));
            else
                fprintf(stderr, "other error.\n");
            return;
        }
        protocol::OrderMessage *resp = task->get_resp();
        cout << resp->order.price() << endl;
    };
    WFFacilities::WaitGroup wait_group(1);
    WFTestTask *task = MyFactory::create_tutorial_task(host, port, 0, callback);
    auto req = task->get_req();
    req->order.set_price("333");
    task->start();
}

and I got the no reply error (namely state = 2, error = 0) on the client side. Any suggestion? Thanks !

运行教程中的parallel_wget遇到错误`ERROR! state = 1, error = 11`

背景

修改了一下tutorial-06-parallel_wget.cc的源码,同时发起1000个http请求(每个请求等待5秒后简单的返回hello world),前面的请求正常返回,后面的请求遇到了ERROR! state = 1, error = 11的错误。

在http服务里记录了请求数量,发现实际只有200个请求,似乎在哪里有请求数量限制。

问题

想知道这个错误的意义是什么,这个200个请求的限制是http client带来的还是并行任务处理器带来的。

基础任务层WFNetworkTask中keep_alive_timeo超时问题

咨询个小问题,在get_idle_conn的方法中,遍历idle_list,并设置sockfd的timeout为-1;那这个设置的sockfd time为-1,一直处于等待状态,直到数据处理完,才会设置此sockfd time为keepalive time(如果这么理解也有问题,我看代码部份实现中epolladd 设置的时间为keepalive time,并设置idle_list时间也为keepalive time)?
image
image
image

在server的process函数里关停server的方法(Shutdown server in server’s process function)

例如当收到/stop的请求URL是,http server关闭,方法如下:

#include <string.h>
#include <atomic>
#include “workflow/WFHttpServer.h”

extern void process(WFHttpTask *task);
WFHttpServer server(process);

void process(WFHttpTask *task)
{
    if (strcmp(task->get_req()->get_request_uri(), “/stop”) == 0)
    {
        static std::atomic<int> flag;

        if (flag++ == 0)
            server.shutdown();

        task->get_resp()->append_output_body(“<html>server stop</html>”);
        return;
    }

    /* Server’s logic */
    //  ....
}

int main()
{
    if (server.start(8888) == 0)
        server.wait_finish();

    return 0;
}

因为在WFServer.h里:

class WFServerBase : protected CommService
{
    ...
public:
    void stop()
    {
        this->shutdown();
        this->wait_finish();
    }

    void shutdown();
    void wait_finish();
};

Server的stop方法无非就是shutdown+wait_finish,上述的方法就是将关停和等待结束分别在两个线程里调用(注意shutdown只能调用一次,因此我们用了原子变量保护),所以肯定是严密的。但显然在process里直接调用stop则是一种错误。
同理,关闭很多个server最好的方法是:

server1.start();
server2.start();
server3.start();
pause(); // or better: wait_group.wait();
server1.shutdown();
server2.shutdown();
server3.shutdown();
server1.wait_finish();
server2.wait_finish();
server3.wait_finish();

link errors on mac

Hi, I got the following build errors on mac, any suggestion? Thanks!

[ 33%] Linking CXX executable test
Undefined symbols for architecture x86_64:
  "_BIO_free", referenced from:
      Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
  "_BIO_new_socket", referenced from:
      Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
  "_CONF_modules_unload", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_CRYPTO_THREADID_set_callback", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_CRYPTO_cleanup_all_ex_data", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_CRYPTO_num_locks", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_CRYPTO_set_locking_callback", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_ENGINE_cleanup", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_ERR_free_strings", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_ERR_remove_thread_state", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_EVP_cleanup", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_FIPS_mode_set", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_MD5_Final", referenced from:
      MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
  "_MD5_Init", referenced from:
      MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
  "_MD5_Update", referenced from:
      MD5Util::md5_bin(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_string_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_32(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
      MD5Util::md5_integer_16(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) in libworkflow.a(MD5Util.cc.o)
  "_SSL_COMP_get_compression_methods", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSL_CTX_free", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSL_CTX_new", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSL_CTX_set_verify", referenced from:
      WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
  "_SSL_CTX_use_PrivateKey_file", referenced from:
      WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
  "_SSL_CTX_use_certificate_file", referenced from:
      WFServerBase::init(sockaddr const*, unsigned int, char const*, char const*) in libworkflow.a(WFServer.cc.o)
  "_SSL_accept", referenced from:
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_SSL_connect", referenced from:
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_SSL_free", referenced from:
      Communicator::release_conn(CommConnEntry*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_incoming_request(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_incoming_reply(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_reply_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_request_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      ...
  "_SSL_get_error", referenced from:
      CommMessageIn::feedback(char const*, unsigned long) in libworkflow.a(Communicator.cc.o)
      ___poller_handle_ssl_error in libworkflow.a(poller.c.o)
  "_SSL_library_init", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSL_load_error_strings", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSL_new", referenced from:
      Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
  "_SSL_read", referenced from:
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_SSL_set_bio", referenced from:
      Communicator::handle_listen_result(poller_result*) in libworkflow.a(Communicator.cc.o)
      Communicator::handle_connect_result(poller_result*) in libworkflow.a(Communicator.cc.o)
  "_SSL_shutdown", referenced from:
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_SSL_write", referenced from:
      CommMessageIn::feedback(char const*, unsigned long) in libworkflow.a(Communicator.cc.o)
      ___poller_thread_routine in libworkflow.a(poller.c.o)
  "_SSLv23_client_method", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_SSLv23_server_method", referenced from:
      __SSLManager::__SSLManager() in libworkflow.a(WFGlobal.cc.o)
  "_sk_free", referenced from:
      __SSLManager::~__SSLManager() in libworkflow.a(WFGlobal.cc.o)
ld: symbol(s) not found for architecture x86_64
clang: error: linker command failed with exit code 1 (use -v to see invocation)
make[3]: *** [test] Error 1
make[2]: *** [CMakeFiles/test.dir/all] Error 2
make[1]: *** [CMakeFiles/test.dir/rule] Error 2
make: *** [test] Error 2

my CMakeLists.txt:

cmake_minimum_required(VERSION 3.13)
set(CMAKE_CXX_STANDARD 20)
find_package(WorkFlow REQUIRED)
add_executable(test test.cpp proto/common.pb.cc)
target_link_libraries(test -lworkflow -lprotobuf)

一直提示找不到openssl

在执行cmake的时候出现如下错误
-- Detecting CXX compile features - done
CMAKE_C_FLAGS_DEBUG is -g
CMAKE_C_FLAGS_RELEASE is -O3 -DNDEBUG
CMAKE_C_FLAGS_RELWITHDEBINFO is -O2 -g -DNDEBUG
CMAKE_C_FLAGS_MINSIZEREL is -Os -DNDEBUG
CMAKE_CXX_FLAGS_DEBUG is -g
CMAKE_CXX_FLAGS_RELEASE is -O3 -DNDEBUG
CMAKE_CXX_FLAGS_RELWITHDEBINFO is -O2 -g -DNDEBUG
CMAKE_CXX_FLAGS_MINSIZEREL is -Os -DNDEBUG
CMake Error at /usr/local/share/cmake-3.8/Modules/FindPackageHandleStandardArgs.cmake:137 (message):
Could NOT find OpenSSL, try to set the path to OpenSSL root folder in the
system variable OPENSSL_ROOT_DIR (missing: OPENSSL_INCLUDE_DIR)
Call Stack (most recent call first):
/usr/local/share/cmake-3.8/Modules/FindPackageHandleStandardArgs.cmake:377 (_FPHSA_FAILURE_MESSAGE)
/usr/local/share/cmake-3.8/Modules/FindOpenSSL.cmake:387 (find_package_handle_standard_args)
src/CMakeLists.txt:3 (find_package)
但是机器上已经安装了openssl,按照cmake ../ -DOPENSSL_ROOT_DIR=/usr/local/Cellar/[email protected]/1.1.1d -DOPENSSL_LIBRARIES=/usr/local/Cellar/[email protected]/1.1.1d/lib来指定执行也还是报这个错误,请教一下

[How to] Build on macOS

  • Install OpenSSL

    brew install openssl
    
  • Install CMake

    brew install cmake
    
  • Build with OpenSSL

    mkdir build && cd build
    cmake -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl  ..
    make -j8
    

FAQ(持续更新)

项目背景以及解决的问题

C++ Workflow项目起源于搜狗公司的分布式存储项目的通讯引擎,并且发展成为搜狗公司级C++标准,应用于搜狗大多数C++后端服务。项目将通讯与计算和谐统一,帮助用户建立通讯与计算关系非常复杂的高性能服务。但同时用户也可以只把它当成简易的异步网络引擎或并行计算框架来使用。

如何开始使用

以Linux系统为例:

$ git clone https://github.com/sogou/workflow
$ cd workflow
$ make
$ cd tutorial
$ make

然后就可以愉快的运行示例了。每个示例都有对应的文档讲解。如果需要用到kafka协议,请预先安装snappy和lz4,并且:

$ make KAFKA=y
$ cd tutorial
$ make KAFKA=y

另外,make DEBUG=y,可以编译调试版。通过make REDIS=n MYSQL=n UPSTREAM=n CONSUL=n可以裁剪掉一个或多个功能,让库文件减小到最低400KB,更加适合嵌入式开发。

与其它的网络引擎,RPC项目相比,有什么优势

  • 简单易上手,无依赖
  • 性能和稳定性优异benchmark
  • 丰富的通用协议实现
  • 通讯与计算统一
  • 任务流管理

与其它并行计算框架相比,有什么优势

  • 使用简单
  • 有网络

项目原生包含哪些网络协议

目前我们实现了HTTP,Redis,MySQL和kafka协议。除kafka目前只支持客户端以外,其他协议都是client+server。也就是说,用户可以用于构建Redis或MySQL协议的代理服务器。kafka模块是插件,默认不编译。

为什么用callback

我们用C++11 std::function类型的callback和process来包装用户行为,因此用户需要知道自己是在编写异步程序。我们认为callback方式比future或用户态协程能给程序带来更高的效率,并且能很好的实现通信与计算的统一。由于我们的任务封装方式以及std::function带来的便利,在我们的框架里使用callback并没有太多心智负担,反而非常简单明了。

callback在什么线程里调用

项目的一个特点是由框架来管理线程,除了一些很特殊情况,callback的调用线程必然是处理网络收发和文件IO结果的handler线程(默认数量20)或者计算线程(默认数量等于CPU总核数)。但无论在哪个线程里执行,都不建议在callback里等待或执行特别复杂的计算。需要等待可以用counter任务进行不占线程的wait,复杂计算则应该包装成计算任务。
需要说明的是,框架里的一切资源都是使用时分配。如果用户没有用到网络通信,那么所有和通信相关的线程都不会被创建。

为什么我的任务启动之后没有反应

int main(void)
{
    ...
    task->start();
    return 0;
}

这是很多新用户都会遇到的问题。框架中几乎所有调用都是非阻塞的,上面的代码在task启动之后main函数立刻return,并不会等待task的执行结束。正确的做法应该是通过某种方式在唤醒主进程,例如:

WFFaciliies::WaitGroup wait_group(1);

void callback(WFHttpTask *task)
{
    ....
    wait_group.done();
}

int main(void)
{
    WFHttpTask *task = WFTaskFactory::create_http_task(url, 0, 0, callback);
    task->start();
    wait_group.wait();
    return 0;
}

任务对象的生命周期是什么

框架中任何任务(以及SeriesWork),都是以裸指针形式交给用户。所有任务对象的生命周期,是从对象被创建,到对象的callback完成。也就是说callback之后task指针也就失效了,同时被销毁的也包括task里的数据。如果你需要保留数据,可以用std::move()把数据移走,例如我们需要保留http任务中的resp:

void http_callback(WFHttpTask *task)
{
    protocol::HttpResponse *resp = task->get_resp();
    protocol::HttpResponse *my_resp = new protocol::HttpResponse(std::move(*resp));
    /* or
    protocol::HttpResponse *my_resp = new protocol::HttpResponse;
    *my_resp = std::move(*resp);
    */
}

某些情况下,如果用户创建完任务又不想启动了,那么需要调用task->dismiss()直接销毁任务。
需要特别强调,server的process函数不是callback,server任务的callback发生在回复完成之后,而且默认为nullptr。

为什么SeriesWork(串行)不是一种任务

我们关于串并联的定义是:

  • 串行由任务组成
  • 并行由串行组成
  • 并行是一种任务

显然通过这三句话的定义我们可以递归出任意复杂的串并联结构。如果把串行也定义为一种任务,串行就可以由多个子串行组成,那么使用起来就很容易陷入混乱。同样并行只能是若干串行的并,也是为了避免混乱。其实使用中你会发现,串行本质上就是我们的协程。

我需要更一般的有向无环图怎么办

可以使用WFGraphTask,或自己用WFCounterTask来构造。
示例:https://github.com/sogou/workflow/blob/master/tutorial/tutorial-11-graph_task.cc

server是在process函数结束后回复请求吗

不是。server是在server task所在series没有别的任务之后回复请求。如果你不向这个series里添加任何任务,就相当于process结束之后回复。注意不要在process里等待任务的完成,而应该把这个任务添加到series里。

如何让server在收到请求后等一小段时间再回复

错误的方法是在process里直接sleep。正确做法,向server所在的series里添加一个timer任务。以http server为例:

void process(WFHttpTask *server_task)
{
    WFTimerTask *timer = WFTaskFactory::create_timer_task(100000, nullptr);
    server_task->get_resp()->append_output_body("hello");
    series_of(server_task)->push_back(timer);
}

以上代码实现一个100毫秒延迟的http server。一切都是异步执行,等待过程没有线程被占用。

怎么知道回复成功没有

首先回复成功的定义是成功把数据写入tcp缓冲,所以如果回复包很小而且client端没有因为超时等原因关闭了连接,几乎可以认为一定回复成功。需要查看回复结果,只需给server task设置一个callback,callback里状态码和错误码的定义与client task是一样的,但server task不会出现dns错误。

能不能不回复

可以。任何时候调用server task的noreply()方法,那么在原本回复的时机,连接直接关闭。

计算任务的调度规则是什么

我们发现包括WFGoTask在内的所有计算任务,在创建时都需要指定一个计算队列名,这个计算队列名可用于指导我们内部的调度策略。首先,只要有空闲计算线程可用,任务将实时调起,计算队列名不起作用。当计算线程无法实时调起每个任务的时候,那么同一队列名下的任务将按FIFO的顺序被调起,而队列与队列之间则是平等对待。例如,先连续启动n个队列名为A的任务,再连续启动n个队列名为B的任务。那么无论每个任务的cpu耗时分别是多少,也无论计算线程数多少,这两个队列将近倾向于同时执行完毕。这个规律可以扩展到任意队列数量以及任意启动顺序。

为什么使用redis client时无需先建立连接

首先看一下redis client任务的创建接口:

class WFTaskFactory
{
public:
    WFRedisTask *create_redis_task(const std::string& url, int retry_max, redis_callback_t callback);
}

其中url的格式为:redis://:password@host:port/dbnum。port默认值为6379,dbnum默认值为0。
workflow的一个重要特点是由框架来管理连接,使用户接口可以极致的精简,并实现最有效的连接复用。框架根据任务的用户名密码以及dbnum,来查找一个可以复用的连接。如果找不到则发起新连接并进行用户登陆,数据库选择等操作。如果是一个新的host,还要进行DNS解析。请求出错还可能retry。这每一个步骤都是异步并且透明的,用户只需要填写自己的request,将任务启动,就可以在callback里得到请求的结果。唯一需要注意的是,每次任务的创建都需要带着password,因为可能随时有登陆的需要。
同样的方法我们可以用来创建mysql任务。但对于有事务需求的mysql,则需要通过我们的WFMySQLConnection来创建任务了,否则无法保证整个事务都在同一个连接上进行。WFMySQLConnection依然能做到连接和认证过程的异步性。

连接的复用规则是什么

大多数情况下,用户使用框架产生的client任务都是无法指定具体连接。框架会有连接的复用策略:

  • 如果同一地址端口有满足条件的空闲连接,从中选择最近一个被释放的那个。即空闲连接的复用是先进后出的。
  • 当前地址端口没有满足条件的空闲连接时:
    • 如果当前并发连接数小于最大值(默认200),立刻发起新连接。
    • 并发连接数已经达到最大值,任务将得到系统错误EAGAIN。
  • 并不是所有相同目标地址和端口上的连接都满足复用条件。例如不同用户名或密码下的数据库连接,就不能复用。

虽然我们的框架无法指定任务要使用的连接,但是我们支持连接上下文的功能。这个功能对于实现有连接状态的server非常重要。相关的内容可以参考关于连接上下文相关文档。

同一域名下如果有多个IP地址,是否有负载均衡

是的,我们会认为同一域名下的所有目标IP对等,服务能力也相同。因此任何一个请求都会寻找一个从本地看起来负载最轻的目标进行通信,同时也内置了熔断与恢复策略。同一域名下的负载均衡,目标都必须服务在同一端口,而且无法配置不同权重。负载均衡的优先级高于连接复用,也就是说会先选择好通信地址再考虑复用连接问题。

如何实现带权重或不同端口上的负载均衡

可以参考upstream相关文档。upstream还可以实现很多更复杂的服务管理需求。

chunked编码的http body如何最高效访问

很多情况下我们使用HttpMessage::get_parsed_body()来获得http消息体。但从效率角度上考虑,我们并不自动为用户解码chunked编码,而是返回原始body。解码chunked编码可以用HttpChunkCursor,例如:

#include "workflow/HttpUtil.h"

void http_callback(WFHttpTask *task)
{
    protocol::HttpResponse *resp = task->get_resp();
    protocol::HttpChunkCursor cursor(resp);
    const void *chunk;
    size_t size;

    while (cursor.next(&chunk, &size))
    {
        ...
    }
}

cursor.next操作每次返回一个chunk的起始位置指针和chunk大小,不进行内存拷贝。使用HttpChunkCursor之前无需判断消息是不是chunk编码,因为非chunk编码也可以认为整体就是一个chunk。

能不能在callback或process里同步等待一个任务完成

我们不推荐这个做法,因为任何任务都可以串进任务流,无需占用线程等待。如果一定要这样做,可以用我们提供的WFFuture来实现。请不要直接使用std::future,因为我们所有通信的callback和process都在一组线程里完成,使用std::future可能会导致所有线程都陷入等待,引发整体死锁。WFFuture通过动态增加线程的方式来解决这个问题。使用WFFuture还需要注意在任务的callback里把要保留的数据(一般是resp)通过std::move移动到结果里,否则callback之后数据会随着任务一起被销毁。

数据如何在task之间传递

最常见的,同一个series里的任务共享series上下文,通过series的get_context()和set_context()的方法来读取和修改。而parallel在它的callback里,也可以通过series_at()获到它所包含的各个series(这些series的callback已经被调用,但会在parallel callback之后才被销毁),从而获取它们的上下文。由于parallel也是一种任务,所以它可以把汇总的结果通过它所在的series context继续传递。
总之,series是协程,series context就是协程的局部变量。parallel是协程的并行,可汇总所有协程的运行结果。

Workflow和rpc的关系

在我们的架构里,rpc是workflow上的应用,或者说rpc是workflow上的一组协议实现。如果你有接口描述,远程接口调用的需求,一定要试用一下srpc,这是一个把workflow的功能发挥到极致又和workflow完美融合的rpc系统,同时兼容brpc和thrift协议且更快更易用,满足你的任何rpc需求。地址:https://github.com/sogou/srpc

Server的stop()操作完成时机

Server的stop()操作是优雅关闭,程序结束之前必须关闭所有server。stop()由shutdown()和wait_finish()组成,wait_finish会等待所有运行中server task所在series结束。也就是说,你可以在server task回复完成的callback里,继续向series追加任务。stop()操作会等待这些任务的结束。另外,如果你同时开多个server,最好的关闭方法是:

int main()
{
    // 一个server对象不能start多次,所以多端口服务需要定义多个server对象
    WFRedisServer server1(process);
    WFRedisServer server2(process);
    server1.start(8080);
    server2.start(8888);
    getchar(); // 输入回车结束
    // 先全部关闭,再等待。
    server1.shutdown();
    server2.shutdown();
    server1.wait_finish();
    server2.wait_finish();
    return 0;
}

如何在收到某个特定请求时,结束server

因为server的结束由shutdown()和wait_finish()组成,显然就可以在process里shutdown,在main()里wait_finish,例如:

#include <string.h>
#include <atomic>
#include “workflow/WFHttpServer.h”

extern void process(WFHttpTask *task);
WFHttpServer server(process);

void process(WFHttpTask *task) {
    if (strcmp(task->get_req()->get_request_uri(), “/stop”) == 0) {
        static std::atomic<int> flag;
        if (flag++ == 0)
            server.shutdown();
        task->get_resp()->append_output_body(“<html>server stop</html>”);
        return;
    }

    /* Server’s logic */
    //  ....
}

int main() {
    if (server.start(8888) == 0)
        server.wait_finish();

    return 0;
}

以上代码实现一个http server,在收到/stop的请求时结束程序。process中的flag是必须的,因为process并发执行,只能有一个线程来调用shutdown操作。

Server里需要调用非Workflow框架的异步操作怎么办

还是使用counter。在其它异步框架的回调里,对counter进行count操作。

void other_callback(server_task, counter, ...)
{
    server_task->get_resp()->append_output_body(result);
    counter->count();
}

void process(WFHttpTask *server_task)
{
    WFCounterTask *counter = WFTaskFactory::create_counter_task(1, nullptr);
    OtherAsyncTask *other_task = create_other_task(other_callback, server_task, counter);//非workflow框架的任务
    other_task->run();
    series_of(server_task)->push_back(counter);
}

注意以上代码里,counter->count()的调用可能先于counter的启动。但无论什么时序,程序都是完全正确的。

个别https站点抓取失败是什么原因

如果浏览器可以访问,但用workflow抓取失败,很大概率是因为站点使用了TLS扩展功能的SNI。可以通过全局配置打开workflow的客户端SNI功能:

    struct WFGlobalSettings settings = GLOBAL_SETTINGS_DEFAULT;
    settings.endpoint_params.use_tls_sni = true;
    WORKFLOW_library_init(&settings);

开启这个功能是有一定代价的,所有https站点都会启动SNI,相同IP地址但不同域名的访问,将无法复用SSL连接。
因此用户也可以通过upstream功能,只打开对某个确定域名的SNI功能:

#Include "workflow/UpstreamManager.h"

int main()
{
    UpstreamManager::upstream_create_weighted_random("www.sogou.com", false);
    struct AddressParams params = ADDRESS_PARAMS_DEFAULT;
    params.endpoint_params.use_tls_sni = true;
    UpstreamManager::upstream_add_server("www.sogou.com", "www.sogou.com", &params);
    ...
}

上面的代码把www.sogou.com设置为upstream名,并且加入一个同名的server,同时打开SNI功能。

怎么通过代理服务器访问http资源

方法一(只适用于http任务且无法重定向):
可以通过代理服务器的地址创建http任务,并重新设置request_uri和Host头。假设我们想通过代理服务器www.proxy.com:8080访问http://www.sogou.com/ ,方法如下:

task = WFTaskFactory::create_http_task("http://www.proxy.com:8080", 0, 0, callback);
task->set_request_uri("http://www.sogou.com/");
task->set_header_pair("Host", "www.sogou.com");

方法二(通用。但有些代理服务器只支持HTTPS。HTTP还是推荐用方法一):
通过带proxy_url的接口创建http任务:

class WFTaskFactory
{
public:
    static WFHttpTask *create_http_task(const std::string& url,
                                        const std::string& proxy_url,
                                        int redirect_max, int retry_max,
                                        http_callback_t callback);
};

其中proxy_url的格式为:http://user:[email protected]:port/
proxy只能是"http://"开头,而不能是"https://"。port默认值为80。
这个方法适用于http和https URL的代理,可以重定向,重定向时继续使用该代理服务器。

http server是否支持RESTful接口

推荐使用wfrest项目,这是基于workflow的一套RESTful API开发框架,项目地址:https://github.com/wfrest/wfrest

tutorial/wget not friendly in default, it seems to print the content of target

When I build the tutorial and try do download sth with wget, it print outs a lot of logs, such as below:

tutorial/wget   https://xxxx

#output 
HTTP/1.1 200 OK^M
Server: nginx/1.15.6^M
Date: Mon, 28 Dec 2020 01:49:37 GMT^M
Content-Type: application/octet-stream^M
Content-Length: 485985932^M
Last-Modified: Wed, 23 Dec 2020 11:28:39 GMT^M
Connection: close^M
ETag: "5fe329e7-1cf78e8c"^M
Accept-Ranges: bytes^M
^M
^_<8b>^H^@^V!¶]^@^Cì=isã6<96>ùì_<81>u¾Ø^ZÇÝ<92>ÝGº7SEK<94>Í<89>®PT;<94><8a><96> <8b>^SJÔð°¬Lå¿ï{^@xß2<9d>ÝT­k&-^Ax^O^OïÆ©<85>·Ôß^XÛ<85>é-é<9b><85>·Ün/×ß4ü÷^Vþ®¯¯Ù¿ð^Wÿ÷úí<87>«vû<9b>öÕ»ë«ÎÕuû^C<94>·¯®ß¶¿!o<9b>&$ëÏs&ä^[۲ܢveõ^?Ñ¿7­^SÒ"]kw°<8d>ǵKÚß^?^?õ]çmû^]^Y}Qz<8a>^DUöβu×°¶<97><84>H¦IXC<87>ØÔ¡ö^S]^^B^BÄ1^ZkJW&Ú<98>^LàßÑT<96>?<89>^Zmm8ı<{AÉÂZR¢o<97>o,<9b>,­<85>·¡[<97>!'g§^CcA·^N]<92>^^5<8d>'jë^O&uNÏ<89>nSDãx^Oÿ¢^K<97>¸<96>O<9b>±u©iB<99>§<9b>dg[;j»^G<9f>>o»¤6<99>]N/±?D<80>Íí-ë^MÚ<87>c6õ½s^YÐ
Ã"<99><94>^@í@«±<85>â5µ©±%0¬<89>:<9e>¨<8a>¬IêW¿<9b>îxÔWzòHS¤A<84>X¨E<80>^G^@|Db<9f><8c>%`âTºkJ<80><8d>ãã<80>®<96>^F^Rê^PkEt²²ì^M~^R¸^\kåî<81>-Ää<84>^RýѦ^TyI^^^N><8e>^Gêî)ÝFû^Wã¢!³<89>äC^B£A&^TÙi[[c¡<9b>æ^AÑè<8b>^Eݹ@*`öáA^SF<96>»7Ü5X^OP

能否不依赖OpenSSL?

这个项目很棒,一次编译过,示例代码也能正常运行。
我看了下代码,涉及到 SSL 的地方其实不多,能否改造一下,把这个依赖变为可选项呢?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.