Browse Source

v0.0.1开发:kafka消息加入链路追踪

#Suyghur 2 years ago
parent
commit
49178a22a8
83 changed files with 707 additions and 369 deletions
  1. 17 17
      bff/authbff/api/desc/authbff.api
  2. 2 2
      bff/authbff/api/etc/authbff.yaml
  3. 1 1
      bff/authbff/api/internal/handler/csloginhandler.go
  4. 1 1
      bff/authbff/api/internal/handler/playerloginhandler.go
  5. 1 1
      bff/cmdbff/api/internal/handler/csconnectplayerhandler.go
  6. 1 1
      bff/cmdbff/api/internal/handler/csfetchhistorylisthandler.go
  7. 1 1
      bff/cmdbff/api/internal/handler/csfetchhistorymsghandler.go
  8. 1 1
      bff/cmdbff/api/internal/handler/csfetchplayerqueuehandler.go
  9. 1 1
      bff/cmdbff/api/internal/handler/cssendmsghandler.go
  10. 1 1
      bff/cmdbff/api/internal/handler/playerdisconnecthandler.go
  11. 1 1
      bff/cmdbff/api/internal/handler/playerfetchcsinfohandler.go
  12. 1 1
      bff/cmdbff/api/internal/handler/playerfetchhistorymsghandler.go
  13. 1 1
      bff/cmdbff/api/internal/handler/playersendmsghandler.go
  14. 1 1
      bff/cmdbff/api/internal/logic/csconnectplayerlogic.go
  15. 1 1
      bff/cmdbff/api/internal/logic/csfetchhistorylistlogic.go
  16. 1 1
      bff/cmdbff/api/internal/logic/csfetchhistorymsglogic.go
  17. 1 1
      bff/cmdbff/api/internal/logic/cssendmsglogic.go
  18. 1 1
      bff/cmdbff/api/internal/logic/playerdisconnectlogic.go
  19. 1 1
      bff/cmdbff/api/internal/logic/playerfetchcsinfologic.go
  20. 1 1
      bff/cmdbff/api/internal/logic/playerfetchhistorymsglogic.go
  21. 1 1
      bff/cmdbff/api/internal/logic/playersendmsglogic.go
  22. 10 1
      comm/ctxdata/ctxdata.go
  23. 1 1
      comm/ds/rbtree/iterator.go
  24. 0 0
      comm/ds/rbtree/node.go
  25. 2 2
      comm/ds/rbtree/rbtree.go
  26. 1 1
      comm/ds/rbtree/rbtree_test.go
  27. 2 2
      comm/ds/treemap/iterator.go
  28. 5 5
      comm/ds/treemap/map.go
  29. 0 0
      comm/ds/treemap/map_test.go
  30. 4 4
      comm/ds/treemap/multimap.go
  31. 0 0
      comm/ds/treemap/multimap_test.go
  32. 10 0
      comm/globalkey/globalkey.go
  33. 0 0
      comm/jwtkey/jwtkey.go
  34. 3 2
      comm/kafka/consumergroup.go
  35. 16 0
      comm/kafka/extdata.go
  36. 6 1
      comm/kafka/kqconfig.go
  37. 17 23
      comm/kafka/producer.go
  38. 0 0
      comm/model/csinfo.go
  39. 20 0
      comm/model/message.go
  40. 0 0
      comm/result/err.go
  41. 0 0
      comm/result/httpresult.go
  42. 0 0
      comm/result/responsebean.go
  43. 42 0
      comm/trace/trace.go
  44. 39 0
      comm/utils/callerfunc.go
  45. 0 0
      comm/utils/comparator/comparator.go
  46. 0 0
      comm/utils/iterator/iterator.go
  47. 0 0
      comm/utils/sync/locker.go
  48. 0 0
      comm/utils/visitor/visitor.go
  49. 14 2
      core/auth/rpc/internal/logic/checkauthlogic.go
  50. 2 2
      core/auth/rpc/internal/logic/csauthlogic.go
  51. 2 2
      core/auth/rpc/internal/logic/playerauthlogic.go
  52. 36 16
      core/auth/rpc/pb/auth.pb.go
  53. 5 2
      core/auth/rpc/pb/auth.proto
  54. 2 8
      core/cmd/rpc/etc/cmd.yaml
  55. 3 3
      core/cmd/rpc/internal/config/config.go
  56. 1 1
      core/cmd/rpc/internal/logic/csfetchhistorychatlogic.go
  57. 1 1
      core/cmd/rpc/internal/logic/csfetchhistorymsglogic.go
  58. 1 1
      core/cmd/rpc/internal/logic/playerfetchhistorymsglogic.go
  59. 12 9
      core/cmd/rpc/internal/logic/playersendmsglogic.go
  60. 7 13
      core/cmd/rpc/internal/svc/servicecontext.go
  61. 30 0
      core/inner/rpc/Dockerfile
  62. 6 3
      core/inner/rpc/etc/inner.yaml
  63. 4 3
      core/inner/rpc/internal/config/config.go
  64. 2 1
      core/inner/rpc/internal/ext/global.go
  65. 0 71
      core/inner/rpc/internal/ext/sendboxhandler.go
  66. 1 1
      core/inner/rpc/internal/logic/csfetchplayerqueuelogic.go
  67. 107 2
      core/inner/rpc/internal/server/innerserver.go
  68. 10 10
      core/inner/rpc/internal/svc/servicecontext.go
  69. 2 0
      core/inner/rpc/pb/inner.proto
  70. 0 8
      ext/globalkey/rediskey.go
  71. 0 14
      ext/model/message.go
  72. 30 0
      flowsrv/rpc/Dockerfile
  73. 6 12
      flowsrv/rpc/etc/flowsrv.yaml
  74. 3 3
      flowsrv/rpc/internal/config/config.go
  75. 5 0
      flowsrv/rpc/internal/ext/recvboxhandler.go
  76. 8 3
      flowsrv/rpc/internal/logic/connectlogic.go
  77. 5 2
      flowsrv/rpc/internal/logic/disconnectlogic.go
  78. 36 0
      flowsrv/rpc/internal/mgr/flowmgr.go
  79. 109 1
      flowsrv/rpc/internal/server/flowsrvserver.go
  80. 37 90
      flowsrv/rpc/pb/flowsrv.pb.go
  81. 1 5
      flowsrv/rpc/pb/flowsrv.proto
  82. 1 0
      go.mod
  83. 2 0
      go.sum

+ 17 - 17
bff/authbff/api/desc/authbff.api

@@ -1,34 +1,34 @@
 syntax = "v1"
 
 info(
-	title: "api前端服务"
-	desc: "api前端服务 "
-	author: "#Suyghur"
-	version: "v1"
+    title: "api前端服务"
+    desc: "api前端服务 "
+    author: "#Suyghur"
+    version: "v1"
 )
 
 type PlayerAuthReq {
-	PlayerId string `json:"player_id"`
-	GameId   string `json:"game_id"`
+    PlayerId string `json:"player_id"`
+    GameId string `json:"game_id"`
 }
 
 type CsAuthReq {
-	CsId string `json:"cs_id"`
+    CsId string `json:"cs_id"`
 }
 
 type AuthResp {
-	AccessToken string `json:"access_token"`
+    AccessToken string `json:"access_token"`
 }
 
 @server(
-	prefix: api/v1/auth
+    prefix: api/v1/auth
 )
