Unverified Commit 80879a3e authored by Andrei Mihu's avatar Andrei Mihu Committed by GitHub
Browse files

Outgoing and incoming payloads in realtime after hooks. (#808)

parent 6c855dc9
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- JavaScript global variables are made immutable after the `InitModule` function is invoked.
- JavaScript global variables are made immutable by default after the `InitModule` function is invoked.
- Return system user uuid string in `StorageWrite` acks for all runtimes.
- Realtime after hooks now include both the outgoing and incoming payload.
- Realtime after hooks do not run when the operation fails.

### Fixed
- Fix the registered function name for 'nk.channelIdBuild' in the JavaScript runtime.
+1 −1
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ require (
	github.com/gorilla/mux v1.8.0
	github.com/gorilla/websocket v1.4.2
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0
	github.com/heroiclabs/nakama-common v1.21.1-0.20220315125242-f39e5bc77bdb
	github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a
	github.com/jackc/pgconn v1.10.0
	github.com/jackc/pgerrcode v0.0.0-20201024163028-a0d42d470451
	github.com/jackc/pgtype v1.8.1
+2 −0
Original line number Diff line number Diff line
@@ -257,6 +257,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/heroiclabs/nakama-common v1.21.1-0.20220315125242-f39e5bc77bdb h1:FoxvzXI7Hna6Om/6VZCTwzEoflg+fXl+AxbRvyHu7cM=
github.com/heroiclabs/nakama-common v1.21.1-0.20220315125242-f39e5bc77bdb/go.mod h1:WF4YG46afwY3ibzsXnkt3zvhQ3tBY03IYeU7xSLr8HE=
github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a h1:VuU7YNVN/urZqNNcS05mF1U8UUpGlP1e7VtuH4lVJLs=
github.com/heroiclabs/nakama-common v1.21.1-0.20220317110306-60fbe58e3b1a/go.mod h1:WF4YG46afwY3ibzsXnkt3zvhQ3tBY03IYeU7xSLr8HE=
github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
+18 −17
Original line number Diff line number Diff line
@@ -60,22 +60,22 @@ func NewPipeline(logger *zap.Logger, config Config, db *sql.DB, protojsonMarshal
	}
}

func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope *rtapi.Envelope) bool {
func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, in *rtapi.Envelope) bool {
	if logger.Core().Enabled(zap.DebugLevel) { // remove extra heavy reflection processing
		logger.Debug(fmt.Sprintf("Received %T message", envelope.Message), zap.Any("message", envelope.Message))
		logger.Debug(fmt.Sprintf("Received %T message", in.Message), zap.Any("message", in.Message))
	}

	if envelope.Message == nil {
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
	if in.Message == nil {
		session.Send(&rtapi.Envelope{Cid: in.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_MISSING_PAYLOAD),
			Message: "Missing message.",
		}}}, true)
		return false
	}

	var pipelineFn func(*zap.Logger, Session, *rtapi.Envelope)
	var pipelineFn func(*zap.Logger, Session, *rtapi.Envelope) (bool, *rtapi.Envelope)

	switch envelope.Message.(type) {
	switch in.Message.(type) {
	case *rtapi.Envelope_ChannelJoin:
		pipelineFn = p.channelJoin
	case *rtapi.Envelope_ChannelLeave:
@@ -135,8 +135,8 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
	default:
		// If we reached this point the envelope was valid but the contents are missing or unknown.
		// Usually caused by a version mismatch, and should cause the session making this pipeline request to close.
		logger.Error("Unrecognizable payload received.", zap.Any("payload", envelope))
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
		logger.Error("Unrecognizable payload received.", zap.Any("payload", in))
		session.Send(&rtapi.Envelope{Cid: in.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
			Message: "Unrecognized message.",
		}}}, true)
