Unverified Commit ce41b152 authored by Tom Glenn's avatar Tom Glenn Committed by GitHub
Browse files

Add support for removing channel messages to all runtimes. (#910)

parent 630da26f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -8,6 +8,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Allow the socket acceptor to read session tokens from request headers.
- Add support for custom response headers set in server configuration.
- Add missing fields to tournament end and reset JS runtime hooks.
- Add support for removing channel messages to all runtimes.

### Changed
- Stricter validation of limit in runtime storage list operations.
+168 −2
Original line number Diff line number Diff line
@@ -22,20 +22,27 @@ import (
	"encoding/gob"
	"errors"
	"fmt"
	"github.com/heroiclabs/nakama-common/rtapi"
	"github.com/heroiclabs/nakama-common/runtime"
	"strings"
	"time"
	"unicode/utf8"

	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama-common/rtapi"
	"github.com/heroiclabs/nakama-common/runtime"
	"github.com/jackc/pgtype"
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/timestamppb"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

var (
	errChannelMessageIdInvalid = errors.New("Invalid message identifier")

	errChannelMessageNotFound = errors.New("channel message not found")
	errChannelMessagePersist  = errors.New("error persisting channel message")
)

// Wrapper type to avoid allocating a stream struct when the input is invalid.
type ChannelIdToStreamResult struct {
	Stream PresenceStream
@@ -270,6 +277,165 @@ WHERE stream_mode = $1 AND stream_subject = $2::UUID AND stream_descriptor = $3:
	}, nil
}

func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  uuid.Must(uuid.NewV4()).String(),
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChat},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}
	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		query := `INSERT INTO message (id, code, sender_id, username, stream_mode, stream_subject, stream_descriptor, stream_label, content, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, $10, $10)`
		_, err := db.ExecContext(ctx, query, message.MessageId, message.Code.Value, message.SenderId, message.Username, channelStream.Mode, channelStream.Subject, channelStream.Subcontext, channelStream.Label, message.Content, time.Unix(message.CreateTime.Seconds, 0).UTC())
		if err != nil {
			logger.Error("Error persisting channel message", zap.Error(err))

			return nil, errChannelMessagePersist
		}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}

func ChannelMessageUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, messageId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  messageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatUpdate},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}

	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		// First find and update the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "UPDATE message SET update_time = $5, username = $4, content = $3 WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := db.QueryRowContext(ctx, query, messageId, message.SenderId, message.Content, message.Username, time.Unix(message.UpdateTime.Seconds, 0).UTC()).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				return nil, errChannelMessageNotFound
			}
			logger.Error("Error persisting channel message update", zap.Error(err))
			return nil, errChannelMessagePersist
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}

func ChannelMessageRemove(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, messageId, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  messageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatRemove},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    "{}",
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}

	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		// First find and remove the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "DELETE FROM message WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := db.QueryRowContext(ctx, query, messageId, message.SenderId).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				return nil, errChannelMessageNotFound
			}
			logger.Error("Error persisting channel message remove", zap.Error(err))
			return nil, errChannelMessagePersist
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}

func GetChannelMessages(ctx context.Context, logger *zap.Logger, db *sql.DB, userID uuid.UUID) ([]*api.ChannelMessage, error) {
	query := "SELECT id, code, username, stream_mode, stream_subject, stream_descriptor, stream_label, content, create_time, update_time FROM message WHERE sender_id = $1::UUID"
	rows, err := db.QueryContext(ctx, query, userID)

server/core_message.go

deleted100644 → 0
+0 −138
Original line number Diff line number Diff line
package server

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama-common/rtapi"
	"github.com/jackc/pgtype"
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/timestamppb"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"time"
)

var errInvalidMessageId = errors.New("Invalid message identifier")
var errInvalidMessageContent = errors.New("Message content must be a valid JSON object")
var errMessageNotFound = errors.New("Could not find message to update in channel history")
var errMessagePersist = errors.New("Error persisting channel message")

func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		return nil, errInvalidMessageContent
	}

	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  uuid.Must(uuid.NewV4()).String(),
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChat},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}
	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		query := `INSERT INTO message (id, code, sender_id, username, stream_mode, stream_subject, stream_descriptor, stream_label, content, create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, $10, $10)`
		_, err := db.ExecContext(ctx, query, message.MessageId, message.Code.Value, message.SenderId, message.Username, channelStream.Mode, channelStream.Subject, channelStream.Subcontext, channelStream.Label, message.Content, time.Unix(message.CreateTime.Seconds, 0).UTC())
		if err != nil {
			logger.Error("Error persisting channel message", zap.Error(err))

			return nil, errMessagePersist
		}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}

func ChannelMessageUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, messageId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	if _, err := uuid.FromString(messageId); err != nil {
		return nil, errInvalidMessageId
	}

	if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		return nil, errInvalidMessageContent
	}

	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  messageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatUpdate},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}

	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		// First find and update the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "UPDATE message SET update_time = $5, username = $4, content = $3 WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := db.QueryRowContext(ctx, query, messageId, message.SenderId, message.Content, message.Username, time.Unix(message.UpdateTime.Seconds, 0).UTC()).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				return nil, errMessageNotFound
			}
			logger.Error("Error persisting channel message update", zap.Error(err))
			return nil, errMessagePersist
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}
+42 −80
Original line number Diff line number Diff line
@@ -15,7 +15,7 @@
package server

