proc_rpc.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package model
  2. import (
  3. "github.com/gorilla/websocket"
  4. "io"
  5. "net"
  6. "rocommon"
  7. "rocommon/rpc"
  8. "rocommon/service"
  9. "rocommon/socket"
  10. _ "rocommon/socket/tcp"
  11. "rocommon/util"
  12. "roserver/baseserver/model"
  13. "roserver/baseserver/router"
  14. "roserver/serverproto"
  15. "strings"
  16. )
  17. //消息解析操作
  18. //MessageProcessor def.go
  19. //TCPMessageProcessor procrpc.go 基础实现方法
  20. type DirectTCPMessageProcessor struct {
  21. }
  22. //recv直接原始数据传递到后端 / 返回消息给client
  23. func (this *DirectTCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) {
  24. reader, ok := s.Raw().(io.Reader)
  25. if !ok || reader == nil {
  26. util.InfoF("[DirectTCPMessageProcessor] OnRecvMsg err")
  27. return nil, 0, nil
  28. }
  29. //opt := s.Node().(socket.SocketOption)
  30. var opt socket.SocketOption
  31. if s.GetSessionOptFlag() {
  32. opt = s.Node().(socket.SocketOption)
  33. } else {
  34. opt = s.GetSessionOpt().(socket.SocketOption)
  35. }
  36. opt.SocketReadTimeout(reader.(net.Conn), func() {
  37. var msgId uint16
  38. var seqId uint32 //包序列号,客户端发送时的序列从1开始
  39. var msgData []byte
  40. var flagId uint16 //加密方式
  41. //获取原始格式数据包(pb序列化的二进制数据)
  42. msgId, seqId, flagId, msgData, err = rpc.RecvPackageData(reader, opt.MaxMsgLen())
  43. //尝试直接发送到其他后端服务器或者解析
  44. if err == nil {
  45. msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s)
  46. }
  47. })
  48. return
  49. }
  50. //send 直接发往客户端的消息
  51. func (this *DirectTCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
  52. writer, ok := s.Raw().(io.Writer)
  53. if !ok || writer == nil {
  54. util.InfoF("[DirectTCPMessageProcessor] OnSendMsg err")
  55. return nil
  56. }
  57. //opt := s.Node().(socket.SocketOption)
  58. var opt socket.SocketOption
  59. if s.GetSessionOptFlag() {
  60. opt = s.Node().(socket.SocketOption)
  61. } else {
  62. opt = s.GetSessionOpt().(socket.SocketOption)
  63. }
  64. opt.SocketWriteTimeout(writer.(net.Conn), func() {
  65. err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), "")
  66. })
  67. return
  68. }
