diff --git a/CHANGELOG.md b/CHANGELOG.md
index ac36c6eb53ed3170ea64d793a840e56fb509b07f..bf267863f6c0fb99a5064f67a8b4a78a3a506faf 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,13 +5,23 @@ The format is based on [keep a changelog](http://keepachangelog.com/) and this p
## [Unreleased]
### Added
+- Lua script runtime for custom code.
- Node status now also reports a startup timestamp.
- New matchmaking feature.
-- Optionally send match data to only a subset of match participants.
+- Optionally send match data to a subset of match participants.
+- Fetch users by handle.
+- Add friend by handle.
+- When listing leaderboards, optionally filter by ID.
+- Allow users to store publicly readable data.
+
+### Changed
+- The build system now suffixes Windows binaries with the `.exe` extension.
### Fixed
- Set correct initial group member count when group is created.
- Do not update group count when join requests are rejected.
+- Use correct type in leaderboard record write best operation.
+- Correctly store global storage data.
## [0.12.2] - 2017-04-22
### Added
diff --git a/Makefile b/Makefile
index 56c6720c0d9c3172b06284414ba8998092da12fa..54b1e3751c7f2a50eb9af15ad71c17506721d319 100644
--- a/Makefile
+++ b/Makefile
@@ -66,6 +66,8 @@ $(PLATFORMS):
echo " Packaged '${OUTDIR}-$@-${arch}'";\
)
+windows: BINNAME := $(BINNAME).exe
+
.PHONY: relupload
relupload: JQ := $(shell jq --version)
relupload: TOKEN :=
diff --git a/README.md b/README.md
index d599732152405f1e51a1cff11edfd105d5eb1ca1..3271688a042a15317df3aa75163b025508deb936 100644
--- a/README.md
+++ b/README.md
@@ -2,31 +2,42 @@
> Distributed server for social and realtime games and apps.
+### Features
+
+* **Users** - Register/login new users via social networks, email, or device ID.
+* **Storage** - Store user records, settings, and other objects in collections.
+* **Social** - Users can connect with friends and join groups. Built-in social graph to see how users are connected.
+* **Chat** - 1-on-1, group, and global chat between users. Persist messages for chat history.
+* **Multiplayer** - Realtime or turn-based active and passive multiplayer.
+* **Leaderboards** - Dynamic, seasonal leaderboards; fetch top members or members around a user. Have as many as you need.
+* **Runtime code** - Extend the server with custom logic written in Lua.
+
For more information have a look at the [documentation](https://heroiclabs.com/docs/) and for a quick list of build targets run `make help`.
If you encounter any issues with the server you can generate diagnostics for us with `nakama doctor`. Send these to support@heroiclabs.com or [open an issue](https://github.com/heroiclabs/nakama/issues).
### Start server
-Have a look at our [documentation](https://heroiclabs.com/docs/start-server/) for a full introduction on how to run Nakama in development and/or production.
+Have a look at our [documentation](https://heroiclabs.com/docs/running-nakama/) for a full introduction on how to run Nakama in development and/or production.
-To start a server locally and bind it to all network interfaces once it's installed and on your path - `nakama`. The server output will show how it's been configured by default.
+To start a server locally and bind it to all network interfaces once it's installed and on your path, run `nakama`. The server output will show how it's been configured by default.
```
$> nakama
-[I] Nakama starting at=$$now$$
-[I] Node name=nakama-97f4 version=$$version$$
-[I] Data directory path=$$datadir$$
-[I] Dashboard url=http://127.0.0.1:7351
-[I] Client port=7350
-[I] Startup done
+{"level":"info","ts":"$$timestamp$$","msg":"Node","name":"nakama-97f4","version":"$$version$$"}
+{"level":"info","ts":"$$timestamp$$","msg":"Data directory","path":"$$datadir$$"}
+{"level":"info","ts":"$$timestamp$$","msg":"Database connections","dsns":["root@localhost:26257"]}
+{"level":"info","ts":"$$timestamp$$","msg":"Evaluating modules","count":0,"modules":[]}
+{"level":"info","ts":"$$timestamp$$","msg":"Ops","port":7351}
+{"level":"info","ts":"$$timestamp$$","msg":"Dashboard","url":"http://127.0.0.1:7351"}
+{"level":"info","ts":"$$timestamp$$","msg":"Client","port":7350}
```
### Run Nakama with Docker
-Follow the [guide](https://heroiclabs.com/docs/setup/docker) to run Nakama (and CockroachDB) in Docker.
+Follow the [guide](https://heroiclabs.com/docs/install/docker/) to run Nakama (and CockroachDB) in Docker.
-
+
Nakama Docker images are available on [Docker Hub](http://hub.docker.com/r/heroiclabs/nakama/). If you'd like to publish your own Docker image have a look at our [Docker README](https://github.com/heroiclabs/nakama/blob/master/install/docker/README.md).
@@ -34,6 +45,14 @@ Nakama Docker images are available on [Docker Hub](http://hub.docker.com/r/heroi
Nakama can be deployed to any cloud with Docker Cloud such as AWS, Google Cloud, Azure, Digital Ocean or your own private cloud. You'll need to setup Docker Cloud and provision separate nodes for Nakama and CockroachDB.
+### Production deployments
+
+Nakama uses CockroachDB as its database server. You're responsible for the [uptime](https://en.wikipedia.org/wiki/Uptime), [replication](https://en.wikipedia.org/wiki/Replication_(computing)), [backups](https://en.wikipedia.org/wiki/Backup), logs, and upgrades of your data.
+
+You also need to update the Nakama server with every new release and configure the server to auto-scale. If you use our Docker releases, follow along with the "latest" image tag and check for new releases once a month.
+
+Using our [managed cloud service](https://heroiclabs.com/managed-cloud/) saves you time and development costs, and eliminates managing your own clusters, which is simpler and cheaper as you grow. We recommend [Managed Cloud](https://heroiclabs.com/managed-cloud/) if you're running production games or apps.
+
### Contribute
To build the codebase you will need to install these dependencies:
diff --git a/glide.lock b/glide.lock
index f425557151ec67e2cf459e521bbb21e4809803f8..7e1317d57cbd1a4247c61d600cbf5d9344f14f7e 100644
--- a/glide.lock
+++ b/glide.lock
@@ -1,5 +1,5 @@
-hash: 150ab75d51b17b2fb2b097193ea43bac8e07dfcb8ce0905c115be7e71639f506
-updated: 2017-04-13T23:43:22.128299698+01:00
+hash: eaca7acd10cdfdd13ddd0fcf302c626a8b7a450c13970abbbd1fac319f02c2ad
+updated: 2017-05-24T13:55:34.751285113+01:00
imports:
- name: github.com/armon/go-metrics
version: 97c69685293dce4c0a2d0b19535179bbc976e4d2
@@ -9,6 +9,8 @@ imports:
version: 9a6736ed45b44bf3835afeebb3034b57ed329f3e
subpackages:
- '...'
+- name: github.com/fatih/structs
+ version: a720dfa8df582c51dee1b36feabb906bde1588bd
- name: github.com/go-gorp/gorp
version: 4deece61034873cb5b5416e81abe4cea7bd0da72
- name: github.com/go-yaml/yaml
@@ -40,16 +42,16 @@ imports:
- sqlparse
- name: github.com/satori/go.uuid
version: b061729afc07e77a8aa4fad0a2fd840958f1942a
-- name: github.com/uber-go/atomic
- version: 3b8db5e93c4c02efbc313e17b2e796b0914a01fb
-- name: github.com/uber-go/zap
- version: a2773be06b9ac7c318a3a105b5c310af5730c6b4
+- name: github.com/yuin/gopher-lua
+ version: b402f3114ec730d8bddb074a6c137309f561aa78
subpackages:
- - zapcore
+ - ast
+ - parse
+ - pm
- name: go.uber.org/atomic
version: 4e336646b2ef9fc6e47be8e21594178f98e5ebcf
- name: go.uber.org/zap
- version: a2773be06b9ac7c318a3a105b5c310af5730c6b4
+ version: fab453050a7a08c35f31fc5fff6f2dbd962285ab
subpackages:
- buffer
- internal/bufferpool
diff --git a/glide.yaml b/glide.yaml
index c26c78afede8cc9d9acb516c5c748a9fbf966957..682c9c24e8339b6a65495fa66e1bf3a9ea05558f 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -32,7 +32,7 @@ import:
version: v2
- package: github.com/armon/go-metrics
- package: go.uber.org/zap
- version: ~1.1.0
+ version: ~1.4.0
- package: go.uber.org/atomic
version: ~1.2.0
- package: github.com/satori/go.uuid
@@ -43,3 +43,6 @@ import:
- '...'
- package: github.com/gorhill/cronexpr
version: ~1.0.0
+- package: github.com/yuin/gopher-lua
+- package: github.com/fatih/structs
+ version: ~1.0.0
diff --git a/main.go b/main.go
index 5f9f47670f8fa1e8c439af36634119479a82c357..f043d7695135674cd98230e4ab70aba43eb2cb4a 100644
--- a/main.go
+++ b/main.go
@@ -31,11 +31,12 @@ import (
"nakama/pkg/ga"
"nakama/server"
+ "nakama/pkg/social"
+
"github.com/armon/go-metrics"
"github.com/go-yaml/yaml"
_ "github.com/lib/pq"
uuid "github.com/satori/go.uuid"
-
"go.uber.org/zap"
)
@@ -100,7 +101,15 @@ func main() {
messageRouter := server.NewMessageRouterService(sessionRegistry)
presenceNotifier := server.NewPresenceNotifier(jsonLogger, config.GetName(), trackerService, messageRouter)
trackerService.AddDiffListener(presenceNotifier.HandleDiff)
- authService := server.NewAuthenticationService(jsonLogger, config, db, statsService, sessionRegistry, trackerService, matchmakerService, messageRouter)
+
+ runtime, err := server.NewRuntime(jsonLogger, multiLogger, db, config.GetRuntime())
+ if err != nil {
+ multiLogger.Fatal("Failed initializing runtime modules.", zap.Error(err))
+ }
+
+ socialClient := social.NewClient(5 * time.Second)
+ pipeline := server.NewPipeline(config, db, trackerService, matchmakerService, messageRouter, sessionRegistry, socialClient, runtime)
+ authService := server.NewAuthenticationService(jsonLogger, config, db, statsService, sessionRegistry, socialClient, pipeline, runtime)
opsService := server.NewOpsService(jsonLogger, multiLogger, semver, config, statsService)
gaenabled := len(os.Getenv("NAKAMA_TELEMETRY")) < 1
@@ -118,9 +127,10 @@ func main() {
<-c
multiLogger.Info("Shutting down")
- trackerService.Stop()
authService.Stop()
opsService.Stop()
+ trackerService.Stop()
+ runtime.Stop()
if gaenabled {
ga.SendSessionStop(http.DefaultClient, gacode, cookie)
diff --git a/server/api.proto b/server/api.proto
index 89b7efc0aa7ae61e2bf0310fb99771638028480c..6bae4a53d5bfc31e290e70214e0938e15cc52ed3 100644
--- a/server/api.proto
+++ b/server/api.proto
@@ -33,8 +33,10 @@ message Error {
USER_UNLINK_DISALLOWED = 7;
USER_HANDLE_INUSE = 8;
GROUP_NAME_INUSE = 9;
- STORAGE_FETCH_DISALLOWED = 10;
+ STORAGE_REJECTED = 10;
MATCH_NOT_FOUND = 11;
+ RUNTIME_FUNCTION_NOT_FOUND = 12;
+ RUNTIME_FUNCTION_EXCEPTION = 13;
}
int32 code = 1;
@@ -161,6 +163,8 @@ message Envelope {
TMatchmakeRemove matchmake_remove = 61;
TMatchmakeTicket matchmake_ticket = 62;
MatchmakeMatched matchmake_matched = 63;
+
+ TRpc rpc = 64;
}
}
@@ -234,7 +238,18 @@ message TSelfUpdate {
}
message TUsersFetch {
- repeated bytes user_ids = 1;
+ message UserIds {
+ repeated bytes user_ids = 1;
+ }
+
+ message Handles {
+ repeated string handles = 1;
+ }
+
+ oneof set {
+ UserIds user_ids = 1;
+ Handles handles = 2;
+ }
}
message TUsers {
repeated User users = 1;
@@ -246,7 +261,10 @@ message Friend {
}
message TFriendAdd {
- bytes user_id = 1;
+ oneof set {
+ bytes user_id = 1;
+ string handle = 2;
+ }
}
message TFriendRemove {
@@ -497,6 +515,15 @@ message MatchPresence {
repeated UserPresence leaves = 3;
}
+enum StoragePermissionRead {
+ NO_READ = 0;
+ OWNER_READ = 1;
+ PUBLIC_READ = 2;
+}
+enum StoragePermissionWrite {
+ NO_WRITE = 0;
+ OWNER_WRITE = 1;
+}
message TStorageFetch {
message StorageKey {
string bucket = 1;
@@ -514,8 +541,8 @@ message TStorageData {
bytes user_id = 4;
bytes value = 5;
bytes version = 6;
- int64 permission_read = 7;
- int64 permission_write = 8;
+ int32 permission_read = 7;
+ int32 permission_write = 8;
int64 created_at = 9;
int64 updated_at = 10;
int64 expires_at = 11;
@@ -531,6 +558,8 @@ message TStorageWrite {
string record = 3;
bytes value = 4;
bytes version = 5; // if-match and if-none-match
+ int32 permission_read = 6;
+ int32 permission_write = 7;
}
repeated StorageData data = 3;
@@ -586,6 +615,7 @@ message LeaderboardRecord {
message TLeaderboardsList {
int64 limit = 1;
bytes cursor = 2;
+ repeated bytes filter_leaderboard_id = 3;
}
message TLeaderboards {
repeated Leaderboard leaderboards = 1;
@@ -633,3 +663,8 @@ message TLeaderboardRecords {
repeated LeaderboardRecord records = 1;
bytes cursor = 2;
}
+
+message TRpc {
+ string id = 1;
+ bytes payload = 2;
+}
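
The new `TFriendAdd` oneof maps onto generated Go wrapper types that the pipeline switches over later in this change. A minimal sketch of that mapping, assuming the usual gogo/protobuf naming for the wrapper fields; the handle value is illustrative only:

```go
// Sketch only: TFriendAdd_UserId and TFriendAdd_Handle appear in
// pipeline_friend.go below; the wrapper field names are assumed from
// standard protobuf code generation.
msg := &TFriendAdd{Set: &TFriendAdd_Handle{Handle: "some_handle"}}

switch set := msg.Set.(type) {
case *TFriendAdd_UserId:
	_ = set.UserId // friend identified by raw user ID bytes
case *TFriendAdd_Handle:
	_ = set.Handle // friend identified by handle string
}
```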
diff --git a/server/config.go b/server/config.go
index 7332e99d3311356b42ebf632f7b0b73bfcd93657..26346ce6109336b655f89dca6a6332caf0dbb476 100644
--- a/server/config.go
+++ b/server/config.go
@@ -33,6 +33,7 @@ type Config interface {
GetTransport() *TransportConfig
GetDatabase() *DatabaseConfig
GetSocial() *SocialConfig
+ GetRuntime() *RuntimeConfig
}
type config struct {
@@ -45,12 +46,13 @@ type config struct {
Transport *TransportConfig `yaml:"transport" json:"transport"`
Database *DatabaseConfig `yaml:"database" json:"database"`
Social *SocialConfig `yaml:"social" json:"social"`
+ Runtime *RuntimeConfig `yaml:"runtime" json:"runtime"`
}
// NewConfig constructs a Config struct which represents server settings.
func NewConfig() *config {
cwd, _ := os.Getwd()
- dataDirectory := filepath.FromSlash(cwd + "/data")
+ dataDirectory := filepath.Join(cwd, "data")
nodeName := "nakama-" + strings.Split(uuid.NewV4().String(), "-")[3]
return &config{
Name: nodeName,
@@ -62,6 +64,7 @@ func NewConfig() *config {
Transport: NewTransportConfig(),
Database: NewDatabaseConfig(),
Social: NewSocialConfig(),
+ Runtime: NewRuntimeConfig(dataDirectory),
}
}
@@ -101,6 +104,10 @@ func (c *config) GetSocial() *SocialConfig {
return c.Social
}
+func (c *config) GetRuntime() *RuntimeConfig {
+ return c.Runtime
+}
+
// SessionConfig is configuration relevant to the session
type SessionConfig struct {
EncryptionKey string `yaml:"encryption_key" json:"encryption_key"`
@@ -171,3 +178,19 @@ func NewSocialConfig() *SocialConfig {
},
}
}
+
+// RuntimeConfig is configuration relevant to the Runtime Lua VM
+type RuntimeConfig struct {
+ Environment map[string]interface{} `yaml:"env" json:"env"`
+ Path string `yaml:"path" json:"path"`
+ HTTPKey string `yaml:"http_key" json:"http_key"`
+}
+
+// NewRuntimeConfig creates a new RuntimeConfig struct
+func NewRuntimeConfig(dataDirectory string) *RuntimeConfig {
+ return &RuntimeConfig{
+ Environment: make(map[string]interface{}),
+ Path: filepath.Join(dataDirectory, "modules"),
+ HTTPKey: "defaultkey",
+ }
+}
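
For orientation, this is roughly how the new runtime settings are consumed at startup, mirroring the main.go changes later in this diff; it is a sketch, not the exact code, and the environment override is an illustrative value only:

```go
// GetRuntime() hands the Lua VM its module path (<data_dir>/modules by
// default), its env map, and the HTTP key.
runtimeConfig := config.GetRuntime()
runtimeConfig.Environment["mode"] = "dev" // hypothetical value surfaced to Lua modules

runtime, err := server.NewRuntime(jsonLogger, multiLogger, db, runtimeConfig)
if err != nil {
	multiLogger.Fatal("Failed initializing runtime modules.", zap.Error(err))
}
// ... later, on shutdown:
runtime.Stop()
```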
diff --git a/server/core_friend.go b/server/core_friend.go
new file mode 100644
index 0000000000000000000000000000000000000000..ce18fd6687d59cb4a44ea04783f0ba322c8f76dd
--- /dev/null
+++ b/server/core_friend.go
@@ -0,0 +1,105 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "database/sql"
+ "errors"
+
+ "go.uber.org/zap"
+)
+
+func friendAdd(logger *zap.Logger, db *sql.DB, userID []byte, friendID []byte) error {
+ tx, txErr := db.Begin()
+ if txErr != nil {
+ return txErr
+ }
+
+ var err error
+ defer func() {
+ if err != nil {
+ if rollbackErr := tx.Rollback(); rollbackErr != nil { // don't override value of err
+ logger.Error("Could not rollback transaction", zap.Error(rollbackErr))
+ }
+ } else {
+ if err = tx.Commit(); err != nil {
+ logger.Error("Could not commit transaction", zap.Error(err))
+ }
+ }
+ }()
+
+ updatedAt := nowMs()
+ // Mark an invite as accepted, if one was in place.
+ res, err := tx.Exec(`
+UPDATE user_edge SET state = 0, updated_at = $3
+WHERE (source_id = $1 AND destination_id = $2 AND state = 2)
+OR (source_id = $2 AND destination_id = $1 AND state = 1)
+ `, friendID, userID, updatedAt)
+ if err != nil {
+ return err
+ }
+ // If both edges were updated, accepting an invite was successful.
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected == 2 {
+ return nil
+ }
+
+ // If no edge updates took place, it's a new invite being set up.
+ res, err = tx.Exec(`
+INSERT INTO user_edge (source_id, destination_id, state, position, updated_at)
+SELECT source_id, destination_id, state, position, updated_at
+FROM (VALUES
+ ($1::BYTEA, $2::BYTEA, 2, $3::BIGINT, $3::BIGINT),
+ ($2::BYTEA, $1::BYTEA, 1, $3::BIGINT, $3::BIGINT)
+) AS ue(source_id, destination_id, state, position, updated_at)
+WHERE EXISTS (SELECT id FROM users WHERE id = $2::BYTEA)
+ `, userID, friendID, updatedAt)
+ if err != nil {
+ return err
+ }
+
+ // An invite was successfully added if both components were inserted.
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected != 2 {
+ err = errors.New("user ID not found or unavailable")
+ return err
+ }
+
+ // Update the user edge metadata counts.
+ res, err = tx.Exec(`
+UPDATE user_edge_metadata
+SET count = count + 1, updated_at = $1
+WHERE source_id = $2
+OR source_id = $3`,
+ updatedAt, userID, friendID)
+ if err != nil {
+ return err
+ }
+
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected != 2 {
+ err = errors.New("could not update user friend counts")
+ return err
+ }
+
+ return nil
+}
+
+func friendAddHandle(logger *zap.Logger, db *sql.DB, userID []byte, friendHandle string) error {
+ var friendIdBytes []byte
+ err := db.QueryRow("SELECT id FROM users WHERE handle = $1", friendHandle).Scan(&friendIdBytes)
+ if err != nil {
+ return err
+ }
+
+ return friendAdd(logger, db, userID, friendIdBytes)
+}
diff --git a/server/core_leaderboard.go b/server/core_leaderboard.go
new file mode 100644
index 0000000000000000000000000000000000000000..65ae36c59fd9aa4845acbd439421b6dfbd174e53
--- /dev/null
+++ b/server/core_leaderboard.go
@@ -0,0 +1,87 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "encoding/json"
+
+ "database/sql"
+ "errors"
+
+ "github.com/gorhill/cronexpr"
+ "github.com/satori/go.uuid"
+ "go.uber.org/zap"
+)
+
+func createLeaderboard(logger *zap.Logger, db *sql.DB, id, sortOrder, resetSchedule, metadata string, authoritative bool) ([]byte, error) {
+ query := `INSERT INTO leaderboard (id, authoritative, sort_order, reset_schedule, metadata)
+ VALUES ($1, $2, $3, $4, $5)`
+ params := []interface{}{}
+
+ // ID.
+ if id == "" {
+ params = append(params, uuid.NewV4().Bytes())
+ } else {
+ params = append(params, []byte(id))
+ }
+
+ // Authoritative.
+ params = append(params, authoritative)
+
+ // Sort order.
+ if sortOrder == "asc" {
+ params = append(params, 0)
+ } else if sortOrder == "desc" {
+ params = append(params, 1)
+ } else {
+ logger.Warn("Invalid sort value, must be 'asc' or 'desc'.", zap.String("sort", sortOrder))
+ return nil, errors.New("Invalid sort value, must be 'asc' or 'desc'.")
+ }
+
+ // Count is hardcoded in the INSERT above.
+
+ // Reset schedule.
+ if resetSchedule != "" {
+ _, err := cronexpr.Parse(resetSchedule)
+ if err != nil {
+ logger.Warn("Failed to parse reset schedule", zap.String("reset", resetSchedule), zap.Error(err))
+ return nil, err
+ }
+ params = append(params, resetSchedule)
+ } else {
+ params = append(params, nil)
+ }
+
+ // Metadata.
+ metadataBytes := []byte(metadata)
+ var maybeJSON map[string]interface{}
+ if err := json.Unmarshal(metadataBytes, &maybeJSON); err != nil {
+ logger.Warn("Failed to unmarshal metadata", zap.String("metadata", metadata), zap.Error(err))
+ return nil, err
+ }
+ params = append(params, metadataBytes)
+
+ res, err := db.Exec(query, params...)
+ if err != nil {
+ logger.Error("Error creating leaderboard", zap.Error(err))
+ return nil, err
+ }
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
+ logger.Error("Error creating leaderboard, unexpected insert result")
+ return nil, errors.New("Error creating leaderboard, unexpected insert result")
+ }
+
+ return params[0].([]byte), nil
+}
diff --git a/server/core_runtime_hooks.go b/server/core_runtime_hooks.go
new file mode 100644
index 0000000000000000000000000000000000000000..48f24f6543a499d2f8c7bd89a6bde46f9e0877ec
--- /dev/null
+++ b/server/core_runtime_hooks.go
@@ -0,0 +1,169 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "bytes"
+ "encoding/json"
+
+ "fmt"
+ "strings"
+
+ "github.com/gogo/protobuf/jsonpb"
+ "github.com/satori/go.uuid"
+ "go.uber.org/zap"
+)
+
+func RuntimeBeforeHook(runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, messageType string, envelope *Envelope, session *session) (*Envelope, error) {
+ fn := runtime.GetRuntimeCallback(BEFORE, messageType)
+ if fn == nil {
+ return envelope, nil
+ }
+
+ strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
+ if err != nil {
+ return nil, err
+ }
+
+ var jsonEnvelope map[string]interface{}
+ if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
+ return nil, err
+ }
+
+ userId := uuid.Nil
+ handle := ""
+ expiry := int64(0)
+ if session != nil {
+ userId = session.userID
+ handle = session.handle.Load()
+ expiry = session.expiry
+ }
+
+ result, fnErr := runtime.InvokeFunctionBefore(fn, userId, handle, expiry, jsonEnvelope)
+ if fnErr != nil {
+ return nil, fnErr
+ }
+
+ bytesEnvelope, err := json.Marshal(result)
+ if err != nil {
+ return nil, err
+ }
+
+ resultEnvelope := &Envelope{}
+ if err = jsonpbUnmarshaler.Unmarshal(bytes.NewReader(bytesEnvelope), resultEnvelope); err != nil {
+ return nil, err
+ }
+
+ return resultEnvelope, nil
+}
+
+func RuntimeAfterHook(logger *zap.Logger, runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, messageType string, envelope *Envelope, session *session) {
+ fn := runtime.GetRuntimeCallback(AFTER, messageType)
+ if fn == nil {
+ return
+ }
+
+ strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
+ if err != nil {
+ logger.Error("Failed to convert proto message to protoJSON in After invocation", zap.String("message", messageType), zap.Error(err))
+ return
+ }
+
+ var jsonEnvelope map[string]interface{}
+ if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
+ logger.Error("Failed to convert protoJSON message to Map in After invocation", zap.String("message", messageType), zap.Error(err))
+ return
+ }
+
+ userId := uuid.Nil
+ handle := ""
+ expiry := int64(0)
+ if session != nil {
+ userId = session.userID
+ handle = session.handle.Load()
+ expiry = session.expiry
+ }
+
+ if fnErr := runtime.InvokeFunctionAfter(fn, userId, handle, expiry, jsonEnvelope); fnErr != nil {
+ logger.Error("Runtime after function caused an error", zap.String("message", messageType), zap.Error(fnErr))
+ }
+}
+
+func RuntimeBeforeHookAuthentication(runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, jsonpbUnmarshaler *jsonpb.Unmarshaler, envelope *AuthenticateRequest) (*AuthenticateRequest, error) {
+ messageType := strings.TrimPrefix(fmt.Sprintf("%T", envelope.Payload), "*server")
+ fn := runtime.GetRuntimeCallback(BEFORE, messageType)
+ if fn == nil {
+ return envelope, nil
+ }
+
+ strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
+ if err != nil {
+ return nil, err
+ }
+
+ var jsonEnvelope map[string]interface{}
+ if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
+ return nil, err
+ }
+
+ userId := uuid.Nil
+ handle := ""
+ expiry := int64(0)
+
+ result, fnErr := runtime.InvokeFunctionBefore(fn, userId, handle, expiry, jsonEnvelope)
+ if fnErr != nil {
+ return nil, fnErr
+ }
+
+ bytesEnvelope, err := json.Marshal(result)
+ if err != nil {
+ return nil, err
+ }
+
+ authenticationResult := &AuthenticateRequest{}
+ if err = jsonpbUnmarshaler.Unmarshal(bytes.NewReader(bytesEnvelope), authenticationResult); err != nil {
+ return nil, err
+ }
+
+ return authenticationResult, nil
+}
+
+func RuntimeAfterHookAuthentication(logger *zap.Logger, runtime *Runtime, jsonpbMarshaler *jsonpb.Marshaler, envelope *AuthenticateRequest) {
+ messageType := strings.TrimPrefix(fmt.Sprintf("%T", envelope.Payload), "*server")
+ fn := runtime.GetRuntimeCallback(AFTER, messageType)
+ if fn == nil {
+ return
+ }
+
+ strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
+ if err != nil {
+ logger.Error("Failed to convert proto message to protoJSON in After invocation", zap.String("message", messageType), zap.Error(err))
+ return
+ }
+
+ var jsonEnvelope map[string]interface{}
+ if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
+ logger.Error("Failed to convert protoJSON message to Map in After invocation", zap.String("message", messageType), zap.Error(err))
+ return
+ }
+
+ userId := uuid.Nil
+ handle := ""
+ expiry := int64(0)
+
+ if fnErr := runtime.InvokeFunctionAfter(fn, userId, handle, expiry, jsonEnvelope); fnErr != nil {
+ logger.Error("Runtime after function caused an error", zap.String("message", messageType), zap.Error(fnErr))
+ }
+}
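
For context, this is roughly how the pipeline (changed later in this diff) brackets a message with these hooks; the trimmed hook key shown in the comment is an example, not a fixed constant:

```go
// Sketch of the call sequence around a single incoming message.
messageType := strings.TrimPrefix(fmt.Sprintf("%T", envelope.Payload), "*server.Envelope_") // e.g. "Rpc"

rewritten, err := RuntimeBeforeHook(runtime, jsonpbMarshaler, jsonpbUnmarshaler, messageType, envelope, session)
if err != nil {
	// A failing before hook rejects the message with RUNTIME_FUNCTION_EXCEPTION.
	return
}

// ... process the (possibly rewritten) envelope ...

RuntimeAfterHook(logger, runtime, jsonpbMarshaler, messageType, rewritten, session)
```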
diff --git a/server/core_storage.go b/server/core_storage.go
new file mode 100644
index 0000000000000000000000000000000000000000..11772471701fc6e3074d83b5052677958219cc35
--- /dev/null
+++ b/server/core_storage.go
@@ -0,0 +1,368 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "bytes"
+ "crypto/sha256"
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "fmt"
+
+ "github.com/satori/go.uuid"
+ "go.uber.org/zap"
+)
+
+type StorageKey struct {
+ Bucket string
+ Collection string
+ Record string
+ UserId []byte // this must be UserId not UserID
+ // Version is used when returning results from write ops, does not apply to fetch ops.
+ Version []byte
+}
+
+type StorageData struct {
+ Bucket string
+ Collection string
+ Record string
+ UserId []byte // this must be UserId not UserID
+ Value []byte
+ Version []byte
+ PermissionRead int64
+ PermissionWrite int64
+ CreatedAt int64
+ UpdatedAt int64
+ ExpiresAt int64
+}
+
+func StorageFetch(logger *zap.Logger, db *sql.DB, caller uuid.UUID, keys []*StorageKey) ([]*StorageData, Error_Code, error) {
+ // Ensure there is at least one key requested.
+ if len(keys) == 0 {
+ return nil, BAD_INPUT, errors.New("At least one fetch key is required")
+ }
+
+ query := `
+SELECT user_id, bucket, collection, record, value, version, read, write, created_at, updated_at, expires_at
+FROM storage
+WHERE `
+ params := make([]interface{}, 0)
+
+ // Accumulate the query clauses and corresponding parameters.
+ for i, key := range keys {
+ // Check the storage identifiers.
+ if key.Bucket == "" || key.Collection == "" || key.Record == "" {
+ return nil, BAD_INPUT, errors.New("Invalid values for bucket, collection, or record")
+ }
+
+ // If a user ID is provided, validate the format.
+ owner := []byte{}
+ if len(key.UserId) != 0 {
+ if uid, err := uuid.FromBytes(key.UserId); err != nil {
+ return nil, BAD_INPUT, errors.New("Invalid user ID")
+ } else {
+ owner = uid.Bytes()
+ }
+ }
+
+ if i != 0 {
+ query += " OR "
+ }
+ l := len(params)
+ query += fmt.Sprintf("(bucket = $%v AND collection = $%v AND user_id = $%v AND record = $%v AND deleted_at = 0", l+1, l+2, l+3, l+4)
+ params = append(params, key.Bucket, key.Collection, owner, key.Record)
+ if caller != uuid.Nil {
+ query += fmt.Sprintf(" AND (read = 2 OR (read = 1 AND user_id = $%v))", len(params)+1)
+ params = append(params, caller.Bytes())
+ }
+ query += ")"
+ }
+
+ // Execute the query.
+ rows, err := db.Query(query, params...)
+ if err != nil {
+ logger.Error("Error in storage fetch", zap.Error(err))
+ return nil, RUNTIME_EXCEPTION, err
+ }
+ defer rows.Close()
+
+ storageData := make([]*StorageData, 0)
+
+ // Parse the results.
+ for rows.Next() {
+ var userID []byte
+ var bucket sql.NullString
+ var collection sql.NullString
+ var record sql.NullString
+ var value []byte
+ var version []byte
+ var read sql.NullInt64
+ var write sql.NullInt64
+ var createdAt sql.NullInt64
+ var updatedAt sql.NullInt64
+ var expiresAt sql.NullInt64
+
+ err := rows.Scan(&userID, &bucket, &collection, &record, &value, &version,
+ &read, &write, &createdAt, &updatedAt, &expiresAt)
+ if err != nil {
+ logger.Error("Could not execute storage fetch query", zap.Error(err))
+ return nil, RUNTIME_EXCEPTION, err
+ }
+
+ // Potentially coerce zero-length global owner field.
+ if len(userID) == 0 {
+ userID = nil
+ }
+
+ // Accumulate the response.
+ storageData = append(storageData, &StorageData{
+ Bucket: bucket.String,
+ Collection: collection.String,
+ Record: record.String,
+ UserId: userID,
+ Value: value,
+ Version: version,
+ PermissionRead: read.Int64,
+ PermissionWrite: write.Int64,
+ CreatedAt: createdAt.Int64,
+ UpdatedAt: updatedAt.Int64,
+ ExpiresAt: expiresAt.Int64,
+ })
+ }
+ if err = rows.Err(); err != nil {
+ logger.Error("Could not execute storage fetch query", zap.Error(err))
+ return nil, RUNTIME_EXCEPTION, err
+ }
+
+ return storageData, 0, nil
+}
+
+func StorageWrite(logger *zap.Logger, db *sql.DB, caller uuid.UUID, data []*StorageData) ([]*StorageKey, Error_Code, error) {
+ // Ensure there is at least one value requested.
+ if len(data) == 0 {
+ return nil, BAD_INPUT, errors.New("At least one write value is required")
+ }
+
+ // Validate all input before starting DB operations.
+ for _, d := range data {
+ // Check the storage identifiers.
+ if d.Bucket == "" || d.Collection == "" || d.Record == "" {
+ return nil, BAD_INPUT, errors.New("Invalid values for bucket, collection, or record")
+ }
+
+ // Check the read permission value.
+ if d.PermissionRead != 0 && d.PermissionRead != 1 && d.PermissionRead != 2 {
+ return nil, BAD_INPUT, errors.New("Invalid read permission value")
+ }
+
+ // Check the write permission value.
+ if d.PermissionWrite != 0 && d.PermissionWrite != 1 {
+ return nil, BAD_INPUT, errors.New("Invalid write permission value")
+ }
+
+ // If a user ID is provided, validate the format.
+ if len(d.UserId) != 0 {
+ if uid, err := uuid.FromBytes(d.UserId); err != nil {
+ return nil, BAD_INPUT, errors.New("Invalid user ID")
+ } else if caller != uid {
+ // If the caller is a client, only allow them to write their own data.
+ return nil, BAD_INPUT, errors.New("Clients can only write their own data")
+ }
+ } else if caller != uuid.Nil {
+ // If the caller is a client, do not allow them to write global data.
+ return nil, BAD_INPUT, errors.New("Clients cannot write global data")
+ }
+
+ // Make this `var js interface{}` if we want to allow top-level JSON arrays.
+ var maybeJSON map[string]interface{}
+ if json.Unmarshal(d.Value, &maybeJSON) != nil {
+ return nil, BAD_INPUT, errors.New("All values must be valid JSON objects")
+ }
+ }
+
+ // Prepare response structure, expect to return as many keys as we're writing.
+ keys := make([]*StorageKey, len(data))
+
+ // Use same timestamp for all operations in this batch.
+ ts := nowMs()
+
+ // Start a transaction.
+ tx, err := db.Begin()
+ if err != nil {
+ logger.Error("Could not write storage, transaction error", zap.Error(err))
+ return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
+ }
+
+ // Execute each storage write.
+ for i, d := range data {
+ id := uuid.NewV4().Bytes()
+ //sha := fmt.Sprintf("%x", sha256.Sum256(d.Value))
+ version := []byte(fmt.Sprintf("%x", sha256.Sum256(d.Value)))
+
+ // Check if it's global or user-owned data.
+ owner := []byte{}
+ if len(d.UserId) != 0 {
+ owner = d.UserId
+ }
+
+ query := `
+INSERT INTO storage (id, user_id, bucket, collection, record, value, version, read, write, created_at, updated_at, deleted_at)
+SELECT $1, $2, $3, $4, $5, $6::BYTEA, $7, $8, $9, $10, $10, 0`
+ params := []interface{}{id, owner, d.Bucket, d.Collection, d.Record, d.Value, version, d.PermissionRead, d.PermissionWrite, ts}
+
+ if len(d.Version) == 0 {
+ // Simple write.
+ query += " WHERE NOT EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 AND record = $5 AND deleted_at = 0"
+ // If needed use an additional clause to enforce permissions.
+ if caller != uuid.Nil {
+ query += " AND write = 0"
+ }
+ query += `)
+ON CONFLICT (bucket, collection, user_id, record, deleted_at)
+DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at = $10`
+ } else if bytes.Equal(d.Version, []byte("*")) {
+ // if-none-match
+ query += " WHERE NOT EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 AND record = $5 AND deleted_at = 0)"
+ // No additional clause needed to enforce permissions.
+ // Any existing record, no matter its write permission, will cause this operation to be rejected.
+ } else {
+ // if-match
+ query += " WHERE EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 AND record = $5 AND deleted_at = 0 AND version = $11"
+ // If needed use an additional clause to enforce permissions.
+ if caller != uuid.Nil {
+ query += " AND write = 1"
+ }
+ query += `)
+ON CONFLICT (bucket, collection, user_id, record, deleted_at)
+DO UPDATE SET value = $6::BYTEA, version = $7, read = $8, write = $9, updated_at = $10`
+ params = append(params, d.Version)
+ }
+
+ // Execute the query.
+ res, err := tx.Exec(query, params...)
+ if err != nil {
+ logger.Error("Could not write storage, exec error", zap.Error(err))
+ if e := tx.Rollback(); e != nil {
+ logger.Error("Could not write storage, rollback error", zap.Error(e))
+ }
+ return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
+ }
+
+ // Check there was exactly 1 row affected.
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 {
+ err = tx.Rollback()
+ if err != nil {
+ logger.Error("Could not write storage, rollback error", zap.Error(err))
+ }
+ return nil, STORAGE_REJECTED, errors.New("Storage write rejected: not found, version check failed, or permission denied")
+ }
+
+ keys[i] = &StorageKey{
+ Bucket: d.Bucket,
+ Collection: d.Collection,
+ Record: d.Record,
+ UserId: d.UserId,
+ Version: version[:],
+ }
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ logger.Error("Could not write storage, commit error", zap.Error(err))
+ return nil, RUNTIME_EXCEPTION, errors.New("Could not write storage")
+ }
+
+ return keys, 0, nil
+}
+
+func StorageRemove(logger *zap.Logger, db *sql.DB, caller uuid.UUID, keys []*StorageKey) (Error_Code, error) {
+ // Ensure there is at least one key requested.
+ if len(keys) == 0 {
+ return BAD_INPUT, errors.New("At least one remove key is required")
+ }
+
+ query := `
+UPDATE storage SET deleted_at = $1, updated_at = $1
+WHERE `
+ params := []interface{}{nowMs()}
+
+ // Accumulate the query clauses and corresponding parameters.
+ for i, key := range keys {
+ // Check the storage identifiers.
+ if key.Bucket == "" || key.Collection == "" || key.Record == "" {
+ return BAD_INPUT, errors.New("Invalid values for bucket, collection, or record")
+ }
+
+ // If a user ID is provided, validate the format.
+ owner := []byte{}
+ if len(key.UserId) != 0 {
+ if uid, err := uuid.FromBytes(key.UserId); err != nil {
+ return BAD_INPUT, errors.New("Invalid user ID")
+ } else {
+ owner = uid.Bytes()
+ }
+ }
+
+ if i != 0 {
+ query += " OR "
+ }
+ l := len(params)
+ query += fmt.Sprintf("(bucket = $%v AND collection = $%v AND user_id = $%v AND record = $%v AND deleted_at = 0", l+1, l+2, l+3, l+4)
+ params = append(params, key.Bucket, key.Collection, owner, key.Record)
+ // Permission.
+ if caller != uuid.Nil {
+ query += fmt.Sprintf(" AND write = 1 AND user_id = $%v", len(params)+1)
+ params = append(params, caller.Bytes())
+ }
+ // Version.
+ if len(key.Version) != 0 {
+ query += fmt.Sprintf(" AND version = $%v", len(params)+1)
+ params = append(params, key.Version)
+ }
+ query += ")"
+ }
+
+ // Start a transaction.
+ tx, err := db.Begin()
+ if err != nil {
+ logger.Error("Could not remove storage, transaction error", zap.Error(err))
+ return RUNTIME_EXCEPTION, errors.New("Could not remove storage")
+ }
+
+ // Execute the query.
+ res, err := tx.Exec(query, params...)
+ if err != nil {
+ logger.Error("Could not remove storage, exec error", zap.Error(err))
+ return RUNTIME_EXCEPTION, errors.New("Could not remove storage")
+ }
+
+ // If not all keys resulted in a delete, rollback and return an error.
+ if rowsAffected, _ := res.RowsAffected(); rowsAffected != int64(len(keys)) {
+ err = tx.Rollback()
+ if err != nil {
+ logger.Error("Could not remove storage, rollback error", zap.Error(err))
+ }
+ return STORAGE_REJECTED, errors.New("Storage remove rejected: not found, version check failed, or permission denied")
+ }
+
+ err = tx.Commit()
+ if err != nil {
+ logger.Error("Could not remove storage, commit error", zap.Error(err))
+ return RUNTIME_EXCEPTION, errors.New("Could not remove storage")
+ }
+
+ return 0, nil
+}
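
A round-trip sketch, assuming it lives in the `server` package; the bucket, collection, record, and value are illustrative, and calling with the owner's own ID keeps the client-side permission checks satisfied:

```go
package server

import (
	"database/sql"

	"github.com/satori/go.uuid"
	"go.uber.org/zap"
)

// exampleStorageRoundTrip is hypothetical: write a publicly readable,
// owner-writable record as its owner, then fetch it back.
func exampleStorageRoundTrip(logger *zap.Logger, db *sql.DB, owner uuid.UUID) ([]*StorageData, error) {
	_, _, err := StorageWrite(logger, db, owner, []*StorageData{{
		Bucket:          "saves",
		Collection:      "profile",
		Record:          "slot1",
		UserId:          owner.Bytes(),
		Value:           []byte(`{"level": 3}`),
		PermissionRead:  2, // PUBLIC_READ
		PermissionWrite: 1, // OWNER_WRITE
	}})
	if err != nil {
		return nil, err
	}

	data, _, err := StorageFetch(logger, db, owner, []*StorageKey{{
		Bucket:     "saves",
		Collection: "profile",
		Record:     "slot1",
		UserId:     owner.Bytes(),
	}})
	return data, err
}
```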
diff --git a/server/core_user.go b/server/core_user.go
new file mode 100644
index 0000000000000000000000000000000000000000..d475dc4b690a7a33a22cbcf045daf5438d9a7c58
--- /dev/null
+++ b/server/core_user.go
@@ -0,0 +1,128 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "database/sql"
+
+ "errors"
+ "strconv"
+ "strings"
+
+ "go.uber.org/zap"
+)
+
+func querySocialGraph(logger *zap.Logger, db *sql.DB, filterQuery string, params []interface{}) ([]*User, error) {
+ users := []*User{}
+
+ query := `
+SELECT id, handle, fullname, avatar_url,
+ lang, location, timezone, metadata,
+ created_at, users.updated_at, last_online_at
+FROM users ` + filterQuery
+
+ rows, err := db.Query(query, params...)
+ if err != nil {
+ logger.Error("Could not execute social graph query", zap.String("query", query), zap.Error(err))
+ return nil, err
+ }
+ defer rows.Close()
+
+ var id []byte
+ var handle sql.NullString
+ var fullname sql.NullString
+ var avatarURL sql.NullString
+ var lang sql.NullString
+ var location sql.NullString
+ var timezone sql.NullString
+ var metadata []byte
+ var createdAt sql.NullInt64
+ var updatedAt sql.NullInt64
+ var lastOnlineAt sql.NullInt64
+
+ for rows.Next() {
+ err = rows.Scan(&id, &handle, &fullname, &avatarURL, &lang, &location, &timezone, &metadata, &createdAt, &updatedAt, &lastOnlineAt)
+ if err != nil {
+ logger.Error("Could not execute social graph query", zap.Error(err))
+ return nil, err
+ }
+
+ users = append(users, &User{
+ Id: id,
+ Handle: handle.String,
+ Fullname: fullname.String,
+ AvatarUrl: avatarURL.String,
+ Lang: lang.String,
+ Location: location.String,
+ Timezone: timezone.String,
+ Metadata: metadata,
+ CreatedAt: createdAt.Int64,
+ UpdatedAt: updatedAt.Int64,
+ LastOnlineAt: lastOnlineAt.Int64,
+ })
+ }
+ if err = rows.Err(); err != nil {
+ logger.Error("Could not execute social graph query", zap.Error(err))
+ return nil, err
+ }
+
+ return users, nil
+}
+
+func UsersFetchIds(logger *zap.Logger, db *sql.DB, userIds [][]byte) ([]*User, error) {
+ statements := make([]string, 0)
+ params := make([]interface{}, 0)
+
+ counter := 1
+ for _, userID := range userIds {
+ statement := "$" + strconv.Itoa(counter)
+ counter += 1
+ statements = append(statements, statement)
+ params = append(params, userID)
+ }
+
+ if len(statements) == 0 {
+ return nil, errors.New("No valid user IDs received")
+ }
+
+ query := "WHERE users.id IN (" + strings.Join(statements, ", ") + ")"
+ users, err := querySocialGraph(logger, db, query, params)
+ if err != nil {
+ return nil, errors.New("Could not retrieve users")
+ }
+
+ return users, nil
+}
+
+func UsersFetchHandle(logger *zap.Logger, db *sql.DB, handles []string) ([]*User, error) {
+ statements := make([]string, 0)
+ params := make([]interface{}, 0)
+
+ counter := 1
+ for _, handle := range handles {
+ statement := "$" + strconv.Itoa(counter)
+ counter += 1
+ statements = append(statements, statement)
+ params = append(params, handle)
+ }
+
+ query := "WHERE users.handle IN (" + strings.Join(statements, ", ") + ")"
+ users, err := querySocialGraph(logger, db, query, params)
+ if err != nil {
+ return nil, errors.New("Could not retrieve users")
+ }
+
+ return users, nil
+}
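
A small sketch of the two fetch paths, assumed to live in the `server` package; the handles are illustrative:

```go
package server

import (
	"database/sql"

	"go.uber.org/zap"
)

// exampleFetchUsers is hypothetical: look up users by handle, then re-fetch
// the same users by their IDs via the shared social graph query.
func exampleFetchUsers(logger *zap.Logger, db *sql.DB) ([]*User, error) {
	byHandle, err := UsersFetchHandle(logger, db, []string{"alice", "bob"})
	if err != nil {
		return nil, err
	}

	ids := make([][]byte, 0, len(byHandle))
	for _, u := range byHandle {
		ids = append(ids, u.Id)
	}
	return UsersFetchIds(logger, db, ids)
}
```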
diff --git a/server/pipeline.go b/server/pipeline.go
index c61bc30cd9705366778241e2a62127b74a9bc18d..962690f443122ad2181c214fb053b24e66de4c9f 100644
--- a/server/pipeline.go
+++ b/server/pipeline.go
@@ -18,38 +18,68 @@ import (
"database/sql"
"fmt"
- "go.uber.org/zap"
-
"nakama/pkg/social"
+
+ "strings"
+
+ "github.com/gogo/protobuf/jsonpb"
+ "go.uber.org/zap"
)
type pipeline struct {
- config Config
- db *sql.DB
- socialClient *social.Client
- tracker Tracker
- matchmaker Matchmaker
- hmacSecretByte []byte
- messageRouter MessageRouter
- sessionRegistry *SessionRegistry
+ config Config
+ db *sql.DB
+ tracker Tracker
+ matchmaker Matchmaker
+ hmacSecretByte []byte
+ messageRouter MessageRouter
+ sessionRegistry *SessionRegistry
+ socialClient *social.Client
+ runtime *Runtime
+ jsonpbMarshaler *jsonpb.Marshaler
+ jsonpbUnmarshaler *jsonpb.Unmarshaler
}
// NewPipeline creates a new Pipeline
-func NewPipeline(config Config, db *sql.DB, socialClient *social.Client, tracker Tracker, matchmaker Matchmaker, messageRouter MessageRouter, registry *SessionRegistry) *pipeline {
+func NewPipeline(config Config, db *sql.DB, tracker Tracker, matchmaker Matchmaker, messageRouter MessageRouter, registry *SessionRegistry, socialClient *social.Client, runtime *Runtime) *pipeline {
return &pipeline{
config: config,
db: db,
- socialClient: socialClient,
tracker: tracker,
matchmaker: matchmaker,
hmacSecretByte: []byte(config.GetSession().EncryptionKey),
messageRouter: messageRouter,
sessionRegistry: registry,
+ socialClient: socialClient,
+ runtime: runtime,
+ jsonpbMarshaler: &jsonpb.Marshaler{
+ EnumsAsInts: true,
+ EmitDefaults: false,
+ Indent: "",
+ OrigName: false,
+ },
+ jsonpbUnmarshaler: &jsonpb.Unmarshaler{
+ AllowUnknownFields: false,
+ },
}
}
-func (p *pipeline) processRequest(logger *zap.Logger, session *session, envelope *Envelope) {
- logger.Debug(fmt.Sprintf("Received %T message", envelope.Payload))
+func (p *pipeline) processRequest(logger *zap.Logger, session *session, originalEnvelope *Envelope) {
+ if originalEnvelope.Payload == nil {
+ session.Send(ErrorMessage(originalEnvelope.CollationId, MISSING_PAYLOAD, "No payload found"))
+ return
+ }
+
+ messageType := fmt.Sprintf("%T", originalEnvelope.Payload)
+ logger.Debug("Received message", zap.String("type", messageType))
+
+ messageType = strings.TrimPrefix(messageType, "*server.Envelope_")
+ envelope, fnErr := RuntimeBeforeHook(p.runtime, p.jsonpbMarshaler, p.jsonpbUnmarshaler, messageType, originalEnvelope, session)
+ if fnErr != nil {
+ logger.Error("Runtime before function caused an error", zap.String("message", messageType), zap.Error(fnErr))
+ session.Send(ErrorMessage(originalEnvelope.CollationId, RUNTIME_FUNCTION_EXCEPTION, fmt.Sprintf("Runtime before function caused an error: %s", fnErr.Error())))
+ return
+ }
switch envelope.Payload.(type) {
case *Envelope_Logout:
@@ -142,11 +172,15 @@ func (p *pipeline) processRequest(logger *zap.Logger, session *session, envelope
case *Envelope_LeaderboardRecordsList:
p.leaderboardRecordsList(logger, session, envelope)
- case nil:
- session.Send(ErrorMessage(envelope.CollationId, MISSING_PAYLOAD, "No payload found"))
+ case *Envelope_Rpc:
+ p.rpc(logger, session, envelope)
+
default:
session.Send(ErrorMessage(envelope.CollationId, UNRECOGNIZED_PAYLOAD, "Unrecognized payload"))
+ return
}
+
+ RuntimeAfterHook(logger, p.runtime, p.jsonpbMarshaler, messageType, envelope, session)
}
func ErrorMessageRuntimeException(collationID string, message string) *Envelope {
diff --git a/server/pipeline_friend.go b/server/pipeline_friend.go
index 756f0428e5f05ed9ca0cbed9764734fdae08a61b..4a3df5a4b1b5c64d8a89f88034cff806dbeb1c82 100644
--- a/server/pipeline_friend.go
+++ b/server/pipeline_friend.go
@@ -196,104 +196,60 @@ FROM users, user_edge ` + filterQuery
}
func (p *pipeline) friendAdd(l *zap.Logger, session *session, envelope *Envelope) {
- addFriendRequest := envelope.GetFriendAdd()
- if len(addFriendRequest.UserId) == 0 {
+ f := envelope.GetFriendAdd()
+
+ switch f.Set.(type) {
+ case *TFriendAdd_UserId:
+ p.friendAddById(l, session, envelope, f.GetUserId())
+ case *TFriendAdd_Handle:
+ p.friendAddByHandle(l, session, envelope, f.GetHandle())
+ }
+}
+
+func (p *pipeline) friendAddById(l *zap.Logger, session *session, envelope *Envelope, friendIdBytes []byte) {
+ if len(friendIdBytes) == 0 {
session.Send(ErrorMessageBadInput(envelope.CollationId, "User ID must be present"))
return
}
-
- friendID, err := uuid.FromBytes(addFriendRequest.UserId)
+ friendID, err := uuid.FromBytes(friendIdBytes)
if err != nil {
l.Warn("Could not add friend", zap.Error(err))
session.Send(ErrorMessageBadInput(envelope.CollationId, "Invalid User ID"))
return
}
- logger := l.With(zap.String("friend_id", friendID.String()))
- friendIDBytes := friendID.Bytes()
+ logger := l.With(zap.String("friend_id", friendID.String()))
if friendID == session.userID {
logger.Warn("Cannot add self", zap.Error(err))
session.Send(ErrorMessageBadInput(envelope.CollationId, "Cannot add self"))
return
}
- tx, err := p.db.Begin()
- if err != nil {
+ if err := friendAdd(logger, p.db, session.userID.Bytes(), friendID.Bytes()); err != nil {
logger.Error("Could not add friend", zap.Error(err))
session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
return
}
- defer func() {
- if err != nil {
- logger.Error("Could not add friend", zap.Error(err))
- err = tx.Rollback()
- if err != nil {
- logger.Error("Could not rollback transaction", zap.Error(err))
- }
-
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
- } else {
- err = tx.Commit()
- if err != nil {
- logger.Error("Could not commit transaction", zap.Error(err))
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
- } else {
- logger.Info("Added friend")
- session.Send(&Envelope{CollationId: envelope.CollationId})
- }
- }
- }()
-
- updatedAt := nowMs()
- // Mark an invite as accepted, if one was in place.
- res, err := tx.Exec(`
-UPDATE user_edge SET state = 0, updated_at = $3
-WHERE (source_id = $1 AND destination_id = $2 AND state = 2)
-OR (source_id = $2 AND destination_id = $1 AND state = 1)
- `, friendIDBytes, session.userID.Bytes(), updatedAt)
- if err != nil {
- return
- }
- // If both edges were updated, it was accepting an invite was successful.
- if rowsAffected, _ := res.RowsAffected(); rowsAffected == 2 {
- return
- }
- // If no edge updates took place, it's a new invite being set up.
- res, err = tx.Exec(`
-INSERT INTO user_edge (source_id, destination_id, state, position, updated_at)
-SELECT source_id, destination_id, state, position, updated_at
-FROM (VALUES
- ($1::BYTEA, $2::BYTEA, 2, $3::BIGINT, $3::BIGINT),
- ($2::BYTEA, $1::BYTEA, 1, $3::BIGINT, $3::BIGINT)
-) AS ue(source_id, destination_id, state, position, updated_at)
-WHERE EXISTS (SELECT id FROM users WHERE id = $2::BYTEA)
- `, session.userID.Bytes(), friendIDBytes, updatedAt)
- if err != nil {
- return
- }
+ logger.Info("Added friend")
+ session.Send(&Envelope{CollationId: envelope.CollationId})
+}
- // An invite was successfully added if both components were inserted.
- if rowsAffected, _ := res.RowsAffected(); rowsAffected != 2 {
- err = errors.New("user ID not found or unavailable")
+func (p *pipeline) friendAddByHandle(l *zap.Logger, session *session, envelope *Envelope, friendHandle string) {
+ if friendHandle == "" || friendHandle == session.handle.Load() {
+ session.Send(ErrorMessageBadInput(envelope.CollationId, "User handle must be present and not equal to user's handle"))
return
}
- // Update the user edge metadata counts.
- res, err = tx.Exec(`
-UPDATE user_edge_metadata
-SET count = count + 1, updated_at = $1
-WHERE source_id = $2
-OR source_id = $3`,
- updatedAt, session.userID.Bytes(), friendIDBytes)
- if err != nil {
+ logger := l.With(zap.String("friend_handle", friendHandle))
+ if err := friendAddHandle(logger, p.db, session.userID.Bytes(), friendHandle); err != nil {
+ logger.Error("Could not add friend", zap.Error(err))
+ session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Failed to add friend"))
return
}
- if rowsAffected, _ := res.RowsAffected(); rowsAffected != 2 {
- err = errors.New("could not update user friend counts")
- return
- }
+ logger.Info("Added friend")
+ session.Send(&Envelope{CollationId: envelope.CollationId})
}
func (p *pipeline) friendRemove(l *zap.Logger, session *session, envelope *Envelope) {
diff --git a/server/pipeline_leaderboard.go b/server/pipeline_leaderboard.go
index 43525ca2a41d200b245351808fdb72a07554f421..6ba49028982a06d1756fbec7c11db12f4292e942 100644
--- a/server/pipeline_leaderboard.go
+++ b/server/pipeline_leaderboard.go
@@ -66,6 +66,21 @@ func (p *pipeline) leaderboardsList(logger *zap.Logger, session *session, envelo
params = append(params, incomingCursor.Id)
}
+ if len(incoming.GetFilterLeaderboardId()) != 0 {
+ statements := make([]string, 0)
+ for _, filterId := range incoming.GetFilterLeaderboardId() {
+ params = append(params, filterId)
+ statement := "$" + strconv.Itoa(len(params))
+ statements = append(statements, statement)
+ }
+
+ if len(incoming.Cursor) != 0 {
+ query += " AND id IN (" + strings.Join(statements, ", ") + ")"
+ } else {
+ query += " WHERE id IN (" + strings.Join(statements, ", ") + ")"
+ }
+ }
+
params = append(params, limit+1)
query += " LIMIT $" + strconv.Itoa(len(params))
@@ -200,10 +215,10 @@ func (p *pipeline) leaderboardRecordWrite(logger *zap.Logger, session *session,
case *TLeaderboardRecordWrite_Best:
if sortOrder == 0 {
// Lower score is better.
- scoreOpSql = "score = (leaderboard_record.score + $17::BIGINT - abs(leaderboard_record.score - $17::BIGINT)) / 2"
+ scoreOpSql = "score = ((leaderboard_record.score + $17::BIGINT - abs(leaderboard_record.score - $17::BIGINT)) / 2)::BIGINT"
} else {
// Higher score is better.
- scoreOpSql = "score = (leaderboard_record.score + $17::BIGINT + abs(leaderboard_record.score - $17::BIGINT)) / 2"
+ scoreOpSql = "score = ((leaderboard_record.score + $17::BIGINT + abs(leaderboard_record.score - $17::BIGINT)) / 2)::BIGINT"
}
scoreDelta = incoming.GetBest()
scoreAbs = incoming.GetBest()
diff --git a/server/pipeline_runtime.go b/server/pipeline_runtime.go
new file mode 100644
index 0000000000000000000000000000000000000000..c8ad438ed895da46ff06ea25464624a81712d111
--- /dev/null
+++ b/server/pipeline_runtime.go
@@ -0,0 +1,44 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "fmt"
+
+ "go.uber.org/zap"
+)
+
+func (p *pipeline) rpc(logger *zap.Logger, session *session, envelope *Envelope) {
+ rpcMessage := envelope.GetRpc()
+ if rpcMessage.Id == "" {
+ session.Send(ErrorMessageBadInput(envelope.CollationId, "RPC ID must be set"))
+ return
+ }
+
+ lf := p.runtime.GetRuntimeCallback(RPC, rpcMessage.Id)
+ if lf == nil {
+ session.Send(ErrorMessage(envelope.CollationId, RUNTIME_FUNCTION_NOT_FOUND, "RPC function not found"))
+ return
+ }
+
+ result, fnErr := p.runtime.InvokeFunctionRPC(lf, session.userID, session.handle.Load(), session.expiry, rpcMessage.Payload)
+ if fnErr != nil {
+ logger.Error("Runtime RPC function caused an error", zap.String("id", rpcMessage.Id), zap.Error(fnErr))
+ session.Send(ErrorMessage(envelope.CollationId, RUNTIME_FUNCTION_EXCEPTION, fmt.Sprintf("Runtime function caused an error: %s", fnErr.Error())))
+ return
+ }
+
+ session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_Rpc{Rpc: &TRpc{Id: rpcMessage.Id, Payload: result}}})
+}
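
For reference, the request and response envelopes this handler deals in look roughly like the following; the RPC id and payloads are illustrative only:

```go
// Client -> server: invoke a registered runtime RPC function by id.
request := &Envelope{
	CollationId: "collation-1",
	Payload:     &Envelope_Rpc{Rpc: &TRpc{Id: "my_rpc_id", Payload: []byte(`{"level": 3}`)}},
}

// Server -> client on success: same id, carrying the function's result bytes.
response := &Envelope{
	CollationId: "collation-1",
	Payload:     &Envelope_Rpc{Rpc: &TRpc{Id: "my_rpc_id", Payload: []byte(`{"ok": true}`)}},
}
_, _ = request, response
```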
diff --git a/server/pipeline_storage.go b/server/pipeline_storage.go
index b8c39e1e3a4db1729ff74dc6a39a1c1a9f800eb8..8ad4e0c53eb0c3f20c82fdc3329313a43f7f48d1 100644
--- a/server/pipeline_storage.go
+++ b/server/pipeline_storage.go
@@ -14,106 +14,45 @@
package server
-import (
- "bytes"
- "crypto/sha256"
- "database/sql"
- "errors"
- "fmt"
-
- "github.com/satori/go.uuid"
- "go.uber.org/zap"
-)
-
-func (p *pipeline) fetchStorageData(r scanner) (*TStorageData_StorageData, error) {
- var userID []byte
- var bucket sql.NullString
- var collection sql.NullString
- var record sql.NullString
- var value []byte
- var version []byte
- var read sql.NullInt64
- var write sql.NullInt64
- var createdAt sql.NullInt64
- var updatedAt sql.NullInt64
- var expiresAt sql.NullInt64
-
- err := r.Scan(&userID, &bucket, &collection, &record,
- &value, &version, &read, &write,
- &createdAt, &updatedAt, &expiresAt)
-
- if err != nil {
- return &TStorageData_StorageData{}, err
- }
-
- return &TStorageData_StorageData{
- Bucket: bucket.String,
- Collection: collection.String,
- Record: record.String,
- UserId: userID,
- Value: value,
- Version: version,
- PermissionRead: read.Int64,
- PermissionWrite: write.Int64,
- CreatedAt: createdAt.Int64,
- UpdatedAt: updatedAt.Int64,
- ExpiresAt: expiresAt.Int64,
- }, nil
-}
+import "go.uber.org/zap"
func (p *pipeline) storageFetch(logger *zap.Logger, session *session, envelope *Envelope) {
incoming := envelope.GetStorageFetch()
- storageData := make([]*TStorageData_StorageData, 0)
-
- for _, key := range incoming.Keys {
- if key.Bucket == "" || key.Collection == "" || key.Record == "" {
- logger.Error("Invalid values for Bucket or Collection or Record")
- session.Send(ErrorMessageBadInput(envelope.CollationId, "Invalid values for Bucket or Collection or Record"))
- return
- }
-
- if len(key.UserId) != 0 {
- userID, err := uuid.FromBytes(key.UserId)
- if err != nil {
- session.Send(ErrorMessageBadInput(envelope.CollationId, "Invalid User ID"))
- return
- }
+ if len(incoming.Keys) == 0 {
+ session.Send(ErrorMessageBadInput(envelope.CollationId, "At least one fetch key is required"))
+ return
+ }
- if userID != session.userID {
- logger.Error("Not allowed to fetch from storage of a different user")
- session.Send(ErrorMessage(envelope.CollationId, STORAGE_FETCH_DISALLOWED, "Not allowed to fetch from storage of a different user"))
- return
- }
+ keys := make([]*StorageKey, len(incoming.Keys))
+ for i, key := range incoming.Keys {
+ keys[i] = &StorageKey{
+ Bucket: key.Bucket,
+ Collection: key.Collection,
+ Record: key.Record,
+ UserId: key.UserId,
}
+ }
- var row *sql.Row
- if len(key.UserId) == 0 {
- query := `
-SELECT user_id, bucket, collection, record,
- value, version, read, write,
- created_at, updated_at, expires_at
-FROM storage
-WHERE bucket = $1 AND collection = $2 AND record = $4 AND user_id IS NULL AND deleted_at = 0 AND read = 1`
- row = p.db.QueryRow(query, key.Bucket, key.Collection, key.Record)
- } else {
- query := `
-SELECT user_id, bucket, collection, record,
- value, version, read, write,
- created_at, updated_at, expires_at
-FROM storage
-WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND deleted_at = 0 AND read = 1`
- row = p.db.QueryRow(query, key.Bucket, key.Collection, session.userID.Bytes(), key.Record)
- }
+ data, code, err := StorageFetch(logger, p.db, session.userID, keys)
+ if err != nil {
+ session.Send(ErrorMessage(envelope.CollationId, code, err.Error()))
+ return
+ }
- data, err := p.fetchStorageData(row)
- if err != nil {
- logger.Error("Could not fetch from storage",
- zap.Error(err),
- zap.String("bucket", key.Bucket),
- zap.String("collection", key.Collection),
- zap.String("record", key.Record))
- } else {
- storageData = append(storageData, data)
+ storageData := make([]*TStorageData_StorageData, len(data))
+ for i, d := range data {
+ storageData[i] = &TStorageData_StorageData{
+ Bucket: d.Bucket,
+ Collection: d.Collection,
+ Record: d.Record,
+ UserId: d.UserId,
+ Value: d.Value,
+ Version: d.Version,
+ PermissionRead: int32(d.PermissionRead),
+ PermissionWrite: int32(d.PermissionWrite),
+ CreatedAt: d.CreatedAt,
+ UpdatedAt: d.UpdatedAt,
+ ExpiresAt: d.ExpiresAt,
}
}
@@ -122,185 +61,66 @@ WHERE bucket = $1 AND collection = $2 AND user_id = $3 AND record = $4 AND delet
func (p *pipeline) storageWrite(logger *zap.Logger, session *session, envelope *Envelope) {
incoming := envelope.GetStorageWrite()
-
- tx, err := p.db.Begin()
- if err != nil {
- logger.Error("Could not store data", zap.Error(err))
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Could not store data"))
+ if len(incoming.Data) == 0 {
+ session.Send(ErrorMessageRuntimeException(envelope.CollationId, "At least one write value is required"))
return
}
- response := make([]*TStorageKey_StorageKey, 0)
-
- errorMessage := "Could not store data"
-
- defer func() {
- if err != nil {
- logger.Error("Could not store data", zap.Error(err))
- err = tx.Rollback()
- if err != nil {
- logger.Error("Could not rollback transaction", zap.Error(err))
- }
-
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, errorMessage))
- } else {
- err = tx.Commit()
- if err != nil {
- logger.Error("Could not commit transaction", zap.Error(err))
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, errorMessage))
- } else {
- logger.Info("Stored data successfully")
- session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_StorageKey{StorageKey: &TStorageKey{Keys: response}}})
- }
+ data := make([]*StorageData, len(incoming.Data))
+ for i, d := range incoming.Data {
+ data[i] = &StorageData{
+ Bucket: d.Bucket,
+ Collection: d.Collection,
+ Record: d.Record,
+ UserId: session.userID.Bytes(),
+ Value: d.Value,
+ Version: d.Version,
+ PermissionRead: int64(d.PermissionRead),
+ PermissionWrite: int64(d.PermissionWrite),
}
- }()
-
- updatedAt := nowMs()
-
- for _, data := range incoming.Data {
-
- if data.Bucket == "" {
- errorMessage = "Bucket value is empty"
- err = errors.New(errorMessage)
- return
- } else if data.Collection == "" {
- errorMessage = "Collection value is empty"
- err = errors.New(errorMessage)
- return
- } else if data.Record == "" {
- errorMessage = "Record value is empty"
- err = errors.New(errorMessage)
- return
- }
-
- recordID := uuid.NewV4().Bytes()
- sha := fmt.Sprintf("%x", sha256.Sum256(data.Value))
- version := []byte(sha)
-
- query := ""
- params := []interface{}{}
+ }
- if len(data.Version) == 0 {
- query = `
-INSERT INTO storage (id, user_id, bucket, collection, record, value, version, created_at, updated_at, deleted_at)
-SELECT $1, $2, $3, $4, $5, $6, $7, $8, $8, 0
-WHERE NOT EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 AND record = $5 AND deleted_at = 0 AND write = 0)
-ON CONFLICT (bucket, collection, user_id, record, deleted_at)
-DO UPDATE SET value = $6, version = $7, updated_at = $8
-`
- params = []interface{}{recordID, session.userID.Bytes(), data.Bucket, data.Collection, data.Record, data.Value, version, updatedAt}
- errorMessage = "Could not store data"
- } else if bytes.Equal(data.Version, []byte("*")) {
- // if-none-match
- query = `
-INSERT INTO storage (id, user_id, bucket, collection, record, value, version, created_at, updated_at, deleted_at)
-SELECT $1, $2, $3, $4, $5, $6, $7, $8, $8, 0
-WHERE NOT EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 AND record = $5 AND deleted_at = 0)
-`
- params = []interface{}{recordID, session.userID.Bytes(), data.Bucket, data.Collection, data.Record, data.Value, version, updatedAt}
- errorMessage = "Could not store data. This could be caused by failure of if-none-match version check"
- } else {
- // if-match
- query = `
-INSERT INTO storage (id, user_id, bucket, collection, record, value, version, created_at, updated_at, deleted_at)
-SELECT $1, $2, $3, $4, $5, $6, $7, $8, $8, 0
-WHERE EXISTS (SELECT record FROM storage WHERE user_id = $2 AND bucket = $3 AND collection = $4 and record = $5 AND version = $9 AND deleted_at = 0 AND write = 1)
-ON CONFLICT (bucket, collection, user_id, record, deleted_at)
-DO UPDATE SET value = $6, version = $7, updated_at = $8
-`
- params = []interface{}{recordID, session.userID.Bytes(), data.Bucket, data.Collection, data.Record, data.Value, version, updatedAt, data.Version}
- errorMessage = "Could not store data. This could be caused by failure of if-match version check"
- }
+ keys, code, err := StorageWrite(logger, p.db, session.userID, data)
+ if err != nil {
+ session.Send(ErrorMessage(envelope.CollationId, code, err.Error()))
+ return
+ }
- _, err = tx.Exec(query, params...)
- if err != nil {
- return
+ storageKeys := make([]*TStorageKey_StorageKey, len(keys))
+ for i, key := range keys {
+ storageKeys[i] = &TStorageKey_StorageKey{
+ Bucket: key.Bucket,
+ Collection: key.Collection,
+ Record: key.Record,
+ Version: key.Version,
}
-
- response = append(response, &TStorageKey_StorageKey{
- Bucket: data.Bucket,
- Collection: data.Collection,
- Record: data.Record,
- Version: version[:],
- })
}
+
+ session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_StorageKey{StorageKey: &TStorageKey{Keys: storageKeys}}})
}
func (p *pipeline) storageRemove(logger *zap.Logger, session *session, envelope *Envelope) {
incoming := envelope.GetStorageRemove()
-
- tx, err := p.db.Begin()
- if err != nil {
- logger.Error("Could not remove data", zap.Error(err))
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Could not remove data"))
+ if len(incoming.Keys) == 0 {
+ session.Send(ErrorMessageRuntimeException(envelope.CollationId, "At least one remove key is required"))
return
}
- errorMessage := "Could not remove data"
-
- defer func() {
- if err != nil {
- logger.Error("Could not remove data", zap.Error(err))
- err = tx.Rollback()
- if err != nil {
- logger.Error("Could not rollback transaction", zap.Error(err))
- }
-
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, errorMessage))
- } else {
- err = tx.Commit()
- if err != nil {
- logger.Error("Could not commit transaction", zap.Error(err))
- session.Send(ErrorMessageRuntimeException(envelope.CollationId, errorMessage))
- } else {
- logger.Info("Removed data successfully")
- session.Send(&Envelope{CollationId: envelope.CollationId})
- }
- }
- }()
-
- updatedAt := nowMs()
-
- for _, key := range incoming.Keys {
- var res sql.Result
-
- if key.Bucket == "" {
- errorMessage = "Bucket value is empty"
- err = errors.New(errorMessage)
- return
- } else if key.Collection == "" {
- errorMessage = "Collection value is empty"
- err = errors.New(errorMessage)
- return
- } else if key.Record == "" {
- errorMessage = "Record value is empty"
- err = errors.New(errorMessage)
- return
- }
-
- if key.Version != nil {
- query := `
-UPDATE storage SET deleted_at = $1, updated_at = $1
-WHERE bucket = $2 AND collection = $3 AND record = $4 AND user_id = $5 AND version = $6 AND deleted_at = 0 AND write = 1`
- res, err = tx.Exec(query, updatedAt, key.Bucket, key.Collection, key.Record, session.userID.Bytes(), key.Version)
- } else {
- query := `
-UPDATE storage SET deleted_at = $1, updated_at = $1
-WHERE bucket = $2 AND collection = $3 AND record = $4 AND user_id = $5 AND deleted_at = 0 AND write = 1`
- res, err = tx.Exec(query, updatedAt, key.Bucket, key.Collection, key.Record, session.userID.Bytes())
- }
-
- if err != nil {
- return
+ keys := make([]*StorageKey, len(incoming.Keys))
+ for i, key := range incoming.Keys {
+ keys[i] = &StorageKey{
+ Bucket: key.Bucket,
+ Collection: key.Collection,
+ Record: key.Record,
+ UserId: session.userID.Bytes(),
+ Version: key.Version,
}
+ }
- rowsAffected, _ := res.RowsAffected()
- logger.Info("Soft deleted record sent as part of an uncommitted transaction",
- zap.Int64("count", rowsAffected),
- zap.String("bucket", key.Bucket),
- zap.String("collection", key.Collection),
- zap.String("record", key.Record),
- zap.String("version", string(key.Version)))
+ code, err := StorageRemove(logger, p.db, session.userID, keys)
+ if err != nil {
+ session.Send(ErrorMessage(envelope.CollationId, code, err.Error()))
+ return
}
session.Send(&Envelope{CollationId: envelope.CollationId})
diff --git a/server/pipeline_user.go b/server/pipeline_user.go
index c03310b7239b2712d76f7de3a70aca42a1f6c28a..69588993cd980542ea422b1dc25ca209d6b68571 100644
--- a/server/pipeline_user.go
+++ b/server/pipeline_user.go
@@ -14,43 +14,33 @@
package server
-import (
- "strconv"
- "strings"
-
- "github.com/satori/go.uuid"
- "go.uber.org/zap"
-)
+import "go.uber.org/zap"
func (p *pipeline) usersFetch(logger *zap.Logger, session *session, envelope *Envelope) {
- userIds := envelope.GetUsersFetch().UserIds
- if len(userIds) == 0 {
- session.Send(ErrorMessageBadInput(envelope.CollationId, "List must contain at least one user ID"))
- return
- }
+ f := envelope.GetUsersFetch()
- statements := make([]string, 0)
- params := make([]interface{}, 0)
+ var users []*User
+ var err error
- counter := 1
- for _, uid := range userIds {
- userID, err := uuid.FromBytes(uid)
- if err == nil {
- statement := "$" + strconv.Itoa(counter)
- counter += 1
- statements = append(statements, statement)
- params = append(params, userID.Bytes())
+ switch f.Set.(type) {
+ case *TUsersFetch_UserIds_:
+ userIds := f.GetUserIds().UserIds
+ if len(userIds) == 0 {
+ session.Send(ErrorMessageBadInput(envelope.CollationId, "List must contain at least one user ID"))
+ return
}
+ users, err = UsersFetchIds(logger, p.db, userIds)
+ case *TUsersFetch_Handles_:
+ handles := f.GetHandles().Handles
+ if len(handles) == 0 {
+ session.Send(ErrorMessageBadInput(envelope.CollationId, "List must contain at least one handle"))
+ return
+ }
+ users, err = UsersFetchHandle(logger, p.db, handles)
}
- if len(statements) == 0 {
- session.Send(ErrorMessageBadInput(envelope.CollationId, "No valid user IDs received"))
- return
- }
-
- query := "WHERE users.id IN (" + strings.Join(statements, ", ") + ")"
- users, err := p.querySocialGraph(logger, query, params)
if err != nil {
+ logger.Warn("Could not retrieve users", zap.Error(err))
session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Could not retrieve users"))
return
}
diff --git a/server/runtime.go b/server/runtime.go
new file mode 100644
index 0000000000000000000000000000000000000000..ab638ed46bc1c0d31e434650d254353fe4aff38b
--- /dev/null
+++ b/server/runtime.go
@@ -0,0 +1,317 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "database/sql"
+ "errors"
+ "os"
+ "path/filepath"
+ "strings"
+
+ "github.com/satori/go.uuid"
+ "github.com/yuin/gopher-lua"
+ "go.uber.org/zap"
+ "golang.org/x/net/context"
+)
+
+const (
+ __nakamaReturnValue = "__nakama_return_flag__"
+)
+
+type BuiltinModule interface {
+ Loader(l *lua.LState) int
+}
+
+type Runtime struct {
+ logger *zap.Logger
+ vm *lua.LState
+ luaEnv *lua.LTable
+}
+
+func NewRuntime(logger *zap.Logger, multiLogger *zap.Logger, db *sql.DB, config *RuntimeConfig) (*Runtime, error) {
+ if err := os.MkdirAll(config.Path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ // Override the default Lua module search path before the Package library is opened.
+ lua.LuaLDir = config.Path
+ lua.LuaPathDefault = lua.LuaLDir + "/?.lua;" + lua.LuaLDir + "/?/init.lua"
+ os.Setenv(lua.LuaPath, lua.LuaPathDefault)
+
+ vm := lua.NewState(lua.Options{
+ CallStackSize: 1024,
+ RegistrySize: 1024,
+ SkipOpenLibs: true,
+ IncludeGoStackTrace: true,
+ })
+
+ stdLibs := map[string]lua.LGFunction{
+ lua.LoadLibName: lua.OpenPackage,
+ lua.BaseLibName: lua.OpenBase,
+ lua.TabLibName: lua.OpenTable,
+ lua.OsLibName: OpenOs,
+ lua.StringLibName: lua.OpenString,
+ lua.MathLibName: lua.OpenMath,
+ }
+ for name, lib := range stdLibs {
+ vm.Push(vm.NewFunction(lib))
+ vm.Push(lua.LString(name))
+ vm.Call(1, 0)
+ }
+
+ nakamaModule := NewNakamaModule(logger, db, vm)
+ vm.PreloadModule("nakama", nakamaModule.Loader)
+ nakamaxModule := NewNakamaxModule(logger)
+ vm.PreloadModule("nakamax", nakamaxModule.Loader)
+
+ r := &Runtime{
+ logger: logger,
+ vm: vm,
+ luaEnv: ConvertMap(vm, config.Environment),
+ }
+
+ logger.Info("Initialising modules", zap.String("path", lua.LuaLDir))
+ modules := make([]string, 0)
+ err := filepath.Walk(lua.LuaLDir, func(path string, f os.FileInfo, err error) error {
+ if err != nil {
+ logger.Error("Could not read module", zap.Error(err))
+ return err
+ } else if !f.IsDir() {
+ if strings.ToLower(filepath.Ext(path)) == ".lua" {
+ modules = append(modules, path)
+ }
+ }
+ return nil
+ })
+ if err != nil {
+ logger.Error("Failed to list modules", zap.Error(err))
+ return nil, err
+ }
+
+ multiLogger.Info("Evaluating modules", zap.Int("count", len(modules)), zap.Strings("modules", modules))
+ if err = r.loadModules(lua.LuaLDir, modules); err != nil {
+ return nil, err
+ }
+ multiLogger.Info("Modules loaded")
+
+ return r, nil
+}
+
+func (r *Runtime) loadModules(luaPath string, modules []string) error {
+ // `DoFile(..)` parses and evaluates a module every time it is called, so invoking it repeatedly would load and evaluate the same file more than once.
+ // To ensure each module is loaded and evaluated exactly once, regardless of any dependencies between files, we register them all in `package.preload` instead (`DoFile(..)` does not reliably update the _LOADED table).
+ // Two considerations drive this design choice:
+ //
+ // 1) Double evaluation is only a problem when one module depends on another: its globals would be evaluated once at startup and again when the module is pulled in through `require`.
+ // We avoid this by checking the _LOADED table to see whether `require` has already evaluated the module, and skipping it if so.
+ //
+ // 2) Modules must be pre-loaded into the state for callback function evaluation to work correctly (for HTTP/RPC/etc. invocations).
+ // Registering them via `preload` keeps them available in the LState even when no other module `require`s them, so we cannot rely on `require` alone.
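+ //
+ // A hypothetical sketch of the dependency case above (module names are illustrative only): with both
+ // files registered in `preload`, "helpers" is evaluated exactly once whether it is loaded here at
+ // startup or pulled in by "main" through `require`.
+ //
+ //   -- helpers.lua
+ //   local helpers = {}
+ //   function helpers.greet(name) return "hello " .. name end
+ //   return helpers
+ //
+ //   -- main.lua
+ //   local helpers = require("helpers") -- resolved from package.preload, not re-evaluated
+ //   print(helpers.greet("nakama"))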
+
+ //for _, mod := range r.modules {
+ // relPath, _ := filepath.Rel(r.luaPath, mod)
+ // moduleName := strings.TrimSuffix(relPath, filepath.Ext(relPath))
+ //
+ // // check to see if this module was loaded by `require` before executing it
+ // loaded := l.GetField(l.Get(lua.RegistryIndex), "_LOADED")
+ // lv := l.GetField(loaded, moduleName)
+ // if lua.LVAsBool(lv) {
+ // // Already evaluated module via `require(..)`
+ // continue
+ // }
+ //
+ // if err = l.DoFile(mod); err != nil {
+ // failedModules++
+ // r.logger.Error("Failed to evaluate module - skipping", zap.String("path", mod), zap.Error(err))
+ // }
+ //}
+
+ preload := r.vm.GetField(r.vm.GetField(r.vm.Get(lua.EnvironIndex), "package"), "preload")
+ fns := make(map[string]*lua.LFunction)
+ for _, path := range modules {
+ f, err := r.vm.LoadFile(path)
+ if err != nil {
+ r.logger.Error("Could not load module", zap.String("name", path), zap.Error(err))
+ return err
+ } else {
+ relPath, _ := filepath.Rel(luaPath, path)
+ moduleName := strings.TrimSuffix(relPath, filepath.Ext(relPath))
+ moduleName = strings.Replace(moduleName, "/", ".", -1) //make paths Lua friendly
+ r.vm.SetField(preload, moduleName, f)
+ fns[moduleName] = f
+ }
+ }
+
+ for name, fn := range fns {
+ loaded := r.vm.GetField(r.vm.Get(lua.RegistryIndex), "_LOADED")
+ lv := r.vm.GetField(loaded, name)
+ if lua.LVAsBool(lv) {
+ // Already evaluated module via `require(..)`
+ continue
+ }
+
+ r.vm.Push(fn)
+ fnErr := r.vm.PCall(0, -1, nil)
+ if fnErr != nil {
+ r.logger.Error("Could not complete runtime invocation", zap.Error(fnErr))
+ return fnErr
+ }
+ }
+
+ return nil
+}
+
+func (r *Runtime) NewStateThread() (*lua.LState, context.CancelFunc) {
+ return r.vm.NewThread()
+}
+
+func (r *Runtime) GetRuntimeCallback(e ExecutionMode, key string) *lua.LFunction {
+ k := strings.ToLower(key)
+ cp := r.vm.Context().Value(CALLBACKS).(*Callbacks)
+ switch e {
+ case HTTP:
+ return cp.HTTP[k]
+ case RPC:
+ return cp.RPC[k]
+ case BEFORE:
+ return cp.Before[k]
+ case AFTER:
+ return cp.After[k]
+ }
+
+ return nil
+}
+
+func (r *Runtime) InvokeFunctionRPC(fn *lua.LFunction, uid uuid.UUID, handle string, sessionExpiry int64, payload []byte) ([]byte, error) {
+ l, _ := r.NewStateThread()
+ defer l.Close()
+
+ ctx := NewLuaContext(l, r.luaEnv, RPC, uid, handle, sessionExpiry)
+ var lv lua.LValue
+ if payload != nil {
+ lv = lua.LString(payload)
+ }
+
+ retValue, err := r.invokeFunction(l, fn, ctx, lv)
+ if err != nil {
+ return nil, err
+ }
+
+ if retValue == nil || retValue == lua.LNil {
+ return nil, nil
+ } else if retValue.Type() == lua.LTString {
+ return []byte(retValue.String()), nil
+ }
+
+ return nil, errors.New("Runtime function returned invalid data. Only allowed one return value of type String/Byte")
+}
+
+func (r *Runtime) InvokeFunctionBefore(fn *lua.LFunction, uid uuid.UUID, handle string, sessionExpiry int64, payload map[string]interface{}) (map[string]interface{}, error) {
+ l, _ := r.NewStateThread()
+ defer l.Close()
+
+ ctx := NewLuaContext(l, r.luaEnv, BEFORE, uid, handle, sessionExpiry)
+ var lv lua.LValue
+ if payload != nil {
+ lv = ConvertMap(l, payload)
+ }
+
+ retValue, err := r.invokeFunction(l, fn, ctx, lv)
+ if err != nil {
+ return nil, err
+ }
+
+ if retValue == nil || retValue == lua.LNil {
+ return nil, nil
+ } else if retValue.Type() == lua.LTTable {
+ return ConvertLuaTable(retValue.(*lua.LTable)), nil
+ }
+
+ return nil, errors.New("Runtime function returned invalid data. Only allowed one return value of type Table")
+}
+
+func (r *Runtime) InvokeFunctionAfter(fn *lua.LFunction, uid uuid.UUID, handle string, sessionExpiry int64, payload map[string]interface{}) error {
+ l, _ := r.NewStateThread()
+ defer l.Close()
+
+ ctx := NewLuaContext(l, r.luaEnv, AFTER, uid, handle, sessionExpiry)
+ var lv lua.LValue
+ if payload != nil {
+ lv = ConvertMap(l, payload)
+ }
+
+ _, err := r.invokeFunction(l, fn, ctx, lv)
+ return err
+}
+
+func (r *Runtime) InvokeFunctionHTTP(fn *lua.LFunction, uid uuid.UUID, handle string, sessionExpiry int64, payload map[string]interface{}) (map[string]interface{}, error) {
+ l, _ := r.NewStateThread()
+ defer l.Close()
+
+ ctx := NewLuaContext(l, r.luaEnv, HTTP, uid, handle, sessionExpiry)
+ var lv lua.LValue
+ if payload != nil {
+ lv = ConvertMap(l, payload)
+ }
+
+ retValue, err := r.invokeFunction(l, fn, ctx, lv)
+ if err != nil {
+ return nil, err
+ }
+
+ if retValue == nil || retValue == lua.LNil {
+ return nil, nil
+ } else if retValue.Type() == lua.LTTable {
+ return ConvertLuaTable(retValue.(*lua.LTable)), nil
+ }
+
+ return nil, errors.New("Runtime function returned invalid data. Only allowed one return value of type Table")
+}
+
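+// invokeFunction pushes a sentinel string onto the stack before the function itself. After the call
+// completes, if the value on top of the stack is still the sentinel then the Lua function produced no
+// return values; otherwise the top of the stack holds its return value.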
+func (r *Runtime) invokeFunction(l *lua.LState, fn *lua.LFunction, ctx *lua.LTable, payload lua.LValue) (lua.LValue, error) {
+ l.Push(lua.LString(__nakamaReturnValue))
+ l.Push(fn)
+
+ nargs := 1
+ l.Push(ctx)
+
+ if payload != nil {
+ nargs = 2
+ l.Push(payload)
+ }
+
+ err := l.PCall(nargs, lua.MultRet, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ retValue := l.Get(-1)
+ if retValue.Type() == lua.LTString && lua.LVAsString(retValue) == __nakamaReturnValue {
+ return nil, nil
+ }
+
+ return retValue, nil
+}
+
+func (r *Runtime) Stop() {
+ r.vm.Close()
+}
diff --git a/server/runtime_lua_context.go b/server/runtime_lua_context.go
new file mode 100644
index 0000000000000000000000000000000000000000..bd66f11fd4284eb0d3bcf14e9a8e53e9330eb6e5
--- /dev/null
+++ b/server/runtime_lua_context.go
@@ -0,0 +1,162 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "fmt"
+
+ "github.com/satori/go.uuid"
+ "github.com/yuin/gopher-lua"
+)
+
+type ExecutionMode int
+
+const (
+ RPC ExecutionMode = iota
+ BEFORE
+ AFTER
+ HTTP
+ JOB
+ LEADERBOARD_RESET
+)
+
+func (e ExecutionMode) String() string {
+ switch e {
+ case HTTP:
+ return "http"
+ case RPC:
+ return "rpc"
+ case BEFORE:
+ return "before"
+ case AFTER:
+ return "after"
+ case JOB:
+ return "job"
+ case LEADERBOARD_RESET:
+ return "leaderboard_reset"
+ }
+
+ return ""
+}
+
+const (
+ __CTX_ENV = "env"
+ __CTX_MODE = "execution_mode"
+ __CTX_USER_ID = "user_id"
+ __CTX_USER_HANDLE = "user_handle"
+ __CTX_USER_SESSION_EXP = "user_session_exp"
+)
+
+func NewLuaContext(l *lua.LState, env *lua.LTable, mode ExecutionMode, uid uuid.UUID, handle string, sessionExpiry int64) *lua.LTable {
+ lt := l.NewTable()
+ lt.RawSetString(__CTX_ENV, env)
+ lt.RawSetString(__CTX_MODE, lua.LString(mode.String()))
+
+ if uid != uuid.Nil {
+ lt.RawSetString(__CTX_USER_ID, lua.LString(uid.String()))
+ lt.RawSetString(__CTX_USER_HANDLE, lua.LString(handle))
+ lt.RawSetString(__CTX_USER_SESSION_EXP, lua.LNumber(sessionExpiry))
+ }
+
+ return lt
+}
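+
+// A sketch of how a registered Lua function might read this context table (illustrative only, and
+// assumes `local nk = require("nakama")` is in scope):
+//
+//   local function my_func(context, payload)
+//     -- "user_id", "user_handle" and "user_session_exp" are only set for authenticated calls.
+//     nk.logger_info("execution mode: " .. context.execution_mode)
+//   end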
+
+func ConvertMap(l *lua.LState, data map[string]interface{}) *lua.LTable {
+ lt := l.NewTable()
+ for k, v := range data {
+ lt.RawSetString(k, convertValue(l, v))
+ }
+
+ return lt
+}
+
+func ConvertLuaTable(lv *lua.LTable) map[string]interface{} {
+ returnData, _ := convertLuaValue(lv).(map[string]interface{})
+ return returnData
+}
+
+func convertValue(l *lua.LState, val interface{}) lua.LValue {
+ if val == nil {
+ return lua.LNil
+ }
+
+ // types looked up from
+ // https://golang.org/pkg/encoding/json/#Unmarshal
+ // https://developers.google.com/protocol-buffers/docs/proto3#scalar
+ switch v := val.(type) {
+ case bool:
+ return lua.LBool(v)
+ case string:
+ return lua.LString(v)
+ case []byte:
+ return lua.LString(v)
+ case float32:
+ return lua.LNumber(v)
+ case float64:
+ return lua.LNumber(v)
+ case int:
+ return lua.LNumber(v)
+ case int32:
+ return lua.LNumber(v)
+ case int64:
+ return lua.LNumber(v)
+ case uint32:
+ return lua.LNumber(v)
+ case uint64:
+ return lua.LNumber(v)
+ case map[string]interface{}:
+ return ConvertMap(l, v)
+ case []interface{}:
+ lt := l.NewTable()
+ for k, v := range v {
+ lt.RawSetInt(k+1, convertValue(l, v))
+ }
+ return lt
+ default:
+ return lua.LNil
+ }
+}
+
+func convertLuaValue(lv lua.LValue) interface{} {
+ // taken from https://github.com/yuin/gluamapper/blob/master/gluamapper.go#L79
+ switch v := lv.(type) {
+ case *lua.LNilType:
+ return nil
+ case lua.LBool:
+ return bool(v)
+ case lua.LString:
+ return string(v)
+ case lua.LNumber:
+ return float64(v)
+ case *lua.LTable:
+ maxn := v.MaxN()
+ if maxn == 0 { // table
+ ret := make(map[string]interface{})
+ v.ForEach(func(key, value lua.LValue) {
+ keystr := fmt.Sprint(convertLuaValue(key))
+ ret[keystr] = convertLuaValue(value)
+ })
+ return ret
+ } else { // array
+ ret := make([]interface{}, 0, maxn)
+ for i := 1; i <= maxn; i++ {
+ ret = append(ret, convertLuaValue(v.RawGetInt(i)))
+ }
+ return ret
+ }
+ default:
+ return v
+ }
+}
diff --git a/server/runtime_nakama_module.go b/server/runtime_nakama_module.go
new file mode 100644
index 0000000000000000000000000000000000000000..3ea7387b98c13d0f1b74a96bd3faefe9d791ef29
--- /dev/null
+++ b/server/runtime_nakama_module.go
@@ -0,0 +1,501 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "fmt"
+ "strings"
+
+ "github.com/fatih/structs"
+ "github.com/satori/go.uuid"
+ "github.com/yuin/gopher-lua"
+ "go.uber.org/zap"
+)
+
+const CALLBACKS = "runtime_callbacks"
+
+type Callbacks struct {
+ HTTP map[string]*lua.LFunction
+ RPC map[string]*lua.LFunction
+ Before map[string]*lua.LFunction
+ After map[string]*lua.LFunction
+}
+
+type NakamaModule struct {
+ logger *zap.Logger
+ db *sql.DB
+}
+
+func NewNakamaModule(logger *zap.Logger, db *sql.DB, l *lua.LState) *NakamaModule {
+ l.SetContext(context.WithValue(context.Background(), CALLBACKS, &Callbacks{
+ RPC: make(map[string]*lua.LFunction),
+ Before: make(map[string]*lua.LFunction),
+ After: make(map[string]*lua.LFunction),
+ HTTP: make(map[string]*lua.LFunction),
+ }))
+ return &NakamaModule{
+ logger: logger,
+ db: db,
+ }
+}
+
+func (n *NakamaModule) Loader(l *lua.LState) int {
+ mod := l.SetFuncs(l.NewTable(), map[string]lua.LGFunction{
+ "logger_info": n.loggerInfo,
+ "logger_warn": n.loggerWarn,
+ "logger_error": n.loggerError,
+ "register_rpc": n.registerRPC,
+ "register_before": n.registerBefore,
+ "register_after": n.registerAfter,
+ "register_http": n.registerHTTP,
+ "user_fetch_id": n.userFetchId,
+ "user_fetch_handle": n.userFetchHandle,
+ "storage_fetch": n.storageFetch,
+ "storage_write": n.storageWrite,
+ "storage_remove": n.storageRemove,
+ "leaderboard_create": n.leaderboardCreate,
+ })
+
+ l.Push(mod)
+ return 1
+}
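+
+// A minimal usage sketch from a Lua module (identifiers are illustrative only). Note that the
+// registration functions take the function first and its identifier second:
+//
+//   local nk = require("nakama")
+//   local function echo(context, payload)
+//     nk.logger_info("echo invoked")
+//     return payload
+//   end
+//   nk.register_rpc(echo, "echo")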
+
+func (n *NakamaModule) loggerInfo(l *lua.LState) int {
+ message := l.CheckString(1)
+ if message == "" {
+ l.ArgError(1, "expects message string")
+ return 0
+ }
+ n.logger.Info(message)
+ l.Push(lua.LString(message))
+ return 1
+}
+
+func (n *NakamaModule) loggerWarn(l *lua.LState) int {
+ message := l.CheckString(1)
+ if message == "" {
+ l.ArgError(1, "expects message string")
+ return 0
+ }
+ n.logger.Warn(message)
+ l.Push(lua.LString(message))
+ return 1
+}
+
+func (n *NakamaModule) loggerError(l *lua.LState) int {
+ message := l.CheckString(1)
+ if message == "" {
+ l.ArgError(1, "expects message string")
+ return 0
+ }
+ n.logger.Error(message)
+ l.Push(lua.LString(message))
+ return 1
+}
+
+func (n *NakamaModule) registerRPC(l *lua.LState) int {
+ fn := l.CheckFunction(1)
+ id := l.CheckString(2)
+
+ if id == "" {
+ l.ArgError(2, "expects rpc id")
+ return 0
+ }
+
+ id = strings.ToLower(id)
+
+ rc := l.Context().Value(CALLBACKS).(*Callbacks)
+ rc.RPC[id] = fn
+ n.logger.Info("Registered RPC function invocation", zap.String("id", id))
+ return 0
+}
+
+func (n *NakamaModule) registerBefore(l *lua.LState) int {
+ fn := l.CheckFunction(1)
+ messageName := l.CheckString(2)
+
+ if messageName == "" {
+ l.ArgError(2, "expects message name")
+ return 0
+ }
+
+ messageName = strings.ToLower(messageName)
+
+ rc := l.Context().Value(CALLBACKS).(*Callbacks)
+ rc.Before[messageName] = fn
+ n.logger.Info("Registered Before function invocation", zap.String("message", messageName))
+ return 0
+}
+
+func (n *NakamaModule) registerAfter(l *lua.LState) int {
+ fn := l.CheckFunction(1)
+ messageName := l.CheckString(2)
+
+ if messageName == "" {
+ l.ArgError(2, "expects message name")
+ return 0
+ }
+
+ messageName = strings.ToLower(messageName)
+
+ rc := l.Context().Value(CALLBACKS).(*Callbacks)
+ rc.After[messageName] = fn
+ n.logger.Info("Registered After function invocation", zap.String("message", messageName))
+ return 0
+}
+
+func (n *NakamaModule) registerHTTP(l *lua.LState) int {
+ fn := l.CheckFunction(1)
+ path := l.CheckString(2)
+
+ if path == "" {
+ l.ArgError(2, "expects http path")
+ return 0
+ }
+
+ if strings.HasPrefix(path, "/") {
+ l.ArgError(2, "http path should not start with leading slash")
+ return 0
+ }
+
+ path = strings.ToLower(path)
+
+ rc := l.Context().Value(CALLBACKS).(*Callbacks)
+ rc.HTTP[path] = fn
+ n.logger.Info("Registered HTTP function invocation", zap.String("path", path))
+ return 0
+}
+
+func (n *NakamaModule) userFetchId(l *lua.LState) int {
+ lt := l.CheckTable(1)
+ userIds, ok := convertLuaValue(lt).([]interface{})
+ if !ok {
+ l.ArgError(1, "invalid user id data")
+ return 0
+ }
+
+ userIdBytes := make([][]byte, 0)
+ for _, id := range userIds {
+ uid, err := uuid.FromString(id.(string))
+ if err != nil {
+ l.ArgError(1, "invalid user id")
+ return 0
+ }
+ userIdBytes = append(userIdBytes, uid.Bytes())
+ }
+
+ users, err := UsersFetchIds(n.logger, n.db, userIdBytes)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to retrieve users: %s", err.Error()))
+ return 0
+ }
+
+ // Convert UUIDs to their string representation.
+ lv := l.NewTable()
+ for i, u := range users {
+ uid, _ := uuid.FromBytes(u.Id)
+ u.Id = []byte(uid.String())
+ um := structs.Map(u)
+ lv.RawSetInt(i+1, convertValue(l, um))
+ }
+
+ l.Push(lv)
+ return 1
+}
+
+func (n *NakamaModule) userFetchHandle(l *lua.LState) int {
+ lt := l.CheckTable(1)
+ handles, ok := convertLuaValue(lt).([]interface{})
+ if !ok {
+ l.ArgError(1, "invalid user handle data")
+ return 0
+ }
+
+ userHandles := make([]string, 0)
+ for _, h := range handles {
+ if hs, ok := h.(string); !ok {
+ l.ArgError(1, "invalid user handle data, each handle must be a string")
+ return 0
+ } else {
+ userHandles = append(userHandles, hs)
+ }
+ }
+
+ users, err := UsersFetchHandle(n.logger, n.db, userHandles)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to retrieve users: %s", err.Error()))
+ return 0
+ }
+
+ // Convert UUIDs to their string representation.
+ lv := l.NewTable()
+ for i, u := range users {
+ uid, _ := uuid.FromBytes(u.Id)
+ u.Id = []byte(uid.String())
+ um := structs.Map(u)
+ lv.RawSetInt(i+1, convertValue(l, um))
+ }
+
+ l.Push(lv)
+ return 1
+}
+
+func (n *NakamaModule) storageFetch(l *lua.LState) int {
+ keysTable := l.CheckTable(1)
+ if keysTable == nil || keysTable.Len() == 0 {
+ l.ArgError(1, "Expects a valid set of keys")
+ return 0
+ }
+ keysRaw, ok := convertLuaValue(keysTable).([]interface{})
+ if !ok {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ keyMap := make([]map[string]interface{}, 0)
+ for _, d := range keysRaw {
+ if m, ok := d.(map[string]interface{}); ok {
+ keyMap = append(keyMap, m)
+ } else {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ }
+
+ keys := make([]*StorageKey, len(keyMap))
+ idx := 0
+ for _, k := range keyMap {
+ var userID []byte
+ if u, ok := k["UserId"]; ok {
+ if us, ok := u.(string); !ok {
+ l.ArgError(1, "Expects valid user IDs in each key, when provided")
+ return 0
+ } else {
+ uid, err := uuid.FromString(us)
+ if err != nil {
+ l.ArgError(1, "Expects valid user IDs in each key, when provided")
+ return 0
+ }
+ userID = uid.Bytes()
+ }
+ }
+
+ keys[idx] = &StorageKey{
+ Bucket: k["Bucket"].(string),
+ Collection: k["Collection"].(string),
+ Record: k["Record"].(string),
+ UserId: userID,
+ }
+ idx++
+ }
+
+ values, _, err := StorageFetch(n.logger, n.db, uuid.Nil, keys)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to fetch storage: %s", err.Error()))
+ return 0
+ }
+
+ lv := l.NewTable()
+ for i, v := range values {
+ // Convert UUIDs to string representation if needed.
+ if len(v.UserId) != 0 {
+ uid, _ := uuid.FromBytes(v.UserId)
+ v.UserId = []byte(uid.String())
+ }
+ vm := structs.Map(v)
+ lv.RawSetInt(i+1, convertValue(l, vm))
+ }
+
+ l.Push(lv)
+ return 1
+}
+
+func (n *NakamaModule) storageWrite(l *lua.LState) int {
+ dataTable := l.CheckTable(1)
+ if dataTable == nil || dataTable.Len() == 0 {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ dataRaw, ok := convertLuaValue(dataTable).([]interface{})
+ if !ok {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ dataMap := make([]map[string]interface{}, 0)
+ for _, d := range dataRaw {
+ if m, ok := d.(map[string]interface{}); ok {
+ dataMap = append(dataMap, m)
+ } else {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ }
+
+ data := make([]*StorageData, len(dataMap))
+ idx := 0
+ for _, k := range dataMap {
+ var userID []byte
+ if u, ok := k["UserId"]; ok {
+ if us, ok := u.(string); !ok {
+ l.ArgError(1, "Expects valid user IDs in each value, when provided")
+ return 0
+ } else {
+ uid, err := uuid.FromString(us)
+ if err != nil {
+ l.ArgError(1, "Expects valid user IDs in each value, when provided")
+ return 0
+ }
+ userID = uid.Bytes()
+ }
+ }
+ var version []byte
+ if v, ok := k["Version"]; ok {
+ version = []byte(v.(string))
+ }
+
+ readPermission := int64(1)
+ writePermission := int64(1)
+ if r, ok := k["PermissionRead"]; ok {
+ readPermission = int64(r.(float64)) // numbers arrive from the Lua runtime as float64
+ }
+ if w, ok := k["PermissionWrite"]; ok {
+ writePermission = int64(w.(float64))
+ }
+ data[idx] = &StorageData{
+ Bucket: k["Bucket"].(string),
+ Collection: k["Collection"].(string),
+ Record: k["Record"].(string),
+ UserId: userID,
+ Value: []byte(k["Value"].(string)),
+ Version: version,
+ PermissionRead: readPermission,
+ PermissionWrite: writePermission,
+ }
+ idx++
+ }
+
+ keys, _, err := StorageWrite(n.logger, n.db, uuid.Nil, data)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to write storage: %s", err.Error()))
+ return 0
+ }
+
+ lv := l.NewTable()
+ for i, k := range keys {
+ km := structs.Map(k)
+ lv.RawSetInt(i+1, convertValue(l, km))
+ }
+
+ l.Push(lv)
+ return 1
+}
+
+func (n *NakamaModule) storageRemove(l *lua.LState) int {
+ keysTable := l.CheckTable(1)
+ if keysTable == nil || keysTable.Len() == 0 {
+ l.ArgError(1, "Expects a valid set of keys")
+ return 0
+ }
+ keysRaw, ok := convertLuaValue(keysTable).([]interface{})
+ if !ok {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ keyMap := make([]map[string]interface{}, 0)
+ for _, d := range keysRaw {
+ if m, ok := d.(map[string]interface{}); ok {
+ keyMap = append(keyMap, m)
+ } else {
+ l.ArgError(1, "Expects a valid set of data")
+ return 0
+ }
+ }
+
+ keys := make([]*StorageKey, len(keyMap))
+ idx := 0
+ for _, k := range keyMap {
+ var userID []byte
+ if u, ok := k["UserId"]; ok {
+ if us, ok := u.(string); !ok {
+ l.ArgError(1, "Expects valid user IDs in each key, when provided")
+ return 0
+ } else {
+ uid, err := uuid.FromString(us)
+ if err != nil {
+ l.ArgError(1, "Expects valid user IDs in each key, when provided")
+ return 0
+ }
+ userID = uid.Bytes()
+ }
+ }
+ var version []byte
+ if v, ok := k["Version"]; ok {
+ version = []byte(v.(string))
+ }
+ keys[idx] = &StorageKey{
+ Bucket: k["Bucket"].(string),
+ Collection: k["Collection"].(string),
+ Record: k["Record"].(string),
+ UserId: userID,
+ Version: version,
+ }
+ idx++
+ }
+
+ if _, err := StorageRemove(n.logger, n.db, uuid.Nil, keys); err != nil {
+ l.RaiseError(fmt.Sprintf("failed to remove storage: %s", err.Error()))
+ }
+ return 0
+}
+
+func (n *NakamaModule) leaderboardCreate(l *lua.LState) int {
+ id := l.CheckString(1)
+ sort := l.CheckString(2)
+ reset := l.OptString(3, "")
+ metadata := l.OptTable(4, l.NewTable())
+ authoritative := l.OptBool(5, false)
+
+ leaderboardId, err := uuid.FromString(id)
+ if err != nil {
+ l.ArgError(1, "invalid leaderboard id")
+ return 0
+ }
+
+ if sort != "asc" && sort != "desc" {
+ l.ArgError(2, "invalid sort - only acceptable values are 'asc' and 'desc'")
+ return 0
+ }
+
+ metadataMap := ConvertLuaTable(metadata)
+ metadataBytes, err := json.Marshal(metadataMap)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to convert metadata: %s", err.Error()))
+ return 0
+ }
+
+ _, err = createLeaderboard(n.logger, n.db, leaderboardId.String(), sort, reset, string(metadataBytes), authoritative)
+ if err != nil {
+ l.RaiseError(fmt.Sprintf("failed to create leaderboard: %s", err.Error()))
+ return 0
+ }
+
+ return 0
+}
diff --git a/server/runtime_nakamax_module.go b/server/runtime_nakamax_module.go
new file mode 100644
index 0000000000000000000000000000000000000000..d62651449de4bbb56a164a45969de570485e2d13
--- /dev/null
+++ b/server/runtime_nakamax_module.go
@@ -0,0 +1,161 @@
+// Copyright 2017 The Nakama Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+ "encoding/json"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/satori/go.uuid"
+ "github.com/yuin/gopher-lua"
+ "go.uber.org/zap"
+)
+
+type NakamaxModule struct {
+ logger *zap.Logger
+ client *http.Client
+}
+
+func NewNakamaxModule(logger *zap.Logger) *NakamaxModule {
+ return &NakamaxModule{
+ logger: logger,
+ client: &http.Client{
+ Timeout: 5 * time.Second,
+ },
+ }
+}
+
+func (nx *NakamaxModule) Loader(l *lua.LState) int {
+ mod := l.SetFuncs(l.NewTable(), map[string]lua.LGFunction{
+ "uuid_v4": nx.uuidV4,
+ "json_encode": nx.jsonEncode,
+ "json_decode": nx.jsonDecode,
+ "http_request": nx.httpRequest,
+ })
+
+ l.Push(mod)
+ return 1
+}
+
+func (nx *NakamaxModule) uuidV4(l *lua.LState) int {
+ // TODO ensure there were no arguments to the function
+ l.Push(lua.LString(uuid.NewV4().String()))
+ return 1
+}
+
+func (nx *NakamaxModule) jsonEncode(l *lua.LState) int {
+ // TODO allow top-level arrays or primitives?
+ jsonTable := l.CheckTable(1)
+ if jsonTable == nil {
+ l.ArgError(1, "Expects a table to encode")
+ return 0
+ }
+
+ jsonData := ConvertLuaTable(jsonTable)
+ jsonBytes, err := json.Marshal(jsonData)
+ if err != nil {
+ l.ArgError(1, "Error encoding to JSON")
+ return 0
+ }
+
+ l.Push(lua.LString(string(jsonBytes)))
+ return 1
+}
+
+func (nx *NakamaxModule) jsonDecode(l *lua.LState) int {
+ jsonString := l.CheckString(1)
+ if jsonString == "" {
+ l.ArgError(1, "Expects JSON string")
+ return 0
+ }
+
+ // TODO allow top-level arrays or primitives?
+ var jsonData map[string]interface{}
+ if err := json.Unmarshal([]byte(jsonString), &jsonData); err != nil {
+ l.RaiseError("Not a valid JSON string: %v", err.Error())
+ return 0
+ }
+
+ l.Push(ConvertMap(l, jsonData))
+ return 1
+}
+
+func (nx *NakamaxModule) httpRequest(l *lua.LState) int {
+ url := l.CheckString(1)
+ method := l.CheckString(2)
+ headers := l.CheckTable(3)
+ body := l.OptString(4, "")
+ if url == "" {
+ l.ArgError(1, "Expects URL string")
+ return 0
+ }
+ if method == "" {
+ l.ArgError(2, "Expects method string")
+ return 0
+ }
+
+ // Prepare request body, if any.
+ var requestBody io.Reader
+ if body != "" {
+ requestBody = strings.NewReader(body)
+ }
+ // Prepare the request.
+ req, err := http.NewRequest(method, url, requestBody)
+ if err != nil {
+ l.RaiseError("HTTP request error: %v", err.Error())
+ return 0
+ }
+ // Apply any request headers.
+ httpHeaders := ConvertLuaTable(headers)
+ for k, v := range httpHeaders {
+ if vs, ok := v.(string); !ok {
+ l.RaiseError("HTTP header values must be strings")
+ return 0
+ } else {
+ req.Header.Add(k, vs)
+ }
+ }
+ // Execute the request.
+ resp, err := nx.client.Do(req)
+ if err != nil {
+ l.RaiseError("HTTP request error: %v", err.Error())
+ return 0
+ }
+ // Read the response body.
+ responseBody, err := ioutil.ReadAll(resp.Body)
+ resp.Body.Close()
+ if err != nil {
+ l.RaiseError("HTTP response body error: %v", err.Error())
+ return 0
+ }
+ // Read the response headers.
+ responseHeaders := make(map[string]interface{}, len(resp.Header))
+ for k, vs := range resp.Header {
+ // TODO accept multiple values per header
+ for _, v := range vs {
+ responseHeaders[k] = v
+ break
+ }
+ }
+
+ l.Push(lua.LNumber(resp.StatusCode))
+ l.Push(ConvertMap(l, responseHeaders))
+ l.Push(lua.LString(string(responseBody)))
+ return 3
+}
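+
+// A usage sketch from Lua (URL and headers are illustrative only); the call returns the status code,
+// a table of response headers, and the response body:
+//
+//   local nx = require("nakamax")
+//   local code, headers, body = nx.http_request("https://example.com/status", "GET", {})
+//   if code ~= 200 then
+//     error("unexpected status code: " .. code)
+//   end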
diff --git a/server/runtime_oslib.go b/server/runtime_oslib.go
new file mode 100644
index 0000000000000000000000000000000000000000..c0e7ef06c97de4cced72c863e44ed0439c4ebb1e
--- /dev/null
+++ b/server/runtime_oslib.go
@@ -0,0 +1,128 @@
+// The MIT License (MIT)
+//
+// Copyright (c) 2015 Yusuke Inuzuka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package server
+
+import (
+ "strings"
+ "time"
+
+ "github.com/yuin/gopher-lua"
+)
+
+var startedAt time.Time
+
+func init() {
+ startedAt = time.Now()
+}
+
+func getIntField(L *lua.LState, tb *lua.LTable, key string, v int) int {
+ ret := tb.RawGetString(key)
+ if ln, ok := ret.(lua.LNumber); ok {
+ return int(ln)
+ }
+ return v
+}
+
+func getBoolField(L *lua.LState, tb *lua.LTable, key string, v bool) bool {
+ ret := tb.RawGetString(key)
+ if lb, ok := ret.(lua.LBool); ok {
+ return bool(lb)
+ }
+ return v
+}
+
+func OpenOs(L *lua.LState) int {
+ osmod := L.RegisterModule(lua.OsLibName, osFuncs)
+ L.Push(osmod)
+ return 1
+}
+
+var osFuncs = map[string]lua.LGFunction{
+ "clock": osClock,
+ "difftime": osDiffTime,
+ "date": osDate,
+ "time": osTime,
+}
+
+func osClock(L *lua.LState) int {
+ L.Push(lua.LNumber(float64(time.Now().Sub(startedAt)) / float64(time.Second)))
+ return 1
+}
+
+func osDiffTime(L *lua.LState) int {
+ L.Push(lua.LNumber(L.CheckInt64(1) - L.CheckInt64(2)))
+ return 1
+}
+
+func osDate(L *lua.LState) int {
+ t := time.Now()
+ cfmt := "%c"
+ if L.GetTop() >= 1 {
+ cfmt = L.CheckString(1)
+ if strings.HasPrefix(cfmt, "!") {
+ t = time.Now().UTC()
+ cfmt = strings.TrimLeft(cfmt, "!")
+ }
+ if L.GetTop() >= 2 {
+ t = time.Unix(L.CheckInt64(2), 0)
+ }
+ if strings.HasPrefix(cfmt, "*t") {
+ ret := L.NewTable()
+ ret.RawSetString("year", lua.LNumber(t.Year()))
+ ret.RawSetString("month", lua.LNumber(t.Month()))
+ ret.RawSetString("day", lua.LNumber(t.Day()))
+ ret.RawSetString("hour", lua.LNumber(t.Hour()))
+ ret.RawSetString("min", lua.LNumber(t.Minute()))
+ ret.RawSetString("sec", lua.LNumber(t.Second()))
+ ret.RawSetString("wday", lua.LNumber(t.Weekday()))
+ // TODO yday & dst
+ ret.RawSetString("yday", lua.LNumber(0))
+ ret.RawSetString("isdst", lua.LFalse)
+ L.Push(ret)
+ return 1
+ }
+ }
+ L.Push(lua.LString(strftime(t, cfmt)))
+ return 1
+}
+
+func osTime(L *lua.LState) int {
+ if L.GetTop() == 0 {
+ L.Push(lua.LNumber(time.Now().Unix()))
+ } else {
+ tbl := L.CheckTable(1)
+ sec := getIntField(L, tbl, "sec", 0)
+ min := getIntField(L, tbl, "min", 0)
+ hour := getIntField(L, tbl, "hour", 12)
+ day := getIntField(L, tbl, "day", -1)
+ month := getIntField(L, tbl, "month", -1)
+ year := getIntField(L, tbl, "year", -1)
+ isdst := getBoolField(L, tbl, "isdst", false)
+ t := time.Date(year, time.Month(month), day, hour, min, sec, 0, time.Local)
+ // TODO dst
+ if false {
+ print(isdst)
+ }
+ L.Push(lua.LNumber(t.Unix()))
+ }
+ return 1
+}
diff --git a/server/runtime_utils.go b/server/runtime_utils.go
new file mode 100644
index 0000000000000000000000000000000000000000..2a984595b678bb778dca182ee489fa82d6ebd2e6
--- /dev/null
+++ b/server/runtime_utils.go
@@ -0,0 +1,111 @@
+// The MIT License (MIT)
+//
+// Copyright (c) 2015 Yusuke Inuzuka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in all
+// copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+// SOFTWARE.
+
+package server
+
+import (
+ "fmt"
+ "time"
+)
+
+type flagScanner struct {
+ flag byte
+ start string
+ end string
+ buf []byte
+ str string
+ Length int
+ Pos int
+ HasFlag bool
+ ChangeFlag bool
+}
+
+func newFlagScanner(flag byte, start, end, str string) *flagScanner {
+ return &flagScanner{flag, start, end, make([]byte, 0, len(str)), str, len(str), 0, false, false}
+}
+
+func (fs *flagScanner) AppendString(str string) { fs.buf = append(fs.buf, str...) }
+
+func (fs *flagScanner) AppendChar(ch byte) { fs.buf = append(fs.buf, ch) }
+
+func (fs *flagScanner) String() string { return string(fs.buf) }
+
+func (fs *flagScanner) Next() (byte, bool) {
+ c := byte('\000')
+ fs.ChangeFlag = false
+ if fs.Pos == fs.Length {
+ if fs.HasFlag {
+ fs.AppendString(fs.end)
+ }
+ return c, true
+ } else {
+ c = fs.str[fs.Pos]
+ if c == fs.flag {
+ if fs.Pos < (fs.Length-1) && fs.str[fs.Pos+1] == fs.flag {
+ fs.HasFlag = false
+ fs.AppendChar(fs.flag)
+ fs.Pos += 2
+ return fs.Next()
+ } else if fs.Pos != fs.Length-1 {
+ if fs.HasFlag {
+ fs.AppendString(fs.end)
+ }
+ fs.AppendString(fs.start)
+ fs.ChangeFlag = true
+ fs.HasFlag = true
+ }
+ }
+ }
+ fs.Pos++
+ return c, false
+}
+
+var cDateFlagToGo = map[byte]string{
+ 'a': "mon", 'A': "Monday", 'b': "Jan", 'B': "January", 'c': "02 Jan 06 15:04 MST", 'd': "02",
+ 'F': "2006-01-02", 'H': "15", 'I': "03", 'm': "01", 'M': "04", 'p': "PM", 'P': "pm", 'S': "05",
+ 'x': "15/04/05", 'X': "15:04:05", 'y': "06", 'Y': "2006", 'z': "-0700", 'Z': "MST"}
+
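+// strftime renders a C-style date format using the flag mappings above; for example
+// strftime(t, "%Y-%m-%d %H:%M:%S") produces output like "2017-04-22 13:05:00". Unrecognised
+// flags are passed through with their leading '%' intact.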
+func strftime(t time.Time, cfmt string) string {
+ sc := newFlagScanner('%', "", "", cfmt)
+ for c, eos := sc.Next(); !eos; c, eos = sc.Next() {
+ if !sc.ChangeFlag {
+ if sc.HasFlag {
+ if v, ok := cDateFlagToGo[c]; ok {
+ sc.AppendString(t.Format(v))
+ } else {
+ switch c {
+ case 'w':
+ sc.AppendString(fmt.Sprint(int(t.Weekday())))
+ default:
+ sc.AppendChar('%')
+ sc.AppendChar(c)
+ }
+ }
+ sc.HasFlag = false
+ } else {
+ sc.AppendChar(c)
+ }
+ }
+ }
+
+ return sc.String()
+}
diff --git a/server/session.go b/server/session.go
index 91952aa81a70ed8663029ef04ee984c67f86f1e6..3b72ec7b79450e02e0aedf3e849e96b06d3c68b3 100644
--- a/server/session.go
+++ b/server/session.go
@@ -36,6 +36,7 @@ type session struct {
userID uuid.UUID
handle *atomic.String
lang string
+ expiry int64
stopped bool
conn *websocket.Conn
pingTicker *time.Ticker
@@ -44,7 +45,7 @@ type session struct {
}
// NewSession creates a new session which encapsulates a socket connection
-func NewSession(logger *zap.Logger, config Config, userID uuid.UUID, handle string, lang string, websocketConn *websocket.Conn, unregister func(s *session)) *session {
+func NewSession(logger *zap.Logger, config Config, userID uuid.UUID, handle string, lang string, expiry int64, websocketConn *websocket.Conn, unregister func(s *session)) *session {
sessionID := uuid.NewV4()
sessionLogger := logger.With(zap.String("uid", userID.String()), zap.String("sid", sessionID.String()))
@@ -57,6 +58,7 @@ func NewSession(logger *zap.Logger, config Config, userID uuid.UUID, handle stri
userID: userID,
handle: atomic.NewString(handle),
lang: lang,
+ expiry: expiry,
conn: websocketConn,
stopped: false,
pingTicker: time.NewTicker(time.Duration(config.GetTransport().PingPeriodMs) * time.Millisecond),
diff --git a/server/session_auth.go b/server/session_auth.go
index 387ad46b4a97047b3b3a818cb7c5676bef55a314..9910b981807f9090ee744f7c435c7118b12ba460 100644
--- a/server/session_auth.go
+++ b/server/session_auth.go
@@ -15,21 +15,22 @@
package server
import (
+ "bytes"
"database/sql"
+ "encoding/json"
"fmt"
+ "io"
"io/ioutil"
"math/rand"
+ "mime"
"net/http"
"regexp"
"strconv"
+ "strings"
"time"
"nakama/pkg/social"
- "bytes"
- "mime"
- "strings"
-
"github.com/dgrijalva/jwt-go"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
@@ -63,6 +64,7 @@ type authenticationService struct {
statsService StatsService
registry *SessionRegistry
pipeline *pipeline
+ runtime *Runtime
mux *mux.Router
hmacSecretByte []byte
upgrader *websocket.Upgrader
@@ -73,24 +75,23 @@ type authenticationService struct {
}
// NewAuthenticationService creates a new AuthenticationService
-func NewAuthenticationService(logger *zap.Logger, config Config, db *sql.DB, statService StatsService, registry *SessionRegistry, tracker Tracker, matchmaker Matchmaker, messageRouter MessageRouter) *authenticationService {
- s := social.NewClient(5 * time.Second)
- p := NewPipeline(config, db, s, tracker, matchmaker, messageRouter, registry)
+func NewAuthenticationService(logger *zap.Logger, config Config, db *sql.DB, statService StatsService, registry *SessionRegistry, socialClient *social.Client, pipeline *pipeline, runtime *Runtime) *authenticationService {
a := &authenticationService{
logger: logger,
config: config,
db: db,
statsService: statService,
registry: registry,
- pipeline: p,
+ pipeline: pipeline,
+ runtime: runtime,
+ socialClient: socialClient,
+ random: rand.New(rand.NewSource(time.Now().UnixNano())),
hmacSecretByte: []byte(config.GetSession().EncryptionKey),
upgrader: &websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool { return true },
},
- socialClient: s,
- random: rand.New(rand.NewSource(time.Now().UnixNano())),
jsonpbMarshaler: &jsonpb.Marshaler{
EnumsAsInts: true,
EmitDefaults: false,
@@ -139,7 +140,7 @@ func (a *authenticationService) configure() {
}
token := r.URL.Query().Get("token")
- uid, handle, auth := a.authenticateToken(token)
+ uid, handle, exp, auth := a.authenticateToken(token)
if !auth {
http.Error(w, "Missing or invalid token", 401)
return
@@ -158,8 +159,72 @@ func (a *authenticationService) configure() {
return
}
- a.registry.add(uid, handle, lang, conn, a.pipeline.processRequest)
+ a.registry.add(uid, handle, lang, exp, conn, a.pipeline.processRequest)
}).Methods("GET", "OPTIONS")
+
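+ // Registered HTTP runtime functions are exposed at /runtime/{path}. For example (illustrative
+ // values only), a function registered for "test_path" would be invoked with a POST to
+ // "/runtime/test_path?key=<runtime http_key>" carrying a JSON object body and a
+ // "Content-Type: application/json" header.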
+ a.mux.HandleFunc("/runtime/{path}", func(w http.ResponseWriter, r *http.Request) {
+ accept := r.Header.Get("accept")
+ if accept != "" && accept != "application/json" {
+ http.Error(w, "Runtime functions only accept JSON data", 400)
+ return
+ }
+
+ contentType := r.Header.Get("content-type")
+ if contentType != "" && contentType != "application/json" {
+ http.Error(w, "Runtime function expects JSON data", 400)
+ return
+ }
+
+ key := r.URL.Query().Get("key")
+ if key != a.config.GetRuntime().HTTPKey {
+ http.Error(w, "Invalid runtime key", 401)
+ return
+ }
+
+ if r.Method == "OPTIONS" {
+ // TODO(mo): Do we need to return non-200 for paths that don't exist?
+ return
+ }
+
+ path := mux.Vars(r)["path"]
+ fn := a.runtime.GetRuntimeCallback(HTTP, path)
+ if fn == nil {
+ a.logger.Warn("HTTP invocation failed as path was not found", zap.String("path", path))
+ http.Error(w, "Runtime function could not be invoked. Path not found.", 404)
+ return
+ }
+
+ payload := make(map[string]interface{})
+ defer r.Body.Close()
+ err := json.NewDecoder(r.Body).Decode(&payload)
+ switch {
+ case err == io.EOF:
+ payload = nil
+ case err != nil:
+ a.logger.Error("Could not decode request data", zap.Error(err))
+ http.Error(w, "Bad request data", 400)
+ return
+ }
+
+ responseData, funError := a.runtime.InvokeFunctionHTTP(fn, uuid.Nil, "", 0, payload)
+ if funError != nil {
+ a.logger.Error("Runtime function caused an error", zap.String("path", path), zap.Error(funError))
+ http.Error(w, fmt.Sprintf("Runtime function caused an error: %s", funError.Error()), 500)
+ return
+ }
+
+ responseBytes, err := json.Marshal(responseData)
+ if err != nil {
+ a.logger.Error("Could not marshal function response data", zap.Error(err))
+ http.Error(w, "Runtime function caused an error", 500)
+ return
+ }
+ w.Header().Set("X-Content-Type-Options", "nosniff")
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(200)
+ w.Write(responseBytes)
+
+ }).Methods("POST", "OPTIONS")
}
func (a *authenticationService) StartServer(logger *zap.Logger) {
@@ -221,6 +286,15 @@ func (a *authenticationService) handleAuth(w http.ResponseWriter, r *http.Reques
return
}
+ messageType := fmt.Sprintf("%T", authReq.Payload)
+ a.logger.Debug("Received message", zap.String("type", messageType))
+ authReq, fnErr := RuntimeBeforeHookAuthentication(a.runtime, a.jsonpbMarshaler, a.jsonpbUnmarshaler, authReq)
+ if fnErr != nil {
+ a.logger.Error("Runtime before function caused an error", zap.String("message", messageType), zap.Error(fnErr))
+ a.sendAuthError(w, r, "Runtime before function caused an error", 500, authReq)
+ return
+ }
+
userID, handle, errString, errCode := retrieveUserID(authReq)
if errString != "" {
a.logger.Debug("Could not retrieve user ID", zap.String("error", errString), zap.Int("code", errCode))
@@ -239,6 +313,8 @@ func (a *authenticationService) handleAuth(w http.ResponseWriter, r *http.Reques
authResponse := &AuthenticateResponse{CollationId: authReq.CollationId, Payload: &AuthenticateResponse_Session_{&AuthenticateResponse_Session{Token: signedToken}}}
a.sendAuthResponse(w, r, 200, authResponse)
+
+ RuntimeAfterHookAuthentication(a.logger, a.runtime, a.jsonpbMarshaler, authReq)
}
func (a *authenticationService) sendAuthError(w http.ResponseWriter, r *http.Request, error string, errorCode int, authRequest *AuthenticateRequest) {
@@ -932,10 +1008,10 @@ func (a *authenticationService) generateHandle() string {
return string(b)
}
-func (a *authenticationService) authenticateToken(tokenString string) (uuid.UUID, string, bool) {
+func (a *authenticationService) authenticateToken(tokenString string) (uuid.UUID, string, int64, bool) {
if tokenString == "" {
a.logger.Warn("Token missing")
- return uuid.Nil, "", false
+ return uuid.Nil, "", 0, false
}
token, err := jwt.Parse(tokenString, func(token *jwt.Token) (interface{}, error) {
@@ -950,14 +1026,14 @@ func (a *authenticationService) authenticateToken(tokenString string) (uuid.UUID
uid, uerr := uuid.FromString(claims["uid"].(string))
if uerr != nil {
a.logger.Warn("Invalid user ID in token", zap.String("token", tokenString), zap.Error(uerr))
- return uuid.Nil, "", false
+ return uuid.Nil, "", 0, false
}
- return uid, claims["han"].(string), true
+ return uid, claims["han"].(string), int64(claims["exp"].(float64)), true
}
}
a.logger.Warn("Token invalid", zap.String("token", tokenString), zap.Error(err))
- return uuid.Nil, "", false
+ return uuid.Nil, "", 0, false
}
func (a *authenticationService) Stop() {
diff --git a/server/session_registry.go b/server/session_registry.go
index 77cd73b718e738eb5635432792d1f940427f3904..a9616dfc5c1cbd2fb90ec03295f7c6ca67acd372 100644
--- a/server/session_registry.go
+++ b/server/session_registry.go
@@ -67,8 +67,8 @@ func (a *SessionRegistry) Get(sessionID uuid.UUID) *session {
return s
}
-func (a *SessionRegistry) add(userID uuid.UUID, handle string, lang string, conn *websocket.Conn, processRequest func(logger *zap.Logger, session *session, envelope *Envelope)) {
- s := NewSession(a.logger, a.config, userID, handle, lang, conn, a.remove)
+func (a *SessionRegistry) add(userID uuid.UUID, handle string, lang string, expiry int64, conn *websocket.Conn, processRequest func(logger *zap.Logger, session *session, envelope *Envelope)) {
+ s := NewSession(a.logger, a.config, userID, handle, lang, expiry, conn, a.remove)
a.Lock()
a.sessions[s.id] = s
a.Unlock()
diff --git a/tests/modules/e2e_runtime.lua b/tests/modules/e2e_runtime.lua
new file mode 100644
index 0000000000000000000000000000000000000000..c36c0317d769e5fe744df362bfd86fc2ce727db6
--- /dev/null
+++ b/tests/modules/e2e_runtime.lua
@@ -0,0 +1,163 @@
+--[[
+ Copyright 2017 The Nakama Authors
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+--]]
+
+local nk = require("nakama")
+local nx = require("nakamax")
+
+-- NOTE: You must preload the dataset with "e2e_runtime.sql" before each run.
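+-- One way to do this against a local insecure CockroachDB dev node, for example:
+--   cockroach sql --insecure < tests/modules/e2e_runtime.sql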
+
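+-- Debug helper: recursively render a Lua table as an indented string.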
+function print_r(arr, indentLevel)
+ local str = ""
+ local indentStr = "#"
+
+ if(indentLevel == nil) then
+ print(print_r(arr, 0))
+ return
+ end
+
+ for i = 0, indentLevel do
+ indentStr = indentStr.."\t"
+ end
+
+ for index,Value in pairs(arr) do
+ if type(Value) == "table" then
+ str = str..indentStr..index..": \n"..print_r(Value, (indentLevel + 1))
+ else
+ str = str..indentStr..index..": "..Value.."\n"
+ end
+ end
+ return str
+end
+
+--[[
+ Nakama module
+]]--
+
+-- leaderboard_create
+do
+ local id = nx.uuid_v4()
+ local md = {}
+ -- This will error if it fails.
+ nk.leaderboard_create(id, "desc", "0 0 * * 1", md, false)
+end
+
+-- leaderboard_create - an ID which already exists
+do
+  local id = "ce042d38-c3db-4ebd-bc99-3aaa0adbdef7"
+  -- Call left disabled; this leaderboard ID is pre-seeded by "e2e_runtime.sql".
+  -- nk.leaderboard_create(id, "desc", "0 0 * * 1", {}, false)
+end
+
+-- logger_info
+do
+ local message = nk.logger_info(("%q"):format("INFO logger."))
+ assert(message == "\"INFO logger.\"")
+end
+
+-- logger_warn
+do
+ local message = nk.logger_warn(("%q"):format("WARN logger."))
+ assert(message == "\"WARN logger.\"")
+end
+
+-- logger_error
+do
+ local message = nk.logger_error(("%q"):format("ERROR logger."))
+ assert(message == "\"ERROR logger.\"")
+end
+
+-- storage_write
+do
+ local new_Records = {
+ {Bucket = "mygame", Collection = "settings", Record = "a", UserId = nil, Value = "{}"},
+ {Bucket = "mygame", Collection = "settings", Record = "b", UserId = nil, Value = "{}"},
+ {Bucket = "mygame", Collection = "settings", Record = "c", UserId = nil, Value = "{}"}
+ }
+ -- This will error if it fails.
+ nk.storage_write(new_Records)
+end
+
+-- storage_fetch
+do
+ local Record_keys = {
+ {Bucket = "mygame", Collection = "settings", Record = "a", UserId = nil},
+ {Bucket = "mygame", Collection = "settings", Record = "b", UserId = nil},
+ {Bucket = "mygame", Collection = "settings", Record = "c", UserId = nil}
+ }
+ local Records = nk.storage_fetch(Record_keys)
+ for i, r in ipairs(Records)
+ do
+ assert(r.Value == "{}", "'r.Value' must be '{}'")
+ end
+end
+
+-- storage_remove
+do
+ local Record_keys = {
+ {Bucket = "mygame", Collection = "settings", Record = "a", UserId = nil},
+ {Bucket = "mygame", Collection = "settings", Record = "b", UserId = nil},
+ {Bucket = "mygame", Collection = "settings", Record = "c", UserId = nil}
+ }
+ -- This will error if it fails.
+ nk.storage_remove(Record_keys)
+end
+
+-- user_fetch_id
+do
+ local user_ids = {"4c2ae592-b2a7-445e-98ec-697694478b1c"}
+ local users = nk.user_fetch_id(user_ids)
+ assert(#users == 1)
+ assert(user_ids[1] == users[1].Id)
+end
+
+-- user_fetch_handle
+do
+ local user_handles = {"02ebb2c8"}
+ local users = nk.user_fetch_handle(user_handles)
+ assert(user_handles[1] == users[1].Handle)
+end
+
+--[[
+ Nakamax module
+]]--
+
+-- http_request
+do
+ local url = "https://google.com/"
+ local method = "HEAD"
+ local code, headers, respbody = nx.http_request(url, method, {}, nil)
+ assert(code == 200, "'code' must equal 200")
+end
+
+-- json_decode
+do
+ local object = nx.json_decode('{"hello": "world"}')
+ assert(object.hello, "'object.hello' must not be nil")
+ assert(object.hello == "world", "'object.hello' must equal 'world'")
+end
+
+-- json_encode
+do
+ local json = nx.json_encode({["id"] = "blah"})
+ assert(json == '{"id":"blah"}', '"json" must equal "{"id":"blah"}"')
+end
+
+-- uuid_v4
+do
+ local uuid = nx.uuid_v4()
+ assert(uuid, "'uuid' must not be nil")
+ assert(type(uuid) == "string", "'uuid' type must be string")
+end
diff --git a/tests/modules/e2e_runtime.sql b/tests/modules/e2e_runtime.sql
new file mode 100644
index 0000000000000000000000000000000000000000..92e775a142a9228594977fb2aab2af1641226146
--- /dev/null
+++ b/tests/modules/e2e_runtime.sql
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2017 The Nakama Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+SET DATABASE = nakama;
+
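+-- Seed the leaderboard and user referenced by "tests/modules/e2e_runtime.lua".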
+BEGIN;
+UPSERT INTO leaderboard (id, authoritative, sort_order, reset_schedule, metadata)
+VALUES (b'ce042d38-c3db-4ebd-bc99-3aaa0adbdef7', true, 1, '0 0 * * 1', b'{}');
+
+UPSERT INTO users (id, handle, created_at, updated_at)
+VALUES (to_uuid('4c2ae592-b2a7-445e-98ec-697694478b1c'), b'02ebb2c8', now()::INT, now()::INT);
+COMMIT;
diff --git a/tests/runtime_test.go b/tests/runtime_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..9ffcf1fe0c68cd294d191a8eaea621ceddab779a
--- /dev/null
+++ b/tests/runtime_test.go
@@ -0,0 +1,520 @@
+package tests
+
+import (
+	"bytes"
+	"database/sql"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/url"
+	"os"
+	"path/filepath"
+	"reflect"
+	"testing"
+
+	"nakama/server"
+
+	"github.com/gogo/protobuf/jsonpb"
+	_ "github.com/lib/pq" // assumed Postgres driver; registers "postgres" for sql.Open in setupDB
+	"github.com/satori/go.uuid"
+	"go.uber.org/zap"
+)
+
+const DATA_PATH = "/tmp/nakama/data/"
+
+var db *sql.DB
+
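+// setupDB opens a connection pool to the test database at localhost:26257; errors are ignored, so the node must already be running.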
+func setupDB() {
+ rawurl := fmt.Sprintf("postgresql://%s?sslmode=disable", "root@localhost:26257/nakama")
+ url, _ := url.Parse(rawurl)
+ db, _ = sql.Open("postgres", url.String())
+}
+
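+// newRuntime builds a runtime rooted at DATA_PATH; Lua modules under its modules directory are evaluated at startup,
+// so tests that only call newRuntime rely on the returned error to surface module failures.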
+func newRuntime() (*server.Runtime, error) {
+ logger, _ := zap.NewDevelopment(zap.AddStacktrace(zap.ErrorLevel))
+ return server.NewRuntime(logger, logger, db, server.NewRuntimeConfig(DATA_PATH))
+}
+
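+// The write* helpers drop Lua source files into DATA_PATH/modules so the next newRuntime() call picks them up.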
+func writeStatsModule() {
+ writeFile("stats.lua", `
+stats={}
+
+-- Get the mean value of a table
+function stats.mean( t )
+ local sum = 0
+	local count = 0
+
+ for k,v in pairs(t) do
+ if type(v) == 'number' then
+ sum = sum + v
+ count = count + 1
+ end
+ end
+
+ return (sum / count)
+end
+print("Stats Module Loaded")
+return stats`)
+}
+
+func writeTestModule() {
+ writeFile("test.lua", `
+test={}
+-- Print "Hello World" and return it in a message table
+function test.printWorld()
+ print("Hello World")
+ return {["message"]="Hello World"}
+end
+
+print("Test Module Loaded")
+return test
+`)
+}
+
+func writeFile(name, content string) {
+ os.MkdirAll(filepath.Join(DATA_PATH, "modules"), os.ModePerm)
+ ioutil.WriteFile(filepath.Join(DATA_PATH, "/modules/"+name), []byte(content), 0644)
+}
+
+func TestRuntimeSampleScript(t *testing.T) {
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ l, _ := r.NewStateThread()
+ defer l.Close()
+ err = l.DoString(`
+local example = "an example string"
+for i in string.gmatch(example, "%S+") do
+ print(i)
+end`)
+
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeDisallowStandardLibs(t *testing.T) {
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ l, _ := r.NewStateThread()
+ defer l.Close()
+ err = l.DoString(`
+-- Return true if file exists and is readable.
+function file_exists(path)
+ local file = io.open(path, "r")
+ if file then file:close() end
+ return file ~= nil
+end
+file_exists "./"`)
+
+ if err == nil {
+ t.Error(errors.New("Successfully accessed IO package"))
+ }
+}
+
+// This test will always pass.
+// Check the stdout messages to see whether the module was loaded multiple times;
+// you should only see "Test Module Loaded" once.
+func TestRuntimeRequireEval(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeTestModule()
+ writeFile("test-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+test.printWorld()
+`)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeRequireFile(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeStatsModule()
+ writeFile("local_test.lua", `
+local stats = require("stats")
+t = {[1]=5, [2]=7, [3]=8, [4]='Something else.'}
+assert(stats.mean(t) > 0)
+`)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeRequirePreload(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeStatsModule()
+ writeFile("states-invoke.lua", `
+local stats = require("stats")
+t = {[1]=5, [2]=7, [3]=8, [4]='Something else.'}
+print(stats.mean(t))
+`)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeKeepChangesBetweenStates(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("var.lua", `
+var={}
+var.count = 1
+return var
+ `)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ l, _ := r.NewStateThread()
+ defer l.Close()
+
+ err = l.DoString(`
+local var = require("var")
+var.count = 2`)
+
+ if err != nil {
+ t.Error(err)
+ }
+
+ err = l.DoString(`
+local var = require("var")
+assert(var.count == 2)`)
+
+ if err != nil {
+ t.Error(err)
+ }
+
+ l2, _ := r.NewStateThread()
+ defer l2.Close()
+ err = l2.DoString(`
+local var = require("var")
+assert(var.count == 2)`)
+
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeRegisterHTTP(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeTestModule()
+ writeFile("http-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+nakama.register_http(test.printWorld, "/test/helloworld")
+ `)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ fn := r.GetRuntimeCallback(server.HTTP, "/test/helloworld")
+ m, err := r.InvokeFunctionHTTP(fn, uuid.Nil, "", 0, nil)
+ if err != nil {
+ t.Error(err)
+ }
+
+ msg := m["message"]
+ if msg != "Hello World" {
+ t.Error("Invocation failed. Return result not expected")
+ }
+}
+
+func TestRuntimeRegisterHTTPNoResponse(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("test.lua", `
+test={}
+-- Print "Hello World" and return nothing
+function test.printWorld(ctx)
+ print("Hello World")
+end
+
+print("Test Module Loaded")
+return test
+ `)
+ writeFile("http-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+nakama.register_http(test.printWorld, "/test/helloworld")
+ `)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ fn := r.GetRuntimeCallback(server.HTTP, "/test/helloworld")
+ _, err = r.InvokeFunctionHTTP(fn, uuid.Nil, "", 0, nil)
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeRegisterHTTPWithPayload(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("test.lua", `
+test={}
+-- Print "Hello World" and the execution mode, then echo the payload back
+function test.printWorld(ctx, payload)
+ print("Hello World")
+ print(ctx.execution_mode)
+ return payload
+end
+
+print("Test Module Loaded")
+return test
+ `)
+ writeFile("http-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+nakama.register_http(test.printWorld, "/test/helloworld")
+ `)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ fn := r.GetRuntimeCallback(server.HTTP, "/test/helloworld")
+ payload := make(map[string]interface{})
+ payload["message"] = "Hello World"
+
+ m, err := r.InvokeFunctionHTTP(fn, uuid.Nil, "", 0, payload)
+ if err != nil {
+ t.Error(err)
+ }
+
+ msg := m["message"]
+ if msg != "Hello World" {
+ t.Error("Invocation failed. Return result not expected")
+ }
+}
+
+func TestRuntimeRegisterRPCWithPayload(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("test.lua", `
+test={}
+-- Print "Hello World" and the execution mode, then echo the payload back
+function test.printWorld(ctx, payload)
+ print("Hello World")
+ print(ctx.execution_mode)
+ return payload
+end
+
+print("Test Module Loaded")
+return test
+ `)
+ writeFile("http-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+nakama.register_rpc(test.printWorld, "helloworld")
+ `)
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ fn := r.GetRuntimeCallback(server.RPC, "helloworld")
+ payload := []byte("Hello World")
+
+ m, err := r.InvokeFunctionRPC(fn, uuid.Nil, "", 0, payload)
+ if err != nil {
+ t.Error(err)
+ }
+
+ if string(m) != "Hello World" {
+ t.Error("Invocation failed. Return result not expected")
+ }
+}
+
+func TestRuntimeRegisterBeforeWithPayload(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("test.lua", `
+test={}
+-- Print "Hello World" and the execution mode, then echo the payload back
+function test.printWorld(ctx, payload)
+ print("Hello World")
+ print(ctx.execution_mode)
+ return payload
+end
+
+print("Test Module Loaded")
+return test
+ `)
+ writeFile("http-invoke.lua", `
+local nakama = require("nakama")
+local test = require("test")
+nakama.register_before(test.printWorld, "SelfFetch")
+ `)
+
+ jsonpbMarshaler := &jsonpb.Marshaler{
+ EnumsAsInts: true,
+ EmitDefaults: false,
+ Indent: "",
+ OrigName: false,
+ }
+ jsonpbUnmarshaler := &jsonpb.Unmarshaler{
+ AllowUnknownFields: false,
+ }
+
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+
+ fn := r.GetRuntimeCallback(server.BEFORE, "SelfFetch")
+ envelope := &server.Envelope{
+ CollationId: "123",
+ Payload: &server.Envelope_SelfFetch{
+ SelfFetch: &server.TSelfFetch{},
+ }}
+
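+	// Marshal the envelope to a JSON map for the Lua hook, invoke the hook, then convert the result
+	// back to an Envelope and check it survived the round trip unchanged.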
+ strEnvelope, err := jsonpbMarshaler.MarshalToString(envelope)
+ if err != nil {
+ t.Error(err)
+ }
+
+ var jsonEnvelope map[string]interface{}
+ if err = json.Unmarshal([]byte(strEnvelope), &jsonEnvelope); err != nil {
+ t.Error(err)
+ }
+
+ result, err := r.InvokeFunctionBefore(fn, uuid.Nil, "", 0, jsonEnvelope)
+ if err != nil {
+ t.Error(err)
+ }
+
+ bytesEnvelope, err := json.Marshal(result)
+ if err != nil {
+ t.Error(err)
+ }
+
+ resultEnvelope := &server.Envelope{}
+ if err = jsonpbUnmarshaler.Unmarshal(bytes.NewReader(bytesEnvelope), resultEnvelope); err != nil {
+ t.Error(err)
+ }
+
+ if !reflect.DeepEqual(envelope, resultEnvelope) {
+ t.Error("Input Proto is not the same as Output proto.")
+ }
+
+}
+
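+// The remaining tests need a running database (see setupDB); they only assert that the module evaluates without error.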
+func TestRuntimeUserId(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("userid.lua", `
+local nk = require("nakama")
+
+local user_ids = {
+ "fd09791f-3297-40bd-b411-1afe316fd2e8",
+ "fd8db1fc-6f79-4302-a54c-5960c99601a1"
+}
+
+local users = nk.user_fetch_id(user_ids)
+ `)
+
+ setupDB()
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestRuntimeLeaderboardCreate(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("userid.lua", `
+local nk = require("nakama")
+local nkx = require("nakamax")
+
+local leaderboard_id = nkx.uuid_v4()
+local metadata = {
+ weather_conditions = "rain"
+}
+
+nk.leaderboard_create(leaderboard_id, "desc", "0 0 * * 1", metadata, false)
+ `)
+
+ setupDB()
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestStorageWrite(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("storage_write.lua", `
+local nk = require("nakama")
+
+local new_records = {
+ {bucket = "mygame", collection = "settings", record = "a", user_id = nil, value = "{}"},
+ {bucket = "mygame", collection = "settings", record = "b", user_id = nil, value = "{}"},
+ {bucket = "mygame", collection = "settings", record = "c", user_id = nil, value = "{}"}
+}
+
+nk.storage_write(new_records)
+`)
+
+ setupDB()
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}
+
+func TestStorageFetch(t *testing.T) {
+ defer os.RemoveAll(DATA_PATH)
+ writeFile("storage_fetch.lua", `
+local nk = require("nakama")
+local record_keys = {
+ {bucket = "mygame", collection = "settings", record = "a", user_id = nil},
+ {bucket = "mygame", collection = "settings", record = "b", user_id = nil},
+ {bucket = "mygame", collection = "settings", record = "c", user_id = nil}
+}
+local records = nk.storage_fetch(record_keys)
+for i, r in ipairs(records)
+do
+ assert(r.value == "{}", "'r.value' must be '{}'")
+end
+`)
+
+ setupDB()
+ r, err := newRuntime()
+ defer r.Stop()
+ if err != nil {
+ t.Error(err)
+ }
+}