etcdreg.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. package service
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/coreos/etcd/clientv3"
  7. "github.com/coreos/etcd/mvcc/mvccpb"
  8. "log"
  9. "net"
  10. "rocommon"
  11. "rocommon/util"
  12. "strconv"
  13. "strings"
  14. "sync/atomic"
  15. "time"
  16. )
  17. // 第一次服务器启动时间
  18. var ServiceStartupTime uint64 = 0
  19. // 注册到服务器发现
  20. func ETCDRegister(node rocommon.ServerNode, opts ...interface{}) *ETCDServiceDesc {
  21. property := node.(rocommon.ServerNodeProperty)
  22. sd := &ETCDServiceDesc{
  23. ID: GenServiceID(property),
  24. Name: property.GetName(),
  25. Host: property.GetAddr(),
  26. Type: property.ServerType(),
  27. Zone: property.GetZone(),
  28. Index: property.GetIndex(),
  29. }
  30. sd.RegTime = util.GetTimeSeconds()
  31. //服务器节点信息
  32. node.(rocommon.ContextSet).SetContextData("sid", sd, "ETCDRegister")
  33. //获取本地IPv4
  34. addrs, err := net.InterfaceAddrs()
  35. if err == nil {
  36. for _, addr := range addrs {
  37. if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
  38. if ipnet.IP.To4() != nil {
  39. sd.LocalAddr = ipnet.IP.String()
  40. break
  41. }
  42. }
  43. }
  44. }
  45. if sd.LocalAddr != "" {
  46. hostList := strings.Split(sd.Host, ":")
  47. if hostList[0] == "0.0.0.0" {
  48. sd.Host = sd.LocalAddr + ":" + hostList[1]
  49. }
  50. }
  51. //先查询是否存在相同的该节点,如果存在不做处理(或者通过del操作关闭其他客户端)
  52. etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  53. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey)
  54. if err != nil {
  55. util.PanicF("etcd discovery get err:%v\n", err)
  56. //log.Fatalf("etcd discovery get err:%v\n", err)
  57. } else {
  58. if rsp.Count > 0 {
  59. util.PanicF("current node has been register to etcd:%v\n", etcdKey)
  60. //log.Fatalf("current node has been register to etcd", sd.ID)
  61. } else {
  62. etcdDiscovery.RegisterWithTimeOut(etcdKey, sd.String())
  63. etcdDiscovery.WatchSelf(etcdKey, *sd)
  64. }
  65. }
  66. //cross etcd
  67. //if crossEtcdDiscovery != nil {
  68. // //先查询是否存在相同的该节点,如果存在不做处理(或者通过del操作关闭其他客户端)
  69. // etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  70. // rsp, err := crossEtcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey)
  71. // if err != nil {
  72. // util.PanicF("etcd discovery get err:%v\n", err)
  73. // } else {
  74. // if rsp.Count > 0 {
  75. // util.PanicF("current node has been register to etcd:%v\n", etcdKey)
  76. // } else {
  77. // crossEtcdDiscovery.RegisterWithTimeOut(etcdKey, sd.String())
  78. // crossEtcdDiscovery.WatchSelf(etcdKey, *sd)
  79. // }
  80. // }
  81. //}
  82. //添加服务器开服时间(server/zone)
  83. InitServiceStartupTime(property.GetZone())
  84. return sd
  85. }
  86. func InitServiceStartupTime(zone int) {
  87. //添加服务器开服时间(server/zone)
  88. startupKey := GenServiceZonePrefix(zone)
  89. rsp1, err1 := etcdDiscovery.EtcdKV.Get(context.TODO(), startupKey)
  90. util.InfoF("InitServiceStartupTime startupKey:”%v rsp1:%v", startupKey, rsp1)
  91. if err1 != nil {
  92. util.PanicF("etcd discovery get err:%v\n", err1)
  93. } else {
  94. if rsp1.Count > 0 {
  95. //已经注册了服务器启动时间
  96. tmpTime, _ := strconv.ParseUint(string(rsp1.Kvs[0].Value), 10, 64)
  97. atomic.StoreUint64(&ServiceStartupTime, tmpTime)
  98. //atomic.StoreUint64(&ServiceStartupTime, 1734234903000)
  99. //util.InfoF("InitServiceStartupTime startupKey:”%v rsp222:%v", startupKey, rsp1)
  100. } else {
  101. nowTime := util.GetCurrentTime()
  102. atomic.StoreUint64(&ServiceStartupTime, nowTime)
  103. val := strconv.FormatUint(nowTime, 10)
  104. etcdDiscovery.Register(startupKey, val)
  105. }
  106. tmpTime := GetServiceStartupTime()
  107. tmpTime1 := time.Unix(int64(tmpTime/1000), 0).In(util.GetLoc())
  108. util.InfoF("InitServiceStartupTime Service StartupTime %v| %v", tmpTime, tmpTime1)
  109. }
  110. }
  111. // return ms
  112. func GetServiceStartupTime() uint64 {
  113. return atomic.LoadUint64(&ServiceStartupTime)
  114. }
  115. // todo..解除注册
  116. func ETCDUnregister(node rocommon.ServerNode) {
  117. property := node.(rocommon.ServerNodeProperty)
  118. sd := &ETCDServiceDesc{
  119. ID: GenServiceID(property),
  120. Name: property.GetName(),
  121. Host: property.GetAddr(),
  122. Type: property.ServerType(),
  123. Zone: property.GetZone(),
  124. Index: property.GetIndex(),
  125. }
  126. sd.RegTime = util.GetTimeSeconds()
  127. etcdKey := GenServicePrefix(sd.ID, property.GetZone())
  128. util.InfoF("ETCDUnregister =%v", etcdKey)
  129. etcdDiscovery.Del(etcdKey)
  130. if crossEtcdDiscovery != nil {
  131. crossEtcdDiscovery.Del(etcdKey)
  132. }
  133. }
  134. // 发现服务器,服务可能有多个地址,例如需要连接多个game
  135. // todo...返回多个servernode结构体
  136. func DiscoveryService(serviceName string, serviceZone int, nodeCreator func(MultiServerNode, *ETCDServiceDesc)) rocommon.ServerNode {
  137. //如果已经存在的,就停止之前正在运行的节点(注意不要配置成一样的节点信息,否则会关闭之前的连接)
  138. multiNode := NewMultiServerNode() //nodereg.go
  139. //连接同一个zone里的服务器节点
  140. etcdKey := GenDiscoveryServicePrefix(serviceName, serviceZone)
  141. /*
  142. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(),etcdKey, clientv3.WithPrefix())
  143. if err != nil {
  144. util.FatalF("etcd discovery get err:%v", err)
  145. //log.Fatalf("etcd discovery get err:%v\n", err)
  146. }
  147. logutil.InfoF("service[%v] node find count:%v", etcdKey, rsp.Count)
  148. //log.Printf("service[%v] node find count:%v\n", serviceName, rsp.Count)
  149. for _,data := range rsp.Kvs {
  150. util.InfoF("etcd discovery start connect:%v", string(data.Key))
  151. //需要判断节点是否已经存在
  152. var sd ETCDServiceDesc
  153. err := json.Unmarshal(data.Value, &sd)
  154. if err != nil {
  155. util.InfoF("etcd discovery kv[%v][value]err:%v",data.Key, err)
  156. continue
  157. }
  158. //先停止之前的连接,再执行新的连接
  159. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  160. multiNode.RemoveNode(sd.ID)
  161. preNode.Stop()
  162. }
  163. nodeCreator(multiNode, &sd)
  164. }
  165. */
  166. //会收到key 对应的最近一次变化通知,
  167. var ch clientv3.WatchChan
  168. ch = etcdDiscovery.EtcdCli.Watch(context.TODO(), etcdKey, clientv3.WithPrefix())
  169. //watch操作
  170. go func() {
  171. //查找已经存在的节点
  172. rsp, err := etcdDiscovery.EtcdKV.Get(context.TODO(), etcdKey, clientv3.WithPrefix())
  173. if err != nil {
  174. util.FatalF("etcd discovery get err:%v", err)
  175. //log.Fatalf("etcd discovery get err:%v\n", err)
  176. }
  177. util.InfoF("service[%v] node find count:%v", etcdKey, rsp.Count)
  178. for _, data := range rsp.Kvs {
  179. util.InfoF("etcd discovery start connect:%v", string(data.Key))
  180. //需要判断节点是否已经存在
  181. var sd ETCDServiceDesc
  182. err := json.Unmarshal(data.Value, &sd)
  183. if err != nil {
  184. util.InfoF("etcd discovery kv[%v][value]err:%v", data.Key, err)
  185. continue
  186. }
  187. //先停止之前的连接,再执行新的连接
  188. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  189. multiNode.RemoveNode(sd.ID)
  190. preNode.Stop()
  191. }
  192. nodeCreator(multiNode, &sd)
  193. }
  194. for {
  195. select {
  196. case c := <-ch:
  197. //log.Println("etcd discovery watch count:",len(c.Events))
  198. //todo...处理删除kv操作
  199. for _, ev := range c.Events {
  200. switch ev.Type {
  201. case mvccpb.PUT:
  202. var sd ETCDServiceDesc
  203. err := json.Unmarshal(ev.Kv.Value, &sd)
  204. if err != nil {
  205. util.InfoF("err:etcd discovery kv[%v][value]err:%v", string(ev.Kv.Key), err)
  206. continue
  207. }
  208. util.InfoF("etcd discovery watch put key=%v", string(ev.Kv.Key))
  209. //log.Println("etcd discovery watch put key:",string(ev.Kv.Key))
  210. //先停止之前的连接,再执行新的连接
  211. if preNode := multiNode.GetNode(sd.ID); preNode != nil {
  212. //todo...
  213. //暂时先处理成,如果存在节点则返回(保证节点ip和端口不变的情况下,否则需要启用移除老连接启用新连接)
  214. util.InfoF("etcd discovery watch put find oldkey:%v %v", string(ev.Kv.Key), sd.ID)
  215. //continue
  216. //调试模式下使用已经存在的节点
  217. if DebugMode {
  218. util.InfoF("etcd discovery DebugMode=%v", DebugMode)
  219. continue
  220. }
  221. var preDesc *ETCDServiceDesc
  222. preNode.(rocommon.ContextSet).RawContextData("sid", &preDesc)
  223. if preDesc.RegTime == sd.RegTime {
  224. continue
  225. }
  226. multiNode.RemoveNode(sd.ID)
  227. //todo...通过etcd处理,如果相同的键值还存在则服务器启动时会失败,所以这边暂时不做停止处理
  228. // 后续解决重连时需要注意
  229. // 重连产生的问题,重连上来后再断开后stop中的wait才能继续,然后再调用nodeCreator函数,导致每次
  230. // 关闭对端的节点后才进行连接,因为主动调用stop时,重连上了,导致stop会一直在wait状态,导致执行
  231. // 不到nodeCreator,关闭对端后,stop中的wait被解除(断开连接导致解除),然后执行nodeCreator
  232. // 但是因为此时对端已经关闭,所以导致开始时想要连接的反而连接不上,处于重连状态
  233. // 需要context来主动断开所有协程
  234. preNode.Stop()
  235. util.InfoF("remove old node:%v time:%v %v", sd.ID, preDesc.RegTime, util.GetTimeByUint32(uint32(preDesc.RegTime)).String())
  236. //log.Println("remove node:", sd.ID)
  237. }
  238. //util.InfoF("etcd discovery watch put k1111ey:%v", string(ev.Kv.Key))
  239. nodeCreator(multiNode, &sd)
  240. case mvccpb.DELETE:
  241. //注意:social关注本区中的其他social节点,所以自己的节点删除这边会通知,其他节点不会
  242. util.InfoF("etcd discovery watch delete key:%v", string(ev.Kv.Key))
  243. //log.Println("etcd discovery watch delete key:", string(ev.Kv.Key))
  244. nodeID := GenService(string(ev.Kv.Key))
  245. //log.Println("pre delete:", nodeID)
  246. //先停止之前的连接,再执行新的连接
  247. if preNode := multiNode.GetNode(nodeID); preNode != nil {
  248. //不移除可以触发断线重连,否则,这边直接把节点关闭无法触发断线重连
  249. //避免这边移除后导致etcd无法成功注册的话还能重连成功
  250. //multiNode.RemoveNode(nodeID)
  251. //preNode.Stop()
  252. util.InfoF("delete node:%v", nodeID)
  253. }
  254. }
  255. }
  256. }
  257. }
  258. }()
  259. return nil
  260. }
  261. // /////////////////////////////////////////
