flowmgr.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. //@File flowmgr.go
  2. //@Time 2022/5/30
  3. //@Author #Suyghur,
  4. package mgr
  5. import (
  6. treemap "github.com/liyue201/gostl/ds/map"
  7. "sync"
  8. "time"
  9. "ylink/comm/result"
  10. "ylink/core/inner/rpc/inner"
  11. "ylink/flowsrv/rpc/internal/model"
  12. "ylink/flowsrv/rpc/pb"
  13. )
  14. type flowManager struct {
  15. flowMap *treemap.Map
  16. }
  17. var (
  18. instance *flowManager
  19. once sync.Once
  20. )
  21. func GetFlowMgrInstance() *flowManager {
  22. once.Do(func() {
  23. instance = &flowManager{
  24. flowMap: treemap.New(treemap.WithGoroutineSafe()),
  25. }
  26. })
  27. return instance
  28. }
  29. func (manager *flowManager) Register(flow *model.Flow) {
  30. //go registerWorker(flow)
  31. go manager.registerFlow(flow)
  32. manager.flowMap.Insert(flow.FlowId, flow)
  33. }
  34. func (manager *flowManager) registerFlow(flow *model.Flow) {
  35. go manager.subscribeRmq(flow)
  36. for {
  37. select {
  38. case <-flow.Stream.Context().Done():
  39. if manager.Has(flow.FlowId) {
  40. flow.Logger.Infof("stream was disconnected abnormally")
  41. manager.UnRegister(flow.FlowId)
  42. flow.SvcCtx.InnerRpc.NotifyUserOffline(flow.Ctx, &inner.NotifyUserStatusReq{
  43. Type: flow.Type,
  44. Uid: flow.Uid,
  45. GameId: flow.GameId,
  46. })
  47. }
  48. flow.EndFlow <- 1
  49. return
  50. case msg, open := <-flow.Message:
  51. if open {
  52. flow.Stream.Send(&pb.CommandResp{
  53. Code: result.Ok,
  54. Msg: "success",
  55. Data: []byte(msg),
  56. })
  57. } else {
  58. flow.Logger.Error("message channel is close")
  59. return
  60. }
  61. }
  62. }
  63. }
  64. func (manager *flowManager) subscribeRmq(flow *model.Flow) {
  65. for {
  66. select {
  67. case <-flow.Stream.Context().Done():
  68. flow.Logger.Infof("unsubscribe rmq...")
  69. return
  70. default:
  71. resultCmd := flow.SvcCtx.RedisClient.BRPop(flow.Ctx, 10*time.Second, flow.FlowId)
  72. if message, err := resultCmd.Result(); err != nil {
  73. flow.Logger.Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
  74. } else {
  75. flow.Message <- message[1]
  76. }
  77. }
  78. }
  79. }
  80. func (manager *flowManager) Get(flowId string) *model.Flow {
  81. return manager.flowMap.Get(flowId).(*model.Flow)
  82. }
  83. func (manager *flowManager) UnRegister(flowId string) {
  84. if manager.flowMap.Contains(flowId) {
  85. flow := manager.Get(flowId)
  86. close(flow.Message)
  87. manager.flowMap.Erase(flowId)
  88. }
  89. }
  90. func (manager *flowManager) Has(flowId string) bool {
  91. return manager.flowMap.Contains(flowId)
  92. }