Commit 85a285cc authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Clean up stream manager interface.

parent 1cc1b53c
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ require (
	github.com/gorilla/mux v1.7.4
	github.com/gorilla/websocket v1.4.2
	github.com/grpc-ecosystem/grpc-gateway v1.13.0
	github.com/heroiclabs/nakama-common v1.7.1
	github.com/heroiclabs/nakama-common v1.7.2
	github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
	github.com/jackc/pgx v3.5.0+incompatible
	github.com/jmhodges/levigo v1.0.0 // indirect
+2 −0
+1 −1
Original line number Diff line number Diff line
@@ -202,7 +202,7 @@ func NewMatchHandler(logger *zap.Logger, config Config, sessionRegistry SessionR
func (mh *MatchHandler) disconnectClients() {
	presenceIDs := mh.PresenceList.ListPresenceIDs()
	for _, presenceID := range presenceIDs {
		_ = mh.sessionRegistry.Disconnect(context.Background(), presenceID.SessionID, presenceID.Node)
		_ = mh.sessionRegistry.Disconnect(context.Background(), presenceID.SessionID)
	}
}

+12 −40
Original line number Diff line number Diff line
@@ -720,19 +720,10 @@ func (n *RuntimeGoNakamaModule) StreamUserJoin(mode uint8, subject, subcontext,
		}
	}

	// Look up the session.
	session := n.sessionRegistry.Get(sid)
	if session == nil {
		return false, errors.New("session id does not exist")
	}

	success, newlyTracked := n.tracker.Track(sid, stream, uid, PresenceMeta{
		Format:      session.Format(),
		Hidden:      hidden,
		Persistence: persistence,
		Username:    session.Username(),
		Status:      status,
	}, false)
	success, newlyTracked, err := n.streamManager.UserJoin(stream, uid, sid, hidden, persistence, status)
	if err != nil {
		return false, err
	}
	if !success {
		return false, errors.New("tracker rejected new presence, session is closing")
	}
@@ -768,19 +759,11 @@ func (n *RuntimeGoNakamaModule) StreamUserUpdate(mode uint8, subject, subcontext
		}
	}

	// Look up the session.
	session := n.sessionRegistry.Get(sid)
	if session == nil {
		return errors.New("session id does not exist")
	success, err := n.streamManager.UserUpdate(stream, uid, sid, hidden, persistence, status)
	if err != nil {
		return err
	}

	if !n.tracker.Update(sid, stream, uid, PresenceMeta{
		Format:      session.Format(),
		Hidden:      hidden,
		Persistence: persistence,
		Username:    session.Username(),
		Status:      status,
	}, false) {
	if !success {
		return errors.New("tracker rejected updated presence, session is closing")
	}

@@ -815,9 +798,7 @@ func (n *RuntimeGoNakamaModule) StreamUserLeave(mode uint8, subject, subcontext,
		}
	}

	n.tracker.Untrack(sid, stream, uid)

	return nil
	return n.streamManager.UserLeave(stream, uid, sid)
}

func (n *RuntimeGoNakamaModule) StreamUserKick(mode uint8, subject, subcontext, label string, presence runtime.Presence) error {
@@ -831,11 +812,6 @@ func (n *RuntimeGoNakamaModule) StreamUserKick(mode uint8, subject, subcontext,
		return errors.New("expects valid session id")
	}

	node := presence.GetNodeId()
	if node == "" {
		node = n.node
	}

	stream := PresenceStream{
		Mode:  mode,
		Label: label,
@@ -853,7 +829,7 @@ func (n *RuntimeGoNakamaModule) StreamUserKick(mode uint8, subject, subcontext,
		}
	}

	return n.streamManager.UserKick(uid, sid, node, stream)
	return n.streamManager.UserLeave(stream, uid, sid)
}

