| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 |
- package service
- import (
- "rocommon"
- "rocommon/util"
- "runtime/debug"
- "sync"
- "time"
- )
- ////事件处理队列
- //type NetEventQueue interface {
- // StartQueue() NetEventQueue
- //
- // StopQueue() NetEventQueue
- //
- // Wait()
- //
- // PostCb(callback func())
- //
- // AttachUpdateModule(update rocommon.UpdateModule)
- //}
- //通用UpdateModule处理
- type CommonUpdateModule struct {
- }
- func (this *CommonUpdateModule) Init() {}
- func (this *CommonUpdateModule) Update(ms uint64) {}
- func NewEventQueue() rocommon.NetEventQueue {
- que := &eventQueue{
- onError: func(data interface{}) {
- util.InfoF("onError data:%v \n%s\n", data, string(debug.Stack()))
- //打印堆栈信息
- debug.PrintStack()
- },
- }
- //todo...
- //事件列表暂时容量为20000
- que.queList = make(chan interface{}, 20000)
- que.updateModule = &CommonUpdateModule{}
- return que
- }
- //eventQueue
- type eventQueue struct {
- wg sync.WaitGroup
- queList chan interface{} //目前用channel来代替 todo...
- onError func(interface{}) //打印奔溃处理
- updateModule rocommon.UpdateModule
- }
- func (this *eventQueue) AttachUpdateModule(update rocommon.UpdateModule) {
- //if this.updateModule != nil {
- // util.PanicF("update module has been attached !!!")
- //}
- if update != nil {
- update.Init()
- this.updateModule = update
- util.InfoF("update module attached success")
- }
- }
- var procNum int = 0
- var procNumTime time.Time
- var callbackNum int = 0
- var callbackTime time.Duration
- //处理回调队列主循环
- func (this *eventQueue) StartQueue() rocommon.NetEventQueue {
- this.wg.Add(1)
- //游戏服务器只有一个协程,机器人测试时会有DATE RACE
- //procNumTime = util.GetCurrentTimeNow()
- go func() {
- //log.Println("StartQueue goroutine")
- delayTimer := time.NewTimer(5 * time.Millisecond)
- for {
- delayTimer.Reset(5 * time.Millisecond)
- startUpTime := GetServiceStartupTime()
- if startUpTime > 0 {
- break
- }
- select {
- case <-delayTimer.C:
- }
- }
- //默认执行一次更新操作
- this.updateModule.Update(util.GetCurrentTime())
- nowTime1 := util.GetTimeMilliseconds()
- updateDelayTimer := time.NewTicker(5 * time.Millisecond)
- loop:
- for {
- //nowTime1 := util.GetTimeMilliseconds()
- //delayTimer.Reset(5 * time.Millisecond)
- exit := false
- select {
- case msg := <-this.queList:
- switch t := msg.(type) {
- case func():
- //procNum++
- this.queueCall(t)
- case nil:
- exit = true
- break loop //break //退出事件主循环
- //break //退出事件主循环
- }
- //case <-delayTimer.C:
- case <-updateDelayTimer.C:
- }
- //这边添加阶段判断,避免eventqueue中频繁的Update操作
- nowTime2 := util.GetTimeMilliseconds()
- if nowTime1+10 <= nowTime2 { //10ms
- nowTime1 = nowTime2
- this.updateModule.Update(nowTime2)
- }
- //1秒内处理的协议数量
- //this.AddProcNum(time.Now())
- //定时器update操作
- //callbackNum++
- //callbackTime += time.Now().Sub(now) //一个tick执行的消耗时间
- //nowTime := util.GetTimeMilliseconds()
- //delTime1 := nowTime2 - nowTime1
- //delTime2 := nowTime - nowTime1
- //if len(this.queList) > 100 {
- // util.DebugF("StartQueue deltime1=%v deltime2=%v quelen=%v", delTime1, delTime2, len(this.queList))
- //}
- if exit {
- break
- }
- }
- this.wg.Done()
- //util.InfoF("Exit Queue goroutine")
- }()
- return this
- }
- func (this *eventQueue) AddProcNum(nowTime time.Time) {
- if nowTime.Sub(procNumTime) > 1*time.Second {
- if callbackNum > 50 && procNum > 0 {
- util.InfoF("[1s] t=%v procNum=%v quelen=%v callbackNum=%v", nowTime.Sub(procNumTime), procNum,
- len(this.queList), callbackNum)
- }
- procNum = 0
- procNumTime = nowTime
- callbackTime = 0
- callbackNum = 0
- }
- }
- func (this *eventQueue) StopQueue() rocommon.NetEventQueue {
- this.queList <- nil
- return this
- }
- func (this *eventQueue) Wait() {
- this.wg.Wait()
- }
- func (this *eventQueue) PostCb(cb func()) {
- if cb != nil {
- this.queList <- cb
- }
- }
- func (this *eventQueue) queueCall(cb func()) {
- //todo...
- defer func() {
- //打印奔溃信息
- if err := recover(); err != nil {
- this.onError(err)
- }
- }()
- cb()
- }
|