最新消息:觉得本站不错的话,记得收藏哦。博客内某些功能仅供测试。讨论群:135931704。快养不起小站了,各位有闲钱就打赏一下吧。My Email: weicots#gmail.com (please replace # with @)

一个简单的 Swoole IM 示例:支持离线消息、断线重连、已读未读

PHP ajiang-tuzi 89浏览
<?php
/**
 * Created by PhpStorm.
 * User: ajing
 * Site: www.weicot.com
 * Date: 2018/8/13
 * Time: 17:17
 */

namespace Weicot\IM;



/**
 * Minimal Swoole WebSocket instant-messaging server.
 *
 * Clients exchange JSON frames whose "type" field selects an action (login,
 * reconnection, message, ...). Logged-in users are tracked in two places:
 *  - $storage       : uid => user record (including "fd" and "session_key");
 *  - $swoole_table  : fd  => ["uid" => ...], used to resolve a closing
 *                     connection back to its user.
 *
 * NOTE(review): apart from login/reconnection/sign_out, handlers trust the
 * "uid" field supplied by the client (e.g. the sender uid in message()).
 * Deriving the uid from $swoole_table for the calling fd would close that gap,
 * but it changes the wire contract and is therefore not done here.
 */
class Server
{

    /** @var \swoole_websocket_server WebSocket server; created and started by the constructor. */
    public $serv;
    /** @var Storage Login-state store built on the Redis connection. */
    public $storage;
    /** @var \swoole_table In-memory table mapping a connection fd to its uid. */
    public $swoole_table;
    /** @var Message Message history / offline-message store. */
    public $message;
    /** @var User User data and session (friend) list access. */
    public $user;


    /**
     * Wires up storage, registers the WebSocket callbacks and starts the server.
     *
     * start() is called here, so constructing the object starts the server.
     */
    public function __construct()
    {
        $redis = new \Weicot\Drive\Redis([ // Redis-backed storage driver
            "host" => "127.0.0.1",
            // NOTE(review): "prot" looks like a typo for "port". It is left as-is because
            // the key that \Weicot\Drive\Redis actually reads is not visible from this file.
            "prot" => "6379",
            "prefix" => "user_"
        ]);
        $this->storage = new Storage($redis->redis);
        $this->message = new Message();  // message store
        $this->user = new User();
        $this->user->setStorage($this->storage);

        $this->swoole_table = new \swoole_table(1024);  // 1024 rows
        $this->swoole_table->column('uid', \swoole_table::TYPE_INT, 8);
        $this->swoole_table->create();

        // With daemonize enabled the working directory changes, so a relative log
        // path would no longer point at the launch directory. Resolve it up front.
        $cwd = getcwd();
        $logDir = ($cwd !== false) ? $cwd : __DIR__;

        $this->serv = new \swoole_websocket_server("0.0.0.0", 2018);
        $this->serv->set([
            'daemonize' => 1,          // run as a daemon
            'max_request' => 10000,    // a worker is recycled after handling this many requests
            'dispatch_mode' => 2,
            'debug_mode' => 1,
            'log_file' => $logDir . '/swoole.log',
            // Heartbeat: scan connections every 5 seconds and close any connection
            // that has not sent data for 600 seconds.
            'heartbeat_check_interval' => 5,
            'heartbeat_idle_time' => 600,
        ]);
        $this->serv->on('open', [$this, 'onOpen']);        // a client connection was opened
        $this->serv->on('message', [$this, 'onMessage']);  // a data frame arrived
        $this->serv->on('close', [$this, 'onClose']);      // a connection was closed
        $this->serv->start();
    }


    /**
     * Called when a client connection is opened; only logs the new fd.
     *
     * @param \swoole_websocket_server $server
     * @param object $req request object exposing ->fd
     */
    public function onOpen(\swoole_websocket_server $server, $req)
    {
        echo "server: success with fd{$req->fd}\n";
    }

    /**
     * Called for every incoming frame: answers heartbeats, routes everything else.
     *
     * @param \swoole_websocket_server $server
     * @param object $frame frame object exposing ->fd and ->data
     */
    public function onMessage(\swoole_websocket_server $server, $frame)
    {
        $fd = $frame->fd;
        $data = $frame->data;
        if ($this->isClientHeartbeat($data, $server, $fd)) {
            return;
        }
        // Invalid JSON decodes to null; routing() treats that as an "error" frame.
        $this->routing(json_decode($data, true), $fd, $server);
    }

