Loading CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr ## [Unreleased] ### Added - Allow RPC functions to receive and return raw JSON data. - Status follow operations now also accept usernames to follow. ### Changed - Update devconsole lodash (4.17.13) and lodash.template (4.5.0) dependencies. Loading build/generate_proto_gocode +7 −7 Original line number Diff line number Diff line Loading @@ -36,17 +36,17 @@ fi protoc_flags=(-I/usr/local/include -I../ -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway) api_pbfile="../api/api.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${api_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${api_pbfile}" || exit 1 apigrpc_pbfile="../apigrpc/apigrpc.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${apigrpc_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 console_pbfile="../console/console.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${console_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${console_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${console_pbfile}" || exit 1 rtapi_pbfile="../rtapi/realtime.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${rtapi_pbfile}" || exit 1 rtapi/realtime.pb.go +148 −138 File changed.Preview size limit exceeded, changes collapsed. Show changes rtapi/realtime.proto +3 −1 Original line number Diff line number Diff line Loading @@ -361,8 +361,10 @@ message Status { // Start receiving status updates for some set of users. message StatusFollow { // Users to follow. // User IDs to follow. repeated string user_ids = 1; // Usernames to follow. repeated string usernames = 2; } // A batch of status updates for a given user. Loading server/pipeline_status.go +122 −27 Original line number Diff line number Diff line Loading @@ -26,13 +26,14 @@ import ( func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *rtapi.Envelope) { incoming := envelope.GetStatusFollow() if len(incoming.UserIds) == 0 { if len(incoming.UserIds) == 0 && len(incoming.Usernames) == 0 { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Status{Status: &rtapi.Status{ Presences: make([]*rtapi.UserPresence, 0), }}}) return } // Deduplicate user IDs. uniqueUserIDs := make(map[uuid.UUID]struct{}, len(incoming.UserIds)) for _, uid := range incoming.UserIds { userID, err := uuid.FromString(uid) Loading @@ -43,21 +44,38 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r }}}) return } uniqueUserIDs[userID] = struct{}{} } userIDs := make([]interface{}, 0, len(uniqueUserIDs)) // Deduplicate usernames. // Note: we do not yet know if these usernames and the previous user IDs may point to the same user account. uniqueUsernames := make(map[string]struct{}, len(incoming.Usernames)) for _, username := range incoming.Usernames { if username == "" { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "Invalid username", }}}) return } uniqueUsernames[username] = struct{}{} } var followUserIDs map[uuid.UUID]struct{} if len(uniqueUsernames) == 0 { params := make([]interface{}, 0, len(uniqueUserIDs)) statements := make([]string, 0, len(uniqueUserIDs)) index := 1 for userID, _ := range uniqueUserIDs { userIDs = append(userIDs, userID) statements = append(statements, "$"+strconv.Itoa(index)+"::UUID") index++ params = append(params, userID) statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID") } // See if all the users exist. query := "SELECT COUNT(id) FROM users WHERE id IN (" + strings.Join(statements, ", ") + ")" var dbCount int err := p.db.QueryRowContext(session.Context(), query, userIDs...).Scan(&dbCount) err := p.db.QueryRowContext(session.Context(), query, params...).Scan(&dbCount) if err != nil { logger.Error("Error checking users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Loading @@ -66,7 +84,9 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r }}}) return } if dbCount != len(userIDs) { // If one or more users were missing reject the whole operation. if dbCount != len(params) { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "One or more users do not exist", Loading @@ -74,8 +94,83 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r return } presences := make([]*rtapi.UserPresence, 0, len(userIDs)) followUserIDs = uniqueUserIDs } else { query := "SELECT id FROM users WHERE " params := make([]interface{}, 0, len(uniqueUserIDs)) statements := make([]string, 0, len(uniqueUserIDs)) for userID, _ := range uniqueUserIDs { params = append(params, userID) statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID") } if len(statements) != 0 { query += "id IN (" + strings.Join(statements, ", ") + ")" statements = make([]string, 0, len(uniqueUsernames)) } for username, _ := range uniqueUsernames { params = append(params, username) statements = append(statements, "$"+strconv.Itoa(len(params))) } if len(uniqueUserIDs) != 0 { query += " OR " } query += "username IN (" + strings.Join(statements, ", ") + ")" followUserIDs = make(map[uuid.UUID]struct{}, len(uniqueUserIDs)+len(uniqueUsernames)) // See if all the users exist. rows, err := p.db.QueryContext(session.Context(), query, params...) if err != nil { logger.Error("Error checking users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } for rows.Next() { var id string err := rows.Scan(&id) if err != nil { _ = rows.Close() logger.Error("Error scanning users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } uid, err := uuid.FromString(id) if err != nil { _ = rows.Close() logger.Error("Error parsing users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } followUserIDs[uid] = struct{}{} } _ = rows.Close() // If one or more users were missing reject the whole operation. // Note: any overlap between user IDs and usernames (pointing to the same user) will also fail here. if len(followUserIDs) != len(uniqueUserIDs)+len(uniqueUsernames) { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "One or more users do not exist", }}}) return } } // Follow all of the validated user IDs, and prepare a list of current presences to return. presences := make([]*rtapi.UserPresence, 0, len(followUserIDs)) for userID, _ := range followUserIDs { stream := PresenceStream{Mode: StreamModeStatus, Subject: userID} success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true}, false) if !success { Loading Loading
CHANGELOG.md +1 −0 Original line number Diff line number Diff line Loading @@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr ## [Unreleased] ### Added - Allow RPC functions to receive and return raw JSON data. - Status follow operations now also accept usernames to follow. ### Changed - Update devconsole lodash (4.17.13) and lodash.template (4.5.0) dependencies. Loading
build/generate_proto_gocode +7 −7 Original line number Diff line number Diff line Loading @@ -36,17 +36,17 @@ fi protoc_flags=(-I/usr/local/include -I../ -I$GOPATH/src -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis -I$GOPATH/src/github.com/grpc-ecosystem/grpc-gateway) api_pbfile="../api/api.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${api_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${api_pbfile}" || exit 1 apigrpc_pbfile="../apigrpc/apigrpc.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${apigrpc_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${apigrpc_pbfile}" || exit 1 console_pbfile="../console/console.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${console_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${console_pbfile}" || exit 1 protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--grpc-gateway_out=logtostderr=true:../" "${console_pbfile}" || exit 1 #protoc "${protoc_flags[@]}" "--swagger_out=logtostderr=true:../" "${console_pbfile}" || exit 1 rtapi_pbfile="../rtapi/realtime.proto" protoc "${protoc_flags[@]}" "--go_out=plugins=grpc:../" "${rtapi_pbfile}" || exit 1
rtapi/realtime.pb.go +148 −138 File changed.Preview size limit exceeded, changes collapsed. Show changes
rtapi/realtime.proto +3 −1 Original line number Diff line number Diff line Loading @@ -361,8 +361,10 @@ message Status { // Start receiving status updates for some set of users. message StatusFollow { // Users to follow. // User IDs to follow. repeated string user_ids = 1; // Usernames to follow. repeated string usernames = 2; } // A batch of status updates for a given user. Loading
server/pipeline_status.go +122 −27 Original line number Diff line number Diff line Loading @@ -26,13 +26,14 @@ import ( func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *rtapi.Envelope) { incoming := envelope.GetStatusFollow() if len(incoming.UserIds) == 0 { if len(incoming.UserIds) == 0 && len(incoming.Usernames) == 0 { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Status{Status: &rtapi.Status{ Presences: make([]*rtapi.UserPresence, 0), }}}) return } // Deduplicate user IDs. uniqueUserIDs := make(map[uuid.UUID]struct{}, len(incoming.UserIds)) for _, uid := range incoming.UserIds { userID, err := uuid.FromString(uid) Loading @@ -43,21 +44,38 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r }}}) return } uniqueUserIDs[userID] = struct{}{} } userIDs := make([]interface{}, 0, len(uniqueUserIDs)) // Deduplicate usernames. // Note: we do not yet know if these usernames and the previous user IDs may point to the same user account. uniqueUsernames := make(map[string]struct{}, len(incoming.Usernames)) for _, username := range incoming.Usernames { if username == "" { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "Invalid username", }}}) return } uniqueUsernames[username] = struct{}{} } var followUserIDs map[uuid.UUID]struct{} if len(uniqueUsernames) == 0 { params := make([]interface{}, 0, len(uniqueUserIDs)) statements := make([]string, 0, len(uniqueUserIDs)) index := 1 for userID, _ := range uniqueUserIDs { userIDs = append(userIDs, userID) statements = append(statements, "$"+strconv.Itoa(index)+"::UUID") index++ params = append(params, userID) statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID") } // See if all the users exist. query := "SELECT COUNT(id) FROM users WHERE id IN (" + strings.Join(statements, ", ") + ")" var dbCount int err := p.db.QueryRowContext(session.Context(), query, userIDs...).Scan(&dbCount) err := p.db.QueryRowContext(session.Context(), query, params...).Scan(&dbCount) if err != nil { logger.Error("Error checking users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Loading @@ -66,7 +84,9 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r }}}) return } if dbCount != len(userIDs) { // If one or more users were missing reject the whole operation. if dbCount != len(params) { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "One or more users do not exist", Loading @@ -74,8 +94,83 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r return } presences := make([]*rtapi.UserPresence, 0, len(userIDs)) followUserIDs = uniqueUserIDs } else { query := "SELECT id FROM users WHERE " params := make([]interface{}, 0, len(uniqueUserIDs)) statements := make([]string, 0, len(uniqueUserIDs)) for userID, _ := range uniqueUserIDs { params = append(params, userID) statements = append(statements, "$"+strconv.Itoa(len(params))+"::UUID") } if len(statements) != 0 { query += "id IN (" + strings.Join(statements, ", ") + ")" statements = make([]string, 0, len(uniqueUsernames)) } for username, _ := range uniqueUsernames { params = append(params, username) statements = append(statements, "$"+strconv.Itoa(len(params))) } if len(uniqueUserIDs) != 0 { query += " OR " } query += "username IN (" + strings.Join(statements, ", ") + ")" followUserIDs = make(map[uuid.UUID]struct{}, len(uniqueUserIDs)+len(uniqueUsernames)) // See if all the users exist. rows, err := p.db.QueryContext(session.Context(), query, params...) if err != nil { logger.Error("Error checking users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } for rows.Next() { var id string err := rows.Scan(&id) if err != nil { _ = rows.Close() logger.Error("Error scanning users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } uid, err := uuid.FromString(id) if err != nil { _ = rows.Close() logger.Error("Error parsing users in status follow", zap.Error(err)) session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_RUNTIME_EXCEPTION), Message: "Could not check users", }}}) return } followUserIDs[uid] = struct{}{} } _ = rows.Close() // If one or more users were missing reject the whole operation. // Note: any overlap between user IDs and usernames (pointing to the same user) will also fail here. if len(followUserIDs) != len(uniqueUserIDs)+len(uniqueUsernames) { session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{ Code: int32(rtapi.Error_BAD_INPUT), Message: "One or more users do not exist", }}}) return } } // Follow all of the validated user IDs, and prepare a list of current presences to return. presences := make([]*rtapi.UserPresence, 0, len(followUserIDs)) for userID, _ := range followUserIDs { stream := PresenceStream{Mode: StreamModeStatus, Subject: userID} success, _ := p.tracker.Track(session.ID(), stream, session.UserID(), PresenceMeta{Format: session.Format(), Username: session.Username(), Hidden: true}, false) if !success { Loading