398 lines
13 KiB
Python
398 lines
13 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2014 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Wrapper for inframon's command-line flag based configuration."""
|
|
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
import contextlib
|
|
import multiprocessing
|
|
import os
|
|
import socket
|
|
import signal
|
|
import time
|
|
|
|
from six.moves import queue as Queue
|
|
|
|
import six
|
|
|
|
from autotest_lib.utils.frozen_chromite.lib import cros_logging as logging
|
|
from autotest_lib.utils.frozen_chromite.lib import metrics
|
|
from autotest_lib.utils.frozen_chromite.lib import parallel
|
|
|
|
try:
|
|
from infra_libs.ts_mon import config
|
|
from infra_libs.ts_mon import BooleanField
|
|
from infra_libs.ts_mon import IntegerField
|
|
from infra_libs.ts_mon import StringField
|
|
import googleapiclient.discovery
|
|
except (ImportError, RuntimeError) as e:
|
|
config = None
|
|
logging.warning('Failed to import ts_mon, monitoring is disabled: %s', e)
|
|
|
|
|
|
# Flipped to True by _SetupTsMonFromOptions() once ts_mon has accepted its
# options in this process; _SetupAndConsumeMessages() checks it before
# starting to consume metrics.
_WasSetup = False
# Fields merged into every metric by AddCommonFields(). Populated from the
# |common_metric_fields| argument of SetupTsMonGlobalState().
_CommonMetricFields = {}

# Spacing, in seconds, that MetricConsumer keeps between metric flushes. The
# shutdown path waits up to twice this long for the flushing process to exit.
FLUSH_INTERVAL = 60
|
|
|
|
|
|
@contextlib.contextmanager
def TrivialContextManager():
  """No-op context manager.

  Entering it yields None and leaving it does nothing. It lets callers use a
  `with` statement unconditionally, even when there is no setup or teardown
  work to perform.
  """
  yield None
|
|
|
|
|
|
def GetMetricFieldSpec(fields=None):
  """Build the ts_mon field_spec that matches a dictionary of metric fields.

  Each value's Python type selects the *Field class used for its key. Values
  that are not bool, int or string are reported via logging.error and get no
  entry in the result.

  Args:
    fields: Dictionary containing metric fields. May be None or empty.

  Returns:
    List of BooleanField/IntegerField/StringField objects, one per
    classifiable entry of |fields|, in the dictionary's iteration order.
  """
  spec = []
  for name, value in (fields or {}).items():
    # bool is a subclass of int, so it has to be tested first.
    if isinstance(value, bool):
      field = BooleanField(name)
    elif isinstance(value, int):
      field = IntegerField(name)
    elif isinstance(value, six.string_types):
      field = StringField(name)
    else:
      logging.error("Couldn't classify the metric field %s:%s",
                    name, value)
      continue
    spec.append(field)

  return spec
|
|
|
|
def AddCommonFields(fields=None, field_spec=None):
  """Merge the process-wide common metric fields into a metric's fields.

  When no common fields have been registered, the arguments are handed back
  untouched. Otherwise a new dictionary is built from the common fields, the
  caller's |fields| are layered on top (so they win on key collisions), and
  the field_spec is regenerated from the merged dictionary; the |field_spec|
  that was passed in is not used in that case.

  Args:
    fields: Dictionary containing metric fields to which common metric fields
            will be added.
    field_spec: List containing any *Field object associated with metric.

  Returns:
    A (fields, field_spec) tuple: the complete set of metric fields to apply
    to the metric and the list of corresponding *Field objects.
  """
  if not _CommonMetricFields:
    # Nothing registered: keep exactly what the caller supplied.
    return fields, field_spec

  merged = dict(_CommonMetricFields)
  if fields:
    merged.update(fields)
  return merged, GetMetricFieldSpec(merged)
