package model

import (
	"errors"
	"fmt"
	"rocommon"
	"rocommon/rpc"
	"rocommon/util"
	"roserver/baseserver/router"
	"roserver/serverproto"
	"sort"
	"strconv"
	"sync"
	"time"
	"unsafe"
)

// Connection states used by the ClientUser state machine.
const (
	CLIENT_STATE_CONNECTED         = 1
	CLIENT_STATE_DISJOIN_RECONNECT = 2 // disconnected, waiting for the client to reconnect
	CLIENT_STATE_DISJOIN           = 3 // disconnected, waiting to be removed from the list
)

// CLIENT_TIME_OUT_DURATION is the client timeout. It is not referenced in this
// file; the unit is presumably seconds (30 minutes) — TODO confirm at the use site.
const CLIENT_TIME_OUT_DURATION = 30 * 60

//
//var clientDisJoinReconnect = func(m *StateMachineCore, data interface{}) int32 {
//	// Drop the session-related info of the clientUser but keep the backend node info.
//	parent := unsafe.Pointer(m)
//	cli := (*ClientUser)(parent)
//
//	ClientMag.reconnectClientList.Store(cli.OpenId, cli)
//
//	return CLIENT_STATE_DISJOIN_RECONNECT
//}

// clientConnected is the handler registered for CLIENT_STATE_CONNECTED. It
// publishes the user in ClientMag.connectedClientList under the
// platform-qualified open id and reports CLIENT_STATE_CONNECTED as the state.
//
// The *StateMachineCore is reinterpreted as the enclosing *ClientUser. That is
// only valid because StateMachineCore is the FIRST field of ClientUser; the
// handler must never be registered on any other type embedding StateMachineCore.
var clientConnected = func(m *StateMachineCore, data interface{}) int32 {
	cli := (*ClientUser)(unsafe.Pointer(m))

	//oldCliUser := ClientMag.GetConnectedFromOpenId(cli.OpenId)
	//if oldCliUser != nil {
	//	oldCliUser.ClientSession = nil
	//	ClientMag.connectedClientList.Delete(cli.OpenId)
	//}

	key := ConvertPlatform(cli.OpenId, cli.Platform)
	ClientMag.connectedClientList.Store(key, cli)
	return CLIENT_STATE_CONNECTED
}

// ClientUser is the gateway-side representation of one connected client.
//
// Do not modify the data in this struct from the main thread, otherwise there
// will be multi-thread conflicts (or take a lock for the operation).
//
// StateMachineCore must remain the first field: clientConnected recovers the
// *ClientUser from the *StateMachineCore via unsafe.Pointer.
type ClientUser struct {
	StateMachineCore

	serviceNode   sync.RWMutex // guards ServiceTargets
	ClientSession rocommon.Session
	LastPingTime  time.Time

	// Backend service nodes bound to this user; each service type is bound to
	// exactly one node (keyed by service name).
	ServiceTargets map[string]*ServiceBackend
	gateID         ClientID

	// Used for login verification.
	OpenId    string
	Platform  string
	LoginData []byte
	SeqId     uint32 // packet sequence number, used to judge validity

	// Received-message accounting for flood control (see ClientDirect2Backend).
	msgTime      uint64
	msgCount     uint32
	invalidCount uint32 // number of times the flood rule was triggered

	// Used for stress-test performance measurement.
	kvTimeLock       sync.Mutex // guards kvBossRewardTime
	kvBossRewardTime map[int32][]*TempKVSt
	kvIndex          int32
}

// TempKVSt records when a tracked request was received, so that the latency
// until the matching ack can be measured.
type TempKVSt struct {
	ClientKVTime     uint64
	RecvClientKVTime uint64
	MsgName          string
}

// ClientID identifies a client on a gateway.
type ClientID struct {
	SessID     uint64 // session id of the client on the gateway
	ServiceID  string // gateway the client is connected to
	SessIdList []uint64
}

// ServiceBackend describes one backend service node bound to a user.
type ServiceBackend struct {
	ServiceName string
	ServiceID   string
}

// NewUser creates a ClientUser bound to cliSession on this gateway
// (GateServiceID) and registers its state handlers.
func NewUser(cliSession rocommon.Session) *ClientUser {
	cli := &ClientUser{
		ClientSession: cliSession,
		gateID: ClientID{
			SessID:    cliSession.ID(),
			ServiceID: GateServiceID,
		},
		ServiceTargets:   map[string]*ServiceBackend{},
		kvBossRewardTime: map[int32][]*TempKVSt{},
	}
	cli.init()
	return cli
}

