hook_event.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. package baseserver
  2. import (
  3. "math"
  4. "rocommon"
  5. "rocommon/rpc"
  6. "rocommon/service"
  7. "rocommon/util"
  8. "roserver/baseserver/model"
  9. "roserver/baseserver/router"
  10. "roserver/serverproto"
  11. "strconv"
  12. )
// ---------------------------------------------------------------------------
// ServerTCPEventHook
// ---------------------------------------------------------------------------

// ServerTCPEventHook (game.backend) handles and dispatches the messages that
// servers exchange with each other.
type ServerTCPEventHook struct {
	// NOTE(review): recvPingNum is not referenced anywhere in the visible part
	// of this file; InEvent counts pings on the session instead. Confirm
	// whether this field is still needed.
	recvPingNum int32
}
  19. // def.go EventHook interface
  20. func (this *ServerTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  21. util.DebugF("hook_event in_event msg=%v in=%v", in.Msg(), in)
  22. switch msg := in.Msg().(type) {
  23. case *serverproto.ServiceIdentifyACK: //来自其他服务器的连接确认信息
  24. util.InfoF("[RecvServiceIdentifyACK]%v id=%v", msg.ServiceId, in.Session().ID())
  25. // 重连时会有问题,重连上来时,但是上一个连接还未移除(正在移除中),导致重连失败(想连接的没连接上,该移除的正在移除)
  26. // 通过PingReq超时断开连接,来触发断线重連
  27. if serviceNode := model.GetServiceNode(msg.ServiceId); serviceNode == nil {
  28. //添加连接上来的对端服务
  29. model.AddServiceNode(in.Session(), msg.ServiceId, msg.ServiceName, "remote")
  30. //服务器之间才启用heartbeat操作(只能反应ack端的send和发起连接端的recv是否正常) 5s
  31. in.Session().HeartBeat(&serverproto.PingReq{NeedAck: true})
  32. }
  33. //util.ErrorF("927 ServiceConnBySID:%+v", model.ServiceConnBySID)
  34. //util.ErrorF("927 ServiceConnByZone:%+v", model.ServiceConnByZone)
  35. case *serverproto.PingReq:
  36. {
  37. //来自ack服务器的ping消息
  38. ctx := in.Session().(rocommon.ContextSet)
  39. var sid *service.ETCDServiceDesc
  40. in.Session().IncRecvPingNum(1)
  41. if in.Session().RecvPingNum() >= 10 { //50s打印一次,收到10次打印一次
  42. in.Session().IncRecvPingNum(-1)
  43. if ctx.RawContextData("ctx", &sid) {
  44. util.DebugF("[RecvServicePing]Receive PingReq from session=%v node=%v", in.Session().ID(), sid.ID)
  45. }
  46. }
  47. if msg.NeedAck {
  48. in.Session().Send(&serverproto.PingReq{NeedAck: false})
  49. }
  50. }
  51. case *rocommon.SessionConnected:
  52. util.InfoF("[SessionConnected] node=%v id=%v", in.Session().Node(), in.Session().ID())
  53. //连接上对应类型的服务器节点后,发送确认信息(ServiceIdentifyACK),告诉对端自己的服务器类型
  54. ctx := in.Session().Node().(rocommon.ContextSet)
  55. var sid *service.ETCDServiceDesc
  56. //sid在CreateConnector中会指定
  57. if ctx.RawContextData("sid", &sid) {
  58. util.InfoF("[SessionConnected] endmsg before")
  59. in.Session().Send(&serverproto.ServiceIdentifyACK{
  60. //发送自身服务器节点的信息
  61. ServiceName: service.GetServiceName(), //service/init.go
  62. ServiceId: service.GetLocalServiceID(),
  63. ServerStartTime: util.GetTimeMilliseconds(),
  64. })
  65. //添加远程的服务器节点到本地,sid服务器信息是从etcd中获取的
  66. model.AddServiceNode(in.Session(), sid.ID, sid.Name, "local")
  67. util.InfoF("[SendServiceIdentifyACK_local][%v]->[%v] id=%v", service.GetLocalServiceID(), sid.ID, in.Session().ID())
  68. } else {
  69. util.InfoF("connector not exist sid")
  70. }
  71. case *rocommon.SessionClosed:
  72. closeSID := model.RemoveServiceNode(in.Session())
  73. if closeSID != "" {
  74. msg.CloseSId = closeSID
  75. }
  76. util.InfoF("[ServerTCPEventHook::InEvent] Readmsg error SessionClosed session=%v", in.Session().ID())
  77. case *rocommon.SessionConnectError:
  78. util.InfoF("[ServerTCPEventHook::InEvent] connector error=%v", msg.String())
  79. }
  80. return in
  81. }
