Selaa lähdekoodia

v0.0.1开发:修改等待队列的类型为treemap

#Suyghur 2 vuotta sitten
vanhempi
commit
2cb3d7755e

+ 15 - 4
comm/globalkey/globalkey.go

@@ -5,11 +5,22 @@
 package globalkey
 package globalkey
 
 
 const (
 const (
-	ConnectTypePlayer = 0
-	ConnectTypeCs     = 1
+	ConnectTypeNormalPlayer = 0
+	ConnectTypeVipPlayer    = 1
+	ConnectTypeCs           = 2
 )
 )
 
 
 const (
 const (
-	EventRemoveTimeoutJob = "EventRemoveTimeoutJob"
-	EventUnsubscribeRmq   = "EventUnsubscribeRmq"
+	EventRemoveTimeoutJob     = "EventRemoveTimeoutJob"
+	EventHandleRmqJob         = "EventHandleRmqJob"
+	EventUnsubscribeRmqJob    = "EventUnsubscribeRmq"
+	EventNotifyUserOfflineJob = "EventNotifyUserOfflineJob"
+)
+
+const (
+	All             = "all"
+	AllPlayer       = "all_player"
+	AllVipPlayer    = "all_vip_player"
+	AllNormalPlayer = "all_normal_player"
+	AllCs           = "all_cs"
 )
 )

+ 10 - 6
comm/model/message.go

