Commit cf3b25be authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Client outgoing queue QoS settings.

parent 50375d55
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
## [Unreleased]
### Added
- New timeout option to HTTP request function in the code runtime.
- QoS settings on client outgoing message queue.


## [2.0.0] - 2018-05-14
+1 −1
Original line number Diff line number Diff line
@@ -680,7 +680,7 @@ func (mh *MatchHandler) broadcastMessage(l *lua.LState) int {
	if presenceIDs == nil {
		mh.router.SendToStream(mh.logger, mh.Stream, msg)
	} else {
		mh.router.SendToPresenceIDs(mh.logger, presenceIDs, msg)
		mh.router.SendToPresenceIDs(mh.logger, presenceIDs, true, StreamModeMatchAuthoritative, msg)
	}

	return 0
+4 −4
Original line number Diff line number Diff line
@@ -22,7 +22,7 @@ import (

// MessageRouter is responsible for sending a message to a list of presences or to an entire stream.
type MessageRouter interface {
	SendToPresenceIDs(*zap.Logger, []*PresenceID, *rtapi.Envelope)
	SendToPresenceIDs(*zap.Logger, []*PresenceID, bool, uint8, *rtapi.Envelope)
	SendToStream(*zap.Logger, PresenceStream, *rtapi.Envelope)
}

@@ -40,7 +40,7 @@ func NewLocalMessageRouter(sessionRegistry *SessionRegistry, tracker Tracker, js
	}
}

func (r *LocalMessageRouter) SendToPresenceIDs(logger *zap.Logger, presenceIDs []*PresenceID, envelope *rtapi.Envelope) {
func (r *LocalMessageRouter) SendToPresenceIDs(logger *zap.Logger, presenceIDs []*PresenceID, isStream bool, mode uint8, envelope *rtapi.Envelope) {
	if len(presenceIDs) == 0 {
		return
	}
@@ -57,7 +57,7 @@ func (r *LocalMessageRouter) SendToPresenceIDs(logger *zap.Logger, presenceIDs [
			logger.Debug("No session to route to", zap.String("sid", presenceID.SessionID.String()))
			continue
		}
		if err := session.SendBytes(payloadBytes); err != nil {
		if err := session.SendBytes(isStream, mode, payloadBytes); err != nil {
			logger.Error("Failed to route to", zap.String("sid", presenceID.SessionID.String()), zap.Error(err))
		}
	}
@@ -65,5 +65,5 @@ func (r *LocalMessageRouter) SendToPresenceIDs(logger *zap.Logger, presenceIDs [

func (r *LocalMessageRouter) SendToStream(logger *zap.Logger, stream PresenceStream, envelope *rtapi.Envelope) {
	presenceIDs := r.tracker.ListPresenceIDByStream(stream)
	r.SendToPresenceIDs(logger, presenceIDs, envelope)
	r.SendToPresenceIDs(logger, presenceIDs, true, stream.Mode, envelope)
}
+5 −5
Original line number Diff line number Diff line
@@ -60,7 +60,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
	}

	if envelope.Message == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_MISSING_PAYLOAD),
			Message: "Missing message.",
		}}})
@@ -90,7 +90,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		hookResult, hookErr := invokeReqBeforeHook(logger, p.config, p.runtimePool, p.jsonpbMarshaler, p.jsonpbUnmarshaler, sessionID, uid, username, expiry, messageName, envelope)

		if hookErr != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_FUNCTION_EXCEPTION),
				Message: hookErr.Error(),
			}}})
@@ -98,7 +98,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		} else if hookResult == nil {
			// if result is nil, requested resource is disabled.
			logger.Warn("Intercepted a disabled resource.", zap.String("resource", messageName))
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
				Message: "Requested resource was not found.",
			}}})
@@ -108,7 +108,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		resultCast, ok := hookResult.(*rtapi.Envelope)
		if !ok {
			logger.Error("Invalid runtime Before function result. Make sure that the result matches the structure of the payload.", zap.Any("payload", envelope), zap.Any("result", hookResult))
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_FUNCTION_EXCEPTION),
				Message: "Invalid runtime Before function result.",
			}}})
@@ -152,7 +152,7 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
		// If we reached this point the envelope was valid but the contents are missing or unknown.
		// Usually caused by a version mismatch, and should cause the session making this pipeline request to close.
		logger.Error("Unrecognizable payload received.", zap.Any("payload", envelope))
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
			Message: "Unrecognized message.",
		}}})
+36 −36
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
	incoming := envelope.GetChannelJoin()

	if incoming.Target == "" {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel target",
		}}})
@@ -67,21 +67,21 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		fallthrough
	case int32(rtapi.ChannelJoin_ROOM):
		if len(incoming.Target) < 1 || len(incoming.Target) > 64 {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Channel name is required and must be 1-64 chars",
			}}})
			return
		}
		if controlCharsRegex.MatchString(incoming.Target) {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Channel name must not contain control chars",
			}}})
			return
		}
		if !utf8.ValidString(incoming.Target) {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Channel name must only contain valid UTF-8 bytes",
			}}})
