diff --git a/CHANGELOG.md b/CHANGELOG.md index 5bc4e6638968eb2e70221fe87a71632dc18429bb..416d9a27594d1c7a2470cbd0c84a318f45a946ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] - +### Added +- Add final notification sent to sockets closed via single socket option. ## [3.7.0] - 2021-09-28 ### Added diff --git a/server/core_notification.go b/server/core_notification.go index 5ac2d99762449c342ac7ceb7fdb4a0e7784659fe..4746dcbd8b12e4f3b941d0906cbe58fc018d8589 100644 --- a/server/core_notification.go +++ b/server/core_notification.go @@ -39,6 +39,7 @@ const ( NotificationCodeGroupAdd int32 = -4 NotificationCodeGroupJoinRequest int32 = -5 NotificationCodeFriendJoinGame int32 = -6 + NotificationCodeSingleSocket int32 = -7 ) type notificationCacheableCursor struct { diff --git a/server/pipeline_match.go b/server/pipeline_match.go index b111cc381c541342240c52ba70a434ef8c46ce7d..b099c903a13727f898debbed01750d86fb9f1c59 100644 --- a/server/pipeline_match.go +++ b/server/pipeline_match.go @@ -227,7 +227,12 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap Username: session.Username(), Format: session.Format(), } - p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false) + if success, _ := p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false); success { + if p.config.GetSession().SingleMatch { + // Kick the user from any other matches they may be part of. + p.tracker.UntrackLocalByModes(session.ID(), matchStreamModes, stream) + } + } } label = &wrapperspb.StringValue{Value: l} diff --git a/server/session_registry.go b/server/session_registry.go index d6a1d7e67db7c6e41be4c9990f9c5ac8a4daf8fa..a1cb5bc8328fb687d24187e8d4b22e08bec333a9 100644 --- a/server/session_registry.go +++ b/server/session_registry.go @@ -16,7 +16,10 @@ package server import ( "context" + "github.com/heroiclabs/nakama-common/api" + "google.golang.org/protobuf/types/known/timestamppb" "sync" + "time" "github.com/gofrs/uuid" "github.com/heroiclabs/nakama-common/rtapi" @@ -53,7 +56,7 @@ type Session interface { Send(envelope *rtapi.Envelope, reliable bool) error SendBytes(payload []byte, reliable bool) error - Close(msg string, reason runtime.PresenceReason) + Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) } type SessionRegistry interface { @@ -131,7 +134,22 @@ func (r *LocalSessionRegistry) SingleSession(ctx context.Context, tracker Tracke session, ok := r.sessions.Load(foundSessionID) if ok { // No need to remove the session from the map, session.Close() will do that. - session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect) + session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect, + &rtapi.Envelope{Message: &rtapi.Envelope_Notifications{ + Notifications: &rtapi.Notifications{ + Notifications: []*api.Notification{ + { + Id: uuid.Must(uuid.NewV4()).String(), + Subject: "single_socket", + Content: "{}", + Code: NotificationCodeSingleSocket, + SenderId: "", + CreateTime: ×tamppb.Timestamp{Seconds: time.Now().Unix()}, + Persistent: false, + }, + }, + }, + }}) } } } diff --git a/server/session_ws.go b/server/session_ws.go index f10b39d79f74a00a5ab1bbf831dbcab646445f7b..3719b1dbeb720e502e70af1eb9c6fc708a1bf78c 100644 --- a/server/session_ws.go +++ b/server/session_ws.go @@ -422,7 +422,7 @@ func (s *sessionWS) SendBytes(payload []byte, reliable bool) error { } } -func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { +func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) { s.Lock() if s.stopped { s.Unlock() @@ -462,6 +462,48 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { s.pingTimer.Stop() close(s.outgoingCh) + // Send final messages, if any are specified. + for _, envelope := range envelopes { + var payload []byte + var err error + switch s.format { + case SessionFormatProtobuf: + payload, err = proto.Marshal(envelope) + case SessionFormatJson: + fallthrough + default: + if buf, err := s.protojsonMarshaler.Marshal(envelope); err == nil { + payload = buf + } + } + if err != nil { + s.logger.Warn("Could not marshal envelope", zap.Error(err)) + continue + } + + if s.logger.Core().Enabled(zap.DebugLevel) { + switch envelope.Message.(type) { + case *rtapi.Envelope_Error: + s.logger.Debug("Sending error message", zap.Binary("payload", payload)) + default: + s.logger.Debug(fmt.Sprintf("Sending %T message", envelope.Message), zap.Any("envelope", envelope)) + } + } + + s.Lock() + if err := s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration)); err != nil { + s.Unlock() + s.logger.Warn("Failed to set write deadline", zap.Error(err)) + continue + } + if err := s.conn.WriteMessage(s.wsMessageType, payload); err != nil { + s.Unlock() + s.logger.Warn("Could not write message", zap.Error(err)) + continue + } + s.Unlock() + } + // Send close message. if err := s.conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(s.writeWaitDuration)); err != nil { // This may not be possible if the socket was already fully closed by an error.