Commit aae6914b authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Allow matches to update their labels.

parent d5541b9e
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -11,6 +11,8 @@ data/*
install/cloud/**/*.json
install/cloud/**/*.tfvars

*.pprof

### Go ###
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
+3 −0
Original line number Diff line number Diff line
@@ -11,10 +11,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Authoritative match modules now allow a `match_join` callback that triggers when users have completed their join process.
- New stream API function to upsert a user presence.
- Extended validation of Google tokens to account for different token payloads.
- Authoritative match labels can now be updated using the dispatcher's `match_label_update` function.

### Changed
- Presence list in match join responses no longer contains the user's own presence. 
- Presence list in channel join responses no longer contains the user's own presence.
- Socket read/write buffer sizes are now set based on the `socket.max_message_size_bytes` config value.
- Console GRPC port now set relative to `console.port` config value.

## [2.0.1] - 2018-06-15
### Added
+8 −0
Original line number Diff line number Diff line
@@ -69,6 +69,8 @@ Dispatcher exposes useful functions to the match. Format:
    -- a presence to tag on the message as the 'sender', or nil
  match_kick = function(presences)
    -- a list of presences to remove from the match
  match_label_update = function(label)
    -- a new label to set for the match
}

Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
@@ -119,6 +121,8 @@ Dispatcher exposes useful functions to the match. Format:
    -- a presence to tag on the message as the 'sender', or nil
  match_kick = function(presences)
    -- a list of presences to remove from the match
  match_label_update = function(label)
    -- a new label to set for the match
}

Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
@@ -169,6 +173,8 @@ Dispatcher exposes useful functions to the match. Format:
    -- a presence to tag on the message as the 'sender', or nil
  match_kick = function(presences)
    -- a list of presences to remove from the match
  match_label_update = function(label)
    -- a new label to set for the match
}

Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
@@ -219,6 +225,8 @@ Dispatcher exposes useful functions to the match. Format:
    -- a presence to tag on the message as the 'sender', or nil
  match_kick = function(presences)
    -- a list of presences to remove from the match
  match_label_update = function(label)
    -- a new label to set for the match
}

