فهرست منبع

v0.0.1开发:普通玩家聊天超时踢下线功能开发

#Suyghur 2 سال پیش
والد
کامیت
869bfcc26b

+ 7 - 2
comm/model/message.go

@@ -4,6 +4,10 @@
 
 package model
 
+const (
+	CMD_CHAT_TIMEOUT = 2001
+)
+
 type KqMessage struct {
 	CreateTime string `json:"create_time"`
 	Content    string `json:"content"`
@@ -11,10 +15,11 @@ type KqMessage struct {
 	ReceiverId string `json:"receiver_id"`
 	SenderId   string `json:"sender_id"`
 	GameId     string `json:"game_id"`
+	Uid        string `json:"uid"`
 	Ext        string `json:"ext"`
 }
 
 type KqCmdMessage struct {
-	Opt int64  `json:"opt"`
-	Ext string `json:"ext"`
+	Opt int64       `json:"opt"`
+	Ext interface{} `json:"ext"`
 }

+ 2 - 6
comm/model/playerinfo.go

@@ -11,10 +11,6 @@ type PlayerInfo struct {
 	CsId       string `json:"cs_id"`
 	ConnectTs  int64  `json:"connect_ts"`
 	LastChatTs int64  `json:"last_chat_ts"`
-}
-
-type PlayerWaitingInfo struct {
-	PlayerId    string `json:"player_id"`
-	GameId      string `json:"game_id"`
-	EnqueueTime int64  `json:"enqueue_time"`
+	EnqueueTs  int64  `json:"enqueue_ts"`
+	DequeueTs  int64  `json:"dequeue_ts"`
 }

+ 18 - 2
core/cmd/rpc/internal/logic/cssendmsglogic.go

@@ -2,6 +2,9 @@ package logic
 
 import (
 	"context"
+	"github.com/bytedance/sonic"
+	"time"
+	"ylink/comm/model"
 
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"
@@ -24,7 +27,20 @@ func NewCsSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CsSendM
 }
 
 func (l *CsSendMsgLogic) CsSendMsg(in *pb.CsSendMsgReq) (*pb.CsSendMsgResp, error) {
-	// todo 投递到对应客服的收件箱
-	// todo 写入db
+	// 投递到自己的发件箱
+	message := &model.KqMessage{
+		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
+		Content:    in.Content,
+		Pic:        in.Pic,
+		ReceiverId: in.GameId + "_" + in.PlayerId,
+		SenderId:   in.CsId,
+		GameId:     in.GameId,
+		Uid:        in.CsId,
+	}
+	kMsg, _ := sonic.MarshalString(message)
+	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
+	if err != nil {
+		return nil, err
+	}
 	return &pb.CsSendMsgResp{}, nil
 }

+ 7 - 6
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -2,7 +2,7 @@ package logic
 
 import (
 	"context"
-	"encoding/json"
+	"github.com/bytedance/sonic"
 	"time"
 	"ylink/comm/model"
 	"ylink/core/cmd/rpc/internal/svc"
@@ -27,15 +27,16 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 	// 投递到自己的发件箱
-	msg, _ := json.Marshal(model.KqMessage{
+	message := &model.KqMessage{
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Pic:        in.Pic,
-		ReceiverId: "",
-		SenderId:   in.PlayerId,
+		SenderId:   in.GameId + "_" + in.PlayerId,
 		GameId:     in.GameId,
-	})
-	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, string(msg), in.PlayerId)
+		Uid:        in.PlayerId,
+	}
+	kMsg, _ := sonic.MarshalString(message)
+	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
 	if err != nil {
 		return nil, err
 	}

+ 7 - 0
core/inner/rpc/internal/ext/buskey.go

@@ -0,0 +1,7 @@
+//@File     buskey.go
+//@Time     2022/06/07
+//@Author   #Suyghur,
+
+package ext
+
+const EVENT_REMOVE_TIMEOUT_JOB = "EVENT_REMOVE_TIMEOUT_JOB"

+ 35 - 5
core/inner/rpc/internal/ext/global.go

@@ -19,15 +19,15 @@ var (
 	GameOnlinePlayerMap *treemap.Map
 	// GameConnectedMap 已连接客服玩家
 	GameConnectedMap *treemap.Map
-	// WaitingQueue 玩家等待队列
-	WaitingQueue *simplelist.List
+	// WaitingList 玩家等待队列
+	WaitingList *simplelist.List
 )
 
 func GetVipPlayer(gameId, playerId string) *model.PlayerInfo {
 	if GameVipMap.Contains(gameId) {
-		vipMap := GameVipMap.Get(gameId).(*treemap.Map)
-		if vipMap.Contains(playerId) {
-			return vipMap.Get(playerId).(*model.PlayerInfo)
+		playerInfoMap := GameVipMap.Get(gameId).(*treemap.Map)
+		if playerInfoMap.Contains(playerId) {
+			return playerInfoMap.Get(playerId).(*model.PlayerInfo)
 		}
 	}
 	return nil
@@ -39,3 +39,33 @@ func GetCsInfo(csId string) *model.CsInfo {
 	}
 	return nil
 }
+
+func GetConnectedPlayerInfo(gameId, playerId string) *model.PlayerInfo {
+	if GameConnectedMap.Contains(gameId) {
+		connectedMap := GameConnectedMap.Get(gameId).(*treemap.Map)
+		if connectedMap.Contains(playerId) {
+			return connectedMap.Get(playerId).(*model.PlayerInfo)
+		}
+	}
+	return nil
+}
+
+func GetOnlinePlayerInfo(gameId, playerId string) *model.PlayerInfo {
+	if GameOnlinePlayerMap.Contains(gameId) {
+		onlinePlayerMap := GameOnlinePlayerMap.Get(gameId).(*treemap.Map)
+		if onlinePlayerMap.Contains(playerId) {
+			return onlinePlayerMap.Get(playerId).(*model.PlayerInfo)
+		}
+	}
+	return nil
+}
+
+func GetWaitingPlayerInfo(gameId, playerId string) *model.PlayerInfo {
+	for n := WaitingList.FrontNode(); n != nil; n = n.Next() {
+		playerInfo := n.Value.(*model.PlayerInfo)
+		if playerInfo.GameId == gameId && playerInfo.PlayerId == playerId {
+			return playerInfo
+		}
+	}
+	return nil
+}

+ 49 - 7
core/inner/rpc/internal/logic/csconnectplayerlogic.go

@@ -2,12 +2,16 @@ package logic
 
 import (
 	"context"
+	"github.com/bytedance/sonic"
+	"github.com/gookit/event"
 	treemap "github.com/liyue201/gostl/ds/map"
+	"github.com/robfig/cron/v3"
+	"github.com/zeromicro/go-zero/core/logx"
+	"time"
+	"ylink/comm/model"
 	"ylink/core/inner/rpc/internal/ext"
 	"ylink/core/inner/rpc/internal/svc"
 	"ylink/core/inner/rpc/pb"
-
-	"github.com/zeromicro/go-zero/core/logx"
 )
 
 type CsConnectPlayerLogic struct {
@@ -25,14 +29,52 @@ func NewCsConnectPlayerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C
 }
 
 func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (*pb.InnerCsConnectPlayerResp, error) {
+
+	playerInfo := ext.GetOnlinePlayerInfo(in.GameId, in.PlayerId)
+	playerInfo.CsId = in.CsId
+	playerInfo.DequeueTs = time.Now().Unix()
+
 	if ext.GameConnectedMap.Contains(in.GameId) {
-		playerConnMap := ext.GameConnectedMap.Get(in.GameId).(*treemap.Map)
-		playerConnMap.Insert(in.PlayerId, in.CsId)
+		playerConnectedMap := ext.GameConnectedMap.Get(in.GameId).(*treemap.Map)
+		playerConnectedMap.Insert(in.PlayerId, playerInfo)
 	} else {
-		playerConnMap := treemap.New(treemap.WithGoroutineSafe())
-		playerConnMap.Insert(in.PlayerId, in.CsId)
-		ext.GameConnectedMap.Insert(in.GameId, playerConnMap)
+		playerConnectedMap := treemap.New(treemap.WithGoroutineSafe())
+		playerConnectedMap.Insert(in.PlayerId, playerInfo)
+		ext.GameConnectedMap.Insert(in.GameId, playerConnectedMap)
 	}
 
+	// 移除WaitingQueue
+	for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
+		playerInfo := n.Value.(*model.PlayerInfo)
+		if playerInfo.GameId == in.GameId && playerInfo.PlayerId == in.PlayerId {
+			l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.PlayerId)
+			ext.WaitingList.Remove(nil, n)
+			break
+		}
+	}
+
+	var entryId cron.EntryID
+	entryId, _ = l.svcCtx.TimeoutCron.AddFunc("@every 1m", func() {
+		// TODO 增加trace
+		var timeoutTs int64
+		if playerInfo.LastChatTs == 0 {
+			timeoutTs = time.Now().Unix() - playerInfo.ConnectTs
+		} else {
+			timeoutTs = time.Now().Unix() - playerInfo.LastChatTs
+		}
+		if timeoutTs >= 300 {
+			_ = event.MustFire(ext.EVENT_REMOVE_TIMEOUT_JOB, event.M{"entry_id": entryId})
+			l.Logger.Infof("trigger timeout event, remove cron job, entry id: %d", entryId)
+
+			// 发下线command
+			//ext, _ := sonic.Marshal(playerInfo)
+			message, _ := sonic.MarshalString(&model.KqCmdMessage{
+				Opt: model.CMD_CHAT_TIMEOUT,
+				Ext: playerInfo,
+			})
+			l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, message, in.PlayerId)
+		}
+	})
+
 	return &pb.InnerCsConnectPlayerResp{}, nil
 }

