Unverified Commit 8ffe1e66 authored by Fernando Takagi's avatar Fernando Takagi Committed by GitHub
Browse files

Add prometheus labels for custom per-RPC identifier metrics. (#807)

parent ef114988
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ The format is based on [keep a changelog](http://keepachangelog.com) and this pr
- Add Groups page and associated endpoints to the developer console.
- Add NotificationSendAll function to the runtimes, for sending a notification to all users.
- Log a warning when client IP address cannot be resolved.
- Add tagged Prometheus stats containing RPC function identifiers.

### Changed
- Improve Stackdriver log format timestamp and message field formats.
+6 −4
Original line number Diff line number Diff line
@@ -86,14 +86,16 @@ func (s *ApiServer) RpcFuncHttp(w http.ResponseWriter, r *http.Request) {
		return
	}

	// After this point the RPC will be captured in metrics.
	start := time.Now()
	var success bool
	var recvBytes, sentBytes int
	var err error
	var id string

	// After this point the RPC will be captured in metrics.
	defer func() {
		s.metrics.Api("Rpc", time.Since(start), int64(recvBytes), int64(sentBytes), !success)
		s.metrics.ApiRpc(id, time.Since(start), int64(recvBytes), int64(sentBytes), !success)
	}()
	var err error

	// Check the RPC function ID.
	maybeID, ok := mux.Vars(r)["id"]
@@ -107,7 +109,7 @@ func (s *ApiServer) RpcFuncHttp(w http.ResponseWriter, r *http.Request) {
		}
		return
	}
	id := strings.ToLower(maybeID)
	id = strings.ToLower(maybeID)

	// Find the correct RPC function.
	fn := s.runtime.Rpc(id)
+2 −0
Original line number Diff line number Diff line
@@ -166,6 +166,8 @@ func (s *testMetrics) SnapshotRecvKbSec() float64 { return 0 }
func (s *testMetrics) SnapshotSentKbSec() float64 { return 0 }
func (s *testMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) {
}
func (s *testMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) {
}
func (s *testMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool)             {}
func (s *testMetrics) ApiAfter(name string, elapsed time.Duration, isErr bool)              {}
func (s *testMetrics) Message(recvBytes int64, isErr bool)                                  {}
+44 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ type Metrics interface {
	SnapshotSentKbSec() float64

	Api(name string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool)
	ApiRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool)
	ApiBefore(name string, elapsed time.Duration, isErr bool)
	ApiAfter(name string, elapsed time.Duration, isErr bool)

@@ -260,6 +261,49 @@ func (m *LocalMetrics) Api(name string, elapsed time.Duration, recvBytes, sentBy
	}
}

func (m *LocalMetrics) ApiRpc(id string, elapsed time.Duration, recvBytes, sentBytes int64, isErr bool) {
	// Increment ongoing statistics for current measurement window.
	m.currentMsTotal.Add(int64(elapsed / time.Millisecond))
	m.currentReqCount.Inc()
	m.currentRecvBytes.Add(recvBytes)
	m.currentSentBytes.Add(sentBytes)

	// Global stats.
	m.PrometheusScope.Counter("overall_count").Inc(1)
	m.PrometheusScope.Counter("overall_request_count").Inc(1)
	m.PrometheusScope.Counter("overall_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("overall_request_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("overall_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Counter("overall_request_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Timer("overall_latency_ms").Record(elapsed)

	// Per-endpoint stats.
	m.PrometheusScope.Counter("Rpc_count").Inc(1)
	m.PrometheusScope.Counter("Rpc_recv_bytes").Inc(recvBytes)
	m.PrometheusScope.Counter("Rpc_sent_bytes").Inc(sentBytes)
	m.PrometheusScope.Timer("Rpc_latency_ms").Record(elapsed)

	// Per-endpoint, per-RPC ID stats.
	var taggedScope tally.Scope
	if id != "" {
		taggedScope = m.PrometheusScope.Tagged(map[string]string{"rpc_id": id})
		taggedScope.Counter("Rpc_count").Inc(1)
		taggedScope.Counter("Rpc_recv_bytes").Inc(recvBytes)
		taggedScope.Counter("Rpc_sent_bytes").Inc(sentBytes)
		taggedScope.Timer("Rpc_latency_ms").Record(elapsed)
	}

	// Error stats if applicable.
	if isErr {
		m.PrometheusScope.Counter("overall_errors").Inc(1)
		m.PrometheusScope.Counter("overall_request_errors").Inc(1)
		m.PrometheusScope.Counter("Rpc_errors").Inc(1)
		if taggedScope != nil {
			taggedScope.Counter("Rpc_errors").Inc(1)
		}
	}
}

func (m *LocalMetrics) ApiBefore(name string, elapsed time.Duration, isErr bool) {
	name = "before_" + strings.TrimPrefix(name, API_PREFIX)