connector.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. package tcp
  2. import (
  3. "log"
  4. "net"
  5. "rocommon"
  6. "rocommon/service"
  7. "rocommon/socket"
  8. "rocommon/util"
  9. "sync"
  10. "time"
  11. )
  12. // 连接器实现(启动时可能会有多个连接器)
  13. type tcpConnector struct {
  14. socket.NetRuntimeTag //运行状态
  15. socket.NetTCPSocketOption //socket相关设置
  16. socket.NetProcessorRPC //事件处理相关
  17. socket.NetServerNodeProperty
  18. socket.SessionManager //会话管理
  19. socket.NetContextSet
  20. connNum int //重连次数
  21. reconnectTime time.Duration
  22. wg sync.WaitGroup
  23. //连接会话
  24. sess *tcpSession
  25. }
  26. func (c *tcpConnector) connect(addr string) {
  27. c.SetRuneState(true) //true表示正在运行
  28. for {
  29. c.connNum++
  30. if c.connNum > 1 {
  31. var preDesc *service.ETCDServiceDesc
  32. c.RawContextData("sid", &preDesc)
  33. if preDesc != nil {
  34. //preDesc 目标节点信息
  35. util.DebugF("[tcpConnector] connect begin=%v sid=%v[%v]", addr, preDesc.ID, c.connNum-1)
  36. }
  37. }
  38. conn, err := net.Dial("tcp", addr)
  39. if err != nil {
  40. //log.Println("dail err:", err)
  41. if c.reconnectTime == 0 || c.GetCloseFlag() {
  42. //连接出错事件
  43. util.ErrorF("tcpConnector err=%v", err.Error())
  44. c.ProcEvent(&rocommon.RecvMsgEvent{Sess: c.sess, Message: &rocommon.SessionConnectError{}, Err: err})
  45. break
  46. }
  47. select {
  48. case <-time.After(c.reconnectTime):
  49. continue
  50. }
  51. }
  52. if c.connNum > 1 {
  53. var preDesc *service.ETCDServiceDesc
  54. c.RawContextData("sid", &preDesc)
  55. if preDesc != nil {
  56. //preDesc 目标节点信息
  57. util.DebugF("[tcpConnector] connect success:%v sid=%v[%v]", addr, preDesc.ID, c.connNum-1)
  58. }
  59. }
  60. if c.GetCloseFlag() {
  61. util.DebugF("[tcpConnector] connect success but be stoped by new node:%v [%v]", addr, c.connNum-1)
  62. break
  63. }
  64. c.wg.Add(1)
  65. //设置socket选项
  66. c.SocketOpt(conn)
  67. c.connNum = 0
  68. c.sess.setConn(conn)
  69. //放到session管理器中
  70. c.sess.Start()
  71. //连接事件
  72. c.ProcEvent(&rocommon.RecvMsgEvent{Sess: c.sess, Message: &rocommon.SessionConnected{}})
  73. c.wg.Wait()
  74. c.sess.setConn(nil)
  75. if c.reconnectTime == 0 || c.GetCloseFlag() {
  76. break
  77. }
  78. //sleep reconnectTime
  79. select {
  80. case <-time.After(c.reconnectTime):
  81. continue
  82. }
  83. }
  84. c.SetRuneState(false)
  85. //todo... 在调用stop后需要处理
  86. if c.GetCloseFlag() {
  87. c.StopWg.Done()
  88. }
  89. util.InfoF("connector stop...")
  90. }
  91. // interface ServerNode
  92. func (c *tcpConnector) Start() rocommon.ServerNode {
  93. c.StopWg.Wait()
  94. if c.GetRuneState() {
  95. return c
  96. }
  97. go c.connect(c.GetAddr())
  98. return c
  99. }
  100. func (c *tcpConnector) Stop() {
  101. if !c.GetRuneState() {
  102. return
  103. }
  104. c.SetCloseFlag(true)
  105. c.StopWg.Add(1)
  106. c.sess.Close()
  107. c.StopWg.Wait()
  108. }
  109. func (c *tcpConnector) TypeOfName() string {
  110. return "tcpConnector"
  111. }
  112. func (c *tcpConnector) Session() rocommon.Session {
  113. return c.sess
  114. }
  115. func (c *tcpConnector) SetReconnectTime(delta time.Duration) {
  116. //调试模式下重连不生效
  117. if service.DebugMode {
  118. return
  119. }
  120. c.reconnectTime = delta
  121. }
  122. func init() {
  123. log.Println("tcpConnector server node register")
  124. socket.RegisterServerNode(func() rocommon.ServerNode {
  125. node := new(tcpConnector)
  126. node.SessionManager = socket.NewNetSessionManager()
  127. node.sess = newTcpSession(nil, node, func() {
  128. node.wg.Done()
  129. })
  130. node.NetTCPSocketOption.Init()
  131. return node
  132. })
  133. }