소스 검색

v0.0.1开发:调整KqMessage结构

#Suyghur 2 년 전
부모
커밋
e545ae608c

+ 9 - 11
comm/model/message.go

@@ -11,23 +11,21 @@ const (
 )
 
 type KqMessage struct {
-	Opt     int32  `json:"opt"`
-	Payload string `json:"payload"`
-	Ext     string `json:"ext"`
+	Opt        int32  `json:"opt"`
+	CreateTs   int64  `json:"create_ts"`
+	Payload    string `json:"payload"`
+	ReceiverId string `json:"receiver_id"`
+	SenderId   string `json:"sender_id"`
+	GameId     string `json:"game_id"`
+	Uid        string `json:"uid"`
+	Ext        string `json:"ext"`
 }
 type ChatMessage struct {
 	CreateTime string `json:"create_time"`
 	Content    string `json:"content"`
 	Pic        string `json:"pic"`
-	ReceiverId string `json:"receiver_id"`
-	SenderId   string `json:"sender_id"`
-	GameId     string `json:"game_id"`
-	Uid        string `json:"uid"`
 }
 
 type CommandMessage struct {
-	Payload    string `json:"payload"`
-	ReceiverId string `json:"receiver_id"`
-	GameId     string `json:"game_id"`
-	Uid        string `json:"uid"`
+	CmdInfo interface{} `json:"cmd_info"`
 }

+ 12 - 10
core/cmd/rpc/internal/logic/cssendmsglogic.go

@@ -28,22 +28,24 @@ func NewCsSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CsSendM
 
 func (l *CsSendMsgLogic) CsSendMsg(in *pb.CsSendMsgReq) (*pb.CsSendMsgResp, error) {
 	// 投递到自己的发件箱
-	message := &model.ChatMessage{
-		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
+	uniqueId := in.GameId + "_" + in.PlayerId
+	t := time.Now()
+	payload, _ := sonic.MarshalString(&model.ChatMessage{
+		CreateTime: t.Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Pic:        in.Pic,
-		ReceiverId: in.GameId + "_" + in.PlayerId,
+	})
+	kMsg, _ := sonic.MarshalString(&model.KqMessage{
+		Opt:        model.CMD_SEND_MESSAGE,
+		CreateTs:   t.Unix(),
+		Payload:    payload,
 		SenderId:   in.CsId,
+		ReceiverId: uniqueId,
 		GameId:     in.GameId,
 		Uid:        in.CsId,
-	}
-	payload, _ := sonic.MarshalString(message)
-	kMsg, _ := sonic.MarshalString(&model.KqMessage{
-		Opt:     model.CMD_SEND_MESSAGE,
-		Payload: payload,
-		Ext:     "",
+		Ext:        "",
 	})
-	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
+	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, in.CsId)
 	if err != nil {
 		return nil, err
 	}

+ 13 - 11
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -27,21 +27,23 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 	// 投递到自己的发件箱
-	message := &model.ChatMessage{
-		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
+	uniqueId := in.GameId + "_" + in.PlayerId
+	t := time.Now()
+	payload, _ := sonic.MarshalString(&model.ChatMessage{
+		CreateTime: t.Format("2006-01-02 15:04:05"),
 		Content:    in.Content,
 		Pic:        in.Pic,
-		SenderId:   in.GameId + "_" + in.PlayerId,
-		GameId:     in.GameId,
-		Uid:        in.PlayerId,
-	}
-	payload, _ := sonic.MarshalString(message)
+	})
 	kMsg, _ := sonic.MarshalString(&model.KqMessage{
-		Opt:     model.CMD_SEND_MESSAGE,
-		Payload: payload,
-		Ext:     "",
+		Opt:      model.CMD_SEND_MESSAGE,
+		CreateTs: t.Unix(),
+		Payload:  payload,
+		SenderId: uniqueId,
+		GameId:   in.GameId,
+		Uid:      in.PlayerId,
+		Ext:      "",
 	})
-	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, message.SenderId)
+	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, kMsg, uniqueId)
 	if err != nil {
 		return nil, err
 	}

+ 28 - 25
core/inner/rpc/internal/logic/csconnectplayerlogic.go

