| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190 |
- package model
- import (
- "errors"
- "fmt"
- "rocommon"
- "rocommon/rpc"
- "rocommon/util"
- "roserver/serverproto"
- )
- ///////////////////////////////////////RecvGateMsgEvent
- //recv send event -> ProcEvent
- //来自gate的消息
- type RecvGateMsgEvent struct {
- Sess rocommon.Session
- Message interface{}
- ClientID uint64 //当前消息是来自哪个网关的客户端,gate回复消息时使用
- MsgSeqId uint32
- KvTime uint64
- }
- func (this *RecvGateMsgEvent) Session() rocommon.Session {
- return this.Sess
- }
- func (this *RecvGateMsgEvent) Msg() interface{} {
- return this.Message
- }
- func (this *RecvGateMsgEvent) SeqId() uint32 {
- return this.MsgSeqId
- }
- func (this *RecvGateMsgEvent) KVTime() uint64 {
- return 0
- }
- //接收到消息处理后并回复消息(如果需要回复调用该接口)
- func (this *RecvGateMsgEvent) Replay(msg interface{}) error {
- data, info, err := rpc.EncodeMessage(msg)
- if err != nil {
- return errors.New(fmt.Sprintf("replay msg encode err:%v", err))
- }
- //透传给gate服务器,然后再发送给客户端
- this.Sess.Send(&serverproto.ServiceTransmitAck{
- MsgId: uint32(info.ID),
- MsgData: data,
- ClientId: this.ClientID,
- })
- return nil
- }
- ///////////////////////////////////////RecvServiceMsgEvent
- //来自game的消息
- type RecvServiceMsgEvent struct {
- Sess rocommon.Session
- Message interface{}
- ClientID uint64
- ClientIDList []uint64
- ServiceID string
- IsMaster bool
- }
- func (this *RecvServiceMsgEvent) Session() rocommon.Session {
- return this.Sess
- }
- func (this *RecvServiceMsgEvent) Msg() interface{} {
- return this.Message
- }
- func (this *RecvServiceMsgEvent) SeqId() uint32 {
- return 0
- }
- func (this *RecvServiceMsgEvent) KVTime() uint64 {
- return 0
- }
- //接收到消息处理后并回复消息(如果需要回复调用该接口)
- func (this *RecvServiceMsgEvent) Replay(msg interface{}) error {
- data, info, err := rpc.EncodeMessage(msg)
- if err != nil {
- return errors.New(fmt.Sprintf("replay msg encode err:%v", err))
- }
- //todo...需要重新获取一次sess,session可能不存在
- this.Sess.Send(&serverproto.ServiceTransmitAck{
- MsgId: uint32(info.ID),
- MsgData: data,
- ClientId: this.ClientID,
- ClientIdList: this.ClientIDList,
- })
- return nil
- }
- ///////////////////////////////////////RecvRouterServiceMsgEvent
- //来自game的消息
- type RecvRouterServiceMsgEvent struct {
- Sess rocommon.Session
- Message interface{}
- ClientID uint64
- ClientIDList []uint64
- ServiceID string
- IsMaster bool
- FromZone int32
- }
- func (this *RecvRouterServiceMsgEvent) Session() rocommon.Session {
- return this.Sess
- }
- func (this *RecvRouterServiceMsgEvent) Msg() interface{} {
- return this.Message
- }
- func (this *RecvRouterServiceMsgEvent) SeqId() uint32 {
- return 0
- }
- func (this *RecvRouterServiceMsgEvent) KVTime() uint64 {
- return 0
- }
- //接收到消息处理后并回复消息(如果需要回复调用该接口)
- func (this *RecvRouterServiceMsgEvent) Replay(msg interface{}) error {
- data, info, err := rpc.EncodeMessage(msg)
- if err != nil {
- return errors.New(fmt.Sprintf("replay msg encode err=%v msg=%v", err, msg))
- }
- this.Sess.Send(&serverproto.ServiceTransmitRouterNtf{
- MsgId: uint32(info.ID),
- MsgData: data,
- ClientId: this.ClientID,
- FromZone: this.FromZone,
- })
- return nil
- }
- func HandleBackendMessage(userHandler func(ev rocommon.ProcEvent, cliID ClientID)) func(ev rocommon.ProcEvent) {
- return func(e rocommon.ProcEvent) {
- //util.InfoF("receive msg=%v |%v", e.Msg(), reflect.TypeOf(e.Msg()))
- switch in := e.(type) {
- case *RecvGateMsgEvent: //BackendTCPEventHook
- cId := ClientID{}
- cId.SessID = in.ClientID
- if ctx := Session2Context(e.Session()); ctx != nil {
- cId.ServiceID = ctx.ID
- }
- userHandler(in, cId)
- //from benchmark test
- //begin
- //tmpInfo := rocommon.MessageInfoByMsg(e.Msg())
- //tmpData, _ := tmpInfo.Codec.Marshal(e.Msg())
- //tmpDataStr := base64.StdEncoding.EncodeToString((tmpData).([]byte))
- //nowTime := util.GetTimeMilliseconds()
- //if tmpInfo.ID > 1010 {
- // util.InfoF("receivemsg msgid=%v msgdata=%v msgtime=%v msg=%v msgtype=%v", tmpInfo.ID, tmpDataStr, nowTime, e.Msg(), reflect.TypeOf(e.Msg()))
- //}
- //end
- case *RecvServiceMsgEvent:
- cId := ClientID{
- SessID: in.ClientID,
- ServiceID: in.ServiceID, //来自哪个服务器节点的信息
- SessIdList: in.ClientIDList,
- }
- userHandler(in, cId)
- case *RecvRouterServiceMsgEvent:
- cId := ClientID{
- SessID: in.ClientID,
- ServiceID: in.ServiceID, //来自哪个服务器节点的信息
- SessIdList: in.ClientIDList,
- }
- userHandler(in, cId)
- }
- }
- }
- func ServiceReplay(ev rocommon.ProcEvent, msg interface{}) {
- if e, ok := ev.(rocommon.ReplayEvent); ok {
- err := e.Replay(msg)
- if err != nil {
- util.InfoF("replay msg err:%v", err.Error())
- } else {
- //log.Println("replay msg ok:", msg)
- }
- } else {
- util.PanicF("the given event must be a ReplayEvent!!! msg:%v", msg)
- }
- }
|