refactor collation code

xenofem 2024-03-01 23:43:38 -05:00
parent 3ed462972a
commit 0be720599d


@@ -206,6 +206,217 @@ def fetch(args):
     asyncio.run(fetch_async(args))
+def collate(args):
+    con = sqlite3.connect(args.destdir / 'meta.db')
+    cur = con.cursor()
+    extraction_dir = args.destdir / 'extract'
+    hint_map = {Path(relpath(hint, extraction_dir)).parents[-2].name: hint for hint in args.hints}
+    collation_staging_area = args.destdir / 'site' / 'images-staging'
+    collation_staging_area.mkdir(parents=True)
+    collation_area = args.destdir / 'site' / 'images'
+    collation_area.mkdir(parents=True, exist_ok=True)
+    for work_path in extraction_dir.iterdir():
+        work_id = work_path.name
+        work_collation_dir = collation_area / work_id
+        if work_collation_dir.exists():
+            continue
+        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
+        if virtual == (1,):
+            continue
+        work_staging_dir = collation_staging_area / work_id
+        collator = Collator(work_staging_dir, [], args.locale)
+        collation_result = collator.collate_from_paths([hint_map.get(work_id, work_path)])
+        if collation_result and collator.index > 0:
+            print(f'Collated {collator.index} pages for {work_id}')
+            work_staging_dir.rename(work_collation_dir)
+        else:
+            if work_staging_dir.is_dir():
+                for f in work_staging_dir.iterdir():
+                    f.unlink()
+                work_staging_dir.rmdir()
+            if not collation_result:
+                print(f'Unable to deduce file structure for {work_id}, skipping')
+            elif collator.index == 0:
+                print(f'{work_id} contains no files? skipping')
+    collation_staging_area.rmdir()
+    con.close()
+
+class Collator:
+    def __init__(self, dest, exclude, locale):
+        self.dest = dest
+        self.exclude = exclude
+        self.locale = locale
+        self.index = 0
+
+    def collate_from_paths(self, srcs):
+        if len(srcs) == 1 and srcs[0].is_dir():
+            return self.collate_from_paths(ls_ignore(srcs[0], self.exclude))
+        if len(srcs) == 1 and is_pdf(srcs[0]):
+            print(f'Extracting images from {srcs[0]}')
+            return self.link_pdf(srcs[0])
+        if len(srcs) == 0:
+            return True
+        if len(srcs) == 2 and all(src.is_dir() for src in srcs):
+            for quality in IMAGE_QUALITY_REGEXES:
+                def a_not_b(a, b, src):
+                    if a in quality:
+                        return quality[a].search(nname(src))
+                    else:
+                        return not quality[b].search(nname(src))
+                better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)]
+                worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)]
+                if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
+                    better = better_srcs[0]
+                    worse = worse_srcs[0]
+                    if len(descendant_files_ignore(better, self.exclude)) == len(descendant_files_ignore(worse, self.exclude)):
+                        return self.collate_from_paths([better])
+        images_vs_pdf = self.try_collate_images_vs_pdf(srcs)
+        if images_vs_pdf is not False:
+            return images_vs_pdf
+        for regexes in SPLITS:
+            split_attempt = self.try_collate_split_regex(srcs, **regexes)
+            if split_attempt is not False:
+                return split_attempt
+        if all(src.is_file() and is_image(src) for src in srcs):
+            ordering = complete_prefix_number_ordering(srcs)
+            if ordering:
+                print(f'Symlinking image files: {ordering[0]}...')
+                return self.link_ordered_files(ordering)
+            else:
+                return None
+        return None
+
+    def link_pdf(self, src):
+        with fitz.open(src) as pdf:
+            xrefs = image_xrefs(pdf)
+            if xrefs is None:
+                print(f'Support for weirder PDFs not yet implemented, skipping {src}')
+                return None
+            self.dest.mkdir(parents=True, exist_ok=True)
+            for (idx, xref) in enumerate(xrefs, start=self.index):
+                image = pdf.extract_image(xref)
+                file_path = self.dest / f'{idx:04d}.{image["ext"]}'
+                with open(file_path, 'wb') as f:
+                    f.write(image["image"])
+            self.index += pdf.page_count
+            return True
+
+    def link_ordered_files(self, ordering):
+        self.dest.mkdir(parents=True, exist_ok=True)
+        for (idx, src_path) in enumerate(ordering, start=self.index):
+            ext = src_path.suffix.lower()
+            link_path = self.dest / f'{idx:04d}{ext}'
+            link_path.symlink_to(relpath(src_path, self.dest))
+        self.index += len(ordering)
+        return True
+
+    def try_collate_split_regex(self, srcs, earlier=None, later=None):
+        early_srcs = []
+        middle_srcs = []
+        late_srcs = []
+        for src in srcs:
+            if earlier and earlier.search(nname(src)):
+                early_srcs.append(src)
+            elif later and later.search(nname(src)):
+                late_srcs.append(src)
+            else:
+                middle_srcs.append(src)
+        if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
+            return False
+        early_page_collation = self.collate_from_paths(early_srcs)
+        if early_page_collation is None:
+            return None
+        middle_page_collation = self.collate_from_paths(middle_srcs)
+        if middle_page_collation is None:
+            return None
+        late_page_collation = self.collate_from_paths(late_srcs)
+        if late_page_collation is None:
+            return None
+        return True
+
+    def try_collate_images_vs_pdf(self, srcs):
+        pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
+        if len(pdfs) != 1:
+            return False
+        outer_pdf = pdfs[0]
+        inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)]
+        if len(inner_pdfs) != 1:
+            return False
+        inner_pdf = inner_pdfs[0]
+        non_pdf_srcs = [src for src in srcs if src != outer_pdf]
+        images = []
+        non_images = []
+        descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)]
+        for f in descendant_files:
+            if is_image(f):
+                images.append(f)
+            else:
+                non_images.append(f)
+                break
+        if len(non_images) != 0 or len(images) == 0:
+            return False
+        pdf_sizes = pdf_image_sizes(inner_pdf)
+        standalone_sizes = [standalone_image_size(f) for f in images]
+        median_pdf_size = median(pdf_sizes)
+        median_standalone_size = median(standalone_sizes)
+        if not (median_pdf_size and median_standalone_size):
+            return False
+        if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
+            with fitz.open(inner_pdf) as pdf:
+                pdf_page_count = len(pdf)
+            height_adjusted_pdf_image_count = (
+                len(pdf_sizes) *
+                mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
+            )
+            if (
+                abs(pdf_page_count - len(standalone_sizes)) <= 2 and
+                len(pdf_sizes) > len(standalone_sizes) and
+                median_pdf_size[0] == median_standalone_size[0] and
+                abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
+            ):
+                return self.collate_from_paths(non_pdf_srcs)
+            else:
+                return False
+        if superior_or_equal(median_standalone_size, median_pdf_size):
+            return self.collate_from_paths(non_pdf_srcs)
+        elif superior_or_equal(median_pdf_size, median_standalone_size):
+            return self.collate_from_paths([outer_pdf])
+        else:
+            return False
 def image_xrefs(pdf):
     images_by_page = [page.get_images() for page in pdf]
     if all(len(images) == 1 for images in images_by_page):
