如何在php后端及时推送消息给客户端

eriodesign

walkor大神,目前需求是这样的:

有一群商家在后台网页处理批量导入产品 -》 服务器接受请求 -》 开始foreach一个一个处理导入请求;

我现在想每成功导入一个就推送到前台显示已经导入成功,直到全部导入自动结束推送。

看了聊天室代码,消息推送都是靠前端js+event.php,我想直接在php里面不需要onMessage触发.

我从下午看到现在文档,也看了很多问答,依然非常糊涂,不奢望给整段代码,但是希望walkor大神给点思路。

93932 41 20
41个回答

walkor 打赏

后端代码
push.php

<?php
use Workerman\Worker;
require_once './Workerman/Autoloader.php';
// 初始化一个worker容器,监听1234端口
global $worker;
$worker = new Worker('websocket://0.0.0.0:1234');
// 这里进程数必须设置为1
$worker->count = 1;
// worker进程启动后建立一个内部通讯端口
$worker->onWorkerStart = function($worker)
{
    // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    $inner_text_worker = new Worker('Text://0.0.0.0:5678');
    $inner_text_worker->onMessage = function($connection, $buffer)
    {
        global $worker;
        // $data数组格式,里面有uid,表示向那个uid的页面推送数据
        $data = json_decode($buffer, true);
        $uid = $data['uid'];
        // 通过workerman,向uid的页面推送数据
        $ret = sendMessageByUid($uid, $data['percent']);
        // 返回推送结果
        $connection->send($ret ? 'ok' : "uid $uid not online");
    };
    $inner_text_worker->listen();
};
// 新增加一个属性,用来保存uid到connection的映射
$worker->uidConnections = array();
// 当有客户端发来消息时执行的回调函数
$worker->onMessage = function($connection, $data)use($worker)
{
    // 判断当前客户端是否已经验证,既是否设置了uid
    if(!isset($connection->uid))
    {
        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
        $connection->uid = $data;
        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
         * 实现针对特定uid推送数据
         */
        $worker->uidConnections[$connection->uid] = $connection;
        return;
    }
};
// 当有客户端连接断开时
$worker->onClose = function($connection)use($worker)
{
    global $worker;
    if(isset($connection->uid))
    {
        // 连接断开时删除映射
        unset($worker->uidConnections[$connection->uid]);
    }
};
// 向所有验证的用户推送数据
function broadcast($message)
{
    global $worker;
    foreach($worker->uidConnections as $connection)
    {
        $connection->send($message);
    }
}
// 针对uid推送数据
function sendMessageByUid($uid, $message)
{
    global $worker;
    if(isset($worker->uidConnections[$uid]))
    {
        $connection = $worker->uidConnections[$uid];
        $connection->send($message);
        return true;
    }
    return false;
}
// 运行所有的worker(其实当前只定义了一个)
Worker::runAll();

启动后端服务
php push.php start -d

前端接收推送的js代码

var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
    var uid = 'uid1';
    ws.send(uid);
};
ws.onmessage = function(e){
    alert(e.data);
};

后端推送消息的代码

// 建立socket连接到内部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 推送的数据,包含uid字段,表示是给这个uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($data)."\n");
// 读取推送结果
echo fread($client, 8192);

这里的uid不一定是用户的id,也可以理解为任务id即 taskid

记得开放1234 5678 两个端口的防火墙。如果是云服务器,还要开放这两个端口的安全组。

