Commit 14a3d94f authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Improve OCC applied to high contention storage writes.

parent f67ce460
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -4,10 +4,17 @@ All notable changes to this project are documented below.
The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org).

## [Unreleased]
### Added
- Custom events API for client and runtime events.

### Changed
- Default runtime HTTP key value is no longer the same as the default Server key value.
- Group create now returns HTTP 409 Conflict/GRPC Code 6 when group name is already in use.

### Fixed
- Correctly handle errors when concurrently writing new storage objects.
- Correctly apply optimistic concurrency controls to individual storage objects under high write contention.

## [2.7.0] - 2019-09-11
### Added
- Enable RPC functions to receive and return raw JSON data.
+27 −7
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ import (
	"encoding/gob"
	"errors"
	"fmt"
	"github.com/jackc/pgx"
	"sort"

	"context"
@@ -555,23 +556,42 @@ func storageWriteObject(ctx context.Context, logger *zap.Logger, tx *sql.Tx, aut
		return ack, nil
	}

	params := []interface{}{object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite}
	var query string
	if dbVersion.Valid {
		// Updating an existing storage object.
		query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID"
	} else {
		// Inserting a new storage object.
	switch {
	case object.Version != "" && object.Version != "*":
		// OCC if match.
		query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
		params = append(params, object.Version)
		// Respect permissions in non-authoritative writes.
		if !authoritativeWrite {
			query += " AND write = 1"
		}
	case dbVersion.Valid && object.Version != "*":
		// An existing storage object was present, but no OCC if-not-exists required.
		query = "UPDATE storage SET value = $4, version = $5, read = $6, write = $7, update_time = now() WHERE collection = $1 AND key = $2 AND user_id = $3::UUID AND version = $8"
		params = append(params, dbVersion.String)
		// Respect permissions in non-authoritative writes.
		if !authoritativeWrite {
			query += " AND write = 1"
		}
	default:
		// OCC if-not-exists, and all other non-OCC cases.
		query = "INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time) VALUES ($1, $2, $3::UUID, $4, $5, $6, $7, now(), now())"
		// Existing permission checks are not applicable for new storage objects.
	}

	res, err := tx.ExecContext(ctx, query, object.Collection, object.Key, ownerID, object.Value, newVersion, newPermissionRead, newPermissionWrite)
	res, err := tx.ExecContext(ctx, query, params...)
	if err != nil {
		logger.Debug("Could not write storage object, exec error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
		if e, ok := err.(pgx.PgError); ok && e.Code == dbErrorUniqueViolation {
			return nil, ErrStorageRejectedVersion
		}
		return nil, err
	}
	if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
		logger.Debug("Could not write storage object, rowsAffected error.", zap.Any("object", object), zap.String("query", query), zap.Error(err))
		return nil, ErrStorageWriteFailed
		return nil, ErrStorageRejectedVersion
	}

	ack := &api.StorageObjectAck{