|
@@ -5,7 +5,6 @@ import (
|
|
|
"github.com/Shopify/sarama"
|
|
|
"github.com/bytedance/sonic"
|
|
|
"github.com/go-redis/redis/v8"
|
|
|
- "github.com/gookit/event"
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
gozerotrace "github.com/zeromicro/go-zero/core/trace"
|
|
|
"github.com/zeromicro/go-zero/zrpc"
|
|
@@ -17,10 +16,15 @@ import (
|
|
|
"ylink/comm/globalkey"
|
|
|
"ylink/comm/kafka"
|
|
|
"ylink/comm/model"
|
|
|
+ "ylink/comm/result"
|
|
|
"ylink/comm/trace"
|
|
|
"ylink/comm/utils"
|
|
|
"ylink/core/inner/rpc/inner"
|
|
|
+ "ylink/flowsrv/rpc/flowsrv"
|
|
|
"ylink/flowsrv/rpc/internal/config"
|
|
|
+ "ylink/flowsrv/rpc/internal/mgr"
|
|
|
+ model2 "ylink/flowsrv/rpc/internal/model"
|
|
|
+ //model2 "ylink/flowsrv/rpc/internal/model"
|
|
|
)
|
|
|
|
|
|
type ServiceContext struct {
|
|
@@ -100,15 +104,27 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
|
|
|
return
|
|
|
}
|
|
|
trace.RunOnTracing(traceId, func(ctx context.Context) {
|
|
|
+ logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
|
|
|
var message model.KqMessage
|
|
|
if err := sonic.Unmarshal(msg.Value, &message); err != nil {
|
|
|
logx.Errorf("unmarshal msg error: %v", err)
|
|
|
return
|
|
|
}
|
|
|
- logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
|
|
|
+
|
|
|
+ if message.Opt != model.CMD_SEND_MESSAGE {
|
|
|
+ // 指令异常
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var chatMessage model.ChatMessage
|
|
|
+ if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
|
|
|
+ logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
|
|
|
// 投递到receiver_id对应的redis队列暂存
|
|
|
- intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
|
|
|
+ intCmd := s.RedisClient.LPush(ctx, chatMessage.ReceiverId, string(msg.Value))
|
|
|
if size, err := intCmd.Result(); err != nil {
|
|
|
logx.WithContext(ctx).Errorf("push message rmq err %v", err)
|
|
|
} else {
|
|
@@ -125,23 +141,56 @@ func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sa
|
|
|
return
|
|
|
}
|
|
|
trace.RunOnTracing(traceId, func(ctx context.Context) {
|
|
|
- var message model.KqCmdMessage
|
|
|
+ logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
|
|
|
+ var message model.KqMessage
|
|
|
if err := sonic.Unmarshal(msg.Value, &message); err != nil {
|
|
|
logx.Errorf("unmarshal msg error: %v", err)
|
|
|
return
|
|
|
}
|
|
|
- logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
|
|
|
- trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushMessage", func(ctx context.Context) {
|
|
|
- // 投递到receiver_id对应的redis队列暂存
|
|
|
- intCmd := s.RedisClient.LPush(ctx, message.ReceiverId, string(msg.Value))
|
|
|
- if size, err := intCmd.Result(); err != nil {
|
|
|
- logx.WithContext(ctx).Errorf("push message rmq err %v", err)
|
|
|
- } else {
|
|
|
- logx.WithContext(ctx).Infof("current rmq size: %d", size)
|
|
|
+
|
|
|
+ if message.Opt == model.CMD_SEND_MESSAGE {
|
|
|
+ // 指令异常
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ var cmdMessage model.CommandMessage
|
|
|
+ if err := sonic.Unmarshal([]byte(message.Payload), &cmdMessage); err != nil {
|
|
|
+ logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ // 投递到receiver_id对应的redis队列暂存
|
|
|
+ switch cmdMessage.ReceiverId {
|
|
|
+ case globalkey.All:
|
|
|
+ case globalkey.AllPlayer:
|
|
|
+ case globalkey.AllVipPlayer:
|
|
|
+ case globalkey.AllNormalPlayer:
|
|
|
+ for iter := mgr.GetFlowMgrInstance().All().Begin(); iter.IsValid(); iter.Next() {
|
|
|
+ flow := iter.Value().(*model2.Flow)
|
|
|
+ if flow.Type != globalkey.ConnectTypeNormalPlayer {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ err := flow.Stream.Send(&flowsrv.CommandResp{
|
|
|
+ Code: result.Ok,
|
|
|
+ Msg: "success",
|
|
|
+ Data: msg.Value,
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ logx.WithContext(ctx).Errorf("%v", err)
|
|
|
+ }
|
|
|
}
|
|
|
+ case globalkey.AllCs:
|
|
|
+ default:
|
|
|
+ trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
|
|
|
+ intCmd := s.RedisClient.LPush(ctx, cmdMessage.ReceiverId, string(msg.Value))
|
|
|
+ if size, err := intCmd.Result(); err != nil {
|
|
|
+ logx.WithContext(ctx).Errorf("push message rmq err %v", err)
|
|
|
+ } else {
|
|
|
+ logx.WithContext(ctx).Infof("current rmq size: %d", size)
|
|
|
+ }
|
|
|
|
|
|
+ }, attribute.String("msg.key", string(msg.Key)))
|
|
|
sess.MarkMessage(msg, "")
|
|
|
- }, attribute.String("msg.key", string(msg.Key)))
|
|
|
+ }
|
|
|
})
|
|
|
}
|
|
|
|
|
@@ -149,9 +198,41 @@ func (s *ServiceContext) subscribe() {
|
|
|
go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
|
|
|
go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
|
|
|
|
|
|
+ //event.On(globalkey.EventHandleRmqJob, event.ListenerFunc(func(e event.Event) error {
|
|
|
+ // resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
|
|
|
+ // if message, err := resultCmd.Result(); err != nil {
|
|
|
+ // logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
|
|
|
+ // } else {
|
|
|
+ // trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
|
|
|
+ // flow.Message <- message[1]
|
|
|
+ // })
|
|
|
+ // }
|
|
|
+ // return nil
|
|
|
+ //}), event.High)
|
|
|
+
|
|
|
// 注册事件
|
|
|
- event.On(globalkey.EventUnsubscribeRmq, event.ListenerFunc(func(e event.Event) error {
|
|
|
+ //event.On(globalkey.EventUnsubscribeRmqJob, event.ListenerFunc(func(e event.Event) error {
|
|
|
+ //
|
|
|
+ // return nil
|
|
|
+ //}), event.High)
|
|
|
|
|
|
- return nil
|
|
|
- }), event.High)
|
|
|
+ //event.On(globalkey.EventNotifyUserOfflineJob, event.ListenerFunc(func(e event.Event) error {
|
|
|
+ // traceId := e.Get("trace_id").(string)
|
|
|
+ // uType := e.Get("type").(int32)
|
|
|
+ // uid := e.Get("uid").(string)
|
|
|
+ // gameId := e.Get("game_id").(string)
|
|
|
+ // trace.RunOnTracing(traceId, func(ctx context.Context) {
|
|
|
+ // trace.StartTrace(ctx, "FlowsrvServer.EventNotifyUserOfflineJob.handleUserOffline", func(ctx context.Context) {
|
|
|
+ // _, err := s.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
|
|
|
+ // Type: uType,
|
|
|
+ // Uid: uid,
|
|
|
+ // GameId: gameId,
|
|
|
+ // })
|
|
|
+ // if err != nil {
|
|
|
+ // logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
|
|
|
+ // }
|
|
|
+ // })
|
|
|
+ // })
|
|
|
+ // return nil
|
|
|
+ //}), event.High)
|
|
|
}
|