以上代码亲测可以直接使用

  • 人世几回伤往事 2019-11-14

    @1 群主经测试您给的后端代码前端js,workerman推送代码只对一个页面有效,并不是对所有打开的页面有效,我打算对所有页面有效,我该怎么做呢?或者我如何设置成可以向所有打开页面的uid推送。希望群主给点思路呀。

  • walkor 2019-11-14

    所有页面uid传同一个

  • 人世几回伤往事 2019-11-14

    @1 群主push.php里面有broadcast方法向所有页面推送,但是我前端js中怎样把所有页面uid传同一个呢?如果所有页面uid传同一个的话那后端php代码是不是不用向push.php发送指定的uid了呢?

  • 人世几回伤往事 2019-11-14

    @1 群主我补充一下我的应用场景呀 我是在不同浏览器,或者不同的电脑下相同或不同的浏览器打开同一个页面(网址一样)让他们都能推送,我已经设置了定时刷新。我希望每个用户打开这个界面都能定时看到推送。

  • 陆路 2019-12-15

    @2932:我也遇到同样的问题,您怎么解决的?如果大家看到,麻烦也帮忙解答一下,着急,在线等

  • 凌笑然 2020-03-04

    @6607:Linux不能用应该是防火墙没设置这个端口导致的,在防火墙加这条规则,然后重启防火墙就可以了:-A INPUT -p tcp -m tcp --dport 1234 -j ACCEPT,我刚好遇到这个问题,这样解决的

walkor 打赏

哈啰 !push.php line 4: $worker->count = 1;为什么只能设置一个进程啊

假如:
客户端1连接进程A
客户端2连接进程B

客户端2无法直接通过进程B给客户端1发送数据,因为客户端1属于进程A不属于进程B,B进程控制不到客户端1(要想两个进程之间通讯需要一些进程间通讯手段,可以使用http://doc3.workerman.net/component/channel.html)。
所以所有客户端都只能连接同一个进程才能直接互相通讯,为了避免客户端连到不同进程,count设置为1。

  • jackie 2019-01-24

    channel文档中说在php-fpm无法使用,那么php-fpm下的多进程如何实现呢?

ivan

写得太好了!

  • 我很恨你 2016-06-06

    问一下前段js和后端推送消息的代码写在哪里呢

  • ivan 2016-06-07

    @1080::前端js是用来连ws的,在你需要接收数据的地方写上,后端推送的可以用老大的代码用php写 也可以随便找tcp客户端来实现。这两个可以是独立的跟workerman本身关系不大

workercat

我想问,workman 怎么与 PHP 脚本进行数据传输呢?

  • 暂无评论
walkor 打赏

@workercat 新问题请新建帖子。把问题描述清楚,不要问“一句话”问题,尤其不要问“XXXX怎么做“这种,这种太笼统,没法回。不同场景有不同的做法。

  • 暂无评论
workercat

执行后端推送代码时,出现 unable to connect, connect refused. 请问这是什么原因造成的尼?我已经更换了多个端口进行测试,依然是同样的提示。还有什么测试方法和手段来找出原因嘛?

后端推送消息的代码
// 建立socket连接到内部推送端口
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1, STREAM_CLIENT_CONNECT|STREAM_CLIENT_PERSISTENT);
// 推送的数据,包含uid字段,表示是给这个uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
fwrite($client, json_encode($data)."\n");
// 读取推送结果
echo fread($client, 8192);

  • 暂无评论
workercat

workerman 代码是完全与上面的一致。

  • 暂无评论
walkor 打赏

发生这个问题原因有
1.服务端没启动
2.端口错误
3.客户端IP错了
4.防火墙挡住了

  • 暂无评论
workercat

首先,我先建一个 ump worker , 再在 work 进程启动后建立一个内部通讯端口。然后,我再写 php socket 脚本来反问内部通讯端口,提示 connect refused。防火墙是否阻挡,端口是否被占用,服务是否开启,客户端链接的 ip 都这几个问题都确认没有问题。

结果,却是一直提示 connection refused.

use Workerman\Worker;
    include './Workerman/Autoloader.php';

    $worker = new Worker("udp://192.168.50.190:8800");
    //var_dump($worker);
    $worker->count = 1;

    $worker->onWorkerStart = function ($worker)
    {
        // 用于 Laravel 与 Workerman 的内部通讯
        $inner_text_worker = new Worker ("text://192.168.50.190:5678");

        // 接收到 Laravel 请求信息,就向设备发起 UDP 请求
        $inner_text_worker->onMessage = function ($connection, $arr_data)
        {

            $connection->send('success udp');
        };
    };

    //  接收 UDP 请求,如:心跳
    $worker->onMessage  = function ($connection, $data) 
    {
        echo $data . "<br/>";
        $client_ip = $connection->getRemoteIp();
        $client_port = $connection->getRemotePort();
        $connection->send(strrev($data));
    };

}}

