基于Swoole的分布式socket消息服务器架构

消息服务器使用socket,为避免服务器过载,单台只允许500个socket连接,当一台不够的时候,扩充消息服务器是必然,问题来了,如何让链接在不同消息服务器上的用户可以实现消息发送呢?

要实现消息互通就必须要让这些消息服务器本身能互通,想了两个方式,一种是消息服务器之间交叉链接,另一种是增加一个特殊的消息服务器,这个消息服务器不对外开放,只负责消息转发和推送。

下列测试不考虑防火墙等。仅测试可行性和效率。

测试环境

  • 消息服务器
    192.168.0.201 9501 
    192.168.0.202 9501
    
  • 转发服务器
    192.168.0.203 9501
    
  • 公共缓存
    Redis 192.168.0.231 6379
    
  • 软件环境
    centos 6.5 mini swoole php
    

流程图

  • 整个流程图如下:

    enter image description here

  • 流程图说明:
    client1可向client2或者其他client发送消息,并接收其他client发送的消息.

    Redis中保存client连接的信息,给每个用户分配唯一的key,包括链接的哪台服务器,转发服务器定时检测消息服务器,如消息服务器挂掉,由转发服务器清理掉Redis已经挂掉的所有链接。

  • 完整的流程:

    1.Client1Client2发送一条消息

    2.Socket1接收到消息,根据key从Redis取出Client2的连接信息,连接在本机,直接推送给Client2,流程结束。

    3.如果连接不在本机,把消息推送到转发服务器,由转发服务器把该消息推送给连接所在消息服务器,消息服务器接收消息,推送给Client2

    4.消息发送结束。

编码实现

  • Socket
    在socket1上创建一个server.php,内容如下:

      <?php //服务端 
       $serv = new swoole_server("0.0.0.0", 9501);
    
      //redis
      $redis = new \Redis();        
      $redis->connect("192.168.0.231", 6379);
      
      //client
      $proxy = new swoole_client(SWOOLE_TCP | SWOOLE_KEEP);
      $proxy->connect("192.168.0.203", 9501);
      
      $serv->on('start', function($serv) {
          echo "Service:Start...";
      });
      $serv->on('connect', function ($serv, $fd) {
      
      });
      $serv->on('receive', function ($serv, $fd, $from_id, $data) {
          global $redis;
      
          $data = (array) json_decode($data);
          $cmd = $data['cmd'];
    
         switch ($cmd) {
    
              case "login"://登陆
                  //保存连接信息
                  $save = array(
                      'fd' => $fd,
                      'socket_ip' => "192.168.0.201"
                  );
                  $redis->set($data['name'], serialize($save));
                  break;
    
              case "chat":
                  $recv = unserialize($redis->get($data['recv']));
                  if ($recv['socket_ip'] != "192.168.0.201") {
                      //需要转发
                      $data['cmd'] = 'forward';
                      $data['recv_ip'] = $recv['socket_ip'];
                      $serv->task(json_encode($data));
                  } else {
                      //直接发送
                      $serv->send($recv['fd'], "{$data['send']}给您发了消息:{$data['content']}");
                  }
              break;
    
              case "forward"://接收转发消息
                  $recv = unserialize($redis->get($data['recv']));
                  $serv->send($recv['fd'], "{$data['send']}给您发了消息:{$data['content']}");
    
              break;
          }
          //$serv->send($fd, 'Swoole: ' . $data);
      });
      $serv->on('task', function ($serv, $task_id, $from_id, $data) {
          global $proxy;
          $proxy->send($data);
      });
    
      $serv->on('finish', function ($serv, $task_id, $data) {
    
      });
      $serv->on('close', function ($serv, $fd) {
          echo "Client: Close.\n";
      });
    
      $serv->set(array('task_worker_num' => 4));
    
      $serv->start();
    

    在socket2上只需把ip变更一下即可。192.168.0.201变更为192.168.0.202.

  • Proxy
    在转发服务器上建立脚本proxy.php,内容如下:

     $serv = new swoole_server("0.0.0.0", 9501); //服务端
     $serv->on('start', function($serv) {
         echo "Service:Start...";
     });
     $serv->on('connect', function ($serv, $fd) {
    
     });
     $serv->on('receive', function ($serv, $fd, $from_id, $data) {
         global $redis;
         $serv->task($data);
     });
     $serv->on('task', function ($serv, $task_id, $from_id, $data) {
         $forward = (array) json_decode($data);
         $client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
    
         $client->connect($forward['recv_ip'], 9501);
         unset($forward['recv_ip']);
         $client->send(json_encode($forward));
         $client->close();
     });
    
     $serv->on('finish', function ($serv, $task_id, $data) {
    
     });
     $serv->on('close', function ($serv, $fd) {
         echo "Client: Close.\n";
     });
     
     $serv->set(array('task_worker_num' => 4));
    
     $serv->start();
    

