Loading CHANGELOG.md +2 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - New timeout option to HTTP request function in the code runtime. - Set QoS settings on client outgoing message queue. - New runtime pool min/max size options. - New user ban and unban functions. - RPC functions triggered by HTTP GET requests now expose any custom query parameters. ### Changed - The avatar URL fields in various domain objects now support up to 512 characters for FBIG. Loading server/api.go +19 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,25 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j // Register and start GRPC Gateway server. // Should start after GRPC server itself because RegisterNakamaHandlerFromEndpoint below tries to dial GRPC. ctx := context.Background() grpcGateway := runtime.NewServeMux() grpcGateway := runtime.NewServeMux( runtime.WithMetadata(func(ctx context.Context, r *http.Request) metadata.MD { // For RPC GET operations pass through any custom query parameters. if r.Method != "GET" || !strings.HasPrefix(r.URL.Path, "/v2/rpc/") { return metadata.MD{} } q := r.URL.Query() p := make(map[string][]string, len(q)) for k, vs := range q { if k == "http_key" { // Skip Nakama's own query params, only process custom ones. continue } p["q_"+k] = vs } return metadata.MD(p) }), ) dialAddr := fmt.Sprintf("127.0.0.1:%d", config.GetSocket().Port-1) dialOpts := []grpc.DialOption{ //TODO (mo, zyro): Do we need to pass the statsHandler here as well? Loading server/api_rpc.go +14 −1 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) Loading @@ -38,6 +39,18 @@ func (s *ApiServer) RpcFunc(ctx context.Context, in *api.Rpc) (*api.Rpc, error) return nil, status.Error(codes.NotFound, "RPC function not found") } queryParams := make(map[string][]string, 0) if md, ok := metadata.FromIncomingContext(ctx); !ok { return nil, status.Error(codes.Internal, "RPC function could not get incoming context") } else { for k, vs := range md { // Only process the keys representing custom query parameters. if strings.HasPrefix(k, "q_") { queryParams[k[2:]] = vs } } } uid := "" username := "" expiry := int64(0) Loading @@ -58,7 +71,7 @@ func (s *ApiServer) RpcFunc(ctx context.Context, in *api.Rpc) (*api.Rpc, error) return nil, status.Error(codes.NotFound, "RPC function not found") } result, fnErr, code := runtime.InvokeFunction(ExecutionModeRPC, lf, uid, username, expiry, "", in.Payload) result, fnErr, code := runtime.InvokeFunction(ExecutionModeRPC, lf, queryParams, uid, username, expiry, "", in.Payload) s.runtimePool.Put(runtime) if fnErr != nil { Loading server/core_user.go +34 −0 Original line number Diff line number Diff line Loading @@ -111,6 +111,40 @@ func DeleteUser(tx *sql.Tx, userID uuid.UUID) (int64, error) { return res.RowsAffected() } func BanUsers(logger *zap.Logger, db *sql.DB, ids []string) error { statements := make([]string, 0, len(ids)) params := make([]interface{}, 0, len(ids)) for i, id := range ids { statements = append(statements, "$"+strconv.Itoa(i+1)) params = append(params, id) } query := "UPDATE users SET disable_time = now() WHERE id IN (" + strings.Join(statements, ", ") + ")" _, err := db.Exec(query, params...) if err != nil { logger.Error("Error banning user accounts.", zap.Error(err), zap.Strings("ids", ids)) return err } return nil } func UnbanUsers(logger *zap.Logger, db *sql.DB, ids []string) error { statements := make([]string, 0, len(ids)) params := make([]interface{}, 0, len(ids)) for i, id := range ids { statements = append(statements, "$"+strconv.Itoa(i+1)) params = append(params, id) } query := "UPDATE users SET disable_time = CAST(0 AS TIMESTAMPTZ) WHERE id IN (" + strings.Join(statements, ", ") + ")" _, err := db.Exec(query, params...) if err != nil { logger.Error("Error unbanning user accounts.", zap.Error(err), zap.Strings("ids", ids)) return err } return nil } func UserExistsAndDoesNotBlock(db *sql.DB, checkUserID, blocksUserID uuid.UUID) (bool, error) { var count int err := db.QueryRow(` Loading server/pipeline_rpc.go +1 −1 Original line number Diff line number Diff line Loading @@ -55,7 +55,7 @@ func (p *Pipeline) rpc(logger *zap.Logger, session Session, envelope *rtapi.Enve return } result, fnErr, _ := runtime.InvokeFunction(ExecutionModeRPC, lf, session.UserID().String(), session.Username(), session.Expiry(), session.ID().String(), rpcMessage.Payload) result, fnErr, _ := runtime.InvokeFunction(ExecutionModeRPC, lf, nil, session.UserID().String(), session.Username(), session.Expiry(), session.ID().String(), rpcMessage.Payload) p.runtimePool.Put(runtime) if fnErr != nil { logger.Error("Runtime RPC function caused an error", zap.String("id", rpcMessage.Id), zap.Error(fnErr)) Loading Loading
CHANGELOG.md +2 −0 Original line number Diff line number Diff line Loading @@ -8,6 +8,8 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - New timeout option to HTTP request function in the code runtime. - Set QoS settings on client outgoing message queue. - New runtime pool min/max size options. - New user ban and unban functions. - RPC functions triggered by HTTP GET requests now expose any custom query parameters. ### Changed - The avatar URL fields in various domain objects now support up to 512 characters for FBIG. Loading
server/api.go +19 −1 Original line number Diff line number Diff line Loading @@ -106,7 +106,25 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j // Register and start GRPC Gateway server. // Should start after GRPC server itself because RegisterNakamaHandlerFromEndpoint below tries to dial GRPC. ctx := context.Background() grpcGateway := runtime.NewServeMux() grpcGateway := runtime.NewServeMux( runtime.WithMetadata(func(ctx context.Context, r *http.Request) metadata.MD { // For RPC GET operations pass through any custom query parameters. if r.Method != "GET" || !strings.HasPrefix(r.URL.Path, "/v2/rpc/") { return metadata.MD{} } q := r.URL.Query() p := make(map[string][]string, len(q)) for k, vs := range q { if k == "http_key" { // Skip Nakama's own query params, only process custom ones. continue } p["q_"+k] = vs } return metadata.MD(p) }), ) dialAddr := fmt.Sprintf("127.0.0.1:%d", config.GetSocket().Port-1) dialOpts := []grpc.DialOption{ //TODO (mo, zyro): Do we need to pass the statsHandler here as well? Loading
server/api_rpc.go +14 −1 Original line number Diff line number Diff line Loading @@ -24,6 +24,7 @@ import ( "go.uber.org/zap/zapcore" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) Loading @@ -38,6 +39,18 @@ func (s *ApiServer) RpcFunc(ctx context.Context, in *api.Rpc) (*api.Rpc, error) return nil, status.Error(codes.NotFound, "RPC function not found") } queryParams := make(map[string][]string, 0) if md, ok := metadata.FromIncomingContext(ctx); !ok { return nil, status.Error(codes.Internal, "RPC function could not get incoming context") } else { for k, vs := range md { // Only process the keys representing custom query parameters. if strings.HasPrefix(k, "q_") { queryParams[k[2:]] = vs } } } uid := "" username := "" expiry := int64(0) Loading @@ -58,7 +71,7 @@ func (s *ApiServer) RpcFunc(ctx context.Context, in *api.Rpc) (*api.Rpc, error) return nil, status.Error(codes.NotFound, "RPC function not found") } result, fnErr, code := runtime.InvokeFunction(ExecutionModeRPC, lf, uid, username, expiry, "", in.Payload) result, fnErr, code := runtime.InvokeFunction(ExecutionModeRPC, lf, queryParams, uid, username, expiry, "", in.Payload) s.runtimePool.Put(runtime) if fnErr != nil { Loading
server/core_user.go +34 −0 Original line number Diff line number Diff line Loading @@ -111,6 +111,40 @@ func DeleteUser(tx *sql.Tx, userID uuid.UUID) (int64, error) { return res.RowsAffected() } func BanUsers(logger *zap.Logger, db *sql.DB, ids []string) error { statements := make([]string, 0, len(ids)) params := make([]interface{}, 0, len(ids)) for i, id := range ids { statements = append(statements, "$"+strconv.Itoa(i+1)) params = append(params, id) } query := "UPDATE users SET disable_time = now() WHERE id IN (" + strings.Join(statements, ", ") + ")" _, err := db.Exec(query, params...) if err != nil { logger.Error("Error banning user accounts.", zap.Error(err), zap.Strings("ids", ids)) return err } return nil } func UnbanUsers(logger *zap.Logger, db *sql.DB, ids []string) error { statements := make([]string, 0, len(ids)) params := make([]interface{}, 0, len(ids)) for i, id := range ids { statements = append(statements, "$"+strconv.Itoa(i+1)) params = append(params, id) } query := "UPDATE users SET disable_time = CAST(0 AS TIMESTAMPTZ) WHERE id IN (" + strings.Join(statements, ", ") + ")" _, err := db.Exec(query, params...) if err != nil { logger.Error("Error unbanning user accounts.", zap.Error(err), zap.Strings("ids", ids)) return err } return nil } func UserExistsAndDoesNotBlock(db *sql.DB, checkUserID, blocksUserID uuid.UUID) (bool, error) { var count int err := db.QueryRow(` Loading
server/pipeline_rpc.go +1 −1 Original line number Diff line number Diff line Loading @@ -55,7 +55,7 @@ func (p *Pipeline) rpc(logger *zap.Logger, session Session, envelope *rtapi.Enve return } result, fnErr, _ := runtime.InvokeFunction(ExecutionModeRPC, lf, session.UserID().String(), session.Username(), session.Expiry(), session.ID().String(), rpcMessage.Payload) result, fnErr, _ := runtime.InvokeFunction(ExecutionModeRPC, lf, nil, session.UserID().String(), session.Username(), session.Expiry(), session.ID().String(), rpcMessage.Payload) p.runtimePool.Put(runtime) if fnErr != nil { logger.Error("Runtime RPC function caused an error", zap.String("id", rpcMessage.Id), zap.Error(fnErr)) Loading