执行效果
增加了以下内容,参照的swoole
/** 异步任务task方法时触发
* @var callable
*/
public $onTask = null;
/** 异步任务进程数 大于0时创建异步进程
* @var int
*/
public $task_worker_num = 0;
/** 异步进程端口 不指定时以当主服务端口+100
* @var int
*/
public $task_port = 0;
/** 异步任务进程
* @var Worker
*/
public $taskWorker = null;
/** 不设置默认使用 onWorkerStart
* @var callable
*/
public $onTaskWorkerStart = null;
/**
* @var int
*/
public $port = null;
/** 异步任务
* @param mixed $data
* @return bool|int 失败false 成功 返回任务进程id
*/
public function task($data){}
这属于业务类,估计官方不会合并进来,主要是保持workerman内核的精简性。
看了文档有如何实现异步任务的,就是觉得麻烦了点,所以就动手集成进去方便使用。
@6873: 刚看了下任务源码的抽象实现,个人感觉你这想法挺好的。//不过为啥你把127.0.0.1给写死了呢,这样分布式就不行了。
这个异步任务就是需要和主服务一起使用的 所以用的127.0.0.1 本来是想用unix来的 结果测试发现只能开启一个进程 不是很明白原理 所以就换成tcp方式了 至于要把异步任务用于分布式的话 就应该参考文档里的异步任务实现方式了
异步任务集成workerman实现
修改 __construct
//这是增加的代码 用于获取port
list($scheme, $address) = explode(':', $this->_socketName, 2);
if ($scheme != 'unix') {
list(, $port) = explode(':', $address);
$this->port = (int)$port;
}
//这是增加的代码 end
}
增加以下属性
/** 异步任务task方法时触发
@var callable
*/
public $onTask = null;
/** 异步任务进程数 大于0时创建异步进程
@var int
*/
public $task_worker_num = 0;
/** 异步进程端口 不指定时以当主服务端口+100
@var int
*/
public $task_port = 0;
/** 异步任务进程
@var Worker
*/
public $taskWorker = null;
/** 不设置默认使用 onWorkerStart
@var callable
*/
public $onTaskWorkerStart = null;
/**
*/
public $port = null;
此段代码放置到 protected static function init() 方法内头部
增加异步执行方法 使用了wokerman自带的异步tcp连接就没法得到task_id了 所以才用了下面直接连接的方式
/** 异步任务
*/
public function task($data){
$fp = stream_socket_client("tcp://127.0.0.1:".$this->task_port, $errno, $errstr, 1);
if (!$fp) {
//echo "$errstr ($errno)",PHP_EOL;
static::log("$errstr ($errno)");
return false;
} else {
$taskId = (int)substr(fread($fp, 10),4);
$send_data = serialize($data);
$len = strlen($send_data)+4;
$send_data = pack('N', $len) . $send_data;
if(!fwrite($fp, $send_data, $len)){
$taskId = false;
}
fclose($fp);
return $taskId;
}
}
以上是直接在Worker.php修改代码
也可以继承Worker单独集成
你这task()方法的实现依然是同步的写法,而workerman自带的AsyncTcpconnection则是异步客户端实现,而且支持多种应用协议,另外onMessage回调里就能拿到task_id;
仅投递任务是同步的 通过返回task_id就可以判断是否投递成功了 如果不需要拿task_id 就可以用AsyncTcpconnection ; AsyncTcpconnection 方式我拿不到task_id的【就是执行任务的worker_id】 因为我要通过task()方法返回这个task_id的
感谢 @ncwsky 的提交。
正如 @blogdaren 所说:
鉴于以下几点,这个提交不打算合并。
1、保持workerman的精简
2、这个提交基本上属于业务逻辑
3、本地task任务有局限性,不好做服务器间的任务投递与集群
4、官网手册有提供更灵活的异步任务做法,可以跨服务器投递,可以做集群,还可以自定义很多细节如指定投递协议。