diff --git a/CHANGELOG.md b/CHANGELOG.md index 8457b81d60b08e3dcbe6e935064941ce68096b8b..0c8be8d3866257c480fd6c42ef17ade1e4766d46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to this project are documented below. The format is based on [keep a changelog](http://keepachangelog.com/) and this project uses [semantic versioning](http://semver.org/). ## [Unreleased] +### Added +- New code runtime function to list leaderboard records for a given set of users. +- New code runtime function to list leaderboard records around a given user. +- New code runtime function to execute raw SQL queries. + +### Changed +- Handle update now returns a bad input error code if handle is too long. +- Improved handling of content type request headers in HTTP runtime script invocations. ## [1.0.1] - 2017-08-05 ### Added diff --git a/server/core_leaderboard.go b/server/core_leaderboard.go index 8b07aa6f2b3e3720ad18d3f75227ed6f78002f77..3763d91b0fd00c735e9390bf36383e7d2dd7a9d9 100644 --- a/server/core_leaderboard.go +++ b/server/core_leaderboard.go @@ -20,9 +20,13 @@ import ( "database/sql" "errors" + "bytes" + "encoding/gob" "github.com/gorhill/cronexpr" "github.com/satori/go.uuid" "go.uber.org/zap" + "strconv" + "strings" ) func leaderboardCreate(logger *zap.Logger, db *sql.DB, id []byte, sortOrder, resetSchedule, metadata string, authoritative bool) ([]byte, error) { @@ -75,8 +79,12 @@ func leaderboardCreate(logger *zap.Logger, db *sql.DB, id []byte, sortOrder, res res, err := db.Exec(query, params...) if err != nil { - logger.Error("Error creating leaderboard", zap.Error(err)) - return nil, err + if strings.HasSuffix(err.Error(), "violates unique constraint \"primary\"") { + return nil, errors.New("Leaderboard ID already in use") + } else { + logger.Error("Error creating leaderboard", zap.Error(err)) + return nil, err + } } if rowsAffected, _ := res.RowsAffected(); rowsAffected != 1 { logger.Error("Error creating leaderboard, unexpected insert result") @@ -86,7 +94,381 @@ func leaderboardCreate(logger *zap.Logger, db *sql.DB, id []byte, sortOrder, res return params[0].([]byte), nil } -func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderboardID []byte, ownerID uuid.UUID, handle string, lang string, op string, value int64, location string, timezone string, metadata []byte) (*LeaderboardRecord, error) { +func leaderboardRecordsList(logger *zap.Logger, db *sql.DB, caller uuid.UUID, list *TLeaderboardRecordsList) ([]*LeaderboardRecord, []byte, Error_Code, error) { + if len(list.LeaderboardId) == 0 { + return nil, nil, BAD_INPUT, errors.New("Leaderboard ID must be present") + } + + limit := list.Limit + if limit == 0 { + limit = 10 + } else if limit < 10 || limit > 100 { + return nil, nil, BAD_INPUT, errors.New("Limit must be between 10 and 100") + } + + var incomingCursor *leaderboardRecordListCursor + if len(list.Cursor) != 0 { + incomingCursor = &leaderboardRecordListCursor{} + if err := gob.NewDecoder(bytes.NewReader(list.Cursor)).Decode(incomingCursor); err != nil { + return nil, nil, BAD_INPUT, errors.New("Invalid cursor data") + } + } + + var sortOrder int64 + var resetSchedule sql.NullString + query := "SELECT sort_order, reset_schedule FROM leaderboard WHERE id = $1" + logger.Debug("Leaderboard lookup", zap.String("query", query)) + err := db.QueryRow(query, list.LeaderboardId). + Scan(&sortOrder, &resetSchedule) + if err != nil { + logger.Error("Could not execute leaderboard records list metadata query", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + currentExpiresAt := int64(0) + if resetSchedule.Valid { + expr, err := cronexpr.Parse(resetSchedule.String) + if err != nil { + logger.Error("Could not parse leaderboard reset schedule query", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + currentExpiresAt = timeToMs(expr.Next(now())) + } + + query = `SELECT id, owner_id, handle, lang, location, timezone, + rank_value, score, num_score, metadata, ranked_at, updated_at, expires_at, banned_at + FROM leaderboard_record + WHERE leaderboard_id = $1 + AND expires_at = $2` + params := []interface{}{list.LeaderboardId, currentExpiresAt} + + returnCursor := true + switch list.Filter.(type) { + case *TLeaderboardRecordsList_OwnerId: + if incomingCursor != nil { + return nil, nil, BAD_INPUT, errors.New("Cursor not allowed with haystack query") + } + // Haystack queries are executed in a separate flow. + return loadLeaderboardRecordsHaystack(logger, db, caller, list, list.LeaderboardId, list.GetOwnerId(), currentExpiresAt, limit, sortOrder, query, params) + case *TLeaderboardRecordsList_OwnerIds: + if incomingCursor != nil { + return nil, nil, BAD_INPUT, errors.New("Cursor not allowed with batch filter query") + } + if len(list.GetOwnerIds().OwnerIds) < 1 || len(list.GetOwnerIds().OwnerIds) > 100 { + return nil, nil, BAD_INPUT, errors.New("Must be 1-100 owner IDs") + } + statements := []string{} + for _, ownerId := range list.GetOwnerIds().OwnerIds { + params = append(params, ownerId) + statements = append(statements, "$"+strconv.Itoa(len(params))) + } + query += " AND owner_id IN (" + strings.Join(statements, ", ") + ")" + // Never return a cursor with this filter type. + returnCursor = false + case *TLeaderboardRecordsList_Lang: + query += " AND lang = $3" + params = append(params, list.GetLang()) + case *TLeaderboardRecordsList_Location: + query += " AND location = $3" + params = append(params, list.GetLocation()) + case *TLeaderboardRecordsList_Timezone: + query += " AND timezone = $3" + params = append(params, list.GetTimezone()) + case nil: + // No filter. + break + default: + return nil, nil, BAD_INPUT, errors.New("Unknown leaderboard record list filter") + } + + if incomingCursor != nil { + count := len(params) + if sortOrder == 0 { + // Ascending leaderboard. + query += " AND (score, updated_at, id) > ($" + strconv.Itoa(count) + + ", $" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + ")" + params = append(params, incomingCursor.Score, incomingCursor.UpdatedAt, incomingCursor.Id) + } else { + // Descending leaderboard. + query += " AND (score, updated_at_inverse, id) < ($" + strconv.Itoa(count) + + ", $" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + ")" + params = append(params, incomingCursor.Score, invertMs(incomingCursor.UpdatedAt), incomingCursor.Id) + } + } + + if sortOrder == 0 { + // Ascending leaderboard, lower score is better. + query += " ORDER BY score ASC, updated_at ASC" + } else { + // Descending leaderboard, higher score is better. + query += " ORDER BY score DESC, updated_at_inverse DESC" + } + + params = append(params, limit+1) + query += " LIMIT $" + strconv.Itoa(len(params)) + + logger.Debug("Leaderboard records list", zap.String("query", query)) + rows, err := db.Query(query, params...) + if err != nil { + logger.Error("Could not execute leaderboard records list query", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + defer rows.Close() + + leaderboardRecords := []*LeaderboardRecord{} + var outgoingCursor []byte + + var id []byte + var ownerId []byte + var handle string + var lang string + var location sql.NullString + var timezone sql.NullString + var rankValue int64 + var score int64 + var numScore int64 + var metadata []byte + var rankedAt int64 + var updatedAt int64 + var expiresAt int64 + var bannedAt int64 + for rows.Next() { + if returnCursor && int64(len(leaderboardRecords)) >= limit { + cursorBuf := new(bytes.Buffer) + newCursor := &leaderboardRecordListCursor{ + Score: score, + UpdatedAt: updatedAt, + Id: id, + } + if gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil { + logger.Error("Error creating leaderboard records list cursor", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + outgoingCursor = cursorBuf.Bytes() + break + } + + err = rows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, + &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) + if err != nil { + logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ + LeaderboardId: list.LeaderboardId, + OwnerId: ownerId, + Handle: handle, + Lang: lang, + Location: location.String, + Timezone: timezone.String, + Rank: rankValue, + Score: score, + NumScore: numScore, + Metadata: metadata, + RankedAt: rankedAt, + UpdatedAt: updatedAt, + ExpiresAt: expiresAt, + }) + } + if err = rows.Err(); err != nil { + logger.Error("Could not process leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + return normalizeLeaderboardRecords(leaderboardRecords), outgoingCursor, 0, nil +} + +func loadLeaderboardRecordsHaystack(logger *zap.Logger, db *sql.DB, caller uuid.UUID, list *TLeaderboardRecordsList, leaderboardId, findOwnerId []byte, currentExpiresAt, limit, sortOrder int64, query string, params []interface{}) ([]*LeaderboardRecord, []byte, Error_Code, error) { + // Find the owner's record. + var id []byte + var score int64 + var updatedAt int64 + findQuery := `SELECT id, score, updated_at + FROM leaderboard_record + WHERE leaderboard_id = $1 + AND expires_at = $2 + AND owner_id = $3` + logger.Debug("Leaderboard record find", zap.String("query", findQuery)) + err := db.QueryRow(findQuery, leaderboardId, currentExpiresAt, findOwnerId).Scan(&id, &score, &updatedAt) + if err == sql.ErrNoRows { + return []*LeaderboardRecord{}, nil, 0, nil + } else if err != nil { + logger.Error("Could not load owner record in leaderboard records list haystack", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + // First half. + count := len(params) + firstQuery := query + firstParams := make([]interface{}, len(params)) + copy(firstParams, params) + if sortOrder == 0 { + // Lower score is better, but get in reverse order from current user to get those immediately above. + firstQuery += " AND (score, updated_at_inverse, id) <= ($" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + + ", $" + strconv.Itoa(count+3) + ") ORDER BY score DESC, updated_at_inverse DESC" + firstParams = append(firstParams, score, invertMs(updatedAt), id) + } else { + // Higher score is better. + firstQuery += " AND (score, updated_at, id) >= ($" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + + ", $" + strconv.Itoa(count+3) + ") ORDER BY score ASC, updated_at ASC" + firstParams = append(firstParams, score, updatedAt, id) + } + firstParams = append(firstParams, int64(limit/2)) + firstQuery += " LIMIT $" + strconv.Itoa(len(firstParams)) + + logger.Debug("Leaderboard records list", zap.String("query", firstQuery)) + firstRows, err := db.Query(firstQuery, firstParams...) + if err != nil { + logger.Error("Could not execute leaderboard records list query", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + defer firstRows.Close() + + leaderboardRecords := []*LeaderboardRecord{} + + var ownerId []byte + var handle string + var lang string + var location sql.NullString + var timezone sql.NullString + var rankValue int64 + var numScore int64 + var metadata []byte + var rankedAt int64 + var expiresAt int64 + var bannedAt int64 + for firstRows.Next() { + err = firstRows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, + &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) + if err != nil { + logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ + LeaderboardId: leaderboardId, + OwnerId: ownerId, + Handle: handle, + Lang: lang, + Location: location.String, + Timezone: timezone.String, + Rank: rankValue, + Score: score, + NumScore: numScore, + Metadata: metadata, + RankedAt: rankedAt, + UpdatedAt: updatedAt, + ExpiresAt: expiresAt, + }) + } + if err = firstRows.Err(); err != nil { + logger.Error("Could not process leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + // We went 'up' on the leaderboard, so reverse the first half of records. + for left, right := 0, len(leaderboardRecords)-1; left < right; left, right = left+1, right-1 { + leaderboardRecords[left], leaderboardRecords[right] = leaderboardRecords[right], leaderboardRecords[left] + } + + // Second half. + secondQuery := query + secondParams := make([]interface{}, len(params)) + copy(secondParams, params) + if sortOrder == 0 { + // Lower score is better. + secondQuery += " AND (score, updated_at, id) > ($" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + + ", $" + strconv.Itoa(count+3) + ") ORDER BY score ASC, updated_at ASC" + secondParams = append(secondParams, score, updatedAt, id) + } else { + // Higher score is better. + secondQuery += " AND (score, updated_at_inverse, id) < ($" + strconv.Itoa(count+1) + + ", $" + strconv.Itoa(count+2) + + ", $" + strconv.Itoa(count+3) + ") ORDER BY score DESC, updated_at DESC" + secondParams = append(secondParams, score, invertMs(updatedAt), id) + } + secondParams = append(secondParams, limit-int64(len(leaderboardRecords))+2) + secondQuery += " LIMIT $" + strconv.Itoa(len(secondParams)) + + logger.Debug("Leaderboard records list", zap.String("query", secondQuery)) + secondRows, err := db.Query(secondQuery, secondParams...) + if err != nil { + logger.Error("Could not execute leaderboard records list query", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + defer secondRows.Close() + + var outgoingCursor []byte + + for secondRows.Next() { + if int64(len(leaderboardRecords)) >= limit { + cursorBuf := new(bytes.Buffer) + newCursor := &leaderboardRecordListCursor{ + Score: score, + UpdatedAt: updatedAt, + Id: id, + } + if gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil { + logger.Error("Error creating leaderboard records list cursor", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + outgoingCursor = cursorBuf.Bytes() + break + } + + err = secondRows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, + &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) + if err != nil { + logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ + LeaderboardId: leaderboardId, + OwnerId: ownerId, + Handle: handle, + Lang: lang, + Location: location.String, + Timezone: timezone.String, + Rank: rankValue, + Score: score, + NumScore: numScore, + Metadata: metadata, + RankedAt: rankedAt, + UpdatedAt: updatedAt, + ExpiresAt: expiresAt, + }) + } + if err = secondRows.Err(); err != nil { + logger.Error("Could not process leaderboard records list query results", zap.Error(err)) + return nil, nil, RUNTIME_EXCEPTION, errors.New("Error loading leaderboard records") + } + + return normalizeLeaderboardRecords(leaderboardRecords), outgoingCursor, 0, nil +} + +func normalizeLeaderboardRecords(records []*LeaderboardRecord) []*LeaderboardRecord { + var bestRank int64 + for _, record := range records { + if record.Rank != 0 && record.Rank < bestRank { + bestRank = record.Rank + } + } + if bestRank != 0 { + for i := int64(0); i < int64(len(records)); i++ { + records[i].Rank = bestRank + i + } + } + return records +} + +func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderboardID []byte, ownerID uuid.UUID, handle string, lang string, op string, value int64, location string, timezone string, metadata []byte) (*LeaderboardRecord, Error_Code, error) { var authoritative bool var sortOrder int64 var resetSchedule sql.NullString @@ -96,7 +478,7 @@ func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderb Scan(&authoritative, &sortOrder, &resetSchedule) if err != nil { logger.Error("Could not execute leaderboard record write metadata query", zap.Error(err)) - return nil, errors.New("Error writing leaderboard record") + return nil, RUNTIME_EXCEPTION, errors.New("Error writing leaderboard record") } now := now() @@ -106,13 +488,13 @@ func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderb expr, err := cronexpr.Parse(resetSchedule.String) if err != nil { logger.Error("Could not parse leaderboard reset schedule query", zap.Error(err)) - return nil, errors.New("Error writing leaderboard record") + return nil, RUNTIME_EXCEPTION, errors.New("Error writing leaderboard record") } expiresAt = timeToMs(expr.Next(now)) } - if authoritative == true && caller != uuid.Nil { - return nil, errors.New("Cannot submit to authoritative leaderboard") + if authoritative && caller != uuid.Nil { + return nil, BAD_INPUT, errors.New("Cannot submit to authoritative leaderboard") } var scoreOpSql string @@ -142,7 +524,7 @@ func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderb scoreDelta = value scoreAbs = value default: - return nil, errors.New("Unknown leaderboard record write operator") + return nil, BAD_INPUT, errors.New("Unknown leaderboard record write operator") } params := []interface{}{uuid.NewV4().Bytes(), leaderboardID, ownerID.Bytes(), handle, lang} @@ -175,18 +557,18 @@ func leaderboardSubmit(logger *zap.Logger, db *sql.DB, caller uuid.UUID, leaderb res, err := db.Exec(query, params...) if err != nil { logger.Error("Could not execute leaderboard record write query", zap.Error(err)) - return nil, errors.New("Error writing leaderboard record") + return nil, RUNTIME_EXCEPTION, errors.New("Error writing leaderboard record") } if rowsAffected, _ := res.RowsAffected(); rowsAffected == 0 { logger.Error("Unexpected row count from leaderboard record write query") - return nil, errors.New("Error writing leaderboard record") + return nil, RUNTIME_EXCEPTION, errors.New("Error writing leaderboard record") } record, err := leaderboardQueryRecords(logger, db, leaderboardID, ownerID, handle, lang, expiresAt, updatedAt) if err != nil { - return nil, errors.New("Error writing leaderboard record") + return nil, RUNTIME_EXCEPTION, errors.New("Error writing leaderboard record") } - return record, nil + return record, 0, nil } func leaderboardQueryRecords(logger *zap.Logger, db *sql.DB, leaderboardID []byte, ownerID uuid.UUID, handle string, lang string, expiresAt int64, updatedAt int64) (*LeaderboardRecord, error) { diff --git a/server/pipeline_leaderboard.go b/server/pipeline_leaderboard.go index a57c5a602aece79d151b5cb6957d0694f130a1ed..dc4de1a9cac6f5e364ad8b816a015e477205dea3 100644 --- a/server/pipeline_leaderboard.go +++ b/server/pipeline_leaderboard.go @@ -22,8 +22,6 @@ import ( "strconv" "strings" - "github.com/gorhill/cronexpr" - "github.com/satori/go.uuid" "go.uber.org/zap" ) @@ -175,63 +173,21 @@ func (p *pipeline) leaderboardRecordWrite(logger *zap.Logger, session *session, } } - var authoritative bool - var sortOrder int64 - var resetSchedule sql.NullString - query := "SELECT authoritative, sort_order, reset_schedule FROM leaderboard WHERE id = $1" - logger.Debug("Leaderboard lookup", zap.String("query", query)) - err := p.db.QueryRow(query, incoming.LeaderboardId). - Scan(&authoritative, &sortOrder, &resetSchedule) - if err != nil { - logger.Error("Could not execute leaderboard record write metadata query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error writing leaderboard record")) - return - } - - now := now() - updatedAt := timeToMs(now) - expiresAt := int64(0) - if resetSchedule.Valid { - expr, err := cronexpr.Parse(resetSchedule.String) - if err != nil { - logger.Error("Could not parse leaderboard reset schedule query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error writing leaderboard record")) - return - } - expiresAt = timeToMs(expr.Next(now)) - } - - if authoritative == true { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Cannot submit to authoritative leaderboard")) - return - } - - var scoreOpSql string - var scoreDelta int64 - var scoreAbs int64 + var op string + var value int64 switch incoming.Op.(type) { case *TLeaderboardRecordsWrite_LeaderboardRecordWrite_Incr: - scoreOpSql = "score = leaderboard_record.score + $17::BIGINT" - scoreDelta = incoming.GetIncr() - scoreAbs = incoming.GetIncr() + op = "incr" + value = incoming.GetIncr() case *TLeaderboardRecordsWrite_LeaderboardRecordWrite_Decr: - scoreOpSql = "score = leaderboard_record.score - $17::BIGINT" - scoreDelta = incoming.GetDecr() - scoreAbs = 0 - incoming.GetDecr() + op = "decr" + value = incoming.GetDecr() case *TLeaderboardRecordsWrite_LeaderboardRecordWrite_Set: - scoreOpSql = "score = $17::BIGINT" - scoreDelta = incoming.GetSet() - scoreAbs = incoming.GetSet() + op = "set" + value = incoming.GetSet() case *TLeaderboardRecordsWrite_LeaderboardRecordWrite_Best: - if sortOrder == 0 { - // Lower score is better. - scoreOpSql = "score = ((leaderboard_record.score + $17::BIGINT - abs(leaderboard_record.score - $17::BIGINT)) / 2)::BIGINT" - } else { - // Higher score is better. - scoreOpSql = "score = ((leaderboard_record.score + $17::BIGINT + abs(leaderboard_record.score - $17::BIGINT)) / 2)::BIGINT" - } - scoreDelta = incoming.GetBest() - scoreAbs = incoming.GetBest() + op = "best" + value = incoming.GetBest() case nil: session.Send(ErrorMessageBadInput(envelope.CollationId, "No leaderboard record write operator found")) return @@ -240,86 +196,16 @@ func (p *pipeline) leaderboardRecordWrite(logger *zap.Logger, session *session, return } - handle := session.handle.Load() - params := []interface{}{uuid.NewV4().Bytes(), incoming.LeaderboardId, session.userID.Bytes(), handle, session.lang} - if incoming.Location != "" { - params = append(params, incoming.Location) - } else { - params = append(params, nil) - } - if incoming.Timezone != "" { - params = append(params, incoming.Timezone) - } else { - params = append(params, nil) - } - params = append(params, 0, scoreAbs, 1) - if len(incoming.Metadata) != 0 { - params = append(params, incoming.Metadata) - } else { - params = append(params, nil) - } - params = append(params, 0, updatedAt, invertMs(updatedAt), expiresAt, 0, scoreDelta) - - query = `INSERT INTO leaderboard_record (id, leaderboard_id, owner_id, handle, lang, location, timezone, - rank_value, score, num_score, metadata, ranked_at, updated_at, updated_at_inverse, expires_at, banned_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, COALESCE($11, '{}'), $12, $13, $14, $15, $16) - ON CONFLICT (leaderboard_id, expires_at, owner_id) - DO UPDATE SET handle = $4, lang = $5, location = COALESCE($6, leaderboard_record.location), - timezone = COALESCE($7, leaderboard_record.timezone), ` + scoreOpSql + `, num_score = leaderboard_record.num_score + 1, - metadata = COALESCE($11, leaderboard_record.metadata), updated_at = $13` - logger.Debug("Leaderboard record write", zap.String("query", query)) - res, err := p.db.Exec(query, params...) - if err != nil { - logger.Error("Could not execute leaderboard record write query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error writing leaderboard record")) - return - } - if rowsAffected, _ := res.RowsAffected(); rowsAffected == 0 { - logger.Error("Unexpected row count from leaderboard record write query") - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error writing leaderboard record")) - return - } - - var location sql.NullString - var timezone sql.NullString - var rankValue int64 - var score int64 - var numScore int64 - var metadata []byte - var rankedAt int64 - var bannedAt int64 - query = `SELECT location, timezone, rank_value, score, num_score, metadata, ranked_at, banned_at - FROM leaderboard_record - WHERE leaderboard_id = $1 - AND expires_at = $2 - AND owner_id = $3` - logger.Debug("Leaderboard record read", zap.String("query", query)) - err = p.db.QueryRow(query, incoming.LeaderboardId, expiresAt, session.userID.Bytes()). - Scan(&location, &timezone, &rankValue, &score, &numScore, &metadata, &rankedAt, &bannedAt) + record, code, err := leaderboardSubmit(logger, p.db, session.userID, incoming.LeaderboardId, session.userID, session.handle.Load(), session.lang, op, value, incoming.Location, incoming.Timezone, incoming.Metadata) if err != nil { - logger.Error("Could not execute leaderboard record read query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error writing leaderboard record")) + session.Send(ErrorMessage(envelope.CollationId, code, err.Error())) return } session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_LeaderboardRecords{ LeaderboardRecords: &TLeaderboardRecords{ Records: []*LeaderboardRecord{ - &LeaderboardRecord{ - LeaderboardId: incoming.LeaderboardId, - OwnerId: session.userID.Bytes(), - Handle: handle, - Lang: session.lang, - Location: location.String, - Timezone: timezone.String, - Rank: rankValue, - Score: score, - NumScore: numScore, - Metadata: metadata, - RankedAt: rankedAt, - UpdatedAt: updatedAt, - ExpiresAt: expiresAt, - }, + record, }, // No cursor. }, @@ -456,403 +342,15 @@ func (p *pipeline) leaderboardRecordsFetch(logger *zap.Logger, session *session, func (p *pipeline) leaderboardRecordsList(logger *zap.Logger, session *session, envelope *Envelope) { incoming := envelope.GetLeaderboardRecordsList() - if len(incoming.LeaderboardId) == 0 { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Leaderboard ID must be present")) - return - } - - limit := incoming.Limit - if limit == 0 { - limit = 10 - } else if limit < 10 || limit > 100 { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Limit must be between 10 and 100")) - return - } - - var incomingCursor *leaderboardRecordListCursor - if len(incoming.Cursor) != 0 { - incomingCursor = &leaderboardRecordListCursor{} - if err := gob.NewDecoder(bytes.NewReader(incoming.Cursor)).Decode(incomingCursor); err != nil { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Invalid cursor data")) - return - } - } - - var sortOrder int64 - var resetSchedule sql.NullString - query := "SELECT sort_order, reset_schedule FROM leaderboard WHERE id = $1" - logger.Debug("Leaderboard lookup", zap.String("query", query)) - err := p.db.QueryRow(query, incoming.LeaderboardId). - Scan(&sortOrder, &resetSchedule) + leaderboardRecords, outgoingCursor, code, err := leaderboardRecordsList(logger, p.db, session.userID, incoming) if err != nil { - logger.Error("Could not execute leaderboard records list metadata query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) + session.Send(ErrorMessage(envelope.CollationId, code, err.Error())) return } - currentExpiresAt := int64(0) - if resetSchedule.Valid { - expr, err := cronexpr.Parse(resetSchedule.String) - if err != nil { - logger.Error("Could not parse leaderboard reset schedule query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - currentExpiresAt = timeToMs(expr.Next(now())) - } - - query = `SELECT id, owner_id, handle, lang, location, timezone, - rank_value, score, num_score, metadata, ranked_at, updated_at, expires_at, banned_at - FROM leaderboard_record - WHERE leaderboard_id = $1 - AND expires_at = $2` - params := []interface{}{incoming.LeaderboardId, currentExpiresAt} - - returnCursor := true - switch incoming.Filter.(type) { - case *TLeaderboardRecordsList_OwnerId: - if incomingCursor != nil { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Cursor not allowed with haystack query")) - return - } - // Haystack queries are executed in a separate flow. - p.loadLeaderboardRecordsHaystack(logger, session, envelope, incoming.LeaderboardId, incoming.GetOwnerId(), currentExpiresAt, limit, sortOrder, query, params) - return - case *TLeaderboardRecordsList_OwnerIds: - if incomingCursor != nil { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Cursor not allowed with batch filter query")) - return - } - if len(incoming.GetOwnerIds().OwnerIds) < 1 || len(incoming.GetOwnerIds().OwnerIds) > 100 { - session.Send(ErrorMessageBadInput(envelope.CollationId, "Must be 1-100 owner IDs")) - return - } - statements := []string{} - for _, ownerId := range incoming.GetOwnerIds().OwnerIds { - params = append(params, ownerId) - statements = append(statements, "$"+strconv.Itoa(len(params))) - } - query += " AND owner_id IN (" + strings.Join(statements, ", ") + ")" - // Never return a cursor with this filter type. - returnCursor = false - case *TLeaderboardRecordsList_Lang: - query += " AND lang = $3" - params = append(params, incoming.GetLang()) - case *TLeaderboardRecordsList_Location: - query += " AND location = $3" - params = append(params, incoming.GetLocation()) - case *TLeaderboardRecordsList_Timezone: - query += " AND timezone = $3" - params = append(params, incoming.GetTimezone()) - case nil: - // No filter. - break - default: - session.Send(ErrorMessageBadInput(envelope.CollationId, "Unknown leaderboard record list filter")) - return - } - - if incomingCursor != nil { - count := len(params) - if sortOrder == 0 { - // Ascending leaderboard. - query += " AND (score, updated_at, id) > ($" + strconv.Itoa(count) + - ", $" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + ")" - params = append(params, incomingCursor.Score, incomingCursor.UpdatedAt, incomingCursor.Id) - } else { - // Descending leaderboard. - query += " AND (score, updated_at_inverse, id) < ($" + strconv.Itoa(count) + - ", $" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + ")" - params = append(params, incomingCursor.Score, invertMs(incomingCursor.UpdatedAt), incomingCursor.Id) - } - } - - if sortOrder == 0 { - // Ascending leaderboard, lower score is better. - query += " ORDER BY score ASC, updated_at ASC" - } else { - // Descending leaderboard, higher score is better. - query += " ORDER BY score DESC, updated_at_inverse DESC" - } - - params = append(params, limit+1) - query += " LIMIT $" + strconv.Itoa(len(params)) - - logger.Debug("Leaderboard records list", zap.String("query", query)) - rows, err := p.db.Query(query, params...) - if err != nil { - logger.Error("Could not execute leaderboard records list query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - defer rows.Close() - - leaderboardRecords := []*LeaderboardRecord{} - var outgoingCursor []byte - - var id []byte - var ownerId []byte - var handle string - var lang string - var location sql.NullString - var timezone sql.NullString - var rankValue int64 - var score int64 - var numScore int64 - var metadata []byte - var rankedAt int64 - var updatedAt int64 - var expiresAt int64 - var bannedAt int64 - for rows.Next() { - if returnCursor && int64(len(leaderboardRecords)) >= limit { - cursorBuf := new(bytes.Buffer) - newCursor := &leaderboardRecordListCursor{ - Score: score, - UpdatedAt: updatedAt, - Id: id, - } - if gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil { - logger.Error("Error creating leaderboard records list cursor", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - outgoingCursor = cursorBuf.Bytes() - break - } - - err = rows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, - &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) - if err != nil { - logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ - LeaderboardId: incoming.LeaderboardId, - OwnerId: ownerId, - Handle: handle, - Lang: lang, - Location: location.String, - Timezone: timezone.String, - Rank: rankValue, - Score: score, - NumScore: numScore, - Metadata: metadata, - RankedAt: rankedAt, - UpdatedAt: updatedAt, - ExpiresAt: expiresAt, - }) - } - if err = rows.Err(); err != nil { - logger.Error("Could not process leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - p.normalizeAndSendLeaderboardRecords(logger, session, envelope, leaderboardRecords, outgoingCursor) -} - -func (p *pipeline) loadLeaderboardRecordsHaystack(logger *zap.Logger, session *session, envelope *Envelope, leaderboardId, findOwnerId []byte, currentExpiresAt, limit, sortOrder int64, query string, params []interface{}) { - // Find the owner's record. - var id []byte - var score int64 - var updatedAt int64 - findQuery := `SELECT id, score, updated_at - FROM leaderboard_record - WHERE leaderboard_id = $1 - AND expires_at = $2 - AND owner_id = $3` - logger.Debug("Leaderboard record find", zap.String("query", findQuery)) - err := p.db.QueryRow(findQuery, leaderboardId, currentExpiresAt, findOwnerId).Scan(&id, &score, &updatedAt) - if err != nil { - // TODO handle errors other than record not found? - session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_LeaderboardRecords{LeaderboardRecords: &TLeaderboardRecords{ - Records: []*LeaderboardRecord{}, - // No cursor. - }}}) - return - } - - // First half. - count := len(params) - firstQuery := query - firstParams := make([]interface{}, len(params)) - copy(firstParams, params) - if sortOrder == 0 { - // Lower score is better, but get in reverse order from current user to get those immediately above. - firstQuery += " AND (score, updated_at_inverse, id) <= ($" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + - ", $" + strconv.Itoa(count+3) + ") ORDER BY score DESC, updated_at_inverse DESC" - firstParams = append(firstParams, score, invertMs(updatedAt), id) - } else { - // Higher score is better. - firstQuery += " AND (score, updated_at, id) >= ($" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + - ", $" + strconv.Itoa(count+3) + ") ORDER BY score ASC, updated_at ASC" - firstParams = append(firstParams, score, updatedAt, id) - } - firstParams = append(firstParams, int64(limit/2)) - firstQuery += " LIMIT $" + strconv.Itoa(len(firstParams)) - - logger.Debug("Leaderboard records list", zap.String("query", firstQuery)) - firstRows, err := p.db.Query(firstQuery, firstParams...) - if err != nil { - logger.Error("Could not execute leaderboard records list query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - defer firstRows.Close() - - leaderboardRecords := []*LeaderboardRecord{} - - var ownerId []byte - var handle string - var lang string - var location sql.NullString - var timezone sql.NullString - var rankValue int64 - var numScore int64 - var metadata []byte - var rankedAt int64 - var expiresAt int64 - var bannedAt int64 - for firstRows.Next() { - err = firstRows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, - &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) - if err != nil { - logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ - LeaderboardId: leaderboardId, - OwnerId: ownerId, - Handle: handle, - Lang: lang, - Location: location.String, - Timezone: timezone.String, - Rank: rankValue, - Score: score, - NumScore: numScore, - Metadata: metadata, - RankedAt: rankedAt, - UpdatedAt: updatedAt, - ExpiresAt: expiresAt, - }) - } - if err = firstRows.Err(); err != nil { - logger.Error("Could not process leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - // We went 'up' on the leaderboard, so reverse the first half of records. - for left, right := 0, len(leaderboardRecords)-1; left < right; left, right = left+1, right-1 { - leaderboardRecords[left], leaderboardRecords[right] = leaderboardRecords[right], leaderboardRecords[left] - } - - // Second half. - secondQuery := query - secondParams := make([]interface{}, len(params)) - copy(secondParams, params) - if sortOrder == 0 { - // Lower score is better. - secondQuery += " AND (score, updated_at, id) > ($" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + - ", $" + strconv.Itoa(count+3) + ") ORDER BY score ASC, updated_at ASC" - secondParams = append(secondParams, score, updatedAt, id) - } else { - // Higher score is better. - secondQuery += " AND (score, updated_at_inverse, id) < ($" + strconv.Itoa(count+1) + - ", $" + strconv.Itoa(count+2) + - ", $" + strconv.Itoa(count+3) + ") ORDER BY score DESC, updated_at DESC" - secondParams = append(secondParams, score, invertMs(updatedAt), id) - } - secondParams = append(secondParams, limit-int64(len(leaderboardRecords))+2) - secondQuery += " LIMIT $" + strconv.Itoa(len(secondParams)) - - logger.Debug("Leaderboard records list", zap.String("query", secondQuery)) - secondRows, err := p.db.Query(secondQuery, secondParams...) - if err != nil { - logger.Error("Could not execute leaderboard records list query", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - defer secondRows.Close() - - var outgoingCursor []byte - - for secondRows.Next() { - if int64(len(leaderboardRecords)) >= limit { - cursorBuf := new(bytes.Buffer) - newCursor := &leaderboardRecordListCursor{ - Score: score, - UpdatedAt: updatedAt, - Id: id, - } - if gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil { - logger.Error("Error creating leaderboard records list cursor", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - outgoingCursor = cursorBuf.Bytes() - break - } - - err = secondRows.Scan(&id, &ownerId, &handle, &lang, &location, &timezone, - &rankValue, &score, &numScore, &metadata, &rankedAt, &updatedAt, &expiresAt, &bannedAt) - if err != nil { - logger.Error("Could not scan leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - leaderboardRecords = append(leaderboardRecords, &LeaderboardRecord{ - LeaderboardId: leaderboardId, - OwnerId: ownerId, - Handle: handle, - Lang: lang, - Location: location.String, - Timezone: timezone.String, - Rank: rankValue, - Score: score, - NumScore: numScore, - Metadata: metadata, - RankedAt: rankedAt, - UpdatedAt: updatedAt, - ExpiresAt: expiresAt, - }) - } - if err = secondRows.Err(); err != nil { - logger.Error("Could not process leaderboard records list query results", zap.Error(err)) - session.Send(ErrorMessageRuntimeException(envelope.CollationId, "Error loading leaderboard records")) - return - } - - p.normalizeAndSendLeaderboardRecords(logger, session, envelope, leaderboardRecords, outgoingCursor) -} - -func (p *pipeline) normalizeAndSendLeaderboardRecords(logger *zap.Logger, session *session, envelope *Envelope, records []*LeaderboardRecord, cursor []byte) { - var bestRank int64 - for _, record := range records { - if record.Rank != 0 && record.Rank < bestRank { - bestRank = record.Rank - } - } - if bestRank != 0 { - for i := int64(0); i < int64(len(records)); i++ { - records[i].Rank = bestRank + i - } - } - session.Send(&Envelope{CollationId: envelope.CollationId, Payload: &Envelope_LeaderboardRecords{LeaderboardRecords: &TLeaderboardRecords{ - Records: records, - Cursor: cursor, + Records: leaderboardRecords, + Cursor: outgoingCursor, }}}) } diff --git a/server/runtime_nakama_module.go b/server/runtime_nakama_module.go index c6f51626f329ee072de877d616df31885e67fe4c..9f7dacaf0f003750397f5743f379cf9ea9584493 100644 --- a/server/runtime_nakama_module.go +++ b/server/runtime_nakama_module.go @@ -76,48 +76,152 @@ func NewNakamaModule(logger *zap.Logger, db *sql.DB, l *lua.LState, notification func (n *NakamaModule) Loader(l *lua.LState) int { mod := l.SetFuncs(l.NewTable(), map[string]lua.LGFunction{ - "uuid_v4": n.uuidV4, - "uuid_bytes_to_string": n.uuidBytesToString, - "uuid_string_to_bytes": n.uuidStringToBytes, - "http_request": n.httpRequest, - "json_encode": n.jsonEncode, - "json_decode": n.jsonDecode, - "base64_encode": n.base64Encode, - "base64_decode": n.base64Decode, - "base16_encode": n.base16Encode, - "base16_decode": n.base16decode, - "logger_info": n.loggerInfo, - "logger_warn": n.loggerWarn, - "logger_error": n.loggerError, - "register_rpc": n.registerRPC, - "register_before": n.registerBefore, - "register_after": n.registerAfter, - "register_http": n.registerHTTP, - "users_fetch_id": n.usersFetchId, - "users_fetch_handle": n.usersFetchHandle, - "users_update": n.usersUpdate, - "users_ban": n.usersBan, - "storage_list": n.storageList, - "storage_fetch": n.storageFetch, - "storage_write": n.storageWrite, - "storage_update": n.storageUpdate, - "storage_remove": n.storageRemove, - "leaderboard_create": n.leaderboardCreate, - "leaderboard_submit_incr": n.leaderboardSubmitIncr, - "leaderboard_submit_decr": n.leaderboardSubmitDecr, - "leaderboard_submit_set": n.leaderboardSubmitSet, - "leaderboard_submit_best": n.leaderboardSubmitBest, - "groups_create": n.groupsCreate, - "groups_update": n.groupsUpdate, - "group_users_list": n.groupUsersList, - "groups_user_list": n.groupsUserList, - "notifications_send_id": n.notificationsSendId, + "sql_exec": n.sqlExec, + "sql_query": n.sqlQuery, + "uuid_v4": n.uuidV4, + "uuid_bytes_to_string": n.uuidBytesToString, + "uuid_string_to_bytes": n.uuidStringToBytes, + "http_request": n.httpRequest, + "json_encode": n.jsonEncode, + "json_decode": n.jsonDecode, + "base64_encode": n.base64Encode, + "base64_decode": n.base64Decode, + "base16_encode": n.base16Encode, + "base16_decode": n.base16decode, + "logger_info": n.loggerInfo, + "logger_warn": n.loggerWarn, + "logger_error": n.loggerError, + "register_rpc": n.registerRPC, + "register_before": n.registerBefore, + "register_after": n.registerAfter, + "register_http": n.registerHTTP, + "users_fetch_id": n.usersFetchId, + "users_fetch_handle": n.usersFetchHandle, + "users_update": n.usersUpdate, + "users_ban": n.usersBan, + "storage_list": n.storageList, + "storage_fetch": n.storageFetch, + "storage_write": n.storageWrite, + "storage_update": n.storageUpdate, + "storage_remove": n.storageRemove, + "leaderboard_create": n.leaderboardCreate, + "leaderboard_submit_incr": n.leaderboardSubmitIncr, + "leaderboard_submit_decr": n.leaderboardSubmitDecr, + "leaderboard_submit_set": n.leaderboardSubmitSet, + "leaderboard_submit_best": n.leaderboardSubmitBest, + "leaderboard_records_list_user": n.leaderboardRecordsListUser, + "leaderboard_records_list_users": n.leaderboardRecordsListUsers, + "groups_create": n.groupsCreate, + "groups_update": n.groupsUpdate, + "group_users_list": n.groupUsersList, + "groups_user_list": n.groupsUserList, + "notifications_send_id": n.notificationsSendId, }) l.Push(mod) return 1 } +func (n *NakamaModule) sqlExec(l *lua.LState) int { + query := l.CheckString(1) + if query == "" { + l.ArgError(1, "expects query string") + return 0 + } + paramsTable := l.OptTable(2, l.NewTable()) + if paramsTable == nil { + l.ArgError(2, "expects params table") + return 0 + } + var params []interface{} + if paramsTable.Len() != 0 { + var ok bool + params, ok = convertLuaValue(paramsTable).([]interface{}) + if !ok { + l.ArgError(2, "expects a list of params as a table") + return 0 + } + } + + result, err := n.db.Exec(query, params...) + if err != nil { + l.RaiseError("sql exec error: %v", err.Error()) + return 0 + } + count, err := result.RowsAffected() + if err != nil { + l.RaiseError("sql exec rows affected error: %v", err.Error()) + return 0 + } + + l.Push(lua.LNumber(count)) + return 1 +} + +func (n *NakamaModule) sqlQuery(l *lua.LState) int { + query := l.CheckString(1) + if query == "" { + l.ArgError(1, "expects query string") + return 0 + } + paramsTable := l.OptTable(2, l.NewTable()) + if paramsTable == nil { + l.ArgError(2, "expects params table") + return 0 + } + var params []interface{} + if paramsTable.Len() != 0 { + var ok bool + params, ok = convertLuaValue(paramsTable).([]interface{}) + if !ok { + l.ArgError(2, "expects a list of params as a table") + return 0 + } + } + + rows, err := n.db.Query(query, params...) + if err != nil { + l.RaiseError("sql query error: %v", err.Error()) + return 0 + } + defer rows.Close() + + resultColumns, err := rows.Columns() + if err != nil { + l.RaiseError("sql query column lookup error: %v", err.Error()) + return 0 + } + resultColumnCount := len(resultColumns) + resultRows := make([][]interface{}, 0) + for rows.Next() { + resultRowValues := make([]interface{}, resultColumnCount) + resultRowPointers := make([]interface{}, resultColumnCount) + for i, _ := range resultRowValues { + resultRowPointers[i] = &resultRowValues[i] + } + if err = rows.Scan(resultRowPointers...); err != nil { + l.RaiseError("sql query scan error: %v", err.Error()) + return 0 + } + resultRows = append(resultRows, resultRowValues) + } + if err = rows.Err(); err != nil { + l.RaiseError("sql query row scan error: %v", err.Error()) + return 0 + } + + rt := l.NewTable() + for i, r := range resultRows { + rowTable := l.NewTable() + for j, col := range resultColumns { + rowTable.RawSetString(col, convertValue(l, r[j])) + } + rt.RawSetInt(i+1, rowTable) + } + l.Push(rt) + return 1 +} + func (n *NakamaModule) uuidV4(l *lua.LState) int { // TODO ensure there were no arguments to the function l.Push(lua.LString(uuid.NewV4().String())) @@ -1288,18 +1392,15 @@ func (n *NakamaModule) leaderboardCreate(l *lua.LState) int { _, err = leaderboardCreate(n.logger, n.db, []byte(id), sort, reset, string(metadataBytes), authoritative) if err != nil { l.RaiseError(fmt.Sprintf("failed to create leaderboard: %s", err.Error())) - return 0 } return 0 } func (n *NakamaModule) leaderboardSubmitIncr(l *lua.LState) int { - return n.leaderboardSubmit(l, "incr") } func (n *NakamaModule) leaderboardSubmitDecr(l *lua.LState) int { - return n.leaderboardSubmit(l, "decr") } func (n *NakamaModule) leaderboardSubmitSet(l *lua.LState) int { @@ -1332,7 +1433,7 @@ func (n *NakamaModule) leaderboardSubmit(l *lua.LState, op string) int { return 0 } - record, err := leaderboardSubmit(n.logger, n.db, uuid.Nil, []byte(leaderboardID), ownerID, handle, lang, op, value, location, timezone, metadataBytes) + record, _, err := leaderboardSubmit(n.logger, n.db, uuid.Nil, []byte(leaderboardID), ownerID, handle, lang, op, value, location, timezone, metadataBytes) if err != nil { l.RaiseError(fmt.Sprintf("failed to submit leaderboard record: %s", err.Error())) return 0 @@ -1356,6 +1457,161 @@ func (n *NakamaModule) leaderboardSubmit(l *lua.LState, op string) int { return 1 } +func (n *NakamaModule) leaderboardRecordsListUser(l *lua.LState) int { + leaderboardID := l.CheckString(1) + if leaderboardID == "" { + l.ArgError(1, "expects a valid leaderboard id") + return 0 + } + user := l.CheckString(2) + if user == "" { + l.ArgError(1, "expects a valid user ID") + return 0 + } + userID, err := uuid.FromString(user) + if err != nil { + l.ArgError(1, "expects a valid user ID") + return 0 + } + limit := l.CheckInt64(3) + if limit == 0 { + l.ArgError(2, "expects a valid limit 10-100") + return 0 + } + + // Construct the operation. + list := &TLeaderboardRecordsList{ + LeaderboardId: []byte(leaderboardID), + Filter: &TLeaderboardRecordsList_OwnerId{ + OwnerId: userID.Bytes(), + }, + Limit: limit, + } + + records, newCursor, _, err := leaderboardRecordsList(n.logger, n.db, uuid.Nil, list) + if err != nil { + l.RaiseError(fmt.Sprintf("failed to list leadeboard records: %s", err.Error())) + return 0 + } + + // Convert and push the values. + lv := l.NewTable() + for i, r := range records { + // Convert UUIDs to string representation. + uid, _ := uuid.FromBytes(r.OwnerId) + r.OwnerId = []byte(uid.String()) + rm := structs.Map(r) + + metadataMap := make(map[string]interface{}) + err = json.Unmarshal(r.Metadata, &metadataMap) + if err != nil { + l.RaiseError(fmt.Sprintf("failed to convert metadata to json: %s", err.Error())) + return 0 + } + + rt := ConvertMap(l, rm) + rt.RawSetString("Metadata", ConvertMap(l, metadataMap)) + lv.RawSetInt(i+1, rt) + } + l.Push(lv) + + if newCursor == nil { + l.Push(lua.LNil) + } else { + l.Push(lua.LString(newCursor)) + } + + return 2 +} + +func (n *NakamaModule) leaderboardRecordsListUsers(l *lua.LState) int { + leaderboardID := l.CheckString(1) + if leaderboardID == "" { + l.ArgError(1, "expects a valid leaderboard id") + return 0 + } + users := l.CheckTable(2) + if users == nil { + l.ArgError(2, "expects a valid list of user ids") + return 0 + } + limit := l.CheckInt64(3) + if limit == 0 { + l.ArgError(2, "expects a valid limit 10-100") + return 0 + } + cursor := l.OptString(4, "") + + // Construct the operation. + list := &TLeaderboardRecordsList{ + LeaderboardId: []byte(leaderboardID), + Filter: &TLeaderboardRecordsList_OwnerIds{ + OwnerIds: &TLeaderboardRecordsList_Owners{ + OwnerIds: make([][]byte, 0), + }, + }, + Limit: limit, + } + if cursor != "" { + list.Cursor = []byte(cursor) + } + + conversionError := "" + users.ForEach(func(k lua.LValue, v lua.LValue) { + if v.Type() != lua.LTString { + conversionError = "expects user ids to be strings" + return + } + + u, err := uuid.FromString(v.String()) + if err != nil { + conversionError = "expects user ids to be valid" + return + } + list.GetOwnerIds().OwnerIds = append(list.GetOwnerIds().OwnerIds, u.Bytes()) + }) + + if conversionError != "" { + l.ArgError(2, conversionError) + return 0 + } + + records, newCursor, _, err := leaderboardRecordsList(n.logger, n.db, uuid.Nil, list) + if err != nil { + l.RaiseError(fmt.Sprintf("failed to list leadeboard records: %s", err.Error())) + return 0 + } + + // Convert and push the values. + lv := l.NewTable() + for i, r := range records { + // Convert UUIDs to string representation. + uid, _ := uuid.FromBytes(r.OwnerId) + r.OwnerId = []byte(uid.String()) + rm := structs.Map(r) + + metadataMap := make(map[string]interface{}) + err = json.Unmarshal(r.Metadata, &metadataMap) + if err != nil { + l.RaiseError(fmt.Sprintf("failed to convert metadata to json: %s", err.Error())) + return 0 + } + + rt := ConvertMap(l, rm) + rt.RawSetString("Metadata", ConvertMap(l, metadataMap)) + lv.RawSetInt(i+1, rt) + } + l.Push(lv) + + if newCursor == nil { + l.Push(lua.LNil) + } else { + l.Push(lua.LString(newCursor)) + } + + return 2 +} + func (n *NakamaModule) groupsCreate(l *lua.LState) int { groupsTable := l.CheckTable(1) if groupsTable == nil || groupsTable.Len() == 0 { diff --git a/tests/modules/e2e_runtime.lua b/tests/modules/e2e_runtime.lua index 2391006cc4d79bb604ef8c717e417372d3b27f4d..bd51d346e0f2eecb59d2e6ec3f5492c25ce22baf 100644 --- a/tests/modules/e2e_runtime.lua +++ b/tests/modules/e2e_runtime.lua @@ -41,6 +41,23 @@ function print_r(arr, indentLevel) return str end +-- qwertyuiopasdfghjklzxcvbnm +for i = 97, 122 do table.insert(charset, string.char(i)) end + +function string.random(length) + math.randomseed(os.time()) + + if length > 0 then + return string.random(length - 1) .. charset[math.random(1, #charset)] + else + return "" + end +end + +function string.ends(str, with) + return with == '' or string.sub(str, -string.len(with)) == with +end + --[[ Nakama module ]]-- @@ -76,6 +93,56 @@ do assert(status == true) end +-- leaderboard_records_list_users +do + local id = nk.uuid_v4() + local status, res = pcall(nk.leaderboard_create, id, "desc", "0 0 * * 1", {}, true) + if not status then + print(res) + end + assert(status == true) + + local status, res = pcall(nk.leaderboard_submit_set, id, 22, "4c2ae592-b2a7-445e-98ec-697694478b1c", "02ebb2c8") + if not status then + print(res) + end + assert(status == true) + + local status, res, cursor = pcall(nk.leaderboard_records_list_users, id, {"4c2ae592-b2a7-445e-98ec-697694478b1c"}, 10, nil) + if not status then + print(res) + end + assert(#res == 1) + assert(res[1].OwnerId == "4c2ae592-b2a7-445e-98ec-697694478b1c") + assert(res[1].Score == 22) + assert(cursor == nil) +end + +-- leaderboard_records_list_user +do + local id = nk.uuid_v4() + local status, res = pcall(nk.leaderboard_create, id, "desc", "0 0 * * 1", {}, true) + if not status then + print(res) + end + assert(status == true) + + local status, res = pcall(nk.leaderboard_submit_set, id, 33, "4c2ae592-b2a7-445e-98ec-697694478b1c", "02ebb2c8") + if not status then + print(res) + end + assert(status == true) + + local status, res, cursor = pcall(nk.leaderboard_records_list_user, id, "4c2ae592-b2a7-445e-98ec-697694478b1c", 10) + if not status then + print(res) + end + assert(#res == 1) + assert(res[1].OwnerId == "4c2ae592-b2a7-445e-98ec-697694478b1c") + assert(res[1].Score == 33) + assert(cursor == nil) +end + -- logger_info do local message = nk.logger_info(("%q"):format("INFO logger.")) @@ -275,3 +342,88 @@ do assert(objectDecode, "'objectDecode' must not be nil") assert(objectDecode == '{"hello": "world"}', '"objectDecode" must equal {"hello": "world"}') end + +-- sql_exec and sql_query +do + -- Table names cannot start with a number so we can't use our usual UUID here. + local t = string.random(20) + + local query = "CREATE TABLE " .. t .. " ( foo VARCHAR(20), bar BIGINT )" + local params = {} + local status, result = pcall(nk.sql_exec, query, params) + if not status then + print(result) + end + assert(result == 0) + + local query = "INSERT INTO " .. t .. " (foo, bar) VALUES ($1, $2), ($3, $4), ($5, $6)" + local params = {"foo1", 1, "foo2", 2, "foo3", 3} + local status, result = pcall(nk.sql_exec, query, params) + if not status then + print(result) + end + assert(result == 3) + + local query = "SELECT * FROM " .. t .. " WHERE bar = $1" + local params = {2} + local status, result = pcall(nk.sql_query, query, params) + if not status then + print(result) + end + assert(#result == 1) + assert(result[1].foo == "foo2") + assert(result[1].bar == 2) + + local query = "SELECT * FROM " .. t .. " WHERE bar >= $1 ORDER BY bar DESC" + local params = {2} + local status, result = pcall(nk.sql_query, query, params) + if not status then + print(result) + end + assert(#result == 2) + assert(result[1].foo == "foo3") + assert(result[1].bar == 3) + assert(result[2].foo == "foo2") + assert(result[2].bar == 2) + + local query = "DELETE FROM " .. t .. " WHERE bar = $1" + local params = {2} + local status, result = pcall(nk.sql_exec, query, params) + if not status then + print(result) + end + assert(result == 1) + + local status, result = pcall(nk.sql_exec, query, params) + if not status then + print(result) + end + assert(result == 0) + + local query = "SELECT * FROM " .. t .. " WHERE bar >= $1 ORDER BY bar DESC" + local params = {2} + local status, result = pcall(nk.sql_query, query, params) + if not status then + print(result) + end + assert(#result == 1) + assert(result[1].foo == "foo3") + assert(result[1].bar == 3) + + local query = "DROP TABLE " .. t + local params = {} + local status, result = pcall(nk.sql_exec, query, params) + if not status then + print(result) + end + assert(result == 0) + + local query = "SELECT * FROM " .. t + local params = {} + local status, result = pcall(nk.sql_query, query, params) + if not status then + print(result) + end + assert(not status) + assert(string.ends(result, 'sql query error: pq: table "' .. t .. '" does not exist')) +end