123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- package svc
- import (
- "context"
- "github.com/Shopify/sarama"
- "github.com/bytedance/sonic"
- "github.com/go-redis/redis/v8"
- "github.com/zeromicro/go-zero/core/logx"
- gozerotrace "github.com/zeromicro/go-zero/core/trace"
- "github.com/zeromicro/go-zero/zrpc"
- "go.opentelemetry.io/otel"
- "go.opentelemetry.io/otel/attribute"
- "go.opentelemetry.io/otel/propagation"
- oteltrace "go.opentelemetry.io/otel/trace"
- "net/http"
- "ylink/comm/globalkey"
- "ylink/comm/kafka"
- "ylink/comm/model"
- "ylink/comm/result"
- "ylink/comm/trace"
- "ylink/comm/utils"
- "ylink/core/inner/rpc/inner"
- "ylink/flowsrv/rpc/flowsrv"
- "ylink/flowsrv/rpc/internal/config"
- "ylink/flowsrv/rpc/internal/mgr"
- model2 "ylink/flowsrv/rpc/internal/model"
- //model2 "ylink/flowsrv/rpc/internal/model"
- )
- type ServiceContext struct {
- Config config.Config
- InnerRpc inner.Inner
- MessageConsumerGroup *kafka.ConsumerGroup
- CommandConsumerGroup *kafka.ConsumerGroup
- RedisClient *redis.Client
- }
- func NewServiceContext(c config.Config) *ServiceContext {
- svcCtx := &ServiceContext{
- Config: c,
- InnerRpc: inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
- RedisClient: redis.NewClient(&redis.Options{
- Addr: c.Redis.Host,
- Password: c.Redis.Pass,
- }),
- MessageConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
- KafkaVersion: sarama.V1_0_0_0,
- OffsetsInitial: sarama.OffsetNewest,
- IsReturnErr: false,
- },
- c.KqMsgBoxConsumerConf.Brokers,
- []string{c.KqMsgBoxConsumerConf.Topic},
- c.KqMsgBoxConsumerConf.GroupId),
- CommandConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
- KafkaVersion: sarama.V1_0_0_0,
- OffsetsInitial: sarama.OffsetNewest,
- IsReturnErr: false,
- },
- c.KqCmdBoxConsumerConf.Brokers,
- []string{c.KqCmdBoxConsumerConf.Topic},
- c.KqCmdBoxConsumerConf.GroupId),
- }
- go svcCtx.subscribe()
- return svcCtx
- }
- func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
- return nil
- }
- func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
- for msg := range claim.Messages() {
- if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
- logx.Info("handleMessage")
- s.handleMessage(sess, msg)
- } else if msg.Topic == s.Config.KqCmdBoxConsumerConf.Topic {
- logx.Info("handleCommand")
- s.handleCommand(sess, msg)
- }
- }
- return nil
- }
- func (s *ServiceContext) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
- propagator := otel.GetTextMapPropagator()
- tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
- ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
- spanName := utils.CallerFuncName()
- spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
- defer span.End()
- propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
- callback(spanCtx)
- }
- func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
- traceId := kafka.GetTraceFromHeader(msg.Headers)
- if len(traceId) == 0 {
- return
- }
- trace.RunOnTracing(traceId, func(ctx context.Context) {
- logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
- var message model.KqMessage
- if err := sonic.Unmarshal(msg.Value, &message); err != nil {
- logx.Errorf("unmarshal msg error: %v", err)
- return
- }
- if message.Opt != model.CMD_SEND_MESSAGE {
- // 指令异常
- return
- }
- var chatMessage model.ChatMessage
- if err := sonic.Unmarshal([]byte(message.Payload), &chatMessage); err != nil {
- logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
- return
- }
- trace.StartTrace(ctx, "FlowsrvServer.handleMessage.PushMessage", func(ctx context.Context) {
- // 投递到receiver_id对应的redis队列暂存
- intCmd := s.RedisClient.LPush(ctx, chatMessage.ReceiverId, string(msg.Value))
- if size, err := intCmd.Result(); err != nil {
- logx.WithContext(ctx).Errorf("push message rmq err %v", err)
- } else {
- logx.WithContext(ctx).Infof("current rmq size: %d", size)
- }
- sess.MarkMessage(msg, "")
- }, attribute.String("msg.key", string(msg.Key)))
- })
- }
- func (s *ServiceContext) handleCommand(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
- traceId := kafka.GetTraceFromHeader(msg.Headers)
- if len(traceId) == 0 {
- return
- }
- trace.RunOnTracing(traceId, func(ctx context.Context) {
- logx.WithContext(ctx).Infof("handle command: %s", msg.Value)
- var message model.KqMessage
- if err := sonic.Unmarshal(msg.Value, &message); err != nil {
- logx.Errorf("unmarshal msg error: %v", err)
- return
- }
- if message.Opt == model.CMD_SEND_MESSAGE {
- // 指令异常
- return
- }
- var cmdMessage model.CommandMessage
- if err := sonic.Unmarshal([]byte(message.Payload), &cmdMessage); err != nil {
- logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
- return
- }
- // 投递到receiver_id对应的redis队列暂存
- switch cmdMessage.ReceiverId {
- case globalkey.All:
- case globalkey.AllPlayer:
- case globalkey.AllVipPlayer:
- case globalkey.AllNormalPlayer:
- for iter := mgr.GetFlowMgrInstance().All().Begin(); iter.IsValid(); iter.Next() {
- flow := iter.Value().(*model2.Flow)
- if flow.Type != globalkey.ConnectTypeNormalPlayer {
- continue
- }
- err := flow.Stream.Send(&flowsrv.CommandResp{
- Code: result.Ok,
- Msg: "success",
- Data: msg.Value,
- })
- if err != nil {
- logx.WithContext(ctx).Errorf("%v", err)
- }
- }
- case globalkey.AllCs:
- default:
- trace.StartTrace(ctx, "FlowsrvServer.handleCommand.PushCmdMessage", func(ctx context.Context) {
- intCmd := s.RedisClient.LPush(ctx, cmdMessage.ReceiverId, string(msg.Value))
- if size, err := intCmd.Result(); err != nil {
- logx.WithContext(ctx).Errorf("push message rmq err %v", err)
- } else {
- logx.WithContext(ctx).Infof("current rmq size: %d", size)
- }
- }, attribute.String("msg.key", string(msg.Key)))
- sess.MarkMessage(msg, "")
- }
- })
- }
- func (s *ServiceContext) subscribe() {
- go s.MessageConsumerGroup.RegisterHandleAndConsumer(s)
- go s.CommandConsumerGroup.RegisterHandleAndConsumer(s)
- //event.On(globalkey.EventHandleRmqJob, event.ListenerFunc(func(e event.Event) error {
- // resultCmd := flow.SvcCtx.RedisClient.BRPop(ctx, 30*time.Second, flow.FlowId)
- // if message, err := resultCmd.Result(); err != nil {
- // logx.WithContext(ctx).Errorf("get message from redis, key: %s, err: %v", flow.FlowId, err)
- // } else {
- // trace.StartTrace(ctx, "FlowsrvServer.flowmgr.handleRmqMessage", func(ctx context.Context) {
- // flow.Message <- message[1]
- // })
- // }
- // return nil
- //}), event.High)
- // 注册事件
- //event.On(globalkey.EventUnsubscribeRmqJob, event.ListenerFunc(func(e event.Event) error {
- //
- // return nil
- //}), event.High)
- //event.On(globalkey.EventNotifyUserOfflineJob, event.ListenerFunc(func(e event.Event) error {
- // traceId := e.Get("trace_id").(string)
- // uType := e.Get("type").(int32)
- // uid := e.Get("uid").(string)
- // gameId := e.Get("game_id").(string)
- // trace.RunOnTracing(traceId, func(ctx context.Context) {
- // trace.StartTrace(ctx, "FlowsrvServer.EventNotifyUserOfflineJob.handleUserOffline", func(ctx context.Context) {
- // _, err := s.InnerRpc.NotifyUserOffline(ctx, &inner.NotifyUserStatusReq{
- // Type: uType,
- // Uid: uid,
- // GameId: gameId,
- // })
- // if err != nil {
- // logx.WithContext(ctx).Errorf("notify user offline has some error: %v", err)
- // }
- // })
- // })
- // return nil
- //}), event.High)
- }
|