| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486 |
- package model
- import (
- "errors"
- "fmt"
- "rocommon"
- "rocommon/rpc"
- "rocommon/util"
- "roserver/baseserver/router"
- "roserver/serverproto"
- "sort"
- "strconv"
- "sync"
- "time"
- "unsafe"
- )
// Connection states of a ClientUser.
const (
	CLIENT_STATE_CONNECTED         = iota + 1 // 1: connected
	CLIENT_STATE_DISJOIN_RECONNECT            // 2: disconnected, waiting for a reconnect
	CLIENT_STATE_DISJOIN                      // 3: disconnected, waiting to be removed from the list
)

// CLIENT_TIME_OUT_DURATION evaluates to 1800.
// NOTE(review): presumably a client timeout in seconds (30 minutes) — confirm
// against the code that consumes it; it is not used in this file.
const CLIENT_TIME_OUT_DURATION = 30 * 60
- //
- //var clientDisJoinReconnect = func(m *StateMachineCore, data interface{}) int32 {
- // //移除clientUser的Session相关信息,但是保存节点服务器相关信息
- // parent := unsafe.Pointer(m)
- // cli := (*ClientUser)(parent)
- //
- // ClientMag.reconnectClientList.Store(cli.OpenId, cli)
- //
- // return CLIENT_STATE_DISJOIN_RECONNECT
- //}
- var clientConnected = func(m *StateMachineCore, data interface{}) int32 {
- //移除clientUser的Session相关信息,但是保存节点服务器相关信息
- parent := unsafe.Pointer(m)
- cli := (*ClientUser)(parent)
- //oldCliUser := ClientMag.GetConnectedFromOpenId(cli.OpenId)
- //if oldCliUser != nil {
- // oldCliUser.ClientSession = nil
- // ClientMag.connectedClientList.Delete(cli.OpenId)
- //}
- openId := ConvertPlatform(cli.OpenId, cli.Platform)
- ClientMag.connectedClientList.Store(openId, cli)
- return CLIENT_STATE_CONNECTED
- }
- // 主线程中不要对结构中的数据做修改,否则会有多线程冲突(或者加锁进行操作)
- type ClientUser struct {
- StateMachineCore
- serviceNode sync.RWMutex
- ClientSession rocommon.Session
- LastPingTime time.Time
- //绑定的后端服务器节点
- ServiceTargets map[string]*ServiceBackend //每种类型的服务只会绑定一个节点
- gateID ClientID
- //登陆验证使用
- OpenId string
- Platform string
- LoginData []byte
- SeqId uint32 //包序列号,判断合法性
- //收到消息数量
- msgTime uint64
- msgCount uint32
- invalidCount uint32 //触发非法次数
- //压力测试性能使用
- kvTimeLock sync.Mutex
- kvBossRewardTime map[int32][]*TempKVSt
- kvIndex int32
- }
// TempKVSt is one pending latency sample queued in
// ClientUser.kvBossRewardTime.
//
// RecvClientKVTime is the util.GetTimeMilliseconds() reading taken when the
// request was forwarded; MsgName is the request's message name. ClientKVTime
// is not written by any live code in this file.
type TempKVSt struct {
	ClientKVTime, RecvClientKVTime uint64
	MsgName                        string
}
// ClientID identifies a client by the gateway it is attached to and its
// session on that gateway.
type ClientID struct {
	SessID     uint64 // the client's session id on the gateway
	ServiceID  string // the gateway the client is connected to
	SessIdList []uint64 // NOTE(review): not read or written anywhere in this file — confirm usage
}
// ServiceBackend records which backend node (ServiceID) a client is bound to
// for one service type (ServiceName).
type ServiceBackend struct {
	ServiceName, ServiceID string
}
- func NewUser(cliSession rocommon.Session) *ClientUser {
- cli := &ClientUser{
- ClientSession: cliSession,
- gateID: ClientID{
- SessID: cliSession.ID(),
- ServiceID: GateServiceID,
- },
- ServiceTargets: map[string]*ServiceBackend{},
- kvBossRewardTime: map[int32][]*TempKVSt{},
- }
- cli.init()
- return cli
- }
// init prepares the user's state machine: it calls InitState and registers
// clientConnected as the handler for CLIENT_STATE_CONNECTED.
func (this *ClientUser) init() {
	// Register the state machine handlers.
	this.InitState()
	this.RegisterState(CLIENT_STATE_CONNECTED, clientConnected)
}
- func (this *ClientUser) ConnectReset(cliSession rocommon.Session) error {
- //关闭之前维护的session
- //通过无读写默认进行关闭
- this.ClientSession.(rocommon.ContextSet).SetContextData("user", nil, "ConnectReset")
- this.gateID.SessID = cliSession.ID()
- ret := BindUser2Session(this, cliSession)
- if ret != nil {
- return ret
- }
- //从重连队列中移除
- this.SeqId = 0
- this.msgTime = 0
- this.msgCount = 0
- this.invalidCount = 0
- if this.state != CLIENT_STATE_CONNECTED {
- this.RemoveConnected()
- this.SwitchState(CLIENT_STATE_CONNECTED, nil)
- }
- return nil
- }
- func (this *ClientUser) RemoveConnected() {
- openId := ConvertPlatform(this.OpenId, this.Platform)
- ClientMag.connectedClientList.Delete(openId)
- }
- func (this *ClientUser) Ping() {
- this.LastPingTime = util.GetCurrentTimeNow()
- }
- // gate把接收到的数据直接发送到后端服务器节点
- func (this *ClientUser) ClientDirect2Backend(serviceId string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
- //获得后端服务器节点,并发送
- service := GetServiceNode(serviceId)
- if service == nil {
- return errors.New(fmt.Sprintf("server nod not find ClientDirect2Backend:%v %v %v", serviceType, msgId, serviceId))
- }
- //压力测试性能使用
- kvItem, ok := router.ReqAckKVList[msgId]
- if ok {
- this.kvTimeLock.Lock()
- nowTime := util.GetTimeMilliseconds()
- this.kvBossRewardTime[kvItem.AckMsgId] = append(this.kvBossRewardTime[kvItem.AckMsgId],
- &TempKVSt{
- RecvClientKVTime: nowTime,
- MsgName: kvItem.ReqMsgName,
- })
- this.kvTimeLock.Unlock()
- }
- //if msgId == 1173 {
- // this.kvTimeLock.Lock()
- // tmpMsg1, _, _ := rpcc.DecodeMessage(msgId, msgData)
- // recordTime := tmpMsg1.(*serverproto.CSPlayerBossRewardReq).RecordTimeStamp
- // nowTime := util.GetTimeMilliseconds()
- // util.DebugF("ClientKVTime=%v nowtime=%v", recordTime, nowTime)
- // this.kvBossRewardTime = append(this.kvBossRewardTime, &TempKVSt{
- // RecvClientKVTime: nowTime,
- // ClientKVTime: recordTime,
- // })
- // this.kvTimeLock.Unlock()
- //}
- //用户ID绑定处理
- service.Send(&serverproto.GateTransmitAck{
- MsgId: uint32(msgId),
- MsgData: msgData,
- ClientId: this.gateID.SessID,
- SeqId: seqId,
- //KvTime: util.GetTimeMilliseconds(),
- })
- //短时间内收到大量消息
- if this.msgTime <= 0 {
- nowTime := util.GetCurrentTime()
- this.msgTime = nowTime
- }
- this.msgCount++
- if this.msgCount >= 30 {
- nowTime := util.GetCurrentTime()
- if nowTime-this.msgTime <= 5 {
- this.invalidCount++
- }
- //log.Printf("msgcount=%v invalidCount=%v deltime=%v", this.msgCount, this.invalidCount, nowTime-this.msgTime)
- if this.invalidCount >= 3 {
- this.msgTime = 0
- this.invalidCount = 0
- this.msgCount = 0
- util.ErrorF("ClientDirect2Backend=%v %v %v %v| short time recv too many msg", serviceType, msgId, serviceId, this.OpenId)
- return errors.New(fmt.Sprintf("ClientDirect2Backend=%v %v %v %v| short time recv too many msg", serviceType, msgId, serviceId, this.OpenId))
- }
- this.msgTime = nowTime
- this.msgCount = 0
- }
- return nil
- }
// PerformKVTimeSt accumulates request/ack latency statistics for one ack
// message id, as maintained by KvTestBossReward.
//
//   - AckMsgId, MsgName: the ack message id and the request's message name.
//   - TotalNum, TotalTime: number of samples and the sum of their latencies.
//   - CurTime: latency of the most recent sample.
//   - BeginTime, NowTime: util.GetTimeSeconds() at the first and the latest sample.
//   - LeftNum: samples still queued for the client that reported last.
//   - PerNum, MaxPerNum: samples per elapsed second, and the highest value seen.
type PerformKVTimeSt struct {
	AckMsgId, TotalNum int32
	CurTime, TotalTime uint64
	MsgName            string
	BeginTime, NowTime uint64
	LeftNum            int
	PerNum, MaxPerNum  int32
}
- var performTime uint64 = 0
- var performKvTime = map[int32]*PerformKVTimeSt{}
- var performKvLock sync.Mutex
- func (this *ClientUser) KvTestBossReward(msgId int32) {
- nowTime := util.GetTimeMilliseconds()
- var delTime uint64 = 0
- leftNum := 0
- msgName := ""
- this.kvTimeLock.Lock()
- if len(this.kvBossRewardTime[msgId]) > 0 {
- delTime = nowTime - this.kvBossRewardTime[msgId][0].RecvClientKVTime
- msgName = this.kvBossRewardTime[msgId][0].MsgName
- this.kvBossRewardTime[msgId] = append(this.kvBossRewardTime[msgId][:0], this.kvBossRewardTime[msgId][1:]...)
- leftNum = len(this.kvBossRewardTime[msgId])
- }
- this.kvTimeLock.Unlock()
- if msgName == "" {
- return
- }
- //if msgId == 1067 {
- // util.DebugF("delTimeLimit msgid=%v deltime=%v", msgId, delTime)
- //}
- var printfList []PerformKVTimeSt
- performKvLock.Lock()
- performItem, ok := performKvTime[msgId]
- if ok {
- performItem.TotalNum++
- performItem.TotalTime += delTime
- performItem.CurTime = delTime
- performItem.NowTime = uint64(util.GetTimeSeconds())
- performItem.LeftNum = leftNum
- } else {
- performItem = &PerformKVTimeSt{
- TotalTime: delTime,
- TotalNum: 1,
- AckMsgId: msgId,
- MsgName: msgName,
- BeginTime: uint64(util.GetTimeSeconds()),
- NowTime: uint64(util.GetTimeSeconds()),
- LeftNum: leftNum,
- CurTime: delTime,
- }
- performKvTime[msgId] = performItem
- }
- tmpPerNum := int32(performItem.NowTime - performItem.BeginTime)
- if tmpPerNum > 0 {
- tmpPerNum = performItem.TotalNum / tmpPerNum
- } else {
- tmpPerNum = performItem.TotalNum
- }
- performItem.PerNum = tmpPerNum
- if tmpPerNum > performItem.MaxPerNum {
- performItem.MaxPerNum = tmpPerNum
- }
- if performTime <= 0 {
- performTime = nowTime
- } else if nowTime-performTime >= 1000 {
- performTime = nowTime
- for _, val := range performKvTime {
- printfList = append(printfList, *val)
- }
- }
- performKvLock.Unlock()
- if len(printfList) > 0 {
- sort.Slice(printfList, func(i, j int) bool {
- return printfList[i].AckMsgId < printfList[j].AckMsgId
- })
- printfListStr := ""
- for idx := 0; idx < len(printfList); idx++ {
- //msgIdStr := strconv.Itoa(int(printfList[idx].AckMsgId))
- tmpTime := float64(printfList[idx].TotalTime) / float64(printfList[idx].TotalNum)
- printfListStr += " \n" +
- strconv.FormatInt(int64(printfList[idx].CurTime), 10) + "-" + strconv.FormatInt(int64(tmpTime), 10) + "(ms) | " +
- strconv.FormatInt(int64(printfList[idx].PerNum), 10) + "-" + strconv.FormatInt(int64(printfList[idx].MaxPerNum), 10) + "(num/s) | " +
- strconv.Itoa(int(printfList[idx].TotalNum)) + "(total) | " +
- strconv.Itoa(printfList[idx].LeftNum) + "(left) " + printfList[idx].MsgName
- }
- util.DebugF("printfListStr=%v", printfListStr)
- }
- //if len(this.kvBossRewardTime) > 0 {
- // delTime := nowTime - this.kvBossRewardTime[0].ClientKVTime
- // delTime1 := nowTime - this.kvBossRewardTime[0].RecvClientKVTime
- // delTime2 := this.kvBossRewardTime[0].RecvClientKVTime - this.kvBossRewardTime[0].ClientKVTime
- // this.kvIndex++
- // //if delTime > 0 {
- // // util.DebugF("kvtime cid=%v idx=%v 1073time=%v", this.ClientSession.ID(), this.kvIndex, delTime)
- // //}
- // util.DebugF("kvtime cid=%v idx=%v time=%v time1=%v time2=%v | %v", this.ClientSession.ID(),
- // this.kvIndex, delTime, delTime1, delTime2,
- // this.kvBossRewardTime[0].ClientKVTime, this.kvBossRewardTime[0].RecvClientKVTime)
- // this.kvBossRewardTime = append(this.kvBossRewardTime[:0], this.kvBossRewardTime[1:]...)
- //}
- //this.kvTimeLock.Unlock()
- }
- func (this *ClientUser) ClientDirect2BackendByServiceName(serviceName string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
- serviceId := this.GetServiceBackend(serviceName)
- //获得后端服务器节点,并发送
- service := GetServiceNode(serviceId)
- if service == nil {
- return errors.New(fmt.Sprintf("server nod not find[ClientDirect2BackendByServiceName]:%v %v %v", serviceType, msgId, serviceId))
- }
- //用户ID绑定处理
- service.Send(&serverproto.GateTransmitAck{
- MsgId: uint32(msgId),
- MsgData: msgData,
- ClientId: this.gateID.SessID,
- SeqId: seqId,
- })
- return nil
- }
- // gate发送消息到后端指定服务器节点,例如serviceName为game,就是发送到game服务器
- func (this *ClientUser) Client2Backend(serviceName string, msg interface{}) error {
- serviceId := this.GetServiceBackend(serviceName)
- //获得后端服务器节点,并发送
- service := GetServiceNode(serviceId)
- if service == nil {
- return errors.New(fmt.Sprintf("server nod not find[Client2Backend]:%v %v", serviceId, msg))
- }
- msgData, info, err := rpc.EncodeMessage(msg)
- if err != nil {
- return err
- }
- //用户ID绑定处理
- service.Send(&serverproto.GateTransmitAck{
- MsgId: uint32(info.ID),
- MsgData: msgData,
- ClientId: this.gateID.SessID,
- })
- return nil
- }
- func (this *ClientUser) Broadcast2Backend(msg interface{}) {
- this.serviceNode.RLock()
- defer this.serviceNode.RUnlock()
- //todo...
- // 发送到该用户绑定的后端节点上,也许有不同类型的服务器节点需绑定
- for _, node := range this.ServiceTargets {
- nodeSess := GetServiceNode(node.ServiceID)
- if nodeSess != nil {
- nodeSess.Send(msg)
- }
- }
- }
- // 绑定用户需要发送到后台服务器的节点信息
- func (this *ClientUser) SetServiceBackend(serviceName string, serviceID string) {
- this.serviceNode.Lock()
- defer this.serviceNode.Unlock()
- //每种类型的服务只会绑定一个节点
- if data, ok := this.ServiceTargets[serviceName]; ok {
- data.ServiceID = serviceID
- return
- }
- this.ServiceTargets[serviceName] = &ServiceBackend{
- ServiceName: serviceName,
- ServiceID: serviceID,
- }
- }
- // 根据服务器类型获取后端服务器节点信息
- func (this *ClientUser) GetServiceBackend(serviceName string) string {
- this.serviceNode.RLock()
- defer this.serviceNode.RUnlock()
- if data, ok := this.ServiceTargets[serviceName]; ok {
- return data.ServiceID
- }
- return ""
- }
// ClientUserManager tracks the connected client users of this gate.
type ClientUserManager struct {
	StateMachineCore
	//reconnectClientList sync.Map //[openId, cli]
	// connectedClientList maps the key produced by ConvertPlatform(openId,
	// platform) to the connected *ClientUser.
	connectedClientList sync.Map //[Openid, cli]
}
- func NewClientUserManager() *ClientUserManager {
- mag := &ClientUserManager{
- connectedClientList: sync.Map{},
- }
- mag.InitState()
- return mag
- }
// OnLogicDisJoin is meant to handle a game node going down by kicking the
// players bound to that node. It is currently a stub and does nothing;
// serviceId is the id of the node that went away.
func (this *ClientUserManager) OnLogicDisJoin(serviceId string) {
	//todo...
	// Sketch of the intended logic, not yet enabled:
	//for _,cli :=range this.clientUserList {
	//	if cli.GetServiceBackend(SERVICE_NODE_TYPE_GAME_STR) == serviceId {
	//	}
	//}
}
- func (this *ClientUserManager) AddClient(cliSession rocommon.Session, openId, platform string) *ClientUser {
- cli := NewUser(cliSession)
- cli.OpenId = openId
- cli.Platform = platform
- cli.SwitchState(CLIENT_STATE_CONNECTED, nil)
- return cli
- }
- func (this *ClientUserManager) GetConnectedFromOpenId(openId, platform string) *ClientUser {
- openId = ConvertPlatform(openId, platform)
- //如果是服务器重启可能不存在cliUser,这边需要做判空处理
- cli, ok := this.connectedClientList.Load(openId)
- if !ok {
- return nil
- }
- cliUser := cli.(*ClientUser)
- if cliUser != nil {
- return cliUser
- }
- return nil
- }
- func (this *ClientUserManager) RemoveConnectedFromOpenId(openId, platform string) {
- openId = ConvertPlatform(openId, platform)
- _, ok := this.connectedClientList.Load(openId)
- if !ok {
- return
- }
- this.connectedClientList.Delete(openId)
- }
- func (this *ClientUserManager) ClientUserReBindOpenId(oldOpenId, newOpenid, platform string, cli *ClientUser) {
- oldOpenId = ConvertPlatform(oldOpenId, platform)
- newOpenid = ConvertPlatform(newOpenid, platform)
- this.connectedClientList.Delete(oldOpenId)
- cli.OpenId = newOpenid
- cli.Platform = platform
- ClientMag.connectedClientList.Store(newOpenid, cli)
- }
|