Commit 3d88a1ed authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve handling of tracker multi-updates.

parent 96b3bc3d
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -199,7 +199,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		Persistence: incoming.Persistence == nil || incoming.Persistence.Value,
		Username:    session.Username(),
	}
	success, isNew := p.tracker.Track(session.ID(), stream, session.UserID(), meta, false)
	success, isNew := p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), meta, false)
	if !success {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
+3 −3
Original line number Diff line number Diff line
@@ -38,7 +38,7 @@ func (p *Pipeline) matchCreate(logger *zap.Logger, session Session, envelope *rt

	username := session.Username()

	if success, _ := p.tracker.Track(session.ID(), PresenceStream{Mode: StreamModeMatchRelayed, Subject: matchID}, session.UserID(), PresenceMeta{
	if success, _ := p.tracker.Track(session.Context(), session.ID(), PresenceStream{Mode: StreamModeMatchRelayed, Subject: matchID}, session.UserID(), PresenceMeta{
		Username: username,
		Format:   session.Format(),
	}, false); !success {
@@ -168,7 +168,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
				Username: username,
				Format:   session.Format(),
			}
			if success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), m, false); !success {
			if success, _ := p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false); !success {
				// Presence creation was rejected due to `allowIfFirstForSession` flag, session is gone so no need to reply.
				return
			}
@@ -221,7 +221,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap
				Username: session.Username(),
				Format:   session.Format(),
			}
			p.tracker.Track(session.ID(), stream, session.UserID(), m, false)
			p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false)
		}

		label = &wrappers.StringValue{Value: l}
+2 −2
Original line number Diff line number Diff line
@@ -45,7 +45,7 @@ func (p *Pipeline) partyCreate(logger *zap.Logger, session Session, envelope *rt
	}

	// If successful, the creator becomes the first user to join the party.
	success, _ := p.tracker.Track(session.ID(), ph.Stream, session.UserID(), PresenceMeta{
	success, _ := p.tracker.Track(session.Context(), session.ID(), ph.Stream, session.UserID(), PresenceMeta{
		Format:   session.Format(),
		Username: session.Username(),
		Status:   "",
@@ -106,7 +106,7 @@ func (p *Pipeline) partyJoin(logger *zap.Logger, session Session, envelope *rtap

	// If the party was open and the join was successful, track the new member immediately.
	if autoJoin {
		success, _ := p.tracker.Track(session.ID(), PresenceStream{Mode: StreamModeParty, Subject: partyID, Label: node}, session.UserID(), PresenceMeta{
		success, _ := p.tracker.Track(session.Context(), session.ID(), PresenceStream{Mode: StreamModeParty, Subject: partyID, Label: node}, session.UserID(), PresenceMeta{
			Format:   session.Format(),
			Username: session.Username(),
			Status:   "",
+21 −13
Original line number Diff line number Diff line
@@ -170,9 +170,14 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r

	// Follow all of the validated user IDs, and prepare a list of current presences to return.
	presences := make([]*rtapi.UserPresence, 0, len(followUserIDs))
	ops := make([]*TrackerOp, 0, len(followUserIDs))
	for userID := range followUserIDs {
		stream := PresenceStream{Mode: StreamModeStatus, Subject: userID}
		success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true}, false)
		ops = append(ops, &TrackerOp{
			Stream: PresenceStream{Mode: StreamModeStatus, Subject: userID},
			Meta:   PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true},
		})
	}
	success := p.tracker.TrackMulti(session.Context(), session.ID(), ops, session.UserID(), false)
	if !success {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
@@ -180,8 +185,8 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r
		}}}, true)
		return
	}

		ps := p.tracker.ListByStream(stream, false, true)
	for _, op := range ops {
		ps := p.tracker.ListByStream(op.Stream, false, true)
		for _, p := range ps {
			presences = append(presences, &rtapi.UserPresence{
				UserId:    p.UserID.String(),
@@ -218,10 +223,13 @@ func (p *Pipeline) statusUnfollow(logger *zap.Logger, session Session, envelope
		userIDs = append(userIDs, userID)
	}

	streams := make([]*PresenceStream, 0, len(userIDs))
	for _, userID := range userIDs {
		p.tracker.Untrack(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: userID}, session.UserID())
		streams = append(streams, &PresenceStream{Mode: StreamModeStatus, Subject: userID})
	}

	p.tracker.UntrackMulti(session.ID(), streams, session.UserID())

	session.Send(&rtapi.Envelope{Cid: envelope.Cid}, true)
}

@@ -243,7 +251,7 @@ func (p *Pipeline) statusUpdate(logger *zap.Logger, session Session, envelope *r
		return
	}

	success := p.tracker.Update(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()}, session.UserID(), PresenceMeta{
	success := p.tracker.Update(session.Context(), session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()}, session.UserID(), PresenceMeta{
		Format:   session.Format(),
		Username: session.Username(),
		Status:   incoming.Status.Value,
+10 −2
Original line number Diff line number Diff line
@@ -87,10 +87,18 @@ func NewSocketWsAcceptor(logger *zap.Logger, config Config, sessionRegistry Sess
		sessionRegistry.Add(session)

		// Register initial presences for this session.
		tracker.Track(session.ID(), PresenceStream{Mode: StreamModeNotifications, Subject: session.UserID()}, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true}, true)
		ops := make([]*TrackerOp, 0, 2)
		ops = append(ops, &TrackerOp{
			Stream: PresenceStream{Mode: StreamModeNotifications, Subject: session.UserID()},
			Meta:   PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true},
		})
		if status {
			tracker.Track(session.ID(), PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()}, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Status: ""}, false)
			ops = append(ops, &TrackerOp{
				Stream: PresenceStream{Mode: StreamModeStatus, Subject: session.UserID()},
				Meta:   PresenceMeta{Format: session.Format(), Username: session.Username(), Status: ""},
			})
		}
		tracker.TrackMulti(session.Context(), session.ID(), ops, session.UserID(), true)

		// Allow the server to begin processing incoming messages from this session.
		session.Consume()
Loading