Apr
24
2023
分享两个用php写的队列服务woker代码
作者:
绝缘体.. 发布:
2023-04-24 12:21 分类:
未分类 阅读:
抢沙发
分享两个用php写的队列服务woker代码。
1. 单进程队列服务
<?php
include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php';
include_once ROOT_DIR . 'lib/config.php';
include_once ROOT_DIR . 'lib/common.php';
include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php';
include_once ROOT_DIR . 'lib/ActionException.php';
ini_set('memory_limit', '32M');//设置最大内存使用量
/**
* @param int $errno
* @param string $errstr
* @param string $errfile
* @param int $errline
* @throws QueueException
*/
function error_handler($errno, $errstr, $errfile, $errline) {
$content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr());
wlog($content, true, 5);
throw new QueueException($content, $errno);
}
function fatal_handle() {
$error = error_get_last();
if (isset($error['type']) && $error['type'] == E_ERROR) {
error_handler($error['type'], $error['message'], $error['file'], $error['line']);
}
}
$wsRedis = getWebSocketRedis();
QueueServer::run($argv, PHP_SCRIPT_USER);
/**
* Class QueueServer
* 单进程 redis 队列服务类
*
* @version 0.0.1
* @link https://www.phpernote.com/
* @author yhm.1234@163.com
*/
class QueueServer {
static $param = [];
static $masterPid = 0;
static $pidFile = '/dev/shm/' . PROJECT . '-qs.pid';
static $statusFile = '/dev/shm/' . PROJECT . '-qs.status';
static $owner = '';
static $processName = PROJECT . '-queue-server';
public static function run($param, $owner = '') {
exit("本程序为单进程运行,随时都会因为程序错误而导致进程终止,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务\r\n");
self::$param = $param;
self::$owner = $owner;
self::checkParam();
self::{$param[1]}();
}
/**
* 开启
* @throws QueueException
*/
private static function start() {
self::init();
self::daemonize();
self::status();
self::doSignal();
self::work();
}
private static function init() {
strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行');
set_time_limit(0);
set_error_handler('error_handler');
register_shutdown_function('fatal_handle');
}
private static function work() {
while (true) {
#信号分发
pcntl_signal_dispatch();
$redis = getWebSocketRedis(false);
while ($value = $redis->blpop(QUEUE_MASTER, 6)) {
//self::log($value);
try {
if (!$value || !is_array($value) || empty($value[1])) {
throw new QueueException('消息体格式错误', 1011);
}
$value = json_decode($value[1], true);
if (json_last_error() || !$value || !is_array($value)) {
throw new QueueException('消息体格式错误', 1012);
}
$file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php';
if (!file_exists($file)) {
throw new QueueException('不存在的消费者类型', 1404);
}
require_once $file;
$value['type']::run($value);
$redis->incr(getQueueCountKey('success'));//队列任务成功处理的数
} catch (QueueException $exception) {
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
} catch (ActionException $exception) {
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
} catch (Exception $exception) {
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
}
unset($value);
}
}
}
/**
* 停止
* @param bool $exit
*/
private static function stop($exit = true) {
/*if (!self::processHasRun()) {
exit("服务未运行\r\n");
}*/
if (!file_exists(self::$pidFile)) {
echo "警告:关闭服务过程中发现 pid 文件不存在\r\n";
} else {
if (!unlink(self::$pidFile)) {
echo "警告:关闭服务过程中删除 pid 文件失败\r\n";
}
}
exec('pkill -f ' . self::$processName);
$exit && exit("服务关闭成功\r\n");
}
/**
* 重启
*/
private static function restart() {
self::stop(false);
self::start();
}
/**
* 输出服务当前的状态信息
*/
private static function status() {
if (!self::processHasRun() || !file_exists(self::$pidFile)) {
exit("服务未运行\r\n");
}
$status = file_get_contents(self::$statusFile);
if (!$status) {
exit("状态文件不存在\r\n");
}
$status = explode('|', $status);
$redis = getWebSocketRedis();
$start_time = fileatime($status[1]);
$str = sprintf(
"系统当前时间:%s\r\n进程名称:%s\r\n主进程id:%d\r\n进程所属用户:%s\r\n启动时间:%s\r\n已运行:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n" .
"处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d\r\n",
date('Y-m-d H:i:s', getServerTimeNow()),
self::$processName,
$status[0],
self::$owner,
date('Y-m-d H:i:s', $start_time),
secToTime(getServerTimeNow() - $start_time),
getSizeDesc(memory_get_usage(true)),
getSizeDesc(memory_get_peak_usage(true)),
$redis->get(getQueueCountKey('failIncrTotal')),
$redis->get(getQueueCountKey('total')),
$redis->get(getQueueCountKey('stotal')),
$redis->get(getQueueCountKey('ftotal')),
$redis->get(getQueueCountKey('success')),
$redis->get(getQueueCountKey('fail')),
$redis->lLen(QUEUE_MASTER)
);
echo $str;
if ($GLOBALS['argv'] == 'status') {
exit();
}
}
/**
* 检查是否以守护进程的模式运行
* @throws QueueException
*/
private static function daemonize() {
if (!empty(self::$param[2]) && self::$param[2] == '-d') {
$pid = pcntl_fork();
if (-1 === $pid) {
exit("守护进程创建失败\r\n");
} elseif ($pid > 0) {
//主进程会在这里退出,下面的代码还会继续执行,不过是子进程接手继续执行的
exit(0);
}
posix_setsid();//设置新会话组长,脱离终端
//关闭打开的文件描述符
fclose(STDIN);
//fclose(STDOUT);
//fclose(STDERR);
}
self::single();
//设置进程名称
cli_set_process_title(self::$processName);
self::setServerProcessOwner();
self::$masterPid = posix_getpid();
file_exists(self::$pidFile) && unlink(self::$pidFile);
if (false === file_put_contents(self::$pidFile, self::$masterPid)) {
throw new QueueException('保存 pid 信息到 pid 文件:' . self::$pidFile . '失败');
}
chmod(self::$pidFile, 0777);
$status = sprintf('%d|%s', self::$masterPid, self::$pidFile);
if (false === file_put_contents(self::$statusFile, $status)) {
throw new QueueException('保存状态信息到文件:' . self::$statusFile . '失败');
}
chmod(self::$statusFile, 0777);
}
/**
* 记录日志
* @param $content
*/
private static function log($content) {
wlog($content, true, 5);
}
/**
* 单实例运行
*/
private static function single() {
if (self::processHasRun()) {
exit("程序已经在运行\r\n");
}
}
/**
* 检查执行时候传的参数
*/
private static function checkParam() {
if (empty(self::$param[1]) || !in_array(self::$param[1], ['start', 'stop', 'restart', 'status'])) {
exit('必须加参数,如:start|stop|restart|status');
}
}
/**
* 安装信号处理器
*/
private static function doSignal() {
pcntl_signal(SIGINT, function ($signo) {
//fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo);
if ($signo == '2') {
self::stop();
}
});
}
/**
* 检验进程已经存在
* @return bool
*/
private static function processHasRun(): bool {
return (int)exec('ps -ef|grep ' . self::$processName . '|grep -v grep|wc -l') ? true : false;
}
/**
* 获取当前运行进程的执行用户名
* @return string mixed
*/
private static function getCurrentProcessUser() {
$user_info = posix_getpwuid(posix_getuid());
if (!$user_info) {
exit('获取当前运行进程的用户信息失败' . "\r\n");
}
return $user_info['name'];
}
/**
* 设置当前脚本所属的用户名和用户组
*/
private static function setServerProcessOwner() {
self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser();
$info = posix_getpwnam(self::$owner);
if (!$info) {
exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n");
}
if (!posix_setgid($info['gid'])) {
echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n";
}
if (!posix_setuid($info['uid'])) {
echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n";
}
}
}
2. 多进程,master worker 模式的队列服务
<?php
include_once __DIR__ . DIRECTORY_SEPARATOR . '../../config.php';
include_once ROOT_DIR . 'lib/config.php';
include_once ROOT_DIR . 'lib/common.php';
include_once __DIR__ . DIRECTORY_SEPARATOR . 'Exception.php';
include_once ROOT_DIR . 'lib/ActionException.php';
ini_set('memory_limit', '32M');//设置最大内存使用量
$system_set = getSystemSet();
/**
* @param int $errno
* @param string $errstr
* @param string $errfile
* @param int $errline
* @throws QueueException
*/
function error_handler($errno, $errstr, $errfile, $errline) {
$content = sprintf('队列程序运行过程中发生错误,文件:%s,第 %d 行,具体错误:%s,调用流程:%s', $errfile, $errline, $errstr, getCallTraceStr());
wlog($content, true, 5, $GLOBALS['system_set'], ['action' => '', 'uid' => 0]);
throw new QueueException($content, $errno);
}
function fatal_handle() {
$error = error_get_last();
if (isset($error['type']) && $error['type'] == E_ERROR) {
error_handler($error['type'], $error['message'], $error['file'], $error['line']);
}
}
$wsRedis = getWebSocketRedis();
MultiProcessQueueServer::run();
/**
* Class MultiProcessQueueServer
* 多进程 redis 多队列服务类,支持多队列处理,不同的队列名可以设置不同的处理进程数
*
* @version 0.0.1
* @link https://www.phpernote.com/
* @author yhm.1234@163.com
*/
class MultiProcessQueueServer {
public static $param = [];
public static $statusFile = '/dev/shm/' . PROJECT . '-qs-multi.status';
public static $owner = PHP_SCRIPT_USER;
public static $processName = PROJECT . '-queue-server';
public static $workerNum = 2;
public static $masterPid = 0;
public static $childProcessList = []; // ['pid' => ['worker_id' => '', 'queue' => '']]
public static $queueList = [QUEUE_MASTER => ['worker_num' => 2]/*, QUEUE_TEST => ['worker_num' => 2]*/];
public static function start() {
self::single();
self::runMaster();
self::installSignal();
self::writeStatus();
self::status();
self::monitorWorkerProcess();
}
public static function stop($exit = true) {
exec('pkill -f \'' . self::getMasterProcessName() . '\'', $output);
foreach (self::$queueList as $key => $queue) {
for ($i = 1; $i <= $queue['worker_num']; $i++) {
exec('pkill -f \'' . self::getWorkerProcessName($key, $i) . '\'', $output);
}
}
if (file_exists(self::$statusFile) && !unlink(self::$statusFile)) {
echo "状态文件删除失败\r\n";
}
$exit && exit("服务关闭成功\r\n");
}
public static function restart() {
self::stop(false);
self::start();
}
public static function status() {
if (!self::masterProcessHasRun()) {
exit("服务未运行\r\n");
}
$status = file_get_contents(self::$statusFile);
if (!$status) {
exit("状态文件不存在\r\n");
}
$status = explode('|', $status);
$redis = getWebSocketRedis();
$start_time = fileatime(self::$statusFile);
$now = getServerTimeNow();
$str = sprintf(
"系统当前时间:%s\r\n启动时间:%s\r\n已运行:%s\r\n主进程名称:%s\r\n主进程id:%d\r\nworker进程id:%s\r\n进程所属用户:%s\r\n内存占用:%s\r\n内存占用峰值:%s\r\n当前内存占用:%s\r\n" .
"处理任务数:丢包 %d 总 %d 入队成功 %d 入队失败 %d 处理成功 %d 处理失败 %d 待处理 %d 未知 %d\r\n处理任务统计时间段:%s - %s \r\n",
date('Y-m-d H:i:s', $now),
date('Y-m-d H:i:s', $start_time),
secToTime($now - $start_time),
self::getMasterProcessName(),
$status[0],
implode(' ', array_slice($status, 1)),
self::$owner,
getSizeDesc(memory_get_usage(true)),
getSizeDesc(memory_get_peak_usage(true)),
getSizeDesc(self::getMemUsedTotal() * 1024),
$redis->get(getQueueCountKey('failIncrTotal')),
$redis->get(getQueueCountKey('total')),
$redis->get(getQueueCountKey('stotal')),
$redis->get(getQueueCountKey('ftotal')),
$redis->get(getQueueCountKey('success')),
$redis->get(getQueueCountKey('fail')),
$redis->lLen(QUEUE_MASTER),
$redis->get(getQueueCountKey('stotal')) - $redis->get(getQueueCountKey('success')) - $redis->get(getQueueCountKey('fail')),
date('Y-m-d H:i:s', $redis->get(getQueueCountKey('stime'))),
date('Y-m-d H:i:s', $now)
);
echo $str;
}
public static function initData() {
$redis = getWebSocketRedis(false);
$redis->set(getQueueCountKey('stime'), getServerTimeNow());//统计的开始时间的时间戳
$redis->set(getQueueCountKey('total'), 0);//发送数据的总数
$redis->set(getQueueCountKey('failIncrTotal'), 0);//数据压入队列时增长总数失败的个数
$redis->set(getQueueCountKey('stotal'), 0);//成功压入到队列的总数
$redis->set(getQueueCountKey('ftotal'), 0);//压入到队列失败的总数
$redis->set(getQueueCountKey('success'), 0);//任务处理成功的总数
$redis->set(getQueueCountKey('fail'), 0);//任务处理失败的总数
}
public static function writeStatus() {
$str = self::$masterPid . '|' . implode('|', array_keys(self::$childProcessList));
if (!file_put_contents(self::$statusFile, $str, LOCK_EX)) {
exit("写入信息到状态文件失败\r\n");
}
}
public static function run() {
exit("本程序仅能启动 worker 进程,没有定时任务进程,因此不推荐使用本程序,推荐使用本目录下的 multiProcessServer.php 多进程队列服务,该服务同时支持 worker 定时任务 子进程!\r\n");
self::init();
self::checkParams();
self::{$GLOBALS['argv'][1]}();
}
/**
* 单实例运行
*/
private static function single() {
if (self::masterProcessHasRun()) {
exit("程序已经在运行\r\n");
}
}
private static function installSignal() {
pcntl_signal(SIGINT, function ($signo) {
//fprintf(STDOUT, "pid:" . posix_getpid() . "接收到一个信号,编号为:%d \n", $signo);
if ($signo == '2') {
//echo self::$masterPid ."\r\n";print_r(self::$childProcessList);
self::stop();
}
});
}
private static function init() {
strtolower(php_sapi_name()) != 'cli' && exit('仅允许在cli模式下运行');
set_time_limit(0);
set_error_handler('error_handler');
register_shutdown_function('fatal_handle');
}
private static function checkParams() {
if (empty($GLOBALS['argv'][1])) {
exit("必须输入参数,必须参数如:start|stop|restart|status\r\n");
}
if (!in_array($GLOBALS['argv'][1], ['start', 'stop', 'restart', 'status', 'initData'])) {
exit("参数输入错误,正确参数如:start|stop|restart|status\r\n");
}
if (self::$workerNum < 1 || self::$workerNum > 9) {
exit("worker数设置不正确\r\n");
}
}
//开启主进程
private static function runMaster() {
//确保进程有最大操作权限
umask(0);
if (!empty($GLOBALS['argv'][2]) && $GLOBALS['argv'][2] == '-d') {
$pid = pcntl_fork();
if ($pid > 0) {
exit();
}
}
self::$masterPid = getmypid();
foreach (self::$queueList as $key => $queue) {
for ($i = 1; $i <= $queue['worker_num']; $i++) {
self::runWorker($key, $i);
}
}
cli_set_process_title(self::getMasterProcessName());
self::setServerProcessOwner();
}
//开启子进程
private static function runWorker($queue_name, $worker_id) {
umask(0);
$pid = pcntl_fork();
if ($pid > 0) {//父进程执行空间
self::$childProcessList[$pid] = ['worker_id' => $worker_id, 'queue' => $queue_name];
} else if ($pid == 0) {//子进程执行空间
cli_set_process_title(self::getWorkerProcessName($queue_name, $worker_id));
self::setServerProcessOwner();
self::work($queue_name, $worker_id);
} else {
exit("创建子进程失败\r\n");
}
}
//监控worker进程
private static function monitorWorkerProcess() {
while ($pid = pcntl_wait($status)) {
$worker = self::$childProcessList[$pid];
pcntl_signal_dispatch();
if ($pid == -1) {
break;
} else {
unset(self::$childProcessList[$pid]);
self::runWorker($worker['queue'], $worker['worker_id']);
self::writeStatus();
}
}
}
/**
* 业务处理逻辑
*/
private static function work($queue_name, $worker_id) {
while (true) {
try {
#信号分发
pcntl_signal_dispatch();
$redis = getWebSocketRedis(false);
while ($value = $redis->blpop($queue_name, 5 + $worker_id)) {
//self::log($value);
if (!$value || !is_array($value) || empty($value[1])) {
throw new QueueException('消息体格式错误', 1011);
}
$value = json_decode($value[1], true);
if (json_last_error() || !$value || !is_array($value)) {
throw new QueueException('消息体格式错误', 1012);
}
$file = __DIR__ . DIRECTORY_SEPARATOR . 'consumer/' . $value['type'] . '.php';
if (!file_exists($file)) {
throw new QueueException('不存在的消费者类型:' . $value['type'], 1404);
}
require_once $file;
$value['type']::run($value);
$redis->incr(getQueueCountKey('success'));//队列任务成功处理的数
unset($value);
}
} catch (QueueException $exception) {
saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '队列处理失败:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('队列执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
} catch (ActionException $exception) {
saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', 'action 异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('执行发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
} catch (Exception $exception) {
saveMyRedisQueueFailData(isset($value['type']) ? $value['type'] : '', 'fail', '未知异常:' . $exception->getMessage(), is_array($value) ? json_encode($value) : $value);
$redis->incr(getQueueCountKey('fail'));//队列任务处理失败的数
self::log('发生错误:' . $exception->getMessage() . ',错误号:' . $exception->getCode());
}
}
}
private static function getMasterProcessName(): string {
return self::$processName . ': master process';
}
private static function getWorkerProcessName($queue_name, $worker_id): string {
return self::$processName . ': worker process ' . $queue_name . ' ' . $worker_id;
}
/**
* 检验进主程已经存在
* @return bool
*/
private static function masterProcessHasRun(): bool {
return (int)exec('ps -ef|grep \'' . self::getMasterProcessName() . '\'|grep -v grep|wc -l') ? true : false;
}
/**
* 获取当前运行进程的执行用户名
* @return string mixed
*/
private static function getCurrentProcessUser() {
$user_info = posix_getpwuid(posix_getuid());
if (!$user_info) {
exit('获取当前运行进程的用户信息失败' . "\r\n");
}
return $user_info['name'];
}
/**
* 设置当前脚本所属的用户名和用户组
*/
private static function setServerProcessOwner() {
self::$owner = self::$owner ? self::$owner : self::getCurrentProcessUser();
$info = posix_getpwnam(self::$owner);
if (!$info) {
exit('设置进程所属用户的过程中获取用户 ' . self::$owner . ' 的信息失败' . "\r\n");
}
if (!posix_setgid($info['gid'])) {
echo '设置进程所属用户组为 ' . $info['gid'] . ' 失败' . "\r\n";
}
if (!posix_setuid($info['uid'])) {
echo '设置进程所属用户名为 ' . $info['uid'] . ' 失败' . "\r\n";
}
}
/**
* 记录日志
* @param $content
*/
private static function log($content) {
wlog($content, true, 5);
}
/**
* 获取所有进程的内存当前实际使用总量
* @return int
*/
private static function getMemUsedTotal(): int {
$used = 0;
$exec = 'ps -aux|grep \'' . self::getMasterProcessName() . '\'|grep -v \'grep\'|awk \'{print $6}\'';
exec($exec, $output);
$used += $output[0];
foreach (self::$queueList as $key => $queue) {
for ($i = 1; $i <= $queue['worker_num']; $i++) {
$exec = 'ps -aux|grep \'' . self::getWorkerProcessName($key, $i) . '\'|grep -v \'grep\'|awk \'{print $6}\'';
exec($exec, $output);
$used += $output[0];
}
}
return $used;
}
}
微信扫一扫,打赏作者吧~