proc_rpc.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package model
  2. import (
  3. "github.com/gorilla/websocket"
  4. "io"
  5. "net"
  6. "rocommon"
  7. "rocommon/rpc"
  8. "rocommon/socket"
  9. _ "rocommon/socket/tcp"
  10. "rocommon/util"
  11. "roserver/baseserver/model"
  12. "roserver/baseserver/router"
  13. "roserver/serverproto"
  14. )
  15. // 消息解析操作
  16. // MessageProcessor def.go
  17. // TCPMessageProcessor procrpc.go 基础实现方法
  18. type DirectTCPMessageProcessor struct {
  19. }
  20. // recv直接原始数据传递到后端 / 返回消息给client
  21. func (this *DirectTCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) {
  22. reader, ok := s.Raw().(io.Reader)
  23. if !ok || reader == nil {
  24. util.InfoF("[DirectTCPMessageProcessor] OnRecvMsg err")
  25. return nil, 0, nil
  26. }
  27. //opt := s.Node().(socket.SocketOption)
  28. var opt socket.SocketOption
  29. if s.GetSessionOptFlag() {
  30. opt = s.Node().(socket.SocketOption)
  31. } else {
  32. opt = s.GetSessionOpt().(socket.SocketOption)
  33. }
  34. opt.SocketReadTimeout(reader.(net.Conn), func() {
  35. var msgId uint16
  36. var seqId uint32 //包序列号,客户端发送时的序列从1开始
  37. var msgData []byte
  38. var flagId uint16 //加密方式
  39. //获取原始格式数据包(pb序列化的二进制数据)
  40. msgId, seqId, flagId, msgData, err = rpc.RecvPackageData(reader, opt.MaxMsgLen())
  41. //尝试直接发送到其他后端服务器或者解析
  42. if err == nil {
  43. msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s)
  44. util.DebugF("OnRecvMsg-FrontendPackageProc msg=%v error=%v", msg, err)
  45. if msgId != 1000 {
  46. util.InfoF("OnRecvMsg-RecvPackageData msgId=%v", msgId)
  47. //log.Printf("OnRecvMsg-RecvPackageData msgData=%v", string(msgData))
  48. }
  49. }
  50. })
  51. return
  52. }
  53. // send 直接发往客户端的消息
  54. func (this *DirectTCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  55. writer, ok := s.Raw().(io.Writer)
  56. if !ok || writer == nil {
  57. util.InfoF("[DirectTCPMessageProcessor] OnSendMsg err")
  58. return nil
  59. }
  60. //opt := s.Node().(socket.SocketOption)
  61. var opt socket.SocketOption
  62. if s.GetSessionOptFlag() {
  63. opt = s.Node().(socket.SocketOption)
  64. } else {
  65. opt = s.GetSessionOpt().(socket.SocketOption)
  66. }
  67. opt.SocketWriteTimeout(writer.(net.Conn), func() {
  68. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), "")
  69. })
  70. return
  71. }
  72. var (
  73. PingReqMsgID = rocommon.MessageInfoByName("CSPingReq").ID
  74. LoginReqMsgId = rocommon.MessageInfoByName("CSLoginReq").ID
  75. ReconnectMsgId = rocommon.MessageInfoByName("CSReconnectReq").ID
  76. //CSPlayerMoveReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveReq").ID
  77. //CSPlayerMoveToReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveToReq").ID
  78. CSPlayerGMReqMsgId = rocommon.MessageInfoByName("CSGMCommandReq").ID
  79. )
  80. // 尝试直接发送到其他后端服务器或者解析后直接回复client
  81. func FrontendPackageProc(msgId int, seqId uint32, flagId uint16, msgData []byte, s rocommon.Session) (msg interface{}, err error) {
  82. switch flagId {
  83. case 1: //rsa加密的数据
  84. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
  85. if err != nil {
  86. return nil, err
  87. }
  88. case 2: //aes加密的数据
  89. //msgData, err = rpcc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  90. msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  91. if err != nil {
  92. return nil, err
  93. }
  94. }
  95. //gate接收到客户端消息的处理部分
  96. switch int(msgId) {
  97. case PingReqMsgID, LoginReqMsgId, ReconnectMsgId:
  98. msg, _, err = rpc.DecodeMessage(msgId, msgData)
  99. if err != nil {
  100. util.InfoF("FrontendPackageProc err=%v msgId=%v flagId=%v", err, msgId, flagId)
  101. return nil, err
  102. }
  103. //switch msgType := msg.(type) {
  104. switch in := msg.(type) {
  105. case *serverproto.CSPingReq:
  106. //登陆成功后开始发送pingreq消息
  107. //log.Println("[FrontendPackageProc] PingReq->PingAck:", msgType)
  108. s.Send(&serverproto.SCPingAck{})
  109. case *serverproto.CSLoginReq:
  110. //in.OpenId = in.OpenId + "@" + in.Platform
  111. // 加密处理
  112. //s.SetAES(in.CryptPass)
  113. //发送给auth进行验证,操作并获取是否有断线数据(断线重连使用)
  114. //第一个到gate的消息,绑定gameserver
  115. // 通过auth服务器来验证用户的合法性,并返回之前是否已经绑定过一个gameserver,可能
  116. // 有离线数据(如果重新绑定新的gameserver会有数据一致性问题),这边暂时处理成直接绑定第一个
  117. switch rawType := s.Raw().(type) {
  118. case net.Conn:
  119. in.Ip = rawType.RemoteAddr().String()
  120. case *websocket.Conn:
  121. in.Ip = rawType.RemoteAddr().String()
  122. }
  123. util.WarnF("CSLoginReq msg=%v sessionid=%v ip=%v", msg, s.ID(), in.Ip)
  124. //authNodeStr := model.SelectServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR)
  125. authNodeStr := model.SelectAuthServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  126. tmpNode := model.GetServiceNode(authNodeStr)
  127. if tmpNode == nil {
  128. model.RemoveServiceNodeByName(authNodeStr)
  129. authNodeStr, _ = model.GetServiceNodeAndSession("", model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  130. }
  131. cliUser, err := model.BindClientToBackendNew(authNodeStr, s, in.OpenId, in.Platform)
  132. if err == nil {
  133. newMsgData, _, _ := rpc.EncodeMessage(in)
  134. cliUser.LoginData = newMsgData
  135. //绑定成功
  136. //转发给对应的服务器做处理,例如game,auth
  137. err = cliUser.ClientDirect2Backend(authNodeStr, int(msgId), seqId, newMsgData, model.SERVICE_NODE_TYPE_AUTH_STR)
  138. if err != nil {
  139. util.WarnF("CSLoginReq ClientDirect2Backend err=%v auth=%v msg=%v", err, authNodeStr, msg)
  140. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  141. }
  142. } else {
  143. util.WarnF("CSLoginReq BindClientToBackendNew err=%v auth=%v msg=%v", err, authNodeStr, msg)
  144. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  145. }
  146. case *serverproto.CSReconnectReq:
  147. ////加密处理
  148. //s.SetAES(in.CryptPass)
  149. ////发送消息给game,重新进行session绑定
  150. //cliUser := model.ClientMag.GetReconnectFromOpenId(in.OpenId)
  151. //err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  152. // msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  153. //if err != nil {
  154. // //重连超时,不允许继续重连
  155. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  156. // baseutil.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  157. //}
  158. //
  159. //ret := cliUser.ReconnectReset(s)
  160. //加密处理
  161. //s.SetAES(in.CryptPass)
  162. // 1.后续sdk需要拿token去服务器进行验证,避免盗号
  163. util.WarnF("CSReconnectReq msg=%v sessionId=%v", msg, s.ID())
  164. cliUser := model.ClientMag.GetConnectedFromOpenId(in.OpenId, in.Platform)
  165. if cliUser == nil {
  166. //该玩家没有登录过无法进行断线重连
  167. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  168. util.InfoF("CSReconnectReq cliUser=%v msg=%v user not connected before", cliUser, msg)
  169. } else {
  170. //if cliUser.IsReconnectExpire() {
  171. // //重连超时,不允许继续重连 / 没有断线不允许发送重连消息
  172. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  173. // util.InfoF("CSReconnectReq cliUser=%v msg=%v user connect time out", cliUser, msg)
  174. // return
  175. //}
  176. ret := cliUser.ConnectReset(s)
  177. if ret != nil {
  178. //session绑定出错
  179. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  180. util.InfoF("CSReconnectReqErr msg=%v session bind use err=%v", msg, ret)
  181. s.Close()
  182. return
  183. }
  184. //发送消息给game,重新进行session绑定
  185. err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  186. msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  187. if err != nil {
  188. //重连超时,不允许继续重连
  189. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  190. util.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  191. } else {
  192. s.SetSessionOptFlag(true)
  193. util.WarnF("CSReconnectReq success msg=%v sessionid=%v", msg, s.ID())
  194. }
  195. }
  196. }
  197. default:
  198. //gm白名单处理
  199. //if msgId == CSPlayerGMReqMsgId && len(service.GetServiceConfig().Node.WhiteListGM) >= 0 {
  200. // bFind := false
  201. // gmIPStr := ""
  202. // switch t := s.Raw().(type) {
  203. // case net.Conn:
  204. // gmIPStr = t.RemoteAddr().String()
  205. // case *websocket.Conn:
  206. // gmIPStr = t.RemoteAddr().String()
  207. // }
  208. // //gmIPStr := s.Raw().(net.Conn).RemoteAddr().String()
  209. // for idx := 0; idx < len(service.GetServiceConfig().Node.WhiteListGM); idx++ {
  210. // if strings.Contains(gmIPStr, service.GetServiceConfig().Node.WhiteListGM[idx]) {
  211. // bFind = true
  212. // break
  213. // }
  214. // if strings.Contains("0.0.0.0", service.GetServiceConfig().Node.WhiteListGM[idx]) {
  215. // bFind = true
  216. // break
  217. // }
  218. // }
  219. // if !bFind {
  220. // s.Close()
  221. // return
  222. // }
  223. //}
  224. //todo...目前暂时只针对gate做了透传,后期需要对所有服务器节点都可以执行透传
  225. routeRule := router.GetRuleByMsgID(msgId)
  226. if routeRule == nil {
  227. util.ErrorF("message not in route table, msgid=%v", msgId)
  228. //return nil, fmt.Errorf("message not in route table, msgid=%v", msgId)
  229. return nil, nil
  230. }
  231. //查看当前session对应的用户是否存在,否则不处理
  232. cliUser := model.Session2User(s)
  233. //cliUser.CheckAoi(msgId, CSPlayerMoveReqMsgId, CSPlayerMoveToReqMsgId, msgData)
  234. if cliUser != nil {
  235. //这边需要加锁(在调用GetServiceBackend时使用了读写锁)
  236. serviceId := cliUser.GetServiceBackend(routeRule.Mod)
  237. if serviceId == "" {
  238. //还没有绑定game/battlerecord节点就发送了别的消息
  239. //非正常流程导致,可能节点暂时未绑定
  240. if routeRule.Mod != model.SERVICE_NODE_TYPE_BATTLERECORD_STR {
  241. s.Close()
  242. }
  243. } else {
  244. //打印增加多线程冲突
  245. //baseutil.InfoF("receive other msg send to game server serviceId=%v msgid=%v", serviceId, msgId)
  246. //包学序列号合法校验,避免发送重复的包
  247. //if cliUser.SeqId >= seqId {
  248. // ntfMsg := &serverproto.SCKickOutNtf{
  249. // Error: int32(serverproto.ErrorCode_ERROR_FAIL),
  250. // }
  251. //
  252. // s.Send(ntfMsg)
  253. // s.(rocommon.ContextSet).SetContextData("user", nil, "SSUserKickNtf")
  254. // return
  255. //}
  256. //cliUser.SeqId = seqId
  257. //转发给后台服务器
  258. err = cliUser.ClientDirect2Backend(serviceId, msgId, seqId, msgData, routeRule.Mod)
  259. if err != nil {
  260. util.WarnF("ClientDirect2Backend sessionid=%v msgid=%v err=%v", s.ID(), msgId, err)
  261. //非正常流程导致,可能节点暂时未绑定
  262. model.ClientMag.RemoveConnectedFromOpenId(cliUser.OpenId, cliUser.Platform)
  263. s.Close()
  264. }
  265. }
  266. } else {
  267. //todo...是否需要断开连接
  268. util.InfoF("cliUser not exist s=%v msgId=%v", s, msgId)
  269. }
  270. }
  271. return
  272. }