Commit bac1fc86 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Ensure presence events respect session format.

parent 930b38f7
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ func (r *LocalMessageRouter) SendToPresenceIDs(logger *zap.Logger, presenceIDs [
			err = session.SendBytes(isStream, mode, payloadJson)
		}
		if err != nil {
			logger.Error("Failed to route to", zap.String("sid", presenceID.SessionID.String()), zap.Error(err))
			logger.Error("Failed to route message", zap.String("sid", presenceID.SessionID.String()), zap.Error(err))
		}
	}
}
+78 −18
Original line number Diff line number Diff line
@@ -15,6 +15,8 @@
package server

import (
	"bytes"
	"github.com/golang/protobuf/proto"
	"sync"

	"fmt"
@@ -799,19 +801,48 @@ func (t *LocalTracker) processEvent(e *PresenceEvent) {
				Leaves: leaves,
			}}}
		}
		payload, err := t.jsonpbMarshaler.MarshalToString(envelope)
		if err != nil {
			t.logger.Warn("Could not marshal presence event to json", zap.Error(err))
			continue
		}
		payloadByte := []byte(payload)

		// Prepare payload variables but do not initialize until we hit a session that needs them to avoid unnecessary work.
		var payloadProtobuf []byte
		var payloadJson []byte

		// Deliver event.
		for _, sessionID := range sessionIDs {
			if s := t.sessionRegistry.Get(sessionID); s != nil {
				s.SendBytes(true, stream.Mode, payloadByte)
			} else {
			session := t.sessionRegistry.Get(sessionID)
			if session == nil {
				t.logger.Debug("Could not deliver presence event, no session", zap.String("sid", sessionID.String()))
				continue
			}

			var err error
			switch session.Format() {
			case SessionFormatProtobuf:
				if payloadProtobuf == nil {
					// Marshal the payload now that we know this format is needed.
					payloadProtobuf, err = proto.Marshal(envelope)
					if err != nil {
						t.logger.Error("Could not marshal presence event", zap.Error(err))
						return
					}
				}
				err = session.SendBytes(true, stream.Mode, payloadProtobuf)
			case SessionFormatJson:
				fallthrough
			default:
				if payloadJson == nil {
					// Marshal the payload now that we know this format is needed.
					var buf bytes.Buffer
					if err = t.jsonpbMarshaler.Marshal(&buf, envelope); err == nil {
						payloadJson = buf.Bytes()
					} else {
						t.logger.Error("Could not marshal presence event", zap.Error(err))
						return
					}
				}
				err = session.SendBytes(true, stream.Mode, payloadJson)
			}
			if err != nil {
				t.logger.Error("Failed to deliver presence event", zap.String("sid", sessionID.String()), zap.Error(err))
			}
		}
	}
@@ -875,19 +906,48 @@ func (t *LocalTracker) processEvent(e *PresenceEvent) {
				Leaves: leaves,
			}}}
		}
		payload, err := t.jsonpbMarshaler.MarshalToString(envelope)
		if err != nil {
			t.logger.Warn("Could not marshal presence event to json", zap.Error(err))
			continue
		}
		payloadByte := []byte(payload)

		// Prepare payload variables but do not initialize until we hit a session that needs them to avoid unnecessary work.
		var payloadProtobuf []byte
		var payloadJson []byte

		// Deliver event.
		for _, sessionID := range sessionIDs {
			if s := t.sessionRegistry.Get(sessionID); s != nil {
				s.SendBytes(true, stream.Mode, payloadByte)
			} else {
			session := t.sessionRegistry.Get(sessionID)
			if session == nil {
				t.logger.Debug("Could not deliver presence event, no session", zap.String("sid", sessionID.String()))
				continue
			}

			var err error
			switch session.Format() {
			case SessionFormatProtobuf:
				if payloadProtobuf == nil {
					// Marshal the payload now that we know this format is needed.
					payloadProtobuf, err = proto.Marshal(envelope)
					if err != nil {
						t.logger.Error("Could not marshal presence event", zap.Error(err))
						return
					}
				}
				err = session.SendBytes(true, stream.Mode, payloadProtobuf)
			case SessionFormatJson:
				fallthrough
			default:
				if payloadJson == nil {
					// Marshal the payload now that we know this format is needed.
					var buf bytes.Buffer
					if err = t.jsonpbMarshaler.Marshal(&buf, envelope); err == nil {
						payloadJson = buf.Bytes()
					} else {
						t.logger.Error("Could not marshal presence event", zap.Error(err))
						return
					}
				}
				err = session.SendBytes(true, stream.Mode, payloadJson)
			}
			if err != nil {
				t.logger.Error("Failed to deliver presence event", zap.String("sid", sessionID.String()), zap.Error(err))
			}
		}
	}