func (n *RuntimeGoNakamaModule) StreamCount(mode uint8, subject, subcontext, label string) (int, error) {
@@ -1022,17 +998,13 @@ func (n *RuntimeGoNakamaModule) StreamSendRaw(mode uint8, subject, subcontext, l
	return nil
}

func (n *RuntimeGoNakamaModule) SessionDisconnect(ctx context.Context, sessionID, node string) error {
func (n *RuntimeGoNakamaModule) SessionDisconnect(ctx context.Context, sessionID string) error {
	sid, err := uuid.FromString(sessionID)
	if err != nil {
		return errors.New("expects valid session id")
	}

	if node == "" {
		node = n.node
	}

	return n.sessionRegistry.Disconnect(ctx, sid, node)
	return n.sessionRegistry.Disconnect(ctx, sid)
}

func (n *RuntimeGoNakamaModule) MatchCreate(ctx context.Context, module string, params map[string]interface{}) (string, error) {
+20 −29
Original line number Diff line number Diff line
@@ -2898,20 +2898,15 @@ func (n *RuntimeLuaNakamaModule) streamUserJoin(l *lua.LState) int {
	// By default no status is set.
	status := l.OptString(6, "")

	// Look up the session.
	session := n.sessionRegistry.Get(sessionID)
	if session == nil {
	success, newlyTracked, err := n.streamManager.UserJoin(stream, userID, sessionID, hidden, persistence, status)
	if err != nil {
		if err == ErrSessionNotFound {
			l.ArgError(2, "session id does not exist")
			return 0
		}

	success, newlyTracked := n.tracker.Track(sessionID, stream, userID, PresenceMeta{
		Format:      session.Format(),
		Hidden:      hidden,
		Persistence: persistence,
		Username:    session.Username(),
		Status:      status,
	}, false)
		l.RaiseError(fmt.Sprintf("stream user join failed: %v", err.Error()))
		return 0
	}
	if !success {
		l.RaiseError("tracker rejected new presence, session is closing")
		return 0
@@ -3013,20 +3008,16 @@ func (n *RuntimeLuaNakamaModule) streamUserUpdate(l *lua.LState) int {
	// By default no status is set.
	status := l.OptString(6, "")

	// Look up the session.
	session := n.sessionRegistry.Get(sessionID)
	if session == nil {
	success, err := n.streamManager.UserUpdate(stream, userID, sessionID, hidden, persistence, status)
	if err != nil {
		if err == ErrSessionNotFound {
			l.ArgError(2, "session id does not exist")
			return 0
		}

	if !n.tracker.Update(sessionID, stream, userID, PresenceMeta{
		Format:      session.Format(),
		Hidden:      hidden,
		Persistence: persistence,
		Username:    session.Username(),
		Status:      status,
	}, false) {
		l.RaiseError(fmt.Sprintf("stream user update failed: %v", err.Error()))
		return 0
	}
	if !success {
		l.RaiseError("tracker rejected updated presence, session is closing")
	}

@@ -3118,7 +3109,9 @@ func (n *RuntimeLuaNakamaModule) streamUserLeave(l *lua.LState) int {
		return 0
	}

	n.tracker.Untrack(sessionID, stream, userID)
	if err := n.streamManager.UserLeave(stream, userID, sessionID); err != nil {
		l.RaiseError(fmt.Sprintf("stream user leave failed: %v", err.Error()))
	}

	return 0
}
@@ -3232,7 +3225,7 @@ func (n *RuntimeLuaNakamaModule) streamUserKick(l *lua.LState) int {
		return 0
	}

	if err := n.streamManager.UserKick(userID, sessionID, node, stream); err != nil {
	if err := n.streamManager.UserLeave(stream, userID, sessionID); err != nil {
		l.RaiseError(fmt.Sprintf("stream user kick failed: %v", err.Error()))
	}

@@ -3693,9 +3686,7 @@ func (n *RuntimeLuaNakamaModule) sessionDisconnect(l *lua.LState) int {
		return 0
	}

	node := l.OptString(2, n.node)

	if err := n.sessionRegistry.Disconnect(l.Context(), sessionID, node); err != nil {
	if err := n.sessionRegistry.Disconnect(l.Context(), sessionID); err != nil {
		l.RaiseError(fmt.Sprintf("failed to disconnect: %s", err.Error()))
	}
	return 0
Loading