server_socket.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "net"
  22. "sync"
  23. "time"
  24. )
  25. type TServerSocket struct {
  26. listener net.Listener
  27. addr net.Addr
  28. clientTimeout time.Duration
  29. // Protects the interrupted value to make it thread safe.
  30. mu sync.RWMutex
  31. interrupted bool
  32. //Size of buffer to use for socket. Defaults to 1024.
  33. //Set to 0 to disable bufferring server transport altogether.
  34. BufferSize int
  35. }
  36. func NewTServerSocket(listenAddr string) (*TServerSocket, error) {
  37. return NewTServerSocketTimeout(listenAddr, 0)
  38. }
  39. func NewTServerSocketTimeout(listenAddr string, clientTimeout time.Duration) (*TServerSocket, error) {
  40. addr, err := net.ResolveTCPAddr("tcp", listenAddr)
  41. if err != nil {
  42. return nil, err
  43. }
  44. return &TServerSocket{addr: addr, clientTimeout: clientTimeout, BufferSize: 1024}, nil
  45. }
  46. func (p *TServerSocket) Listen() error {
  47. if p.IsListening() {
  48. return nil
  49. }
  50. l, err := net.Listen(p.addr.Network(), p.addr.String())
  51. if err != nil {
  52. return err
  53. }
  54. p.listener = l
  55. return nil
  56. }
  57. func (p *TServerSocket) Accept() (TTransport, error) {
  58. p.mu.RLock()
  59. interrupted := p.interrupted
  60. p.mu.RUnlock()
  61. if interrupted {
  62. return nil, errTransportInterrupted
  63. }
  64. if p.listener == nil {
  65. return nil, NewTTransportException(NOT_OPEN, "No underlying server socket")
  66. }
  67. conn, err := p.listener.Accept()
  68. if err != nil {
  69. return nil, NewTTransportExceptionFromError(err)
  70. }
  71. var trans TTransport
  72. trans = NewTSocketFromConnTimeout(conn, p.clientTimeout)
  73. if p.BufferSize != 0 {
  74. trans = NewTBufferedTransport(trans, p.BufferSize)
  75. }
  76. return trans, nil
  77. }
  78. // Checks whether the socket is listening.
  79. func (p *TServerSocket) IsListening() bool {
  80. return p.listener != nil
  81. }
  82. // Connects the socket, creating a new socket object if necessary.
  83. func (p *TServerSocket) Open() error {
  84. if p.IsListening() {
  85. return NewTTransportException(ALREADY_OPEN, "Server socket already open")
  86. }
  87. if l, err := net.Listen(p.addr.Network(), p.addr.String()); err != nil {
  88. return err
  89. } else {
  90. p.listener = l
  91. }
  92. return nil
  93. }
  94. func (p *TServerSocket) Addr() net.Addr {
  95. return p.addr
  96. }
  97. func (p *TServerSocket) Close() error {
  98. defer func() {
  99. p.listener = nil
  100. }()
  101. if p.IsListening() {
  102. return p.listener.Close()
  103. }
  104. return nil
  105. }
  106. func (p *TServerSocket) Interrupt() error {
  107. p.mu.Lock()
  108. p.interrupted = true
  109. p.mu.Unlock()
  110. return nil
  111. }