5
0

2 Коміти 5ccf2d65fa ... 9cef8fd9d8

Автор SHA1 Опис Дата
  #Suyghur 9cef8fd9d8 v0.0.1开发:flowsrv消息推送开发 2 роки тому
  #Suyghur fad86a2656 v0.0.1开发:调整flowsrv中consumer的初始化位置 2 роки тому

+ 9 - 9
comm/model/message.go

@@ -5,16 +5,16 @@
 package model
 
 type KqMessage struct {
-	CreateTime  string `json:"create_time"`
-	Content     string `json:"content"`
-	Pic         string `json:"pic"`
-	ReceiverId  string `json:"receiver_id"`
-	SenderId    string `json:"sender_id"`
-	GameId      string `json:"game_id"`
-	OperationId string `json:"operation_id"`
+	CreateTime string `json:"create_time"`
+	Content    string `json:"content"`
+	Pic        string `json:"pic"`
+	ReceiverId string `json:"receiver_id"`
+	SenderId   string `json:"sender_id"`
+	GameId     string `json:"game_id"`
+	Ext        string `json:"ext"`
 }
 
 type KqCmdMessage struct {
-	KqMessage
-	Opt int64 `json:"opt"`
+	Opt int64  `json:"opt"`
+	Ext string `json:"ext"`
 }

+ 6 - 9
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"time"
-	"ylink/comm/ctxdata"
 	"ylink/comm/model"
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"
@@ -28,15 +27,13 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 	// 投递到自己的发件箱
-	operationId := ctxdata.GetTraceIdFromCtx(l.ctx)
 	msg, _ := json.Marshal(model.KqMessage{
-		CreateTime:  time.Now().Format("2006-01-02 15:04:05"),
-		Content:     in.Content,
-		Pic:         in.Pic,
-		ReceiverId:  "",
-		SenderId:    in.PlayerId,
-		GameId:      in.GameId,
-		OperationId: operationId,
+		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
+		Content:    in.Content,
+		Pic:        in.Pic,
+		ReceiverId: "",
+		SenderId:   in.PlayerId,
+		GameId:     in.GameId,
 	})
 	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, string(msg), in.PlayerId)
 	if err != nil {

+ 44 - 41
core/inner/rpc/internal/svc/servicecontext.go

@@ -71,12 +71,44 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
 			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
-				// 玩家发的消息
-				p2cMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map)
-				message.ReceiverId = p2cMap.Get(message.SenderId).(string)
-				logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
-				kMsg, _ := json.Marshal(message)
-				s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
+				// 玩家发的消息,先从connMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
+				if ext.GameConnMap.Contains(message.GameId) {
+					// 先从connMap找对应的客服映射
+					if playerConnMap := ext.GameConnMap.Get(message.GameId).(*treemap.Map); playerConnMap.Contains(message.SenderId) {
+						message.ReceiverId = playerConnMap.Get(message.SenderId).(string)
+					} else {
+						if ext.GameVipMap.Contains(message.GameId) {
+							// 从vipMap里面找
+							if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
+								message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
+							} else {
+								message.ReceiverId = ""
+							}
+						} else {
+							message.ReceiverId = ""
+						}
+					}
+				} else {
+					if ext.GameVipMap.Contains(message.GameId) {
+						// 从vipMap里面找
+						if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
+							message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
+						} else {
+							message.ReceiverId = ""
+						}
+					} else {
+						message.ReceiverId = ""
+					}
+				}
+
+				// 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
+				if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
+					logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
+					kMsg, _ := json.Marshal(message)
+					s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
+				} else {
+					logx.WithContext(ctx).Errorf("can not find receiver of the sender")
+				}
 			} else {
 				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
 			}
@@ -93,39 +125,10 @@ func fetchCsCenterInfo() {
 	// mock info
 	ext.Game2PlayerStatusMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameConnMap = treemap.New(treemap.WithGoroutineSafe())
+	ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.WaitingQueue = simplelist.New()
 	mockInfo()
 }
