<?php
/**
* Created by PhpStorm.
* User: ajing
* Site: www.weicot.com
* Date: 2018/8/13
* Time: 17:17
*/
namespace Weicot\IM;
class Server
{
public $serv;
public $storage;
public $swoole_table;
public $message;
public $user;
function __construct()
{
$redis = new \Weicot\Drive\Redis([ // redis 储存器
"host" => "127.0.0.1",
"prot" => "6379",
"prefix" => "user_"
]);
$this->storage = new Storage($redis->redis);
$this->message = new Message(); // 消息存储
$this->user = new User();
$this->user->setStorage($this->storage);
$this->swoole_table = new \swoole_table(1024); //行数 //内存表储存器
$this->swoole_table->column('uid', \swoole_table::TYPE_INT, 8);
$this->swoole_table->create();
$this->serv = new \swoole_websocket_server("0.0.0.0", 2018);
$this->serv->set([ // 设置配置
'daemonize' => 1, // 是否是守护进程 1 是
'max_request' => 10000, // 最大连接数量
'dispatch_mode' => 2,
'debug_mode' => 1,
'log_file' => './swoole.log', //日志存储路径
// 心跳检测的设置,自动踢掉掉线的fd
'heartbeat_check_interval' => 5, //表示每60秒,遍历所有连接,如果该连接在60秒内,没有向服务器发送任何数据,此连接将被强制关闭
'heartbeat_idle_time' => 600,
]);
$this->serv->on('open', array($this, 'onOpen')); //启动server时候会触发
$this->serv->on('message', array($this, 'onMessage')); //
$this->serv->on('close', array($this, 'onClose')); //。
$this->serv->start();
}
/**
* 打开连接时执行
* @param swoole_websocket_server $server
* @param $req
*/
function onOpen(\swoole_websocket_server $server, $req)
{ // 连接进来时执行
echo "server: success with fd{$req->fd}\n";
}
/***
* 消息进来时执行
* @param swoole_websocket_server $server
* @param $frame
*/
function onMessage(\swoole_websocket_server $server, $frame)
{
$fd = $frame->fd;
// $connectionInfo = $server->connection_info($fd); //获取连接参数
$data = $frame->data;
if (!$this->isClientHeartbeat($data, $server, $fd)) { //判断是否是心跳包
$re_msg = json_decode($data, true); // 将json 解码并放入路由中
$this->routing($re_msg, $fd, $server); //路由
}
}
/***
* 关闭时执行
* @param swoole_websocket_server $server
* @param $fd
*/
function onClose(\swoole_websocket_server $server, $fd)
{
if ($this->swoole_table->exist($fd)) { //如果客户端ID 存在
$uid = $this->swoole_table->get($fd);
// $this->storage->logout($uid["uid"]); //登出这个用户
$this->swoole_table->del($fd);
$this->offlineMessage($server, $uid["uid"]);
echo "close {$uid["uid"] } \n";
}
echo "client {$fd} closed\n";
}
/***
* 路由
* @param $re_msg
* @param $fd
* @param $server
*/
function routing($re_msg, $fd, $server)
{
$type = $this->getType($re_msg);
switch ($type) {
case "login": //如果是登录
$msgData = $this->login($re_msg, $fd, $server);
$server->push($fd, json_encode($msgData));
if ($msgData["status"]) { //如果登录成功则执行
$this->getOfflineMessage($server, $fd, $msgData["uid"]);
}
break;
case "reconnection": //断线从连
$msgData = $this->reconnection($server, $re_msg, $fd);
if ($msgData["status"]) { //成功连接
$server->push($fd, json_encode($msgData));
//断线重连后获取系统消息
$this->getOfflineMessage($server, $fd, $re_msg["uid"]);
} else {
$server->push($fd, json_encode($msgData));
}
break;
case "online_list": //在线列表
$msgData = json_encode($this->getOnlineUsers($re_msg, $fd));
$server->push($fd, $msgData);
break;
case "friends_list":
$msgData = json_encode($this->getFriendsLists($re_msg, $fd));
$server->push($fd, $msgData);
break;
case "message": //消息处理
$this->message($re_msg, $fd, $server);
break;
case "tmp_message": //临时消息处理
$this->message($re_msg, $fd, $server, true);
break;
case "del_session": // 删除会话
$msgData = $this->delSession($re_msg, $fd);
$server->push($fd, json_encode($msgData));
break;
case "sign_out": // 登出
$this->signOut($re_msg, $fd, $server);
break;
case "error": // 删除会话
$server->push($fd, json_encode($re_msg));
break;
default:
$server->push($fd, json_encode($re_msg));
}
}
/**
* 获取消息类型
* @param $msg
*/
function getType($msg)
{
if (isset($msg["type"])) {
return $msg["type"];
}
return "error";
}
/***
* 上线后发送离线消息
* @param $server
* @param $fd
* @param $uid
*/
function getOfflineMessage($server, $fd, $uid)
{
$data = $this->message->getOfflineMessage($uid);
if (!empty($data)) {
$msgId = [];
foreach ($data as $value) {
$msgId[] = $value["id"];
$msg = [
"omesg_id" => $value["id"],
"uid" => $value["uid"],
"to_uid" => $value["to_uid"],
"type" => $value["type"],
"send_time" => $value["send_time"],
"content" => $value["content"],
];
$server->push($fd, json_encode($msg));
}
$this->message->upOfflineMessageStatus($msgId);
};
}
/***
* 登出
* @param $re_msg
* @param $fd
* @param $server
*/
function signOut($re_msg, $fd, $server)
{
$existsIsLogin = $this->storage->exists($re_msg["uid"]);
if ($existsIsLogin) {
if ($this->swoole_table->exist($existsIsLogin["fd"])) { //如果客户端ID 存在
$this->storage->logout($existsIsLogin["uid"]); //登出这个用户
$this->swoole_table->del($existsIsLogin["fd"]);
if ($server->exist($existsIsLogin["fd"])) {
$server->send($existsIsLogin["fd"], json_encode([
"type" => "sign_out",
"uid" => $existsIsLogin["uid"],
"fd" => $existsIsLogin["fd"],
"msg" => "the user has sign out ",
"status" => true
]));
$server->close($existsIsLogin["fd"]);
}
echo "logout {$existsIsLogin["uid"] } \n";
}
echo "client {$fd} closed\n";
}
}
/***
* 判断是否是客户端的心跳包
* @param $data
* @param $server
* @param $fd
* @return bool
*/
function isClientHeartbeat($data, $server, $fd)
{
if ($data == '{"uid":"hi"}') {
$server->push($fd, '{"uid":"hi"}'); // 返回数据
return true;
};
return false;
}
/***
* 登录逻辑
* {
* "uid":"4155", 用户id
* "type":"login", 类型
* "auth_key":"jkafjkdsagg", 授权key
* "time":1530599947267
* }
*/
function login($re_msg, $fd, $server)
{
if (!(isset($re_msg["user_key"]) && isset($re_msg["uid"]))) { //判断key 是否纯在
return $this->loginMsg("login", 00, $fd, false, [
"msg" => "no user key"
]);
}
$this->kickOut($re_msg, $fd, $server); // 如果这个用户在其他地方登陆 则踢出去
//抢登
$data = $this->user->getUserData($re_msg["user_key"]);
if ($data) { //判断用户是否存在
$data["fd"] = $fd; //链接句柄
$data["session_key"] = md5(time() . "_" . rand(1, 9));
$addLogin = $this->storage->login($data["uid"], $data);
if ($addLogin) {//将登录状态添加进 swoole table
$this->swoole_table->set($fd, ["uid" => $data["uid"]]);
$this->onlineMessage($server, $data["uid"]); //将上线消息群发给所有好友
return $this->loginMsg("login", $data["uid"], $fd, true, [
"session_key" => $data["session_key"],
]);
};
return $this->loginMsg("login", $data["uid"], $fd, false, [
"msg" => "is login", //已经等录
]);
}
return $this->loginMsg("login", $data["uid"], $fd, false, [
"msg" => "user not found" //用户不存在
]);
}
/***
* 如果用户在其他地方登陆 则登出这个用户
* @param $re_msg
* @param $fd
* @param $server
*/
function kickOut($re_msg, $fd, $server)
{
$existsIsLogin = $this->storage->exists($re_msg["uid"]);
if ($existsIsLogin) {
if ($this->swoole_table->exist($existsIsLogin["fd"])) { //如果客户端ID 存在
$this->storage->logout($existsIsLogin["uid"]); //登出这个用户
$this->swoole_table->del($existsIsLogin["fd"]);
if ($server->exist($existsIsLogin["fd"])) {
$server->send($existsIsLogin["fd"], json_encode([
"type" => "kick_out",
"uid" => $existsIsLogin["uid"],
"fd" => $existsIsLogin["fd"],
"msg" => "the user has logged in elsewhere",
"status" => true
]));
$server->close($existsIsLogin["fd"]);
}
echo "logout {$existsIsLogin["uid"] } \n";
}
echo "client {$fd} closed\n";
}
}
/***
* 断线从连
* @param $re_msg
* @param $fd
*/
function reconnection($server, $re_msg, $fd)
{
if (isset($re_msg["session_key"]) && isset($re_msg["uid"])) {
$data = $this->storage->getUser($re_msg["uid"]);
if (isset($data)) {
if ($data["session_key"] == $re_msg["session_key"]) {
$this->swoole_table->set($fd, ["uid" => $data["uid"]]);
$data["fd"] = $fd; //刷新链接句柄
$this->storage->login($re_msg["uid"], $data);
return $this->reconnectionMsg($re_msg, $fd, 1001, "reconnection is ok", true);
}
return $this->reconnectionMsg($re_msg, $fd, 1002, "the user has logged in elsewhere", false);
}
return $this->reconnectionMsg($re_msg, $fd, 1003, " user has logged out", false);
}
return $this->reconnectionMsg($re_msg, $fd, 1004, " the parameter is invalid", false);
}
/***
* @param $re_msg
* @param $fd
* @param $code
* @param $msg
* @param $status
* @return array
*/
function reconnectionMsg($re_msg, $fd, $code, $msg, $status)
{
return [
"type" => "reconnection",
"uid" => $re_msg["uid"],
"fd" => $fd,
"code" => $code,
"msg" => $msg,
"status" => $status
];
}
/**
* 获得在线用户
* @param $re_msg
* @param $fd
* @return array
*/
function getOnlineUsers($re_msg, $fd)
{
$OnlineUsers = $this->storage->getOnlineUsers();
if (empty($OnlineUsers)) {
return [
"type" => "online_list",
"uid" => $re_msg["uid"],
"fd" => $fd,
"list" => false,
"status" => false
];
} else {
return [
"type" => "online_list",
"uid" => $re_msg["uid"],
"fd" => $fd,
"list" => $this->user->getAllOnlineUsersData($OnlineUsers),
"status" => true
];
}
}
/**
* 获得我的用户列表
* @param $re_msg
* @param $fd
* @return array
*/
function getFriendsLists($re_msg, $fd)
{
$friendsList = $this->getFriendsList($re_msg["uid"]);
if (!empty($friendsList)) {
return [
"type" => "friends_list",
"uid" => $re_msg["uid"],
"fd" => $fd,
"list" => $friendsList,
"status" => true
];
}
return [
"type" => "friends_list",
"uid" => $re_msg["uid"],
"fd" => $fd,
"list" => false,
"status" => false
];
}
/**
* 转发消息
* @param $re_msg
* @param $fd
* @param $server
*/
function message($re_msg, $fd, $server, $isTmpMessage = false)
{
if ($re_msg['to_uid'] == "") {
$server->push($fd, json_encode($re_msg));
} else {
$uidData = $this->storage->getUser($re_msg['to_uid']); //获取所要发送的 用户 fd
if ($server->exist($uidData["fd"])) { // 判断是否已断开通客户端的链接或者push失败 https://group.swoole.com/question/106856
$this->message->save($re_msg, true); //保存消息到记录
$message = [
"time" => time(),
"type" => "message",
"uid" => $re_msg["uid"],
"to_uid" => $re_msg['to_uid'],
"content" => $re_msg['content'],
"status" => true
];
if ($isTmpMessage) {
// 互相加好友
if (!($re_msg["uid"] == $re_msg['to_uid'])) { // 防止自己加自己好友
$this->user->addSession($re_msg["uid"], $re_msg['to_uid']);//如果是临时会话
$this->user->addSession($re_msg["to_uid"], $re_msg['uid']);//如果是临时会话
}
$myData = $this->storage->getUser($re_msg['uid']); //获取所要发送的 用户 fd
$message["uid_data"] = $myData;
$message["type"] = "tmp_message";
// $message["user_data"] = $this->getUserWithUid($re_msg["uid"]);
}
$server->push($uidData["fd"], json_encode($message));
} else { //用户离线
$this->message->save($re_msg, false);
//将离线消息发送给自己
$server->push($fd, json_encode(
[
"type" => "offline_message",
"offline_uid" => $re_msg['to_uid'],
"content" => "用户已经下线",
"status" => true
]
));
};
}
}
/***
* @param $type
* @param $uid
* @param $fb
* @param $status
* @param array $data
* @return array
*/
function loginMsg($type, $uid, $fb, $status, $data = [])
{
return [
"type" => $type,
"uid" => $uid,
"fd" => $fb,
"status" => $status,
"data" => $data,
];
}
/***
* 发送消息
* @param $type
* @param $uid
* @param $to_uid
* @param $content
* @param $status
* @return array
*/
function sendMsg($type, $uid, $to_uid, $content, $status)
{
return [
"type" => $type,
"uid" => $uid,
"to_uid" => $to_uid,
"content" => $content,
"status" => $status
];
}
/**
* 通过UID 获取用户数据
* @param $uid
* @return array
*/
function getUserWithUid($uid)
{
return [
"uid" => $uid,
"name" => $uid,
"avatar" => "https://b-ssl.duitang.com/uploads/item/201605/05/20160505145557_dtYHf.thumb.700_0.jpeg",
"online" => $this->storage->exists($uid),
];
}
/**
* 获得我的朋友列表
* @param $userId
*/
function getFriendsList($uid)
{
return $this->user->getSessionData($uid);
}
/**
* 给所有好友发消息
* @param $server
* @param $uid 用户ID
* @param $msgData
* @param bool $online
*/
function sendFullFriendsMessage($server, $uid, $msgData, $online = true)
{
$friendsList = $this->getFriendsList($uid);
if (!empty($friendsList)) {
if ($online) { //只给在线的群发
foreach ($friendsList as $friend) {
if ($friend["online"]) { // 只给在线的群发
$uidData = $this->storage->getUser($friend['uid']); //获取所要发送的 用户 fd
if ($server->exist($uidData["fd"])) { // 判断是否已断开通客户端的链接或者push失败 https://group.swoole.com/question/106856
$server->push($uidData["fd"], json_encode($msgData));
} else {
$this->storage->logout($friend['uid']); // 如果朋友不在线则登出它
}
}
}
return;
} else {
// 给所有在线的群发
foreach ($friendsList as $friend) {
$uidData = $this->storage->getUser($friend['uid']); //获取所要发送的 用户 fd
$server->push($uidData["fd"], json_encode($msgData));
}
}
}
}
/***
* 将离线消息群发给好友
* @param $server
* @param $uid
*/
function offlineMessage($server, $uid)
{
$this->sendFullFriendsMessage($server, $uid, [
"type" => "offline_message",
"offline_uid" => $uid,
"content" => "用户已经下线",
"status" => true
]);
}
/***
* 群发上线消息
* @param $server
* @param $uid
*/
function onlineMessage($server, $uid)
{
$this->sendFullFriendsMessage($server, $uid, [
"type" => "online_message",
"offline_uid" => $uid,
"content" => "用户已经上线",
"status" => true
]);
}
//删除会话
function delSession($re_msg, $fd)
{
if (isset($re_msg['del_uid'])) {
$uid = $re_msg['uid'];
$del_uid = $re_msg['del_uid'];
if ($this->user->delSessionWithUid($del_uid, $uid)) {
return [
"type" => "del_session",
"uid" => $re_msg["uid"],
"fd" => $fd,
"del_uid" => $re_msg["del_uid"],
"status" => true
];
};
}
return [
"type" => "del_session",
"uid" => $re_msg["uid"],
"fd" => $fd,
"del_uid" => $re_msg["del_uid"],
"status" => false
];
}
}
转载请注明:(●--●) Hello.My Weicot » 一个简单的Swoole IM 示例 支持离线 断线从连 已读未读