From 0be720599dc84b3ce1d626d87adc70426e384bf6 Mon Sep 17 00:00:00 2001
From: xenofem
Date: Fri, 1 Mar 2024 23:43:38 -0500
Subject: [PATCH] refactor collation code

Fold the free functions collate_from_paths, link_pdf,
link_ordered_files, try_collate_split_regex, and
try_collate_images_vs_pdf into a Collator class that carries the
destination directory, exclusion list, locale, and running page index,
instead of threading dest/start_index/exclude through every call. Also
hoist the -l/--locale option from the fetch subcommand up to the
top-level parser, since collation uses it too.
---
 dlibrary/dlibrary.py | 448 ++++++++++++++++++++++---------------------
 1 file changed, 225 insertions(+), 223 deletions(-)

diff --git a/dlibrary/dlibrary.py b/dlibrary/dlibrary.py
index c69623f..b9fe859 100755
--- a/dlibrary/dlibrary.py
+++ b/dlibrary/dlibrary.py
@@ -206,6 +206,217 @@ def fetch(args):
     asyncio.run(fetch_async(args))
 
+def collate(args):
+    con = sqlite3.connect(args.destdir / 'meta.db')
+    cur = con.cursor()
+
+    extraction_dir = args.destdir / 'extract'
+    hint_map = {Path(relpath(hint, extraction_dir)).parents[-2].name: hint for hint in args.hints}
+
+    collation_staging_area = args.destdir / 'site' / 'images-staging'
+    collation_staging_area.mkdir(parents=True)
+
+    collation_area = args.destdir / 'site' / 'images'
+    collation_area.mkdir(parents=True, exist_ok=True)
+
+    for work_path in extraction_dir.iterdir():
+        work_id = work_path.name
+
+        work_collation_dir = collation_area / work_id
+        if work_collation_dir.exists():
+            continue
+
+        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
+        if virtual == (1,):
+            continue
+
+        work_staging_dir = collation_staging_area / work_id
+
+        collator = Collator(work_staging_dir, [], args.locale)
+        collation_result = collator.collate_from_paths([hint_map.get(work_id, work_path)])
+        if collation_result and collator.index > 0:
+            print(f'Collated {collator.index} pages for {work_id}')
+            work_staging_dir.rename(work_collation_dir)
+        else:
+            if work_staging_dir.is_dir():
+                for f in work_staging_dir.iterdir():
+                    f.unlink()
+                work_staging_dir.rmdir()
+
+            if not collation_result:
+                print(f'Unable to deduce file structure for {work_id}, skipping')
+            elif collator.index == 0:
+                print(f'{work_id} contains no files? skipping')
+
+    collation_staging_area.rmdir()
+    con.close()
+
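+# Carries the collation state for a single work: the destination directory
+# that pages get linked into, paths to exclude, the preferred locale, and a
+# running index of how many pages have been collated so far.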
+class Collator:
+    def __init__(self, dest, exclude, locale):
+        self.dest = dest
+        self.exclude = exclude
+        self.locale = locale
+        self.index = 0
+
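+    # Tries a series of strategies to deduce the page structure of srcs and
+    # link or extract the pages into self.dest. Returns True on success (an
+    # empty srcs list counts as trivially successful), or None if the
+    # structure couldn't be deduced. The try_* helpers below additionally
+    # return False to mean "this strategy doesn't apply, try the next one".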
+    def collate_from_paths(self, srcs):
+        if len(srcs) == 1 and srcs[0].is_dir():
+            return self.collate_from_paths(ls_ignore(srcs[0], self.exclude))
+
+        if len(srcs) == 1 and is_pdf(srcs[0]):
+            print(f'Extracting images from {srcs[0]}')
+            return self.link_pdf(srcs[0])
+
+        if len(srcs) == 0:
+            return True
+
+        if len(srcs) == 2 and all(src.is_dir() for src in srcs):
+            for quality in IMAGE_QUALITY_REGEXES:
+                def a_not_b(a, b, src):
+                    if a in quality:
+                        return quality[a].search(nname(src))
+                    else:
+                        return not quality[b].search(nname(src))
+                better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)]
+                worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)]
+                if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
+                    better = better_srcs[0]
+                    worse = worse_srcs[0]
+                    if len(descendant_files_ignore(better, self.exclude)) == len(descendant_files_ignore(worse, self.exclude)):
+                        return self.collate_from_paths([better])
+
+        images_vs_pdf = self.try_collate_images_vs_pdf(srcs)
+        if images_vs_pdf is not False:
+            return images_vs_pdf
+
+        for regexes in SPLITS:
+            split_attempt = self.try_collate_split_regex(srcs, **regexes)
+            if split_attempt is not False:
+                return split_attempt
+
+        if all(src.is_file() and is_image(src) for src in srcs):
+            ordering = complete_prefix_number_ordering(srcs)
+            if ordering:
+                print(f'Symlinking image files: {ordering[0]}...')
+                return self.link_ordered_files(ordering)
+            else:
+                return None
+
+        return None
+
+    def link_pdf(self, src):
+        with fitz.open(src) as pdf:
+            xrefs = image_xrefs(pdf)
+            if xrefs is None:
+                print(f'Support for weirder PDFs not yet implemented, skipping {src}')
+                return None
+
+            self.dest.mkdir(parents=True, exist_ok=True)
+            for (idx, xref) in enumerate(xrefs, start=self.index):
+                image = pdf.extract_image(xref)
+                file_path = self.dest / f'{idx:04d}.{image["ext"]}'
+                with open(file_path, 'wb') as f:
+                    f.write(image["image"])
+
+            self.index += pdf.page_count
+            return True
+
+    def link_ordered_files(self, ordering):
+        self.dest.mkdir(parents=True, exist_ok=True)
+
+        for (idx, src_path) in enumerate(ordering, start=self.index):
+            ext = src_path.suffix.lower()
+            link_path = self.dest / f'{idx:04d}{ext}'
+            link_path.symlink_to(relpath(src_path, self.dest))
+
+        self.index += len(ordering)
+        return True
+
+    def try_collate_split_regex(self, srcs, earlier=None, later=None):
+        early_srcs = []
+        middle_srcs = []
+        late_srcs = []
+        for src in srcs:
+            if earlier and earlier.search(nname(src)):
+                early_srcs.append(src)
+            elif later and later.search(nname(src)):
+                late_srcs.append(src)
+            else:
+                middle_srcs.append(src)
+
+        if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
+            return False
+
+        early_page_collation = self.collate_from_paths(early_srcs)
+        if early_page_collation is None:
+            return None
+
+        middle_page_collation = self.collate_from_paths(middle_srcs)
+        if middle_page_collation is None:
+            return None
+
+        late_page_collation = self.collate_from_paths(late_srcs)
+        if late_page_collation is None:
+            return None
+
+        return True
+
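+    # Handles works that ship both a PDF and standalone image files of the
+    # same pages: compares median image dimensions on each side and collates
+    # whichever version is higher quality.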
+    def try_collate_images_vs_pdf(self, srcs):
+        pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
+        if len(pdfs) != 1:
+            return False
+        outer_pdf = pdfs[0]
+
+        inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)]
+        if len(inner_pdfs) != 1:
+            return False
+        inner_pdf = inner_pdfs[0]
+
+        non_pdf_srcs = [src for src in srcs if src != outer_pdf]
+        images = []
+        non_images = []
+        descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)]
+        for f in descendant_files:
+            if is_image(f):
+                images.append(f)
+            else:
+                non_images.append(f)
+                break
+
+        if len(non_images) != 0 or len(images) == 0:
+            return False
+
+        pdf_sizes = pdf_image_sizes(inner_pdf)
+        standalone_sizes = [standalone_image_size(f) for f in images]
+
+        median_pdf_size = median(pdf_sizes)
+        median_standalone_size = median(standalone_sizes)
+        if not (median_pdf_size and median_standalone_size):
+            return False
+
+        if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
+            with fitz.open(inner_pdf) as pdf:
+                pdf_page_count = len(pdf)
+            height_adjusted_pdf_image_count = (
+                len(pdf_sizes) *
+                mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
+            )
+            if (
+                abs(pdf_page_count - len(standalone_sizes)) <= 2 and
+                len(pdf_sizes) > len(standalone_sizes) and
+                median_pdf_size[0] == median_standalone_size[0] and
+                abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
+            ):
+                return self.collate_from_paths(non_pdf_srcs)
+            else:
+                return False
+
+        if superior_or_equal(median_standalone_size, median_pdf_size):
+            return self.collate_from_paths(non_pdf_srcs)
+        elif superior_or_equal(median_pdf_size, median_standalone_size):
+            return self.collate_from_paths([outer_pdf])
+        else:
+            return False
+
 def image_xrefs(pdf):
     images_by_page = [page.get_images() for page in pdf]
     if all(len(images) == 1 for images in images_by_page):
@@ -224,22 +435,6 @@ def image_xrefs(pdf):
     print('\nSuccess')
     return xrefs
 
-def link_pdf(src, dest, start_index):
-    with fitz.open(src) as pdf:
-        xrefs = image_xrefs(pdf)
-        if xrefs is None:
-            print(f'Support for weirder PDFs not yet implemented, skipping {src}')
-            return None
-
-        dest.mkdir(parents=True, exist_ok=True)
-        for (idx, xref) in enumerate(xrefs, start=start_index):
-            image = pdf.extract_image(xref)
-            file_path = dest / f'{idx:04d}.{image["ext"]}'
-            with open(file_path, 'wb') as f:
-                f.write(image["image"])
-
-        return pdf.page_count
-
 def nfc(s):
     return unicodedata.normalize('NFC', s)
 
@@ -357,14 +552,6 @@ def alphabetic_numbering(entries, start_point):
         return None
     return alphabetized
 
-def link_ordered_files(ordering, dest, start_index):
-    dest.mkdir(parents=True, exist_ok=True)
-
-    for (idx, src_path) in enumerate(ordering, start=start_index):
-        ext = src_path.suffix.lower()
-        link_path = dest / f'{idx:04d}{ext}'
-        link_path.symlink_to(relpath(src_path, dest))
-
 def check_extension(path, exts):
     return path.suffix.lower() in exts
 
@@ -396,81 +583,6 @@ def descendant_files_ignore(path, exclude):
 
     return result
 
-def collate(args):
-    con = sqlite3.connect(args.destdir / 'meta.db')
-    cur = con.cursor()
-
-    extraction_dir = args.destdir / 'extract'
-    hint_map = {Path(relpath(hint, extraction_dir)).parents[-2].name: hint for hint in args.hints}
-
-    collation_staging_area = args.destdir / 'site' / 'images-staging'
-    collation_staging_area.mkdir(parents=True)
-
-    collation_area = args.destdir / 'site' / 'images'
-    collation_area.mkdir(parents=True, exist_ok=True)
-
-    for work_path in extraction_dir.iterdir():
-        work_id = work_path.name
-
-        work_collation_dir = collation_area / work_id
-        if work_collation_dir.exists():
-            continue
-
-        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
-        if virtual == (1,):
-            continue
-
-        work_staging_dir = collation_staging_area / work_id
-
-        pages_collated = collate_from_paths([hint_map.get(work_id, work_path)], work_staging_dir, 0, [])
-        if pages_collated:
-            print(f'Collated {pages_collated} pages for {work_id}')
-            work_staging_dir.rename(work_collation_dir)
-        else:
-            if work_staging_dir.is_dir():
-                for f in work_staging_dir.iterdir():
-                    f.unlink()
-                work_staging_dir.rmdir()
-
-            if pages_collated == 0:
-                print(f'{work_id} contains no files? skipping')
-            elif pages_collated is None:
-                print(f'Unable to deduce file structure for {work_id}, skipping')
-
-    collation_staging_area.rmdir()
-    con.close()
-
-def try_collate_split_regex(srcs, dest, start_index, exclude, earlier=None, later=None):
-    early_srcs = []
-    middle_srcs = []
-    late_srcs = []
-    for src in srcs:
-        if earlier and earlier.search(nname(src)):
-            early_srcs.append(src)
-        elif later and later.search(nname(src)):
-            late_srcs.append(src)
-        else:
-            middle_srcs.append(src)
-
-    if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
-        return False
-
-    early_page_count = collate_from_paths(early_srcs, dest, start_index, exclude)
-    if early_page_count is None:
-        return None
-    start_index += early_page_count
-
-    middle_page_count = collate_from_paths(middle_srcs, dest, start_index, exclude)
-    if middle_page_count is None:
-        return None
-    start_index += middle_page_count
-
-    late_page_count = collate_from_paths(late_srcs, dest, start_index, exclude)
-    if late_page_count is None:
-        return None
-
-    return early_page_count + middle_page_count + late_page_count
-
 def standalone_image_size(filepath):
     with Image.open(filepath) as im:
         return im.size
@@ -503,108 +615,6 @@ def mean(items):
 def superior_or_equal(a, b):
     return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))
 
-def try_collate_images_vs_pdf(srcs, dest, start_index, exclude):
-    pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
-    if len(pdfs) != 1:
-        return False
-    outer_pdf = pdfs[0]
-
-    inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, exclude) if is_pdf(f)]
-    if len(inner_pdfs) != 1:
-        return False
-    inner_pdf = inner_pdfs[0]
-
-    non_pdf_srcs = [src for src in srcs if src != outer_pdf]
-    images = []
-    non_images = []
-    descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, exclude)]
-    for f in descendant_files:
-        if is_image(f):
-            images.append(f)
-        else:
-            non_images.append(f)
-            break
-
-    if len(non_images) != 0 or len(images) == 0:
-        return False
-
-    pdf_sizes = pdf_image_sizes(inner_pdf)
-    standalone_sizes = [standalone_image_size(f) for f in images]
-
-    median_pdf_size = median(pdf_sizes)
-    median_standalone_size = median(standalone_sizes)
-    if not (median_pdf_size and median_standalone_size):
-        return False
-
-    if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
-        with fitz.open(inner_pdf) as pdf:
-            pdf_page_count = len(pdf)
-        height_adjusted_pdf_image_count = (
-            len(pdf_sizes) *
-            mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
-        )
-        if (
-            abs(pdf_page_count - len(standalone_sizes)) <= 2 and
-            len(pdf_sizes) > len(standalone_sizes) and
-            median_pdf_size[0] == median_standalone_size[0] and
-            abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
-        ):
-            return collate_from_paths(non_pdf_srcs, dest, start_index, exclude)
-        else:
-            return False
-
-    if superior_or_equal(median_standalone_size, median_pdf_size):
-        return collate_from_paths(non_pdf_srcs, dest, start_index, exclude)
-    elif superior_or_equal(median_pdf_size, median_standalone_size):
-        return collate_from_paths([outer_pdf], dest, start_index, exclude)
-    else:
-        return False
-
-def collate_from_paths(srcs, dest, start_index, exclude):
-    if len(srcs) == 1 and srcs[0].is_dir():
-        return collate_from_paths(ls_ignore(srcs[0], exclude), dest, start_index, exclude)
-
-    if len(srcs) == 1 and is_pdf(srcs[0]):
-        print(f'Extracting images from {srcs[0]}')
-        return link_pdf(srcs[0], dest, start_index)
-
-    if len(srcs) == 0:
-        return 0
-
-    if len(srcs) == 2 and all(src.is_dir() for src in srcs):
-        for quality in IMAGE_QUALITY_REGEXES:
-            def a_not_b(a, b, src):
-                if a in quality:
-                    return quality[a].search(nname(src))
-                else:
-                    return not quality[b].search(nname(src))
-            better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)]
-            worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)]
-            if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
-                better = better_srcs[0]
-                worse = worse_srcs[0]
-                if len(descendant_files_ignore(better, exclude)) == len(descendant_files_ignore(worse, exclude)):
-                    return collate_from_paths([better], dest, start_index, exclude)
-
-    images_vs_pdf = try_collate_images_vs_pdf(srcs, dest, start_index, exclude)
-    if images_vs_pdf != False:
-        return images_vs_pdf
-
-    for regexes in SPLITS:
-        split_attempt = try_collate_split_regex(srcs, dest, start_index, exclude, **regexes)
-        if split_attempt != False:
-            return split_attempt
-
-    if all(src.is_file() and is_image(src) for src in srcs):
-        ordering = complete_prefix_number_ordering(srcs)
-        if ordering:
-            print(f'Symlinking image files: {ordering[0]}...')
-            link_ordered_files(ordering, dest, start_index)
-            return len(ordering)
-        else:
-            return None
-
-    return None
-
 def self_and_parents(path):
     return [path] + list(path.parents)
 
@@ -680,30 +690,22 @@ def manual_collate(args):
         work_staging_dir = collation_staging_area / work_id
         work_staging_dir.mkdir(parents=True)
 
-        pages_collated = 0
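+        # One Collator is shared across all groups so that page numbering
+        # continues from one group to the next.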
+        collator = Collator(work_staging_dir, exclusions, args.locale)
         for group in groups:
-            pages_added = collate_from_paths(
-                [item for item in group if item not in exclusions],
-                work_staging_dir,
-                pages_collated,
-                exclusions,
-            )
-            if pages_added is None:
+            collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
+            if collation_result is None:
                 print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
-                pages_collated = None
                 break
 
-            pages_collated += pages_added
-
-        if pages_collated:
-            print(f'Collated {pages_collated} pages for {work_id}')
+        if collation_result and collator.index > 0:
+            print(f'Collated {collator.index} pages for {work_id}')
             work_staging_dir.rename(work_collation_dir)
         else:
             for f in work_staging_dir.iterdir():
                 f.unlink()
             work_staging_dir.rmdir()
 
-        if pages_collated == 0:
+        if collation_result and collator.index == 0:
             print(f'No files found for {work_id}')
 
     collation_staging_area.rmdir()
@@ -913,6 +915,14 @@ argparser.add_argument(
     default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
     help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
 )
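+# Locale is used both for fetching metadata and for collation, so it lives
+# on the top-level parser rather than the fetch subcommand.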
+argparser.add_argument(
+    '-l', '--locale',
+    type=str,
+    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
+    help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). '
+          'May still fall back to Japanese if other languages are unavailable. '
+          '(default: $DLIBRARY_LOCALE or en_US)'),
+)
 
 subparsers = argparser.add_subparsers(title="subcommands", required=True)
 
 parser_extract = subparsers.add_parser('extract', aliases=['x', 'ex'], help='extract zipfiles')
@@ -931,14 +941,6 @@ parser_extract.add_argument(
 parser_extract.set_defaults(func=extract)
 
 parser_fetch = subparsers.add_parser('fetch', aliases=['f', 'fet'], help='fetch metadata and thumbnails')
-parser_fetch.add_argument(
-    '-l', '--locale',
-    type=str,
-    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
-    help=('locale to use when requesting metadata (e.g. "ja_JP", "en_US"). '
-          'May still fall back to Japanese if metadata in other languages is unavailable. '
-          '(default: $DLIBRARY_LOCALE or en_US)'),
-)
 parser_fetch.set_defaults(func=fetch)
 
 parser_collate = subparsers.add_parser(