PHP做为一种常用的服务器端脚本语言,被广泛用于Web研发。相比Java、C++、Golang等编程语言,PHP缺少多线程的支持,只能运用fork创建多个进程来实现并行处理。 因为进程之间并不共享内存堆栈和文件句柄,PHP只能借助Redis或APCu等内存数据库或共享内存来实现进程间的数据共享,编程的局限性很强。
所幸的是Swoole6增多了对多线程的支持,为PHP供给了一个稳定靠谱的多线程支持。此刻PHP亦能够创建多线程,更加有效地编写并发程序。
本文将深入介绍PHP的ZTS机制和Swoole6多线程的实现原理,帮忙PHP研发者彻底理解把握Swoole6多线程的运用。
进程与线程的对比
怎样创建线程
在 Linux C++ 中,能够运用多种方式来创建线程。最常用的办法是运用POSIX 线程(pthread)库。以下是经过 pthread 库创建线程的基本示例。 #include <pthread.h>
#include <iostream>
void* threadFunction(void* arg) {
int* num =static_cast<int*>(arg);
std::cout << "Hello from thread! Number: " << *num << std::endl;
return nullptr;
}
int main() {
pthread_t thread;
int num = 42;
// 创建线程
if (pthread_create(&thread, nullptr, threadFunction, &num) != 0) {
std::cerr << "Error creating thread" << std::endl;
return 1;
}
// 等待线程结束pthread_join(thread,nullptr);
std::cout << "Main thread ending." << std::endl;
return 0;
}能够运用 g++ 编译器编译此代码,而后执行就会创建多个线程并行地处理任务。 g++ -o test test.cpp -lpthread
./test
PHP ZTS
初期的PHP仅支持Apache服务器,做为Apache的prefork模块来运行,不支持Windows的IIS和Apache (worker threads)服务器。为认识决此问题,PHP加入了ZTS的支持,亦便是TSRM模块,能够在php-src/TSRM目录下找到相应的代码。
与Python GIL的实现区别,PHP ZTS无运用全局锁守护全局资源,而是一种thread_local的模式,将全局资源变成为了线程局部资源。
Python语言虽然供给了threading模块,实质是伪多线程,Python代码并不可并行执行,仅在出现阻塞IO时,让出了掌控权,利用IO等待的间隙,运行其他Python线程。而PHP ZTS多线程模式(例如:IIS+PHP)下,PHP 程序是并行执行的,但并不可读取到当前线程以外的资源。
PHP 底层的全局变量
在 PHP 的 Zend 引擎中,有有些全局变量和结构体用于存储运行时的状态和关联信息。以下是有些平常的全局变量,如 EG、PG 和 CG:
AG ZendVM 内存管理器
AG 保留了内存管理器关联的资源,重点的结构是:zend_mm_heap *mm_heap,PHP 所有变量的内存分配所有由zend_mm_alloc_heap(AG(mm_heap), size, ...)函数所实现。
GC_G ZendVM 垃圾回收器
GC_G是垃圾回收器对象,经过引用计数和循环引用分析、写时复制 (Copy-on-Write) 垃圾回收算法进行PHP变量的生命周期管理和资源回收。
EG (Executor Globals)
EG 是一个指向 executor_globals 结构的指针,包括了执行器的全局状态信息,包含当前执行的上下文、错误处理、安全上下文等。
重点字段: • current_execute_data: 指向当前正在执行的函数调用的执行数据结构• active_symbol_table: 当前活动的符号表,用于存储变量及其值• HashTable *function_table:函数表• HashTable *class_table:类表• zend_object *exception:运行时的反常• zend_vm_stack vm_stack:运行的函数调用栈PG (Persistent Globals)
PG 是一个指向 persistent_globals 结构的指针,包括了持久化(跨请求)全局状态信息,重点用于存储在请求之间保持不变的数据。
重点字段: • auto_prepend_file: 自动包括在脚本执行前的文件• auto_append_file: 自动包括在脚本执行后的文件• display_errors: 掌控是不是表示错误的配置选项CG (Compiler Globals)
CG 是一个指向 compiler_globals结构的指针,包括了与编译关联的全局状态和信息,在PHP 代码的编译周期运用。
重点字段: • compiler_options: 编译选项的配置• active_symbol_table: 当前编译周期的活动符号表• open_files:当前打开的文件列表SG (SAPI Globals)
SG是一个用于存储与当前脚本执行关联的全局变量的结构。它重点用于管理与当前请求或执行上下文关联的信息。
重点字段: • request_info:包括与当前请求关联的信息,例如请求的 URI 和办法等。• sapi_headers:当前HTTP Headers• rfc1867_uploaded_files:当前上传的文件列表其他扩展的全局变量
除了ZendVM之外,加载的每一个扩展可能都运用全局变量保留了数据,例如: • BCG:bcmath• MYSQLND_G:mysqlnd
在php-src中运用ZEND_BEGIN_MODULE_GLOBALS定义全局变量。 ZEND_BEGIN_MODULE_GLOBALS(gmp)
bool rand_initialized;
gmp_randstate_t rand_state;
ZEND_END_MODULE_GLOBALS(gmp)TSRM 介绍
TSRM(Thread Safe Resource Management)是 PHP中的一种机制,旨在为多线程环境供给资源管理的线程安全支持。它准许多个线程安全地拜访和操作共享资源,保证在并发执行时不会出现数据竞争或状态不一致的问题。
TSRM由编译参数掌控,因此呢是不是开启ZTS决定于php-src编译时的选项。增多--enable-zts就能够开启ZTS。
NTS
以AG为例,在NTS下AG(mm_heap)宏展开后是:alloc_globals.mm_heap,实质定义是 staticzend_alloc_globals alloc_globals;即进程全局变量,此全局变量保留了所有内存分配器的资源。
ZTS
在ZTS下宏展开后实质的符号是: (((zend_alloc_globals *) (((char*) tsrm_get_ls_cache())+(alloc_globals_offset)))->mm_heap)而tsrm_get_ls_cache()函数便是获取一个Thread Local变量,在Linux系统下运用了pthread_getspecific()实现。
pthread_getspecific 是 POSIX线程库中的一个函数,用于在多线程程序中拜访与特定线程关联的线程局部存储(Thread Local Storage, TLS)数据。该函数准许线程获取已存储的特定数据指针,这些指针是在先前经过 pthread_setspecific 存储的。
另一一个关键的函数是ts_resource_ex(),在线程创建时分配内存,调用pthread_setspecific设置为TLS指针。 /* fetches the requested resource for the current thread */
TSRM_API void *ts_resource_ex(ts_rsrc_id id, THREAD_T *th_id) {
...
if(!thread_resources) {allocate_new_resource(&tsrm_tls_table[hash_value], thread_id);
tsrm_mutex_unlock(tsmm_mutex);
return ts_resource_ex(id, &thread_id);
}
}
总结
这些全局资源和关联的规律构成为了ZendVM,在ZTS模式下,底层的全局变量被编译为了TLS线程局部变量。这就相当于每一个线程都有一个独立的ZendVM环境,彼此是隔离的。因此呢ZTS模式下,即便在同一个线程内,实质上程序中创建的全局变量或资源,例如:$_GET/$_POST/$_FILES或其他运用global $vars,以及include $file等均为TLS资源,只能在当前线程内运用。
这相当于是PHP层面,线程变成为了进程,但在底层视角(C/C++)仍然是共享堆栈的线程环境。
Swoole6 线程
因为Swoole运用了C++11,因此呢能够直接运用C++标准的多线程支持,而不是直接运用pthread底层库。
创建线程static PHP_METHOD(swoole_thread, __construct) {
char *script_file;
size_t l_script_file;
zval *args;
int argc;
ZendArray *argv = nullptr;
ZEND_PARSE_PARAMETERS_START(1, -1)
Z_PARAM_STRING(script_file, l_script_file)
Z_PARAM_VARIADIC(+, args, argc)
ZEND_PARSE_PARAMETERS_END();if (l_script_file < 1) {
zend_throw_exception(swoole_exception_ce, "exec file name is empty", SW_ERROR_INVALID_PARAMS);
return;
}
ThreadObject *to = thread_fetch_object(Z_OBJ_P(ZEND_THIS));
zend_string *file = zend_string_init(script_file, l_script_file,1);
if (argc > 0) {
argv = newZendArray();for (int i = 0; i < argc; i++) {
argv->append(&args[i]);
}
}
try {
to->thread = new std::thread([file, argv]() { php_swoole_thread_start(file, argv); });
}catch (const std::exception &e) {
zend_throw_exception(swoole_exception_ce, e.what(), SW_ERROR_SYSTEM_CALL_FAIL);
return;
}
zend_update_property_long(
swoole_thread_ce, SW_Z8_OBJ_P(ZEND_THIS), ZEND_STRL("id"), (zend_long) to->thread->native_handle());
}底层直接运用了C++的std::thread创建线程,子线程会执行php_swoole_thread_start()函数初始化子线程。
构造办法接受2个参数,第1个是子线程要执行的脚本文件,第二个是线程参数数组。
线程初始化void php_swoole_thread_start(zend_string *file, ZendArray *argv) {
thread_num.fetch_add(1);
ts_resource(0);
#if defined(COMPILE_DL_SWOOLE) && defined(ZTS)
ZEND_TSRMLS_CACHE_UPDATE();
#endif
zend_file_handle file_handle{};
zval global_argc, global_argv;
PG(expose_php) = 0;
PG(auto_globals_jit) = 1;
#if PHP_VERSION_ID >= 80100
PG(enable_dl) = false;
#else
PG(enable_dl) = 0;
#endif
swoole_thread_init();
if (php_request_startup() != SUCCESS) {
EG(exit_status) = 1;
goto _startup_error;
}
PG(during_request_startup) = 0;
SG(sapi_started) = 0;
SG(headers_sent) = 1;
SG(request_info).no_headers = 1;
SG(request_info).path_translated = request_info.path_translated;
SG(request_info).argc = request_info.argc;
zend_stream_init_filename(&file_handle, ZSTR_VAL(file));
file_handle.primary_script = 1;
zend_first_try {
thread_bailout = EG(bailout);
if (request_info.argv_serialized) {
php_swoole_unserialize(request_info.argv_serialized, &global_argv);
ZVAL_LONG(&global_argc, request_info.argc);
zend_hash_update(&EG(symbol_table), ZSTR_KNOWN(ZEND_STR_ARGV), &global_argv);
zend_hash_update(&EG(symbol_table), ZSTR_KNOWN(ZEND_STR_ARGC), &global_argc);
}
if (argv) {
argv->toArray(&thread_argv);
argv->del_ref();
}
php_swoole_thread_register_stdio_file_handles(true);
php_execute_script(&file_handle);
}
zend_end_try();
zend_destroy_file_handle(&file_handle);
php_request_shutdown(NULL);
file_handle.filename = NULL;
_startup_error:
zend_string_release(file);
ts_free_thread();
swoole_thread_clean();
thread_num.fetch_sub(1);
}
关键的几个流程: • ts_resource 运用 TSRM API 分配了 TLS 资源• php_request_startup 在子线程内执行 RINIT ,这会调用所有扩展的RINIT函数• php_execute_script 在子线程内执行PHP脚本• php_request_shutdown 执行RSHUTDOWN函数• ts_free_thread 运用 TSRM API 释放 TLS 资源线程结束后,会调用std::thread的join()办法回收线程。
线程创建的线程就能够并行地执行了,但每一个线程彼此是完全隔离的,这和多进程并无区别。接下来就需要实现线程资源的共享。
ThreadResource
Swoole底层封装了ThreadResource来管理跨线程的共享资源。这个类运用引用计数来管理内存。底层运用了atomic来增多、减少引用计数,因此呢不需要加锁。当无任何线程持有此资源时就会执行delete释放对象。 class ThreadResource {
sw_atomic_t ref_count;
public:
ThreadResource() {
ref_count = 1;
}
void add_ref() {
sw_atomic_add_fetch(&ref_count, 1);
}
void del_ref() {
if(sw_atomic_sub_fetch(&ref_count,1) == 0) {
delete this;
}
}
protected:
virtual ~ThreadResource() {}
};包含以下对象,均继承了ThreadResource: • Swoole\Thread\Atomic• Swoole\Thread\Lock• Swoole\Thread\ArrayList• Swoole\Thread\Map• Swoole\Thread\Queue• Swoole\Thread\Barrier
这些对象能够安全地在线程之间传递。
ZendArray
ArrayList和Map运用了ZendVM供给的zend_array(persistent)来实现,因此呢内存是直接由glibc和malloc/free管理。针对数组的操作底层运用了RWLock来防止竞争。 class ZendArray : public ThreadResource {
protected:
swoole::RWLock lock_;
zend_array ht;static void item_dtor(zval *pDest) {
ArrayItem *item = (ArrayItem *) Z_PTR_P(pDest);
delete item;
}
public:
ZendArray() : ThreadResource(), lock_(0) {
zend_hash_init(&ht, 0, NULL, item_dtor, 1);
}
~ZendArray() override {
zend_hash_destroy(&ht);
}
...
void strkey_offsetGet(zval *zkey, zval *return_value) {
zend::String skey(zkey);
lock_.lock_rd();
ArrayItem *item = (ArrayItem *) zend_hash_find_ptr(&ht, skey.get());if (item) {
item->fetch(return_value);
}
lock_.unlock();
}
void strkey_offsetSet(zval *zkey, zval *zvalue) {
zend::String skey(zkey);
auto item = newArrayItem(zvalue);
item->setKey(skey);
lock_.lock();
zend_hash_update_ptr(&ht, item->key, item);
lock_.unlock();
}
...
}• 读操作运用lock_rd()共享锁,因此呢$map[key]这般的操作,多线程并行执行时不会显现竞争• 写操作运用lock()独霸锁,若多线程向同一个$map写入时会显现竞争ArrayItem
所有写入线程数据容器的元素,均运用此类操作。 • 数值:例如int、float、null、bool,直接复制其值• 字符串:需要完全复制字符串的内存• PHP对象:需要序列化后,做为字符串存储,读取时再进行反序列化• 资源:例如php socket、php stream、swoole co_socket需要进行dup(fd)对文件描述符增多一次引用计数,读取时再增多一次引用计数• 线程资源:调用ThreadResource::add_ref()增多引用计数,删除时减少引用计数• 数组:转为ArrayList或Map对象数据容器是支持嵌套结构的,例如Map中能够写入ArrayList,ArrayList中能够再添加一个Queue。
线程参数
线程参数本身是一个ArrayList对象,经过引用计数管理,在区别的线程之间传递。
Queue
Queue运用了C++的std::queue实现,它不仅是一个数据容器,还内置了线程要求变量(std::condition_variable),队列的消费者在队列为空时等待要求变量,生产者push()写入数据时能够唤醒队列的消费者。 struct Queue : ThreadResource {
std::queue<ArrayItem *> queue;
std::mutex lock_;
std::condition_variable cv_;
}等待void pop_wait(zval *return_value,double timeout) {
ArrayItem *item = nullptr;
std::unique_lock<std::mutex> _lock(lock_);
SW_LOOP {
if (!queue.empty()) {
item = queue.front();
queue.pop();
break;
} else {
if (timeout > 0) {
if (cv_.wait_for(_lock, std::chrono::duration<double>(timeout)) == std::cv_status::timeout) {
break;
}
}else {
cv_.wait(_lock);
}
// All threads have been awakened,
// but the data has already been acquired by other thread, returning NULL.
if (queue.empty()) {
RETVAL_NULL();
swoole_set_last_error(SW_ERROR_NO_PAYLOAD);
break;
}
}
}
_lock.unlock();
if(item) {
item->fetch(return_value);delete item;
}
}这儿有一个细节是队列弹出的元素转为PHP变量时,是在锁的同步区域之外,原由是pop之后仅当前的线程持有此元素,能够安全地进行操作,因此不需要加锁。
通告void push_notify(zval *zvalue, boolnotify_all) {
auto item = new ArrayItem(zvalue);
std::unique_lock<std::mutex> _lock(lock_);
queue.push(item);
if (notify_all) {
cv_.notify_all();
} else{
cv_.notify_one();
}
}调用了要求变量的notify_one()/notify_all()办法唤醒处在等待状态的消费者线程。
其他实现细节
1. 线程中的协程调度器
在线程中能够创建协程调度器,底层实现直接运用了C++的thread_local关键词来隔离全局变量。每一个线程的协程和异步IO环境是隔离的。包含: • EventLoop• Coroutine Scheduler• Timer• Async Threads• Logger
相比ZendVM的TLS要简单非常多,可读性更高。
#ifdef SW_THREAD
#defineSW_THREAD_LOCAL thread_local
extern std::mutex sw_thread_lock;
#else
#define SW_THREAD_LOCAL
#endif
SW_THREAD_LOCAL bool PHPCoroutine::activated = false;
SW_THREAD_LOCAL zend_array *PHPCoroutine:ptions =nullptr;2. Server 的多线程模式
多线程模式下将Worker进程、Task进程、UserWorker进程所有修改为线程的方式运行。因为线程模式下,没法复制线程的资源,需要在线程创建之后,重新创建一次。
工作线程是将一样的代码,再次执行一遍。例如 new Server 和 Server:n(),但worker线程不准许执行 Server::set() 办法。在 Server::start() 办法中,工作进程将进入 worker_thread_fn() 执行单元,而主线程则是创建线程,以及管理子线程,负责退出线程的重启和回收,以及shutdown。 static PHP_METHOD(swoole_server, start) {
zval *zserv = ZEND_THIS;
Server *serv = php_swoole_server_get_and_check_server(zserv);
#ifdef SW_THREAD
if(serv->is_worker_thread()) {
worker_thread_fn();
RETURN_TRUE;
}#endif
if (serv->is_started()) {
php_swoole_fatal_error(
E_WARNING, "server is running, unable to execute %s->start()", SW_Z_OBJCE_NAME_VAL_P(zserv));
RETURN_FALSE;
}
...
}3. AIO 线程池
AIO线程池是共享的,它是一个多对多的队列MMCQ (Many To Many Concurrent Queue),避免创建太多AIO线程。 async_thread_lock.lock();
if(!async_thread_pool) {
async_thread_pool = std::make_shared<async::ThreadPool>(
SwooleG.aio_core_worker_num, SwooleG.aio_worker_num, SwooleG.aio_max_wait_time, SwooleG.aio_max_idle_time);
}if(!async_thread_pool->is_running()) {
async_thread_pool->start();
}
pool = async_thread_pool;
async_thread_lock.unlock();需要为每一个PHP线程创建一个独立的管道来获取AIO线程池的通告。 class AsyncThreads {
public:
size_t task_num = 0;
Pipe *pipe = nullptr;
std::shared_ptr<async::ThreadPool> pool;
network::Socket *read_socket = nullptr;
network::Socket *write_socket =nullptr;
}结语
Swoole v6为PHP供给了一个稳定靠谱的多线程方法。Swoole的核心仍然是协程,多线程的支持只是为了补齐了Swoole的最后一起短板,相比APCu和Redis,多线程在数据和资源共享有巨大的优良。
除了当前供给的数据容器之外,将来Swoole会连续增多更加多高性能的多线程C++组件,持续加强多线程支持。
原文
https://mp.weixin.qq.com/s/HzPEg7g3PuN2Xky4EQfnHw
|