ormconnector.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. package mysql
  2. import (
  3. "database/sql"
  4. "gorm.io/driver/mysql"
  5. "gorm.io/gorm"
  6. "rocommon"
  7. "rocommon/socket"
  8. "rocommon/util"
  9. "sync"
  10. "time"
  11. )
  12. type MysqlOrmConnector struct {
  13. MySQLParameter
  14. socket.NetServerNodeProperty
  15. db *sql.DB
  16. ormDb *gorm.DB
  17. dbMutex sync.RWMutex
  18. reconDur time.Duration
  19. }
  20. func (this *MysqlOrmConnector) dbConn() *sql.DB {
  21. this.dbMutex.RLock()
  22. defer this.dbMutex.RUnlock()
  23. return this.db
  24. }
  25. func (this *MysqlOrmConnector) DbConnORM() *gorm.DB {
  26. this.dbMutex.RLock()
  27. defer this.dbMutex.RUnlock()
  28. return this.ormDb
  29. }
  30. func (this *MysqlOrmConnector) IsReady() bool {
  31. return this.dbConn() != nil
  32. }
  33. func (this *MysqlOrmConnector) Operate(cb func(client interface{}) interface{}) interface{} {
  34. return cb(this.dbConn())
  35. }
  36. func (this *MysqlOrmConnector) TypeOfName() string {
  37. return "MysqlOrmConnector"
  38. }
  39. func (this *MysqlOrmConnector) SetReconnectDuration(v time.Duration) {
  40. this.reconDur = v
  41. }
  42. func (this *MysqlOrmConnector) tryConnect() {
  43. tmpOrmDb, err := gorm.Open(mysql.Open(this.GetAddr()), &gorm.Config{})
  44. //_, err := mysql.ParseDSN(this.GetAddr())
  45. if err != nil {
  46. util.ErrorF("invalid mysql dns=%v err=%v", this.GetAddr(), err)
  47. return
  48. }
  49. //util.InfoF("connect to mysql name=%v addr=%v dbname=%v", this.GetName(), cfg.Addr, cfg.DBName)
  50. this.ormDb = tmpOrmDb
  51. tmpDb, err := this.ormDb.DB()
  52. if err != nil {
  53. util.ErrorF("invalid mysql DB() err=%v", this.GetAddr(), err)
  54. return
  55. }
  56. err = tmpDb.Ping()
  57. if err != nil {
  58. util.ErrorF("ping err=%v", err)
  59. return
  60. }
  61. tmpDb.SetMaxIdleConns(int(this.PoolConnCount))
  62. tmpDb.SetMaxIdleConns(int(this.PoolConnCount))
  63. this.dbMutex.Lock()
  64. this.db = tmpDb
  65. this.dbMutex.Unlock()
  66. }
  67. func (this *MysqlOrmConnector) Start() rocommon.ServerNode {
  68. for {
  69. this.tryConnect()
  70. if this.reconDur == 0 || this.IsReady() {
  71. break
  72. }
  73. time.Sleep(this.reconDur)
  74. }
  75. return this
  76. }
  77. func (this *MysqlOrmConnector) Stop() {
  78. db := this.dbConn()
  79. if db != nil {
  80. db.Close()
  81. }
  82. }
  83. func init() {
  84. socket.RegisterServerNode(func() rocommon.ServerNode {
  85. node := new(MysqlOrmConnector)
  86. node.MySQLParameter.Init()
  87. return node
  88. })
  89. }