msg.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package model
  2. import (
  3. "errors"
  4. "fmt"
  5. "rocommon"
  6. "rocommon/rpc"
  7. "rocommon/util"
  8. "roserver/serverproto"
  9. )
  10. ///////////////////////////////////////RecvGateMsgEvent
  11. //recv send event -> ProcEvent
  12. //来自gate的消息
  13. type RecvGateMsgEvent struct {
  14. Sess rocommon.Session
  15. Message interface{}
  16. ClientID uint64 //当前消息是来自哪个网关的客户端,gate回复消息时使用
  17. MsgSeqId uint32
  18. KvTime uint64
  19. }
  20. func (this *RecvGateMsgEvent) Session() rocommon.Session {
  21. return this.Sess
  22. }
  23. func (this *RecvGateMsgEvent) Msg() interface{} {
  24. return this.Message
  25. }
  26. func (this *RecvGateMsgEvent) SeqId() uint32 {
  27. return this.MsgSeqId
  28. }
  29. func (this *RecvGateMsgEvent) KVTime() uint64 {
  30. return 0
  31. }
  32. //接收到消息处理后并回复消息(如果需要回复调用该接口)
  33. func (this *RecvGateMsgEvent) Replay(msg interface{}) error {
  34. data, info, err := rpc.EncodeMessage(msg)
  35. if err != nil {
  36. return errors.New(fmt.Sprintf("replay msg encode err:%v", err))
  37. }
  38. //透传给gate服务器,然后再发送给客户端
  39. this.Sess.Send(&serverproto.ServiceTransmitAck{
  40. MsgId: uint32(info.ID),
  41. MsgData: data,
  42. ClientId: this.ClientID,
  43. })
  44. return nil
  45. }
  46. ///////////////////////////////////////RecvServiceMsgEvent
  47. //来自game的消息
  48. type RecvServiceMsgEvent struct {
  49. Sess rocommon.Session
  50. Message interface{}
  51. ClientID uint64
  52. ClientIDList []uint64
  53. ServiceID string
  54. IsMaster bool
  55. }
  56. func (this *RecvServiceMsgEvent) Session() rocommon.Session {
  57. return this.Sess
  58. }
  59. func (this *RecvServiceMsgEvent) Msg() interface{} {
  60. return this.Message
  61. }
  62. func (this *RecvServiceMsgEvent) SeqId() uint32 {
  63. return 0
  64. }
  65. func (this *RecvServiceMsgEvent) KVTime() uint64 {
  66. return 0
  67. }
  68. //接收到消息处理后并回复消息(如果需要回复调用该接口)
  69. func (this *RecvServiceMsgEvent) Replay(msg interface{}) error {
  70. data, info, err := rpc.EncodeMessage(msg)
  71. if err != nil {
  72. return errors.New(fmt.Sprintf("replay msg encode err:%v", err))
  73. }
  74. //todo...需要重新获取一次sess,session可能不存在
  75. this.Sess.Send(&serverproto.ServiceTransmitAck{
  76. MsgId: uint32(info.ID),
  77. MsgData: data,
  78. ClientId: this.ClientID,
  79. ClientIdList: this.ClientIDList,
  80. })
  81. return nil
  82. }
  83. ///////////////////////////////////////RecvRouterServiceMsgEvent
  84. //来自game的消息
  85. type RecvRouterServiceMsgEvent struct {
  86. Sess rocommon.Session
  87. Message interface{}
  88. ClientID uint64
  89. ClientIDList []uint64
  90. ServiceID string
  91. IsMaster bool
  92. FromZone int32
  93. }
  94. func (this *RecvRouterServiceMsgEvent) Session() rocommon.Session {
  95. return this.Sess
  96. }
  97. func (this *RecvRouterServiceMsgEvent) Msg() interface{} {
  98. return this.Message
  99. }
  100. func (this *RecvRouterServiceMsgEvent) SeqId() uint32 {
  101. return 0
  102. }
  103. func (this *RecvRouterServiceMsgEvent) KVTime() uint64 {
  104. return 0
  105. }
  106. //接收到消息处理后并回复消息(如果需要回复调用该接口)
  107. func (this *RecvRouterServiceMsgEvent) Replay(msg interface{}) error {
  108. data, info, err := rpc.EncodeMessage(msg)
  109. if err != nil {
  110. return errors.New(fmt.Sprintf("replay msg encode err=%v msg=%v", err, msg))
  111. }
  112. this.Sess.Send(&serverproto.ServiceTransmitRouterNtf{
  113. MsgId: uint32(info.ID),
  114. MsgData: data,
  115. ClientId: this.ClientID,
  116. FromZone: this.FromZone,
  117. })
  118. return nil
  119. }
  120. func HandleBackendMessage(userHandler func(ev rocommon.ProcEvent, cliID ClientID)) func(ev rocommon.ProcEvent) {
  121. return func(e rocommon.ProcEvent) {
  122. //util.InfoF("receive msg=%v |%v", e.Msg(), reflect.TypeOf(e.Msg()))
  123. switch in := e.(type) {
  124. case *RecvGateMsgEvent: //BackendTCPEventHook
  125. cId := ClientID{}
  126. cId.SessID = in.ClientID
  127. if ctx := Session2Context(e.Session()); ctx != nil {
  128. cId.ServiceID = ctx.ID
  129. }
  130. userHandler(in, cId)
  131. //from benchmark test
  132. //begin
  133. //tmpInfo := rocommon.MessageInfoByMsg(e.Msg())
  134. //tmpData, _ := tmpInfo.Codec.Marshal(e.Msg())
  135. //tmpDataStr := base64.StdEncoding.EncodeToString((tmpData).([]byte))
  136. //nowTime := util.GetTimeMilliseconds()
  137. //if tmpInfo.ID > 1010 {
  138. // util.InfoF("receivemsg msgid=%v msgdata=%v msgtime=%v msg=%v msgtype=%v", tmpInfo.ID, tmpDataStr, nowTime, e.Msg(), reflect.TypeOf(e.Msg()))
  139. //}
  140. //end
  141. case *RecvServiceMsgEvent:
  142. cId := ClientID{
  143. SessID: in.ClientID,
  144. ServiceID: in.ServiceID, //来自哪个服务器节点的信息
  145. SessIdList: in.ClientIDList,
  146. }
  147. userHandler(in, cId)
  148. case *RecvRouterServiceMsgEvent:
  149. cId := ClientID{
  150. SessID: in.ClientID,
  151. ServiceID: in.ServiceID, //来自哪个服务器节点的信息
  152. SessIdList: in.ClientIDList,
  153. }
  154. userHandler(in, cId)
  155. }
  156. }
  157. }
  158. func ServiceReplay(ev rocommon.ProcEvent, msg interface{}) {
  159. if e, ok := ev.(rocommon.ReplayEvent); ok {
  160. err := e.Replay(msg)
  161. if err != nil {
  162. util.InfoF("replay msg err:%v", err.Error())
  163. } else {
  164. //log.Println("replay msg ok:", msg)
  165. }
  166. } else {
  167. util.PanicF("the given event must be a ReplayEvent!!! msg:%v", msg)
  168. }
  169. }