-
-func loadGameList() {
-
-}
-
-func loadCsInfo() {
-	ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
-	ext.CsInfoMap.Insert("cs_1231", &model.CsInfo{
-		CsId:         "cs_1231",
-		CsNickname:   "客服1231",
-		CsAvatarUrl:  "https://www.baidu.com",
-		CsSignature:  "我是客服1231",
-		OnlineStatus: 0,
-	})
-	ext.CsInfoMap.Insert("cs_1111", &model.CsInfo{
-		CsId:         "cs_1111",
-		CsNickname:   "客服1111",
-		CsAvatarUrl:  "https://www.baidu.com",
-		CsSignature:  "我是客服1111",
-		OnlineStatus: 0,
-	})
-	ext.CsInfoMap.Insert("cs_2222", &model.CsInfo{
-		CsId:         "cs_2222",
-		CsNickname:   "客服2222",
-		CsAvatarUrl:  "https://www.baidu.com",
-		CsSignature:  "我是客服2222",
-		OnlineStatus: 0,
-	})
-}
-
 func mockInfo() {
 	ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.CsInfoMap = treemap.New(treemap.WithGoroutineSafe())
@@ -134,14 +137,14 @@ func mockInfo() {
 
 	// 专属客服映射
 	game1231P2cMap := treemap.New(treemap.WithGoroutineSafe())
-	game1231P2cMap.Insert("player1231", "cs_1231")
-	game1231P2cMap.Insert("player1111", "cs_2222")
+	game1231P2cMap.Insert("player_1231", "cs_1231")
+	game1231P2cMap.Insert("player_1111", "cs_2222")
 
 	game1111P2cMap := treemap.New(treemap.WithGoroutineSafe())
-	game1111P2cMap.Insert("player1231", "cs_1111")
+	game1111P2cMap.Insert("player_1231", "cs_1111")
 
-	ext.GameVipMap.Insert("game1231", game1231P2cMap)
-	ext.GameVipMap.Insert("game1111", game1111P2cMap)
+	ext.GameVipMap.Insert("game_1231", game1231P2cMap)
+	ext.GameVipMap.Insert("game_1111", game1111P2cMap)
 
 	ext.CsInfoMap.Insert("cs_1231", &model.CsInfo{
 		CsId:         "cs_1231",

+ 5 - 0
flowsrv/rpc/etc/flowsrv.yaml

@@ -23,6 +23,11 @@ KqMsgBoxConsumerConf:
   Topic: recv-box-topic
   GroupId: flowsrv-api
 
+Redis:
+  Host: 127.0.0.1:6379
+  Type: node
+  Pass: ylink
+
 JwtAuth:
   AccessSecret: ylink2022
   AccessExpire: 604800

+ 11 - 0
flowsrv/rpc/internal/ext/global.go

@@ -0,0 +1,11 @@
+//@File     global.go
+//@Time     2022/05/20
+//@Author   #Suyghur,
+
+package ext
+
+import treemap "github.com/liyue201/gostl/ds/map"
+
+var (
+	FlowMap *treemap.Map
+)

+ 0 - 5
flowsrv/rpc/internal/ext/recvboxhandler.go

@@ -1,5 +0,0 @@
-//@File     recvboxhandler.go
-//@Time     2022/05/13
-//@Author   #Suyghur,
-
-package ext

+ 53 - 7
flowsrv/rpc/internal/logic/connectlogic.go

@@ -9,7 +9,7 @@ import (
 	"ylink/comm/result"
 	"ylink/core/inner/rpc/inner"
 	"ylink/flowsrv/rpc/internal/mgr"
-
+	"ylink/flowsrv/rpc/internal/model"
 	"ylink/flowsrv/rpc/internal/svc"
 	"ylink/flowsrv/rpc/pb"
 
@@ -52,13 +52,59 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 		})
 	}
 
-	mgr.GetFlowMgrInstance().SetFlow(uid, stream)
+	flow := &model.Flow{
+		EndFlow: make(chan int),
+		Message: make(chan string),
+		Stream:  stream,
+		Ctx:     l.ctx,
+		SvcCtx:  l.svcCtx,
+		Logger:  l.Logger,
+		User: &model.User{
+			Type:   in.Type,
+			Uid:    uid,
+			GameId: gameId,
+		},
+	}
+	defer func() {
+		close(flow.EndFlow)
+		flow = nil
+	}()
 
-	return stream.Send(&pb.CommandResp{
-		Code: result.Ok,
-		Msg:  "success",
-		Data: nil,
-	})
+	mgr.GetFlowMgrInstance().Register(flow)
+
+	//go func() {
+	//	for {
+	//		select {
+	//		case <-stream.Context().Done():
+	//			if mgr.GetFlowMgrInstance().Has(uid) {
+	//				l.Logger.Infof("flowstream was disconnected abnormally")
+	//				mgr.GetFlowMgrInstance().UnRegister(uid)
+	//				_, err = l.svcCtx.InnerRpc.NotifyUserOffline(l.ctx, &inner.NotifyUserStatusReq{
+	//					Type:   in.Type,
+	//					Uid:    uid,
+	//					GameId: gameId,
+	//				})
+	//			}
+	//			flow.EndFlow <- 1
+	//			return
+	//		case msg, open := <-flow.Message:
+	//			if open {
+	//				stream.Send(&pb.CommandResp{
+	//					Code: result.Ok,
+	//					Msg:  "success",
+	//					Data: []byte(msg),
+	//				})
+	//			} else {
+	//				l.Logger.Error("message channel is close")
+	//				return
+	//			}
+	//		}
+	//	}
+	//}()
+
+	<-flow.EndFlow
+	l.Logger.Infof("end flow")
+	return nil
 }
 
 func (l *ConnectLogic) checkAuth(in *pb.CommandReq) (string, string, error) {

+ 1 - 2
flowsrv/rpc/internal/logic/disconnectlogic.go

@@ -9,7 +9,6 @@ import (
 	"ylink/comm/result"
 	"ylink/core/inner/rpc/inner"
 	"ylink/flowsrv/rpc/internal/mgr"
-
 	"ylink/flowsrv/rpc/internal/svc"
 	"ylink/flowsrv/rpc/pb"
 
@@ -52,7 +51,7 @@ func (l *DisconnectLogic) Disconnect(in *pb.CommandReq) (*pb.CommandResp, error)
 		}, err
 	}
 
-	mgr.GetFlowMgrInstance().RemoveFlow(uid)
+	mgr.GetFlowMgrInstance().UnRegister(uid)
 
 	return &pb.CommandResp{
 		Code: result.Ok,

+ 73 - 9
flowsrv/rpc/internal/mgr/flowmgr.go

@@ -1,16 +1,21 @@
 //@File     flowmgr.go
-//@Time     2022/05/13
+//@Time     2022/5/30
 //@Author   #Suyghur,
 
 package mgr
 
 import (
+	treemap "github.com/liyue201/gostl/ds/map"
 	"sync"
+	"time"
+	"ylink/comm/result"
+	"ylink/core/inner/rpc/inner"
+	"ylink/flowsrv/rpc/internal/model"
 	"ylink/flowsrv/rpc/pb"
 )
 
 type flowManager struct {
-	flowMap map[string]pb.Flowsrv_ConnectServer
+	flowMap *treemap.Map
 }
 
 var (
@@ -21,20 +26,79 @@ var (
 func GetFlowMgrInstance() *flowManager {
 	once.Do(func() {
 		instance = &flowManager{
-			flowMap: make(map[string]pb.Flowsrv_ConnectServer),
+			flowMap: treemap.New(treemap.WithGoroutineSafe()),
 		}
 	})
 	return instance
 }
 
-func (manager *flowManager) SetFlow(uid string, flow pb.Flowsrv_ConnectServer) {
-	manager.flowMap[uid] = flow
+func (manager *flowManager) Register(flow *model.Flow) {
+	//go registerWorker(flow)
+	go manager.registerFlow(flow)
+	manager.flowMap.Insert(flow.User.Uid, flow)
 }
 
-func (manager *flowManager) GetFlow(uid string) pb.Flowsrv_ConnectServer {
-	return manager.flowMap[uid]
+func (manager *flowManager) registerFlow(flow *model.Flow) {
+	go manager.subscribeRmq(flow)
+	for {
+		select {
+		case <-flow.Stream.Context().Done():
+			if manager.Has(flow.User.Uid) {
+				flow.Logger.Infof("flowstream was disconnected abnormally")
+				manager.UnRegister(flow.User.Uid)
+				flow.SvcCtx.InnerRpc.NotifyUserOffline(flow.Ctx, &inner.NotifyUserStatusReq{
+					Type:   flow.User.Type,
+					Uid:    flow.User.Uid,
+					GameId: flow.User.GameId,
+				})
+			}
+			flow.EndFlow <- 1
+			return
+		case msg, open := <-flow.Message:
+			if open {
+				flow.Stream.Send(&pb.CommandResp{
+					Code: result.Ok,
+					Msg:  "success",
+					Data: []byte(msg),
+				})
+			} else {
+				flow.Logger.Error("message channel is close")
+				return
+			}
+		}
+	}
+}
+
+func (manager *flowManager) subscribeRmq(flow *model.Flow) {
+	for {
+		select {
+		case <-flow.Stream.Context().Done():
+			flow.Logger.Infof("unsubscribe rmq...")
+			return
+		default:
+			resultCmd := flow.SvcCtx.RedisClient.BRPop(flow.Ctx, 10*time.Second, flow.User.Uid)
+			if message, err := resultCmd.Result(); err != nil {
+				flow.Logger.Errorf("get message from redis err: %v", err)
+			} else {
+				flow.Message <- message[1]
+			}
+		}
+	}
+}
+
+func (manager *flowManager) Get(uid string) *model.Flow {
+	return manager.flowMap.Get(uid).(*model.Flow)
+}
+
+func (manager *flowManager) UnRegister(uid string) {
+	if manager.flowMap.Contains(uid) {
+		flow := manager.Get(uid)
+		close(flow.Message)
+		//flow.EndRmq <- 0
+		manager.flowMap.Erase(uid)
+	}
 }
 
-func (manager *flowManager) RemoveFlow(uid string) {
-	delete(manager.flowMap, uid)
+func (manager *flowManager) Has(uid string) bool {
+	return manager.flowMap.Contains(uid)
 }

+ 28 - 0
flowsrv/rpc/internal/model/bean.go

@@ -0,0 +1,28 @@
+//@File     bean.go
+//@Time     2022/6/1
+//@Author   #Suyghur,
+
+package model
+
+import (
+	"context"
+	"github.com/zeromicro/go-zero/core/logx"
+	"ylink/flowsrv/rpc/internal/svc"
+	"ylink/flowsrv/rpc/pb"
+)
+
+type User struct {
+	Type   int64
+	Uid    string
+	GameId string
+}
+
+type Flow struct {
+	EndFlow chan int
+	Message chan string
+	Ctx     context.Context
+	SvcCtx  *svc.ServiceContext
+	Logger  logx.Logger
+	Stream  pb.Flowsrv_ConnectServer
+	User    *User
+}

+ 1 - 109
flowsrv/rpc/internal/server/flowsrvserver.go

@@ -5,19 +5,6 @@ package server
 
 import (
 	"context"
-	"encoding/json"
-	"github.com/Shopify/sarama"
-	"github.com/zeromicro/go-zero/core/logx"
-	gozerotrace "github.com/zeromicro/go-zero/core/trace"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/propagation"
-	oteltrace "go.opentelemetry.io/otel/trace"
-	"net/http"
-	"ylink/comm/kafka"
-	"ylink/comm/model"
-	"ylink/comm/trace"
-	"ylink/comm/utils"
 
 	"ylink/flowsrv/rpc/internal/logic"
 	"ylink/flowsrv/rpc/internal/svc"
@@ -27,23 +14,12 @@ import (
 type FlowsrvServer struct {
 	svcCtx *svc.ServiceContext
 	pb.UnimplementedFlowsrvServer
-	ConsumerGroup *kafka.ConsumerGroup
 }
 
 func NewFlowsrvServer(svcCtx *svc.ServiceContext) *FlowsrvServer {
-	server := &FlowsrvServer{
+	return &FlowsrvServer{
 		svcCtx: svcCtx,
-		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
-			KafkaVersion:   sarama.V1_0_0_0,
-			OffsetsInitial: sarama.OffsetNewest,
-			IsReturnErr:    false,
-		},
-			svcCtx.Config.KqMsgBoxConsumerConf.Brokers,
-			[]string{svcCtx.Config.KqMsgBoxConsumerConf.Topic},
-			svcCtx.Config.KqMsgBoxConsumerConf.GroupId),
 	}
-	server.subscribe()
-	return server
 }
 
 func (s *FlowsrvServer) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
@@ -55,87 +31,3 @@ func (s *FlowsrvServer) Disconnect(ctx context.Context, in *pb.CommandReq) (*pb.
 	l := logic.NewDisconnectLogic(ctx, s.svcCtx)
 	return l.Disconnect(in)
 }
-
-func (s *FlowsrvServer) Setup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (s *FlowsrvServer) Cleanup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (s *FlowsrvServer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	for msg := range claim.Messages() {
-		if msg.Topic == s.svcCtx.Config.KqMsgBoxConsumerConf.Topic {
-			s.handleMessage(sess, msg)
-		}
-	}
-	return nil
-}
-
-func (s *FlowsrvServer) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
-	propagator := otel.GetTextMapPropagator()
-	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
-	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
-	spanName := utils.CallerFuncName()
-	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
-	defer span.End()
-	propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
-	callback(spanCtx)
-}
-
-func (s *FlowsrvServer) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
-	traceId := kafka.GetTraceFromHeader(msg.Headers)
-	if len(traceId) == 0 {
-		return
-	}
-	trace.RunOnTracing(traceId, func(ctx context.Context) {
-		var message model.KqMessage
-		if err := json.Unmarshal(msg.Value, &message); err != nil {
-			logx.Errorf("unmarshal msg error: %v", err)
-			return
-		}
-		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
-			//if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
-			//	// 玩家发的消息
-			//	p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
-			//	message.ReceiverId = p2cMap.Get(message.SenderId).(string)
-			//	logx.Infof("receiver: %s", message.ReceiverId)
-			//	b, _ := json.Marshal(message)
-			//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
-			//} else {
-			//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
-			//}
-			logx.WithContext(ctx).Infof("headers: %v", msg.Headers)
-			logx.WithContext(ctx).Infof("traceId: %s", msg.Headers[0].Value)
-			logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
-			sess.MarkMessage(msg, "")
-		}, attribute.String("msg.key", string(msg.Key)))
-	})
-
-	//s.runWithCtx(func(ctx context.Context) {
-	//	var message model.KqCmdMessage
-	//	if err := json.Unmarshal(msg.Value, &message); err != nil {
-	//		logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
-	//		return
-	//	}
-	//	trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
-	//		//if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
-	//		//	// 玩家发的消息
-	//		//	p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
-	//		//	message.ReceiverId = p2cMap.Get(message.SenderId).(string)
-	//		//	logx.Infof("receiver: %s", message.ReceiverId)
-	//		//	b, _ := json.Marshal(message)
-	//		//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
-	//		//} else {
-	//		//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
-	//		//}
-	//		logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
-	//		sess.MarkMessage(msg, "")
-	//	}, attribute.String("msg.key", string(msg.Key)))
-	//})
-}
-
-func (s *FlowsrvServer) subscribe() {
-	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
-}

+ 98 - 3
flowsrv/rpc/internal/svc/servicecontext.go

@@ -1,19 +1,114 @@
 package svc
 
 import (
+	"context"
+	"encoding/json"
+	"github.com/Shopify/sarama"
+	"github.com/go-redis/redis/v8"
+	"github.com/zeromicro/go-zero/core/logx"
+	gozerotrace "github.com/zeromicro/go-zero/core/trace"
 	"github.com/zeromicro/go-zero/zrpc"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	oteltrace "go.opentelemetry.io/otel/trace"
+	"net/http"
+	"ylink/comm/kafka"
+	"ylink/comm/model"
+	"ylink/comm/trace"
+	"ylink/comm/utils"
 	"ylink/core/inner/rpc/inner"
 	"ylink/flowsrv/rpc/internal/config"
 )
 
 type ServiceContext struct {
-	Config   config.Config
-	InnerRpc inner.Inner
+	Config        config.Config
+	InnerRpc      inner.Inner
+	ConsumerGroup *kafka.ConsumerGroup
+	RedisClient   *redis.Client
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
-	return &ServiceContext{
+	svcCtx := &ServiceContext{
 		Config:   c,
 		InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
+
+		RedisClient: redis.NewClient(&redis.Options{
+			Addr:     c.Redis.Host,
+			Password: c.Redis.Pass,
+		}),
+		//RedisClient: redis.New(c.Redis.Host, func(r *redis.Redis) {
+		//	r.Type = c.Redis.Type
+		//	r.Pass = c.Redis.Pass
+		//}),
+		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+			KafkaVersion:   sarama.V1_0_0_0,
+			OffsetsInitial: sarama.OffsetNewest,
+			IsReturnErr:    false,
+		},
+			c.KqMsgBoxConsumerConf.Brokers,
+			[]string{c.KqMsgBoxConsumerConf.Topic},
+			c.KqMsgBoxConsumerConf.GroupId),
+	}
+	go svcCtx.subscribe()
+	return svcCtx
+}
+
+func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
+			s.handleMessage(sess, msg)
+		}
 	}
+	return nil
+}
+
+func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
+	propagator := otel.GetTextMapPropagator()
+	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
+	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
+	spanName := utils.CallerFuncName()
+	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
+	defer span.End()
+	propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
+	callback(spanCtx)
+}
+
+func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	traceId := kafka.GetTraceFromHeader(msg.Headers)
+	if len(traceId) == 0 {
+		return
+	}
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		var message model.KqMessage
+		if err := json.Unmarshal(msg.Value, &message); err != nil {
+			logx.Errorf("unmarshal msg error: %v", err)
+			return
+		}
+		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
+			// 投递到receiver_id对应的redis队列暂存
+			intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
+			if size, err := intCmd.Result(); err != nil {
+				logx.WithContext(ctx).Errorf("push message rmq err %v", err)
+			} else {
+				logx.WithContext(ctx).Infof("current rmq size: %d", size)
+			}
+			//if _, err := s.RedisClient.(message.ReceiverId, string(msg.Value)); err != nil {
+			//	logx.WithContext(ctx).Errorf("push message err %v", err)
+			//}
+			sess.MarkMessage(msg, "")
+		}, attribute.String("msg.key", string(msg.Key)))
+	})
+}
+
+func (s *ServiceContext) subscribe() {
+	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
 }

