refactor subscribe from form task

This commit is contained in:
simon 2023-03-02 12:30:48 +07:00
parent dcb0cf6a6d
commit 4c0de78fb4
No known key found for this signature in database
GPG Key ID: 2C15AA5E89985DD4
2 changed files with 53 additions and 30 deletions

View File

@ -13,6 +13,7 @@ from home.src.index.playlist import YoutubePlaylist
from home.src.index.video_constants import VideoTypeEnum from home.src.index.video_constants import VideoTypeEnum
from home.src.ta.config import AppConfig from home.src.ta.config import AppConfig
from home.src.ta.ta_redis import RedisArchivist from home.src.ta.ta_redis import RedisArchivist
from home.src.ta.urlparser import Parser
class ChannelSubscription: class ChannelSubscription:
@ -323,3 +324,53 @@ class SubscriptionScanner:
pending_handler = queue.PendingList(youtube_ids=self.missing_videos) pending_handler = queue.PendingList(youtube_ids=self.missing_videos)
pending_handler.parse_url_list() pending_handler.parse_url_list()
pending_handler.add_to_pending() pending_handler.add_to_pending()
class SubscriptionHandler:
"""subscribe to channels and playlists from url_str"""
def __init__(self, url_str):
self.url_str = url_str
self.to_subscribe = False
def subscribe(self):
"""subscribe to url_str items"""
self.to_subscribe = Parser(self.url_str).parse()
for idx, item in enumerate(self.to_subscribe):
self.subscribe_type(item)
self._notify(idx)
def subscribe_type(self, item):
"""process single item"""
if item["type"] == "playlist":
PlaylistSubscription().process_url_str([item])
return
if item["type"] == "video":
# extract channel id from video
vid = queue.PendingList().get_youtube_details(item["url"])
channel_id = vid["channel_id"]
elif item["type"] == "channel":
channel_id = item["url"]
else:
raise ValueError("failed to subscribe to: " + item["url"])
self._subscribe(channel_id)
def _subscribe(self, channel_id):
"""subscribe to channel"""
ChannelSubscription().change_subscribe(
channel_id, channel_subscribed=True
)
def _notify(self, idx):
"""send notification message to redis"""
key = "message:subchannel"
message = {
"status": key,
"level": "info",
"title": "Subscribing",
"message": f"Processing {idx + 1} of {len(self.to_subscribe)}",
}
RedisArchivist().set_message(key, message=message, expire=True)

View File

@ -12,8 +12,7 @@ import os
from celery import Celery, shared_task from celery import Celery, shared_task
from home.src.download.queue import PendingList from home.src.download.queue import PendingList
from home.src.download.subscriptions import ( from home.src.download.subscriptions import (
ChannelSubscription, SubscriptionHandler,
PlaylistSubscription,
SubscriptionScanner, SubscriptionScanner,
) )
from home.src.download.thumbnails import ThumbFilesystem, ThumbValidator from home.src.download.thumbnails import ThumbFilesystem, ThumbValidator
@ -28,7 +27,6 @@ from home.src.ta.config import AppConfig, ReleaseVersion, ScheduleBuilder
from home.src.ta.helper import clear_dl_cache from home.src.ta.helper import clear_dl_cache
from home.src.ta.ta_redis import RedisArchivist, RedisQueue from home.src.ta.ta_redis import RedisArchivist, RedisQueue
from home.src.ta.task_manager import TaskManager from home.src.ta.task_manager import TaskManager
from home.src.ta.urlparser import Parser
CONFIG = AppConfig().config CONFIG = AppConfig().config
REDIS_HOST = os.environ.get("REDIS_HOST") REDIS_HOST = os.environ.get("REDIS_HOST")
@ -244,33 +242,7 @@ def re_sync_thumbs(self):
@shared_task(name="subscribe_to") @shared_task(name="subscribe_to")
def subscribe_to(url_str): def subscribe_to(url_str):
"""take a list of urls to subscribe to""" """take a list of urls to subscribe to"""
to_subscribe_list = Parser(url_str).parse() SubscriptionHandler(url_str).subscribe()
for idx, item in enumerate(to_subscribe_list):
to_sub_id = item["url"]
if item["type"] == "playlist":
PlaylistSubscription().process_url_str([item])
continue
if item["type"] == "video":
vid_details = PendingList().get_youtube_details(to_sub_id)
channel_id_sub = vid_details["channel_id"]
elif item["type"] == "channel":
channel_id_sub = to_sub_id
else:
raise ValueError("failed to subscribe to: " + to_sub_id)
ChannelSubscription().change_subscribe(
channel_id_sub, channel_subscribed=True
)
# notify for channels
key = "message:subchannel"
message = {
"status": key,
"level": "info",
"title": "Subscribing to Channels",
"message": f"Processing {idx + 1} of {len(to_subscribe_list)}",
}
RedisArchivist().set_message(key, message=message, expire=True)
@shared_task(name="index_playlists") @shared_task(name="index_playlists")