@@ -53,27 +53,28 @@ func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (
 	}
 
 	// 移除WaitingQueue
-	key := in.GameId + "_" + in.PlayerId
-	if ext.WaitingQueue.Contains(key) {
+	uniqueId := in.GameId + "_" + in.PlayerId
+	if ext.WaitingQueue.Contains(uniqueId) {
 		l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.PlayerId)
-		ext.WaitingQueue.Erase(key)
-		//TODO 广播客户端更新等待队列信息
-		queueInfo, _ := sonic.MarshalString(map[string]interface{}{
-			"queue_size": ext.WaitingQueue.Size(),
+		ext.WaitingQueue.Erase(uniqueId)
+
+		// 广播客户端更新等待队列信息
+		payload, _ := sonic.MarshalString(&model.CommandMessage{
+			CmdInfo: map[string]interface{}{
+				"queue_size": ext.WaitingQueue.Size(),
+			},
 		})
-		cmdMessage := &model.CommandMessage{
-			Payload:    queueInfo,
+		kMsg, _ := sonic.MarshalString(&model.KqMessage{
+			Opt:        model.CMD_UPDATE_WAITING_QUEUE,
+			CreateTs:   time.Now().Unix(),
+			Payload:    payload,
+			SenderId:   uniqueId,
 			ReceiverId: globalkey.AllNormalPlayer,
 			GameId:     in.GameId,
 			Uid:        in.PlayerId,
-		}
-		payload, _ := sonic.MarshalString(cmdMessage)
-		message, _ := sonic.MarshalString(&model.KqMessage{
-			Opt:     model.CMD_UPDATE_WAITING_QUEUE,
-			Payload: payload,
-			Ext:     "",
+			Ext:        "",
 		})
-		l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, message, globalkey.AllNormalPlayer)
+		l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, kMsg, globalkey.AllNormalPlayer)
 	}
 
 	traceId := ctxdata.GetTraceIdFromCtx(l.ctx)
@@ -93,19 +94,21 @@ func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (
 
 				trace.StartTrace(ctx, "InnerServer.CountDownTimer.SendCmdMessage", func(ctx context.Context) {
 					// 发踢下线的command指令
-					cmdMessage := &model.CommandMessage{
-						Payload:    "",
-						ReceiverId: in.GameId + "_" + in.PlayerId,
+					uniqueId := in.GameId + "_" + in.PlayerId
+					payload, _ := sonic.MarshalString(&model.CommandMessage{
+						CmdInfo: "",
+					})
+					kMsg, _ := sonic.MarshalString(&model.KqMessage{
+						Opt:        model.CMD_CHAT_TIMEOUT,
+						CreateTs:   time.Now().Unix(),
+						Payload:    payload,
+						SenderId:   uniqueId,
+						ReceiverId: uniqueId,
 						GameId:     in.GameId,
 						Uid:        in.PlayerId,
-					}
-					payload, _ := sonic.MarshalString(cmdMessage)
-					message, _ := sonic.MarshalString(&model.KqMessage{
-						Opt:     model.CMD_CHAT_TIMEOUT,
-						Payload: payload,
-						Ext:     "",
+						Ext:        "",
 					})
-					l.svcCtx.KqCmdBoxProducer.SendMessage(ctx, message, in.GameId+"_"+in.PlayerId)
+					l.svcCtx.KqCmdBoxProducer.SendMessage(ctx, kMsg, uniqueId)
 				})
 			}
 		})

+ 23 - 23
core/inner/rpc/internal/logic/notifyuserofflinelogic.go

@@ -2,9 +2,12 @@ package logic
 
 import (
 	"context"
+	"github.com/bytedance/sonic"
 	treemap "github.com/liyue201/gostl/ds/map"
 	"github.com/pkg/errors"
+	"time"
 	"ylink/comm/globalkey"
+	"ylink/comm/model"
 	"ylink/comm/result"
 	"ylink/core/inner/rpc/internal/ext"
 
@@ -43,32 +46,29 @@ func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (
 			}
 		}
 
-		key := in.GameId + "_" + in.Uid
-		if ext.WaitingQueue.Contains(key) {
+		uniqueId := in.GameId + "_" + in.Uid
+		if ext.WaitingQueue.Contains(uniqueId) {
 			l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
-			ext.WaitingQueue.Erase(key)
+			ext.WaitingQueue.Erase(uniqueId)
 
+			// 广播客户端更新等待队列信息
+			payload, _ := sonic.MarshalString(&model.CommandMessage{
+				CmdInfo: map[string]interface{}{
+					"queue_size": ext.WaitingQueue.Size(),
+				},
+			})
+			kMsg, _ := sonic.MarshalString(&model.KqMessage{
+				Opt:        model.CMD_UPDATE_WAITING_QUEUE,
+				CreateTs:   time.Now().Unix(),
+				Payload:    payload,
+				SenderId:   uniqueId,
+				ReceiverId: globalkey.AllNormalPlayer,
+				GameId:     in.GameId,
+				Uid:        in.Uid,
+				Ext:        "",
+			})
+			l.svcCtx.KqCmdBoxProducer.SendMessage(l.ctx, kMsg, globalkey.AllNormalPlayer)
 		}
-		//for n := ext.WaitingList.FrontNode(); n != nil; n = n.Next() {
-		//	info := n.Value.(*model.PlayerInfo)
-		//	l.Logger.Infof("playerInfo: %v", info)
-		//	if info.GameId == in.GameId && info.PlayerId == in.Uid {
-		//		l.Logger.Infof("remove the player from the queue, game_id: %s, player_id: %s", in.GameId, in.Uid)
-		//		ext.WaitingList.Remove(nil, n)
-		//		//TODO 广播客户端更新等待队列信息
-		//		//queueInfo, _ := sonic.MarshalString(map[string]interface{}{
-		//		//	"queue_size": 10,
-		//		//})
-		//		//message, _ := sonic.MarshalString(&model.KqCmdMessage{
-		//		//	Opt:        model.CMD_UPDATE_WAITING_QUEUE,
-		//		//	ReceiverId: "all",
-		//		//	GameId:     in.GameId,
-		//		//	Uid:        in.Uid,
-		//		//	Ext:        queueInfo,
-		//		//})
-		//		break
-		//	}
-		//}
 	case globalkey.ConnectTypeVipPlayer:
 		// 修改玩家在线状态
 		if ext.GameOnlinePlayerMap.Contains(in.GameId) {

+ 11 - 16
core/inner/rpc/internal/svc/servicecontext.go

@@ -74,42 +74,37 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
 			return
 		}
+
 		if message.Opt != model.CMD_SEND_MESSAGE {
 			// 指令异常
 			return
 		}
 
-		var chatMessage model.ChatMessage
-		if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
-			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
-			return
-		}
-
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
-			if len(chatMessage.ReceiverId) == 0 || chatMessage.ReceiverId == "" {
+			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
 				// receiverId为空代表这条消息是玩家发送的
 				// 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
-				if playerInfo := ext.GetConnectedPlayerInfo(chatMessage.GameId, chatMessage.Uid); playerInfo != nil {
-					chatMessage.ReceiverId = playerInfo.CsId
+				if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, message.Uid); playerInfo != nil {
+					message.ReceiverId = playerInfo.CsId
 				} else {
-					if playerInfo := ext.GetVipPlayer(chatMessage.GameId, chatMessage.Uid); playerInfo != nil {
-						chatMessage.ReceiverId = playerInfo.CsId
+					if playerInfo := ext.GetVipPlayer(message.GameId, message.Uid); playerInfo != nil {
+						message.ReceiverId = playerInfo.CsId
 					} else {
-						chatMessage.ReceiverId = ""
+						message.ReceiverId = ""
 					}
 				}
 
 				// 经过填补后receiver_id还是空的则有异常,丢弃信息不投递
-				if len(chatMessage.ReceiverId) != 0 && chatMessage.ReceiverId != "" {
-					logx.WithContext(ctx).Infof("receiver: %s", chatMessage.ReceiverId)
+				if len(message.ReceiverId) != 0 && message.ReceiverId != "" {
+					logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
 					kMsg, _ := sonic.MarshalString(message)
-					s.KqMsgBoxProducer.SendMessage(ctx, kMsg, chatMessage.ReceiverId)
+					s.KqMsgBoxProducer.SendMessage(ctx, kMsg, message.ReceiverId)
 				} else {
 					logx.WithContext(ctx).Errorf("can not find receiver of the sender")
 				}
 			} else {
 				// receiverId不为空代表这条消息是客服发的
-				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), chatMessage.ReceiverId)
+				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
 			}
 			sess.MarkMessage(msg, "")
 		}, attribute.String("msg.key", string(msg.Key)))

