package baseserver import ( "math" "rocommon" "rocommon/rpc" "rocommon/service" "rocommon/util" "roserver/baseserver/model" "roserver/baseserver/router" "roserver/serverproto" "strconv" ) // /////////////////////////////////////////ServerTCPEventHook // game.backend // 服务器之间的消息处理派发 type ServerTCPEventHook struct { recvPingNum int32 } // def.go EventHook interface func (this *ServerTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent { switch msg := in.Msg().(type) { case *serverproto.ServiceIdentifyACK: //来自其他服务器的连接确认信息 util.InfoF("[RecvServiceIdentifyACK]%v id=%v", msg.ServiceId, in.Session().ID()) // 重连时会有问题,重连上来时,但是上一个连接还未移除(正在移除中),导致重连失败(想连接的没连接上,该移除的正在移除) // 通过PingReq超时断开连接,来触发断线重連 if serviceNode := model.GetServiceNode(msg.ServiceId); serviceNode == nil { //添加连接上来的对端服务 model.AddServiceNode(in.Session(), msg.ServiceId, msg.ServiceName, "remote") //服务器之间才启用heartbeat操作(只能反应ack端的send和发起连接端的recv是否正常) 5s in.Session().HeartBeat(&serverproto.PingReq{NeedAck: true}) } case *serverproto.PingReq: { //来自ack服务器的ping消息 ctx := in.Session().(rocommon.ContextSet) var sid *service.ETCDServiceDesc in.Session().IncRecvPingNum(1) if in.Session().RecvPingNum() >= 10 { //50s打印一次,收到10次打印一次 in.Session().IncRecvPingNum(-1) if ctx.RawContextData("ctx", &sid) { util.InfoF("[RecvServicePing]Receive PingReq from session=%v node=%v", in.Session().ID(), sid.ID) } } if msg.NeedAck { in.Session().Send(&serverproto.PingReq{NeedAck: false}) } } case *rocommon.SessionConnected: //连接上对应类型的服务器节点后,发送确认信息(ServiceIdentifyACK),告诉对端自己的服务器类型 ctx := in.Session().Node().(rocommon.ContextSet) var sid *service.ETCDServiceDesc //sid在CreateConnector中会指定 if ctx.RawContextData("sid", &sid) { in.Session().Send(&serverproto.ServiceIdentifyACK{ //发送自身服务器节点的信息 ServiceName: service.GetServiceName(), //service/init.go ServiceId: service.GetLocalServiceID(), ServerStartTime: util.GetTimeMilliseconds(), }) //添加远程的服务器节点到本地,sid服务器信息是从etcd中获取的 model.AddServiceNode(in.Session(), sid.ID, sid.Name, "local") util.InfoF("[SendServiceIdentifyACK_local][%v]->[%v] id=%v", service.GetLocalServiceID(), sid.ID, in.Session().ID()) } else { util.InfoF("connector not exist sid") } case *rocommon.SessionClosed: closeSID := model.RemoveServiceNode(in.Session()) if closeSID != "" { msg.CloseSId = closeSID } util.InfoF("[ServerTCPEventHook::InEvent] Readmsg error SessionClosed session=%v", in.Session().ID()) case *rocommon.SessionConnectError: util.InfoF("[ServerTCPEventHook::InEvent] connector error=%v", msg.String()) } return in } func (this *ServerTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent { return out } // /////////////////////////////////////////BackendTCPEventHook type BackendTCPEventHook struct { selectRouterIdx int } // def.go EventHook interface // 后端服务器接收到来自gate/db/auth的消息 func (this *BackendTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent { switch inMsg := in.Msg().(type) { case *serverproto.GateTransmitAck: userMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } //if inMsg.MsgId == 1173 { // util.DebugF("kvtime recv cli=%v deltime=%v", inMsg.ClientId, util.GetTimeMilliseconds()-inMsg.KvTime) //} //封装成来自gate的消息事件 //todo...这边需要添加gate和game的连接信息,否则game工作线程获取session时会有多线程冲突 return &model.RecvGateMsgEvent{ Sess: in.Session(), Message: userMsg, ClientID: inMsg.ClientId, MsgSeqId: inMsg.SeqId, KvTime: inMsg.KvTime, } case *serverproto.ServiceTransmitAck: //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg) transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } //cross begin //判断是否是跨服操作(针对于social服务器) routeRule := router.GetRuleByMsgID(int(inMsg.MsgId)) if routeRule != nil { var serviceNodeList []string switch routeRule.CrossMode { case router.CrossMode_Type_Section: //转发给router服务器 serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_CROSSROUTER_STR) case router.CrossMode_Type_Global: serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_GLOBALCROSSROUTER_STR) } if len(serviceNodeList) > 0 { this.selectRouterIdx++ if this.selectRouterIdx >= math.MaxInt32 { this.selectRouterIdx = 0 } selectIdx := this.selectRouterIdx % len(serviceNodeList) serviceNode := model.GetServiceNode(serviceNodeList[selectIdx]) if serviceNode == nil { return in } crossMsg := &serverproto.ServiceTransmitRouterNtf{ FromZone: int32(service.GetServiceConfig().Node.Zone), MsgId: inMsg.MsgId, MsgData: inMsg.MsgData, ClientId: inMsg.ClientId, TargetServiceNode: inMsg.TargetServiceNode, } serviceNode.Send(crossMsg) return in } else if routeRule.CrossMode > 0 { util.FatalF("CrossNode Not Find corssMode=%v msgId=%v id=%v", routeRule.CrossMode, inMsg.MsgId, inMsg.ClientId) return nil } } //cross end ctx := model.Session2Context(in.Session()) if ctx != nil { return &model.RecvServiceMsgEvent{ Sess: in.Session(), Message: transmitMsg, ClientID: inMsg.ClientId, ClientIDList: inMsg.ClientIdList, ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接) IsMaster: inMsg.IsMaster, } } else { return &model.RecvServiceMsgEvent{ Sess: in.Session(), Message: transmitMsg, ClientID: inMsg.ClientId, ClientIDList: inMsg.ClientIdList, //ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接) IsMaster: inMsg.IsMaster, } } case *serverproto.ServiceTransmitRouterNtf: //social不需要特殊处理的协议通过RecvRouterServiceMsgEvent,透传到上层进行转发 //log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg) transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } ctx := model.Session2Context(in.Session()) return &model.RecvRouterServiceMsgEvent{ Sess: in.Session(), Message: transmitMsg, ClientID: inMsg.ClientId, ClientIDList: inMsg.ClientIdList, ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接) IsMaster: inMsg.IsMaster, FromZone: inMsg.FromZone, } case *serverproto.ClientClosedACK: //todo... // 客户端关闭做处理,game做离线处理 放到default中处理 //log.Println("ClientClosedACK", inMsg) default: return in } return in } // 后端服务器发送到gate/db的消息 func (this *BackendTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent { //todo... switch out.Msg().(type) { case *serverproto.ServiceTransmitAck: //log.Println("[BackendTCPEventHook::OutEvent] ServiceTransmitAck game to gate/db", outMsg) } return out } // /////////////////////////////////////////BackendTCPEventHook // 跨服router节点处理 // 收到social节点的消息,或者收到跨服功能节点的消息 type BackendTCPEventForCrossRouterHook struct { selectRouterIdx int } func (this *BackendTCPEventForCrossRouterHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent { switch inMsg := in.Msg().(type) { case *serverproto.ServiceTransmitRouterNtf: msgId := int(inMsg.MsgId) routeRule := router.GetRuleByMsgID(msgId) var serviceNodeList []string if routeRule == nil { //router 返回给所有social服务器 if inMsg.FromZone <= 0 { //发给所有social节点 util.InfoF("BackendTCPEventForCrossRouterHook fromzone<=0 msgid=%v to all zone", msgId) serviceNodeList = model.GetAllZoneSocialServiceNode(model.SERVICE_NODE_TYPE_SOCIAL_STR) for idx := 0; idx < len(serviceNodeList); idx++ { serviceNode := model.GetServiceNode(serviceNodeList[idx]) if serviceNode == nil { continue } serviceNode.Send(inMsg) } return in } //router 返回给对应zone服务器 serviceNodeList = model.GetAllSocialServiceNodeByZone(int(inMsg.FromZone), model.SERVICE_NODE_TYPE_SOCIAL_STR) } else { //表示发往router并需要router转发给其他功能服务器 //通过proto中的RouteRule来确定当前协议的功能服务器 if inMsg.TargetServiceNode != "" { //发送到目地节点服务器 serviceNode := model.GetServiceNode(inMsg.TargetServiceNode) if serviceNode != nil { serviceNode.Send(inMsg) } return in } else { serviceNodeList = model.GetAllServiceNodeByName(routeRule.Mod) } } if len(serviceNodeList) <= 0 { util.ErrorF("BackendTCPEventForCrossRouterHook service node not exist nodename=%v msgid=%v fromZone=%V", routeRule, msgId, inMsg.FromZone) return in } this.selectRouterIdx++ if this.selectRouterIdx >= math.MaxInt32 { this.selectRouterIdx = 0 } selectIdx := this.selectRouterIdx % len(serviceNodeList) serviceNode := model.GetServiceNode(serviceNodeList[selectIdx]) if serviceNode == nil { return in } serviceNode.Send(inMsg) case *serverproto.ServiceTransmitAck: transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } //chybenchmark //this.kvTimeMsgLog(int32(inMsg.MsgId)) //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg) ctx := model.Session2Context(in.Session()) return &model.RecvServiceMsgEvent{ Sess: in.Session(), Message: transmitMsg, ClientID: inMsg.ClientId, ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接) } } return in } func (this *BackendTCPEventForCrossRouterHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent { return out } // /////////////////////////////////////////DBTCPEventHook // 处理game和db之间的消息 type ServiceTCPEventHook struct { kvTimeMsgNumList []serverproto.KeyValueType CurTime uint64 } func (this *ServiceTCPEventHook) kvTimeMsgLog(msgId int32) { bChange := false for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ { if this.kvTimeMsgNumList[idx].Key == msgId { this.kvTimeMsgNumList[idx].Value++ bChange = true break } } if !bChange { this.kvTimeMsgNumList = append(this.kvTimeMsgNumList, serverproto.KeyValueType{Key: msgId, Value: 1}) } nowTime := util.GetTimeMilliseconds() if this.CurTime <= 0 { this.CurTime = nowTime } else if nowTime-this.CurTime > 1000 { this.CurTime = nowTime printfListStr := "" for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ { printfListStr += "\n" + strconv.Itoa(int(this.kvTimeMsgNumList[idx].Key)) + "-" + strconv.Itoa(int(this.kvTimeMsgNumList[idx].Value)) } util.DebugF("printfListStr=%v", printfListStr) } } // db接收来自其他服务器的消息 func (this *ServiceTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent { switch inMsg := in.Msg().(type) { case *serverproto.ServiceTransmitAck: dbMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } //chybenchmark //this.kvTimeMsgLog(int32(inMsg.MsgId)) //log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg) ctx := model.Session2Context(in.Session()) return &model.RecvServiceMsgEvent{ Sess: in.Session(), Message: dbMsg, ClientID: inMsg.ClientId, ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接) } } return in } // db发送到其他服务器的消息 func (this *ServiceTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent { //todo... switch out.Msg().(type) { case *serverproto.ServiceTransmitAck: //log.Println("[DBTCPEventHook::OutEvent] DBTransmitAck db to game...", outMsg) } return out } /* ///////////////////////////////////////////AuthTCPEventHook //处理auth和其他服务器之间的消息 type AuthTCPEventHook struct{ } //auth接收到来自其他服务器的消息 func (this *AuthTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent { switch inMsg := in.Msg().(type) { case *serverproto.ServiceTransmitAck: gateMsg, _, err := rpcc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData) if err != nil { util.WarnF("[AuthTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId) return nil } ctx := model.Session2Context(in.Session()) return &model.RecvServiceMsgEvent{ Sess: in.Session(), Message: gateMsg, ClientID: inMsg.ClientId, ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接) } } return in } //auth发送给其他服务器的消息 func (this *AuthTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent { //todo... return out } */