Tick is the current match tick number, starts at 0 and increments after every match_loop call. Does not increment with
+3 −3
Original line number Diff line number Diff line
@@ -59,9 +59,9 @@ func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, config Co
	}

	console.RegisterConsoleServer(grpcServer, s)
	startupLogger.Info("Starting Console server for gRPC requests", zap.Int("port", config.GetSocket().Port-2))
	startupLogger.Info("Starting Console server for gRPC requests", zap.Int("port", config.GetConsole().Port-3))
	go func() {
		listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", config.GetSocket().Port-2))
		listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", config.GetConsole().Port-3))
		if err != nil {
			startupLogger.Fatal("Console server listener failed to start", zap.Error(err))
		}
@@ -73,7 +73,7 @@ func StartConsoleServer(logger *zap.Logger, startupLogger *zap.Logger, config Co

	ctx := context.Background()
	grpcGateway := runtime.NewServeMux()
	dialAddr := fmt.Sprintf("127.0.0.1:%d", config.GetSocket().Port-2)
	dialAddr := fmt.Sprintf("127.0.0.1:%d", config.GetConsole().Port-3)
	dialOpts := []grpc.DialOption{
		//TODO (mo, zyro): Do we need to pass the statsHandler here as well?
		grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(int(config.GetSocket().MaxMessageSizeBytes))),
+25 −31
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import (
	"github.com/pkg/errors"
	"github.com/satori/go.uuid"
	"github.com/yuin/gopher-lua"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

@@ -39,7 +40,6 @@ type MatchDataMessage struct {
}

type MatchHandler struct {
	sync.Mutex
	logger        *zap.Logger
	matchRegistry MatchRegistry
	tracker       Tracker
@@ -67,10 +67,10 @@ type MatchHandler struct {
	ticker  *time.Ticker
	callCh  chan func(*MatchHandler)
	stopCh  chan struct{}
	stopped bool
	stopped *atomic.Bool

	// Immutable configuration set by match init.
	Label string
	// Configuration set by match init.
	Label *atomic.String
	Rate  int

	// Match state.
@@ -225,18 +225,19 @@ func NewMatchHandler(logger *zap.Logger, db *sql.DB, config Config, socialClient
		// Ticker below.
		callCh:  make(chan func(mh *MatchHandler), config.GetMatch().CallQueueSize),
		stopCh:  make(chan struct{}),
		stopped: false,
		stopped: atomic.NewBool(false),

		Label: labelStr,
		Label: atomic.NewString(labelStr),
		Rate:  rateInt,

		state: state,
	}

	// Set up the dispatcher that exposes control functions to the match loop.
	mh.dispatcher = vm.SetFuncs(vm.CreateTable(0, 2), map[string]lua.LGFunction{
	mh.dispatcher = vm.SetFuncs(vm.CreateTable(0, 3), map[string]lua.LGFunction{
		"broadcast_message":  mh.broadcastMessage,
		"match_kick":         mh.matchKick,
		"match_label_update": mh.matchLabelUpdate,
	})

	// Set up the ticker that governs the match loop.
@@ -274,13 +275,9 @@ func (mh *MatchHandler) Stop() {

// Used when the match is closed externally.
func (mh *MatchHandler) Close() {
	mh.Lock()
	if mh.stopped {
		mh.Unlock()
	if !mh.stopped.CAS(false, true) {
		return
	}
	mh.stopped = true
	mh.Unlock()
	close(mh.stopCh)
	mh.ticker.Stop()
}
@@ -309,12 +306,9 @@ func (mh *MatchHandler) QueueData(m *MatchDataMessage) {
}

func loop(mh *MatchHandler) {
	mh.Lock()
	if mh.stopped {
		mh.Unlock()
	if mh.stopped.Load() {
		return
	}
	mh.Unlock()

	// Drain the input queue into a Lua table.
	size := len(mh.inputCh)
@@ -379,13 +373,10 @@ func loop(mh *MatchHandler) {

func JoinAttempt(resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, username, node string) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
		mh.Lock()
		if mh.stopped {
			mh.Unlock()
		if mh.stopped.Load() {
			resultCh <- &MatchJoinResult{Allow: false}
			return
		}
		mh.Unlock()

		presence := mh.vm.CreateTable(0, 4)
		presence.RawSetString("user_id", lua.LString(userID.String()))
@@ -473,7 +464,7 @@ func JoinAttempt(resultCh chan *MatchJoinResult, userID, sessionID uuid.UUID, us
		mh.vm.Pop(1)

		mh.state = state
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.Label}
		resultCh <- &MatchJoinResult{Allow: allow, Reason: reason, Label: mh.Label.Load()}
	}
}

@@ -483,12 +474,9 @@ func Join(joins []*MatchPresence) func(mh *MatchHandler) {
			return
		}

		mh.Lock()
		if mh.stopped {
			mh.Unlock()
		if mh.stopped.Load() {
			return
		}
		mh.Unlock()

		presences := mh.vm.CreateTable(len(joins), 0)
		for i, p := range joins {
@@ -539,12 +527,9 @@ func Join(joins []*MatchPresence) func(mh *MatchHandler) {

func Leave(leaves []*MatchPresence) func(mh *MatchHandler) {
	return func(mh *MatchHandler) {
		mh.Lock()
		if mh.stopped {
			mh.Unlock()
		if mh.stopped.Load() {
			return
		}
		mh.Unlock()

		presences := mh.vm.CreateTable(len(leaves), 0)
		for i, p := range leaves {
@@ -814,3 +799,12 @@ func (mh *MatchHandler) matchKick(l *lua.LState) int {
	mh.matchRegistry.Kick(mh.Stream, presences)
	return 0
}

func (mh *MatchHandler) matchLabelUpdate(l *lua.LState) int {
	input := l.OptString(1, "")

	mh.Label.Store(input)
	// This must be executed from inside a match call so safe to update here.
	mh.ctx.RawSetString(__CTX_MATCH_LABEL, lua.LString(input))
	return 0
}
Loading