Commit bde48e5a authored by Fernando Takagi's avatar Fernando Takagi
Browse files

Change to a concurrent safe query.

parent b01f6330
Loading
Loading
Loading
Loading
+8 −12
Original line number Diff line number Diff line
@@ -697,20 +697,16 @@ func storagePrepBatch(batch *pgx.Batch, authoritativeWrite bool, op *StorageOpWr
		// OCC if-not-exists, and all other non-OCC cases.
		// Existing permission checks are not applicable for new storage objects.
		query = `
		WITH ins AS (
		INSERT INTO storage (collection, key, user_id, value, version, read, write, create_time, update_time)
				VALUES ($1, $2, $3, $4, $5, $6, $7, now(), now())
			ON CONFLICT (collection, key, user_id) DO NOTHING
			RETURNING read, write, version, create_time, update_time
		)
		(SELECT read, write, version, create_time, update_time FROM ins)
		UNION ALL
		(SELECT read, write, version, create_time, update_time FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3 AND version = $5 AND read = $6 AND write = $7)
		LIMIT 1`
		SELECT $1, $2, $3, $4, $5, $6, $7, now(), now()
			WHERE NOT EXISTS(SELECT 1 FROM storage WHERE collection = $1 AND key = $2 AND user_id = $3) -- avoids ON CONFLICT hitting pre-existing object and immediately cancels insert
		ON CONFLICT (collection, key, user_id) DO UPDATE
			SET version = excluded.version WHERE storage.version = excluded.version AND storage.read = excluded.read AND storage.write = excluded.write -- needed to return a row on concurrent write of same object
		RETURNING read, write, version, create_time, update_time`

		// Outcomes:
		// - Row is returned if object was new or if object already existed but had the same values being inserted
		// - NoRows - insert was ignored due to constraint violation (concurrent insert or non-OCC existing row), and values being inserted were different from existing row
		// - Row is returned if object was new or if concurrent writer wrote object with the same values (ON CONFLICT UPDATE needed)
		// - NoRows - insert was ignored due to constraint violation (non-OCC already existing row), or concurrent write wrote object with different values (UPDATE did not happen)
	}

	batch.Queue(query, params...)