Commit 1adf2b71 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve match label update batching semantics.

parent 98c42189
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Build with Go 1.16.0 release.
- Do not import Steam friends by default on Steam authentication.
- Do not import Facebook friends by default on Facebook authentication.
- Improve match label update batching semantics.

### Fixed
- Fix an issue in the js runtime that would prevent the matchmaker matched callback to function correctly.
+17 −12
Original line number Diff line number Diff line
@@ -242,6 +242,9 @@ func CheckConfig(logger *zap.Logger, config Config) map[string]string {
	if config.GetMatch().MaxEmptySec < 0 {
		logger.Fatal("Match max idle seconds must be >= 0", zap.Int("match.max_empty_sec", config.GetMatch().MaxEmptySec))
	}
	if config.GetMatch().LabelUpdateIntervalMs < 1 {
		logger.Fatal("Match label update interval milliseconds must be > 0", zap.Int("match.label_update_interval_ms", config.GetMatch().LabelUpdateIntervalMs))
	}
	if config.GetTracker().EventQueueSize < 1 {
		logger.Fatal("Tracker presence event queue size must be >= 1", zap.Int("tracker.event_queue_size", config.GetTracker().EventQueueSize))
	}
@@ -791,6 +794,7 @@ type MatchConfig struct {
	DeferredQueueSize     int `yaml:"deferred_queue_size" json:"deferred_queue_size" usage:"Size of the authoritative match buffer that holds deferred message broadcasts until the end of each loop execution. Default 128."`
	JoinMarkerDeadlineMs  int `yaml:"join_marker_deadline_ms" json:"join_marker_deadline_ms" usage:"Deadline in milliseconds that client authoritative match joins will wait for match handlers to acknowledge joins. Default 15000."`
	MaxEmptySec           int `yaml:"max_empty_sec" json:"max_empty_sec" usage:"Maximum number of consecutive seconds that authoritative matches are allowed to be empty before they are stopped. 0 indicates no maximum. Default 0."`
	LabelUpdateIntervalMs int `yaml:"label_update_interval_ms" json:"label_update_interval_ms" usage:"Time in milliseconds between match label update batch processes. Default 1000."`
}

// NewMatchConfig creates a new MatchConfig struct.
@@ -802,6 +806,7 @@ func NewMatchConfig() *MatchConfig {
		DeferredQueueSize:     128,
		JoinMarkerDeadlineMs:  15000,
		MaxEmptySec:           0,
		LabelUpdateIntervalMs: 1000,
	}
}

+8 −0
Original line number Diff line number Diff line
@@ -226,6 +226,14 @@ func (mh *MatchHandler) Label() string {
	return mh.Core.Label()
}

func (mh *MatchHandler) TickRate() int {
	return mh.Core.TickRate()
}

func (mh *MatchHandler) HandlerName() string {
	return mh.Core.HandlerName()
}

