Loading CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Added - Add final notification sent to sockets closed via single socket option. ## [3.7.0] - 2021-09-28 ### Added Loading server/core_notification.go +1 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ const ( NotificationCodeGroupAdd int32 = -4 NotificationCodeGroupJoinRequest int32 = -5 NotificationCodeFriendJoinGame int32 = -6 NotificationCodeSingleSocket int32 = -7 ) type notificationCacheableCursor struct { Loading server/pipeline_match.go +6 −1 Original line number Diff line number Diff line Loading @@ -227,7 +227,12 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap Username: session.Username(), Format: session.Format(), } p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false) if success, _ := p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false); success { if p.config.GetSession().SingleMatch { // Kick the user from any other matches they may be part of. p.tracker.UntrackLocalByModes(session.ID(), matchStreamModes, stream) } } } label = &wrapperspb.StringValue{Value: l} Loading server/session_registry.go +20 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ package server import ( "context" "github.com/heroiclabs/nakama-common/api" "google.golang.org/protobuf/types/known/timestamppb" "sync" "time" "github.com/gofrs/uuid" "github.com/heroiclabs/nakama-common/rtapi" Loading Loading @@ -53,7 +56,7 @@ type Session interface { Send(envelope *rtapi.Envelope, reliable bool) error SendBytes(payload []byte, reliable bool) error Close(msg string, reason runtime.PresenceReason) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) } type SessionRegistry interface { Loading Loading @@ -131,7 +134,22 @@ func (r *LocalSessionRegistry) SingleSession(ctx context.Context, tracker Tracke session, ok := r.sessions.Load(foundSessionID) if ok { // No need to remove the session from the map, session.Close() will do that. session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect) session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect, &rtapi.Envelope{Message: &rtapi.Envelope_Notifications{ Notifications: &rtapi.Notifications{ Notifications: []*api.Notification{ { Id: uuid.Must(uuid.NewV4()).String(), Subject: "single_socket", Content: "{}", Code: NotificationCodeSingleSocket, SenderId: "", CreateTime: ×tamppb.Timestamp{Seconds: time.Now().Unix()}, Persistent: false, }, }, }, }}) } } } server/session_ws.go +43 −1 Original line number Diff line number Diff line Loading @@ -422,7 +422,7 @@ func (s *sessionWS) SendBytes(payload []byte, reliable bool) error { } } func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) { s.Lock() if s.stopped { s.Unlock() Loading Loading @@ -462,6 +462,48 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { s.pingTimer.Stop() close(s.outgoingCh) // Send final messages, if any are specified. for _, envelope := range envelopes { var payload []byte var err error switch s.format { case SessionFormatProtobuf: payload, err = proto.Marshal(envelope) case SessionFormatJson: fallthrough default: if buf, err := s.protojsonMarshaler.Marshal(envelope); err == nil { payload = buf } } if err != nil { s.logger.Warn("Could not marshal envelope", zap.Error(err)) continue } if s.logger.Core().Enabled(zap.DebugLevel) { switch envelope.Message.(type) { case *rtapi.Envelope_Error: s.logger.Debug("Sending error message", zap.Binary("payload", payload)) default: s.logger.Debug(fmt.Sprintf("Sending %T message", envelope.Message), zap.Any("envelope", envelope)) } } s.Lock() if err := s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration)); err != nil { s.Unlock() s.logger.Warn("Failed to set write deadline", zap.Error(err)) continue } if err := s.conn.WriteMessage(s.wsMessageType, payload); err != nil { s.Unlock() s.logger.Warn("Could not write message", zap.Error(err)) continue } s.Unlock() } // Send close message. if err := s.conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(s.writeWaitDuration)); err != nil { // This may not be possible if the socket was already fully closed by an error. Loading Loading
CHANGELOG.md +2 −1 Original line number Diff line number Diff line Loading @@ -4,7 +4,8 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com) and this project uses [semantic versioning](http://semver.org). ## [Unreleased] ### Added - Add final notification sent to sockets closed via single socket option. ## [3.7.0] - 2021-09-28 ### Added Loading
server/core_notification.go +1 −0 Original line number Diff line number Diff line Loading @@ -39,6 +39,7 @@ const ( NotificationCodeGroupAdd int32 = -4 NotificationCodeGroupJoinRequest int32 = -5 NotificationCodeFriendJoinGame int32 = -6 NotificationCodeSingleSocket int32 = -7 ) type notificationCacheableCursor struct { Loading
server/pipeline_match.go +6 −1 Original line number Diff line number Diff line Loading @@ -227,7 +227,12 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap Username: session.Username(), Format: session.Format(), } p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false) if success, _ := p.tracker.Track(session.Context(), session.ID(), stream, session.UserID(), m, false); success { if p.config.GetSession().SingleMatch { // Kick the user from any other matches they may be part of. p.tracker.UntrackLocalByModes(session.ID(), matchStreamModes, stream) } } } label = &wrapperspb.StringValue{Value: l} Loading
server/session_registry.go +20 −2 Original line number Diff line number Diff line Loading @@ -16,7 +16,10 @@ package server import ( "context" "github.com/heroiclabs/nakama-common/api" "google.golang.org/protobuf/types/known/timestamppb" "sync" "time" "github.com/gofrs/uuid" "github.com/heroiclabs/nakama-common/rtapi" Loading Loading @@ -53,7 +56,7 @@ type Session interface { Send(envelope *rtapi.Envelope, reliable bool) error SendBytes(payload []byte, reliable bool) error Close(msg string, reason runtime.PresenceReason) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) } type SessionRegistry interface { Loading Loading @@ -131,7 +134,22 @@ func (r *LocalSessionRegistry) SingleSession(ctx context.Context, tracker Tracke session, ok := r.sessions.Load(foundSessionID) if ok { // No need to remove the session from the map, session.Close() will do that. session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect) session.(Session).Close("server-side session disconnect", runtime.PresenceReasonDisconnect, &rtapi.Envelope{Message: &rtapi.Envelope_Notifications{ Notifications: &rtapi.Notifications{ Notifications: []*api.Notification{ { Id: uuid.Must(uuid.NewV4()).String(), Subject: "single_socket", Content: "{}", Code: NotificationCodeSingleSocket, SenderId: "", CreateTime: ×tamppb.Timestamp{Seconds: time.Now().Unix()}, Persistent: false, }, }, }, }}) } } }
server/session_ws.go +43 −1 Original line number Diff line number Diff line Loading @@ -422,7 +422,7 @@ func (s *sessionWS) SendBytes(payload []byte, reliable bool) error { } } func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { func (s *sessionWS) Close(msg string, reason runtime.PresenceReason, envelopes ...*rtapi.Envelope) { s.Lock() if s.stopped { s.Unlock() Loading Loading @@ -462,6 +462,48 @@ func (s *sessionWS) Close(msg string, reason runtime.PresenceReason) { s.pingTimer.Stop() close(s.outgoingCh) // Send final messages, if any are specified. for _, envelope := range envelopes { var payload []byte var err error switch s.format { case SessionFormatProtobuf: payload, err = proto.Marshal(envelope) case SessionFormatJson: fallthrough default: if buf, err := s.protojsonMarshaler.Marshal(envelope); err == nil { payload = buf } } if err != nil { s.logger.Warn("Could not marshal envelope", zap.Error(err)) continue } if s.logger.Core().Enabled(zap.DebugLevel) { switch envelope.Message.(type) { case *rtapi.Envelope_Error: s.logger.Debug("Sending error message", zap.Binary("payload", payload)) default: s.logger.Debug(fmt.Sprintf("Sending %T message", envelope.Message), zap.Any("envelope", envelope)) } } s.Lock() if err := s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration)); err != nil { s.Unlock() s.logger.Warn("Failed to set write deadline", zap.Error(err)) continue } if err := s.conn.WriteMessage(s.wsMessageType, payload); err != nil { s.Unlock() s.logger.Warn("Could not write message", zap.Error(err)) continue } s.Unlock() } // Send close message. if err := s.conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(s.writeWaitDuration)); err != nil { // This may not be possible if the socket was already fully closed by an error. Loading