소스 검색

v0.0.1开发:transfer服务开发

#Suyghur 2 년 전
부모
커밋
36ecf22903

+ 17 - 16
bff/authbff/api/desc/authbff.api

@@ -1,34 +1,35 @@
 syntax = "v1"
 
 info(
-    title: "api前端服务"
-    desc: "api前端服务 "
-    author: "#Suyghur"
-    version: "v1"
+	title: "api前端服务"
+	desc: "api前端服务 "
+	author: "#Suyghur"
+	version: "v1"
 )
 
 type PlayerAuthReq {
-    PlayerId string `json:"player_id"`
-    GameId string `json:"game_id"`
+	GameId   string `json:"game_id"`
+	PlayerId string `json:"player_id"`
+	Type     int32  `json:"type"`
 }
 
 type CsAuthReq {
-    CsId string `json:"cs_id"`
+	CsId string `json:"cs_id"`
 }
 
 type AuthResp {
-    AccessToken string `json:"access_token"`
+	AccessToken string `json:"access_token"`
 }
 
 @server(
-    prefix: api/v1/auth
+	prefix: api/v1/auth
 )
 service authbff {
-    @doc "玩家登录"
-    @handler playerLogin
-    post /player-login (PlayerAuthReq) returns (AuthResp)
-
-    @doc "客服登录"
-    @handler csLogin
-    post /cs-login (CsAuthReq) returns (AuthResp)
+	@doc "玩家登录"
+	@handler playerLogin
+	post /player-login (PlayerAuthReq) returns (AuthResp)
+	
+	@doc "客服登录"
+	@handler csLogin
+	post /cs-login (CsAuthReq) returns (AuthResp)
 }

+ 2 - 0
bff/authbff/api/internal/logic/csloginlogic.go

@@ -8,6 +8,7 @@ import (
 	"time"
 	"ylink/bff/authbff/api/internal/svc"
 	"ylink/bff/authbff/api/internal/types"
+	"ylink/comm/globalkey"
 	"ylink/comm/jwtkey"
 	"ylink/comm/result"
 )
