MergeServerLogic.lua 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. --合服逻辑
  2. --[=[
  3. 0. --创建一个mongo连接实例
  4. ]=]--
  5. local DB = require("common.DB")
  6. local LuaMongo = _G.lua_mongo
  7. local Config = require("Config")
  8. local Log = require("common.Log")
  9. local MergeServerDefine = require("merge.MergeServerDefine")
  -- Module-level state shared by the helpers in this file:
  --   primaryKeyTable : _id values already seen (written by primaryKeyRepeatCheck)
  --   dbcache         : not referenced by the code visible in this file section
  --   repeatIdTb      : cache of duplicated _ids; the handle* functions read it as
  --                     old id -> replacement id
  local primaryKeyTable, dbcache, repeatIdTb = {}, {}, {}
  --- Creates the mongo client connection.
  -- The host passed to LuaMongo.client is read from Config.DB_IP.
  -- Returns nothing.
  local function clientMongoDb()
    local host = Config.DB_IP
    LuaMongo.client(host)
  end
  --- Switches to a database by authenticating against it.
  -- Calls LuaMongo.auth with the credentials from Config.DB_USER / Config.DB_PASS.
  -- @param dbName database name; nil or "" is rejected with a console message
  --               and nothing else happens.
  local function chooseMongoDb(dbName)
    if not dbName or dbName == "" then
      -- tostring() keeps the diagnostic from raising: on Lua 5.1 "%s" only
      -- accepts strings/numbers, so formatting a nil dbName would throw here.
      print(string.format("数据库名错误, dbName = %s\n", tostring(dbName)))
      return
    end
    LuaMongo.auth(dbName, Config.DB_USER, Config.DB_PASS)
  end
  --- Generates a new _id.
  -- @return the first value returned by LuaMongo.id()
  local function generateNewId()
    -- Parentheses truncate to exactly one result, as the original local did.
    return (LuaMongo.id())
  end
  --------------------------------------- optimized path -----------------------------
  --- Copies every document of one collection from a source database into the
  -- same-named collection of the target database.
  -- Documents are read with LuaMongo.find / LuaMongo.next and written one by one
  -- with LuaMongo.insert; a failing insert is logged and the copy continues.
  -- @param sourceDb       source database name
  -- @param targetDb       target database name
  -- @param collectionName suffix concatenated verbatim onto the database name
  --                       (presumably starts with "." — confirm against
  --                       MergeServerDefine.COLLECTIONS)
  -- @return number of insert calls that did not raise, number that raised
  local function processCollection(sourceDb, targetDb, collectionName)
    local sourceNs = sourceDb .. collectionName
    local targetNs = targetDb .. collectionName
    local inserted, failed = 0, 0

    LuaMongo.find(sourceNs, nil, {})
    while true do
      -- LuaMongo.next fills the table it is given; a fresh one per document.
      local doc = {}
      if not LuaMongo.next(doc) then
        break
      end

      -- Insert straight into the target database.
      local ok, err = pcall(LuaMongo.insert, targetNs, doc)
      if ok then
        inserted = inserted + 1
      else
        failed = failed + 1
        -- _id and err may be nil or a non-string value; tostring() prevents the
        -- log line itself from raising (Lua 5.1 "%s" rejects such arguments)
        -- and aborting the whole merge.
        print(string.format("=================插入失败, sourceDb = %s, collectionName = %s, _id = %s, err = %s\n", tostring(sourceDb), tostring(collectionName), tostring(doc._id), tostring(err)))
        --Log.write(Log.LOGID_OSS_MERGE, sourceNs, doc._id, err)
      end
    end

    return inserted, failed
  end
  -- Pending insert tasks, each shaped { collection = <name>, data = <document> }.
  local writeQueue = {}

  --- Inserts every queued task through LuaMongo.insert, then empties the queue.
  -- A failing insert is reported through Log.error and does not stop the flush.
  local function flushWriteQueue()
    for _, task in ipairs(writeQueue) do
      local ok, err = pcall(LuaMongo.insert, task.collection, task.data)
      if not ok then
        -- Pass the failure reason along too; previously it was captured and dropped.
        -- NOTE(review): Log.error's signature is not visible here — confirm it
        -- accepts (or at least ignores) the extra trailing argument.
        Log.error("插入失败", task, tostring(err))
      end
    end
    -- Rebind rather than clear in place: the flushed tasks are simply dropped.
    writeQueue = {}
  end
  --- Queues one insert task; once 100 tasks have accumulated the queue is flushed.
  -- @param collection collection name stored on the task
  -- @param data       document stored on the task (by reference, not copied)
  local function bufferedInsert(collection, data)
    local task = { collection = collection, data = data }
    writeQueue[#writeQueue + 1] = task
    if #writeQueue < 100 then
      return
    end
    flushWriteQueue()
  end
  66. ----------------------------------------------------------------
  ----------------- duplicate _id detection and handling for the char collection -----------------
  --- Reports _id values that have already been seen in a previously scanned database.
  -- Scans the first collection listed in MergeServerDefine.COLLECTIONS (the
  -- original comment identifies it as the char collection) and records every
  -- _id in primaryKeyTable. Mongo-generated _ids should practically never
  -- collide, but the check is done anyway.
  -- @param dbName database to scan; nil or "" is rejected with a console message
  -- @return number of duplicated _ids found in this database (nothing when
  --         dbName is rejected)
  local function primaryKeyRepeatCheck(dbName)
    if not dbName or dbName == "" then
      -- tostring(): on Lua 5.1 "%s" raises for a nil argument.
      print(string.format("数据库名错误, dbName = %s\n", tostring(dbName)))
      return
    end

    local collectionName = MergeServerDefine.COLLECTIONS[1]
    local finalName = dbName .. collectionName
    local duplicates = 0

    LuaMongo.find(finalName, nil, {newUniqueTag = 1, _id = 1})
    while true do
      local data = {}
      if not LuaMongo.next(data) then
        break
      end

      local id = data._id
      -- A nil _id cannot be used as a table key ("table index is nil"); skip it.
      if id ~= nil then
        if primaryKeyTable[id] then
          duplicates = duplicates + 1
          print(string.format("重复的_id, dbName = %s, _id = %s\n", dbName, tostring(id)))
        end
        primaryKeyTable[id] = '1'
      end
    end

    return duplicates
  end
  --- Friend collection: swaps uuid1 / uuid2 for their replacement ids.
  -- A field is rewritten in place only when repeatIdTb has an entry for its value.
  -- @param data friend record, modified in place
  local function handleFriend(data)
    for _, field in ipairs({ "uuid1", "uuid2" }) do
      local replacement = repeatIdTb[data[field]]
      if replacement then
        data[field] = replacement
      end
    end
  end
  --- Mail collection: swaps receiverUuid for its replacement id, if one is
  -- registered in repeatIdTb.
  -- @param data mail record, modified in place
  local function handleMail(data)
    local replacement = repeatIdTb[data.receiverUuid]
    if replacement then
      data.receiverUuid = replacement
    end
  end
  --- Re-keys the entries of `map` whose key has a replacement in repeatIdTb.
  -- Keys are collected first so no new key is added while `map` is traversed,
  -- and every old key is cleared before any new key is written so that chained
  -- replacements (a -> b while b -> c) cannot delete a freshly moved entry.
  -- A missing or non-table `map` is ignored.
  local function remapUuidKeys(map)
    if type(map) ~= "table" then
      return
    end
    local moved = {}
    for id, value in pairs(map) do
      if repeatIdTb[id] then
        moved[id] = value
      end
    end
    for id in pairs(moved) do
      map[id] = nil
    end
    for id, value in pairs(moved) do
      map[repeatIdTb[id]] = value
    end
  end

  --- Union (guild) collection: applies replacement ids to the keys of
  -- data.member and data.apply, and to data.presidentUuid.
  -- @param data union record, modified in place
  local function handleUnion(data)
    remapUuidKeys(data.member)
    remapUuidKeys(data.apply)
    if data.presidentUuid and repeatIdTb[data.presidentUuid] then
      data.presidentUuid = repeatIdTb[data.presidentUuid]
    end
  end
  --- Star-contest collection: swaps data.uuid for its replacement id when
  -- repeatIdTb has one.
  -- @param data record, modified in place
  local function handleStar(data)
    local replacement = repeatIdTb[data.uuid]
    if not replacement then
      return
    end
    data.uuid = replacement
  end
  --- Single-player arena collection: swaps data._id for its replacement id.
  -- Records that carry a monsterOutID are left untouched (presumably
  -- non-player entries — confirm).
  -- @param data arena record, modified in place
  local function handleJJC(data)
    if data.monsterOutID then
      return
    end
    local replacement = repeatIdTb[data._id]
    if replacement then
      data._id = replacement
    end
  end
  --- Battle-video collection: swaps data.combatInfo.attacker.uuid for its
  -- replacement id when repeatIdTb has one.
  -- Records without a combatInfo or attacker table are left untouched instead
  -- of raising an "attempt to index a nil value" error.
  -- @param data video record, modified in place
  local function handleVideo(data)
    local combatInfo = data.combatInfo
    if type(combatInfo) ~= "table" then
      return
    end
    local attacker = combatInfo.attacker
    if type(attacker) ~= "table" then
      return
    end
    local replacement = repeatIdTb[attacker.uuid]
    if replacement then
      attacker.uuid = replacement
    end
  end
  148. ---------------------------------------------------------------------------------------------------------------------------
  -- Database names and collection suffix; in the visible code they are only
  -- referenced by the disabled debug dump at the end of StartMergeServer.
  local tblList = { "ckwy_fy_S350064", "ckwy_fy_S350004" }
  local collName = ".middle_act_group"

  --- Entry point of the server merge.
  -- MergeServerDefine.MERGEDBTB[1] is the target database; every following
  -- entry is a source database whose collections (MergeServerDefine.COLLECTIONS,
  -- minus those flagged in MergeServerDefine.NOINSERTCOLLECTIONS) are copied
  -- into the target with processCollection.
  -- A missing/empty target name aborts the merge; a missing/empty source name
  -- is reported and skipped, so no namespace is built from a blank db name.
  function StartMergeServer()
    print("========================合服开始=====================\n")
    -- Connect to the database server.
    clientMongoDb()

    local dbList = MergeServerDefine.MERGEDBTB
    local targetDb = dbList[1]
    if not targetDb or targetDb == "" then
      print(string.format("数据库名错误, dbName = %s\n", tostring(targetDb)))
      return
    end

    for i = 2, #dbList do
      local sourceDb = dbList[i]
      if sourceDb and sourceDb ~= "" then
        -- Use the shared helper instead of repeating the LuaMongo.auth call.
        -- NOTE(review): only source databases are authenticated, as before —
        -- confirm the target does not need its own auth for inserts.
        chooseMongoDb(sourceDb)
        for _, coll in ipairs(MergeServerDefine.COLLECTIONS) do
          if not MergeServerDefine.NOINSERTCOLLECTIONS[coll] then
            print(string.format("=====================开始合并集合: %s\n", coll))
            processCollection(sourceDb, targetDb, coll)
          end
        end
      else
        print(string.format("数据库名错误, dbName = %s\n", tostring(sourceDb)))
      end
    end

    -- Disabled debug dump of the middle_act_group collection, kept for reference:
    -- for _, db in ipairs(tblList) do
    --   LuaMongo.auth(db, Config.DB_USER, Config.DB_PASS)
    --   local coll = db .. collName
    --   LuaMongo.find(coll)
    --   while true do
    --     local data = {}
    --     if not LuaMongo.next(data) then
    --       break
    --     end
    --     print(string.format("================[StartMergeServer], _id : %s, startIdx :%s, endIdx :%s\n", data._id, data.startIdx, data.endIdx))
    --   end
    -- end
  end