Commit 50274ef1 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Add close reason to session end events.

parent 9cc03728
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -173,7 +173,7 @@ type (
	RuntimeEventFunction func(ctx context.Context, logger runtime.Logger, evt *api.Event)

	RuntimeEventSessionStartFunction func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64)
	RuntimeEventSessionEndFunction   func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64)
	RuntimeEventSessionEndFunction   func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64, reason string)
)

type RuntimeExecutionMode int
+4 −3
Original line number Diff line number Diff line
@@ -1866,10 +1866,11 @@ func NewRuntimeProviderGo(logger, startupLogger *zap.Logger, db *sql.DB, config
		}
	}
	if len(initializer.sessionEndFunctions) > 0 {
		events.sessionEndFunction = func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64) {
		events.sessionEndFunction = func(userID, username string, expiry int64, sessionID, clientIP, clientPort string, evtTimeSec int64, reason string) {
			ctx := NewRuntimeGoContext(context.Background(), initializer.env, RuntimeExecutionModeEvent, nil, expiry, userID, username, sessionID, clientIP, clientPort)
			evt := &api.Event{
				Name:       "session_end",
				Properties: map[string]string{"reason": reason},
				Timestamp:  &timestamp.Timestamp{Seconds: evtTimeSec},
			}
			eventQueue.Queue(func() {
+3 −2
Original line number Diff line number Diff line
@@ -49,7 +49,7 @@ type Session interface {
	Send(isStream bool, mode uint8, envelope *rtapi.Envelope) error
	SendBytes(isStream bool, mode uint8, payload []byte) error

	Close()
	Close(reason string)
}

type SessionRegistry interface {
@@ -63,6 +63,7 @@ type SessionRegistry interface {

type LocalSessionRegistry struct {
	sync.RWMutex

	sessions map[uuid.UUID]Session
}

@@ -108,7 +109,7 @@ func (r *LocalSessionRegistry) Disconnect(ctx context.Context, sessionID uuid.UU
	session = r.sessions[sessionID]
	r.RUnlock()
	if session != nil {
		session.Close()
		session.Close("server-side session disconnect")
	}
	return nil
}
+46 −24
Original line number Diff line number Diff line
@@ -157,10 +157,15 @@ func (s *sessionWS) Expiry() int64 {
}

func (s *sessionWS) Consume(processRequest func(logger *zap.Logger, session Session, envelope *rtapi.Envelope) bool) {
	defer s.Close()
	// Fire an event for session start.
	if fn := s.runtime.EventSessionStart(); fn != nil {
		fn(s.userID.String(), s.username.Load(), s.expiry, s.id.String(), s.clientIP, s.clientPort, time.Now().UTC().Unix())
	}

	s.conn.SetReadLimit(s.config.GetSocket().MaxMessageSizeBytes)
	if err := s.conn.SetReadDeadline(time.Now().Add(s.pongWaitDuration)); err != nil {
		s.logger.Warn("Failed to set initial read deadline", zap.Error(err))
		s.Close("failed to set initial read deadline")
		return
	}
	s.conn.SetPongHandler(func(string) error {
@@ -168,14 +173,12 @@ func (s *sessionWS) Consume(processRequest func(logger *zap.Logger, session Sess
		return nil
	})

	// Fire an event for session start.
	if fn := s.runtime.EventSessionStart(); fn != nil {
		fn(s.userID.String(), s.username.Load(), s.expiry, s.id.String(), s.clientIP, s.clientPort, time.Now().UTC().Unix())
	}

	// Start a routine to process outbound messages.
	go s.processOutgoing()

	var reason string

IncomingLoop:
	for {
		messageType, data, err := s.conn.ReadMessage()
		if err != nil {
@@ -184,6 +187,7 @@ func (s *sessionWS) Consume(processRequest func(logger *zap.Logger, session Sess
				// Ignore underlying connection being shut down while read is waiting for data.
				if e, ok := err.(*net.OpError); !ok || e.Err.Error() != "use of closed network connection" {
					s.logger.Debug("Error reading message from client", zap.Error(err))
					reason = err.Error()
				}
			}
			break
@@ -192,6 +196,7 @@ func (s *sessionWS) Consume(processRequest func(logger *zap.Logger, session Sess
			// Expected text but received binary, or expected binary but received text.
			// Disconnect client if it attempts to use this kind of mixed protocol mode.
			s.logger.Debug("Received unexpected WebSocket message type", zap.Int("expected", s.wsMessageType), zap.Int("actual", messageType))
			reason = "received unexpected WebSocket message type"
			break
		}

@@ -216,21 +221,26 @@ func (s *sessionWS) Consume(processRequest func(logger *zap.Logger, session Sess
		if err != nil {
			// If the payload is malformed the client is incompatible or misbehaving, either way disconnect it now.
			s.logger.Warn("Received malformed payload", zap.Binary("data", data))
			reason = "received malformed payload"
			break
		}

		switch request.Cid {
		case "":
			if !processRequest(s.logger, s, request) {
				break
				reason = "error processing message"
				break IncomingLoop
			}
		default:
			requestLogger := s.logger.With(zap.String("cid", request.Cid))
			if !processRequest(requestLogger, s, request) {
				break
				reason = "error processing message"
				break IncomingLoop
			}
		}
	}

	s.Close(reason)
}

func (s *sessionWS) maybeResetPingTimer() bool {
@@ -257,24 +267,27 @@ func (s *sessionWS) maybeResetPingTimer() bool {
	s.Unlock()
	if err != nil {
		s.logger.Warn("Failed to set read deadline", zap.Error(err))
		s.Close()
		s.Close("failed to set read deadline")
		return false
	}
	return true
}

func (s *sessionWS) processOutgoing() {
	defer s.Close()
	var reason string

OutgoingLoop:
	for {
		select {
		case <-s.ctx.Done():
			// Session is closing, close the outgoing process routine.
			return
			break OutgoingLoop
		case <-s.pingTimer.C:
			// Periodically send pings.
			if !s.pingNow() {
			if msg, ok := s.pingNow(); !ok {
				// If ping fails the session will be stopped, clean up the loop.
				return
				reason = msg
				break OutgoingLoop
			}
		case payload := <-s.outgoingCh:
			s.Lock()
@@ -282,39 +295,47 @@ func (s *sessionWS) processOutgoing() {
				// The connection may have stopped between the payload being queued on the outgoing channel and reaching here.
				// If that's the case then abort outgoing processing at this point and exit.
				s.Unlock()
				return
				break OutgoingLoop
			}
			// Process the outgoing message queue.
			s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration))
			if err := s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration)); err != nil {
				s.Unlock()
				s.logger.Warn("Failed to set write deadline", zap.Error(err))
				reason = err.Error()
				break OutgoingLoop
			}
			if err := s.conn.WriteMessage(s.wsMessageType, payload); err != nil {
				s.Unlock()
				s.logger.Warn("Could not write message", zap.Error(err))
				return
				reason = err.Error()
				break OutgoingLoop
			}
			s.Unlock()
		}
	}

	s.Close(reason)
}

func (s *sessionWS) pingNow() bool {
func (s *sessionWS) pingNow() (string, bool) {
	s.Lock()
	if s.stopped {
		s.Unlock()
		return false
		return "", false
	}
	if err := s.conn.SetWriteDeadline(time.Now().Add(s.writeWaitDuration)); err != nil {
		s.Unlock()
		s.logger.Warn("Could not set write deadline to ping", zap.Error(err))
		return false
		return err.Error(), false
	}
	err := s.conn.WriteMessage(websocket.PingMessage, []byte{})
	s.Unlock()
	if err != nil {
		s.logger.Warn("Could not send ping", zap.Error(err))
		return false
		return err.Error(), false
	}

	return true
	return "", true
}

func (s *sessionWS) Format() SessionFormat {
@@ -387,12 +408,12 @@ func (s *sessionWS) SendBytes(isStream bool, mode uint8, payload []byte) error {
		// to start dropping messages, which might cause unexpected behaviour.
		s.Unlock()
		s.logger.Warn("Could not write message, session outgoing queue full")
		s.Close()
		s.Close(ErrSessionQueueFull.Error())
		return ErrSessionQueueFull
	}
}

func (s *sessionWS) Close() {
func (s *sessionWS) Close(reason string) {
	s.Lock()
	if s.stopped {
		s.Unlock()
@@ -430,6 +451,7 @@ func (s *sessionWS) Close() {

	// Send close message.
	if err := s.conn.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(s.writeWaitDuration)); err != nil {
		// This may not be possible if the socket was already fully closed by an error.
		s.logger.Debug("Could not send close message", zap.Error(err))
	}
	// Close WebSocket.
@@ -441,6 +463,6 @@ func (s *sessionWS) Close() {

	// Fire an event for session end.
	if fn := s.runtime.EventSessionEnd(); fn != nil {
		fn(s.userID.String(), s.username.Load(), s.expiry, s.id.String(), s.clientIP, s.clientPort, time.Now().UTC().Unix())
		fn(s.userID.String(), s.username.Load(), s.expiry, s.id.String(), s.clientIP, s.clientPort, time.Now().UTC().Unix(), reason)
	}
}
+1 −1
Original line number Diff line number Diff line
@@ -105,7 +105,7 @@ func (d *DummySession) SendBytes(isStream bool, mode uint8, payload []byte) erro
	return nil
}

func (d *DummySession) Close() {}
func (d *DummySession) Close(reason string) {}

type loggerEnabler struct{}