Commit 793baf02 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Update interface, improve storage object deletion handling.

parent f4709ce0
Loading
Loading
Loading
Loading
+17 −45
Original line number Diff line number Diff line
@@ -20,6 +20,8 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/blugelabs/bluge"
	"github.com/blugelabs/bluge/index"
	"github.com/blugelabs/bluge/search"
@@ -28,12 +30,11 @@ import (
	"github.com/jackc/pgtype"
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"time"
)

type StorageIndex interface {
	Write(ctx context.Context, objects StorageOpWrites) (map[string]StorageOpWrites, map[string]StorageOpDeletes)
	Delete(ctx context.Context, objects StorageOpDeletes) map[string]StorageOpDeletes
	Write(ctx context.Context, objects StorageOpWrites) (creates int, deletes int)
	Delete(ctx context.Context, objects StorageOpDeletes) (deletes int)
	List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error)
	Load(ctx context.Context) error
	CreateIndex(ctx context.Context, name, collection, key string, fields []string, maxEntries int) error
@@ -58,7 +59,7 @@ type LocalStorageIndex struct {
}

func NewLocalStorageIndex(logger *zap.Logger, db *sql.DB) (StorageIndex, error) {
	lsc := &LocalStorageIndex{
	si := &LocalStorageIndex{
		logger:                logger,
		db:                    db,
		indexByName:           make(map[string]*storageIndex),
@@ -66,17 +67,15 @@ func NewLocalStorageIndex(logger *zap.Logger, db *sql.DB) (StorageIndex, error)
		customFilterFunctions: make(map[string]RuntimeStorageIndexFilterFunction),
	}

	return lsc, nil
	return si, nil
}

func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpWrites) (updates map[string]StorageOpWrites, deletes map[string]StorageOpDeletes) {
func (si *LocalStorageIndex) Write(ctx context.Context, objects StorageOpWrites) (updates int, deletes int) {
	batches := make(map[*storageIndex]*index.Batch, 0)
	updates = make(map[string]StorageOpWrites, 0)
	deletes = make(map[string]StorageOpDeletes, 0)

	updateTime := time.Now()

	for _, so := range storageWrites {
	for _, so := range objects {
		indices, found := si.indicesByCollection[so.Object.Collection]
		if !found {
			continue
@@ -102,25 +101,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW
						docId := si.storageIndexDocumentId(so.Object.Collection, so.Object.Key, so.OwnerID)
						batch.Delete(docId)

						if ds, ok := deletes[idx.Name]; ok {
							deletes[idx.Name] = append(ds, &StorageOpDelete{
								OwnerID: so.OwnerID,
								ObjectID: &api.DeleteStorageObjectId{
									Collection: so.Object.Collection,
									Key:        so.Object.Key,
									// Blank Version as it is irrelevant for storage index deletes.
								},
							})
						} else {
							deletes[idx.Name] = StorageOpDeletes{&StorageOpDelete{
								OwnerID: so.OwnerID,
								ObjectID: &api.DeleteStorageObjectId{
									Collection: so.Object.Collection,
									Key:        so.Object.Key,
									// Blank Version as it is irrelevant for storage index deletes.
								},
							}}
						}
						deletes++

						continue
					}
@@ -138,11 +119,7 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW

				batch.Update(doc.ID(), doc)

				if u, ok := updates[idx.Name]; ok {
					updates[idx.Name] = append(u, so)
				} else {
					updates[idx.Name] = StorageOpWrites{so}
				}
				updates++
			}
		}
	}
@@ -191,14 +168,13 @@ func (si *LocalStorageIndex) Write(ctx context.Context, storageWrites StorageOpW
	return updates, deletes
}

func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDeletes) (ops map[string]StorageOpDeletes) {
func (si *LocalStorageIndex) Delete(ctx context.Context, objects StorageOpDeletes) (deletes int) {
	batches := make(map[*storageIndex]*index.Batch, 0)
	ops = make(map[string]StorageOpDeletes)

	for _, d := range deletes {
	for _, d := range objects {
		indices, found := si.indicesByCollection[d.ObjectID.Collection]
		if !found {
			return ops
			continue
		}

		for _, idx := range indices {
@@ -211,11 +187,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete
			docId := si.storageIndexDocumentId(d.ObjectID.Collection, d.ObjectID.Key, d.OwnerID)
			batch.Delete(docId)

			if dels, ok := ops[idx.Name]; ok {
				ops[idx.Name] = append(dels, d)
			} else {
				ops[idx.Name] = StorageOpDeletes{d}
			}
			deletes++
		}
	}

@@ -226,7 +198,7 @@ func (si *LocalStorageIndex) Delete(ctx context.Context, deletes StorageOpDelete
		}
	}

	return ops
	return deletes
}

func (si *LocalStorageIndex) List(ctx context.Context, indexName, query string, limit int) (*api.StorageObjects, error) {
@@ -410,7 +382,7 @@ LIMIT $2`
	return nil
}

func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) {
func (si *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, version, value string, filters []string, updateTime time.Time) (*bluge.Document, error) {
	if collection == "" || key == "" || userID == "" {
		return nil, errors.New("insufficient fields to create index document id")
	}
@@ -435,7 +407,7 @@ func (sc *LocalStorageIndex) mapIndexStorageFields(userID, collection, key, vers
		return nil, nil
	}

	rv := bluge.NewDocument(string(sc.storageIndexDocumentId(collection, key, userID)))
	rv := bluge.NewDocument(string(si.storageIndexDocumentId(collection, key, userID)))
	rv.AddField(bluge.NewDateTimeField("update_time", updateTime).StoreValue().Sortable())
	rv.AddField(bluge.NewKeywordField("collection", collection).StoreValue())
	rv.AddField(bluge.NewKeywordField("key", key).StoreValue())