Commit dc834438 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Disconnect match participants when an authoritative match ends due to an error.

parent d093d056
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -14,6 +14,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Strictly validate authoritative match create parameter encoding.
- Only perform user account updates if fields have changed.
- Developer console status snapshot gauges more accurately reflect current server metrics.
- Disconnect match participants when an authoritative match ends due to an error.
- Build with Go 1.14.2 release.

### Fixed
+1 −1
Original line number Diff line number Diff line
@@ -124,7 +124,7 @@ func main() {
	leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db)
	leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache)
	leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, config, leaderboardCache, leaderboardRankCache)
	matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, tracker, router, metrics, config.GetName())
	matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, sessionRegistry, tracker, router, metrics, config.GetName())
	tracker.SetMatchJoinListener(matchRegistry.Join)
	tracker.SetMatchLeaveListener(matchRegistry.Leave)
	streamManager := server.NewLocalStreamManager(config, sessionRegistry, tracker)
+26 −10
Original line number Diff line number Diff line
@@ -72,6 +72,7 @@ func (m *MatchDataMessage) GetReceiveTime() int64 {

type MatchHandler struct {
	logger          *zap.Logger
	sessionRegistry SessionRegistry
	matchRegistry   MatchRegistry
	router          MessageRouter

@@ -107,7 +108,7 @@ type MatchHandler struct {
	state interface{}
}

func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
func NewMatchHandler(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, matchRegistry MatchRegistry, router MessageRouter, core RuntimeMatchCore, id uuid.UUID, node string, stopped *atomic.Bool, params map[string]interface{}) (*MatchHandler, error) {
	presenceList := NewMatchPresenceList()

	deferredCh := make(chan *DeferredMessage, config.GetMatch().DeferredQueueSize)
@@ -133,6 +134,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
	// Construct the match.
	mh := &MatchHandler{
		logger:          logger,
		sessionRegistry: sessionRegistry,
		matchRegistry:   matchRegistry,
		router:          router,

@@ -196,6 +198,14 @@ func NewMatchHandler(logger *zap.Logger, config Config, matchRegistry MatchRegis
	return mh, nil
}

// Disconnect all clients currently connected to the server.
func (mh *MatchHandler) disconnectClients() {
	presenceIDs := mh.PresenceList.ListPresenceIDs()
	for _, presenceID := range presenceIDs {
		_ = mh.sessionRegistry.Disconnect(context.Background(), presenceID.SessionID, presenceID.Node)
	}
}

// Stop the match handler and clean up all its resources.
func (mh *MatchHandler) Stop() {
	if !mh.stopped.CAS(false, true) {
@@ -227,8 +237,9 @@ func (mh *MatchHandler) queueCall(f func(*MatchHandler)) bool {
		return true
	default:
		// Match call queue is full, the handler isn't processing fast enough.
		mh.logger.Warn("Match handler call processing too slow, closing match")
		mh.Stop()
		mh.disconnectClients()
		mh.logger.Warn("Match handler call processing too slow, closing match")
		return false
	}
}
@@ -257,6 +268,7 @@ func loop(mh *MatchHandler) {
	state, err := mh.core.MatchLoop(mh.tick, mh.state, mh.inputCh)
	if err != nil {
		mh.Stop()
		mh.disconnectClients()
		mh.logger.Warn("Stopping match after error from match_loop execution", zap.Int64("tick", mh.tick), zap.Error(err))
		return
	}
@@ -337,8 +349,9 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M
		state, allow, reason, err := mh.core.MatchJoinAttempt(mh.tick, mh.state, userID, sessionID, username, node, metadata)
		if err != nil {
			mh.Stop()
			mh.logger.Warn("Stopping match after error from match_join_attempt execution", zap.Int64("tick", mh.tick), zap.Error(err))
			resultCh <- &MatchJoinResult{Allow: false}
			mh.disconnectClients()
			mh.logger.Warn("Stopping match after error from match_join_attempt execution", zap.Int64("tick", mh.tick), zap.Error(err))
			return
		}

@@ -347,8 +360,8 @@ func (mh *MatchHandler) QueueJoinAttempt(ctx context.Context, resultCh chan<- *M
			mh.processDeferred()
		} else {
			mh.Stop()
			mh.logger.Info("Match join attempt returned nil or no state, stopping match")
			resultCh <- &MatchJoinResult{Allow: false}
			mh.logger.Info("Match join attempt returned nil or no state, stopping match")
			return
		}

@@ -396,6 +409,7 @@ func (mh *MatchHandler) QueueJoin(joins []*MatchPresence, mark bool) bool {
			state, err := mh.core.MatchJoin(mh.tick, mh.state, processed)
			if err != nil {
				mh.Stop()
				mh.disconnectClients()
				mh.logger.Warn("Stopping match after error from match_join execution", zap.Int64("tick", mh.tick), zap.Error(err))
				return
			}
@@ -434,6 +448,7 @@ func (mh *MatchHandler) QueueLeave(leaves []*MatchPresence) bool {
			state, err := mh.core.MatchLeave(mh.tick, mh.state, leaves)
			if err != nil {
				mh.Stop()
				mh.disconnectClients()
				mh.logger.Warn("Stopping match after error from match_leave execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
				return
			}
@@ -466,6 +481,7 @@ func (mh *MatchHandler) QueueTerminate(graceSeconds int) bool {
		state, err := mh.core.MatchTerminate(mh.tick, mh.state, graceSeconds)
		if err != nil {
			mh.Stop()
			mh.disconnectClients()
			mh.logger.Warn("Stopping match after error from match_terminate execution", zap.Int("tick", int(mh.tick)), zap.Error(err))
			return
		}
+16 −14
Original line number Diff line number Diff line
@@ -106,6 +106,7 @@ type MatchRegistry interface {
type LocalMatchRegistry struct {
	logger          *zap.Logger
	config          Config
	sessionRegistry SessionRegistry
	tracker         Tracker
	router          MessageRouter
	metrics         *Metrics
@@ -119,7 +120,7 @@ type LocalMatchRegistry struct {
	stoppedCh chan struct{}
}

func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tracker Tracker, router MessageRouter, metrics *Metrics, node string) MatchRegistry {
func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, sessionRegistry SessionRegistry, tracker Tracker, router MessageRouter, metrics *Metrics, node string) MatchRegistry {
	mapping := bleve.NewIndexMapping()
	mapping.DefaultAnalyzer = keyword.Name

@@ -131,6 +132,7 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra
	return &LocalMatchRegistry{
		logger:          logger,
		config:          config,
		sessionRegistry: sessionRegistry,
		tracker:         tracker,
		router:          router,
		metrics:         metrics,
@@ -177,7 +179,7 @@ func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core Run
		return nil, errors.New("shutdown in progress")
	}

	match, err := NewMatchHandler(logger, r.config, r, r.router, core, id, r.node, stopped, params)
	match, err := NewMatchHandler(logger, r.config, r.sessionRegistry, r, r.router, core, id, r.node, stopped, params)
	if err != nil {
		return nil, err
	}