@@ -5,11 +5,17 @@
 package model
 package model
 
 
 const (
 const (
-	CMD_SEND_MESSAGE = 0
-	CMD_CHAT_TIMEOUT = 2001
+	CMD_SEND_MESSAGE         = 0
+	CMD_UPDATE_WAITING_QUEUE = 2000
+	CMD_CHAT_TIMEOUT         = 2001
 )
 )
 
 
 type KqMessage struct {
 type KqMessage struct {
+	Opt     int32  `json:"opt"`
+	Payload string `json:"payload"`
+	Ext     string `json:"ext"`
+}
+type ChatMessage struct {
 	CreateTime string `json:"create_time"`
 	CreateTime string `json:"create_time"`
 	Content    string `json:"content"`
 	Content    string `json:"content"`
 	Pic        string `json:"pic"`
 	Pic        string `json:"pic"`
@@ -17,13 +23,11 @@ type KqMessage struct {
 	SenderId   string `json:"sender_id"`
 	SenderId   string `json:"sender_id"`
 	GameId     string `json:"game_id"`
 	GameId     string `json:"game_id"`
 	Uid        string `json:"uid"`
 	Uid        string `json:"uid"`
-	Ext        string `json:"ext"`
 }
 }
 
 
-type KqCmdMessage struct {
-	Opt        int32  `json:"opt"`
+type CommandMessage struct {
+	Payload    string `json:"payload"`
 	ReceiverId string `json:"receiver_id"`
 	ReceiverId string `json:"receiver_id"`
 	GameId     string `json:"game_id"`
 	GameId     string `json:"game_id"`
 	Uid        string `json:"uid"`
 	Uid        string `json:"uid"`
-	Ext        string `json:"ext"`
 }
 }

+ 7 - 2
core/cmd/rpc/internal/logic/cssendmsglogic.go

@@ -28,7 +28,7 @@ func NewCsSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CsSendM
 
 
 func (l *CsSendMsgLogic) CsSendMsg(in *pb.CsSendMsgReq) (*pb.CsSendMsgResp, error) {
 func (l *CsSendMsgLogic) CsSendMsg(in *pb.CsSendMsgReq) (*pb.CsSendMsgResp, error) {
 	// 投递到自己的发件箱
 	// 投递到自己的发件箱
-	message := &model.KqMessage{
+	message := &model.ChatMessage{
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Content:    in.Content,
 		Pic:        in.Pic,
 		Pic:        in.Pic,
@@ -37,7 +37,12 @@ func (l *CsSendMsgLogic) CsSendMsg(in *pb.CsSendMsgReq) (*pb.CsSendMsgResp, erro
 		GameId:     in.GameId,
 		GameId:     in.GameId,
 		Uid:        in.CsId,
 		Uid:        in.CsId,
 	}
 	}
-	kMsg, _ := sonic.MarshalString(message)
+	payload, _ := sonic.MarshalString(message)
+	kMsg, _ := sonic.MarshalString(&model.KqMessage{
+		Opt:     model.CMD_SEND_MESSAGE,
+		Payload: payload,
+		Ext:     "",
+	})
 	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
 	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 7 - 2
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -27,7 +27,7 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 	// 投递到自己的发件箱
 	// 投递到自己的发件箱
-	message := &model.KqMessage{
+	message := &model.ChatMessage{
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Content:    in.Content,
 		Pic:        in.Pic,
 		Pic:        in.Pic,
@@ -35,7 +35,12 @@ func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerS
 		GameId:     in.GameId,
 		GameId:     in.GameId,
 		Uid:        in.PlayerId,
 		Uid:        in.PlayerId,
 	}
 	}
-	kMsg, _ := sonic.MarshalString(message)
+	payload, _ := sonic.MarshalString(message)
+	kMsg, _ := sonic.MarshalString(&model.KqMessage{
+		Opt:     model.CMD_SEND_MESSAGE,
+		Payload: payload,
+		Ext:     "",
+	})
 	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
 	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 2 - 3
core/inner/rpc/internal/ext/global.go

@@ -5,7 +5,6 @@
 package ext
 package ext
 
 
 import (
 import (
-	"github.com/liyue201/gostl/ds/list/simplelist"
 	"github.com/liyue201/gostl/ds/map"
 	"github.com/liyue201/gostl/ds/map"
 	"ylink/comm/model"
 	"ylink/comm/model"
 )
 )
@@ -19,8 +18,8 @@ var (
 	GameOnlinePlayerMap *treemap.Map
 	GameOnlinePlayerMap *treemap.Map
 	// GameConnectedMap 已连接客服玩家
 	// GameConnectedMap 已连接客服玩家
 	GameConnectedMap *treemap.Map
 	GameConnectedMap *treemap.Map
-	// WaitingList 玩家等待队列
-	WaitingList *simplelist.List
+	// WaitingQueue 玩家等待队列
+	WaitingQueue *treemap.Map
 )
 )
 
 
 func GetVipPlayer(gameId, playerId string) *model.PlayerInfo {
 func GetVipPlayer(gameId, playerId string) *model.PlayerInfo {

+ 56 - 30
core/inner/rpc/internal/logic/csconnectplayerlogic.go

@@ -9,9 +9,11 @@ import (
 	"github.com/robfig/cron/v3"
 	"github.com/robfig/cron/v3"
 	"github.com/zeromicro/go-zero/core/logx"
 	"github.com/zeromicro/go-zero/core/logx"
 	"time"
 	"time"
+	"ylink/comm/ctxdata"
 	"ylink/comm/globalkey"
 	"ylink/comm/globalkey"
 	"ylink/comm/model"
 	"ylink/comm/model"
 	"ylink/comm/result"
 	"ylink/comm/result"
+	"ylink/comm/trace"
 	"ylink/core/inner/rpc/internal/ext"
 	"ylink/core/inner/rpc/internal/ext"
 	"ylink/core/inner/rpc/internal/svc"
 	"ylink/core/inner/rpc/internal/svc"
 	"ylink/core/inner/rpc/pb"
 	"ylink/core/inner/rpc/pb"
@@ -34,10 +36,10 @@ func NewCsConnectPlayerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *C
 func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (*pb.InnerCsConnectPlayerResp, error) {
 func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (*pb.InnerCsConnectPlayerResp, error) {
 
 
 	playerInfo := ext.GetOnlinePlayerInfo(in.GameId, in.PlayerId)
 	playerInfo := ext.GetOnlinePlayerInfo(in.GameId, in.PlayerId)
-
 	if playerInfo == nil {
 	if playerInfo == nil {
-		return nil, errors.Wrapf(result.NewErrMsg("The player is not connected"), "")
+		return nil, errors.Wrapf(result.NewErrMsg("玩家不在线"), "")
 	}
 	}
+
 	playerInfo.CsId = in.CsId
 	playerInfo.CsId = in.CsId
 	playerInfo.DequeueTs = time.Now().Unix()
 	playerInfo.DequeueTs = time.Now().Unix()
 
 
@@ -51,38 +53,62 @@ func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (
 	}
 	}
 
 
 	// 移除WaitingQueue
 	// 移除WaitingQueue
-	for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
-		playerInfo := n.Value.(*model.PlayerInfo)
-		if playerInfo.GameId == in.GameId && playerInfo.PlayerId == in.PlayerId {
-			l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.PlayerId)
-			ext.WaitingList.Remove(nil, n)
-			break
+	key := in.GameId + "_" + in.PlayerId
+	if ext.WaitingQueue.Contains(key) {
+		l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.PlayerId)
+		ext.WaitingQueue.Erase(key)
+		//TODO 广播客户端更新等待队列信息
+		queueInfo, _ := sonic.MarshalString(map[string]interface{}{
+			"queue_size": ext.WaitingQueue.Size(),
+		})
+		cmdMessage := &model.CommandMessage{
+			Payload:    queueInfo,
+			ReceiverId: globalkey.AllNormalPlayer,
+			GameId:     in.GameId,
+			Uid:        in.PlayerId,
 		}
 		}
+		payload, _ := sonic.MarshalString(cmdMessage)
+		message, _ := sonic.MarshalString(&model.KqMessage{
+			Opt:     model.CMD_UPDATE_WAITING_QUEUE,
+			Payload: payload,
+			Ext:     "",
+		})
+		l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, message, globalkey.AllNormalPlayer)
 	}
 	}
 
 
-	var entryId cron.EntryID
-	entryId, _ = l.svcCtx.TimeoutCron.AddFunc("@every 1m", func() {
-		// TODO 增加trace
-		var timeoutTs int64
-		if playerInfo.LastChatTs == 0 {
-			timeoutTs = time.Now().Unix() - playerInfo.ConnectTs
-		} else {
-			timeoutTs = time.Now().Unix() - playerInfo.LastChatTs
-		}
-		if timeoutTs >= 3600 {
-			// 释放计时器任务
-			_ = event.MustFire(globalkey.EventRemoveTimeoutJob, event.M{"entry_id": entryId})
-			l.Logger.Infof("trigger timeout event, remove cron job, entry id: %d", entryId)
+	traceId := ctxdata.GetTraceIdFromCtx(l.ctx)
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		var entryId cron.EntryID
+		entryId, _ = l.svcCtx.TimeoutCron.AddFunc("@every 1m", func() {
+			var timeoutTs int64
+			if playerInfo.LastChatTs == 0 {
+				timeoutTs = time.Now().Unix() - playerInfo.ConnectTs
+			} else {
+				timeoutTs = time.Now().Unix() - playerInfo.LastChatTs
+			}
+			if timeoutTs >= 3600 {
+				// 释放计时器任务
+				_ = event.MustFire(globalkey.EventRemoveTimeoutJob, event.M{"entry_id": entryId})
+				l.Logger.Infof("trigger timeout event, remove cron job, entry id: %d", entryId)
 
 
-			// 发踢下线的command指令
-			message, _ := sonic.MarshalString(&model.KqCmdMessage{
-				Opt:        model.CMD_CHAT_TIMEOUT,
-				ReceiverId: in.GameId + "_" + in.PlayerId,
-				GameId:     in.GameId,
-				Uid:        in.PlayerId,
-			})
-			l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, message, in.GameId+"_"+in.PlayerId)
-		}
+				trace.StartTrace(ctx, "InnerServer.CountDownTimer.SendCmdMessage", func(ctx context.Context) {
+					// 发踢下线的command指令
+					cmdMessage := &model.CommandMessage{
+						Payload:    "",
+						ReceiverId: in.GameId + "_" + in.PlayerId,
+						GameId:     in.GameId,
+						Uid:        in.PlayerId,
+					}
+					payload, _ := sonic.MarshalString(cmdMessage)
+					message, _ := sonic.MarshalString(&model.KqMessage{
+						Opt:     model.CMD_CHAT_TIMEOUT,
+						Payload: payload,
+						Ext:     "",
+					})
+					l.svcCtx.KqCmdBoxProducer.SendMessage(ctx, message, in.GameId+"_"+in.PlayerId)
+				})
+			}
+		})
 	})
 	})
 
 
 	return &pb.InnerCsConnectPlayerResp{}, nil
 	return &pb.InnerCsConnectPlayerResp{}, nil