@@ -145,19 +145,19 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope

	var messageName, messageNameID string

	switch envelope.Message.(type) {
	switch in.Message.(type) {
	case *rtapi.Envelope_Rpc:
		// No before/after hooks on RPC.
	default:
		messageName = fmt.Sprintf("%T", envelope.Message)
		messageName = fmt.Sprintf("%T", in.Message)
		messageNameID = strings.ToLower(messageName)

		if fn := p.runtime.BeforeRt(messageNameID); fn != nil {
			hookResult, hookErr := fn(session.Context(), logger, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), envelope)
			hookResult, hookErr := fn(session.Context(), logger, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), in)

			if hookErr != nil {
				// Errors from before hooks do not close the session.
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(&rtapi.Envelope{Cid: in.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_RUNTIME_FUNCTION_EXCEPTION),
					Message: hookErr.Error(),
				}}}, true)
@@ -165,22 +165,23 @@ func (p *Pipeline) ProcessRequest(logger *zap.Logger, session Session, envelope
			} else if hookResult == nil {
				// If result is nil, requested resource is disabled. Sessions calling disabled resources will be closed.
				logger.Warn("Intercepted a disabled resource.", zap.String("resource", messageName))
				session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				session.Send(&rtapi.Envelope{Cid: in.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
					Code:    int32(rtapi.Error_UNRECOGNIZED_PAYLOAD),
					Message: "Requested resource was not found.",
				}}}, true)
				return false
			}

			envelope = hookResult
			in = hookResult
		}
	}

	pipelineFn(logger, session, envelope)
	success, out := pipelineFn(logger, session, in)

	if messageName != "" {
	if success && messageName != "" {
		// Unsuccessful operations do not trigger after hooks.
		if fn := p.runtime.AfterRt(messageNameID); fn != nil {
			fn(session.Context(), logger, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), envelope)
			fn(session.Context(), logger, session.UserID().String(), session.Username(), session.Vars(), session.Expiry(), session.ID().String(), session.ClientIP(), session.ClientPort(), session.Lang(), out, in)
		}
	}

+44 −28
Original line number Diff line number Diff line
@@ -19,6 +19,9 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"time"

	"github.com/gofrs/uuid"
	"github.com/heroiclabs/nakama-common/api"
	"github.com/heroiclabs/nakama-common/rtapi"
@@ -27,8 +30,6 @@ import (
	"go.uber.org/zap"
	"google.golang.org/protobuf/types/known/timestamppb"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"regexp"
	"time"
)

const (
@@ -48,7 +49,7 @@ var ErrChannelMessageUpdateNotFound = errors.New("channel message not found")

var controlCharsRegex = regexp.MustCompilePOSIX("[[:cntrl:]]+")

func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelJoin()

	channelID, stream, err := BuildChannelId(session.Context(), logger, p.db, session.UserID(), incoming.Target, rtapi.ChannelJoin_Type(incoming.Type))
@@ -58,13 +59,13 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
				Code:    int32(rtapi.Error_BAD_INPUT),
				Message: err.Error(),
			}}}, true)
			return
			return false, nil
		} else {
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: err.Error(),
			}}}, true)
			return
			return false, nil
		}
	}

@@ -80,7 +81,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Error joining channel",
		}}}, true)
		return
		return false, nil
	}

	// List current presences, not including hidden ones.
@@ -163,10 +164,13 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
		channel.UserIdTwo = stream.Subcontext.String()
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Channel{Channel: channel}}, true)
	out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Channel{Channel: channel}}
	session.Send(out, true)

	return true, out
}

func (p *Pipeline) channelLeave(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
func (p *Pipeline) channelLeave(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelLeave()

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
@@ -175,15 +179,18 @@ func (p *Pipeline) channelLeave(logger *zap.Logger, session Session, envelope *r
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}}, true)
		return
		return false, nil
	}

	p.tracker.Untrack(session.ID(), streamConversionResult.Stream, session.UserID())

	session.Send(&rtapi.Envelope{Cid: envelope.Cid}, true)
	out := &rtapi.Envelope{Cid: envelope.Cid}
	session.Send(out, true)

	return true, out
}

