flowmgr.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. //@File flowmgr.go
  2. //@Time 2022/5/30
  3. //@Author #Suyghur,
  4. package mgr
  5. import (
  6. "context"
  7. treemap "github.com/liyue201/gostl/ds/map"
  8. "github.com/zeromicro/go-zero/core/logx"
  9. "sync"
  10. "time"
  11. "ylink/comm/ctxdata"
  12. "ylink/comm/result"
  13. "ylink/comm/trace"
  14. "ylink/core/inner/rpc/inner"
  15. "ylink/flowsrv/rpc/internal/model"
  16. "ylink/flowsrv/rpc/pb"
  17. )
// flowManager keeps track of the registered flows, keyed by flow id.
type flowManager struct {
	// flowMap stores *model.Flow values under their FlowId.
	flowMap *treemap.Map
}
var (
	// instance is the shared flowManager handed out by GetFlowMgrInstance.
	instance *flowManager
	// once guards the one-time construction of instance.
	once sync.Once
)
  25. func GetFlowMgrInstance() *flowManager {
  26. once.Do(func() {
  27. instance = &flowManager{
  28. flowMap: treemap.New(treemap.WithGoroutineSafe()),
  29. }
  30. })
  31. return instance
  32. }
  33. func (manager *flowManager) Register(flow *model.Flow) {
  34. //go registerWorker(flow)
  35. go manager.registerFlow(flow)
  36. manager.flowMap.Insert(flow.FlowId, flow)
  37. }
  38. func (manager *flowManager) registerFlow(flow *model.Flow) {
  39. go manager.subscribeRmq(flow)
  40. for {
  41. select {
  42. case <-flow.Stream.Context().Done():
  43. if manager.Has(flow.FlowId) {
  44. flow.Logger.Infof("stream was disconnected abnormally")
  45. manager.UnRegister(flow.FlowId)
  46. manager.handleUserOffline(flow)
  47. }
  48. flow.EndFlow <- 1
  49. return
  50. case msg, open := <-flow.Message:
  51. if open {
  52. flow.Stream.Send(&pb.CommandResp{
  53. Code: result.Ok,
  54. Msg: "success",
  55. Data: []byte(msg),
  56. })
  57. } else {
  58. flow.Logger.Error("message channel is close")
  59. return
  60. }
  61. }
  62. }
  63. }
  64. func (manager *flowManager) subscribeRmq(flow *model.Flow) {
  65. traceId := ctxdata.GetTraceIdFromCtx(flow.Stream.Context())
  66. trace.RunOnTracing(traceId, func(ctx context.Context) {
  67. for {
  68. select {
  69. case <-flow.Stream.Context().Done():
  70. logx.WithContext(ctx).Infof("unsubscribe rmq...")
  71. return
  72. default:
  73. resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
  74. if message, err := resultCmd.Result(); err != nil {
  75. logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
  76. } else {
  77. trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
  78. flow.Message <- message[1]
  79. })
  80. }
  81. }
  82. }
  83. })
  84. }
  85. func (manager *flowManager) handleUserOffline(flow *model.Flow) {
  86. traceId := ctxdata.GetTraceIdFromCtx(flow.Stream.Context())
  87. trace.RunOnTracing(traceId, func(ctx context.Context) {
  88. trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleUserOffline", func(ctx context.Context) {
  89. _, err := flow.SvcCtx.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
  90. Type: flow.Type,
  91. Uid: flow.Uid,
  92. GameId: flow.GameId,
  93. })
  94. if err != nil {
  95. logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
  96. }
  97. })
  98. })
  99. }
  100. func (manager *flowManager) Get(flowId string) *model.Flow {
  101. return manager.flowMap.Get(flowId).(*model.Flow)
  102. }
  103. func (manager *flowManager) UnRegister(flowId string) {
  104. if manager.flowMap.Contains(flowId) {
  105. flow := manager.Get(flowId)
  106. close(flow.Message)
  107. manager.flowMap.Erase(flowId)
  108. }
  109. }
  110. func (manager *flowManager) Has(flowId string) bool {
  111. return manager.flowMap.Contains(flowId)
  112. }