func (mh *MatchHandler) queueCall(f func(*MatchHandler)) bool {
	if mh.stopped.Load() {
		return false
+76 −6
Original line number Diff line number Diff line
@@ -131,10 +131,16 @@ type LocalMatchRegistry struct {
	metrics         *Metrics
	node            string

	ctx         context.Context
	ctxCancelFn context.CancelFunc

	matches    *sync.Map
	matchCount *atomic.Int64
	index      bleve.Index

	pendingUpdatesMutex *sync.Mutex
	pendingUpdates      map[string]*MatchIndexEntry

	stopped   *atomic.Bool
	stoppedCh chan struct{}
}
@@ -148,7 +154,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, ses
		startupLogger.Fatal("Failed to create match registry index", zap.Error(err))
	}

	return &LocalMatchRegistry{
	ctx, ctxCancelFn := context.WithCancel(context.Background())

	r := &LocalMatchRegistry{
		logger:          logger,
		config:          config,
		sessionRegistry: sessionRegistry,
@@ -157,13 +165,60 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, ses
		metrics:         metrics,
		node:            node,

		ctx:         ctx,
		ctxCancelFn: ctxCancelFn,

		matches:    &sync.Map{},
		matchCount: atomic.NewInt64(0),
		index:      index,

		pendingUpdatesMutex: &sync.Mutex{},
		pendingUpdates:      make(map[string]*MatchIndexEntry, 10),

		stopped:   atomic.NewBool(false),
		stoppedCh: make(chan struct{}, 2),
	}

	go func() {
		ticker := time.NewTicker(time.Duration(config.GetMatch().LabelUpdateIntervalMs) * time.Millisecond)
		for {
			select {
			case <-ctx.Done():
				ticker.Stop()
				return
			case <-ticker.C:
				r.processLabelUpdates()
			}
		}
	}()

	return r
}

func (r *LocalMatchRegistry) processLabelUpdates() {
	r.pendingUpdatesMutex.Lock()
	if len(r.pendingUpdates) == 0 {
		r.pendingUpdatesMutex.Unlock()
		return
	}
	pendingUpdates := r.pendingUpdates
	r.pendingUpdates = make(map[string]*MatchIndexEntry, len(pendingUpdates)+10)
	r.pendingUpdatesMutex.Unlock()

	batch := r.index.NewBatch()
	for id, op := range pendingUpdates {
		if op == nil {
			batch.Delete(id)
			continue
		}
		if err := batch.Index(id, op); err != nil {
			r.logger.Error("error indexing match label update", zap.Error(err))
		}
	}

	if err := r.index.Batch(batch); err != nil {
		r.logger.Error("error processing match label updates", zap.Error(err))
	}
}

func (r *LocalMatchRegistry) CreateMatch(ctx context.Context, logger *zap.Logger, createFn RuntimeMatchCreateFunction, module string, params map[string]interface{}) (string, error) {
@@ -261,13 +316,16 @@ func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) {
	r.metrics.GaugeAuthoritativeMatches(float64(matchesRemaining))

	r.tracker.UntrackByStream(stream)
	if err := r.index.Delete(fmt.Sprintf("%v.%v", id.String(), r.node)); err != nil {
		r.logger.Warn("Error removing match list index", zap.String("id", fmt.Sprintf("%v.%v", id.String(), r.node)), zap.Error(err))
	}

	idStr := fmt.Sprintf("%v.%v", id.String(), r.node)
	r.pendingUpdatesMutex.Lock()
	r.pendingUpdates[idStr] = nil
	r.pendingUpdatesMutex.Unlock()

	// If there are no more matches in this registry and a shutdown was initiated then signal
	// that the process is complete.
	if matchesRemaining == 0 && r.stopped.Load() {
		r.ctxCancelFn()
		select {
		case r.stoppedCh <- struct{}{}:
		default:
@@ -283,14 +341,22 @@ func (r *LocalMatchRegistry) UpdateMatchLabel(id uuid.UUID, tickRate int, handle
	var labelJSON map[string]interface{}
	// Doesn't matter if this is not JSON.
	_ = json.Unmarshal([]byte(label), &labelJSON)
	return r.index.Index(fmt.Sprintf("%v.%v", id.String(), r.node), &MatchIndexEntry{

	idStr := fmt.Sprintf("%v.%v", id.String(), r.node)
	entry := &MatchIndexEntry{
		Node:        r.node,
		Label:       labelJSON,
		TickRate:    tickRate,
		HandlerName: handlerName,
		LabelString: label,
		CreateTime:  createTime,
	})
	}

	r.pendingUpdatesMutex.Lock()
	r.pendingUpdates[idStr] = entry
	r.pendingUpdatesMutex.Unlock()

	return nil
}

func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value, queryString *wrappers.StringValue) ([]*api.Match, error) {
@@ -506,6 +572,9 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} {

	// Graceful shutdown not allowed/required, or grace period has expired.
	if graceSeconds == 0 {
		// If grace period is 0 stop match label processing immediately.
		r.ctxCancelFn()

		r.matches.Range(func(id, mh interface{}) bool {
			mh.(*MatchHandler).Stop()
			return true
@@ -529,6 +598,7 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} {

	if !anyRunning {
		// Termination was triggered and there are no active matches.
		r.ctxCancelFn()
		select {
		case r.stoppedCh <- struct{}{}:
		default:
+1 −0
Original line number Diff line number Diff line
@@ -258,6 +258,7 @@ type RuntimeMatchCore interface {
	MatchTerminate(tick int64, state interface{}, graceSeconds int) (interface{}, error)
	GetState(state interface{}) (string, error)
	Label() string
	TickRate() int
	HandlerName() string
	Cancel()
}
Loading