+ 20 - 7
core/inner/rpc/internal/logic/csfetchplayerqueuelogic.go

@@ -29,8 +29,8 @@ func NewCsFetchPlayerQueueLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 }
 }
 
 
 func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQueueReq) (*pb.InnerCsFetchPlayerQueueResp, error) {
 func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQueueReq) (*pb.InnerCsFetchPlayerQueueResp, error) {
-	queueLen := int32(ext.WaitingList.Len())
-	if queueLen == 0 {
+	queueSize := int32(ext.WaitingQueue.Size())
+	if queueSize == 0 {
 		// 等待队列为空直接返回
 		// 等待队列为空直接返回
 		return &pb.InnerCsFetchPlayerQueueResp{
 		return &pb.InnerCsFetchPlayerQueueResp{
 			List: nil,
 			List: nil,
@@ -38,14 +38,17 @@ func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQu
 	}
 	}
 
 
 	var index int32 = 0
 	var index int32 = 0
-	if in.Limit != 0 && in.Limit < queueLen {
-		queueLen = in.Limit
+	if in.Limit != 0 && in.Limit < queueSize {
+		queueSize = in.Limit
 	}
 	}
 
 
-	queue := make([]interface{}, queueLen)
+	queue := make([]interface{}, queueSize)
 
 
-	for node := ext.WaitingList.FrontNode(); node != nil && index < queueLen; node = node.Next() {
-		info := node.Value.(*model.PlayerInfo)
+	for iter := ext.WaitingQueue.Begin(); iter.IsValid(); iter.Next() {
+		if index >= queueSize {
+			break
+		}
+		info := iter.Value().(*model.PlayerInfo)
 		queue[index] = map[string]interface{}{
 		queue[index] = map[string]interface{}{
 			"game_id":   info.GameId,
 			"game_id":   info.GameId,
 			"player_id": info.PlayerId,
 			"player_id": info.PlayerId,
@@ -54,6 +57,16 @@ func (l *CsFetchPlayerQueueLogic) CsFetchPlayerQueue(in *pb.InnerCsFetchPlayerQu
 		index += 1
 		index += 1
 	}
 	}
 
 
+	//for node := ext.WaitingQueue.FrontNode(); node != nil && index < queueLen; node = node.Next() {
+	//	info := node.Value.(*model.PlayerInfo)
+	//	queue[index] = map[string]interface{}{
+	//		"game_id":   info.GameId,
+	//		"player_id": info.PlayerId,
+	//		"wait_time": time.Now().Unix() - info.EnqueueTs,
+	//	}
+	//	index += 1
+	//}
+
 	list, err := structpb.NewList(queue)
 	list, err := structpb.NewList(queue)
 	if err != nil {
 	if err != nil {
 		return nil, errors.Wrap(result.NewErrMsg("fetch player wait queue error"), "")
 		return nil, errors.Wrap(result.NewErrMsg("fetch player wait queue error"), "")

+ 39 - 13
core/inner/rpc/internal/logic/notifyuserofflinelogic.go

@@ -5,7 +5,6 @@ import (
 	treemap "github.com/liyue201/gostl/ds/map"
 	treemap "github.com/liyue201/gostl/ds/map"
 	"github.com/pkg/errors"
 	"github.com/pkg/errors"
 	"ylink/comm/globalkey"
 	"ylink/comm/globalkey"
-	"ylink/comm/model"
 	"ylink/comm/result"
 	"ylink/comm/result"
 	"ylink/core/inner/rpc/internal/ext"
 	"ylink/core/inner/rpc/internal/ext"
 
 
@@ -32,7 +31,7 @@ func NewNotifyUserOfflineLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (*pb.NotifyUserStatusResp, error) {
 func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (*pb.NotifyUserStatusResp, error) {
 	l.Logger.Infof("NotifyUserOffline")
 	l.Logger.Infof("NotifyUserOffline")
 	switch in.Type {
 	switch in.Type {
-	case globalkey.ConnectTypePlayer:
+	case globalkey.ConnectTypeNormalPlayer:
 		// 修改玩家在线状态
 		// 修改玩家在线状态
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
 			// 有则取出玩家
 			// 有则取出玩家
@@ -44,25 +43,52 @@ func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (
 			}
 			}
 		}
 		}
 
 
-		go func() {
-			for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
-				info := n.Value.(*model.PlayerInfo)
-				if info.GameId == in.GameId && info.PlayerId == in.Uid {
-					l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
-					ext.WaitingList.Remove(nil, n)
-					break
-				}
+		key := in.GameId + "_" + in.Uid
+		if ext.WaitingQueue.Contains(key) {
+			l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
+			ext.WaitingQueue.Erase(key)
+
+		}
+		//for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
+		//	info := n.Value.(*model.PlayerInfo)
+		//	l.Logger.Infof("playerInfo: %v", info)
+		//	if info.GameId == in.GameId && info.PlayerId == in.Uid {
+		//		l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
+		//		ext.WaitingList.Remove(nil, n)
+		//		//TODO 广播客户端更新等待队列信息
+		//		//queueInfo, _ := sonic.MarshalString(map[string]interface{}{
+		//		//	"queue_size": 10,
+		//		//})
+		//		//message, _ := sonic.MarshalString(&model.KqCmdMessage{
+		//		//	Opt:        model.CMD_UPDATE_WAITING_QUEUE,
+		//		//	ReceiverId: "all",
+		//		//	GameId:     in.GameId,
+		//		//	Uid:        in.Uid,
+		//		//	Ext:        queueInfo,
+		//		//})
+		//		break
+		//	}
+		//}
+	case globalkey.ConnectTypeVipPlayer:
+		// 修改玩家在线状态
+		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
+			// 有则取出玩家
+			onlinePlayerMap := ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
+			if onlinePlayerMap.Contains(in.Uid) {
+				// 有则清除,代表下线
+				onlinePlayerMap.Erase(in.Uid)
+				l.Logger.Infof("清除玩家在线状态")
 			}
 			}
-		}()
+		}
 	case globalkey.ConnectTypeCs:
 	case globalkey.ConnectTypeCs:
 		// 修改客服在线状态
 		// 修改客服在线状态
 		if csInfo := ext.GetCsInfo(in.Uid); csInfo != nil {
 		if csInfo := ext.GetCsInfo(in.Uid); csInfo != nil {
 			csInfo.OnlineStatus = 0
 			csInfo.OnlineStatus = 0
 		} else {
 		} else {
-			return nil, errors.Wrap(result.NewErrMsg("no such user"), "")
+			return nil, errors.Wrap(result.NewErrMsg("用户不存在"), "")
 		}
 		}
 	default:
 	default:
-		return nil, errors.Wrap(result.NewErrMsg("no such user type"), "")
+		return nil, errors.Wrap(result.NewErrMsg("用户不存在"), "")
 	}
 	}
 	return &pb.NotifyUserStatusResp{}, nil
 	return &pb.NotifyUserStatusResp{}, nil
 }
 }

+ 50 - 38
core/inner/rpc/internal/logic/notifyuseronlinelogic.go

@@ -32,63 +32,75 @@ func NewNotifyUserOnlineLogic(ctx context.Context, svcCtx *svc.ServiceContext) *
 
 
 func (l *NotifyUserOnlineLogic) NotifyUserOnline(in *pb.NotifyUserStatusReq) (*pb.NotifyUserStatusResp, error) {
 func (l *NotifyUserOnlineLogic) NotifyUserOnline(in *pb.NotifyUserStatusReq) (*pb.NotifyUserStatusResp, error) {
 	switch in.Type {
 	switch in.Type {
-	case globalkey.ConnectTypePlayer:
+	case globalkey.ConnectTypeNormalPlayer:
 		// 修改玩家在线状态
 		// 修改玩家在线状态
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
 			// 有则取出玩家的map
 			// 有则取出玩家的map
 			onlinePlayerMap := ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
 			onlinePlayerMap := ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
 			if onlinePlayerMap.Contains(in.Uid) {
 			if onlinePlayerMap.Contains(in.Uid) {
-				l.Logger.Error("such player has been connected")
+				l.Logger.Error("该玩家已在线")
 			} else {
 			} else {
-				// 不存在换这个玩家,判断是否vip
-				if playerInfo := ext.GetVipPlayer(in.GameId, in.Uid); playerInfo != nil {
-					playerInfo.ConnectTs = time.Now().Unix()
-					onlinePlayerMap.Insert(in.Uid, playerInfo)
-				} else {
-					// 不是vip
-					ts := time.Now().Unix()
-					playerInfo := model.PlayerInfo{
-						GameId:    in.GameId,
-						PlayerId:  in.Uid,
-						ConnectTs: ts,
-						EnqueueTs: ts,
-					}
-					onlinePlayerMap.Insert(in.Uid, &playerInfo)
-					// 放入等待队列
-					ext.WaitingList.PushBack(&playerInfo)
-					l.Logger.Infof("enqueue waiting list: %s", ext.WaitingList.String())
+				ts := time.Now().Unix()
+				playerInfo := &model.PlayerInfo{
+					GameId:     in.GameId,
+					PlayerId:   in.Uid,
+					IsVip:      0,
+					CsId:       "",
+					ConnectTs:  ts,
+					LastChatTs: 0,
+					EnqueueTs:  ts,
+					DequeueTs:  0,
 				}
 				}
+				//if playerInfo == nil {
+				//	l.Logger.Infof("playerInfo is nil")
+				//}
+				onlinePlayerMap.Insert(in.Uid, playerInfo)
+				// 放入等待队列
+				ext.WaitingQueue.Insert(in.GameId+"_"+in.Uid, playerInfo)
+				l.Logger.Infof("enqueue waiting list: %s", ext.WaitingQueue)
+				//TODO 返回等待信息
 			}
 			}
 		} else {
 		} else {
 			onlinePlayerMap := treemap.New(treemap.WithGoroutineSafe())
 			onlinePlayerMap := treemap.New(treemap.WithGoroutineSafe())
-			// 判断是不是vip玩家
-			if playerInfo := ext.GetVipPlayer(in.GameId, in.Uid); playerInfo != nil {
-				playerInfo.ConnectTs = time.Now().Unix()
-				onlinePlayerMap.Insert(in.Uid, playerInfo)
-			} else {
-				// 不是vip
-				ts := time.Now().Unix()
-				playerInfo := model.PlayerInfo{
-					GameId:    in.GameId,
-					PlayerId:  in.Uid,
-					ConnectTs: ts,
-					EnqueueTs: ts,
-				}
-				onlinePlayerMap.Insert(in.Uid, &playerInfo)
-				// 放入等待队列
-				ext.WaitingList.PushBack(&playerInfo)
-				l.Logger.Infof("enqueue waiting list: %s", ext.WaitingList.String())
+			ts := time.Now().Unix()
+			playerInfo := &model.PlayerInfo{
+				GameId:    in.GameId,
+				PlayerId:  in.Uid,
+				ConnectTs: ts,
+				EnqueueTs: ts,
 			}
 			}
+			onlinePlayerMap.Insert(in.Uid, playerInfo)
+			// 放入等待队列
+			ext.WaitingQueue.Insert(in.GameId+"_"+in.Uid, playerInfo)
+			l.Logger.Infof("enqueue waiting list: %s", ext.WaitingQueue)
 			ext.GameOnlinePlayerMap.Insert(in.GameId, onlinePlayerMap)
 			ext.GameOnlinePlayerMap.Insert(in.GameId, onlinePlayerMap)
+			//TODO 返回等待信息
+
+		}
+	case globalkey.ConnectTypeVipPlayer:
+		var onlinePlayerMap *treemap.Map
+		if ext.GameOnlinePlayerMap.Contains(in.GameId) {
+			onlinePlayerMap = ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
+		} else {
+			onlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
+		}
+
+		if playerInfo := ext.GetVipPlayer(in.GameId, in.Uid); playerInfo != nil {
+			playerInfo.ConnectTs = time.Now().Unix()
+			onlinePlayerMap.Insert(in.Uid, playerInfo)
+			ext.GameOnlinePlayerMap.Insert(in.GameId, onlinePlayerMap)
+		} else {
+			return nil, errors.Wrap(result.NewErrMsg("用户不存在"), "")
 		}
 		}
 	case globalkey.ConnectTypeCs:
 	case globalkey.ConnectTypeCs:
 		if csInfo := ext.GetCsInfo(in.Uid); csInfo != nil {
 		if csInfo := ext.GetCsInfo(in.Uid); csInfo != nil {
 			csInfo.OnlineStatus = 1
 			csInfo.OnlineStatus = 1
+			//TODO 返回等待信息
 		} else {
 		} else {
-			return nil, errors.Wrap(result.NewErrMsg("no such user"), "")
+			return nil, errors.Wrap(result.NewErrMsg("用户不存在"), "")
 		}
 		}
 	default:
 	default:
-		return nil, errors.Wrap(result.NewErrMsg("no such user type"), "")
+		return nil, errors.Wrap(result.NewErrMsg("用户不存在"), "")
 	}
 	}
 	return &pb.NotifyUserStatusResp{}, nil
 	return &pb.NotifyUserStatusResp{}, nil
 }
 }

+ 2 - 3
core/inner/rpc/internal/logic/playerfetchcsinfologic.go

@@ -37,8 +37,7 @@ func (l *PlayerFetchCsInfoLogic) PlayerFetchCsInfo(in *pb.InnerPlayerFetchCsInfo
 				OnlineStatus: csInfo.OnlineStatus,
 				OnlineStatus: csInfo.OnlineStatus,
 			}, nil
 			}, nil
 		}
 		}
