ConnectServer.cs 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637
  1. //#define SHOW_MESSAGE
  2. using DeepCore;
  3. using DeepCore.Concurrent;
  4. using DeepCore.IO;
  5. using DeepCore.Log;
  6. using DeepCore.Net;
  7. using DeepCore.Reflection;
  8. using DeepCrystal.FuckPomeloServer;
  9. using DeepCrystal.RPC;
  10. using DeepMMO.Data;
  11. using DeepMMO.Protocol;
  12. using DeepMMO.Protocol.Client;
  13. using DeepMMO.Server.Gate;
  14. using DeepMMO.Server.SystemMessage;
  15. using System;
  16. using System.Collections.Concurrent;
  17. using System.Collections.Generic;
  18. using System.Threading;
  19. using System.Threading.Tasks;
  20. namespace DeepMMO.Server.Connect
  21. {
  22. public class ConnectServer : IService
  23. {
  24. public static IOStreamPool ClientCodec { get; private set; }
  25. public static bool TraceRoute { get; set; } = true;
  26. private static bool KickOnError { get; set; }
  27. public string acceptor_host { get; private set; }
  28. public int acceptor_port { get; private set; }
  29. public Logger log { get; private set; }
  30. public IServer acceptor { get; private set; }
  /// <summary>
  /// Builds the connect service: resolves the client codec, reads the listen
  /// host and port from the start configuration and creates the client acceptor.
  /// </summary>
  /// <param name="start">
  /// Service start info. The "NetCodec", "Host" and "HostPort" entries of its
  /// Config are read here; "Host" is overwritten when a global replacement host is set.
  /// </param>
  public ConnectServer(ServiceStartInfo start) : base(start)
  {
      this.log = LoggerFactory.GetLogger(start.Address.ServiceName);

      // The codec factory type is named by the "NetCodec" config entry.
      var codecFactory = ReflectionUtil.CreateInterface<IExternalizableFactory>(start.Config["NetCodec"].ToString());
      // The static codec pool is created only once; later instances keep the existing one.
      if (ConnectServer.ClientCodec == null)
      {
          ConnectServer.ClientCodec = new IOStreamPool(codecFactory);
      }
      ConnectServer.KickOnError = GlobalConfig.GetAsBool("KickOnError");
      var serverFactory = ServerFactory.Instance;

      // A globally configured host takes precedence over the per-service "Host" entry.
      var hostOverride = DeepMMO.Server.GlobalConfig.ReplaceNetHost;
      if (!string.IsNullOrEmpty(hostOverride))
      {
          start.Config["Host"] = hostOverride;
      }
      this.acceptor_host = start.Config["Host"].ToString();
      this.acceptor_port = int.Parse(start.Config["HostPort"].ToString());

      // The acceptor receives a copy of the (possibly patched) configuration.
      var acceptorConfig = new HashMap<string, string>(start.Config);
      this.acceptor = serverFactory.CreateServer(acceptorConfig, codecFactory);
      this.acceptor.OnSessionConnected += Acceptor_OnSessionConnected;
      this.acceptor.OnSessionDisconnected += Acceptor_OnSessionDisconnected;
      this.acceptor.OnServerError += Acceptor_OnServerError;

      // Fresh token for this instance; it is compared against client-supplied gate tokens.
      this.current_token = Guid.NewGuid().ToString();
  }
  /// <summary>Disposes the session map owned by this service.</summary>
  protected override void OnDisposed() => sessionMap.Dispose();
  /// <summary>
  /// Start hook: resolves the gate service and creates the heartbeat timer,
  /// whose due time and period are both the session keep-alive timeout.
  /// </summary>
  protected override async Task OnStartAsync()
  {
      log.Info("[ConnectServer Start]");

      var gate = await base.Provider.GetAsync(ServerNames.GateServer);
      this.gate_service = gate;

      var dueTime = TimeSpan.FromSeconds(TimerConfig.timer_sec_SessionKeepTimeout);
      var period = TimeSpan.FromSeconds(TimerConfig.timer_sec_SessionKeepTimeout);
      this.heartbeat_timer = base.Provider.CreateTimer(CheckHeartbeat, this, dueTime, period);
  }
  /// <summary>
  /// Stop hook: releases both timers and the client acceptor.
  /// </summary>
  /// <param name="reason">Stop reason supplied by the host; not used here.</param>
  /// <returns>An already completed task.</returns>
  protected override Task OnStopAsync(ServiceStopInfo reason)
  {
      // heartbeat_timer is created in OnStartAsync and gate_sync_timer only when
      // SystemStaticServicesStartedNotify is handled, so either may still be null
      // if the service is stopped early. Guarding keeps the acceptor disposal reachable.
      this.heartbeat_timer?.Dispose();
      this.gate_sync_timer?.Dispose();
      this.acceptor?.Dispose();
      return Task.FromResult(0);
  }
  /// <summary>Handles the shutdown notification by asking the acceptor to stop.</summary>
  /// <param name="shutdown">Notification whose <c>reason</c> is forwarded to the acceptor.</param>
  [RpcHandler(typeof(SystemShutdownNotify))]
  public virtual void system_rpc_Handle(SystemShutdownNotify shutdown)
      => this.acceptor.StopAsync(shutdown.reason); // result is not awaited
  /// <summary>
  /// Handles the "static services started" notification: logs the listen endpoint,
  /// starts the acceptor, sends one gate sync immediately and then schedules the
  /// periodic gate sync timer.
  /// </summary>
  /// <param name="shutdown">The notification instance; its content is not read here.</param>
  [RpcHandler(typeof(SystemStaticServicesStartedNotify))]
  public virtual void system_rpc_Handle(SystemStaticServicesStartedNotify shutdown)
  {
      log.InfoFormat("HostIP={0} HostPort={1}", acceptor_host, acceptor_port);
      log.InfoFormat("[ConnectServer Started]");

      this.acceptor.StartAsync();

      // First sync right away, then repeat on the configured interval.
      this.OnGateSyncTimerTick(this);
      var syncPeriod = TimeSpan.FromSeconds(TimerConfig.timer_sec_SyncConnectToGateNotify);
      this.gate_sync_timer = base.Provider.CreateTimer(OnGateSyncTimerTick, this, syncPeriod, syncPeriod);
  }
  /// <summary>
  /// Sends the payload of a broadcast notification to every socket selected by
  /// the notification's server groups and session list.
  /// </summary>
  /// <param name="notify">Broadcast request carrying the filters and the message to send.</param>
  [RpcHandler(typeof(ConnectorBroadcastNotify))]
  public virtual void broadcast_rpc_Handle(ConnectorBroadcastNotify notify)
  {
      var targets = sessionMap.GetNotifySockets(notify.serverGroups, notify.sessions);
      // Encode once and reuse the same binary for every recipient.
      var payload = ClientCodec.ToBinary(notify.notify);
      targets.ForEach(target => target.Send(payload));
  }
  /// <summary>
  /// Handles a message delivered through the wormhole channel. A
  /// <c>ConnectorBroadcastNotify</c> is sent to the filtered sockets; any other
  /// <c>ISerializable</c> is broadcast through the acceptor; everything else is ignored.
  /// </summary>
  /// <param name="from">Sender address; not used here.</param>
  /// <param name="message">The transported message.</param>
  public override void OnWormholeTransported(RemoteAddress from, object message)
  {
      switch (message)
      {
          case ConnectorBroadcastNotify notify:
              var targets = sessionMap.GetNotifySockets(notify.serverGroups, notify.sessions);
              var payload = ClientCodec.ToBinary(notify.notify);
              targets.ForEach(target => target.Send(payload));
              break;

          case ISerializable toall:
              acceptor.Broadcast(toall);
              break;
      }
  }
  118. //------------------------------------------------------------------------------------------
  119. #region __Gate__
  120. protected IRemoteService gate_service { get; private set; }
  121. protected IDisposable gate_sync_timer { get; private set; }
  122. protected string current_token = "";
  /// <summary>
  /// Timer callback that reports this connect server's endpoint, token and
  /// client counts to the gate service. The work is queued on the provider.
  /// </summary>
  /// <param name="state">Timer state; not used here.</param>
  protected virtual void OnGateSyncTimerTick(object state)
  {
      Provider.Execute(async () =>
      {
          // Resolve the gate service if it has not been resolved yet.
          if (gate_service == null)
          {
              this.gate_service = await base.Provider.GetAsync(ServerNames.GateServer);
          }
          if (gate_service == null)
          {
              return;
          }

          var report = new SyncConnectToGateNotify();
          report.connectServiceAddress = base.SelfAddress.FullPath;
          report.connectHost = acceptor_host;
          report.connectPort = acceptor_port;
          report.connectToken = current_token;
          report.clientNumber = acceptor.SessionCount;
          report.groupClientNumbers = sessionMap.GetGroupSessionNumbers();
          gate_service.Invoke(report);
      });
  }
  /// <summary>Checks whether the gate token supplied by the client equals this server's current token.</summary>
  /// <param name="enter">The client's enter-server request.</param>
  /// <returns><c>true</c> when <c>enter.c2s_gate_token</c> equals <c>current_token</c>.</returns>
  public virtual bool ValidateGateToken(ClientEnterServerRequest enter)
  {
      return enter.c2s_gate_token == current_token;
  }
  151. #endregion
  152. //------------------------------------------------------------------------------------------
  153. #region __SessionInfo__
  154. private IDisposable heartbeat_timer;
  155. private TimeSpan heartbeat_timeout = TimeSpan.FromSeconds(TimerConfig.timer_sec_SessionKeepTimeout);
  /// <summary>
  /// Tracks the last heartbeat of one remote session and shuts the session
  /// down once it has been silent longer than a given timeout.
  /// </summary>
  public class SessionInfo
  {
      private IRemoteService session;
      // UTC is used so that daylight-saving or local time-zone changes cannot
      // distort the elapsed time between heartbeats.
      private DateTime last_heartbeat = DateTime.UtcNow;

      /// <param name="session">The remote session being monitored.</param>
      public SessionInfo(IRemoteService session)
      {
          this.session = session;
      }

      /// <summary>Records that a heartbeat was just received.</summary>
      public void Refresh()
      {
          last_heartbeat = DateTime.UtcNow;
      }

      /// <summary>
      /// Shuts the session down with reason "timeout" if the last heartbeat is
      /// older than <paramref name="heartbeat_timeout"/>.
      /// </summary>
      /// <param name="heartbeat_timeout">Maximum allowed silence.</param>
      /// <returns>
      /// <c>true</c> only on the call that triggers the shutdown; <c>false</c>
      /// when the session is still fresh or was already shut down by an earlier call.
      /// </returns>
      public bool CheckHeartbeat(TimeSpan heartbeat_timeout)
      {
          var silence = DateTime.UtcNow - last_heartbeat;
          if (silence <= heartbeat_timeout || session == null)
          {
              return false;
          }
          // The shutdown is started but not awaited; the reference is then
          // cleared so that later checks do not trigger it again.
          session.ShutdownAsync("timeout");
          session = null;
          return true;
      }
  }
  /// <summary>
  /// Callback registered with the heartbeat timer. It performs no work.
  /// </summary>
  /// <param name="state">Timer state; not used.</param>
  protected void CheckHeartbeat(object state)
  {
      // NOTE(review): nothing here evaluates SessionInfo.CheckHeartbeat, so no
      // heartbeat timeout is enforced from this callback - confirm this is intended.
  }
  185. #endregion
  186. //------------------------------------------------------------------------------------------
  187. #region __NetSession__
  188. //-------------------------------------------------------------------------------------------------------------------
  /// <summary>
  /// Index of validated client sessions, keyed by service id and additionally
  /// bucketed by server group. All access goes through a reader/writer lock.
  /// </summary>
  public class SessionMap : Disposable
  {
      private readonly ReaderWriterLockSlim lock_rw = new ReaderWriterLockSlim();
      // server group id -> (service id -> session)
      private readonly HashMap<string, HashMap<string, ViewSession>> groupSessionsMap = new HashMap<string, HashMap<string, ViewSession>>();
      // service id -> session
      private readonly HashMap<string, ViewSession> sessionsMap = new HashMap<string, ViewSession>();

      /// <summary>Releases the reader/writer lock.</summary>
      protected override void Disposing()
      {
          lock_rw.Dispose();
      }

      /// <summary>
      /// Registers (or replaces) the session under its service id and server group.
      /// </summary>
      internal void RegistViewSession(ViewSession session)
      {
          using (lock_rw.EnterWrite())
          {
              // If another session object held this service id under a different
              // group, remove it from that group. Otherwise it would stay there
              // forever, because UnregistViewSession only removes the session that
              // is currently stored in sessionsMap.
              if (sessionsMap.TryGetValue(session.ServiceID, out var previous)
                  && previous.ServerGroupID != null
                  && previous.ServerGroupID != session.ServerGroupID
                  && groupSessionsMap.TryGetValue(previous.ServerGroupID, out var previousGroup))
              {
                  previousGroup.Remove(session.ServiceID);
              }

              var group = groupSessionsMap.GetOrAdd(session.ServerGroupID, (g) => new HashMap<string, ViewSession>());
              group[session.ServiceID] = session;
              sessionsMap[session.ServiceID] = session;
          }
      }

      /// <summary>
      /// Removes the session if it is validated and is still the one registered
      /// under its service id. A newer session with the same id is left untouched.
      /// </summary>
      internal void UnregistViewSession(ViewSession session)
      {
          if (!session.IsValidate)
          {
              return;
          }
          using (lock_rw.EnterWrite())
          {
              if (!sessionsMap.TryGetValue(session.ServiceID, out var exist) || exist != session)
              {
                  return;
              }
              sessionsMap.Remove(session.ServiceID);
              // Look the group up instead of GetOrAdd: removal must not create
              // an empty group as a side effect.
              if (groupSessionsMap.TryGetValue(session.ServerGroupID, out var group))
              {
                  group.Remove(session.ServiceID);
              }
          }
      }

      /// <summary>Returns a snapshot of the session count for each server group.</summary>
      internal HashMap<string, int> GetGroupSessionNumbers()
      {
          var ret = new HashMap<string, int>();
          using (lock_rw.EnterRead())
          {
              foreach (var e in groupSessionsMap)
              {
                  ret.Add(e.Key, e.Value.Count);
              }
          }
          return ret;
      }

      /// <summary>Removes every session and every group.</summary>
      internal void Clear()
      {
          using (lock_rw.EnterWrite())
          {
              groupSessionsMap.Clear();
              sessionsMap.Clear();
          }
      }

      /// <summary>
      /// Selects the sessions a notification should reach.
      /// </summary>
      /// <param name="serverGroups">
      /// Server groups to restrict to; <c>null</c> or empty means all sessions.
      /// </param>
      /// <param name="accept">
      /// Service ids to restrict to (within the group selection); <c>null</c> or
      /// empty means every session of the group selection.
      /// </param>
      /// <returns>A new list; ids in <paramref name="accept"/> that are not found are skipped.</returns>
      public List<ViewSession> GetNotifySessions(ArrayList<string> serverGroups, ArrayList<string> accept)
      {
          using (lock_rw.EnterRead())
          {
              HashMap<string, ViewSession> candidates;
              if (serverGroups != null && serverGroups.Count > 0)
              {
                  // Merge the requested groups into a scratch map; it is only
                  // allocated when a group filter is actually present.
                  candidates = new HashMap<string, ViewSession>(this.sessionsMap.Count);
                  foreach (var gid in serverGroups)
                  {
                      if (groupSessionsMap.TryGetValue(gid, out var group))
                      {
                          candidates.PutAll(group);
                      }
                  }
              }
              else
              {
                  candidates = this.sessionsMap;
              }

              var list = new ArrayList<ViewSession>(sessionsMap.Count);
              if (accept != null && accept.Count > 0)
              {
                  foreach (var sid in accept)
                  {
                      if (candidates.TryGetValue(sid, out var session))
                      {
                          list.Add(session);
                      }
                  }
              }
              else
              {
                  list.AddRange(candidates.Values);
              }
              return list;
          }
      }

      /// <summary>
      /// Same selection as <see cref="GetNotifySessions"/>, projected to the
      /// underlying sockets.
      /// </summary>
      public List<ISession> GetNotifySockets(ArrayList<string> serverGroups, ArrayList<string> accept)
      {
          var list = GetNotifySessions(serverGroups, accept);
          return list.ConvertAll(e => e.socket);
      }
  }
  298. private SessionMap sessionMap = new SessionMap();
  /// <summary>
  /// Gets or creates the session service for the account in
  /// <paramref name="enter"/>. If the service obtained reports a different
  /// "connectName" than this connect server, it is shut down and a new one is
  /// created at this server's address.
  /// </summary>
  /// <param name="socketID">Id of the client socket, stored as "sessionID" in the service config.</param>
  /// <param name="enter">The client's enter-server request.</param>
  /// <returns>The remote session service bound to this connect server.</returns>
  protected virtual async Task<IRemoteService> CreateSessionServiceAsync(string socketID, ClientEnterServerRequest enter)
  {
      var cfg = new HashMap<string, string>
      {
          ["sessionID"] = socketID,
          ["accountID"] = enter.c2s_account,
          ["connectName"] = this.SelfAddress.ServiceName,
          ["channel"] = enter.c2s_clientInfo.channel,
          ["passport"] = enter.c2s_clientInfo.sdkName,
      };
      var addr = ServerNames.CreateSessionServiceAddress(enter.c2s_account, this.SelfAddress);
      var session = await this.Provider.GetOrCreateAsync(addr, cfg);
      if (session.Config["connectName"] == this.SelfAddress.ServiceName)
      {
          return session;
      }

      // The existing session belongs to another connect server: close it and
      // create a replacement here.
      this.log.WarnFormat("已存在相同Session在其他Connect:{0}", session.Address);
      this.log.WarnFormat("关闭老的Session:{0}", session.Address);
      var newAddress = ServerNames.CreateSessionServiceAddress(enter.c2s_account, this.SelfAddress);
      try
      {
          await session.ShutdownAsync("new connect");
      }
      catch (Exception err)
      {
          // A failed shutdown of the old session is logged but does not block the new one.
          log.Error(err.Message, err);
      }
      return await this.Provider.CreateAsync(newAddress, cfg);
  }
  /// <summary>
  /// Acceptor callback for a newly connected socket: logs it and wraps it in a
  /// <c>ViewSession</c>, whose constructor subscribes to the socket's events.
  /// </summary>
  /// <param name="session">The newly connected socket session.</param>
  protected virtual void Acceptor_OnSessionConnected(ISession session)
  {
      log.Info($"Acceptor_OnSessionConnected : {session}");
      // The instance is not stored here.
      _ = new ViewSession(this, session);
  }
  /// <summary>Acceptor callback for a disconnected socket; only logs the event.</summary>
  /// <param name="session">The socket session that disconnected.</param>
  protected virtual void Acceptor_OnSessionDisconnected(ISession session)
  {
      log.Info($"Acceptor_OnSessionDisconnected : {session}");
  }
  /// <summary>Acceptor callback for server-level errors; logs the exception.</summary>
  /// <param name="server">The server that raised the error; not used here.</param>
  /// <param name="err">The reported exception.</param>
  protected virtual void Acceptor_OnServerError(IServer server, Exception err)
  {
      var text = $"Acceptor_OnServerError : {err.Message}";
      log.Error(text, err);
  }
  341. public class ViewSession : Disposable
  342. {
  343. private static TypeAllocRecorder Alloc = new TypeAllocRecorder(nameof(ViewSession));
  344. public readonly ConnectServer connect;
  345. public readonly ISession socket;
  346. public Logger log { get => connect.log; }
  347. public string ServiceID { get => service_address.ServiceName; }
  348. public string ServerGroupID { get => serverGroupID; }
  349. public bool IsValidate { get => session_service != null; }
  350. protected RemoteAddress service_address;
  351. protected SessionService session_service;
  352. protected IRemoteService session_service_prx;
  353. protected string serverGroupID;
  /// <summary>
  /// Wraps a client socket and subscribes to its validate, closed,
  /// received-binary and error events.
  /// </summary>
  /// <param name="connect">Owning connect server.</param>
  /// <param name="session">The client socket to wrap.</param>
  public ViewSession(ConnectServer connect, ISession session)
  {
      Alloc.RecordConstructor(GetType());

      this.connect = connect;
      this.socket = session;

      // Wire the socket events to this instance's handlers.
      socket.OnValidateAsync += Session_OnValidateAsync;
      socket.OnClosed += Session_OnClosed;
      socket.OnReceivedBinary += Session_OnReceivedBinary;
      socket.OnError += Session_OnError;
  }
  /// <summary>
  /// Removes this session from the connect server's map, clears the bound
  /// session service references and notifies the session service (if one was
  /// bound) that the socket is gone.
  /// </summary>
  protected override void Disposing()
  {
      // Must run before the fields are cleared: unregistration is skipped
      // when session_service is already null (see IsValidate).
      connect.sessionMap.UnregistViewSession(this);

      var boundProxy = session_service_prx;
      session_service_prx = null;
      session_service = null;

      boundProxy?.Invoke(new SessionDisconnectNotify() { socketID = socket.ID });

      Alloc.RecordDispose(GetType());
  }
  /// <summary>Finalizer; records the destruction with the allocation recorder.</summary>
  ~ViewSession() => Alloc.RecordDestructor(GetType());
  /// <summary>
  /// Socket validation callback. Accepts only <c>ClientEnterServerRequest</c>
  /// messages that carry either a valid gate token or a non-null session token,
  /// then binds this view to a session service.
  /// </summary>
  /// <param name="socket">The socket being validated.</param>
  /// <param name="user">The first message sent by the client.</param>
  /// <returns>
  /// <c>(true, ClientEnterServerResponse)</c> when the bind succeeds;
  /// <c>(false, null)</c> otherwise.
  /// </returns>
  protected virtual Task<Tuple<bool, ISerializable>> Session_OnValidateAsync(ISession socket, ISerializable user)
  {
      if (!(user is ClientEnterServerRequest enter))
      {
          return Task.FromResult(new Tuple<bool, ISerializable>(false, null));
      }

      return connect.Provider.Execute(new Func<Task<Tuple<bool, ISerializable>>>(async () =>
      {
          var gateTokenOk = connect.ValidateGateToken(enter);
          if (!gateTokenOk && enter.c2s_session_token == null)
          {
              // Neither a valid gate token nor a session token: reject.
              return new Tuple<bool, ISerializable>(false, null);
          }

          if (gateTokenOk)
          {
              // Gate validation succeeded, so the session token is ignored.
              enter.c2s_session_token = null;
          }
          else
          {
              connect.log.InfoFormat("玩家断线重连: {0}", enter.c2s_account);
          }

          var session = await connect.CreateSessionServiceAsync(socket.ID, enter);
          var bindRequest = new LocalBindSessionRequest()
          {
              session = this,
              enter = enter,
          };
          var rsp = await session.CallAsync<LocalBindSessionResponse>(bindRequest);
          if (!Response.CheckSuccess(rsp))
          {
              log.Error($"Connect绑定Session失败: {enter.c2s_account}");
              return new Tuple<bool, ISerializable>(false, null);
          }

          // Bind succeeded: remember the service and make this view routable.
          this.session_service = rsp.session;
          this.session_service_prx = session;
          this.service_address = session_service_prx.Address;
          this.serverGroupID = rsp.serverGroupID;
          connect.sessionMap.RegistViewSession(this);

          var ret = new ClientEnterServerResponse() { s2c_session_token = rsp.sessionToken };
          return new Tuple<bool, ISerializable>(true, ret);
      }));
  }
  /// <summary>
  /// Routes a binary message received from the client. Messages whose type
  /// implements <c>ISessionProtocol</c> go to the session service proxy; all
  /// others go directly to the local session service object. Requests get their
  /// response sent back with the same <paramref name="sendID"/>; notifies are
  /// fire-and-forget. Messages are dropped while no session service is bound.
  /// </summary>
  /// <param name="session">The socket the message arrived on.</param>
  /// <param name="message">The raw message, identified by its route.</param>
  /// <param name="sendID">Client-side id used to correlate the response.</param>
  protected virtual void Session_OnReceivedBinary(ISession session, BinaryMessage message, uint sendID)
  {
      try
      {
          // Take one snapshot of each reference; Disposing() clears both fields.
          var prx = session_service_prx;
          var svc = session_service;
          if (prx == null)
          {
              return;
          }

          var route_codec = ConnectServer.ClientCodec.Factory.GetCodec(message.Route);
          if (route_codec == null)
          {
              throw new Exception("Bad Message Route : " + message.Route);
          }
  #if SHOW_MESSAGE
          Trace(" <-- Recv : {1} : {0}", route_codec, sendID);
  #endif
          var messageType = route_codec.MessageType;
          bool isRequest = typeof(Request).IsAssignableFrom(messageType);
          if (!isRequest && !typeof(Notify).IsAssignableFrom(messageType))
          {
              throw new Exception("Bad Message Type : " + route_codec.MessageType);
          }

          bool forSessionService = typeof(ISessionProtocol).IsAssignableFrom(messageType);
          if (!forSessionService && svc == null)
          {
              // The proxy was still set when read, but the service object has been
              // cleared in the meantime (the view is being disposed). Drop the
              // message instead of failing with a NullReferenceException.
              return;
          }

          if (isRequest)
          {
              if (forSessionService)
              {
                  // Connect/session-server protocol.
                  prx.Call(message, do_SessionPrxCall);
              }
              else
              {
                  // Logic-server protocol.
                  svc.connect_OnReceivedBinaryImmediately(route_codec, message, do_async_OnReceivedBinaryImmediately);
              }
          }
          else
          {
              if (forSessionService)
              {
                  // Connect/session-server protocol.
                  prx.Invoke(message);
              }
              else
              {
                  // Logic-server protocol.
                  svc.connect_OnReceivedBinaryImmediately(route_codec, message);
              }
          }

          // Response callback for session-protocol requests: replies on the socket directly.
          void do_SessionPrxCall(BinaryMessage rsp, Exception err)
          {
              if (rsp.HasRoute)
              {
  #if SHOW_MESSAGE
                  Trace(" --> Send : {1} : {0}", rsp, sendID);
  #endif
                  session.SendResponse(rsp, sendID);
              }
              else if (err != null)
              {
                  report_RequestError(err);
              }
          }

          // Response callback for logic-protocol requests: replies through SocketSend.
          void do_async_OnReceivedBinaryImmediately(BinaryMessage rsp, Exception err)
          {
              if (rsp.HasRoute)
              {
                  this.SocketSend(rsp, sendID);
              }
              else if (err != null)
              {
                  report_RequestError(err);
              }
          }

          // Shared failure path of both callbacks: log the request and the
          // error, then disconnect the client when KickOnError is set.
          void report_RequestError(Exception err)
          {
              connect.log.Error("Request is : " + route_codec);
              connect.log.Error(err.Message, err);
              if (KickOnError)
              {
                  session.Disconnect(err.Message);
              }
          }
      }
      catch (Exception err)
      {
          connect.log.Error(err.Message, err);
          if (KickOnError)
          {
              session.Disconnect(err.Message);
          }
      }
  }
  /// <summary>Socket closed callback; disposes this view session.</summary>
  /// <param name="session">The closed socket; not used here.</param>
  /// <param name="reason">Close reason; not used here.</param>
  protected virtual void Session_OnClosed(ISession session, string reason) => Dispose();
  /// <summary>Socket error callback; logs the exception through the connect server's logger.</summary>
  /// <param name="session">The socket that raised the error; not used here.</param>
  /// <param name="err">The reported exception.</param>
  protected virtual void Session_OnError(ISession session, Exception err)
  {
      var text = $"Session_OnError : {err.Message}";
      connect.log.Error(text, err);
  }
  /// <summary>
  /// Queues a response to the client on the connect server's executor.
  /// </summary>
  /// <param name="bin">The encoded response.</param>
  /// <param name="sendID">Id of the request being answered.</param>
  public void SocketSend(BinaryMessage bin, uint sendID)
  {
  #if SHOW_MESSAGE
      Trace(" --> Send : {1} : {0}", bin, sendID);
  #endif
      // The send itself runs inside connect.Execute, not on the caller.
      connect.Execute(() =>
      {
          this.socket.SendResponse(bin, sendID);
      });
  }
  /// <summary>
  /// Queues an unsolicited message to the client on the connect server's executor.
  /// </summary>
  /// <param name="bin">The encoded message.</param>
  public void SocketSend(BinaryMessage bin)
  {
  #if SHOW_MESSAGE
      Trace(" --> Send : {1} : {0}", bin, 0);
  #endif
      // The send itself runs inside connect.Execute, not on the caller.
      connect.Execute(() =>
      {
          this.socket.Send(bin);
      });
  }
  #if SHOW_MESSAGE
  /// <summary>
  /// Writes a trace line for a message identified by its codec, unless route
  /// tracing is off or the message type is one of the two excluded types.
  /// </summary>
  /// <param name="message">Format string; {0} is the type name, {1} the send id.</param>
  public void Trace(string message, TypeCodec codec, uint sendID)
  {
      if (!TraceRoute)
      {
          return;
      }
      var typeName = codec.MessageType.FullName;
      if (typeName == "DeepMMO.Protocol.Client.ClientBattleAction"
          || typeName == "TLProtocol.Protocol.Client.ClientSyncServerTimeRequest")
      {
          return;
      }
      log.Trace(string.Format(message, typeName, sendID));
  }

  /// <summary>Writes a trace line for a decoded message when route tracing is on.</summary>
  /// <param name="message">Format string; {0} is the type name, {1} the send id.</param>
  public void Trace(string message, ISerializable msg, uint sendID)
  {
      if (!TraceRoute)
      {
          return;
      }
      log.Trace(string.Format(message, msg.GetType().FullName, sendID));
  }

  /// <summary>
  /// Resolves the codec of a binary message by its route and forwards to the
  /// codec overload, skipping the two message types excluded here.
  /// </summary>
  /// <param name="message">Format string; {0} is the type name, {1} the send id.</param>
  /// <exception cref="Exception">Route tracing is on and no codec is registered for the route.</exception>
  public void Trace(string message, BinaryMessage msg, uint sendID)
  {
      if (!TraceRoute)
      {
          return;
      }
      var route_codec = ConnectServer.ClientCodec.Factory.GetCodec(msg.Route);
      if (route_codec == null)
      {
          throw new Exception("Bad Message Route : " + msg.Route);
      }
      var typeName = route_codec.MessageType.FullName;
      if (typeName == "DeepMMO.Protocol.Client.ClientBattleAction"
          || typeName == "TLProtocol.Protocol.Client.ClientSyncServerTimeResponse")
      {
          return;
      }
      Trace(message, route_codec, sendID);
  }
  #endif
  601. }
  602. #endregion
  603. }
  604. //--------------------------------------------------------------------------------------------------------------------------------------------
  605. }