+ 4 - 4
core/inner/rpc/internal/logic/csfetchplayerqueuelogic.go

@@ -29,7 +29,7 @@ func NewCsFetchPlayerQueueLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 }
 
 func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQueueReq) (*pb.InnerCsFetchPlayerQueueResp, error) {
-	queueLen := int64(ext.WaitingQueue.Len())
+	queueLen := int64(ext.WaitingList.Len())
 	if queueLen == 0 {
 		// 等待队列为空直接返回
 		return &pb.InnerCsFetchPlayerQueueResp{
@@ -44,12 +44,12 @@ func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQu
 
 	queue := make([]interface{}, queueLen)
 
-	for node := ext.WaitingQueue.FrontNode(); node != nil && index < queueLen; node = node.Next() {
-		info := node.Value.(*model.PlayerWaitingInfo)
+	for node := ext.WaitingList.FrontNode(); node != nil && index < queueLen; node = node.Next() {
+		info := node.Value.(*model.PlayerInfo)
 		queue[index] = map[string]interface{}{
 			"player_id": info.PlayerId,
 			"game_id":   info.GameId,
-			"wait_time": time.Now().Unix() - info.EnqueueTime,
+			"wait_time": time.Now().Unix() - info.EnqueueTs,
 		}
 		index += 1
 	}

+ 4 - 4
core/inner/rpc/internal/logic/notifyuserofflinelogic.go

@@ -34,7 +34,7 @@ func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (
 	case globalkey.CONNECT_TYPE_PLAYER:
 		// 修改玩家在线状态
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
-			// 有则取出玩家的set
+			// 有则取出玩家
 			onlinePlayerMap := ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
 			if onlinePlayerMap.Contains(in.Uid) {
 				// 有则清除,代表下线
@@ -42,11 +42,11 @@ func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (
 			}
 		}
 
-		for n := ext.WaitingQueue.FrontNode(); n != nil; n = n.Next() {
-			info := n.Value.(*model.PlayerWaitingInfo)
+		for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
+			info := n.Value.(*model.PlayerInfo)
 			if info.GameId == in.GameId && info.PlayerId == in.Uid {
 				l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
-				ext.WaitingQueue.Remove(nil, n)
+				ext.WaitingList.Remove(nil, n)
 				break
 			}
 		}

+ 18 - 18
core/inner/rpc/internal/logic/notifyuseronlinelogic.go

@@ -46,20 +46,20 @@ func (l *NotifyUserOnlineLogic) NotifyUserOnline(in *pb.NotifyUserStatusReq) (*p
 					onlinePlayerMap.Insert(in.Uid, playerInfo)
 				} else {
 					// 不是vip
-					onlinePlayerMap.Insert(in.Uid, &model.PlayerInfo{
+					ts := time.Now().Unix()
+					playerInfo := model.PlayerInfo{
 						PlayerId:  in.Uid,
 						GameId:    in.GameId,
-						ConnectTs: time.Now().Unix(),
-					})
+						ConnectTs: ts,
+						EnqueueTs: ts,
+					}
+					onlinePlayerMap.Insert(in.Uid, &playerInfo)
 					// 放入等待队列
-					ext.WaitingQueue.PushBack(&model.PlayerWaitingInfo{
-						PlayerId:    in.Uid,
-						GameId:      in.GameId,
-						EnqueueTime: time.Now().Unix(),
-					})
-					l.Logger.Infof("enqueue waiting list: %s", ext.WaitingQueue.String())
+					ext.WaitingList.PushBack(&playerInfo)
+					l.Logger.Infof("enqueue waiting list: %s", ext.WaitingList.String())
 				}
 			}
+			l.Logger.Infof("111111")
 		} else {
 			onlinePlayerMap := treemap.New(treemap.WithGoroutineSafe())
 			// 判断是不是vip玩家
@@ -68,20 +68,20 @@ func (l *NotifyUserOnlineLogic) NotifyUserOnline(in *pb.NotifyUserStatusReq) (*p
 				onlinePlayerMap.Insert(in.Uid, playerInfo)
 			} else {
 				// 不是vip
-				onlinePlayerMap.Insert(in.Uid, &model.PlayerInfo{
+				ts := time.Now().Unix()
+				playerInfo := model.PlayerInfo{
 					PlayerId:  in.Uid,
 					GameId:    in.GameId,
-					ConnectTs: time.Now().Unix(),
-				})
+					ConnectTs: ts,
+					EnqueueTs: ts,
+				}
+				onlinePlayerMap.Insert(in.Uid, &playerInfo)
 				// 放入等待队列
-				ext.WaitingQueue.PushBack(&model.PlayerWaitingInfo{
-					PlayerId:    in.Uid,
-					GameId:      in.GameId,
-					EnqueueTime: time.Now().Unix(),
-				})
-				l.Logger.Infof("enqueue waiting list: %s", ext.WaitingQueue.String())
+				ext.WaitingList.PushBack(&playerInfo)
+				l.Logger.Infof("enqueue waiting list: %s", ext.WaitingList.String())
 			}
 			ext.GameOnlinePlayerMap.Insert(in.GameId, onlinePlayerMap)
+			l.Logger.Infof("22222")
 		}
 	case globalkey.CONNECT_TYPE_CS:
 		if csInfo := ext.GetCsInfo(in.Uid); csInfo != nil {

+ 33 - 35
core/inner/rpc/internal/svc/servicecontext.go

@@ -2,11 +2,12 @@ package svc
 
 import (
 	"context"
-	"encoding/json"
 	"github.com/Shopify/sarama"
 	"github.com/bytedance/sonic"
+	"github.com/gookit/event"
 	"github.com/liyue201/gostl/ds/list/simplelist"
 	treemap "github.com/liyue201/gostl/ds/map"
+	"github.com/robfig/cron/v3"
 	"github.com/zeromicro/go-zero/core/logx"
 	"go.opentelemetry.io/otel/attribute"
 	"io/ioutil"
@@ -18,10 +19,11 @@ import (
 )
 
 type ServiceContext struct {
-	Config           config.Config
-	KqMsgBoxProducer *kafka.Producer
-	KqCmdBoxProducer *kafka.Producer
-	ConsumerGroup    *kafka.ConsumerGroup
+	Config             config.Config
+	KqMsgBoxProducer   *kafka.Producer
+	KqCmdBoxProducer   *kafka.Producer
+	KqMsgConsumerGroup *kafka.ConsumerGroup
+	TimeoutCron        *cron.Cron
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
@@ -29,7 +31,7 @@ func NewServiceContext(c config.Config) *ServiceContext {
 		Config:           c,
 		KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
 		KqCmdBoxProducer: kafka.NewKafkaProducer(c.KqCmdBoxProducerConf.Brokers, c.KqCmdBoxProducerConf.Topic),
-		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+		KqMsgConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
 			KafkaVersion:   sarama.V1_0_0_0,
 			OffsetsInitial: sarama.OffsetNewest,
 			IsReturnErr:    false,
@@ -67,38 +69,20 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 	}
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 		var message model.KqMessage
-		if err := json.Unmarshal(msg.Value, &message); err != nil {
+		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
 			return
 		}
 		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
 			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
-				// 玩家发的消息,先从connMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
-				if ext.GameConnectedMap.Contains(message.GameId) {
-					// 先从connMap找对应的客服映射
-					if playerConnMap := ext.GameConnectedMap.Get(message.GameId).(*treemap.Map); playerConnMap.Contains(message.SenderId) {
-						message.ReceiverId = playerConnMap.Get(message.SenderId).(string)
-					} else {
-						if ext.GameVipMap.Contains(message.GameId) {
-							// 从vipMap里面找
-							if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
-								message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
-							} else {
-								message.ReceiverId = ""
-							}
-						} else {
-							message.ReceiverId = ""
-						}
-					}
+				// receiverId为空代表这条消息是玩家发送的
+				// 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
+				if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, message.Uid); playerInfo != nil {
+					message.ReceiverId = playerInfo.CsId
 				} else {
-					if ext.GameVipMap.Contains(message.GameId) {
-						// 从vipMap里面找
-						if playerVipMap := ext.GameVipMap.Get(message.GameId).(*treemap.Map); playerVipMap.Contains(message.SenderId) {
-							message.ReceiverId = playerVipMap.Get(message.SenderId).(string)
-						} else {
-							message.ReceiverId = ""
-						}
+					if playerInfo := ext.GetVipPlayer(message.GameId, message.Uid); playerInfo != nil {
+						message.ReceiverId = playerInfo.CsId
 					} else {
 						message.ReceiverId = ""
 					}
@@ -107,12 +91,13 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 				// 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
 				if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
 					logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
-					kMsg, _ := json.Marshal(message)
-					s.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
+					kMsg, _ := sonic.MarshalString(message)
+					s.KqMsgBoxProducer.SendMessage(ctx, kMsg, message.ReceiverId)
 				} else {
 					logx.WithContext(ctx).Errorf("can not find receiver of the sender")
 				}
 			} else {
+				// receiverId不为空代表这条消息是客服发的
 				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
 			}
 			sess.MarkMessage(msg, "")
@@ -121,7 +106,19 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 }
 
 func (s *ServiceContext) subscribe() {
-	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
+	go s.KqMsgConsumerGroup.RegisterHandleAndConsumer(s)
+
+	// 注册事件
+	event.On(ext.EVENT_REMOVE_TIMEOUT_JOB, event.ListenerFunc(func(e event.Event) error {
+		logx.Info("on event remove timeout job...")
+		entryId := e.Get("entry_id").(cron.EntryID)
+		s.TimeoutCron.Remove(entryId)
+		return nil
+	}), event.High)
+
+	// 初始化定时任务
+	s.TimeoutCron = cron.New(cron.WithSeconds())
+	s.TimeoutCron.Start()
 }
 
 func fetchCsCenterInfo(c config.Config) {
@@ -130,9 +127,10 @@ func fetchCsCenterInfo(c config.Config) {
 	ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameOnlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameConnectedMap = treemap.New(treemap.WithGoroutineSafe())
-	ext.WaitingQueue = simplelist.New()
+	ext.WaitingList = simplelist.New()
 	go loadMockInfo(c)
 }
+
 func loadMockInfo(c config.Config) {
 	// 加载游戏列表
 	logx.Info("加载游戏列表")

+ 6 - 0
flowsrv/rpc/etc/flowsrv.yaml

@@ -23,6 +23,12 @@ KqMsgBoxConsumerConf:
   Topic: recv-box-topic
   GroupId: flowsrv-api
 
+KqCmdBoxConsumerConf:
+  Brokers:
+    - 127.0.0.1:9092
+  Topic: cmd-box-topic
+  GroupId: flowsrv-api
+
 Redis:
   Host: 127.0.0.1:6379
   Type: node

+ 1 - 0
flowsrv/rpc/internal/config/config.go

@@ -9,6 +9,7 @@ type Config struct {
 	zrpc.RpcServerConf
 	InnerRpcConf         zrpc.RpcClientConf
 	KqMsgBoxConsumerConf kafka.KqConsumerConfig
+	KqCmdBoxConsumerConf kafka.KqConsumerConfig
 	JwtAuth              struct {
 		AccessSecret string
 		AccessExpire int64

+ 11 - 35
flowsrv/rpc/internal/logic/connectlogic.go

@@ -52,6 +52,13 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 		})
 	}
 
+	var flowId string
+	if in.Type == globalkey.CONNECT_TYPE_PLAYER {
+		flowId = gameId + "_" + uid
+	} else {
+		flowId = uid
+	}
+
 	flow := &model.Flow{
 		EndFlow: make(chan int),
 		Message: make(chan string),
@@ -59,11 +66,10 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 		Ctx:     l.ctx,
 		SvcCtx:  l.svcCtx,
 		Logger:  l.Logger,
-		User: &model.User{
-			Type:   in.Type,
-			Uid:    uid,
-			GameId: gameId,
-		},
+		Type:    in.Type,
+		Uid:     uid,
+		GameId:  gameId,
+		FlowId:  flowId,
 	}
 	defer func() {
 		close(flow.EndFlow)
@@ -72,36 +78,6 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 
 	mgr.GetFlowMgrInstance().Register(flow)
 
-	//go func() {
-	//	for {
-	//		select {
-	//		case <-stream.Context().Done():
-	//			if mgr.GetFlowMgrInstance().Has(uid) {
-	//				l.Logger.Infof("flowstream was disconnected abnormally")
-	//				mgr.GetFlowMgrInstance().UnRegister(uid)
-	//				_, err = l.svcCtx.InnerRpc.NotifyUserOffline(l.ctx, &inner.NotifyUserStatusReq{
-	//					Type:   in.Type,
-	//					Uid:    uid,
-	//					GameId: gameId,
-	//				})
-	//			}
-	//			flow.EndFlow <- 1
-	//			return
-	//		case msg, open := <-flow.Message:
-	//			if open {
-	//				stream.Send(&pb.CommandResp{
-	//					Code: result.Ok,
-	//					Msg:  "success",
-	//					Data: []byte(msg),
-	//				})
-	//			} else {
-	//				l.Logger.Error("message channel is close")
-	//				return
-	//			}
-	//		}
-	//	}
-	//}()
-
 	<-flow.EndFlow
 	l.Logger.Infof("end flow")
 	return nil

+ 7 - 1
flowsrv/rpc/internal/logic/disconnectlogic.go

@@ -51,7 +51,13 @@ func (l *DisconnectLogic) Disconnect(in *pb.CommandReq) (*pb.CommandResp, error)
 		}, err
 	}
 
-	mgr.GetFlowMgrInstance().UnRegister(uid)
+	var flowId string
+	if in.Type == globalkey.CONNECT_TYPE_PLAYER {
+		flowId = gameId + "_" + uid
+	} else {
+		flowId = uid
+	}
+	mgr.GetFlowMgrInstance().UnRegister(flowId)
 
 	return &pb.CommandResp{
 		Code: result.Ok,

+ 16 - 16
flowsrv/rpc/internal/mgr/flowmgr.go

@@ -35,7 +35,7 @@ func GetFlowMgrInstance() *flowManager {
 func (manager *flowManager) Register(flow *model.Flow) {
 	//go registerWorker(flow)
 	go manager.registerFlow(flow)
-	manager.flowMap.Insert(flow.User.Uid, flow)
+	manager.flowMap.Insert(flow.FlowId, flow)
 }
 
 func (manager *flowManager) registerFlow(flow *model.Flow) {
@@ -43,13 +43,13 @@ func (manager *flowManager) registerFlow(flow *model.Flow) {
 	for {
 		select {
 		case <-flow.Stream.Context().Done():
-			if manager.Has(flow.User.Uid) {
+			if manager.Has(flow.FlowId) {
 				flow.Logger.Infof("flowstream was disconnected abnormally")
-				manager.UnRegister(flow.User.Uid)
+				manager.UnRegister(flow.FlowId)
 				flow.SvcCtx.InnerRpc.NotifyUserOffline(flow.Ctx, &inner.NotifyUserStatusReq{
-					Type:   flow.User.Type,
-					Uid:    flow.User.Uid,
-					GameId: flow.User.GameId,
+					Type:   flow.Type,
+					Uid:    flow.Uid,
+					GameId: flow.GameId,
 				})
 			}
 			flow.EndFlow <- 1
@@ -76,9 +76,9 @@ func (manager *flowManager) subscribeRmq(flow *model.Flow) {
 			flow.Logger.Infof("unsubscribe rmq...")
 			return
 		default:
-			resultCmd := flow.SvcCtx.RedisClient.BRPop(flow.Ctx, 10*time.Second, flow.User.Uid)
+			resultCmd := flow.SvcCtx.RedisClient.BRPop(flow.Ctx, 10*time.Second, flow.FlowId)
 			if message, err := resultCmd.Result(); err != nil {
-				flow.Logger.Errorf("get message from redis err: %v", err)
+				flow.Logger.Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
 			} else {
 				flow.Message <- message[1]
 			}
@@ -86,19 +86,19 @@ func (manager *flowManager) subscribeRmq(flow *model.Flow) {
 	}
 }
 
-func (manager *flowManager) Get(uid string) *model.Flow {
-	return manager.flowMap.Get(uid).(*model.Flow)
+func (manager *flowManager) Get(flowId string) *model.Flow {
+	return manager.flowMap.Get(flowId).(*model.Flow)
 }
 
-func (manager *flowManager) UnRegister(uid string) {
-	if manager.flowMap.Contains(uid) {
-		flow := manager.Get(uid)
+func (manager *flowManager) UnRegister(flowId string) {
+	if manager.flowMap.Contains(flowId) {
+		flow := manager.Get(flowId)
 		close(flow.Message)
 		//flow.EndRmq <- 0
-		manager.flowMap.Erase(uid)
+		manager.flowMap.Erase(flowId)
 	}
 }
 
-func (manager *flowManager) Has(uid string) bool {
-	return manager.flowMap.Contains(uid)
+func (manager *flowManager) Has(flowId string) bool {
+	return manager.flowMap.Contains(flowId)
 }

+ 4 - 7
flowsrv/rpc/internal/model/bean.go

@@ -11,12 +11,6 @@ import (
 	"ylink/flowsrv/rpc/pb"
 )
 
-type User struct {
-	Type   int64
-	Uid    string
-	GameId string
-}
-
 type Flow struct {
 	EndFlow chan int
 	Message chan string
@@ -24,5 +18,8 @@ type Flow struct {
 	SvcCtx  *svc.ServiceContext
 	Logger  logx.Logger
 	Stream  pb.Flowsrv_ConnectServer
-	User    *User
+	Type    int64
+	Uid     string
+	GameId  string
+	FlowId  string
 }

+ 50 - 15
flowsrv/rpc/internal/svc/servicecontext.go

@@ -2,8 +2,8 @@ package svc
 
 import (
 	"context"
-	"encoding/json"
 	"github.com/Shopify/sarama"
+	"github.com/bytedance/sonic"
 	"github.com/go-redis/redis/v8"
 	"github.com/zeromicro/go-zero/core/logx"
 	gozerotrace "github.com/zeromicro/go-zero/core/trace"
@@ -22,26 +22,22 @@ import (
 )
 
 type ServiceContext struct {
-	Config        config.Config
-	InnerRpc      inner.Inner
-	ConsumerGroup *kafka.ConsumerGroup
-	RedisClient   *redis.Client
+	Config               config.Config
+	InnerRpc             inner.Inner
+	MessageConsumerGroup *kafka.ConsumerGroup
+	CommandConsumerGroup *kafka.ConsumerGroup
+	RedisClient          *redis.Client
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
 	svcCtx := &ServiceContext{
 		Config:   c,
 		InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
-
 		RedisClient: redis.NewClient(&redis.Options{
 			Addr:     c.Redis.Host,
 			Password: c.Redis.Pass,
 		}),
-		//RedisClient: redis.New(c.Redis.Host, func(r *redis.Redis) {
-		//	r.Type = c.Redis.Type
-		//	r.Pass = c.Redis.Pass
-		//}),
-		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+		MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
 			KafkaVersion:   sarama.V1_0_0_0,
 			OffsetsInitial: sarama.OffsetNewest,
 			IsReturnErr:    false,
@@ -49,6 +45,15 @@ func NewServiceContext(c config.Config) *ServiceContext {
 			c.KqMsgBoxConsumerConf.Brokers,
 			[]string{c.KqMsgBoxConsumerConf.Topic},
 			c.KqMsgBoxConsumerConf.GroupId),
+
+		CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+			KafkaVersion:   sarama.V1_0_0_0,
+			OffsetsInitial: sarama.OffsetNewest,
+			IsReturnErr:    false,
+		},
+			c.KqCmdBoxConsumerConf.Brokers,
+			[]string{c.KqCmdBoxConsumerConf.Topic},
+			c.KqCmdBoxConsumerConf.GroupId),
 	}
 	go svcCtx.subscribe()
 	return svcCtx
@@ -64,8 +69,13 @@ func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
 
 func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 	for msg := range claim.Messages() {
+
 		if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
+			logx.Info("handleMessage")
 			s.handleMessage(sess, msg)
+		} else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
+			logx.Info("handleCommand")
+			s.handleCommand(sess, msg)
 		}
 	}
 	return nil
@@ -89,7 +99,7 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 	}
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 		var message model.KqMessage
-		if err := json.Unmarshal(msg.Value, &message); err != nil {
+		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 			logx.Errorf("unmarshal msg error: %v", err)
 			return
 		}
@@ -101,14 +111,39 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 			} else {
 				logx.WithContext(ctx).Infof("current rmq size: %d", size)
 			}
-			//if _, err := s.RedisClient.(message.ReceiverId, string(msg.Value)); err != nil {
-			//	logx.WithContext(ctx).Errorf("push message err %v", err)
+			sess.MarkMessage(msg, "")
+		}, attribute.String("msg.key", string(msg.Key)))
+	})
+}
+
+func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	traceId := kafka.GetTraceFromHeader(msg.Headers)
+	if len(traceId) == 0 {
+		return
+	}
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		var message model.KqCmdMessage
+		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
+			logx.Errorf("unmarshal msg error: %v", err)
+			return
+		}
+		trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushMessage", func(ctx context.Context) {
+			logx.WithContext(ctx).Infof("recv command: %v", message)
+
+			//// 投递到receiver_id对应的redis队列暂存
+			//intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
+			//if size, err := intCmd.Result(); err != nil {
+			//	logx.WithContext(ctx).Errorf("push message rmq err %v", err)
+			//} else {
+			//	logx.WithContext(ctx).Infof("current rmq size: %d", size)
 			//}
+
 			sess.MarkMessage(msg, "")
 		}, attribute.String("msg.key", string(msg.Key)))
 	})
 }
 
 func (s *ServiceContext) subscribe() {
-	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
+	go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
+	go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
 }

+ 2 - 0
go.mod

@@ -7,8 +7,10 @@ require (
 	github.com/bytedance/sonic v1.3.0
 	github.com/go-redis/redis/v8 v8.11.4
 	github.com/golang-jwt/jwt/v4 v4.4.1
+	github.com/gookit/event v1.0.6
 	github.com/liyue201/gostl v1.0.1
 	github.com/pkg/errors v0.9.1
+	github.com/robfig/cron/v3 v3.0.1
 	github.com/zeromicro/go-zero v1.3.3
 	go.opentelemetry.io/otel v1.7.0
 	go.opentelemetry.io/otel/trace v1.7.0

+ 4 - 0
go.sum

@@ -248,6 +248,8 @@ github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
 github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
 github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
+github.com/gookit/event v1.0.6 h1:/U95T1tBzt9RSSi23pg4VR3B9VWkyM4xv8TXAGi60IQ=
+github.com/gookit/event v1.0.6/go.mod h1:7Udf/q/HQcrK9XE4JZUvbqi46rI1V8r/Pvao2NbPajA=
 github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
 github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
 github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
@@ -396,6 +398,8 @@ github.com/rabbitmq/amqp091-go v1.1.0/go.mod h1:ogQDLSOACsLPsIq0NpbtiifNZi2YOz0V
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
 github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=