// init sets up the state machine and registers the CONNECTED handler.
func (this *ClientUser) init() {
	this.InitState()
	this.RegisterState(CLIENT_STATE_CONNECTED, clientConnected)
}

// ConnectReset rebinds the user to a new session (reconnect) and resets the
// per-connection counters.
//
// It returns an error if cliSession is nil or if BindUser2Session fails; in
// the latter case gateID.SessID has already been updated to the new session.
func (this *ClientUser) ConnectReset(cliSession rocommon.Session) error {
	if cliSession == nil {
		return errors.New("ConnectReset: new session is nil")
	}

	// Detach this user from the previously held session. Per the original
	// author's note, the old session is then closed by the default
	// no-read/write handling. The comma-ok assertion avoids a panic when the
	// old session is nil or does not implement ContextSet.
	if oldCtx, ok := this.ClientSession.(rocommon.ContextSet); ok {
		oldCtx.SetContextData("user", nil, "ConnectReset")
	}

	this.gateID.SessID = cliSession.ID()
	if err := BindUser2Session(this, cliSession); err != nil {
		return err
	}

	// A fresh connection starts with fresh sequence / flood-control counters.
	this.SeqId = 0
	this.msgTime = 0
	this.msgCount = 0
	this.invalidCount = 0

	if this.state != CLIENT_STATE_CONNECTED {
		this.RemoveConnected()
		this.SwitchState(CLIENT_STATE_CONNECTED, nil)
	}
	return nil
}

// RemoveConnected deletes this user from ClientMag.connectedClientList.
func (this *ClientUser) RemoveConnected() {
	ClientMag.connectedClientList.Delete(ConvertPlatform(this.OpenId, this.Platform))
}

// Ping records the time of the latest ping.
func (this *ClientUser) Ping() {
	this.LastPingTime = util.GetCurrentTimeNow()
}

// ClientDirect2Backend forwards data received by the gate directly to the
// backend node identified by serviceId, wrapped in a GateTransmitAck.
//
// It returns an error when the node cannot be found, or when the client has
// been flagged for sending too many messages in a short time. Note that the
// message that trips the flood rule has already been forwarded at that point.
func (this *ClientUser) ClientDirect2Backend(serviceId string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
	const (
		burstMsgCount = 30 // messages per sampling batch
		burstWindow   = 5  // a batch received within this span counts as a violation (unit of util.GetCurrentTime, presumably seconds — confirm)
		maxViolations = 3  // violations before the client is reported
	)

	// Look up the backend node to send to.
	service := GetServiceNode(serviceId)
	if service == nil {
		return fmt.Errorf("server nod not find ClientDirect2Backend:%v %v %v", serviceType, msgId, serviceId)
	}

	// Stress-test instrumentation: remember when a tracked request arrived so
	// KvTestBossReward can compute the latency once the ack comes back.
	if kvItem, ok := router.ReqAckKVList[msgId]; ok {
		this.kvTimeLock.Lock()
		this.kvBossRewardTime[kvItem.AckMsgId] = append(this.kvBossRewardTime[kvItem.AckMsgId], &TempKVSt{
			RecvClientKVTime: util.GetTimeMilliseconds(),
			MsgName:          kvItem.ReqMsgName,
		})
		this.kvTimeLock.Unlock()
	}
	//if msgId == 1173 {
	//	this.kvTimeLock.Lock()
	//	tmpMsg1, _, _ := rpcc.DecodeMessage(msgId, msgData)
	//	recordTime := tmpMsg1.(*serverproto.CSPlayerBossRewardReq).RecordTimeStamp
	//	nowTime := util.GetTimeMilliseconds()
	//	util.DebugF("ClientKVTime=%v nowtime=%v", recordTime, nowTime)
	//	this.kvBossRewardTime = append(this.kvBossRewardTime, &TempKVSt{
	//		RecvClientKVTime: nowTime,
	//		ClientKVTime:     recordTime,
	//	})
	//	this.kvTimeLock.Unlock()
	//}

	// Forward the payload, tagged with this client's gateway session id.
	service.Send(&serverproto.GateTransmitAck{
		MsgId:    uint32(msgId),
		MsgData:  msgData,
		ClientId: this.gateID.SessID,
		SeqId:    seqId,
		//KvTime: util.GetTimeMilliseconds(),
	})

	// Flood control: detect a large number of messages in a short time.
	if this.msgTime == 0 {
		this.msgTime = util.GetCurrentTime()
	}
	this.msgCount++
	if this.msgCount < burstMsgCount {
		return nil
	}

	nowTime := util.GetCurrentTime()
	if nowTime-this.msgTime <= burstWindow {
		this.invalidCount++
	}
	//log.Printf("msgcount=%v invalidCount=%v deltime=%v", this.msgCount, this.invalidCount, nowTime-this.msgTime)
	if this.invalidCount >= maxViolations {
		this.msgTime = 0
		this.invalidCount = 0
		this.msgCount = 0
		errMsg := fmt.Sprintf("ClientDirect2Backend=%v %v %v %v| short time recv too many msg", serviceType, msgId, serviceId, this.OpenId)
		util.ErrorF("%s", errMsg)
		return errors.New(errMsg)
	}

	// Start the next sampling batch.
	this.msgTime = nowTime
	this.msgCount = 0
	return nil
}