-		return nil, errors.Wrap(result.NewErrMsg("Customer service information does not exist"), "")
+		return nil, errors.Wrap(result.NewErrMsg("查询的客服不存在"), "")
 	}
 	}
-	return nil, errors.Wrap(result.NewErrMsg("The player is not connected"), "")
-
+	return nil, errors.Wrap(result.NewErrMsg("玩家未连接"), "")
 }
 }

+ 23 - 13
core/inner/rpc/internal/svc/servicecontext.go

@@ -5,7 +5,6 @@ import (
 	"github.com/Shopify/sarama"
 	"github.com/Shopify/sarama"
 	"github.com/bytedance/sonic"
 	"github.com/bytedance/sonic"
 	"github.com/gookit/event"
 	"github.com/gookit/event"
-	"github.com/liyue201/gostl/ds/list/simplelist"
 	treemap "github.com/liyue201/gostl/ds/map"
 	treemap "github.com/liyue201/gostl/ds/map"
 	"github.com/robfig/cron/v3"
 	"github.com/robfig/cron/v3"
 	"github.com/zeromicro/go-zero/core/logx"
 	"github.com/zeromicro/go-zero/core/logx"
@@ -69,37 +68,48 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 		return
 		return
 	}
 	}
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
 		var message model.KqMessage
 		var message model.KqMessage
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
 			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
 			return
 			return
 		}
 		}
