procrpc.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package socket
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "rocommon"
  9. "rocommon/rpc"
  10. "rocommon/util"
  11. "github.com/gorilla/websocket"
  12. )
  13. type (
  14. NetProcessorRPC struct {
  15. //解析消息数据,发送消息数据处理
  16. MsgRPC rocommon.MessageProcessor
  17. //def.go 消息解析操作放到队列直接的过滤操作(已经序列化为protobuf消息结构,如果在转换之前就做处理的,可以在MsgRPC中直接处理
  18. Hooker rocommon.EventHook
  19. //def.go 注册的具体函数回掉(具体的逻辑实现方法,例如:pbbind_gen.go中的gateHandler),没有回调函数时设置为nil
  20. Callback rocommon.EventCallBack
  21. }
  22. )
  23. type ProcessorRPCBinder func(b rocommon.ProcessorRPCBundle, usercb rocommon.EventCallBack, args ...interface{})
  24. var (
  25. //当前执行的进程名称,和回调相关的函数操作
  26. procRPCByName = map[string]ProcessorRPCBinder{}
  27. )
  28. func RegisterProcessRPC(procName string, f ProcessorRPCBinder) {
  29. if _, ok := procRPCByName[procName]; ok {
  30. panic("procRPC has register:" + procName)
  31. }
  32. procRPCByName[procName] = f
  33. }
  34. func SetProcessorRPC(node rocommon.ServerNode, procName string, callback rocommon.EventCallBack, args ...interface{}) {
  35. if proc, ok := procRPCByName[procName]; ok {
  36. b := node.(rocommon.ProcessorRPCBundle)
  37. proc(b, callback, args)
  38. } else {
  39. panic("procRPC not register:" + procName)
  40. }
  41. }
  42. //加入回调队列或者直接执行回调操作
  43. func QueueEventCall(cb rocommon.EventCallBack) rocommon.EventCallBack {
  44. return func(e rocommon.ProcEvent) {
  45. if cb != nil {
  46. SessionQueueCall(e.Session(), func() {
  47. //now1 := time.Now()
  48. cb(e)
  49. //deltaT := time.Now().Sub(now1)
  50. //if deltaT > 1*time.Millisecond {
  51. // if e.Msg() != nil && reflect.TypeOf(e.Msg()) != nil {
  52. // tmpMsg := reflect.TypeOf(e.Msg()).Elem().String()
  53. // util.DebugF("t=%v profile=%v", deltaT, tmpMsg)
  54. // }
  55. //}
  56. })
  57. }
  58. }
  59. }
  60. //在会话上执行事件回调,有队列则加入队列,没有就直接执行回调
  61. func SessionQueueCall(s rocommon.Session, cb func()) {
  62. if s == nil {
  63. return
  64. }
  65. que := s.Node().(interface{ Queue() rocommon.NetEventQueue }).Queue()
  66. if que != nil {
  67. que.PostCb(cb) //加入事件队列中
  68. } else {
  69. //todo...
  70. cb() //不存在直接执行回调函数(注意多线程冲突问题)
  71. }
  72. }
  73. //注册和回掉函数相关操作
  74. func init() {
  75. RegisterProcessRPC("tcp.pb",
  76. func(b rocommon.ProcessorRPCBundle, usercb rocommon.EventCallBack, arg ...interface{}) {
  77. b.SetTransmitter(new(TCPMessageProcessor))
  78. b.SetHooker(new(TCPEventHook))
  79. b.SetCallback(QueueEventCall(usercb))
  80. })
  81. }
  82. /////////////////////////////////////////////
  83. //NetProcessorRPC
  84. func (this *NetProcessorRPC) GetRPC() *NetProcessorRPC {
  85. return this
  86. }
  87. //收到消息后调用该函数入队列操作
  88. func (this *NetProcessorRPC) ProcEvent(e rocommon.ProcEvent) {
  89. //todo... hooker callback
  90. if this.Hooker != nil {
  91. e = this.Hooker.InEvent(e) //对不同消息类型进行解析,并进行处理
  92. }
  93. if this.Callback != nil && e != nil {
  94. this.Callback(e)
  95. }
  96. }
  97. func (this *NetProcessorRPC) ReadMsg(session rocommon.Session) (interface{}, uint32, error) {
  98. if this.MsgRPC != nil {
  99. return this.MsgRPC.OnRecvMsg(session)
  100. }
  101. return nil, 0, errors.New("msgrpc not set!!!")
  102. }
  103. func (this *NetProcessorRPC) SendMsg(ev rocommon.ProcEvent) error {
  104. //执行hook
  105. if this.Hooker != nil {
  106. ev = this.Hooker.OutEvent(ev)
  107. }
  108. if this.MsgRPC != nil {
  109. return this.MsgRPC.OnSendMsg(ev.Session(), ev.Msg())
  110. }
  111. return nil
  112. }
  113. func (self *NetProcessorRPC) SetTransmitter(mp rocommon.MessageProcessor) {
  114. self.MsgRPC = mp
  115. }
  116. func (self *NetProcessorRPC) SetHooker(ev rocommon.EventHook) {
  117. self.Hooker = ev
  118. }
  119. func (self *NetProcessorRPC) SetCallback(ecb rocommon.EventCallBack) {
  120. self.Callback = ecb
  121. }
  122. /////////////////////////////////////////////
  123. //EventHook interface def.go
  124. type TCPEventHook struct {
  125. }
  126. func (this *TCPEventHook) InEvent(e rocommon.ProcEvent) rocommon.ProcEvent {
  127. //todo... important
  128. //根据收到的消息类型进行过滤处理,例如如果是RecvMsgEvent事件,那么说明进过了protobuf解析,直接返回
  129. //例如远程过程调用的方式
  130. inEvent, handled, err := RPCResolveInEvent(e)
  131. if err != nil {
  132. util.InfoF("rpc ResolveInEvent err:%v", err)
  133. return nil
  134. }
  135. if !handled {
  136. //todo... delay resolve event
  137. }
  138. return inEvent
  139. }
  140. //获得发送事件
  141. func (this *TCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  142. //todo...
  143. handled, err := RPCResloveOutEvent(out)
  144. if err != nil {
  145. util.InfoF("rpc RPCResolveOutEvent err:%v", err)
  146. return nil
  147. }
  148. if !handled {
  149. //todo... delay reslove event
  150. }
  151. return out
  152. }
  153. //multiHook 例如game server有多个处理操作
  154. type MultiTCPEventHook []rocommon.EventHook
  155. func (this MultiTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  156. for _, ev := range this {
  157. in = ev.InEvent(in)
  158. if in == nil {
  159. break
  160. }
  161. }
  162. return in
  163. }
  164. //获得发送事件
  165. func (this MultiTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  166. for _, ev := range this {
  167. out = ev.OutEvent(out)
  168. if out == nil {
  169. break
  170. }
  171. }
  172. return out
  173. }
  174. func NewMultiTCPEventHook(args ...rocommon.EventHook) rocommon.EventHook {
  175. return MultiTCPEventHook(args)
  176. }
  177. //根据收到的消息类型进行过滤处理,例如如果是RecvMsgEvent事件,那么说明经过了protobuf解析,直接返回
  178. //例如远程过程调用的方式 / RPC消息解析
  179. func RPCResolveInEvent(inEvent rocommon.ProcEvent) (rocommon.ProcEvent, bool, error) {
  180. //是接收处理消息
  181. if _, ok := inEvent.(*rocommon.RecvMsgEvent); ok {
  182. return inEvent, false, nil
  183. }
  184. //todo...其他消息类型处理 important
  185. return inEvent, false, nil
  186. }
  187. func RPCResloveOutEvent(outEvent rocommon.ProcEvent) (bool, error) {
  188. //todo... RemoteCallMsg
  189. return true, nil
  190. }
  191. /////////////////////////////////////////////
  192. //MessageProcessor interface def.go
  193. type TCPMessageProcessor struct {
  194. }
  195. //recv
  196. func (this *TCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, msgSeqId uint32, err error) {
  197. //todo...
  198. reader, ok := s.Raw().(io.Reader)
  199. if !ok || reader == nil {
  200. util.InfoF("[TCPMessageProcessor] OnRecvMsg err")
  201. return nil, 0, nil
  202. }
  203. opt := s.Node().(SocketOption)
  204. opt.SocketReadTimeout(reader.(net.Conn), func() {
  205. msg, msgSeqId, err = rpc.ReadMessage(reader, opt.MaxMsgLen(), s.GetAES())
  206. })
  207. return
  208. }
  209. //send
  210. var tmpClient = []byte("client")
  211. func (this *TCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  212. util.InfoF("[TCPMessageProcessor] OnSendMsg session=%v msg=%v", s, msg)
  213. //todo...
  214. writer, ok := s.Raw().(io.Writer)
  215. if !ok || writer == nil {
  216. util.InfoF("[TCPMessageProcessor] OnSendMsg err")
  217. return nil
  218. }
  219. opt := s.Node().(SocketOption)
  220. opt.SocketWriteTimeout(writer.(net.Conn), func() {
  221. nodeName := s.Node().(rocommon.ServerNodeProperty).GetName()
  222. if nodeName == "client" {
  223. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), nodeName)
  224. } else {
  225. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), nodeName)
  226. }
  227. })
  228. return
  229. }
  230. /////////////////////////////////////////////
  231. //MessageProcessor interface def.go
  232. type WSMessageProcessor struct {
  233. }
  234. const (
  235. lenMaxLen = 2 //包体大小2个字节 uint16
  236. msgIdLen = 2 //包ID大小2个字节 uint16
  237. msgSeqlen = 4 //发送序列号2个字节大小,用来断线重连
  238. msgFlaglen = 2 //暂定标记,加解密 1表示RSA,2表示AES
  239. )
  240. //recv
  241. func (this *WSMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, msgSeqId uint32, err error) {
  242. conn, ok := s.Raw().(*websocket.Conn)
  243. if !ok || conn == nil {
  244. util.InfoF("[WSMessageProcessor] OnRecvMsg err")
  245. return nil, 0, nil
  246. }
  247. //reader, ok := s.Raw().(io.Reader)
  248. //if !ok || reader == nil {
  249. // util.InfoF("[TCPMessageProcessor] OnRecvMsg err")
  250. // return nil, 0, nil
  251. //}
  252. messageType, raw, err := conn.ReadMessage()
  253. if err != nil {
  254. util.InfoF("[WSMessageProcessor] OnRecvMsg err=%v", err)
  255. return nil, 0, nil
  256. }
  257. if messageType != websocket.BinaryMessage {
  258. util.InfoF("[WSMessageProcessor] OnRecvMsg err messageType=%v", messageType)
  259. return nil, 0, nil
  260. }
  261. var msgId uint16
  262. //var seqId uint32 //包序列号,客户端发送时的序列从1开始
  263. var flagId uint16 //加密方式
  264. var msgData []byte
  265. binary.BigEndian.Uint16(raw) //msgDataLen
  266. msgId = binary.BigEndian.Uint16(raw[lenMaxLen:])
  267. msgSeqId = binary.BigEndian.Uint32(raw[lenMaxLen+msgIdLen:])
  268. flagId = binary.BigEndian.Uint16(raw[lenMaxLen+msgIdLen+msgSeqlen:])
  269. msgData = raw[msgIdLen+msgSeqlen+msgFlaglen+lenMaxLen:]
  270. aesKey := s.GetAES()
  271. switch flagId {
  272. case 1:
  273. if int(msgId) == rpc.SC_HAND_SHAKE_NTFMsgId { //SC_HAND_SHAKE_NTF
  274. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateClientKey)
  275. if err != nil {
  276. return nil, 0, err
  277. }
  278. } else if int(msgId) == rpc.CS_HAND_SHAKE_REQMsgId { //CS_HAND_SHAKE_REQ
  279. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateServerKey)
  280. if err != nil {
  281. return nil, 0, err
  282. }
  283. } else if int(msgId) == rpc.SC_HAND_SHAKE_ACKMsgId { //SC_HAND_SHAKE_ACK
  284. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateClientKey)
  285. if err != nil {
  286. return nil, 0, err
  287. }
  288. } else {
  289. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
  290. if err != nil {
  291. return nil, 0, err
  292. }
  293. }
  294. case 2:
  295. msgData, err = rpc.AESCtrDecrypt(msgData, *aesKey, *aesKey...)
  296. //msgData, err = AESCtrDecrypt(msgData, *aesKey)
  297. if err != nil {
  298. return nil, 0, err
  299. }
  300. }
  301. //服务器内部不做加密处理
  302. msg, _, err = rpc.DecodeMessage(int(msgId), msgData)
  303. if err != nil {
  304. //log.Println("[DecodeMessage] err:", err)
  305. return nil, 0, errors.New(fmt.Sprintf("msg decodeMessage failed:%v %v", msgId, err))
  306. }
  307. return
  308. }
  309. func (this *WSMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  310. opt := s.Node().(SocketOption)
  311. conn, ok := s.Raw().(*websocket.Conn)
  312. if !ok || conn == nil {
  313. util.InfoF("[WSMessageProcessor] OnRecvMsg err")
  314. return nil
  315. }
  316. nodeName := s.Node().(rocommon.ServerNodeProperty).GetName()
  317. if nodeName != "wsclient" {
  318. return
  319. }
  320. aesKey := s.GetAES()
  321. var (
  322. msgData []byte
  323. msgId uint16
  324. seqId uint32
  325. msgInfo *rocommon.MessageInfo
  326. )
  327. switch m := msg.(type) {
  328. case *rocommon.TransmitPacket:
  329. msgData = m.MsgData
  330. msgId = uint16(m.MsgId)
  331. seqId = m.SeqId
  332. default:
  333. msgData, msgInfo, err = rpc.EncodeMessage(msg)
  334. if err != nil {
  335. return err
  336. }
  337. msgId = uint16(msgInfo.ID)
  338. }
  339. //todo
  340. // 注意上层发包不要超过最大值
  341. msgLen := len(msgData)
  342. var cryptType uint16 = 0
  343. //握手阶段
  344. if msgId == uint16(rpc.SC_HAND_SHAKE_NTFMsgId) {
  345. cryptType = 1
  346. msgData, err = rpc.RSAEncrypt(msgData, rpc.PublicClientKey)
  347. if err != nil {
  348. return err
  349. }
  350. msgLen = len(msgData)
  351. } else {
  352. if len(*aesKey) > 0 && msgId != rpc.SC_PING_ACKMsgId {
  353. cryptType = 2
  354. msgData, err = rpc.AESCtrEncrypt(msgData, *aesKey, *aesKey...)
  355. //msgData, err = AESCtrEncrypt(msgData, *aesKey)
  356. if err != nil {
  357. return err
  358. }
  359. msgLen = len(msgData)
  360. }
  361. }
  362. if msgLen > opt.MaxMsgLen() {
  363. err = errors.New(fmt.Sprintf("message too big msgId=%v msglen=%v maxlen=%v", msgId, msgLen, opt.MaxMsgLen()))
  364. util.FatalF("SendMessage err=%v", err)
  365. err = nil
  366. return
  367. }
  368. //data := make([]byte, lenMaxLen + msgIdLen + msgLen)
  369. data := make([]byte, lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen+msgLen) //head + body
  370. //lenMaxLen
  371. binary.BigEndian.PutUint16(data, uint16(msgLen))
  372. //msgIdLen
  373. binary.BigEndian.PutUint16(data[lenMaxLen:], uint16(msgId))
  374. //seq 返回客户端发送的序列号
  375. binary.BigEndian.PutUint32(data[lenMaxLen+msgIdLen:], seqId)
  376. //log.Println("sendSeqId:", seqId)
  377. //使用的加密方式AES
  378. binary.BigEndian.PutUint16(data[lenMaxLen+msgIdLen+msgSeqlen:], cryptType)
  379. //body
  380. if msgLen > 0 {
  381. copy(data[lenMaxLen+msgIdLen+msgSeqlen+msgFlaglen:], msgData)
  382. }
  383. conn.WriteMessage(websocket.BinaryMessage, data)
  384. return
  385. }