session.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335
  1. package tcp
  2. import (
  3. "net"
  4. "rocommon"
  5. "rocommon/socket"
  6. "rocommon/util"
  7. "runtime/debug"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. )
  12. type SessionIdentify struct {
  13. id uint64
  14. }
  15. func (this *SessionIdentify) ID() uint64 {
  16. return this.id
  17. }
  18. func (this *SessionIdentify) SetID(id uint64) {
  19. this.id = id
  20. }
// tcpSession is the per-connection state of one TCP session: the underlying
// net.Conn, the outgoing message queue drained by RunSend, and the
// bookkeeping needed to shut the receive/send goroutines down.
// Session interface: see def.go.
type tcpSession struct {
	sync.Mutex                                // guards conn (see setConn / GetConn)
	SessionIdentify                           // the ID is set when the session is added to the SessionManager
	*socket.NetProcessorRPC                   // event handling, see procrpc.go
	socket.NetContextSet                      // records data bound to the session, see nodeproperty.go
	node                    rocommon.ServerNode // owning node (acceptor/connector) this session belongs to
	// net.Conn
	conn        net.Conn
	sendQueue   chan interface{} // outgoing messages; a nil element tells RunSend to stop
	exitWg      sync.WaitGroup   // counts the RunRecv and RunSend goroutines started by Start
	endCallback func()           // optional; invoked by Start's cleanup goroutine after both goroutines exit
	// closeInt is the closed flag: 0 = open, non-zero = closed. It is only
	// accessed through sync/atomic.
	// NOTE(review): sync/atomic requires 64-bit alignment for int64 on 32-bit
	// platforms, which is only guaranteed for the first word of a struct —
	// confirm this package is never built for a 32-bit target.
	closeInt        int64
	aesMutex        sync.RWMutex // guards aesStr
	aesStr          []byte
	handCodeMutex   sync.RWMutex // guards handCodeStr
	handCodeStr     string
	sessionOpt      socket.NetTCPSocketOption // filled from the node's options in newTcpSession
	optMutex        sync.RWMutex              // guards sessionOptFlag
	sessionOptFlag  bool                      // whether the no-read/no-write timeout handling is enabled
	sendQueueMaxLen int                       // per-session queue limit chosen in Start
	recvPingNum     int                       // counter maintained by IncRecvPingNum; not synchronised
}
  44. func (this *tcpSession) GetSessionOpt() interface{} {
  45. return &this.sessionOpt
  46. }
  47. func (this *tcpSession) GetSessionOptFlag() bool {
  48. this.optMutex.RLock()
  49. defer this.optMutex.RUnlock()
  50. return this.sessionOptFlag
  51. }
  52. func (this *tcpSession) SetSessionOptFlag(flag bool) {
  53. this.optMutex.Lock()
  54. defer this.optMutex.Unlock()
  55. this.sessionOptFlag = flag
  56. }
  57. func (this *tcpSession) setConn(c net.Conn) {
  58. this.Lock()
  59. defer this.Unlock()
  60. this.conn = c
  61. }
  62. func (this *tcpSession) GetConn() net.Conn {
  63. this.Lock()
  64. defer this.Unlock()
  65. return this.conn
  66. }
  67. func (this *tcpSession) Raw() interface{} {
  68. return this.GetConn()
  69. }
  70. func (this *tcpSession) Node() rocommon.ServerNode {
  71. return this.node
  72. }
  73. func (this *tcpSession) GetAES() *[]byte {
  74. this.aesMutex.RLock()
  75. defer this.aesMutex.RUnlock()
  76. return &this.aesStr
  77. }
  78. func (this *tcpSession) SetAES(aes string) {
  79. this.aesMutex.Lock()
  80. defer this.aesMutex.Unlock()
  81. this.aesStr = []byte(aes)
  82. //log.Println("SetAES:", aes)
  83. }
  84. func (this *tcpSession) GetHandCode() string {
  85. this.handCodeMutex.RLock()
  86. defer this.handCodeMutex.RUnlock()
  87. return this.handCodeStr
  88. }
  89. func (this *tcpSession) SetHandCode(code string) {
  90. this.handCodeMutex.Lock()
  91. defer this.handCodeMutex.Unlock()
  92. this.handCodeStr = code
  93. //log.Println("SetAES:", aes)
  94. }
  95. func (this *tcpSession) IncRecvPingNum(incNum int) {
  96. if incNum <= 0 {
  97. this.recvPingNum = incNum
  98. } else {
  99. this.recvPingNum += incNum
  100. }
  101. }
  102. func (this *tcpSession) RecvPingNum() int {
  103. return this.recvPingNum
  104. }
  105. var sendQueueMaxLen = 2000
  106. var sendQueuePool = sync.Pool{
  107. New: func() interface{} {
  108. return make(chan interface{}, sendQueueMaxLen+1)
  109. },
  110. }
  111. func (this *tcpSession) Start() {
  112. atomic.StoreInt64(&this.closeInt, 0)
  113. //重置发送队列
  114. this.sendQueueMaxLen = sendQueueMaxLen
  115. if this.node.(rocommon.ServerNodeProperty).GetName() == "gate" {
  116. this.sendQueueMaxLen = 200
  117. }
  118. this.sendQueue = make(chan interface{}, this.sendQueueMaxLen+1)
  119. //this.sendQueue = make(chan interface{}, 32) //todo..暂时默认发送队列长度2000
  120. //this.sendQueue = make(chan interface{}, sendQueueMaxLen+1) //todo..暂时默认发送队列长度2000
  121. //this.sendQueue = sendQueuePool.Get().(chan interface{})
  122. this.exitWg.Add(2)
  123. //this.node tcpAcceptor
  124. this.node.(socket.SessionManager).Add(this) //添加到session管理器中
  125. if this.node.TypeOfName() == "tcpAcceptor" {
  126. //log.Println("sessionMagNum:", this.node.(socket.SessionManager).SessionNum())
  127. }
  128. go func() {
  129. this.exitWg.Wait()
  130. //结束操作处理
  131. close(this.sendQueue)
  132. //sendQueuePool.Put(this.sendQueue)
  133. this.node.(socket.SessionManager).Remove(this)
  134. if this.endCallback != nil {
  135. this.endCallback()
  136. }
  137. //debug.FreeOSMemory()
  138. }()
  139. go this.RunRecv()
  140. go this.RunSend()
  141. }
  142. func (this *tcpSession) Close() {
  143. //已经关闭
  144. if ok := atomic.SwapInt64(&this.closeInt, 1); ok != 0 {
  145. return
  146. }
  147. conn := this.GetConn()
  148. if conn != nil {
  149. conn.Close()
  150. //关闭读
  151. //conn.(*net.TCPConn).CloseRead()
  152. }
  153. //util.InfoF("close session")
  154. }
  155. func (this *tcpSession) Send(msg interface{}) {
  156. //已经关闭
  157. if atomic.LoadInt64(&this.closeInt) != 0 {
  158. util.ErrorF("SendLen-sendQueue closeInt connId=%v", this.id)
  159. return
  160. }
  161. //this.sendQueue <- msg
  162. sendLen := len(this.sendQueue)
  163. if sendLen < sendQueueMaxLen {
  164. this.sendQueue <- msg
  165. return
  166. }
  167. util.ErrorF("SendLen-sendQueue=%v addr=%v", sendLen, this.conn.LocalAddr())
  168. }
  169. //服务器进程之前启用ping操作
  170. func (this *tcpSession) HeartBeat(msg interface{}) {
  171. //已经关闭
  172. if atomic.LoadInt64(&this.closeInt) != 0 {
  173. return
  174. }
  175. go func() {
  176. tmpMsg := msg
  177. delayTimer := time.NewTimer(15 * time.Second)
  178. for {
  179. delayTimer.Reset(5 * time.Second)
  180. select {
  181. case <-delayTimer.C:
  182. if atomic.LoadInt64(&this.closeInt) != 0 {
  183. break
  184. }
  185. this.Send(tmpMsg)
  186. //util.InfoF("Send PingReq id=%v", this.ID())
  187. }
  188. }
  189. }()
  190. }
  191. func (this *tcpSession) RunRecv() {
  192. //util.DebugF("start RunRecv goroutine")
  193. defer func() {
  194. //打印奔溃信息
  195. //if err := recover(); err != nil {
  196. // this.onError(err)
  197. //}
  198. //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
  199. //打印堆栈信息
  200. if err := recover(); err != nil {
  201. debug.PrintStack()
  202. }
  203. }()
  204. recvCount := 0
  205. for {
  206. msg, seqId, err := this.ReadMsg(this) //procrpc.go
  207. if err != nil {
  208. util.ErrorF("Readmsg-RunRecv error=%v sessionId=%v", err, this.ID())
  209. //这边需要加锁,避免主线程继续在closInt还未设置成断开时还继续往session写数据,导致多线程冲突
  210. //this.Lock()
  211. //做关闭处理,发送数据时已经无法进行发送
  212. atomic.StoreInt64(&this.closeInt, 1)
  213. //close(this.sendQueue) //用来退出写协程
  214. this.sendQueue <- nil //用来退出写协程
  215. //this.Unlock()
  216. //抛出错误事件
  217. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: &rocommon.SessionClosed{}, Err: err})
  218. //todo...或者通过关闭sendQueue来实现关闭
  219. break
  220. }
  221. //接收数据事件放到队列中(需要放到队列中,否则会有线程冲突)
  222. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId, KvTime: util.GetTimeMilliseconds()})
  223. //this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId})
  224. recvCount++
  225. if recvCount >= 100 {
  226. recvCount = 0
  227. //util.InfoF("RunRecv recCount sessionId=%v", this.ID())
  228. }
  229. }
  230. util.DebugF("exit RunRecv goroutine addr=%v remoteAddr=%v id=%v", this.conn.LocalAddr(), this.conn.RemoteAddr(), this.id)
  231. this.exitWg.Done()
  232. }
  233. func (this *tcpSession) RunSend() {
  234. //util.DebugF("start RunSend goroutine")
  235. defer func() {
  236. //打印奔溃信息
  237. //if err := recover(); err != nil {
  238. // this.onError(err)
  239. //}
  240. //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
  241. //打印堆栈信息
  242. if err := recover(); err != nil {
  243. debug.PrintStack()
  244. }
  245. }()
  246. sendCount := 0
  247. //放到另外的队列中c
  248. for data := range this.sendQueue {
  249. if data == nil {
  250. break
  251. }
  252. err := this.SendMsg(&rocommon.SendMsgEvent{Sess: this, Message: data})
  253. //err := this.SendMsg(this, data) //procrpc.go
  254. if err != nil {
  255. util.ErrorF("SendMsg RunSend error %v", err)
  256. break
  257. }
  258. sendCount++
  259. if sendCount >= 100 {
  260. sendCount = 0
  261. //util.InfoF("RunSend sendCount sessionId=%v", this.ID())
  262. }
  263. }
  264. util.DebugF("exit RunSend goroutine addr=%v remoteAddr=%v id=%v", this.conn.LocalAddr(), this.conn.RemoteAddr(), this.id)
  265. c := this.GetConn()
  266. if c != nil {
  267. c.Close()
  268. }
  269. this.exitWg.Done()
  270. }
  271. ///////////////////////
  272. //acceptor中获取到连接后创建session使用
  273. func newTcpSession(c net.Conn, node rocommon.ServerNode, endCallback func()) *tcpSession {
  274. session := &tcpSession{
  275. conn: c,
  276. node: node,
  277. endCallback: endCallback,
  278. NetProcessorRPC: node.(interface {
  279. GetRPC() *socket.NetProcessorRPC
  280. }).GetRPC(), //使用外层node的RPC处理接口
  281. }
  282. node.(socket.SocketOption).CopyOpt(&session.sessionOpt)
  283. return session
  284. }