@@ -224,22 +435,6 @@ def image_xrefs(pdf):
     print('\nSuccess')
     return xrefs
-
-def link_pdf(src, dest, start_index):
-    with fitz.open(src) as pdf:
-        xrefs = image_xrefs(pdf)
-        if xrefs is None:
-            print(f'Support for weirder PDFs not yet implemented, skipping {src}')
-            return None
-        dest.mkdir(parents=True, exist_ok=True)
-        for (idx, xref) in enumerate(xrefs, start=start_index):
-            image = pdf.extract_image(xref)
-            file_path = dest / f'{idx:04d}.{image["ext"]}'
-            with open(file_path, 'wb') as f:
-                f.write(image["image"])
-        return pdf.page_count
 def nfc(s):
     return unicodedata.normalize('NFC', s)
@@ -357,14 +552,6 @@ def alphabetic_numbering(entries, start_point):
         return None
     return alphabetized
-
-def link_ordered_files(ordering, dest, start_index):
-    dest.mkdir(parents=True, exist_ok=True)
-    for (idx, src_path) in enumerate(ordering, start=start_index):
-        ext = src_path.suffix.lower()
-        link_path = dest / f'{idx:04d}{ext}'
-        link_path.symlink_to(relpath(src_path, dest))
 def check_extension(path, exts):
     return path.suffix.lower() in exts
@@ -396,81 +583,6 @@ def descendant_files_ignore(path, exclude):
     return result