func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelMessageSend()

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
@@ -192,7 +199,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}}, true)
		return
		return false, nil
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
@@ -201,7 +208,7 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before sending messages",
		}}}, true)
		return
		return false, nil
	}

	ack, err := ChannelMessageSend(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.Content, session.UserID().String(), session.Username(), meta.Persistence)
@@ -211,19 +218,22 @@ func (p *Pipeline) channelMessageSend(logger *zap.Logger, session Session, envel
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Message content must be a valid JSON object",
		}}}, true)
		return
		return false, nil
	case errMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
			Message: "Could not persist message to channel history",
		}}}, true)
		return
		return false, nil
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}, true)
	out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}
	session.Send(out, true)

	return true, out
}

func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelMessageUpdate()

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
@@ -232,7 +242,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}}, true)
		return
		return false, nil
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
@@ -241,7 +251,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before updating messages",
		}}}, true)
		return
		return false, nil
	}

	ack, err := ChannelMessageUpdate(session.Context(), p.logger, p.db, p.router, streamConversionResult.Stream, incoming.ChannelId, incoming.MessageId, incoming.Content, session.UserID().String(), session.Username(), meta.Persistence)
@@ -251,7 +261,7 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: err.Error(),
		}}}, true)
		return
		return false, nil
	case errMessagePersist:
		session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
			Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
@@ -259,10 +269,13 @@ func (p *Pipeline) channelMessageUpdate(logger *zap.Logger, session Session, env
		}}}, true)
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}, true)
	out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}
	session.Send(out, true)

	return true, out
}

func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, envelope *rtapi.Envelope) {
func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, envelope *rtapi.Envelope) (bool, *rtapi.Envelope) {
	incoming := envelope.GetChannelMessageRemove()

	if _, err := uuid.FromString(incoming.MessageId); err != nil {
@@ -270,7 +283,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid message identifier",
		}}}, true)
		return
		return false, nil
	}

	streamConversionResult, err := ChannelIdToStream(incoming.ChannelId)
@@ -279,7 +292,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Invalid channel identifier",
		}}}, true)
		return
		return false, nil
	}

	meta := p.tracker.GetLocalBySessionIDStreamUserID(session.ID(), streamConversionResult.Stream, session.UserID())
@@ -288,7 +301,7 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
			Code:    int32(rtapi.Error_BAD_INPUT),
			Message: "Must join channel before removing messages",
		}}}, true)
		return
		return false, nil
	}

	ts := time.Now().Unix()
@@ -324,14 +337,14 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
					Code:    int32(rtapi.Error_BAD_INPUT),
					Message: "Could not find message to remove in channel history",
				}}}, true)
				return
				return false, nil
			}
			logger.Error("Error persisting channel message remove", zap.Error(err))
			session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_Error{Error: &rtapi.Error{
				Code:    int32(rtapi.Error_RUNTIME_EXCEPTION),
				Message: "Could not persist message remove to channel history",
			}}}, true)
			return
			return false, nil
		}
		// Replace the message create time with the real one from DB.
		message.CreateTime = &timestamppb.Timestamp{Seconds: dbCreateTime.Time.Unix()}
@@ -356,7 +369,10 @@ func (p *Pipeline) channelMessageRemove(logger *zap.Logger, session Session, env
		ack.UserIdTwo = streamConversionResult.Stream.Subcontext.String()
	}

	session.Send(&rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}, true)
	out := &rtapi.Envelope{Cid: envelope.Cid, Message: &rtapi.Envelope_ChannelMessageAck{ChannelMessageAck: ack}}
	session.Send(out, true)

	p.router.SendToStream(logger, streamConversionResult.Stream, &rtapi.Envelope{Message: &rtapi.Envelope_ChannelMessage{ChannelMessage: message}}, true)

	return true, out
}
Loading