Commit 9807cc45 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Refactor matchmaker interface elements.

parent 9915aece
Loading
Loading
Loading
Loading
+354 −43
Original line number Diff line number Diff line
@@ -103,6 +103,26 @@ type MatchmakerIndex struct {
	SessionID         string              `json:"-"`
	Intervals         int                 `json:"-"`
	SessionIDs        map[string]struct{} `json:"-"`
	Node              string              `json:"-"`
	StringProperties  map[string]string   `json:"-"`
	NumericProperties map[string]float64  `json:"-"`
}

type MatchmakerExtract struct {
	Presences         []*MatchmakerPresence
	SessionID         string
	PartyId           string
	Query             string
	MinCount          int
	MaxCount          int
	CountMultiple     int
	StringProperties  map[string]string
	NumericProperties map[string]float64
	Ticket            string
	Count             int
	Intervals         int
	CreatedAt         int64
	Node              string
}

type MatchmakerIndexGroup struct {
@@ -148,12 +168,19 @@ func groupIndexes(indexes []*MatchmakerIndex, required int) []*MatchmakerIndexGr
}

type Matchmaker interface {
	Pause()
	Resume()
	Stop()
	Add(presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, error)
	OnMatchedEntries(fn func(entries [][]*MatchmakerEntry))
	Add(presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, int64, error)
	Insert(extracts []*MatchmakerExtract) error
	Extract() []*MatchmakerExtract
	RemoveSession(sessionID, ticket string) error
	RemoveSessionAll(sessionID string) error
	RemoveParty(partyID, ticket string) error
	RemovePartyAll(partyID string) error
	RemoveAll(node string)
	Remove(tickets []string)
}

type LocalMatchmaker struct {
@@ -164,10 +191,13 @@ type LocalMatchmaker struct {
	router  MessageRouter
	runtime *Runtime

	active      *atomic.Uint32
	stopped     *atomic.Bool
	ctx         context.Context
	ctxCancelFn context.CancelFunc

	matchedEntriesFn func([][]*MatchmakerEntry)
	batch            *index.Batch
	indexWriter      *bluge.Writer
	sessionTickets   map[string]map[string]struct{}
	partyTickets     map[string]map[string]struct{}
@@ -192,10 +222,12 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
		router:  router,
		runtime: runtime,

		active:      atomic.NewUint32(1),
		stopped:     atomic.NewBool(false),
		ctx:         ctx,
		ctxCancelFn: ctxCancelFn,

		batch:          bluge.NewBatch(),
		indexWriter:    indexWriter,
		sessionTickets: make(map[string]map[string]struct{}),
		partyTickets:   make(map[string]map[string]struct{}),
@@ -206,13 +238,12 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router

	go func() {
		ticker := time.NewTicker(time.Duration(config.GetMatchmaker().IntervalSec) * time.Second)
		batch := bluge.NewBatch()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				m.process(batch)
				m.Process()
			}
		}
	}()
@@ -220,12 +251,24 @@ func NewLocalMatchmaker(logger, startupLogger *zap.Logger, config Config, router
	return m
}

func (m *LocalMatchmaker) Pause() {
	m.active.Store(0)
}

func (m *LocalMatchmaker) Resume() {
	m.active.Store(1)
}

func (m *LocalMatchmaker) Stop() {
	m.stopped.Store(true)
	m.ctxCancelFn()
}