+ 3 - 55
flowsrv/rpc/internal/svc/servicecontext.go

@@ -24,7 +24,6 @@ import (
 	"ylink/flowsrv/rpc/internal/config"
 	"ylink/flowsrv/rpc/internal/mgr"
 	model2 "ylink/flowsrv/rpc/internal/model"
-	//model2 "ylink/flowsrv/rpc/internal/model"
 )
 
 type ServiceContext struct {
@@ -77,10 +76,8 @@ func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sa
 	for msg := range claim.Messages() {
 
 		if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
-			logx.Info("handleMessage")
 			s.handleMessage(sess, msg)
 		} else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
-			logx.Info("handleCommand")
 			s.handleCommand(sess, msg)
 		}
 	}
@@ -116,15 +113,9 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 			return
 		}
 
-		var chatMessage model.ChatMessage
-		if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
-			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
-			return
-		}
-
 		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
 			// 投递到receiver_id对应的redis队列暂存
-			intCmd := s.RedisClient.LPush(ctx, chatMessage.ReceiverId, string(msg.Value))
+			intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
 			if size, err := intCmd.Result(); err != nil {
 				logx.WithContext(ctx).Errorf("push message rmq err %v", err)
 			} else {
@@ -153,13 +144,8 @@ func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sa
 			return
 		}
 
