Loading server/core_channel.go +226 −148 Original line number Diff line number Diff line Loading @@ -59,8 +59,9 @@ type channelMessageListCursor struct { IsNext bool } func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, stream PresenceStream, channelID string, limit int, forward bool, cursor string) (*api.ChannelMessageList, error) { func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, stream PresenceStream, channelID string, limit int, forward bool, cursor string, haystack *time.Time) (*api.ChannelMessageList, error) { var incomingCursor *channelMessageListCursor if cursor != "" { cb, err := base64.StdEncoding.DecodeString(cursor) if err != nil { Loading Loading @@ -100,6 +101,10 @@ func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, ca } } if cursor == "" && haystack != nil { getChannelMessagesHaystack(ctx, logger, db, stream, channelID, limit, forward, haystack) } else { query := `SELECT id, code, sender_id, username, content, create_time, update_time FROM message WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3::UUID AND stream_label = $4` if incomingCursor == nil { Loading Loading @@ -276,6 +281,79 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: CacheableCursor: cacheableCursorStr, }, nil } } func getChannelMessagesHaystack(ctx context.Context, logger *zap.Logger, db *sql.DB, stream PresenceStream, channelID string, limit int, forward bool, haystack *time.Time) (*api.ChannelMessageList, error) { query := `SELECT id, code, sender_id, username, content, create_time, update_time FROM message WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3::UUID AND stream_label = $4` params := []any{stream.Mode, stream.Subject, stream.Subcontext, stream.Label, haystack} // First half. firstQuery := query + " AND create_time <= $1 ORDER BY create_time DESC, id DESC LIMIT $2" firstParams := append(params, limit+1) firstRows, err := db.QueryContext(ctx, firstQuery, firstParams...) if err != nil { logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } firstRecords, err := parseChannelMessages(logger, firstRows) if err != nil { return nil, err } setNextCursor := false if len(firstRecords) > limit { // Check if there might be a next cursor setNextCursor = true firstRecords = firstRecords[:len(firstRecords)-1] } // We went 'up' on the message history, so reverse the first half of records. for left, right := 0, len(firstRecords)-1; left < right; left, right = left+1, right-1 { firstRecords[left], firstRecords[right] = firstRecords[right], firstRecords[left] } // Second half. secondQuery := query + " AND create_time > $1 ORDER BY create_time ASC, id ASC LIMIT $2" secondLimit := limit / 2 if l := len(firstRecords); l < secondLimit { secondLimit = limit - l } secondParams := append(params, secondLimit+1) secondRows, err := db.QueryContext(ctx, secondQuery, secondParams...) if err != nil { logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } secondRecords, err := parseChannelMessages(logger, secondRows) if err != nil { return nil, err } setPrevCursor := false if len(secondRecords) > secondLimit { // Check if there might be a prev cursor setPrevCursor = true secondRecords = secondRecords[:len(secondRecords)-1] } records := append(firstRecords, secondRecords...) numRecords := len(records) start := numRecords - limit if start < 0 || len(firstRecords) < limit/2 { start = 0 } end := start + limit if end > numRecords { end = numRecords } records = records[start:end] } func parseChannelMessages(logger *zap.Logger, rows *sql.Rows) ([]*api.ChannelMessage, error) { } func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) { ts := time.Now().Unix() Loading Loading
server/core_channel.go +226 −148 Original line number Diff line number Diff line Loading @@ -59,8 +59,9 @@ type channelMessageListCursor struct { IsNext bool } func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, stream PresenceStream, channelID string, limit int, forward bool, cursor string) (*api.ChannelMessageList, error) { func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, caller uuid.UUID, stream PresenceStream, channelID string, limit int, forward bool, cursor string, haystack *time.Time) (*api.ChannelMessageList, error) { var incomingCursor *channelMessageListCursor if cursor != "" { cb, err := base64.StdEncoding.DecodeString(cursor) if err != nil { Loading Loading @@ -100,6 +101,10 @@ func ChannelMessagesList(ctx context.Context, logger *zap.Logger, db *sql.DB, ca } } if cursor == "" && haystack != nil { getChannelMessagesHaystack(ctx, logger, db, stream, channelID, limit, forward, haystack) } else { query := `SELECT id, code, sender_id, username, content, create_time, update_time FROM message WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3::UUID AND stream_label = $4` if incomingCursor == nil { Loading Loading @@ -276,6 +281,79 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: CacheableCursor: cacheableCursorStr, }, nil } } func getChannelMessagesHaystack(ctx context.Context, logger *zap.Logger, db *sql.DB, stream PresenceStream, channelID string, limit int, forward bool, haystack *time.Time) (*api.ChannelMessageList, error) { query := `SELECT id, code, sender_id, username, content, create_time, update_time FROM message WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3::UUID AND stream_label = $4` params := []any{stream.Mode, stream.Subject, stream.Subcontext, stream.Label, haystack} // First half. firstQuery := query + " AND create_time <= $1 ORDER BY create_time DESC, id DESC LIMIT $2" firstParams := append(params, limit+1) firstRows, err := db.QueryContext(ctx, firstQuery, firstParams...) if err != nil { logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } firstRecords, err := parseChannelMessages(logger, firstRows) if err != nil { return nil, err } setNextCursor := false if len(firstRecords) > limit { // Check if there might be a next cursor setNextCursor = true firstRecords = firstRecords[:len(firstRecords)-1] } // We went 'up' on the message history, so reverse the first half of records. for left, right := 0, len(firstRecords)-1; left < right; left, right = left+1, right-1 { firstRecords[left], firstRecords[right] = firstRecords[right], firstRecords[left] } // Second half. secondQuery := query + " AND create_time > $1 ORDER BY create_time ASC, id ASC LIMIT $2" secondLimit := limit / 2 if l := len(firstRecords); l < secondLimit { secondLimit = limit - l } secondParams := append(params, secondLimit+1) secondRows, err := db.QueryContext(ctx, secondQuery, secondParams...) if err != nil { logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } secondRecords, err := parseChannelMessages(logger, secondRows) if err != nil { return nil, err } setPrevCursor := false if len(secondRecords) > secondLimit { // Check if there might be a prev cursor setPrevCursor = true secondRecords = secondRecords[:len(secondRecords)-1] } records := append(firstRecords, secondRecords...) numRecords := len(records) start := numRecords - limit if start < 0 || len(firstRecords) < limit/2 { start = 0 } end := start + limit if end > numRecords { end = numRecords } records = records[start:end] } func parseChannelMessages(logger *zap.Logger, rows *sql.Rows) ([]*api.ChannelMessage, error) { } func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) { ts := time.Now().Unix() Loading