Commit a983570a authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Status registry refactor.

parent 74916aa9
Loading
Loading
Loading
Loading
+5 −3
Original line number Diff line number Diff line
@@ -129,7 +129,8 @@ func main() {
	cookie := newOrLoadCookie(config)
	metrics := server.NewMetrics(logger, startupLogger, config)
	sessionRegistry := server.NewLocalSessionRegistry(metrics)
	tracker := server.StartLocalTracker(logger, config, sessionRegistry, metrics, jsonpbMarshaler)
	statusRegistry := server.NewStatusRegistry(logger, config, sessionRegistry, jsonpbMarshaler)
	tracker := server.StartLocalTracker(logger, config, sessionRegistry, statusRegistry, metrics, jsonpbMarshaler)
	router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler)
	leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db)
	leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache)
@@ -149,10 +150,10 @@ func main() {

	leaderboardScheduler.Start(runtime)

	pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime)
	pipeline := server.NewPipeline(logger, config, db, jsonpbMarshaler, jsonpbUnmarshaler, sessionRegistry, statusRegistry, matchRegistry, partyRegistry, matchmaker, tracker, router, runtime)
	statusHandler := server.NewLocalStatusHandler(logger, sessionRegistry, matchRegistry, tracker, metrics, config.GetName())

	apiServer := server.StartApiServer(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, sessionRegistry, matchRegistry, matchmaker, tracker, router, metrics, pipeline, runtime)
	apiServer := server.StartApiServer(logger, startupLogger, db, jsonpbMarshaler, jsonpbUnmarshaler, config, socialClient, leaderboardCache, leaderboardRankCache, sessionRegistry, statusRegistry, matchRegistry, matchmaker, tracker, router, metrics, pipeline, runtime)
	consoleServer := server.StartConsoleServer(logger, startupLogger, db, config, tracker, router, statusHandler, runtimeInfo, matchRegistry, configWarnings, semver, leaderboardCache, leaderboardRankCache, apiServer, cookie)

	gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1
@@ -213,6 +214,7 @@ func main() {
	matchmaker.Stop()
	leaderboardScheduler.Stop()
	tracker.Stop()
	statusRegistry.Stop()
	sessionRegistry.Stop()

	if gaenabled {
+2 −2
Original line number Diff line number Diff line
@@ -82,7 +82,7 @@ type ApiServer struct {
	grpcGatewayServer    *http.Server
}

func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, config Config, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) *ApiServer {
	var gatewayContextTimeoutMs string
	if config.GetSocket().IdleTimeoutMs > 500 {
		// Ensure the GRPC Gateway timeout is just under the idle timeout (if possible) to ensure it has priority.
@@ -203,7 +203,7 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
	grpcGatewayRouter := mux.NewRouter()
	// Special case routes. Do NOT enable compression on WebSocket route, it results in "http: response.Write on hijacked connection" errors.
	grpcGatewayRouter.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(200) }).Methods("GET")
	grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, matchmaker, tracker, metrics, runtime, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")
	grpcGatewayRouter.HandleFunc("/ws", NewSocketWsAcceptor(logger, config, sessionRegistry, statusRegistry, matchmaker, tracker, metrics, runtime, jsonpbMarshaler, jsonpbUnmarshaler, pipeline)).Methods("GET")

	// Another nested router to hijack RPC requests bound for GRPC Gateway.
	grpcGatewayMux := mux.NewRouter()
+3 −1
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@ type Pipeline struct {
	jsonpbMarshaler   *jsonpb.Marshaler
	jsonpbUnmarshaler *jsonpb.Unmarshaler
	sessionRegistry   SessionRegistry
	statusRegistry    *StatusRegistry
	matchRegistry     MatchRegistry
	partyRegistry     PartyRegistry
	matchmaker        Matchmaker
@@ -41,7 +42,7 @@ type Pipeline struct {
	node              string
}

func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline {
func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, router MessageRouter, runtime *Runtime) *Pipeline {
	return &Pipeline{
		logger:            logger,
		config:            config,
@@ -49,6 +50,7 @@ func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, jsonpbMarshaler
		jsonpbMarshaler:   jsonpbMarshaler,
		jsonpbUnmarshaler: jsonpbUnmarshaler,
		sessionRegistry:   sessionRegistry,
		statusRegistry:    statusRegistry,
		matchRegistry:     matchRegistry,
		partyRegistry:     partyRegistry,
		matchmaker:        matchmaker,
+27 −22
Original line number Diff line number Diff line
@@ -44,6 +44,10 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r
			}}}, true)
			return
		}
		if userID == session.UserID() {
			// The user cannot follow themselves.
			continue
		}

		uniqueUserIDs[userID] = struct{}{}
	}
