thinkphp5.1集成gatewayWorker


class Events
{
    /**
     * Controller实例
     * @var \app\http\Controller
     */
    public static  $control ;
    /**
     * 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次。可以在这里为每一个businessWorker进程做一些全局初始化工作,例如设置定时器,初始化redis等连接等。
     * 注意:$businessworker->onWorkerStart和Event::onWorkerStart不会互相覆盖,如果两个回调都设置则都会运行。
     * 不要在onWorkerStart内执行长时间阻塞或者耗时的操作,这样会导致BusinessWorker无法及时与Gateway建立连接,造成应用异常(SendBufferToWorker fail.
     * The connections between Gateway and BusinessWorker are not ready错误)。
     * @param BusinessWorker $businessWorker
     */
    public static function onWorkerStart(BusinessWorker $businessWorker)
    {
        Events::$control = new Controller();
        echo "WorkerStart\n";
    }

    /**
     * $client_id是服务端自动生成的并且无法自定义。
     * 可以用过Gateway::bindUid($client_id, $uid)把自己系统的id与client_id绑定,绑定后就可以通过Gateway::sendToUid($uid)发送数据,通过Gateway::isUidOnline($uid)用户是否在线了。
     * onConnect事件仅仅代表客户端与gateway完成了TCP三次握手,这时客户端还没有发来任何数据,此时除了通过$_SERVER['REMOTE_ADDR']获得对方ip,没有其他可以鉴别客户端的数据或者信息,所以在onConnect事件里无法确认对方是谁。要想知道对方是谁,需要客户端发送鉴权数据,例如某个token或者用户名密码之类,在onMesssge里做鉴权。
     * 
     * @param string $client_id            
     */
    public static function onConnect($client_id)
    {
        Gateway::sendToCurrentClient("Your client_id is $client_id");
    }
    /**
     * 当客户端连接上gateway完成websocket握手时触发的回调函数。
     * 注意:此回调只有gateway为websocket协议并且gateway没有设置onWebSocketConnect时才有效。
     * var ws = new WebSocket('ws://127.0.0.1:7272/?token=kjxdvjkasfh');
     * @param string $client_id   client_id固定为20个字符的字符串,用来全局标记一个socket连接,每个客户端连接都会被分配一个全局唯一的client_id。
     * @param string $data websocket握手时的http头数据,包含get、server等变量
     */
    public static function onWebSocketConnect($client_id, $data)
    {
        if (isset($data['get']) && isset($data['get']['token'])) {
            $token = $data['get']['token']; // 自己的账号ID
            $userInfo = Users::where('token', $token)->find();
            if ($userInfo) {
                if ($userInfo['frozen'] != 1) {
                    Gateway::sendToCurrentClient(json_encode([
                        'code' => ReturnCode::ERROR_CODE,
                        'action' => 'onError',
                        'msg' => '用户已冻结'
                    ]));
                    Gateway::closeClient($client_id);
                }
            } else {
                Gateway::sendToCurrentClient(json_encode([
                    'code' => ReturnCode::ERROR_CODE,
                    'action' => 'onError',
                    'msg' => '用户已不存在'
                ]));
                Gateway::closeClient($client_id);
            }
            // $id = decrypt($token, $key); // 解密好的ID
            Gateway::bindUid($client_id, $userInfo['user_id']); // 机器ID与用户ID绑定
            if (! isset($_SESSION['uid'])) {
                // 消息类型不是登录视为非法请求,关闭连接
                // if($data['type'] !== 'login')
                // {
                // return Gateway::closeClient($client_id);
                // }
                // 设置session,标记该客户端已经登录
                $_SESSION['uid'] = $userInfo;
            }
            Gateway::sendToCurrentClient(json_encode([
                'code' => ReturnCode::SUCCESS_CODE,
                'action' => 'onLogin',
                'msg' => '登陆成功',
                'data' => $client_id
            ]));
        } else {
            Gateway::closeClient($client_id);
        }
    }
    /**
     * 有消息时触发该方法
     * @param int $client_id 发消息的client_id
     * @param mixed $message 消息
     * @return void
     */
    public static function onMessage($client_id, $data)
    {
        $data = json_decode($data, true);
        $uid = Gateway::getUidByClientId($client_id);
        if (empty($uid)) {
            return Gateway::sendToCurrentClient(json_encode([
                'data' => [],
                'code' => ReturnCode::ERROR_CODE,
                'msg' => '用户未登录',
                'event' => 'onError'
            ]));
        }
        $a = $data['event'];
        if (empty($a) || ! method_exists(Events::$control, $a)) {
            return Gateway::sendToCurrentClient(json_encode([
                'data' => '',
                'code' => ReturnCode::ERROR_CODE,
                'msg' => '事件不存在',
                'event' => 'onError'
            ]));
        }
        return Events::$control->$a($client_id, $data['data']);
    }

    /**
     * 客户端与Gateway进程的连接断开时触发。不管是客户端主动断开还是服务端主动断开,都会触发这个回调。一般在这里做一些数据清理工作。
     * 注意:onClose回调里无法使用Gateway::getSession()来获得当前用户的session数据,但是仍然可以使用$_SESSION变量获得。
     * 注意:onClose回调里无法使用Gateway::getUidByClientId()接口来获得uid,解决办法是在Gateway::bindUid()时记录一个$_SESSION['uid'],onClose的时候用$_SESSION['uid']来获得uid。
     * 注意:断网断电等极端情况可能无法及时触发onClose回调,因为这种情况客户端来不及给服务端发送断开连接的包(fin包),服务端就无法得知连接已经断开。检测这种极端情况需要心跳检测,并且必须设置$gateway->pingNotResponseLimit>0。这种断网断电的极端情况onClose将被延迟触发,延迟时间为小于$gateway->pingInterval*$gateway->pingNotResponseLimit秒,如果$gateway->pingInterval 和 $gateway->pingNotResponseLimit 中任何一个为0,则可能会无限延迟。
     * @param string $client_id
     */
    public static function onClose($client_id)
    {
        
    }
    /**
     * 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次
     * 可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
     * 注意:某些情况将不会触发onWorkerStop,例如业务出现致命错误FatalError,或者进程被强行杀死等情况。
     * @param BusinessWorker $businessWorker businessWorker进程实例
     */
    public static function onWorkerStop(BusinessWorker $businessWorker)
    {
        echo "WorkerStop\n";
    }
}

class Controller
{
    public function onLogin($client_id, $data)
    {
        if (! isset($_SESSION['uid'])) {
            return Gateway::sendToCurrentClient(json_encode([
                'data' => [
                    'code' => ReturnCode::ERROR_CODE,
                    'msg' => '事件不能为空'
                ],
                'event' => 'onError'
            ]));
        }
        return Gateway::sendToCurrentClient(json_encode([
            'data' => $_SESSION['uid'],
            'code' => ReturnCode::ERROR_CODE,
            'msg' => '登陆成功',
            'event' => 'onLogin'
        ]));
    }
}



评论0



    0.153593s