04-websocket并发场景下发送消息出错的解决方案
前言:上一篇文章,主要演示了websocket并发场景下发送消息出错的问题,本文首先填上一篇的埋下的坑,也会给出解决方案
1 填坑-为什么调用的是 getBasicRemote().sendText方法
1.1 getBasicRemote().sendText 与 getAsyncRemote().sendText
上一篇提到,spring封装的websocket在发送消息的时候,调用的是javax.websocket的getBasicRemote().sendText方法,但是javax.websocket是支持异步的,因为它提供了异步发送消息的方法
...
//org.apache.tomcat.websocket;
//WsSession类
@Override
public RemoteEndpoint.Async getAsyncRemote() {
checkState();
return remoteEndpointAsync;
}
...
//org.apache.tomcat.websocket;
//WsRemoteEndpointAsync类
@Override
public Future<Void> sendText(String text) {
return base.sendStringByFuture(text);
}
...
//代码示例
session.getAsyncRemote().sendText(content);
那为什么spring内部在封装websocket的时候,没有是使用这个异步调用的方法呢?而是采用了基于同步发送的方法。如果采用的是基于异步调用的方法,可以避免并发出错的问题吗?
首先,javax.websocket提供的异步调用的方法,是有特定的条件的,特定条件就是:如果某一时刻来的100消息,他们对应的客户端都是不用一样的,也就是一条消息对应一个特定的客户端,那么在这种并发情况下,使用这个异步调用的方法是没有任何问题的,因为不同的客户端对应的session都不一样,里面的状态机也不一样,都是独立的,不会互相影响,没有共享数据,自然也就没有并发安全问题。
但是 ,但是,在实际的业务中,来了100条消息,很有可能其中某几条,比如10条是发给同一个客户端的,这个时候就涉及到共享同一个session的状态了,就会出现并发场景下,发送消息出错,也就是说,有的消息,能正常 发送出去,客户端能接收到消息,但是有的客户端收不到消息,或者只收到了几条消息,没有收到全部的消息。显然,这就是问题所在
结论:由于实际的业务场景中,一条消息并不能对应一个唯一的客户端(一对一),并且这种对应关系是十分复杂的,是很容易出错的(除非把所有的消息按照对应的客户端分组,每个分组一个线程,不同的分组不同的线程…没错,这已经是一种解决方案了!!!),所以虽然javax.websocket本身提供了异步发送消息的方法,但是spring并没有采用,而是采用的同步调用的方法。
另外一个问题:虽然spring用的是同步调用的方法,但是还是会出现并发发送消息出错的情况,为什么呢?因为这个同步调用并没有考虑并发,也就是没有使用synchronized等手段来保证并发同步,所以,即使是javax.websocket的同步调用方法,在并发场景下,还是会出错。
2 解决方案
2.1 synchronized锁住发送消息的方法
这种解决方案效果是很明显的,道理也是显而易见的,只是这样的话,全局的消息都会阻塞住,某些场景下性能会十分槽糕。需要结合实际的业务场景考虑,是否适合采用这种解决方案
spring框架下使用websocket
...
if (session.isOpen()) {
synchronized (session){
try {
session.sendMessage(new TextMessage(message));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
...
javax.websocket
public synchronized void sendMessage(String id, String content) {
Session session = (Session) clients.get(id);
if (session == null) {
log.error("服务端给客户端发送消息失败 ==> toid = {} 不存在, content = {}", id, content);
} else {
try {
session.getBasicRemote().sendText(content);
} catch (IOException e) {
throw new RuntimeException(e);
}
log.info("服务端给客户端发送消息 ==> toid = {}, content = {}", id, content);
}
}
2.2 spring websocketSession的加强版
org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator
这个目前是最优解,兼顾并发和性能。不过仅限于spring框架下的websocket,如果是使用javax下的websocket,需要自己按照spring的解决方案封装一些东西(把spring的源码复制过来,简化即可使用),可以参考我的代码
使用方法就是:用ConcurrentWebSocketSessionDecorator替换websocketSession
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// 当WebSocket连接建立成功时调用
// 获取url指定的参数信息:ws://127.0.0.1:10010/ws?scanPoint=01&userId=123
//String scanPoint = extractParams(session, "scanPoint");
//if (Objects.isNull(scanPoint)) {
// return;
//}
//原来是WebSocketSession,换成ConcurrentWebSocketSessionDecorator
//sessions.put(scanPoint, session);
//ConcurrentWebSocketSessionDecorator的构造方法后面2个参数的含义是:发送超时时间限制,发送内容大小的限制
//看到这个你就会发现,不愧是spring啊,强的一批
sessions.put(scanPoint, new ConcurrentWebSocketSessionDecorator(session, 10*1000, 10*1024));
//...
}
这里先埋个坑,后面打算写一篇文章来解析spring的ConcurrentWebSocketSessionDecorator
javax.websocket
(用到了spring的其他依赖,Component的什么的)
下面是我把spring源码复制过来,简化后的代码。sendMessage是对外提供的发送消息的方法
import java.io.IOException;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ServerEndpoint("/ws/{id}")
@Component
public class MyWebsocketDecorator {
private static final Logger log = LoggerFactory.getLogger(MyWebsocketDecorator.class);
private static final AtomicInteger onlineClientCount = new AtomicInteger(0);
private static final ConcurrentMap<String, Session> clients = new ConcurrentHashMap<>();
private String id;
private Session session;
//=======================================================
/**
* 发送时间限制:毫秒
*/
private final int sendTimeLimit = 10 * 1000;
/**
* websocket的缓存消息队列:所有的消息全部先放入这里:我这里写3000足够了
*/
private final Queue<String> buffer = new LinkedBlockingQueue<>(3000);
/**
* 开始发送的时间戳
*/
private volatile long sendStartTime;
/**
* 是否达到限制条件
*/
private volatile boolean limitExceeded;
/**
* 关闭websocket
*/
private volatile boolean closeInProgress;
/**
* 发送消息时需要获取的锁
*/
private final Lock flushLock = new ReentrantLock();
/**
* 检查websocket状态时需要获取的锁
*/
private final Lock closeLock = new ReentrantLock();
@OnOpen
public void open(@PathParam("id") String id, Session session) {
this.id = id;
this.session = session;
clients.put(id, session);
onlineClientCount.incrementAndGet();
log.info("连接建立成功,当前在线数为:{} ==> 开始监听新连接:session_id = {}, id = {}", new Object[]{onlineClientCount, session.getId(), id});
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("服务端接收到客户端消息 ==> id = {}, content = {}", this.id, message);
}
@OnClose
public void close(@PathParam("id") String id, Session session) {
clients.remove(id);
onlineClientCount.decrementAndGet();
log.info("连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:session_id = {}, id = {}", new Object[]{onlineClientCount, session.getId(), id});
}
public void sendMessage(String id, String message) throws IOException {
//把id和message拼接在一起,发消息的时候,在拆开
this.buffer.add(id + "|" + message);
do {
if (!tryFlushMessageBuffer()) {
//if (logger.isTraceEnabled()) {
// logger.trace(String.format("Another send already in progress: " +
// "session id '%s':, "in-progress" send time %d (ms), buffer size %d bytes",
// getId(), getTimeSinceSendStarted(), getBufferSize()));
//}
log.info("================>有线程正在发送消息,当前线程检查是否超时!");
checkSessionLimits();
break;
}
}
while (!this.buffer.isEmpty());
}
public void sendSynchronize(String id, String content) {
Session session = (Session) clients.get(id);
if (session == null) {
log.error("服务端给客户端发送消息失败 ==> toid = {} 不存在, content = {}", id, content);
} else {
try {
session.getBasicRemote().sendText(content);
} catch (Exception e) {
log.info("异常信息:{}", e.getMessage());
log.error("服务端给客户端发送消息失败 ==> toid = {}, content = {}", id, content);
}
log.info("服务端给客户端发送消息 ==> toid = {}, content = {}", id, content);
}
}
public void sendSynchronize(String content) {
clients.forEach((onlineid, session) -> {
if (!this.id.equalsIgnoreCase(onlineid)) {
try {
session.getBasicRemote().sendText(content);
log.info("服务端给客户端群发消息 ==> id = {}, toid = {}, content = {}", new Object[]{this.id, onlineid, content});
} catch (Exception e) {
log.info("异常信息:{}", e.getMessage());
log.error("服务端给客户端发送消息失败 ==> toid = {}, content = {}", id, content);
}
}
});
}
private boolean tryFlushMessageBuffer() throws IOException {
if (this.flushLock.tryLock()) {
try {
while (true) {
String message = this.buffer.poll();
if (message == null) {
break;
}
this.sendStartTime = System.currentTimeMillis();
//发送消息
String[] split = message.split("\|");
String key = split[0];
if (split.length != 2) {
sendSynchronize(key);
} else {
sendSynchronize(key, split[1]);
}
this.sendStartTime = 0;
}
} finally {
this.sendStartTime = 0;
this.flushLock.unlock();
}
return true;
}
return false;
}
private void checkSessionLimits() {
if (this.closeLock.tryLock()) {
try {
if (getTimeSinceSendStarted() > getSendTimeLimit()) {
//String format = "Send time %d (ms) for session '%s' exceeded the allowed limit %d";
//String reason = String.format(format, getTimeSinceSendStarted(), getId(), getSendTimeLimit());
//limitExceeded(reason);
//超时异常处理
throw new RuntimeException("ws消息超时");
}
} finally {
this.closeLock.unlock();
}
}
}
public long getTimeSinceSendStarted() {
long start = this.sendStartTime;
return (start > 0 ? (System.currentTimeMillis() - start) : 0);
}
public int getSendTimeLimit() {
return this.sendTimeLimit;
}
}
这个是原始代码,没有引入spring的解决方案
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@ServerEndpoint("/ws/{id}")
@Component
public class WS {
private static final Logger log = LoggerFactory.getLogger(WS.class);
private static AtomicInteger onlineClientCount = new AtomicInteger(0);
private static final ConcurrentMap<String, Session> clients = new ConcurrentHashMap<>();
private String id;
private Session session;
public WS() {
}
@OnOpen
public void open(@PathParam("id") String id, Session session) {
this.id = id;
this.session = session;
clients.put(id, session);
onlineClientCount.incrementAndGet();
log.info("连接建立成功,当前在线数为:{} ==> 开始监听新连接:session_id = {}, id = {},。", new Object[]{onlineClientCount, session.getId(), id});
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("服务端接收到客户端消息 ==> id = {}, content = {}", this.id, message);
}
@OnClose
public void close(@PathParam("id") String id, Session session) {
clients.remove(id);
onlineClientCount.decrementAndGet();
log.info("连接关闭成功,当前在线数为:{} ==> 关闭该连接信息:session_id = {}, id = {},。", new Object[]{onlineClientCount, session.getId(), id});
}
public void send(String id, String content) {
Session session = (Session)clients.get(id);
if (session == null) {
log.error("服务端给客户端发送消息 ==> toid = {} 不存在, content = {}", id, content);
} else {
session.getAsyncRemote().sendText(content);
log.info("服务端给客户端发送消息 ==> toid = {}, content = {}", id, content);
}
}
public void send(String content) {
clients.forEach((onlineid, session) -> {
if (!this.id.equalsIgnoreCase(onlineid)) {
session.getAsyncRemote().sendText(content);
log.info("服务端给客户端群发消息 ==> id = {}, toid = {}, content = {}", new Object[]{this.id, onlineid, content});
}
});
}
}
2.3 其他博主的解决方案
刚才在上面已经提到了,不同的客户端不同的线程,已经有博主实现了,我就不造轮子了(后面有时间的话,看看能不能造一个,手动狗头)
这是文章地址:https://blog.csdn.net/qq_35634154/article/details/122576665
2.4 一些想法,还未实践
-
引入生产者和消费者队列,生产者只管往队列里面翻消息,消费者判断消息队列是否为空,不为空就取出消息,发送
-
引入事件驱动队列
不过,感觉都有些过度解决问题了。。。
3 总结
本文主要给出了websocket的在并发场景下发送消息出错的几种解决方案,有些方案仅限于思路,并未实现
参考
https://blog.csdn.net/qq_35634154/article/details/122576665
https://blog.csdn.net/abu935009066/article/details/131218149