Source code for robgracli.client

from urlparse import urljoin
from collections import OrderedDict

from .http import HttpClient


def average(values):
    return float(sum(values)) / len(values)


[docs]class GraphiteClient(HttpClient): ''' A simple client for querying Graphite. :param endpoint: the Graphite URL; :param min_queries_range: The minimum range of data to query. Graphite occasionally returns empty data when querying small time ranges (probably on busy servers). The workaround is to query a larger time range and filter out unneeded values, e.g. if we want the data points from 1 minute ago, we query 10 minutes and filter out the oldest 9 minutes. Care must be taken when choosing this value, if it's too large Graphite may return aggregated values, so it must be adapted to your storage schemas. As a guideline, the default value of 10 minutes gave good results on our server for querying 1 minute data ranges with a ``10s:1d,1min:7d,10min:1y`` retention schema; Additional arguments are passed to :class:`robgracli.http.HttpClient`. ''' def __init__(self, endpoint, min_queries_range=60 * 10, *args, **kwargs): super(GraphiteClient, self).__init__(*args, **kwargs) self.endpoint = endpoint self.min_queries_range = min_queries_range
[docs] def query(self, query, from_=60): ''' Return datapoints for *query* over the last *from_* seconds. The return value is an :class:`~collections.OrderedDict` with target names as keys and datapoints ``(value, timestamp)`` pairs as values. ''' query_from = max(self.min_queries_range, from_) url = urljoin(self.endpoint, '/render') response = self.get(url, params={ 'target': query, 'format': 'json', 'from': '-%ss' % query_from, }) data = response.json() ret = OrderedDict() for entry in data: ret[entry['target']] = trim_datapoints(entry['datapoints'], from_) return ret
[docs] def aggregate(self, query, from_=60, aggregator=average): ''' Get the current value of a metric, by aggregating Graphite datapoints over an interval. Values returned by *query* over the last *from_* seconds are aggregated using the *aggregator* function, after filtering out None values. The return value is an :class:`~collections.OrderedDict` with target names as keys and aggregated values as values, or None for targets that returned no datapoints or only None values. ''' data = self.query(query, from_) ret = OrderedDict() for key, values in data.items(): values = [v[0] for v in values if v[0] is not None] if len(values): ret[key] = aggregator(values) else: ret[key] = None return ret
[docs] def find_metrics(self, query): ''' Find metrics on the server. Querying '*' will return the root of all metrics, and you can then find other metrics from there. Return a list of dicts of the form:: [ { 'text': 'carbon', 'expandable': 1, 'leaf': 0, 'id': 'carbon', 'allowChildren': 1 }, { 'text': 'statsd', 'expandable': 1, 'leaf': 0, 'id': 'statsd', 'allowChildren': 1 } ] ''' url = urljoin(self.endpoint, '/metrics/find') response = self.get(url, {'query': query}) return response.json()
def trim_datapoints(datapoints, max_age): if len(datapoints): last_ts = datapoints[-1][1] return filter(lambda (_, ts): last_ts - ts <= max_age, datapoints) else: return []