Commit 96b3bc3d authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve presence representation in memory.

parent ceabaa74
Loading
Loading
Loading
Loading
+54 −68
Original line number Diff line number Diff line
@@ -104,8 +104,8 @@ func (p *Presence) GetStatus() string {
}

type PresenceEvent struct {
	Joins  []Presence
	Leaves []Presence
	Joins  []*Presence
	Leaves []*Presence
}

type Tracker interface {
@@ -169,8 +169,8 @@ type LocalTracker struct {
	jsonpbMarshaler    *jsonpb.Marshaler
	name               string
	eventsCh           chan *PresenceEvent
	presencesByStream  map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta
	presencesBySession map[uuid.UUID]map[presenceCompact]PresenceMeta
	presencesByStream  map[uint8]map[PresenceStream]map[presenceCompact]*Presence
	presencesBySession map[uuid.UUID]map[presenceCompact]*Presence
	count              *atomic.Int64

	ctx         context.Context
@@ -187,8 +187,8 @@ func StartLocalTracker(logger *zap.Logger, config Config, sessionRegistry Sessio
		jsonpbMarshaler:    jsonpbMarshaler,
		name:               config.GetName(),
		eventsCh:           make(chan *PresenceEvent, config.GetTracker().EventQueueSize),
		presencesByStream:  make(map[uint8]map[PresenceStream]map[presenceCompact]PresenceMeta),
		presencesBySession: make(map[uuid.UUID]map[presenceCompact]PresenceMeta),
		presencesByStream:  make(map[uint8]map[PresenceStream]map[presenceCompact]*Presence),
		presencesBySession: make(map[uuid.UUID]map[presenceCompact]*Presence),
		count:              atomic.NewInt64(0),

		ctx:         ctx,
@@ -235,6 +235,7 @@ func (t *LocalTracker) Stop() {

func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID, meta PresenceMeta, allowIfFirstForSession bool) (bool, bool) {
	pc := presenceCompact{ID: PresenceID{Node: t.name, SessionID: sessionID}, Stream: stream, UserID: userID}
	p := &Presence{ID: PresenceID{Node: t.name, SessionID: sessionID}, Stream: stream, UserID: userID, Meta: meta}
	t.Lock()

	// See if this session has any presences tracked at all.
@@ -242,7 +243,7 @@ func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID
		// Then see if the exact presence we need is tracked.
		if _, alreadyTracked := bySession[pc]; !alreadyTracked {
			// If the current session had others tracked, but not this presence.
			bySession[pc] = meta
			bySession[pc] = p
		} else {
			t.Unlock()
			return true, false
@@ -254,8 +255,8 @@ func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID
			return false, false
		}
		// If nothing at all was tracked for the current session, begin tracking.
		bySession = make(map[presenceCompact]PresenceMeta)
		bySession[pc] = meta
		bySession = make(map[presenceCompact]*Presence)
		bySession[pc] = p
		t.presencesBySession[sessionID] = bySession
	}
	t.count.Inc()
@@ -263,26 +264,21 @@ func (t *LocalTracker) Track(sessionID uuid.UUID, stream PresenceStream, userID
	// Update tracking for stream.
	byStreamMode, ok := t.presencesByStream[stream.Mode]
	if !ok {
		byStreamMode = make(map[PresenceStream]map[presenceCompact]PresenceMeta)
		byStreamMode = make(map[PresenceStream]map[presenceCompact]*Presence)
		t.presencesByStream[stream.Mode] = byStreamMode
	}

	if byStream, ok := byStreamMode[stream]; !ok {
		byStream = make(map[presenceCompact]PresenceMeta)
		byStream[pc] = meta
		byStream = make(map[presenceCompact]*Presence)
		byStream[pc] = p
		byStreamMode[stream] = byStream
	} else {
		byStream[pc] = meta
		byStream[pc] = p
	}

	t.Unlock()
	if !meta.Hidden {
		t.queueEvent(
			[]Presence{
				{ID: pc.ID, Stream: stream, UserID: userID, Meta: meta},
			},
			nil,
		)
		t.queueEvent([]*Presence{p}, nil)
	}
	return true, true
}
@@ -297,7 +293,7 @@ func (t *LocalTracker) Untrack(sessionID uuid.UUID, stream PresenceStream, userI
		t.Unlock()
		return
	}
	meta, found := bySession[pc]
	p, found := bySession[pc]
	if !found {
		// The session had other presences, but not for this stream.
		t.Unlock()
@@ -336,13 +332,8 @@ func (t *LocalTracker) Untrack(sessionID uuid.UUID, stream PresenceStream, userI
	}

	t.Unlock()
	if !meta.Hidden {
		t.queueEvent(
			nil,
			[]Presence{
				{ID: pc.ID, Stream: stream, UserID: userID, Meta: meta},
			},
		)
	if !p.Meta.Hidden {
		t.queueEvent(nil, []*Presence{p})
	}
}

@@ -356,8 +347,8 @@ func (t *LocalTracker) UntrackAll(sessionID uuid.UUID) {
		return
	}

	leaves := make([]Presence, 0, len(bySession))
	for pc, meta := range bySession {
	leaves := make([]*Presence, 0, len(bySession))
	for pc, p := range bySession {
		// Update the tracking for stream.
		if byStreamMode := t.presencesByStream[pc.Stream.Mode]; len(byStreamMode) == 1 {
			// This is the only stream for this stream mode.
@@ -380,8 +371,8 @@ func (t *LocalTracker) UntrackAll(sessionID uuid.UUID) {
		}

		// Check if there should be an event for this presence.
		if !meta.Hidden {
			leaves = append(leaves, Presence{ID: pc.ID, Stream: pc.Stream, UserID: pc.UserID, Meta: meta})
		if !p.Meta.Hidden {
			leaves = append(leaves, p)
		}

		t.count.Dec()
@@ -391,15 +382,13 @@ func (t *LocalTracker) UntrackAll(sessionID uuid.UUID) {

	t.Unlock()
	if len(leaves) != 0 {
		t.queueEvent(
			nil,
			leaves,
		)
		t.queueEvent(nil, leaves)
	}
}

func (t *LocalTracker) Update(sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID, meta PresenceMeta, allowIfFirstForSession bool) bool {
	pc := presenceCompact{ID: PresenceID{Node: t.name, SessionID: sessionID}, Stream: stream, UserID: userID}
	p := &Presence{ID: PresenceID{Node: t.name, SessionID: sessionID}, Stream: stream, UserID: userID, Meta: meta}
	t.Lock()

	bySession, anyTracked := t.presencesBySession[sessionID]
@@ -410,13 +399,13 @@ func (t *LocalTracker) Update(sessionID uuid.UUID, stream PresenceStream, userID
			return false
		}

		bySession = make(map[presenceCompact]PresenceMeta)
		bySession = make(map[presenceCompact]*Presence)
		t.presencesBySession[sessionID] = bySession
	}

	// Update tracking for session, but capture any previous meta in case a leave event is required.
	previousMeta, alreadyTracked := bySession[pc]
	bySession[pc] = meta
	previousP, alreadyTracked := bySession[pc]
	bySession[pc] = p
	if !alreadyTracked {
		t.count.Inc()
	}
@@ -424,38 +413,31 @@ func (t *LocalTracker) Update(sessionID uuid.UUID, stream PresenceStream, userID
	// Update tracking for stream.
	byStreamMode, ok := t.presencesByStream[stream.Mode]
	if !ok {
		byStreamMode = make(map[PresenceStream]map[presenceCompact]PresenceMeta)
		byStreamMode = make(map[PresenceStream]map[presenceCompact]*Presence)
		t.presencesByStream[stream.Mode] = byStreamMode
	}

	if byStream, ok := byStreamMode[stream]; !ok {
		byStream = make(map[presenceCompact]PresenceMeta)
		byStream[pc] = meta
		byStream = make(map[presenceCompact]*Presence)
		byStream[pc] = p
		byStreamMode[stream] = byStream
	} else {
		byStream[pc] = meta
		byStream[pc] = p
	}

	t.Unlock()

	if !meta.Hidden || (alreadyTracked && !previousMeta.Hidden) {
		var joins []Presence
	if !meta.Hidden || (alreadyTracked && !previousP.Meta.Hidden) {
		var joins []*Presence
		if !meta.Hidden {
			joins = []Presence{
				{ID: pc.ID, Stream: stream, UserID: userID, Meta: meta},
			}
		}
		var leaves []Presence
		if alreadyTracked && !previousMeta.Hidden {
			leaves = []Presence{
				{ID: pc.ID, Stream: stream, UserID: userID, Meta: previousMeta},
			joins = []*Presence{p}
		}
		var leaves []*Presence
		if alreadyTracked && !previousP.Meta.Hidden {
			leaves = []*Presence{previousP}
		}
		// Guaranteed joins and/or leaves are not empty or we wouldn't be inside this block.
		t.queueEvent(
			joins,
			leaves,
		)
		t.queueEvent(joins, leaves)
	}
	return true
}
@@ -589,12 +571,12 @@ func (t *LocalTracker) GetLocalBySessionIDStreamUserID(sessionID uuid.UUID, stre
		t.RUnlock()
		return nil
	}
	meta, found := bySession[pc]
	p, found := bySession[pc]
	t.RUnlock()
	if !found {
		return nil
	}
	return &meta
	return &p.Meta
}

func (t *LocalTracker) GetBySessionIDStreamUserID(node string, sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID) *PresenceMeta {
@@ -606,15 +588,19 @@ func (t *LocalTracker) GetBySessionIDStreamUserID(node string, sessionID uuid.UU
		t.RUnlock()
		return nil
	}
	meta, found := bySession[pc]
	p, found := bySession[pc]
	t.RUnlock()
	if !found {
		return nil
	}
	return &meta
	return &p.Meta
}

func (t *LocalTracker) ListByStream(stream PresenceStream, includeHidden bool, includeNotHidden bool) []*Presence {
	if !includeHidden && !includeNotHidden {
		return []*Presence{}
	}

	t.RLock()
	byStream, anyTracked := t.presencesByStream[stream.Mode][stream]
	if !anyTracked {
@@ -622,9 +608,9 @@ func (t *LocalTracker) ListByStream(stream PresenceStream, includeHidden bool, i
		return []*Presence{}
	}
	ps := make([]*Presence, 0, len(byStream))
	for pc, meta := range byStream {
		if (meta.Hidden && includeHidden) || (!meta.Hidden && includeNotHidden) {
			ps = append(ps, &Presence{ID: pc.ID, Stream: stream, UserID: pc.UserID, Meta: meta})
	for _, p := range byStream {
		if (p.Meta.Hidden && includeHidden) || (!p.Meta.Hidden && includeNotHidden) {
			ps = append(ps, p)
		}
	}
	t.RUnlock()
@@ -662,7 +648,7 @@ func (t *LocalTracker) ListPresenceIDByStream(stream PresenceStream) []*Presence
	return ps
}

func (t *LocalTracker) queueEvent(joins, leaves []Presence) {
func (t *LocalTracker) queueEvent(joins, leaves []*Presence) {
	select {
	case t.eventsCh <- &PresenceEvent{Joins: joins, Leaves: leaves}:
		// Event queued for asynchronous dispatch.
@@ -733,9 +719,9 @@ func (t *LocalTracker) processEvent(e *PresenceEvent) {
		if p.Stream.Mode == StreamModeParty && p.Stream.Label == t.name {
			c := p
			if j, ok := partyJoins[p.Stream.Subject]; ok {
				partyJoins[p.Stream.Subject] = append(j, &c)
				partyJoins[p.Stream.Subject] = append(j, c)
			} else {
				partyJoins[p.Stream.Subject] = []*Presence{&c}
				partyJoins[p.Stream.Subject] = []*Presence{c}
			}
		}
	}
@@ -775,9 +761,9 @@ func (t *LocalTracker) processEvent(e *PresenceEvent) {
		if p.Stream.Mode == StreamModeParty && p.Stream.Label == t.name {
			c := p
			if l, ok := partyLeaves[p.Stream.Subject]; ok {
				partyLeaves[p.Stream.Subject] = append(l, &c)
				partyLeaves[p.Stream.Subject] = append(l, c)
			} else {
				partyLeaves[p.Stream.Subject] = []*Presence{&c}
				partyLeaves[p.Stream.Subject] = []*Presence{c}
			}
		}
	}