diff --git a/CHANGELOG.md b/CHANGELOG.md index b18848dae1fa5e0d36be2c96a26ee3febdb934ad..0c3f1815abc9a33f4bffafe382b10b0a821ed400 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,9 +8,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr - New `bit32` module available in the code runtime. - New code runtime function to create MD5 hashes. - New code runtime function to create SHA256 hashes. +- Runtime stream user list function now allows filtering hidden presences. +- Allow optional request body compression on all API requests. ### Changed - Reduce the frequency of socket checks on known active connections. +- Deleting a record from a leaderboard that does not exist now succeeds. +- Notification listings now use more accurate time in cacheable cursors. ## [2.0.2] - 2018-07-09 ### Added diff --git a/server/api.go b/server/api.go index 87a3e80df6b2f2b6a5c8c40b0cab8e07e7de56e4..8f39cb7a4884be365c990606d1b2d0f5de8ada1c 100644 --- a/server/api.go +++ b/server/api.go @@ -26,6 +26,8 @@ import ( "crypto/tls" + "compress/flate" + "compress/gzip" "github.com/dgrijalva/jwt-go" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/ptypes/empty" @@ -160,12 +162,14 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j // Default to passing request to GRPC Gateway. // Enable max size check on requests coming arriving the gateway. // Enable compression on responses sent by the gateway. - handlerWithGzip := handlers.CompressHandler(handlerWithStats) + // Enable decompression on requests received by the gateway. + handlerWithDecompressRequest := decompressHandler(logger, handlerWithStats) + handlerWithCompressResponse := handlers.CompressHandler(handlerWithDecompressRequest) maxMessageSizeBytes := config.GetSocket().MaxMessageSizeBytes handlerWithMaxBody := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Check max body size before decompressing incoming request body. r.Body = http.MaxBytesReader(w, r.Body, maxMessageSizeBytes) - handlerWithGzip.ServeHTTP(w, r) + handlerWithCompressResponse.ServeHTTP(w, r) }) grpcGatewayRouter.NewRoute().Handler(handlerWithMaxBody) @@ -452,3 +456,22 @@ func parseToken(hmacSecretByte []byte, tokenString string) (userID uuid.UUID, us } return userID, claims["usn"].(string), int64(claims["exp"].(float64)), true } + +func decompressHandler(logger *zap.Logger, h http.Handler) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.Header.Get("Content-Encoding") { + case "gzip": + gr, err := gzip.NewReader(r.Body) + if err != nil { + logger.Debug("Error processing gzip request body, attempting to read uncompressed", zap.Error(err)) + break + } + r.Body = gr + case "deflate": + r.Body = flate.NewReader(r.Body) + default: + // No request compression. + } + h.ServeHTTP(w, r) + }) +} diff --git a/server/core_leaderboard.go b/server/core_leaderboard.go index 8d7978174a547d774aa0e4ba0931f5b8a497284e..de446695919613673d92ae4fe9d6ab707ead8b83 100644 --- a/server/core_leaderboard.go +++ b/server/core_leaderboard.go @@ -428,7 +428,7 @@ func LeaderboardRecordWrite(logger *zap.Logger, db *sql.DB, leaderboardCache Lea func LeaderboardRecordDelete(logger *zap.Logger, db *sql.DB, leaderboardCache LeaderboardCache, caller uuid.UUID, leaderboardId, ownerId string) error { leaderboard := leaderboardCache.Get(leaderboardId) if leaderboard == nil { - return ErrLeaderboardNotFound + return nil } if leaderboard.Authoritative && caller != uuid.Nil { diff --git a/server/core_notification.go b/server/core_notification.go index 8d6921f387debc994e42808d797fa9ea1f6d3413..9732b2dbdbcde7a32240e31f7e06fb81382abee9 100644 --- a/server/core_notification.go +++ b/server/core_notification.go @@ -28,6 +28,7 @@ import ( "github.com/lib/pq" "github.com/satori/go.uuid" "go.uber.org/zap" + "time" ) const ( @@ -91,8 +92,8 @@ func NotificationList(logger *zap.Logger, db *sql.DB, userID uuid.UUID, limit in cursorQuery := " " if nc != nil && nc.NotificationID != nil { - cursorQuery = " AND (user_id, create_time, id) > ($1::UUID, CAST($3::BIGINT AS TIMESTAMPTZ), $4::UUID)" - params = append(params, nc.CreateTime, uuid.FromBytesOrNil(nc.NotificationID)) + cursorQuery = " AND (user_id, create_time, id) > ($1::UUID, $3::TIMESTAMPTZ, $4::UUID)" + params = append(params, pq.NullTime{Time: time.Unix(0, nc.CreateTime), Valid: true}, uuid.FromBytesOrNil(nc.NotificationID)) } rows, err := db.Query(` @@ -108,6 +109,7 @@ ORDER BY create_time ASC`+limitQuery, params...) defer rows.Close() notifications := make([]*api.Notification, 0) + var lastCreateTime int64 for rows.Next() { no := &api.Notification{Persistent: true, CreateTime: ×tamp.Timestamp{}} var createTime pq.NullTime @@ -116,6 +118,7 @@ ORDER BY create_time ASC`+limitQuery, params...) return nil, err } + lastCreateTime = createTime.Time.UnixNano() no.CreateTime.Seconds = createTime.Time.Unix() if no.SenderId == uuid.Nil.String() { no.SenderId = "" @@ -140,7 +143,7 @@ ORDER BY create_time ASC`+limitQuery, params...) lastNotification := notifications[len(notifications)-1] newCursor := ¬ificationCacheableCursor{ NotificationID: uuid.FromStringOrNil(lastNotification.Id).Bytes(), - CreateTime: lastNotification.CreateTime.Seconds, + CreateTime: lastCreateTime, } if err := gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil { logger.Error("Could not create new cursor.", zap.Error(err)) diff --git a/server/pipeline_channel.go b/server/pipeline_channel.go index 0e2954554b3b1393e99e0a4a8148c4832a36a578..153eb47b0abcceaceacfb220835950242ca1c67e 100644 --- a/server/pipeline_channel.go +++ b/server/pipeline_channel.go @@ -205,7 +205,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt } // List current presences, not including hidden ones. - presences := p.tracker.ListByStream(stream, false) + presences := p.tracker.ListByStream(stream, false, true) // If the topic join is a DM check if we should notify the other user. // Only new presences are allowed to send notifications to avoid duplicates. diff --git a/server/pipeline_match.go b/server/pipeline_match.go index dff55794d7f3f6b2d247bf30341c5a98a97803cf..6d02685ab06d9904887dbc0e42b70c2e0c4c6bcc 100644 --- a/server/pipeline_match.go +++ b/server/pipeline_match.go @@ -211,7 +211,7 @@ func (p *Pipeline) matchJoin(logger *zap.Logger, session Session, envelope *rtap } // Whether the user has just (successfully) joined the match or was already a member, return the match info anyway. - ps := p.tracker.ListByStream(stream, true) + ps := p.tracker.ListByStream(stream, false, true) presences := make([]*rtapi.UserPresence, 0, len(ps)) for _, p := range ps { if isNew && p.UserID == session.UserID() && p.ID.SessionID == session.ID() { diff --git a/server/pipeline_status.go b/server/pipeline_status.go index 93e1b952b9da8d55a6e5ddb3e62567e3ea7a624f..4d85b86a839f6f891ffc15f386fac00f8ca99a56 100644 --- a/server/pipeline_status.go +++ b/server/pipeline_status.go @@ -86,7 +86,7 @@ func (p *Pipeline) statusFollow(logger *zap.Logger, session Session, envelope *r return } - ps := p.tracker.ListByStream(stream, false) + ps := p.tracker.ListByStream(stream, false, true) for _, p := range ps { presences = append(presences, &rtapi.UserPresence{ UserId: p.UserID.String(), diff --git a/server/runtime_nakama_module.go b/server/runtime_nakama_module.go index 14062834f20a2d17d447bb4b4cf4fdde4ba7f6d3..0a7ed1884d61c063ec7ad8bceecb4a885bf1ee86 100644 --- a/server/runtime_nakama_module.go +++ b/server/runtime_nakama_module.go @@ -1678,7 +1678,12 @@ func (n *NakamaModule) streamUserList(l *lua.LState) int { return 0 } - presences := n.tracker.ListByStream(stream, true) + // Optional argument to include hidden presences in the list or not, default true. + includeHidden := l.OptBool(2, true) + // Optional argument to include not hidden presences in the list or not, default true. + includeNotHidden := l.OptBool(3, true) + + presences := n.tracker.ListByStream(stream, includeHidden, includeNotHidden) presencesTable := l.CreateTable(len(presences), 0) for i, p := range presences { diff --git a/server/tracker.go b/server/tracker.go index e0c7264b8fe46532654834783af23484254c7a46..af5e0dff3662853480242604f3fa3400d0d6f7de 100644 --- a/server/tracker.go +++ b/server/tracker.go @@ -102,8 +102,8 @@ type Tracker interface { GetLocalBySessionIDStreamUserID(sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID) *PresenceMeta // Check if a single presence on any node exists. GetBySessionIDStreamUserID(node string, sessionID uuid.UUID, stream PresenceStream, userID uuid.UUID) *PresenceMeta - // List presences by stream, optionally include hidden ones. - ListByStream(stream PresenceStream, includeHidden bool) []*Presence + // List presences by stream, optionally include hidden ones and not hidden ones. + ListByStream(stream PresenceStream, includeHidden bool, includeNotHidden bool) []*Presence // Fast lookup of local session IDs to use for message delivery. ListLocalSessionIDByStream(stream PresenceStream) []uuid.UUID @@ -548,7 +548,7 @@ func (t *LocalTracker) GetBySessionIDStreamUserID(node string, sessionID uuid.UU return &meta } -func (t *LocalTracker) ListByStream(stream PresenceStream, includeHidden bool) []*Presence { +func (t *LocalTracker) ListByStream(stream PresenceStream, includeHidden bool, includeNotHidden bool) []*Presence { t.RLock() byStream, anyTracked := t.presencesByStream[stream.Mode][stream] if !anyTracked { @@ -557,7 +557,7 @@ func (t *LocalTracker) ListByStream(stream PresenceStream, includeHidden bool) [ } ps := make([]*Presence, 0, len(byStream)) for pc, meta := range byStream { - if !meta.Hidden || includeHidden { + if (meta.Hidden && includeHidden) || (!meta.Hidden && includeNotHidden) { ps = append(ps, &Presence{ID: pc.ID, Stream: stream, UserID: pc.UserID, Meta: meta}) } }