@@ -53,6 +54,7 @@ func (l *CsLoginLogic) generateCsToken(iat int64, csId string) (string, error) {
 	claims["iat"] = iat
 	claims["exp"] = iat + expire
 	claims[jwtkey.CsId] = csId
+	claims[jwtkey.Type] = globalkey.ConnectTypeCs
 	token := jwt.New(jwt.SigningMethodHS256)
 	token.Claims = claims
 	return token.SignedString([]byte(secret))

+ 4 - 3
bff/authbff/api/internal/logic/playerloginlogic.go

@@ -28,7 +28,7 @@ func NewPlayerLoginLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Playe
 
 func (l *PlayerLoginLogic) PlayerLogin(req *types.PlayerAuthReq) (resp *types.AuthResp, err error) {
 	now := time.Now().Unix()
-	token, err := l.generatePlayerToken(now, req.PlayerId, req.GameId)
+	token, err := l.generatePlayerToken(now, req.Type, req.GameId, req.PlayerId)
 	if err != nil {
 		return nil, errors.Wrap(result.NewErrCode(result.TokenGenerateError), "")
 	}
@@ -47,14 +47,15 @@ func (l *PlayerLoginLogic) PlayerLogin(req *types.PlayerAuthReq) (resp *types.Au
 //  @return string
 //  @return error
 //
-func (l *PlayerLoginLogic) generatePlayerToken(iat int64, playerId string, gameId string) (string, error) {
+func (l *PlayerLoginLogic) generatePlayerToken(iat int64, cType int32, gameId string, playerId string) (string, error) {
 	secret := l.svcCtx.Config.JwtAuth.AccessSecret
 	expire := l.svcCtx.Config.JwtAuth.AccessExpire
 	claims := make(jwt.MapClaims)
 	claims["iat"] = iat
 	claims["exp"] = iat + expire
-	claims[jwtkey.PlayerId] = playerId
 	claims[jwtkey.GameId] = gameId
+	claims[jwtkey.PlayerId] = playerId
+	claims[jwtkey.Type] = cType
 	token := jwt.New(jwt.SigningMethodHS256)
 	token.Claims = claims
 	return token.SignedString([]byte(secret))

+ 2 - 1
bff/authbff/api/internal/types/types.go

@@ -2,8 +2,9 @@
 package types
 
 type PlayerAuthReq struct {
-	PlayerId string `json:"player_id"`
 	GameId   string `json:"game_id"`
+	PlayerId string `json:"player_id"`
+	Type     int32  `json:"type"`
 }
 
 type CsAuthReq struct {

+ 5 - 0
comm/ctxdata/ctxdata.go

@@ -25,6 +25,11 @@ func GetCsIdFromCtx(ctx context.Context) string {
 	return csId
 }
 
+func GetConnectTypeFromCtx(ctx context.Context) int32 {
+	cType, _ := ctx.Value(jwtkey.Type).(int32)
+	return cType
+}
+
 func GetTraceIdFromCtx(ctx context.Context) string {
 	spanCtx := trace.SpanContextFromContext(ctx)
 	if spanCtx.HasTraceID() {

+ 70 - 0
comm/es/esclient.go

@@ -0,0 +1,70 @@
+//@File     esclient.go
+//@Time     2022/06/16
+//@Author   #Suyghur,
+
+package es
+
+import (
+	"bytes"
+	"context"
+	"github.com/bytedance/sonic/encoder"
+	"github.com/elastic/go-elasticsearch/v7"
+	"github.com/elastic/go-elasticsearch/v7/esapi"
+	"github.com/zeromicro/go-zero/core/logx"
+	"io"
+)
+
+type IEsClient interface {
+	Insert(index string, data map[string]interface{})
+}
+
+type EsClient struct {
+	client *elasticsearch.Client
+}
+
+func NewEsClient(conf EsConf) *EsClient {
+	c := elasticsearch.Config{
+		Addresses: conf.Addresses,
+		Username:  conf.Username,
+		Password:  conf.Password,
+	}
+	es, err := elasticsearch.NewClient(c)
+	if err != nil {
+		logx.WithContext(context.Background()).Error(err.Error())
+		panic(err.Error())
+	}
+	return &EsClient{
+		client: es,
+	}
+}
+
+func (es *EsClient) Insert(index string, data interface{}) {
+	var buf = bytes.NewBuffer(nil)
+	if err := encoder.NewStreamEncoder(buf).Encode(data); err != nil {
+		logx.WithContext(context.Background()).Error(err.Error())
+	}
+
+	req := esapi.IndexRequest{
+		Index:   index,
+		Body:    buf,
+		Refresh: "true",
+	}
+	resp, err := req.Do(context.Background(), es.client)
+	if err != nil {
+		logx.WithContext(context.Background()).Errorf("error getting response: %s", err)
+		return
+	}
+	logx.WithContext(context.Background()).Infof("%v", resp.String())
+	defer resp.Body.Close()
+
+	if resp.IsError() {
+		logx.WithContext(context.Background()).Errorf("%s error indexing document data: %s", resp.Status(), data)
+	}
+
+	defer func(Body io.ReadCloser) {
+		err := Body.Close()
+		if err != nil {
+			logx.WithContext(context.Background()).Error(err.Error())
+		}
+	}(resp.Body)
+}

+ 11 - 0
comm/es/esconfig.go

@@ -0,0 +1,11 @@
+//@File     esconfig.go
+//@Time     2022/06/16
+//@Author   #Suyghur,
+
+package es
+
+type EsConf struct {
+	Username  string
+	Password  string
+	Addresses []string
+}

+ 1 - 0
comm/globalkey/globalkey.go

@@ -5,6 +5,7 @@
 package globalkey
 
 const (
+	ConnectTypeError        = -1
 	ConnectTypeNormalPlayer = 0
 	ConnectTypeVipPlayer    = 1
 	ConnectTypeCs           = 2

+ 1 - 0
comm/jwtkey/jwtkey.go

@@ -8,4 +8,5 @@ const (
 	PlayerId = "player_id"
 	GameId   = "game_id"
 	CsId     = "cs_id"
+	Type     = "type"
 )

+ 2 - 0
core/inner/rpc/internal/logic/csconnectplayerlogic.go

@@ -59,11 +59,13 @@ func (l *CsConnectPlayerLogic) CsConnectPlayer(in *pb.InnerCsConnectPlayerReq) (
 		ext.WaitingQueue.Erase(uniqueId)
 
 		// 广播客户端更新等待队列信息
+		//TODO 通知客服也更新
 		payload, _ := sonic.MarshalString(&model.CommandMessage{
 			CmdInfo: map[string]interface{}{
 				"queue_size": ext.WaitingQueue.Size(),
 			},
 		})
+
 		kMsg, _ := sonic.MarshalString(&model.KqMessage{
 			Opt:        model.CMD_UPDATE_WAITING_QUEUE,
 			CreateTs:   time.Now().Unix(),

+ 0 - 1
core/inner/rpc/internal/logic/notifyuserofflinelogic.go

@@ -32,7 +32,6 @@ func NewNotifyUserOfflineLogic(ctx context.Context, svcCtx *svc.ServiceContext)
 }
 
 func (l *NotifyUserOfflineLogic) NotifyUserOffline(in *pb.NotifyUserStatusReq) (*pb.NotifyUserStatusResp, error) {
-	l.Logger.Infof("NotifyUserOffline")
 	switch in.Type {
 	case globalkey.ConnectTypeNormalPlayer:
 		// 修改玩家在线状态

+ 1 - 0
core/inner/rpc/internal/logic/notifyuseronlinelogic.go

@@ -39,6 +39,7 @@ func (l *NotifyUserOnlineLogic) NotifyUserOnline(in *pb.NotifyUserStatusReq) (*p
 			onlinePlayerMap := ext.GameOnlinePlayerMap.Get(in.GameId).(*treemap.Map)
 			if onlinePlayerMap.Contains(in.Uid) {
 				l.Logger.Error("该玩家已在线")
+				// TODO 单点在线
 			} else {
 				ts := time.Now().Unix()
 				playerInfo := &model.PlayerInfo{

+ 14 - 1
core/inner/rpc/internal/svc/servicecontext.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"github.com/Shopify/sarama"
 	"github.com/bytedance/sonic"
+	"github.com/duke-git/lancet/v2/strutil"
 	"github.com/gookit/event"
 	treemap "github.com/liyue201/gostl/ds/map"
 	"github.com/robfig/cron/v3"
@@ -82,6 +83,7 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 
 		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
 			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+
 				// receiverId为空代表这条消息是玩家发送的
 				// 玩家发的消息,先从connectedMap找对应的客服,没有则从vipMap找,都没有则丢弃信息不投递
 				if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, message.Uid); playerInfo != nil {
@@ -104,7 +106,18 @@ func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sa
 				}
 			} else {
 				// receiverId不为空代表这条消息是客服发的
-				s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+				playerId := strutil.After(message.ReceiverId, message.GameId+"_")
+				// 判断是不是vip玩家
+				if playerInfo := ext.GetVipPlayer(message.GameId, playerId); playerInfo != nil {
+					s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+				} else {
+					if playerInfo := ext.GetConnectedPlayerInfo(message.GameId, playerId); playerInfo != nil {
+						// 客服连接了这个玩家
+						s.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+					} else {
+						logx.WithContext(ctx).Errorf("this player is not connected, player id: %s", playerId)
+					}
+				}
 			}
 			sess.MarkMessage(msg, "")
 		}, attribute.String("msg.key", string(msg.Key)))

+ 20 - 0
core/transfer/rpc/etc/transfer.yaml

@@ -0,0 +1,20 @@
+Name: transfer.rpc
+ListenOn: 0.0.0.0:10600
+
+Telemetry:
+  Name: inner-rpc
+  Endpoint: http://localhost:14268/api/traces
+  Sampler: 1.0
+  Batcher: jaeger
+
+KqMsgBoxConsumerConf:
+  Brokers:
+    - localhost:9094
+  Topic: ylink-recv-box-topic
+  GroupId: transfer-rpc
+
+EsConf:
+  Username:
+  Password:
+  Addresses:
+    - http:/localhost:9200

+ 13 - 0
core/transfer/rpc/internal/config/config.go

@@ -0,0 +1,13 @@
+package config
+
+import (
+	"github.com/zeromicro/go-zero/zrpc"
+	"ylink/comm/es"
+	"ylink/comm/kafka"
+)
+
+type Config struct {
+	zrpc.RpcServerConf
+	EsConf               es.EsConf
+	KqMsgBoxConsumerConf kafka.KqConsumerConfig
+}

+ 30 - 0
core/transfer/rpc/internal/logic/invokelogic.go

@@ -0,0 +1,30 @@
+package logic
+
+import (
+	"context"
+
+	"ylink/core/transfer/rpc/internal/svc"
+	"ylink/core/transfer/rpc/pb"
+
+	"github.com/zeromicro/go-zero/core/logx"
+)
+
+type InvokeLogic struct {
+	ctx    context.Context
+	svcCtx *svc.ServiceContext
+	logx.Logger
+}
+
+func NewInvokeLogic(ctx context.Context, svcCtx *svc.ServiceContext) *InvokeLogic {
+	return &InvokeLogic{
+		ctx:    ctx,
+		svcCtx: svcCtx,
+		Logger: logx.WithContext(ctx),
+	}
+}
+
+func (l *InvokeLogic) Invoke(in *pb.TransferReq) (*pb.TransferResp, error) {
+	// todo: add your logic here and delete this line
+
+	return &pb.TransferResp{}, nil
+}

+ 28 - 0
core/transfer/rpc/internal/server/transferserver.go

@@ -0,0 +1,28 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: transfer.proto
+
+package server
+
+import (
+	"context"
+
+	"ylink/core/transfer/rpc/internal/logic"
+	"ylink/core/transfer/rpc/internal/svc"
+	"ylink/core/transfer/rpc/pb"
+)
+
+type TransferServer struct {
+	svcCtx *svc.ServiceContext
+	pb.UnimplementedTransferServer
+}
+
+func NewTransferServer(svcCtx *svc.ServiceContext) *TransferServer {
+	return &TransferServer{
+		svcCtx: svcCtx,
+	}
+}
+
+func (s *TransferServer) Invoke(ctx context.Context, in *pb.TransferReq) (*pb.TransferResp, error) {
+	l := logic.NewInvokeLogic(ctx, s.svcCtx)
+	return l.Invoke(in)
+}

+ 84 - 0
core/transfer/rpc/internal/svc/servicecontext.go

@@ -0,0 +1,84 @@
+package svc
+
+import (
+	"context"
+	"github.com/Shopify/sarama"
+	"github.com/bytedance/sonic"
+	"github.com/zeromicro/go-zero/core/logx"
+	"go.opentelemetry.io/otel/attribute"
+	"ylink/comm/es"
+	"ylink/comm/kafka"
+	"ylink/comm/model"
+	"ylink/comm/trace"
+	"ylink/core/transfer/rpc/internal/config"
+)
+
+type ServiceContext struct {
+	Config            config.Config
+	EsClient          *es.EsClient
+	KqDbConsumerGroup *kafka.ConsumerGroup
+}
+
+func NewServiceContext(c config.Config) *ServiceContext {
+	svcCtx := &ServiceContext{
+		Config:   c,
+		EsClient: es.NewEsClient(c.EsConf),
+		KqDbConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+			KafkaVersion:   sarama.V1_0_0_0,
+			OffsetsInitial: sarama.OffsetNewest,
+			IsReturnErr:    false,
+		},
+			c.KqMsgBoxConsumerConf.Brokers,
+			[]string{c.KqMsgBoxConsumerConf.Topic},
+			c.KqMsgBoxConsumerConf.GroupId),
+	}
+	go svcCtx.subscribe()
+	return svcCtx
+
+}
+
+func (s *ServiceContext) Setup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *ServiceContext) Cleanup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *ServiceContext) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		if msg.Topic == s.Config.KqMsgBoxConsumerConf.Topic {
+			s.handleMessage(sess, msg)
+		}
+	}
+	return nil
+}
+
+func (s *ServiceContext) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	traceId := kafka.GetTraceFromHeader(msg.Headers)
+	if len(traceId) == 0 {
+		return
+	}
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		logx.WithContext(ctx).Infof("handle message: %s", msg.Value)
+		var message model.KqMessage
+		if err := sonic.Unmarshal(msg.Value, &message); err != nil {
+			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+			return
+		}
+
+		if message.Opt != model.CMD_SEND_MESSAGE {
+			// 指令异常
+			return
+		}
+
+		trace.StartTrace(ctx, "TransferServer.handleMessage.InsertMessage2Es", func(ctx context.Context) {
+			s.EsClient.Insert("chat_message_log", message)
+			sess.MarkMessage(msg, "")
+		}, attribute.String("msg.key", string(msg.Key)))
+	})
+}
+
+func (s *ServiceContext) subscribe() {
+	go s.KqDbConsumerGroup.RegisterHandleAndConsumer(s)
+}

+ 208 - 0
core/transfer/rpc/pb/transfer.pb.go

@@ -0,0 +1,208 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.28.0
+// 	protoc        v3.19.4
+// source: pb/transfer.proto
+
+package pb
+
+import (
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type TransferReq struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Action int32  `protobuf:"varint,1,opt,name=action,proto3" json:"action,omitempty"`
+	Data   []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
+}
+
+func (x *TransferReq) Reset() {
+	*x = TransferReq{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_pb_transfer_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *TransferReq) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TransferReq) ProtoMessage() {}
+
+func (x *TransferReq) ProtoReflect() protoreflect.Message {
+	mi := &file_pb_transfer_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use TransferReq.ProtoReflect.Descriptor instead.
+func (*TransferReq) Descriptor() ([]byte, []int) {
+	return file_pb_transfer_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *TransferReq) GetAction() int32 {
+	if x != nil {
+		return x.Action
+	}
+	return 0
+}
+
+func (x *TransferReq) GetData() []byte {
+	if x != nil {
+		return x.Data
+	}
+	return nil
+}
+
+type TransferResp struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+}
+
+func (x *TransferResp) Reset() {
+	*x = TransferResp{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_pb_transfer_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *TransferResp) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TransferResp) ProtoMessage() {}
+
+func (x *TransferResp) ProtoReflect() protoreflect.Message {
+	mi := &file_pb_transfer_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use TransferResp.ProtoReflect.Descriptor instead.
+func (*TransferResp) Descriptor() ([]byte, []int) {
+	return file_pb_transfer_proto_rawDescGZIP(), []int{1}
+}
+
+var File_pb_transfer_proto protoreflect.FileDescriptor
+
+var file_pb_transfer_proto_rawDesc = []byte{
+	0x0a, 0x11, 0x70, 0x62, 0x2f, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x2e, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x22, 0x39, 0x0a, 0x0b, 0x54, 0x72, 0x61, 0x6e, 0x73,
+	0x66, 0x65, 0x72, 0x52, 0x65, 0x71, 0x12, 0x16, 0x0a, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x06, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12,
+	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61,
+	0x74, 0x61, 0x22, 0x0e, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x52, 0x65,
+	0x73, 0x70, 0x32, 0x37, 0x0a, 0x08, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x12, 0x2b,
+	0x0a, 0x06, 0x69, 0x6e, 0x76, 0x6f, 0x6b, 0x65, 0x12, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x54, 0x72,
+	0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x52, 0x65, 0x71, 0x1a, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x54,
+	0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e,
+	0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_pb_transfer_proto_rawDescOnce sync.Once
+	file_pb_transfer_proto_rawDescData = file_pb_transfer_proto_rawDesc
+)
+
+func file_pb_transfer_proto_rawDescGZIP() []byte {
+	file_pb_transfer_proto_rawDescOnce.Do(func() {
+		file_pb_transfer_proto_rawDescData = protoimpl.X.CompressGZIP(file_pb_transfer_proto_rawDescData)
+	})
+	return file_pb_transfer_proto_rawDescData
+}
+
+var file_pb_transfer_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
+var file_pb_transfer_proto_goTypes = []interface{}{
+	(*TransferReq)(nil),  // 0: pb.TransferReq
+	(*TransferResp)(nil), // 1: pb.TransferResp
+}
+var file_pb_transfer_proto_depIdxs = []int32{
+	0, // 0: pb.Transfer.invoke:input_type -> pb.TransferReq
+	1, // 1: pb.Transfer.invoke:output_type -> pb.TransferResp
+	1, // [1:2] is the sub-list for method output_type
+	0, // [0:1] is the sub-list for method input_type
+	0, // [0:0] is the sub-list for extension type_name
+	0, // [0:0] is the sub-list for extension extendee
+	0, // [0:0] is the sub-list for field type_name
+}
+
+func init() { file_pb_transfer_proto_init() }
+func file_pb_transfer_proto_init() {
+	if File_pb_transfer_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_pb_transfer_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*TransferReq); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_pb_transfer_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*TransferResp); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_pb_transfer_proto_rawDesc,
+			NumEnums:      0,
+			NumMessages:   2,
+			NumExtensions: 0,
+			NumServices:   1,
+		},
+		GoTypes:           file_pb_transfer_proto_goTypes,
+		DependencyIndexes: file_pb_transfer_proto_depIdxs,
+		MessageInfos:      file_pb_transfer_proto_msgTypes,
+	}.Build()
+	File_pb_transfer_proto = out.File
+	file_pb_transfer_proto_rawDesc = nil
+	file_pb_transfer_proto_goTypes = nil
+	file_pb_transfer_proto_depIdxs = nil
+}

