client_user.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. package model
  2. import (
  3. "errors"
  4. "fmt"
  5. "rocommon"
  6. "rocommon/rpc"
  7. "rocommon/util"
  8. "roserver/baseserver/router"
  9. "roserver/serverproto"
  10. "sort"
  11. "strconv"
  12. "sync"
  13. "time"
  14. "unsafe"
  15. )
  16. const (
  17. CLIENT_STATE_CONNECTED = 1
  18. CLIENT_STATE_DISJOIN_RECONNECT = 2 //断线状态等待重连
  19. CLIENT_STATE_DISJOIN = 3 //断线状态等待移除列表
  20. )
  21. const CLIENT_TIME_OUT_DURATION = 30 * 60
  22. //
  23. //var clientDisJoinReconnect = func(m *StateMachineCore, data interface{}) int32 {
  24. // //移除clientUser的Session相关信息,但是保存节点服务器相关信息
  25. // parent := unsafe.Pointer(m)
  26. // cli := (*ClientUser)(parent)
  27. //
  28. // ClientMag.reconnectClientList.Store(cli.OpenId, cli)
  29. //
  30. // return CLIENT_STATE_DISJOIN_RECONNECT
  31. //}
  32. var clientConnected = func(m *StateMachineCore, data interface{}) int32 {
  33. //移除clientUser的Session相关信息,但是保存节点服务器相关信息
  34. parent := unsafe.Pointer(m)
  35. cli := (*ClientUser)(parent)
  36. //oldCliUser := ClientMag.GetConnectedFromOpenId(cli.OpenId)
  37. //if oldCliUser != nil {
  38. // oldCliUser.ClientSession = nil
  39. // ClientMag.connectedClientList.Delete(cli.OpenId)
  40. //}
  41. openId := ConvertPlatform(cli.OpenId, cli.Platform)
  42. ClientMag.connectedClientList.Store(openId, cli)
  43. return CLIENT_STATE_CONNECTED
  44. }
  45. // 主线程中不要对结构中的数据做修改,否则会有多线程冲突(或者加锁进行操作)
  46. type ClientUser struct {
  47. StateMachineCore
  48. serviceNode sync.RWMutex
  49. ClientSession rocommon.Session
  50. LastPingTime time.Time
  51. //绑定的后端服务器节点
  52. ServiceTargets map[string]*ServiceBackend //每种类型的服务只会绑定一个节点
  53. gateID ClientID
  54. //登陆验证使用
  55. OpenId string
  56. Platform string
  57. LoginData []byte
  58. SeqId uint32 //包序列号,判断合法性
  59. //收到消息数量
  60. msgTime uint64
  61. msgCount uint32
  62. invalidCount uint32 //触发非法次数
  63. //压力测试性能使用
  64. kvTimeLock sync.Mutex
  65. kvBossRewardTime map[int32][]*TempKVSt
  66. kvIndex int32
  67. }
  68. type TempKVSt struct {
  69. ClientKVTime uint64
  70. RecvClientKVTime uint64
  71. MsgName string
  72. }
  73. type ClientID struct {
  74. SessID uint64 //客户端在网管上的sessionid
  75. ServiceID string //客户端所在的网关
  76. SessIdList []uint64
  77. }
  78. type ServiceBackend struct {
  79. ServiceName string
  80. ServiceID string
  81. }
  82. func NewUser(cliSession rocommon.Session) *ClientUser {
  83. cli := &ClientUser{
  84. ClientSession: cliSession,
  85. gateID: ClientID{
  86. SessID: cliSession.ID(),
  87. ServiceID: GateServiceID,
  88. },
  89. ServiceTargets: map[string]*ServiceBackend{},
  90. kvBossRewardTime: map[int32][]*TempKVSt{},
  91. }
  92. cli.init()
  93. return cli
  94. }
  95. func (this *ClientUser) init() {
  96. //注册状态机
  97. this.InitState()
  98. this.RegisterState(CLIENT_STATE_CONNECTED, clientConnected)
  99. }
  100. func (this *ClientUser) ConnectReset(cliSession rocommon.Session) error {
  101. //关闭之前维护的session
  102. //通过无读写默认进行关闭
  103. this.ClientSession.(rocommon.ContextSet).SetContextData("user", nil, "ConnectReset")
  104. this.gateID.SessID = cliSession.ID()
  105. ret := BindUser2Session(this, cliSession)
  106. if ret != nil {
  107. return ret
  108. }
  109. //从重连队列中移除
  110. this.SeqId = 0
  111. this.msgTime = 0
  112. this.msgCount = 0
  113. this.invalidCount = 0
  114. if this.state != CLIENT_STATE_CONNECTED {
  115. this.RemoveConnected()
  116. this.SwitchState(CLIENT_STATE_CONNECTED, nil)
  117. }
  118. return nil
  119. }
  120. func (this *ClientUser) RemoveConnected() {
  121. openId := ConvertPlatform(this.OpenId, this.Platform)
  122. ClientMag.connectedClientList.Delete(openId)
  123. }
  124. func (this *ClientUser) Ping() {
  125. this.LastPingTime = util.GetCurrentTimeNow()
  126. }
  127. // gate把接收到的数据直接发送到后端服务器节点
  128. func (this *ClientUser) ClientDirect2Backend(serviceId string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
  129. //获得后端服务器节点,并发送
  130. service := GetServiceNode(serviceId)
  131. if service == nil {
  132. return errors.New(fmt.Sprintf("server nod not find ClientDirect2Backend:%v %v %v", serviceType, msgId, serviceId))
  133. }
  134. //压力测试性能使用
  135. kvItem, ok := router.ReqAckKVList[msgId]
  136. if ok {
  137. this.kvTimeLock.Lock()
  138. nowTime := util.GetTimeMilliseconds()
  139. this.kvBossRewardTime[kvItem.AckMsgId] = append(this.kvBossRewardTime[kvItem.AckMsgId],
  140. &TempKVSt{
  141. RecvClientKVTime: nowTime,
  142. MsgName: kvItem.ReqMsgName,
  143. })
  144. this.kvTimeLock.Unlock()
  145. }
  146. //if msgId == 1173 {
  147. // this.kvTimeLock.Lock()
  148. // tmpMsg1, _, _ := rpcc.DecodeMessage(msgId, msgData)
  149. // recordTime := tmpMsg1.(*serverproto.CSPlayerBossRewardReq).RecordTimeStamp
  150. // nowTime := util.GetTimeMilliseconds()
  151. // util.DebugF("ClientKVTime=%v nowtime=%v", recordTime, nowTime)
  152. // this.kvBossRewardTime = append(this.kvBossRewardTime, &TempKVSt{
  153. // RecvClientKVTime: nowTime,
  154. // ClientKVTime: recordTime,
  155. // })
  156. // this.kvTimeLock.Unlock()
  157. //}
  158. //用户ID绑定处理
  159. service.Send(&serverproto.GateTransmitAck{
  160. MsgId: uint32(msgId),
  161. MsgData: msgData,
  162. ClientId: this.gateID.SessID,
  163. SeqId: seqId,
  164. //KvTime: util.GetTimeMilliseconds(),
  165. })
  166. //短时间内收到大量消息
  167. if this.msgTime <= 0 {
  168. nowTime := util.GetCurrentTime()
  169. this.msgTime = nowTime
  170. }
  171. this.msgCount++
  172. if this.msgCount >= 30 {
  173. nowTime := util.GetCurrentTime()
  174. if nowTime-this.msgTime <= 5 {
  175. this.invalidCount++
  176. }
  177. //log.Printf("msgcount=%v invalidCount=%v deltime=%v", this.msgCount, this.invalidCount, nowTime-this.msgTime)
  178. if this.invalidCount >= 3 {
  179. this.msgTime = 0
  180. this.invalidCount = 0
  181. this.msgCount = 0
  182. util.ErrorF("ClientDirect2Backend=%v %v %v %v| short time recv too many msg", serviceType, msgId, serviceId, this.OpenId)
  183. return errors.New(fmt.Sprintf("ClientDirect2Backend=%v %v %v %v| short time recv too many msg", serviceType, msgId, serviceId, this.OpenId))
  184. }
  185. this.msgTime = nowTime
  186. this.msgCount = 0
  187. }
  188. return nil
  189. }
  190. type PerformKVTimeSt struct {
  191. AckMsgId int32
  192. TotalNum int32
  193. CurTime uint64 //最近这次消耗时间
  194. TotalTime uint64
  195. MsgName string
  196. BeginTime uint64
  197. NowTime uint64
  198. LeftNum int
  199. PerNum int32
  200. MaxPerNum int32
  201. }
  202. var performTime uint64 = 0
  203. var performKvTime = map[int32]*PerformKVTimeSt{}
  204. var performKvLock sync.Mutex
  205. func (this *ClientUser) KvTestBossReward(msgId int32) {
  206. nowTime := util.GetTimeMilliseconds()
  207. var delTime uint64 = 0
  208. leftNum := 0
  209. msgName := ""
  210. this.kvTimeLock.Lock()
  211. if len(this.kvBossRewardTime[msgId]) > 0 {
  212. delTime = nowTime - this.kvBossRewardTime[msgId][0].RecvClientKVTime
  213. msgName = this.kvBossRewardTime[msgId][0].MsgName
  214. this.kvBossRewardTime[msgId] = append(this.kvBossRewardTime[msgId][:0], this.kvBossRewardTime[msgId][1:]...)
  215. leftNum = len(this.kvBossRewardTime[msgId])
  216. }
  217. this.kvTimeLock.Unlock()
  218. if msgName == "" {
  219. return
  220. }
  221. //if msgId == 1067 {
  222. // util.DebugF("delTimeLimit msgid=%v deltime=%v", msgId, delTime)
  223. //}
  224. var printfList []PerformKVTimeSt
  225. performKvLock.Lock()
  226. performItem, ok := performKvTime[msgId]
  227. if ok {
  228. performItem.TotalNum++
  229. performItem.TotalTime += delTime
  230. performItem.CurTime = delTime
  231. performItem.NowTime = uint64(util.GetTimeSeconds())
  232. performItem.LeftNum = leftNum
  233. } else {
  234. performItem = &PerformKVTimeSt{
  235. TotalTime: delTime,
  236. TotalNum: 1,
  237. AckMsgId: msgId,
  238. MsgName: msgName,
  239. BeginTime: uint64(util.GetTimeSeconds()),
  240. NowTime: uint64(util.GetTimeSeconds()),
  241. LeftNum: leftNum,
  242. CurTime: delTime,
  243. }
  244. performKvTime[msgId] = performItem
  245. }
  246. tmpPerNum := int32(performItem.NowTime - performItem.BeginTime)
  247. if tmpPerNum > 0 {
  248. tmpPerNum = performItem.TotalNum / tmpPerNum
  249. } else {
  250. tmpPerNum = performItem.TotalNum
  251. }
  252. performItem.PerNum = tmpPerNum
  253. if tmpPerNum > performItem.MaxPerNum {
  254. performItem.MaxPerNum = tmpPerNum
  255. }
  256. if performTime <= 0 {
  257. performTime = nowTime
  258. } else if nowTime-performTime >= 1000 {
  259. performTime = nowTime
  260. for _, val := range performKvTime {
  261. printfList = append(printfList, *val)
  262. }
  263. }
  264. performKvLock.Unlock()
  265. if len(printfList) > 0 {
  266. sort.Slice(printfList, func(i, j int) bool {
  267. return printfList[i].AckMsgId < printfList[j].AckMsgId
  268. })
  269. printfListStr := ""
  270. for idx := 0; idx < len(printfList); idx++ {
  271. //msgIdStr := strconv.Itoa(int(printfList[idx].AckMsgId))
  272. tmpTime := float64(printfList[idx].TotalTime) / float64(printfList[idx].TotalNum)
  273. printfListStr += " \n" +
  274. strconv.FormatInt(int64(printfList[idx].CurTime), 10) + "-" + strconv.FormatInt(int64(tmpTime), 10) + "(ms) | " +
  275. strconv.FormatInt(int64(printfList[idx].PerNum), 10) + "-" + strconv.FormatInt(int64(printfList[idx].MaxPerNum), 10) + "(num/s) | " +
  276. strconv.Itoa(int(printfList[idx].TotalNum)) + "(total) | " +
  277. strconv.Itoa(printfList[idx].LeftNum) + "(left) " + printfList[idx].MsgName
  278. }
  279. util.DebugF("printfListStr=%v", printfListStr)
  280. }
  281. //if len(this.kvBossRewardTime) > 0 {
  282. // delTime := nowTime - this.kvBossRewardTime[0].ClientKVTime
  283. // delTime1 := nowTime - this.kvBossRewardTime[0].RecvClientKVTime
  284. // delTime2 := this.kvBossRewardTime[0].RecvClientKVTime - this.kvBossRewardTime[0].ClientKVTime
  285. // this.kvIndex++
  286. // //if delTime > 0 {
  287. // // util.DebugF("kvtime cid=%v idx=%v 1073time=%v", this.ClientSession.ID(), this.kvIndex, delTime)
  288. // //}
  289. // util.DebugF("kvtime cid=%v idx=%v time=%v time1=%v time2=%v | %v", this.ClientSession.ID(),
  290. // this.kvIndex, delTime, delTime1, delTime2,
  291. // this.kvBossRewardTime[0].ClientKVTime, this.kvBossRewardTime[0].RecvClientKVTime)
  292. // this.kvBossRewardTime = append(this.kvBossRewardTime[:0], this.kvBossRewardTime[1:]...)
  293. //}
  294. //this.kvTimeLock.Unlock()
  295. }
  296. func (this *ClientUser) ClientDirect2BackendByServiceName(serviceName string, msgId int, seqId uint32, msgData []byte, serviceType string) error {
  297. serviceId := this.GetServiceBackend(serviceName)
  298. //获得后端服务器节点,并发送
  299. service := GetServiceNode(serviceId)
  300. if service == nil {
  301. return errors.New(fmt.Sprintf("server nod not find[ClientDirect2BackendByServiceName]:%v %v %v", serviceType, msgId, serviceId))
  302. }
  303. //用户ID绑定处理
  304. service.Send(&serverproto.GateTransmitAck{
  305. MsgId: uint32(msgId),
  306. MsgData: msgData,
  307. ClientId: this.gateID.SessID,
  308. SeqId: seqId,
  309. })
  310. return nil
  311. }
  312. // gate发送消息到后端指定服务器节点,例如serviceName为game,就是发送到game服务器
  313. func (this *ClientUser) Client2Backend(serviceName string, msg interface{}) error {
  314. serviceId := this.GetServiceBackend(serviceName)
  315. //获得后端服务器节点,并发送
  316. service := GetServiceNode(serviceId)
  317. if service == nil {
  318. return errors.New(fmt.Sprintf("server nod not find[Client2Backend]:%v %v", serviceId, msg))
  319. }
  320. msgData, info, err := rpc.EncodeMessage(msg)
  321. if err != nil {
  322. return err
  323. }
  324. //用户ID绑定处理
  325. service.Send(&serverproto.GateTransmitAck{
  326. MsgId: uint32(info.ID),
  327. MsgData: msgData,
  328. ClientId: this.gateID.SessID,
  329. })
  330. return nil
  331. }
  332. func (this *ClientUser) Broadcast2Backend(msg interface{}) {
  333. this.serviceNode.RLock()
  334. defer this.serviceNode.RUnlock()
  335. //todo...
  336. // 发送到该用户绑定的后端节点上,也许有不同类型的服务器节点需绑定
  337. for _, node := range this.ServiceTargets {
  338. nodeSess := GetServiceNode(node.ServiceID)
  339. if nodeSess != nil {
  340. nodeSess.Send(msg)
  341. }
  342. }
  343. }
  344. // 绑定用户需要发送到后台服务器的节点信息
  345. func (this *ClientUser) SetServiceBackend(serviceName string, serviceID string) {
  346. this.serviceNode.Lock()
  347. defer this.serviceNode.Unlock()
  348. //每种类型的服务只会绑定一个节点
  349. if data, ok := this.ServiceTargets[serviceName]; ok {
  350. data.ServiceID = serviceID
  351. return
  352. }
  353. this.ServiceTargets[serviceName] = &ServiceBackend{
  354. ServiceName: serviceName,
  355. ServiceID: serviceID,
  356. }
  357. }
  358. // 根据服务器类型获取后端服务器节点信息
  359. func (this *ClientUser) GetServiceBackend(serviceName string) string {
  360. this.serviceNode.RLock()
  361. defer this.serviceNode.RUnlock()
  362. if data, ok := this.ServiceTargets[serviceName]; ok {
  363. return data.ServiceID
  364. }
  365. return ""
  366. }
  367. // /ClientUserManager
  368. type ClientUserManager struct {
  369. StateMachineCore
  370. //reconnectClientList sync.Map //[openId, cli]
  371. connectedClientList sync.Map //[Openid, cli]
  372. }
  373. func NewClientUserManager() *ClientUserManager {
  374. mag := &ClientUserManager{
  375. connectedClientList: sync.Map{},
  376. }
  377. mag.InitState()
  378. return mag
  379. }
  380. // /game节点挂了,踢掉这个game上的玩家
  381. func (this *ClientUserManager) OnLogicDisJoin(serviceId string) {
  382. //todo...
  383. //for _,cli :=range this.clientUserList {
  384. // if cli.GetServiceBackend(SERVICE_NODE_TYPE_GAME_STR) == serviceId {
  385. // }
  386. //}
  387. }
  388. func (this *ClientUserManager) AddClient(cliSession rocommon.Session, openId, platform string) *ClientUser {
  389. cli := NewUser(cliSession)
  390. cli.OpenId = openId
  391. cli.Platform = platform
  392. cli.SwitchState(CLIENT_STATE_CONNECTED, nil)
  393. return cli
  394. }
  395. func (this *ClientUserManager) GetConnectedFromOpenId(openId, platform string) *ClientUser {
  396. openId = ConvertPlatform(openId, platform)
  397. //如果是服务器重启可能不存在cliUser,这边需要做判空处理
  398. cli, ok := this.connectedClientList.Load(openId)
  399. if !ok {
  400. return nil
  401. }
  402. cliUser := cli.(*ClientUser)
  403. if cliUser != nil {
  404. return cliUser
  405. }
  406. return nil
  407. }
  408. func (this *ClientUserManager) RemoveConnectedFromOpenId(openId, platform string) {
  409. openId = ConvertPlatform(openId, platform)
  410. _, ok := this.connectedClientList.Load(openId)
  411. if !ok {
  412. return
  413. }
  414. this.connectedClientList.Delete(openId)
  415. }
  416. func (this *ClientUserManager) ClientUserReBindOpenId(oldOpenId, newOpenid, platform string, cli *ClientUser) {
  417. oldOpenId = ConvertPlatform(oldOpenId, platform)
  418. newOpenid = ConvertPlatform(newOpenid, platform)
  419. this.connectedClientList.Delete(oldOpenId)
  420. cli.OpenId = newOpenid
  421. cli.Platform = platform
  422. ClientMag.connectedClientList.Store(newOpenid, cli)
  423. }