diff --git a/tubearchivist/api/src/aggs.py b/tubearchivist/api/src/aggs.py new file mode 100644 index 00000000..7b3dcdb1 --- /dev/null +++ b/tubearchivist/api/src/aggs.py @@ -0,0 +1,93 @@ +"""aggregations""" + +from home.src.es.connect import ElasticWrap +from home.src.index.video_streams import DurationConverter + + +class AggBase: + """base class for aggregation calls""" + + path: str = "" + data: dict = {} + name: str = "" + + def get(self): + """make get call""" + data_size = {"size": 0, "aggs": self.data} + response, _ = ElasticWrap(self.path).get(data_size) + print(f"[agg][{self.name}] took {response.get('took')} ms to process") + + return response.get("aggregations") + + def process(self): + """implement in subclassess""" + raise NotImplementedError + + +class Primary(AggBase): + """primary aggregation for total documents indexed""" + + name = "primary" + path = "ta_video,ta_channel,ta_playlist,ta_subtitle,ta_download/_search" + data = {name: {"terms": {"field": "_index"}}} + + def process(self): + """make the call""" + aggregations = self.get() + buck = aggregations[self.name]["buckets"] + + return {i.get("key").lstrip("_ta"): i.get("doc_count") for i in buck} + + +class WatchProgress(AggBase): + """get watch progress""" + + name = "watch_progress" + path = "ta_video/_search" + data = { + name: { + "terms": {"field": "player.watched"}, + "aggs": { + "watch_docs": { + "filter": {"terms": {"player.watched": [True, False]}}, + "aggs": { + "true_count": {"value_count": {"field": "_index"}}, + "duration": {"sum": {"field": "player.duration"}}, + }, + }, + }, + } + } + + def process(self): + """make the call""" + aggregations = self.get() + buckets = aggregations[self.name]["buckets"] + + response = {} + for bucket in buckets: + response.update(self._build_bucket(bucket)) + + return response + + @staticmethod + def _build_bucket(bucket): + """parse bucket""" + + duration = int(bucket["watch_docs"]["duration"]["value"]) + duration_str = DurationConverter().get_str(duration) + items = bucket["watch_docs"]["true_count"]["value"] + if bucket["key_as_string"] == "false": + key = "unwatched" + else: + key = "watched" + + bucket_parsed = { + key: { + "duration": duration, + "duration_str": duration_str, + "items": items, + } + } + + return bucket_parsed diff --git a/tubearchivist/api/urls.py b/tubearchivist/api/urls.py index 1af11133..2b98a94a 100644 --- a/tubearchivist/api/urls.py +++ b/tubearchivist/api/urls.py @@ -136,4 +136,14 @@ urlpatterns = [ views.NotificationView.as_view(), name="api-notification", ), + path( + "stats/primary/", + views.StatPrimaryView.as_view(), + name="api-stats-primary", + ), + path( + "stats/watch/", + views.StatWatchProgress.as_view(), + name="api-stats-watch", + ), ] diff --git a/tubearchivist/api/views.py b/tubearchivist/api/views.py index 34762d29..008f8907 100644 --- a/tubearchivist/api/views.py +++ b/tubearchivist/api/views.py @@ -1,5 +1,6 @@ """all API views""" +from api.src.aggs import Primary, WatchProgress from api.src.search_processor import SearchProcess from home.src.download.queue import PendingInteract from home.src.download.subscriptions import ( @@ -975,3 +976,27 @@ class NotificationView(ApiBaseView): query = f"{query}:{filter_by}" return Response(RedisArchivist().list_items(query)) + + +class StatPrimaryView(ApiBaseView): + """resolves to /api/stats/primary/ + GET: return document count + """ + + def get(self, request): + """get stats""" + # pylint: disable=unused-argument + + return Response(Primary().process()) + + +class StatWatchProgress(ApiBaseView): + """resolves to /api/stats/watchprogress/ + GET: return watch/unwatch progress stats + """ + + def get(self, request): + """handle get request""" + # pylint: disable=unused-argument + + return Response(WatchProgress().process())