Loading CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Added - Publish new metric for presences count. ## [2.13.0] - 2020-08-31 ### Added Loading main.go +1 −1 Original line number Diff line number Diff line Loading @@ -119,7 +119,7 @@ func main() { metrics := server.NewMetrics(logger, startupLogger, config) matchmaker := server.NewLocalMatchmaker(startupLogger, config.GetName()) sessionRegistry := server.NewLocalSessionRegistry(metrics) tracker := server.StartLocalTracker(logger, config, sessionRegistry, jsonpbMarshaler) tracker := server.StartLocalTracker(logger, config, sessionRegistry, metrics, jsonpbMarshaler) router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler) leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db) leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache) Loading server/metrics.go +5 −0 Original line number Diff line number Diff line Loading @@ -247,3 +247,8 @@ func (m *Metrics) CountWebsocketClosed(delta int64) { func (m *Metrics) GaugeSessions(value float64) { m.prometheusScope.Gauge("sessions").Update(value) } // Set the absolute value of currently tracked presences. func (m *Metrics) GaugePresences(value float64) { m.prometheusScope.Gauge("presences").Update(value) } server/tracker.go +21 −12 Original line number Diff line number Diff line Loading @@ -17,15 +17,16 @@ package server import ( "bytes" "context" "github.com/golang/protobuf/proto" "sync" "fmt" "sync" "time" "github.com/gofrs/uuid" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/wrappers" "github.com/heroiclabs/nakama-common/rtapi" "go.uber.org/atomic" "go.uber.org/zap" ) Loading Loading @@ -159,27 +160,31 @@ type LocalTracker struct { matchJoinListener func(id uuid.UUID, leaves []*MatchPresence) matchLeaveListener func(id uuid.UUID, leaves []*MatchPresence) sessionRegistry SessionRegistry metrics *Metrics jsonpbMarshaler *jsonpb.Marshaler name string eventsCh chan *PresenceEvent presencesByStream map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta presencesBySession map[uuid.UUID]map[presenceCompact]PresenceMeta count *atomic.Int64 ctx context.Context ctxCancelFn context.CancelFunc } func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, jsonpbMarshaler *jsonpb.Marshaler) Tracker { func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, metrics *Metrics, jsonpbMarshaler *jsonpb.Marshaler) Tracker { ctx, ctxCancelFn := context.WithCancel(context.Background()) t := &LocalTracker{ logger: logger, sessionRegistry: sessionRegistry, metrics: metrics, jsonpbMarshaler: jsonpbMarshaler, name: config.GetName(), eventsCh: make(chan *PresenceEvent, config.GetTracker().EventQueueSize), presencesByStream: make(map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta), presencesBySession: make(map[uuid.UUID]map[presenceCompact]PresenceMeta), count: atomic.NewInt64(0), ctx: ctx, ctxCancelFn: ctxCancelFn, Loading @@ -193,6 +198,8 @@ func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry Sessio return case e := <-t.eventsCh: t.processEvent(e) case <-time.After(15 * time.Second): t.metrics.GaugePresences(float64(t.count.Load())) } } }() Loading Loading @@ -238,6 +245,7 @@ func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID bySession[pc] = meta t.presencesBySession[sessionID] = bySession } t.count.Inc() // Update tracking for stream. byStreamMode, ok := t.presencesByStream[stream.Mode] Loading Loading @@ -291,6 +299,7 @@ func (t *LocalTracker) Untrack(sessionID uuid.UUID, stream PresenceStream, userI // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() // Update the tracking for stream. if byStreamMode := t.presencesByStream[stream.Mode]; len(byStreamMode) == 1 { Loading Loading @@ -361,6 +370,8 @@ func (t *LocalTracker) UntrackAll(sessionID uuid.UUID) { if !meta.Hidden { leaves = append(leaves, Presence{ID: pc.ID, Stream: pc.Stream, UserID: pc.UserID, Meta: meta}) } t.count.Dec() } // Discard the tracking for session. delete(t.presencesBySession, sessionID) Loading Loading @@ -393,6 +404,9 @@ func (t *LocalTracker) Update(sessionID uuid.UUID, stream PresenceStream, userID // Update tracking for session, but capture any previous meta in case a leave event is required. previousMeta, alreadyTracked := bySession[pc] bySession[pc] = meta if !alreadyTracked { t.count.Inc() } // Update tracking for stream. byStreamMode, ok := t.presencesByStream[stream.Mode] Loading Loading @@ -453,6 +467,7 @@ func (t *LocalTracker) UntrackLocalByStream(stream PresenceStream) { // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() } // Discard the tracking for stream. Loading Loading @@ -487,6 +502,7 @@ func (t *LocalTracker) UntrackByStream(stream PresenceStream) { // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() } // Discard the tracking for stream. Loading Loading @@ -521,14 +537,7 @@ func (t *LocalTracker) StreamExists(stream PresenceStream) bool { } func (t *LocalTracker) Count() int { var count int t.RLock() // For each session add together their presence count. for _, bySession := range t.presencesBySession { count += len(bySession) } t.RUnlock() return count return int(t.count.Load()) } func (t *LocalTracker) CountByStream(stream PresenceStream) int { Loading Loading
CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Added - Publish new metric for presences count. ## [2.13.0] - 2020-08-31 ### Added Loading
main.go +1 −1 Original line number Diff line number Diff line Loading @@ -119,7 +119,7 @@ func main() { metrics := server.NewMetrics(logger, startupLogger, config) matchmaker := server.NewLocalMatchmaker(startupLogger, config.GetName()) sessionRegistry := server.NewLocalSessionRegistry(metrics) tracker := server.StartLocalTracker(logger, config, sessionRegistry, jsonpbMarshaler) tracker := server.StartLocalTracker(logger, config, sessionRegistry, metrics, jsonpbMarshaler) router := server.NewLocalMessageRouter(sessionRegistry, tracker, jsonpbMarshaler) leaderboardCache := server.NewLocalLeaderboardCache(logger, startupLogger, db) leaderboardRankCache := server.NewLocalLeaderboardRankCache(startupLogger, db, config.GetLeaderboard(), leaderboardCache) Loading
server/metrics.go +5 −0 Original line number Diff line number Diff line Loading @@ -247,3 +247,8 @@ func (m *Metrics) CountWebsocketClosed(delta int64) { func (m *Metrics) GaugeSessions(value float64) { m.prometheusScope.Gauge("sessions").Update(value) } // Set the absolute value of currently tracked presences. func (m *Metrics) GaugePresences(value float64) { m.prometheusScope.Gauge("presences").Update(value) }
server/tracker.go +21 −12 Original line number Diff line number Diff line Loading @@ -17,15 +17,16 @@ package server import ( "bytes" "context" "github.com/golang/protobuf/proto" "sync" "fmt" "sync" "time" "github.com/gofrs/uuid" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/wrappers" "github.com/heroiclabs/nakama-common/rtapi" "go.uber.org/atomic" "go.uber.org/zap" ) Loading Loading @@ -159,27 +160,31 @@ type LocalTracker struct { matchJoinListener func(id uuid.UUID, leaves []*MatchPresence) matchLeaveListener func(id uuid.UUID, leaves []*MatchPresence) sessionRegistry SessionRegistry metrics *Metrics jsonpbMarshaler *jsonpb.Marshaler name string eventsCh chan *PresenceEvent presencesByStream map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta presencesBySession map[uuid.UUID]map[presenceCompact]PresenceMeta count *atomic.Int64 ctx context.Context ctxCancelFn context.CancelFunc } func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, jsonpbMarshaler *jsonpb.Marshaler) Tracker { func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry SessionRegistry, metrics *Metrics, jsonpbMarshaler *jsonpb.Marshaler) Tracker { ctx, ctxCancelFn := context.WithCancel(context.Background()) t := &LocalTracker{ logger: logger, sessionRegistry: sessionRegistry, metrics: metrics, jsonpbMarshaler: jsonpbMarshaler, name: config.GetName(), eventsCh: make(chan *PresenceEvent, config.GetTracker().EventQueueSize), presencesByStream: make(map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta), presencesBySession: make(map[uuid.UUID]map[presenceCompact]PresenceMeta), count: atomic.NewInt64(0), ctx: ctx, ctxCancelFn: ctxCancelFn, Loading @@ -193,6 +198,8 @@ func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry Sessio return case e := <-t.eventsCh: t.processEvent(e) case <-time.After(15 * time.Second): t.metrics.GaugePresences(float64(t.count.Load())) } } }() Loading Loading @@ -238,6 +245,7 @@ func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID bySession[pc] = meta t.presencesBySession[sessionID] = bySession } t.count.Inc() // Update tracking for stream. byStreamMode, ok := t.presencesByStream[stream.Mode] Loading Loading @@ -291,6 +299,7 @@ func (t *LocalTracker) Untrack(sessionID uuid.UUID, stream PresenceStream, userI // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() // Update the tracking for stream. if byStreamMode := t.presencesByStream[stream.Mode]; len(byStreamMode) == 1 { Loading Loading @@ -361,6 +370,8 @@ func (t *LocalTracker) UntrackAll(sessionID uuid.UUID) { if !meta.Hidden { leaves = append(leaves, Presence{ID: pc.ID, Stream: pc.Stream, UserID: pc.UserID, Meta: meta}) } t.count.Dec() } // Discard the tracking for session. delete(t.presencesBySession, sessionID) Loading Loading @@ -393,6 +404,9 @@ func (t *LocalTracker) Update(sessionID uuid.UUID, stream PresenceStream, userID // Update tracking for session, but capture any previous meta in case a leave event is required. previousMeta, alreadyTracked := bySession[pc] bySession[pc] = meta if !alreadyTracked { t.count.Inc() } // Update tracking for stream. byStreamMode, ok := t.presencesByStream[stream.Mode] Loading Loading @@ -453,6 +467,7 @@ func (t *LocalTracker) UntrackLocalByStream(stream PresenceStream) { // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() } // Discard the tracking for stream. Loading Loading @@ -487,6 +502,7 @@ func (t *LocalTracker) UntrackByStream(stream PresenceStream) { // There were other presences for the session, drop just this one. delete(bySession, pc) } t.count.Dec() } // Discard the tracking for stream. Loading Loading @@ -521,14 +537,7 @@ func (t *LocalTracker) StreamExists(stream PresenceStream) bool { } func (t *LocalTracker) Count() int { var count int t.RLock() // For each session add together their presence count. for _, bySession := range t.presencesBySession { count += len(bySession) } t.RUnlock() return count return int(t.count.Load()) } func (t *LocalTracker) CountByStream(stream PresenceStream) int { Loading