    /**
     * Called when a connection closes. If the fd belonged to a logged-in user the
     * fd mapping is dropped and the user's friends are told the user went offline.
     * The storage login record is kept on purpose so the client can reconnect.
     *
     * @param \swoole_websocket_server $server
     * @param int $fd
     */
    public function onClose(\swoole_websocket_server $server, $fd)
    {
        if ($this->swoole_table->exist($fd)) {
            $row = $this->swoole_table->get($fd);
            $this->swoole_table->del($fd);
            $this->offlineMessage($server, $row["uid"]);
            echo "close  {$row["uid"]}  \n";
        }
        echo "client {$fd} closed\n";
    }


    /**
     * Dispatches a decoded frame to its handler according to its "type".
     *
     * @param mixed $re_msg decoded JSON payload
     * @param int $fd connection that sent the frame
     * @param object $server WebSocket server used to push replies
     */
    public function routing($re_msg, $fd, $server)
    {
        $type = $this->getType($re_msg);
        switch ($type) {
            case "login":
                $msgData = $this->login($re_msg, $fd, $server);
                $server->push($fd, json_encode($msgData));
                if ($msgData["status"]) { // deliver queued messages after a successful login
                    $this->getOfflineMessage($server, $fd, $msgData["uid"]);
                }
                break;
            case "reconnection":
                $msgData = $this->reconnection($server, $re_msg, $fd);
                $server->push($fd, json_encode($msgData));
                if ($msgData["status"]) { // deliver queued messages after a successful reconnect
                    $this->getOfflineMessage($server, $fd, $re_msg["uid"]);
                }
                break;
            case "online_list":
                $server->push($fd, json_encode($this->getOnlineUsers($re_msg, $fd)));
                break;
            case "friends_list":
                $server->push($fd, json_encode($this->getFriendsLists($re_msg, $fd)));
                break;
            case "message":
                $this->message($re_msg, $fd, $server);
                break;
            case "tmp_message": // temporary conversation message
                $this->message($re_msg, $fd, $server, true);
                break;
            case "del_session":
                $server->push($fd, json_encode($this->delSession($re_msg, $fd)));
                break;
            case "sign_out":
                $this->signOut($re_msg, $fd, $server);
                break;
            case "error": // no usable type: echo the payload back
            default:      // unknown type: echo the payload back
                $server->push($fd, json_encode($re_msg));
        }
    }

    /**
     * Returns the frame type, or "error" when the payload has no usable type.
     *
     * Only string types are accepted: a boolean or numeric "type" would loosely
     * match the first case of the switch in routing() and be handled as a login.
     *
     * @param mixed $msg decoded JSON payload
     * @return string
     */
    public function getType($msg)
    {
        if (is_array($msg) && isset($msg["type"]) && is_string($msg["type"])) {
            return $msg["type"];
        }
        return "error";
    }


    /**
     * Pushes the user's queued offline messages to $fd and marks as delivered
     * only those messages whose push did not fail.
     *
     * @param object $server
     * @param int $fd
     * @param mixed $uid
     */
    public function getOfflineMessage($server, $fd, $uid)
    {
        $data = $this->message->getOfflineMessage($uid);
        if (empty($data)) {
            return;
        }
        $delivered = [];
        foreach ($data as $value) {
            $msg = [
                "omesg_id" => $value["id"],
                "uid" => $value["uid"],
                "to_uid" => $value["to_uid"],
                "type" => $value["type"],
                "send_time" => $value["send_time"],
                "content" => $value["content"],
            ];
            if ($server->push($fd, json_encode($msg)) !== false) {
                $delivered[] = $value["id"];
            }
        }
        if (!empty($delivered)) {
            $this->message->upOfflineMessageStatus($delivered);
        }
    }

    /**
     * Signs a user out: removes the login record, notifies the client and closes
     * its connection. Only the connection that owns the session may sign it out.
     *
     * @param mixed $re_msg payload carrying "uid"
     * @param int $fd connection requesting the sign-out
     * @param object $server
     */
    public function signOut($re_msg, $fd, $server)
    {
        $uid = $this->field($re_msg, "uid");
        if ($uid === null) {
            return;
        }
        $session = $this->storage->exists($uid);
        if (!$session || !isset($session["fd"])) {
            return;
        }
        // The uid comes from the client, so without this check any connection
        // could sign out any other user.
        if ((int) $session["fd"] !== (int) $fd) {
            echo "client {$fd} sign_out rejected\n";
            return;
        }
        $this->endSession($server, $session, "sign_out", "the user has sign out ", true);
    }


    /**
     * Answers the client heartbeat frame.
     *
     * @param string $data raw frame payload
     * @param object $server
     * @param int $fd
     * @return bool true when $data was a heartbeat (and has been answered)
     */
    public function isClientHeartbeat($data, $server, $fd)
    {
        if ($data === '{"uid":"hi"}') {
            $server->push($fd, '{"uid":"hi"}');
            return true;
        }
        return false;
    }