|
|
|
|
|
|
def SetupTsMonGlobalState(service_name,
                          indirect=False,
                          suppress_exception=True,
                          short_lived=False,
                          auto_flush=True,
                          common_metric_fields=None,
                          debug_file=None,
                          task_num=0):
  """Uses a dummy argument parser to get the default behavior from ts-mon.

  Args:
    service_name: The name of the task we are sending metrics from.
    indirect: Whether to create a metrics.METRICS_QUEUE object and a separate
              process for indirect metrics flushing. Useful for forking,
              because forking would normally create a duplicate ts_mon thread.
    suppress_exception: True to silence any exception during the setup. Default
              is set to True.
    short_lived: Whether this process is short-lived and should use the autogen
              hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
              minute.
    common_metric_fields: Dictionary containing the metric fields that will be
              added to all metrics.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: (Default 0) The task_num target field of the metrics to emit.

  Returns:
    A context manager. When ts_mon could not be imported, or when |indirect|
    is False, it is a no-op TrivialContextManager (in the latter case ts_mon
    has already been configured in this process by the time it is returned).
    When |indirect| is True it is the manager produced by
    _CreateTsMonFlushingProcess(); being generator based, it does its work
    only once it is entered with a `with` statement.
  """
  if not config:
    # The ts_mon import failed at module load time; monitoring is disabled.
    return TrivialContextManager()

  if common_metric_fields:
    _CommonMetricFields.update(common_metric_fields)

  # In indirect mode the flushing subprocess calls .flush manually, so the
  # automatic flushing thread must not be requested.
  flush_automatically = False if indirect else auto_flush

  options = _GenerateTsMonArgparseOptions(
      service_name, short_lived, flush_automatically, debug_file, task_num)

  if not indirect:
    _SetupTsMonFromOptions(options, suppress_exception)
    return TrivialContextManager()

  return _CreateTsMonFlushingProcess(options)
|
|
|
|
|
|
def _SetupTsMonFromOptions(options, suppress_exception):
  """Sets up ts-mon global state given parsed argparse options.

  On success the module-level _WasSetup flag is set to True. On failure a
  warning with the traceback is logged and the flag is left unchanged.

  Args:
    options: An argparse options object containing ts-mon flags.
    suppress_exception: True to silence any exception during the setup. When
              False, the exception is re-raised after being logged.
  """
  global _WasSetup  # pylint: disable=global-statement

  # google-api-client has too much noisy logging; only let warnings through.
  googleapiclient.discovery.logger.setLevel(logging.WARNING)

  try:
    config.process_argparse_options(options)
    logging.notice('ts_mon was set up.')
    _WasSetup = True
  except Exception as err:
    logging.warning(
        'Failed to configure ts_mon, monitoring is disabled: %s', err,
        exc_info=True)
    if suppress_exception:
      return
    raise
|
|
|
|
|
|
def _GenerateTsMonArgparseOptions(service_name, short_lived,
                                  auto_flush, debug_file, task_num):
  """Builds a ts-mon flag list and parses it with ts-mon's own argparse setup.

  Args:
    service_name: The name of the task we are sending metrics from. Used for
                  both the task service name and the task job name.
    short_lived: Whether this process is short-lived and should use the autogen
                 hostname prefix.
    auto_flush: Whether to create a thread to automatically flush metrics every
                minute. Ignored (treated as False) when |short_lived| is set.
    debug_file: If non-none, send metrics to this path instead of to PubSub.
    task_num: Override the default task num of 0. Not used when |short_lived|
              is set, because the PID is used as the task number instead.

  Returns:
    The argparse options object obtained by parsing the generated flags.
  """
  parser = argparse.ArgumentParser()
  config.add_argparse_options(parser)

  argv = [
      '--ts-mon-target-type', 'task',
      '--ts-mon-task-service-name', service_name,
      '--ts-mon-task-job-name', service_name,
  ]

  if debug_file:
    argv += ['--ts-mon-endpoint', 'file://' + debug_file]

  # Short lived processes will have autogen: prepended to their hostname and
  # use task-number=PID to trigger shorter retention policies under
  # chrome-infra@, and used by a Monarch precomputation to group across the
  # task number.
  if short_lived:
    short_hostname = socket.getfqdn().lower().split('.')[0]
    argv += ['--ts-mon-task-hostname', 'autogen:' + short_hostname,
             '--ts-mon-task-number', str(os.getpid())]
  elif task_num:
    argv += ['--ts-mon-task-number', str(task_num)]

  # Short lived processes are assumed to call ts_mon.Flush() manually, because
  # the ts_mon thread will drop messages if the process exits before it
  # flushes.
  flush_mode = 'auto' if (auto_flush and not short_lived) else 'manual'
  argv += ['--ts-mon-flush', flush_mode]

  return parser.parse_args(args=argv)