// PerformKVTimeSt aggregates request→ack latency statistics for one ack
// message id (stress-test instrumentation).
type PerformKVTimeSt struct {
	AckMsgId  int32
	TotalNum  int32
	CurTime   uint64 // latency of the most recent sample
	TotalTime uint64
	MsgName   string
	BeginTime uint64
	NowTime   uint64
	LeftNum   int
	PerNum    int32
	MaxPerNum int32
}

// Process-wide stress-test statistics; performTime and performKvTime are
// guarded by performKvLock.
var performTime uint64 = 0
var performKvTime = map[int32]*PerformKVTimeSt{}
var performKvLock sync.Mutex

// KvTestBossReward is stress-test instrumentation. It is expected to be called
// with the ack message id when an ack goes back to the client: it pops the
// oldest pending request recorded for msgId by ClientDirect2Backend, folds the
// measured latency into the process-wide statistics and, at most once per
// 1000 time units of util.GetTimeMilliseconds, logs a report of all statistics.
//
// It does nothing when no request is pending for msgId, or when the pending
// entry has an empty MsgName.
func (this *ClientUser) KvTestBossReward(msgId int32) {
	nowTime := util.GetTimeMilliseconds()

	var (
		delTime uint64
		leftNum int
		msgName string
	)
	this.kvTimeLock.Lock()
	if pending := this.kvBossRewardTime[msgId]; len(pending) > 0 {
		delTime = nowTime - pending[0].RecvClientKVTime
		msgName = pending[0].MsgName
		// Pop the front in place; clear the vacated tail slot so the backing
		// array does not keep the removed entry alive.
		copy(pending, pending[1:])
		pending[len(pending)-1] = nil
		pending = pending[:len(pending)-1]
		this.kvBossRewardTime[msgId] = pending
		leftNum = len(pending)
	}
	this.kvTimeLock.Unlock()
	if msgName == "" {
		return
	}
	//if msgId == 1067 {
	//	util.DebugF("delTimeLimit msgid=%v deltime=%v", msgId, delTime)
	//}

	var printfList []PerformKVTimeSt
	performKvLock.Lock()
	nowSec := uint64(util.GetTimeSeconds())
	performItem, ok := performKvTime[msgId]
	if ok {
		performItem.TotalNum++
		performItem.TotalTime += delTime
		performItem.CurTime = delTime
		performItem.NowTime = nowSec
		performItem.LeftNum = leftNum
	} else {
		performItem = &PerformKVTimeSt{
			TotalTime: delTime,
			TotalNum:  1,
			AckMsgId:  msgId,
			MsgName:   msgName,
			BeginTime: nowSec,
			NowTime:   nowSec,
			LeftNum:   leftNum,
			CurTime:   delTime,
		}
		performKvTime[msgId] = performItem
	}

	// Average throughput since the first sample; until a full second has
	// elapsed the raw count is used to avoid dividing by zero.
	perNum := performItem.TotalNum
	if elapsed := int32(performItem.NowTime - performItem.BeginTime); elapsed > 0 {
		perNum /= elapsed
	}
	performItem.PerNum = perNum
	if perNum > performItem.MaxPerNum {
		performItem.MaxPerNum = perNum
	}

	switch {
	case performTime == 0:
		performTime = nowTime
	// nowTime was sampled before the lock was taken, so another goroutine may
	// already have stored a later performTime; the ordering check keeps the
	// unsigned subtraction from wrapping around and forcing a spurious dump.
	case nowTime > performTime && nowTime-performTime >= 1000:
		performTime = nowTime
		// Snapshot by value so the report can be formatted outside the lock.
		printfList = make([]PerformKVTimeSt, 0, len(performKvTime))
		for _, val := range performKvTime {
			printfList = append(printfList, *val)
		}
	}
	performKvLock.Unlock()

	if len(printfList) == 0 {
		return
	}
	sort.Slice(printfList, func(i, j int) bool {
		return printfList[i].AckMsgId < printfList[j].AckMsgId
	})
	var report []byte
	for i := range printfList {
		st := &printfList[i]
		//msgIdStr := strconv.Itoa(int(st.AckMsgId))
		avgTime := float64(st.TotalTime) / float64(st.TotalNum)
		report = append(report, " \n"...)
		report = strconv.AppendInt(report, int64(st.CurTime), 10)
		report = append(report, '-')
		report = strconv.AppendInt(report, int64(avgTime), 10)
		report = append(report, "(ms) | "...)
		report = strconv.AppendInt(report, int64(st.PerNum), 10)
		report = append(report, '-')
		report = strconv.AppendInt(report, int64(st.MaxPerNum), 10)
		report = append(report, "(num/s) | "...)
		report = strconv.AppendInt(report, int64(st.TotalNum), 10)
		report = append(report, "(total) | "...)
		report = strconv.AppendInt(report, int64(st.LeftNum), 10)
		report = append(report, "(left) "...)
		report = append(report, st.MsgName...)
	}
	util.DebugF("printfListStr=%v", string(report))

	//if len(this.kvBossRewardTime) > 0 {
	//	delTime := nowTime - this.kvBossRewardTime[0].ClientKVTime
	//	delTime1 := nowTime - this.kvBossRewardTime[0].RecvClientKVTime
	//	delTime2 := this.kvBossRewardTime[0].RecvClientKVTime - this.kvBossRewardTime[0].ClientKVTime
	//	this.kvIndex++
	//	//if delTime > 0 {
	//	//	util.DebugF("kvtime cid=%v idx=%v 1073time=%v", this.ClientSession.ID(), this.kvIndex, delTime)
	//	//}
	//	util.DebugF("kvtime cid=%v idx=%v time=%v time1=%v time2=%v | %v", this.ClientSession.ID(),
	//		this.kvIndex, delTime, delTime1, delTime2,
	//		this.kvBossRewardTime[0].ClientKVTime, this.kvBossRewardTime[0].RecvClientKVTime)
	//	this.kvBossRewardTime = append(this.kvBossRewardTime[:0], this.kvBossRewardTime[1:]...)
	//}
	//this.kvTimeLock.Unlock()
}

