timewheel.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. package util
  2. import (
  3. "container/list"
  4. )
// TimeWheel is a single-level timing wheel: a ring of slotNum slots, each
// holding a list of pending *TWTask values. Update advances the wheel one slot
// per elapsed interval and hands due tasks to Callback.
//
// Reference (apparently the implementation this type was adapted from):
// https://github.com/ouqiang/timewheel/blob/master/timewheel.go
type TimeWheel struct {
	// interval is the length of one tick, i.e. of one slot, in milliseconds.
	interval uint64
	//tick *time.Ticker
	// slots holds one task list per slot. TODO: consider a skip list later.
	slots []*list.List
	// timer maps a task's unique Key to the index of the slot that holds it.
	timer map[interface{}]int
	// currentIdx is the slot the wheel pointer currently rests on.
	currentIdx int
	// slotNum is the number of slots in the wheel.
	slotNum int
	// Callback is invoked for every task that fires, with the task and the
	// ms value that was passed to Update.
	Callback func(twTask *TWTask, ms uint64)
	// oldMs is the timestamp (ms) up to which ticks have been processed;
	// 0 means Update has not recorded a baseline yet.
	oldMs uint64
}
// Task type identifiers (presumably values for TWTask.CallbackType — confirm
// against callers).
const (
	TWTASK_TYPE_Save = 1 // role save operation
)
// TWTask is a single entry scheduled on a TimeWheel.
type TWTask struct {
	// Delay is how long to wait before firing, in the same unit as the wheel
	// interval (milliseconds); it is converted to ticks by integer division.
	Delay uint64
	// circle is the number of full wheel revolutions still to wait once the
	// pointer reaches the task's slot; it is set by TimeWheel.AddTask.
	circle int
	// Key uniquely identifies the task inside a wheel. Only tasks with a
	// non-nil Key are registered in the wheel's key-to-slot map.
	Key interface{}
	// Data is an opaque payload; the wheel code visible here never reads it.
	Data interface{}
	// Callback is a per-task function; the wheel code visible here never
	// calls it (it calls TimeWheel.Callback instead).
	Callback func(interface{})
	// Uid is not read by the wheel code visible here; presumably the owner's
	// id — confirm against callers.
	Uid uint64
	// CallbackType is not read by the wheel code visible here; presumably one
	// of the TWTASK_TYPE_* constants — confirm against callers.
	CallbackType int32
	// Repeated makes the wheel schedule the task again, with the same Delay,
	// after it fires.
	Repeated bool
}
  30. // interval ms
  31. func NewTimeWheel(interval uint64, slotNum int) *TimeWheel {
  32. if interval <= 0 {
  33. return nil
  34. }
  35. tw := &TimeWheel{
  36. interval: interval,
  37. slots: make([]*list.List, slotNum),
  38. currentIdx: 0,
  39. slotNum: slotNum,
  40. timer: map[interface{}]int{},
  41. }
  42. tw.initSlots()
  43. return tw
  44. }
  45. func (this *TimeWheel) initSlots() {
  46. for idx := 0; idx < this.slotNum; idx++ {
  47. this.slots[idx] = list.New()
  48. }
  49. }
  50. //func (this *TimeWheel) Start() {
  51. // //this.tick = time.NewTicker(this.interval)
  52. // //this.Start()
  53. //}
  54. func (this *TimeWheel) Update(ms uint64) {
  55. if this.oldMs <= 0 {
  56. this.oldMs = ms
  57. return
  58. }
  59. for {
  60. if this.oldMs > ms {
  61. this.oldMs = ms
  62. }
  63. delaTime := ms - this.oldMs
  64. if delaTime < this.interval {
  65. return
  66. }
  67. this.oldMs += this.interval
  68. this.update(ms)
  69. }
  70. }
  71. func (this *TimeWheel) update(ms uint64) {
  72. slotIdxList := this.slots[this.currentIdx]
  73. for item := slotIdxList.Front(); item != nil; {
  74. task := item.Value.(*TWTask)
  75. if task.circle > 0 {
  76. task.circle--
  77. item = item.Next()
  78. continue
  79. }
  80. this.Callback(task, ms)
  81. next := item.Next()
  82. if task.Key != nil {
  83. delete(this.timer, task.Key)
  84. //添加到新的槽位节点上,继续触发事件
  85. slotIdxList.Remove(item)
  86. if task.Repeated {
  87. this.AddTask(task)
  88. }
  89. }
  90. item = next
  91. }
  92. if this.currentIdx >= this.slotNum-1 {
  93. this.currentIdx = 0
  94. } else {
  95. this.currentIdx++
  96. }
  97. }
  98. func (this *TimeWheel) AddTask(task *TWTask) bool {
  99. _, ok := this.timer[task.Key]
  100. if ok {
  101. return false
  102. }
  103. idx, circle := this.getIdxAndCircle(task.Delay)
  104. task.circle = circle
  105. this.slots[idx].PushBack(task)
  106. if task.Key != nil {
  107. this.timer[task.Key] = idx
  108. }
  109. return true
  110. }
  111. func (this *TimeWheel) getIdxAndCircle(taskDuration uint64) (int, int) {
  112. tmpVal := int(taskDuration / this.interval)
  113. circle := int(tmpVal / this.slotNum)
  114. idx := (this.currentIdx + tmpVal) % this.slotNum
  115. return idx, circle
  116. }
  117. func (this *TimeWheel) RemoveTask(key interface{}) {
  118. idx, ok := this.timer[key]
  119. if !ok {
  120. return
  121. }
  122. slotIdxList := this.slots[idx]
  123. for item := slotIdxList.Front(); item != nil; {
  124. task := item.Value.(*TWTask)
  125. if task.Key == key {
  126. delete(this.timer, task.Key)
  127. slotIdxList.Remove(item)
  128. }
  129. item = item.Next()
  130. }
  131. }