// OutEvent is the outgoing half of the hook. It does no processing and
// returns the event it was given.
func (this *ServerTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
	return out
}
// ---------------------------------------------------------------------------
// BackendTCPEventHook
// ---------------------------------------------------------------------------

// BackendTCPEventHook is the event hook whose InEvent/OutEvent methods process
// the traffic between a backend server and its gate/db/auth peers.
type BackendTCPEventHook struct {
	// selectRouterIdx is the cursor InEvent increments and takes modulo the
	// candidate list length to pick a cross-router node.
	selectRouterIdx int
}
  89. // def.go EventHook interface
  90. // 后端服务器接收到来自gate/db/auth的消息
  91. func (this *BackendTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  92. util.DebugF("BackendTCPhook_event BackendTCPin_event msg=%v in=%v", in.Msg(), in)
  93. switch inMsg := in.Msg().(type) {
  94. case *serverproto.GateTransmitAck:
  95. userMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  96. util.InfoF("BackendTCPEventHook InEvent clientID=%v msgID=%v msg=%v", inMsg.ClientId, inMsg.MsgId, inMsg.MsgData)
  97. if err != nil {
  98. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  99. return nil
  100. }
  101. //if inMsg.MsgId == 1173 {
  102. // util.DebugF("kvtime recv cli=%v deltime=%v", inMsg.ClientId, util.GetTimeMilliseconds()-inMsg.KvTime)
  103. //}
  104. //封装成来自gate的消息事件
  105. //todo...这边需要添加gate和game的连接信息,否则game工作线程获取session时会有多线程冲突
  106. return &model.RecvGateMsgEvent{
  107. Sess: in.Session(),
  108. Message: userMsg,
  109. ClientID: inMsg.ClientId,
  110. MsgSeqId: inMsg.SeqId,
  111. KvTime: inMsg.KvTime,
  112. }
  113. case *serverproto.ServiceTransmitAck:
  114. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  115. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  116. if err != nil {
  117. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  118. return nil
  119. }
  120. //cross begin
  121. //判断是否是跨服操作(针对于social服务器)
  122. routeRule := router.GetRuleByMsgID(int(inMsg.MsgId))
  123. if routeRule != nil {
  124. var serviceNodeList []string
  125. switch routeRule.CrossMode {
  126. case router.CrossMode_Type_Section:
  127. //转发给router服务器
  128. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_CROSSROUTER_STR)
  129. case router.CrossMode_Type_Global:
  130. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_GLOBALCROSSROUTER_STR)
  131. }
  132. if len(serviceNodeList) > 0 {
  133. this.selectRouterIdx++
  134. if this.selectRouterIdx >= math.MaxInt32 {
  135. this.selectRouterIdx = 0
  136. }
  137. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  138. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  139. if serviceNode == nil {
  140. return in
  141. }
  142. crossMsg := &serverproto.ServiceTransmitRouterNtf{
  143. FromZone: int32(service.GetServiceConfig().Node.Zone),
  144. MsgId: inMsg.MsgId,
  145. MsgData: inMsg.MsgData,
  146. ClientId: inMsg.ClientId,
  147. TargetServiceNode: inMsg.TargetServiceNode,
  148. }
  149. serviceNode.Send(crossMsg)
  150. return in
  151. } else if routeRule.CrossMode > 0 {
  152. util.FatalF("CrossNode Not Find corssMode=%v msgId=%v id=%v", routeRule.CrossMode, inMsg.MsgId, inMsg.ClientId)
  153. return nil
  154. }
  155. }
  156. //cross end
  157. ctx := model.Session2Context(in.Session())
  158. if ctx != nil {
  159. return &model.RecvServiceMsgEvent{
  160. Sess: in.Session(),
  161. Message: transmitMsg,
  162. ClientID: inMsg.ClientId,
  163. ClientIDList: inMsg.ClientIdList,
  164. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  165. IsMaster: inMsg.IsMaster,
  166. }
  167. } else {
  168. return &model.RecvServiceMsgEvent{
  169. Sess: in.Session(),
  170. Message: transmitMsg,
  171. ClientID: inMsg.ClientId,
  172. ClientIDList: inMsg.ClientIdList,
  173. //ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  174. IsMaster: inMsg.IsMaster,
  175. }
  176. }
  177. case *serverproto.ServiceTransmitRouterNtf:
  178. //social不需要特殊处理的协议通过RecvRouterServiceMsgEvent,透传到上层进行转发
  179. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  180. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  181. if err != nil {
  182. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  183. return nil
  184. }
  185. ctx := model.Session2Context(in.Session())
  186. return &model.RecvRouterServiceMsgEvent{
  187. Sess: in.Session(),
  188. Message: transmitMsg,
  189. ClientID: inMsg.ClientId,
  190. ClientIDList: inMsg.ClientIdList,
  191. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  192. IsMaster: inMsg.IsMaster,
  193. FromZone: inMsg.FromZone,
  194. }
  195. case *serverproto.ClientClosedACK:
  196. //todo...
  197. // 客户端关闭做处理,game做离线处理 放到default中处理
  198. //log.Println("ClientClosedACK", inMsg)
  199. default:
  200. return in
  201. }
  202. return in
  203. }
