proc_rpc.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. package model
  2. import (
  3. "io"
  4. "net"
  5. "rocommon"
  6. "rocommon/rpc"
  7. "rocommon/service"
  8. "rocommon/socket"
  9. _ "rocommon/socket/tcp"
  10. "rocommon/util"
  11. "roserver/baseserver/model"
  12. "roserver/baseserver/router"
  13. "roserver/serverproto"
  14. "strings"
  15. "github.com/gorilla/websocket"
  16. )
  17. // 消息解析操作
  18. // MessageProcessor def.go
  19. // TCPMessageProcessor procrpc.go 基础实现方法
  20. type DirectTCPMessageProcessor struct {
  21. }
  22. // recv直接原始数据传递到后端 / 返回消息给client
  23. func (this *DirectTCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) {
  24. reader, ok := s.Raw().(io.Reader)
  25. if !ok || reader == nil {
  26. util.InfoF("[DirectTCPMessageProcessor] OnRecvMsg err")
  27. return nil, 0, nil
  28. }
  29. //opt := s.Node().(socket.SocketOption)
  30. var opt socket.SocketOption
  31. if s.GetSessionOptFlag() {
  32. opt = s.Node().(socket.SocketOption)
  33. } else {
  34. opt = s.GetSessionOpt().(socket.SocketOption)
  35. }
  36. opt.SocketReadTimeout(reader.(net.Conn), func() {
  37. var msgId uint16
  38. var seqId uint32 //包序列号,客户端发送时的序列从1开始
  39. var msgData []byte
  40. var flagId uint16 //加密方式
  41. //获取原始格式数据包(pb序列化的二进制数据)
  42. msgId, seqId, flagId, msgData, err = rpc.RecvPackageData(reader, opt.MaxMsgLen())
  43. //尝试直接发送到其他后端服务器或者解析
  44. if err == nil {
  45. msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s)
  46. util.DebugF("OnRecvMsg-FrontendPackageProc msg=%v error=%v", msg, err)
  47. if msgId != 1000 {
  48. util.InfoF("OnRecvMsg-RecvPackageData msgId=%v", msgId)
  49. //log.Printf("OnRecvMsg-RecvPackageData msgData=%v", string(msgData))
  50. }
  51. }
  52. })
  53. return
  54. }
  55. // send 直接发往客户端的消息
  56. func (this *DirectTCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  57. writer, ok := s.Raw().(io.Writer)
  58. if !ok || writer == nil {
  59. util.InfoF("[DirectTCPMessageProcessor] OnSendMsg err")
  60. return nil
  61. }
  62. //opt := s.Node().(socket.SocketOption)
  63. var opt socket.SocketOption
  64. if s.GetSessionOptFlag() {
  65. opt = s.Node().(socket.SocketOption)
  66. } else {
  67. opt = s.GetSessionOpt().(socket.SocketOption)
  68. }
  69. opt.SocketWriteTimeout(writer.(net.Conn), func() {
  70. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), "")
  71. })
  72. return
  73. }
  74. var (
  75. PingReqMsgID = rocommon.MessageInfoByName("CSPingReq").ID
  76. LoginReqMsgId = rocommon.MessageInfoByName("CSLoginReq").ID
  77. ReconnectMsgId = rocommon.MessageInfoByName("CSReconnectReq").ID
  78. //CSPlayerMoveReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveReq").ID
  79. //CSPlayerMoveToReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveToReq").ID
  80. CSPlayerGMReqMsgId = rocommon.MessageInfoByName("CSGMCommandReq").ID
  81. )
  82. // 尝试直接发送到其他后端服务器或者解析后直接回复client
  83. func FrontendPackageProc(msgId int, seqId uint32, flagId uint16, msgData []byte, s rocommon.Session) (msg interface{}, err error) {
  84. switch flagId {
  85. case 1: //rsa加密的数据
  86. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
  87. if err != nil {
  88. return nil, err
  89. }
  90. case 2: //aes加密的数据
  91. //msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  92. msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  93. if err != nil {
  94. return nil, err
  95. }
  96. }
  97. //gate接收到客户端消息的处理部分
  98. switch int(msgId) {
  99. case PingReqMsgID, LoginReqMsgId, ReconnectMsgId:
  100. msg, _, err = rpc.DecodeMessage(msgId, msgData)
  101. if err != nil {
  102. util.InfoF("FrontendPackageProc err=%v msgId=%v flagId=%v", err, msgId, flagId)
  103. return nil, err
  104. }
  105. //switch msgType := msg.(type) {
  106. switch in := msg.(type) {
  107. case *serverproto.CSPingReq:
  108. //登陆成功后开始发送pingreq消息
  109. //log.Println("[FrontendPackageProc] PingReq->PingAck:", msgType)
  110. s.Send(&serverproto.SCPingAck{})
  111. case *serverproto.CSLoginReq:
  112. //in.OpenId = in.OpenId + "@" + in.Platform
  113. // 加密处理
  114. //s.SetAES(in.CryptPass)
  115. //发送给auth进行验证,操作并获取是否有断线数据(断线重连使用)
  116. //第一个到gate的消息,绑定gameserver
  117. // 通过auth服务器来验证用户的合法性,并返回之前是否已经绑定过一个gameserver,可能
  118. // 有离线数据(如果重新绑定新的gameserver会有数据一致性问题),这边暂时处理成直接绑定第一个
  119. switch rawType := s.Raw().(type) {
  120. case net.Conn:
  121. in.Ip = rawType.RemoteAddr().String()
  122. case *websocket.Conn:
  123. in.Ip = rawType.RemoteAddr().String()
  124. }
  125. util.WarnF("CSLoginReq msg=%v sessionid=%v ip=%v", msg, s.ID(), in.Ip)
  126. //authNodeStr := model.SelectServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR)
  127. authNodeStr := model.SelectAuthServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  128. tmpNode := model.GetServiceNode(authNodeStr)
  129. if tmpNode == nil {
  130. model.RemoveServiceNodeByName(authNodeStr)
  131. authNodeStr, _ = model.GetServiceNodeAndSession("", model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  132. }
  133. cliUser, err := model.BindClientToBackendNew(authNodeStr, s, in.OpenId, in.Platform)
  134. if err == nil {
  135. newMsgData, _, _ := rpc.EncodeMessage(in)
  136. cliUser.LoginData = newMsgData
  137. //绑定成功
  138. //转发给对应的服务器做处理,例如game,auth
  139. err = cliUser.ClientDirect2Backend(authNodeStr, int(msgId), seqId, newMsgData, model.SERVICE_NODE_TYPE_AUTH_STR)
  140. if err != nil {
  141. util.WarnF("CSLoginReq ClientDirect2Backend err=%v auth=%v msg=%v", err, authNodeStr, msg)
  142. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  143. }
  144. } else {
  145. util.WarnF("CSLoginReq BindClientToBackendNew err=%v auth=%v msg=%v", err, authNodeStr, msg)
  146. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  147. }
  148. case *serverproto.CSReconnectReq:
  149. ////加密处理
  150. //s.SetAES(in.CryptPass)
  151. ////发送消息给game,重新进行session绑定
  152. //cliUser := model.ClientMag.GetReconnectFromOpenId(in.OpenId)
  153. //err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  154. // msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  155. //if err != nil {
  156. // //重连超时,不允许继续重连
  157. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  158. // baseutil.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  159. //}
  160. //
  161. //ret := cliUser.ReconnectReset(s)
  162. //加密处理
  163. //s.SetAES(in.CryptPass)
  164. // 1.后续sdk需要拿token去服务器进行验证,避免盗号
  165. util.WarnF("CSReconnectReq msg=%v sessionId=%v", msg, s.ID())
  166. cliUser := model.ClientMag.GetConnectedFromOpenId(in.OpenId, in.Platform)
  167. if cliUser == nil {
  168. //该玩家没有登录过无法进行断线重连
  169. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  170. util.InfoF("CSReconnectReq cliUser=%v msg=%v user not connected before", cliUser, msg)
  171. } else {
  172. //if cliUser.IsReconnectExpire() {
  173. // //重连超时,不允许继续重连 / 没有断线不允许发送重连消息
  174. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  175. // util.InfoF("CSReconnectReq cliUser=%v msg=%v user connect time out", cliUser, msg)
  176. // return
  177. //}
  178. ret := cliUser.ConnectReset(s)
  179. if ret != nil {
  180. //session绑定出错
  181. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  182. util.InfoF("CSReconnectReqErr msg=%v session bind use err=%v", msg, ret)
  183. s.Close()
  184. return
  185. }
  186. //发送消息给game,重新进行session绑定
  187. err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  188. msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  189. if err != nil {
  190. //重连超时,不允许继续重连
  191. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  192. util.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  193. } else {
  194. s.SetSessionOptFlag(true)
  195. util.WarnF("CSReconnectReq success msg=%v sessionid=%v", msg, s.ID())
  196. }
  197. }
  198. }
  199. default:
  200. //gm白名单处理
  201. if msgId == CSPlayerGMReqMsgId && len(service.GetServiceConfig().Node.WhiteListGM) >= 0 {
  202. bFind := false
  203. gmIPStr := ""
  204. switch t := s.Raw().(type) {
  205. case net.Conn:
  206. gmIPStr = t.RemoteAddr().String()
  207. case *websocket.Conn:
  208. gmIPStr = t.RemoteAddr().String()
  209. }
  210. //gmIPStr := s.Raw().(net.Conn).RemoteAddr().String()
  211. for idx := 0; idx < len(service.GetServiceConfig().Node.WhiteListGM); idx++ {
  212. if strings.Contains(gmIPStr, service.GetServiceConfig().Node.WhiteListGM[idx]) {
  213. bFind = true
  214. break
  215. }
  216. if strings.Contains("0.0.0.0", service.GetServiceConfig().Node.WhiteListGM[idx]) {
  217. bFind = true
  218. break
  219. }
  220. }
  221. if !bFind {
  222. s.Close()
  223. return
  224. }
  225. }
  226. //todo...目前暂时只针对gate做了透传,后期需要对所有服务器节点都可以执行透传
  227. routeRule := router.GetRuleByMsgID(msgId)
  228. if routeRule == nil {
  229. util.ErrorF("message not in route table, msgid=%v", msgId)
  230. //return nil, fmt.Errorf("message not in route table, msgid=%v", msgId)
  231. return nil, nil
  232. }
  233. //查看当前session对应的用户是否存在,否则不处理
  234. cliUser := model.Session2User(s)
  235. //cliUser.CheckAoi(msgId, CSPlayerMoveReqMsgId, CSPlayerMoveToReqMsgId, msgData)
  236. if cliUser != nil {
  237. //这边需要加锁(在调用GetServiceBackend时使用了读写锁)
  238. serviceId := cliUser.GetServiceBackend(routeRule.Mod)
  239. if serviceId == "" {
  240. //还没有绑定game/battlerecord节点就发送了别的消息
  241. //非正常流程导致,可能节点暂时未绑定
  242. if routeRule.Mod != model.SERVICE_NODE_TYPE_BATTLERECORD_STR {
  243. s.Close()
  244. }
  245. } else {
  246. //打印增加多线程冲突
  247. //baseutil.InfoF("receive other msg send to game server serviceId=%v msgid=%v", serviceId, msgId)
  248. //包学序列号合法校验,避免发送重复的包
  249. //if cliUser.SeqId >= seqId {
  250. // ntfMsg := &serverproto.SCKickOutNtf{
  251. // Error: int32(serverproto.ErrorCode_ERROR_FAIL),
  252. // }
  253. //
  254. // s.Send(ntfMsg)
  255. // s.(rocommon.ContextSet).SetContextData("user", nil, "SSUserKickNtf")
  256. // return
  257. //}
  258. //cliUser.SeqId = seqId
  259. //转发给后台服务器
  260. err = cliUser.ClientDirect2Backend(serviceId, msgId, seqId, msgData, routeRule.Mod)
  261. if err != nil {
  262. util.WarnF("ClientDirect2Backend sessionid=%v msgid=%v err=%v", s.ID(), msgId, err)
  263. //非正常流程导致,可能节点暂时未绑定
  264. model.ClientMag.RemoveConnectedFromOpenId(cliUser.OpenId, cliUser.Platform)
  265. s.Close()
  266. }
  267. }
  268. } else {
  269. //todo...是否需要断开连接
  270. util.InfoF("cliUser not exist s=%v msgId=%v", s, msgId)
  271. }
  272. }
  273. return
  274. }