// 开启内部通讯端口打印输出的对象:object(Workerman\Worker)#6 (24) {
  =>
  int(0)
  =>
  string(4) "none"
  =>
  int(1)
  =>
  string(0) ""
  =>
  string(0) ""
  =>
  bool(true)
  =>
  bool(false)
  =>
  NULL
  =>
  NULL
  =>
  object(Closure)#7 (1) {
    =>
    _RECURSION_
  }
  =>
  NULL
  =>
  NULL
  =>
  NULL
  =>
  NULL
  =>
  NULL
  =>
  NULL
  =>
  string(3) "tcp"
  =>
  array(0) {
  }
  =>
  string(0) ""
  =>
  string(37) "/Library/WebServer/Documents/camerawk"
  =>
  NULL
  =>
  string(26) "text://192.168.50.190:5678"
  =>
  resource(17) of type (stream-context)
  =>
  string(32) "000000000f4c6bca0000000043696ce7"
}

// php socket 脚本对内部端口通讯发起请求,报错:stream_socket_client(): unable to connect to tcp://192.168.50.190:5678 (Connection refused)
  • 暂无评论
walkor 打赏

少了一句$inner_text_worker->listen();
归根到底还是对应的端口服务没启动。

workercat

问题:如果我 new 的 worker 是监听是的 http 通讯 8080端口,当有接收到信息时,那么 workerman 的onMessage 方法执行 $connection->send("message") 。

我的疑问时 $connection->send("message") 发送的信息,会以什么端口,从服务器发出去呢?

$worker->onMessage = function($connection, $data) use ($worker)
{
$connection->send("返回信息");
};

  • 暂无评论
walkor 打赏

@workercat 新问题开新的帖子吧。

  • 暂无评论
夏虫

上面的没看得太明白,push.php是运行在workman里面的 后端推送消息代码是运行在web后台的对吗,通过后端推送消息代码调用push.php.是这个思路吗

  • 暂无评论
walkor 打赏

@夏虫 对

  • 暂无评论
夏虫

要实现我这种模式 问号部分该用什么方法实现呢

  • jackie 2019-01-24

    请问下你的问题解决了吗?

  • adminppper 2019-12-31

    @5344:这个我很会玩,多进程服务器,你可以加我570600495

  • lloyou00 2020-03-04

    // 建立socket连接到内部推送端口
    $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    // 推送的数据,包含uid字段,表示是给这个uid推送
    $data = array('uid'=>'uid1', 'percent'=>'88%');
    // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    fwrite($client, json_encode($data)."\n");
    // 读取推送结果
    echo fread($client, 8192);

    这里不是写得很清楚嘛?

    workerman 的文档里面也有
    http://doc3.workerman.net/315240

    推荐使用 GatewayWorker 的方式

    本质就是进程间通讯而已

小V

$inner_text_worker = new Worker('Text://0.0.0.0:5678');
第一次运行没有问题,但第二次运行

fwrite($client, json_encode($data)."\n");
这里会报错,换一个端口运行,又可以了,这是为什么呢?

  • walkor 2016-11-03

    把STREAM_CLIENT_PERSISTENT选项去掉就好了

  • walkor 2016-11-03

    改成 $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);

  • 小V 2016-11-04

    可以了,点个赞

httpp886
use Yii;
use yii\console\Controller;
use \Workerman\Worker;
require_once 'vendor/autoload.php';

