From 0f9b5d08e986eb27e30a36058ebb3bbd8383ef07 Mon Sep 17 00:00:00 2001 From: Ozzie Isaacs Date: Sun, 18 Aug 2024 10:12:04 +0200 Subject: [PATCH] Fix cover handling on upload formats --- cps/audio.py | 35 +++++++++++++++++---------------- cps/comic.py | 15 +++++++++++---- cps/editbooks.py | 50 +++++++++++++++++++++++------------------------- cps/epub.py | 7 +++++-- cps/uploader.py | 17 ++++++++-------- 5 files changed, 67 insertions(+), 57 deletions(-) diff --git a/cps/audio.py b/cps/audio.py index 8fd28c81..b17bc7bd 100644 --- a/cps/audio.py +++ b/cps/audio.py @@ -26,7 +26,7 @@ from cps.constants import BookMeta log = logger.create() -def get_audio_file_info(tmp_file_path, original_file_extension, original_file_name): +def get_audio_file_info(tmp_file_path, original_file_extension, original_file_name, no_cover_processing): tmp_cover_name = None audio_file = mutagen.File(tmp_file_path) comments = None @@ -50,7 +50,7 @@ def get_audio_file_info(tmp_file_path, original_file_extension, original_file_na pubdate = str(audio_file.tags.get('TDRC').text[0]) if "TDRC" in audio_file.tags else None if not pubdate: pubdate = str(audio_file.tags.get('TDOR').text[0]) if "TDOR" in audio_file.tags else None - if cover_data: + if cover_data and not no_cover_processing: tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') cover_info = cover_data[0] for dat in cover_data: @@ -68,18 +68,19 @@ def get_audio_file_info(tmp_file_path, original_file_extension, original_file_na publisher = audio_file.tags.get('LABEL')[0] if "LABEL" in audio_file else None pubdate = audio_file.tags.get('DATE')[0] if "DATE" in audio_file else None cover_data = audio_file.tags.get('METADATA_BLOCK_PICTURE') - if cover_data: - tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') - cover_info = mutagen.flac.Picture(base64.b64decode(cover_data[0])) - cover.cover_processing(tmp_file_path, cover_info.data, "." + cover_info.mime[-3:]) - if hasattr(audio_file, "pictures"): - cover_info = audio_file.pictures[0] - for dat in audio_file.pictures: - if dat.type == mutagen.id3.PictureType.COVER_FRONT: - cover_info = dat - break - tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') - cover.cover_processing(tmp_file_path, cover_info.data, "." + cover_info.mime[-3:]) + if not no_cover_processing: + if cover_data: + tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') + cover_info = mutagen.flac.Picture(base64.b64decode(cover_data[0])) + cover.cover_processing(tmp_file_path, cover_info.data, "." + cover_info.mime[-3:]) + if hasattr(audio_file, "pictures"): + cover_info = audio_file.pictures[0] + for dat in audio_file.pictures: + if dat.type == mutagen.id3.PictureType.COVER_FRONT: + cover_info = dat + break + tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') + cover.cover_processing(tmp_file_path, cover_info.data, "." + cover_info.mime[-3:]) elif original_file_extension in [".aac"]: title = audio_file.tags.get('Title').value if "Title" in audio_file else None author = audio_file.tags.get('Artist').value if "Artist" in audio_file else None @@ -90,7 +91,7 @@ def get_audio_file_info(tmp_file_path, original_file_extension, original_file_na publisher = audio_file.tags.get('Label').value if "Label" in audio_file else None pubdate = audio_file.tags.get('Year').value if "Year" in audio_file else None cover_data = audio_file.tags['Cover Art (Front)'] - if cover_data: + if cover_data and not no_cover_processing: tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') with open(tmp_cover_name, "wb") as cover_file: cover_file.write(cover_data.value.split(b"\x00",1)[1]) @@ -104,7 +105,7 @@ def get_audio_file_info(tmp_file_path, original_file_extension, original_file_na publisher = audio_file.tags.get('Label')[0].value if "Label" in audio_file else None pubdate = audio_file.tags.get('Year')[0].value if "Year" in audio_file else None cover_data = audio_file.tags.get('WM/Picture', None) - if cover_data: + if cover_data and not no_cover_processing: tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') with open(tmp_cover_name, "wb") as cover_file: cover_file.write(cover_data[0].value) @@ -118,7 +119,7 @@ def get_audio_file_info(tmp_file_path, original_file_extension, original_file_na publisher = "" pubdate = audio_file.tags.get('©day')[0] if "©day" in audio_file.tags else None cover_data = audio_file.tags.get('covr', None) - if cover_data: + if cover_data and not no_cover_processing: tmp_cover_name = os.path.join(os.path.dirname(tmp_file_path), 'cover.jpg') cover_type = None for c in cover_data: diff --git a/cps/comic.py b/cps/comic.py index 27c86c9a..a4a4f136 100644 --- a/cps/comic.py +++ b/cps/comic.py @@ -130,7 +130,7 @@ def _extract_cover(tmp_file_name, original_file_extension, rar_executable): return cover.cover_processing(tmp_file_name, cover_data, extension) -def get_comic_info(tmp_file_path, original_file_name, original_file_extension, rar_executable): +def get_comic_info(tmp_file_path, original_file_name, original_file_extension, rar_executable, no_cover_processing): if use_comic_meta: try: archive = ComicArchive(tmp_file_path, rar_exe_path=rar_executable) @@ -155,14 +155,17 @@ def get_comic_info(tmp_file_path, original_file_name, original_file_extension, r lang = loaded_metadata.language or "" loaded_metadata.language = isoLanguages.get_lang3(lang) - + if not no_cover_processing: + cover_file = _extract_cover(tmp_file_path, original_file_extension, rar_executable) + else: + cover_file = None return BookMeta( file_path=tmp_file_path, extension=original_file_extension, title=loaded_metadata.title or original_file_name, author=" & ".join([credit["person"] for credit in loaded_metadata.credits if credit["role"] == "Writer"]) or 'Unknown', - cover=_extract_cover(tmp_file_path, original_file_extension, rar_executable), + cover=cover_file, description=loaded_metadata.comments or "", tags="", series=loaded_metadata.series or "", @@ -171,13 +174,17 @@ def get_comic_info(tmp_file_path, original_file_name, original_file_extension, r publisher="", pubdate="", identifiers=[]) + if not no_cover_processing: + cover_file = _extract_cover(tmp_file_path, original_file_extension, rar_executable) + else: + cover_file = None return BookMeta( file_path=tmp_file_path, extension=original_file_extension, title=original_file_name, author='Unknown', - cover=_extract_cover(tmp_file_path, original_file_extension, rar_executable), + cover=cover_file, description="", tags="", series="", diff --git a/cps/editbooks.py b/cps/editbooks.py index e8d5fbd3..95d0a766 100644 --- a/cps/editbooks.py +++ b/cps/editbooks.py @@ -140,9 +140,7 @@ def upload(): input_authors[0], meta.file_path, title_dir + meta.extension.lower()) - move_coverfile(meta, db_book) - if modify_date: calibre_db.set_metadata_dirty(book_id) # save data to database, reread data @@ -480,13 +478,10 @@ def do_edit_book(book_id, upload_formats=None): meta = True else: # handle upload other formats from local disk - meta = upload_single_file(upload_formats, book, book_id) - # only merge metadata if file was uploaded and no error occurred (meta equals not false or none) - if meta: - upload_format = merge_metadata(to_save, meta) + to_save, upload_format = upload_book_formats(upload_formats, book, book_id, book.has_cover) # handle upload covers from local disk cover_upload_success = upload_cover(request, book) - if cover_upload_success: + if cover_upload_success or to_save.get("format_cover"): book.has_cover = 1 modify_date = True @@ -494,7 +489,7 @@ def do_edit_book(book_id, upload_formats=None): if config.config_use_google_drive: gdriveutils.updateGdriveCalibreFromLocal() - if to_save.get("cover_url", None): + if to_save.get("cover_url",): if not current_user.role_upload(): edit_error = True flash(_("User has no rights to upload cover"), category="error") @@ -554,13 +549,15 @@ def do_edit_book(book_id, upload_formats=None): calibre_db.session.commit() if config.config_use_google_drive: gdriveutils.updateGdriveCalibreFromLocal() + # if format is upladed and something goes wrong to_save is set to empty dictonary + if (len(to_save) + and edit_error is not True and title_author_error is not True and cover_upload_success is not False): + flash(_("Metadata successfully updated"), category="success") + if upload_formats: resp = {"location": url_for('edit-book.show_edit_book', book_id=book_id)} return Response(json.dumps(resp), mimetype='application/json') - # if meta is not False \ - if edit_error is not True and title_author_error is not True and cover_upload_success is not False: - flash(_("Metadata successfully updated"), category="success") if "detail_view" in to_save: return redirect(url_for('web.show_book', book_id=book.id)) else: @@ -590,7 +587,7 @@ def merge_metadata(to_save, meta): for s_field, m_field in [ ('tags', 'tags'), ('author_name', 'author'), ('series', 'series'), ('series_index', 'series_id'), ('languages', 'languages'), - ('book_title', 'title'), ('description', 'description'),]: + ('book_title', 'title'), ('description', 'description'),('format_cover', 'cover')]: val = getattr(meta, m_field, '') if val: to_save[s_field] = val @@ -1205,26 +1202,26 @@ def edit_cc_data(book_id, book, to_save, cc): # returns None if no file is uploaded # returns False if an error occurs, in all other cases the ebook metadata is returned -def upload_single_file(requested_files, book, book_id): +def upload_book_formats(requested_files, book, book_id, no_cover=True): # Check and handle Uploaded file - # requested_file = file_request.files.get('btn-upload-format', None) - # ToDo: Handle multiple files - meta = {} + to_save = dict() + upload_format = False allowed_extensions = config.config_upload_formats.split(',') for requested_file in requested_files: + current_filename = requested_file.filename if config.config_check_extensions and allowed_extensions != ['']: if not validate_mime_type(requested_file, allowed_extensions): flash(_("File type isn't allowed to be uploaded to this server"), category="error") continue # return False # check for empty request - if requested_file.filename != '': + if current_filename != '': if not current_user.role_upload(): flash(_("User has no rights to upload additional file formats"), category="error") continue # return False - if '.' in requested_file.filename: - file_ext = requested_file.filename.rsplit('.', 1)[-1].lower() + if '.' in current_filename: + file_ext = current_filename.rsplit('.', 1)[-1].lower() if file_ext not in allowed_extensions and '' not in allowed_extensions: flash(_("File extension '%(ext)s' is not allowed to be uploaded to this server", ext=file_ext), category="error") @@ -1256,10 +1253,9 @@ def upload_single_file(requested_files, book, book_id): # return False file_size = os.path.getsize(saved_filename) - is_format = calibre_db.get_book_format(book_id, file_ext.upper()) # Format entry already exists, no need to update the database - if is_format: + if calibre_db.get_book_format(book_id, file_ext.upper()): log.warning('Book format %s already existing', file_ext.upper()) else: try: @@ -1279,11 +1275,13 @@ def upload_single_file(requested_files, book, book_id): link = '{}'.format(url_for('web.show_book', book_id=book.id), escape(book.title)) upload_text = N_("File format %(ext)s added to %(book)s", ext=file_ext.upper(), book=link) WorkerThread.add(current_user.name, TaskUpload(upload_text, escape(book.title))) - - return uploader.process( - saved_filename, *os.path.splitext(requested_file.filename), - rar_executable=config.config_rarfile_location) - return meta if len(meta) else False + meta = uploader.process( + saved_filename, + *os.path.splitext(current_filename), + rar_executable=config.config_rarfile_location, + no_cover=no_cover) + upload_format |= merge_metadata(to_save, meta) + return to_save if len(to_save) else False, upload_format def upload_cover(cover_request, book): diff --git a/cps/epub.py b/cps/epub.py index e84822f3..626e68ee 100644 --- a/cps/epub.py +++ b/cps/epub.py @@ -66,7 +66,7 @@ def get_epub_layout(book, book_data): return layout[0] -def get_epub_info(tmp_file_path, original_file_name, original_file_extension): +def get_epub_info(tmp_file_path, original_file_name, original_file_extension, no_cover_processing): ns = { 'n': 'urn:oasis:names:tc:opendocument:xmlns:container', 'pkg': 'http://www.idpf.org/2007/opf', @@ -117,7 +117,10 @@ def get_epub_info(tmp_file_path, original_file_name, original_file_extension): epub_metadata = parse_epub_series(ns, tree, epub_metadata) epub_zip = zipfile.ZipFile(tmp_file_path) - cover_file = parse_epub_cover(ns, tree, epub_zip, cover_path, tmp_file_path) + if not no_cover_processing: + cover_file = parse_epub_cover(ns, tree, epub_zip, cover_path, tmp_file_path) + else: + cover_file = None identifiers = [] for node in p.xpath('dc:identifier', namespaces=ns): diff --git a/cps/uploader.py b/cps/uploader.py index d59142c4..e63ceaad 100644 --- a/cps/uploader.py +++ b/cps/uploader.py @@ -77,24 +77,25 @@ except ImportError as e: use_audio_meta = False -def process(tmp_file_path, original_file_name, original_file_extension, rar_executable): +def process(tmp_file_path, original_file_name, original_file_extension, rar_executable, no_cover=False): meta = default_meta(tmp_file_path, original_file_name, original_file_extension) extension_upper = original_file_extension.upper() try: if ".PDF" == extension_upper: - meta = pdf_meta(tmp_file_path, original_file_name, original_file_extension) + meta = pdf_meta(tmp_file_path, original_file_name, original_file_extension, no_cover) elif extension_upper in [".KEPUB", ".EPUB"] and use_epub_meta is True: - meta = epub.get_epub_info(tmp_file_path, original_file_name, original_file_extension) + meta = epub.get_epub_info(tmp_file_path, original_file_name, original_file_extension, no_cover) elif ".FB2" == extension_upper and use_fb2_meta is True: meta = fb2.get_fb2_info(tmp_file_path, original_file_extension) elif extension_upper in ['.CBZ', '.CBT', '.CBR', ".CB7"]: meta = comic.get_comic_info(tmp_file_path, original_file_name, original_file_extension, - rar_executable) + rar_executable, + no_cover) elif extension_upper in [".MP3", ".OGG", ".FLAC", ".WAV", ".AAC", ".AIFF", ".ASF", ".MP4", ".M4A", ".M4B", ".OGV", ".OPUS"] and use_audio_meta: - meta = audio.get_audio_file_info(tmp_file_path, original_file_extension, original_file_name) + meta = audio.get_audio_file_info(tmp_file_path, original_file_extension, original_file_name, no_cover) except Exception as ex: log.warning('cannot parse metadata, using default: %s', ex) @@ -168,7 +169,7 @@ def parse_xmp(pdf_file): } -def pdf_meta(tmp_file_path, original_file_name, original_file_extension): +def pdf_meta(tmp_file_path, original_file_name, original_file_extension, no_cover_processing): doc_info = None xmp_info = None @@ -216,7 +217,7 @@ def pdf_meta(tmp_file_path, original_file_name, original_file_extension): extension=original_file_extension, title=title, author=author, - cover=pdf_preview(tmp_file_path, original_file_name), + cover=pdf_preview(tmp_file_path, original_file_name) if not no_cover_processing else None, description=subject, tags=tags, series="", @@ -231,7 +232,7 @@ def pdf_preview(tmp_file_path, tmp_dir): if use_generic_pdf_cover: return None try: - cover_file_name = os.path.splitext(tmp_file_path)[0] + ".cover.jpg" + cover_file_name = os.path.join(os.path.dirname(tmp_file_path), "cover.jpg") with Image() as img: img.options["pdf:use-cropbox"] = "true" img.read(filename=tmp_file_path + '[0]', resolution=150)