session.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. package websocket
  2. import (
  3. "rocommon"
  4. "rocommon/socket"
  5. tcpBase "rocommon/socket/tcp"
  6. "rocommon/util"
  7. "runtime/debug"
  8. "sync"
  9. "sync/atomic"
  10. "github.com/gorilla/websocket"
  11. )
  12. // Session interface def.go
  13. type wsSession struct {
  14. sync.Mutex
  15. tcpBase.SessionIdentify //添加到SessionManager中时会设置tcpSession的ID属性
  16. *socket.NetProcessorRPC //事件处理相关 procrpc.go
  17. socket.NetContextSet //记录session绑定信息 nodeproperty.go
  18. node rocommon.ServerNode
  19. //net.Conn
  20. conn *websocket.Conn
  21. sendQueue chan interface{}
  22. exitWg sync.WaitGroup
  23. endCallback func()
  24. closeInt int64
  25. aesMutex sync.RWMutex
  26. aesStr []byte
  27. handCodeMutex sync.RWMutex
  28. handCodeStr string
  29. sessionOpt socket.NetTCPSocketOption
  30. optMutex sync.RWMutex
  31. sessionOptFlag bool //是否启用无读无写超时处理
  32. sendQueueMaxLen int
  33. }
  34. func (this *wsSession) GetSessionOpt() interface{} {
  35. return &this.sessionOpt
  36. }
  37. func (this *wsSession) GetSessionOptFlag() bool {
  38. this.optMutex.RLock()
  39. defer this.optMutex.RUnlock()
  40. return this.sessionOptFlag
  41. }
  42. func (this *wsSession) SetSessionOptFlag(flag bool) {
  43. this.optMutex.Lock()
  44. defer this.optMutex.Unlock()
  45. this.sessionOptFlag = flag
  46. }
  47. func (this *wsSession) setConn(c *websocket.Conn) {
  48. this.Lock()
  49. defer this.Unlock()
  50. this.conn = c
  51. }
  52. func (this *wsSession) GetConn() *websocket.Conn {
  53. this.Lock()
  54. defer this.Unlock()
  55. return this.conn
  56. }
  57. func (this *wsSession) Raw() interface{} {
  58. return this.GetConn()
  59. }
  60. func (this *wsSession) Node() rocommon.ServerNode {
  61. return this.node
  62. }
  63. func (this *wsSession) GetAES() *[]byte {
  64. this.aesMutex.RLock()
  65. defer this.aesMutex.RUnlock()
  66. return &this.aesStr
  67. }
  68. func (this *wsSession) SetAES(aes string) {
  69. this.aesMutex.Lock()
  70. defer this.aesMutex.Unlock()
  71. this.aesStr = []byte(aes)
  72. //log.Println("SetAES:", aes)
  73. }
  74. func (this *wsSession) GetHandCode() string {
  75. this.handCodeMutex.RLock()
  76. defer this.handCodeMutex.RUnlock()
  77. return this.handCodeStr
  78. }
  79. func (this *wsSession) SetHandCode(code string) {
  80. this.handCodeMutex.Lock()
  81. defer this.handCodeMutex.Unlock()
  82. this.handCodeStr = code
  83. //log.Println("SetAES:", aes)
  84. }
  85. func (this *wsSession) IncRecvPingNum(incNum int) {
  86. }
  87. func (this *wsSession) RecvPingNum() int {
  88. return 0
  89. }
  var (
  	// sendQueueMaxLen is the default upper bound on queued outgoing messages
  	// per session.
  	sendQueueMaxLen = 2000

  	// sendQueuePool hands out buffered channels with room for
  	// sendQueueMaxLen+1 entries. Within this file its only uses are
  	// commented out.
  	sendQueuePool = sync.Pool{
  		New: func() interface{} {
  			queue := make(chan interface{}, sendQueueMaxLen+1)
  			return queue
  		},
  	}
  )
  96. func (this *wsSession) Start() {
  97. atomic.StoreInt64(&this.closeInt, 0)
  98. //重置发送队列
  99. this.sendQueueMaxLen = sendQueueMaxLen
  100. if this.node.(rocommon.ServerNodeProperty).GetName() == "gate" {
  101. this.sendQueueMaxLen = 200
  102. }
  103. this.sendQueue = make(chan interface{}, this.sendQueueMaxLen+1)
  104. //this.sendQueue = make(chan interface{}, 32) //todo..暂时默认发送队列长度2000
  105. //this.sendQueue = make(chan interface{}, sendQueueMaxLen+1) //todo..暂时默认发送队列长度2000
  106. //this.sendQueue = sendQueuePool.Get().(chan interface{})
  107. this.exitWg.Add(2)
  108. //this.node tcpAcceptor
  109. this.node.(socket.SessionManager).Add(this) //添加到session管理器中
  110. if this.node.TypeOfName() == "wsAcceptor" {
  111. //log.Println("sessionMagNum:", this.node.(socket.SessionManager).SessionNum())
  112. }
  113. go func() {
  114. this.exitWg.Wait()
  115. //结束操作处理
  116. close(this.sendQueue)
  117. //sendQueuePool.Put(this.sendQueue)
  118. this.node.(socket.SessionManager).Remove(this)
  119. if this.endCallback != nil {
  120. this.endCallback()
  121. }
  122. //debug.FreeOSMemory()
  123. }()
  124. go this.RunRecv()
  125. go this.RunSend()
  126. }
  127. func (this *wsSession) Close() {
  128. //已经关闭
  129. if ok := atomic.SwapInt64(&this.closeInt, 1); ok != 0 {
  130. return
  131. }
  132. conn := this.GetConn()
  133. if conn != nil {
  134. //conn.Close()
  135. //关闭读
  136. conn.Close()
  137. conn.CloseHandler()
  138. }
  139. //util.InfoF("close session")
  140. }
  141. func (this *wsSession) Send(msg interface{}) {
  142. //已经关闭
  143. if atomic.LoadInt64(&this.closeInt) != 0 {
  144. return
  145. }
  146. //this.sendQueue <- msg
  147. sendLen := len(this.sendQueue)
  148. util.ErrorF("wsSession sendQueue len:%v", sendLen)
  149. if sendLen < sendQueueMaxLen {
  150. this.sendQueue <- msg
  151. return
  152. }
  153. util.ErrorF("SendLen-sendQueue=%v addr=%v", sendLen, this.conn.LocalAddr())
  154. }
  155. // 服务器进程之前启用ping操作
  156. func (this *wsSession) HeartBeat(msg interface{}) {
  157. //已经关闭
  158. if atomic.LoadInt64(&this.closeInt) != 0 {
  159. return
  160. }
  161. }
  162. func (this *wsSession) RunRecv() {
  163. // util.DebugF("start RunRecv goroutine")
  164. defer func() {
  165. //打印奔溃信息
  166. //if err := recover(); err != nil {
  167. // this.onError(err)
  168. //}
  169. //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
  170. //打印堆栈信息
  171. if err := recover(); err != nil {
  172. debug.PrintStack()
  173. }
  174. }()
  175. for {
  176. msg, seqId, err := this.ReadMsg(this) //procrpc.go
  177. if err != nil {
  178. util.ErrorF("Readmsg-RunRecv error=%v", err)
  179. //这边需要加锁,避免主线程继续在closInt还未设置成断开时还继续往session写数据,导致多线程冲突
  180. //this.Lock()
  181. //做关闭处理,发送数据时已经无法进行发送
  182. atomic.StoreInt64(&this.closeInt, 1)
  183. //close(this.sendQueue) //用来退出写协程
  184. this.sendQueue <- nil //用来退出写协程
  185. //this.Unlock()
  186. //抛出错误事件
  187. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: &rocommon.SessionClosed{}, Err: err})
  188. //todo...或者通过关闭sendQueue来实现关闭
  189. break
  190. }
  191. //接收数据事件放到队列中(需要放到队列中,否则会有线程冲突)
  192. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId, KvTime: util.GetTimeMilliseconds()})
  193. //this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId})
  194. }
  195. util.DebugF("exit RunRecv goroutine addr=%v", this.conn.LocalAddr())
  196. this.exitWg.Done()
  197. }
  198. func (this *wsSession) RunSend() {
  199. //util.DebugF("start RunSend goroutine")
  200. defer func() {
  201. //打印奔溃信息
  202. //if err := recover(); err != nil {
  203. // this.onError(err)
  204. //}
  205. //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
  206. //打印堆栈信息
  207. if err := recover(); err != nil {
  208. debug.PrintStack()
  209. }
  210. }()
  211. //放到另外的队列中
  212. for data := range this.sendQueue {
  213. if data == nil {
  214. break
  215. }
  216. err := this.SendMsg(&rocommon.SendMsgEvent{Sess: this, Message: data})
  217. //err := this.SendMsg(this, data) //procrpc.go
  218. if err != nil {
  219. util.ErrorF("SendMsg RunSend error %v", err)
  220. break
  221. }
  222. }
  223. util.DebugF("exit RunSend goroutine addr=%v", this.conn.LocalAddr())
  224. c := this.GetConn()
  225. if c != nil {
  226. c.Close()
  227. }
  228. this.exitWg.Done()
  229. }
  230. ///////////////////////
  231. //acceptor中获取到连接后创建session使用
  232. func newWebSocketSession(conn *websocket.Conn, node rocommon.ServerNode, endCallback func()) *wsSession {
  233. session := &wsSession{
  234. conn: conn,
  235. node: node,
  236. endCallback: endCallback,
  237. NetProcessorRPC: node.(interface {
  238. GetRPC() *socket.NetProcessorRPC
  239. }).GetRPC(), //使用外层node的RPC处理接口
  240. }
  241. node.(socket.SocketOption).CopyOpt(&session.sessionOpt)
  242. return session
  243. }