import (
	"database/sql"
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
@@ -26,10 +26,8 @@ import (
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama-common/rtapi"
	"github.com/heroiclabs/nakama-common/runtime"
	"github.com/jackc/pgtype"
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/timestamppb"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

const (
@@ -45,8 +43,6 @@ const (
	ChannelMessageTypeGroupDemote
)

var ErrChannelMessageUpdateNotFound = errors.New("channel message not found")

var controlCharsRegex = regexp.MustCompilePOSIX("[[:cntrl:]]+")

func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
@@ -202,6 +198,14 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
		return false, nil
	}

	if maybeJSON := []byte(incoming.Content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}}, true)
		return false, nil
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -213,13 +217,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel

	ack, err := ChannelMessageSend(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.Content, session.UserID().String(), session.Username(), meta.Persistence)
	switch err {
	case errInvalidMessageContent:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}}, true)
		return false, nil
	case errMessagePersist:
	case errChannelMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not persist message to channel history",
@@ -236,6 +234,14 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelMessageUpdate()

	if _, err := uuid.FromString(incoming.MessageId); err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid message identifier",
		}}}, true)
		return false, nil
	}

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -245,6 +251,14 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		return false, nil
	}

	if maybeJSON := []byte(incoming.Content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}}, true)
		return false, nil
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -256,13 +270,13 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env

	ack, err := ChannelMessageUpdate(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.MessageId, incoming.Content, session.UserID().String(), session.Username(), meta.Persistence)
	switch err {
	case errInvalidMessageId, errInvalidMessageContent, errMessageNotFound:
	case errChannelMessageNotFound:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: err.Error(),
			Message: "Could not find message to update in channel history",
		}}}, true)
		return false, nil
	case errMessagePersist:
	case errChannelMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not persist message update to channel history",
@@ -304,75 +318,23 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
		return false, nil
	}

	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  incoming.ChannelId,
		MessageId:  incoming.MessageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatRemove},
		SenderId:   session.UserID().String(),
		Username:   session.Username(),
		Content:    "{}",
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: meta.Persistence},
	}
	switch streamConversionResult.Stream.Mode {
	case StreamModeChannel:
		message.RoomName = streamConversionResult.Stream.Label
	case StreamModeGroup:
		message.GroupId = streamConversionResult.Stream.Subject.String()
	case StreamModeDM:
		message.UserIdOne = streamConversionResult.Stream.Subject.String()
		message.UserIdTwo = streamConversionResult.Stream.Subcontext.String()
	}

	if meta.Persistence {
		// First find and remove the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "DELETE FROM message WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := p.db.QueryRowContext(session.Context(), query, incoming.MessageId, message.SenderId).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
	ack, err := ChannelMessageRemove(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.MessageId, session.UserID().String(), session.Username(), meta.Persistence)
	switch err {
	case errChannelMessageNotFound:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Could not find message to remove in channel history",
		}}}, true)
		return false, nil
			}
			logger.Error("Error persisting channel message remove", zap.Error(err))
	case errChannelMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not persist message remove to channel history",
		}}}, true)
			return false, nil
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}
	switch streamConversionResult.Stream.Mode {
	case StreamModeChannel:
		ack.RoomName = streamConversionResult.Stream.Label
	case StreamModeGroup:
		ack.GroupId = streamConversionResult.Stream.Subject.String()
	case StreamModeDM:
		ack.UserIdOne = streamConversionResult.Stream.Subject.String()
		ack.UserIdTwo = streamConversionResult.Stream.Subcontext.String()
	}

	out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}
	session.Send(out, true)

	p.router.SendToStream(logger, streamConversionResult.Stream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return true, out
}
+27 −0
Original line number Diff line number Diff line
@@ -3861,6 +3861,10 @@ func (n *RuntimeGoNakamaModule) ChannelMessageUpdate(ctx context.Context, channe
		return nil, err
	}

	if _, err := uuid.FromString(messageId); err != nil {
		return nil, errChannelMessageIdInvalid
	}

	contentStr := "{}"
	if content != nil {
		contentBytes, err := json.Marshal(content)
@@ -3873,6 +3877,29 @@ func (n *RuntimeGoNakamaModule) ChannelMessageUpdate(ctx context.Context, channe
	return ChannelMessageUpdate(ctx, n.logger, n.db, n.router, channelIdToStreamResult.Stream, channelId, messageId, contentStr, senderId, senderUsername, persist)
}

// @group chat
// @summary Remove a message on a realtime chat channel.
// @param ctx(type=context.Context) The context object represents information about the server and requester.
// @param channelId(type=string) The ID of the channel to remove the message on.
// @param messageId(type=string) The ID of the message to remove.
// @param senderId(type=string, optional=true) The UUID for the sender of this message. If left empty, it will be assumed that it is a system message.
// @param senderUsername(type=string, optional=true) The username of the user who sent this message. If left empty, it will be assumed that it is a system message.
// @param persist(type=bool, optional=true, default=true) Whether to record this in the channel history.
// @return channelMessageRemove(*rtapi.ChannelMessageAck) Message removed ack.
// @return error(error) An optional error value if an error occurred.
func (n *RuntimeGoNakamaModule) ChannelMessageRemove(ctx context.Context, channelId, messageId string, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	channelIdToStreamResult, err := ChannelIdToStream(channelId)
	if err != nil {
		return nil, err
	}

	if _, err := uuid.FromString(messageId); err != nil {
		return nil, errChannelMessageIdInvalid
	}

	return ChannelMessageRemove(ctx, n.logger, n.db, n.router, channelIdToStreamResult.Stream, channelId, messageId, senderId, senderUsername, persist)
}

// @group chat
// @summary List messages from a realtime chat channel.
// @param ctx(type=context.Context) The context object represents information about the server and requester.
Loading