|
|
|
|
|
|
@contextlib.contextmanager
def _CreateTsMonFlushingProcess(options):
  """Creates a separate process to flush ts_mon metrics.

  Useful for multiprocessing scenarios where we don't want multiple ts-mon
  threads send contradictory metrics. Instead, functions in
  chromite.lib.metrics will send their calls to a Queue, which is consumed by a
  dedicated flushing process.

  Args:
    options: An argparse options object to configure ts-mon with.

  Yields:
    The message queue that metric calls are sent to. When used nested inside
    an already active flushing context, the existing metrics.MESSAGE_QUEUE is
    yielded and nothing is created or torn down by the inner context.

  Side effects:
    Sets chromite.lib.metrics.MESSAGE_QUEUE, which causes the metric functions
    to send their calls to the Queue instead of creating the metrics. Also
    sets chromite.lib.metrics.FLUSHING_PROCESS. Both are reset on exit by
    _CleanupMetricsFlushingProcess().
  """
  # If this is nested, we don't need to create another queue and another
  # message consumer; keep using the existing ones. A contextmanager generator
  # has to yield exactly once, so we must still yield here: returning without
  # yielding makes `with` fail with RuntimeError("generator didn't yield").
  if metrics.MESSAGE_QUEUE or metrics.FLUSHING_PROCESS:
    yield metrics.MESSAGE_QUEUE
    return

  with parallel.Manager() as manager:
    message_q = manager.Queue()

    # Pass the target and its arguments explicitly rather than through a
    # lambda, so the Process does not depend on an unpicklable closure.
    metrics.FLUSHING_PROCESS = multiprocessing.Process(
        target=_SetupAndConsumeMessages, args=(message_q, options))
    metrics.FLUSHING_PROCESS.start()

    # this makes the chromite.lib.metric functions use the queue.
    # note - we have to do this *after* forking the ConsumeMessages process.
    metrics.MESSAGE_QUEUE = message_q

    try:
      yield message_q
    finally:
      _CleanupMetricsFlushingProcess()
|
|
|
|
|
|
def _CleanupMetricsFlushingProcess():
  """Sends sentinel value to flushing process and .joins it.

  Resets metrics.MESSAGE_QUEUE and metrics.FLUSHING_PROCESS to None, asks the
  flushing process to do a final flush by putting None on the queue, and
  waits up to 2 * FLUSH_INTERVAL seconds for it to exit. A process that is
  still running after that is terminated. A warning is logged when the
  process ends with a non-zero exit code.
  """
  message_q = metrics.MESSAGE_QUEUE
  flushing_process = metrics.FLUSHING_PROCESS
  # Once we are done there is no longer a process listening to the Queue, so
  # re-set it to None so that any future metrics are created within this
  # process.
  metrics.MESSAGE_QUEUE = None
  metrics.FLUSHING_PROCESS = None

  # If the process was never started or has already died, we don't need to
  # try to clean it up.
  if flushing_process is None or not flushing_process.is_alive():
    return

  # Send the sentinel value for "flush one more time and exit".
  try:
    message_q.put(None)
  # If the flushing process quits, the message Queue can become full.
  except IOError:
    if not flushing_process.is_alive():
      return

  logging.info('Waiting for ts_mon flushing process to finish...')
  flushing_process.join(timeout=FLUSH_INTERVAL*2)
  if flushing_process.is_alive():
    flushing_process.terminate()
    # terminate() only delivers the signal. Join (bounded, so we can never
    # hang here) so that the child is reaped and its exitcode is populated;
    # otherwise exitcode may still be None and the warning below is skipped.
    flushing_process.join(timeout=FLUSH_INTERVAL)
  if flushing_process.exitcode:
    logging.warning('ts_mon_config flushing process did not exit cleanly.')
  logging.info('Finished waiting for ts_mon process.')
|
|
|
|
|
|
def _SetupAndConsumeMessages(message_q, options):
  """Sets up ts-mon, and starts a MetricConsumer loop.

  Entry point of the flushing process. If ts-mon cannot be configured the
  function returns immediately without reading from the queue.

  Args:
    message_q: The metric multiprocessing.Queue to read from.
    options: An argparse options object to configure ts-mon with.

  Returns:
    Whatever MetricConsumer.Consume() returns, or None when setup failed.
  """
  # Configure ts-mon here; setup failures are logged rather than raised.
  # NOTE(review): whether a sending thread is started depends on the flush
  # mode carried by |options| -- presumably 'manual' here; confirm in caller.
  _SetupTsMonFromOptions(options, suppress_exception=True)

  if _WasSetup:
    consumer = MetricConsumer(message_q)
    return consumer.Consume()
  return None
