diff --git a/CHANGELOG.md b/CHANGELOG.md index 39d97ddc75b3fef95f6bfe813ce719ec890e6aaf..7f4b25b47a5aaa79eddd038d4f990d8337fb0e29 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Improved delivery of non-persistent SendAll notifications to large numbers of users. - Truncate stats reported to devconsole status view to 2 decimal digits for improved readability. - Memory usage and population time improvements in leaderboard rank cache. +- Better handling of internal transaction retries. +- Better handling of party membership when interacting with matchmaking. ### Fixed - Correct cursor usage in group listings using only open/closed group state filter. @@ -21,6 +23,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Remove incorrect category start and category end parameters from runtime leaderboard list functions. - Graceful handling of idempotent tournament creation operations. - Correct sorting of batched storage write and delete operations. +- Fix indexing of channel message list responses in Lua runtime. +- Better handling of parameters submitted from the devconsole UI. ## [3.16.0] - 2023-04-18 ### Added diff --git a/server/match_presence.go b/server/match_presence.go index 29ebbab91f65bcf493d48aa1f229f8927a0768ba..82ac0be72167326da7c64bd45454026f1b9ea993 100644 --- a/server/match_presence.go +++ b/server/match_presence.go @@ -22,6 +22,8 @@ import ( "go.uber.org/atomic" ) +var _ runtime.Presence = &MatchPresence{} + // Represents routing and identify information for a single match participant. type MatchPresence struct { Node string diff --git a/server/party_handler.go b/server/party_handler.go index 1cd20346be67a9033d6923ac0b1f26346d794eac..ff5abfbddfb47b84211c460ff18b9b34a0396448 100644 --- a/server/party_handler.go +++ b/server/party_handler.go @@ -25,6 +25,16 @@ import ( "go.uber.org/zap" ) +type PartyJoinRequest struct { + Presence *Presence + UserPresence *rtapi.UserPresence +} + +type PartyLeader struct { + PresenceID *PresenceID + UserPresence *rtapi.UserPresence +} + type PartyHandler struct { sync.RWMutex logger *zap.Logger @@ -41,17 +51,14 @@ type PartyHandler struct { MaxSize int Stream PresenceStream - stopped bool - ctx context.Context - ctxCancelFn context.CancelFunc - expectedInitialLeader *rtapi.UserPresence - leader *PresenceID - leaderUserPresence *rtapi.UserPresence - members []*PresenceID - memberUserPresences []*rtapi.UserPresence - joinsInProgress []*PresenceID - joinRequests []*Presence - joinRequestUserPresences []*rtapi.UserPresence + stopped bool + ctx context.Context + ctxCancelFn context.CancelFunc + expectedInitialLeader *rtapi.UserPresence + leader *PartyLeader + joinRequests []*PartyJoinRequest + + members *PartyPresenceList } func NewPartyHandler(logger *zap.Logger, partyRegistry PartyRegistry, matchmaker Matchmaker, tracker Tracker, streamManager StreamManager, router MessageRouter, id uuid.UUID, node string, open bool, maxSize int, presence *rtapi.UserPresence) *PartyHandler { @@ -72,17 +79,14 @@ func NewPartyHandler(logger *zap.Logger, partyRegistry PartyRegistry, matchmaker MaxSize: maxSize, Stream: PresenceStream{Mode: StreamModeParty, Subject: id, Label: node}, - stopped: false, - ctx: ctx, - ctxCancelFn: ctxCancelFn, - expectedInitialLeader: presence, - leader: nil, - leaderUserPresence: nil, - members: make([]*PresenceID, 0, maxSize), - memberUserPresences: make([]*rtapi.UserPresence, 0, maxSize), - joinsInProgress: make([]*PresenceID, 0, maxSize), - joinRequests: make([]*Presence, 0, maxSize), - joinRequestUserPresences: make([]*rtapi.UserPresence, 0, maxSize), + stopped: false, + ctx: ctx, + ctxCancelFn: ctxCancelFn, + expectedInitialLeader: presence, + leader: nil, + joinRequests: make([]*PartyJoinRequest, 0, maxSize), + + members: NewPartyPresenceList(maxSize), } } @@ -101,14 +105,19 @@ func (p *PartyHandler) JoinRequest(presence *Presence) (bool, error) { } // Check if party is full. - if len(p.members)+len(p.joinsInProgress) >= p.MaxSize { + if p.members.Size() >= p.MaxSize { p.Unlock() return false, runtime.ErrPartyFull } // Check if party is open, and therefore automatically accepts join requests. if p.Open { - p.joinsInProgress = append(p.joinsInProgress, &presence.ID) + _, err := p.members.Join([]*Presence{presence}) p.Unlock() + if err != nil { + return false, err + } + // The party membership has changed, stop any ongoing matchmaking processes. + _ = p.matchmaker.RemovePartyAll(p.IDStr) return true, nil } // Check if party has room for more join requests. @@ -117,13 +126,15 @@ func (p *PartyHandler) JoinRequest(presence *Presence) (bool, error) { return false, runtime.ErrPartyJoinRequestsFull } - p.joinRequests = append(p.joinRequests, presence) - joinRequestUserPresence := &rtapi.UserPresence{ - UserId: presence.GetUserId(), - SessionId: presence.GetSessionId(), - Username: presence.GetUsername(), + joinRequest := &PartyJoinRequest{ + Presence: presence, + UserPresence: &rtapi.UserPresence{ + UserId: presence.GetUserId(), + SessionId: presence.GetSessionId(), + Username: presence.GetUsername(), + }, } - p.joinRequestUserPresences = append(p.joinRequestUserPresences, joinRequestUserPresence) + p.joinRequests = append(p.joinRequests, joinRequest) leader := p.leader p.Unlock() @@ -133,11 +144,11 @@ func (p *PartyHandler) JoinRequest(presence *Presence) (bool, error) { Message: &rtapi.Envelope_PartyJoinRequest{ PartyJoinRequest: &rtapi.PartyJoinRequest{ PartyId: p.IDStr, - Presences: []*rtapi.UserPresence{joinRequestUserPresence}, + Presences: []*rtapi.UserPresence{joinRequest.UserPresence}, }, }, } - p.router.SendToPresenceIDs(p.logger, []*PresenceID{leader}, envelope, true) + p.router.SendToPresenceIDs(p.logger, []*PresenceID{leader.PresenceID}, envelope, true) } return false, nil @@ -164,11 +175,13 @@ func (p *PartyHandler) Join(presences []*Presence) { if presence.GetUserId() == expectedInitialLeader.UserId && presence.GetSessionId() == expectedInitialLeader.SessionId { // The initial leader is joining the party at creation time. initialLeader = presence - p.leader = &presence.ID - p.leaderUserPresence = &rtapi.UserPresence{ - UserId: presence.GetUserId(), - SessionId: presence.GetSessionId(), - Username: presence.GetUsername(), + p.leader = &PartyLeader{ + PresenceID: &presence.ID, + UserPresence: &rtapi.UserPresence{ + UserId: presence.GetUserId(), + SessionId: presence.GetSessionId(), + Username: presence.GetUsername(), + }, } break } @@ -177,17 +190,24 @@ func (p *PartyHandler) Join(presences []*Presence) { if initialLeader == nil { // If the expected initial leader was not assigned, select the first joiner. Also // covers the party leader leaving at some point during the lifecycle of the party. - p.leader = &presences[0].ID - p.leaderUserPresence = &rtapi.UserPresence{ - UserId: presences[0].GetUserId(), - SessionId: presences[0].GetSessionId(), - Username: presences[0].GetUsername(), + p.leader = &PartyLeader{ + PresenceID: &presences[0].ID, + UserPresence: &rtapi.UserPresence{ + UserId: presences[0].GetUserId(), + SessionId: presences[0].GetSessionId(), + Username: presences[0].GetUsername(), + }, } } } - memberUserPresences := make([]*rtapi.UserPresence, len(p.memberUserPresences), len(p.memberUserPresences)+len(presences)) - copy(memberUserPresences, p.memberUserPresences) + _, err := p.members.Join(presences) + if err != nil { + p.Unlock() + // Should not happen, this process is just a confirmation. + p.logger.Error("error in party join", zap.Error(err)) + return + } presenceIDs := make(map[*PresenceID]*rtapi.Envelope, len(presences)) for _, presence := range presences { @@ -197,18 +217,6 @@ func (p *PartyHandler) Join(presences []*Presence) { SessionId: presence.GetSessionId(), Username: presence.GetUsername(), } - p.members = append(p.members, ¤tPresence.ID) - p.memberUserPresences = append(p.memberUserPresences, memberUserPresence) - memberUserPresences = append(memberUserPresences, memberUserPresence) - - for i := 0; i < len(p.joinsInProgress); i++ { - if p.joinsInProgress[i].SessionID == presence.ID.SessionID && p.joinsInProgress[i].Node == presence.ID.Node { - copy(p.joinsInProgress[i:], p.joinsInProgress[i+1:]) - p.joinsInProgress[len(p.joinsInProgress)-1] = nil - p.joinsInProgress = p.joinsInProgress[:len(p.joinsInProgress)-1] - break - } - } // Prepare message to be sent to the new presences. if initialLeader != nil && presence == initialLeader { @@ -222,22 +230,27 @@ func (p *PartyHandler) Join(presences []*Presence) { Open: p.Open, MaxSize: int32(p.MaxSize), Self: memberUserPresence, - Leader: p.leaderUserPresence, + Leader: p.leader.UserPresence, // Presences assigned below. }, }, } } + members := p.members.List() + p.Unlock() + memberUserPresences := make([]*rtapi.UserPresence, 0, len(members)) + for _, member := range members { + memberUserPresences = append(memberUserPresences, member.UserPresence) + } + // Send party info to the new joiners. for presenceID, envelope := range presenceIDs { envelope.GetParty().Presences = memberUserPresences p.router.SendToPresenceIDs(p.logger, []*PresenceID{presenceID}, envelope, true) } - // The party membership has changed, stop any ongoing matchmaking processes. - _ = p.matchmaker.RemovePartyAll(p.IDStr) } func (p *PartyHandler) Leave(presences []*Presence) { @@ -251,59 +264,49 @@ func (p *PartyHandler) Leave(presences []*Presence) { return } - // Drop each presence from the party list, and remove the leader if they've left. + presences, _ = p.members.Leave(presences) + if len(presences) == 0 { + p.Unlock() + return + } + + // Remove the leader if they've left. for _, presence := range presences { - if p.leader != nil && p.leader.SessionID == presence.ID.SessionID && p.leader.Node == presence.ID.Node { + if p.leader != nil && p.leader.PresenceID.SessionID == presence.ID.SessionID && p.leader.PresenceID.Node == presence.ID.Node { // Check is only meaningful if a leader exists. Leader may temporarily be nil here until a new // one is assigned below, when multiple presences leave concurrently and one was just the leader. p.leader = nil - p.leaderUserPresence = nil - } - for i := 0; i < len(p.members); i++ { - if p.members[i].SessionID == presence.ID.SessionID && p.members[i].Node == presence.ID.Node { - copy(p.members[i:], p.members[i+1:]) - p.members[len(p.members)-1] = nil - p.members = p.members[:len(p.members)-1] - - copy(p.memberUserPresences[i:], p.memberUserPresences[i+1:]) - p.memberUserPresences[len(p.memberUserPresences)-1] = nil - p.memberUserPresences = p.memberUserPresences[:len(p.memberUserPresences)-1] - break + oldestPresenceID, oldestUserPresence := p.members.Oldest() + if oldestPresenceID == nil || oldestUserPresence == nil { + // Party is now empty, close it. + p.stopped = true + p.Unlock() + p.stop() + return } - } - } - - // If the leader has left try to assign a new one from the remaining presences. - var envelope *rtapi.Envelope - if p.leader == nil { - // Party is now empty, close it. - if len(p.members) == 0 { - p.stopped = true - p.Unlock() - p.stop() - return - } - // Leader has left, but there are other party members. Promote the oldest presence as the new party leader. - p.leader = p.members[0] - p.leaderUserPresence = p.memberUserPresences[0] + // Leader has left, but there are other party members. Promote the oldest presence as the new party leader. + p.leader = &PartyLeader{ + PresenceID: oldestPresenceID, + UserPresence: oldestUserPresence, + } - envelope = &rtapi.Envelope{ - Message: &rtapi.Envelope_PartyLeader{ - PartyLeader: &rtapi.PartyLeader{ - PartyId: p.IDStr, - Presence: p.leaderUserPresence, + // Send any new leader promotion message to party members. + p.router.SendToStream(p.logger, p.Stream, &rtapi.Envelope{ + Message: &rtapi.Envelope_PartyLeader{ + PartyLeader: &rtapi.PartyLeader{ + PartyId: p.IDStr, + Presence: p.leader.UserPresence, + }, }, - }, + }, true) + + break } } p.Unlock() - // Send any new leader promotion message to party members. - if envelope != nil { - p.router.SendToStream(p.logger, p.Stream, envelope, true) - } // The party membership has changed, stop any ongoing matchmaking processes. _ = p.matchmaker.RemovePartyAll(p.IDStr) } @@ -316,23 +319,27 @@ func (p *PartyHandler) Promote(sessionID, node string, presence *rtapi.UserPrese } // Only the party leader may promote. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.PresenceID.SessionID.String() != sessionID || p.leader.PresenceID.Node != node { p.Unlock() return runtime.ErrPartyNotLeader } + members := p.members.List() + var envelope *rtapi.Envelope - for i, memberUserPresence := range p.memberUserPresences { - if memberUserPresence.SessionId == presence.SessionId && memberUserPresence.UserId == presence.UserId && memberUserPresence.Username == presence.Username { + for _, member := range members { + if member.UserPresence.SessionId == presence.SessionId && member.UserPresence.UserId == presence.UserId && member.UserPresence.Username == presence.Username { // Found the party member being promoted. - p.leader = p.members[i] - p.leaderUserPresence = memberUserPresence + p.leader = &PartyLeader{ + PresenceID: member.PresenceID, + UserPresence: member.UserPresence, + } envelope = &rtapi.Envelope{ Message: &rtapi.Envelope_PartyLeader{ PartyLeader: &rtapi.PartyLeader{ PartyId: p.IDStr, - Presence: p.leaderUserPresence, + Presence: p.leader.UserPresence, }, }, } @@ -361,31 +368,24 @@ func (p *PartyHandler) Accept(sessionID, node string, presence *rtapi.UserPresen } // Only the party leader may promote. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.PresenceID.SessionID.String() != sessionID || p.leader.PresenceID.Node != node { p.Unlock() return runtime.ErrPartyNotLeader } // Check if there's room to accept the new party member. - if len(p.members)+len(p.joinsInProgress) >= p.MaxSize { + if p.members.Size() >= p.MaxSize { p.Unlock() return runtime.ErrPartyFull } // Check if the presence has actually requested to join. + var idx int var joinRequestPresence *Presence for i, joinRequest := range p.joinRequests { - if joinRequest.ID.SessionID.String() == presence.SessionId && joinRequest.UserID.String() == presence.UserId && joinRequest.GetUsername() == presence.Username { - joinRequestPresence = joinRequest - - copy(p.joinRequests[i:], p.joinRequests[i+1:]) - p.joinRequests[len(p.joinRequests)-1] = nil - p.joinRequests = p.joinRequests[:len(p.joinRequests)-1] - - copy(p.joinRequestUserPresences[i:], p.joinRequestUserPresences[i+1:]) - p.joinRequestUserPresences[len(p.joinRequestUserPresences)-1] = nil - p.joinRequestUserPresences = p.joinRequestUserPresences[:len(p.joinRequestUserPresences)-1] - + if joinRequest.UserPresence.SessionId == presence.SessionId && joinRequest.UserPresence.UserId == presence.UserId && joinRequest.UserPresence.Username == presence.Username { + idx = i + joinRequestPresence = joinRequest.Presence break } } @@ -394,24 +394,29 @@ func (p *PartyHandler) Accept(sessionID, node string, presence *rtapi.UserPresen return runtime.ErrPartyNotRequest } - p.joinsInProgress = append(p.joinsInProgress, &joinRequestPresence.ID) + if err := p.members.Reserve(joinRequestPresence); err != nil { + p.Unlock() + return err + } + + copy(p.joinRequests[idx:], p.joinRequests[idx+1:]) + p.joinRequests[len(p.joinRequests)-1] = nil + p.joinRequests = p.joinRequests[:len(p.joinRequests)-1] + p.Unlock() // Add the presence to the party stream, which will trigger the Join() hook above. success, _, err := p.streamManager.UserJoin(p.Stream, joinRequestPresence.UserID, joinRequestPresence.ID.SessionID, false, false, "") if err != nil || !success { - p.Lock() - for i := 0; i < len(p.joinsInProgress); i++ { - if p.joinsInProgress[i].SessionID == joinRequestPresence.ID.SessionID && p.joinsInProgress[i].Node == joinRequestPresence.ID.Node { - copy(p.joinsInProgress[i:], p.joinsInProgress[i+1:]) - p.joinsInProgress[len(p.joinsInProgress)-1] = nil - p.joinsInProgress = p.joinsInProgress[:len(p.joinsInProgress)-1] - break - } - } - p.Unlock() + p.members.Release(joinRequestPresence) return runtime.ErrPartyAcceptRequest } + if _, err = p.members.Join([]*Presence{joinRequestPresence}); err != nil { + return err + } + + // The party membership has changed, stop any ongoing matchmaking processes. + _ = p.matchmaker.RemovePartyAll(p.IDStr) return nil } @@ -424,49 +429,36 @@ func (p *PartyHandler) Remove(sessionID, node string, presence *rtapi.UserPresen } // Only the party leader may remove. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.PresenceID.SessionID.String() != sessionID || p.leader.PresenceID.Node != node { p.Unlock() return runtime.ErrPartyNotLeader } // Check if the leader is attempting to remove its own presence. - if p.leader.SessionID.String() == presence.SessionId && p.leaderUserPresence.GetUserId() == presence.UserId && p.leaderUserPresence.GetUsername() == presence.Username { + if p.leader.UserPresence.SessionId == presence.SessionId && p.leader.UserPresence.UserId == presence.UserId && p.leader.UserPresence.Username == presence.Username { p.Unlock() return runtime.ErrPartyRemoveSelf } - // Remove the party member, if found. - var removeMember *rtapi.UserPresence - var removePresenceID *PresenceID - for i, memberUserPresence := range p.memberUserPresences { - if memberUserPresence.SessionId == presence.SessionId && memberUserPresence.UserId == presence.UserId && memberUserPresence.Username == presence.Username { - removeMember = memberUserPresence - removePresenceID = p.members[i] - - copy(p.memberUserPresences[i:], p.memberUserPresences[i+1:]) - p.memberUserPresences[len(p.memberUserPresences)-1] = nil - p.memberUserPresences = p.memberUserPresences[:len(p.memberUserPresences)-1] - - copy(p.members[i:], p.members[i+1:]) - p.members[len(p.members)-1] = nil - p.members = p.members[:len(p.members)-1] + presences := p.members.List() + // Remove the party member, if found. + var removeMember *PartyPresenceListItem + for _, item := range presences { + if item.UserPresence.SessionId == presence.SessionId && item.UserPresence.UserId == presence.UserId && item.UserPresence.Username == presence.Username { + removeMember = item break } } if removeMember == nil { // Wasn't a party member, check if it's actually a rejected join request. for i, joinRequest := range p.joinRequests { - if joinRequest.ID.SessionID.String() == presence.SessionId && joinRequest.UserID.String() == presence.UserId && joinRequest.GetUsername() == presence.Username { + if joinRequest.UserPresence.SessionId == presence.SessionId && joinRequest.UserPresence.UserId == presence.UserId && joinRequest.UserPresence.Username == presence.Username { // Rejected join requests do not require stream removal, they were never part of the stream to begin with. copy(p.joinRequests[i:], p.joinRequests[i+1:]) p.joinRequests[len(p.joinRequests)-1] = nil p.joinRequests = p.joinRequests[:len(p.joinRequests)-1] - copy(p.joinRequestUserPresences[i:], p.joinRequestUserPresences[i+1:]) - p.joinRequestUserPresences[len(p.joinRequestUserPresences)-1] = nil - p.joinRequestUserPresences = p.joinRequestUserPresences[:len(p.joinRequestUserPresences)-1] - p.Unlock() return nil } @@ -479,13 +471,18 @@ func (p *PartyHandler) Remove(sessionID, node string, presence *rtapi.UserPresen return runtime.ErrPartyNotMember } + // The party membership has changed, stop any ongoing matchmaking processes. + _ = p.matchmaker.RemovePartyAll(p.IDStr) + + p.members.Leave([]*Presence{removeMember.Presence}) + // Remove the presence from the party stream, which will trigger the Leave() hook above. - err := p.streamManager.UserLeave(p.Stream, uuid.FromStringOrNil(removeMember.UserId), uuid.FromStringOrNil(removeMember.SessionId)) + err := p.streamManager.UserLeave(p.Stream, removeMember.Presence.UserID, removeMember.PresenceID.SessionID) if err != nil { return runtime.ErrPartyRemove } - p.router.SendToPresenceIDs(p.logger, []*PresenceID{removePresenceID}, &rtapi.Envelope{Message: &rtapi.Envelope_PartyClose{PartyClose: &rtapi.PartyClose{ + p.router.SendToPresenceIDs(p.logger, []*PresenceID{removeMember.PresenceID}, &rtapi.Envelope{Message: &rtapi.Envelope_PartyClose{PartyClose: &rtapi.PartyClose{ PartyId: p.IDStr, }}}, true) @@ -500,7 +497,7 @@ func (p *PartyHandler) Close(sessionID, node string) error { } // Only the party leader may close the party. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.UserPresence.SessionId != sessionID || p.leader.PresenceID.Node != node { p.Unlock() return runtime.ErrPartyNotLeader } @@ -524,13 +521,15 @@ func (p *PartyHandler) JoinRequestList(sessionID, node string) ([]*rtapi.UserPre } // Only the party leader may request a list of pending join requests. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.UserPresence.SessionId != sessionID || p.leader.PresenceID.Node != node { p.RUnlock() return nil, runtime.ErrPartyNotLeader } - joinRequestUserPresences := make([]*rtapi.UserPresence, len(p.joinRequestUserPresences)) - copy(joinRequestUserPresences, p.joinRequestUserPresences) + joinRequestUserPresences := make([]*rtapi.UserPresence, 0, len(p.joinRequests)) + for _, joinRequest := range p.joinRequests { + joinRequestUserPresences = append(joinRequestUserPresences, joinRequest.UserPresence) + } p.RUnlock() @@ -545,27 +544,29 @@ func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, ma } // Only the party leader may start a matchmaking process. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.UserPresence.SessionId != sessionID || p.leader.PresenceID.Node != node { p.RUnlock() return "", nil, runtime.ErrPartyNotLeader } + members := p.members.List() + // Prepare the list of presences that will go into the matchmaker as part of the party. - presences := make([]*MatchmakerPresence, 0, len(p.members)) - memberPresenceIDs := make([]*PresenceID, 0, len(p.members)-1) - for i, member := range p.members { - memberUserPresence := p.memberUserPresences[i] + presences := make([]*MatchmakerPresence, 0, len(members)) + memberPresenceIDs := make([]*PresenceID, 0, len(members)-1) + for _, member := range members { + memberUserPresence := member.UserPresence presences = append(presences, &MatchmakerPresence{ UserId: memberUserPresence.UserId, SessionId: memberUserPresence.SessionId, Username: memberUserPresence.Username, - Node: member.Node, - SessionID: member.SessionID, + Node: member.PresenceID.Node, + SessionID: member.PresenceID.SessionID, }) - if member.SessionID == p.leader.SessionID && member.Node == p.leader.Node { + if member.PresenceID.SessionID == p.leader.PresenceID.SessionID && member.PresenceID.Node == p.leader.PresenceID.Node { continue } - memberPresenceIDs = append(memberPresenceIDs, member) + memberPresenceIDs = append(memberPresenceIDs, member.PresenceID) } p.RUnlock() @@ -585,7 +586,7 @@ func (p *PartyHandler) MatchmakerRemove(sessionID, node, ticket string) error { } // Only the party leader may stop a matchmaking process. - if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { + if p.leader == nil || p.leader.UserPresence.SessionId != sessionID || p.leader.PresenceID.Node != node { p.RUnlock() return runtime.ErrPartyNotLeader } @@ -602,22 +603,24 @@ func (p *PartyHandler) DataSend(sessionID, node string, opCode int64, data []byt return runtime.ErrPartyClosed } + members := p.members.List() + // Check if the sender is a party member. var sender *rtapi.UserPresence - for i, member := range p.members { - if member.SessionID.String() == sessionID && member.Node == node { - sender = p.memberUserPresences[i] + for _, member := range members { + if member.UserPresence.SessionId == sessionID && member.PresenceID.Node == node { + sender = member.UserPresence break } } var recipients []*PresenceID - if sender != nil && len(p.members) > 0 { - recipients = make([]*PresenceID, 0, len(p.members)-1) - for _, member := range p.members { - if member.SessionID.String() == sessionID && member.Node == node { + if sender != nil && len(members) > 0 { + recipients = make([]*PresenceID, 0, len(members)-1) + for _, member := range members { + if member.UserPresence.SessionId == sessionID && member.PresenceID.Node == node { continue } - recipients = append(recipients, member) + recipients = append(recipients, member.PresenceID) } } diff --git a/server/party_handler_test.go b/server/party_handler_test.go index c806d8b0523f2d19ddc449df2376b68476489230..30364a1ec0c8174d88265dd9271cbf3fef5dff3d 100644 --- a/server/party_handler_test.go +++ b/server/party_handler_test.go @@ -56,7 +56,6 @@ func TestPartyMatchmakerAddAndRemove(t *testing.T) { } func createTestPartyHandler(t *testing.T, logger *zap.Logger) (*PartyHandler, func() error) { - node := "node1" mm, cleanup, _ := createTestMatchmaker(t, logger, true, nil) diff --git a/server/party_presence.go b/server/party_presence.go new file mode 100644 index 0000000000000000000000000000000000000000..f65890b26a1d8b07bb83f2694dfd38780ab5f6b1 --- /dev/null +++ b/server/party_presence.go @@ -0,0 +1,169 @@ +// Copyright 2023 The Nakama Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "sync" + "sync/atomic" + + "github.com/gofrs/uuid" + "github.com/heroiclabs/nakama-common/rtapi" + "github.com/heroiclabs/nakama-common/runtime" +) + +type PartyPresenceList struct { + sync.RWMutex + maxSize int + presences []*PartyPresenceListItem + presenceMap map[uuid.UUID]string + presencesRead *atomic.Value + reservedMap map[uuid.UUID]struct{} +} + +type PartyPresenceListItem struct { + PresenceID *PresenceID + Presence *Presence + UserPresence *rtapi.UserPresence +} + +func NewPartyPresenceList(maxSize int) *PartyPresenceList { + m := &PartyPresenceList{ + maxSize: maxSize, + presences: make([]*PartyPresenceListItem, 0, maxSize), + presenceMap: make(map[uuid.UUID]string, maxSize), + presencesRead: &atomic.Value{}, + } + m.presencesRead.Store(make([]*PartyPresenceListItem, 0, maxSize)) + return m +} + +func (m *PartyPresenceList) Reserve(presence *Presence) error { + m.Lock() + if _, found := m.reservedMap[presence.ID.SessionID]; found { + m.Unlock() + return nil + } + if len(m.presenceMap)+len(m.reservedMap) >= m.maxSize { + m.Unlock() + return runtime.ErrPartyFull + } + m.reservedMap[presence.ID.SessionID] = struct{}{} + m.Unlock() + return nil +} + +func (m *PartyPresenceList) Release(presence *Presence) { + m.Lock() + delete(m.reservedMap, presence.ID.SessionID) + m.Unlock() +} + +func (m *PartyPresenceList) Join(joins []*Presence) ([]*Presence, error) { + processed := make([]*Presence, 0, len(joins)) + m.Lock() + var newPresences int + for _, join := range joins { + _, reservationFound := m.reservedMap[join.ID.SessionID] + _, presenceFound := m.presenceMap[join.ID.SessionID] + if !reservationFound && !presenceFound { + newPresences++ + } + } + if newPresences > 0 && len(m.reservedMap)+len(m.presenceMap)+newPresences > m.maxSize { + m.Unlock() + return nil, runtime.ErrPartyFull + } + + for _, join := range joins { + delete(m.reservedMap, join.ID.SessionID) + if _, ok := m.presenceMap[join.ID.SessionID]; !ok { + m.presences = append(m.presences, &PartyPresenceListItem{ + PresenceID: &PresenceID{ + Node: join.ID.Node, + SessionID: join.ID.SessionID, + }, + Presence: join, + UserPresence: &rtapi.UserPresence{ + UserId: join.GetUserId(), + SessionId: join.GetSessionId(), + Username: join.GetUsername(), + }, + }) + m.presenceMap[join.ID.SessionID] = join.ID.Node + processed = append(processed, join) + } + } + if len(processed) > 0 { + presencesRead := make([]*PartyPresenceListItem, 0, len(m.presences)) + for _, presence := range m.presences { + presencesRead = append(presencesRead, presence) + } + m.presencesRead.Store(presencesRead) + } + m.Unlock() + return processed, nil +} + +func (m *PartyPresenceList) Leave(leaves []*Presence) ([]*Presence, []*Presence) { + processed := make([]*Presence, 0, len(leaves)) + reservations := make([]*Presence, 0, len(leaves)) + m.Lock() + for _, leave := range leaves { + if _, found := m.reservedMap[leave.ID.SessionID]; found { + delete(m.reservedMap, leave.ID.SessionID) + reservations = append(reservations, leave) + } + if _, ok := m.presenceMap[leave.ID.SessionID]; ok { + for i, presence := range m.presences { + if presence.Presence.ID.SessionID == leave.ID.SessionID && presence.Presence.ID.Node == leave.ID.Node { + copy(m.presences[i:], m.presences[i+1:]) + m.presences[len(m.presences)-1] = nil + m.presences = m.presences[:len(m.presences)-1] + break + } + } + delete(m.presenceMap, leave.ID.SessionID) + processed = append(processed, leave) + } + } + if len(processed) > 0 { + presencesRead := make([]*PartyPresenceListItem, 0, len(m.presences)) + for _, presence := range m.presences { + presencesRead = append(presencesRead, presence) + } + m.presencesRead.Store(presencesRead) + } + m.Unlock() + return processed, reservations +} + +func (m *PartyPresenceList) List() []*PartyPresenceListItem { + return m.presencesRead.Load().([]*PartyPresenceListItem) +} + +func (m *PartyPresenceList) Size() int { + m.RLock() + defer m.RUnlock() + return len(m.presenceMap) + len(m.reservedMap) +} + +func (m *PartyPresenceList) Oldest() (*PresenceID, *rtapi.UserPresence) { + m.RLock() + defer m.RUnlock() + if len(m.presences) == 0 { + return nil, nil + } + return &m.presences[0].Presence.ID, m.presences[0].UserPresence +}