-
-def collate(args):
-    con = sqlite3.connect(args.destdir / 'meta.db')
-    cur = con.cursor()
-    extraction_dir = args.destdir / 'extract'
-    hint_map = {Path(relpath(hint, extraction_dir)).parents[-2].name: hint for hint in args.hints}
-    collation_staging_area = args.destdir / 'site' / 'images-staging'
-    collation_staging_area.mkdir(parents=True)
-    collation_area = args.destdir / 'site' / 'images'
-    collation_area.mkdir(parents=True, exist_ok=True)
-    for work_path in extraction_dir.iterdir():
-        work_id = work_path.name
-        work_collation_dir = collation_area / work_id
-        if work_collation_dir.exists():
-            continue
-        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
-        if virtual == (1,):
-            continue
-        work_staging_dir = collation_staging_area / work_id
-        pages_collated = collate_from_paths([hint_map.get(work_id, work_path)], work_staging_dir, 0, [])
-        if pages_collated:
-            print(f'Collated {pages_collated} pages for {work_id}')
-            work_staging_dir.rename(work_collation_dir)
-        else:
-            if work_staging_dir.is_dir():
-                for f in work_staging_dir.iterdir():
-                    f.unlink()
-                work_staging_dir.rmdir()
-            if pages_collated == 0:
-                print(f'{work_id} contains no files? skipping')
-            elif pages_collated is None:
-                print(f'Unable to deduce file structure for {work_id}, skipping')
-    collation_staging_area.rmdir()
-    con.close()
-
-def try_collate_split_regex(srcs, dest, start_index, exclude, earlier=None, later=None):
-    early_srcs = []
-    middle_srcs = []
-    late_srcs = []
-    for src in srcs:
-        if earlier and earlier.search(nname(src)):
-            early_srcs.append(src)
-        elif later and later.search(nname(src)):
-            late_srcs.append(src)
-        else:
-            middle_srcs.append(src)
-    if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
-        return False
-    early_page_count = collate_from_paths(early_srcs, dest, start_index, exclude)
-    if early_page_count is None:
-        return None
-    start_index += early_page_count
-    middle_page_count = collate_from_paths(middle_srcs, dest, start_index, exclude)
-    if middle_page_count is None:
-        return None
-    start_index += middle_page_count
-    late_page_count = collate_from_paths(late_srcs, dest, start_index, exclude)
-    if late_page_count is None:
-        return None
-    return early_page_count + middle_page_count + late_page_count
 def standalone_image_size(filepath):
     with Image.open(filepath) as im:
         return im.size
@@ -503,108 +615,6 @@ def mean(items):
 def superior_or_equal(a, b):
     return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))