func (m *LocalMatchmaker) process(batch *index.Batch) {
func (m *LocalMatchmaker) OnMatchedEntries(fn func(entries [][]*MatchmakerEntry)) {
	m.matchedEntriesFn = fn
}

func (m *LocalMatchmaker) Process() {
	matchedEntries := make([][]*MatchmakerEntry, 0, 5)

	m.Lock()
@@ -245,6 +288,10 @@ func (m *LocalMatchmaker) process(batch *index.Batch) {
			delete(m.activeIndexes, ticket)
		}

		if m.active.Load() != 1 {
			continue
		}

		indexQuery := bluge.NewBooleanQuery()
		// Results must match the query string.
		parsedIndexQuery, err := ParseQueryString(index.Query)
@@ -494,7 +541,7 @@ func (m *LocalMatchmaker) process(batch *index.Batch) {
				ticketsToDelete := make(map[string]struct{}, len(currentMatchedEntries))
				for _, entry := range currentMatchedEntries {
					if _, ok := ticketsToDelete[entry.Ticket]; !ok {
						batch.Delete(bluge.Identifier(entry.Ticket))
						m.batch.Delete(bluge.Identifier(entry.Ticket))
						ticketsToDelete[entry.Ticket] = struct{}{}
					}
					delete(m.entries, entry.Ticket)
@@ -517,10 +564,10 @@ func (m *LocalMatchmaker) process(batch *index.Batch) {
						}
					}
				}
				if err := m.indexWriter.Batch(batch); err != nil {
				if err := m.indexWriter.Batch(m.batch); err != nil {
					m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err))
				}
				batch.Reset()
				m.batch.Reset()

				break
			}
@@ -585,21 +632,24 @@ func (m *LocalMatchmaker) process(batch *index.Batch) {
			m.router.SendToPresenceIDs(m.logger, []*PresenceID{{Node: entry.Presence.Node, SessionID: entry.Presence.SessionID}}, outgoing, true)
		}
	}
	if m.matchedEntriesFn != nil && len(matchedEntries) > 0 {
		m.matchedEntriesFn(matchedEntries)
	}
}