-		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
+		if message.Opt != model.CMD_SEND_MESSAGE {
+			// 指令异常
+			return
+		}
+
+		var chatMessage model.ChatMessage
+		if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
+			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+			return
+		}
+
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
-			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+			if len(chatMessage.ReceiverId) == 0 || chatMessage.ReceiverId == "" {
 				// receiverId为空代表这条消息是玩家发送的
 				// receiverId为空代表这条消息是玩家发送的
 				// 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
 				// 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
-				if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, message.Uid); playerInfo != nil {
-					message.ReceiverId = playerInfo.CsId
+				if playerInfo := ext.GetConnectedPlayerInfo(chatMessage.GameId, chatMessage.Uid); playerInfo != nil {
+					chatMessage.ReceiverId = playerInfo.CsId
 				} else {
 				} else {
-					if playerInfo := ext.GetVipPlayer(message.GameId, message.Uid); playerInfo != nil {
-						message.ReceiverId = playerInfo.CsId
+					if playerInfo := ext.GetVipPlayer(chatMessage.GameId, chatMessage.Uid); playerInfo != nil {
+						chatMessage.ReceiverId = playerInfo.CsId
 					} else {
 					} else {
-						message.ReceiverId = ""
+						chatMessage.ReceiverId = ""
 					}
 					}
 				}
 				}
 
 
 				// 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
 				// 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