@@ -93,7 +93,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		// Check if user ID is valid.
		uid, err := uuid.FromString(incoming.Target)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Invalid user ID in direct message join",
			}}})
@@ -101,7 +101,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		}
		// Not allowed to chat to the nil uuid.
		if uid == uuid.Nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Invalid user ID in direct message join",
			}}})
@@ -110,7 +110,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		// Check if attempting to chat to self.
		userID := session.UserID()
		if userID == uid {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Cannot open direct message channel with self",
			}}})
@@ -119,14 +119,14 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		// Check if the other user exists and has not blocked this user.
		allowed, err := UserExistsAndDoesNotBlock(p.db, uid, userID)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Failed to look up user ID",
			}}})
			return
		}
		if !allowed {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "User ID not found",
			}}})
@@ -145,7 +145,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		// Check if group ID is valid.
		gid, err := uuid.FromString(incoming.Target)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Invalid group ID in group channel join",
			}}})
@@ -153,14 +153,14 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		}
		allowed, err := groupCheckUserPermission(logger, p.db, gid, session.UserID(), 2)
		if err != nil {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Failed to look up group membership",
			}}})
			return
		}
		if !allowed {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: "Group not found",
			}}})
@@ -169,7 +169,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		stream.Subject = gid
		stream.Mode = StreamModeGroup
	default:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Unrecognized channel type",
		}}})
@@ -180,7 +180,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
	if err != nil {
		// Should not happen after the input validation above, but guard just in case.
		logger.Error("Error converting stream to channel identifier", zap.Error(err), zap.Any("stream", stream))
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Error identifying channel stream",
		}}})
@@ -195,7 +195,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
	}
	success, isNew := p.tracker.Track(session.ID(), stream, session.UserID(), meta, false)
	if !success {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Error joining channel",
		}}})
@@ -256,7 +256,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		}
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Channel{Channel: &rtapi.Channel{
	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Channel{Channel: &rtapi.Channel{
		Id:        channelId,
		Presences: userPresences,
		Self: &rtapi.UserPresence{
@@ -273,7 +273,7 @@ func (p *Pipeline) channelLeave(logger *zap.Logger, session Session, envelope *r

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}})
@@ -282,7 +282,7 @@ func (p *Pipeline) channelLeave(logger *zap.Logger, session Session, envelope *r

	p.tracker.Untrack(session.ID(), streamConversionResult.Stream, session.UserID())

	session.Send(&rtapi.Envelope{Cid: envelope.Cid})
	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid})
}

func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
@@ -290,7 +290,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}})
@@ -299,7 +299,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel

	var maybeJSON map[string]interface{}
	if json.Unmarshal([]byte(incoming.Content), &maybeJSON) != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}})
@@ -308,7 +308,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before sending messages",
		}}})
@@ -334,7 +334,7 @@ VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, CAST($10::BIGINT AS TIME
		_, err := p.db.Exec(query, message.MessageId, message.Code.Value, message.SenderId, message.Username, streamConversionResult.Stream.Mode, streamConversionResult.Stream.Subject, streamConversionResult.Stream.Descriptor, streamConversionResult.Stream.Label, message.Content, message.CreateTime.Seconds)
		if err != nil {
			logger.Error("Error persisting channel message", zap.Error(err))
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Could not persist message to channel history",
			}}})
@@ -342,7 +342,7 @@ VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, CAST($10::BIGINT AS TIME
		}
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
@@ -359,7 +359,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
	incoming := envelope.GetChannelMessageUpdate()

	if _, err := uuid.FromString(incoming.MessageId); err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid message identifier",
		}}})
@@ -368,7 +368,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}})
@@ -377,7 +377,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env

	var maybeJSON map[string]interface{}
	if json.Unmarshal([]byte(incoming.Content), &maybeJSON) != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}})
@@ -386,7 +386,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before updating messages",
		}}})
@@ -413,14 +413,14 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		err := p.db.QueryRow(query, incoming.MessageId, message.SenderId, message.Content, message.Username, message.UpdateTime.Seconds).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_BAD_INPUT),
					Message: "Could not find message to update in channel history",
				}}})
				return
			} else {
				logger.Error("Error persisting channel message update", zap.Error(err))
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
					Message: "Could not persist message update to channel history",
				}}})
@@ -431,7 +431,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		message.CreateTime = &timestamp.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
@@ -448,7 +448,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
	incoming := envelope.GetChannelMessageRemove()

	if _, err := uuid.FromString(incoming.MessageId); err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid message identifier",
		}}})
@@ -457,7 +457,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}})
@@ -466,7 +466,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before removing messages",
		}}})
@@ -493,14 +493,14 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
		err := p.db.QueryRow(query, incoming.MessageId, message.SenderId).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_BAD_INPUT),
					Message: "Could not find message to remove in channel history",
				}}})
				return
			} else {
				logger.Error("Error persisting channel message remove", zap.Error(err))
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
					Message: "Could not persist message remove to channel history",
				}}})
@@ -511,7 +511,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
		message.CreateTime = &timestamp.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
	session.Send(false, 0, &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
Loading