Loading CHANGELOG.md +2 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] ### Changed - Run Facebook friends import after registration completes. ### Changed - Restructure and stabilize API messages. Loading server/pipeline_friend.go +59 −15 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import ( "database/sql" "errors" "fmt" "github.com/lib/pq" "github.com/satori/go.uuid" "go.uber.org/zap" Loading Loading @@ -99,7 +100,7 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT if err != nil { logger.Error("Could not commit transaction", zap.Error(err)) } else { logger.Info("Imported friends") logger.Info("Imported friends from Facebook") } } } Loading @@ -109,35 +110,78 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT if err != nil { return } if len(fbFriends) == 0 { return } tx, err = p.db.Begin() if err != nil { return } friendAddedCounter := 0 for _, fbFriend := range fbFriends { var friendID []byte err = tx.QueryRow("SELECT id FROM users WHERE facebook_id = $1", fbFriend.ID).Scan(&friendID) query := "SELECT id FROM users WHERE facebook_id IN (" friends := make([]interface{}, len(fbFriends)) for i, fbFriend := range fbFriends { if i != 0 { query += ", " } query += fmt.Sprintf("$%v", i+1) friends[i] = fbFriend.ID } query += ")" rows, err := tx.Query(query, friends...) if err != nil { return } defer rows.Close() updatedAt := nowMs() _, err = tx.Exec(` INSERT INTO user_edge (source_id, position, updated_at, destination_id, state) VALUES ($1, $2, $2, $3, 0), ($3, $2, $2, $1, 0)`, userID, updatedAt, friendID) queryEdge := "INSERT INTO user_edge (source_id, position, updated_at, destination_id, state) VALUES " paramsEdge := []interface{}{userID, updatedAt} queryEdgeMetadata := "UPDATE user_edge_metadata SET count = count + 1, updated_at = $1 WHERE source_id IN (" paramsEdgeMetadata := []interface{}{updatedAt} for rows.Next() { var currentUser []byte err = rows.Scan(¤tUser) if err != nil { return } friendAddedCounter++ if len(paramsEdge) != 2 { queryEdge += ", " } paramsEdge = append(paramsEdge, currentUser) queryEdge += fmt.Sprintf("($1, $2, $2, $%v, 0), ($%v, $2, $2, $1, 0)", len(paramsEdge), len(paramsEdge)) if len(paramsEdgeMetadata) != 1 { queryEdgeMetadata += ", " } paramsEdgeMetadata = append(paramsEdgeMetadata, currentUser) queryEdgeMetadata += fmt.Sprintf("$%v", len(paramsEdgeMetadata)) } err = rows.Err() if err != nil { return } queryEdgeMetadata += ")" _, err = tx.Exec(`UPDATE user_edge_metadata SET count = count + 1, updated_at = $1 WHERE source_id = $2`, updatedAt, friendID) // Check if any Facebook friends are already users, if not there are no new edges to handle. if len(paramsEdge) <= 2 { return } _, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, friendAddedCounter, nowMs(), userID) // Insert new friend relationship edges. _, err = tx.Exec(queryEdge, paramsEdge...) if err != nil { return } // Update edge metadata for each user to increment count. _, err = tx.Exec(queryEdgeMetadata, paramsEdgeMetadata...) if err != nil { return } // Update edge metadata for current user to bump count by number of new friends. _, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, len(paramsEdge)-2, updatedAt, userID) } func (p *pipeline) getFriends(filterQuery string, userID []byte) ([]*Friend, error) { Loading server/session_auth.go +11 −3 Original line number Diff line number Diff line Loading @@ -594,12 +594,17 @@ func (a *authenticationService) loginCustom(authReq *AuthenticateRequest) ([]byt func (a *authenticationService) register(authReq *AuthenticateRequest) ([]byte, string, string, int) { // Route to correct register handler var registerFunc func(tx *sql.Tx, authReq *AuthenticateRequest) ([]byte, string, string, int) var registerHook func(authReq *AuthenticateRequest, userID []byte, handle string) switch authReq.Id.(type) { case *AuthenticateRequest_Device: registerFunc = a.registerDevice case *AuthenticateRequest_Facebook: registerFunc = a.registerFacebook registerHook = func(authReq *AuthenticateRequest, userID []byte, handle string) { l := a.logger.With(zap.String("user_id", uuid.FromBytesOrNil(userID).String())) a.pipeline.addFacebookFriends(l, userID, authReq.GetFacebook()) } case *AuthenticateRequest_Google: registerFunc = a.registerGoogle case *AuthenticateRequest_GameCenter_: Loading Loading @@ -638,6 +643,12 @@ func (a *authenticationService) register(authReq *AuthenticateRequest) ([]byte, return nil, "", errorCouldNotRegister, 500 } // Run any post-registration steps outside the main registration transaction. // Errors here should not cause registration to fail. if registerHook != nil { registerHook(authReq, userID, handle) } a.logger.Info("Registration complete", zap.String("uid", uuid.FromBytesOrNil(userID).String())) return userID, handle, errorMessage, errorCode } Loading Loading @@ -736,9 +747,6 @@ WHERE NOT EXISTS return nil, "", errorIDAlreadyInUse, 401 } l := a.logger.With(zap.String("user_id", uuid.FromBytesOrNil(userID).String())) a.pipeline.addFacebookFriends(l, userID, accessToken) err = a.addUserEdgeMetadata(tx, userID, updatedAt) if err != nil { return nil, "", errorCouldNotRegister, 401 Loading Loading
CHANGELOG.md +2 −0 Original line number Diff line number Diff line Loading @@ -4,6 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] ### Changed - Run Facebook friends import after registration completes. ### Changed - Restructure and stabilize API messages. Loading
server/pipeline_friend.go +59 −15 Original line number Diff line number Diff line Loading @@ -18,6 +18,7 @@ import ( "database/sql" "errors" "fmt" "github.com/lib/pq" "github.com/satori/go.uuid" "go.uber.org/zap" Loading Loading @@ -99,7 +100,7 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT if err != nil { logger.Error("Could not commit transaction", zap.Error(err)) } else { logger.Info("Imported friends") logger.Info("Imported friends from Facebook") } } } Loading @@ -109,35 +110,78 @@ func (p *pipeline) addFacebookFriends(logger *zap.Logger, userID []byte, accessT if err != nil { return } if len(fbFriends) == 0 { return } tx, err = p.db.Begin() if err != nil { return } friendAddedCounter := 0 for _, fbFriend := range fbFriends { var friendID []byte err = tx.QueryRow("SELECT id FROM users WHERE facebook_id = $1", fbFriend.ID).Scan(&friendID) query := "SELECT id FROM users WHERE facebook_id IN (" friends := make([]interface{}, len(fbFriends)) for i, fbFriend := range fbFriends { if i != 0 { query += ", " } query += fmt.Sprintf("$%v", i+1) friends[i] = fbFriend.ID } query += ")" rows, err := tx.Query(query, friends...) if err != nil { return } defer rows.Close() updatedAt := nowMs() _, err = tx.Exec(` INSERT INTO user_edge (source_id, position, updated_at, destination_id, state) VALUES ($1, $2, $2, $3, 0), ($3, $2, $2, $1, 0)`, userID, updatedAt, friendID) queryEdge := "INSERT INTO user_edge (source_id, position, updated_at, destination_id, state) VALUES " paramsEdge := []interface{}{userID, updatedAt} queryEdgeMetadata := "UPDATE user_edge_metadata SET count = count + 1, updated_at = $1 WHERE source_id IN (" paramsEdgeMetadata := []interface{}{updatedAt} for rows.Next() { var currentUser []byte err = rows.Scan(¤tUser) if err != nil { return } friendAddedCounter++ if len(paramsEdge) != 2 { queryEdge += ", " } paramsEdge = append(paramsEdge, currentUser) queryEdge += fmt.Sprintf("($1, $2, $2, $%v, 0), ($%v, $2, $2, $1, 0)", len(paramsEdge), len(paramsEdge)) if len(paramsEdgeMetadata) != 1 { queryEdgeMetadata += ", " } paramsEdgeMetadata = append(paramsEdgeMetadata, currentUser) queryEdgeMetadata += fmt.Sprintf("$%v", len(paramsEdgeMetadata)) } err = rows.Err() if err != nil { return } queryEdgeMetadata += ")" _, err = tx.Exec(`UPDATE user_edge_metadata SET count = count + 1, updated_at = $1 WHERE source_id = $2`, updatedAt, friendID) // Check if any Facebook friends are already users, if not there are no new edges to handle. if len(paramsEdge) <= 2 { return } _, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, friendAddedCounter, nowMs(), userID) // Insert new friend relationship edges. _, err = tx.Exec(queryEdge, paramsEdge...) if err != nil { return } // Update edge metadata for each user to increment count. _, err = tx.Exec(queryEdgeMetadata, paramsEdgeMetadata...) if err != nil { return } // Update edge metadata for current user to bump count by number of new friends. _, err = tx.Exec(`UPDATE user_edge_metadata SET count = $1, updated_at = $2 WHERE source_id = $3`, len(paramsEdge)-2, updatedAt, userID) } func (p *pipeline) getFriends(filterQuery string, userID []byte) ([]*Friend, error) { Loading
server/session_auth.go +11 −3 Original line number Diff line number Diff line Loading @@ -594,12 +594,17 @@ func (a *authenticationService) loginCustom(authReq *AuthenticateRequest) ([]byt func (a *authenticationService) register(authReq *AuthenticateRequest) ([]byte, string, string, int) { // Route to correct register handler var registerFunc func(tx *sql.Tx, authReq *AuthenticateRequest) ([]byte, string, string, int) var registerHook func(authReq *AuthenticateRequest, userID []byte, handle string) switch authReq.Id.(type) { case *AuthenticateRequest_Device: registerFunc = a.registerDevice case *AuthenticateRequest_Facebook: registerFunc = a.registerFacebook registerHook = func(authReq *AuthenticateRequest, userID []byte, handle string) { l := a.logger.With(zap.String("user_id", uuid.FromBytesOrNil(userID).String())) a.pipeline.addFacebookFriends(l, userID, authReq.GetFacebook()) } case *AuthenticateRequest_Google: registerFunc = a.registerGoogle case *AuthenticateRequest_GameCenter_: Loading Loading @@ -638,6 +643,12 @@ func (a *authenticationService) register(authReq *AuthenticateRequest) ([]byte, return nil, "", errorCouldNotRegister, 500 } // Run any post-registration steps outside the main registration transaction. // Errors here should not cause registration to fail. if registerHook != nil { registerHook(authReq, userID, handle) } a.logger.Info("Registration complete", zap.String("uid", uuid.FromBytesOrNil(userID).String())) return userID, handle, errorMessage, errorCode } Loading Loading @@ -736,9 +747,6 @@ WHERE NOT EXISTS return nil, "", errorIDAlreadyInUse, 401 } l := a.logger.With(zap.String("user_id", uuid.FromBytesOrNil(userID).String())) a.pipeline.addFacebookFriends(l, userID, accessToken) err = a.addUserEdgeMetadata(tx, userID, updatedAt) if err != nil { return nil, "", errorCouldNotRegister, 401 Loading