eventqueue.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. package service
  2. import (
  3. "rocommon"
  4. "rocommon/util"
  5. "runtime/debug"
  6. "sync"
  7. "time"
  8. )
  9. ////事件处理队列
  10. //type NetEventQueue interface {
  11. // StartQueue() NetEventQueue
  12. //
  13. // StopQueue() NetEventQueue
  14. //
  15. // Wait()
  16. //
  17. // PostCb(callback func())
  18. //
  19. // AttachUpdateModule(update rocommon.UpdateModule)
  20. //}
  21. //通用UpdateModule处理
  22. type CommonUpdateModule struct {
  23. }
  24. func (this *CommonUpdateModule) Init() {}
  25. func (this *CommonUpdateModule) Update(ms uint64) {}
  26. func NewEventQueue() rocommon.NetEventQueue {
  27. que := &eventQueue{
  28. onError: func(data interface{}) {
  29. util.InfoF("onError data:%v \n%s\n", data, string(debug.Stack()))
  30. //打印堆栈信息
  31. debug.PrintStack()
  32. },
  33. }
  34. //todo...
  35. //事件列表暂时容量为20000
  36. que.queList = make(chan interface{}, 20000)
  37. que.updateModule = &CommonUpdateModule{}
  38. return que
  39. }
  40. //eventQueue
  41. type eventQueue struct {
  42. wg sync.WaitGroup
  43. queList chan interface{} //目前用channel来代替 todo...
  44. onError func(interface{}) //打印奔溃处理
  45. updateModule rocommon.UpdateModule
  46. }
  47. func (this *eventQueue) AttachUpdateModule(update rocommon.UpdateModule) {
  48. //if this.updateModule != nil {
  49. // util.PanicF("update module has been attached !!!")
  50. //}
  51. if update != nil {
  52. update.Init()
  53. this.updateModule = update
  54. util.InfoF("update module attached success")
  55. }
  56. }
  57. var procNum int = 0
  58. var procNumTime time.Time
  59. var callbackNum int = 0
  60. var callbackTime time.Duration
  61. //处理回调队列主循环
  62. func (this *eventQueue) StartQueue() rocommon.NetEventQueue {
  63. this.wg.Add(1)
  64. //游戏服务器只有一个协程,机器人测试时会有DATE RACE
  65. //procNumTime = util.GetCurrentTimeNow()
  66. go func() {
  67. //log.Println("StartQueue goroutine")
  68. delayTimer := time.NewTimer(5 * time.Millisecond)
  69. for {
  70. delayTimer.Reset(5 * time.Millisecond)
  71. startUpTime := GetServiceStartupTime()
  72. if startUpTime > 0 {
  73. break
  74. }
  75. select {
  76. case <-delayTimer.C:
  77. }
  78. }
  79. //默认执行一次更新操作
  80. this.updateModule.Update(util.GetCurrentTime())
  81. nowTime1 := util.GetTimeMilliseconds()
  82. updateDelayTimer := time.NewTicker(5 * time.Millisecond)
  83. loop:
  84. for {
  85. //nowTime1 := util.GetTimeMilliseconds()
  86. //delayTimer.Reset(5 * time.Millisecond)
  87. exit := false
  88. select {
  89. case msg := <-this.queList:
  90. switch t := msg.(type) {
  91. case func():
  92. //procNum++
  93. this.queueCall(t)
  94. case nil:
  95. exit = true
  96. break loop //break //退出事件主循环
  97. //break //退出事件主循环
  98. }
  99. //case <-delayTimer.C:
  100. case <-updateDelayTimer.C:
  101. }
  102. //这边添加阶段判断,避免eventqueue中频繁的Update操作
  103. nowTime2 := util.GetTimeMilliseconds()
  104. if nowTime1+10 <= nowTime2 { //10ms
  105. nowTime1 = nowTime2
  106. this.updateModule.Update(nowTime2)
  107. }
  108. //1秒内处理的协议数量
  109. //this.AddProcNum(time.Now())
  110. //定时器update操作
  111. //callbackNum++
  112. //callbackTime += time.Now().Sub(now) //一个tick执行的消耗时间
  113. //nowTime := util.GetTimeMilliseconds()
  114. //delTime1 := nowTime2 - nowTime1
  115. //delTime2 := nowTime - nowTime1
  116. //if len(this.queList) > 100 {
  117. // util.DebugF("StartQueue deltime1=%v deltime2=%v quelen=%v", delTime1, delTime2, len(this.queList))
  118. //}
  119. if exit {
  120. break
  121. }
  122. }
  123. this.wg.Done()
  124. //util.InfoF("Exit Queue goroutine")
  125. }()
  126. return this
  127. }
  128. func (this *eventQueue) AddProcNum(nowTime time.Time) {
  129. if nowTime.Sub(procNumTime) > 1*time.Second {
  130. if callbackNum > 50 && procNum > 0 {
  131. util.InfoF("[1s] t=%v procNum=%v quelen=%v callbackNum=%v", nowTime.Sub(procNumTime), procNum,
  132. len(this.queList), callbackNum)
  133. }
  134. procNum = 0
  135. procNumTime = nowTime
  136. callbackTime = 0
  137. callbackNum = 0
  138. }
  139. }
  140. func (this *eventQueue) StopQueue() rocommon.NetEventQueue {
  141. this.queList <- nil
  142. return this
  143. }
  144. func (this *eventQueue) Wait() {
  145. this.wg.Wait()
  146. }
  147. func (this *eventQueue) PostCb(cb func()) {
  148. if cb != nil {
  149. this.queList <- cb
  150. }
  151. }
  152. func (this *eventQueue) queueCall(cb func()) {
  153. //todo...
  154. defer func() {
  155. //打印奔溃信息
  156. if err := recover(); err != nil {
  157. this.onError(err)
  158. }
  159. }()
  160. cb()
  161. }