Source code for invenio_stats.tasks

# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2016-2018 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Celery background tasks."""

from __future__ import absolute_import, print_function

from celery import shared_task
from dateutil.parser import parse as dateutil_parse

from .proxies import current_stats


[docs]@shared_task def process_events(event_types): """Index statistics events.""" results = [] for e in event_types: processor = current_stats.events[e].processor_class( **current_stats.events[e].processor_config) results.append((e, processor.run())) return results
[docs]@shared_task def aggregate_events(aggregations, start_date=None, end_date=None, update_bookmark=True): """Aggregate indexed events.""" start_date = dateutil_parse(start_date) if start_date else None end_date = dateutil_parse(end_date) if end_date else None results = [] for a in aggregations: aggr_cfg = current_stats.aggregations[a] aggregator = aggr_cfg.aggregator_class( name=aggr_cfg.name, **aggr_cfg.aggregator_config) results.append(aggregator.run(start_date, end_date, update_bookmark)) return results