-				if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
-					logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
+				if len(chatMessage.ReceiverId) != 0 && chatMessage.ReceiverId != "" {
+					logx.WithContext(ctx).Infof("receiver: %s", chatMessage.ReceiverId)
 					kMsg, _ := sonic.MarshalString(message)
 					kMsg, _ := sonic.MarshalString(message)
-					s.KqMsgBoxProducer.SendMessage(ctx, kMsg, message.ReceiverId)
+					s.KqMsgBoxProducer.SendMessage(ctx, kMsg, chatMessage.ReceiverId)
 				} else {
 				} else {
 					logx.WithContext(ctx).Errorf("can not find receiver of the sender")
 					logx.WithContext(ctx).Errorf("can not find receiver of the sender")
 				}
 				}
 			} else {
 			} else {
 				// receiverId不为空代表这条消息是客服发的
 				// receiverId不为空代表这条消息是客服发的
-				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), chatMessage.ReceiverId)
 			}
 			}
 			sess.MarkMessage(msg, "")
 			sess.MarkMessage(msg, "")
 		}, attribute.String("msg.key", string(msg.Key)))
 		}, attribute.String("msg.key", string(msg.Key)))
@@ -129,7 +139,7 @@ func fetchCsCenterInfo(c config.Config) {
 	ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameVipMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameOnlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameOnlinePlayerMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameConnectedMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.GameConnectedMap = treemap.New(treemap.WithGoroutineSafe())
-	ext.WaitingList = simplelist.New()
+	ext.WaitingQueue = treemap.New(treemap.WithGoroutineSafe())
 	go loadMockInfo(c)
 	go loadMockInfo(c)
 }
 }
 
 

+ 9 - 3
core/inner/rpc/pb/inner.proto

@@ -52,11 +52,17 @@ message InnerCsConnectPlayerResp{}
 
 
 message NotifyUserStatusReq{
 message NotifyUserStatusReq{
   int32 type = 1;
   int32 type = 1;
-  string uid = 2;
-  string game_id = 3;
+  string game_id = 2;
+  string uid = 3;
 }
 }
 
 
-message NotifyUserStatusResp{}
+message NotifyUserStatusResp{
+  int32 type = 1;
+  int32 queue_size = 2;
+  string game_id = 3;
+  string uid = 4;
+  string cs_id = 5;
+}
 
 
 service Inner {
 service Inner {
   rpc playerFetchCsInfo (InnerPlayerFetchCsInfoReq) returns (InnerPlayerFetchCsInfoResp);
   rpc playerFetchCsInfo (InnerPlayerFetchCsInfoReq) returns (InnerPlayerFetchCsInfoResp);

+ 15 - 15
flowsrv/rpc/internal/logic/connectlogic.go

@@ -53,22 +53,22 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 	}
 	}
 
 
 	var flowId string
 	var flowId string
