| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- package websocket
- import (
- "rocommon"
- "rocommon/socket"
- tcpBase "rocommon/socket/tcp"
- "rocommon/util"
- "runtime/debug"
- "sync"
- "sync/atomic"
- "github.com/gorilla/websocket"
- )
- // Session interface def.go
- type wsSession struct {
- sync.Mutex
- tcpBase.SessionIdentify //添加到SessionManager中时会设置tcpSession的ID属性
- *socket.NetProcessorRPC //事件处理相关 procrpc.go
- socket.NetContextSet //记录session绑定信息 nodeproperty.go
- node rocommon.ServerNode
- //net.Conn
- conn *websocket.Conn
- sendQueue chan interface{}
- exitWg sync.WaitGroup
- endCallback func()
- closeInt int64
- aesMutex sync.RWMutex
- aesStr []byte
- handCodeMutex sync.RWMutex
- handCodeStr string
- sessionOpt socket.NetTCPSocketOption
- optMutex sync.RWMutex
- sessionOptFlag bool //是否启用无读无写超时处理
- sendQueueMaxLen int
- }
- func (this *wsSession) GetSessionOpt() interface{} {
- return &this.sessionOpt
- }
- func (this *wsSession) GetSessionOptFlag() bool {
- this.optMutex.RLock()
- defer this.optMutex.RUnlock()
- return this.sessionOptFlag
- }
- func (this *wsSession) SetSessionOptFlag(flag bool) {
- this.optMutex.Lock()
- defer this.optMutex.Unlock()
- this.sessionOptFlag = flag
- }
- func (this *wsSession) setConn(c *websocket.Conn) {
- this.Lock()
- defer this.Unlock()
- this.conn = c
- }
- func (this *wsSession) GetConn() *websocket.Conn {
- this.Lock()
- defer this.Unlock()
- return this.conn
- }
- func (this *wsSession) Raw() interface{} {
- return this.GetConn()
- }
- func (this *wsSession) Node() rocommon.ServerNode {
- return this.node
- }
- func (this *wsSession) GetAES() *[]byte {
- this.aesMutex.RLock()
- defer this.aesMutex.RUnlock()
- return &this.aesStr
- }
- func (this *wsSession) SetAES(aes string) {
- this.aesMutex.Lock()
- defer this.aesMutex.Unlock()
- this.aesStr = []byte(aes)
- //log.Println("SetAES:", aes)
- }
- func (this *wsSession) GetHandCode() string {
- this.handCodeMutex.RLock()
- defer this.handCodeMutex.RUnlock()
- return this.handCodeStr
- }
- func (this *wsSession) SetHandCode(code string) {
- this.handCodeMutex.Lock()
- defer this.handCodeMutex.Unlock()
- this.handCodeStr = code
- //log.Println("SetAES:", aes)
- }
- func (this *wsSession) IncRecvPingNum(incNum int) {
- }
- func (this *wsSession) RecvPingNum() int {
- return 0
- }
- var sendQueueMaxLen = 2000
- var sendQueuePool = sync.Pool{
- New: func() interface{} {
- return make(chan interface{}, sendQueueMaxLen+1)
- },
- }
- func (this *wsSession) Start() {
- atomic.StoreInt64(&this.closeInt, 0)
- //重置发送队列
- this.sendQueueMaxLen = sendQueueMaxLen
- if this.node.(rocommon.ServerNodeProperty).GetName() == "gate" {
- this.sendQueueMaxLen = 200
- }
- this.sendQueue = make(chan interface{}, this.sendQueueMaxLen+1)
- //this.sendQueue = make(chan interface{}, 32) //todo..暂时默认发送队列长度2000
- //this.sendQueue = make(chan interface{}, sendQueueMaxLen+1) //todo..暂时默认发送队列长度2000
- //this.sendQueue = sendQueuePool.Get().(chan interface{})
- this.exitWg.Add(2)
- //this.node tcpAcceptor
- this.node.(socket.SessionManager).Add(this) //添加到session管理器中
- if this.node.TypeOfName() == "wsAcceptor" {
- //log.Println("sessionMagNum:", this.node.(socket.SessionManager).SessionNum())
- }
- go func() {
- this.exitWg.Wait()
- //结束操作处理
- close(this.sendQueue)
- //sendQueuePool.Put(this.sendQueue)
- this.node.(socket.SessionManager).Remove(this)
- if this.endCallback != nil {
- this.endCallback()
- }
- //debug.FreeOSMemory()
- }()
- go this.RunRecv()
- go this.RunSend()
- }
- func (this *wsSession) Close() {
- //已经关闭
- if ok := atomic.SwapInt64(&this.closeInt, 1); ok != 0 {
- return
- }
- conn := this.GetConn()
- if conn != nil {
- //conn.Close()
- //关闭读
- conn.Close()
- conn.CloseHandler()
- }
- //util.InfoF("close session")
- }
- func (this *wsSession) Send(msg interface{}) {
- //已经关闭
- if atomic.LoadInt64(&this.closeInt) != 0 {
- return
- }
- //this.sendQueue <- msg
- sendLen := len(this.sendQueue)
- util.ErrorF("wsSession sendQueue len:%v", sendLen)
- if sendLen < sendQueueMaxLen {
- this.sendQueue <- msg
- return
- }
- util.ErrorF("SendLen-sendQueue=%v addr=%v", sendLen, this.conn.LocalAddr())
- }
- // 服务器进程之前启用ping操作
- func (this *wsSession) HeartBeat(msg interface{}) {
- //已经关闭
- if atomic.LoadInt64(&this.closeInt) != 0 {
- return
- }
- }
- func (this *wsSession) RunRecv() {
- // util.DebugF("start RunRecv goroutine")
- defer func() {
- //打印奔溃信息
- //if err := recover(); err != nil {
- // this.onError(err)
- //}
- //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
- //打印堆栈信息
- if err := recover(); err != nil {
- debug.PrintStack()
- }
- }()
- for {
- msg, seqId, err := this.ReadMsg(this) //procrpc.go
- if err != nil {
- util.ErrorF("Readmsg-RunRecv error=%v", err)
- //这边需要加锁,避免主线程继续在closInt还未设置成断开时还继续往session写数据,导致多线程冲突
- //this.Lock()
- //做关闭处理,发送数据时已经无法进行发送
- atomic.StoreInt64(&this.closeInt, 1)
- //close(this.sendQueue) //用来退出写协程
- this.sendQueue <- nil //用来退出写协程
- //this.Unlock()
- //抛出错误事件
- this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: &rocommon.SessionClosed{}, Err: err})
- //todo...或者通过关闭sendQueue来实现关闭
- break
- }
- //接收数据事件放到队列中(需要放到队列中,否则会有线程冲突)
- this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId, KvTime: util.GetTimeMilliseconds()})
- //this.ProcEvent(&rocommon.RecvMsgEvent{Sess: this, Message: msg, Err: nil, MsgSeqId: seqId})
- }
- util.DebugF("exit RunRecv goroutine addr=%v", this.conn.LocalAddr())
- this.exitWg.Done()
- }
- func (this *wsSession) RunSend() {
- //util.DebugF("start RunSend goroutine")
- defer func() {
- //打印奔溃信息
- //if err := recover(); err != nil {
- // this.onError(err)
- //}
- //util.InfoF("Stack---:\n%s\n", string(debug.Stack()))
- //打印堆栈信息
- if err := recover(); err != nil {
- debug.PrintStack()
- }
- }()
- //放到另外的队列中
- for data := range this.sendQueue {
- if data == nil {
- break
- }
- err := this.SendMsg(&rocommon.SendMsgEvent{Sess: this, Message: data})
- //err := this.SendMsg(this, data) //procrpc.go
- if err != nil {
- util.ErrorF("SendMsg RunSend error %v", err)
- break
- }
- }
- util.DebugF("exit RunSend goroutine addr=%v", this.conn.LocalAddr())
- c := this.GetConn()
- if c != nil {
- c.Close()
- }
- this.exitWg.Done()
- }
- ///////////////////////
- //acceptor中获取到连接后创建session使用
- func newWebSocketSession(conn *websocket.Conn, node rocommon.ServerNode, endCallback func()) *wsSession {
- session := &wsSession{
- conn: conn,
- node: node,
- endCallback: endCallback,
- NetProcessorRPC: node.(interface {
- GetRPC() *socket.NetProcessorRPC
- }).GetRPC(), //使用外层node的RPC处理接口
- }
- node.(socket.SocketOption).CopyOpt(&session.sessionOpt)
- return session
- }
|