分布式WebSocket集群解决方案

发布时间:2021-09-27 16:49:28

首先声明改文章为转载:https://segmentfault.com/a/1190000017307713
非常感谢大佬的文章!!
问题起因
最*做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。
期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。

以下是我的场景描述



资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)应用发布限制条件:由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用需求:用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息集群中的应用服务类型:每个集群实例都负责http无状态请求服务与ws长连接服务

系统架构图






在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便。下文会有解释


本文涉及的技术栈

Eureka 服务发现与注册Redis Session共享Redis 消息订阅Spring BootZuul 网关Spring Cloud Gateway 网关Spring WebSocket 处理长连接Ribbon 负载均衡Netty 多协议NIO网络通信框架Consistent Hash 一致性哈希算法

相信能走到这一步的人都了解过我上面列举的技术栈了,如果还没有,可以先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认大家都了解过了...


技术可行性分析

下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案


WebSocketSession与HttpSession

protected void handleTextMessage(WebSocketSession session, TextMessage message) { System.out.println("服务器接收到的消息: "+ message ); //send message to client session.sendMessage(new TextMessage("message")); }

那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。


以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redisspring-boot-starter-redis,大家可以从网上找个demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。


解决方案的演变
Netty与Spring WebSocket

刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信



/**
* TODO 根据服务器传进来的id,分配到不同的group
*/
private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { //retain增加引用计数,防止接下来的调用引用失效 System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text()); //将消息发送给group里面的所有channel,也就是发送消息给客户端 GROUP.writeAndFlush(msg.retain()); }


那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点


使用netty实现websocket

玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:
使用spring websocket实现ws服务

spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单



org.springframework.boot
spring-boot-starter-websocket

第二步:添加配置类



@Configuration
public class WebSocketConfig implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(myHandler(), "/") .setAllowedOrigins("*"); } @Bean public WebSocketHandler myHandler() { return new MessageHandler(); } }

第三步:实现消息监听类



