Commit 947a38e0 authored by Fernando Takagi's avatar Fernando Takagi
Browse files

Store unix nanoseconds on cursor

parent 3e3b45e7
Loading
Loading
Loading
Loading
+13 −9
Original line number Diff line number Diff line
@@ -125,7 +125,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
		query += " LIMIT $5"
		params := []interface{}{stream.Mode, stream.Subject, stream.Subcontext, stream.Label, limit + 1}
		if incomingCursor != nil {
			params = append(params, time.Unix(incomingCursor.CreateTime, 0).UTC(), incomingCursor.Id)
			params = append(params, time.Unix(0, incomingCursor.CreateTime).UTC(), incomingCursor.Id)
		}

		rows, err := db.QueryContext(ctx, query, params...)
@@ -138,6 +138,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
		userIDOne := stream.Subject.String()
		userIDTwo := stream.Subcontext.String()
		messages := make([]*api.ChannelMessage, 0, limit)
		createTimeNano := make(map[string]int64)
		var nextCursor, prevCursor *channelMessageListCursor

		var dbID string
@@ -154,7 +155,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
					StreamSubject:    stream.Subject.String(),
					StreamSubcontext: stream.Subcontext.String(),
					StreamLabel:      stream.Label,
					CreateTime:       dbCreateTime.Time.Unix(),
					CreateTime:       dbCreateTime.Time.UnixNano(),
					Id:               dbID,
					Forward:          forward,
					IsNext:           true,
@@ -180,6 +181,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
				UpdateTime: &timestamppb.Timestamp{Seconds: dbUpdateTime.Time.Unix()},
				Persistent: &wrapperspb.BoolValue{Value: true},
			}
			createTimeNano[dbID] = dbCreateTime.Time.UnixNano()
			switch stream.Mode {
			case StreamModeChannel:
				message.RoomName = stream.Label
@@ -199,7 +201,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
					StreamSubject:    stream.Subject.String(),
					StreamSubcontext: stream.Subcontext.String(),
					StreamLabel:      stream.Label,
					CreateTime:       dbCreateTime.Time.Unix(),
					CreateTime:       dbCreateTime.Time.UnixNano(),
					Id:               dbID,
					Forward:          forward,
					IsNext:           false,
@@ -231,7 +233,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
				StreamSubject:    stream.Subject.String(),
				StreamSubcontext: stream.Subcontext.String(),
				StreamLabel:      stream.Label,
				CreateTime:       messages[l-1].CreateTime.Seconds,
				CreateTime:       createTimeNano[messages[l-1].MessageId],
				Id:               messages[l-1].MessageId,
				Forward:          true,
				IsNext:           true,
@@ -296,7 +298,8 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
		logger.Error("Could not execute message list query", zap.Error(err))
		return nil, err
	}
	firstRecords, err := parseChannelMessages(logger, firstRows, stream, channelID, limit)
	createTimeNano := make(map[string]int64)
	firstRecords, err := parseChannelMessages(logger, firstRows, stream, channelID, limit, createTimeNano)
	if err != nil {
		return nil, err
	}
@@ -324,7 +327,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
		return nil, err
	}

	secondRecords, err := parseChannelMessages(logger, secondRows, stream, channelID, limit)
	secondRecords, err := parseChannelMessages(logger, secondRows, stream, channelID, limit, createTimeNano)
	if err != nil {
		return nil, err
	}
@@ -364,7 +367,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
			StreamSubject:    stream.Subject.String(),
			StreamSubcontext: stream.Subcontext.String(),
			StreamLabel:      stream.Label,
			CreateTime:       firstRecord.CreateTime.Seconds,
			CreateTime:       createTimeNano[firstRecord.MessageId],
			Id:               firstRecord.MessageId,
			Forward:          forward,
			IsNext:           true,
@@ -385,7 +388,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
			StreamSubject:    stream.Subject.String(),
			StreamSubcontext: stream.Subcontext.String(),
			StreamLabel:      stream.Label,
			CreateTime:       lastRecord.CreateTime.Seconds,
			CreateTime:       createTimeNano[lastRecord.MessageId],
			Id:               lastRecord.MessageId,
			Forward:          forward,
			IsNext:           false,
@@ -413,7 +416,7 @@ func marshalMessageListCursor(cursor *channelMessageListCursor) (string, error)
	return base64.StdEncoding.EncodeToString(cursorBuf.Bytes()), nil
}

func parseChannelMessages(logger *zap.Logger, rows *sql.Rows, stream PresenceStream, channelID string, limit int) ([]*api.ChannelMessage, error) {
func parseChannelMessages(logger *zap.Logger, rows *sql.Rows, stream PresenceStream, channelID string, limit int, createTimeNano map[string]int64) ([]*api.ChannelMessage, error) {
	defer rows.Close()
	groupID := stream.Subject.String()
	userIDOne := stream.Subject.String()
@@ -446,6 +449,7 @@ func parseChannelMessages(logger *zap.Logger, rows *sql.Rows, stream PresenceStr
			UpdateTime: &timestamppb.Timestamp{Seconds: dbUpdateTime.Time.Unix()},
			Persistent: &wrapperspb.BoolValue{Value: true},
		}
		createTimeNano[dbID] = dbCreateTime.Time.UnixNano()
		switch stream.Mode {
		case StreamModeChannel:
			message.RoomName = stream.Label