acceptor.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package websocket
  2. import (
  3. "context"
  4. "github.com/gorilla/websocket"
  5. "log"
  6. "net"
  7. "net/http"
  8. "rocommon"
  9. "rocommon/socket"
  10. "rocommon/util"
  11. )
  12. //监听器实现(启动时可能会有多个连接器)
  13. type tcpWebSocketAcceptor struct {
  14. socket.NetRuntimeTag //运行状态
  15. socket.NetTCPSocketOption //socket相关设置
  16. socket.NetProcessorRPC //事件处理相关
  17. socket.SessionManager //会话管理
  18. socket.NetServerNodeProperty
  19. socket.NetContextSet
  20. // 保存端口
  21. listener net.Listener
  22. certfile string
  23. keyfile string
  24. upgrader *websocket.Upgrader
  25. sv *http.Server
  26. }
  27. func (this *tcpWebSocketAcceptor) TypeOfName() string {
  28. return "wsAcceptor"
  29. }
  30. func (this *tcpWebSocketAcceptor) SetHttps(certfile, keyfile string) {
  31. this.certfile = certfile
  32. this.keyfile = keyfile
  33. }
  34. func (this *tcpWebSocketAcceptor) Start() rocommon.ServerNode {
  35. //正在停止先等待
  36. this.StopWg.Wait()
  37. //防止重入导致错误
  38. if this.GetRuneState() {
  39. return this
  40. }
  41. //https://github.com/gogf/greuse/blob/master/greuse.go
  42. var listenCfg = net.ListenConfig{Control: Control}
  43. ln, err := listenCfg.Listen(context.Background(), "tcp", this.GetAddr())
  44. //ln, err := net.Listen("tcp", this.GetAddr())
  45. if err != nil {
  46. util.PanicF("webSocketAcceptor listen failure=%v", err)
  47. }
  48. this.listener = ln
  49. util.InfoF("webSocketAcceptor listen success=%v", this.GetAddr())
  50. //process
  51. //结束中
  52. if this.GetCloseFlag() {
  53. return this
  54. }
  55. this.SetRuneState(true)
  56. mux := http.NewServeMux()
  57. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
  58. conn, err := this.upgrader.Upgrade(w, r, nil)
  59. if err != nil {
  60. util.InfoF("[webSocketAcceptor] accept err=%v", err)
  61. return
  62. }
  63. this.SocketOptWebSocket(conn) //option 设置
  64. session := newWebSocketSession(conn, this, nil)
  65. session.SetContextData("request", r, "newWebSocketSession") //获取request相关信息
  66. //util.InfoF("[tcpAcceptor] accept session:start:%v", session)
  67. session.Start()
  68. //通知上层事件(这边的回调要放到队列中,否则会有多线程冲突)
  69. this.ProcEvent(&rocommon.RecvMsgEvent{Sess: session, Message: &rocommon.SessionAccepted{}})
  70. })
  71. this.sv = &http.Server{Addr: this.GetAddr(), Handler: mux}
  72. go func() {
  73. util.InfoF("ws.listen(%s) %s", this.GetName(), this.GetAddr())
  74. if this.certfile != "" && this.keyfile != "" {
  75. err = this.sv.ServeTLS(this.listener, this.certfile, this.keyfile)
  76. } else {
  77. err = this.sv.Serve(this.listener)
  78. }
  79. //服务关闭时会打印
  80. if err != nil {
  81. util.ErrorF("ws.listen. failed(%s) %v", this.GetName(), err.Error())
  82. }
  83. this.SetRuneState(false)
  84. this.SetCloseFlag(false)
  85. this.StopWg.Done()
  86. }()
  87. return this
  88. }
  89. func (this *tcpWebSocketAcceptor) Stop() {
  90. if !this.GetRuneState() {
  91. return
  92. }
  93. this.StopWg.Add(1)
  94. this.SetCloseFlag(true)
  95. this.listener.Close()
  96. //关闭当前监听服务器的所有连接
  97. this.CloseAllSession()
  98. //等待协程结束
  99. this.StopWg.Wait()
  100. }
  101. func init() {
  102. log.Println("webSocketAcceptor server node register")
  103. socket.RegisterServerNode(func() rocommon.ServerNode {
  104. node := &tcpWebSocketAcceptor{
  105. SessionManager: socket.NewNetSessionManager(),
  106. upgrader: &websocket.Upgrader{
  107. CheckOrigin: func(r *http.Request) bool {
  108. return true
  109. },
  110. },
  111. }
  112. node.NetTCPSocketOption.Init()
  113. return node
  114. })
  115. }