|
|
|
|
|
|
class MetricConsumer(object):
  """Configures ts_mon and gets metrics from a message queue.

  This class is meant to be used in a subprocess. It asks to be sent SIGHUP
  when the parent process dies and, if that request succeeds, installs a
  SIGHUP handler so that it has a chance to flush any pending metrics one
  more time.
  """

  def __init__(self, message_q):
    """Initializes the consumer state and the parent-death signal handler.

    Args:
      message_q: Queue of metric call messages to read from; a None entry
                 tells Consume() to stop.
    """
    self.message_q = message_q
    # True when a metric call has been made since the last periodic flush.
    self.pending = False
    # time.time() of the last periodic flush; 0 means "never flushed".
    self.last_flush = 0
    # Metric objects whose messages asked for a reset; handed to
    # metrics.Flush() as |reset_after|.
    self.reset_after_flush = []

    # If our parent dies, finish flushing before exiting.
    if parallel.ExitWithParent(signal.SIGHUP):
      def _OnParentDeath(_signum, _frame):
        self._WaitToFlush()

      signal.signal(signal.SIGHUP, _OnParentDeath)

  def Consume(self):
    """Emits metrics from self.message_q, flushing periodically.

    The loop is terminated by a None entry on the Queue, which is a friendly
    signal from the parent process that it's time to shut down. Before
    returning, we wait to flush one more time to make sure that all the
    metrics were sent.
    """
    call = self.message_q.get()
    while call:
      self._CallMetric(call)
      call = self._WaitForNextMessage()

    if self.pending:
      self._WaitToFlush()

  def _CallMetric(self, message):
    """Calls the metric method from |message|, ignoring exceptions.

    The common metric fields are merged into the call's 'fields' and the
    metric's 'field_spec' (both stored back into |message|'s kwargs) before
    the metric object is created and the requested method is invoked. Any
    exception is logged and swallowed.
    """
    try:
      metric_cls = getattr(metrics, message.metric_name)

      call_kwargs = message.method_kwargs
      fields = call_kwargs.setdefault('fields', {})
      ctor_kwargs = message.metric_kwargs
      field_spec = ctor_kwargs.setdefault('field_spec', [])
      call_kwargs['fields'], ctor_kwargs['field_spec'] = AddCommonFields(
          fields, field_spec)

      metric = metric_cls(*message.metric_args, **ctor_kwargs)
      if message.reset_after:
        self.reset_after_flush.append(metric)

      emit = getattr(metric, message.method)
      emit(*message.method_args, **call_kwargs)
      self.pending = True
    except Exception:
      logging.exception('Caught an exception while running %s',
                        _MethodCallRepr(message))

  def _WaitForNextMessage(self):
    """Waits for a new message, flushing every |FLUSH_INTERVAL| seconds.

    Returns:
      The next entry read from self.message_q.
    """
    while True:
      elapsed = self._FlushIfReady()
      try:
        # Wake up in time for the next periodic flush even if the queue
        # stays empty.
        return self.message_q.get(timeout=FLUSH_INTERVAL - elapsed)
      except Queue.Empty:
        continue

  def _WaitToFlush(self):
    """Sleeps until the next time we can call metrics.Flush(), then flushes."""
    since_last_flush = time.time() - self.last_flush
    remaining = FLUSH_INTERVAL - since_last_flush
    time.sleep(max(0, remaining))
    metrics.Flush(reset_after=self.reset_after_flush)

  def _FlushIfReady(self):
    """Calls metrics.Flush() if more than FLUSH_INTERVAL seconds have passed.

    This allows us to only call flush every FLUSH_INTERVAL seconds. The flush
    happens whenever the interval has elapsed, whether or not metrics are
    pending, and clears self.pending afterwards.

    Returns:
      Seconds elapsed since the last flush, or 0 if a flush was just done.
    """
    now = time.time()
    elapsed = now - self.last_flush
    if elapsed <= FLUSH_INTERVAL:
      return elapsed

    self.last_flush = now
    metrics.Flush(reset_after=self.reset_after_flush)
    self.pending = False
    return 0
|
|
|
|
|
|
def _MethodCallRepr(message):
  """Gives a string representation of a metric method call.

  The result looks like |metric_name|.|method|(*|args|, **|kwargs|), where
  the metric name and every argument value are rendered with repr().

  Args:
    message: A MetricCall object. A falsy value (e.g. None) is simply
             rendered with repr().

  Returns:
    The string describing the call.
  """
  if not message:
    return repr(message)

  positional = [repr(arg) for arg in message.method_args]
  keyword = ['%s=%r' % (key, value)
             for key, value in message.method_kwargs.items()]
  rendered_args = ', '.join(positional + keyword)
  return '%r.%s(%s)' % (message.metric_name, message.method, rendered_args)
|