Loading server/core_storage.go +126 −96 Original line number Diff line number Diff line Loading @@ -92,21 +92,27 @@ FROM storage WHERE collection = $1 AND read = 2` + cursorQuery + ` LIMIT $2` var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -125,21 +131,27 @@ FROM storage WHERE collection = $1 AND read = 2 AND user_id = $2 ` + cursorQuery + ` LIMIT $3` var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -166,21 +178,27 @@ WHERE collection = $1 AND user_id = $2 ` + cursorQuery + ` LIMIT $3` } var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -191,38 +209,44 @@ SELECT collection, key, user_id, value, version, read, write, create_time, updat FROM storage WHERE user_id = $1` var objects []*api.StorageObject err := ExecuteRetryable(func() error { rows, err := db.Query(query, userID) if err != nil { if err == sql.ErrNoRows { return make([]*api.StorageObject, 0), nil objects = make([]*api.StorageObject, 0) return nil } else { logger.Error("Could not read storage objects.", zap.Error(err), zap.String("user_id", userID.String())) return nil, err return err } } defer rows.Close() objects := make([]*api.StorageObject, 0) funcObjects := make([]*api.StorageObject, 0) for rows.Next() { o := &api.StorageObject{CreateTime: ×tamp.Timestamp{}, UpdateTime: ×tamp.Timestamp{}} var createTime pq.NullTime var updateTime pq.NullTime var userID sql.NullString if err := rows.Scan(&o.Collection, &o.Key, &userID, &o.Value, &o.Version, &o.PermissionRead, &o.PermissionWrite, &createTime, &updateTime); err != nil { return nil, err return err } o.CreateTime.Seconds = createTime.Time.Unix() o.UpdateTime.Seconds = updateTime.Time.Unix() o.UserId = userID.String objects = append(objects, o) funcObjects = append(funcObjects, o) } if rows.Err() != nil { logger.Error("Could not read storage objects.", zap.Error(err), zap.String("user_id", userID.String())) return nil, rows.Err() return rows.Err() } objects = funcObjects return nil }) return objects, err } Loading Loading @@ -307,18 +331,21 @@ FROM storage WHERE ` + whereClause var objects *api.StorageObjects err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjects{Objects: make([]*api.StorageObject, 0)}, nil objects = &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} return nil } else { logger.Error("Could not read storage objects.", zap.Error(err)) return nil, err return err } } defer rows.Close() objects := &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} funcObjects := &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} for rows.Next() { o := &api.StorageObject{CreateTime: ×tamp.Timestamp{}, UpdateTime: ×tamp.Timestamp{}} var createTime pq.NullTime Loading @@ -326,7 +353,7 @@ WHERE var userID sql.NullString if err := rows.Scan(&o.Collection, &o.Key, &userID, &o.Value, &o.Version, &o.PermissionRead, &o.PermissionWrite, &createTime, &updateTime); err != nil { return nil, err return err } o.CreateTime.Seconds = createTime.Time.Unix() Loading @@ -335,14 +362,17 @@ WHERE if !uuid.Equal(uuid.FromStringOrNil(userID.String), uuid.Nil) { o.UserId = userID.String } objects.Objects = append(objects.Objects, o) funcObjects.Objects = append(funcObjects.Objects, o) } if err = rows.Err(); err != nil { logger.Error("Could not read storage objects.", zap.Error(err)) return nil, err return err } objects = funcObjects return nil }) return objects, nil return objects, err } func StorageWriteObjects(logger *zap.Logger, db *sql.DB, authoritativeWrite bool, objects map[uuid.UUID][]*api.WriteStorageObject) (*api.StorageObjectAcks, codes.Code, error) { Loading server/db.go +13 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ package server import ( "errors" "github.com/lib/pq" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) Loading Loading @@ -59,3 +60,15 @@ func StatusError(code codes.Code, msg string, cause error) error { cause: cause, } } // Retry functions that perform non-transactional database operations. func ExecuteRetryable(fn func() error) error { if err := fn(); err != nil { if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "CR000" || pqErr.Code == "40001" { // A recognised error type that can be retried. return ExecuteRetryable(fn) } return err } return nil } Loading
server/core_storage.go +126 −96 Original line number Diff line number Diff line Loading @@ -92,21 +92,27 @@ FROM storage WHERE collection = $1 AND read = 2` + cursorQuery + ` LIMIT $2` var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -125,21 +131,27 @@ FROM storage WHERE collection = $1 AND read = 2 AND user_id = $2 ` + cursorQuery + ` LIMIT $3` var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -166,21 +178,27 @@ WHERE collection = $1 AND user_id = $2 ` + cursorQuery + ` LIMIT $3` } var objects *api.StorageObjectList err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor}, nil objects = &api.StorageObjectList{Objects: make([]*api.StorageObject, 0), Cursor: cursor} return nil } else { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return nil, err return err } } defer rows.Close() objects, err := storageListObjects(rows, cursor) objects, err = storageListObjects(rows, cursor) if err != nil { logger.Error("Could not list storage.", zap.Error(err), zap.String("collection", collection), zap.Int("limit", limit), zap.String("cursor", cursor)) return err } return nil }) return objects, err } Loading @@ -191,38 +209,44 @@ SELECT collection, key, user_id, value, version, read, write, create_time, updat FROM storage WHERE user_id = $1` var objects []*api.StorageObject err := ExecuteRetryable(func() error { rows, err := db.Query(query, userID) if err != nil { if err == sql.ErrNoRows { return make([]*api.StorageObject, 0), nil objects = make([]*api.StorageObject, 0) return nil } else { logger.Error("Could not read storage objects.", zap.Error(err), zap.String("user_id", userID.String())) return nil, err return err } } defer rows.Close() objects := make([]*api.StorageObject, 0) funcObjects := make([]*api.StorageObject, 0) for rows.Next() { o := &api.StorageObject{CreateTime: ×tamp.Timestamp{}, UpdateTime: ×tamp.Timestamp{}} var createTime pq.NullTime var updateTime pq.NullTime var userID sql.NullString if err := rows.Scan(&o.Collection, &o.Key, &userID, &o.Value, &o.Version, &o.PermissionRead, &o.PermissionWrite, &createTime, &updateTime); err != nil { return nil, err return err } o.CreateTime.Seconds = createTime.Time.Unix() o.UpdateTime.Seconds = updateTime.Time.Unix() o.UserId = userID.String objects = append(objects, o) funcObjects = append(funcObjects, o) } if rows.Err() != nil { logger.Error("Could not read storage objects.", zap.Error(err), zap.String("user_id", userID.String())) return nil, rows.Err() return rows.Err() } objects = funcObjects return nil }) return objects, err } Loading Loading @@ -307,18 +331,21 @@ FROM storage WHERE ` + whereClause var objects *api.StorageObjects err := ExecuteRetryable(func() error { rows, err := db.Query(query, params...) if err != nil { if err == sql.ErrNoRows { return &api.StorageObjects{Objects: make([]*api.StorageObject, 0)}, nil objects = &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} return nil } else { logger.Error("Could not read storage objects.", zap.Error(err)) return nil, err return err } } defer rows.Close() objects := &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} funcObjects := &api.StorageObjects{Objects: make([]*api.StorageObject, 0)} for rows.Next() { o := &api.StorageObject{CreateTime: ×tamp.Timestamp{}, UpdateTime: ×tamp.Timestamp{}} var createTime pq.NullTime Loading @@ -326,7 +353,7 @@ WHERE var userID sql.NullString if err := rows.Scan(&o.Collection, &o.Key, &userID, &o.Value, &o.Version, &o.PermissionRead, &o.PermissionWrite, &createTime, &updateTime); err != nil { return nil, err return err } o.CreateTime.Seconds = createTime.Time.Unix() Loading @@ -335,14 +362,17 @@ WHERE if !uuid.Equal(uuid.FromStringOrNil(userID.String), uuid.Nil) { o.UserId = userID.String } objects.Objects = append(objects.Objects, o) funcObjects.Objects = append(funcObjects.Objects, o) } if err = rows.Err(); err != nil { logger.Error("Could not read storage objects.", zap.Error(err)) return nil, err return err } objects = funcObjects return nil }) return objects, nil return objects, err } func StorageWriteObjects(logger *zap.Logger, db *sql.DB, authoritativeWrite bool, objects map[uuid.UUID][]*api.WriteStorageObject) (*api.StorageObjectAcks, codes.Code, error) { Loading
server/db.go +13 −0 Original line number Diff line number Diff line Loading @@ -16,6 +16,7 @@ package server import ( "errors" "github.com/lib/pq" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) Loading Loading @@ -59,3 +60,15 @@ func StatusError(code codes.Code, msg string, cause error) error { cause: cause, } } // Retry functions that perform non-transactional database operations. func ExecuteRetryable(fn func() error) error { if err := fn(); err != nil { if pqErr, ok := err.(*pq.Error); ok && pqErr.Code == "CR000" || pqErr.Code == "40001" { // A recognised error type that can be retried. return ExecuteRetryable(fn) } return err } return nil }