| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- package websocket
- import (
- "context"
- "github.com/gorilla/websocket"
- "log"
- "net"
- "net/http"
- "rocommon"
- "rocommon/socket"
- "rocommon/util"
- )
- //监听器实现(启动时可能会有多个连接器)
- type tcpWebSocketAcceptor struct {
- socket.NetRuntimeTag //运行状态
- socket.NetTCPSocketOption //socket相关设置
- socket.NetProcessorRPC //事件处理相关
- socket.SessionManager //会话管理
- socket.NetServerNodeProperty
- socket.NetContextSet
- // 保存端口
- listener net.Listener
- certfile string
- keyfile string
- upgrader *websocket.Upgrader
- sv *http.Server
- }
- func (this *tcpWebSocketAcceptor) TypeOfName() string {
- return "wsAcceptor"
- }
- func (this *tcpWebSocketAcceptor) SetHttps(certfile, keyfile string) {
- this.certfile = certfile
- this.keyfile = keyfile
- }
- func (this *tcpWebSocketAcceptor) Start() rocommon.ServerNode {
- //正在停止先等待
- this.StopWg.Wait()
- //防止重入导致错误
- if this.GetRuneState() {
- return this
- }
- //https://github.com/gogf/greuse/blob/master/greuse.go
- var listenCfg = net.ListenConfig{Control: Control}
- ln, err := listenCfg.Listen(context.Background(), "tcp", this.GetAddr())
- //ln, err := net.Listen("tcp", this.GetAddr())
- if err != nil {
- util.PanicF("webSocketAcceptor listen failure=%v", err)
- }
- this.listener = ln
- util.InfoF("webSocketAcceptor listen success=%v", this.GetAddr())
- //process
- //结束中
- if this.GetCloseFlag() {
- return this
- }
- this.SetRuneState(true)
- mux := http.NewServeMux()
- mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
- conn, err := this.upgrader.Upgrade(w, r, nil)
- if err != nil {
- util.InfoF("[webSocketAcceptor] accept err=%v", err)
- return
- }
- this.SocketOptWebSocket(conn) //option 设置
- session := newWebSocketSession(conn, this, nil)
- session.SetContextData("request", r, "newWebSocketSession") //获取request相关信息
- //util.InfoF("[tcpAcceptor] accept session:start:%v", session)
- session.Start()
- //通知上层事件(这边的回调要放到队列中,否则会有多线程冲突)
- this.ProcEvent(&rocommon.RecvMsgEvent{Sess: session, Message: &rocommon.SessionAccepted{}})
- })
- this.sv = &http.Server{Addr: this.GetAddr(), Handler: mux}
- go func() {
- util.InfoF("ws.listen(%s) %s", this.GetName(), this.GetAddr())
- if this.certfile != "" && this.keyfile != "" {
- err = this.sv.ServeTLS(this.listener, this.certfile, this.keyfile)
- } else {
- err = this.sv.Serve(this.listener)
- }
- //服务关闭时会打印
- if err != nil {
- util.ErrorF("ws.listen. failed(%s) %v", this.GetName(), err.Error())
- }
- this.SetRuneState(false)
- this.SetCloseFlag(false)
- this.StopWg.Done()
- }()
- return this
- }
- func (this *tcpWebSocketAcceptor) Stop() {
- if !this.GetRuneState() {
- return
- }
- this.StopWg.Add(1)
- this.SetCloseFlag(true)
- this.listener.Close()
- //关闭当前监听服务器的所有连接
- this.CloseAllSession()
- //等待协程结束
- this.StopWg.Wait()
- }
- func init() {
- log.Println("webSocketAcceptor server node register")
- socket.RegisterServerNode(func() rocommon.ServerNode {
- node := &tcpWebSocketAcceptor{
- SessionManager: socket.NewNetSessionManager(),
- upgrader: &websocket.Upgrader{
- CheckOrigin: func(r *http.Request) bool {
- return true
- },
- },
- }
- node.NetTCPSocketOption.Init()
- return node
- })
- }
|