// ServiceDiscovery bundles an etcd v3 client with the configuration it was
// created from. It is used for registering keys and watching them.
type ServiceDiscovery struct {
	// etcdConfig is the configuration passed to clientv3.New.
	etcdConfig clientv3.Config
	// EtcdCli is the client returned by clientv3.New(etcdConfig).
	EtcdCli *clientv3.Client
	// EtcdKV is the KV API created from EtcdCli.
	EtcdKV clientv3.KV
	// watchSelfCloseCh is sent to by Close and received from by the WatchSelf
	// goroutine.
	watchSelfCloseCh chan interface{}
}
  268. func NewNetServiceDiscovery(addr string) (*ServiceDiscovery, error) {
  269. sd := &ServiceDiscovery{
  270. watchSelfCloseCh: make(chan interface{}),
  271. }
  272. epsStr := fmt.Sprintf("http://%s", addr)
  273. sd.etcdConfig = clientv3.Config{
  274. Endpoints: []string{epsStr},
  275. DialTimeout: 3 * time.Second,
  276. }
  277. cli, err := clientv3.New(sd.etcdConfig)
  278. if err != nil {
  279. return nil, err
  280. } else {
  281. sd.EtcdCli = cli
  282. sd.EtcdKV = clientv3.NewKV(sd.EtcdCli)
  283. return sd, nil
  284. }
  285. }
  286. func (this *ServiceDiscovery) Close() {
  287. this.EtcdCli.Close()
  288. this.watchSelfCloseCh <- true
  289. }
  290. func (this *ServiceDiscovery) RegisterWithTimeOut(key string, value string) int64 {
  291. ////获得lease数据
  292. //leaseRsp, err := this.EtcdCli.Grant(context.TODO(), 3)
  293. //if err != nil {
  294. // util.PanicF("etcd grant falied=%v", err)
  295. // //log.Fatalf("etcd grant falied:%v\n", err)
  296. // return 0
  297. //}
  298. //ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
  299. //defer cancel()
  300. //rsp, err := this.EtcdKV.Put(ctx, key, value, clientv3.WithLease(leaseRsp.ID))
  301. //if err != nil {
  302. // //util.PanicF("etcd put key failed=%v\n", err)
  303. // util.FatalF("etcd put key failed:%v\n", err)
  304. // return 0
  305. //} else {
  306. // util.InfoF("etcd register ok key=%v clusterid=%v leaseid=%v etcdaddr=%v", key, rsp.Header.ClusterId, leaseRsp.ID, this.etcdConfig.Endpoints)
  307. // //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  308. //}
  309. //_, err = this.EtcdCli.KeepAlive(context.TODO(), leaseRsp.ID)
  310. //if err != nil {
  311. // util.PanicF("etcd put key failed=%v\n", err)
  312. //}
  313. //return int64(leaseRsp.ID)
  314. // 创建租约
  315. leaseRsp, err := this.EtcdCli.Grant(context.TODO(), 10)
  316. if err != nil {
  317. util.ErrorF("etcd grant failed=%v", err)
  318. return 0
  319. }
  320. // 设置 Put 操作的超时时间
  321. ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
  322. defer cancel()
  323. // 写入键值对并关联租约
  324. rsp, err := this.EtcdKV.Put(ctx, key, value, clientv3.WithLease(leaseRsp.ID))
  325. if err != nil {
  326. util.ErrorF("etcd put key failed=%v\n", err)
  327. return 0
  328. }
  329. util.InfoF("etcd register ok key=%v clusterid=%v leaseid=%v etcdaddr=%v", key, rsp.Header.ClusterId, leaseRsp.ID, this.etcdConfig.Endpoints)
  330. // 续期租约
  331. keepAliveChan, err := this.EtcdCli.KeepAlive(context.TODO(), leaseRsp.ID)
  332. if err != nil {
  333. util.ErrorF("etcd keepalive failed=%v\n", err)
  334. return 0
  335. }
  336. // 监控续期响应
  337. go func() {
  338. for resp := range keepAliveChan {
  339. if resp == nil {
  340. util.ErrorF("etcd keepalive channel closed")
  341. return
  342. }
  343. //util.InfoF("etcd keepalive success leaseid=%v", resp.ID)
  344. }
  345. }()
  346. return int64(leaseRsp.ID)
  347. }
  348. // watch自己,网络恢复后得到自己被删除的通知,重新设置key租约
  349. // WatchSelf只重新设置lease,不做其他操作(key只是自己)
  350. func (this *ServiceDiscovery) WatchSelf(key string, value ETCDServiceDesc) {
  351. //调试模式下不生效
  352. if DebugMode {
  353. util.InfoF("DebugMode=%v WatchSelf Invalid", DebugMode)
  354. return
  355. }
  356. //watch自己,网络恢复后得到自己被删除的通知,重新设置key租约
  357. keepaliveWatch := this.EtcdCli.Watch(context.TODO(), key)
  358. go func() {
  359. for {
  360. select {
  361. case c := <-keepaliveWatch:
  362. for _, ev := range c.Events {
  363. switch ev.Type {
  364. case mvccpb.DELETE:
  365. util.InfoF("etcd WatchSelf del-self key=%v etcdaddr=%v", key, this.etcdConfig.Endpoints)
  366. value.RegTime = util.GetTimeSeconds()
  367. this.RegisterWithTimeOut(key, value.String())
  368. }
  369. }
  370. case <-this.watchSelfCloseCh: //discovery close
  371. break
  372. }
  373. }
  374. }()
  375. }
  376. func (this *ServiceDiscovery) WatchKey(key string) {
  377. keepaliveWatch := this.EtcdCli.Watch(context.TODO(), key)
  378. go func() {
  379. for {
  380. select {
  381. case c := <-keepaliveWatch:
  382. for _, ev := range c.Events {
  383. switch ev.Type {
  384. case mvccpb.DELETE:
  385. util.InfoF("etcd WatchKey del key=%v etcdaddr=%v", key, this.etcdConfig.Endpoints)
  386. }
  387. }
  388. }
  389. }
  390. }()
  391. }
  392. func (this *ServiceDiscovery) Del(key string) bool {
  393. _, err := this.EtcdCli.Delete(context.TODO(), key)
  394. if err != nil {
  395. util.FatalF("etcd del key failed:%v", key)
  396. return false
  397. }
  398. return true
  399. }
  400. func (this *ServiceDiscovery) Register(key string, value string) {
  401. rsp, err := this.EtcdKV.Put(context.TODO(), key, value)
  402. if err != nil {
  403. util.PanicF("etcd put key failed:%v\n", err)
  404. //log.Fatalf("etcd put key failed:%v\n", err)
  405. return
  406. } else {
  407. util.InfoF("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  408. log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  409. }
  410. }
  411. // 上报自己服务器当前的状态,供其它进程获取(例如获取当前地图线路情况)
  412. // leaseId < 0 表示不带lease的key更新
  413. func (this *ServiceDiscovery) UpdateStateToETCD(key, val string, leaseId int64) int64 {
  414. ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
  415. defer cancel()
  416. if leaseId >= 0 {
  417. if clientv3.LeaseID(leaseId) == clientv3.NoLease {
  418. leaseId = this.RegisterWithTimeOut(key, val)
  419. util.InfoF("UpdateStateToETCD first key=%v leaseid=%v", key, leaseId)
  420. return leaseId
  421. }
  422. //查看lease是否过期
  423. _, err := this.EtcdKV.Put(ctx, key, val, clientv3.WithLease(clientv3.LeaseID(leaseId)))
  424. if err != nil {
  425. util.FatalF("UpdateStateToETCD etcd update key failed:%v\n", err)
  426. //重新申请lease并注册
  427. leaseId = this.RegisterWithTimeOut(key, val)
  428. } else {
  429. //util.InfoF("UpdateStateToETCD etcd update ok key=%v clusterid=%v leaseId=%v etcdaddr=%v", key, rsp.Header.ClusterId, leaseId, this.etcdConfig.Endpoints)
  430. //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  431. }
  432. } else {
  433. _, err := this.EtcdKV.Put(ctx, key, val)
  434. if err != nil {
  435. util.FatalF("UpdateStateToETCD etcd update key failed:%v\n", err)
  436. } else {
  437. //util.InfoF("UpdateStateToETCD etcd update ok key=%v clusterid=%v etcdaddr=%v", key, rsp.Header.ClusterId, this.etcdConfig.Endpoints)
  438. //log.Printf("etcd register server:%v clusterid:%v", key, rsp.Header.ClusterId)
  439. }
  440. }
  441. return leaseId
  442. }