Commit bdfc74d1 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Script runtime invocations now use separate underlying states to improve concurrency. Merge #119

parent dc0a5964
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p

### Changed
- Script runtime RPC and HTTP hook errors now return more detail when verbose logging is enabled.
- Script runtime invocations now use separate underlying states to improve concurrency.

### Fixed
- Haystack leaderboard record listings now return correct results around both sides of the pivot record.
+3 −4
Original line number Diff line number Diff line
@@ -92,15 +92,15 @@ func main() {
	trackerService.AddDiffListener(presenceNotifier.HandleDiff)
	notificationService := server.NewNotificationService(jsonLogger, db, trackerService, messageRouter, config.GetSocial().Notification)

	runtime, err := server.NewRuntime(jsonLogger, multiLogger, db, config.GetRuntime(), notificationService)
	runtimePool, err := server.NewRuntimePool(jsonLogger, multiLogger, db, config.GetRuntime(), notificationService)
	if err != nil {
		multiLogger.Fatal("Failed initializing runtime modules.", zap.Error(err))
	}

	socialClient := social.NewClient(5 * time.Second)
	purchaseService := server.NewPurchaseService(jsonLogger, multiLogger, db, config.GetPurchase())
	pipeline := server.NewPipeline(config, db, trackerService, matchmakerService, messageRouter, sessionRegistry, socialClient, runtime, purchaseService, notificationService)
	authService := server.NewAuthenticationService(jsonLogger, config, db, statsService, sessionRegistry, socialClient, pipeline, runtime)
	pipeline := server.NewPipeline(config, db, trackerService, matchmakerService, messageRouter, sessionRegistry, socialClient, runtimePool, purchaseService, notificationService)
	authService := server.NewAuthenticationService(jsonLogger, config, db, statsService, sessionRegistry, socialClient, pipeline, runtimePool)
	dashboardService := server.NewDashboardService(jsonLogger, multiLogger, semver, config, statsService)

	gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1
@@ -121,7 +121,6 @@ func main() {
		authService.Stop()
		dashboardService.Stop()
		trackerService.Stop()
		runtime.Stop()

		if gaenabled {
			ga.SendSessionStop(http.DefaultClient, gacode, cookie)
+22 −5
Original line number Diff line number Diff line
@@ -25,9 +25,11 @@ import (
	"go.uber.org/zap"
)

func RuntimeBeforeHook(runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, messageType string, envelope *Envelope, session *session) (*Envelope, error) {
func RuntimeBeforeHook(runtimePool *RuntimePool, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, messageType string, envelope *Envelope, session *session) (*Envelope, error) {
	runtime := runtimePool.Get()
	fn := runtime.GetRuntimeCallback(BEFORE, messageType)
	if fn == nil {
		runtimePool.Put(runtime)
		return envelope, nil
	}

@@ -40,12 +42,16 @@ func RuntimeBeforeHook(runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, json
		expiry = session.expiry
	}

	return runtime.InvokeFunctionBefore(fn, userId, handle, expiry, jsonpbMarshaler, jsonpbUnmarshaler, envelope)
	env, err := runtime.InvokeFunctionBefore(fn, userId, handle, expiry, jsonpbMarshaler, jsonpbUnmarshaler, envelope)
	runtimePool.Put(runtime)
	return env, err
}

func RuntimeAfterHook(logger *zap.Logger, runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, messageType string, envelope *Envelope, session *session) {
func RuntimeAfterHook(logger *zap.Logger, runtimePool *RuntimePool, jsonpbMarshaler *jsonpb.Marshaler, messageType string, envelope *Envelope, session *session) {
	runtime := runtimePool.Get()
	fn := runtime.GetRuntimeCallback(AFTER, messageType)
	if fn == nil {
		runtimePool.Put(runtime)
		return
	}

@@ -73,22 +79,27 @@ func RuntimeAfterHook(logger *zap.Logger, runtime *Runtime, jsonpbMarshaler *jso
	if fnErr := runtime.InvokeFunctionAfter(fn, userId, handle, expiry, jsonEnvelope); fnErr != nil {
		logger.Error("Runtime after function caused an error", zap.String("message", messageType), zap.Error(fnErr))
	}
	runtimePool.Put(runtime)
}

func RuntimeBeforeHookAuthentication(runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, envelope *AuthenticateRequest) (*AuthenticateRequest, error) {
func RuntimeBeforeHookAuthentication(runtimePool *RuntimePool, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, envelope *AuthenticateRequest) (*AuthenticateRequest, error) {
	messageType := RUNTIME_MESSAGES[fmt.Sprintf("%T", envelope.Id)]
	runtime := runtimePool.Get()
	fn := runtime.GetRuntimeCallback(BEFORE, messageType)
	if fn == nil {
		runtimePool.Put(runtime)
		return envelope, nil
	}

	strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
	if err != nil {
		runtimePool.Put(runtime)
		return nil, err
	}

	var jsonEnvelope map[string]interface{}
	if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
		runtimePool.Put(runtime)
		return nil, err
	}

@@ -97,6 +108,7 @@ func RuntimeBeforeHookAuthentication(runtime *Runtime, jsonpbMarshaler *jsonpb.M
	expiry := int64(0)

	result, fnErr := runtime.InvokeFunctionBeforeAuthentication(fn, userId, handle, expiry, jsonEnvelope)
	runtimePool.Put(runtime)
	if fnErr != nil {
		return nil, fnErr
	}
@@ -114,21 +126,25 @@ func RuntimeBeforeHookAuthentication(runtime *Runtime, jsonpbMarshaler *jsonpb.M
	return authenticationResult, nil
}

func RuntimeAfterHookAuthentication(logger *zap.Logger, runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, envelope *AuthenticateRequest, userId uuid.UUID, handle string, expiry int64) {
func RuntimeAfterHookAuthentication(logger *zap.Logger, runtimePool *RuntimePool, jsonpbMarshaler *jsonpb.Marshaler, envelope *AuthenticateRequest, userId uuid.UUID, handle string, expiry int64) {
	messageType := RUNTIME_MESSAGES[fmt.Sprintf("%T", envelope.Id)]
	runtime := runtimePool.Get()
	fn := runtime.GetRuntimeCallback(AFTER, messageType)
	if fn == nil {
		runtimePool.Put(runtime)
		return
	}

	strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
	if err != nil {
		runtimePool.Put(runtime)
		logger.Error("Failed to convert proto message to protoJSON in After invocation", zap.String("message", messageType), zap.Error(err))
		return
	}

	var jsonEnvelope map[string]interface{}
	if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
		runtimePool.Put(runtime)
		logger.Error("Failed to convert protoJSON message to Map in After invocation", zap.String("message", messageType), zap.Error(err))
		return
	}
@@ -136,4 +152,5 @@ func RuntimeAfterHookAuthentication(logger *zap.Logger, runtime *Runtime, jsonpb
	if fnErr := runtime.InvokeFunctionAfter(fn, userId, handle, expiry, jsonEnvelope); fnErr != nil {
		logger.Error("Runtime after function caused an error", zap.String("message", messageType), zap.Error(fnErr))
	}
	runtimePool.Put(runtime)
}
+5 −5
Original line number Diff line number Diff line
@@ -33,7 +33,7 @@ type pipeline struct {
	messageRouter       MessageRouter
	sessionRegistry     *SessionRegistry
	socialClient        *social.Client
	runtime             *Runtime
	runtimePool         *RuntimePool
	purchaseService     *PurchaseService
	notificationService *NotificationService
	jsonpbMarshaler     *jsonpb.Marshaler
@@ -48,7 +48,7 @@ func NewPipeline(config Config,
	messageRouter MessageRouter,
	registry *SessionRegistry,
	socialClient *social.Client,
	runtime *Runtime,
	runtimePool *RuntimePool,
	purchaseService *PurchaseService,
	notificationService *NotificationService) *pipeline {
	return &pipeline{
@@ -60,7 +60,7 @@ func NewPipeline(config Config,
		messageRouter:       messageRouter,
		sessionRegistry:     registry,
		socialClient:        socialClient,
		runtime:             runtime,
		runtimePool:         runtimePool,
		purchaseService:     purchaseService,
		notificationService: notificationService,
		jsonpbMarshaler: &jsonpb.Marshaler{
@@ -85,7 +85,7 @@ func (p *pipeline) processRequest(logger *zap.Logger, session *session, original
	logger.Debug("Received message", zap.String("type", messageType))

	messageType = RUNTIME_MESSAGES[messageType]
	envelope, fnErr := RuntimeBeforeHook(p.runtime, p.jsonpbMarshaler, p.jsonpbUnmarshaler, messageType, originalEnvelope, session)
	envelope, fnErr := RuntimeBeforeHook(p.runtimePool, p.jsonpbMarshaler, p.jsonpbUnmarshaler, messageType, originalEnvelope, session)
	if fnErr != nil {
		logger.Error("Runtime before function caused an error", zap.String("message", messageType), zap.Error(fnErr))
		session.Send(ErrorMessage(originalEnvelope.CollationId, RUNTIME_FUNCTION_EXCEPTION, fmt.Sprintf("Runtime before function caused an error: %s", fnErr.Error())))
@@ -203,7 +203,7 @@ func (p *pipeline) processRequest(logger *zap.Logger, session *session, original
		return
	}

	RuntimeAfterHook(logger, p.runtime, p.jsonpbMarshaler, messageType, envelope, session)
	RuntimeAfterHook(logger, p.runtimePool, p.jsonpbMarshaler, messageType, envelope, session)
}

func ErrorMessageRuntimeException(collationID string, message string) *Envelope {
+5 −2
Original line number Diff line number Diff line
@@ -27,13 +27,16 @@ func (p *pipeline) rpc(logger *zap.Logger, session *session, envelope *Envelope)
		return
	}

	lf := p.runtime.GetRuntimeCallback(RPC, rpcMessage.Id)
	runtime := p.runtimePool.Get()
	lf := runtime.GetRuntimeCallback(RPC, rpcMessage.Id)
	if lf == nil {
		p.runtimePool.Put(runtime)
		session.Send(ErrorMessage(envelope.CollationId, RUNTIME_FUNCTION_NOT_FOUND, "RPC function not found"))
		return
	}

	result, fnErr := p.runtime.InvokeFunctionRPC(lf, session.userID, session.handle.Load(), session.expiry, rpcMessage.Payload)
	result, fnErr := runtime.InvokeFunctionRPC(lf, session.userID, session.handle.Load(), session.expiry, rpcMessage.Payload)
	p.runtimePool.Put(runtime)
	if fnErr != nil {
		logger.Error("Runtime RPC function caused an error", zap.String("id", rpcMessage.Id), zap.Error(fnErr))
		if apiErr, ok := fnErr.(*lua.ApiError); ok && !p.config.GetLog().Verbose {
Loading