-	if in.Type == globalkey.ConnectTypePlayer {
-		flowId = gameId + "_" + uid
-	} else {
+	if in.Type == globalkey.ConnectTypeCs {
 		flowId = uid
 		flowId = uid
+	} else {
+		flowId = gameId + "_" + uid
 	}
 	}
 
 
 	flow := &model.Flow{
 	flow := &model.Flow{
-		EndFlow: make(chan int),
-		Message: make(chan string),
-		Stream:  stream,
-		SvcCtx:  l.svcCtx,
-		Logger:  l.Logger,
-		Type:    in.Type,
-		Uid:     uid,
-		GameId:  gameId,
-		FlowId:  flowId,
+		EndFlow:     make(chan int),
+		Message:     make(chan string),
+		Stream:      stream,
+		RedisClient: l.svcCtx.RedisClient,
+		InnerRpc:    l.svcCtx.InnerRpc,
+		Type:        in.Type,
+		Uid:         uid,
+		GameId:      gameId,
+		FlowId:      flowId,
 	}
 	}
 	defer func() {
 	defer func() {
 		close(flow.EndFlow)
 		close(flow.EndFlow)
@@ -91,11 +91,11 @@ func (l *ConnectLogic) checkAuth(in *pb.CommandReq) (string, string, error) {
 	if token.Valid {
 	if token.Valid {
 		//将获取的token中的Claims强转为MapClaims
 		//将获取的token中的Claims强转为MapClaims
 		claims, _ := token.Claims.(jwt.MapClaims)
 		claims, _ := token.Claims.(jwt.MapClaims)
-		if in.Type == globalkey.ConnectTypePlayer {
+		if in.Type == globalkey.ConnectTypeCs {
+			uid = claims[jwtkey.CsId].(string)
+		} else {
 			uid = claims[jwtkey.PlayerId].(string)
 			uid = claims[jwtkey.PlayerId].(string)
 			gameId = claims[jwtkey.GameId].(string)
 			gameId = claims[jwtkey.GameId].(string)
-		} else {
-			uid = claims[jwtkey.CsId].(string)
 		}
 		}
 		return uid, gameId, nil
 		return uid, gameId, nil
 	} else if ve, ok := err.(*jwt.ValidationError); ok {
 	} else if ve, ok := err.(*jwt.ValidationError); ok {

+ 8 - 4
flowsrv/rpc/internal/mgr/flowmgr.go

@@ -48,7 +48,7 @@ func (manager *flowManager) registerFlow(flow *model.Flow) {
 		select {
 		select {
 		case <-flow.Stream.Context().Done():
 		case <-flow.Stream.Context().Done():
 			if manager.Has(flow.FlowId) {
 			if manager.Has(flow.FlowId) {
-				flow.Logger.Infof("stream was disconnected abnormally")
+				logx.Info("stream was disconnected abnormally")
 				manager.UnRegister(flow.FlowId)
 				manager.UnRegister(flow.FlowId)
 				manager.handleUserOffline(flow)
 				manager.handleUserOffline(flow)
 			}
 			}
@@ -62,7 +62,7 @@ func (manager *flowManager) registerFlow(flow *model.Flow) {
 					Data: []byte(msg),
 					Data: []byte(msg),
 				})
 				})
 			} else {
 			} else {
-				flow.Logger.Error("message channel is close")
+				logx.Error("message channel is close")
 				return
 				return
 			}
 			}
 		}
 		}
@@ -78,7 +78,7 @@ func (manager *flowManager) subscribeRmq(flow *model.Flow) {
 				logx.WithContext(ctx).Infof("unsubscribe rmq...")
 				logx.WithContext(ctx).Infof("unsubscribe rmq...")
 				return
 				return
 			default:
 			default:
-				resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
+				resultCmd := flow.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
 				if message, err := resultCmd.Result(); err != nil {
 				if message, err := resultCmd.Result(); err != nil {
 					logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
 					logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
 				} else {
 				} else {
@@ -95,7 +95,7 @@ func (manager *flowManager) handleUserOffline(flow *model.Flow) {
 	traceId := ctxdata.GetTraceIdFromCtx(flow.Stream.Context())
 	traceId := ctxdata.GetTraceIdFromCtx(flow.Stream.Context())
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 		trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleUserOffline", func(ctx context.Context) {
 		trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleUserOffline", func(ctx context.Context) {
-			_, err := flow.SvcCtx.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
+			_, err := flow.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
 				Type:   flow.Type,
 				Type:   flow.Type,
 				Uid:    flow.Uid,
 				Uid:    flow.Uid,
 				GameId: flow.GameId,
 				GameId: flow.GameId,
@@ -107,6 +107,10 @@ func (manager *flowManager) handleUserOffline(flow *model.Flow) {
 	})
 	})
 }
 }
 
 
+func (manager *flowManager) All() *treemap.Map {
+	return manager.flowMap
+}
+
 func (manager *flowManager) Get(flowId string) *model.Flow {
 func (manager *flowManager) Get(flowId string) *model.Flow {
 	return manager.flowMap.Get(flowId).(*model.Flow)
 	return manager.flowMap.Get(flowId).(*model.Flow)
 }
 }

+ 11 - 11
flowsrv/rpc/internal/model/bean.go

@@ -5,19 +5,19 @@
 package model
 package model
 
 
 import (
 import (
-	"github.com/zeromicro/go-zero/core/logx"
-	"ylink/flowsrv/rpc/internal/svc"
+	"github.com/go-redis/redis/v8"
+	"ylink/core/inner/rpc/inner"
 	"ylink/flowsrv/rpc/pb"
 	"ylink/flowsrv/rpc/pb"
 )
 )
 
 
 type Flow struct {
 type Flow struct {
-	EndFlow chan int
-	Message chan string
-	SvcCtx  *svc.ServiceContext
-	Logger  logx.Logger
-	Stream  pb.Flowsrv_ConnectServer
-	Type    int32
-	Uid     string
-	GameId  string
-	FlowId  string
+	EndFlow     chan int
+	Message     chan string
+	Stream      pb.Flowsrv_ConnectServer
+	RedisClient *redis.Client
+	InnerRpc    inner.Inner
+	Type        int32
+	Uid         string
+	GameId      string
+	FlowId      string
 }
 }

+ 97 - 16
flowsrv/rpc/internal/svc/servicecontext.go

@@ -5,7 +5,6 @@ import (
 	"github.com/Shopify/sarama"
 	"github.com/Shopify/sarama"
 	"github.com/bytedance/sonic"
 	"github.com/bytedance/sonic"
 	"github.com/go-redis/redis/v8"
 	"github.com/go-redis/redis/v8"
-	"github.com/gookit/event"
 	"github.com/zeromicro/go-zero/core/logx"
 	"github.com/zeromicro/go-zero/core/logx"
 	gozerotrace "github.com/zeromicro/go-zero/core/trace"
 	gozerotrace "github.com/zeromicro/go-zero/core/trace"
 	"github.com/zeromicro/go-zero/zrpc"
 	"github.com/zeromicro/go-zero/zrpc"
@@ -17,10 +16,15 @@ import (
 	"ylink/comm/globalkey"
 	"ylink/comm/globalkey"
 	"ylink/comm/kafka"
 	"ylink/comm/kafka"
 	"ylink/comm/model"
 	"ylink/comm/model"
+	"ylink/comm/result"
 	"ylink/comm/trace"
 	"ylink/comm/trace"
 	"ylink/comm/utils"
 	"ylink/comm/utils"
 	"ylink/core/inner/rpc/inner"
 	"ylink/core/inner/rpc/inner"
+	"ylink/flowsrv/rpc/flowsrv"
 	"ylink/flowsrv/rpc/internal/config"
 	"ylink/flowsrv/rpc/internal/config"
+	"ylink/flowsrv/rpc/internal/mgr"
+	model2 "ylink/flowsrv/rpc/internal/model"
+	//model2 "ylink/flowsrv/rpc/internal/model"
 )
 )
 
 
 type ServiceContext struct {
 type ServiceContext struct {
@@ -100,15 +104,27 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 		return
 		return
 	}
 	}
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
 		var message model.KqMessage
 		var message model.KqMessage
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 			logx.Errorf("unmarshal msg error: %v", err)
 			logx.Errorf("unmarshal msg error: %v", err)
 			return
 			return
 		}
 		}
