// File: acceptor.go (2.5 KB)

  1. package tcp
  2. import (
  3. "context"
  4. "log"
  5. "net"
  6. "rocommon"
  7. "rocommon/socket"
  8. "rocommon/util"
  9. "time"
  10. )
  11. // 监听器实现(启动时可能会有多个连接器)
  12. type tcpAcceptor struct {
  13. socket.NetRuntimeTag //运行状态
  14. socket.NetTCPSocketOption //socket相关设置
  15. socket.NetProcessorRPC //事件处理相关
  16. socket.SessionManager //会话管理
  17. socket.NetServerNodeProperty
  18. socket.NetContextSet
  19. listener net.Listener
  20. }
  21. // //interface ServerNode
  22. func (this *tcpAcceptor) Start() rocommon.ServerNode {
  23. //正在停止先等待
  24. this.StopWg.Wait()
  25. //防止重入导致错误
  26. if this.GetRuneState() {
  27. return this
  28. }
  29. //https://github.com/gogf/greuse/blob/master/greuse.go
  30. var listenCfg = net.ListenConfig{Control: Control}
  31. ln, err := listenCfg.Listen(context.Background(), "tcp", this.GetAddr())
  32. //ln, err := net.Listen("tcpa", this.GetAddr())
  33. if err != nil {
  34. util.PanicF("tcpAcceptor listen failure=%v", err)
  35. }
  36. this.listener = ln
  37. util.InfoF("tcpAcceptor listen success=%v", this.GetAddr())
  38. go this.tcpAccept()
  39. return this
  40. }
  41. func (this *tcpAcceptor) tcpAccept() {
  42. this.SetRuneState(true)
  43. for {
  44. conn, err := this.listener.Accept()
  45. //结束中
  46. if this.GetCloseFlag() {
  47. break
  48. }
  49. if err != nil {
  50. if ne, ok := err.(net.Error); ok && ne.Temporary() {
  51. select {
  52. case <-time.After(time.Millisecond): //尝试重新获取连接
  53. continue
  54. }
  55. }
  56. util.InfoF("[tcpAcceptor] accept err:%v", err)
  57. break
  58. }
  59. //util.DebugF("accept ok:%v", conn)
  60. this.SocketOpt(conn) //option 设置
  61. func() {
  62. session := newTcpSession(conn, this, nil)
  63. //util.InfoF("[tcpAcceptor] accept session:start:%v", session)
  64. session.Start()
  65. //通知上层事件(这边的回调要放到队列中,否则会有多线程冲突)
  66. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: session, Message: &rocommon.SessionAccepted{}})
  67. }()
  68. }
  69. this.SetRuneState(false)
  70. this.SetCloseFlag(false)
  71. this.StopWg.Done()
  72. }
  73. func (this *tcpAcceptor) Stop() {
  74. if !this.GetRuneState() {
  75. return
  76. }
  77. this.StopWg.Add(1)
  78. this.SetCloseFlag(true)
  79. this.listener.Close()
  80. //关闭当前监听服务器的所有连接
  81. this.CloseAllSession()
  82. //等待协程结束
  83. this.StopWg.Wait()
  84. }
  85. func (this *tcpAcceptor) TypeOfName() string {
  86. return "tcpAcceptor"
  87. }
  88. func init() {
  89. log.Println("tcpAcceptor server node register")
  90. socket.RegisterServerNode(func() rocommon.ServerNode {
  91. node := &tcpAcceptor{
  92. SessionManager: socket.NewNetSessionManager(),
  93. }
  94. node.NetTCPSocketOption.Init()
  95. return node
  96. })
  97. }