@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler { private List clients = new ArrayList<>(); @Override public void afterConnectionEstablished(WebSocketSession session) { clients.add(session); System.out.println("uri :" + session.getUri()); System.out.println("连接建立: " + session.getId()); System.out.println("current seesion: " + clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { clients.remove(session); System.out.println("断开连接: " + session.getId()); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) { String payload = message.getPayload(); Map map = JSONObject.parseObject(payload, HashMap.class); System.out.println("接受到的数据" + map); clients.forEach(s -> { try { System.out.println("发送消息给: " + session.getId()); s.sendMessage(new TextMessage("服务器返回收到的信息," + payload)); } catch (Exception e) { e.printStackTrace(); } }); } }

从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。

从zuul技术转型到spring cloud gateway

要实现websocket集群,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:



zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,但是2.0版本没有被spring boot集成,而且文档不健全。因此转型是必须的,同时转型也很容易实现。

在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中以下的某些配置是必须的,在这里提前避免大家采坑
server:
port: 443
ssl: enabled: true key-store: classpath:xxx.jks key-store-password: xxxx key-store-type: JKS key-alias: alias spring: application: name: api-gateway cloud: gateway: httpclient: ssl: handshake-timeout-millis: 10000 close-notify-flush-timeout-millis: 3000 close-notify-read-timeout-millis: 0 useInsecureTrustManager: true discovery: locator: enabled: true lower-case-service-id: true routes: - id: dc uri: lb://dc predicates: - Path=/dc/** - id: wecheck uri: lb://wecheck predicates: - Path=/wecheck/**

如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record



@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered { private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099; @Override public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) { URI originalUri = exchange.getRequest().getURI(); ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String forwardedUri = request.getURI().toString(); if (forwardedUri != null && forwardedUri.startsWith("https")) { try { URI mutatedUri = new URI("http", originalUri.getUserInfo(), originalUri.getHost(), originalUri.getPort(), originalUri.getPath(), originalUri.getQuery(), originalUri.getFragment()); mutate.uri(mutatedUri); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } ServerHttpRequest build = mutate.build(); ServerWebExchange webExchange = exchange.mutate().request(build).build(); return chain.filter(webExchange); } @Override public int getOrder() { return HTTPS_TO_HTTP_FILTER_ORDER; }

}



这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。


session广播

这是最简单的websocket集群通讯解决方案。场景如下:

    教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}网关接收到消息,获取集群所有ip地址,逐个调用教师的请求集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求






session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。



spring cloud中获取服务集群中每台服务器信息的方法如下

@Resource
private EurekaClient eurekaClient;

Application app = eurekaClient.getApplication("service-name");
//instanceInfo包括了一台服务器ip,port等消息
InstanceInfo instanceInfo = app.getInstances().get(0); System.out.println("ip address: " + instanceInfo.getIPAddr());


服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系

一致性哈希算法实现(本文的要点)

这种方法是本人认为最优雅的实现方案,理解这种方案需要一定的时间,如果你耐心看下去,相信你一定会有所收获。再强调一次,不了解一致性哈希算法的同学请先看这里,现先假设哈希环是顺时针查找的。


首先,想要将一致性哈希算法的思想应用到我们的websocket集群,我们需要解决以下新问题:


    集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。集群节点UP,会影响到旧key映射不到对应的节点。哈希环读写共享。

在集群中,总会出现服务UP/DOWN的问题。

针对节点DOWN的问题分析如下:



一个服务器DOWN的时候,其拥有的websocket session会自动关闭连接,并且前端会收到通知。此时会影响到哈希环的映射错误。我们只需要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。


针对节点UP的问题分析如下:



现假设集群中有服务
CacheB上线了,该服务器的ip地址刚好被映射到key1和
cacheA之间。那么key1对应的用户每次要发消息时都跑去
CacheB发送消息,结果明显是发送不了消息,因为
CacheB没有key1对应的session。







此时我们有两种解决方案。



eureka监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。


方案B复杂,动作小:


我们先看看没有虚拟节点的情况,假设
CacheC
CacheA之间上线了服务器
CacheB。所有映射在
CacheC
CacheB的用户发消息时都会去
CacheB里面找session发消息。也就是说
CacheB一但上线,便会影响到
CacheC
CacheB之间的用户发送消息。所以我们只需要将
CacheA断开
CacheC
CacheB的用户所对应的session,让客户端重连。







接下来是有虚拟节点的情况,假设浅色的节点是虚拟节点。我们用长括号来代表某段区域映射的结果属于某个
Cache。首先是C节点未上线的情况。图大家应该都懂吧,所有B的虚拟节点都会指向真实的B节点,所以所有B节点逆时针那一部分都会映射到B(因为我们规定哈希环顺时针查找)。







接下来是C节点上线的情况,可以看到某些区域被C占领了。







由以上情况我们可以知道:节点上线,会有许多对应虚拟节点也同时上线,因此我们需要将多段范围key对应的session断开连接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,大家可以尝试一下自己实现算法。

哈希环应该放在哪里?



    gateway本地创建并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但实际上是不太可取的,回想一下上面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,需要通过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么做。eureka创建,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽量地短,我们不可以让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的概率的确很低,gateway只需要应用redis的消息订阅模式,订阅哈希环修改事件便可以解决此问题。



至此我们的spring websocket集群已经搭建的差不多了,最重要的地方还是一致性哈希算法。现在有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon作为负载均衡,我们只需要根据建立ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程如下图所示:







接下来用户沟通的时候,只需要根据id进行hash,在哈希环上获取对应ip,便可以知道与该用户建立ws连接时的session存在哪台服务器上了!

spring cloud?Finchley.RELEASE?版本中ribbon未完善的地方

题主在实际操作的时候发现了ribbon两个不完善的地方......


    根据网上找的方法,继承AbstractLoadBalancerRule重写负载均衡策略之后,多个不同应用的请求变得混乱。假如eureka上有两个service A和B,重写负载均衡策略之后,请求A或B的服务,最终只会映射到其中一个服务上。非常奇怪!可能spring cloud gateway官网需要给出一个正确的重写负载均衡策略的demo。一致性哈希算法需要一个key,类似user id,根据key进行hash之后在哈希环上搜索并返回ip。但是ribbon没有完善choose函数的key参数,直接写死了default






难道这样子我们就没有办法了吗?其实还有一个可行并且暂时可替代的办法!






由于ribbon未完善key的处理,我们暂时无法在ribbon上实现一致性哈希算法。只能间接地通过客户端发起两次请求(一次http,一次ws)的方式来实现一致性哈希。希望不久之后ribbon能更新这个缺陷!让我们的websocket集群实现得更优雅一点。


后记

以上便是我这几天探索的结果。期间遇到了许多问题,并逐一解决难题,列出两个websocket集群解决方案。第一个是session广播,第二个是一致性哈希。这两种方案针对不同场景各有优缺点,本文并未用到ActiveMQ,Karfa等消息队列实现消息推送,只是想通过自己的想法,不依靠消息队列来简单地实现多用户之间的长连接通讯。希望能为大家提供一条不同于寻常的思路。

相关文档

  • 湖北财税职业学院2018年6月英语四级报名时间及报名条件湖北财税职业学院贴吧
  • 84消毒液能和洗衣液一起用吗,84消毒液和洗衣液能同时用么,84消毒液和洗衣粉能混用吗
  • 夏季如何使用手竿钓鲶鱼
  • 蒸鸡蛋日记
  • 篦冷机常见的系统故障处理
  • catalina.out文件过大 用 cronolog报错cronolog Bootstrap: command not found解决
  • 回复邀约的礼仪
  • 政协工作者学习科学发展观心得体会
  • redis概念
  • 怎么做好吃的土豆烧肉?
  • 大肠癌的饮食保健
  • 微博手机号注册上限怎么办
  • SpringBoot(三)JSR303数据校验/多环境切换/XXXProperties
  • linux中创建定时任务与取消
  • XCode8 真机测试打包,让发布测试更轻松
  • py学习记Day33
  • 广东移动每月免费流量怎么领取
  • 佛山职业技术学院_广东佛山科学技术学院2018年6月英语四级报名时间及报名条件
  • 冰箱冷藏室结冰是什么原因电冰箱冷藏室结冰怎么办
  • 关于描写月亮的句子
  • 平凡的世界大结局是什么?
  • 写人随笔作文
  • 培训心得讲话稿简短
  • 12.9级螺丝一般用什么材料
  • OpenCMS 7.0安装和汉化
  • Python新手也可以做出超有趣的项目
  • 公共管理系毕业生的求职信参考
  • 《数据可视化与数据挖掘??基于Tableau和SPSS Modeler图形界面》之可视化数据挖掘概述
  • 速成马甲线全攻略15个练腹肌动作甩去小肚子
  • 提高打字速度https://www.zhihu.com/question/21509685
  • 猜你喜欢

  • 中国国际服务外包产业发展规划纲要(2011-2015)
  • 2018八年级语文上册第5单元17中国石拱桥作业课件新人教版
  • 人教课标版 近代中国经济结构的变动与资本主义的曲折发展优秀ppt课件10 (1)
  • 2019版七年级数学下册第一章整式的乘除1.4整式的乘法(第1课时)一课一练基础闯关(新版)北师大版
  • 佛山市禅城区绿建源建筑研究院(企业信用报告)- 天眼查
  • 重庆市八校2017-2018学年人教版七年级语文下学期第二阶段测试试题含答案
  • 褒贬两用的成语精选
  • 表达一个人失望心酸的签名
  • 1月工作总结开头
  • 【精编范文】伤中的暖暖温情750字作文-实用word文档 (1页)
  • 20XX大学新生军训个人心得体会
  • 20XX年劳动力转移就业工作总结
  • 第1部分C语言概述及组成纪钢2010年2月
  • Unity经验任意位置的透视问题
  • 广饶县李鹊镇桂龙建材经销部企业信息报告-天眼查
  • 劳务派遣工应如何维护自己的权益
  • 民办高校贫困大学生就业焦虑现状与心理教育策略研究
  • 送给姐妹的祝福语
  • 2016届高考化学二轮复*学案专题11 第1单元《《化学实验基础知识和技能》.doc
  • 二年级上册音乐课件-《摇橹》冀少版 (共8张PPT)
  • 【最新】人教PEP版五年级英语上册Unit 2 My week Part A 第3课时课件ppt
  • 【范文学*】财政局2018年瑟肽旯ぷ髯芙岷拖掳肽旯ぷ骷苹?段
  • 文学毕业论文范文
  • 泰坦尼克号真相 泰坦尼克号的真实历史
  • 山东省高密市第三中学高中政治 第一课 美好生活的向导导学案(创新班,无答案)新人教版必修4
  • 武汉宅典房地产策划咨询有限公司企业信用报告-天眼查
  • 2019秋部编版二年级上册语文9黄山奇石(课堂教学课件3)
  • 2017年7月初六结婚日子怎么样
  • 建筑设计项目进度计划表
  • 上海证券交易所编制证交所股价指数的说明
  • 世界主要自由贸易区概况
  • 广东省高中学业水*测试政治测试:必修3第8课走进文化生活Word版含答案
  • 2019秋八年级英语上册 Unit 1 Where did you go on vacation Wednesday(复现式周周练)新人教 新目标版
  • 2013年河北省公务员考试公告
  • 蓝丝绒虾好养吗,怎么养
  • 丹徒县上会汽车修理厂企业信息报告-天眼查
  • 安徽省桐城中学2019届高三数学上学期第三次月考试题理20181101025
  • 中国二手车市场调研报告
  • 七年级数学有理数的混合运算(2019年新版)
  • 29例危重病人经皮气管切开术的配合及护理
  • 阿里云Fabric 1个order1个peer部署步骤
  • nba骑士夺冠是哪一年
  • 电脑版