-		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
+
+		if message.Opt != model.CMD_SEND_MESSAGE {
+			// 指令异常
+			return
+		}
+
+		var chatMessage model.ChatMessage
+		if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
+			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+			return
+		}
+
 		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
 		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
 			// 投递到receiver_id对应的redis队列暂存
 			// 投递到receiver_id对应的redis队列暂存
-			intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
+			intCmd := s.RedisClient.LPush(ctx, chatMessage.ReceiverId, string(msg.Value))
 			if size, err := intCmd.Result(); err != nil {
 			if size, err := intCmd.Result(); err != nil {
 				logx.WithContext(ctx).Errorf("push message rmq err %v", err)
 				logx.WithContext(ctx).Errorf("push message rmq err %v", err)
 			} else {
 			} else {
@@ -125,23 +141,56 @@ func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sa
 		return
 		return
 	}
 	}
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
 	trace.RunOnTracing(traceId, func(ctx context.Context) {
-		var message model.KqCmdMessage
+		logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
+		var message model.KqMessage
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
 			logx.Errorf("unmarshal msg error: %v", err)
 			logx.Errorf("unmarshal msg error: %v", err)
 			return
 			return
 		}
 		}
-		logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
-		trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushMessage", func(ctx context.Context) {
-			// 投递到receiver_id对应的redis队列暂存
-			intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
-			if size, err := intCmd.Result(); err != nil {
-				logx.WithContext(ctx).Errorf("push message rmq err %v", err)
-			} else {
-				logx.WithContext(ctx).Infof("current rmq size: %d", size)
+
+		if message.Opt == model.CMD_SEND_MESSAGE {
+			// 指令异常
+			return
+		}
+
+		var cmdMessage model.CommandMessage
+		if err := sonic.Unmarshal([]byte(message.Payload), &cmdMessage); err != nil {
+			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+			return
+		}
+		// 投递到receiver_id对应的redis队列暂存
+		switch cmdMessage.ReceiverId {
+		case globalkey.All:
+		case globalkey.AllPlayer:
+		case globalkey.AllVipPlayer:
+		case globalkey.AllNormalPlayer:
+			for iter := mgr.GetFlowMgrInstance().All().Begin(); iter.IsValid(); iter.Next() {
+				flow := iter.Value().(*model2.Flow)
+				if flow.Type != globalkey.ConnectTypeNormalPlayer {
+					continue
+				}
+				err := flow.Stream.Send(&flowsrv.CommandResp{
+					Code: result.Ok,
+					Msg:  "success",
+					Data: msg.Value,
+				})
+				if err != nil {
+					logx.WithContext(ctx).Errorf("%v", err)
+				}
 			}
 			}
+		case globalkey.AllCs:
+		default:
+			trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
+				intCmd := s.RedisClient.LPush(ctx, cmdMessage.ReceiverId, string(msg.Value))
+				if size, err := intCmd.Result(); err != nil {
+					logx.WithContext(ctx).Errorf("push message rmq err %v", err)
+				} else {
+					logx.WithContext(ctx).Infof("current rmq size: %d", size)
+				}
 
 
+			}, attribute.String("msg.key", string(msg.Key)))
 			sess.MarkMessage(msg, "")
 			sess.MarkMessage(msg, "")
-		}, attribute.String("msg.key", string(msg.Key)))
+		}
 	})
 	})
 }
 }
 
 
@@ -149,9 +198,41 @@ func (s *ServiceContext) subscribe() {
 	go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
 	go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
 	go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
 	go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
 
 
+	//event.On(globalkey.EventHandleRmqJob, event.ListenerFunc(func(e event.Event) error {
+	//	resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
+	//	if message, err := resultCmd.Result(); err != nil {
+	//		logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
+	//	} else {
+	//		trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
+	//			flow.Message <- message[1]
+	//		})
+	//	}
+	//	return nil
+	//}), event.High)
+
 	// 注册事件
 	// 注册事件
-	event.On(globalkey.EventUnsubscribeRmq, event.ListenerFunc(func(e event.Event) error {
+	//event.On(globalkey.EventUnsubscribeRmqJob, event.ListenerFunc(func(e event.Event) error {
+	//
+	//	return nil
+	//}), event.High)
 
 
-		return nil
-	}), event.High)
+	//event.On(globalkey.EventNotifyUserOfflineJob, event.ListenerFunc(func(e event.Event) error {
+	//	traceId := e.Get("trace_id").(string)
+	//	uType := e.Get("type").(int32)
+	//	uid := e.Get("uid").(string)
+	//	gameId := e.Get("game_id").(string)
+	//	trace.RunOnTracing(traceId, func(ctx context.Context) {
+	//		trace.StartTrace(ctx, "FlowsrvServer.EventNotifyUserOfflineJob.handleUserOffline", func(ctx context.Context) {
+	//			_, err := s.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
+	//				Type:   uType,
+	//				Uid:    uid,
+	//				GameId: gameId,
+	//			})
+	//			if err != nil {
+	//				logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
+	//			}
+	//		})
+	//	})
+	//	return nil
+	//}), event.High)
 }
 }