# -*- coding: utf-8 -*-
# Copyright 2016 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper library around ts_mon.

This library provides some wrapper functionality around ts_mon, to make it more
friendly to developers. It also provides import safety, in case ts_mon is not
deployed with your code.
"""

from __future__ import division
from __future__ import print_function

import collections
import contextlib
import ssl
import time
from functools import wraps

import six
from six.moves import queue as Queue

from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging

try:
  from infra_libs import ts_mon
except (ImportError, RuntimeError):
  ts_mon = None


# This number is chosen because 1.16^100 seconds is about
# 32 days. This is a good compromise between bucket size
# and dynamic range.
_SECONDS_BUCKET_FACTOR = 1.16

# If None, we create metrics in this process. Otherwise, we send metrics via
# this Queue to a dedicated flushing process.
# These attributes are set by chromite.lib.ts_mon_config.SetupTsMonGlobalState.
FLUSHING_PROCESS = None
MESSAGE_QUEUE = None

_MISSING = object()

MetricCall = collections.namedtuple('MetricCall', [
    'metric_name', 'metric_args', 'metric_kwargs',
    'method', 'method_args', 'method_kwargs',
    'reset_after'
])


def _FlushingProcessClosed():
  """Returns whether the metrics flushing process has been closed."""
  return (FLUSHING_PROCESS is not None and
          FLUSHING_PROCESS.exitcode is not None)


class ProxyMetric(object):
  """Redirects any method calls to the message queue."""
  def __init__(self, metric, metric_args, metric_kwargs):
    self.metric = metric
    self.metric_args = metric_args
    self.reset_after = metric_kwargs.pop('reset_after', False)
    self.metric_kwargs = metric_kwargs

  def __getattr__(self, method_name):
    """Redirects all method calls to the MESSAGE_QUEUE."""
    def enqueue(*args, **kwargs):
      if not _FlushingProcessClosed():
        try:
          MESSAGE_QUEUE.put_nowait(
              MetricCall(
                  metric_name=self.metric,
                  metric_args=self.metric_args,
                  metric_kwargs=self.metric_kwargs,
                  method=method_name,
                  method_args=args,
                  method_kwargs=kwargs,
                  reset_after=self.reset_after))
        except Queue.Full:
          logging.warning(
              "Metrics queue is full; skipped sending metric '%s'",
              self.metric)
      else:
        try:
          exit_code = FLUSHING_PROCESS.exitcode
        except AttributeError:
          exit_code = None
        logging.warning(
            'Flushing process has been closed (exit code %s),'
            " skipped sending metric '%s'",
            exit_code,
            self.metric)

    return enqueue


def _Indirect(fn):
  """Decorates a function to be indirect if MESSAGE_QUEUE is set.

  If MESSAGE_QUEUE is set, the indirect function will return a proxy metric
  object; otherwise, it behaves normally.
  """
  @wraps(fn)
  def AddToQueueIfPresent(*args, **kwargs):
    if MESSAGE_QUEUE:
      return ProxyMetric(fn.__name__, args, kwargs)
    else:
      # Whether to reset the metric after the flush; this is only used by
      # |ProxyMetric|, so remove this from the kwargs.
      kwargs.pop('reset_after', None)
      return fn(*args, **kwargs)
  return AddToQueueIfPresent


class MockMetric(object):
  """Mock metric object, to be returned if ts_mon is not set up."""

  def _mock_method(self, *args, **kwargs):
    pass

  def __getattr__(self, _):
    return self._mock_method


def _ImportSafe(fn):
  """Decorator which causes |fn| to return MockMetric if ts_mon not imported."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    if ts_mon:
      return fn(*args, **kwargs)
    else:
      return MockMetric()

  return wrapper
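
# Note: because the public constructors below are wrapped with _ImportSafe,
# callers do not need to guard against ts_mon being absent. An illustrative
# (not executed) sketch, with a hypothetical metric name:
#
#   c = CounterMetric('hypothetical/counter')
#   c.increment()  # real increment if ts_mon imported, MockMetric no-op if not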


class FieldSpecAdapter(object):
  """Infers the types of field values to work around field_spec requirement.

  See: https://chromium-review.googlesource.com/c/432120/ for the change
  which added a required field_spec argument. This class is a temporary
  workaround to allow inferring the field_spec if it is not provided.
  """
  FIELD_CLASSES = {} if ts_mon is None else {
      bool: ts_mon.BooleanField,
      int: ts_mon.IntegerField,
      str: ts_mon.StringField,
      six.text_type: ts_mon.StringField,
  }

  def __init__(self, metric_cls, *args, **kwargs):
    self._metric_cls = metric_cls
    self._args = args
    self._kwargs = kwargs
    self._instance = _MISSING

  def __getattr__(self, prop):
    """Return a wrapper which constructs the metric object on demand.

    Args:
      prop: The property name.

    Returns:
      If self._instance has been created, the instance's |prop| attribute;
      otherwise, a wrapper function which creates the ._instance and then
      calls the |prop| method on the instance.
    """
    if self._instance is not _MISSING:
      return getattr(self._instance, prop)

    def func(*args, **kwargs):
      if self._instance is not _MISSING:
        return getattr(self._instance, prop)(*args, **kwargs)
      fields = FieldSpecAdapter._InferFields(prop, args, kwargs)
      self._kwargs['field_spec'] = FieldSpecAdapter._InferFieldSpec(fields)
      self._instance = self._metric_cls(*self._args, **self._kwargs)
      return getattr(self._instance, prop)(*args, **kwargs)

    func.__name__ = prop
    return func

  @staticmethod
  def _InferFields(method_name, args, kwargs):
    """Infers the fields argument.

    Args:
      method_name: The method called.
      args: The args list.
      kwargs: The keyword args.
    """
    if 'fields' in kwargs:
      return kwargs['fields']

    if method_name == 'increment' and args:
      return args[0]

    if len(args) >= 2:
      return args[1]

  @staticmethod
  def _InferFieldSpec(fields):
    """Infers the field types from the given fields.

    Args:
      fields: A dictionary with metric fields.
    """
    if not fields or not ts_mon:
      return None

    return [FieldSpecAdapter.FIELD_CLASSES[type(v)](field)
            for (field, v) in sorted(fields.items())]


def _OptionalFieldSpec(fn):
  """Decorates a function to allow an optional description and field_spec."""
  @wraps(fn)
  def wrapper(*args, **kwargs):
    kwargs = dict(**kwargs)  # It's bad practice to mutate **kwargs
    # Slightly different from .setdefault, this line sets a default even when
    # the key is present (as long as the value is not truthy). Empty or None is
    # not allowed for descriptions.
    kwargs['description'] = kwargs.get('description') or 'No description.'
    if 'field_spec' in kwargs and kwargs['field_spec'] is not _MISSING:
      return fn(*args, **kwargs)
    else:
      return FieldSpecAdapter(fn, *args, **kwargs)
  return wrapper


def _Metric(fn):
  """A pipeline of decorators to apply to our metric constructors."""
  return _OptionalFieldSpec(_ImportSafe(_Indirect(fn)))


# This is needed for the reset_after flag used by @_Indirect.
# pylint: disable=unused-argument

@_Metric
def CounterMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING, start_time=None):
  """Returns a metric handle for a counter named |name|."""
  return ts_mon.CounterMetric(name,
                              description=description, field_spec=field_spec,
                              start_time=start_time)
Counter = CounterMetric
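
# Illustrative usage (not executed; the metric name and field are made up):
# counters are monotonically increasing values.
#
#   c = Counter('myapp/builds_started', description='Builds started')
#   c.increment(fields={'builder': 'amd64-generic'})
#   c.increment_by(3, fields={'builder': 'amd64-generic'})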


@_Metric
def GaugeMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a gauge named |name|."""
  return ts_mon.GaugeMetric(name, description=description,
                            field_spec=field_spec)
Gauge = GaugeMetric
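
# Illustrative usage (not executed; the metric name is made up): gauges report
# the most recently set value, e.g. a queue depth sampled by the caller.
#
#   g = Gauge('myapp/queue_depth', description='Pending work items')
#   g.set(42, fields={'queue': 'default'})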


@_Metric
def CumulativeMetric(name, reset_after=False, description=None,
                     field_spec=_MISSING):
  """Returns a metric handle for a cumulative float named |name|."""
  return ts_mon.CumulativeMetric(name, description=description,
                                 field_spec=field_spec)


@_Metric
def StringMetric(name, reset_after=False, description=None,
                 field_spec=_MISSING):
  """Returns a metric handle for a string named |name|."""
  return ts_mon.StringMetric(name, description=description,
                             field_spec=field_spec)
String = StringMetric


@_Metric
def BooleanMetric(name, reset_after=False, description=None,
                  field_spec=_MISSING):
  """Returns a metric handle for a boolean named |name|."""
  return ts_mon.BooleanMetric(name, description=description,
                              field_spec=field_spec)
Boolean = BooleanMetric


@_Metric
def FloatMetric(name, reset_after=False, description=None, field_spec=_MISSING):
  """Returns a metric handle for a float named |name|."""
  return ts_mon.FloatMetric(name, description=description,
                            field_spec=field_spec)
Float = FloatMetric
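
# Illustrative usage (not executed; the metric name is made up): Float behaves
# like Gauge but holds floating point values.
#
#   Float('myapp/load_average', description='1-minute load average').set(0.75)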


@_Metric
def CumulativeDistributionMetric(name, reset_after=False, description=None,
                                 bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|."""
  return ts_mon.CumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
CumulativeDistribution = CumulativeDistributionMetric
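
# Illustrative usage (not executed; the metric name is made up): distribution
# metrics aggregate individual samples into buckets.
#
#   d = CumulativeDistribution('myapp/request_bytes',
#                              description='Request payload sizes')
#   d.add(512)
#   d.add(2048)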


@_Metric
def DistributionMetric(name, reset_after=False, description=None,
                       bucketer=None, field_spec=_MISSING):
  """Returns a metric handle for a distribution named |name|."""
  return ts_mon.NonCumulativeDistributionMetric(
      name, description=description, bucketer=bucketer, field_spec=field_spec)
Distribution = DistributionMetric


@_Metric
def CumulativeSmallIntegerDistribution(name, reset_after=False,
                                       description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  This differs slightly from CumulativeDistribution, in that the underlying
  metric uses a uniform bucketer rather than a geometric one.

  This metric type is suitable for holding a distribution of numbers that are
  nonnegative integers in the range of 0 to 100.
  """
  return ts_mon.CumulativeDistributionMetric(
      name,
      bucketer=ts_mon.FixedWidthBucketer(1),
      description=description,
      field_spec=field_spec)


@_Metric
def CumulativeSecondsDistribution(name, scale=1, reset_after=False,
                                  description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution named |name|.

  The distribution handle returned by this method is better suited than the
  default one for recording handling times, in seconds.

  This metric handle has bucketing that is optimized for time intervals
  (in seconds) in the range of 1 second to 32 days. Use |scale| to adjust this
  (e.g. scale=0.1 covers a range from .1 seconds to 3.2 days).

  Args:
    name: string name of metric
    scale: scaling factor of buckets, and size of the first bucket. default: 1
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  b = ts_mon.GeometricBucketer(growth_factor=_SECONDS_BUCKET_FACTOR,
                               scale=scale)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b, units=ts_mon.MetricsDataUnits.SECONDS,
      description=description, field_spec=field_spec)

SecondsDistribution = CumulativeSecondsDistribution
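
# Illustrative usage (not executed; the metric name is made up): most callers
# use the SecondsTimer context manager below, but the distribution can also be
# fed directly with a duration measured by the caller.
#
#   latency = CumulativeSecondsDistribution('myapp/rpc_latency',
#                                           description='RPC latency')
#   latency.add(elapsed_seconds)  # elapsed_seconds computed by the caller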


@_Metric
def PercentageDistribution(
    name, num_buckets=1000, reset_after=False,
    description=None, field_spec=_MISSING):
  """Returns a metric handle for a cumulative distribution for percentages.

  The distribution handle returned by this method is better suited for reporting
  percentage values than the default one. The bucketing is optimized for values
  in [0,100].

  Args:
    name: The name of this metric.
    num_buckets: This metric buckets the percentage values before
      reporting. This argument controls the number of buckets that the range
      [0,100] is divided into. The default gives you 0.1% resolution.
    reset_after: Should the metric be reset after reporting.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  # The last bucket actually covers [100, 100 + 1.0/num_buckets), so it
  # corresponds to values that exactly match 100%.
  bucket_width = 100 / num_buckets
  b = ts_mon.FixedWidthBucketer(bucket_width, num_buckets)
  return ts_mon.CumulativeDistributionMetric(
      name, bucketer=b,
      description=description, field_spec=field_spec)
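
# Illustrative usage (not executed; the metric name is made up): report a
# percentage value in the range [0, 100].
#
#   pct = PercentageDistribution('myapp/disk_used_percent',
#                                description='Disk utilization')
#   pct.add(100.0 * used_bytes / total_bytes)  # computed by the caller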


@contextlib.contextmanager
def SecondsTimer(name, fields=None, description=None, field_spec=_MISSING,
                 scale=1, record_on_exception=True, add_exception_field=False):
  """Record the time of an operation to a CumulativeSecondsDistribution metric.

  Records the time taken inside of the context block, to the
  CumulativeSecondsDistribution named |name|, with the given fields.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsTimer('timer/name', fields={'foo': 'bar'},
                      description='My timer',
                      field_spec=[ts_mon.StringField('foo'),
                                  ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsTimer('timer/name', fields=f, description='My timer',
                      field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar  # 'foo' is not a valid field, because no default
                      # value for it was specified in the context constructor.
                      # It will be silently ignored.

  Args:
    name: The name of the metric to create.
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the CumulativeSecondsDistribution buckets by.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
      to the FieldSpec provided, and set its value to True iff an exception
      was raised in the context.
  """
  if field_spec is not None and field_spec is not _MISSING:
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = CumulativeSecondsDistribution(
      name, scale=scale, description=description, field_spec=field_spec)
  f = fields or {}
  f = dict(f)
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if record_on_exception and add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
      if dt >= 0:
        m.add(dt, fields={k: f[k] for k in keys})


def SecondsTimerDecorator(name, fields=None, description=None,
                          field_spec=_MISSING, scale=1,
                          record_on_exception=True, add_exception_field=False):
  """Decorator to time the duration of function calls.

  Examples:
    @SecondsTimerDecorator('timer/name', fields={'foo': 'bar'},
                           description='My timer',
                           field_spec=[ts_mon.StringField('foo')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsTimer('timer/name', fields={'foo': 'bar'},
                        description='My timer',
                        field_spec=[ts_mon.StringField('foo')]):
        return doStuff()

  Args:
    name: The name of the metric to create.
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    scale: A float to scale the distribution by.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
      to the FieldSpec provided, and set its value to True iff an exception
      was raised in the context.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsTimer(name, fields=fields, description=description,
                        field_spec=field_spec, scale=scale,
                        record_on_exception=record_on_exception,
                        add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SecondsInstanceTimer(name, fields=None, description=None,
                         field_spec=_MISSING, record_on_exception=True,
                         add_exception_field=False):
  """Record the time of an operation to a FloatMetric.

  Records the time taken inside of the context block, to the
  Float metric named |name|, with the given fields. This is
  a non-cumulative metric; this represents the absolute time
  taken for a specific block. The duration is stored in a float
  to provide flexibility in the future for higher accuracy.

  Examples:
    # Time the doSomething() call, with field values that are independent of the
    # results of the operation.
    with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                              description='My timer',
                              field_spec=[ts_mon.StringField('foo'),
                                          ts_mon.BooleanField('success')]):
      doSomething()

    # Time the doSomethingElse call, with field values that depend on the
    # results of that operation. Note that it is important that a default value
    # is specified for these fields, in case an exception is thrown by
    # doSomethingElse()
    f = {'success': False, 'foo': 'bar'}
    with SecondsInstanceTimer('timer/name', fields=f, description='My timer',
                              field_spec=[ts_mon.StringField('foo')]) as c:
      doSomethingElse()
      c['success'] = True

    # Incorrect Usage!
    with SecondsInstanceTimer('timer/name', description='My timer') as c:
      doSomething()
      c['foo'] = bar  # 'foo' is not a valid field, because no default
                      # value for it was specified in the context constructor.
                      # It will be silently ignored.

  Args:
    name: The name of the metric to create.
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
      to the FieldSpec provided, and set its value to True iff an exception
      was raised in the context.

  Yields:
    A Float-based metric measuring the duration of execution.
  """
  if field_spec is not None and field_spec is not _MISSING:
    field_spec.append(ts_mon.BooleanField('encountered_exception'))

  m = FloatMetric(name, description=description, field_spec=field_spec)
  f = dict(fields or {})
  keys = list(f)
  t0 = _GetSystemClock()

  error = True
  try:
    yield f
    error = False
  finally:
    if record_on_exception and add_exception_field:
      keys.append('encountered_exception')
      f.setdefault('encountered_exception', error)
    # Filter out keys that were not part of the initial key set. This is to
    # avoid inconsistent fields.
    # TODO(akeshet): Doing this filtering isn't super efficient. Would be better
    # to implement some key-restricted subclass or wrapper around dict, and just
    # yield that above rather than yielding a regular dict.
    if record_on_exception or not error:
      dt = _GetSystemClock() - t0
      m.set(dt, fields={k: f[k] for k in keys})


def SecondsInstanceTimerDecorator(name, fields=None, description=None,
                                  field_spec=_MISSING,
                                  record_on_exception=True,
                                  add_exception_field=False):
  """Decorator to time the duration of function calls, recorded as a Float.

  Examples:
    @SecondsInstanceTimerDecorator('timer/name', fields={'foo': 'bar'},
                                   description='My timer',
                                   field_spec=[ts_mon.StringField('foo'),
                                               ts_mon.BooleanField('success')])
    def Foo(bar):
      return doStuff()

    is equivalent to

    def Foo(bar):
      with SecondsInstanceTimer('timer/name', fields={'foo': 'bar'},
                                description='My timer',
                                field_spec=[ts_mon.StringField('foo'),
                                            ts_mon.BooleanField('success')]):
        return doStuff()

  Args:
    name: The name of the metric to create.
    fields: The fields of the metric to create.
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
    record_on_exception: Whether to record metrics if an exception is raised.
    add_exception_field: Whether to add a BooleanField('encountered_exception')
      to the FieldSpec provided, and set its value to True iff an exception
      was raised in the context.

  Returns:
    A SecondsInstanceTimer metric decorator.
  """
  def decorator(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
      with SecondsInstanceTimer(name, fields=fields, description=description,
                                field_spec=field_spec,
                                record_on_exception=record_on_exception,
                                add_exception_field=add_exception_field):
        return fn(*args, **kwargs)

    return wrapper

  return decorator


@contextlib.contextmanager
def SuccessCounter(name, fields=None, description=None, field_spec=_MISSING):
  """Create a counter that tracks if something succeeds.

  Args:
    name: The name of the metric to create
    fields: The fields of the metric
    description: A string description of the metric.
    field_spec: A sequence of ts_mon.Field objects to specify the field schema.
  """
  c = Counter(name)
  f = fields or {}
  f = f.copy()
  # We add in the additional field success.
  keys = list(f) + ['success']
  success = False
  try:
    yield f
    success = True
  finally:
    f.setdefault('success', success)
    f = {k: f[k] for k in keys}
    c.increment(fields=f)
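
# Illustrative usage (not executed; the metric name is made up): the emitted
# counter stream carries a 'success' field derived from whether the block
# exited without raising.
#
#   with SuccessCounter('myapp/tasks_completed', fields={'task_type': 'sync'}):
#     DoTask()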


@contextlib.contextmanager
def Presence(name, fields=None, description=None, field_spec=_MISSING):
  """A counter of 'active' things.

  This keeps track of how many instances of |name| are active at any given
  time. However, it's only suitable for long running tasks, since the initial
  true value may never be written out if the task doesn't run for at least a
  minute.
  """
  b = Boolean(name, description=description, field_spec=field_spec)
  b.set(True, fields=fields)
  try:
    yield
  finally:
    b.set(False, fields=fields)
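
# Illustrative usage (not executed; the metric name is made up): report that a
# long-running daemon is alive for the duration of its main loop.
#
#   with Presence('myapp/daemon_alive'):
#     RunMainLoopForever()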


class RuntimeBreakdownTimer(object):
  """Record the time of an operation and the breakdown into sub-steps.

  Examples:
    with RuntimeBreakdownTimer('timer/name', fields={'foo':'bar'},
                               description='My timer',
                               field_spec=[ts_mon.StringField('foo')]) as timer:
      with timer.Step('first_step'):
        doFirstStep()
      with timer.Step('second_step'):
        doSecondStep()
      # The time spent next will show up under
      # .../timer/name/breakdown_unaccounted
      doSomeNonStepWork()

  This will emit the following metrics:
  - .../timer/name/total_duration - A CumulativeSecondsDistribution metric for
    the time spent inside the outer with block.
  - .../timer/name/breakdown/first_step and
    .../timer/name/breakdown/second_step - PercentageDistribution metrics for
    the fraction of time devoted to each substep.
  - .../timer/name/breakdown_unaccounted - PercentageDistribution metric for the
    fraction of time that is not accounted for in any of the substeps.
  - .../timer/name/bucketing_loss - The PercentageDistribution metrics above
    bucket values before reporting them as distributions. This causes small
    errors in the reported values because they are rounded down to the lower
    bound of their bucket. This CumulativeMetric measures the total rounding
    error accrued in reporting all the percentages. The worst case bucketing
    loss for x steps is (x+1)/10. So, if you time across 9 steps, you should
    expect no more than 1% rounding error.
  [experimental]
  - .../timer/name/duration_breakdown - A Float metric, with one stream per Step
    indicating the ratio of time spent in that step. The different steps are
    differentiated via a field with key 'step_name'. Since some of the time
    can be spent outside any steps, these ratios will sum to <= 1.

  NB: This helper can only be used if the field values are known at the
  beginning of the outer context and do not change as a result of any of the
  operations timed.
  """

  PERCENT_BUCKET_COUNT = 1000

  _StepMetrics = collections.namedtuple('_StepMetrics', ['name', 'time_s'])

  def __init__(self, name, fields=None, description=None, field_spec=_MISSING):
    self._name = name
    self._fields = fields
    self._field_spec = field_spec
    self._description = description
    self._outer_t0 = None
    self._total_time_s = 0
    self._inside_step = False
    self._step_metrics = []

  def __enter__(self):
    self._outer_t0 = _GetSystemClock()
    return self

  def __exit__(self, _type, _value, _traceback):
    self._RecordTotalTime()

    outer_timer = CumulativeSecondsDistribution(
        '%s/total_duration' % (self._name,),
        field_spec=self._field_spec,
        description=self._description)
    outer_timer.add(self._total_time_s, fields=self._fields)

    for name, percent in self._GetStepBreakdowns().items():
      step_metric = PercentageDistribution(
          '%s/breakdown/%s' % (self._name, name),
          num_buckets=self.PERCENT_BUCKET_COUNT,
          field_spec=self._field_spec,
          description=self._description)
      step_metric.add(percent, fields=self._fields)

      fields = dict(self._fields) if self._fields is not None else dict()
      fields['step_name'] = name
      # TODO(pprabhu): Convert _GetStepBreakdowns() to return ratios instead of
      # percentage when the old PercentageDistribution reporting is deleted.
      Float('%s/duration_breakdown' % self._name).set(percent / 100,
                                                      fields=fields)

    unaccounted_metric = PercentageDistribution(
        '%s/breakdown_unaccounted' % self._name,
        num_buckets=self.PERCENT_BUCKET_COUNT,
        field_spec=self._field_spec,
        description=self._description)
    unaccounted_metric.add(self._GetUnaccountedBreakdown(), fields=self._fields)

    bucketing_loss_metric = CumulativeMetric(
        '%s/bucketing_loss' % self._name,
        field_spec=self._field_spec,
        description=self._description)
    bucketing_loss_metric.increment_by(self._GetBucketingLoss(),
                                       fields=self._fields)

  @contextlib.contextmanager
  def Step(self, step_name):
    """Start a new step named step_name in the timed operation.

    Note that it is not possible to start a step inside a step. i.e.,

    with RuntimeBreakdownTimer('timer') as timer:
      with timer.Step('outer_step'):
        with timer.Step('inner_step'):
          # The inner step will be dropped (with an error logged) by design.

    Args:
      step_name: The name of the step being timed.
    """
    if self._inside_step:
      logging.error('RuntimeBreakdownTimer.Step is not reentrant. '
                    'Dropping step: %s', step_name)
      yield
      return

    self._inside_step = True
    t0 = _GetSystemClock()
    try:
      yield
    finally:
      self._inside_step = False
      step_time_s = _GetSystemClock() - t0
      # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
      step_time_s = max(0, step_time_s)
      self._step_metrics.append(self._StepMetrics(step_name, step_time_s))

  def _GetStepBreakdowns(self):
    """Returns percentage of time spent in each step.

    Must be called after |_RecordTotalTime|.
    """
    if not self._total_time_s:
      return {}
    return {x.name: (x.time_s * 100) / self._total_time_s
            for x in self._step_metrics}

  def _GetUnaccountedBreakdown(self):
    """Returns the percentage time spent outside of all steps.

    Must be called after |_RecordTotalTime|.
    """
    breakdown_percentages = sum(self._GetStepBreakdowns().values())
    return max(0, 100 - breakdown_percentages)

  def _GetBucketingLoss(self):
    """Compute the actual loss in reported percentages due to bucketing.

    Must be called after |_RecordTotalTime|.
    """
    reported = list(self._GetStepBreakdowns().values())
    reported.append(self._GetUnaccountedBreakdown())
    bucket_width = 100 / self.PERCENT_BUCKET_COUNT
    return sum(x % bucket_width for x in reported)

  def _RecordTotalTime(self):
    self._total_time_s = _GetSystemClock() - self._outer_t0
    # TODO(ayatane): Handle backward clock jumps. See _GetSystemClock.
    self._total_time_s = max(0, self._total_time_s)


def _GetSystemClock():
  """Return a clock time.

  The only thing that the return value can be used for is to subtract from
  other instances to determine time elapsed.
  """
  # TODO(ayatane): We should use a monotonic clock to measure this,
  # but Python 2 does not have one.
  return time.time()


def Flush(reset_after=()):
  """Flushes metrics, but warns on transient errors.

  Args:
    reset_after: A list of metrics to reset after flushing.
  """
  if not ts_mon:
    return

  try:
    ts_mon.flush()
    while reset_after:
      reset_after.pop().reset()
  except ssl.SSLError as e:
    logging.warning('Caught transient network error while flushing: %s', e)
  except Exception as e:
    logging.error('Caught exception while flushing: %s', e)
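
# Illustrative usage (not executed; assumes ts_mon global state was set up
# elsewhere, e.g. via chromite.lib.ts_mon_config.SetupTsMonGlobalState):
#
#   c = Counter('myapp/events')
#   c.increment()
#   Flush(reset_after=[c])  # flush now, then reset the counter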