acceptor.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package tcp
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "rocommon"
  7. "rocommon/socket"
  8. "rocommon/util"
  9. "time"
  10. )
  11. // 监听器实现(启动时可能会有多个连接器)
  12. type tcpAcceptor struct {
  13. socket.NetRuntimeTag //运行状态
  14. socket.NetTCPSocketOption //socket相关设置
  15. socket.NetProcessorRPC //事件处理相关
  16. socket.SessionManager //会话管理
  17. socket.NetServerNodeProperty
  18. socket.NetContextSet
  19. listener net.Listener
  20. }
  21. // //interface ServerNode
  22. func (this *tcpAcceptor) Start() rocommon.ServerNode {
  23. //正在停止先等待
  24. this.StopWg.Wait()
  25. //防止重入导致错误
  26. if this.GetRuneState() {
  27. return this
  28. }
  29. //https://github.com/gogf/greuse/blob/master/greuse.go
  30. var listenCfg = net.ListenConfig{Control: Control}
  31. util.InfoF("tcpAcceptor GetAddr:%v", this.GetAddr())
  32. ln, err := listenCfg.Listen(context.Background(), "tcp", this.GetAddr())
  33. //ln, err := net.Listen("tcp", this.GetAddr())
  34. if err != nil {
  35. util.PanicF("tcpAcceptor listen failure=%v", err)
  36. }
  37. this.listener = ln
  38. util.InfoF("tcpAcceptor listen success=%v", this.GetAddr())
  39. go this.tcpAccept()
  40. return this
  41. }
  42. func (this *tcpAcceptor) tcpAccept() {
  43. this.SetRuneState(true)
  44. for {
  45. conn, err := this.listener.Accept()
  46. util.InfoF("Accept successful:")
  47. //结束中
  48. if this.GetCloseFlag() {
  49. break
  50. }
  51. if err != nil {
  52. if ne, ok := err.(net.Error); ok && ne.Temporary() {
  53. select {
  54. case <-time.After(time.Millisecond): //尝试重新获取连接
  55. continue
  56. }
  57. }
  58. util.InfoF("[tcpAcceptor] accept err:%v", err)
  59. break
  60. }
  61. util.InfoF("accept ok:%v", conn)
  62. this.SocketOpt(conn) //option 设置
  63. func() {
  64. session := newTcpSession(conn, this, nil)
  65. //util.InfoF("[tcpAcceptor] accept session:start:%v", session)
  66. session.Start()
  67. //通知上层事件(这边的回调要放到队列中,否则会有多线程冲突)
  68. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: session, Message: &rocommon.SessionAccepted{}})
  69. }()
  70. }
  71. this.SetRuneState(false)
  72. this.SetCloseFlag(false)
  73. this.StopWg.Done()
  74. }
  75. func (this *tcpAcceptor) Stop() {
  76. if !this.GetRuneState() {
  77. return
  78. }
  79. this.StopWg.Add(1)
  80. this.SetCloseFlag(true)
  81. this.listener.Close()
  82. //关闭当前监听服务器的所有连接
  83. this.CloseAllSession()
  84. //等待协程结束
  85. this.StopWg.Wait()
  86. }
  87. func (this *tcpAcceptor) TypeOfName() string {
  88. return "tcpAcceptor"
  89. }
  90. func init() {
  91. log.Println("tcpAcceptor server node register")
  92. socket.RegisterServerNode(func() rocommon.ServerNode {
  93. node := &tcpAcceptor{
  94. SessionManager: socket.NewNetSessionManager(),
  95. }
  96. node.NetTCPSocketOption.Init()
  97. return node
  98. })
  99. }