+ 27 - 31
flowsrv/rpc/pb/flowsrv.pb.go

@@ -9,7 +9,7 @@ package pb
 import (
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
-	structpb "google.golang.org/protobuf/types/known/structpb"
+	_ "google.golang.org/protobuf/types/known/structpb"
 	reflect "reflect"
 	sync "sync"
 )
@@ -81,9 +81,9 @@ type CommandResp struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Code int64            `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
-	Msg  string           `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
-	Data *structpb.Struct `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
+	Code int64  `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
+	Msg  string `protobuf:"bytes,2,opt,name=msg,proto3" json:"msg,omitempty"`
+	Data []byte `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
 }
 
 func (x *CommandResp) Reset() {
@@ -132,7 +132,7 @@ func (x *CommandResp) GetMsg() string {
 	return ""
 }
 
-func (x *CommandResp) GetData() *structpb.Struct {
+func (x *CommandResp) GetData() []byte {
 	if x != nil {
 		return x.Data
 	}
@@ -149,21 +149,19 @@ var file_pb_flowsrv_proto_rawDesc = []byte{
 	0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
 	0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73,
 	0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63,
-	0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x60, 0x0a, 0x0b, 0x43, 0x6f, 0x6d,
+	0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x47, 0x0a, 0x0b, 0x43, 0x6f, 0x6d,
 	0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65,
 	0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03,
-	0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x2b,
-	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67,
-	0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53,
-	0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x66, 0x0a, 0x07, 0x46,
-	0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63,
-	0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
-	0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
-	0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65,
-	0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
-	0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
-	0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
-	0x74, 0x6f, 0x33,
+	0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x12,
+	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61,
+	0x74, 0x61, 0x32, 0x66, 0x0a, 0x07, 0x46, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a,
+	0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f,
+	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f,
+	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64,
+	0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43,
+	0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43,
+	0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f,
+	0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (
@@ -180,21 +178,19 @@ func file_pb_flowsrv_proto_rawDescGZIP() []byte {
 
 var file_pb_flowsrv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
 var file_pb_flowsrv_proto_goTypes = []interface{}{
-	(*CommandReq)(nil),      // 0: pb.CommandReq
-	(*CommandResp)(nil),     // 1: pb.CommandResp
-	(*structpb.Struct)(nil), // 2: google.protobuf.Struct
+	(*CommandReq)(nil),  // 0: pb.CommandReq
+	(*CommandResp)(nil), // 1: pb.CommandResp
 }
 var file_pb_flowsrv_proto_depIdxs = []int32{
-	2, // 0: pb.CommandResp.data:type_name -> google.protobuf.Struct
-	0, // 1: pb.Flowsrv.connect:input_type -> pb.CommandReq
-	0, // 2: pb.Flowsrv.disconnect:input_type -> pb.CommandReq
-	1, // 3: pb.Flowsrv.connect:output_type -> pb.CommandResp
-	1, // 4: pb.Flowsrv.disconnect:output_type -> pb.CommandResp
-	3, // [3:5] is the sub-list for method output_type
-	1, // [1:3] is the sub-list for method input_type
-	1, // [1:1] is the sub-list for extension type_name
-	1, // [1:1] is the sub-list for extension extendee
-	0, // [0:1] is the sub-list for field type_name
+	0, // 0: pb.Flowsrv.connect:input_type -> pb.CommandReq
+	0, // 1: pb.Flowsrv.disconnect:input_type -> pb.CommandReq
+	1, // 2: pb.Flowsrv.connect:output_type -> pb.CommandResp
+	1, // 3: pb.Flowsrv.disconnect:output_type -> pb.CommandResp
+	2, // [2:4] is the sub-list for method output_type
+	0, // [0:2] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
 }
 
 func init() { file_pb_flowsrv_proto_init() }

+ 1 - 1
flowsrv/rpc/pb/flowsrv.proto

@@ -15,7 +15,7 @@ message CommandReq {
 message CommandResp {
   int64 code = 1;
   string  msg = 2;
-  google.protobuf.Struct data = 3;
+  bytes data = 3;
 }
 
 service Flowsrv {