-service Authbff {
-	@doc "玩家登录"
-	@handler playerLogin
-	post /player-login (PlayerAuthReq) returns (AuthResp)
-	
-	@doc "客服登录"
-	@handler csLogin
-	post /cs-login (CsAuthReq) returns (AuthResp)
+service authbff {
+    @doc "玩家登录"
+    @handler playerLogin
+    post /player-login (PlayerAuthReq) returns (AuthResp)
+
+    @doc "客服登录"
+    @handler csLogin
+    post /cs-login (CsAuthReq) returns (AuthResp)
 }

+ 2 - 2
bff/authbff/api/etc/authbff.yaml

@@ -1,10 +1,10 @@
-Name: Authbff
+Name: authbff
 Host: 0.0.0.0
 Port: 10000
 
 AuthRpcConf:
   Endpoints:
-        - localhost:10400
+    - localhost:10400
   NonBlock: true
 
 #链路追踪

+ 1 - 1
bff/authbff/api/internal/handler/csloginhandler.go

@@ -5,7 +5,7 @@ import (
 	"ylink/bff/authbff/api/internal/logic"
 	"ylink/bff/authbff/api/internal/svc"
 	"ylink/bff/authbff/api/internal/types"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 )

+ 1 - 1
bff/authbff/api/internal/handler/playerloginhandler.go

@@ -5,7 +5,7 @@ import (
 	"ylink/bff/authbff/api/internal/logic"
 	"ylink/bff/authbff/api/internal/svc"
 	"ylink/bff/authbff/api/internal/types"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 )

+ 1 - 1
bff/cmdbff/api/internal/handler/csconnectplayerhandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/csfetchhistorylisthandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/csfetchhistorymsghandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/csfetchplayerqueuehandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/cssendmsghandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/playerdisconnecthandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"ylink/bff/cmdbff/api/internal/logic"
 	"ylink/bff/cmdbff/api/internal/svc"

+ 1 - 1
bff/cmdbff/api/internal/handler/playerfetchcsinfohandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/playerfetchhistorymsghandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/handler/playersendmsghandler.go

@@ -2,7 +2,7 @@ package handler
 
 import (
 	"net/http"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"github.com/zeromicro/go-zero/rest/httpx"
 	"ylink/bff/cmdbff/api/internal/logic"

+ 1 - 1
bff/cmdbff/api/internal/logic/csconnectplayerlogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/csfetchhistorylistlogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/csfetchhistorymsglogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/cssendmsglogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/playerdisconnectlogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"github.com/zeromicro/go-zero/core/logx"
 	"ylink/bff/cmdbff/api/internal/svc"

+ 1 - 1
bff/cmdbff/api/internal/logic/playerfetchcsinfologic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/cmd"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/playerfetchhistorymsglogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/pb"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 1 - 1
bff/cmdbff/api/internal/logic/playersendmsglogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/ctxdata"
 	"ylink/core/cmd/rpc/pb"
-	"ylink/ext/ctxdata"
 
 	"ylink/bff/cmdbff/api/internal/svc"
 	"ylink/bff/cmdbff/api/internal/types"

+ 10 - 1
ext/ctxdata/ctxdata.go → comm/ctxdata/ctxdata.go

@@ -6,7 +6,8 @@ package ctxdata
 
 import (
 	"context"
-	"ylink/ext/jwtkey"
+	"go.opentelemetry.io/otel/trace"
+	"ylink/comm/jwtkey"
 )
 
 func GetPlayerIdFromCtx(ctx context.Context) string {
@@ -23,3 +24,11 @@ func GetCsIdFromCtx(ctx context.Context) string {
 	csId, _ := ctx.Value(jwtkey.CsId).(string)
 	return csId
 }
+
+func GetTraceIdFromCtx(ctx context.Context) string {
+	spanCtx := trace.SpanContextFromContext(ctx)
+	if spanCtx.HasTraceID() {
+		return spanCtx.TraceID().String()
+	}
+	return ""
+}

+ 1 - 1
ext/ds/rbtree/iterator.go → comm/ds/rbtree/iterator.go

@@ -4,7 +4,7 @@
 
 package rbtree
 
-import "ylink/ext/utils/iterator"
+import "ylink/comm/utils/iterator"
 
 type RbTreeIterator struct {
 	node *Node

+ 0 - 0
ext/ds/rbtree/node.go → comm/ds/rbtree/node.go


+ 2 - 2
ext/ds/rbtree/rbtree.go → comm/ds/rbtree/rbtree.go

@@ -6,8 +6,8 @@ package rbtree
 
 import (
 	"fmt"
-	"ylink/ext/utils/comparator"
-	"ylink/ext/utils/visitor"
+	"ylink/comm/utils/comparator"
+	"ylink/comm/utils/visitor"
 )
 
 var defaultKeyComparator = comparator.BuiltinTypeComparator

+ 1 - 1
ext/ds/rbtree/rbtree_test.go → comm/ds/rbtree/rbtree_test.go

@@ -5,7 +5,7 @@ import (
 	"math/rand"
 	"testing"
 	"time"
-	"ylink/ext/utils/comparator"
+	"ylink/comm/utils/comparator"
 )
 
 func TestRbTeeFind(t *testing.T) {

+ 2 - 2
ext/ds/treemap/iterator.go → comm/ds/treemap/iterator.go

@@ -5,8 +5,8 @@
 package treemap
 
 import (
-	"ylink/ext/ds/rbtree"
-	"ylink/ext/utils/iterator"
+	"ylink/comm/ds/rbtree"
+	"ylink/comm/utils/iterator"
 )
 
 type MapIterator struct {

+ 5 - 5
ext/ds/treemap/map.go → comm/ds/treemap/map.go

@@ -6,11 +6,11 @@ package treemap
 
 import (
 	gosync "sync"
-	"ylink/ext/ds/rbtree"
-	"ylink/ext/utils/comparator"
-	"ylink/ext/utils/iterator"
-	"ylink/ext/utils/sync"
-	"ylink/ext/utils/visitor"
+	"ylink/comm/ds/rbtree"
+	"ylink/comm/utils/comparator"
+	"ylink/comm/utils/iterator"
+	"ylink/comm/utils/sync"
+	"ylink/comm/utils/visitor"
 )
 
 var (

+ 0 - 0
ext/ds/treemap/map_test.go → comm/ds/treemap/map_test.go


+ 4 - 4
ext/ds/treemap/multimap.go → comm/ds/treemap/multimap.go

@@ -4,10 +4,10 @@
 package treemap
 
 import (
-	"ylink/ext/ds/rbtree"
-	"ylink/ext/utils/comparator"
-	"ylink/ext/utils/sync"
-	"ylink/ext/utils/visitor"
+	"ylink/comm/ds/rbtree"
+	"ylink/comm/utils/comparator"
+	"ylink/comm/utils/sync"
+	"ylink/comm/utils/visitor"
 )
 
 // MultiMap uses RbTress for internal data structure, and keys can bee repeated.

+ 0 - 0
ext/ds/treemap/multimap_test.go → comm/ds/treemap/multimap_test.go


+ 10 - 0
comm/globalkey/globalkey.go

@@ -0,0 +1,10 @@
+//@File     globalkey.go
+//@Time     2022/05/13
+//@Author   #Suyghur,
+
+package globalkey
+
+const (
+	CONNECT_TYPE_PLAYER = 0
+	CONNECT_TYPE_CS     = 1
+)

+ 0 - 0
ext/jwtkey/jwtkey.go → comm/jwtkey/jwtkey.go


+ 3 - 2
ext/kafka/consumergroup.go → comm/kafka/consumergroup.go

@@ -30,12 +30,12 @@ func NewConsumerGroup(c *ConsumerGroupConfig, addr, topics []string, groupId str
 	client, err := sarama.NewClient(addr, config)
 	if err != nil {
 		logx.WithContext(context.Background()).Error(err.Error())
-		return nil
+		panic(err.Error())
 	}
 	consumerGroup, err := sarama.NewConsumerGroupFromClient(groupId, client)
 	if err != nil {
 		logx.WithContext(context.Background()).Error(err.Error())
-		return nil
+		panic(err.Error())
 	}
 	return &ConsumerGroup{consumerGroup, groupId, topics}
 }
@@ -45,6 +45,7 @@ func (cg *ConsumerGroup) RegisterHandleAndConsumer(handler sarama.ConsumerGroupH
 	for {
 		if err := cg.ConsumerGroup.Consume(ctx, cg.topics, handler); err != nil {
 			logx.Error(err.Error())
+			panic(err.Error())
 		}
 	}
 }

+ 16 - 0
comm/kafka/extdata.go

@@ -0,0 +1,16 @@
+//@File     extdata.go
+//@Time     2022/05/17
+//@Author   #Suyghur,
+
+package kafka
+
+import "github.com/Shopify/sarama"
+
+func GetTraceFromHeader(headers []*sarama.RecordHeader) string {
+	for _, h := range headers {
+		if string(h.Key) == "trace_id" {
+			return string(h.Value)
+		}
+	}
+	return ""
+}

+ 6 - 1
ext/kafka/kqconfig.go → comm/kafka/kqconfig.go

@@ -4,7 +4,12 @@
 
 package kafka
 
-type KqConfig struct {
+type KqProducerConfig struct {
+	Brokers []string
+	Topic   string
+}
+
+type KqConsumerConfig struct {
 	Brokers []string
 	Topic   string
 	GroupId string

+ 17 - 23
ext/kafka/producer.go → comm/kafka/producer.go

@@ -8,6 +8,9 @@ import (
 	"context"
 	"github.com/Shopify/sarama"
 	"github.com/zeromicro/go-zero/core/logx"
+	"go.opentelemetry.io/otel/attribute"
+	"ylink/comm/ctxdata"
+	"ylink/comm/trace"
 )
 
 type Producer struct {
@@ -18,6 +21,8 @@ type Producer struct {
 }
 
 func NewKafkaProducer(addr []string, topic string) *Producer {
+	logx.Infof("brokers: %v", addr)
+	logx.Infof("topic: %s", topic)
 	p := Producer{}
 	p.config = sarama.NewConfig()
 	// Whether to enable the successes channel to be notified after the message is sent successfully
@@ -35,38 +40,27 @@ func NewKafkaProducer(addr []string, topic string) *Producer {
 	producer, err := sarama.NewSyncProducer(p.addr, p.config)
 	if err != nil {
 		logx.WithContext(context.Background()).Error(err.Error())
-		return nil
+		panic(err.Error())
 	}
 	p.producer = producer
 	return &p
 }
 
-//func (p *Producer) SendMessage(m proto.Message, key ...string) (int32, int64, error) {
-//	kMsg := &sarama.ProducerMessage{}
-//	kMsg.Topic = p.topic
-//	if len(key) == 1 {
-//		kMsg.Key = sarama.StringEncoder(key[0])
-//	}
-//	bMsg, err := proto.Marshal(m)
-//	if err != nil {
-//		logx.WithContext(context.Background()).Errorf("proto marshal err: %s", err.Error())
-//		return -1, -1, err
-//	}
-//	kMsg.Value = sarama.ByteEncoder(bMsg)
-//	return p.producer.SendMessage(kMsg)
-//}
-
-func (p *Producer) SendMessage(m string, key ...string) (int32, int64, error) {
+func (p *Producer) SendMessage(ctx context.Context, m string, key ...string) (partition int32, offset int64, err error) {
+	traceId := ctxdata.GetTraceIdFromCtx(ctx)
 	msg := &sarama.ProducerMessage{}
+	msg.Headers = []sarama.RecordHeader{{
+		Key:   sarama.ByteEncoder("trace_id"),
+		Value: sarama.ByteEncoder(traceId),
+	}}
 	msg.Topic = p.topic
 	if len(key) == 1 {
 		msg.Key = sarama.StringEncoder(key[0])
 	}
-	//bMsg, err := proto.Marshal(m)
-	//if err != nil {
-	//	logx.Errorf("proto marshal err: %s", err.Error())
-	//	return -1, -1, err
-	//}
 	msg.Value = sarama.StringEncoder(m)
-	return p.producer.SendMessage(msg)
+
+	trace.StartTrace(ctx, "SendMessageToKafka", func(ctx context.Context) {
+		partition, offset, err = p.producer.SendMessage(msg)
+	}, attribute.StringSlice("keys", key), attribute.String("topic", p.topic))
+	return
 }

+ 0 - 0
ext/model/csinfo.go → comm/model/csinfo.go


+ 20 - 0
comm/model/message.go

@@ -0,0 +1,20 @@
+//@File     message.go
+//@Time     2022/05/10
+//@Author   #Suyghur,
+
+package model
+
+type KqMessage struct {
+	CreateTime  string `json:"create_time"`
+	Content     string `json:"content"`
+	Pic         string `json:"pic"`
+	ReceiverId  string `json:"receiver_id"`
+	SenderId    string `json:"sender_id"`
+	GameId      string `json:"game_id"`
+	OperationId string `json:"operation_id"`
+}
+
+type KqCmdMessage struct {
+	KqMessage
+	Opt int64 `json:"opt"`
+}

+ 0 - 0
ext/result/err.go → comm/result/err.go


+ 0 - 0
ext/result/httpresult.go → comm/result/httpresult.go


+ 0 - 0
ext/result/responsebean.go → comm/result/responsebean.go


+ 42 - 0
comm/trace/trace.go

@@ -0,0 +1,42 @@
+//@File     trace.go
+//@Time     2022/05/16
+//@Author   #Suyghur,
+
+package trace
+
+import (
+	"context"
+	gozerotrace "github.com/zeromicro/go-zero/core/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	oteltrace "go.opentelemetry.io/otel/trace"
+	"net/http"
+	"ylink/comm/utils"
+)
+
+func StartTrace(ctx context.Context, name string, callback func(context.Context), kv ...attribute.KeyValue) {
+	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
+	spanCtx, span := tracer.Start(ctx, name, oteltrace.WithSpanKind(oteltrace.SpanKindInternal), oteltrace.WithAttributes(kv...))
+	defer span.End()
+	callback(spanCtx)
+}
+
+func RunOnTracing(traceId string, callback func(ctx context.Context), kv ...attribute.KeyValue) {
+	propagator := otel.GetTextMapPropagator()
+	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
+	header := http.Header{}
+	if len(traceId) != 0 {
+		header.Set("x-trace-id", traceId)
+	}
+	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(header))
+	spanName := utils.CallerFuncName()
+	traceIdFromHex, _ := oteltrace.TraceIDFromHex(traceId)
+	ctx = oteltrace.ContextWithSpanContext(ctx, oteltrace.NewSpanContext(oteltrace.SpanContextConfig{
+		TraceID: traceIdFromHex,
+	}))
+	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindConsumer), oteltrace.WithAttributes(kv...))
+	defer span.End()
+	propagator.Inject(spanCtx, propagation.HeaderCarrier(header))
+	callback(spanCtx)
+}

+ 39 - 0
comm/utils/callerfunc.go

@@ -0,0 +1,39 @@
+//@File     callerfunc.go
+//@Time     2022/05/16
+//@Author   #Suyghur,
+
+package utils
+
+import (
+	"fmt"
+	"runtime"
+	"strings"
+)
+
+func CallerFuncName() string {
+	pc := make([]uintptr, 1)
+	runtime.Callers(3, pc)
+	f := runtime.FuncForPC(pc[0])
+	return f.Name()
+}
+
+func CallerFuncLine() string {
+	pc := make([]uintptr, 1)
+	runtime.Callers(2, pc)
+	f := runtime.FuncForPC(pc[0])
+	file, line := f.FileLine(pc[0])
+	return fmt.Sprintf("%s@%d", file, line)
+}
+
+func cleanUpFuncName(funcName string) string {
+	end := strings.LastIndex(funcName, ".")
+	if end == -1 {
+		return ""
+	}
+	return funcName[end+1:]
+}
+
+func GetSelfFuncName() string {
+	pc, _, _, _ := runtime.Caller(1)
+	return cleanUpFuncName(runtime.FuncForPC(pc).Name())
+}

+ 0 - 0
ext/utils/comparator/comparator.go → comm/utils/comparator/comparator.go


+ 0 - 0
ext/utils/iterator/iterator.go → comm/utils/iterator/iterator.go


+ 0 - 0
ext/utils/sync/locker.go → comm/utils/sync/locker.go


+ 0 - 0
ext/utils/visitor/visitor.go → comm/utils/visitor/visitor.go


+ 14 - 2
core/auth/rpc/internal/logic/checkauthlogic.go

@@ -4,7 +4,9 @@ import (
 	"context"
 	"github.com/golang-jwt/jwt/v4"
 	"github.com/pkg/errors"
-	"ylink/ext/result"
+	"ylink/comm/globalkey"
+	"ylink/comm/jwtkey"
+	"ylink/comm/result"
 
 	"ylink/core/auth/rpc/internal/svc"
 	"ylink/core/auth/rpc/pb"
@@ -32,7 +34,17 @@ func (l *CheckAuthLogic) CheckAuth(in *pb.CheckAuthReq) (*pb.CheckAuthResp, erro
 	})
 
 	if token.Valid {
-		return &pb.CheckAuthResp{}, nil
+		//将获取的token中的Claims强转为MapClaims
+		claims, _ := token.Claims.(jwt.MapClaims)
+		var uid string
+		if in.Type == globalkey.CONNECT_TYPE_PLAYER {
+			uid = claims[jwtkey.PlayerId].(string)
+		} else {
+			uid = claims[jwtkey.CsId].(string)
+		}
+		return &pb.CheckAuthResp{
+			Uid: uid,
+		}, nil
 	} else if ve, ok := err.(*jwt.ValidationError); ok {
 		if ve.Errors&jwt.ValidationErrorMalformed != 0 {
 			return nil, errors.Wrap(result.NewErrCode(result.TokenParseError), "")

+ 2 - 2
core/auth/rpc/internal/logic/csauthlogic.go

@@ -5,8 +5,8 @@ import (
 	"github.com/golang-jwt/jwt/v4"
 	"github.com/pkg/errors"
 	"time"
-	"ylink/ext/jwtkey"
-	"ylink/ext/result"
+	"ylink/comm/jwtkey"
+	"ylink/comm/result"
 
 	"ylink/core/auth/rpc/internal/svc"
 	"ylink/core/auth/rpc/pb"

+ 2 - 2
core/auth/rpc/internal/logic/playerauthlogic.go

@@ -5,8 +5,8 @@ import (
 	"github.com/golang-jwt/jwt/v4"
 	"github.com/pkg/errors"
 	"time"
-	"ylink/ext/jwtkey"
-	"ylink/ext/result"
+	"ylink/comm/jwtkey"
+	"ylink/comm/result"
 
 	"ylink/core/auth/rpc/internal/svc"
 	"ylink/core/auth/rpc/pb"

+ 36 - 16
core/auth/rpc/pb/auth.pb.go

@@ -174,7 +174,8 @@ type CheckAuthReq struct {
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	AccessToken string `protobuf:"bytes,1,opt,name=access_token,json=accessToken,proto3" json:"access_token,omitempty"`
+	Type        int64  `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
+	AccessToken string `protobuf:"bytes,2,opt,name=access_token,json=accessToken,proto3" json:"access_token,omitempty"`
 }
 
 func (x *CheckAuthReq) Reset() {
@@ -209,6 +210,13 @@ func (*CheckAuthReq) Descriptor() ([]byte, []int) {
 	return file_pb_auth_proto_rawDescGZIP(), []int{3}
 }
 
+func (x *CheckAuthReq) GetType() int64 {
+	if x != nil {
+		return x.Type
+	}
+	return 0
+}
+
 func (x *CheckAuthReq) GetAccessToken() string {
 	if x != nil {
 		return x.AccessToken
@@ -220,6 +228,8 @@ type CheckAuthResp struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
+
+	Uid string `protobuf:"bytes,1,opt,name=uid,proto3" json:"uid,omitempty"`
 }
 
 func (x *CheckAuthResp) Reset() {
@@ -254,6 +264,13 @@ func (*CheckAuthResp) Descriptor() ([]byte, []int) {
 	return file_pb_auth_proto_rawDescGZIP(), []int{4}
 }
 
+func (x *CheckAuthResp) GetUid() string {
+	if x != nil {
+		return x.Uid
+	}
+	return ""
+}
+
 var File_pb_auth_proto protoreflect.FileDescriptor
 
 var file_pb_auth_proto_rawDesc = []byte{
@@ -267,21 +284,24 @@ var file_pb_auth_proto_rawDesc = []byte{
 	0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x73, 0x49, 0x64, 0x22, 0x2d, 0x0a, 0x08,
 	0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65,
 	0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b,
-	0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x31, 0x0a, 0x0c, 0x43,
-	0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x12, 0x21, 0x0a, 0x0c, 0x61,
-	0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28,
-	0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x0f,
-	0x0a, 0x0d, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x32,
-	0x8e, 0x01, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x2d, 0x0a, 0x0a, 0x70, 0x6c, 0x61, 0x79,
-	0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x12, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x50, 0x6c, 0x61, 0x79,
-	0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x1a, 0x0c, 0x2e, 0x70, 0x62, 0x2e, 0x41,
-	0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x12, 0x25, 0x0a, 0x06, 0x63, 0x73, 0x41, 0x75, 0x74,
-	0x68, 0x12, 0x0d, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x73, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71,
-	0x1a, 0x0c, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x12, 0x30,
-	0x0a, 0x09, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x12, 0x10, 0x2e, 0x70, 0x62,
-	0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e,
-	0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70,
-	0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x45, 0x0a, 0x0c, 0x43,
+	0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x74,
+	0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
+	0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18,
+	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b,
+	0x65, 0x6e, 0x22, 0x21, 0x0a, 0x0d, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68, 0x52,
+	0x65, 0x73, 0x70, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
+	0x52, 0x03, 0x75, 0x69, 0x64, 0x32, 0x8e, 0x01, 0x0a, 0x04, 0x41, 0x75, 0x74, 0x68, 0x12, 0x2d,
+	0x0a, 0x0a, 0x70, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x12, 0x11, 0x2e, 0x70,
+	0x62, 0x2e, 0x50, 0x6c, 0x61, 0x79, 0x65, 0x72, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x1a,
+	0x0c, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x75, 0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x12, 0x25, 0x0a,
+	0x06, 0x63, 0x73, 0x41, 0x75, 0x74, 0x68, 0x12, 0x0d, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x73, 0x41,
+	0x75, 0x74, 0x68, 0x52, 0x65, 0x71, 0x1a, 0x0c, 0x2e, 0x70, 0x62, 0x2e, 0x41, 0x75, 0x74, 0x68,
+	0x52, 0x65, 0x73, 0x70, 0x12, 0x30, 0x0a, 0x09, 0x63, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74,
+	0x68, 0x12, 0x10, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75, 0x74, 0x68,
+	0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x41, 0x75,
+	0x74, 0x68, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06,
+	0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }
 
 var (

+ 5 - 2
core/auth/rpc/pb/auth.proto

@@ -18,10 +18,13 @@ message AuthResp{
 }
 
 message CheckAuthReq{
-  string  access_token = 1;
+  int64 type = 1;
+  string  access_token = 2;
 }
 
-message CheckAuthResp{}
+message CheckAuthResp{
+  string uid = 1;
+}
 
 service Auth{
   rpc playerAuth (PlayerAuthReq) returns (AuthResp);

+ 2 - 8
core/cmd/rpc/etc/cmd.yaml

@@ -12,13 +12,7 @@ Telemetry:
   Sampler: 1.0
   Batcher: jaeger
 
-KqSendMsgConf:
+KqMsgBoxProducerConf:
   Brokers:
     - localhost:9092
-  Topic: send-box-topic
-  GroupId:
-
-Redis:
-  Host: redis:6379
-  Type: node
-  Pass: ylink
+  Topic: send-box-topic

+ 3 - 3
core/cmd/rpc/internal/config/config.go

@@ -2,11 +2,11 @@ package config
 
 import (
 	"github.com/zeromicro/go-zero/zrpc"
-	"ylink/ext/kafka"
+	"ylink/comm/kafka"
 )
 
 type Config struct {
 	zrpc.RpcServerConf
-	InnerRpcConf  zrpc.RpcClientConf
-	KqSendMsgConf kafka.KqConfig
+	InnerRpcConf         zrpc.RpcClientConf
+	KqMsgBoxProducerConf kafka.KqProducerConfig
 }

+ 1 - 1
core/cmd/rpc/internal/logic/csfetchhistorychatlogic.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"github.com/pkg/errors"
 	"google.golang.org/protobuf/types/known/structpb"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"

+ 1 - 1
core/cmd/rpc/internal/logic/csfetchhistorymsglogic.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"github.com/pkg/errors"
 	"google.golang.org/protobuf/types/known/structpb"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"

+ 1 - 1
core/cmd/rpc/internal/logic/playerfetchhistorymsglogic.go

@@ -4,7 +4,7 @@ import (
 	"context"
 	"github.com/pkg/errors"
 	"google.golang.org/protobuf/types/known/structpb"
-	"ylink/ext/result"
+	"ylink/comm/result"
 
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"

+ 12 - 9
core/cmd/rpc/internal/logic/playersendmsglogic.go

@@ -4,9 +4,10 @@ import (
 	"context"
 	"encoding/json"
 	"time"
+	"ylink/comm/ctxdata"
+	"ylink/comm/model"
 	"ylink/core/cmd/rpc/internal/svc"
 	"ylink/core/cmd/rpc/pb"
-	"ylink/ext/model"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )
@@ -27,15 +28,17 @@ func NewPlayerSendMsgLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Pla
 
 func (l *PlayerSendMsgLogic) PlayerSendMsg(in *pb.PlayerSendMsgReq) (*pb.PlayerSendMsgResp, error) {
 	// 投递到自己的发件箱
-	msg, _ := json.Marshal(model.ChatMessage{
-		CreateTime: time.Now().Format("2006-01-02 15:04:05"),
-		Content:    in.Content,
-		Pic:        in.Pic,
-		ReceiverId: "",
-		SenderId:   in.PlayerId,
-		GameId:     in.GameId,
+	operationId := ctxdata.GetTraceIdFromCtx(l.ctx)
+	msg, _ := json.Marshal(model.KqMessage{
+		CreateTime:  time.Now().Format("2006-01-02 15:04:05"),
+		Content:     in.Content,
+		Pic:         in.Pic,
+		ReceiverId:  "",
+		SenderId:    in.PlayerId,
+		GameId:      in.GameId,
+		OperationId: operationId,
 	})
-	_, _, err := l.svcCtx.SendBoxProducer.SendMessage(string(msg), in.PlayerId)
+	_, _, err := l.svcCtx.KqMsgBoxProducer.SendMessage(l.ctx, string(msg), in.PlayerId)
 	if err != nil {
 		return nil, err
 	}

+ 7 - 13
core/cmd/rpc/internal/svc/servicecontext.go

@@ -1,28 +1,22 @@
 package svc
 
 import (
-	"github.com/zeromicro/go-zero/core/stores/redis"
 	"github.com/zeromicro/go-zero/zrpc"
+	"ylink/comm/kafka"
 	"ylink/core/cmd/rpc/internal/config"
 	"ylink/core/inner/rpc/inner"
-	"ylink/ext/kafka"
 )
 
 type ServiceContext struct {
-	Config          config.Config
-	InnerRpc        inner.Inner
-	SendBoxProducer *kafka.Producer
-	RedisClient     *redis.Redis
+	Config           config.Config
+	InnerRpc         inner.Inner
+	KqMsgBoxProducer *kafka.Producer
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
 	return &ServiceContext{
-		Config:          c,
-		InnerRpc:        inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
-		SendBoxProducer: kafka.NewKafkaProducer(c.KqSendMsgConf.Brokers, c.KqSendMsgConf.Topic),
-		RedisClient: redis.New(c.Redis.Host, func(r *redis.Redis) {
-			r.Type = c.Redis.Type
-			r.Pass = c.Redis.Pass
-		}),
+		Config:           c,
+		InnerRpc:         inner.NewInner(zrpc.MustNewClient(c.InnerRpcConf)),
+		KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
 	}
 }

+ 30 - 0
core/inner/rpc/Dockerfile

@@ -0,0 +1,30 @@
+FROM golang:alpine AS builder
+
+LABEL stage=gobuilder
+
+ENV CGO_ENABLED 0
+ENV GOPROXY https://goproxy.cn,direct
+
+RUN apk update --no-cache && apk add --no-cache tzdata
+
+WORKDIR /build
+
+ADD go.mod .
+ADD go.sum .
+RUN go mod download
+COPY . .
+COPY core/inner/rpc/etc /app/etc
+RUN go build -ldflags="-s -w" -o /app/inner core/inner/rpc/inner.go
+
+
+FROM scratch
+
+COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
+COPY --from=builder /usr/share/zoneinfo/Asia/Shanghai /usr/share/zoneinfo/Asia/Shanghai
+ENV TZ Asia/Shanghai
+
+WORKDIR /app
+COPY --from=builder /app/inner /app/inner
+COPY --from=builder /app/etc /app/etc
+
+CMD ["./inner", "-f", "etc/inner.yaml"]

+ 6 - 3
core/inner/rpc/etc/inner.yaml

@@ -7,15 +7,18 @@ Telemetry:
   Sampler: 1.0
   Batcher: jaeger
 
-KqRecvMsgConf:
+KqMsgBoxConsumerConf:
   Brokers:
     - localhost:9092
   Topic: send-box-topic
   GroupId: inner-rpc
 
-KqSendMsgConf:
+KqMsgBoxProducerConf:
   Brokers:
     - localhost:9092
   Topic: recv-box-topic
-  GroupId:
 
+KqDbBoxProducerConf:
+  Brokers:
+    - localhost:9092
+  Topic: db-box-topic

+ 4 - 3
core/inner/rpc/internal/config/config.go

@@ -2,11 +2,12 @@ package config
 
 import (
 	"github.com/zeromicro/go-zero/zrpc"
-	"ylink/ext/kafka"
+	"ylink/comm/kafka"
 )
 
 type Config struct {
 	zrpc.RpcServerConf
-	KqRecvMsgConf kafka.KqConfig
-	KqSendMsgConf kafka.KqConfig
+	KqMsgBoxConsumerConf kafka.KqConsumerConfig
+	KqMsgBoxProducerConf kafka.KqProducerConfig
+	//KqDbBoxProducerConf  kafka.KqProducerConfig
 }

+ 2 - 1
core/inner/rpc/internal/ext/global.go

@@ -4,9 +4,10 @@
 
 package ext
 
-import "ylink/ext/ds/treemap"
+import "ylink/comm/ds/treemap"
 
 var (
 	IdMap *treemap.Map
 	CsMap *treemap.Map
+	// PlayerStatMap
 )

+ 0 - 71
core/inner/rpc/internal/ext/sendboxhandler.go

@@ -1,71 +0,0 @@
-//@File     sendboxhandler.go
-//@Time     2022/05/12
-//@Author   #Suyghur,
-
-package ext
-
-import (
-	"context"
-	"encoding/json"
-	"github.com/Shopify/sarama"
-	"github.com/zeromicro/go-zero/core/logx"
-	"ylink/ext/ds/treemap"
-	"ylink/ext/kafka"
-	"ylink/ext/model"
-)
-
-type callback func(msg []byte)
-
-type SendBoxConsumerHandler struct {
-	callbacks     map[string]callback
-	producer      *kafka.Producer
-	ConsumerGroup *kafka.ConsumerGroup
-}
-
-func (handler *SendBoxConsumerHandler) Init(c kafka.KqConfig, producer *kafka.Producer) {
-	handler.callbacks = make(map[string]callback)
-	handler.callbacks[c.Topic] = handler.handleMessage
-	handler.producer = producer
-	handler.ConsumerGroup = kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
-		KafkaVersion:   sarama.V0_10_2_0,
-		OffsetsInitial: sarama.OffsetNewest,
-		IsReturnErr:    false,
-	}, c.Brokers, []string{c.Topic}, c.GroupId)
-}
-
-func (handler *SendBoxConsumerHandler) handleMessage(msg []byte) {
-	logx.WithContext(context.Background()).Infof("message recv from send-box, %s", string(msg))
-	// todo 将message转发到recv-box
-
-	var message model.ChatMessage
-	err := json.Unmarshal(msg, &message)
-	if err != nil {
-		logx.WithContext(context.Background()).Errorf("unmarshal message err: %s", err.Error())
-		return
-	}
-	if len(message.ReceiverId) == 0 {
-		// 玩家发的消息
-		p2cMap := IdMap.Get(message.GameId).(*treemap.Map)
-		message.ReceiverId = p2cMap.Get(message.SenderId).(string)
-		b, _ := json.Marshal(message)
-		handler.producer.SendMessage(string(b), message.ReceiverId)
-	} else {
-		handler.producer.SendMessage(string(msg), message.ReceiverId)
-	}
-}
-
-func (SendBoxConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (SendBoxConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
-	return nil
-}
-
-func (handler *SendBoxConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
-	for msg := range claim.Messages() {
-		logx.WithContext(context.Background()).Infof("send-box get info to db, topic: %s, partition: %d, msg: %s", msg.Topic, msg.Partition, string(msg.Value))
-		handler.callbacks[msg.Topic](msg.Value)
-	}
-	return nil
-}

+ 1 - 1
core/inner/rpc/internal/logic/csfetchplayerqueuelogic.go

@@ -4,9 +4,9 @@ import (
 	"context"
 	"github.com/pkg/errors"
 	"google.golang.org/protobuf/types/known/structpb"
+	"ylink/comm/result"
 	"ylink/core/inner/rpc/internal/svc"
 	"ylink/core/inner/rpc/pb"
-	"ylink/ext/result"
 
 	"github.com/zeromicro/go-zero/core/logx"
 )

+ 107 - 2
core/inner/rpc/internal/server/innerserver.go

@@ -5,7 +5,21 @@ package server
 
 import (
 	"context"
-
+	"encoding/json"
+	"github.com/Shopify/sarama"
+	"github.com/zeromicro/go-zero/core/logx"
+	gozerotrace "github.com/zeromicro/go-zero/core/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	oteltrace "go.opentelemetry.io/otel/trace"
+	"net/http"
+	"ylink/comm/ds/treemap"
+	"ylink/comm/kafka"
+	"ylink/comm/model"
+	"ylink/comm/trace"
+	"ylink/comm/utils"
+	"ylink/core/inner/rpc/internal/ext"
 	"ylink/core/inner/rpc/internal/logic"
 	"ylink/core/inner/rpc/internal/svc"
 	"ylink/core/inner/rpc/pb"
@@ -14,12 +28,23 @@ import (
 type InnerServer struct {
 	svcCtx *svc.ServiceContext
 	pb.UnimplementedInnerServer
+	ConsumerGroup *kafka.ConsumerGroup
 }
 
 func NewInnerServer(svcCtx *svc.ServiceContext) *InnerServer {
-	return &InnerServer{
+	server := &InnerServer{
 		svcCtx: svcCtx,
+		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+			KafkaVersion:   sarama.V1_0_0_0,
+			OffsetsInitial: sarama.OffsetNewest,
+			IsReturnErr:    false,
+		},
+			svcCtx.Config.KqMsgBoxConsumerConf.Brokers,
+			[]string{svcCtx.Config.KqMsgBoxConsumerConf.Topic},
+			svcCtx.Config.KqMsgBoxConsumerConf.GroupId),
 	}
+	server.subscribe()
+	return server
 }
 
 func (s *InnerServer) PlayerFetchCsInfo(ctx context.Context, in *pb.InnerPlayerFetchCsInfoReq) (*pb.InnerPlayerFetchCsInfoResp, error) {
@@ -41,3 +66,83 @@ func (s *InnerServer) CsConnectPlayer(ctx context.Context, in *pb.InnerCsConnect
 	l := logic.NewCsConnectPlayerLogic(ctx, s.svcCtx)
 	return l.CsConnectPlayer(in)
 }
+
+func (s *InnerServer) Setup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *InnerServer) Cleanup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *InnerServer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		if msg.Topic == s.svcCtx.Config.KqMsgBoxConsumerConf.Topic {
+			s.handleMessage(sess, msg)
+		}
+	}
+	return nil
+}
+
+func (s *InnerServer) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
+	propagator := otel.GetTextMapPropagator()
+	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
+	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
+	spanName := utils.CallerFuncName()
+	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
+	defer span.End()
+	propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
+	callback(spanCtx)
+}
+
+func (s *InnerServer) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	traceId := kafka.GetTraceFromHeader(msg.Headers)
+	if len(traceId) == 0 {
+		return
+	}
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		var message model.KqMessage
+		if err := json.Unmarshal(msg.Value, &message); err != nil {
+			logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+			return
+		}
+		trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
+			if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+				// 玩家发的消息
+				p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
+				message.ReceiverId = p2cMap.Get(message.SenderId).(string)
+				logx.WithContext(ctx).Infof("receiver: %s", message.ReceiverId)
+				kMsg, _ := json.Marshal(message)
+				s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(kMsg), message.ReceiverId)
+			} else {
+				s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+			}
+			sess.MarkMessage(msg, "")
+		}, attribute.String("msg.key", string(msg.Key)))
+	})
+
+	//s.runWithCtx(func(ctx context.Context) {
+	//	var message model.KqCmdMessage
+	//	if err := json.Unmarshal(msg.Value, &message); err != nil {
+	//		logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+	//		return
+	//	}
+	//	trace.StartTrace(ctx, "InnerServer.handleMessage.SendMessage", func(ctx context.Context) {
+	//		if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+	//			// 玩家发的消息
+	//			p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
+	//			message.ReceiverId = p2cMap.Get(message.SenderId).(string)
+	//			logx.Infof("receiver: %s", message.ReceiverId)
+	//			b, _ := json.Marshal(message)
+	//			s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
+	//		} else {
+	//			s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+	//		}
+	//		sess.MarkMessage(msg, "")
+	//	}, attribute.String("msg.key", string(msg.Key)))
+	//})
+}
+
+func (s *InnerServer) subscribe() {
+	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
+}

+ 10 - 10
core/inner/rpc/internal/svc/servicecontext.go

@@ -1,26 +1,23 @@
 package svc
 
 import (
+	"ylink/comm/ds/treemap"
+	"ylink/comm/kafka"
+	"ylink/comm/model"
 	"ylink/core/inner/rpc/internal/config"
 	"ylink/core/inner/rpc/internal/ext"
-	"ylink/ext/ds/treemap"
-	"ylink/ext/kafka"
-	"ylink/ext/model"
 )
 
 type ServiceContext struct {
-	Config          config.Config
-	RecvBoxProducer *kafka.Producer
+	Config           config.Config
+	KqMsgBoxProducer *kafka.Producer
 }
 
 func NewServiceContext(c config.Config) *ServiceContext {
 	fetchCsCenterInfo()
-	recvBoxProducer := kafka.NewKafkaProducer(c.KqSendMsgConf.Brokers, c.KqSendMsgConf.Topic)
-	var sendBoxHandler ext.SendBoxConsumerHandler
-	sendBoxHandler.Init(c.KqRecvMsgConf, recvBoxProducer)
-	go sendBoxHandler.ConsumerGroup.RegisterHandleAndConsumer(&sendBoxHandler)
 	return &ServiceContext{
-		Config: c,
+		Config:           c,
+		KqMsgBoxProducer: kafka.NewKafkaProducer(c.KqMsgBoxProducerConf.Brokers, c.KqMsgBoxProducerConf.Topic),
 	}
 }
 
@@ -33,6 +30,9 @@ func mockInfo() {
 	ext.IdMap = treemap.New(treemap.WithGoroutineSafe())
 	ext.CsMap = treemap.New(treemap.WithGoroutineSafe())
 
+	// 已连接的映射
+
+	// 专属客服映射
 	game1231P2cMap := treemap.New(treemap.WithGoroutineSafe())
 	game1231P2cMap.Insert("player1231", "cs_1231")
 	game1231P2cMap.Insert("player1111", "cs_2222")

+ 2 - 0
core/inner/rpc/pb/inner.proto

@@ -54,6 +54,8 @@ service Inner {
   rpc playerFetchCsInfo (InnerPlayerFetchCsInfoReq) returns (InnerPlayerFetchCsInfoResp);
   rpc playerDisconnect (InnerPlayerDisconnectReq) returns (InnerPlayerDisconnectResp);
 
+  //
+
   rpc csFetchPlayerQueue (InnerCsFetchPlayerQueueReq) returns (InnerCsFetchPlayerQueueResp);
   rpc csConnectPlayer (InnerCsConnectPlayerReq) returns (InnerCsConnectPlayerResp);
 }

+ 0 - 8
ext/globalkey/rediskey.go

@@ -1,8 +0,0 @@
-//@File     rediskey.go
-//@Time     2022/04/24
-//@Author   #Suyghur,
-
-package globalkey
-
-var AccessSecret string
-var AccessExpire int64

+ 0 - 14
ext/model/message.go

@@ -1,14 +0,0 @@
-//@File     message.go
-//@Time     2022/05/10
-//@Author   #Suyghur,
-
-package model
-
-type ChatMessage struct {
-	CreateTime string `json:"create_time"`
-	Content    string `json:"content"`
-	Pic        string `json:"pic"`
-	ReceiverId string `json:"receiver_id"`
-	SenderId   string `json:"sender_id"`
-	GameId     string `json:"game_id"`
-}

+ 30 - 0
flowsrv/rpc/Dockerfile

@@ -0,0 +1,30 @@
+FROM golang:alpine AS builder
+
+LABEL stage=gobuilder
+
+ENV CGO_ENABLED 0
+ENV GOPROXY https://goproxy.cn,direct
+
+RUN apk update --no-cache && apk add --no-cache tzdata
+
+WORKDIR /build
+
+ADD go.mod .
+ADD go.sum .
+RUN go mod download
+COPY . .
+COPY flowsrv/rpc/etc /app/etc
+RUN go build -ldflags="-s -w" -o /app/flowsrv flowsrv/rpc/flowsrv.go
+
+
+FROM scratch
+
+COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
+COPY --from=builder /usr/share/zoneinfo/Asia/Shanghai /usr/share/zoneinfo/Asia/Shanghai
+ENV TZ Asia/Shanghai
+
+WORKDIR /app
+COPY --from=builder /app/flowsrv /app/flowsrv
+COPY --from=builder /app/etc /app/etc
+
+CMD ["./flowsrv", "-f", "etc/flowsrv.yaml"]

+ 6 - 12
flowsrv/rpc/etc/flowsrv.yaml

@@ -1,25 +1,19 @@
 Name: flowsrv.rpc
 ListenOn: 0.0.0.0:10200
 
-Etcd:
-  Hosts:
-    - localhost:2379
-  Key: flowsrv.rpc
-
 Telemetry:
   Name: flowsrv-rpc
-  Endpoint: http://ylink-jaeger-trace.ylink.svc.cluster.local:14268/api/traces
+  Endpoint: http://localhost:14268/api/traces
   Sampler: 1.0
   Batcher: jaeger
 
 AuthRpcConf:
-  Etcd:
-    Hosts:
-      - localhost:2379
-    Key: auth.rpc
+  Endpoints:
+    - localhost:10400
+  NonBlock: true
 
-KqRecvMsgConf:
+KqMsgBoxConsumerConf:
   Brokers:
     - localhost:9092
   Topic: recv-box-topic
-  GroupId: flowsrv
+  GroupId: flowsrv-rpc

+ 3 - 3
flowsrv/rpc/internal/config/config.go

@@ -2,11 +2,11 @@ package config
 
 import (
 	"github.com/zeromicro/go-zero/zrpc"
-	"ylink/ext/kafka"
+	"ylink/comm/kafka"
 )
 
 type Config struct {
 	zrpc.RpcServerConf
-	AuthRpcConf zrpc.RpcClientConf
-	KqMsgConf   kafka.KqConfig
+	AuthRpcConf          zrpc.RpcClientConf
+	KqMsgBoxConsumerConf kafka.KqConsumerConfig
 }

+ 5 - 0
flowsrv/rpc/internal/ext/recvboxhandler.go

@@ -0,0 +1,5 @@
+//@File     recvboxhandler.go
+//@Time     2022/05/13
+//@Author   #Suyghur,
+
+package ext

+ 8 - 3
flowsrv/rpc/internal/logic/connectlogic.go

@@ -2,8 +2,9 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/result"
 	"ylink/core/auth/rpc/auth"
-	"ylink/ext/result"
+	"ylink/flowsrv/rpc/internal/mgr"
 
 	"ylink/flowsrv/rpc/internal/svc"
 	"ylink/flowsrv/rpc/pb"
@@ -26,10 +27,10 @@ func NewConnectLogic(ctx context.Context, svcCtx *svc.ServiceContext) *ConnectLo
 }
 
 func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
-	_, err := l.svcCtx.AuthRpc.CheckAuth(l.ctx, &auth.CheckAuthReq{
+	authResp, err := l.svcCtx.AuthRpc.CheckAuth(l.ctx, &auth.CheckAuthReq{
+		Type:        in.Type,
 		AccessToken: in.AccessToken,
 	})
-	//data, _ := structpb.NewStruct(treemap[string]interface{}{})
 	if err != nil {
 		return stream.Send(&pb.CommandResp{
 			Code: result.TokenParseError,
@@ -37,6 +38,10 @@ func (l *ConnectLogic) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServe
 			Data: nil,
 		})
 	}
+	// update(对接的user的状态也返回)
+	//stream.RecvMsg()
+	mgr.GetFlowMgrInstance().SetFlow(authResp.Uid, stream)
+
 	return stream.Send(&pb.CommandResp{
 		Code: result.Ok,
 		Msg:  "success",

+ 5 - 2
flowsrv/rpc/internal/logic/disconnectlogic.go

@@ -2,8 +2,8 @@ package logic
 
 import (
 	"context"
+	"ylink/comm/result"
 	"ylink/core/auth/rpc/auth"
-	"ylink/ext/result"
 
 	"ylink/flowsrv/rpc/internal/svc"
 	"ylink/flowsrv/rpc/pb"
@@ -27,9 +27,9 @@ func NewDisconnectLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Discon
 
 func (l *DisconnectLogic) Disconnect(in *pb.CommandReq) (*pb.CommandResp, error) {
 	_, err := l.svcCtx.AuthRpc.CheckAuth(l.ctx, &auth.CheckAuthReq{
+		Type:        in.Type,
 		AccessToken: in.AccessToken,
 	})
-	//data, _ := structpb.NewStruct(treemap[string]interface{}{})
 	if err != nil {
 		return &pb.CommandResp{
 			Code: result.TokenParseError,
@@ -37,6 +37,9 @@ func (l *DisconnectLogic) Disconnect(in *pb.CommandReq) (*pb.CommandResp, error)
 			Data: nil,
 		}, err
 	}
+
+	// TODO: notify inner service
+
 	return &pb.CommandResp{
 		Code: result.Ok,
 		Msg:  "success",

+ 36 - 0
flowsrv/rpc/internal/mgr/flowmgr.go

@@ -0,0 +1,36 @@
+//@File     flowmgr.go
+//@Time     2022/05/13
+//@Author   #Suyghur,
+
+package mgr
+
+import (
+	"sync"
+	"ylink/flowsrv/rpc/pb"
+)
+
+type flowManager struct {
+	flowMap map[string]pb.Flowsrv_ConnectServer
+}
+
+var (
+	instance *flowManager
+	once     sync.Once
+)
+
+func GetFlowMgrInstance() *flowManager {
+	once.Do(func() {
+		instance = &flowManager{
+			flowMap: make(map[string]pb.Flowsrv_ConnectServer),
+		}
+	})
+	return instance
+}
+
+func (manager *flowManager) SetFlow(uid string, flow pb.Flowsrv_ConnectServer) {
+	manager.flowMap[uid] = flow
+}
+
+func (manager *flowManager) GetFlow(uid string) pb.Flowsrv_ConnectServer {
+	return manager.flowMap[uid]
+}

+ 109 - 1
flowsrv/rpc/internal/server/flowsrvserver.go

@@ -5,6 +5,19 @@ package server
 
 import (
 	"context"
+	"encoding/json"
+	"github.com/Shopify/sarama"
+	"github.com/zeromicro/go-zero/core/logx"
+	gozerotrace "github.com/zeromicro/go-zero/core/trace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/propagation"
+	oteltrace "go.opentelemetry.io/otel/trace"
+	"net/http"
+	"ylink/comm/kafka"
+	"ylink/comm/model"
+	"ylink/comm/trace"
+	"ylink/comm/utils"
 
 	"ylink/flowsrv/rpc/internal/logic"
 	"ylink/flowsrv/rpc/internal/svc"
@@ -14,12 +27,23 @@ import (
 type FlowsrvServer struct {
 	svcCtx *svc.ServiceContext
 	pb.UnimplementedFlowsrvServer
+	ConsumerGroup *kafka.ConsumerGroup
 }
 
 func NewFlowsrvServer(svcCtx *svc.ServiceContext) *FlowsrvServer {
-	return &FlowsrvServer{
+	server := &FlowsrvServer{
 		svcCtx: svcCtx,
+		ConsumerGroup: kafka.NewConsumerGroup(&kafka.ConsumerGroupConfig{
+			KafkaVersion:   sarama.V1_0_0_0,
+			OffsetsInitial: sarama.OffsetNewest,
+			IsReturnErr:    false,
+		},
+			svcCtx.Config.KqMsgBoxConsumerConf.Brokers,
+			[]string{svcCtx.Config.KqMsgBoxConsumerConf.Topic},
+			svcCtx.Config.KqMsgBoxConsumerConf.GroupId),
 	}
+	server.subscribe()
+	return server
 }
 
 func (s *FlowsrvServer) Connect(in *pb.CommandReq, stream pb.Flowsrv_ConnectServer) error {
@@ -31,3 +55,87 @@ func (s *FlowsrvServer) Disconnect(ctx context.Context, in *pb.CommandReq) (*pb.
 	l := logic.NewDisconnectLogic(ctx, s.svcCtx)
 	return l.Disconnect(in)
 }
+
+func (s *FlowsrvServer) Setup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *FlowsrvServer) Cleanup(_ sarama.ConsumerGroupSession) error {
+	return nil
+}
+
+func (s *FlowsrvServer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
+	for msg := range claim.Messages() {
+		if msg.Topic == s.svcCtx.Config.KqMsgBoxConsumerConf.Topic {
+			s.handleMessage(sess, msg)
+		}
+	}
+	return nil
+}
+
+func (s *FlowsrvServer) runWithCtx(callback func(ctx context.Context), kv ...attribute.KeyValue) {
+	propagator := otel.GetTextMapPropagator()
+	tracer := otel.GetTracerProvider().Tracer(gozerotrace.TraceName)
+	ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(http.Header{}))
+	spanName := utils.CallerFuncName()
+	spanCtx, span := tracer.Start(ctx, spanName, oteltrace.WithSpanKind(oteltrace.SpanKindServer), oteltrace.WithAttributes(kv...))
+	defer span.End()
+	propagator.Inject(spanCtx, propagation.HeaderCarrier(http.Header{}))
+	callback(spanCtx)
+}
+
+func (s *FlowsrvServer) handleMessage(sess sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) {
+	traceId := kafka.GetTraceFromHeader(msg.Headers)
+	if len(traceId) == 0 {
+		return
+	}
+	trace.RunOnTracing(traceId, func(ctx context.Context) {
+		var message model.KqMessage
+		if err := json.Unmarshal(msg.Value, &message); err != nil {
+			logx.Errorf("unmarshal msg error: %v", err)
+			return
+		}
+		trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
+			//if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+			//	// 玩家发的消息
+			//	p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
+			//	message.ReceiverId = p2cMap.Get(message.SenderId).(string)
+			//	logx.Infof("receiver: %s", message.ReceiverId)
+			//	b, _ := json.Marshal(message)
+			//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
+			//} else {
+			//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+			//}
+			logx.WithContext(ctx).Infof("headers: %v", msg.Headers)
+			logx.WithContext(ctx).Infof("traceId: %s", msg.Headers[0].Value)
+			logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
+			sess.MarkMessage(msg, "")
+		}, attribute.String("msg.key", string(msg.Key)))
+	})
+
+	//s.runWithCtx(func(ctx context.Context) {
+	//	var message model.KqCmdMessage
+	//	if err := json.Unmarshal(msg.Value, &message); err != nil {
+	//		logx.WithContext(ctx).Errorf("unmarshal msg error: %v", err)
+	//		return
+	//	}
+	//	trace.StartTrace(ctx, "FlowsrvServer.handleMessage.SendMessage", func(ctx context.Context) {
+	//		//if len(message.ReceiverId) == 0 || message.ReceiverId == "" {
+	//		//	// 玩家发的消息
+	//		//	p2cMap := ext.IdMap.Get(message.GameId).(*treemap.Map)
+	//		//	message.ReceiverId = p2cMap.Get(message.SenderId).(string)
+	//		//	logx.Infof("receiver: %s", message.ReceiverId)
+	//		//	b, _ := json.Marshal(message)
+	//		//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(b), message.ReceiverId)
+	//		//} else {
+	//		//	s.svcCtx.KqMsgBoxProducer.SendMessage(ctx, string(msg.Value), message.ReceiverId)
+	//		//}
+	//		logx.WithContext(ctx).Infof("flowsrv recv message: %v", string(msg.Value))
+	//		sess.MarkMessage(msg, "")
+	//	}, attribute.String("msg.key", string(msg.Key)))
+	//})
+}
+
+func (s *FlowsrvServer) subscribe() {
+	go s.ConsumerGroup.RegisterHandleAndConsumer(s)
+}

+ 37 - 90
flowsrv/rpc/pb/flowsrv.pb.go

@@ -21,59 +21,13 @@ const (
 	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
 )
 
-type ConnectType int32
-
-const (
-	ConnectType_PLAYER ConnectType = 0
-	ConnectType_CS     ConnectType = 1
-)
-
-// Enum value maps for ConnectType.
-var (
-	ConnectType_name = map[int32]string{
-		0: "PLAYER",
-		1: "CS",
-	}
-	ConnectType_value = map[string]int32{
-		"PLAYER": 0,
-		"CS":     1,
-	}
-)
-
-func (x ConnectType) Enum() *ConnectType {
-	p := new(ConnectType)
-	*p = x
-	return p
-}
-
-func (x ConnectType) String() string {
-	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
-}
-
-func (ConnectType) Descriptor() protoreflect.EnumDescriptor {
-	return file_pb_flowsrv_proto_enumTypes[0].Descriptor()
-}
-
-func (ConnectType) Type() protoreflect.EnumType {
-	return &file_pb_flowsrv_proto_enumTypes[0]
-}
-
-func (x ConnectType) Number() protoreflect.EnumNumber {
-	return protoreflect.EnumNumber(x)
-}
-
-// Deprecated: Use ConnectType.Descriptor instead.
-func (ConnectType) EnumDescriptor() ([]byte, []int) {
-	return file_pb_flowsrv_proto_rawDescGZIP(), []int{0}
-}
-
 type CommandReq struct {
 	state         protoimpl.MessageState
 	sizeCache     protoimpl.SizeCache
 	unknownFields protoimpl.UnknownFields
 
-	Type        ConnectType `protobuf:"varint,1,opt,name=type,proto3,enum=pb.ConnectType" json:"type,omitempty"`
-	AccessToken string      `protobuf:"bytes,2,opt,name=access_token,json=accessToken,proto3" json:"access_token,omitempty"`
+	Type        int64  `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"`
+	AccessToken string `protobuf:"bytes,2,opt,name=access_token,json=accessToken,proto3" json:"access_token,omitempty"`
 }
 
 func (x *CommandReq) Reset() {
@@ -108,11 +62,11 @@ func (*CommandReq) Descriptor() ([]byte, []int) {
 	return file_pb_flowsrv_proto_rawDescGZIP(), []int{0}
 }
 
-func (x *CommandReq) GetType() ConnectType {
+func (x *CommandReq) GetType() int64 {
 	if x != nil {
 		return x.Type
 	}
-	return ConnectType_PLAYER
+	return 0
 }
 
 func (x *CommandReq) GetAccessToken() string {
@@ -191,28 +145,25 @@ var file_pb_flowsrv_proto_rawDesc = []byte{
 	0x0a, 0x10, 0x70, 0x62, 0x2f, 0x66, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x2e, 0x70, 0x72, 0x6f,
 	0x74, 0x6f, 0x12, 0x02, 0x70, 0x62, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
 	0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x73, 0x74, 0x72, 0x75, 0x63, 0x74, 0x2e, 0x70,
-	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x54, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
-	0x65, 0x71, 0x12, 0x23, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e,
-	0x32, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70,
-	0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73,
-	0x73, 0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61,
-	0x63, 0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x60, 0x0a, 0x0b, 0x43, 0x6f,
-	0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64,
-	0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a,
-	0x03, 0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12,
-	0x2b, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e,
-	0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
-	0x53, 0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x2a, 0x21, 0x0a, 0x0b,
-	0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x50,
-	0x4c, 0x41, 0x59, 0x45, 0x52, 0x10, 0x00, 0x12, 0x06, 0x0a, 0x02, 0x43, 0x53, 0x10, 0x01, 0x32,
-	0x66, 0x0a, 0x07, 0x46, 0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f,
-	0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
-	0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
-	0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64, 0x69, 0x73, 0x63,
-	0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d,
-	0x61, 0x6e, 0x64, 0x52, 0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d,
-	0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62,
-	0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x72, 0x6f, 0x74, 0x6f, 0x22, 0x43, 0x0a, 0x0a, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
+	0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03,
+	0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x61, 0x63, 0x63, 0x65, 0x73, 0x73,
+	0x5f, 0x74, 0x6f, 0x6b, 0x65, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x61, 0x63,
+	0x63, 0x65, 0x73, 0x73, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x22, 0x60, 0x0a, 0x0b, 0x43, 0x6f, 0x6d,
+	0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x6f, 0x64, 0x65,
+	0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x63, 0x6f, 0x64, 0x65, 0x12, 0x10, 0x0a, 0x03,
+	0x6d, 0x73, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6d, 0x73, 0x67, 0x12, 0x2b,
+	0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x67,
+	0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53,
+	0x74, 0x72, 0x75, 0x63, 0x74, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x32, 0x66, 0x0a, 0x07, 0x46,
+	0x6c, 0x6f, 0x77, 0x73, 0x72, 0x76, 0x12, 0x2c, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63,
+	0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
+	0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
+	0x73, 0x70, 0x30, 0x01, 0x12, 0x2d, 0x0a, 0x0a, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65,
+	0x63, 0x74, 0x12, 0x0e, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
+	0x65, 0x71, 0x1a, 0x0f, 0x2e, 0x70, 0x62, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52,
+	0x65, 0x73, 0x70, 0x42, 0x06, 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x33,
 }
 
 var (
@@ -227,26 +178,23 @@ func file_pb_flowsrv_proto_rawDescGZIP() []byte {
 	return file_pb_flowsrv_proto_rawDescData
 }
 
-var file_pb_flowsrv_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
 var file_pb_flowsrv_proto_msgTypes = make([]protoimpl.MessageInfo, 2)
 var file_pb_flowsrv_proto_goTypes = []interface{}{
-	(ConnectType)(0),        // 0: pb.ConnectType
-	(*CommandReq)(nil),      // 1: pb.CommandReq
-	(*CommandResp)(nil),     // 2: pb.CommandResp
-	(*structpb.Struct)(nil), // 3: google.protobuf.Struct
+	(*CommandReq)(nil),      // 0: pb.CommandReq
+	(*CommandResp)(nil),     // 1: pb.CommandResp
+	(*structpb.Struct)(nil), // 2: google.protobuf.Struct
 }
 var file_pb_flowsrv_proto_depIdxs = []int32{
-	0, // 0: pb.CommandReq.type:type_name -> pb.ConnectType
-	3, // 1: pb.CommandResp.data:type_name -> google.protobuf.Struct
-	1, // 2: pb.Flowsrv.connect:input_type -> pb.CommandReq
-	1, // 3: pb.Flowsrv.disconnect:input_type -> pb.CommandReq
-	2, // 4: pb.Flowsrv.connect:output_type -> pb.CommandResp
-	2, // 5: pb.Flowsrv.disconnect:output_type -> pb.CommandResp
-	4, // [4:6] is the sub-list for method output_type
-	2, // [2:4] is the sub-list for method input_type
-	2, // [2:2] is the sub-list for extension type_name
-	2, // [2:2] is the sub-list for extension extendee
-	0, // [0:2] is the sub-list for field type_name
+	2, // 0: pb.CommandResp.data:type_name -> google.protobuf.Struct
+	0, // 1: pb.Flowsrv.connect:input_type -> pb.CommandReq
+	0, // 2: pb.Flowsrv.disconnect:input_type -> pb.CommandReq
+	1, // 3: pb.Flowsrv.connect:output_type -> pb.CommandResp
+	1, // 4: pb.Flowsrv.disconnect:output_type -> pb.CommandResp
+	3, // [3:5] is the sub-list for method output_type
+	1, // [1:3] is the sub-list for method input_type
+	1, // [1:1] is the sub-list for extension type_name
+	1, // [1:1] is the sub-list for extension extendee
+	0, // [0:1] is the sub-list for field type_name
 }
 
 func init() { file_pb_flowsrv_proto_init() }
@@ -285,14 +233,13 @@ func file_pb_flowsrv_proto_init() {
 		File: protoimpl.DescBuilder{
 			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
 			RawDescriptor: file_pb_flowsrv_proto_rawDesc,
-			NumEnums:      1,
+			NumEnums:      0,
 			NumMessages:   2,
 			NumExtensions: 0,
 			NumServices:   1,
 		},
 		GoTypes:           file_pb_flowsrv_proto_goTypes,
 		DependencyIndexes: file_pb_flowsrv_proto_depIdxs,
-		EnumInfos:         file_pb_flowsrv_proto_enumTypes,
 		MessageInfos:      file_pb_flowsrv_proto_msgTypes,
 	}.Build()
 	File_pb_flowsrv_proto = out.File

+ 1 - 5
flowsrv/rpc/pb/flowsrv.proto

@@ -6,13 +6,9 @@ package pb;
 
 import "google/protobuf/struct.proto";
 
-enum ConnectType{
-  PLAYER = 0;
-  CS = 1;
-}
 
 message CommandReq {
-  ConnectType type = 1;
+  int64 type = 1;
   string access_token = 2;
 }
 

+ 1 - 0
go.mod

@@ -19,6 +19,7 @@ require (
 	github.com/coreos/go-systemd/v22 v22.3.2 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
+	github.com/duke-git/lancet/v2 v2.0.6 // indirect
 	github.com/eapache/go-resiliency v1.2.0 // indirect
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
 	github.com/eapache/queue v1.1.0 // indirect

+ 2 - 0
go.sum

@@ -101,6 +101,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r
 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
 github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
 github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
+github.com/duke-git/lancet/v2 v2.0.6 h1:HZKwz3Lcslh1wKYscKy21MhF1JOFrF9bbE1mKM54P/s=
+github.com/duke-git/lancet/v2 v2.0.6/go.mod h1:5Nawyf/bK783rCiHyVkZLx+jj8028oVVjLOrC21ZONA=
 github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
 github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
 github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=