| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package model
import (
	"errors"
	"io"
	"net"
	"strings"

	"github.com/gorilla/websocket"
	"rocommon"
	"rocommon/rpc"
	"rocommon/service"
	"rocommon/socket"
	_ "rocommon/socket/tcp"
	"rocommon/util"
	"roserver/baseserver/model"
	"roserver/baseserver/router"
	"roserver/serverproto"
)
// DirectTCPMessageProcessor performs message parsing for client connections.
//
// It carries no state. Per the original notes, the MessageProcessor contract
// lives in def.go and the basic TCPMessageProcessor implementation lives in
// procrpc.go.
type DirectTCPMessageProcessor struct{}
- //recv直接原始数据传递到后端 / 返回消息给client
- func (this *DirectTCPMessageProcessor) OnRecvMsg(s rocommon.Session) (msg interface{}, seqId uint32, err error) {
- reader, ok := s.Raw().(io.Reader)
- if !ok || reader == nil {
- util.InfoF("[DirectTCPMessageProcessor] OnRecvMsg err")
- return nil, 0, nil
- }
- //opt := s.Node().(socket.SocketOption)
- var opt socket.SocketOption
- if s.GetSessionOptFlag() {
- opt = s.Node().(socket.SocketOption)
- } else {
- opt = s.GetSessionOpt().(socket.SocketOption)
- }
- opt.SocketReadTimeout(reader.(net.Conn), func() {
- var msgId uint16
- var seqId uint32 //包序列号,客户端发送时的序列从1开始
- var msgData []byte
- var flagId uint16 //加密方式
- //获取原始格式数据包(pb序列化的二进制数据)
- msgId, seqId, flagId, msgData, err = rpc.RecvPackageData(reader, opt.MaxMsgLen())
- //尝试直接发送到其他后端服务器或者解析
- if err == nil {
- msg, err = FrontendPackageProc(int(msgId), seqId, flagId, msgData, s)
- }
- })
- return
- }
- //send 直接发往客户端的消息
- func (this *DirectTCPMessageProcessor) OnSendMsg(s rocommon.Session, msg interface{}) (err error) {
- writer, ok := s.Raw().(io.Writer)
- if !ok || writer == nil {
- util.InfoF("[DirectTCPMessageProcessor] OnSendMsg err")
- return nil
- }
- //opt := s.Node().(socket.SocketOption)
- var opt socket.SocketOption
- if s.GetSessionOptFlag() {
- opt = s.Node().(socket.SocketOption)
- } else {
- opt = s.GetSessionOpt().(socket.SocketOption)
- }
- opt.SocketWriteTimeout(writer.(net.Conn), func() {
- err = rpc.SendMessage(writer, msg, s.GetAES(), opt.MaxMsgLen(), "")
- })
- return
- }
- var (
- PingReqMsgID = rocommon.MessageInfoByName("CSPingReq").ID
- LoginReqMsgId = rocommon.MessageInfoByName("CSLoginReq").ID
- ReconnectMsgId = rocommon.MessageInfoByName("CSReconnectReq").ID
- //CSPlayerMoveReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveReq").ID
- //CSPlayerMoveToReqMsgId = rocommon.MessageInfoByName("CSPlayerMoveToReq").ID
- CSPlayerGMReqMsgId = rocommon.MessageInfoByName("CSGMCommandReq").ID
- )
- //尝试直接发送到其他后端服务器或者解析后直接回复client
- func FrontendPackageProc(msgId int, seqId uint32, flagId uint16, msgData []byte, s rocommon.Session) (msg interface{}, err error) {
- switch flagId {
- case 1: //rsa加密的数据
- msgData, err = rpc.RSADecrypt(msgData, rpc.PrivateKey)
- if err != nil {
- return nil, err
- }
- case 2: //aes加密的数据
- //msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
- msgData, err = rpc.AESCtrDecrypt(msgData, *s.GetAES(), *s.GetAES()...)
- if err != nil {
- return nil, err
- }
- }
- //gate接收到客户端消息的处理部分
- switch int(msgId) {
- case PingReqMsgID, LoginReqMsgId, ReconnectMsgId:
- msg, _, err = rpc.DecodeMessage(msgId, msgData)
- if err != nil {
- util.InfoF("FrontendPackageProc err=%v msgId=%v flagId=%v", err, msgId, flagId)
- return nil, err
- }
- //switch msgType := msg.(type) {
- switch in := msg.(type) {
- case *serverproto.CSPingReq:
- //登陆成功后开始发送pingreq消息
- //log.Println("[FrontendPackageProc] PingReq->PingAck:", msgType)
- s.Send(&serverproto.SCPingAck{})
- case *serverproto.CSLoginReq:
- //in.OpenId = in.OpenId + "@" + in.Platform
- // 加密处理
- //s.SetAES(in.CryptPass)
- //发送给auth进行验证,操作并获取是否有断线数据(断线重连使用)
- //第一个到gate的消息,绑定gameserver
- // 通过auth服务器来验证用户的合法性,并返回之前是否已经绑定过一个gameserver,可能
- // 有离线数据(如果重新绑定新的gameserver会有数据一致性问题),这边暂时处理成直接绑定第一个
- switch rawType := s.Raw().(type) {
- case net.Conn:
- in.Ip = rawType.RemoteAddr().String()
- case *websocket.Conn:
- in.Ip = rawType.RemoteAddr().String()
- }
- util.WarnF("CSLoginReq msg=%v sessionid=%v ip=%v", msg, s.ID(), in.Ip)
- //authNodeStr := model.SelectServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR)
- authNodeStr := model.SelectAuthServiceNode(model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
- tmpNode := model.GetServiceNode(authNodeStr)
- if tmpNode == nil {
- model.RemoveServiceNodeByName(authNodeStr)
- authNodeStr, _ = model.GetServiceNodeAndSession("", model.SERVICE_NODE_TYPE_AUTH_STR, s.ID())
- }
- cliUser, err := model.BindClientToBackendNew(authNodeStr, s, in.OpenId, in.Platform)
- if err == nil {
- newMsgData, _, _ := rpc.EncodeMessage(in)
- cliUser.LoginData = newMsgData
- //绑定成功
- //转发给对应的服务器做处理,例如game,auth
- err = cliUser.ClientDirect2Backend(authNodeStr, int(msgId), seqId, newMsgData, model.SERVICE_NODE_TYPE_AUTH_STR)
- if err != nil {
- util.WarnF("CSLoginReq ClientDirect2Backend err=%v auth=%v msg=%v", err, authNodeStr, msg)
- s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
- }
- } else {
- util.WarnF("CSLoginReq BindClientToBackendNew err=%v auth=%v msg=%v", err, authNodeStr, msg)
- s.Send(&serverproto.SCLoginAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
- }
- case *serverproto.CSReconnectReq:
- ////加密处理
- //s.SetAES(in.CryptPass)
- ////发送消息给game,重新进行session绑定
- //cliUser := model.ClientMag.GetReconnectFromOpenId(in.OpenId)
- //err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
- // msgData, model.SERVICE_NODE_TYPE_GAME_STR)
- //if err != nil {
- // //重连超时,不允许继续重连
- // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
- // baseutil.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
- //}
- //
- //ret := cliUser.ReconnectReset(s)
- //加密处理
- //s.SetAES(in.CryptPass)
- // 1.后续sdk需要拿token去服务器进行验证,避免盗号
- util.WarnF("CSReconnectReq msg=%v sessionId=%v", msg, s.ID())
- cliUser := model.ClientMag.GetConnectedFromOpenId(in.OpenId, in.Platform)
- if cliUser == nil {
- //该玩家没有登录过无法进行断线重连
- s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_FAIL)})
- util.InfoF("CSReconnectReq cliUser=%v msg=%v user not connected before", cliUser, msg)
- } else {
- //if cliUser.IsReconnectExpire() {
- // //重连超时,不允许继续重连 / 没有断线不允许发送重连消息
- // s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
- // util.InfoF("CSReconnectReq cliUser=%v msg=%v user connect time out", cliUser, msg)
- // return
- //}
- ret := cliUser.ConnectReset(s)
- if ret != nil {
- //session绑定出错
- s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
- util.InfoF("CSReconnectReqErr msg=%v session bind use err=%v", msg, ret)
- s.Close()
- return
- }
- //发送消息给game,重新进行session绑定
- err = cliUser.ClientDirect2BackendByServiceName(model.SERVICE_NODE_TYPE_GAME_STR, msgId, 0,
- msgData, model.SERVICE_NODE_TYPE_GAME_STR)
- if err != nil {
- //重连超时,不允许继续重连
- s.Send(&serverproto.SCReconnectAck{Error: int32(serverproto.ErrorCode_ERROR_ROLE_INVALID)})
- util.InfoF("CSReconnectReqErr msg=%v user connect game service not invalid", msg)
- } else {
- s.SetSessionOptFlag(true)
- util.WarnF("CSReconnectReq success msg=%v sessionid=%v", msg, s.ID())
- }
- }
- }
- default:
- //gm白名单处理
- if msgId == CSPlayerGMReqMsgId && len(service.GetServiceConfig().Node.WhiteListGM) >= 0 {
- bFind := false
- gmIPStr := ""
- switch t := s.Raw().(type) {
- case net.Conn:
- gmIPStr = t.RemoteAddr().String()
- case *websocket.Conn:
- gmIPStr = t.RemoteAddr().String()
- }
- //gmIPStr := s.Raw().(net.Conn).RemoteAddr().String()
- for idx := 0; idx < len(service.GetServiceConfig().Node.WhiteListGM); idx++ {
- if strings.Contains(gmIPStr, service.GetServiceConfig().Node.WhiteListGM[idx]) {
- bFind = true
- break
- }
- if strings.Contains("0.0.0.0", service.GetServiceConfig().Node.WhiteListGM[idx]) {
- bFind = true
- break
- }
- }
- if !bFind {
- s.Close()
- return
- }
- }
- //todo...目前暂时只针对gate做了透传,后期需要对所有服务器节点都可以执行透传
- routeRule := router.GetRuleByMsgID(msgId)
- if routeRule == nil {
- util.ErrorF("message not in route table, msgid=%v", msgId)
- //return nil, fmt.Errorf("message not in route table, msgid=%v", msgId)
- return nil, nil
- }
- //查看当前session对应的用户是否存在,否则不处理
- cliUser := model.Session2User(s)
- //cliUser.CheckAoi(msgId, CSPlayerMoveReqMsgId, CSPlayerMoveToReqMsgId, msgData)
- if cliUser != nil {
- //这边需要加锁(在调用GetServiceBackend时使用了读写锁)
- serviceId := cliUser.GetServiceBackend(routeRule.Mod)
- if serviceId == "" {
- //还没有绑定game/battlerecord节点就发送了别的消息
- //非正常流程导致,可能节点暂时未绑定
- if routeRule.Mod != model.SERVICE_NODE_TYPE_BATTLERECORD_STR {
- s.Close()
- }
- } else {
- //打印增加多线程冲突
- //baseutil.InfoF("receive other msg send to game server serviceId=%v msgid=%v", serviceId, msgId)
- //包学序列号合法校验,避免发送重复的包
- //if cliUser.SeqId >= seqId {
- // ntfMsg := &serverproto.SCKickOutNtf{
- // Error: int32(serverproto.ErrorCode_ERROR_FAIL),
- // }
- //
- // s.Send(ntfMsg)
- // s.(rocommon.ContextSet).SetContextData("user", nil, "SSUserKickNtf")
- // return
- //}
- //cliUser.SeqId = seqId
- //转发给后台服务器
- err = cliUser.ClientDirect2Backend(serviceId, msgId, seqId, msgData, routeRule.Mod)
- if err != nil {
- util.WarnF("ClientDirect2Backend sessionid=%v msgid=%v err=%v", s.ID(), msgId, err)
- //非正常流程导致,可能节点暂时未绑定
- model.ClientMag.RemoveConnectedFromOpenId(cliUser.OpenId, cliUser.Platform)
- s.Close()
- }
- }
- } else {
- //todo...是否需要断开连接
- util.InfoF("cliUser not exist s=%v msgId=%v", s, msgId)
- }
- }
- return
- }
|