// OutEvent sees the messages a backend server sends to gate/db. It currently
// returns every event unchanged; the type switch below is an empty
// placeholder.
func (this *BackendTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
	// TODO: outgoing processing is not implemented yet.
	switch out.Msg().(type) {
	case *serverproto.ServiceTransmitAck:
		// Intentionally empty (game -> gate/db); a debug log used to live here.
	}
	return out
}
// ---------------------------------------------------------------------------
// BackendTCPEventForCrossRouterHook
// ---------------------------------------------------------------------------

// BackendTCPEventForCrossRouterHook is the hook used on cross-server router
// nodes. It receives messages from social nodes and from cross-server feature
// nodes.
type BackendTCPEventForCrossRouterHook struct {
	// selectRouterIdx is the cursor InEvent increments and takes modulo the
	// candidate list length to pick a target node.
	selectRouterIdx int
}
  219. func (this *BackendTCPEventForCrossRouterHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  220. switch inMsg := in.Msg().(type) {
  221. case *serverproto.ServiceTransmitRouterNtf:
  222. msgId := int(inMsg.MsgId)
  223. routeRule := router.GetRuleByMsgID(msgId)
  224. var serviceNodeList []string
  225. if routeRule == nil {
  226. //router 返回给所有social服务器
  227. if inMsg.FromZone <= 0 {
  228. //发给所有social节点
  229. util.InfoF("BackendTCPEventForCrossRouterHook fromzone<=0 msgid=%v to all zone", msgId)
  230. serviceNodeList = model.GetAllZoneSocialServiceNode(model.SERVICE_NODE_TYPE_SOCIAL_STR)
  231. for idx := 0; idx < len(serviceNodeList); idx++ {
  232. serviceNode := model.GetServiceNode(serviceNodeList[idx])
  233. if serviceNode == nil {
  234. continue
  235. }
  236. serviceNode.Send(inMsg)
  237. }
  238. return in
  239. }
  240. //router 返回给对应zone服务器
  241. serviceNodeList = model.GetAllSocialServiceNodeByZone(int(inMsg.FromZone), model.SERVICE_NODE_TYPE_SOCIAL_STR)
  242. } else {
  243. //表示发往router并需要router转发给其他功能服务器
  244. //通过proto中的RouteRule来确定当前协议的功能服务器
  245. if inMsg.TargetServiceNode != "" {
  246. //发送到目地节点服务器
  247. serviceNode := model.GetServiceNode(inMsg.TargetServiceNode)
  248. if serviceNode != nil {
  249. serviceNode.Send(inMsg)
  250. }
  251. return in
  252. } else {
  253. serviceNodeList = model.GetAllServiceNodeByName(routeRule.Mod)
  254. }
  255. }
  256. if len(serviceNodeList) <= 0 {
  257. util.ErrorF("BackendTCPEventForCrossRouterHook service node not exist nodename=%v msgid=%v fromZone=%v", routeRule, msgId, inMsg.FromZone)
  258. return in
  259. }
  260. this.selectRouterIdx++
  261. if this.selectRouterIdx >= math.MaxInt32 {
  262. this.selectRouterIdx = 0
  263. }
  264. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  265. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  266. if serviceNode == nil {
  267. return in
  268. }
  269. serviceNode.Send(inMsg)
  270. case *serverproto.ServiceTransmitAck:
  271. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  272. if err != nil {
  273. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  274. return nil
  275. }
  276. //chybenchmark
  277. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  278. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  279. ctx := model.Session2Context(in.Session())
  280. return &model.RecvServiceMsgEvent{
  281. Sess: in.Session(),
  282. Message: transmitMsg,
  283. ClientID: inMsg.ClientId,
  284. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  285. }
  286. }
  287. return in
  288. }
