Unverified Commit 47f940ce authored by Simon Esposito's avatar Simon Esposito Committed by GitHub
Browse files

Add ChannelMessageUpdate runtime function. (#685)

parent b761963c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
## [Unreleased]
### Added
- More informational logging when groups are created, updated, or deleted.
- Add ChannelMessageUpdate function to server framework.

### Changed
- Use the Facebook Graph API v11.0 version.
+1 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ require (
	github.com/gorilla/mux v1.8.0
	github.com/gorilla/websocket v1.4.2
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.3.0
	github.com/heroiclabs/nakama-common v0.0.0-20210907132733-e48b382b6dab
	github.com/heroiclabs/nakama-common v0.0.0-20210907132825-508fa27d8eeb
	github.com/jackc/pgconn v1.8.1
	github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
	github.com/jackc/pgtype v1.7.0
+2 −2
+66 −0
Original line number Diff line number Diff line
@@ -9,13 +9,16 @@ import (
	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama-common/rtapi"
	"github.com/jackc/pgtype"
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/timestamppb"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"time"
)

var errInvalidMessageId = errors.New("Invalid message identifier")
var errInvalidMessageContent = errors.New("Message content must be a valid JSON object")
var errMessageNotFound = errors.New("Could not find message to update in channel history")
var errMessagePersist = errors.New("Error persisting channel message")

func ChannelMessageSend(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
@@ -70,3 +73,66 @@ VALUES ($1, $2, $3, $4, $5, $6::UUID, $7::UUID, $8, $9, $10, $10)`

	return ack, nil
}

func ChannelMessageUpdate(ctx context.Context, logger *zap.Logger, db *sql.DB, router MessageRouter, channelStream PresenceStream, channelId, messageId, content, senderId, senderUsername string, persist bool) (*rtapi.ChannelMessageAck, error) {
	if _, err := uuid.FromString(messageId); err != nil {
		return nil, errInvalidMessageId
	}

	if maybeJSON := []byte(content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		return nil, errInvalidMessageContent
	}

	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  channelId,
		MessageId:  messageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatUpdate},
		SenderId:   senderId,
		Username:   senderUsername,
		Content:    content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: persist},
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}

	switch channelStream.Mode {
	case StreamModeChannel:
		message.RoomName, ack.RoomName = channelStream.Label, channelStream.Label
	case StreamModeGroup:
		message.GroupId, ack.GroupId = channelStream.Subject.String(), channelStream.Subject.String()
	case StreamModeDM:
		message.UserIdOne, ack.UserIdOne = channelStream.Subject.String(), channelStream.Subject.String()
		message.UserIdTwo, ack.UserIdTwo = channelStream.Subcontext.String(), channelStream.Subcontext.String()
	}

	if persist {
		// First find and update the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "UPDATE message SET update_time = $5, username = $4, content = $3 WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := db.QueryRowContext(ctx, query, messageId, message.SenderId, message.Content, message.Username, time.Unix(message.UpdateTime.Seconds, 0).UTC()).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
				return nil, errMessageNotFound
			}
			logger.Error("Error persisting channel message update", zap.Error(err))
			return nil, errMessagePersist
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	router.SendToStream(logger, channelStream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return ack, nil
}
+12 −81
Original line number Diff line number Diff line
@@ -15,7 +15,6 @@
package server

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"errors"
@@ -226,14 +225,6 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
	incoming := envelope.GetChannelMessageUpdate()

	if _, err := uuid.FromString(incoming.MessageId); err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid message identifier",
		}}}, true)
		return
	}

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
	if err != nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -243,14 +234,6 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		return
	}

	if maybeJSON := []byte(incoming.Content); !json.Valid(maybeJSON) || bytes.TrimSpace(maybeJSON)[0] != byteBracket {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}}, true)
		return
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
	if meta == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
@@ -260,74 +243,22 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		return
	}

	ts := time.Now().Unix()
	message := &api.ChannelMessage{
		ChannelId:  incoming.ChannelId,
		MessageId:  incoming.MessageId,
		Code:       &wrapperspb.Int32Value{Value: ChannelMessageTypeChatUpdate},
		SenderId:   session.UserID().String(),
		Username:   session.Username(),
		Content:    incoming.Content,
		CreateTime: &timestamppb.Timestamp{Seconds: ts},
		UpdateTime: &timestamppb.Timestamp{Seconds: ts},
		Persistent: &wrapperspb.BoolValue{Value: meta.Persistence},
	}
	switch streamConversionResult.Stream.Mode {
	case StreamModeChannel:
		message.RoomName = streamConversionResult.Stream.Label
	case StreamModeGroup:
		message.GroupId = streamConversionResult.Stream.Subject.String()
	case StreamModeDM:
		message.UserIdOne = streamConversionResult.Stream.Subject.String()
		message.UserIdTwo = streamConversionResult.Stream.Subcontext.String()
	}

	if meta.Persistence {
		// First find and update the referenced message.
		var dbCreateTime pgtype.Timestamptz
		query := "UPDATE message SET update_time = $5, username = $4, content = $3 WHERE id = $1 AND sender_id = $2 RETURNING create_time"
		err := p.db.QueryRowContext(session.Context(), query, incoming.MessageId, message.SenderId, message.Content, message.Username, time.Unix(message.UpdateTime.Seconds, 0).UTC()).Scan(&dbCreateTime)
		if err != nil {
			if err == sql.ErrNoRows {
	ack, err := ChannelMessageUpdate(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.MessageId, incoming.Content, session.UserID().String(), session.Username(), meta.Persistence)
	switch err {
	case errInvalidMessageId, errInvalidMessageContent, errMessageNotFound:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_BAD_INPUT),
					Message: "Could not find message to update in channel history",
			Message: err.Error(),
		}}}, true)
		return
			}
			logger.Error("Error persisting channel message update", zap.Error(err))
	case errMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not persist message update to channel history",
		}}}, true)
			return
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
	}

	ack := &rtapi.ChannelMessageAck{
		ChannelId:  message.ChannelId,
		MessageId:  message.MessageId,
		Code:       message.Code,
		Username:   message.Username,
		CreateTime: message.CreateTime,
		UpdateTime: message.UpdateTime,
		Persistent: message.Persistent,
	}
	switch streamConversionResult.Stream.Mode {
	case StreamModeChannel:
		ack.RoomName = streamConversionResult.Stream.Label
	case StreamModeGroup:
		ack.GroupId = streamConversionResult.Stream.Subject.String()
	case StreamModeDM:
		ack.UserIdOne = streamConversionResult.Stream.Subject.String()
		ack.UserIdTwo = streamConversionResult.Stream.Subcontext.String()
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}, true)

	p.router.SendToStream(logger, streamConversionResult.Stream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)
}

func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
Loading