测试

注意开启顺序

1.开启转发服务器php proxy.php

2.分别开启socket服务器php server.php

enter image description here

可以在转发服务器上看到两个消息服务器已经连接
3.开始测试,分别打开两个telnet,连接两个消息服务器,发送消息测试:
登陆

enter image description here

发送消息测试

enter image description here

消息成功接收。

基于强大的swoole扩展,让php高效的实现这些成为可能,目前消息服务器到转发服务器是长连接,转发服务器到消息服务器是短连接,存在性能瓶颈,也浪费了连接资源。下一步改造成长连接,消息服务器的client使用异步。

标签: swoole, socket, 分布式

已有 21 条评论

  1. yulo

    请问 我在前端 怎么连接不同ip呢?我一个项目,每次判断 或者请求一次?

  2. ucan

    并没有做到真正的分布式,根据域名,负载信息连接到相应的服务器。

  3. 受用,收藏了

  4. 奋斗

    很屌

  5. [...]http://blog.molibei.com/archives/105[...]

  6. simon

    运行了您的代码,结果发现在 web 端用 js 连接上服务器是发不出消息的,请问这是 websocket吗?

  7. dandelion

    转发服务器send完之后,就直接colse,不会有问题吗,这里?
    $client->send(json_encode($forward));
    $client->close();

    1. 半桶水

      最好不要立即close,可由client来发起close

  8. 微笑刺客

    如何才能实现主动从服务器推送,而不是在用户建立连接或发送消息时,才向其它保持连接的用户推送消息?

  9. 楼主大神

  10. 我只是想应对php每次请求都要重新把一些静态的东西加载到内存中效率比较低这个问题,有人推荐了swoole,感觉有点杀鸡用牛刀了

    1. 啊啊

      用memcached

  11. proudbird

    如何利用server.php 中的swoole_client 来接收proxy转发来的消息 ,交给swoole_server发给用户呢?

  12. 不知道人

    看了您写的swoole的分布式 受益匪浅啊 但是 请问一下 转发服务器与消息服务器怎么做长连接啊 求教

    1. 额,这是以前刚接触的时候写的很挫的一个例子,例子中已经是长链接了,只不过使用的是同步,你测试的时候可以改成异步,并把连接接复用

      1. 不知道人

        我用转发服务连接消息服务器时 改成了$cli = new swoole_client(SWOOLE_TCP | SWOOLE_KEEP); 但是消息服务器上的fd还是递增的 我的服务器是nginx的环境 请问这个问题怎么破 谢谢

        1. 这个都是要在命令行守护运行的,你用web方式运行,每一次都会重复连接,fd自然是增长的

  13. ibrahim

    在这个例子中,只需要把转发服务器到消息服务器的短连接改为长连接,性能是否就可以提升了?

    1. 本身就是长链接,只是没有复用

      1. albert.qiu

        不复用好点吧

        1. 不复用每次发送消息都会有连接和断开流程

添加新评论