工作这么多年了,从来没写过博客,2018 年打算改变一下自己,于是打算平时工作解决问题后整理一下,写成博客发出来,这个想法有一段时间了,但是因为拖延症,所以今天才写,希望这是一个好的开始。
问题起源
公司应用内有一个基于websocket的聊天室功能,一直有用户反应高峰时段聊天信息延迟的厉害,终于找时间抽空解决了一下。
首先通过日志发现,聊天室/直播间内用户稍微多一点的情况下(30-50人),广播一条消息就会非常耗时,常常达到 10-20s
检查代码发现 SessionUtil 的 sendMessage 方法内,有如下一段代码(精简):
if (session.isOpen()) { synchronized (session) { if (session.isOpen()) { try { session.sendMessage(message); } catch (IOException t) { session.close(); } } }}
相应的,聊天室内广播代码逻辑是这样的(伪码)
String message = getMessage();for(WebsocketSession session : channel.getSessions()){ SessionUtil.sendMessage(message, session);}
我未对此段代码进行运行分析,但是这段代码的问题可凭肉眼检查并分析出来。
- 广播消息的部分,是遍历聊天室内的 session,然后顺序发送。
- 发送单条消息使用 synchronized 对 session 对象加了锁,对于多个线程、针对同一 session 的多条消息,在事实上是同步发送的,因为后边的阻塞于 synchronized 锁。
设想如下场景:
聊天室内有 20 名用户,同一时间发送了 1 条聊天内容,即有 20 条广播到达服务器。
将消息标为 m1,m2,m3...m20,用户标为 s1,s2,s3...s20,每条消息都要广播给每个用户,所有的任务为这两个集合的笛卡尔积,在程序中形如:
J1: [(m1,s1), (m1,s2)...(m1,s20)]
J2: [(m2,s1), (m2,s2)...(m2,s20)]
. .
J20: [(m20,s1), (m20,s2)...(m20,s20)]
理想上 J1[0] - J1[19] 这20条顺序发送,但是与 J[2-20][n] 全都是并行执行的。
然鹅。。。s 上边是有锁的,假设 (m1,s1)...(m20,s1) 都在同一时刻执行,(m1,s1) 拿到了 s1 上的锁,那么 J2、J3...J20 全都得等 s1 的锁,每组中的后继也必须等第一条消息发送完。到后边(mn, s2)...(mn, s20) 全都在重复同一个悲剧。这400条消息实际上是同步发送的,不仅如此,牵涉到线程上下文切换和锁争用,实际上要比顺序执行还要慢一些。由此,广播过程之慢就不难理解了。
问题的解决
其实,按照 spring 官方提供的 guide :
这样我们就可以使用 @SendTo 注解来进行广播,代码可以简单很多,出于对大厂的信任,我们应该也无需去担心性能问题。但是由于种种原因,要做的改动太大,我们现在只能考虑在原有代码上做优化而非整个改变底层支持。
最简单的办法
那么,直接把 synchronized 关键字去掉,避免锁争用的情况,不是就可以解决这个问题了吗?
遗憾的是,当去掉这处的锁之后,并发稍微一大的情况下,程序会抛出 java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
这样的异常。
那么我们来到抛异常的地方,看看这个问题是怎么产生的
Note: tomcat(embeded 8.5.15)
org.apache.tomcat.websocket.WsRemoteEndpointImplBase
public void sendString(String text) throws IOException { if (text == null) { throw new IllegalArgumentException(sm.getString("wsRemoteEndpoint.nullData")); } stateMachine.textStart(); sendMessageBlock(CharBuffer.wrap(text), true); } private static class StateMachine { public synchronized void textStart() { checkState(State.OPEN); state = State.TEXT_FULL_WRITING; } private void checkState(State... required) { for (State state : required) { if (this.state == state) { return; } } throw new IllegalStateException( sm.getString("wsRemoteEndpoint.wrongState", this.state)); } }
可以看到,sendString
这个方法未做任何同步约束,它在发送之前会去调用 textStart
方法,textStart
方法会去检查当前 endPoint 的状态,如果不是 OPEN
的话,就会抛出异常,就是说,上一条消息发送完成之前,状态还是 XXXX_WRITING
的话,这里是会出问题的。所以说,sendString
方法不是线程安全的,我的这种优化是想当然了。
顺便一提,这种 checkState 的做法是一种 “速错” 的思想,这个倒是在我的工作里和“防御式编程”结合得淋漓尽致。
有趣的 issue
在说具体思路之前,讲讲我在查问题的过程中发现的一个有趣的 issue :
这个 issue 的具体情况和我们程序里还是稍微有一点不一样的,但是其实都是一样的(这个下边会提到),并且这个 issue 里有丰富的内容可以讨论。
我们程序里调用的是 Spring 提供的 org.springframework.web.socket.adapter.standard.StandardWebSocketSession#sendTextMessage
, 该方法的实现如下:
RemoteEndpoint.Basic getBasicRemote();@Overrideprotected void sendTextMessage(TextMessage message) throws IOException { getNativeSession().getBasicRemote().sendText(message.getPayload(), message.isLast());}
然而这个 issue 的创建者 Simone Bordet 在 issue 里提到
If an application calls RemoteEndpoint.Async#sendText(String, SendHandler) concurrently from 2 (or more) threads, it gets the stack trace below.
他是调用了 RemoteEndpoint.Async 的 sendText 方法,然而出现了和我们一样的问题。
那么为啥呢?
好,我们来看看 RemoteEndpoint.Async 的实现类 org.apache.tomcat.websocket.WsRemoteEndpointAsync
的实现是啥样的,我把代码都复制过来吧,其实也不是很长:
public class WsRemoteEndpointAsync extends WsRemoteEndpointBase implements RemoteEndpoint.Async { WsRemoteEndpointAsync(WsRemoteEndpointImplBase base) { super(base); } @Override public long getSendTimeout() { return base.getSendTimeout(); } @Override public void setSendTimeout(long timeout) { base.setSendTimeout(timeout); } @Override public void sendText(String text, SendHandler completion) { base.sendStringByCompletion(text, completion); } @Override public FuturesendText(String text) { return base.sendStringByFuture(text); } @Override public Future sendBinary(ByteBuffer data) { return base.sendBytesByFuture(data); } @Override public void sendBinary(ByteBuffer data, SendHandler completion) { base.sendBytesByCompletion(data, completion); } @Override public Future sendObject(Object obj) { return base.sendObjectByFuture(obj); } @Override public void sendObject(Object obj, SendHandler completion) { base.sendObjectByCompletion(obj, completion); }}
在org.apache.tomcat.websocket.WsSession
的构造方法里是这样做的:
this.remoteEndpointAsync = new WsRemoteEndpointAsync(wsRemoteEndpoint);this.remoteEndpointBasic = new WsRemoteEndpointBasic(wsRemoteEndpoint);
????????????????????
就是把 WsRemoteEndpointImplBase
整个 wrap 了一遍,说好的 Async 呢?
这也就是我上边为什么说我们的问题其实是一样的原因,这也是为什么下边维护者Mark Thomas 回复说
Given that the Javadoc for RemoteEndpoint.Basic explicitly states that concurrent attempts to send messages are not allowed and the Javadoc for RemoteEndpoint.Async doesn't
的原因,真是偷懒啊。。
在这个 issue 下边,用户和开发者展开了激烈的讨论,不过大概内容就是用户认为这种并发场景很常见,人家 jetty 和 undertow 都没问题,tomcat 也应该予以支持。开发者认为这个 websocket 标准中没有这样的内容,如果有问题,不如去 websocket 社区提个标准。
这个 issuse 下边的其他回复还是有一些意义的,主要有以下两点:
- 12楼的 balusc 提到,可以用下面的方法来解决这个问题:
synchronized (session) { results.add(session.getAsyncRemote().sendText(text));}
- 这个 issue 的 reporter Simone Bordet 在下边提到:
I am frankly surprised that an expert group states that:
"[async writes] should not be permitted and an IllegalStateException be thrown. My reasoning for this is that was allowed it would open up all sorts of buffering requirements for the container that could get tricky to manage."
Now those sorts of tricky buffering requirements must be managed by the application, with almost 100% chance that average joe websocket get them wrong. For a simple chat application, an application developer must manage the buffering and possibly stack overflows due to callback invocations.
Containers should solve those problems on behalf of the application writers, otherwise one could say that transaction management or ORM mapping are too tricky to manage and better left to JEE application writers.
对于 1,又回到了我们问题的根源上,这种做法虽然可以解决问题,但是带来了性能问题。我想我们程序里的那段代码开发者可能也是查到这个 issue 然后按照 balusc 的提议修改了代码,做了功能测试但是未做性能测试,由此可见,性能测试也是非常重要的事情啊。
对于 2,下边有人追问了这个 tricky buffering
到底是个啥,但是 Simone Bordet 本人似乎也不是很清楚,并且他认为这个 tricky buffering
应该由容器开发者来维护,应用开发者不该操心这个。
但是从这个日期(2014-01-17)和 issue 的版本(8.0.x-trunk)对比我写下这篇博客(2018-02-08,tomcat embeded 8.5.15)来看,tomcat 开发者的态度还是很坚决的。。
我一开始也不清楚这个 tricky buffering
到底是个啥,但是,我想出了自己的解决办法之后,突然意识到,这个就是 tricky buffering
啊!这是来自 expert group 的建议啊,可把我牛逼坏了,叉会儿腰再接着写。接下来,我会在第二篇文章里再讲讲这个 tricky buffering
的具体实现。