Loading CHANGELOG.md +3 −5 Original line number Diff line number Diff line Loading @@ -4,14 +4,12 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] ### Changed - Configuration in dashboard is now displayed as YAML. - Server configuration in dashboard is now displayed as YAML. - Update server protocol to simplify presence messages across chat and multiplayer. ### Fixed - Improve group SQL query with type information. - Work around a limitation in cockroachdb with type information in group sub-queries. ## [0.11.0] - 2017-02-09 ### Added Loading server/api.proto +15 −19 Original line number Diff line number Diff line Loading @@ -328,7 +328,7 @@ message TGroupUserPromote { bytes user_id = 2; } message Topic { message TopicId { oneof id { bytes dm = 1; bytes room = 2; Loading @@ -336,7 +336,7 @@ message Topic { } } message TopicUser { message UserPresence { bytes user_id = 1; bytes session_id = 2; } Loading @@ -349,16 +349,17 @@ message TTopicJoin { } } message TTopic { Topic topic = 1; repeated TopicUser users = 2; TopicId topic = 1; repeated UserPresence presences = 2; UserPresence self = 3; } message TTopicLeave { Topic topic = 1; TopicId topic = 1; } message TTopicMessageSend { Topic topic = 1; TopicId topic = 1; bytes data = 2; } Loading @@ -370,7 +371,7 @@ message TTopicMessageAck { } message TopicMessage { Topic topic = 1; TopicId topic = 1; bytes user_id = 2; bytes message_id = 3; int64 created_at = 4; Loading @@ -396,9 +397,9 @@ message TTopicMessages { } message TopicPresence { Topic topic = 1; repeated TopicUser joins = 2; repeated TopicUser leaves = 3; TopicId topic = 1; repeated UserPresence joins = 2; repeated UserPresence leaves = 3; } message TMatchCreate {} Loading @@ -406,16 +407,11 @@ message TMatch { bytes id = 1; } message MatchUser { bytes user_id = 1; bytes session_id = 2; } message TMatchJoin { bytes match_id = 1; } message TMatchUsers { repeated MatchUser users = 1; repeated UserPresence users = 1; } message TMatchDataSend { Loading @@ -426,7 +422,7 @@ message TMatchDataSend { message MatchData { bytes match_id = 1; MatchUser user = 2; UserPresence user = 2; int64 op_code = 3; bytes data = 4; } Loading @@ -437,8 +433,8 @@ message TMatchLeave { message MatchPresence { bytes match_id = 1; repeated MatchUser joins = 2; repeated MatchUser leaves = 3; repeated UserPresence joins = 2; repeated UserPresence leaves = 3; } message TStorageFetch { Loading server/pipeline_group.go +5 −5 Original line number Diff line number Diff line Loading @@ -663,7 +663,7 @@ VALUES ($1, $2, $2, $3, $4), ($3, $2, $2, $1, $4)`, return } p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) } func (p *pipeline) groupLeave(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -770,7 +770,7 @@ OR return } p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) } func (p *pipeline) groupUserAdd(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -859,7 +859,7 @@ DO UPDATE SET state = 1, updated_at = $2::INT`, } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 2, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 2, data) } func (p *pipeline) groupUserKick(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -953,7 +953,7 @@ AND } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 4, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 4, data) } func (p *pipeline) groupUserPromote(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -1011,7 +1011,7 @@ AND } data, _ := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 5, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 5, data) session.Send(&Envelope{CollationId: envelope.CollationId}) } server/pipeline_match.go +3 −3 Original line number Diff line number Diff line Loading @@ -44,10 +44,10 @@ func (p *pipeline) matchJoin(logger zap.Logger, session *session, envelope *Enve p.tracker.Track(session.id, topic, session.userID, PresenceMeta{}) users := make([]*MatchUser, len(ps)) users := make([]*UserPresence, len(ps)) for i := 0; i < len(ps); i++ { p := ps[i] users[i] = &MatchUser{ users[i] = &UserPresence{ UserId: p.UserID.Bytes(), SessionId: p.ID.SessionID.Bytes(), } Loading Loading @@ -133,7 +133,7 @@ func (p *pipeline) matchDataSend(logger zap.Logger, session *session, envelope * Payload: &Envelope_MatchData{ MatchData: &MatchData{ MatchId: matchIDBytes, User: &MatchUser{ User: &UserPresence{ UserId: session.userID.Bytes(), SessionId: session.id.Bytes(), }, Loading server/pipeline_topic.go +27 −28 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ var invalidRoomRegex = regexp.MustCompilePOSIX("[[:cntrl:]]+") func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Envelope) { id := envelope.GetTopicJoin() var topic *Topic var topic *TopicId var trackerTopic string switch id.Id.(type) { case *TTopicJoin_UserId: Loading Loading @@ -69,10 +69,10 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve userIDString := session.userID.String() otherUserIDString := otherUserID.String() if userIDString < otherUserIDString { topic = &Topic{Id: &Topic_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} topic = &TopicId{Id: &TopicId_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} trackerTopic = "dm:" + userIDString + ":" + otherUserIDString } else { topic = &Topic{Id: &Topic_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} topic = &TopicId{Id: &TopicId_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} trackerTopic = "dm:" + otherUserIDString + ":" + userIDString } case *TTopicJoin_Room: Loading @@ -91,7 +91,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve return } topic = &Topic{Id: &Topic_Room{Room: room}} topic = &TopicId{Id: &TopicId_Room{Room: room}} trackerTopic = "room:" + string(room) case *TTopicJoin_GroupId: // Check input is valid ID. Loading @@ -114,7 +114,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve } trackerTopic = "group:" + groupID.String() topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} case nil: session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Error{&Error{Reason: "No topic ID found"}}}) return Loading @@ -126,25 +126,24 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve // Track the presence, and gather current member list. p.tracker.Track(session.id, trackerTopic, session.userID, PresenceMeta{}) presences := p.tracker.ListByTopic(trackerTopic) users := make([]*TopicUser, len(presences)-1) j := 0 userPresences := make([]*UserPresence, len(presences)) for i := 0; i < len(presences); i++ { // Do not report this session to itself. if presences[i].UserID == session.userID && presences[i].ID.SessionID == session.id { j += -1 } else { users[i+j] = &TopicUser{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} } userPresences[i] = &UserPresence{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} } session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{Topic: topic, Users: users}}}) session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{ Topic: topic, Presences: userPresences, Self: &UserPresence{UserId: session.userID.Bytes(), SessionId: session.id.Bytes()}, }}}) } func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Envelope) { topic := envelope.GetTopicLeave().Topic var trackerTopic string switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { Loading Loading @@ -187,7 +186,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "dm:" + userID1String + ":" + userID2String case *Topic_Room: case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { Loading @@ -204,7 +203,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "room:" + string(room) case *Topic_GroupId: case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) Loading Loading @@ -248,7 +247,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop var trackerTopic string switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { Loading Loading @@ -291,7 +290,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "dm:" + userID1String + ":" + userID2String case *Topic_Room: case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { Loading @@ -308,7 +307,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "room:" + string(room) case *Topic_GroupId: case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) Loading Loading @@ -357,7 +356,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } var topic *Topic var topic *TopicId var topicBytes []byte var topicType int64 switch input.Id.(type) { Loading @@ -383,7 +382,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo } else { topicBytes = append(otherUserIDBytes, session.userID.Bytes()...) } topic = &Topic{Id: &Topic_Dm{Dm: topicBytes}} topic = &TopicId{Id: &TopicId_Dm{Dm: topicBytes}} topicType = 0 case *TTopicMessagesList_Room: // Check input is valid room name. Loading @@ -401,7 +400,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } topic = &Topic{Id: &Topic_Room{Room: room}} topic = &TopicId{Id: &TopicId_Room{Room: room}} topicBytes = room topicType = 1 case *TTopicMessagesList_GroupId: Loading @@ -424,7 +423,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} topicBytes = groupIDBytes topicType = 2 case nil: Loading Loading @@ -552,12 +551,12 @@ AND ue.source_id = $2`, checkUserID, blocksUserID).Scan(&uid, &state) } // Assumes `topic` has already been validated, or was constructed internally. func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *Topic, msgType int64, data []byte) ([]byte, string, int64, int64, error) { func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *TopicId, msgType int64, data []byte) ([]byte, string, int64, int64, error) { var trackerTopic string var topicBytes []byte var topicType int64 switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: bothUserIDBytes := topic.GetDm() userID1 := uuid.FromBytesOrNil(bothUserIDBytes[:16]) userID2 := uuid.FromBytesOrNil(bothUserIDBytes[16:]) Loading @@ -565,11 +564,11 @@ func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, t trackerTopic = "dm:" + userID1.String() + ":" + userID2.String() topicBytes = bothUserIDBytes topicType = 0 case *Topic_Room: case *TopicId_Room: trackerTopic = "room:" + string(topic.GetRoom()) topicBytes = []byte(topic.GetRoom()) topicType = 1 case *Topic_GroupId: case *TopicId_GroupId: trackerTopic = "group:" + uuid.FromBytesOrNil(topic.GetGroupId()).String() topicBytes = topic.GetGroupId() topicType = 2 Loading Loading
CHANGELOG.md +3 −5 Original line number Diff line number Diff line Loading @@ -4,14 +4,12 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] ### Changed - Configuration in dashboard is now displayed as YAML. - Server configuration in dashboard is now displayed as YAML. - Update server protocol to simplify presence messages across chat and multiplayer. ### Fixed - Improve group SQL query with type information. - Work around a limitation in cockroachdb with type information in group sub-queries. ## [0.11.0] - 2017-02-09 ### Added Loading
server/api.proto +15 −19 Original line number Diff line number Diff line Loading @@ -328,7 +328,7 @@ message TGroupUserPromote { bytes user_id = 2; } message Topic { message TopicId { oneof id { bytes dm = 1; bytes room = 2; Loading @@ -336,7 +336,7 @@ message Topic { } } message TopicUser { message UserPresence { bytes user_id = 1; bytes session_id = 2; } Loading @@ -349,16 +349,17 @@ message TTopicJoin { } } message TTopic { Topic topic = 1; repeated TopicUser users = 2; TopicId topic = 1; repeated UserPresence presences = 2; UserPresence self = 3; } message TTopicLeave { Topic topic = 1; TopicId topic = 1; } message TTopicMessageSend { Topic topic = 1; TopicId topic = 1; bytes data = 2; } Loading @@ -370,7 +371,7 @@ message TTopicMessageAck { } message TopicMessage { Topic topic = 1; TopicId topic = 1; bytes user_id = 2; bytes message_id = 3; int64 created_at = 4; Loading @@ -396,9 +397,9 @@ message TTopicMessages { } message TopicPresence { Topic topic = 1; repeated TopicUser joins = 2; repeated TopicUser leaves = 3; TopicId topic = 1; repeated UserPresence joins = 2; repeated UserPresence leaves = 3; } message TMatchCreate {} Loading @@ -406,16 +407,11 @@ message TMatch { bytes id = 1; } message MatchUser { bytes user_id = 1; bytes session_id = 2; } message TMatchJoin { bytes match_id = 1; } message TMatchUsers { repeated MatchUser users = 1; repeated UserPresence users = 1; } message TMatchDataSend { Loading @@ -426,7 +422,7 @@ message TMatchDataSend { message MatchData { bytes match_id = 1; MatchUser user = 2; UserPresence user = 2; int64 op_code = 3; bytes data = 4; } Loading @@ -437,8 +433,8 @@ message TMatchLeave { message MatchPresence { bytes match_id = 1; repeated MatchUser joins = 2; repeated MatchUser leaves = 3; repeated UserPresence joins = 2; repeated UserPresence leaves = 3; } message TStorageFetch { Loading
server/pipeline_group.go +5 −5 Original line number Diff line number Diff line Loading @@ -663,7 +663,7 @@ VALUES ($1, $2, $2, $3, $4), ($3, $2, $2, $1, $4)`, return } p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 1, []byte("{}")) } func (p *pipeline) groupLeave(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -770,7 +770,7 @@ OR return } p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 3, []byte("{}")) } func (p *pipeline) groupUserAdd(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -859,7 +859,7 @@ DO UPDATE SET state = 1, updated_at = $2::INT`, } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 2, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 2, data) } func (p *pipeline) groupUserKick(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -953,7 +953,7 @@ AND } data, err := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 4, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 4, data) } func (p *pipeline) groupUserPromote(l zap.Logger, session *session, envelope *Envelope) { Loading Loading @@ -1011,7 +1011,7 @@ AND } data, _ := json.Marshal(map[string]string{"user_id": userID.String(), "handle": handle}) p.storeAndDeliverMessage(logger, session, &Topic{Id: &Topic_GroupId{GroupId: groupID.Bytes()}}, 5, data) p.storeAndDeliverMessage(logger, session, &TopicId{Id: &TopicId_GroupId{GroupId: groupID.Bytes()}}, 5, data) session.Send(&Envelope{CollationId: envelope.CollationId}) }
server/pipeline_match.go +3 −3 Original line number Diff line number Diff line Loading @@ -44,10 +44,10 @@ func (p *pipeline) matchJoin(logger zap.Logger, session *session, envelope *Enve p.tracker.Track(session.id, topic, session.userID, PresenceMeta{}) users := make([]*MatchUser, len(ps)) users := make([]*UserPresence, len(ps)) for i := 0; i < len(ps); i++ { p := ps[i] users[i] = &MatchUser{ users[i] = &UserPresence{ UserId: p.UserID.Bytes(), SessionId: p.ID.SessionID.Bytes(), } Loading Loading @@ -133,7 +133,7 @@ func (p *pipeline) matchDataSend(logger zap.Logger, session *session, envelope * Payload: &Envelope_MatchData{ MatchData: &MatchData{ MatchId: matchIDBytes, User: &MatchUser{ User: &UserPresence{ UserId: session.userID.Bytes(), SessionId: session.id.Bytes(), }, Loading
server/pipeline_topic.go +27 −28 Original line number Diff line number Diff line Loading @@ -37,7 +37,7 @@ var invalidRoomRegex = regexp.MustCompilePOSIX("[[:cntrl:]]+") func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Envelope) { id := envelope.GetTopicJoin() var topic *Topic var topic *TopicId var trackerTopic string switch id.Id.(type) { case *TTopicJoin_UserId: Loading Loading @@ -69,10 +69,10 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve userIDString := session.userID.String() otherUserIDString := otherUserID.String() if userIDString < otherUserIDString { topic = &Topic{Id: &Topic_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} topic = &TopicId{Id: &TopicId_Dm{Dm: append(session.userID.Bytes(), otherUserIDBytes...)}} trackerTopic = "dm:" + userIDString + ":" + otherUserIDString } else { topic = &Topic{Id: &Topic_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} topic = &TopicId{Id: &TopicId_Dm{Dm: append(otherUserIDBytes, session.userID.Bytes()...)}} trackerTopic = "dm:" + otherUserIDString + ":" + userIDString } case *TTopicJoin_Room: Loading @@ -91,7 +91,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve return } topic = &Topic{Id: &Topic_Room{Room: room}} topic = &TopicId{Id: &TopicId_Room{Room: room}} trackerTopic = "room:" + string(room) case *TTopicJoin_GroupId: // Check input is valid ID. Loading @@ -114,7 +114,7 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve } trackerTopic = "group:" + groupID.String() topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} case nil: session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Error{&Error{Reason: "No topic ID found"}}}) return Loading @@ -126,25 +126,24 @@ func (p *pipeline) topicJoin(logger zap.Logger, session *session, envelope *Enve // Track the presence, and gather current member list. p.tracker.Track(session.id, trackerTopic, session.userID, PresenceMeta{}) presences := p.tracker.ListByTopic(trackerTopic) users := make([]*TopicUser, len(presences)-1) j := 0 userPresences := make([]*UserPresence, len(presences)) for i := 0; i < len(presences); i++ { // Do not report this session to itself. if presences[i].UserID == session.userID && presences[i].ID.SessionID == session.id { j += -1 } else { users[i+j] = &TopicUser{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} } userPresences[i] = &UserPresence{UserId: presences[i].UserID.Bytes(), SessionId: presences[i].ID.SessionID.Bytes()} } session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{Topic: topic, Users: users}}}) session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Topic{Topic: &TTopic{ Topic: topic, Presences: userPresences, Self: &UserPresence{UserId: session.userID.Bytes(), SessionId: session.id.Bytes()}, }}}) } func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Envelope) { topic := envelope.GetTopicLeave().Topic var trackerTopic string switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { Loading Loading @@ -187,7 +186,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "dm:" + userID1String + ":" + userID2String case *Topic_Room: case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { Loading @@ -204,7 +203,7 @@ func (p *pipeline) topicLeave(logger zap.Logger, session *session, envelope *Env } trackerTopic = "room:" + string(room) case *Topic_GroupId: case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) Loading Loading @@ -248,7 +247,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop var trackerTopic string switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: // Check input is valid DM topic. bothUserIDBytes := topic.GetDm() if bothUserIDBytes == nil || len(bothUserIDBytes) != 32 { Loading Loading @@ -291,7 +290,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "dm:" + userID1String + ":" + userID2String case *Topic_Room: case *TopicId_Room: // Check input is valid room name. room := topic.GetRoom() if room == nil || len(room) < 1 || len(room) > 64 { Loading @@ -308,7 +307,7 @@ func (p *pipeline) topicMessageSend(logger zap.Logger, session *session, envelop } trackerTopic = "room:" + string(room) case *Topic_GroupId: case *TopicId_GroupId: // Check input is valid ID. groupIDBytes := topic.GetGroupId() groupID, err := uuid.FromBytes(groupIDBytes) Loading Loading @@ -357,7 +356,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } var topic *Topic var topic *TopicId var topicBytes []byte var topicType int64 switch input.Id.(type) { Loading @@ -383,7 +382,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo } else { topicBytes = append(otherUserIDBytes, session.userID.Bytes()...) } topic = &Topic{Id: &Topic_Dm{Dm: topicBytes}} topic = &TopicId{Id: &TopicId_Dm{Dm: topicBytes}} topicType = 0 case *TTopicMessagesList_Room: // Check input is valid room name. Loading @@ -401,7 +400,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } topic = &Topic{Id: &Topic_Room{Room: room}} topic = &TopicId{Id: &TopicId_Room{Room: room}} topicBytes = room topicType = 1 case *TTopicMessagesList_GroupId: Loading @@ -424,7 +423,7 @@ func (p *pipeline) topicMessagesList(logger zap.Logger, session *session, envelo return } topic = &Topic{Id: &Topic_GroupId{GroupId: groupIDBytes}} topic = &TopicId{Id: &TopicId_GroupId{GroupId: groupIDBytes}} topicBytes = groupIDBytes topicType = 2 case nil: Loading Loading @@ -552,12 +551,12 @@ AND ue.source_id = $2`, checkUserID, blocksUserID).Scan(&uid, &state) } // Assumes `topic` has already been validated, or was constructed internally. func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *Topic, msgType int64, data []byte) ([]byte, string, int64, int64, error) { func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, topic *TopicId, msgType int64, data []byte) ([]byte, string, int64, int64, error) { var trackerTopic string var topicBytes []byte var topicType int64 switch topic.Id.(type) { case *Topic_Dm: case *TopicId_Dm: bothUserIDBytes := topic.GetDm() userID1 := uuid.FromBytesOrNil(bothUserIDBytes[:16]) userID2 := uuid.FromBytesOrNil(bothUserIDBytes[16:]) Loading @@ -565,11 +564,11 @@ func (p *pipeline) storeAndDeliverMessage(logger zap.Logger, session *session, t trackerTopic = "dm:" + userID1.String() + ":" + userID2.String() topicBytes = bothUserIDBytes topicType = 0 case *Topic_Room: case *TopicId_Room: trackerTopic = "room:" + string(topic.GetRoom()) topicBytes = []byte(topic.GetRoom()) topicType = 1 case *Topic_GroupId: case *TopicId_GroupId: trackerTopic = "group:" + uuid.FromBytesOrNil(topic.GetGroupId()).String() topicBytes = topic.GetGroupId() topicType = 2 Loading