    /**
     * Handles a login frame, e.g.
     * {
     *   "uid": "4155",              user id
     *   "type": "login",
     *   "user_key": "jkafjkdsagg",  key used to look the user up
     *   "time": 1530599947267
     * }
     *
     * The user is authenticated through "user_key" first; only then is an
     * existing session of that user kicked out and the new session registered.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @param object $server
     * @return array reply built by loginMsg()
     */
    public function login($re_msg, $fd, $server)
    {
        if (!(isset($re_msg["user_key"]) && isset($re_msg["uid"]))) {
            return $this->loginMsg("login", 0, $fd, false, [
                "msg" => "no user key"
            ]);
        }
        $data = $this->user->getUserData($re_msg["user_key"]);
        if (!$data) {
            return $this->loginMsg("login", $re_msg["uid"], $fd, false, [
                "msg" => "user not found"
            ]);
        }
        // Kick the previous session only after authentication succeeded, and only
        // for the authenticated uid.
        $this->kickOut(["uid" => $data["uid"]], $fd, $server);

        $data["fd"] = $fd;
        $data["session_key"] = $this->newSessionKey();
        if ($this->storage->login($data["uid"], $data)) {
            $this->swoole_table->set($fd, ["uid" => $data["uid"]]);
            $this->onlineMessage($server, $data["uid"]);  // tell friends the user is online
            return $this->loginMsg("login", $data["uid"], $fd, true, [
                "session_key" => $data["session_key"],
            ]);
        }
        return $this->loginMsg("login", $data["uid"], $fd, false, [
            "msg" => "is login",  // already logged in
        ]);
    }


    /**
     * Ends an existing session of the given user because the user is logging in
     * again. The old connection is notified and closed, unless it is the very
     * connection that is logging in.
     *
     * @param mixed $re_msg payload carrying "uid"
     * @param int $fd connection that is logging in
     * @param object $server
     */
    public function kickOut($re_msg, $fd, $server)
    {
        $uid = $this->field($re_msg, "uid");
        if ($uid === null) {
            return;
        }
        $session = $this->storage->exists($uid);
        if (!$session || !isset($session["fd"])) {
            return;
        }
        $sameConnection = ((int) $session["fd"] === (int) $fd);
        $this->endSession($server, $session, "kick_out", "the user has logged in elsewhere", !$sameConnection);
    }


    /**
     * Re-attaches a user to a new connection after a dropped connection.
     *
     * Reply codes: 1001 ok, 1002 session key mismatch, 1003 no stored session,
     * 1004 invalid parameters.
     *
     * @param object $server
     * @param mixed $re_msg payload carrying "uid" and "session_key"
     * @param int $fd
     * @return array reply built by reconnectionMsg()
     */
    public function reconnection($server, $re_msg, $fd)
    {
        // The key must be a string: with a loose comparison a JSON `true` would
        // match any stored key.
        if (!isset($re_msg["session_key"], $re_msg["uid"])
            || !is_string($re_msg["session_key"])
            || !is_scalar($re_msg["uid"])
        ) {
            return $this->reconnectionMsg($re_msg, $fd, 1004, " the parameter is invalid", false);
        }
        $data = $this->storage->getUser($re_msg["uid"]);
        if (empty($data) || !isset($data["session_key"])) {
            return $this->reconnectionMsg($re_msg, $fd, 1003, " user has logged out", false);
        }
        if (!hash_equals((string) $data["session_key"], $re_msg["session_key"])) {
            return $this->reconnectionMsg($re_msg, $fd, 1002, "the user has logged in elsewhere", false);
        }
        $this->swoole_table->set($fd, ["uid" => $data["uid"]]);
        $data["fd"] = $fd;  // point the stored session at the new connection
        $this->storage->login($re_msg["uid"], $data);
        return $this->reconnectionMsg($re_msg, $fd, 1001, "reconnection is ok", true);
    }

    /**
     * Builds a reconnection reply.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @param int $code
     * @param string $msg
     * @param bool $status
     * @return array
     */
    public function reconnectionMsg($re_msg, $fd, $code, $msg, $status)
    {
        return [
            "type" => "reconnection",
            "uid" => $this->field($re_msg, "uid"),
            "fd" => $fd,
            "code" => $code,
            "msg" => $msg,
            "status" => $status
        ];
    }

