From 2072012d8815a31cbeaa56e76ad07d1f049a4b22 Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Fri, 14 Apr 2023 14:53:33 +0100 Subject: [PATCH] Add optional matchmaker function hook to override matches before proceeding. (#1015) --- CHANGELOG.md | 1 + go.mod | 2 +- go.sum | 13 +- server/matchmaker.go | 313 +-------- server/matchmaker_process.go | 612 ++++++++++++++++++ server/runtime.go | 20 +- server/runtime_go.go | 37 +- .../nakama-common/runtime/runtime.go | 3 + vendor/modules.txt | 2 +- 9 files changed, 677 insertions(+), 326 deletions(-) create mode 100644 server/matchmaker_process.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f804757f5..8c237a9a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Add tournament record delete runtime functions and API. - Add insecure flag to runtime http functions to optionally skip TLS checks. - Add Satori API integration to all runtimes. +- Add optional matchmaker function hook to override matches before proceeding. ### Changed - Improve graceful shutdown of Google IAP receipt processor. diff --git a/go.mod b/go.mod index 61baff76f..04e7eae22 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.5.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 - github.com/heroiclabs/nakama-common v1.26.1-0.20230321170403-7becee7153cf + github.com/heroiclabs/nakama-common v1.26.1-0.20230414123330-4d149b942148 github.com/jackc/pgconn v1.13.0 github.com/jackc/pgerrcode v0.0.0-20220416144525-469b46aa5efa github.com/jackc/pgtype v1.12.0 diff --git a/go.sum b/go.sum index c85346bfb..9e3552af4 100644 --- a/go.sum +++ b/go.sum @@ -156,7 +156,6 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= -github.com/go-gorp/gorp/v3 v3.0.2 h1:ULqJXIekoqMx29FI5ekXXFoH1dT2Vc8UhnRzBg+Emz4= github.com/go-gorp/gorp/v3 v3.0.2/go.mod h1:BJ3q1ejpV8cVALtcXvXaXyTOlMmJhWDxTmncaR6rwBY= github.com/go-gorp/gorp/v3 v3.1.0 h1:ItKF/Vbuj31dmV4jxA1qblpSwkl9g1typ24xoe70IGs= github.com/go-gorp/gorp/v3 v3.1.0/go.mod h1:dLEjIyyRNiXvNZ8PSmzpt1GsWAUK8kjVhEpjH8TixEw= @@ -169,7 +168,6 @@ github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG github.com/go-sourcemap/sourcemap v2.1.3+incompatible h1:W1iEw64niKVGogNgBN3ePyLFfuisuzeidWPMPWmECqU= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -288,8 +286,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= -github.com/heroiclabs/nakama-common v1.26.1-0.20230321170403-7becee7153cf h1:N+HC+zfBt3Vp0g3REss4CJIjETCNhGHBT3Pi2vDm4mw= -github.com/heroiclabs/nakama-common v1.26.1-0.20230321170403-7becee7153cf/go.mod h1:/znXrt+yd+i7WHvizD3SNeA4tTLl9PvVckHafUnLAi0= +github.com/heroiclabs/nakama-common v1.26.1-0.20230414123330-4d149b942148 h1:KQySe+ikCDSBJV0IIGX1z3c3ZSjlcL7wSRNrVG2+IbY= +github.com/heroiclabs/nakama-common v1.26.1-0.20230414123330-4d149b942148/go.mod h1:/znXrt+yd+i7WHvizD3SNeA4tTLl9PvVckHafUnLAi0= github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -370,7 +368,6 @@ github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -384,7 +381,6 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.10.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -405,7 +401,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-oci8 v0.1.1/go.mod h1:wjDx6Xm9q7dFtHJvIlrI99JytznLw5wQ4R+9mNXJwGI= github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-sqlite3 v1.11.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= -github.com/mattn/go-sqlite3 v1.14.14 h1:qZgc/Rwetq+MtyE18WhzjokPD93dNqLGNT3QJuLvBGw= github.com/mattn/go-sqlite3 v1.14.14/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= @@ -444,7 +439,6 @@ github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= -github.com/poy/onpar v0.0.0-20190519213022-ee068f8ea4d1 h1:oL4IBbcqwhhNWh31bjOX8C/OCy0zs9906d/VUru+bqg= github.com/poy/onpar v0.0.0-20190519213022-ee068f8ea4d1/go.mod h1:nSbFQvMj97ZyhFRSJYtut+msi4sOY6zJDGCdSc+/rZU= github.com/poy/onpar v1.1.2 h1:QaNrNiZx0+Nar5dLgTVp5mXkyoVFIbepjyEoGSnhbAY= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -533,7 +527,6 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= @@ -751,8 +744,6 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d h1:Zu/JngovGLVi6t2J3nmAf3AoTDwuzw85YZ3b9o4yU7s= -golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43 h1:OK7RB6t2WQX54srQQYSXMW8dF5C6/8+oA/s5QBmmto4= golang.org/x/sys v0.0.0-20221013171732-95e765b1cc43/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/server/matchmaker.go b/server/matchmaker.go index bd34c6a77..32eb08da0 100644 --- a/server/matchmaker.go +++ b/server/matchmaker.go @@ -17,8 +17,6 @@ package server import ( "context" "fmt" - "math" - "sort" "sync" "time" @@ -282,16 +280,12 @@ func (m *LocalMatchmaker) OnMatchedEntries(fn func(entries [][]*MatchmakerEntry) } func (m *LocalMatchmaker) Process() { - matchedEntries := make([][]*MatchmakerEntry, 0, 5) - startTime := time.Now() var activeIndexCount, indexCount int defer func() { m.metrics.Matchmaker(float64(indexCount), float64(activeIndexCount), time.Now().Sub(startTime)) }() - expiredActiveIndexes := make([]string, 0, 10) - m.Lock() activeIndexCount = len(m.activeIndexes) @@ -314,306 +308,13 @@ func (m *LocalMatchmaker) Process() { m.Unlock() - var threshold bool - var timer *time.Timer - if m.active.Load() == 1 && m.revThresholdFn != nil { - timer = m.revThresholdFn() - defer timer.Stop() - } - - selectedTickets := make(map[string]struct{}, activeIndexCount*2) - for ticket, activeIndex := range activeIndexesCopy { - if !threshold && timer != nil { - select { - case <-timer.C: - threshold = true - default: - } - } - - // This ticket may already have found a match in a previous iteration. - if _, found := selectedTickets[activeIndex.Ticket]; found { - continue - } - - activeIndex.Intervals++ - lastInterval := activeIndex.Intervals >= m.config.GetMatchmaker().MaxIntervals || activeIndex.MinCount == activeIndex.MaxCount - if lastInterval { - // Drop from active indexes if it has reached its max intervals, or if its min/max counts are equal. In the - // latter case keeping it active would have the same result as leaving it in the pool, so this saves work. - expiredActiveIndexes = append(expiredActiveIndexes, ticket) - } - - if m.active.Load() != 1 { - continue - } - - indexQuery := bluge.NewBooleanQuery() - - // Results must match the query string. - indexQuery.AddMust(activeIndex.ParsedQuery) - - // Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8. - minCountRange := bluge.NewNumericRangeInclusiveQuery( - float64(activeIndex.MinCount), math.Inf(1), true, true). - SetField("min_count") - indexQuery.AddMust(minCountRange) - maxCountRange := bluge.NewNumericRangeInclusiveQuery( - math.Inf(-1), float64(activeIndex.MaxCount), true, true). - SetField("max_count") - indexQuery.AddMust(maxCountRange) - - // Results must not include the current party, if any. - if activeIndex.PartyId != "" { - partyIdQuery := bluge.NewTermQuery(activeIndex.PartyId) - partyIdQuery.SetField("party_id") - indexQuery.AddMustNot(partyIdQuery) - } - - searchRequest := bluge.NewTopNSearch(indexCount, indexQuery) - // Sort results to try and select the best match, or if the - // matches are equivalent, the longest waiting tickets first. - searchRequest.SortBy([]string{"-_score", "created_at"}) - - indexReader, err := m.indexWriter.Reader() - if err != nil { - m.logger.Error("error accessing index reader", zap.Error(err)) - continue - } - - result, err := indexReader.Search(m.ctx, searchRequest) - if err != nil { - _ = indexReader.Close() - m.logger.Error("error searching index", zap.Error(err)) - continue - } - - blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger) - if err != nil { - _ = indexReader.Close() - m.logger.Error("error iterating search results", zap.Error(err)) - continue - } - - for i := 0; i < len(blugeMatches.Hits); i++ { - hitTicket := blugeMatches.Hits[i].ID - if hitTicket == ticket { - // Remove the current ticket. - blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) - if len(selectedTickets) == 0 { - break - } - i-- - } else if _, found := selectedTickets[hitTicket]; found { - // Ticket has already been selected for another match during this process iteration. - blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) - i-- - } - } - - // Form possible combinations, in case multiple matches might be suitable. - entryCombos := make([][]*MatchmakerEntry, 0, 5) - lastHitCounter := len(blugeMatches.Hits) - 1 - for hitCounter, hit := range blugeMatches.Hits { - hitIndex, ok := indexesCopy[hit.ID] - if !ok { - // Ticket did not exist, should not happen. - m.logger.Warn("matchmaker process missing index", zap.String("ticket", hit.ID)) - continue - } - - if !threshold && m.config.GetMatchmaker().RevPrecision { - outerMutualMatch, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) - if err != nil { - m.logger.Error("error validating mutual match", zap.Error(err)) - continue - } else if !outerMutualMatch { - // This search hit is not a mutual match with the outer ticket. - continue - } - } - - if activeIndex.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { - // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. - continue - } - - // Check if there are overlapping session IDs, and if so these tickets are ineligible to match together. - var sessionIdConflict bool - for sessionID := range activeIndex.SessionIDs { - if _, found := hitIndex.SessionIDs[sessionID]; found { - sessionIdConflict = true - break - } - } - if sessionIdConflict { - continue - } - - var foundComboIdx int - var foundCombo []*MatchmakerEntry - for entryComboIdx, entryCombo := range entryCombos { - if len(entryCombo)+len(hitIndex.Entries)+activeIndex.Count <= activeIndex.MaxCount { - // There is room in this combo for these entries. Check if there are session ID or mutual match conflicts with current combo. - var mutualMatchConflict bool - for _, entry := range entryCombo { - if _, found := hitIndex.SessionIDs[entry.Presence.SessionId]; found { - sessionIdConflict = true - break - } - if !threshold && m.config.GetMatchmaker().RevPrecision { - entryMatchesSearchHitQuery, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) - if err != nil { - mutualMatchConflict = true - m.logger.Error("error validating mutual match", zap.Error(err)) - break - } else if !entryMatchesSearchHitQuery { - mutualMatchConflict = true - // This search hit is not a mutual match with the outer ticket. - break - } - // MatchmakerEntry does not have the query, read it out of indexes. - if entriesIndexEntry, ok := indexesCopy[entry.Ticket]; ok { - searchHitMatchesEntryQuery, err := validateMatch(m.ctx, m.revCache, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) - if err != nil { - mutualMatchConflict = true - m.logger.Error("error validating mutual match", zap.Error(err)) - break - } else if !searchHitMatchesEntryQuery { - mutualMatchConflict = true - // This search hit is not a mutual match with the outer ticket. - break - } - } else { - m.logger.Warn("matchmaker missing index entry for entry combo") - } - } - } - if sessionIdConflict || mutualMatchConflict { - continue - } - - entryCombo = append(entryCombo, hitIndex.Entries...) - entryCombos[entryComboIdx] = entryCombo - - foundCombo = entryCombo - foundComboIdx = entryComboIdx - break - } - } - // Either processing first hit, or current hit entries combined with previous hits may tip over activeIndex.MaxCount. - if foundCombo == nil { - entryCombo := make([]*MatchmakerEntry, len(hitIndex.Entries)) - copy(entryCombo, hitIndex.Entries) - entryCombos = append(entryCombos, entryCombo) - - foundCombo = entryCombo - foundComboIdx = len(entryCombos) - 1 - } - - // The combo is considered match-worthy if either the max count has been satisfied, or ALL of these conditions are met: - // * It is the last interval for this active index. - // * The combo at least satisfies the min count. - // * The combo does not exceed the max count. - // * There are no more hits that may further fill the found combo, so we get as close as possible to the max count. - if l := len(foundCombo) + activeIndex.Count; l == activeIndex.MaxCount || (lastInterval && l >= activeIndex.MinCount && l <= activeIndex.MaxCount && hitCounter >= lastHitCounter) { - if rem := l % activeIndex.CountMultiple; rem != 0 { - // The size of the combination being considered does not satisfy the count multiple. - // Attempt to adjust the combo by removing the smallest possible number of entries. - // Prefer keeping entries that have been in the matchmaker the longest, if possible. - eligibleIndexesUniq := make(map[*MatchmakerIndex]struct{}, len(foundCombo)) - for _, e := range foundCombo { - // Only tickets individually less <= the removable size are considered. - // For example removing a party of 3 when we're only looking to remove 2 is not allowed. - if foundIndex, ok := indexesCopy[e.Ticket]; ok && foundIndex.Count <= rem { - eligibleIndexesUniq[foundIndex] = struct{}{} - } - } - - eligibleIndexes := make([]*MatchmakerIndex, 0, len(eligibleIndexesUniq)) - for idx := range eligibleIndexesUniq { - eligibleIndexes = append(eligibleIndexes, idx) - } - - eligibleGroups := groupIndexes(eligibleIndexes, rem) - if len(eligibleGroups) <= 0 { - // No possible combination to remove, unlikely but guard. - continue - } - // Sort to ensure we keep as many of the longest-waiting tickets as possible. - sort.Slice(eligibleGroups, func(i, j int) bool { - return eligibleGroups[i].avgCreatedAt < eligibleGroups[j].avgCreatedAt - }) - // The most eligible group is removed from the combo. - for _, egIndex := range eligibleGroups[0].indexes { - for i := 0; i < len(foundCombo); i++ { - if egIndex.Ticket == foundCombo[i].Ticket { - foundCombo[i] = foundCombo[len(foundCombo)-1] - foundCombo[len(foundCombo)-1] = nil - foundCombo = foundCombo[:len(foundCombo)-1] - i-- - } - } - } - - // We've removed something, update the known size of the currently considered combo. - l = len(foundCombo) + activeIndex.Count - - if l%activeIndex.CountMultiple != 0 { - // Removal was insufficient, the combo is still not valid for the required multiple. - continue - } - } - - // Check that ALL of these conditions are true for ALL matched entries: - // * The found combo size satisfies the minimum count. - // * The found combo size satisfies the maximum count. - // * The found combo size satisfies the count multiple. - // For any condition failures it does not matter which specific condition is not met. - var conditionFailed bool - for _, e := range foundCombo { - if foundIndex, ok := indexesCopy[e.Ticket]; ok && (foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0) { - conditionFailed = true - break - } - } - if conditionFailed { - continue - } - - // Found a suitable match. - currentMatchedEntries := append(foundCombo, activeIndex.Entries...) - - // Remove the found combos from currently tracked list. - entryCombos = append(entryCombos[:foundComboIdx], entryCombos[foundComboIdx+1:]...) - - matchedEntries = append(matchedEntries, currentMatchedEntries) - - var batchSize int - batch := bluge.NewBatch() - // Mark tickets as unavailable for further use in this process iteration. - for _, currentMatchedEntry := range currentMatchedEntries { - if _, found := selectedTickets[currentMatchedEntry.Ticket]; found { - continue - } - selectedTickets[currentMatchedEntry.Ticket] = struct{}{} - batchSize++ - batch.Delete(bluge.Identifier(currentMatchedEntry.Ticket)) - } - if batchSize > 0 { - if err := m.indexWriter.Batch(batch); err != nil { - m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) - } - } - - break - } - } - err = indexReader.Close() - if err != nil { - m.logger.Error("error closing index reader", zap.Error(err)) - continue - } + // Run the custom matching function if one is registered in the runtime, otherwise use the default process function. + var matchedEntries [][]*MatchmakerEntry + var expiredActiveIndexes []string + if m.runtime.matchmakerOverrideFunction != nil { + matchedEntries, expiredActiveIndexes = m.processCustom(activeIndexesCopy, indexCount, indexesCopy) + } else { + matchedEntries, expiredActiveIndexes = m.processDefault(activeIndexCount, activeIndexesCopy, indexCount, indexesCopy) } m.Lock() diff --git a/server/matchmaker_process.go b/server/matchmaker_process.go new file mode 100644 index 000000000..1998fa86d --- /dev/null +++ b/server/matchmaker_process.go @@ -0,0 +1,612 @@ +// Copyright 2023 The Nakama Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "math" + "math/bits" + "sort" + "time" + + "github.com/blugelabs/bluge" + "go.uber.org/zap" +) + +func (m *LocalMatchmaker) processDefault(activeIndexCount int, activeIndexesCopy map[string]*MatchmakerIndex, indexCount int, indexesCopy map[string]*MatchmakerIndex) ([][]*MatchmakerEntry, []string) { + matchedEntries := make([][]*MatchmakerEntry, 0, 5) + expiredActiveIndexes := make([]string, 0, 10) + + var threshold bool + var timer *time.Timer + if m.active.Load() == 1 && m.revThresholdFn != nil { + timer = m.revThresholdFn() + defer timer.Stop() + } + + selectedTickets := make(map[string]struct{}, activeIndexCount*2) + for ticket, activeIndex := range activeIndexesCopy { + if !threshold && timer != nil { + select { + case <-timer.C: + threshold = true + default: + } + } + + // This ticket may already have found a match in a previous iteration. + if _, found := selectedTickets[activeIndex.Ticket]; found { + continue + } + + activeIndex.Intervals++ + lastInterval := activeIndex.Intervals >= m.config.GetMatchmaker().MaxIntervals || activeIndex.MinCount == activeIndex.MaxCount + if lastInterval { + // Drop from active indexes if it has reached its max intervals, or if its min/max counts are equal. In the + // latter case keeping it active would have the same result as leaving it in the pool, so this saves work. + expiredActiveIndexes = append(expiredActiveIndexes, ticket) + } + + if m.active.Load() != 1 { + continue + } + + indexQuery := bluge.NewBooleanQuery() + + // Results must match the query string. + indexQuery.AddMust(activeIndex.ParsedQuery) + + // Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8. + minCountRange := bluge.NewNumericRangeInclusiveQuery( + float64(activeIndex.MinCount), math.Inf(1), true, true). + SetField("min_count") + indexQuery.AddMust(minCountRange) + maxCountRange := bluge.NewNumericRangeInclusiveQuery( + math.Inf(-1), float64(activeIndex.MaxCount), true, true). + SetField("max_count") + indexQuery.AddMust(maxCountRange) + + // Results must not include the current party, if any. + if activeIndex.PartyId != "" { + partyIdQuery := bluge.NewTermQuery(activeIndex.PartyId) + partyIdQuery.SetField("party_id") + indexQuery.AddMustNot(partyIdQuery) + } + + searchRequest := bluge.NewTopNSearch(indexCount, indexQuery) + // Sort results to try and select the best match, or if the + // matches are equivalent, the longest waiting tickets first. + searchRequest.SortBy([]string{"-_score", "created_at"}) + + indexReader, err := m.indexWriter.Reader() + if err != nil { + m.logger.Error("error accessing index reader", zap.Error(err)) + continue + } + + result, err := indexReader.Search(m.ctx, searchRequest) + if err != nil { + _ = indexReader.Close() + m.logger.Error("error searching index", zap.Error(err)) + continue + } + + blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger) + if err != nil { + _ = indexReader.Close() + m.logger.Error("error iterating search results", zap.Error(err)) + continue + } + + for i := 0; i < len(blugeMatches.Hits); i++ { + hitTicket := blugeMatches.Hits[i].ID + if hitTicket == ticket { + // Remove the current ticket. + blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) + if len(selectedTickets) == 0 { + break + } + i-- + } else if _, found := selectedTickets[hitTicket]; found { + // Ticket has already been selected for another match during this process iteration. + blugeMatches.Hits = append(blugeMatches.Hits[:i], blugeMatches.Hits[i+1:]...) + i-- + } + } + + // Form possible combinations, in case multiple matches might be suitable. + entryCombos := make([][]*MatchmakerEntry, 0, 5) + lastHitCounter := len(blugeMatches.Hits) - 1 + for hitCounter, hit := range blugeMatches.Hits { + hitIndex, ok := indexesCopy[hit.ID] + if !ok { + // Ticket did not exist, should not happen. + m.logger.Warn("matchmaker process missing index", zap.String("ticket", hit.ID)) + continue + } + + if !threshold && m.config.GetMatchmaker().RevPrecision { + outerMutualMatch, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) + if err != nil { + m.logger.Error("error validating mutual match", zap.Error(err)) + continue + } else if !outerMutualMatch { + // This search hit is not a mutual match with the outer ticket. + continue + } + } + + if activeIndex.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { + // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. + continue + } + + // Check if there are overlapping session IDs, and if so these tickets are ineligible to match together. + var sessionIdConflict bool + for sessionID := range activeIndex.SessionIDs { + if _, found := hitIndex.SessionIDs[sessionID]; found { + sessionIdConflict = true + break + } + } + if sessionIdConflict { + continue + } + + var foundComboIdx int + var foundCombo []*MatchmakerEntry + for entryComboIdx, entryCombo := range entryCombos { + if len(entryCombo)+len(hitIndex.Entries)+activeIndex.Count <= activeIndex.MaxCount { + // There is room in this combo for these entries. Check if there are session ID or mutual match conflicts with current combo. + var mutualMatchConflict bool + for _, entry := range entryCombo { + if _, found := hitIndex.SessionIDs[entry.Presence.SessionId]; found { + sessionIdConflict = true + break + } + if !threshold && m.config.GetMatchmaker().RevPrecision { + entryMatchesSearchHitQuery, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, entry.Ticket) + if err != nil { + mutualMatchConflict = true + m.logger.Error("error validating mutual match", zap.Error(err)) + break + } else if !entryMatchesSearchHitQuery { + mutualMatchConflict = true + // This search hit is not a mutual match with the outer ticket. + break + } + // MatchmakerEntry does not have the query, read it out of indexes. + if entriesIndexEntry, ok := indexesCopy[entry.Ticket]; ok { + searchHitMatchesEntryQuery, err := validateMatch(m.ctx, m.revCache, indexReader, entriesIndexEntry.ParsedQuery, entry.Ticket, hit.ID) + if err != nil { + mutualMatchConflict = true + m.logger.Error("error validating mutual match", zap.Error(err)) + break + } else if !searchHitMatchesEntryQuery { + mutualMatchConflict = true + // This search hit is not a mutual match with the outer ticket. + break + } + } else { + m.logger.Warn("matchmaker missing index entry for entry combo") + } + } + } + if sessionIdConflict || mutualMatchConflict { + continue + } + + entryCombo = append(entryCombo, hitIndex.Entries...) + entryCombos[entryComboIdx] = entryCombo + + foundCombo = entryCombo + foundComboIdx = entryComboIdx + break + } + } + // Either processing first hit, or current hit entries combined with previous hits may tip over activeIndex.MaxCount. + if foundCombo == nil { + entryCombo := make([]*MatchmakerEntry, len(hitIndex.Entries)) + copy(entryCombo, hitIndex.Entries) + entryCombos = append(entryCombos, entryCombo) + + foundCombo = entryCombo + foundComboIdx = len(entryCombos) - 1 + } + + // The combo is considered match-worthy if either the max count has been satisfied, or ALL of these conditions are met: + // * It is the last interval for this active index. + // * The combo at least satisfies the min count. + // * The combo does not exceed the max count. + // * There are no more hits that may further fill the found combo, so we get as close as possible to the max count. + if l := len(foundCombo) + activeIndex.Count; l == activeIndex.MaxCount || (lastInterval && l >= activeIndex.MinCount && l <= activeIndex.MaxCount && hitCounter >= lastHitCounter) { + if rem := l % activeIndex.CountMultiple; rem != 0 { + // The size of the combination being considered does not satisfy the count multiple. + // Attempt to adjust the combo by removing the smallest possible number of entries. + // Prefer keeping entries that have been in the matchmaker the longest, if possible. + eligibleIndexesUniq := make(map[*MatchmakerIndex]struct{}, len(foundCombo)) + for _, e := range foundCombo { + // Only tickets individually less <= the removable size are considered. + // For example removing a party of 3 when we're only looking to remove 2 is not allowed. + if foundIndex, ok := indexesCopy[e.Ticket]; ok && foundIndex.Count <= rem { + eligibleIndexesUniq[foundIndex] = struct{}{} + } + } + + eligibleIndexes := make([]*MatchmakerIndex, 0, len(eligibleIndexesUniq)) + for idx := range eligibleIndexesUniq { + eligibleIndexes = append(eligibleIndexes, idx) + } + + eligibleGroups := groupIndexes(eligibleIndexes, rem) + if len(eligibleGroups) <= 0 { + // No possible combination to remove, unlikely but guard. + continue + } + // Sort to ensure we keep as many of the longest-waiting tickets as possible. + sort.Slice(eligibleGroups, func(i, j int) bool { + return eligibleGroups[i].avgCreatedAt < eligibleGroups[j].avgCreatedAt + }) + // The most eligible group is removed from the combo. + for _, egIndex := range eligibleGroups[0].indexes { + for i := 0; i < len(foundCombo); i++ { + if egIndex.Ticket == foundCombo[i].Ticket { + foundCombo[i] = foundCombo[len(foundCombo)-1] + foundCombo[len(foundCombo)-1] = nil + foundCombo = foundCombo[:len(foundCombo)-1] + i-- + } + } + } + + // We've removed something, update the known size of the currently considered combo. + l = len(foundCombo) + activeIndex.Count + + if l%activeIndex.CountMultiple != 0 { + // Removal was insufficient, the combo is still not valid for the required multiple. + continue + } + } + + // Check that ALL of these conditions are true for ALL matched entries: + // * The found combo size satisfies the minimum count. + // * The found combo size satisfies the maximum count. + // * The found combo size satisfies the count multiple. + // For any condition failures it does not matter which specific condition is not met. + var conditionFailed bool + for _, e := range foundCombo { + if foundIndex, ok := indexesCopy[e.Ticket]; ok && (foundIndex.MinCount > l || foundIndex.MaxCount < l || l%foundIndex.CountMultiple != 0) { + conditionFailed = true + break + } + } + if conditionFailed { + continue + } + + // Found a suitable match. + currentMatchedEntries := append(foundCombo, activeIndex.Entries...) + + // Remove the found combos from currently tracked list. + entryCombos = append(entryCombos[:foundComboIdx], entryCombos[foundComboIdx+1:]...) + + matchedEntries = append(matchedEntries, currentMatchedEntries) + + var batchSize int + batch := bluge.NewBatch() + // Mark tickets as unavailable for further use in this process iteration. + for _, currentMatchedEntry := range currentMatchedEntries { + if _, found := selectedTickets[currentMatchedEntry.Ticket]; found { + continue + } + selectedTickets[currentMatchedEntry.Ticket] = struct{}{} + batchSize++ + batch.Delete(bluge.Identifier(currentMatchedEntry.Ticket)) + } + if batchSize > 0 { + if err := m.indexWriter.Batch(batch); err != nil { + m.logger.Error("error deleting matchmaker process entries batch", zap.Error(err)) + } + } + + break + } + } + err = indexReader.Close() + if err != nil { + m.logger.Error("error closing index reader", zap.Error(err)) + continue + } + } + + return matchedEntries, expiredActiveIndexes +} + +func (m *LocalMatchmaker) processCustom(activeIndexesCopy map[string]*MatchmakerIndex, indexCount int, indexesCopy map[string]*MatchmakerIndex) ([][]*MatchmakerEntry, []string) { + matchedEntries := make([][]*MatchmakerEntry, 0, 5) + expiredActiveIndexes := make([]string, 0, 10) + + var threshold bool + var timer *time.Timer + if m.revThresholdFn != nil { + timer = m.revThresholdFn() + defer timer.Stop() + } + + // Update all interval counts at once. + for _, index := range activeIndexesCopy { + index.Intervals++ + } + + for ticket, index := range activeIndexesCopy { + if !threshold && timer != nil { + select { + case <-timer.C: + threshold = true + default: + } + } + + lastInterval := index.Intervals >= m.config.GetMatchmaker().MaxIntervals || index.MinCount == index.MaxCount + if lastInterval { + // Drop from active indexes if it has reached its max intervals, or if its min/max counts are equal. In the + // latter case keeping it active would have the same result as leaving it in the pool, so this saves work. + expiredActiveIndexes = append(expiredActiveIndexes, ticket) + } + + if m.active.Load() != 1 { + continue + } + + indexQuery := bluge.NewBooleanQuery() + + // Results must match the query string. + indexQuery.AddMust(index.ParsedQuery) + + // Results must also have compatible min/max ranges, for example 2-4 must not match with 6-8. + minCountRange := bluge.NewNumericRangeInclusiveQuery( + float64(index.MinCount), math.Inf(1), true, true). + SetField("min_count") + indexQuery.AddMust(minCountRange) + maxCountRange := bluge.NewNumericRangeInclusiveQuery( + math.Inf(-1), float64(index.MaxCount), true, true). + SetField("max_count") + indexQuery.AddMust(maxCountRange) + + // Results must not include the current party, if any. + if index.PartyId != "" { + partyIdQuery := bluge.NewTermQuery(index.PartyId) + partyIdQuery.SetField("party_id") + indexQuery.AddMustNot(partyIdQuery) + } + + searchRequest := bluge.NewTopNSearch(indexCount, indexQuery) + // Sort results to try and select the best match, or if the + // matches are equivalent, the longest waiting tickets first. + searchRequest.SortBy([]string{"-_score", "created_at"}) + + indexReader, err := m.indexWriter.Reader() + if err != nil { + m.logger.Error("error accessing index reader", zap.Error(err)) + continue + } + + result, err := indexReader.Search(m.ctx, searchRequest) + if err != nil { + _ = indexReader.Close() + m.logger.Error("error searching index", zap.Error(err)) + continue + } + + blugeMatches, err := IterateBlugeMatches(result, map[string]struct{}{}, m.logger) + if err != nil { + _ = indexReader.Close() + m.logger.Error("error iterating search results", zap.Error(err)) + continue + } + + err = indexReader.Close() + if err != nil { + m.logger.Error("error closing index reader", zap.Error(err)) + continue + } + + hitIndexes := make([]*MatchmakerIndex, 0, len(blugeMatches.Hits)) + for _, hit := range blugeMatches.Hits { + if hit.ID == ticket { + // Remove the current ticket. + continue + } + + hitIndex, ok := indexesCopy[hit.ID] + if !ok { + // Ticket did not exist, should not happen. + m.logger.Warn("matchmaker process missing index", zap.String("ticket", hit.ID)) + continue + } + + if !threshold && m.config.GetMatchmaker().RevPrecision { + outerMutualMatch, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hit.ID, ticket) + if err != nil { + m.logger.Error("error validating mutual match", zap.Error(err)) + continue + } else if !outerMutualMatch { + // This search hit is not a mutual match with the outer ticket. + continue + } + } + + if index.MaxCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { + // This match would be less than the search hit's preferred max, and they can still wait. Let them wait more. + continue + } + + // Check if there are overlapping session IDs, and if so these tickets are ineligible to match together. + var sessionIdConflict bool + for sessionID := range index.SessionIDs { + if _, found := hitIndex.SessionIDs[sessionID]; found { + sessionIdConflict = true + break + } + } + if sessionIdConflict { + continue + } + + hitIndexes = append(hitIndexes, hitIndex) + } + + for hitIndexes := range combineIndexes(hitIndexes, index.MinCount-index.Count, index.MaxCount-index.Count) { + // Check the min and max counts are met across the hit. + var hitCount int + for _, hitIndex := range hitIndexes { + hitCount += hitIndex.Count + } + hitCount += index.Count + if hitCount > index.MaxCount || hitCount < index.MinCount { + continue + } + if hitCount%index.CountMultiple != 0 { + continue + } + var reject bool + for _, hitIndex := range hitIndexes { + // Check hit max count. + if hitCount > hitIndex.MaxCount || hitCount < hitIndex.MinCount { + reject = true + break + } + // Check if count multiple is satisfied for this hit. + if hitCount%hitIndex.CountMultiple != 0 { + reject = true + break + } + // Check if the max is not met, but this hit has not reached its max intervals yet. + if hitCount < hitIndex.MaxCount && hitIndex.Intervals <= m.config.GetMatchmaker().MaxIntervals { + reject = true + break + } + } + if reject { + continue + } + + // Check for session ID or mutual match conflicts. + var sessionIdConflict, mutualMatchConflict bool + sessionIDs := make(map[string]struct{}, index.MaxCount-index.Count) + parsedQueries := make(map[string]bluge.Query, index.MaxCount-index.Count) + for _, hitIndex := range hitIndexes { + for sessionID, _ := range hitIndex.SessionIDs { + // Check for session ID conflicts. + if _, found := sessionIDs[sessionID]; found { + sessionIdConflict = true + break + } + sessionIDs[sessionID] = struct{}{} + + // Check for mutual match conflicts. + if !threshold && m.config.GetMatchmaker().RevPrecision { + for otherTicket, parsedQuery := range parsedQueries { + entryMatchesSearchHitQuery, err := validateMatch(m.ctx, m.revCache, indexReader, hitIndex.ParsedQuery, hitIndex.Ticket, otherTicket) + if err != nil { + mutualMatchConflict = true + m.logger.Error("error validating mutual match", zap.Error(err)) + break + } else if !entryMatchesSearchHitQuery { + mutualMatchConflict = true + // This hit is not a mutual match with the other ticket. + break + } + entryMatchesSearchHitQuery, err = validateMatch(m.ctx, m.revCache, indexReader, parsedQuery, otherTicket, hitIndex.Ticket) + if err != nil { + mutualMatchConflict = true + m.logger.Error("error validating mutual match", zap.Error(err)) + break + } else if !entryMatchesSearchHitQuery { + mutualMatchConflict = true + // This hit is not a mutual match with the other ticket. + break + } + } + if mutualMatchConflict { + break + } + parsedQueries[hitIndex.Ticket] = hitIndex.ParsedQuery + } + } + if sessionIdConflict || mutualMatchConflict { + break + } + } + if sessionIdConflict || mutualMatchConflict { + continue + } + + // Hit is valid, collect all its entries. + matchedEntry := make([]*MatchmakerEntry, 0, hitCount) + for _, hitIndex := range hitIndexes { + matchedEntry = append(matchedEntry, hitIndex.Entries...) + } + // Include the active index that was the root of this potential match. + matchedEntry = append(matchedEntry, index.Entries...) + + matchedEntries = append(matchedEntries, matchedEntry) + } + } + + if len(matchedEntries) == 0 { + return matchedEntries, expiredActiveIndexes + } + + // Allow the custom function to determine which of the matches should be formed. All others will be discarded. + matchedEntries = m.runtime.matchmakerOverrideFunction(m.ctx, matchedEntries) + + return matchedEntries, expiredActiveIndexes +} + +func combineIndexes(from []*MatchmakerIndex, min, max int) <-chan []*MatchmakerIndex { + c := make(chan []*MatchmakerIndex) + + go func() { + defer close(c) + length := uint(len(from)) + + // Go through all possible combinations of from 1 (only first element in subset) to 2^length (all objects in subset) + // and return those that contain between min and max elements. + combination: + for combinationBits := 1; combinationBits < (1 << length); combinationBits++ { + count := bits.OnesCount(uint(combinationBits)) + if count > max { + continue + } + + combination := make([]*MatchmakerIndex, 0, count) + entryCount := 0 + for element := uint(0); element < length; element++ { + // Check if element should be contained in combination by checking if bit 'element' is set in combinationBits. + if (combinationBits>>element)&1 == 1 { + entryCount = entryCount + from[element].Count + if entryCount > max { + continue combination + } + combination = append(combination, from[element]) + } + } + if entryCount >= min { + c <- combination + } + } + }() + return c +} diff --git a/server/runtime.go b/server/runtime.go index a75549391..122b2a2a2 100644 --- a/server/runtime.go +++ b/server/runtime.go @@ -208,7 +208,8 @@ type ( RuntimeBeforeGetSubscriptionFunction func(ctx context.Context, logger *zap.Logger, userID, username string, vars map[string]string, expiry int64, clientIP, clientPort string, in *api.GetSubscriptionRequest) (*api.GetSubscriptionRequest, error, codes.Code) RuntimeAfterGetSubscriptionFunction func(ctx context.Context, logger *zap.Logger, userID, username string, vars map[string]string, expiry int64, clientIP, clientPort string, out *api.ValidatedSubscription, in *api.GetSubscriptionRequest) error - RuntimeMatchmakerMatchedFunction func(ctx context.Context, entries []*MatchmakerEntry) (string, bool, error) + RuntimeMatchmakerMatchedFunction func(ctx context.Context, entries []*MatchmakerEntry) (string, bool, error) + RuntimeMatchmakerOverrideFunction func(ctx context.Context, candidateMatches [][]*MatchmakerEntry) (matches [][]*MatchmakerEntry) RuntimeMatchCreateFunction func(ctx context.Context, logger *zap.Logger, id uuid.UUID, node string, stopped *atomic.Bool, name string) (RuntimeMatchCore, error) RuntimeMatchDeferMessageFunction func(msg *DeferredMessage) error @@ -240,6 +241,7 @@ const ( RuntimeExecutionModeAfter RuntimeExecutionModeMatch RuntimeExecutionModeMatchmaker + RuntimeExecutionModeMatchmakerOverride RuntimeExecutionModeMatchCreate RuntimeExecutionModeTournamentEnd RuntimeExecutionModeTournamentReset @@ -266,6 +268,8 @@ func (e RuntimeExecutionMode) String() string { return "match" case RuntimeExecutionModeMatchmaker: return "matchmaker" + case RuntimeExecutionModeMatchmakerOverride: + return "matchmaker_override" case RuntimeExecutionModeMatchCreate: return "match_create" case RuntimeExecutionModeTournamentEnd: @@ -497,7 +501,8 @@ type Runtime struct { beforeReqFunctions *RuntimeBeforeReqFunctions afterReqFunctions *RuntimeAfterReqFunctions - matchmakerMatchedFunction RuntimeMatchmakerMatchedFunction + matchmakerMatchedFunction RuntimeMatchmakerMatchedFunction + matchmakerOverrideFunction RuntimeMatchmakerOverrideFunction tournamentEndFunction RuntimeTournamentEndFunction tournamentResetFunction RuntimeTournamentResetFunction @@ -626,7 +631,7 @@ func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql. matchProvider := NewMatchProvider() - goModules, goRPCFns, goBeforeRtFns, goAfterRtFns, goBeforeReqFns, goAfterReqFns, goMatchmakerMatchedFn, goTournamentEndFn, goTournamentResetFn, goLeaderboardResetFn, goPurchaseNotificationAppleFn, goSubscriptionNotificationAppleFn, goPurchaseNotificationGoogleFn, goSubscriptionNotificationGoogleFn, allEventFns, goMatchNamesListFn, err := NewRuntimeProviderGo(ctx, logger, startupLogger, db, protojsonMarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, runtimeConfig.Path, paths, eventQueue, matchProvider) + goModules, goRPCFns, goBeforeRtFns, goAfterRtFns, goBeforeReqFns, goAfterReqFns, goMatchmakerMatchedFn, goMatchmakerCustomMatchingFn, goTournamentEndFn, goTournamentResetFn, goLeaderboardResetFn, goPurchaseNotificationAppleFn, goSubscriptionNotificationAppleFn, goPurchaseNotificationGoogleFn, goSubscriptionNotificationGoogleFn, allEventFns, goMatchNamesListFn, err := NewRuntimeProviderGo(ctx, logger, startupLogger, db, protojsonMarshaler, config, version, socialClient, leaderboardCache, leaderboardRankCache, leaderboardScheduler, sessionRegistry, sessionCache, statusRegistry, matchRegistry, tracker, metrics, streamManager, router, runtimeConfig.Path, paths, eventQueue, matchProvider) if err != nil { startupLogger.Error("Error initialising Go runtime provider", zap.Error(err)) return nil, nil, err @@ -2452,6 +2457,14 @@ func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql. startupLogger.Info("Registered JavaScript runtime Matchmaker Matched function invocation") } + var allMatchmakerOverrideFunction RuntimeMatchmakerOverrideFunction + switch { + case goMatchmakerMatchedFn != nil: + allMatchmakerOverrideFunction = goMatchmakerCustomMatchingFn + startupLogger.Info("Registered Go runtime Matchmaker Override function invocation") + // TODO: Handle other runtimes. + } + var allTournamentEndFunction RuntimeTournamentEndFunction switch { case goTournamentEndFn != nil: @@ -2563,6 +2576,7 @@ func NewRuntime(ctx context.Context, logger, startupLogger *zap.Logger, db *sql. beforeReqFunctions: allBeforeReqFunctions, afterReqFunctions: allAfterReqFunctions, matchmakerMatchedFunction: allMatchmakerMatchedFunction, + matchmakerOverrideFunction: allMatchmakerOverrideFunction, tournamentEndFunction: allTournamentEndFunction, tournamentResetFunction: allTournamentResetFunction, leaderboardResetFunction: allLeaderboardResetFunction, diff --git a/server/runtime_go.go b/server/runtime_go.go index 6a111f740..dda686e2e 100644 --- a/server/runtime_go.go +++ b/server/runtime_go.go @@ -58,6 +58,7 @@ type RuntimeGoInitializer struct { subscriptionNotificationApple RuntimeSubscriptionNotificationAppleFunction purchaseNotificationGoogle RuntimePurchaseNotificationGoogleFunction subscriptionNotificationGoogle RuntimeSubscriptionNotificationGoogleFunction + matchmakerOverride RuntimeMatchmakerOverrideFunction eventFunctions []RuntimeEventFunction sessionStartFunctions []RuntimeEventFunction @@ -2481,6 +2482,34 @@ func (ri *RuntimeGoInitializer) RegisterMatchmakerMatched(fn func(ctx context.Co return nil } +func (ri *RuntimeGoInitializer) RegisterMatchmakerOverride(fn func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, matches [][]runtime.MatchmakerEntry) [][]runtime.MatchmakerEntry) error { + ri.matchmakerOverride = func(ctx context.Context, entries [][]*MatchmakerEntry) [][]*MatchmakerEntry { + ctx = NewRuntimeGoContext(ctx, ri.node, ri.version, ri.env, RuntimeExecutionModeMatchmakerOverride, nil, nil, 0, "", "", nil, "", "", "", "") + runtimeCombinations := make([][]runtime.MatchmakerEntry, len(entries)) + for i, combination := range entries { + runtimeEntry := make([]runtime.MatchmakerEntry, len(combination)) + for j, entry := range combination { + runtimeEntry[j] = runtime.MatchmakerEntry(entry) + } + runtimeCombinations[i] = runtimeEntry + } + + returnedEntries := fn(ctx, ri.logger.WithField("mode", RuntimeExecutionModeMatchmakerOverride.String()), ri.db, ri.nk, runtimeCombinations) + + combinations := make([][]*MatchmakerEntry, len(entries)) + for i, combination := range returnedEntries { + entries := make([]*MatchmakerEntry, len(combination)) + for j, entry := range combination { + e, _ := entry.(*MatchmakerEntry) + entries[j] = e + } + combinations[i] = entries + } + return combinations + } + return nil +} + func (ri *RuntimeGoInitializer) RegisterTournamentEnd(fn func(ctx context.Context, logger runtime.Logger, db *sql.DB, nk runtime.NakamaModule, tournament *api.Tournament, end, reset int64) error) error { ri.tournamentEnd = func(ctx context.Context, tournament *api.Tournament, end, reset int64) error { ctx = NewRuntimeGoContext(ctx, ri.node, ri.version, ri.env, RuntimeExecutionModeTournamentEnd, nil, nil, 0, "", "", nil, "", "", "", "") @@ -2544,7 +2573,7 @@ func (ri *RuntimeGoInitializer) RegisterMatch(name string, fn func(ctx context.C return nil } -func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, RuntimePurchaseNotificationAppleFunction, RuntimeSubscriptionNotificationAppleFunction, RuntimePurchaseNotificationGoogleFunction, RuntimeSubscriptionNotificationGoogleFunction, *RuntimeEventFunctions, func() []string, error) { +func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger, db *sql.DB, protojsonMarshaler *protojson.MarshalOptions, config Config, version string, socialClient *social.Client, leaderboardCache LeaderboardCache, leaderboardRankCache LeaderboardRankCache, leaderboardScheduler LeaderboardScheduler, sessionRegistry SessionRegistry, sessionCache SessionCache, statusRegistry *StatusRegistry, matchRegistry MatchRegistry, tracker Tracker, metrics Metrics, streamManager StreamManager, router MessageRouter, rootPath string, paths []string, eventQueue *RuntimeEventQueue, matchProvider *MatchProvider) ([]string, map[string]RuntimeRpcFunction, map[string]RuntimeBeforeRtFunction, map[string]RuntimeAfterRtFunction, *RuntimeBeforeReqFunctions, *RuntimeAfterReqFunctions, RuntimeMatchmakerMatchedFunction, RuntimeMatchmakerOverrideFunction, RuntimeTournamentEndFunction, RuntimeTournamentResetFunction, RuntimeLeaderboardResetFunction, RuntimePurchaseNotificationAppleFunction, RuntimeSubscriptionNotificationAppleFunction, RuntimePurchaseNotificationGoogleFunction, RuntimeSubscriptionNotificationGoogleFunction, *RuntimeEventFunctions, func() []string, error) { runtimeLogger := NewRuntimeGoLogger(logger) node := config.GetName() env := config.GetRuntime().Environment @@ -2624,13 +2653,13 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger relPath, name, fn, err := openGoModule(startupLogger, rootPath, path) if err != nil { // Errors are already logged in the function above. - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err } // Run the initialisation. if err = fn(ctx, runtimeLogger, db, nk, initializer); err != nil { startupLogger.Fatal("Error returned by InitModule function in Go module", zap.String("name", name), zap.Error(err)) - return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, errors.New("error returned by InitModule function in Go module") + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, errors.New("error returned by InitModule function in Go module") } modulePaths = append(modulePaths, relPath) } @@ -2678,7 +2707,7 @@ func NewRuntimeProviderGo(ctx context.Context, logger, startupLogger *zap.Logger } } - return modulePaths, initializer.rpc, initializer.beforeRt, initializer.afterRt, initializer.beforeReq, initializer.afterReq, initializer.matchmakerMatched, initializer.tournamentEnd, initializer.tournamentReset, initializer.leaderboardReset, initializer.purchaseNotificationApple, initializer.subscriptionNotificationApple, initializer.purchaseNotificationGoogle, initializer.subscriptionNotificationGoogle, events, matchNamesListFn, nil + return modulePaths, initializer.rpc, initializer.beforeRt, initializer.afterRt, initializer.beforeReq, initializer.afterReq, initializer.matchmakerMatched, initializer.matchmakerOverride, initializer.tournamentEnd, initializer.tournamentReset, initializer.leaderboardReset, initializer.purchaseNotificationApple, initializer.subscriptionNotificationApple, initializer.purchaseNotificationGoogle, initializer.subscriptionNotificationGoogle, events, matchNamesListFn, nil } func CheckRuntimeProviderGo(logger *zap.Logger, rootPath string, paths []string) error { diff --git a/vendor/github.com/heroiclabs/nakama-common/runtime/runtime.go b/vendor/github.com/heroiclabs/nakama-common/runtime/runtime.go index eb0c87c71..bcc2595e1 100644 --- a/vendor/github.com/heroiclabs/nakama-common/runtime/runtime.go +++ b/vendor/github.com/heroiclabs/nakama-common/runtime/runtime.go @@ -338,6 +338,9 @@ type Initializer interface { // RegisterMatchmakerMatched RegisterMatchmakerMatched(fn func(ctx context.Context, logger Logger, db *sql.DB, nk NakamaModule, entries []MatchmakerEntry) (string, error)) error + // RegisterMatchmakerOverride + RegisterMatchmakerOverride(fn func(ctx context.Context, logger Logger, db *sql.DB, nk NakamaModule, candidateMatches [][]MatchmakerEntry) (matches [][]MatchmakerEntry)) error + // RegisterMatch RegisterMatch(name string, fn func(ctx context.Context, logger Logger, db *sql.DB, nk NakamaModule) (Match, error)) error diff --git a/vendor/modules.txt b/vendor/modules.txt index 4501e85ee..c4949a990 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -146,7 +146,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2/internal/genopena github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-openapiv2/options github.com/grpc-ecosystem/grpc-gateway/v2/runtime github.com/grpc-ecosystem/grpc-gateway/v2/utilities -# github.com/heroiclabs/nakama-common v1.26.1-0.20230321170403-7becee7153cf +# github.com/heroiclabs/nakama-common v1.26.1-0.20230414123330-4d149b942148 ## explicit; go 1.19 github.com/heroiclabs/nakama-common/api github.com/heroiclabs/nakama-common/rtapi -- GitLab