Loading CHANGELOG.md +4 −0 Original line number Diff line number Diff line Loading @@ -4,7 +4,11 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Changed - Include ticket in party matchmaker add operation responses. ### Fixed - Ensure all members are correctly listed in party info when there are multiple concurrent successful joins. ## [3.3.0] - 2021-05-17 ### Added Loading server/party_handler.go +21 −11 Original line number Diff line number Diff line Loading @@ -187,7 +187,7 @@ func (p *PartyHandler) Join(presences []*Presence) { } } memberUserPresences := make([]*rtapi.UserPresence, len(p.memberUserPresences)+len(presences)) memberUserPresences := make([]*rtapi.UserPresence, len(p.memberUserPresences), len(p.memberUserPresences)+len(presences)) copy(memberUserPresences, p.memberUserPresences) presenceIDs := make(map[*PresenceID]*rtapi.Envelope, len(presences)) Loading Loading @@ -216,7 +216,7 @@ func (p *PartyHandler) Join(presences []*Presence) { MaxSize: int32(p.MaxSize), Self: memberUserPresence, Leader: p.leaderUserPresence, Presences: memberUserPresences, // Presences assigned below, }, }, } Loading @@ -226,6 +226,7 @@ func (p *PartyHandler) Join(presences []*Presence) { // Send party info to the new joiners. for presenceID, envelope := range presenceIDs { envelope.GetParty().Presences = memberUserPresences p.router.SendToPresenceIDs(p.logger, []*PresenceID{presenceID}, envelope, true) } // The party membership has changed, stop any ongoing matchmaking processes. Loading Loading @@ -510,21 +511,22 @@ func (p *PartyHandler) JoinRequestList(sessionID, node string) ([]*rtapi.UserPre return joinRequestUserPresences, nil } func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) { func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) { p.RLock() if p.stopped { p.RUnlock() return "", ErrPartyClosed return "", nil, ErrPartyClosed } // Only the party leader may start a matchmaking process. if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { p.RUnlock() return "", ErrPartyNotLeader return "", nil, ErrPartyNotLeader } // Prepare the list of presences that will go into the matchmaker as part of the party. presences := make([]*MatchmakerPresence, 0, len(p.members)) memberPresenceIDs := make([]*PresenceID, 0, len(p.members)-1) for i, member := range p.members { memberUserPresence := p.memberUserPresences[i] presences = append(presences, &MatchmakerPresence{ Loading @@ -534,11 +536,19 @@ func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, ma Node: member.Node, SessionID: member.SessionID, }) if member.SessionID == p.leader.SessionID && member.Node == p.leader.Node { continue } memberPresenceIDs = append(memberPresenceIDs, member) } p.RUnlock() return p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, stringProperties, numericProperties) ticket, err := p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, stringProperties, numericProperties) if err != nil { return "", nil, err } return ticket, memberPresenceIDs, nil } func (p *PartyHandler) MatchmakerRemove(sessionID, node, ticket string) error { Loading server/party_registry.go +4 −4 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ type PartyRegistry interface { PartyRemove(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string, presence *rtapi.UserPresence) error PartyClose(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string) error PartyJoinRequestList(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string) ([]*rtapi.UserPresence, error) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) PartyMatchmakerRemove(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, ticket string) error PartyDataSend(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string, opCode int64, data []byte) error } Loading Loading @@ -175,14 +175,14 @@ func (p *LocalPartyRegistry) PartyJoinRequestList(ctx context.Context, id uuid.U return ph.(*PartyHandler).JoinRequestList(sessionID, fromNode) } func (p *LocalPartyRegistry) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) { func (p *LocalPartyRegistry) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) { if node != p.node { return "", ErrPartyNotFound return "", nil, ErrPartyNotFound } ph, found := p.parties.Load(id) if !found { return "", ErrPartyNotFound return "", nil, ErrPartyNotFound } return ph.(*PartyHandler).MatchmakerAdd(sessionID, fromNode, query, minCount, maxCount, stringProperties, numericProperties) Loading server/pipeline_party.go +16 −10 Original line number Diff line number Diff line Loading @@ -418,7 +418,7 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel node := partyIDComponents[1] // Handle through the party registry. ticket, err := p.partyRegistry.PartyMatchmakerAdd(session.Context(), partyID, node, session.ID().String(), p.node, query, minCount, maxCount, incoming.StringProperties, incoming.NumericProperties) ticket, memberPresenceIDs, err := p.partyRegistry.PartyMatchmakerAdd(session.Context(), partyID, node, session.ID().String(), p.node, query, minCount, maxCount, incoming.StringProperties, incoming.NumericProperties) if err != nil { session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Loading @@ -427,9 +427,14 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel return } session.Send(&rtapi.Envelope{Cid: envelope.Cid}, true) // Return the ticket to the party leader. session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_PartyMatchmakerTicket{PartyMatchmakerTicket: &rtapi.PartyMatchmakerTicket{ PartyId: incoming.PartyId, Ticket: ticket, }}}, true) // Return the ticket. if len(memberPresenceIDs) != 0 { // Notify all other party members. outgoing := &rtapi.Envelope{ Message: &rtapi.Envelope_PartyMatchmakerTicket{ PartyMatchmakerTicket: &rtapi.PartyMatchmakerTicket{ Loading @@ -438,7 +443,8 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel }, }, } p.router.SendToStream(p.logger, PresenceStream{Mode: StreamModeParty, Subject: partyID, Label: node}, outgoing, true) p.router.SendToPresenceIDs(p.logger, memberPresenceIDs, outgoing, true) } } func (p *Pipeline) partyMatchmakerRemove(logger *zap.Logger, session Session, envelope *rtapi.Envelope) { Loading Loading
CHANGELOG.md +4 −0 Original line number Diff line number Diff line Loading @@ -4,7 +4,11 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Changed - Include ticket in party matchmaker add operation responses. ### Fixed - Ensure all members are correctly listed in party info when there are multiple concurrent successful joins. ## [3.3.0] - 2021-05-17 ### Added Loading
server/party_handler.go +21 −11 Original line number Diff line number Diff line Loading @@ -187,7 +187,7 @@ func (p *PartyHandler) Join(presences []*Presence) { } } memberUserPresences := make([]*rtapi.UserPresence, len(p.memberUserPresences)+len(presences)) memberUserPresences := make([]*rtapi.UserPresence, len(p.memberUserPresences), len(p.memberUserPresences)+len(presences)) copy(memberUserPresences, p.memberUserPresences) presenceIDs := make(map[*PresenceID]*rtapi.Envelope, len(presences)) Loading Loading @@ -216,7 +216,7 @@ func (p *PartyHandler) Join(presences []*Presence) { MaxSize: int32(p.MaxSize), Self: memberUserPresence, Leader: p.leaderUserPresence, Presences: memberUserPresences, // Presences assigned below, }, }, } Loading @@ -226,6 +226,7 @@ func (p *PartyHandler) Join(presences []*Presence) { // Send party info to the new joiners. for presenceID, envelope := range presenceIDs { envelope.GetParty().Presences = memberUserPresences p.router.SendToPresenceIDs(p.logger, []*PresenceID{presenceID}, envelope, true) } // The party membership has changed, stop any ongoing matchmaking processes. Loading Loading @@ -510,21 +511,22 @@ func (p *PartyHandler) JoinRequestList(sessionID, node string) ([]*rtapi.UserPre return joinRequestUserPresences, nil } func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) { func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) { p.RLock() if p.stopped { p.RUnlock() return "", ErrPartyClosed return "", nil, ErrPartyClosed } // Only the party leader may start a matchmaking process. if p.leader == nil || p.leader.SessionID.String() != sessionID || p.leader.Node != node { p.RUnlock() return "", ErrPartyNotLeader return "", nil, ErrPartyNotLeader } // Prepare the list of presences that will go into the matchmaker as part of the party. presences := make([]*MatchmakerPresence, 0, len(p.members)) memberPresenceIDs := make([]*PresenceID, 0, len(p.members)-1) for i, member := range p.members { memberUserPresence := p.memberUserPresences[i] presences = append(presences, &MatchmakerPresence{ Loading @@ -534,11 +536,19 @@ func (p *PartyHandler) MatchmakerAdd(sessionID, node, query string, minCount, ma Node: member.Node, SessionID: member.SessionID, }) if member.SessionID == p.leader.SessionID && member.Node == p.leader.Node { continue } memberPresenceIDs = append(memberPresenceIDs, member) } p.RUnlock() return p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, stringProperties, numericProperties) ticket, err := p.matchmaker.Add(presences, "", p.IDStr, query, minCount, maxCount, stringProperties, numericProperties) if err != nil { return "", nil, err } return ticket, memberPresenceIDs, nil } func (p *PartyHandler) MatchmakerRemove(sessionID, node, ticket string) error { Loading
server/party_registry.go +4 −4 Original line number Diff line number Diff line Loading @@ -39,7 +39,7 @@ type PartyRegistry interface { PartyRemove(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string, presence *rtapi.UserPresence) error PartyClose(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string) error PartyJoinRequestList(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string) ([]*rtapi.UserPresence, error) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) PartyMatchmakerRemove(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, ticket string) error PartyDataSend(ctx context.Context, id uuid.UUID, node, sessionID, fromNode string, opCode int64, data []byte) error } Loading Loading @@ -175,14 +175,14 @@ func (p *LocalPartyRegistry) PartyJoinRequestList(ctx context.Context, id uuid.U return ph.(*PartyHandler).JoinRequestList(sessionID, fromNode) } func (p *LocalPartyRegistry) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, error) { func (p *LocalPartyRegistry) PartyMatchmakerAdd(ctx context.Context, id uuid.UUID, node, sessionID, fromNode, query string, minCount, maxCount int, stringProperties map[string]string, numericProperties map[string]float64) (string, []*PresenceID, error) { if node != p.node { return "", ErrPartyNotFound return "", nil, ErrPartyNotFound } ph, found := p.parties.Load(id) if !found { return "", ErrPartyNotFound return "", nil, ErrPartyNotFound } return ph.(*PartyHandler).MatchmakerAdd(sessionID, fromNode, query, minCount, maxCount, stringProperties, numericProperties) Loading
server/pipeline_party.go +16 −10 Original line number Diff line number Diff line Loading @@ -418,7 +418,7 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel node := partyIDComponents[1] // Handle through the party registry. ticket, err := p.partyRegistry.PartyMatchmakerAdd(session.Context(), partyID, node, session.ID().String(), p.node, query, minCount, maxCount, incoming.StringProperties, incoming.NumericProperties) ticket, memberPresenceIDs, err := p.partyRegistry.PartyMatchmakerAdd(session.Context(), partyID, node, session.ID().String(), p.node, query, minCount, maxCount, incoming.StringProperties, incoming.NumericProperties) if err != nil { session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Loading @@ -427,9 +427,14 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel return } session.Send(&rtapi.Envelope{Cid: envelope.Cid}, true) // Return the ticket to the party leader. session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_PartyMatchmakerTicket{PartyMatchmakerTicket: &rtapi.PartyMatchmakerTicket{ PartyId: incoming.PartyId, Ticket: ticket, }}}, true) // Return the ticket. if len(memberPresenceIDs) != 0 { // Notify all other party members. outgoing := &rtapi.Envelope{ Message: &rtapi.Envelope_PartyMatchmakerTicket{ PartyMatchmakerTicket: &rtapi.PartyMatchmakerTicket{ Loading @@ -438,7 +443,8 @@ func (p *Pipeline) partyMatchmakerAdd(logger *zap.Logger, session Session, envel }, }, } p.router.SendToStream(p.logger, PresenceStream{Mode: StreamModeParty, Subject: partyID, Label: node}, outgoing, true) p.router.SendToPresenceIDs(p.logger, memberPresenceIDs, outgoing, true) } } func (p *Pipeline) partyMatchmakerRemove(logger *zap.Logger, session Session, envelope *rtapi.Envelope) { Loading