etcdreg.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/coreos/etcd/clientv3"
  7. "github.com/coreos/etcd/mvcc/mvccpb"
  8. "log"
  9. "net"
  10. "rocommon"
  11. "rocommon/util"
  12. "strconv"
  13. "strings"
  14. "sync/atomic"
  15. "time"
  16. )
  17. // 第一次服务器启动时间
  18. var ServiceStartupTime uint64 = 0
  19. // 注册到服务器发现
  20. func ETCDRegister(node rocommon.ServerNode, opts ...interface{}) *ETCDServiceDesc {
  21. property := node.(rocommon.ServerNodeProperty)
  22. sd := &ETCDServiceDesc{
  23. ID: GenServiceID(property),
  24. Name: property.GetName(),
  25. Host: property.GetAddr(),
  26. Type: property.ServerType(),
  27. Zone: property.GetZone(),
  28. Index: property.GetIndex(),
  29. }
  30. sd.RegTime = util.GetTimeSeconds()
  31. //服务器节点信息
  32. node.(rocommon.ContextSet).SetContextData("sid", sd, "ETCDRegister")
  33. //获取本地IPv4
  34. addrs, err := net.InterfaceAddrs()
  35. if err == nil {
  36. for _, addr := range addrs {
  37. if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  38. if ipnet.IP.To4() != nil {
  39. sd.LocalAddr = ipnet.IP.String()
  40. break
  41. }
  42. }
  43. }
  44. }
  45. if sd.LocalAddr != "" {
  46. hostList := strings.Split(sd.Host, ":")
  47. if hostList[0] == "0.0.0.0" {
  48. sd.Host = sd.LocalAddr + ":" + hostList[1]
  49. }
  50. }
  51. //先查询是否存在相同的该节点,如果存在不做处理(或者通过del操作关闭其他客户端)
  52. etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  53. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey)
  54. if err != nil {
  55. util.PanicF("etcd discovery get err:%v\n", err)
  56. //log.Fatalf("etcd discovery get err:%v\n", err)
  57. } else {
  58. if rsp.Count > 0 {
  59. util.PanicF("current node has been register to etcd:%v\n", etcdKey)
  60. //log.Fatalf("current node has been register to etcd", sd.ID)
  61. } else {
  62. etcdDiscovery.RegisterWithTimeOut(etcdKey, sd.String())
  63. etcdDiscovery.WatchSelf(etcdKey, *sd)
  64. }
  65. }
  66. //cross etcd
  67. if crossEtcdDiscovery != nil {
  68. //先查询是否存在相同的该节点,如果存在不做处理(或者通过del操作关闭其他客户端)
  69. etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  70. rsp, err := crossEtcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey)
  71. if err != nil {
  72. util.PanicF("etcd discovery get err:%v\n", err)
  73. } else {
  74. if rsp.Count > 0 {
  75. util.PanicF("current node has been register to etcd:%v\n", etcdKey)
  76. } else {
  77. crossEtcdDiscovery.RegisterWithTimeOut(etcdKey, sd.String())
  78. crossEtcdDiscovery.WatchSelf(etcdKey, *sd)
  79. }
  80. }
  81. }
  82. //添加服务器开服时间(server/zone)
  83. InitServiceStartupTime(property.GetZone())
  84. return sd
  85. }
  86. func InitServiceStartupTime(zone int) {
  87. //添加服务器开服时间(server/zone)
  88. startupKey := GenServiceZonePrefix(zone)
  89. rsp1, err1 := etcdDiscovery.EtcdKV.Get(context.TODO(), startupKey)
  90. util.InfoF("InitServiceStartupTime startupKey:”%v rsp1:%v", startupKey, rsp1)
  91. if err1 != nil {
  92. util.PanicF("etcd discovery get err:%v\n", err1)
  93. } else {
  94. if rsp1.Count > 0 {
  95. //已经注册了服务器启动时间
  96. tmpTime, _ := strconv.ParseUint(string(rsp1.Kvs[0].Value), 10, 64)
  97. atomic.StoreUint64(&ServiceStartupTime, tmpTime)
  98. //atomic.StoreUint64(&ServiceStartupTime, 1734234903000)
  99. //util.InfoF("InitServiceStartupTime startupKey:”%v rsp222:%v", startupKey, rsp1)
  100. } else {
  101. nowTime := util.GetCurrentTime()
  102. atomic.StoreUint64(&ServiceStartupTime, nowTime)
  103. val := strconv.FormatUint(nowTime, 10)
  104. etcdDiscovery.Register(startupKey, val)
  105. }
  106. tmpTime := GetServiceStartupTime()
  107. tmpTime1 := time.Unix(int64(tmpTime/1000), 0).In(util.GetLoc())
  108. util.InfoF("InitServiceStartupTime Service StartupTime %v| %v", tmpTime, tmpTime1)
  109. }
  110. }
  111. // return ms
  112. func GetServiceStartupTime() uint64 {
  113. return atomic.LoadUint64(&ServiceStartupTime)
  114. }
  115. // todo..解除注册
  116. func ETCDUnregister(node rocommon.ServerNode) {
  117. property := node.(rocommon.ServerNodeProperty)
  118. sd := &ETCDServiceDesc{
  119. ID: GenServiceID(property),
  120. Name: property.GetName(),
  121. Host: property.GetAddr(),
  122. Type: property.ServerType(),
  123. Zone: property.GetZone(),
  124. Index: property.GetIndex(),
  125. }
  126. sd.RegTime = util.GetTimeSeconds()
  127. etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  128. util.InfoF("ETCDUnregister =%v", etcdKey)
  129. etcdDiscovery.Del(etcdKey)
  130. if crossEtcdDiscovery != nil {
  131. crossEtcdDiscovery.Del(etcdKey)
  132. }
  133. }
  134. // 发现服务器,服务可能有多个地址,例如需要连接多个game
  135. // todo...返回多个servernode结构体
  136. func DiscoveryService(serviceName string, serviceZone int, nodeCreator func(MultiServerNode, *ETCDServiceDesc)) rocommon.ServerNode {
  137. //如果已经存在的,就停止之前正在运行的节点(注意不要配置成一样的节点信息,否则会关闭之前的连接)
  138. multiNode := NewMultiServerNode() //nodereg.go
  139. //连接同一个zone里的服务器节点
  140. etcdKey := GenDiscoveryServicePrefix(serviceName, serviceZone)
  141. /*
  142. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(),etcdKey, clientv3.WithPrefix())
  143. if err != nil {
  144. util.FatalF("etcd discovery get err:%v", err)
  145. //log.Fatalf("etcd discovery get err:%v\n", err)
  146. }
  147. logutil.InfoF("service[%v] node find count:%v", etcdKey, rsp.Count)
  148. //log.Printf("service[%v] node find count:%v\n", serviceName, rsp.Count)
  149. for _,data := range rsp.Kvs {
  150. util.InfoF("etcd discovery start connect:%v", string(data.Key))
  151. //需要判断节点是否已经存在
  152. var sd ETCDServiceDesc
  153. err := json.Unmarshal(data.Value, &sd)
  154. if err != nil {
  155. util.InfoF("etcd discovery kv[%v][value]err:%v",data.Key, err)
  156. continue
  157. }
  158. //先停止之前的连接,再执行新的连接
  159. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  160. multiNode.RemoveNode(sd.ID)
  161. preNode.Stop()
  162. }
  163. nodeCreator(multiNode, &sd)
  164. }
  165. */
  166. //会收到key 对应的最近一次变化通知,
  167. var ch clientv3.WatchChan
  168. ch = etcdDiscovery.EtcdCli.Watch(context.TODO(), etcdKey, clientv3.WithPrefix())
  169. //watch操作
  170. go func() {
  171. //查找已经存在的节点
  172. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey, clientv3.WithPrefix())
  173. if err != nil {
  174. util.FatalF("etcd discovery get err:%v", err)
  175. //log.Fatalf("etcd discovery get err:%v\n", err)
  176. }
  177. util.InfoF("service[%v] node find count:%v", etcdKey, rsp.Count)
  178. for _, data := range rsp.Kvs {
  179. util.InfoF("etcd discovery start connect:%v", string(data.Key))
  180. //需要判断节点是否已经存在
  181. var sd ETCDServiceDesc
  182. err := json.Unmarshal(data.Value, &sd)
  183. if err != nil {
  184. util.InfoF("etcd discovery kv[%v][value]err:%v", data.Key, err)
  185. continue
  186. }
  187. //先停止之前的连接,再执行新的连接
  188. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  189. multiNode.RemoveNode(sd.ID)
  190. preNode.Stop()
  191. }
  192. nodeCreator(multiNode, &sd)
  193. }
  194. for {
  195. select {
  196. case c := <-ch:
  197. //log.Println("etcd discovery watch count:",len(c.Events))
  198. //todo...处理删除kv操作
  199. for _, ev := range c.Events {
  200. switch ev.Type {
  201. case mvccpb.PUT:
  202. var sd ETCDServiceDesc
  203. err := json.Unmarshal(ev.Kv.Value, &sd)
  204. if err != nil {
  205. util.InfoF("err:etcd discovery kv[%v][value]err:%v", string(ev.Kv.Key), err)
  206. continue
  207. }
  208. util.InfoF("etcd discovery watch put key=%v", string(ev.Kv.Key))
  209. //log.Println("etcd discovery watch put key:",string(ev.Kv.Key))
  210. //先停止之前的连接,再执行新的连接
  211. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  212. //todo...
  213. //暂时先处理成,如果存在节点则返回(保证节点ip和端口不变的情况下,否则需要启用移除老连接启用新连接)
  214. util.InfoF("etcd discovery watch put find oldkey:%v %v", string(ev.Kv.Key), sd.ID)
  215. //continue
  216. //调试模式下使用已经存在的节点
  217. if DebugMode {
  218. util.InfoF("etcd discovery DebugMode=%v", DebugMode)
  219. continue
  220. }
  221. var preDesc *ETCDServiceDesc
  222. preNode.(rocommon.ContextSet).RawContextData("sid", &preDesc)
  223. if preDesc.RegTime == sd.RegTime {
  224. continue
  225. }
  226. multiNode.RemoveNode(sd.ID)
  227. //todo...通过etcd处理,如果相同的键值还存在则服务器启动时会失败,所以这边暂时不做停止处理
  228. // 后续解决重连时需要注意
  229. // 重连产生的问题,重连上来后再断开后stop中的wait才能继续,然后再调用nodeCreator函数,导致每次
  230. // 关闭对端的节点后才进行连接,因为主动调用stop时,重连上了,导致stop会一直在wait状态,导致执行
  231. // 不到nodeCreator,关闭对端后,stop中的wait被解除(断开连接导致解除),然后执行nodeCreator
  232. // 但是因为此时对端已经关闭,所以导致开始时想要连接的反而连接不上,处于重连状态
  233. // 需要context来主动断开所有协程
  234. preNode.Stop()
  235. util.InfoF("remove old node:%v time:%v %v", sd.ID, preDesc.RegTime, util.GetTimeByUint32(uint32(preDesc.RegTime)).String())
  236. //log.Println("remove node:", sd.ID)
  237. }
  238. //util.InfoF("etcd discovery watch put k1111ey:%v", string(ev.Kv.Key))
  239. nodeCreator(multiNode, &sd)
  240. case mvccpb.DELETE:
  241. //注意:social关注本区中的其他social节点,所以自己的节点删除这边会通知,其他节点不会
  242. util.InfoF("etcd discovery watch delete key:%v", string(ev.Kv.Key))
  243. //log.Println("etcd discovery watch delete key:", string(ev.Kv.Key))
  244. nodeID := GenService(string(ev.Kv.Key))
  245. //log.Println("pre delete:", nodeID)
  246. //先停止之前的连接,再执行新的连接
  247. if preNode := multiNode.GetNode(nodeID); preNode != nil {
  248. //不移除可以触发断线重连,否则,这边直接把节点关闭无法触发断线重连
  249. //避免这边移除后导致etcd无法成功注册的话还能重连成功
  250. //multiNode.RemoveNode(nodeID)
  251. //preNode.Stop()
  252. util.InfoF("delete node:%v", nodeID)
  253. }
  254. }
  255. }
  256. }
  257. }
  258. }()
  259. return nil
  260. }
  261. // /////////////////////////////////////////
  262. type ServiceDiscovery struct {
  263. etcdConfig clientv3.Config
  264. EtcdCli *clientv3.Client //clientv3.New(conf)
  265. EtcdKV clientv3.KV
  266. watchSelfCloseCh chan interface{}
  267. }
  268. func NewNetServiceDiscovery(addr string) (*ServiceDiscovery, error) {
  269. sd := &ServiceDiscovery{
  270. watchSelfCloseCh: make(chan interface{}),
  271. }
  272. epsStr := fmt.Sprintf("http://%s", addr)
  273. sd.etcdConfig = clientv3.Config{
  274. Endpoints: []string{epsStr},
  275. DialTimeout: 3 * time.Second,
  276. }
  277. cli, err := clientv3.New(sd.etcdConfig)
  278. if err != nil {
  279. return nil, err
  280. } else {
  281. sd.EtcdCli = cli
  282. sd.EtcdKV = clientv3.NewKV(sd.EtcdCli)
  283. return sd, nil
  284. }
  285. }
  286. func (this *ServiceDiscovery) Close() {
  287. this.EtcdCli.Close()
  288. this.watchSelfCloseCh <- true
  289. }
  290. func (this *ServiceDiscovery) RegisterWithTimeOut(key string, value string) int64 {
  291. //获得lease数据
  292. leaseRsp, err := this.EtcdCli.Grant(context.TODO(), 3)
  293. if err != nil {
  294. util.PanicF("etcd grant falied=%v", err)
  295. //log.Fatalf("etcd grant falied:%v\n", err)
  296. return 0
  297. }
  298. ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
  299. defer cancel()
  300. rsp, err := this.EtcdKV.Put(ctx, key, value, clientv3.WithLease(leaseRsp.ID))
  301. if err != nil {
  302. //util.PanicF("etcd put key failed=%v\n", err)
  303. util.FatalF("etcd put key failed:%v\n", err)
  304. return 0
  305. } else {
  306. util.InfoF("etcd register ok key=%v clusterid=%v leaseid=%v etcdaddr=%v", key, rsp.Header.ClusterId, leaseRsp.ID, this.etcdConfig.Endpoints)
  307. //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  308. }
  309. _, err = this.EtcdCli.KeepAlive(context.TODO(), leaseRsp.ID)
  310. if err != nil {
  311. util.PanicF("etcd put key failed=%v\n", err)
  312. }
  313. return int64(leaseRsp.ID)
  314. }
  315. // watch自己,网络恢复后得到自己被删除的通知,重新设置key租约
  316. // WatchSelf只重新设置lease,不做其他操作(key只是自己)
  317. func (this *ServiceDiscovery) WatchSelf(key string, value ETCDServiceDesc) {
  318. //调试模式下不生效
  319. if DebugMode {
  320. util.InfoF("DebugMode=%v WatchSelf Invalid", DebugMode)
  321. return
  322. }
  323. //watch自己,网络恢复后得到自己被删除的通知,重新设置key租约
  324. keepaliveWatch := this.EtcdCli.Watch(context.TODO(), key)
  325. go func() {
  326. for {
  327. select {
  328. case c := <-keepaliveWatch:
  329. for _, ev := range c.Events {
  330. switch ev.Type {
  331. case mvccpb.DELETE:
  332. util.InfoF("etcd WatchSelf del-self key=%v etcdaddr=%v", key, this.etcdConfig.Endpoints)
  333. value.RegTime = util.GetTimeSeconds()
  334. this.RegisterWithTimeOut(key, value.String())
  335. }
  336. }
  337. case <-this.watchSelfCloseCh: //discovery close
  338. break
  339. }
  340. }
  341. }()
  342. }
  343. func (this *ServiceDiscovery) WatchKey(key string) {
  344. keepaliveWatch := this.EtcdCli.Watch(context.TODO(), key)
  345. go func() {
  346. for {
  347. select {
  348. case c := <-keepaliveWatch:
  349. for _, ev := range c.Events {
  350. switch ev.Type {
  351. case mvccpb.DELETE:
  352. util.InfoF("etcd WatchKey del key=%v etcdaddr=%v", key, this.etcdConfig.Endpoints)
  353. }
  354. }
  355. }
  356. }
  357. }()
  358. }
  359. func (this *ServiceDiscovery) Del(key string) bool {
  360. _, err := this.EtcdCli.Delete(context.TODO(), key)
  361. if err != nil {
  362. util.FatalF("etcd del key failed:%v", key)
  363. return false
  364. }
  365. return true
  366. }
  367. func (this *ServiceDiscovery) Register(key string, value string) {
  368. rsp, err := this.EtcdKV.Put(context.TODO(), key, value)
  369. if err != nil {
  370. util.PanicF("etcd put key failed:%v\n", err)
  371. //log.Fatalf("etcd put key failed:%v\n", err)
  372. return
  373. } else {
  374. util.InfoF("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  375. log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  376. }
  377. }
  378. // 上报自己服务器当前的状态,供其它进程获取(例如获取当前地图线路情况)
  379. // leaseId < 0 表示不带lease的key更新
  380. func (this *ServiceDiscovery) UpdateStateToETCD(key, val string, leaseId int64) int64 {
  381. ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
  382. defer cancel()
  383. if leaseId >= 0 {
  384. if clientv3.LeaseID(leaseId) == clientv3.NoLease {
  385. leaseId = this.RegisterWithTimeOut(key, val)
  386. util.InfoF("UpdateStateToETCD first key=%v leaseid=%v", key, leaseId)
  387. return leaseId
  388. }
  389. //查看lease是否过期
  390. _, err := this.EtcdKV.Put(ctx, key, val, clientv3.WithLease(clientv3.LeaseID(leaseId)))
  391. if err != nil {
  392. util.FatalF("UpdateStateToETCD etcd update key failed:%v\n", err)
  393. //重新申请lease并注册
  394. leaseId = this.RegisterWithTimeOut(key, val)
  395. } else {
  396. //util.InfoF("UpdateStateToETCD etcd update ok key=%v clusterid=%v leaseId=%v etcdaddr=%v", key, rsp.Header.ClusterId, leaseId, this.etcdConfig.Endpoints)
  397. //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  398. }
  399. } else {
  400. _, err := this.EtcdKV.Put(ctx, key, val)
  401. if err != nil {
  402. util.FatalF("UpdateStateToETCD etcd update key failed:%v\n", err)
  403. } else {
  404. //util.InfoF("UpdateStateToETCD etcd update ok key=%v clusterid=%v etcdaddr=%v", key, rsp.Header.ClusterId, this.etcdConfig.Endpoints)
  405. //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  406. }
  407. }
  408. return leaseId
  409. }