hook_event.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. package baseserver
  2. import (
  3. "math"
  4. "rocommon"
  5. "rocommon/rpc"
  6. "rocommon/service"
  7. "rocommon/util"
  8. "roserver/baseserver/model"
  9. "roserver/baseserver/router"
  10. "roserver/serverproto"
  11. "strconv"
  12. )
  13. // /////////////////////////////////////////ServerTCPEventHook
  14. // game.backend
  15. // 服务器之间的消息处理派发
  16. type ServerTCPEventHook struct {
  17. recvPingNum int32
  18. }
  19. // def.go EventHook interface
  20. func (this *ServerTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  21. util.DebugF("hook_event in_event msg=%v in=%v", in.Msg(), in)
  22. switch msg := in.Msg().(type) {
  23. case *serverproto.ServiceIdentifyACK: //来自其他服务器的连接确认信息
  24. util.InfoF("[RecvServiceIdentifyACK]%v id=%v", msg.ServiceId, in.Session().ID())
  25. // 重连时会有问题,重连上来时,但是上一个连接还未移除(正在移除中),导致重连失败(想连接的没连接上,该移除的正在移除)
  26. // 通过PingReq超时断开连接,来触发断线重連
  27. if serviceNode := model.GetServiceNode(msg.ServiceId); serviceNode == nil {
  28. //添加连接上来的对端服务
  29. model.AddServiceNode(in.Session(), msg.ServiceId, msg.ServiceName, "remote")
  30. //服务器之间才启用heartbeat操作(只能反应ack端的send和发起连接端的recv是否正常) 5s
  31. in.Session().HeartBeat(&serverproto.PingReq{NeedAck: true})
  32. }
  33. case *serverproto.PingReq:
  34. {
  35. //来自ack服务器的ping消息
  36. ctx := in.Session().(rocommon.ContextSet)
  37. var sid *service.ETCDServiceDesc
  38. in.Session().IncRecvPingNum(1)
  39. if in.Session().RecvPingNum() >= 10 { //50s打印一次,收到10次打印一次
  40. in.Session().IncRecvPingNum(-1)
  41. if ctx.RawContextData("ctx", &sid) {
  42. util.DebugF("[RecvServicePing]Receive PingReq from session=%v node=%v", in.Session().ID(), sid.ID)
  43. }
  44. }
  45. if msg.NeedAck {
  46. in.Session().Send(&serverproto.PingReq{NeedAck: false})
  47. }
  48. }
  49. case *rocommon.SessionConnected:
  50. util.InfoF("[SessionConnected] node=%v id=%v", in.Session().Node(), in.Session().ID())
  51. //连接上对应类型的服务器节点后,发送确认信息(ServiceIdentifyACK),告诉对端自己的服务器类型
  52. ctx := in.Session().Node().(rocommon.ContextSet)
  53. var sid *service.ETCDServiceDesc
  54. //sid在CreateConnector中会指定
  55. if ctx.RawContextData("sid", &sid) {
  56. util.InfoF("[SessionConnected] endmsg before")
  57. in.Session().Send(&serverproto.ServiceIdentifyACK{
  58. //发送自身服务器节点的信息
  59. ServiceName: service.GetServiceName(), //service/init.go
  60. ServiceId: service.GetLocalServiceID(),
  61. ServerStartTime: util.GetTimeMilliseconds(),
  62. })
  63. //添加远程的服务器节点到本地,sid服务器信息是从etcd中获取的
  64. model.AddServiceNode(in.Session(), sid.ID, sid.Name, "local")
  65. util.InfoF("[SendServiceIdentifyACK_local][%v]->[%v] id=%v", service.GetLocalServiceID(), sid.ID, in.Session().ID())
  66. } else {
  67. util.InfoF("connector not exist sid")
  68. }
  69. case *rocommon.SessionClosed:
  70. closeSID := model.RemoveServiceNode(in.Session())
  71. if closeSID != "" {
  72. msg.CloseSId = closeSID
  73. }
  74. util.InfoF("[ServerTCPEventHook::InEvent] Readmsg error SessionClosed session=%v", in.Session().ID())
  75. case *rocommon.SessionConnectError:
  76. util.InfoF("[ServerTCPEventHook::InEvent] connector error=%v", msg.String())
  77. }
  78. return in
  79. }
  80. func (this *ServerTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  81. return out
  82. }
  83. // /////////////////////////////////////////BackendTCPEventHook
  84. type BackendTCPEventHook struct {
  85. selectRouterIdx int
  86. }
  87. // def.go EventHook interface
  88. // 后端服务器接收到来自gate/db/auth的消息
  89. func (this *BackendTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  90. util.DebugF("BackendTCPhook_event BackendTCPin_event msg=%v in=%v", in.Msg(), in)
  91. switch inMsg := in.Msg().(type) {
  92. case *serverproto.GateTransmitAck:
  93. userMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  94. util.InfoF("BackendTCPEventHook InEvent clientID=%v msgID=%v msg=%v", inMsg.ClientId, inMsg.MsgId, inMsg.MsgData)
  95. if err != nil {
  96. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  97. return nil
  98. }
  99. //if inMsg.MsgId == 1173 {
  100. // util.DebugF("kvtime recv cli=%v deltime=%v", inMsg.ClientId, util.GetTimeMilliseconds()-inMsg.KvTime)
  101. //}
  102. //封装成来自gate的消息事件
  103. //todo...这边需要添加gate和game的连接信息,否则game工作线程获取session时会有多线程冲突
  104. return &model.RecvGateMsgEvent{
  105. Sess: in.Session(),
  106. Message: userMsg,
  107. ClientID: inMsg.ClientId,
  108. MsgSeqId: inMsg.SeqId,
  109. KvTime: inMsg.KvTime,
  110. }
  111. case *serverproto.ServiceTransmitAck:
  112. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  113. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  114. if err != nil {
  115. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  116. return nil
  117. }
  118. //cross begin
  119. //判断是否是跨服操作(针对于social服务器)
  120. routeRule := router.GetRuleByMsgID(int(inMsg.MsgId))
  121. if routeRule != nil {
  122. var serviceNodeList []string
  123. switch routeRule.CrossMode {
  124. case router.CrossMode_Type_Section:
  125. //转发给router服务器
  126. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_CROSSROUTER_STR)
  127. case router.CrossMode_Type_Global:
  128. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_GLOBALCROSSROUTER_STR)
  129. }
  130. if len(serviceNodeList) > 0 {
  131. this.selectRouterIdx++
  132. if this.selectRouterIdx >= math.MaxInt32 {
  133. this.selectRouterIdx = 0
  134. }
  135. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  136. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  137. if serviceNode == nil {
  138. return in
  139. }
  140. crossMsg := &serverproto.ServiceTransmitRouterNtf{
  141. FromZone: int32(service.GetServiceConfig().Node.Zone),
  142. MsgId: inMsg.MsgId,
  143. MsgData: inMsg.MsgData,
  144. ClientId: inMsg.ClientId,
  145. TargetServiceNode: inMsg.TargetServiceNode,
  146. }
  147. serviceNode.Send(crossMsg)
  148. return in
  149. } else if routeRule.CrossMode > 0 {
  150. util.FatalF("CrossNode Not Find corssMode=%v msgId=%v id=%v", routeRule.CrossMode, inMsg.MsgId, inMsg.ClientId)
  151. return nil
  152. }
  153. }
  154. //cross end
  155. ctx := model.Session2Context(in.Session())
  156. if ctx != nil {
  157. return &model.RecvServiceMsgEvent{
  158. Sess: in.Session(),
  159. Message: transmitMsg,
  160. ClientID: inMsg.ClientId,
  161. ClientIDList: inMsg.ClientIdList,
  162. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  163. IsMaster: inMsg.IsMaster,
  164. }
  165. } else {
  166. return &model.RecvServiceMsgEvent{
  167. Sess: in.Session(),
  168. Message: transmitMsg,
  169. ClientID: inMsg.ClientId,
  170. ClientIDList: inMsg.ClientIdList,
  171. //ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  172. IsMaster: inMsg.IsMaster,
  173. }
  174. }
  175. case *serverproto.ServiceTransmitRouterNtf:
  176. //social不需要特殊处理的协议通过RecvRouterServiceMsgEvent,透传到上层进行转发
  177. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  178. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  179. if err != nil {
  180. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  181. return nil
  182. }
  183. ctx := model.Session2Context(in.Session())
  184. return &model.RecvRouterServiceMsgEvent{
  185. Sess: in.Session(),
  186. Message: transmitMsg,
  187. ClientID: inMsg.ClientId,
  188. ClientIDList: inMsg.ClientIdList,
  189. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  190. IsMaster: inMsg.IsMaster,
  191. FromZone: inMsg.FromZone,
  192. }
  193. case *serverproto.ClientClosedACK:
  194. //todo...
  195. // 客户端关闭做处理,game做离线处理 放到default中处理
  196. //log.Println("ClientClosedACK", inMsg)
  197. default:
  198. return in
  199. }
  200. return in
  201. }
  202. // 后端服务器发送到gate/db的消息
  203. func (this *BackendTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  204. //todo...
  205. switch out.Msg().(type) {
  206. case *serverproto.ServiceTransmitAck:
  207. //log.Println("[BackendTCPEventHook::OutEvent] ServiceTransmitAck game to gate/db", outMsg)
  208. }
  209. return out
  210. }
  211. // /////////////////////////////////////////BackendTCPEventHook
  212. // 跨服router节点处理
  213. // 收到social节点的消息,或者收到跨服功能节点的消息
  214. type BackendTCPEventForCrossRouterHook struct {
  215. selectRouterIdx int
  216. }
  217. func (this *BackendTCPEventForCrossRouterHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  218. switch inMsg := in.Msg().(type) {
  219. case *serverproto.ServiceTransmitRouterNtf:
  220. msgId := int(inMsg.MsgId)
  221. routeRule := router.GetRuleByMsgID(msgId)
  222. var serviceNodeList []string
  223. if routeRule == nil {
  224. //router 返回给所有social服务器
  225. if inMsg.FromZone <= 0 {
  226. //发给所有social节点
  227. util.InfoF("BackendTCPEventForCrossRouterHook fromzone<=0 msgid=%v to all zone", msgId)
  228. serviceNodeList = model.GetAllZoneSocialServiceNode(model.SERVICE_NODE_TYPE_SOCIAL_STR)
  229. for idx := 0; idx < len(serviceNodeList); idx++ {
  230. serviceNode := model.GetServiceNode(serviceNodeList[idx])
  231. if serviceNode == nil {
  232. continue
  233. }
  234. serviceNode.Send(inMsg)
  235. }
  236. return in
  237. }
  238. //router 返回给对应zone服务器
  239. serviceNodeList = model.GetAllSocialServiceNodeByZone(int(inMsg.FromZone), model.SERVICE_NODE_TYPE_SOCIAL_STR)
  240. } else {
  241. //表示发往router并需要router转发给其他功能服务器
  242. //通过proto中的RouteRule来确定当前协议的功能服务器
  243. if inMsg.TargetServiceNode != "" {
  244. //发送到目地节点服务器
  245. serviceNode := model.GetServiceNode(inMsg.TargetServiceNode)
  246. if serviceNode != nil {
  247. serviceNode.Send(inMsg)
  248. }
  249. return in
  250. } else {
  251. serviceNodeList = model.GetAllServiceNodeByName(routeRule.Mod)
  252. }
  253. }
  254. if len(serviceNodeList) <= 0 {
  255. util.ErrorF("BackendTCPEventForCrossRouterHook service node not exist nodename=%v msgid=%v fromZone=%V", routeRule, msgId, inMsg.FromZone)
  256. return in
  257. }
  258. this.selectRouterIdx++
  259. if this.selectRouterIdx >= math.MaxInt32 {
  260. this.selectRouterIdx = 0
  261. }
  262. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  263. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  264. if serviceNode == nil {
  265. return in
  266. }
  267. serviceNode.Send(inMsg)
  268. case *serverproto.ServiceTransmitAck:
  269. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  270. if err != nil {
  271. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  272. return nil
  273. }
  274. //chybenchmark
  275. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  276. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  277. ctx := model.Session2Context(in.Session())
  278. return &model.RecvServiceMsgEvent{
  279. Sess: in.Session(),
  280. Message: transmitMsg,
  281. ClientID: inMsg.ClientId,
  282. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  283. }
  284. }
  285. return in
  286. }
  287. func (this *BackendTCPEventForCrossRouterHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  288. return out
  289. }
  290. // /////////////////////////////////////////DBTCPEventHook
  291. // 处理game和db之间的消息
  292. type ServiceTCPEventHook struct {
  293. kvTimeMsgNumList []serverproto.KeyValueType
  294. CurTime uint64
  295. }
  296. func (this *ServiceTCPEventHook) kvTimeMsgLog(msgId int32) {
  297. bChange := false
  298. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  299. if this.kvTimeMsgNumList[idx].Key == msgId {
  300. this.kvTimeMsgNumList[idx].Value++
  301. bChange = true
  302. break
  303. }
  304. }
  305. if !bChange {
  306. this.kvTimeMsgNumList = append(this.kvTimeMsgNumList,
  307. serverproto.KeyValueType{Key: msgId, Value: 1})
  308. }
  309. nowTime := util.GetTimeMilliseconds()
  310. if this.CurTime <= 0 {
  311. this.CurTime = nowTime
  312. } else if nowTime-this.CurTime > 1000 {
  313. this.CurTime = nowTime
  314. printfListStr := ""
  315. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  316. printfListStr += "\n" +
  317. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Key)) + "-" +
  318. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Value))
  319. }
  320. util.DebugF("printfListStr=%v", printfListStr)
  321. }
  322. }
  323. // db接收来自其他服务器的消息
  324. func (this *ServiceTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  325. switch inMsg := in.Msg().(type) {
  326. case *serverproto.ServiceTransmitAck:
  327. dbMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  328. if err != nil {
  329. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  330. return nil
  331. }
  332. //chybenchmark
  333. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  334. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  335. ctx := model.Session2Context(in.Session())
  336. return &model.RecvServiceMsgEvent{
  337. Sess: in.Session(),
  338. Message: dbMsg,
  339. ClientID: inMsg.ClientId,
  340. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  341. }
  342. }
  343. return in
  344. }
  345. // db发送到其他服务器的消息
  346. func (this *ServiceTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  347. //todo...
  348. switch out.Msg().(type) {
  349. case *serverproto.ServiceTransmitAck:
  350. //log.Println("[DBTCPEventHook::OutEvent] DBTransmitAck db to game...", outMsg)
  351. }
  352. return out
  353. }
  354. /*
  355. ///////////////////////////////////////////AuthTCPEventHook
  356. //处理auth和其他服务器之间的消息
  357. type AuthTCPEventHook struct{
  358. }
  359. //auth接收到来自其他服务器的消息
  360. func (this *AuthTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  361. switch inMsg := in.Msg().(type) {
  362. case *serverproto.ServiceTransmitAck:
  363. gateMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  364. if err != nil {
  365. util.WarnF("[AuthTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  366. return nil
  367. }
  368. ctx := model.Session2Context(in.Session())
  369. return &model.RecvServiceMsgEvent{
  370. Sess: in.Session(),
  371. Message: gateMsg,
  372. ClientID: inMsg.ClientId,
  373. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  374. }
  375. }
  376. return in
  377. }
  378. //auth发送给其他服务器的消息
  379. func (this *AuthTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  380. //todo...
  381. return out
  382. }
  383. */