Loading server/core_channel.go +53 −2 Original line number Diff line number Diff line Loading @@ -296,7 +296,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } firstRecords, err := parseChannelMessages(logger, firstRows) firstRecords, err := parseChannelMessages(logger, firstRows, stream, channelID, limit) if err != nil { return nil, err } Loading Loading @@ -324,7 +324,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: return nil, err } secondRecords, err := parseChannelMessages(logger, secondRows) secondRecords, err := parseChannelMessages(logger, secondRows, stream, channelID, limit) if err != nil { return nil, err } Loading @@ -349,6 +349,57 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: records = records[start:end] var nextCursorStr string if setNextCursor { firstRecord := records[0] nextCursor := &channelMessageListCursor{ StreamMode: stream.Mode, StreamSubject: stream.Subject.String(), StreamSubcontext: stream.Subcontext.String(), StreamLabel: stream.Label, CreateTime: firstRecord.CreateTime.Seconds, Id: firstRecord.MessageId, Forward: forward, IsNext: true, } nextCursorStr, err = marshalMessageListCursor(nextCursor) if err != nil { logger.Error("Error creating leaderboard records list next cursor", zap.Error(err)) return nil, err } } var prevCursorStr string if setPrevCursor { lastRecord := records[len(records)-1] prevCursor := &channelMessageListCursor{ StreamMode: stream.Mode, StreamSubject: stream.Subject.String(), StreamSubcontext: stream.Subcontext.String(), StreamLabel: stream.Label, CreateTime: lastRecord.CreateTime.Seconds, Id: lastRecord.MessageId, Forward: forward, IsNext: false, } prevCursorStr, err = marshalMessageListCursor(prevCursor) if err != nil { logger.Error("Error creating leaderboard records list previous cursor", zap.Error(err)) return nil, err } } } func marshalMessageListCursor(cursor *channelMessageListCursor) (string, error) { cursorBuf := new(bytes.Buffer) if err := gob.NewEncoder(cursorBuf).Encode(cursor); err != nil { return "", err } return base64.URLEncoding.EncodeToString(cursorBuf.Bytes()), nil } func parseChannelMessages(logger *zap.Logger, rows *sql.Rows, stream PresenceStream, channelID string, limit int) ([]*api.ChannelMessage, error) { Loading Loading
server/core_channel.go +53 −2 Original line number Diff line number Diff line Loading @@ -296,7 +296,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: logger.Error("Could not execute message list query", zap.Error(err)) return nil, err } firstRecords, err := parseChannelMessages(logger, firstRows) firstRecords, err := parseChannelMessages(logger, firstRows, stream, channelID, limit) if err != nil { return nil, err } Loading Loading @@ -324,7 +324,7 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: return nil, err } secondRecords, err := parseChannelMessages(logger, secondRows) secondRecords, err := parseChannelMessages(logger, secondRows, stream, channelID, limit) if err != nil { return nil, err } Loading @@ -349,6 +349,57 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3: records = records[start:end] var nextCursorStr string if setNextCursor { firstRecord := records[0] nextCursor := &channelMessageListCursor{ StreamMode: stream.Mode, StreamSubject: stream.Subject.String(), StreamSubcontext: stream.Subcontext.String(), StreamLabel: stream.Label, CreateTime: firstRecord.CreateTime.Seconds, Id: firstRecord.MessageId, Forward: forward, IsNext: true, } nextCursorStr, err = marshalMessageListCursor(nextCursor) if err != nil { logger.Error("Error creating leaderboard records list next cursor", zap.Error(err)) return nil, err } } var prevCursorStr string if setPrevCursor { lastRecord := records[len(records)-1] prevCursor := &channelMessageListCursor{ StreamMode: stream.Mode, StreamSubject: stream.Subject.String(), StreamSubcontext: stream.Subcontext.String(), StreamLabel: stream.Label, CreateTime: lastRecord.CreateTime.Seconds, Id: lastRecord.MessageId, Forward: forward, IsNext: false, } prevCursorStr, err = marshalMessageListCursor(prevCursor) if err != nil { logger.Error("Error creating leaderboard records list previous cursor", zap.Error(err)) return nil, err } } } func marshalMessageListCursor(cursor *channelMessageListCursor) (string, error) { cursorBuf := new(bytes.Buffer) if err := gob.NewEncoder(cursorBuf).Encode(cursor); err != nil { return "", err } return base64.URLEncoding.EncodeToString(cursorBuf.Bytes()), nil } func parseChannelMessages(logger *zap.Logger, rows *sql.Rows, stream PresenceStream, channelID string, limit int) ([]*api.ChannelMessage, error) { Loading