diff --git a/prometheus_client/mmap_dict.py b/prometheus_client/mmap_dict.py
index 679597fa..274b7389 100644
--- a/prometheus_client/mmap_dict.py
+++ b/prometheus_client/mmap_dict.py
@@ -22,6 +22,29 @@ def _pack_integer(data, pos, value):
     data[pos:pos + 4] = _pack_integer_func(value)
 
 
+def _read_all_values(data, used=0):
+    """Yield (key, value, pos). No locking is performed."""
+
+    if used <= 0:
+        # If no valid `used` value is passed in, read it from the file.
+        used = _unpack_integer(data, 0)[0]
+
+    pos = 8
+
+    while pos < used:
+        encoded_len = _unpack_integer(data, pos)[0]
+        # check we are not reading beyond bounds
+        if encoded_len + pos > used:
+            raise RuntimeError('Read beyond file size detected, file is corrupted.')
+        pos += 4
+        encoded_key = data[pos:pos + encoded_len]
+        padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
+        pos += padded_len
+        value = _unpack_double(data, pos)[0]
+        yield encoded_key.decode('utf-8'), value, pos
+        pos += 8
+
+
 class MmapedDict(object):
     """A dict of doubles, backed by an mmapped file.
 
@@ -37,9 +60,11 @@ class MmapedDict(object):
     def __init__(self, filename, read_mode=False):
         self._f = open(filename, 'rb' if read_mode else 'a+b')
         self._fname = filename
-        if os.fstat(self._f.fileno()).st_size == 0:
+        capacity = os.fstat(self._f.fileno()).st_size
+        if capacity == 0:
             self._f.truncate(_INITIAL_MMAP_SIZE)
-        self._capacity = os.fstat(self._f.fileno()).st_size
+            capacity = _INITIAL_MMAP_SIZE
+        self._capacity = capacity
         self._m = mmap.mmap(self._f.fileno(), self._capacity,
                             access=mmap.ACCESS_READ if read_mode else mmap.ACCESS_WRITE)
 
@@ -53,6 +78,17 @@ def __init__(self, filename, read_mode=False):
             for key, _, pos in self._read_all_values():
                 self._positions[key] = pos
 
+    @staticmethod
+    def read_all_values_from_file(filename):
+        with open(filename, 'rb') as infp:
+            # Read the first block of data, including the first 4 bytes which tell us
+            # how much of the file (which is preallocated to _INITIAL_MMAP_SIZE bytes) is occupied.
+            data = infp.read(65535)
+            used = _unpack_integer(data, 0)[0]
+            if used > len(data):  # Then read in the rest, if needed.
+                data += infp.read(used - len(data))
+        return _read_all_values(data, used)
+
     def _init_value(self, key):
         """Initialize a value. Lock must be held by caller."""
         encoded = key.encode('utf-8')
@@ -72,31 +108,10 @@ def _init_value(self, key):
 
     def _read_all_values(self):
         """Yield (key, value, pos). No locking is performed."""
-
-        pos = 8
-
-        # cache variables to local ones and prevent attributes lookup
-        # on every loop iteration
-        used = self._used
-        data = self._m
-        unpack_from = struct.unpack_from
-
-        while pos < used:
-            encoded_len = _unpack_integer(data, pos)[0]
-            # check we are not reading beyond bounds
-            if encoded_len + pos > used:
-                msg = 'Read beyond file size detected, %s is corrupted.'
-                raise RuntimeError(msg % self._fname)
-            pos += 4
-            encoded = unpack_from(('%ss' % encoded_len).encode(), data, pos)[0]
-            padded_len = encoded_len + (8 - (encoded_len + 4) % 8)
-            pos += padded_len
-            value = _unpack_double(data, pos)[0]
-            yield encoded.decode('utf-8'), value, pos
-            pos += 8
+        return _read_all_values(data=self._m, used=self._used)
 
     def read_all_values(self):
-        """Yield (key, value, pos). No locking is performed."""
+        """Yield (key, value). No locking is performed."""
        for k, v, _ in self._read_all_values():
             yield k, v
 
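For reference, the on-disk layout that the new module-level `_read_all_values` walks is: a 4-byte `used` counter (padded out to 8 bytes) at the start of the file, then one entry per key consisting of a 4-byte key length, the UTF-8 key zero-padded so the value that follows lands on an 8-byte boundary, and the 8-byte double itself. Below is a minimal sketch of that format; `_pack_entry` is a hypothetical helper written only for this illustration, and the import assumes the patched module is available:

```python
import struct

def _pack_entry(key, value):
    # Hypothetical helper mirroring the layout _read_all_values expects:
    # 4-byte native-endian length, UTF-8 key, zero padding, 8-byte double.
    encoded = key.encode('utf-8')
    # Same padding rule as the parser: (4 + len) rounded up to an 8-byte
    # boundary, with a full 8 bytes of padding when already aligned.
    padded_len = len(encoded) + (8 - (len(encoded) + 4) % 8)
    return (struct.pack('i', len(encoded))
            + encoded.ljust(padded_len, b'\x00')
            + struct.pack('d', value))

body = _pack_entry('{"k": "v"}', 42.0)
# The first 4 bytes record how much of the file is in use; entries start at 8.
data = struct.pack('i', 8 + len(body)) + b'\x00' * 4 + body

from prometheus_client.mmap_dict import _read_all_values
print(list(_read_all_values(data)))  # [('{"k": "v"}', 42.0, 24)]
```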
diff --git a/prometheus_client/multiprocess.py b/prometheus_client/multiprocess.py
index e34ced03..16547c1a 100644
--- a/prometheus_client/multiprocess.py
+++ b/prometheus_client/multiprocess.py
@@ -12,6 +12,8 @@
 from .samples import Sample
 from .utils import floatToGoString
 
+MP_METRIC_HELP = 'Multiprocess metric'
+
 
 class MultiProcessCollector(object):
     """Collector for files for multi-process mode."""
@@ -33,18 +35,31 @@ def merge(files, accumulate=True):
         But if writing the merged data back to mmap files, use
         accumulate=False to avoid compound accumulation.
         """
+        metrics = MultiProcessCollector._read_metrics(files)
+        return MultiProcessCollector._accumulate_metrics(metrics, accumulate)
+
+    @staticmethod
+    def _read_metrics(files):
         metrics = {}
+        key_cache = {}
+
+        def _parse_key(key):
+            val = key_cache.get(key)
+            if not val:
+                metric_name, name, labels = json.loads(key)
+                labels_key = tuple(sorted(labels.items()))
+                val = key_cache[key] = (metric_name, name, labels, labels_key)
+            return val
+
         for f in files:
             parts = os.path.basename(f).split('_')
             typ = parts[0]
-            d = MmapedDict(f, read_mode=True)
-            for key, value in d.read_all_values():
-                metric_name, name, labels = json.loads(key)
-                labels_key = tuple(sorted(labels.items()))
+            for key, value, pos in MmapedDict.read_all_values_from_file(f):
+                metric_name, name, labels, labels_key = _parse_key(key)
 
                 metric = metrics.get(metric_name)
                 if metric is None:
-                    metric = Metric(metric_name, 'Multiprocess metric', typ)
+                    metric = Metric(metric_name, MP_METRIC_HELP, typ)
                     metrics[metric_name] = metric
 
                 if typ == 'gauge':
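With the split into `_read_metrics` and `_accumulate_metrics`, `merge` stays the public entry point and its semantics are unchanged. A usage sketch, assuming a directory of multiprocess `*.db` files and that `merge` still returns the accumulated metrics as before (the fallback path below is purely illustrative):

```python
import glob
import os

from prometheus_client.multiprocess import MultiProcessCollector

# Deployments point prometheus_multiproc_dir at the metrics directory;
# the fallback value here is only for illustration.
path = os.environ.get('prometheus_multiproc_dir', '/tmp/prometheus')
files = glob.glob(os.path.join(path, '*.db'))

# accumulate=True (the default) yields export-ready samples; pass
# accumulate=False when writing the merged data back to mmap files.
for metric in MultiProcessCollector.merge(files, accumulate=True):
    for sample in metric.samples:
        print(metric.name, sample)
```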
@@ -54,43 +69,47 @@ def merge(files, accumulate=True):
                 else:
                     # The duplicates and labels are fixed in the next for.
                     metric.add_sample(name, labels_key, value)
-            d.close()
+        return metrics
 
+    @staticmethod
+    def _accumulate_metrics(metrics, accumulate):
         for metric in metrics.values():
             samples = defaultdict(float)
-            buckets = {}
+            buckets = defaultdict(lambda: defaultdict(float))
+            samples_setdefault = samples.setdefault
             for s in metric.samples:
-                name, labels, value = s.name, s.labels, s.value
+                name, labels, value, timestamp, exemplar = s
                 if metric.type == 'gauge':
-                    without_pid = tuple(l for l in labels if l[0] != 'pid')
+                    without_pid_key = (name, tuple([l for l in labels if l[0] != 'pid']))
                     if metric._multiprocess_mode == 'min':
-                        current = samples.setdefault((name, without_pid), value)
+                        current = samples_setdefault(without_pid_key, value)
                         if value < current:
-                            samples[(s.name, without_pid)] = value
+                            samples[without_pid_key] = value
                     elif metric._multiprocess_mode == 'max':
-                        current = samples.setdefault((name, without_pid), value)
+                        current = samples_setdefault(without_pid_key, value)
                         if value > current:
-                            samples[(s.name, without_pid)] = value
+                            samples[without_pid_key] = value
                     elif metric._multiprocess_mode == 'livesum':
-                        samples[(name, without_pid)] += value
+                        samples[without_pid_key] += value
                     else:  # all/liveall
                         samples[(name, labels)] = value
 
                 elif metric.type == 'histogram':
-                    bucket = tuple(float(l[1]) for l in labels if l[0] == 'le')
-                    if bucket:
-                        # _bucket
-                        without_le = tuple(l for l in labels if l[0] != 'le')
-                        buckets.setdefault(without_le, {})
-                        buckets[without_le].setdefault(bucket[0], 0.0)
-                        buckets[without_le][bucket[0]] += value
-                    else:
+                    # A for loop with early exit is faster than a genexpr
+                    # or a listcomp that ends up building unnecessary things
+                    for l in labels:
+                        if l[0] == 'le':
+                            bucket_value = float(l[1])
+                            # _bucket
+                            without_le = tuple(l for l in labels if l[0] != 'le')
+                            buckets[without_le][bucket_value] += value
+                            break
+                    else:  # did not find the `le` key
                         # _sum/_count
-                        samples[(s.name, labels)] += value
-
+                        samples[(name, labels)] += value
                 else:
                     # Counter and Summary.
-                    samples[(s.name, labels)] += value
+                    samples[(name, labels)] += value
 
             # Accumulate bucket values.
             if metric.type == 'histogram':
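The hunk ends just before the pre-existing "accumulate bucket values" loop, which consumes the `buckets` mapping built above. As a reference for reviewers, here is a self-contained sketch of what that step computes, under the assumption that the surrounding code is unchanged (`h` is a made-up metric name, and the real code formats `le` with `floatToGoString` rather than `str`):

```python
from collections import defaultdict

# Toy stand-in for the structure built above: one (empty) label set with
# per-bucket sums keyed by the float parsed from the 'le' label.
buckets = {(): {0.5: 3.0, 1.0: 2.0, float('inf'): 1.0}}
samples = defaultdict(float)
accumulate = True

for labels, values in buckets.items():
    acc = 0.0
    for bucket, value in sorted(values.items()):
        key = ('h_bucket', labels + (('le', str(bucket)),))
        if accumulate:
            acc += value          # cumulative: each bucket includes the smaller ones
            samples[key] = acc
        else:
            samples[key] = value  # raw per-bucket sums, safe to write back
    if accumulate:
        samples[('h_count', labels)] = acc  # _count mirrors the +Inf bucket

print(dict(samples))
```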