multiplexed_protocol.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. /*
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing,
  13. * software distributed under the License is distributed on an
  14. * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. * KIND, either express or implied. See the License for the
  16. * specific language governing permissions and limitations
  17. * under the License.
  18. */
  19. package thrift
  20. import (
  21. "fmt"
  22. "strings"
  23. )
  24. /*
  25. TMultiplexedProtocol is a protocol-independent concrete decorator
  26. that allows a Thrift client to communicate with a multiplexing Thrift server,
  27. by prepending the service name to the function name during function calls.
  28. NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request
  29. from a multiplexing client.
  30. This example uses a single socket transport to invoke two services:
  31. socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT)
  32. transport := thrift.NewTFramedTransport(socket)
  33. protocol := thrift.NewTBinaryProtocolTransport(transport)
  34. mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator")
  35. service := Calculator.NewCalculatorClient(mp)
  36. mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport")
  37. service2 := WeatherReport.NewWeatherReportClient(mp2)
  38. err := transport.Open()
  39. if err != nil {
  40. t.Fatal("Unable to open client socket", err)
  41. }
  42. fmt.Println(service.Add(2,2))
  43. fmt.Println(service2.GetTemperature())
  44. */
  45. type TMultiplexedProtocol struct {
  46. TProtocol
  47. serviceName string
  48. }
  // MULTIPLEXED_SEPARATOR is the delimiter placed between the service name and
  // the function name in the message name of a multiplexed call. The client
  // side joins the two parts with it and the server side splits on its first
  // occurrence.
  const (
  	MULTIPLEXED_SEPARATOR = ":"
  )
  50. func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol {
  51. return &TMultiplexedProtocol{
  52. TProtocol: protocol,
  53. serviceName: serviceName,
  54. }
  55. }
  56. func (t *TMultiplexedProtocol) WriteMessageBegin(name string, typeId TMessageType, seqid int32) error {
  57. if typeId == CALL || typeId == ONEWAY {
  58. return t.TProtocol.WriteMessageBegin(t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid)
  59. } else {
  60. return t.TProtocol.WriteMessageBegin(name, typeId, seqid)
  61. }
  62. }
  63. /*
  64. TMultiplexedProcessor is a TProcessor allowing
  65. a single TServer to provide multiple services.
  66. To do so, you instantiate the processor and then register additional
  67. processors with it, as shown in the following example:
  68. var processor = thrift.NewTMultiplexedProcessor()
  69. firstProcessor :=
  70. processor.RegisterProcessor("FirstService", firstProcessor)
  71. processor.registerProcessor(
  72. "Calculator",
  73. Calculator.NewCalculatorProcessor(&CalculatorHandler{}),
  74. )
  75. processor.registerProcessor(
  76. "WeatherReport",
  77. WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}),
  78. )
  79. serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT)
  80. if err != nil {
  81. t.Fatal("Unable to create server socket", err)
  82. }
  83. server := thrift.NewTSimpleServer2(processor, serverTransport)
  84. server.Serve();
  85. */
  86. type TMultiplexedProcessor struct {
  87. serviceProcessorMap map[string]TProcessor
  88. DefaultProcessor TProcessor
  89. }
  90. func NewTMultiplexedProcessor() *TMultiplexedProcessor {
  91. return &TMultiplexedProcessor{
  92. serviceProcessorMap: make(map[string]TProcessor),
  93. }
  94. }
  95. func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) {
  96. t.DefaultProcessor = processor
  97. }
  98. func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) {
  99. if t.serviceProcessorMap == nil {
  100. t.serviceProcessorMap = make(map[string]TProcessor)
  101. }
  102. t.serviceProcessorMap[name] = processor
  103. }
  104. func (t *TMultiplexedProcessor) Process(in, out TProtocol) (bool, TException) {
  105. name, typeId, seqid, err := in.ReadMessageBegin()
  106. if err != nil {
  107. return false, err
  108. }
  109. if typeId != CALL && typeId != ONEWAY {
  110. return false, fmt.Errorf("Unexpected message type %v", typeId)
  111. }
  112. //extract the service name
  113. v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2)
  114. if len(v) != 2 {
  115. if t.DefaultProcessor != nil {
  116. smb := NewStoredMessageProtocol(in, name, typeId, seqid)
  117. return t.DefaultProcessor.Process(smb, out)
  118. }
  119. return false, fmt.Errorf("Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name)
  120. }
  121. actualProcessor, ok := t.serviceProcessorMap[v[0]]
  122. if !ok {
  123. return false, fmt.Errorf("Service name not found: %s. Did you forget to call registerProcessor()?", v[0])
  124. }
  125. smb := NewStoredMessageProtocol(in, v[1], typeId, seqid)
  126. return actualProcessor.Process(smb, out)
  127. }
  128. //Protocol that use stored message for ReadMessageBegin
  129. type storedMessageProtocol struct {
  130. TProtocol
  131. name string
  132. typeId TMessageType
  133. seqid int32
  134. }
  135. func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol {
  136. return &storedMessageProtocol{protocol, name, typeId, seqid}
  137. }
  138. func (s *storedMessageProtocol) ReadMessageBegin() (name string, typeId TMessageType, seqid int32, err error) {
  139. return s.name, s.typeId, s.seqid, nil
  140. }