    /**
     * Builds the "online_list" reply.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @return array "list" is false and "status" is false when nobody is online
     */
    public function getOnlineUsers($re_msg, $fd)
    {
        $onlineUsers = $this->storage->getOnlineUsers();
        $hasUsers = !empty($onlineUsers);
        return [
            "type" => "online_list",
            "uid" => $this->field($re_msg, "uid"),
            "fd" => $fd,
            "list" => $hasUsers ? $this->user->getAllOnlineUsersData($onlineUsers) : false,
            "status" => $hasUsers
        ];
    }

    /**
     * Builds the "friends_list" reply for the uid in the payload.
     *
     * @param mixed $re_msg
     * @param int $fd
     * @return array "list" is false and "status" is false when the list is empty
     */
    public function getFriendsLists($re_msg, $fd)
    {
        $uid = $this->field($re_msg, "uid");
        $friendsList = $this->getFriendsList($uid);
        $hasFriends = !empty($friendsList);
        return [
            "type" => "friends_list",
            "uid" => $uid,
            "fd" => $fd,
            "list" => $hasFriends ? $friendsList : false,
            "status" => $hasFriends
        ];
    }


    /**
     * Forwards a chat message to its recipient, or stores it as an offline
     * message and tells the sender that the recipient is offline.
     *
     * @param mixed $re_msg payload carrying "uid", "to_uid" and "content"
     * @param int $fd sender connection
     * @param object $server
     * @param bool $isTmpMessage true for a temporary conversation: both users are
     *                           added to each other's session list and the sender's
     *                           user record is attached to the message
     */
    public function message($re_msg, $fd, $server, $isTmpMessage = false)
    {
        $toUid = $this->field($re_msg, "to_uid");
        if ($toUid === null || $toUid === "") {
            // No recipient: echo the payload back to the sender.
            $server->push($fd, json_encode($re_msg));
            return;
        }
        $fromUid = $this->field($re_msg, "uid");

        $uidData = $this->storage->getUser($toUid);   // recipient record, holds the recipient fd
        $targetFd = isset($uidData["fd"]) ? $uidData["fd"] : null;

        if ($targetFd !== null && $server->exist($targetFd)) {
            $this->message->save($re_msg, true);  // store in history as delivered
            $message = [
                "time" => time(),
                "type" => "message",
                "uid" => $fromUid,
                "to_uid" => $toUid,
                "content" => $this->field($re_msg, "content"),
                "status" => true
            ];
            if ($isTmpMessage) {
                if ($fromUid != $toUid) { // never add a user to their own session list
                    $this->user->addSession($fromUid, $toUid);
                    $this->user->addSession($toUid, $fromUid);
                }
                $myData = $this->storage->getUser($fromUid);
                // The stored record carries the sender's session key, which would let
                // the recipient take over the session through "reconnection".
                if (is_array($myData) || $myData instanceof \ArrayAccess) {
                    unset($myData["session_key"]);
                }
                $message["uid_data"] = $myData;
                $message["type"] = "tmp_message";
            }
            $server->push($targetFd, json_encode($message));
            return;
        }

        // Recipient is offline: queue the message and notify the sender.
        $this->message->save($re_msg, false);
        $server->push($fd, json_encode(
            [
                "type" => "offline_message",
                "offline_uid" => $toUid,
                "content" => "用户已经下线",
                "status" => true
            ]
        ));
    }


    /**
     * Builds a login reply.
     *
     * @param string $type
     * @param mixed $uid
     * @param int $fb connection fd
     * @param bool $status
     * @param array $data
     * @return array
     */
    public function loginMsg($type, $uid, $fb, $status, $data = [])
    {
        return [
            "type" => $type,
            "uid" => $uid,
            "fd" => $fb,
            "status" => $status,
            "data" => $data,
        ];
    }

    /**
     * Builds a chat message payload.
     *
     * @param string $type
     * @param mixed $uid
     * @param mixed $to_uid
     * @param mixed $content
     * @param bool $status
     * @return array
     */
    public function sendMsg($type, $uid, $to_uid, $content, $status)
    {
        return [
            "type" => $type,
            "uid" => $uid,
            "to_uid" => $to_uid,
            "content" => $content,
            "status" => $status
        ];
    }


    /**
     * Builds a placeholder user profile for a uid (name equals the uid, fixed avatar).
     *
     * @param mixed $uid
     * @return array
     */
    public function getUserWithUid($uid)
    {
        return [
            "uid" => $uid,
            "name" => $uid,
            "avatar" => "https://b-ssl.duitang.com/uploads/item/201605/05/20160505145557_dtYHf.thumb.700_0.jpeg",
            "online" => $this->storage->exists($uid),
        ];
    }