-		var cmdMessage model.CommandMessage
-		if err := sonic.Unmarshal([]byte(message.Payload), &cmdMessage); err != nil {
-			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
-			return
-		}
 		// 投递到receiver_id对应的redis队列暂存
-		switch cmdMessage.ReceiverId {
+		switch message.ReceiverId {
 		case globalkey.All:
 		case globalkey.AllPlayer:
 		case globalkey.AllVipPlayer:
@@ -181,7 +167,7 @@ func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sa
 		case globalkey.AllCs:
 		default:
 			trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
-				intCmd := s.RedisClient.LPush(ctx, cmdMessage.ReceiverId, string(msg.Value))
+				intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
 				if size, err := intCmd.Result(); err != nil {
 					logx.WithContext(ctx).Errorf("push message rmq err %v", err)
 				} else {
@@ -197,42 +183,4 @@ func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sa
 func (s *ServiceContext) subscribe() {
 	go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
 	go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
-
-	//event.On(globalkey.EventHandleRmqJob, event.ListenerFunc(func(e event.Event) error {
-	//	resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
-	//	if message, err := resultCmd.Result(); err != nil {
-	//		logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
-	//	} else {
-	//		trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
-	//			flow.Message <- message[1]
-	//		})
-	//	}
-	//	return nil
-	//}), event.High)
-
-	// 注册事件
-	//event.On(globalkey.EventUnsubscribeRmqJob, event.ListenerFunc(func(e event.Event) error {
-	//
-	//	return nil
-	//}), event.High)
-
-	//event.On(globalkey.EventNotifyUserOfflineJob, event.ListenerFunc(func(e event.Event) error {
-	//	traceId := e.Get("trace_id").(string)
-	//	uType := e.Get("type").(int32)
-	//	uid := e.Get("uid").(string)
-	//	gameId := e.Get("game_id").(string)
-	//	trace.RunOnTracing(traceId, func(ctx context.Context) {
-	//		trace.StartTrace(ctx, "FlowsrvServer.EventNotifyUserOfflineJob.handleUserOffline", func(ctx context.Context) {
-	//			_, err := s.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
-	//				Type:   uType,
-	//				Uid:    uid,
-	//				GameId: gameId,
-	//			})
-	//			if err != nil {
-	//				logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
-	//			}
-	//		})
-	//	})
-	//	return nil
-	//}), event.High)
 }