Loading main.go +5 −6 Original line number Diff line number Diff line Loading @@ -60,7 +60,6 @@ var ( ) func main() { //startedAt := int64(time.Nanosecond) * time.Now().UTC().UnixNano() / int64(time.Millisecond) semver := fmt.Sprintf("%s+%s", version, commitID) // Always set default timeout on HTTP client. http.DefaultClient.Timeout = 1500 * time.Millisecond Loading Loading @@ -103,7 +102,7 @@ func main() { tracker := server.StartLocalTracker(logger, config, sessionRegistry, jsonpbMarshaler) router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler) leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db) leaderboardRankCache := server.NewLocalLeaderboardRankCache(logger, startupLogger, db, config.GetLeaderboard(), leaderboardCache) leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache) leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, leaderboardCache, leaderboardRankCache) matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, tracker, router, config.GetName()) tracker.SetMatchJoinListener(matchRegistry.Join) Loading @@ -126,7 +125,7 @@ func main() { gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1 cookie := newOrLoadCookie(config) gacode := "UA-89792135-1" const gacode = "UA-89792135-1" var telemetryClient *http.Client if gaenabled { telemetryClient = &http.Client{ Loading Loading @@ -184,7 +183,7 @@ func main() { sessionRegistry.Stop() if gaenabled { ga.SendSessionStop(telemetryClient, gacode, cookie) _ = ga.SendSessionStop(telemetryClient, gacode, cookie) } startupLogger.Info("Shutdown complete") Loading Loading @@ -253,7 +252,7 @@ func runTelemetry(httpc *http.Client, gacode string, cookie string) { if ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "version", Ea: fmt.Sprintf("%s+%s", version, commitID)}) != nil { return } ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "variant", Ea: "nakama"}) _ = ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "variant", Ea: "nakama"}) } func newOrLoadCookie(config server.Config) string { Loading @@ -262,7 +261,7 @@ func newOrLoadCookie(config server.Config) string { cookie := uuid.FromBytesOrNil(b) if err != nil || cookie == uuid.Nil { cookie = uuid.Must(uuid.NewV4()) ioutil.WriteFile(filePath, cookie.Bytes(), 0644) _ = ioutil.WriteFile(filePath, cookie.Bytes(), 0644) } return cookie.String() } server/leaderboard_rank_cache.go +1 −3 Original line number Diff line number Diff line Loading @@ -82,14 +82,12 @@ type LeaderboardWithExpiry struct { type LocalLeaderboardRankCache struct { sync.RWMutex cache map[LeaderboardWithExpiry]*RankMap logger *zap.Logger blacklistAll bool blacklistIds map[string]struct{} } func NewLocalLeaderboardRankCache(logger, startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache { func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache { cache := &LocalLeaderboardRankCache{ logger: logger, blacklistIds: make(map[string]struct{}, len(config.BlacklistRankCache)), blacklistAll: len(config.BlacklistRankCache) == 1 && config.BlacklistRankCache[0] == "*", cache: make(map[LeaderboardWithExpiry]*RankMap, 0), Loading server/match_registry.go +98 −177 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ import ( "context" "encoding/json" "fmt" "github.com/blevesearch/bleve/search/query" "strings" "sync" "time" Loading Loading @@ -102,7 +104,9 @@ type LocalMatchRegistry struct { tracker Tracker router MessageRouter node string matches map[uuid.UUID]*MatchHandler matches *sync.Map matchCount *atomic.Int32 index bleve.Index stopped *atomic.Bool Loading @@ -124,7 +128,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra tracker: tracker, router: router, node: node, matches: make(map[uuid.UUID]*MatchHandler), matches: &sync.Map{}, matchCount: atomic.NewInt32(0), index: index, stopped: atomic.NewBool(false), Loading Loading @@ -164,27 +170,28 @@ func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core Run return nil, err } r.Lock() r.matches[id] = match r.Unlock() r.matches.Store(id, match) r.matchCount.Inc() return match, nil } func (r *LocalMatchRegistry) GetMatch(id uuid.UUID) *MatchHandler { var mh *MatchHandler r.RLock() mh = r.matches[id] r.RUnlock() return mh mh, ok := r.matches.Load(id) if !ok { return nil } return mh.(*MatchHandler) } func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) { r.Lock() delete(r.matches, id) matchesRemaining := len(r.matches) r.Unlock() r.matches.Delete(id) matchesRemaining := r.matchCount.Dec() r.tracker.UntrackByStream(stream) r.index.Delete(fmt.Sprintf("%v.%v", id.String(), r.node)) if err := r.index.Delete(fmt.Sprintf("%v.%v", id.String(), r.node)); err != nil { r.logger.Warn("Error removing match list index", zap.String("id", fmt.Sprintf("%v.%v", id.String(), r.node)), zap.Error(err)) } // If there are no more matches in this registry and a shutdown was initiated then signal // that the process is complete. Loading @@ -198,10 +205,10 @@ func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) { } func (r *LocalMatchRegistry) GetMatchLabel(ctx context.Context, id uuid.UUID, node string) (string, error) { query := bleve.NewDocIDQuery([]string{fmt.Sprintf("%v.%v", id.String(), node)}) search := bleve.NewSearchRequestOptions(query, 1, 0, false) search.Fields = []string{"label_string"} results, err := r.index.SearchInContext(ctx, search) q := bleve.NewDocIDQuery([]string{fmt.Sprintf("%v.%v", id.String(), node)}) searchReq := bleve.NewSearchRequestOptions(q, 1, 0, false) searchReq.Fields = []string{"label_string"} results, err := r.index.SearchInContext(ctx, searchReq) if err != nil { return "", fmt.Errorf("error getting match label: %v", err.Error()) } Loading @@ -224,7 +231,7 @@ func (r *LocalMatchRegistry) UpdateMatchLabel(id uuid.UUID, label string) error var labelJSON map[string]interface{} // Doesn't matter if this is not JSON. json.Unmarshal([]byte(label), &labelJSON) _ = json.Unmarshal([]byte(label), &labelJSON) return r.index.Index(fmt.Sprintf("%v.%v", id.String(), r.node), &MatchIndexEntry{ Node: r.node, Label: labelJSON, Loading @@ -232,15 +239,14 @@ func (r *LocalMatchRegistry) UpdateMatchLabel(id uuid.UUID, label string) error }) } func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value, query *wrappers.StringValue) ([]*api.Match, error) { func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value, queryString *wrappers.StringValue) ([]*api.Match, error) { if limit == 0 { return make([]*api.Match, 0), nil } var modes map[uint8]*uint8 var allowRelayed bool var labelResults *bleve.SearchResult var orderRequired bool if query != nil { if queryString != nil { if authoritative != nil && !authoritative.Value { // A filter on query is requested but authoritative matches are not allowed. return make([]*api.Match, 0), nil Loading @@ -249,31 +255,23 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori // If there are filters other than query, we don't know which matches will work so get more than the limit. count := limit if minSize != nil || maxSize != nil { c, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) } count = int(c) count = int(r.matchCount.Load()) } // Apply the query filter to the set of known match labels. queryString := query.Value if queryString == "" { queryString = "*" var q query.Query if queryString := queryString.Value; queryString == "" { q = bleve.NewMatchAllQuery() } else { q = bleve.NewQueryStringQuery(queryString) } indexQuery := bleve.NewQueryStringQuery(queryString) search := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) search.Fields = []string{"label_string"} searchReq := bleve.NewSearchRequestOptions(q, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, search) labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by query: %v", err.Error()) } // Because we have a query filter only authoritative matches are eligible. modes = MatchFilterAuthoritative // The query may contain boosting, in which case the order of results matters. orderRequired = true } else if label != nil { if authoritative != nil && !authoritative.Value { // A filter on label is requested but authoritative matches are not allowed. Loading @@ -283,51 +281,43 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori // If there are filters other than label, we don't know which matches will work so get more than the limit. count := limit if minSize != nil || maxSize != nil { c, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) } count = int(c) count = int(r.matchCount.Load()) } // Apply the label filter to the set of known match labels. indexQuery := bleve.NewMatchQuery(label.Value) indexQuery.SetField("label_string") search := bleve.NewSearchRequestOptions(indexQuery, int(count), 0, false) search.Fields = []string{"label_string"} searchReq := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, search) labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by label: %v", err.Error()) } // Because we have a query filter only authoritative matches are eligible. modes = MatchFilterAuthoritative } else if authoritative == nil || authoritative.Value { // Not using label/query filter but we still need access to the indexed labels to return them // if authoritative matches may be included in the results. count, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) count := limit if minSize != nil || maxSize != nil { count = int(r.matchCount.Load()) } indexQuery := bleve.NewMatchAllQuery() search := bleve.NewSearchRequestOptions(indexQuery, int(count), 0, false) search.Fields = []string{"label_string"} labelResults, err = r.index.SearchInContext(ctx, search) searchReq := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by label: %v", err.Error()) } if authoritative == nil { // Expect a possible mix of authoritative and relayed matches. modes = MatchFilterAny } else { // Authoritative was strictly true even if there was no label/query filter. modes = MatchFilterAuthoritative allowRelayed = true } } else { // Authoritative was strictly false, and there was no label/query filter. modes = MatchFilterRelayed allowRelayed = true } if labelResults != nil && labelResults.Hits.Len() == 0 && authoritative != nil && !authoritative.Value { Loading @@ -335,22 +325,20 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori return make([]*api.Match, 0), nil } // There is a query which may contain boosted search terms, which means order of results matters. if orderRequired { // Look up tracker info to determine match sizes. // This info is needed even if there is no min/max size filter because it's returned in results. matches := r.tracker.CountByStreamModeFilter(modes) matchSizes := make(map[string]int32, len(matches)) for stream, size := range matches { matchSizes[fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label)] = size } // Results. results := make([]*api.Match, 0, limit) // Use any eligible authoritative matches first. if labelResults != nil { for _, hit := range labelResults.Hits { // Size may be 0. size := matchSizes[hit.ID] matchIDComponents := strings.SplitN(hit.ID, ".", 2) id := uuid.FromStringOrNil(matchIDComponents[0]) mh, ok := r.matches.Load(id) if !ok { continue } size := int32(mh.(*MatchHandler).PresenceList.Size()) if minSize != nil && minSize.Value > size { // Not eligible based on minimum size. Loading Loading @@ -383,51 +371,21 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori return results, nil } } // We're in the query case, non-authoritative matches are not eligible so return what we can. return results, nil } // It was not a query so ordering does not matter, move on to process the minimal set possible in any order. // Match labels will only be nil if there is no label filter, no query filter, and authoritative is strictly false. // Therefore authoritative matches will never be part of this listing at all. var matchLabels map[string]*wrappers.StringValue if labelResults != nil { matchLabels = make(map[string]*wrappers.StringValue, labelResults.Hits.Len()) for _, hit := range labelResults.Hits { if l, ok := hit.Fields["label_string"]; ok { if ls, ok := l.(string); ok { matchLabels[hit.ID] = &wrappers.StringValue{Value: ls} } } } // If relayed matches are not allowed still return any available results. if !allowRelayed { return results, nil } // Look up tracker info to determine match sizes. // This info is needed even if there is no min/max size filter because it's returned in results. matches := r.tracker.CountByStreamModeFilter(modes) // Results. results := make([]*api.Match, 0, limit) // Intersection of matches listed from stream and matches listed from label index, if any. matches := r.tracker.CountByStreamModeFilter(MatchFilterRelayed) for stream, size := range matches { if stream.Mode != StreamModeMatchRelayed && stream.Mode != StreamModeMatchAuthoritative { if stream.Mode != StreamModeMatchRelayed { // Only relayed matches are expected at this point. r.logger.Warn("Ignoring unknown stream mode in match listing operation", zap.Uint8("mode", stream.Mode)) continue } id := fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label) label, ok := matchLabels[id] if ok { delete(matchLabels, id) } else if matchLabels != nil && stream.Mode == StreamModeMatchAuthoritative { // Not eligible based on the label/query. continue } if minSize != nil && minSize.Value > size { // Not eligible based on minimum size. continue Loading @@ -439,8 +397,8 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori } results = append(results, &api.Match{ MatchId: id, Authoritative: stream.Mode == StreamModeMatchAuthoritative, MatchId: fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label), Authoritative: false, Label: label, Size: size, }) Loading @@ -449,24 +407,6 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori } } // Return incomplete results here if we're not allowed to return potentially empty authoritative matches. if (authoritative != nil && !authoritative.Value) || (minSize != nil && minSize.Value > 0) { return results, nil } // All we have left now are empty authoritative matches that we already know matched label/query filter if any. for id, label := range matchLabels { results = append(results, &api.Match{ MatchId: id, Authoritative: true, Label: label, Size: 0, }) if len(results) == limit { break } } return results, nil } Loading @@ -476,13 +416,12 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { // Graceful shutdown not allowed/required, or grace period has expired. if graceSeconds == 0 { r.RLock() for id, mh := range r.matches { mh.Close() delete(r.matches, id) r.matches.Range(func(id, mh interface{}) bool { mh.(*MatchHandler).Close() r.matches.Delete(id) // No need to clean up label index. } r.RUnlock() return true }) // Termination was triggered and there are no active matches. select { case r.stoppedCh <- struct{}{}: Loading @@ -492,32 +431,29 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { return r.stoppedCh } r.RLock() if len(r.matches) == 0 { var anyRunning bool r.matches.Range(func(id, mh interface{}) bool { anyRunning = true // Don't care if the call queue is full, match is supposed to end anyway. mh.(*MatchHandler).QueueTerminate(graceSeconds) return true }) if !anyRunning { // Termination was triggered and there are no active matches. select { case r.stoppedCh <- struct{}{}: default: // Ignore if the signal has already been sent. } r.RUnlock() return r.stoppedCh } for _, mh := range r.matches { // Don't care if the call queue is full, match is supposed to end anyway. mh.QueueTerminate(graceSeconds) } r.RUnlock() return r.stoppedCh } func (r *LocalMatchRegistry) Count() int { var count int r.RLock() count = len(r.matches) r.RUnlock() return count return int(r.matchCount.Load()) } func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) { Loading @@ -525,14 +461,11 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node return false, false, "", "" } var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() m, ok := r.matches.Load(id) if !ok { return false, false, "", "" } mh := m.(*MatchHandler) resultCh := make(chan *MatchJoinResult, 1) if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) { Loading @@ -555,31 +488,23 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node } func (r *LocalMatchRegistry) Join(id uuid.UUID, presences []*MatchPresence) { var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } // Doesn't matter if the call queue was full here. If the match is being closed then joins don't matter anyway. mh.QueueJoin(presences, true) mh.(*MatchHandler).QueueJoin(presences, true) } func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) { var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } // Doesn't matter if the call queue was full here. If the match is being closed then leaves don't matter anyway. mh.QueueLeave(presences) mh.(*MatchHandler).QueueLeave(presences) } func (r *LocalMatchRegistry) Kick(stream PresenceStream, presences []*MatchPresence) { Loading @@ -596,16 +521,12 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session return } var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } mh.QueueData(&MatchDataMessage{ mh.(*MatchHandler).QueueData(&MatchDataMessage{ UserID: userID, SessionID: sessionID, Username: username, Loading Loading
main.go +5 −6 Original line number Diff line number Diff line Loading @@ -60,7 +60,6 @@ var ( ) func main() { //startedAt := int64(time.Nanosecond) * time.Now().UTC().UnixNano() / int64(time.Millisecond) semver := fmt.Sprintf("%s+%s", version, commitID) // Always set default timeout on HTTP client. http.DefaultClient.Timeout = 1500 * time.Millisecond Loading Loading @@ -103,7 +102,7 @@ func main() { tracker := server.StartLocalTracker(logger, config, sessionRegistry, jsonpbMarshaler) router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler) leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db) leaderboardRankCache := server.NewLocalLeaderboardRankCache(logger, startupLogger, db, config.GetLeaderboard(), leaderboardCache) leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache) leaderboardScheduler := server.NewLocalLeaderboardScheduler(logger, db, leaderboardCache, leaderboardRankCache) matchRegistry := server.NewLocalMatchRegistry(logger, startupLogger, config, tracker, router, config.GetName()) tracker.SetMatchJoinListener(matchRegistry.Join) Loading @@ -126,7 +125,7 @@ func main() { gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1 cookie := newOrLoadCookie(config) gacode := "UA-89792135-1" const gacode = "UA-89792135-1" var telemetryClient *http.Client if gaenabled { telemetryClient = &http.Client{ Loading Loading @@ -184,7 +183,7 @@ func main() { sessionRegistry.Stop() if gaenabled { ga.SendSessionStop(telemetryClient, gacode, cookie) _ = ga.SendSessionStop(telemetryClient, gacode, cookie) } startupLogger.Info("Shutdown complete") Loading Loading @@ -253,7 +252,7 @@ func runTelemetry(httpc *http.Client, gacode string, cookie string) { if ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "version", Ea: fmt.Sprintf("%s+%s", version, commitID)}) != nil { return } ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "variant", Ea: "nakama"}) _ = ga.SendEvent(httpc, gacode, cookie, &ga.Event{Ec: "variant", Ea: "nakama"}) } func newOrLoadCookie(config server.Config) string { Loading @@ -262,7 +261,7 @@ func newOrLoadCookie(config server.Config) string { cookie := uuid.FromBytesOrNil(b) if err != nil || cookie == uuid.Nil { cookie = uuid.Must(uuid.NewV4()) ioutil.WriteFile(filePath, cookie.Bytes(), 0644) _ = ioutil.WriteFile(filePath, cookie.Bytes(), 0644) } return cookie.String() }
server/leaderboard_rank_cache.go +1 −3 Original line number Diff line number Diff line Loading @@ -82,14 +82,12 @@ type LeaderboardWithExpiry struct { type LocalLeaderboardRankCache struct { sync.RWMutex cache map[LeaderboardWithExpiry]*RankMap logger *zap.Logger blacklistAll bool blacklistIds map[string]struct{} } func NewLocalLeaderboardRankCache(logger, startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache { func NewLocalLeaderboardRankCache(startupLogger *zap.Logger, db *sql.DB, config *LeaderboardConfig, leaderboardCache LeaderboardCache) LeaderboardRankCache { cache := &LocalLeaderboardRankCache{ logger: logger, blacklistIds: make(map[string]struct{}, len(config.BlacklistRankCache)), blacklistAll: len(config.BlacklistRankCache) == 1 && config.BlacklistRankCache[0] == "*", cache: make(map[LeaderboardWithExpiry]*RankMap, 0), Loading
server/match_registry.go +98 −177 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ import ( "context" "encoding/json" "fmt" "github.com/blevesearch/bleve/search/query" "strings" "sync" "time" Loading Loading @@ -102,7 +104,9 @@ type LocalMatchRegistry struct { tracker Tracker router MessageRouter node string matches map[uuid.UUID]*MatchHandler matches *sync.Map matchCount *atomic.Int32 index bleve.Index stopped *atomic.Bool Loading @@ -124,7 +128,9 @@ func NewLocalMatchRegistry(logger, startupLogger *zap.Logger, config Config, tra tracker: tracker, router: router, node: node, matches: make(map[uuid.UUID]*MatchHandler), matches: &sync.Map{}, matchCount: atomic.NewInt32(0), index: index, stopped: atomic.NewBool(false), Loading Loading @@ -164,27 +170,28 @@ func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, core Run return nil, err } r.Lock() r.matches[id] = match r.Unlock() r.matches.Store(id, match) r.matchCount.Inc() return match, nil } func (r *LocalMatchRegistry) GetMatch(id uuid.UUID) *MatchHandler { var mh *MatchHandler r.RLock() mh = r.matches[id] r.RUnlock() return mh mh, ok := r.matches.Load(id) if !ok { return nil } return mh.(*MatchHandler) } func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) { r.Lock() delete(r.matches, id) matchesRemaining := len(r.matches) r.Unlock() r.matches.Delete(id) matchesRemaining := r.matchCount.Dec() r.tracker.UntrackByStream(stream) r.index.Delete(fmt.Sprintf("%v.%v", id.String(), r.node)) if err := r.index.Delete(fmt.Sprintf("%v.%v", id.String(), r.node)); err != nil { r.logger.Warn("Error removing match list index", zap.String("id", fmt.Sprintf("%v.%v", id.String(), r.node)), zap.Error(err)) } // If there are no more matches in this registry and a shutdown was initiated then signal // that the process is complete. Loading @@ -198,10 +205,10 @@ func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) { } func (r *LocalMatchRegistry) GetMatchLabel(ctx context.Context, id uuid.UUID, node string) (string, error) { query := bleve.NewDocIDQuery([]string{fmt.Sprintf("%v.%v", id.String(), node)}) search := bleve.NewSearchRequestOptions(query, 1, 0, false) search.Fields = []string{"label_string"} results, err := r.index.SearchInContext(ctx, search) q := bleve.NewDocIDQuery([]string{fmt.Sprintf("%v.%v", id.String(), node)}) searchReq := bleve.NewSearchRequestOptions(q, 1, 0, false) searchReq.Fields = []string{"label_string"} results, err := r.index.SearchInContext(ctx, searchReq) if err != nil { return "", fmt.Errorf("error getting match label: %v", err.Error()) } Loading @@ -224,7 +231,7 @@ func (r *LocalMatchRegistry) UpdateMatchLabel(id uuid.UUID, label string) error var labelJSON map[string]interface{} // Doesn't matter if this is not JSON. json.Unmarshal([]byte(label), &labelJSON) _ = json.Unmarshal([]byte(label), &labelJSON) return r.index.Index(fmt.Sprintf("%v.%v", id.String(), r.node), &MatchIndexEntry{ Node: r.node, Label: labelJSON, Loading @@ -232,15 +239,14 @@ func (r *LocalMatchRegistry) UpdateMatchLabel(id uuid.UUID, label string) error }) } func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value, query *wrappers.StringValue) ([]*api.Match, error) { func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value, queryString *wrappers.StringValue) ([]*api.Match, error) { if limit == 0 { return make([]*api.Match, 0), nil } var modes map[uint8]*uint8 var allowRelayed bool var labelResults *bleve.SearchResult var orderRequired bool if query != nil { if queryString != nil { if authoritative != nil && !authoritative.Value { // A filter on query is requested but authoritative matches are not allowed. return make([]*api.Match, 0), nil Loading @@ -249,31 +255,23 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori // If there are filters other than query, we don't know which matches will work so get more than the limit. count := limit if minSize != nil || maxSize != nil { c, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) } count = int(c) count = int(r.matchCount.Load()) } // Apply the query filter to the set of known match labels. queryString := query.Value if queryString == "" { queryString = "*" var q query.Query if queryString := queryString.Value; queryString == "" { q = bleve.NewMatchAllQuery() } else { q = bleve.NewQueryStringQuery(queryString) } indexQuery := bleve.NewQueryStringQuery(queryString) search := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) search.Fields = []string{"label_string"} searchReq := bleve.NewSearchRequestOptions(q, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, search) labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by query: %v", err.Error()) } // Because we have a query filter only authoritative matches are eligible. modes = MatchFilterAuthoritative // The query may contain boosting, in which case the order of results matters. orderRequired = true } else if label != nil { if authoritative != nil && !authoritative.Value { // A filter on label is requested but authoritative matches are not allowed. Loading @@ -283,51 +281,43 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori // If there are filters other than label, we don't know which matches will work so get more than the limit. count := limit if minSize != nil || maxSize != nil { c, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) } count = int(c) count = int(r.matchCount.Load()) } // Apply the label filter to the set of known match labels. indexQuery := bleve.NewMatchQuery(label.Value) indexQuery.SetField("label_string") search := bleve.NewSearchRequestOptions(indexQuery, int(count), 0, false) search.Fields = []string{"label_string"} searchReq := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, search) labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by label: %v", err.Error()) } // Because we have a query filter only authoritative matches are eligible. modes = MatchFilterAuthoritative } else if authoritative == nil || authoritative.Value { // Not using label/query filter but we still need access to the indexed labels to return them // if authoritative matches may be included in the results. count, err := r.index.DocCount() if err != nil { return nil, fmt.Errorf("error listing matches count: %v", err.Error()) count := limit if minSize != nil || maxSize != nil { count = int(r.matchCount.Load()) } indexQuery := bleve.NewMatchAllQuery() search := bleve.NewSearchRequestOptions(indexQuery, int(count), 0, false) search.Fields = []string{"label_string"} labelResults, err = r.index.SearchInContext(ctx, search) searchReq := bleve.NewSearchRequestOptions(indexQuery, count, 0, false) searchReq.Fields = []string{"label_string"} var err error labelResults, err = r.index.SearchInContext(ctx, searchReq) if err != nil { return nil, fmt.Errorf("error listing matches by label: %v", err.Error()) } if authoritative == nil { // Expect a possible mix of authoritative and relayed matches. modes = MatchFilterAny } else { // Authoritative was strictly true even if there was no label/query filter. modes = MatchFilterAuthoritative allowRelayed = true } } else { // Authoritative was strictly false, and there was no label/query filter. modes = MatchFilterRelayed allowRelayed = true } if labelResults != nil && labelResults.Hits.Len() == 0 && authoritative != nil && !authoritative.Value { Loading @@ -335,22 +325,20 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori return make([]*api.Match, 0), nil } // There is a query which may contain boosted search terms, which means order of results matters. if orderRequired { // Look up tracker info to determine match sizes. // This info is needed even if there is no min/max size filter because it's returned in results. matches := r.tracker.CountByStreamModeFilter(modes) matchSizes := make(map[string]int32, len(matches)) for stream, size := range matches { matchSizes[fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label)] = size } // Results. results := make([]*api.Match, 0, limit) // Use any eligible authoritative matches first. if labelResults != nil { for _, hit := range labelResults.Hits { // Size may be 0. size := matchSizes[hit.ID] matchIDComponents := strings.SplitN(hit.ID, ".", 2) id := uuid.FromStringOrNil(matchIDComponents[0]) mh, ok := r.matches.Load(id) if !ok { continue } size := int32(mh.(*MatchHandler).PresenceList.Size()) if minSize != nil && minSize.Value > size { // Not eligible based on minimum size. Loading Loading @@ -383,51 +371,21 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori return results, nil } } // We're in the query case, non-authoritative matches are not eligible so return what we can. return results, nil } // It was not a query so ordering does not matter, move on to process the minimal set possible in any order. // Match labels will only be nil if there is no label filter, no query filter, and authoritative is strictly false. // Therefore authoritative matches will never be part of this listing at all. var matchLabels map[string]*wrappers.StringValue if labelResults != nil { matchLabels = make(map[string]*wrappers.StringValue, labelResults.Hits.Len()) for _, hit := range labelResults.Hits { if l, ok := hit.Fields["label_string"]; ok { if ls, ok := l.(string); ok { matchLabels[hit.ID] = &wrappers.StringValue{Value: ls} } } } // If relayed matches are not allowed still return any available results. if !allowRelayed { return results, nil } // Look up tracker info to determine match sizes. // This info is needed even if there is no min/max size filter because it's returned in results. matches := r.tracker.CountByStreamModeFilter(modes) // Results. results := make([]*api.Match, 0, limit) // Intersection of matches listed from stream and matches listed from label index, if any. matches := r.tracker.CountByStreamModeFilter(MatchFilterRelayed) for stream, size := range matches { if stream.Mode != StreamModeMatchRelayed && stream.Mode != StreamModeMatchAuthoritative { if stream.Mode != StreamModeMatchRelayed { // Only relayed matches are expected at this point. r.logger.Warn("Ignoring unknown stream mode in match listing operation", zap.Uint8("mode", stream.Mode)) continue } id := fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label) label, ok := matchLabels[id] if ok { delete(matchLabels, id) } else if matchLabels != nil && stream.Mode == StreamModeMatchAuthoritative { // Not eligible based on the label/query. continue } if minSize != nil && minSize.Value > size { // Not eligible based on minimum size. continue Loading @@ -439,8 +397,8 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori } results = append(results, &api.Match{ MatchId: id, Authoritative: stream.Mode == StreamModeMatchAuthoritative, MatchId: fmt.Sprintf("%v.%v", stream.Subject.String(), stream.Label), Authoritative: false, Label: label, Size: size, }) Loading @@ -449,24 +407,6 @@ func (r *LocalMatchRegistry) ListMatches(ctx context.Context, limit int, authori } } // Return incomplete results here if we're not allowed to return potentially empty authoritative matches. if (authoritative != nil && !authoritative.Value) || (minSize != nil && minSize.Value > 0) { return results, nil } // All we have left now are empty authoritative matches that we already know matched label/query filter if any. for id, label := range matchLabels { results = append(results, &api.Match{ MatchId: id, Authoritative: true, Label: label, Size: 0, }) if len(results) == limit { break } } return results, nil } Loading @@ -476,13 +416,12 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { // Graceful shutdown not allowed/required, or grace period has expired. if graceSeconds == 0 { r.RLock() for id, mh := range r.matches { mh.Close() delete(r.matches, id) r.matches.Range(func(id, mh interface{}) bool { mh.(*MatchHandler).Close() r.matches.Delete(id) // No need to clean up label index. } r.RUnlock() return true }) // Termination was triggered and there are no active matches. select { case r.stoppedCh <- struct{}{}: Loading @@ -492,32 +431,29 @@ func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { return r.stoppedCh } r.RLock() if len(r.matches) == 0 { var anyRunning bool r.matches.Range(func(id, mh interface{}) bool { anyRunning = true // Don't care if the call queue is full, match is supposed to end anyway. mh.(*MatchHandler).QueueTerminate(graceSeconds) return true }) if !anyRunning { // Termination was triggered and there are no active matches. select { case r.stoppedCh <- struct{}{}: default: // Ignore if the signal has already been sent. } r.RUnlock() return r.stoppedCh } for _, mh := range r.matches { // Don't care if the call queue is full, match is supposed to end anyway. mh.QueueTerminate(graceSeconds) } r.RUnlock() return r.stoppedCh } func (r *LocalMatchRegistry) Count() int { var count int r.RLock() count = len(r.matches) r.RUnlock() return count return int(r.matchCount.Load()) } func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string, metadata map[string]string) (bool, bool, string, string) { Loading @@ -525,14 +461,11 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node return false, false, "", "" } var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() m, ok := r.matches.Load(id) if !ok { return false, false, "", "" } mh := m.(*MatchHandler) resultCh := make(chan *MatchJoinResult, 1) if !mh.QueueJoinAttempt(ctx, resultCh, userID, sessionID, username, fromNode, metadata) { Loading @@ -555,31 +488,23 @@ func (r *LocalMatchRegistry) JoinAttempt(ctx context.Context, id uuid.UUID, node } func (r *LocalMatchRegistry) Join(id uuid.UUID, presences []*MatchPresence) { var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } // Doesn't matter if the call queue was full here. If the match is being closed then joins don't matter anyway. mh.QueueJoin(presences, true) mh.(*MatchHandler).QueueJoin(presences, true) } func (r *LocalMatchRegistry) Leave(id uuid.UUID, presences []*MatchPresence) { var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } // Doesn't matter if the call queue was full here. If the match is being closed then leaves don't matter anyway. mh.QueueLeave(presences) mh.(*MatchHandler).QueueLeave(presences) } func (r *LocalMatchRegistry) Kick(stream PresenceStream, presences []*MatchPresence) { Loading @@ -596,16 +521,12 @@ func (r *LocalMatchRegistry) SendData(id uuid.UUID, node string, userID, session return } var mh *MatchHandler var ok bool r.RLock() mh, ok = r.matches[id] r.RUnlock() mh, ok := r.matches.Load(id) if !ok { return } mh.QueueData(&MatchDataMessage{ mh.(*MatchHandler).QueueData(&MatchDataMessage{ UserID: userID, SessionID: sessionID, Username: username, Loading