+ 16 - 0
core/transfer/rpc/pb/transfer.proto

@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+option go_package = "./pb";
+
+package pb;
+
+message TransferReq{
+  int32 action = 1;
+  bytes data = 2;
+}
+
+message TransferResp{}
+
+service Transfer {
+  rpc invoke (TransferReq) returns (TransferResp);
+}

+ 105 - 0
core/transfer/rpc/pb/transfer_grpc.pb.go

@@ -0,0 +1,105 @@
+// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
+// versions:
+// - protoc-gen-go-grpc v1.2.0
+// - protoc             v3.19.4
+// source: pb/transfer.proto
+
+package pb
+
+import (
+	context "context"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+)
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+// Requires gRPC-Go v1.32.0 or later.
+const _ = grpc.SupportPackageIsVersion7
+
+// TransferClient is the client API for Transfer service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
+type TransferClient interface {
+	Invoke(ctx context.Context, in *TransferReq, opts ...grpc.CallOption) (*TransferResp, error)
+}
+
+type transferClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewTransferClient(cc grpc.ClientConnInterface) TransferClient {
+	return &transferClient{cc}
+}
+
+func (c *transferClient) Invoke(ctx context.Context, in *TransferReq, opts ...grpc.CallOption) (*TransferResp, error) {
+	out := new(TransferResp)
+	err := c.cc.Invoke(ctx, "/pb.Transfer/invoke", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// TransferServer is the server API for Transfer service.
+// All implementations must embed UnimplementedTransferServer
+// for forward compatibility
+type TransferServer interface {
+	Invoke(context.Context, *TransferReq) (*TransferResp, error)
+	mustEmbedUnimplementedTransferServer()
+}
+
+// UnimplementedTransferServer must be embedded to have forward compatible implementations.
+type UnimplementedTransferServer struct {
+}
+
+func (UnimplementedTransferServer) Invoke(context.Context, *TransferReq) (*TransferResp, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Invoke not implemented")
+}
+func (UnimplementedTransferServer) mustEmbedUnimplementedTransferServer() {}
+
+// UnsafeTransferServer may be embedded to opt out of forward compatibility for this service.
+// Use of this interface is not recommended, as added methods to TransferServer will
+// result in compilation errors.
+type UnsafeTransferServer interface {
+	mustEmbedUnimplementedTransferServer()
+}
+
+func RegisterTransferServer(s grpc.ServiceRegistrar, srv TransferServer) {
+	s.RegisterService(&Transfer_ServiceDesc, srv)
+}
+
+func _Transfer_Invoke_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(TransferReq)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(TransferServer).Invoke(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/pb.Transfer/invoke",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(TransferServer).Invoke(ctx, req.(*TransferReq))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+// Transfer_ServiceDesc is the grpc.ServiceDesc for Transfer service.
+// It's only intended for direct use with grpc.RegisterService,
+// and not to be introspected or modified (even as a copy)
+var Transfer_ServiceDesc = grpc.ServiceDesc{
+	ServiceName: "pb.Transfer",
+	HandlerType: (*TransferServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "invoke",
+			Handler:    _Transfer_Invoke_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "pb/transfer.proto",
+}

+ 40 - 0
core/transfer/rpc/transfer.go

@@ -0,0 +1,40 @@
+package main
+
+import (
+	"flag"
+	"fmt"
+
+	"ylink/core/transfer/rpc/internal/config"
+	"ylink/core/transfer/rpc/internal/server"
+	"ylink/core/transfer/rpc/internal/svc"
+	"ylink/core/transfer/rpc/pb"
+
+	"github.com/zeromicro/go-zero/core/conf"
+	"github.com/zeromicro/go-zero/core/service"
+	"github.com/zeromicro/go-zero/zrpc"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/reflection"
+)
+
+var configFile = flag.String("f", "etc/transfer.yaml", "the config file")
+
+func main() {
+	flag.Parse()
+
+	var c config.Config
+	conf.MustLoad(*configFile, &c)
+	ctx := svc.NewServiceContext(c)
+	svr := server.NewTransferServer(ctx)
+
+	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
+		pb.RegisterTransferServer(grpcServer, svr)
+
+		if c.Mode == service.DevMode || c.Mode == service.TestMode {
+			reflection.Register(grpcServer)
+		}
+	})
+	defer s.Stop()
+
+	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
+	s.Start()
+}

+ 37 - 0
core/transfer/rpc/transfer/transfer.go

@@ -0,0 +1,37 @@
+// Code generated by goctl. DO NOT EDIT!
+// Source: transfer.proto
+
+package transfer
+
+import (
+	"context"
+
+	"ylink/core/transfer/rpc/pb"
+
+	"github.com/zeromicro/go-zero/zrpc"
+	"google.golang.org/grpc"
+)
+
+type (
+	TransferReq  = pb.TransferReq
+	TransferResp = pb.TransferResp
+
+	Transfer interface {
+		Invoke(ctx context.Context, in *TransferReq, opts ...grpc.CallOption) (*TransferResp, error)
+	}
+
+	defaultTransfer struct {
+		cli zrpc.Client
+	}
+)
+
+func NewTransfer(cli zrpc.Client) Transfer {
+	return &defaultTransfer{
+		cli: cli,
+	}
+}
+
+func (m *defaultTransfer) Invoke(ctx context.Context, in *TransferReq, opts ...grpc.CallOption) (*TransferResp, error) {
+	client := pb.NewTransferClient(m.cli.Conn())
+	return client.Invoke(ctx, in, opts...)
+}

+ 20 - 14
flowsrv/rpc/internal/logic/connectlogic.go

@@ -31,8 +31,8 @@ func NewConnectLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ConnectLo
 }
 
 func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
-	uid, gameId, err := l.checkAuth(in)
-	if err != nil {
+	cType, uid, gameId, err := l.checkAuth(in)
+	if err != nil || cType == globalkey.ConnectTypeError {
 		return stream.Send(&pb.CommandResp{
 			Code: result.TokenParseError,
 			Msg:  err.Error(),
@@ -40,7 +40,7 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 		})
 	}
 	_, err = l.svcCtx.InnerRpc.NotifyUserOnline(l.ctx, &inner.NotifyUserStatusReq{
-		Type:   in.Type,
+		Type:   cType,
 		Uid:    uid,
 		GameId: gameId,
 	})
@@ -53,7 +53,7 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 	}
 
 	var flowId string
-	if in.Type == globalkey.ConnectTypeCs {
+	if cType == globalkey.ConnectTypeCs {
 		flowId = uid
 	} else {
 		flowId = gameId + "_" + uid
@@ -65,7 +65,7 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 		Stream:      stream,
 		RedisClient: l.svcCtx.RedisClient,
 		InnerRpc:    l.svcCtx.InnerRpc,
-		Type:        in.Type,
+		Type:        cType,
 		Uid:         uid,
 		GameId:      gameId,
 		FlowId:      flowId,
@@ -81,33 +81,39 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 	return nil
 }
 
-func (l *ConnectLogic) checkAuth(in *pb.CommandReq) (string, string, error) {
+func (l *ConnectLogic) checkAuth(in *pb.CommandReq) (int32, string, string, error) {
 	token, err := jwt.Parse(in.AccessToken, func(token *jwt.Token) (i interface{}, err error) {
 		return []byte(l.svcCtx.Config.JwtAuth.AccessSecret), nil
 	})
 
 	uid := ""
 	gameId := ""
+	var cType int32 = -1
 	if token.Valid {
 		//将获取的token中的Claims强转为MapClaims
 		claims, _ := token.Claims.(jwt.MapClaims)
-		if in.Type == globalkey.ConnectTypeCs {
-			uid = claims[jwtkey.CsId].(string)
-		} else {
+		cType = int32(claims[jwtkey.Type].(float64))
+		switch cType {
+		case globalkey.ConnectTypeNormalPlayer:
 			uid = claims[jwtkey.PlayerId].(string)
 			gameId = claims[jwtkey.GameId].(string)
+		case globalkey.ConnectTypeVipPlayer:
+			uid = claims[jwtkey.PlayerId].(string)
+			gameId = claims[jwtkey.GameId].(string)
+		case globalkey.ConnectTypeCs:
+			uid = claims[jwtkey.CsId].(string)
 		}
-		return uid, gameId, nil
+		return cType, uid, gameId, nil
 	} else if ve, ok := err.(*jwt.ValidationError); ok {
 		if ve.Errors&jwt.ValidationErrorMalformed != 0 {
-			return uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
+			return cType, uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
 		} else if ve.Errors&(jwt.ValidationErrorExpired|jwt.ValidationErrorNotValidYet) != 0 {
 			// Token is either expired or not active yet
-			return uid, gameId, errors.Wrap(result.NewErrCode(result.TokenExpireError), "")
+			return cType, uid, gameId, errors.Wrap(result.NewErrCode(result.TokenExpireError), "")
 		} else {
-			return uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
+			return cType, uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
 		}
 	} else {
-		return uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
+		return cType, uid, gameId, errors.Wrap(result.NewErrCode(result.TokenParseError), "")
 	}
 }

+ 13 - 22
flowsrv/rpc/pb/flowsrv.pb.go

@@ -26,7 +26,6 @@ type CommandReq struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Type        int32  `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
 	AccessToken string `protobuf:"bytes,2,opt,name=access_token,json=accessToken,proto3" json:"access_token,omitempty"`
 }
 
@@ -62,13 +61,6 @@ func (*CommandReq) Descriptor() ([]byte, []int) {
 	return file_pb_flowsrv_proto_rawDescGZIP(), []int{0}
 }
 
-func (x *CommandReq) GetType() int32 {
-	if x != nil {
-		return x.Type
-	}
-	return 0
-}
-
 func (x *CommandReq) GetAccessToken() string {
 	if x != nil {
 		return x.AccessToken
@@ -145,20 +137,19 @@ var file_pb_flowsrv_proto_rawDesc = []byte{
 	0x0a, 0x10, 0x70, 0x62, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x2e, 0x70, 0x72, 0x6f,
 	0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
 	0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x43, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
-	0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05,
-	0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73,
-	0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63,
-	0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x47, 0x0a, 0x0b, 0x43, 0x6f, 0x6d,
-	0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03,
-	0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x12,
-	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61,
-	0x74, 0x61, 0x32, 0x37, 0x0a, 0x07, 0x46, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a,
-	0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f,
-	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f,
-	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x42, 0x06, 0x5a, 0x04, 0x2e,
-	0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x2f, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
+	0x65, 0x71, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b,
+	0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73,
+	0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x47, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
+	0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x05, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6d, 0x73, 0x67, 0x18,
+	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61,
+	0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x37,
+	0x0a, 0x07, 0x46, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f, 0x6e,
+	0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
+	0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
+	0x64, 0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62,
+	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (

+ 0 - 4
flowsrv/rpc/pb/flowsrv.proto

@@ -4,11 +4,7 @@ option go_package = "./pb";
 
 package pb;
 
-import "google/protobuf/struct.proto";
-
-
 message CommandReq {
-  int32 type = 1;
   string access_token = 2;
 }
 

+ 2 - 0
go.mod

@@ -5,6 +5,7 @@ go 1.18
 require (
 	github.com/Shopify/sarama v1.33.0
 	github.com/bytedance/sonic v1.3.0
+	github.com/duke-git/lancet/v2 v2.0.9
 	github.com/go-redis/redis/v8 v8.11.4
 	github.com/golang-jwt/jwt/v4 v4.4.1
 	github.com/gookit/event v1.0.6
@@ -29,6 +30,7 @@ require (
 	github.com/eapache/go-resiliency v1.2.0 // indirect
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
 	github.com/eapache/queue v1.1.0 // indirect
+	github.com/elastic/go-elasticsearch/v7 v7.17.1 // indirect
 	github.com/go-logr/logr v1.2.3 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/gogo/protobuf v1.3.2 // indirect

+ 4 - 0
go.sum

@@ -105,6 +105,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/duke-git/lancet/v2 v2.0.9 h1:9abrZ97PJLtbBHAlwANU+gV20yIF/h7AXhAbd4u60Ww=
+github.com/duke-git/lancet/v2 v2.0.9/go.mod h1:5Nawyf/bK783rCiHyVkZLx+jj8028oVVjLOrC21ZONA=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
 github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
@@ -112,6 +114,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8
 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
 github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
+github.com/elastic/go-elasticsearch/v7 v7.17.1 h1:49mHcHx7lpCL8cW1aioEwSEVKQF3s+Igi4Ye/QTWwmk=
+github.com/elastic/go-elasticsearch/v7 v7.17.1/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
 github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
 github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
 github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=