-
-def try_collate_images_vs_pdf(srcs, dest, start_index, exclude):
-    pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
-    if len(pdfs) != 1:
-        return False
-    outer_pdf = pdfs[0]
-    inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, exclude) if is_pdf(f)]
-    if len(inner_pdfs) != 1:
-        return False
-    inner_pdf = inner_pdfs[0]
-    non_pdf_srcs = [src for src in srcs if src != outer_pdf]
-    images = []
-    non_images = []
-    descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, exclude)]
-    for f in descendant_files:
-        if is_image(f):
-            images.append(f)
-        else:
-            non_images.append(f)
-            break
-    if len(non_images) != 0 or len(images) == 0:
-        return False
-    pdf_sizes = pdf_image_sizes(inner_pdf)
-    standalone_sizes = [standalone_image_size(f) for f in images]
-    median_pdf_size = median(pdf_sizes)
-    median_standalone_size = median(standalone_sizes)
-    if not (median_pdf_size and median_standalone_size):
-        return False
-    if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
-        with fitz.open(inner_pdf) as pdf:
-            pdf_page_count = len(pdf)
-        height_adjusted_pdf_image_count = (
-            len(pdf_sizes) *
-            mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
-        )
-        if (
-            abs(pdf_page_count - len(standalone_sizes)) <= 2 and
-            len(pdf_sizes) > len(standalone_sizes) and
-            median_pdf_size[0] == median_standalone_size[0] and
-            abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
-        ):
-            return collate_from_paths(non_pdf_srcs, dest, start_index, exclude)
-        else:
-            return False
-    if superior_or_equal(median_standalone_size, median_pdf_size):
-        return collate_from_paths(non_pdf_srcs, dest, start_index, exclude)
-    elif superior_or_equal(median_pdf_size, median_standalone_size):
-        return collate_from_paths([outer_pdf], dest, start_index, exclude)
-    else:
-        return False
-
-def collate_from_paths(srcs, dest, start_index, exclude):
-    if len(srcs) == 1 and srcs[0].is_dir():
-        return collate_from_paths(ls_ignore(srcs[0], exclude), dest, start_index, exclude)
-    if len(srcs) == 1 and is_pdf(srcs[0]):
-        print(f'Extracting images from {srcs[0]}')
-        return link_pdf(srcs[0], dest, start_index)
-    if len(srcs) == 0:
-        return 0
-    if len(srcs) == 2 and all(src.is_dir() for src in srcs):
-        for quality in IMAGE_QUALITY_REGEXES:
-            def a_not_b(a, b, src):
-                if a in quality:
-                    return quality[a].search(nname(src))
-                else:
-                    return not quality[b].search(nname(src))
-            better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)]
-            worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)]
-            if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
-                better = better_srcs[0]
-                worse = worse_srcs[0]
-                if len(descendant_files_ignore(better, exclude)) == len(descendant_files_ignore(worse, exclude)):
-                    return collate_from_paths([better], dest, start_index, exclude)
-    images_vs_pdf = try_collate_images_vs_pdf(srcs, dest, start_index, exclude)
-    if images_vs_pdf != False:
-        return images_vs_pdf
-    for regexes in SPLITS:
-        split_attempt = try_collate_split_regex(srcs, dest, start_index, exclude, **regexes)
-        if split_attempt != False:
-            return split_attempt
-    if all(src.is_file() and is_image(src) for src in srcs):
-        ordering = complete_prefix_number_ordering(srcs)
-        if ordering:
-            print(f'Symlinking image files: {ordering[0]}...')
-            link_ordered_files(ordering, dest, start_index)
-            return len(ordering)
-        else:
-            return None
-    return None
 def self_and_parents(path):
     return [path] + list(path.parents)
@@ -680,30 +690,22 @@ def manual_collate(args):
     work_staging_dir = collation_staging_area / work_id
     work_staging_dir.mkdir(parents=True)
-    pages_collated = 0
+    collator = Collator(work_staging_dir, exclusions, args.locale)
     for group in groups:
-        pages_added = collate_from_paths(
-            [item for item in group if item not in exclusions],
-            work_staging_dir,
-            pages_collated,
-            exclusions,
-        )
-        if pages_added is None:
+        collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
+        if collation_result is None:
             print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
-            pages_collated = None
             break
-        pages_collated += pages_added
-    if pages_collated:
-        print(f'Collated {pages_collated} pages for {work_id}')
+    if collation_result and collator.index > 0:
+        print(f'Collated {collator.index} pages for {work_id}')
         work_staging_dir.rename(work_collation_dir)
     else:
         for f in work_staging_dir.iterdir():
             f.unlink()
         work_staging_dir.rmdir()
-        if pages_collated == 0:
+        if collation_result and collator.index == 0:
             print(f'No files found for {work_id}')
     collation_staging_area.rmdir()
@@ -913,6 +915,14 @@ argparser.add_argument(
     default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
     help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
 )
+argparser.add_argument(
+    '-l', '--locale',
+    type=str,
+    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
+    help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). '
+          'May still fall back to Japanese if other languages are unavailable. '
+          '(default: $DLIBRARY_LOCALE or en_US)'),
+)
 subparsers = argparser.add_subparsers(title="subcommands", required=True)
 parser_extract = subparsers.add_parser('extract', aliases=['x', 'ex'], help='extract zipfiles')
@@ -931,14 +941,6 @@ parser_extract.add_argument(
 parser_extract.set_defaults(func=extract)
 parser_fetch = subparsers.add_parser('fetch', aliases=['f', 'fet'], help='fetch metadata and thumbnails')
-parser_fetch.add_argument(
-    '-l', '--locale',
-    type=str,
-    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
-    help=('locale to use when requesting metadata (e.g. "ja_JP", "en_US"). '
-          'May still fall back to Japanese if metadata in other languages is unavailable. '
-          '(default: $DLIBRARY_LOCALE or en_US)'),
-)
 parser_fetch.set_defaults(func=fetch)
 parser_collate = subparsers.add_parser(