Commit 326c9af0 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Clean up operations between transaction begin and use.

parent 1c34ae2a
Loading
Loading
Loading
Loading
+14 −12
Original line number Diff line number Diff line
@@ -279,12 +279,6 @@ func (s *ConsoleServer) DemoteGroupMember(ctx context.Context, in *console.Updat
			return runtime.ErrGroupNotFound
		}

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		// Prepare the messages we'll need to send to the group channel.
		stream := PresenceStream{
			Mode:    StreamModeGroup,
@@ -302,6 +296,13 @@ func (s *ConsoleServer) DemoteGroupMember(ctx context.Context, in *console.Updat

		var message *api.ChannelMessage
		ts := time.Now().Unix()

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		if err := ExecuteInTx(ctx, tx, func() error {
			query := ""
			if myState == 0 {
@@ -428,12 +429,6 @@ func (s *ConsoleServer) PromoteGroupMember(ctx context.Context, in *console.Upda
			return runtime.ErrGroupNotFound
		}

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		// Prepare the messages we'll need to send to the group channel.
		stream := PresenceStream{
			Mode:    StreamModeGroup,
@@ -447,6 +442,13 @@ func (s *ConsoleServer) PromoteGroupMember(ctx context.Context, in *console.Upda

		var message *api.ChannelMessage
		ts := time.Now().Unix()

		tx, err := db.BeginTx(ctx, nil)
		if err != nil {
			logger.Error("Could not begin database transaction.", zap.Error(err))
			return err
		}

		if err := ExecuteInTx(ctx, tx, func() error {
			if uid == caller {
				return errors.New("cannot promote self")
+14 −13
Original line number Diff line number Diff line
@@ -102,13 +102,14 @@ func CreateGroup(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uui
	query += `, edge_count) VALUES (` + strings.Join(statements, ",") + `,1)
RETURNING id, creator_id, name, description, avatar_url, state, edge_count, lang_tag, max_count, metadata, create_time, update_time`

	var group *api.Group

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, err
	}

	var group *api.Group
	if err = ExecuteInTx(ctx, tx, func() error {
		rows, err := tx.QueryContext(ctx, query, params...)
		if err != nil {
@@ -590,12 +591,6 @@ func AddGroupUsers(ctx context.Context, logger *zap.Logger, db *sql.DB, router M
		return err
	}

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	// Prepare notification data.
	notificationContentBytes, err := json.Marshal(map[string]string{"group_id": groupID.String(), "name": groupName.String})
	if err != nil {
@@ -619,6 +614,12 @@ func AddGroupUsers(ctx context.Context, logger *zap.Logger, db *sql.DB, router M
	ts := time.Now().Unix()
	var messages []*api.ChannelMessage

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	if err := ExecuteInTx(ctx, tx, func() error {
		// If the transaction is retried ensure we wipe any notifications/messages that may have been prepared by previous attempts.
		notifications = make(map[uuid.UUID][]*api.Notification, len(userIDs))
@@ -1103,12 +1104,6 @@ func PromoteGroupUsers(ctx context.Context, logger *zap.Logger, db *sql.DB, rout
		return runtime.ErrGroupNotFound
	}

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	// Prepare the messages we'll need to send to the group channel.
	stream := PresenceStream{
		Mode:    StreamModeGroup,
@@ -1123,6 +1118,12 @@ func PromoteGroupUsers(ctx context.Context, logger *zap.Logger, db *sql.DB, rout
	ts := time.Now().Unix()
	var messages []*api.ChannelMessage

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return err
	}

	if err := ExecuteInTx(ctx, tx, func() error {
		// If the transaction is retried ensure we wipe any messages that may have been prepared by previous attempts.
		messages = make([]*api.ChannelMessage, 0, len(userIDs))
+3 −3
Original line number Diff line number Diff line
@@ -28,15 +28,15 @@ func MultiUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, accountUpd
		return nil, nil, nil
	}

	var storageWriteAcks []*api.StorageObjectAck
	var walletUpdateResults []*runtime.WalletUpdateResult

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, nil, err
	}

	var storageWriteAcks []*api.StorageObjectAck
	var walletUpdateResults []*runtime.WalletUpdateResult

	if err = ExecuteInTx(ctx, tx, func() error {
		storageWriteAcks = nil
		walletUpdateResults = nil
+2 −1
Original line number Diff line number Diff line
@@ -87,13 +87,14 @@ func UpdateWallets(ctx context.Context, logger *zap.Logger, db *sql.DB, updates
		return nil, nil
	}

	var results []*runtime.WalletUpdateResult

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		logger.Error("Could not begin database transaction.", zap.Error(err))
		return nil, err
	}

	var results []*runtime.WalletUpdateResult
	if err = ExecuteInTx(ctx, tx, func() error {
		var updateErr error
		results, updateErr = updateWallets(ctx, logger, tx, updates, updateLedger)