    /**
     * Returns the session (friend) list of a user.
     *
     * @param mixed $uid
     * @return mixed whatever User::getSessionData() returns
     */
    public function getFriendsList($uid)
    {
        return $this->user->getSessionData($uid);
    }

    /**
     * Pushes $msgData to the friends of $uid.
     *
     * @param object $server
     * @param mixed $uid user whose friends are notified
     * @param mixed $msgData payload, JSON-encoded before sending
     * @param bool $online true: only friends flagged "online" are considered, and a
     *                     flagged friend without a live connection is logged out of
     *                     storage; false: every friend with a live connection is sent
     *                     the message
     */
    public function sendFullFriendsMessage($server, $uid, $msgData, $online = true)
    {
        $friendsList = $this->getFriendsList($uid);
        if (empty($friendsList)) {
            return;
        }
        $payload = json_encode($msgData);
        foreach ($friendsList as $friend) {
            if ($online && empty($friend["online"])) {
                continue;
            }
            $uidData = $this->storage->getUser($friend['uid']);   // holds the friend's fd
            if (isset($uidData["fd"]) && $server->exist($uidData["fd"])) {
                $server->push($uidData["fd"], $payload);
            } elseif ($online) {
                // Flagged online but the connection is gone: clear the stale login.
                $this->storage->logout($friend['uid']);
            }
        }
    }


    /**
     * Tells the friends of $uid that the user went offline.
     *
     * @param object $server
     * @param mixed $uid
     */
    public function offlineMessage($server, $uid)
    {
        $this->sendFullFriendsMessage($server, $uid, [
            "type" => "offline_message",
            "offline_uid" => $uid,
            "content" => "用户已经下线",
            "status" => true
        ]);
    }

    /**
     * Tells the friends of $uid that the user came online.
     *
     * NOTE(review): the uid is sent under the key "offline_uid" even for the
     * online notice; kept because clients may already depend on it.
     *
     * @param object $server
     * @param mixed $uid
     */
    public function onlineMessage($server, $uid)
    {
        $this->sendFullFriendsMessage($server, $uid, [
            "type" => "online_message",
            "offline_uid" => $uid,
            "content" => "用户已经上线",
            "status" => true
        ]);
    }


    /**
     * Removes a session (conversation) from the user's session list.
     *
     * @param mixed $re_msg payload carrying "uid" and "del_uid"
     * @param int $fd
     * @return array reply; "status" is false when "del_uid" is missing or the removal failed
     */
    public function delSession($re_msg, $fd)
    {
        $uid = $this->field($re_msg, "uid");
        $delUid = $this->field($re_msg, "del_uid");
        $deleted = false;
        if ($delUid !== null && $this->user->delSessionWithUid($delUid, $uid)) {
            $deleted = true;
        }
        return [
            "type" => "del_session",
            "uid" => $uid,
            "fd" => $fd,
            "del_uid" => $delUid,
            "status" => $deleted
        ];
    }

    /**
     * Reads one field of a decoded payload; null when the payload is not an
     * array or the field is missing.
     */
    private function field($re_msg, $key)
    {
        return (is_array($re_msg) && isset($re_msg[$key])) ? $re_msg[$key] : null;
    }

    /**
     * Creates an unpredictable 32-character hexadecimal session key.
     */
    private function newSessionKey()
    {
        if (function_exists('random_bytes')) {
            return bin2hex(random_bytes(16));
        }
        // Fallback for runtimes without random_bytes().
        return md5(uniqid((string) mt_rand(), true));
    }

    /**
     * Shared tail of signOut()/kickOut(): drops the login record and fd mapping of
     * $session and, when $closeConnection is true and the connection is still
     * alive, pushes a notice of the given type to it and closes it.
     * Does nothing when the session's fd is not present in the fd table.
     */
    private function endSession($server, $session, $type, $msg, $closeConnection)
    {
        $sessionFd = $session["fd"];
        $uid = $session["uid"];
        if (!$this->swoole_table->exist($sessionFd)) {
            return;
        }
        $this->storage->logout($uid);
        $this->swoole_table->del($sessionFd);
        if ($closeConnection && $server->exist($sessionFd)) {
            // push() sends a WebSocket frame; send() would write raw bytes that a
            // WebSocket client cannot parse.
            $server->push($sessionFd, json_encode([
                "type" => $type,
                "uid" => $uid,
                "fd" => $sessionFd,
                "msg" => $msg,
                "status" => true
            ]));
            $server->close($sessionFd);
            echo "client {$sessionFd} closed\n";
        }
        echo "logout  {$uid}  \n";
    }

}

转载请注明:(●--●) Hello.My Weicot » 一个简单的 Swoole IM 示例:支持离线消息、断线重连、已读未读