// ClientDirect2BackendByServiceName forwards raw client data to the backend
// node currently bound to this user for serviceName. Unlike
// ClientDirect2Backend it applies neither flood control nor latency tracking.
//
// It returns an error when no reachable node is bound for serviceName.
func (this *ClientUser) ClientDirect2BackendByServiceName(serviceName string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
	serviceId := this.GetServiceBackend(serviceName)

	// Look up the backend node to send to.
	service := GetServiceNode(serviceId)
	if service == nil {
		return fmt.Errorf("server nod not find[ClientDirect2BackendByServiceName]:%v %v %v", serviceType, msgId, serviceId)
	}

	// Forward the payload, tagged with this client's gateway session id.
	service.Send(&serverproto.GateTransmitAck{
		MsgId:    uint32(msgId),
		MsgData:  msgData,
		ClientId: this.gateID.SessID,
		SeqId:    seqId,
	})
	return nil
}

// Client2Backend encodes msg and sends it from the gate to the backend node
// bound to this user for serviceName; e.g. when serviceName is "game" the
// message goes to the game server.
//
// It returns an error when no reachable node is bound for serviceName or when
// msg cannot be encoded.
func (this *ClientUser) Client2Backend(serviceName string, msg interface{}) error {
	serviceId := this.GetServiceBackend(serviceName)

	// Look up the backend node to send to.
	service := GetServiceNode(serviceId)
	if service == nil {
		return fmt.Errorf("server nod not find[Client2Backend]:%v %v", serviceId, msg)
	}

	msgData, info, err := rpc.EncodeMessage(msg)
	if err != nil {
		return err
	}

	// Forward the payload, tagged with this client's gateway session id.
	service.Send(&serverproto.GateTransmitAck{
		MsgId:    uint32(info.ID),
		MsgData:  msgData,
		ClientId: this.gateID.SessID,
	})
	return nil
}

