Commit 1ab27404 authored by Andrei Mihu's avatar Andrei Mihu
Browse files

Allow runtime stream user list function to filter hidden presences. (#222)

parent e54b956e
Loading
Loading
Loading
Loading
+4 −0
Original line number Diff line number Diff line
@@ -8,9 +8,13 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- New `bit32` module available in the code runtime.
- New code runtime function to create MD5 hashes.
- New code runtime function to create SHA256 hashes.
- Runtime stream user list function now allows filtering hidden presences.
- Allow optional request body compression on all API requests.

### Changed
- Reduce the frequency of socket checks on known active connections.
- Deleting a record from a leaderboard that does not exist now succeeds.
- Notification listings now use more accurate time in cacheable cursors.

## [2.0.2] - 2018-07-09
### Added
+25 −2
Original line number Diff line number Diff line
@@ -26,6 +26,8 @@ import (

	"crypto/tls"

	"compress/flate"
	"compress/gzip"
	"github.com/dgrijalva/jwt-go"
	"github.com/golang/protobuf/jsonpb"
	"github.com/golang/protobuf/ptypes/empty"
@@ -160,12 +162,14 @@ func StartApiServer(logger *zap.Logger, startupLogger *zap.Logger, db *sql.DB, j
	// Default to passing request to GRPC Gateway.
	// Enable max size check on requests coming arriving the gateway.
	// Enable compression on responses sent by the gateway.
	handlerWithGzip := handlers.CompressHandler(handlerWithStats)
	// Enable decompression on requests received by the gateway.
	handlerWithDecompressRequest := decompressHandler(logger, handlerWithStats)
	handlerWithCompressResponse := handlers.CompressHandler(handlerWithDecompressRequest)
	maxMessageSizeBytes := config.GetSocket().MaxMessageSizeBytes
	handlerWithMaxBody := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Check max body size before decompressing incoming request body.
		r.Body = http.MaxBytesReader(w, r.Body, maxMessageSizeBytes)
		handlerWithGzip.ServeHTTP(w, r)
		handlerWithCompressResponse.ServeHTTP(w, r)
	})
	grpcGatewayRouter.NewRoute().Handler(handlerWithMaxBody)

@@ -452,3 +456,22 @@ func parseToken(hmacSecretByte []byte, tokenString string) (userID uuid.UUID, us
	}
	return userID, claims["usn"].(string), int64(claims["exp"].(float64)), true
}

func decompressHandler(logger *zap.Logger, h http.Handler) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		switch r.Header.Get("Content-Encoding") {
		case "gzip":
			gr, err := gzip.NewReader(r.Body)
			if err != nil {
				logger.Debug("Error processing gzip request body, attempting to read uncompressed", zap.Error(err))
				break
			}
			r.Body = gr
		case "deflate":
			r.Body = flate.NewReader(r.Body)
		default:
			// No request compression.
		}
		h.ServeHTTP(w, r)
	})
}
+1 −1
Original line number Diff line number Diff line
@@ -428,7 +428,7 @@ func LeaderboardRecordWrite(logger *zap.Logger, db *sql.DB, leaderboardCache Lea
func LeaderboardRecordDelete(logger *zap.Logger, db *sql.DB, leaderboardCache LeaderboardCache, caller uuid.UUID, leaderboardId, ownerId string) error {
	leaderboard := leaderboardCache.Get(leaderboardId)
	if leaderboard == nil {
		return ErrLeaderboardNotFound
		return nil
	}

	if leaderboard.Authoritative && caller != uuid.Nil {
+6 −3
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ import (
	"github.com/lib/pq"
	"github.com/satori/go.uuid"
	"go.uber.org/zap"
	"time"
)

const (
@@ -91,8 +92,8 @@ func NotificationList(logger *zap.Logger, db *sql.DB, userID uuid.UUID, limit in

	cursorQuery := " "
	if nc != nil && nc.NotificationID != nil {
		cursorQuery = " AND (user_id, create_time, id) > ($1::UUID, CAST($3::BIGINT AS TIMESTAMPTZ), $4::UUID)"
		params = append(params, nc.CreateTime, uuid.FromBytesOrNil(nc.NotificationID))
		cursorQuery = " AND (user_id, create_time, id) > ($1::UUID, $3::TIMESTAMPTZ, $4::UUID)"
		params = append(params, pq.NullTime{Time: time.Unix(0, nc.CreateTime), Valid: true}, uuid.FromBytesOrNil(nc.NotificationID))
	}

	rows, err := db.Query(`
@@ -108,6 +109,7 @@ ORDER BY create_time ASC`+limitQuery, params...)
	defer rows.Close()

	notifications := make([]*api.Notification, 0)
	var lastCreateTime int64
	for rows.Next() {
		no := &api.Notification{Persistent: true, CreateTime: &timestamp.Timestamp{}}
		var createTime pq.NullTime
@@ -116,6 +118,7 @@ ORDER BY create_time ASC`+limitQuery, params...)
			return nil, err
		}

		lastCreateTime = createTime.Time.UnixNano()
		no.CreateTime.Seconds = createTime.Time.Unix()
		if no.SenderId == uuid.Nil.String() {
			no.SenderId = ""
@@ -140,7 +143,7 @@ ORDER BY create_time ASC`+limitQuery, params...)
		lastNotification := notifications[len(notifications)-1]
		newCursor := &notificationCacheableCursor{
			NotificationID: uuid.FromStringOrNil(lastNotification.Id).Bytes(),
			CreateTime:     lastNotification.CreateTime.Seconds,
			CreateTime:     lastCreateTime,
		}
		if err := gob.NewEncoder(cursorBuf).Encode(newCursor); err != nil {
			logger.Error("Could not create new cursor.", zap.Error(err))
+1 −1
Original line number Diff line number Diff line
@@ -205,7 +205,7 @@ func (p *Pipeline) channelJoin(logger *zap.Logger, session Session, envelope *rt
	}

	// List current presences, not including hidden ones.
	presences := p.tracker.ListByStream(stream, false)
	presences := p.tracker.ListByStream(stream, false, true)

	// If the topic join is a DM check if we should notify the other user.
	// Only new presences are allowed to send notifications to avoid duplicates.
Loading