// Message IDs resolved once, at package initialisation, from the message
// registry by message name. FrontendPackageProc compares incoming message IDs
// against these values to decide which packets are handled on the gate itself.
// NOTE(review): each lookup dereferences the result of MessageInfoByName
// directly; presumably every name is registered before this package is
// initialised — confirm.
var (
	// PingReqMsgID is the ID of the "CSPingReq" message.
	PingReqMsgID = rocommon.MessageInfoByName("CSPingReq").ID
	// LoginReqMsgId is the ID of the "CSLoginReq" message.
	LoginReqMsgId = rocommon.MessageInfoByName("CSLoginReq").ID
	// ReconnectMsgId is the ID of the "CSReconnectReq" message.
	ReconnectMsgId = rocommon.MessageInfoByName("CSReconnectReq").ID
	//CSPlayerMoveReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveReq").ID
	//CSPlayerMoveToReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveToReq").ID
	// CSPlayerGMReqMsgId is the ID of the "CSGMCommandReq" message; it is
	// subject to the GM white list check in FrontendPackageProc.
	CSPlayerGMReqMsgId = rocommon.MessageInfoByName("CSGMCommandReq").ID
)
  77. //尝试直接发送到其他后端服务器或者解析后直接回复client
  78. func FrontendPackageProc(msgId int, seqId uint32, flagId uint16, msgData []byte, s rocommon.Session) (msg interface{}, err error) {
  79. switch flagId {
  80. case 1: //rsa加密的数据
  81. msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
  82. if err != nil {
  83. return nil, err
  84. }
  85. case 2: //aes加密的数据
  86. //msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  87. msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
  88. if err != nil {
  89. return nil, err
  90. }
  91. }
  92. //gate接收到客户端消息的处理部分
  93. switch int(msgId) {
  94. case PingReqMsgID, LoginReqMsgId, ReconnectMsgId:
  95. msg, _, err = rpc.DecodeMessage(msgId, msgData)
  96. if err != nil {
  97. util.InfoF("FrontendPackageProc err=%v msgId=%v flagId=%v", err, msgId, flagId)
  98. return nil, err
  99. }
  100. //switch msgType := msg.(type) {
  101. switch in := msg.(type) {
  102. case *serverproto.CSPingReq:
  103. //登陆成功后开始发送pingreq消息
  104. //log.Println("[FrontendPackageProc] PingReq->PingAck:", msgType)
  105. s.Send(&serverproto.SCPingAck{})
  106. case *serverproto.CSLoginReq:
  107. //in.OpenId = in.OpenId + "@" + in.Platform
  108. // 加密处理
  109. //s.SetAES(in.CryptPass)
  110. //发送给auth进行验证,操作并获取是否有断线数据(断线重连使用)
  111. //第一个到gate的消息,绑定gameserver
  112. // 通过auth服务器来验证用户的合法性,并返回之前是否已经绑定过一个gameserver,可能
  113. // 有离线数据(如果重新绑定新的gameserver会有数据一致性问题),这边暂时处理成直接绑定第一个
  114. switch rawType := s.Raw().(type) {
  115. case net.Conn:
  116. in.Ip = rawType.RemoteAddr().String()
  117. case *websocket.Conn:
  118. in.Ip = rawType.RemoteAddr().String()
  119. }
  120. util.WarnF("CSLoginReq msg=%v sessionid=%v ip=%v", msg, s.ID(), in.Ip)
  121. //authNodeStr := model.SelectServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR)
  122. authNodeStr := model.SelectAuthServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  123. tmpNode := model.GetServiceNode(authNodeStr)
  124. if tmpNode == nil {
  125. model.RemoveServiceNodeByName(authNodeStr)
  126. authNodeStr, _ = model.GetServiceNodeAndSession("", model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
  127. }
  128. cliUser, err := model.BindClientToBackendNew(authNodeStr, s, in.OpenId, in.Platform)
  129. if err == nil {
  130. newMsgData, _, _ := rpc.EncodeMessage(in)
  131. cliUser.LoginData = newMsgData
  132. //绑定成功
  133. //转发给对应的服务器做处理,例如game,auth
  134. err = cliUser.ClientDirect2Backend(authNodeStr, int(msgId), seqId, newMsgData, model.SERVICE_NODE_TYPE_AUTH_STR)
  135. if err != nil {
  136. util.WarnF("CSLoginReq ClientDirect2Backend err=%v auth=%v msg=%v", err, authNodeStr, msg)
  137. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  138. }
  139. } else {
  140. util.WarnF("CSLoginReq BindClientToBackendNew err=%v auth=%v msg=%v", err, authNodeStr, msg)
  141. s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  142. }
  143. case *serverproto.CSReconnectReq:
  144. ////加密处理
  145. //s.SetAES(in.CryptPass)
  146. ////发送消息给game,重新进行session绑定
  147. //cliUser := model.ClientMag.GetReconnectFromOpenId(in.OpenId)
  148. //err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  149. // msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  150. //if err != nil {
  151. // //重连超时,不允许继续重连
  152. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  153. // baseutil.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  154. //}
  155. //
  156. //ret := cliUser.ReconnectReset(s)
  157. //加密处理
  158. //s.SetAES(in.CryptPass)
  159. // 1.后续sdk需要拿token去服务器进行验证,避免盗号
  160. util.WarnF("CSReconnectReq msg=%v sessionId=%v", msg, s.ID())
  161. cliUser := model.ClientMag.GetConnectedFromOpenId(in.OpenId, in.Platform)
  162. if cliUser == nil {
  163. //该玩家没有登录过无法进行断线重连
  164. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
  165. util.InfoF("CSReconnectReq cliUser=%v msg=%v user not connected before", cliUser, msg)
  166. } else {
  167. //if cliUser.IsReconnectExpire() {
  168. // //重连超时,不允许继续重连 / 没有断线不允许发送重连消息
  169. // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  170. // util.InfoF("CSReconnectReq cliUser=%v msg=%v user connect time out", cliUser, msg)
  171. // return
  172. //}
  173. ret := cliUser.ConnectReset(s)
  174. if ret != nil {
  175. //session绑定出错
  176. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  177. util.InfoF("CSReconnectReqErr msg=%v session bind use err=%v", msg, ret)
  178. s.Close()
  179. return
  180. }
  181. //发送消息给game,重新进行session绑定
  182. err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
  183. msgData, model.SERVICE_NODE_TYPE_GAME_STR)
  184. if err != nil {
  185. //重连超时,不允许继续重连
  186. s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
  187. util.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
  188. } else {
  189. s.SetSessionOptFlag(true)
  190. util.WarnF("CSReconnectReq success msg=%v sessionid=%v", msg, s.ID())
  191. }
  192. }
  193. }
  194. default:
  195. //gm白名单处理
  196. if msgId == CSPlayerGMReqMsgId && len(service.GetServiceConfig().Node.WhiteListGM) >= 0 {
  197. bFind := false
  198. gmIPStr := ""
  199. switch t := s.Raw().(type) {
  200. case net.Conn:
  201. gmIPStr = t.RemoteAddr().String()
  202. case *websocket.Conn:
  203. gmIPStr = t.RemoteAddr().String()
  204. }
  205. //gmIPStr := s.Raw().(net.Conn).RemoteAddr().String()
  206. for idx := 0; idx < len(service.GetServiceConfig().Node.WhiteListGM); idx++ {
  207. if strings.Contains(gmIPStr, service.GetServiceConfig().Node.WhiteListGM[idx]) {
  208. bFind = true
  209. break
  210. }
  211. if strings.Contains("0.0.0.0", service.GetServiceConfig().Node.WhiteListGM[idx]) {
  212. bFind = true
  213. break
  214. }
  215. }
  216. if !bFind {
  217. s.Close()
  218. return
  219. }
  220. }
  221. //todo...目前暂时只针对gate做了透传,后期需要对所有服务器节点都可以执行透传
  222. routeRule := router.GetRuleByMsgID(msgId)
  223. if routeRule == nil {
  224. util.ErrorF("message not in route table, msgid=%v", msgId)
  225. //return nil, fmt.Errorf("message not in route table, msgid=%v", msgId)
  226. return nil, nil
  227. }
  228. //查看当前session对应的用户是否存在,否则不处理
  229. cliUser := model.Session2User(s)
  230. //cliUser.CheckAoi(msgId, CSPlayerMoveReqMsgId, CSPlayerMoveToReqMsgId, msgData)
  231. if cliUser != nil {
  232. //这边需要加锁(在调用GetServiceBackend时使用了读写锁)
  233. serviceId := cliUser.GetServiceBackend(routeRule.Mod)
  234. if serviceId == "" {
  235. //还没有绑定game/battlerecord节点就发送了别的消息
  236. //非正常流程导致,可能节点暂时未绑定
  237. if routeRule.Mod != model.SERVICE_NODE_TYPE_BATTLERECORD_STR {
  238. s.Close()
  239. }
  240. } else {
  241. //打印增加多线程冲突
  242. //baseutil.InfoF("receive other msg send to game server serviceId=%v msgid=%v", serviceId, msgId)
  243. //包学序列号合法校验,避免发送重复的包
  244. //if cliUser.SeqId >= seqId {
  245. // ntfMsg := &serverproto.SCKickOutNtf{
  246. // Error: int32(serverproto.ErrorCode_ERROR_FAIL),
  247. // }
  248. //
  249. // s.Send(ntfMsg)
  250. // s.(rocommon.ContextSet).SetContextData("user", nil, "SSUserKickNtf")
  251. // return
  252. //}
  253. //cliUser.SeqId = seqId
  254. //转发给后台服务器
  255. err = cliUser.ClientDirect2Backend(serviceId, msgId, seqId, msgData, routeRule.Mod)
  256. if err != nil {
  257. util.WarnF("ClientDirect2Backend sessionid=%v msgid=%v err=%v", s.ID(), msgId, err)
  258. //非正常流程导致,可能节点暂时未绑定
  259. model.ClientMag.RemoveConnectedFromOpenId(cliUser.OpenId, cliUser.Platform)
  260. s.Close()
  261. }
  262. }
  263. } else {
  264. //todo...是否需要断开连接
  265. util.InfoF("cliUser not exist s=%v msgId=%v", s, msgId)
  266. }
  267. }
  268. return
  269. }