hook_event.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. package baseserver
  2. import (
  3. "math"
  4. "rocommon"
  5. "rocommon/rpc"
  6. "rocommon/service"
  7. "rocommon/util"
  8. "roserver/baseserver/model"
  9. "roserver/baseserver/router"
  10. "roserver/serverproto"
  11. "strconv"
  12. )
  13. // /////////////////////////////////////////ServerTCPEventHook
  14. // game.backend
  15. // 服务器之间的消息处理派发
  16. type ServerTCPEventHook struct {
  17. recvPingNum int32
  18. }
  19. // def.go EventHook interface
  20. func (this *ServerTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  21. switch msg := in.Msg().(type) {
  22. case *serverproto.ServiceIdentifyACK: //来自其他服务器的连接确认信息
  23. util.InfoF("[RecvServiceIdentifyACK]%v id=%v", msg.ServiceId, in.Session().ID())
  24. // 重连时会有问题,重连上来时,但是上一个连接还未移除(正在移除中),导致重连失败(想连接的没连接上,该移除的正在移除)
  25. // 通过PingReq超时断开连接,来触发断线重連
  26. if serviceNode := model.GetServiceNode(msg.ServiceId); serviceNode == nil {
  27. //添加连接上来的对端服务
  28. model.AddServiceNode(in.Session(), msg.ServiceId, msg.ServiceName, "remote")
  29. //服务器之间才启用heartbeat操作(只能反应ack端的send和发起连接端的recv是否正常) 5s
  30. in.Session().HeartBeat(&serverproto.PingReq{NeedAck: true})
  31. }
  32. case *serverproto.PingReq:
  33. {
  34. //来自ack服务器的ping消息
  35. ctx := in.Session().(rocommon.ContextSet)
  36. var sid *service.ETCDServiceDesc
  37. in.Session().IncRecvPingNum(1)
  38. if in.Session().RecvPingNum() >= 10 { //50s打印一次,收到10次打印一次
  39. in.Session().IncRecvPingNum(-1)
  40. if ctx.RawContextData("ctx", &sid) {
  41. util.InfoF("[RecvServicePing]Receive PingReq from session=%v node=%v", in.Session().ID(), sid.ID)
  42. }
  43. }
  44. if msg.NeedAck {
  45. in.Session().Send(&serverproto.PingReq{NeedAck: false})
  46. }
  47. }
  48. case *rocommon.SessionConnected:
  49. //连接上对应类型的服务器节点后,发送确认信息(ServiceIdentifyACK),告诉对端自己的服务器类型
  50. ctx := in.Session().Node().(rocommon.ContextSet)
  51. var sid *service.ETCDServiceDesc
  52. //sid在CreateConnector中会指定
  53. if ctx.RawContextData("sid", &sid) {
  54. in.Session().Send(&serverproto.ServiceIdentifyACK{
  55. //发送自身服务器节点的信息
  56. ServiceName: service.GetServiceName(), //service/init.go
  57. ServiceId: service.GetLocalServiceID(),
  58. ServerStartTime: util.GetTimeMilliseconds(),
  59. })
  60. //添加远程的服务器节点到本地,sid服务器信息是从etcd中获取的
  61. model.AddServiceNode(in.Session(), sid.ID, sid.Name, "local")
  62. util.InfoF("[SendServiceIdentifyACK_local][%v]->[%v] id=%v", service.GetLocalServiceID(), sid.ID, in.Session().ID())
  63. } else {
  64. util.InfoF("connector not exist sid")
  65. }
  66. case *rocommon.SessionClosed:
  67. closeSID := model.RemoveServiceNode(in.Session())
  68. if closeSID != "" {
  69. msg.CloseSId = closeSID
  70. }
  71. util.InfoF("[ServerTCPEventHook::InEvent] Readmsg error SessionClosed session=%v", in.Session().ID())
  72. case *rocommon.SessionConnectError:
  73. util.InfoF("[ServerTCPEventHook::InEvent] connector error=%v", msg.String())
  74. }
  75. return in
  76. }
  77. func (this *ServerTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  78. return out
  79. }
  80. // /////////////////////////////////////////BackendTCPEventHook
  81. type BackendTCPEventHook struct {
  82. selectRouterIdx int
  83. }
  84. // def.go EventHook interface
  85. // 后端服务器接收到来自gate/db/auth的消息
  86. func (this *BackendTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  87. switch inMsg := in.Msg().(type) {
  88. case *serverproto.GateTransmitAck:
  89. userMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  90. if err != nil {
  91. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  92. return nil
  93. }
  94. //if inMsg.MsgId == 1173 {
  95. // util.DebugF("kvtime recv cli=%v deltime=%v", inMsg.ClientId, util.GetTimeMilliseconds()-inMsg.KvTime)
  96. //}
  97. //封装成来自gate的消息事件
  98. //todo...这边需要添加gate和game的连接信息,否则game工作线程获取session时会有多线程冲突
  99. return &model.RecvGateMsgEvent{
  100. Sess: in.Session(),
  101. Message: userMsg,
  102. ClientID: inMsg.ClientId,
  103. MsgSeqId: inMsg.SeqId,
  104. KvTime: inMsg.KvTime,
  105. }
  106. case *serverproto.ServiceTransmitAck:
  107. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  108. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  109. if err != nil {
  110. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  111. return nil
  112. }
  113. //cross begin
  114. //判断是否是跨服操作(针对于social服务器)
  115. routeRule := router.GetRuleByMsgID(int(inMsg.MsgId))
  116. if routeRule != nil {
  117. var serviceNodeList []string
  118. switch routeRule.CrossMode {
  119. case router.CrossMode_Type_Section:
  120. //转发给router服务器
  121. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_CROSSROUTER_STR)
  122. case router.CrossMode_Type_Global:
  123. serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_GLOBALCROSSROUTER_STR)
  124. }
  125. if len(serviceNodeList) > 0 {
  126. this.selectRouterIdx++
  127. if this.selectRouterIdx >= math.MaxInt32 {
  128. this.selectRouterIdx = 0
  129. }
  130. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  131. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  132. if serviceNode == nil {
  133. return in
  134. }
  135. crossMsg := &serverproto.ServiceTransmitRouterNtf{
  136. FromZone: int32(service.GetServiceConfig().Node.Zone),
  137. MsgId: inMsg.MsgId,
  138. MsgData: inMsg.MsgData,
  139. ClientId: inMsg.ClientId,
  140. TargetServiceNode: inMsg.TargetServiceNode,
  141. }
  142. serviceNode.Send(crossMsg)
  143. return in
  144. } else if routeRule.CrossMode > 0 {
  145. util.FatalF("CrossNode Not Find corssMode=%v msgId=%v id=%v", routeRule.CrossMode, inMsg.MsgId, inMsg.ClientId)
  146. return nil
  147. }
  148. }
  149. //cross end
  150. ctx := model.Session2Context(in.Session())
  151. if ctx != nil {
  152. return &model.RecvServiceMsgEvent{
  153. Sess: in.Session(),
  154. Message: transmitMsg,
  155. ClientID: inMsg.ClientId,
  156. ClientIDList: inMsg.ClientIdList,
  157. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  158. IsMaster: inMsg.IsMaster,
  159. }
  160. } else {
  161. return &model.RecvServiceMsgEvent{
  162. Sess: in.Session(),
  163. Message: transmitMsg,
  164. ClientID: inMsg.ClientId,
  165. ClientIDList: inMsg.ClientIdList,
  166. //ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  167. IsMaster: inMsg.IsMaster,
  168. }
  169. }
  170. case *serverproto.ServiceTransmitRouterNtf:
  171. //social不需要特殊处理的协议通过RecvRouterServiceMsgEvent,透传到上层进行转发
  172. //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
  173. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  174. if err != nil {
  175. util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  176. return nil
  177. }
  178. ctx := model.Session2Context(in.Session())
  179. return &model.RecvRouterServiceMsgEvent{
  180. Sess: in.Session(),
  181. Message: transmitMsg,
  182. ClientID: inMsg.ClientId,
  183. ClientIDList: inMsg.ClientIdList,
  184. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
  185. IsMaster: inMsg.IsMaster,
  186. FromZone: inMsg.FromZone,
  187. }
  188. case *serverproto.ClientClosedACK:
  189. //todo...
  190. // 客户端关闭做处理,game做离线处理 放到default中处理
  191. //log.Println("ClientClosedACK", inMsg)
  192. default:
  193. return in
  194. }
  195. return in
  196. }
  197. // 后端服务器发送到gate/db的消息
  198. func (this *BackendTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  199. //todo...
  200. switch out.Msg().(type) {
  201. case *serverproto.ServiceTransmitAck:
  202. //log.Println("[BackendTCPEventHook::OutEvent] ServiceTransmitAck game to gate/db", outMsg)
  203. }
  204. return out
  205. }
  206. // /////////////////////////////////////////BackendTCPEventHook
  207. // 跨服router节点处理
  208. // 收到social节点的消息,或者收到跨服功能节点的消息
  209. type BackendTCPEventForCrossRouterHook struct {
  210. selectRouterIdx int
  211. }
  212. func (this *BackendTCPEventForCrossRouterHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  213. switch inMsg := in.Msg().(type) {
  214. case *serverproto.ServiceTransmitRouterNtf:
  215. msgId := int(inMsg.MsgId)
  216. routeRule := router.GetRuleByMsgID(msgId)
  217. var serviceNodeList []string
  218. if routeRule == nil {
  219. //router 返回给所有social服务器
  220. if inMsg.FromZone <= 0 {
  221. //发给所有social节点
  222. util.InfoF("BackendTCPEventForCrossRouterHook fromzone<=0 msgid=%v to all zone", msgId)
  223. serviceNodeList = model.GetAllZoneSocialServiceNode(model.SERVICE_NODE_TYPE_SOCIAL_STR)
  224. for idx := 0; idx < len(serviceNodeList); idx++ {
  225. serviceNode := model.GetServiceNode(serviceNodeList[idx])
  226. if serviceNode == nil {
  227. continue
  228. }
  229. serviceNode.Send(inMsg)
  230. }
  231. return in
  232. }
  233. //router 返回给对应zone服务器
  234. serviceNodeList = model.GetAllSocialServiceNodeByZone(int(inMsg.FromZone), model.SERVICE_NODE_TYPE_SOCIAL_STR)
  235. } else {
  236. //表示发往router并需要router转发给其他功能服务器
  237. //通过proto中的RouteRule来确定当前协议的功能服务器
  238. if inMsg.TargetServiceNode != "" {
  239. //发送到目地节点服务器
  240. serviceNode := model.GetServiceNode(inMsg.TargetServiceNode)
  241. if serviceNode != nil {
  242. serviceNode.Send(inMsg)
  243. }
  244. return in
  245. } else {
  246. serviceNodeList = model.GetAllServiceNodeByName(routeRule.Mod)
  247. }
  248. }
  249. if len(serviceNodeList) <= 0 {
  250. util.ErrorF("BackendTCPEventForCrossRouterHook service node not exist nodename=%v msgid=%v fromZone=%V", routeRule, msgId, inMsg.FromZone)
  251. return in
  252. }
  253. this.selectRouterIdx++
  254. if this.selectRouterIdx >= math.MaxInt32 {
  255. this.selectRouterIdx = 0
  256. }
  257. selectIdx := this.selectRouterIdx % len(serviceNodeList)
  258. serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
  259. if serviceNode == nil {
  260. return in
  261. }
  262. serviceNode.Send(inMsg)
  263. case *serverproto.ServiceTransmitAck:
  264. transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  265. if err != nil {
  266. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  267. return nil
  268. }
  269. //chybenchmark
  270. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  271. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  272. ctx := model.Session2Context(in.Session())
  273. return &model.RecvServiceMsgEvent{
  274. Sess: in.Session(),
  275. Message: transmitMsg,
  276. ClientID: inMsg.ClientId,
  277. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  278. }
  279. }
  280. return in
  281. }
  282. func (this *BackendTCPEventForCrossRouterHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  283. return out
  284. }
  285. // /////////////////////////////////////////DBTCPEventHook
  286. // 处理game和db之间的消息
  287. type ServiceTCPEventHook struct {
  288. kvTimeMsgNumList []serverproto.KeyValueType
  289. CurTime uint64
  290. }
  291. func (this *ServiceTCPEventHook) kvTimeMsgLog(msgId int32) {
  292. bChange := false
  293. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  294. if this.kvTimeMsgNumList[idx].Key == msgId {
  295. this.kvTimeMsgNumList[idx].Value++
  296. bChange = true
  297. break
  298. }
  299. }
  300. if !bChange {
  301. this.kvTimeMsgNumList = append(this.kvTimeMsgNumList,
  302. serverproto.KeyValueType{Key: msgId, Value: 1})
  303. }
  304. nowTime := util.GetTimeMilliseconds()
  305. if this.CurTime <= 0 {
  306. this.CurTime = nowTime
  307. } else if nowTime-this.CurTime > 1000 {
  308. this.CurTime = nowTime
  309. printfListStr := ""
  310. for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
  311. printfListStr += "\n" +
  312. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Key)) + "-" +
  313. strconv.Itoa(int(this.kvTimeMsgNumList[idx].Value))
  314. }
  315. util.DebugF("printfListStr=%v", printfListStr)
  316. }
  317. }
  318. // db接收来自其他服务器的消息
  319. func (this *ServiceTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  320. switch inMsg := in.Msg().(type) {
  321. case *serverproto.ServiceTransmitAck:
  322. dbMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  323. if err != nil {
  324. util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  325. return nil
  326. }
  327. //chybenchmark
  328. //this.kvTimeMsgLog(int32(inMsg.MsgId))
  329. //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
  330. ctx := model.Session2Context(in.Session())
  331. return &model.RecvServiceMsgEvent{
  332. Sess: in.Session(),
  333. Message: dbMsg,
  334. ClientID: inMsg.ClientId,
  335. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  336. }
  337. }
  338. return in
  339. }
  340. // db发送到其他服务器的消息
  341. func (this *ServiceTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  342. //todo...
  343. switch out.Msg().(type) {
  344. case *serverproto.ServiceTransmitAck:
  345. //log.Println("[DBTCPEventHook::OutEvent] DBTransmitAck db to game...", outMsg)
  346. }
  347. return out
  348. }
  349. /*
  350. ///////////////////////////////////////////AuthTCPEventHook
  351. //处理auth和其他服务器之间的消息
  352. type AuthTCPEventHook struct{
  353. }
  354. //auth接收到来自其他服务器的消息
  355. func (this *AuthTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
  356. switch inMsg := in.Msg().(type) {
  357. case *serverproto.ServiceTransmitAck:
  358. gateMsg, _, err := rpcc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
  359. if err != nil {
  360. util.WarnF("[AuthTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
  361. return nil
  362. }
  363. ctx := model.Session2Context(in.Session())
  364. return &model.RecvServiceMsgEvent{
  365. Sess: in.Session(),
  366. Message: gateMsg,
  367. ClientID: inMsg.ClientId,
  368. ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
  369. }
  370. }
  371. return in
  372. }
  373. //auth发送给其他服务器的消息
  374. func (this *AuthTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
  375. //todo...
  376. return out
  377. }
  378. */