class WorkerController extends Controller {
    public function actionPush(){
        // 初始化一个worker容器,监听1234端口
        require_once dirname(Yii::$app->basePath).'/vendor/workerman/workerman/Autoloader.php';
        $worker = new Worker('websocket://127.0.0.1:1234');

        // 这里进程数必须设置为1
        $worker->count = 1;
        // worker进程启动后建立一个内部通讯端口
        $worker->onWorkerStart = function($worker)
        {
            // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
            $inner_text_worker = new Worker('http://127.0.0.1:5678');
            $inner_text_worker->onMessage = function($connection, $buffer)
            {
                global $worker;
                // $data数组格式,里面有uid,表示向那个uid的页面推送数据
                $data = json_decode($buffer, true);
                $uid = $data;
                // 通过workerman,向uid的页面推送数据
                $ret = $this->sendMessageByUid($uid, $buffer);
                // 返回推送结果
                $connection->send($ret ? 'ok' : 'fail');
            };
            $inner_text_worker->listen();
        };
        // 新增加一个属性,用来保存uid到connection的映射
        $worker->uidConnections = array();
        // 当有客户端发来消息时执行的回调函数
        $worker->onMessage = function($connection, $data)use($worker)
        {
            // 判断当前客户端是否已经验证,既是否设置了uid
            if(!isset($connection->uid))
            {
                // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
                $connection->uid = $data;
                /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
                 * 实现针对特定uid推送数据
                 */
                $worker->uidConnections = $connection;
                return;
            }
        };

        // 当有客户端连接断开时
        $worker->onClose = function($connection)use($worker)
        {
            global $worker;
            if(isset($connection->uid))
            {
                // 连接断开时删除映射
                unset($worker->uidConnections);
            }
        };
        // 运行所有的worker(其实当前只定义了一个)
        Worker::runAll();
    }

    // 向所有验证的用户推送数据
    function broadcast($message)
    {
        global $worker;
        foreach($worker->uidConnections as $connection)
        {
            $connection->send($message);
        }
    }

    // 针对uid推送数据
    function sendMessageByUid($uid, $message)
    {
        global $worker;
        if(isset($worker->uidConnections))
        {
            $connection = $worker->uidConnections;
            $connection->send($message);
            return true;
        }
        return false;
    }

}

stream_socket_client(): unable to connect to tcp://127.0.0.1:5678 (������ӷ���һ��ʱ���û���ȷ�

  • httpp886 2017-03-14

    老是返回上边的错误,服务器端也运行成功(Press Ctrl-C to quit. Start success.)

  • httpp886 2017-03-14
    public function pushma($msg){
        header("Content-Type: text/html; charset=UTF-8");
        // 建立socket连接到内部推送端口
        $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
        // 推送的数据,包含uid字段,表示是给这个uid推送
        $data = array('uid'=>'uid1', 'percent'=>$msg);
        // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
        fwrite($client, json_encode($data)."\n");
        // 读取推送结果
        echo fread($client, 8192);
    }

    这是调用代码

  • walkor 2017-03-14

    服务没启动成功导致的

  • Victoryship 2017-03-20

    @1:大佬问下,为什么我用例子发到客户端浏览器上。消息显示一下就马上消失了?

  • walkor 2017-03-20

    估计你哪里没弄好,要自己定位下

Victoryship

大佬问下,为什么我用例子发到客户端浏览器上。消息显示一下就马上消失了?

  • 暂无评论
joy

一天中,经常隔一两小时,sendMessageByUid发送的结果是false,过一会儿又自己好了
查看状态没有false
这种情况,需要不需要心跳,本身已是长连接

  • joy 2017-04-01

    几小时后,是连上了,可能是之前一直没连上,回头做个测试

walkor 打赏

如果需要链接长时间维持(大于1分钟)必须加心跳。客户端定时向服务端发送点心跳数据(数据任意,服务端能识别就行),服务端onMessage里判断是心跳忽略即可。

楼主的需求是浏览器里展示进度,预期这个进度会在1分钟内完成,则不用加心跳。

  • 暂无评论
joy

