procrpc.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434
  1. package socket
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/gorilla/websocket"
  7. "io"
  8. "net"
  9. "rocommon"
  10. "rocommon/rpc"
  11. "rocommon/util"
  12. )
  13. type (
  14. NetProcessorRPC struct {
  15. //解析消息数据,发送消息数据处理
  16. MsgRPC rocommon.MessageProcessor
  17. //def.go 消息解析操作放到队列直接的过滤操作(已经序列化为protobuf消息结构,如果在转换之前就做处理的,可以在MsgRPC中直接处理
  18. Hooker rocommon.EventHook
  19. //def.go 注册的具体函数回掉(具体的逻辑实现方法,例如:pbbind_gen.go中的gateHandler),没有回调函数时设置为nil
  20. Callback rocommon.EventCallBack
  21. }
  22. )
  23. type ProcessorRPCBinder func(b rocommon.ProcessorRPCBundle, usercb rocommon.EventCallBack, args ...interface{})
  24. var (
  25. //当前执行的进程名称,和回调相关的函数操作
  26. procRPCByName = map[string]ProcessorRPCBinder{}
  27. )
  28. func RegisterProcessRPC(procName string, f ProcessorRPCBinder) {
  29. if _, ok := procRPCByName[procName]; ok {
  30. panic("procRPC has register:" + procName)
  31. }
  32. procRPCByName[procName] = f
  33. }
  34. func SetProcessorRPC(node rocommon.ServerNode, procName string, callback rocommon.EventCallBack, args ...interface{}) {
  35. if proc, ok := procRPCByName[procName]; ok {
  36. b := node.(rocommon.ProcessorRPCBundle)
  37. proc(b, callback, args)
  38. } else {
  39. panic("procRPC not register:" + procName)
  40. }
  41. }
  42. // 加入回调队列或者直接执行回调操作
  43. func QueueEventCall(cb rocommon.EventCallBack) rocommon.EventCallBack {
  44. return func(e rocommon.ProcEvent) {
  45. if cb != nil {
  46. SessionQueueCall(e.Session(), func() {
  47. //now1 := time.Now()
  48. cb(e)
  49. //deltaT := time.Now().Sub(now1)
  50. //if deltaT > 1*time.Millisecond {
  51. // if e.Msg() != nil && reflect.TypeOf(e.Msg()) != nil {
  52. // tmpMsg := reflect.TypeOf(e.Msg()).Elem().String()
  53. // util.DebugF("t=%v profile=%v", deltaT, tmpMsg)
  54. // }
  55. //}
  56. })
  57. }
  58. }
  59. }
  60. // 在会话上执行事件回调,有队列则加入队列,没有就直接执行回调
  61. func SessionQueueCall(s rocommon.Session, cb func()) {
  62. if s == nil {
  63. return
  64. }
  65. que := s.Node().(interface{ Queue() rocommon.NetEventQueue }).Queue()
  66. if que != nil {
  67. que.PostCb(cb) //加入事件队列中
  68. } else {
  69. //todo...
  70. cb() //不存在直接执行回调函数(注意多线程冲突问题)
  71. }
  72. }
  73. // 注册和回掉函数相关操作
  74. func init() {
  75. RegisterProcessRPC("tcp.pb",
  76. func(b rocommon.ProcessorRPCBundle, usercb rocommon.EventCallBack, arg ...interface{}) {
  77. b.SetTransmitter(new(TCPMessageProcessor))
  78. b.SetHooker(new(TCPEventHook))
  79. b.SetCallback(QueueEventCall(usercb))
  80. })
  81. }
  82. // ///////////////////////////////////////////
  83. // NetProcessorRPC
  84. func (this *NetProcessorRPC) GetRPC() *NetProcessorRPC {
  85. return this
  86. }
  87. // 收到消息后调用该函数入队列操作
  88. func (this *NetProcessorRPC) ProcEvent(e rocommon.ProcEvent) {
  89. //todo... hooker callback
  90. if this.Hooker != nil {
  91. e = this.Hooker.InEvent(e) //对不同消息类型进行解析,并进行处理
  92. }
  93. if this.Callback != nil && e != nil {
  94. this.Callback(e)
  95. }
  96. }
  97. func (this *NetProcessorRPC) ReadMsg(session rocommon.Session) (interface{}, uint32, error) {
  98. if this.MsgRPC != nil {
  99. return this.MsgRPC.OnRecvMsg(session)
  100. }
  101. return nil, 0, errors.New("msgrpc not set!!!")
  102. }
  103. func (this *NetProcessorRPC) SendMsg(ev rocommon.ProcEvent) error {
  104. //执行hook
  105. if this.Hooker != nil {
  106. ev = this.Hooker.OutEvent(ev)
  107. }
  108. if this.MsgRPC != nil {
  109. return this.MsgRPC.OnSendMsg(ev.Session(), ev.Msg())
  110. }
  111. return nil
  112. }
  113. func (self *NetProcessorRPC) SetTransmitter(mp rocommon.MessageProcessor) {
  114. self.MsgRPC = mp
  115. }
  116. func (self *NetProcessorRPC) SetHooker(ev rocommon.EventHook) {
  117. self.Hooker = ev
  118. }
  119. func (self *NetProcessorRPC) SetCallback(ecb rocommon.EventCallBack) {
  120. self.Callback = ecb
  121. }
  122. // ///////////////////////////////////////////
  123. // EventHook interface def.go
  124. type TCPEventHook struct {
  125. }
  126. func (this *TCPEventHook) InEvent(e rocommon.ProcEvent) rocommon.ProcEvent {
  127. //todo... important
  128. //根据收到的消息类型进行过滤处理,例如如果是RecvMsgEvent事件,那么说明进过了protobuf解析,直接返回
  129. //例如远程过程调用的方式
  130. inEvent, handled, err := RPCResolveInEvent(e)
  131. if err != nil {
  132. util.InfoF("rpcc ResolveInEvent err:%v", err)
  133. return nil
  134. }
  135. if !handled {
  136. //todo... delay resolve event
  137. }
  138. return inEvent
  139. }
  140. // 获得发送事件
  141. func (this *TCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  142. //todo...
  143. handled, err := RPCResloveOutEvent(out)
  144. if err != nil {
  145. util.InfoF("rpcc RPCResolveOutEvent err:%v", err)
  146. return nil
  147. }
  148. if !handled {
  149. //todo... delay reslove event
  150. }
  151. return out
  152. }
  153. // multiHook 例如game server有多个处理操作
  154. type MultiTCPEventHook []rocommon.EventHook
  155. func (this MultiTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  156. for _, ev := range this {
  157. in = ev.InEvent(in)
  158. if in == nil {
  159. break
  160. }
  161. }
  162. return in
  163. }
  164. // 获得发送事件
  165. func (this MultiTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  166. for _, ev := range this {
  167. out = ev.OutEvent(out)
  168. if out == nil {
  169. break
  170. }
  171. }
  172. return out
  173. }
  174. func NewMultiTCPEventHook(args ...rocommon.EventHook) rocommon.EventHook {
  175. return MultiTCPEventHook(args)
  176. }
  177. // 根据收到的消息类型进行过滤处理,例如如果是RecvMsgEvent事件,那么说明经过了protobuf解析,直接返回
  178. // 例如远程过程调用的方式 / RPC消息解析
  179. func RPCResolveInEvent(inEvent rocommon.ProcEvent) (rocommon.ProcEvent, bool, error) {
  180. //是接收处理消息
  181. if _, ok := inEvent.(*rocommon.RecvMsgEvent); ok {
  182. return inEvent, false, nil
  183. }
  184. //todo...其他消息类型处理 important
  185. return inEvent, false, nil
  186. }
  187. func RPCResloveOutEvent(outEvent rocommon.ProcEvent) (bool, error) {
  188. //todo... RemoteCallMsg
  189. return true, nil
  190. }
  191. // ///////////////////////////////////////////
  192. // MessageProcessor interface def.go
  193. type TCPMessageProcessor struct {
  194. }
  195. // recv
  196. func (this *TCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, msgSeqId uint32, err error) {
  197. //todo...
  198. reader, ok := s.Raw().(io.Reader)
  199. if !ok || reader == nil {
  200. util.InfoF("[TCPMessageProcessor] OnRecvMsg err")
  201. return nil, 0, nil
  202. }
  203. opt := s.Node().(SocketOption)
  204. opt.SocketReadTimeout(reader.(net.Conn), func() {
  205. msg, msgSeqId, err = rpc.ReadMessage(reader, opt.MaxMsgLen(), s.GetAES())
  206. })
  207. return
  208. }
  209. // send
  210. var tmpClient = []byte("client")
  211. func (this *TCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  212. //todo...
  213. writer, ok := s.Raw().(io.Writer)
  214. if !ok || writer == nil {
  215. util.InfoF("[TCPMessageProcessor] OnSendMsg err")
  216. return nil
  217. }
  218. opt := s.Node().(SocketOption)
  219. opt.SocketWriteTimeout(writer.(net.Conn), func() {
  220. nodeName := s.Node().(rocommon.ServerNodeProperty).GetName()
  221. if nodeName == "client" {
  222. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), nodeName)
  223. } else {
  224. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), nodeName)
  225. }
  226. })
  227. return
  228. }
  229. // ///////////////////////////////////////////
  230. // MessageProcessor interface def.go
  231. type WSMessageProcessor struct {
  232. }
  233. const (
  234. lenMaxLen = 2 //包体大小2个字节 uint16
  235. msgIdLen = 2 //包ID大小2个字节 uint16
  236. msgSeqlen = 4 //发送序列号2个字节大小,用来断线重连
  237. msgFlaglen = 2 //暂定标记,加解密 1表示RSA,2表示AES
  238. )
  239. // recv
  240. func (this *WSMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, msgSeqId uint32, err error) {
  241. conn, ok := s.Raw().(*websocket.Conn)
  242. if !ok || conn == nil {
  243. util.InfoF("[WSMessageProcessor] OnRecvMsg err")
  244. return nil, 0, nil
  245. }
  246. //reader, ok := s.Raw().(io.Reader)
  247. //if !ok || reader == nil {
  248. // util.InfoF("[TCPMessageProcessor] OnRecvMsg err")
  249. // return nil, 0, nil
  250. //}
  251. messageType, raw, err := conn.ReadMessage()
  252. if err != nil {
  253. util.InfoF("[WSMessageProcessor] OnRecvMsg err=%v", err)
  254. return nil, 0, nil
  255. }
  256. if messageType != websocket.BinaryMessage {
  257. util.InfoF("[WSMessageProcessor] OnRecvMsg err messageType=%v", messageType)
  258. return nil, 0, nil
  259. }
  260. var msgId uint16
  261. //var seqId uint32 //包序列号,客户端发送时的序列从1开始
  262. var flagId uint16 //加密方式
  263. var msgData []byte
  264. binary.BigEndian.Uint16(raw) //msgDataLen
  265. msgId = binary.BigEndian.Uint16(raw[lenMaxLen:])
  266. msgSeqId = binary.BigEndian.Uint32(raw[lenMaxLen+msgIdLen:])
  267. flagId = binary.BigEndian.Uint16(raw[lenMaxLen+msgIdLen+msgSeqlen:])
  268. msgData = raw[msgIdLen+msgSeqlen+msgFlaglen+lenMaxLen:]
  269. aesKey := s.GetAES()
  270. switch flagId {
  271. case 1:
  272. if int(msgId) == rpc.SC_HAND_SHAKE_NTFMsgId { //SC_HAND_SHAKE_NTF
  273. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateClientKey)
  274. if err != nil {
  275. return nil, 0, err
  276. }
  277. } else if int(msgId) == rpc.CS_HAND_SHAKE_REQMsgId { //CS_HAND_SHAKE_REQ
  278. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateServerKey)
  279. if err != nil {
  280. return nil, 0, err
  281. }
  282. } else if int(msgId) == rpc.SC_HAND_SHAKE_ACKMsgId { //SC_HAND_SHAKE_ACK
  283. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateClientKey)
  284. if err != nil {
  285. return nil, 0, err
  286. }
  287. } else {
  288. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
  289. if err != nil {
  290. return nil, 0, err
  291. }
  292. }
  293. case 2:
  294. msgData, err = rpc.AESCtrDecrypt(msgData, *aesKey, *aesKey...)
  295. //msgData, err = AESCtrDecrypt(msgData, *aesKey)
  296. if err != nil {
  297. return nil, 0, err
  298. }
  299. }
  300. //服务器内部不做加密处理
  301. msg, _, err = rpc.DecodeMessage(int(msgId), msgData)
  302. if err != nil {
  303. //log.Println("[DecodeMessage] err:", err)
  304. return nil, 0, errors.New(fmt.Sprintf("msg decodeMessage failed:%v %v", msgId, err))
  305. }
  306. return
  307. }
  308. func (this *WSMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  309. opt := s.Node().(SocketOption)
  310. conn, ok := s.Raw().(*websocket.Conn)
  311. if !ok || conn == nil {
  312. util.InfoF("[WSMessageProcessor] OnRecvMsg err")
  313. return nil
  314. }
  315. nodeName := s.Node().(rocommon.ServerNodeProperty).GetName()
  316. if nodeName != "wsclient" {
  317. return
  318. }
  319. aesKey := s.GetAES()
  320. var (
  321. msgData []byte
  322. msgId uint16
  323. seqId uint32
  324. msgInfo *rocommon.MessageInfo
  325. )
  326. switch m := msg.(type) {
  327. case *rocommon.TransmitPacket:
  328. msgData = m.MsgData
  329. msgId = uint16(m.MsgId)
  330. seqId = m.SeqId
  331. default:
  332. msgData, msgInfo, err = rpc.EncodeMessage(msg)
  333. if err != nil {
  334. return err
  335. }
  336. msgId = uint16(msgInfo.ID)
  337. }
  338. //todo
  339. // 注意上层发包不要超过最大值
  340. msgLen := len(msgData)
  341. var cryptType uint16 = 0
  342. //握手阶段
  343. if msgId == uint16(rpc.SC_HAND_SHAKE_NTFMsgId) {
  344. cryptType = 1
  345. msgData, err = rpc.RSAEncrypt(msgData, rpc.PublicClientKey)
  346. if err != nil {
  347. return err
  348. }
  349. msgLen = len(msgData)
  350. } else {
  351. if len(*aesKey) > 0 && msgId != rpc.SC_PING_ACKMsgId {
  352. cryptType = 2
  353. msgData, err = rpc.AESCtrEncrypt(msgData, *aesKey, *aesKey...)
  354. //msgData, err = AESCtrEncrypt(msgData, *aesKey)
  355. if err != nil {
  356. return err
  357. }
  358. msgLen = len(msgData)
  359. }
  360. }
  361. if msgLen > opt.MaxMsgLen() {
  362. err = errors.New(fmt.Sprintf("message too big msgId=%v msglen=%v maxlen=%v", msgId, msgLen, opt.MaxMsgLen()))
  363. util.FatalF("SendMessage err=%v", err)
  364. err = nil
  365. return
  366. }
  367. //data := make([]byte, lenMaxLen + msgIdLen + msgLen)
  368. data := make([]byte, lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen+msgLen) //head + body
  369. //lenMaxLen
  370. binary.BigEndian.PutUint16(data, uint16(msgLen))
  371. //msgIdLen
  372. binary.BigEndian.PutUint16(data[lenMaxLen:], uint16(msgId))
  373. //seq 返回客户端发送的序列号
  374. binary.BigEndian.PutUint32(data[lenMaxLen+msgIdLen:], seqId)
  375. //log.Println("sendSeqId:", seqId)
  376. //使用的加密方式AES
  377. binary.BigEndian.PutUint16(data[lenMaxLen+msgIdLen+msgSeqlen:], cryptType)
  378. //body
  379. if msgLen > 0 {
  380. copy(data[lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen:], msgData)
  381. }
  382. conn.WriteMessage(websocket.BinaryMessage, data)
  383. return
  384. }