package model import ( "github.com/gorilla/websocket" "io" "net" "rocommon" "rocommon/rpc" "rocommon/service" "rocommon/socket" _ "rocommon/socket/tcp" "rocommon/util" "roserver/baseserver/model" "roserver/baseserver/router" "roserver/serverproto" "strings" ) // 消息解析操作 // MessageProcessor def.go // TCPMessageProcessor procrpc.go 基础实现方法 type DirectTCPMessageProcessor struct { } // recv直接原始数据传递到后端 / 返回消息给client func (this *DirectTCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) { reader, ok := s.Raw().(io.Reader) if !ok || reader == nil { util.InfoF("[DirectTCPMessageProcessor] OnRecvMsg err") return nil, 0, nil } //opt := s.Node().(socket.SocketOption) var opt socket.SocketOption if s.GetSessionOptFlag() { opt = s.Node().(socket.SocketOption) } else { opt = s.GetSessionOpt().(socket.SocketOption) } opt.SocketReadTimeout(reader.(net.Conn), func() { var msgId uint16 var seqId uint32 //包序列号,客户端发送时的序列从1开始 var msgData []byte var flagId uint16 //加密方式 //获取原始格式数据包(pb序列化的二进制数据) msgId, seqId, flagId, msgData, err = rpc.RecvPackageData(reader, opt.MaxMsgLen()) //尝试直接发送到其他后端服务器或者解析 if err == nil { msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s) } }) return } // send 直接发往客户端的消息 func (this *DirectTCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) { writer, ok := s.Raw().(io.Writer) if !ok || writer == nil { util.InfoF("[DirectTCPMessageProcessor] OnSendMsg err") return nil } //opt := s.Node().(socket.SocketOption) var opt socket.SocketOption if s.GetSessionOptFlag() { opt = s.Node().(socket.SocketOption) } else { opt = s.GetSessionOpt().(socket.SocketOption) } opt.SocketWriteTimeout(writer.(net.Conn), func() { err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), "") }) return } var ( PingReqMsgID = rocommon.MessageInfoByName("CSPingReq").ID LoginReqMsgId = rocommon.MessageInfoByName("CSLoginReq").ID ReconnectMsgId = rocommon.MessageInfoByName("CSReconnectReq").ID //CSPlayerMoveReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveReq").ID //CSPlayerMoveToReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveToReq").ID CSPlayerGMReqMsgId = rocommon.MessageInfoByName("CSGMCommandReq").ID ) // 尝试直接发送到其他后端服务器或者解析后直接回复client func FrontendPackageProc(msgId int, seqId uint32, flagId uint16, msgData []byte, s rocommon.Session) (msg interface{}, err error) { switch flagId { case 1: //rsa加密的数据 msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey) if err != nil { return nil, err } case 2: //aes加密的数据 //msgData, err = rpcc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...) msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...) if err != nil { return nil, err } } //gate接收到客户端消息的处理部分 switch int(msgId) { case PingReqMsgID, LoginReqMsgId, ReconnectMsgId: msg, _, err = rpc.DecodeMessage(msgId, msgData) if err != nil { util.InfoF("FrontendPackageProc err=%v msgId=%v flagId=%v", err, msgId, flagId) return nil, err } //switch msgType := msg.(type) { switch in := msg.(type) { case *serverproto.CSPingReq: //登陆成功后开始发送pingreq消息 //log.Println("[FrontendPackageProc] PingReq->PingAck:", msgType) s.Send(&serverproto.SCPingAck{}) case *serverproto.CSLoginReq: //in.OpenId = in.OpenId + "@" + in.Platform // 加密处理 //s.SetAES(in.CryptPass) //发送给auth进行验证,操作并获取是否有断线数据(断线重连使用) //第一个到gate的消息,绑定gameserver // 通过auth服务器来验证用户的合法性,并返回之前是否已经绑定过一个gameserver,可能 // 有离线数据(如果重新绑定新的gameserver会有数据一致性问题),这边暂时处理成直接绑定第一个 switch rawType := s.Raw().(type) { case net.Conn: in.Ip = rawType.RemoteAddr().String() case *websocket.Conn: in.Ip = rawType.RemoteAddr().String() } util.WarnF("CSLoginReq msg=%v sessionid=%v ip=%v", msg, s.ID(), in.Ip) //authNodeStr := model.SelectServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR) authNodeStr := model.SelectAuthServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR, s.ID()) tmpNode := model.GetServiceNode(authNodeStr) if tmpNode == nil { model.RemoveServiceNodeByName(authNodeStr) authNodeStr, _ = model.GetServiceNodeAndSession("", model.SERVICE_NODE_TYPE_AUTH_STR, s.ID()) } cliUser, err := model.BindClientToBackendNew(authNodeStr, s, in.OpenId, in.Platform) if err == nil { newMsgData, _, _ := rpc.EncodeMessage(in) cliUser.LoginData = newMsgData //绑定成功 //转发给对应的服务器做处理,例如game,auth err = cliUser.ClientDirect2Backend(authNodeStr, int(msgId), seqId, newMsgData, model.SERVICE_NODE_TYPE_AUTH_STR) if err != nil { util.WarnF("CSLoginReq ClientDirect2Backend err=%v auth=%v msg=%v", err, authNodeStr, msg) s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)}) } } else { util.WarnF("CSLoginReq BindClientToBackendNew err=%v auth=%v msg=%v", err, authNodeStr, msg) s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)}) } case *serverproto.CSReconnectReq: ////加密处理 //s.SetAES(in.CryptPass) ////发送消息给game,重新进行session绑定 //cliUser := model.ClientMag.GetReconnectFromOpenId(in.OpenId) //err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0, // msgData, model.SERVICE_NODE_TYPE_GAME_STR) //if err != nil { // //重连超时,不允许继续重连 // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)}) // baseutil.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg) //} // //ret := cliUser.ReconnectReset(s) //加密处理 //s.SetAES(in.CryptPass) // 1.后续sdk需要拿token去服务器进行验证,避免盗号 util.WarnF("CSReconnectReq msg=%v sessionId=%v", msg, s.ID()) cliUser := model.ClientMag.GetConnectedFromOpenId(in.OpenId, in.Platform) if cliUser == nil { //该玩家没有登录过无法进行断线重连 s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)}) util.InfoF("CSReconnectReq cliUser=%v msg=%v user not connected before", cliUser, msg) } else { //if cliUser.IsReconnectExpire() { // //重连超时,不允许继续重连 / 没有断线不允许发送重连消息 // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)}) // util.InfoF("CSReconnectReq cliUser=%v msg=%v user connect time out", cliUser, msg) // return //} ret := cliUser.ConnectReset(s) if ret != nil { //session绑定出错 s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)}) util.InfoF("CSReconnectReqErr msg=%v session bind use err=%v", msg, ret) s.Close() return } //发送消息给game,重新进行session绑定 err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0, msgData, model.SERVICE_NODE_TYPE_GAME_STR) if err != nil { //重连超时,不允许继续重连 s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)}) util.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg) } else { s.SetSessionOptFlag(true) util.WarnF("CSReconnectReq success msg=%v sessionid=%v", msg, s.ID()) } } } default: //gm白名单处理 if msgId == CSPlayerGMReqMsgId && len(service.GetServiceConfig().Node.WhiteListGM) >= 0 { bFind := false gmIPStr := "" switch t := s.Raw().(type) { case net.Conn: gmIPStr = t.RemoteAddr().String() case *websocket.Conn: gmIPStr = t.RemoteAddr().String() } //gmIPStr := s.Raw().(net.Conn).RemoteAddr().String() for idx := 0; idx < len(service.GetServiceConfig().Node.WhiteListGM); idx++ { if strings.Contains(gmIPStr, service.GetServiceConfig().Node.WhiteListGM[idx]) { bFind = true break } if strings.Contains("0.0.0.0", service.GetServiceConfig().Node.WhiteListGM[idx]) { bFind = true break } } if !bFind { s.Close() return } } //todo...目前暂时只针对gate做了透传,后期需要对所有服务器节点都可以执行透传 routeRule := router.GetRuleByMsgID(msgId) if routeRule == nil { util.ErrorF("message not in route table, msgid=%v", msgId) //return nil, fmt.Errorf("message not in route table, msgid=%v", msgId) return nil, nil } //查看当前session对应的用户是否存在,否则不处理 cliUser := model.Session2User(s) //cliUser.CheckAoi(msgId, CSPlayerMoveReqMsgId, CSPlayerMoveToReqMsgId, msgData) if cliUser != nil { //这边需要加锁(在调用GetServiceBackend时使用了读写锁) serviceId := cliUser.GetServiceBackend(routeRule.Mod) if serviceId == "" { //还没有绑定game/battlerecord节点就发送了别的消息 //非正常流程导致,可能节点暂时未绑定 if routeRule.Mod != model.SERVICE_NODE_TYPE_BATTLERECORD_STR { s.Close() } } else { //打印增加多线程冲突 //baseutil.InfoF("receive other msg send to game server serviceId=%v msgid=%v", serviceId, msgId) //包学序列号合法校验,避免发送重复的包 //if cliUser.SeqId >= seqId { // ntfMsg := &serverproto.SCKickOutNtf{ // Error: int32(serverproto.ErrorCode_ERROR_FAIL), // } // // s.Send(ntfMsg) // s.(rocommon.ContextSet).SetContextData("user", nil, "SSUserKickNtf") // return //} //cliUser.SeqId = seqId //转发给后台服务器 err = cliUser.ClientDirect2Backend(serviceId, msgId, seqId, msgData, routeRule.Mod) if err != nil { util.WarnF("ClientDirect2Backend sessionid=%v msgid=%v err=%v", s.ID(), msgId, err) //非正常流程导致,可能节点暂时未绑定 model.ClientMag.RemoveConnectedFromOpenId(cliUser.OpenId, cliUser.Platform) s.Close() } } } else { //todo...是否需要断开连接 util.InfoF("cliUser not exist s=%v msgId=%v", s, msgId) } } return }