加了心跳,但是
用的是winform接收的,打着断点
第一次有,第二次执行结果打印出来也是成功,win端但没接收到任何消息

  • walkor 2017-03-31

    你可以抓包看链接建立了没,数据推送过来没。可能你的winform写的有问题。
    你可以用上面demo写的js在浏览器里试下。

joy

包括心跳回来的消息也一样
长连接建立后,给子线程A发,第一次正常接收,第二次接收到任务消息了

  • walkor 2017-03-31

    什么子线程?这个demo服务端是单进程单线程的,没有子线程一说。

joy

[attach]574[/attach]
这个断开,是否还要用心跳来判断,还是内部自动会判断
如果没发消息过来了,会不会就自己会断开

  • walkor 2017-03-31

    这个demo心跳作用是用来防止链接由于长时间不活跃被路由节点防火墙关闭。如果你预计这个链接要维持很长时间(超过一分钟),需要客户端定时发一点数据给服务端,用来保持链接活跃。服务端onMessage里判断下如果是心跳消息忽略即可。
    如果链接预计低于一分钟,可以不用发心跳。

  • 寓言 2018-11-20

    @1:walkor大神,正如我昨天提的那个问题,如果后端消息不停推送,在onMessage里面是没法收到消息的,只能等后端消息推送完毕后,才能接受到,可能这时候已经超过了后端设置的心跳时间(比如1分钟),就会在onclose主动关闭客户端连接,然后客户端重新连接,这中间的数据就会丢失,这种情况怎么解决呢?

joy

需要在子线程上加
[attach]575[/attach]
来监测关闭吗?
如何加?

  • walkor 2017-03-31

    这个demo不用加这个逻辑

joy

感觉很不稳定,不知应该怎么来调试
客户端写了心跳,比之前的没心跳的,连接情况更差
消息返回成功的很少,而且即使返回成功,但客户端,还是没有接收到数据

以前一两小时,发生三四次失败,现在是经常失败

  • 暂无评论
walkor 打赏

可能你的客户端还没连上,你就开始从后端push消息了,这个demo里没做消息缓存,如果客户端不存在就直接把消息丢弃了。
自己多打打日志服务端抓抓包定位下吧,从你的描述中无法帮到你。
这个demo是没问题的,很多人在用了。

  • 暂无评论
joy

心跳处理这样可以吧?不需要到5678端口上吧

[attach]576[/attach]

  • joy 2017-04-01

    运行几小时后,好像又连上了,那心跳是不是可以这样处理,不需要转到mridConnections子连接里去

  • joy 2017-04-01

    因为是异步消息处理,所以还是要用这个长链接,用户手机上传东西,就自动发个消息给我这边的客户端,通知一下,有没有其他更好的处理方式

  • walkor 2017-04-01

    可以这样处理。5678端口是短链接,不用设置心跳

php_zdg

按照demo代码来,php运行一直返回fail,是那个映射问题吗?好像不能找到制定客户端啊

mir_gong

按照demo代码来,php运行一直返回fail.怎么解决~

walkor 打赏

返回fail很明显对应uid没在线

xiaoxiaolu

写的比较好,测试没问题,很好解决了PHP客户端与web socket端之间的数据监听通讯

  • xiaoxiaolu 2018-05-10

    php客户端推送数据给web端容易产生数据丢失

  • 18117303062 2018-07-28

    我测试的时候也发现这个情况,但原因是因为时间长了和服务器断开连接了,这样父进程weibsocket里原来的uid1已经删除了,这个时候php客户端发送数据,数据其实服务端的子进程接收到了,但由于没有找到uid,所以php客户端没有收到回复。加上心跳,代码逻辑上再优化些应该可以解决。

dxyzz

大佬们,
能不能问一下你这个问题的第三部分:后端推送消息的代码

这一部分代码是应该写在哪里哦?跟后端代码写一个类里面么?新手求指点。。。

  • 暂无评论
hehe
  • 暂无评论
18117303062

写得很好,学习了。我是新手,能发表下学习体会吗?

  • 暂无评论
18117303062