// OutEvent is the outgoing half of the hook. It does no processing and
// returns the event it was given.
func (this *BackendTCPEventForCrossRouterHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
	return out
}
// ---------------------------------------------------------------------------
// ServiceTCPEventHook (its log messages still use the tag "DBTCPEventHook")
// ---------------------------------------------------------------------------

// ServiceTCPEventHook handles the messages exchanged between game and db.
type ServiceTCPEventHook struct {
	// kvTimeMsgNumList holds one Key/Value pair per message id, where Value is
	// the number of times kvTimeMsgLog was called for that id.
	kvTimeMsgNumList []serverproto.KeyValueType
	// CurTime is the util.GetTimeMilliseconds value stored by kvTimeMsgLog on
	// its first call and again each time it dumps the counters.
	CurTime uint64
}
  298. func (this *ServiceTCPEventHook) kvTimeMsgLog(msgId int32) {
  299. bChange := false
  300. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  301. if this.kvTimeMsgNumList[idx].Key == msgId {
  302. this.kvTimeMsgNumList[idx].Value++
  303. bChange = true
  304. break
  305. }
  306. }
  307. if !bChange {
  308. this.kvTimeMsgNumList = append(this.kvTimeMsgNumList,
  309. serverproto.KeyValueType{Key: msgId, Value: 1})
  310. }
  311. nowTime := util.GetTimeMilliseconds()
  312. if this.CurTime <= 0 {
  313. this.CurTime = nowTime
  314. } else if nowTime-this.CurTime > 1000 {
  315. this.CurTime = nowTime
  316. printfListStr := ""
  317. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  318. printfListStr += "\n" +
  319. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Key)) + "-" +
  320. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Value))
  321. }
  322. util.DebugF("printfListStr=%v", printfListStr)
  323. }
  324. }
  325. // db接收来自其他服务器的消息
  326. func (this *ServiceTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  327. switch inMsg := in.Msg().(type) {
  328. case *serverproto.ServiceTransmitAck:
  329. dbMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  330. if err != nil {
  331. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  332. return nil
  333. }
  334. //chybenchmark
  335. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  336. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  337. ctx := model.Session2Context(in.Session())
  338. return &model.RecvServiceMsgEvent{
  339. Sess: in.Session(),
  340. Message: dbMsg,
  341. ClientID: inMsg.ClientId,
  342. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  343. }
  344. }
  345. return in
  346. }
// OutEvent sees the messages db sends to other servers. It currently returns
// every event unchanged; the type switch below is an empty placeholder.
func (this *ServiceTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
	// TODO: outgoing processing is not implemented yet.
	switch out.Msg().(type) {
	case *serverproto.ServiceTransmitAck:
		// Intentionally empty (db -> game); a debug log used to live here.
	}
	return out
}
  356. /*
  357. ///////////////////////////////////////////AuthTCPEventHook
  358. //处理auth和其他服务器之间的消息
  359. type AuthTCPEventHook struct{
  360. }
  361. //auth接收到来自其他服务器的消息
  362. func (this *AuthTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  363. switch inMsg := in.Msg().(type) {
  364. case *serverproto.ServiceTransmitAck:
  365. gateMsg, _, err := rpcc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  366. if err != nil {
  367. util.WarnF("[AuthTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  368. return nil
  369. }
  370. ctx := model.Session2Context(in.Session())
  371. return &model.RecvServiceMsgEvent{
  372. Sess: in.Session(),
  373. Message: gateMsg,
  374. ClientID: inMsg.ClientId,
  375. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  376. }
  377. }
  378. return in
  379. }
  380. //auth发送给其他服务器的消息
  381. func (this *AuthTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  382. //todo...
  383. return out
  384. }
  385. */