func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) {
func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyId, query string, minCount, maxCount, countMultiple int, stringProperties map[string]string, numericProperties map[string]float64) (string, int64, error) {
	// Check if the matchmaker has been stopped.
	if m.stopped.Load() {
		return "", runtime.ErrMatchmakerNotAvailable
		return "", 0, runtime.ErrMatchmakerNotAvailable
	}

	parsedQuery, err := ParseQueryString(query)
	if err != nil {
		return "", runtime.ErrMatchmakerQueryInvalid
		return "", 0, runtime.ErrMatchmakerQueryInvalid
	}
	if parsedQuery, ok := parsedQuery.(ValidatableQuery); ok {
		if parsedQuery.Validate() != nil {
			return "", runtime.ErrMatchmakerQueryInvalid
			return "", 0, runtime.ErrMatchmakerQueryInvalid
		}
	}

@@ -617,18 +667,19 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
	sessionIDs := make(map[string]struct{}, len(presences))
	for _, presence := range presences {
		if _, found := sessionIDs[presence.SessionId]; found {
			return "", runtime.ErrMatchmakerDuplicateSession
			return "", 0, runtime.ErrMatchmakerDuplicateSession
		}
		sessionIDs[presence.SessionId] = struct{}{}
	}
	// Prepare index data.
	createdAt := time.Now().UTC().UnixNano()
	index := &MatchmakerIndex{
		Ticket:     ticket,
		Properties: properties,
		MinCount:   minCount,
		MaxCount:   maxCount,
		PartyId:    partyId,
		CreatedAt:  time.Now().UTC().UnixNano(),
		CreatedAt:  createdAt,

		Query:             query,
		Count:             len(presences),
@@ -636,6 +687,9 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
		SessionID:         sessionID,
		Intervals:         0,
		SessionIDs:        sessionIDs,
		Node:              m.node,
		StringProperties:  stringProperties,
		NumericProperties: numericProperties,
	}

	m.Lock()
@@ -644,14 +698,14 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
	for _, presence := range presences {
		if existingTickets := m.sessionTickets[presence.SessionId]; len(existingTickets) >= m.config.GetMatchmaker().MaxTickets {
			m.Unlock()
			return "", runtime.ErrMatchmakerTooManyTickets
			return "", 0, runtime.ErrMatchmakerTooManyTickets
		}
	}
	// Check if party is allowed to create more tickets.
	if partyId != "" {
		if existingTickets := m.partyTickets[partyId]; len(existingTickets) >= m.config.GetMatchmaker().MaxTickets {
			m.Unlock()
			return "", runtime.ErrMatchmakerTooManyTickets
			return "", 0, runtime.ErrMatchmakerTooManyTickets
		}
	}

@@ -659,13 +713,13 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
	if err != nil {
		m.Unlock()
		m.logger.Error("error mapping matchmaker index document", zap.Error(err))
		return "", runtime.ErrMatchmakerIndex
		return "", 0, runtime.ErrMatchmakerIndex
	}

	if err := m.indexWriter.Update(bluge.Identifier(ticket), matchmakerIndexDoc); err != nil {
		m.Unlock()
		m.logger.Error("error indexing matchmaker entries", zap.Error(err))
		return "", runtime.ErrMatchmakerIndex
		return "", 0, runtime.ErrMatchmakerIndex
	}

	entries := make([]*MatchmakerEntry, 0, len(presences))
@@ -696,7 +750,161 @@ func (m *LocalMatchmaker) Add(presences []*MatchmakerPresence, sessionID, partyI
	m.activeIndexes[ticket] = index

	m.Unlock()
	return ticket, nil
	return ticket, createdAt, nil
}

func (m *LocalMatchmaker) Insert(extracts []*MatchmakerExtract) error {
	if m.stopped.Load() {
		return nil
	}
	if len(extracts) == 0 {
		return nil
	}

	batch := bluge.NewBatch()
	indexes := make(map[string]*MatchmakerIndex, len(extracts))
	entries := make(map[string][]*MatchmakerEntry, len(extracts))

	for _, extract := range extracts {
		properties := make(map[string]interface{}, len(extract.StringProperties)+len(extract.NumericProperties))
		for k, v := range extract.StringProperties {
			properties[k] = v
		}
		for k, v := range extract.NumericProperties {
			properties[k] = v
		}

		sessionIDs := make(map[string]struct{}, len(extract.Presences))
		for _, presence := range extract.Presences {
			if _, found := sessionIDs[presence.SessionId]; found {
				return runtime.ErrMatchmakerDuplicateSession
			}
			sessionIDs[presence.SessionId] = struct{}{}
		}

		index := &MatchmakerIndex{
			Ticket:     extract.Ticket,
			Properties: properties,
			MinCount:   extract.MinCount,
			MaxCount:   extract.MaxCount,
			PartyId:    extract.PartyId,
			CreatedAt:  extract.CreatedAt,

			Query:             extract.Query,
			Count:             len(extract.Presences),
			CountMultiple:     extract.CountMultiple,
			SessionID:         extract.SessionID,
			Intervals:         extract.Intervals,
			SessionIDs:        sessionIDs,
			Node:              extract.Node,
			StringProperties:  extract.StringProperties,
			NumericProperties: extract.NumericProperties,
		}

		matchmakerIndexDoc, err := MapMatchmakerIndex(extract.Ticket, index)
		if err != nil {
			m.Unlock()
			m.logger.Error("error mapping matchmaker index document", zap.Error(err))
			return runtime.ErrMatchmakerIndex
		}

		batch.Insert(matchmakerIndexDoc)

		extractEntries := make([]*MatchmakerEntry, 0, len(extract.Presences))
		for _, presence := range extract.Presences {
			extractEntries = append(extractEntries, &MatchmakerEntry{
				Ticket:            extract.Ticket,
				Presence:          presence,
				Properties:        properties,
				PartyId:           extract.PartyId,
				StringProperties:  extract.StringProperties,
				NumericProperties: extract.NumericProperties,
			})
		}
		entries[extract.Ticket] = extractEntries
		indexes[extract.Ticket] = index
	}

	m.Lock()

	if err := m.indexWriter.Batch(batch); err != nil {
		m.Unlock()
		m.logger.Error("error indexing matchmaker entries", zap.Error(err))
		return runtime.ErrMatchmakerIndex
	}
	for ticket, index := range indexes {
		m.indexes[ticket] = index
		if index.Intervals < m.config.GetMatchmaker().MaxIntervals {
			m.activeIndexes[ticket] = index
		}
		if index.PartyId != "" {
			if _, ok := m.partyTickets[index.PartyId]; ok {
				m.partyTickets[index.PartyId][ticket] = struct{}{}
			} else {
				m.partyTickets[index.PartyId] = map[string]struct{}{ticket: {}}
			}
		}
	}
	for ticket, ticketEntries := range entries {
		m.entries[ticket] = ticketEntries
		for _, entry := range ticketEntries {
			if _, ok := m.sessionTickets[entry.Presence.SessionId]; ok {
				m.sessionTickets[entry.Presence.SessionId][ticket] = struct{}{}
			} else {
				m.sessionTickets[entry.Presence.SessionId] = map[string]struct{}{ticket: {}}
			}
		}
	}

	m.Unlock()

	return nil
}

func (m *LocalMatchmaker) Extract() []*MatchmakerExtract {
	if m.stopped.Load() {
		return nil
	}

	extracts := make([]*MatchmakerExtract, 0, 100)
	m.Lock()

	for ticket, index := range m.indexes {
		if index.Node != m.node {
			continue
		}
		entries, ok := m.entries[ticket]
		if !ok {
			m.logger.Warn("matchmaker extract found ticket with no entries", zap.String("ticket", ticket))
			continue
		}

		extract := &MatchmakerExtract{
			Presences:         make([]*MatchmakerPresence, 0, len(entries)),
			SessionID:         index.SessionID,
			PartyId:           index.PartyId,
			Query:             index.Query,
			MinCount:          index.MinCount,
			MaxCount:          index.MaxCount,
			CountMultiple:     index.CountMultiple,
			StringProperties:  index.StringProperties,
			NumericProperties: index.NumericProperties,
			Ticket:            ticket,
			Count:             index.Count,
			Intervals:         index.Intervals,
			CreatedAt:         index.CreatedAt,
			Node:              index.Node,
		}
		for _, entry := range entries {
			extract.Presences = append(extract.Presences, entry.Presence)
		}

		extracts = append(extracts, extract)
	}

	m.Unlock()

	return extracts
}

func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error {
@@ -712,7 +920,7 @@ func (m *LocalMatchmaker) RemoveSession(sessionID, ticket string) error {

	entries, ok := m.entries[ticket]
	if !ok {
		m.logger.Warn("matchmaker remove found ticket with no entries", zap.String("ticket", ticket))
		m.logger.Warn("matchmaker remove session found ticket with no entries", zap.String("ticket", ticket))
	}
	delete(m.entries, ticket)

@@ -767,7 +975,7 @@ func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error {
		index, ok := m.indexes[ticket]
		if !ok {
			// Ticket did not exist, should not happen.
			m.logger.Warn("matchmaker remove all found ticket with no index", zap.String("ticket", ticket))
			m.logger.Warn("matchmaker remove session all found ticket with no index", zap.String("ticket", ticket))
			continue
		}
		delete(m.indexes, ticket)
@@ -776,7 +984,7 @@ func (m *LocalMatchmaker) RemoveSessionAll(sessionID string) error {

		entries, ok := m.entries[ticket]
		if !ok {
			m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket))
			m.logger.Warn("matchmaker remove session all found ticket with no entries", zap.String("ticket", ticket))
		}
		delete(m.entries, ticket)

@@ -827,7 +1035,7 @@ func (m *LocalMatchmaker) RemoveParty(partyID, ticket string) error {

	entries, ok := m.entries[ticket]
	if !ok {
		m.logger.Warn("matchmaker remove found ticket with no entries", zap.String("ticket", ticket))
		m.logger.Warn("matchmaker remove party found ticket with no entries", zap.String("ticket", ticket))
	}
	delete(m.entries, ticket)

@@ -880,7 +1088,7 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {
		_, ok := m.indexes[ticket]
		if !ok {
			// Ticket did not exist, should not happen.
			m.logger.Warn("matchmaker remove all found ticket with no index", zap.String("ticket", ticket))
			m.logger.Warn("matchmaker remove party all found ticket with no index", zap.String("ticket", ticket))
			continue
		}
		delete(m.indexes, ticket)
@@ -889,7 +1097,7 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {

		entries, ok := m.entries[ticket]
		if !ok {
			m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket))
			m.logger.Warn("matchmaker remove party all found ticket with no entries", zap.String("ticket", ticket))
		}
		delete(m.entries, ticket)

@@ -913,6 +1121,109 @@ func (m *LocalMatchmaker) RemovePartyAll(partyID string) error {
	return nil
}

func (m *LocalMatchmaker) RemoveAll(node string) {
	batch := bluge.NewBatch()

	m.Lock()

	for ticket, index := range m.indexes {
		if index.Node != node {
			continue
		}

		batch.Delete(bluge.Identifier(ticket))

		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)

		if index.PartyId != "" {
			partyTickets, ok := m.partyTickets[index.PartyId]
			if ok {
				if len(partyTickets) <= 1 {
					delete(m.partyTickets, index.PartyId)
				} else {
					delete(partyTickets, ticket)
				}
			}
		}

		entries, ok := m.entries[ticket]
		if !ok {
			m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket))
		}
		delete(m.entries, ticket)

		for _, entry := range entries {
			if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok {
				if l := len(sessionTickets); l <= 1 {
					delete(m.sessionTickets, entry.Presence.SessionId)
				} else {
					delete(sessionTickets, ticket)
				}
			}
		}
	}

	err := m.indexWriter.Batch(batch)
	m.Unlock()
	if err != nil {
		m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
	}
}

func (m *LocalMatchmaker) Remove(tickets []string) {
	batch := bluge.NewBatch()

	m.Lock()

	for _, ticket := range tickets {
		index, found := m.indexes[ticket]
		if !found {
			continue
		}

		batch.Delete(bluge.Identifier(ticket))

		delete(m.indexes, ticket)

		delete(m.activeIndexes, ticket)

		if index.PartyId != "" {
			partyTickets, ok := m.partyTickets[index.PartyId]
			if ok {
				if len(partyTickets) <= 1 {
					delete(m.partyTickets, index.PartyId)
				} else {
					delete(partyTickets, ticket)
				}
			}
		}

		entries, ok := m.entries[ticket]
		if !ok {
			m.logger.Warn("matchmaker remove all found ticket with no entries", zap.String("ticket", ticket))
		}
		delete(m.entries, ticket)

		for _, entry := range entries {
			if sessionTickets, ok := m.sessionTickets[entry.Presence.SessionId]; ok {
				if l := len(sessionTickets); l <= 1 {
					delete(m.sessionTickets, entry.Presence.SessionId)
				} else {
					delete(sessionTickets, ticket)
				}
			}
		}
	}

	err := m.indexWriter.Batch(batch)
	m.Unlock()
	if err != nil {
		m.logger.Error("error deleting matchmaker entries batch", zap.Error(err))
	}
}

func MapMatchmakerIndex(id string, in *MatchmakerIndex) (*bluge.Document, error) {
	rv := bluge.NewDocument(id)

+1 −1
Original line number Diff line number Diff line
@@ -542,7 +542,7 @@ func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, ma

	p.RUnlock()

	ticket, err := p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, countMultiple, stringProperties, numericProperties)
	ticket, _, err := p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, countMultiple, stringProperties, numericProperties)
	if err != nil {
		return "", nil, err
	}
+1 −1
Original line number Diff line number Diff line
@@ -84,7 +84,7 @@ func (p *Pipeline) matchmakerAdd(logger *zap.Logger, session Session, envelope *
	}}

	// Run matchmaker add.
	ticket, err := p.matchmaker.Add(presences, session.ID().String(), "", query, minCount, maxCount, countMultiple, incoming.StringProperties, incoming.NumericProperties)
	ticket, _, err := p.matchmaker.Add(presences, session.ID().String(), "", query, minCount, maxCount, countMultiple, incoming.StringProperties, incoming.NumericProperties)
	if err != nil {
		logger.Error("Error adding to matchmaker", zap.Error(err))
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{