总体上该方案非常优秀,框架大概二个部分,第一个是websocket作为服务端接收和发送消息的父进程,不妨称之为fatherWorker,比较巧妙的是在该对象里嵌套了一个TEXT协议的worker,不妨叫他childWorker,这个功能用一个PHP页面就能解决。第二个是客户端,该客户端可以用本地建立一个简单网站实现,用来登录界面操作,里面两个页面,第一个页面是用来和服务端的父进程websocket建立连接,并发送UID的,这个页面可以是普通的html页面。第二个页面是用来和服务端的子进程TEXT协议建立连接的,其作用是发送UID编号和数据,这里要注意的是两个页面的UID号必须一致,否则发送失败。
这样,三个页面,实现了WEB服务、SOCKET服务和TEXT协议服务三者联动的效果,构思巧妙,非常优秀!

  • 暂无评论
18117303062

代码我依样画葫芦测试过了,没有问题,感谢分享,我刚学习workerman,说些自己的理解,不当之处大家指正。

  • 暂无评论
wegl

workerman新手,根据自己入门经历的疑惑,讲一下表面的小白使用问题,也是新手小白容易产生的疑惑:
可以确定,下载workerman最新版,不修改示例代码,示例代码是百分百可以在linux和windows都可以正常运行的。
windows中出现的问题:
(1)、执行顺序!一定不能错:第一步,php push.php,第二步,浏览器的页面上运行执行js ,第三步,执行stream_socket_client的5678端口的那段代码

(2)、windows本机测试中,第二步的js,可以在任何一个浏览器的console中执行,但是要注意,如果是打算把js放进自己写的html页面,是无法用本地windows的apache服务访问页面的,原因应该就是windows下php只能用一个进程,而php进程已经被第一步占用。同样,第三步,也不能用windows本机的apache请求页面或者php命令行执行php,可以通过telnet 127.0.0.1:5678的方式,输入:{"uid":"uid1","percent":"2%"},浏览器js就可以收到了

(3)、这个代码是可以调试的(好像是废话)。可能对于小白而言,因为对workerman陌生一时忘记了怎么调试。其实和普通的php开发一样,可以直接echo/var_dump打印输出,也可以记录到文件里。
telnet 127.0.0.1 5678 的命令下,也可以调试5678接收第三步消息的信息:push.php代码里,在$connection->send($ret ? 'ok' : 'fail');前,加上自己要调试的内容send即可,如:$connection->send($buffer);
   !! push.php调试代码修改后,一定要重新启动第一步(linux下restart/reload;windows下退出进程,重新执行),才能生效
    linux下执行php push.php start 后面不加-d,不让后台运行,可以查看打印出的调试输出

(4)、一旦出现问题,检查步骤:
      1、telnet 127.0.0.1 1234
           telnet 127.0.0.1 5678
         如果不通,端口是否开启,linux下检查iptables
      2、如果是第三步返回fail,一般是uid用户连接断开了,只需要浏览器重新执行一下第二步js,在重新执行第三步

  • 暂无评论
json_decode

向端口推送数据的时候,返回的一致是false;

$client = stream_socket_client('tcp://127.0.0.1:2345',$error,$errmsg,1);
var_dump($client);
//推送的数据,包含的uid字段,表示给这个用户uid推送
$data = array('uid'=>'uid1', 'percent'=>'88%');
fwrite($client, json_encode($data)."\n");
echo fread($client, 8192);

  • xiuwang 2020-06-17

    这是返回fail,不是false。返回fail应该是对应的uid没在线,没有对应的websocket连接

yangyu

大佬呀,为啥向端口推送数据的时候,返回的一致是false呀
$client = stream_socket_client('tcp://127.0.0.1:2345',$error,$errmsg,1);
var_dump($client);

  • 暂无评论
hfxlyf

大家看看我写的对着没 通过http给客户端推送tcp消息
https://github.com/phpyii/workerman-test/tree/master/test/tcp

  • 暂无评论
workenman

收藏

  • 暂无评论
年代过于久远,无法发表回答
×
🔝