// Broadcast2Backend sends msg to every backend node bound to this user (there
// may be several service types bound). Nodes that cannot be resolved are
// skipped silently.
func (this *ClientUser) Broadcast2Backend(msg interface{}) {
	this.serviceNode.RLock()
	defer this.serviceNode.RUnlock()

	//todo...
	for _, target := range this.ServiceTargets {
		if nodeSess := GetServiceNode(target.ServiceID); nodeSess != nil {
			nodeSess.Send(msg)
		}
	}
}

// SetServiceBackend binds the backend node serviceID to this user for the
// given service type, replacing any node previously bound for that type (each
// service type is bound to exactly one node).
func (this *ClientUser) SetServiceBackend(serviceName string, serviceID string) {
	this.serviceNode.Lock()
	defer this.serviceNode.Unlock()

	if bound, ok := this.ServiceTargets[serviceName]; ok {
		bound.ServiceID = serviceID
		return
	}
	this.ServiceTargets[serviceName] = &ServiceBackend{
		ServiceName: serviceName,
		ServiceID:   serviceID,
	}
}

// GetServiceBackend returns the id of the backend node bound for the given
// service type, or "" when none is bound.
func (this *ClientUser) GetServiceBackend(serviceName string) string {
	this.serviceNode.RLock()
	defer this.serviceNode.RUnlock()

	if bound, ok := this.ServiceTargets[serviceName]; ok {
		return bound.ServiceID
	}
	return ""
}

// ClientUserManager tracks the connected clients of this gateway.
type ClientUserManager struct {
	StateMachineCore

	//reconnectClientList sync.Map //[openId, cli]
	connectedClientList sync.Map // platform-qualified open id -> *ClientUser
}

// NewClientUserManager creates an empty manager with an initialised state
// machine.
func NewClientUserManager() *ClientUserManager {
	mag := &ClientUserManager{}
	mag.InitState()
	return mag
}

// OnLogicDisJoin is meant to kick the players bound to a game node when that
// node goes down. It is currently a no-op.
func (this *ClientUserManager) OnLogicDisJoin(serviceId string) {
	//todo...
	//for _, cli := range this.clientUserList {
	//	if cli.GetServiceBackend(SERVICE_NODE_TYPE_GAME_STR) == serviceId {
	//	}
	//}
}

// AddClient creates a ClientUser for cliSession with the given identity and
// switches it to CLIENT_STATE_CONNECTED. Registration in the connected list is
// done by the clientConnected handler (assuming SwitchState invokes the handler
// registered for the target state — see StateMachineCore).
func (this *ClientUserManager) AddClient(cliSession rocommon.Session, openId, platform string) *ClientUser {
	cli := NewUser(cliSession)
	cli.OpenId = openId
	cli.Platform = platform
	cli.SwitchState(CLIENT_STATE_CONNECTED, nil)
	return cli
}

// GetConnectedFromOpenId returns the connected user for (openId, platform), or
// nil when there is none. Callers must handle nil: after a server restart the
// user may not exist.
func (this *ClientUserManager) GetConnectedFromOpenId(openId, platform string) *ClientUser {
	val, ok := this.connectedClientList.Load(ConvertPlatform(openId, platform))
	if !ok {
		return nil
	}
	cliUser, _ := val.(*ClientUser)
	return cliUser
}

// RemoveConnectedFromOpenId removes the entry for (openId, platform) from the
// connected list; it is a no-op when no such entry exists.
func (this *ClientUserManager) RemoveConnectedFromOpenId(openId, platform string) {
	this.connectedClientList.Delete(ConvertPlatform(openId, platform))
}

// ClientUserReBindOpenId moves cli from its old open id to a new one in the
// connected list and updates the identity stored on cli.
//
// NOTE(review): cli.OpenId is set to the ConvertPlatform-ed id, while
// clientConnected and RemoveConnected apply ConvertPlatform to cli.OpenId
// again. Unless ConvertPlatform is idempotent, a later RemoveConnected will
// miss this entry — confirm against ConvertPlatform.
// NOTE(review): the delete goes through the receiver but the store goes
// through the global ClientMag; these only agree when this == ClientMag.
func (this *ClientUserManager) ClientUserReBindOpenId(oldOpenId, newOpenid, platform string, cli *ClientUser) {
	oldOpenId = ConvertPlatform(oldOpenId, platform)
	newOpenid = ConvertPlatform(newOpenid, platform)

	this.connectedClientList.Delete(oldOpenId)
	cli.OpenId = newOpenid
	cli.Platform = platform
	ClientMag.connectedClientList.Store(newOpenid, cli)
}