@@ -59,10 +63,21 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r
			}}}, true)
			return
		}
		if username == session.Username() {
			// The user cannot follow themselves.
			continue
		}

		uniqueUsernames[username] = struct{}{}
	}

	if len(uniqueUserIDs) == 0 && len(uniqueUsernames) == 0 {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Status{Status: &rtapi.Status{
			Presences: make([]*rtapi.UserPresence, 0),
		}}}, true)
		return
	}

	var followUserIDs map[uuid.UUID]struct{}
	if len(uniqueUsernames) == 0 {
		params := make([]interface{}, 0, len(uniqueUserIDs))
@@ -152,6 +167,10 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r
				}}}, true)
				return
			}
			if uid == session.UserID() {
				// The user cannot follow themselves.
				continue
			}

			followUserIDs[uid] = struct{}{}
		}
@@ -169,24 +188,11 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r
	}

	// Follow all of the validated user IDs, and prepare a list of current presences to return.
	p.statusRegistry.Follow(session.ID(), followUserIDs)

	presences := make([]*rtapi.UserPresence, 0, len(followUserIDs))
	ops := make([]*TrackerOp, 0, len(followUserIDs))
	for userID := range followUserIDs {
		ops = append(ops, &TrackerOp{
			Stream: PresenceStream{Mode: StreamModeStatus, Subject: userID},
			Meta:   PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true},
		})
	}
	success := p.tracker.TrackMulti(session.Context(), session.ID(), ops, session.UserID(), false)
	if !success {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not follow user status",
		}}}, true)
		return
	}
	for _, op := range ops {
		ps := p.tracker.ListByStream(op.Stream, false, true)
		ps := p.tracker.ListByStream(PresenceStream{Mode: StreamModeStatus, Subject: userID}, false, true)
		for _, p := range ps {
			presences = append(presences, &rtapi.UserPresence{
				UserId:    p.UserID.String(),
@@ -220,15 +226,14 @@ func (p *Pipeline) statusUnfollow(logger *zap.Logger, session Session, envelope
			}}}, true)
			return
		}
		userIDs = append(userIDs, userID)
		if userID == session.UserID() {
			// The user cannot unfollow themselves.
			continue
		}

	streams := make([]*PresenceStream, 0, len(userIDs))
	for _, userID := range userIDs {
		streams = append(streams, &PresenceStream{Mode: StreamModeStatus, Subject: userID})
		userIDs = append(userIDs, userID)
	}

	p.tracker.UntrackMulti(session.ID(), streams, session.UserID())
	p.statusRegistry.Unfollow(session.ID(), userIDs)

	session.Send(&rtapi.Envelope{Cid: envelope.Cid}, true)
}
+7 −1
Original line number Diff line number Diff line
@@ -60,6 +60,7 @@ type sessionWS struct {
	writeWaitDuration  time.Duration

	sessionRegistry SessionRegistry
	statusRegistry  *StatusRegistry
	matchmaker      Matchmaker
	tracker         Tracker
	metrics         *Metrics
@@ -74,7 +75,7 @@ type sessionWS struct {
	outgoingCh             chan []byte
}

func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP string, clientPort string, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, conn *websocket.Conn, sessionRegistry SessionRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) Session {
func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessionID, userID uuid.UUID, username string, vars map[string]string, expiry int64, clientIP string, clientPort string, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, conn *websocket.Conn, sessionRegistry SessionRegistry, statusRegistry *StatusRegistry, matchmaker Matchmaker, tracker Tracker, metrics *Metrics, pipeline *Pipeline, runtime *Runtime) Session {
	sessionLogger := logger.With(zap.String("uid", userID.String()), zap.String("sid", sessionID.String()))

	sessionLogger.Info("New WebSocket session connected", zap.Uint8("format", uint8(format)))
@@ -109,6 +110,7 @@ func NewSessionWS(logger *zap.Logger, config Config, format SessionFormat, sessi
		writeWaitDuration:  time.Duration(config.GetSocket().WriteWaitMs) * time.Millisecond,

		sessionRegistry: sessionRegistry,
		statusRegistry:  statusRegistry,
		matchmaker:      matchmaker,
		tracker:         tracker,
		metrics:         metrics,
@@ -444,6 +446,10 @@ func (s *sessionWS) Close(reason string) {
	if s.logger.Core().Enabled(zap.DebugLevel) {
		s.logger.Info("Cleaned up closed connection tracker")
	}
	s.statusRegistry.UnfollowAll(s.id)
	if s.logger.Core().Enabled(zap.DebugLevel) {
		s.logger.Info("Cleaned up closed connection status registry")
	}
	s.sessionRegistry.Remove(s.id)
	if s.logger.Core().Enabled(zap.DebugLevel) {
		s.logger.Info("Cleaned up closed connection session registry")
Loading