proc_rpc_ws.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package model
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "rocommon"
  7. "rocommon/rpc"
  8. _ "rocommon/service"
  9. "rocommon/socket"
  10. "rocommon/util"
  11. _ "roserver/baseserver/model"
  12. _ "roserver/baseserver/router"
  13. _ "roserver/serverproto"
  14. "github.com/gorilla/websocket"
  15. )
  16. const (
  17. MsgIDLen = 2 // uint16
  18. lenMaxLen = 2 //包体大小2个字节 uint16
  19. msgIdLen = 2 //包ID大小2个字节 uint16
  20. msgSeqlen = 4 //发送序列号2个字节大小,用来断线重连
  21. msgFlaglen = 2 //暂定标记,加解密 1表示RSA,2表示AES
  22. MsgBodyIdx = 2 + 4 + 2 + 2 //->10
  23. )
  24. type DirectWSMessageTransmitter struct {
  25. }
  26. //recv直接原始数据传递到后端 / 返回消息给client
  27. func (this *DirectWSMessageTransmitter) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) {
  28. conn, ok := s.Raw().(*websocket.Conn)
  29. if !ok || conn == nil {
  30. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg err")
  31. return nil, 0, nil
  32. }
  33. //opt := s.Node().(socket.SocketOption)
  34. //var opt socket.SocketOption
  35. //if s.GetSessionOptFlag() {
  36. // opt = s.Node().(socket.SocketOption)
  37. //} else {
  38. // opt = s.GetSessionOpt().(socket.SocketOption)
  39. //}
  40. messageType, raw, err1 := conn.ReadMessage()
  41. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg start raw=%v", raw)
  42. if err1 != nil {
  43. err = err1
  44. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg ReadMessage err=%v", err)
  45. return nil, 0, err1
  46. }
  47. switch messageType {
  48. case websocket.BinaryMessage:
  49. var msgId uint16
  50. //var seqId uint32 //包序列号,客户端发送时的序列从1开始
  51. var flagId uint16 //加密方式
  52. var msgData []byte
  53. var msgDataLen uint16
  54. if len(raw) < lenMaxLen {
  55. return nil, 0, nil
  56. }
  57. msgDataLen = binary.BigEndian.Uint16(raw) //msgDataLen
  58. raw = raw[lenMaxLen:]
  59. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg msgDataLen=%v raw=%v", msgDataLen, raw)
  60. if msgDataLen >= 0 {
  61. //msgIdLen
  62. if len(raw) < msgIdLen {
  63. return
  64. }
  65. msgId = binary.BigEndian.Uint16(raw)
  66. raw = raw[msgIdLen:]
  67. //msgSeqlen
  68. if len(raw) < msgSeqlen {
  69. return
  70. }
  71. seqId = binary.BigEndian.Uint32(raw)
  72. raw = raw[msgSeqlen:]
  73. //msgFlaglen
  74. if len(raw) < msgFlaglen {
  75. return
  76. }
  77. flagId = binary.BigEndian.Uint16(raw)
  78. msgData = raw[msgFlaglen:]
  79. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg msgData=%v", msgData)
  80. //尝试直接发送到其他后端服务器或者解析
  81. if err == nil {
  82. msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s)
  83. }
  84. }
  85. }
  86. return
  87. }
  88. //send 直接发往客户端的消息
  89. func (this *DirectWSMessageTransmitter) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  90. conn, ok := s.Raw().(*websocket.Conn)
  91. if !ok || conn == nil {
  92. util.InfoF("[DirectWSMessageTransmitter] OnRecvMsg err")
  93. return nil
  94. }
  95. //opt := s.Node().(socket.SocketOption)
  96. var opt socket.SocketOption
  97. if s.GetSessionOptFlag() {
  98. opt = s.Node().(socket.SocketOption)
  99. } else {
  100. opt = s.GetSessionOpt().(socket.SocketOption)
  101. }
  102. aesKey := s.GetAES()
  103. var (
  104. msgData []byte
  105. msgId uint16
  106. seqId uint32
  107. msgInfo *rocommon.MessageInfo
  108. )
  109. switch m := msg.(type) {
  110. case *rocommon.TransmitPacket:
  111. msgData = m.MsgData
  112. msgId = uint16(m.MsgId)
  113. seqId = m.SeqId
  114. default:
  115. msgData, msgInfo, err = rpc.EncodeMessage(msg)
  116. if err != nil {
  117. return err
  118. }
  119. msgId = uint16(msgInfo.ID)
  120. }
  121. //todo
  122. // 注意上层发包不要超过最大值
  123. msgLen := len(msgData)
  124. var cryptType uint16 = 0
  125. //握手阶段
  126. if msgId == uint16(rpc.SC_HAND_SHAKE_NTFMsgId) {
  127. cryptType = 1
  128. msgData, err = rpc.RSAEncrypt(msgData, rpc.PublicClientKey)
  129. if err != nil {
  130. return err
  131. }
  132. msgLen = len(msgData)
  133. } else {
  134. if len(*aesKey) > 0 && msgId != rpc.SC_PING_ACKMsgId {
  135. cryptType = 2
  136. msgData, err = rpc.AESCtrEncrypt(msgData, *aesKey, *aesKey...)
  137. //msgData, err = AESCtrEncrypt(msgData, *aesKey)
  138. if err != nil {
  139. return err
  140. }
  141. msgLen = len(msgData)
  142. }
  143. }
  144. if msgLen > opt.MaxMsgLen() {
  145. err = errors.New(fmt.Sprintf("message too big msgId=%v msglen=%v maxlen=%v", msgId, msgLen, opt.MaxMsgLen()))
  146. util.FatalF("SendMessage err=%v", err)
  147. err = nil
  148. return
  149. }
  150. //data := make([]byte, lenMaxLen + msgIdLen + msgLen)
  151. data := make([]byte, lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen+msgLen) //head + body
  152. //lenMaxLen
  153. binary.BigEndian.PutUint16(data, uint16(msgLen))
  154. //msgIdLen
  155. binary.BigEndian.PutUint16(data[lenMaxLen:], uint16(msgId))
  156. //seq 返回客户端发送的序列号
  157. binary.BigEndian.PutUint32(data[lenMaxLen+msgIdLen:], seqId)
  158. //log.Println("sendSeqId:", seqId)
  159. //使用的加密方式AES
  160. binary.BigEndian.PutUint16(data[lenMaxLen+msgIdLen+msgSeqlen:], cryptType)
  161. //body
  162. if msgLen > 0 {
  163. copy(data[lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen:], msgData)
  164. }
  165. conn.WriteMessage(websocket.BinaryMessage, data)
  166. return
  167. }