From f434f3da9489d96b3596050b3fe8ca91b228752c Mon Sep 17 00:00:00 2001 From: Andrei Mihu Date: Sat, 6 Oct 2018 13:40:17 +0100 Subject: [PATCH] Graceful server shutdown and match termination. (#246) --- CHANGELOG.md | 1 + data/modules/match.lua | 47 +++++++++++++++++++++- main.go | 37 +++++++++++++++-- runtime/runtime.go | 1 + sample_go_module/sample.go | 9 +++++ server/config.go | 64 ++++++++++++++++------------- server/match_handler.go | 27 +++++++++++++ server/match_registry.go | 69 ++++++++++++++++++++++++++++---- server/runtime.go | 1 + server/runtime_go_match_core.go | 5 +++ server/runtime_lua_match_core.go | 40 +++++++++++++++++- 11 files changed, 261 insertions(+), 40 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e0759acb..14f00acb1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - Lua runtime AES-256 functions. - Lua runtime token generator function now returns a second value representing the token's expiry. - Add local cache for in-memory storage to the Lua runtime. +- Graceful server shutdown and match termination. ### Changed - Improved Postgres compatibility on TIMESTAMPTZ types. diff --git a/data/modules/match.lua b/data/modules/match.lua index 1e8258043..66ea78360 100644 --- a/data/modules/match.lua +++ b/data/modules/match.lua @@ -262,11 +262,56 @@ local function match_loop(context, dispatcher, tick, state, messages) end end +--[[ +Called when the server begins a graceful shutdown process. Will not be called if graceful shutdown is disabled. + +Context represents information about the match and server, for information purposes. Format: +{ + env = {}, -- key-value data set in the runtime.env server configuration. + executionMode = "Match", + match_id = "client-friendly match ID, can be shared with clients and used in match join operations", + match_node = "name of the Nakama node hosting this match", + match_label = "the label string returned from match_init", + match_tick_rate = 1 -- the tick rate returned by match_init +} + +Dispatcher exposes useful functions to the match. Format: +{ + broadcast_message = function(op_code, data, presences, sender), + -- numeric message op code + -- a data payload string, or nil + -- list of presences (a subset of match participants) to use as message targets, or nil to send to the whole match + -- a presence to tag on the message as the 'sender', or nil + match_kick = function(presences) + -- a list of presences to remove from the match + match_label_update = function(label) + -- a new label to set for the match +} + +Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with +calls to match_join_attempt, match_join, or match_leave. + +State is the current in-memory match state, may be any Lua term except nil. + +Grace Seconds is the number of seconds remaining until the server will shut down. + +Expected return these values (all required) in order: +1. An (optionally) updated state. May be any non-nil Lua term, or nil to end the match. +--]] +local function match_terminate(context, dispatcher, tick, state, grace_seconds) + if state.debug then + print("match " .. context.match_id .. " tick " .. tick) + print("match " .. context.match_id .. " grace_seconds " .. grace_seconds) + end + return state +end + -- Match modules must return a table with these functions defined. All functions are required. return { match_init = match_init, match_join_attempt = match_join_attempt, match_join = match_join, match_leave = match_leave, - match_loop = match_loop + match_loop = match_loop, + match_terminate = match_terminate } diff --git a/main.go b/main.go index de5935bfb..8bf44c40c 100644 --- a/main.go +++ b/main.go @@ -138,14 +138,43 @@ func main() { // Wait for a termination signal. <-c - startupLogger.Info("Shutting down") - // Gracefully stop server components. + graceSeconds := config.GetShutdownGraceSec() + + // If a shutdown grace period is allowed, prepare a timer. + var timer *time.Timer + timerCh := make(<-chan time.Time, 1) + if graceSeconds != 0 { + timer = time.NewTimer(time.Duration(graceSeconds) * time.Second) + timerCh = timer.C + startupLogger.Info("Shutdown started - use CTRL^C to force stop server", zap.Int("grace_period_sec", graceSeconds)) + } else { + // No grace period. + startupLogger.Info("Shutdown started") + } + + // Stop any running authoritative matches and do not accept any new ones. + select { + case <-matchRegistry.Stop(graceSeconds): + // Graceful shutdown has completed. + case <-timerCh: + // Timer has expired, terminate matches immediately. + startupLogger.Info("Shutdown grace period expired") + <-matchRegistry.Stop(0) + case <-c: + // A second interrupt has been received. + startupLogger.Info("Skipping graceful shutdown") + <-matchRegistry.Stop(0) + } + if timer != nil { + timer.Stop() + } + + // Gracefully stop remaining server components. apiServer.Stop() consoleServer.Stop() metrics.Stop(logger) leaderboardScheduler.Stop() - matchRegistry.Stop() tracker.Stop() sessionRegistry.Stop() @@ -153,6 +182,8 @@ func main() { ga.SendSessionStop(telemetryClient, gacode, cookie) } + startupLogger.Info("Shutdown complete") + os.Exit(0) } diff --git a/runtime/runtime.go b/runtime/runtime.go index 2e14dc6e2..ff7593737 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -220,6 +220,7 @@ type Match interface { MatchJoin(ctx context.Context, logger *log.Logger, db *sql.DB, nk NakamaModule, dispatcher MatchDispatcher, tick int64, state interface{}, presences []Presence) interface{} MatchLeave(ctx context.Context, logger *log.Logger, db *sql.DB, nk NakamaModule, dispatcher MatchDispatcher, tick int64, state interface{}, presences []Presence) interface{} MatchLoop(ctx context.Context, logger *log.Logger, db *sql.DB, nk NakamaModule, dispatcher MatchDispatcher, tick int64, state interface{}, messages []MatchData) interface{} + MatchTerminate(ctx context.Context, logger *log.Logger, db *sql.DB, nk NakamaModule, dispatcher MatchDispatcher, tick int64, state interface{}, graceSeconds int) interface{} } type NotificationSend struct { diff --git a/sample_go_module/sample.go b/sample_go_module/sample.go index d65822fcf..2470d6b05 100644 --- a/sample_go_module/sample.go +++ b/sample_go_module/sample.go @@ -105,3 +105,12 @@ func (m *Match) MatchLoop(ctx context.Context, logger *log.Logger, db *sql.DB, n } return state } + +func (m *Match) MatchTerminate(ctx context.Context, logger *log.Logger, db *sql.DB, nk runtime.NakamaModule, dispatcher runtime.MatchDispatcher, tick int64, state interface{}, graceSeconds int) interface{} { + if state.(*MatchState).debug { + logger.Printf("match terminate match_id %v tick %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), tick) + logger.Printf("match terminate match_id %v grace seconds %v", ctx.Value(runtime.RUNTIME_CTX_MATCH_ID), graceSeconds) + } + + return state +} diff --git a/server/config.go b/server/config.go index d7d1a6a4d..ec6496f37 100644 --- a/server/config.go +++ b/server/config.go @@ -34,6 +34,7 @@ import ( type Config interface { GetName() string GetDataDir() string + GetShutdownGraceSec() int GetLogger() *LoggerConfig GetMetrics() *MetricsConfig GetSession() *SessionConfig @@ -95,6 +96,9 @@ func ParseArgs(logger *zap.Logger, args []string) Config { if l := len(mainConfig.Name); l < 1 || l > 16 { logger.Fatal("Name must be 1-16 characters", zap.String("param", "name")) } + if mainConfig.ShutdownGraceSec < 0 { + logger.Fatal("Shutdown grace period must be >= 0", zap.Int("shutdown_grace_sec", mainConfig.ShutdownGraceSec)) + } if mainConfig.GetSocket().ServerKey == "" { logger.Fatal("Server key must be set", zap.String("param", "socket.server_key")) } @@ -204,19 +208,20 @@ func convertRuntimeEnv(logger *zap.Logger, existingEnv map[string]string, mergeE } type config struct { - Name string `yaml:"name" json:"name" usage:"Nakama server’s node name - must be unique."` - Config string `yaml:"config" json:"config" usage:"The absolute file path to configuration YAML file."` - Datadir string `yaml:"data_dir" json:"data_dir" usage:"An absolute path to a writeable folder where Nakama will store its data."` - Logger *LoggerConfig `yaml:"logger" json:"logger" usage:"Logger levels and output."` - Metrics *MetricsConfig `yaml:"metrics" json:"metrics" usage:"Metrics settings."` - Session *SessionConfig `yaml:"session" json:"session" usage:"Session authentication settings."` - Socket *SocketConfig `yaml:"socket" json:"socket" usage:"Socket configuration."` - Database *DatabaseConfig `yaml:"database" json:"database" usage:"Database connection settings."` - Social *SocialConfig `yaml:"social" json:"social" usage:"Properties for social provider integrations."` - Runtime *RuntimeConfig `yaml:"runtime" json:"runtime" usage:"Script Runtime properties."` - Match *MatchConfig `yaml:"match" json:"match" usage:"Authoritative realtime match properties."` - Console *ConsoleConfig `yaml:"console" json:"console" usage:"Console settings."` - Leaderboard *LeaderboardConfig `yaml:"leaderboard" json:"leaderboard" usage:"Leaderboard settings."` + Name string `yaml:"name" json:"name" usage:"Nakama server’s node name - must be unique."` + Config string `yaml:"config" json:"config" usage:"The absolute file path to configuration YAML file."` + ShutdownGraceSec int `yaml:"shutdown_grace_sec" json:"shutdown_grace_sec" usage:"Maximum number of seconds to wait for the server to complete work before shutting down. Default is 0 seconds. If 0 the server will shut down immediately when it receives a termination signal."` + Datadir string `yaml:"data_dir" json:"data_dir" usage:"An absolute path to a writeable folder where Nakama will store its data."` + Logger *LoggerConfig `yaml:"logger" json:"logger" usage:"Logger levels and output."` + Metrics *MetricsConfig `yaml:"metrics" json:"metrics" usage:"Metrics settings."` + Session *SessionConfig `yaml:"session" json:"session" usage:"Session authentication settings."` + Socket *SocketConfig `yaml:"socket" json:"socket" usage:"Socket configuration."` + Database *DatabaseConfig `yaml:"database" json:"database" usage:"Database connection settings."` + Social *SocialConfig `yaml:"social" json:"social" usage:"Properties for social provider integrations."` + Runtime *RuntimeConfig `yaml:"runtime" json:"runtime" usage:"Script Runtime properties."` + Match *MatchConfig `yaml:"match" json:"match" usage:"Authoritative realtime match properties."` + Console *ConsoleConfig `yaml:"console" json:"console" usage:"Console settings."` + Leaderboard *LeaderboardConfig `yaml:"leaderboard" json:"leaderboard" usage:"Leaderboard settings."` } // NewConfig constructs a Config struct which represents server settings, and populates it with default values. @@ -226,18 +231,19 @@ func NewConfig(logger *zap.Logger) *config { logger.Fatal("Error getting current working directory.", zap.Error(err)) } return &config{ - Name: "nakama", - Datadir: filepath.Join(cwd, "data"), - Logger: NewLoggerConfig(), - Metrics: NewMetricsConfig(), - Session: NewSessionConfig(), - Socket: NewSocketConfig(), - Database: NewDatabaseConfig(), - Social: NewSocialConfig(), - Runtime: NewRuntimeConfig(), - Match: NewMatchConfig(), - Console: NewConsoleConfig(), - Leaderboard: NewLeaderboardConfig(), + Name: "nakama", + Datadir: filepath.Join(cwd, "data"), + ShutdownGraceSec: 0, + Logger: NewLoggerConfig(), + Metrics: NewMetricsConfig(), + Session: NewSessionConfig(), + Socket: NewSocketConfig(), + Database: NewDatabaseConfig(), + Social: NewSocialConfig(), + Runtime: NewRuntimeConfig(), + Match: NewMatchConfig(), + Console: NewConsoleConfig(), + Leaderboard: NewLeaderboardConfig(), } } @@ -249,6 +255,10 @@ func (c *config) GetDataDir() string { return c.Datadir } +func (c *config) GetShutdownGraceSec() int { + return c.ShutdownGraceSec +} + func (c *config) GetLogger() *LoggerConfig { return c.Logger } @@ -291,12 +301,12 @@ func (c *config) GetLeaderboard() *LeaderboardConfig { // LoggerConfig is configuration relevant to logging levels and output. type LoggerConfig struct { - Level string `yaml:"level" json:"level" usage:"Log level to set. Valid values are 'debug', 'info', 'warn', 'error'. "` + Level string `yaml:"level" json:"level" usage:"Log level to set. Valid values are 'debug', 'info', 'warn', 'error'."` Stdout bool `yaml:"stdout" json:"stdout" usage:"Log to standard console output (as well as to a file if set)."` File string `yaml:"file" json:"file" usage:"Log output to a file (as well as stdout if set). Make sure that the directory and the file is writable."` } -// NewLogConfig creates a new LoggerConfig struct. +// NewLoggerConfig creates a new LoggerConfig struct. func NewLoggerConfig() *LoggerConfig { return &LoggerConfig{ Level: "info", diff --git a/server/match_handler.go b/server/match_handler.go index 8d4a73a98..96b01d705 100644 --- a/server/match_handler.go +++ b/server/match_handler.go @@ -292,3 +292,30 @@ func Leave(leaves []*MatchPresence) func(mh *MatchHandler) { mh.state = state } } + +func Terminate(graceSeconds int) func(mh *MatchHandler) { + return func(mh *MatchHandler) { + if mh.stopped.Load() { + return + } + + state, err := mh.core.MatchTerminate(mh.tick, mh.state, graceSeconds) + if err != nil { + mh.Stop() + mh.logger.Warn("Stopping match after error from match_terminate execution", zap.Int("tick", int(mh.tick)), zap.Error(err)) + return + } + if state == nil { + mh.Stop() + mh.logger.Info("Match terminate returned nil or no state, stopping match") + return + } + + mh.state = state + + // If grace period is 0 end the match immediately after the callback returns. + if graceSeconds == 0 { + mh.Stop() + } + } +} diff --git a/server/match_registry.go b/server/match_registry.go index ac8b39c6f..7ffc1aa7c 100644 --- a/server/match_registry.go +++ b/server/match_registry.go @@ -19,6 +19,7 @@ import ( "github.com/gofrs/uuid" "github.com/golang/protobuf/ptypes/wrappers" "github.com/heroiclabs/nakama/api" + "github.com/pkg/errors" "go.uber.org/atomic" "go.uber.org/zap" "sync" @@ -81,7 +82,7 @@ type MatchRegistry interface { // This can list across both authoritative and relayed matches. ListMatches(limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value) []*api.Match // Stop the match registry and close all matches it's tracking. - Stop() + Stop(graceSeconds int) chan struct{} // Pass a user join attempt to a match handler. Returns if the match was found, if the join was accepted, a reason for any rejection, and the match label. JoinAttempt(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string) (bool, bool, string, string) @@ -105,6 +106,9 @@ type LocalMatchRegistry struct { tracker Tracker node string matches map[uuid.UUID]*MatchHandler + + stopped *atomic.Bool + stoppedCh chan struct{} } func NewLocalMatchRegistry(logger *zap.Logger, config Config, tracker Tracker, node string) MatchRegistry { @@ -114,10 +118,18 @@ func NewLocalMatchRegistry(logger *zap.Logger, config Config, tracker Tracker, n tracker: tracker, node: node, matches: make(map[uuid.UUID]*MatchHandler), + + stopped: atomic.NewBool(false), + stoppedCh: make(chan struct{}, 2), } } func (r *LocalMatchRegistry) NewMatch(logger *zap.Logger, id uuid.UUID, label *atomic.String, core RuntimeMatchCore, params map[string]interface{}) (*MatchHandler, error) { + if r.stopped.Load() { + // Server is shutting down, reject new matches. + return nil, errors.New("shutdown in progress") + } + match, err := NewMatchHandler(logger, r.config, r, core, label, id, r.node, params) if err != nil { return nil, err @@ -140,8 +152,19 @@ func (r *LocalMatchRegistry) GetMatch(id uuid.UUID) *MatchHandler { func (r *LocalMatchRegistry) RemoveMatch(id uuid.UUID, stream PresenceStream) { r.Lock() delete(r.matches, id) + matchesRemaining := len(r.matches) r.Unlock() r.tracker.UntrackByStream(stream) + + // If there are no more matches in this registry and a shutdown was initiated then signal + // that the process is complete. + if matchesRemaining == 0 && r.stopped.Load() { + select { + case r.stoppedCh <- struct{}{}: + default: + // Ignore if the signal has already been sent. + } + } } func (r *LocalMatchRegistry) ListMatches(limit int, authoritative *wrappers.BoolValue, label *wrappers.StringValue, minSize *wrappers.Int32Value, maxSize *wrappers.Int32Value) []*api.Match { @@ -271,13 +294,45 @@ func (r *LocalMatchRegistry) ListMatches(limit int, authoritative *wrappers.Bool return results } -func (r *LocalMatchRegistry) Stop() { - r.Lock() - for id, mh := range r.matches { - mh.Close() - delete(r.matches, id) +func (r *LocalMatchRegistry) Stop(graceSeconds int) chan struct{} { + // Mark the match registry as stopped, but allow further calls here to signal periodic termination to any matches still running. + r.stopped.Store(true) + + // Graceful shutdown not allowed/required, or grace period has expired. + if graceSeconds == 0 { + r.RLock() + for id, mh := range r.matches { + mh.Close() + delete(r.matches, id) + } + r.RUnlock() + // Termination was triggered and there are no active matches. + select { + case r.stoppedCh <- struct{}{}: + default: + // Ignore if the signal has already been sent. + } + return r.stoppedCh } - r.Unlock() + + r.RLock() + if len(r.matches) == 0 { + // Termination was triggered and there are no active matches. + select { + case r.stoppedCh <- struct{}{}: + default: + // Ignore if the signal has already been sent. + } + r.RUnlock() + return r.stoppedCh + } + + for _, mh := range r.matches { + // Don't care if the call queue is full, match is supposed to end anyway. + mh.QueueCall(Terminate(graceSeconds)) + } + r.RUnlock() + return r.stoppedCh } func (r *LocalMatchRegistry) JoinAttempt(id uuid.UUID, node string, userID, sessionID uuid.UUID, username, fromNode string) (bool, bool, string, string) { diff --git a/server/runtime.go b/server/runtime.go index b7d2d0c75..90e4eb088 100644 --- a/server/runtime.go +++ b/server/runtime.go @@ -218,6 +218,7 @@ type RuntimeMatchCore interface { MatchJoin(tick int64, state interface{}, joins []*MatchPresence) (interface{}, error) MatchLeave(tick int64, state interface{}, leaves []*MatchPresence) (interface{}, error) MatchLoop(tick int64, state interface{}, inputCh chan *MatchDataMessage) (interface{}, error) + MatchTerminate(tick int64, state interface{}, graceSeconds int) (interface{}, error) } type RuntimeBeforeReqFunctions struct { diff --git a/server/runtime_go_match_core.go b/server/runtime_go_match_core.go index ea7e12811..b0c0bff59 100644 --- a/server/runtime_go_match_core.go +++ b/server/runtime_go_match_core.go @@ -139,6 +139,11 @@ func (r *RuntimeGoMatchCore) MatchLoop(tick int64, state interface{}, inputCh ch return newState, nil } +func (r *RuntimeGoMatchCore) MatchTerminate(tick int64, state interface{}, graceSeconds int) (interface{}, error) { + newState := r.match.MatchTerminate(r.ctx, r.stdLogger, r.db, r.nk, r, tick, state, graceSeconds) + return newState, nil +} + func (r *RuntimeGoMatchCore) BroadcastMessage(opCode int64, data []byte, presences []runtime.Presence, sender runtime.Presence) error { var presenceIDs []*PresenceID if presences != nil { diff --git a/server/runtime_lua_match_core.go b/server/runtime_lua_match_core.go index ecce248dc..baa2b0b3f 100644 --- a/server/runtime_lua_match_core.go +++ b/server/runtime_lua_match_core.go @@ -46,6 +46,7 @@ type RuntimeLuaMatchCore struct { joinFn lua.LValue leaveFn lua.LValue loopFn lua.LValue + terminateFn lua.LValue ctx *lua.LTable dispatcher *lua.LTable } @@ -108,8 +109,8 @@ func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, config Config, socia return nil, errors.New("match_join_attempt not found or not a function") } joinFn := tab.RawGet(lua.LString("match_join")) - if joinFn == nil || joinFn.Type() != lua.LTFunction { - joinFn = nil + if joinFn.Type() != lua.LTFunction { + return nil, errors.New("match_join not found or not a function") } leaveFn := tab.RawGet(lua.LString("match_leave")) if leaveFn.Type() != lua.LTFunction { @@ -119,6 +120,10 @@ func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, config Config, socia if loopFn.Type() != lua.LTFunction { return nil, errors.New("match_loop not found or not a function") } + terminateFn := tab.RawGet(lua.LString("match_terminate")) + if terminateFn.Type() != lua.LTFunction { + return nil, errors.New("match_terminate not found or not a function") + } core := &RuntimeLuaMatchCore{ logger: logger, @@ -143,6 +148,7 @@ func NewRuntimeLuaMatchCore(logger *zap.Logger, db *sql.DB, config Config, socia joinFn: joinFn, leaveFn: leaveFn, loopFn: loopFn, + terminateFn: terminateFn, ctx: ctx, // dispatcher set below. } @@ -429,6 +435,36 @@ func (r *RuntimeLuaMatchCore) MatchLoop(tick int64, state interface{}, inputCh c return newState, nil } +func (r *RuntimeLuaMatchCore) MatchTerminate(tick int64, state interface{}, graceSeconds int) (interface{}, error) { + // Execute the match_terminate call. + r.vm.Push(LSentinel) + r.vm.Push(r.terminateFn) + r.vm.Push(r.ctx) + r.vm.Push(r.dispatcher) + r.vm.Push(lua.LNumber(tick)) + r.vm.Push(state.(lua.LValue)) + r.vm.Push(lua.LNumber(graceSeconds)) + + err := r.vm.PCall(5, lua.MultRet, nil) + if err != nil { + return nil, err + } + + // Extract the resulting state. + newState := r.vm.Get(-1) + if newState.Type() == lua.LTNil || newState.Type() == LTSentinel { + return nil, nil + } + r.vm.Pop(1) + // Check for and remove the sentinel value, will fail if there are any extra return values. + if sentinel := r.vm.Get(-1); sentinel.Type() != LTSentinel { + return nil, errors.New("Match terminate returned too many values, stopping match") + } + r.vm.Pop(1) + + return newState, nil +} + func (r *RuntimeLuaMatchCore) broadcastMessage(l *lua.LState) int { opCode := l.CheckInt64(1) -- GitLab