#!/usr/bin/env python3 import argparse import asyncio import importlib_resources as resources from io import BytesIO from pathlib import Path import os from os.path import relpath, splitext import random import re import readline import shutil import sqlite3 import textwrap import unicodedata from urllib.parse import urlparse import zipfile from dlsite_async import DlsiteAPI import fitz from libsixel import * from PIL import Image from jinja2 import Environment, PackageLoader, select_autoescape import rarfile import requests NUMBER_REGEX = re.compile('[0-90-9]+') DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$') FANZA_ID_REGEX = re.compile('^d_[0-9]+$') FAKKU_ID_REGEX = re.compile('.*_FAKKU$') HI_RES_REGEX = re.compile('高解像度|原寸', re.I) NO_TONE_REGEX = re.compile('トーン(効果)?[な無]し|グレースケール', re.I) TONE_REGEX = re.compile('トーン(版|(効果)?[有あ]り)', re.I) COLOR_REGEX = re.compile('カラー', re.I) MONOCHROME_REGEX = re.compile('モノクロ', re.I) IMAGE_QUALITY_REGEXES = [ { 'better': HI_RES_REGEX }, { 'better': NO_TONE_REGEX, 'worse': TONE_REGEX }, { 'better': COLOR_REGEX, 'worse': MONOCHROME_REGEX }, ] LANGUAGE_REGEXES = { 'en_US': re.compile('english|英語', re.I), 'ja_JP': re.compile('日本語', re.I), 'zh_CN': re.compile('(^|[^體])中文|中国語', re.I), 'zh_TW': re.compile('繁體中文', re.I), 'ko_KR': re.compile('한국어', re.I), } TEXTLESS_REGEX = re.compile('(台詞|セリフ|せりふ|テキスト|文字)((な|無)し|抜き)|notext|textless', re.I) FXLESS_REGEX = re.compile('効果音(な|無)し', re.I) FRONT_COVER_REGEX = re.compile('(?\S+)\s+Do($|(?=\s))') PDF_INLINE_IMAGE_REGEX = re.compile(r'(^|\s)(BI|ID|EI)($|\s)') SUGGESTED_WORKS_COUNT = 10 debug_mode = False def debug(s): if debug_mode: print(s) def open_zipfile_with_encoding(path): for enc in ["utf-8", "shift-jis", "shift-jisx0213"]: try: return zipfile.ZipFile(path, metadata_encoding=enc) except UnicodeDecodeError: pass print(f'{path} contains filenames with unknown character encoding!') exit(1) def open_rarfile_with_encoding(path): for enc in ["utf-8", "shift-jis", "shift-jisx0213"]: rf = rarfile.RarFile(path, charset=enc) if all('�' not in info.filename for info in rf.infolist()): return rf print(f'{path} contains filenames with unknown character encoding!') exit(1) def extract(args): absolute_archive_paths = set(path.resolve(strict=True) for path in args.archives) any_skipped = False for archive_path in args.archives: if archive_path.suffix.lower() == '.zip': work_id = archive_path.stem work_extract_path = args.destdir / 'extract' / work_id print(f'Extracting {archive_path} to {work_extract_path}') with open_zipfile_with_encoding(archive_path) as z: work_extract_path.mkdir(parents=True) z.extractall(path=work_extract_path) if args.remove: archive_path.unlink() elif rar_match := MULTIPART_RAR_HEAD_REGEX.fullmatch(archive_path.name): work_id = rar_match.group(1) work_extract_path = args.destdir / 'extract' / work_id print(f'Extracting multipart RAR archive beginning with {archive_path} to {work_extract_path}') with open_rarfile_with_encoding(archive_path) as r: volumes = [Path(vol).resolve(strict=True) for vol in r.volumelist()] if any(vol not in absolute_archive_paths for vol in volumes): print(f'Multipart RAR archive starting with {archive_path} contains volume files not listed on command-line, skipping') any_skipped = True continue work_extract_path.mkdir(parents=True) r.extractall(path=work_extract_path) if args.remove: for vol in volumes: vol.unlink() elif MULTIPART_RAR_TAIL_REGEX.fullmatch(archive_path.name): pass else: print(f'Unknown archive file type {archive_path}, skipping') any_skipped = True if args.auto and not any_skipped: parser_fetch.parse_args(args=[], namespace=args) fetch(args) def manual_input_metadata(work_id): print(f"Don't know how to fetch metadata for {work_id}, input manually:") title = input('Title: ') circle = input('Circle [None]: ') or None authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()] tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()] date = input('Pub date (yyyy-mm-dd): ') description = input('Description: ') series = input('Series [None]: ') or None return { "id": work_id, "title": title, "circle": circle, "authors": authors, "tags": tags, "date": date, "description": description, "series": series, } async def fetch_async(args): con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)") cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(author, work))") cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(tag, work))") thumbnails_dir = args.destdir / 'site' / 'thumbnails' thumbnails_dir.mkdir(parents=True, exist_ok=True) async with DlsiteAPI(locale=args.locale) as api: for work_path in (args.destdir / 'extract').iterdir(): work_id = work_path.name res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,)) if res.fetchone() is not None: continue if DLSITE_ID_REGEX.fullmatch(work_id): print(f'Fetching DLSite metadata for {work_id}') dlsite_metadata = await api.get_work(work_id) db_row = { "id": work_id, "title": dlsite_metadata.work_name, "circle": dlsite_metadata.circle, "date": dlsite_metadata.regist_date.date().isoformat(), "description": dlsite_metadata.description, "series": dlsite_metadata.series, } authors = dlsite_metadata.author or [] tags = dlsite_metadata.genre or [] thumbnail_url = dlsite_metadata.work_image if thumbnail_url.startswith('//'): thumbnail_url = 'https:' + thumbnail_url else: db_row = manual_input_metadata(work_id) authors = db_row.pop('authors') tags = db_row.pop('tags') if FANZA_ID_REGEX.fullmatch(work_id): candidate_urls = [ f'https://doujin-assets.dmm.co.jp/digital/{work_type}/{work_id}/{work_id}pl.jpg' for work_type in ['comic', 'cg'] ] thumbnail_url = None for url in candidate_urls: h = requests.head(url, allow_redirects=False) if h.status_code == 200: thumbnail_url = url break elif FAKKU_ID_REGEX.fullmatch(work_id): thumbnail_url = None else: thumbnail_url = input('Thumbnail image URL [default: first page]: ') cur.execute( "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)", db_row, ) cur.executemany( "INSERT INTO authors VALUES(:author, :work)", [{ "author": author, "work": work_id } for author in authors], ) cur.executemany( "INSERT INTO tags VALUES(:tag, :work)", [{ "tag": tag, "work": work_id } for tag in tags], ) if thumbnail_url: ext = url_file_ext(thumbnail_url) dest_file = thumbnails_dir / (work_id + ext) print(f'Downloading thumbnail for {work_id} from {thumbnail_url}') with open(dest_file, 'wb') as fd: with requests.get(thumbnail_url, stream=True) as r: for chunk in r.iter_content(chunk_size=16384): fd.write(chunk) con.commit() con.close() def url_file_ext(url): return splitext(urlparse(url).path)[1] def fetch(args): asyncio.run(fetch_async(args)) if args.auto: parser_collate.parse_args(args=[], namespace=args) collate(args) def self_and_parents(path): return [path] + list(path.parents) def collate(args): extraction_dir = args.destdir / 'extract' def extracted_path_work_id(path): trail = self_and_parents(Path(relpath(path, extraction_dir))) if len(trail) < 2: return None result = trail[-2].name if result == '..': return None return result (raw_groups, raw_exclusions) = parse_expressions(args.expression) specified_works = set() works_groups = {} for group in raw_groups: if len(group) == 0: continue work_id = extracted_path_work_id(group[0]) if not work_id: print(f'Group {group} contains paths outside an extracted work!') exit(1) if not all(extracted_path_work_id(item) == work_id for item in group[1:]): print(f'Group {group} contains paths from multiple works!') exit(1) specified_works.add(work_id) if work_id not in works_groups: works_groups[work_id] = [] normalized_paths = [normalize_to(item, args.destdir) for item in group] if not all(path.exists() for path in normalized_paths): print(f'Group {group} contains nonexistent paths!') exit(1) works_groups[work_id].append(normalized_paths) exclusions = [] for exclusion in raw_exclusions: work_id = extracted_path_work_id(exclusion) if not work_id: print(f'Excluded path {exclusion} does not belong to an extracted work!') exit(1) specified_works.add(work_id) normalized_path = normalize_to(exclusion, args.destdir) if not normalized_path.exists(): print(f'Excluded path {exclusion} does not exist!') exit(1) exclusions.append(normalized_path) collation_staging_area = args.destdir / 'site' / 'images-staging' collation_staging_area.mkdir(parents=True) collation_area = args.destdir / 'site' / 'images' collation_area.mkdir(parents=True, exist_ok=True) con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() any_warnings = False for work_path in extraction_dir.iterdir(): work_id = work_path.name if args.only_specified_works and work_id not in specified_works: continue work_collation_dir = collation_area / work_id if work_collation_dir.exists(): if work_id not in specified_works: continue if len(list(work_collation_dir.iterdir())) > 0: print(f'Collation directory for work {work_id} already exists!') any_warnings = True break else: work_collation_dir.rmdir() virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone() if virtual == (1,): if work_id in specified_works: print(f'Work {work_id} is virtual!') any_warnings = True break continue work_staging_dir = collation_staging_area / work_id collator = Collator(work_staging_dir, exclusions, args) for group in works_groups.get(work_id, [[work_path]]): collation_result = collator.collate_from_paths([item for item in group if item not in exclusions]) if not collation_result: print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}') break if collation_result and collator.index > 0: print(f'Collated {collator.index} pages for {work_id}') work_staging_dir.rename(work_collation_dir) else: if work_staging_dir.is_dir(): for f in work_staging_dir.iterdir(): f.unlink() work_staging_dir.rmdir() if not collation_result: print(f'Unable to deduce file structure for {work_id}, skipping') elif collator.index == 0: print(f'No files found for {work_id}, skipping') any_warnings = True collation_staging_area.rmdir() con.close() if args.auto and not any_warnings: parser_generate.parse_args(args=[], namespace=args) generate(args) class Collator: def __init__(self, dest, exclude, args): self.dest = dest self.exclude = exclude self.args = args self.index = 0 def collate_from_paths(self, srcs): srcs = [src for src in srcs if len(descendant_files_ignore(src, self.exclude)) > 0] if len(srcs) == 1 and srcs[0].is_dir(): return self.collate_from_paths(ls_ignore(srcs[0], self.exclude)) if len(srcs) == 1 and is_pdf(srcs[0]): print(f'Extracting images from {srcs[0]}') return self.link_pdf(srcs[0]) if len(srcs) == 0: return True debug(f'Auto-collating {srcs}') select_language = self.try_collate_select_language(srcs) if select_language is not False: return select_language if len(srcs) == 2 and all(src.is_dir() for src in srcs): for quality in IMAGE_QUALITY_REGEXES: def a_not_b(a, b, src): if a in quality: return quality[a].search(nname(src)) else: return not quality[b].search(nname(src)) better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)] worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)] if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]: better = better_srcs[0] worse = worse_srcs[0] if len(descendant_files_ignore(better, self.exclude)) == len(descendant_files_ignore(worse, self.exclude)): return self.collate_from_paths([better]) images_vs_pdf = self.try_collate_images_vs_pdf(srcs) if images_vs_pdf is not False: return images_vs_pdf for regexes in SPLITS: split_attempt = self.try_collate_split_regex(srcs, **regexes) if split_attempt is not False: return split_attempt if all(src.is_file() and is_image(src) for src in srcs): ordering = complete_prefix_number_ordering(srcs) if ordering: print(f'Symlinking image files: {ordering[0]}...') return self.link_ordered_files(ordering) else: return None return None def link_pdf(self, src): with fitz.open(src) as pdf: images = pdf_images(pdf, self.args.pdf_strategy) if images is None: print(f'Failed to enumerate page images in PDF {src}') return None self.dest.mkdir(parents=True, exist_ok=True) print(f'0 pages collated...', end='') for (idx, image) in enumerate(images, start=self.index): file_path = self.dest / f'{idx:04d}.{image["ext"]}' with open(file_path, 'wb') as f: f.write(image["image"]) print(f'\x1b[2K\r{idx+1-self.index} pages collated...', end='') print() self.index += pdf.page_count return True def link_ordered_files(self, ordering): self.dest.mkdir(parents=True, exist_ok=True) for (idx, src_path) in enumerate(ordering, start=self.index): ext = src_path.suffix.lower() link_path = self.dest / f'{idx:04d}{ext}' link_path.symlink_to(relpath(src_path, self.dest)) self.index += len(ordering) return True def try_collate_split_regex(self, srcs, earlier=None, later=None): early_srcs = [] middle_srcs = [] late_srcs = [] for src in srcs: if earlier and earlier.search(nname(src)): early_srcs.append(src) elif later and later.search(nname(src)): late_srcs.append(src) else: middle_srcs.append(src) if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1: return False early_page_collation = self.collate_from_paths(early_srcs) if early_page_collation is None: return None middle_page_collation = self.collate_from_paths(middle_srcs) if middle_page_collation is None: return None late_page_collation = self.collate_from_paths(late_srcs) if late_page_collation is None: return None return True def try_collate_images_vs_pdf(self, srcs): pdfs = [src for src in srcs if 'pdf' in src.name.lower()] if len(pdfs) != 1: return False outer_pdf = pdfs[0] inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)] if len(inner_pdfs) != 1: return False inner_pdf = inner_pdfs[0] non_pdf_srcs = [src for src in srcs if src != outer_pdf] images = [] non_images = [] descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)] for f in descendant_files: if is_image(f): images.append(f) else: non_images.append(f) break if len(non_images) != 0 or len(images) == 0: return False debug(f'Comparing PDF {inner_pdf} and images {images}') pdf_sizes = pdf_image_sizes(inner_pdf) standalone_sizes = [standalone_image_size(f) for f in images] median_pdf_size = median(pdf_sizes) median_standalone_size = median(standalone_sizes) if not (median_pdf_size and median_standalone_size): return False debug(f'PDF: {len(pdf_sizes)} images, {median_pdf_size}; standalone: {len(standalone_sizes)} images, median {median_standalone_size}') if abs(len(pdf_sizes) - len(standalone_sizes)) > 2: with fitz.open(inner_pdf) as pdf: pdf_page_count = len(pdf) height_adjusted_pdf_image_count = ( len(pdf_sizes) * mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes]) ) if ( abs(pdf_page_count - len(standalone_sizes)) <= 2 and len(pdf_sizes) > len(standalone_sizes) and median_pdf_size[0] == median_standalone_size[0] and abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2 ): return self.collate_from_paths(non_pdf_srcs) else: return False if superior_or_equal(median_standalone_size, median_pdf_size): return self.collate_from_paths(non_pdf_srcs) elif superior_or_equal(median_pdf_size, median_standalone_size): return self.collate_from_paths([outer_pdf]) else: return False def try_collate_select_language(self, srcs): if self.args.locale not in LANGUAGE_REGEXES: return False if not all(any(lang.search(nname(src)) for lang in LANGUAGE_REGEXES.values()) for src in srcs): return False srcs_matching_language = [src for src in srcs if LANGUAGE_REGEXES[self.args.locale].search(nname(src))] if len(srcs_matching_language) == len(srcs) or len(srcs_matching_language) == 0: return False return self.collate_from_paths(srcs_matching_language) def block_is_image(block): return block[6] == 1 def block_text(block): return block[4] def block_relevant(block): return block_is_image(block) or not IRRELEVANT_PDF_BLOCK_REGEX.search(block_text(block)) def relevant_blocks(page): blocks = page.get_text('blocks') return [block for block in blocks if block_relevant(block)] def is_single_image(page): blocks = relevant_blocks(page) return len(blocks) == 1 and block_is_image(blocks[0]) def extract_image(pdf, xref): image = pdf.extract_image(xref) if f'.{image["ext"]}' in IMAGE_FILE_EXTENSIONS: return image print(f'Converting image from {image["ext"]} to png') pix = fitz.Pixmap(pdf, xref) return { 'ext': 'png', 'image': pix.tobytes('png') } def get_displayed_image_xref(page): ref_names = [] for content_xref in page.get_contents(): content = page.parent.xref_stream(content_xref).decode('ascii', 'replace') if PDF_INLINE_IMAGE_REGEX.search(content): debug('Inline image detected') return None for m in PDF_REFERENCED_IMAGE_REGEX.finditer(content): ref_names.append(m.group('ref_name')) if len(ref_names) == 0: debug('Page does not reference any xobjects') return None if len(ref_names) > 1: debug(f'Page references multiple xobjects: {ref_names}') return None image_xrefs = [image[0] for image in page.get_images() if image[7] == ref_names[0]] if len(image_xrefs) == 1: return image_xrefs[0] if len(image_xrefs) == 0: debug(f'No images found matching ref name {ref_names[0]}') else: debug(f"Multiple images found matching ref name {ref_names[0]}, that probably shouldn't happen") return None def display_sixel_page(page): s = BytesIO() image = Image.open(BytesIO(page.get_pixmap(dpi=PDF_PREVIEW_DPI).tobytes('png'))) width, height = image.size try: data = image.tobytes() except NotImplementedError: data = image.tostring() output = sixel_output_new(lambda data, s: s.write(data), s) try: if image.mode == 'RGBA': dither = sixel_dither_new(256) sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGBA8888) elif image.mode == 'RGB': dither = sixel_dither_new(256) sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGB888) elif image.mode == 'P': palette = image.getpalette() dither = sixel_dither_new(256) sixel_dither_set_palette(dither, palette) sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_PAL8) elif image.mode == 'L': dither = sixel_dither_get(SIXEL_BUILTIN_G8) sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G8) elif image.mode == '1': dither = sixel_dither_get(SIXEL_BUILTIN_G1) sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G1) else: raise RuntimeError('unexpected image mode') try: sixel_encode(data, width, height, 1, dither, output) print(s.getvalue().decode('ascii')) finally: sixel_dither_unref(dither) finally: sixel_output_unref(output) def pdf_images(pdf, strategy): print(f'0/{pdf.page_count} pages analyzed...', end='') image_extractors = [] for (idx, page) in enumerate(pdf): xref = get_displayed_image_xref(page) if xref is not None and is_single_image(page): image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x)) else: page_images = page.get_image_info() print(f'\nPage {idx+1}: {len(page_images)} images, {len(relevant_blocks(page))} total relevant objects') choice = strategy while True: if choice.lower().startswith('n'): return None if choice.lower().startswith('c'): if choice == strategy: print(f'Converting page {idx+1}') image_extractors.append(lambda p=page: { 'ext': 'png', 'image': p.get_pixmap(dpi=PDF_CONVERSION_DPI).tobytes('png') }) break if xref is not None and (choice.lower().startswith('x') or choice.lower() == 'extract'): if choice == strategy: print(f'Extracting image from page {idx+1} without text') image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x)) break if choice.lower().startswith('d'): if choice == strategy: print(f'Dropping page {idx+1}') break if choice.lower().startswith('s'): display_sixel_page(page) choice = input(f'[N]ope out / [c]onvert page{"" if xref is None else " / e[x]tract image"} / [d]rop page / [s]how page? [n/c{"" if xref is None else "/x"}/d/s] ') print(f'\x1b[2K\r{idx+1}/{pdf.page_count} pages analyzed...', end=('' if idx+1 < pdf.page_count else '\n')) return (extractor() for extractor in image_extractors) def nfc(s): return unicodedata.normalize('NFC', s) def nname(entry): return nfc(entry.name) def complete_prefix_number_ordering(entries): if len(entries) == 1: return entries entries_by_version = {} for entry in entries: version_code = 0 for (i, version) in enumerate(ALT_VERSIONS): if version in nname(entry): version_code |= (1 << i) entries_by_version.setdefault(version_code, []).append(entry) numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version} unified_indices = set() for numbering in numberings_by_version.values(): if numbering is None: return None unified_indices |= set(numbering.keys()) unified_indices.discard(None) unified_indices = list(unified_indices) unified_indices.sort() min_delta_by_level = {} if len(unified_indices) > 1: for i in range(1, len(unified_indices)): cur = unified_indices[i] prev = unified_indices[i-1] for level in range(min(len(cur), len(prev))): if cur[level] != prev[level] and not (cur[level] == 5 and prev[level] == 0): delta = cur[level] - prev[level] min_delta_by_level[level] = min(min_delta_by_level.get(level, delta), delta) if any(delta > 2 for delta in min_delta_by_level.values()): return None unified_indices.append(None) versions = list(numberings_by_version.keys()) versions.sort() version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version} inner_versions = [] outer_versions = [versions[0]] for ver in versions[1:]: if version_lengths[ver] >= version_lengths[versions[0]] - 2: outer_versions.append(ver) else: inner_versions.append(ver) result = [] for out_ver in outer_versions: for i in unified_indices: for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])): result += numberings_by_version[ver].get(i, []) return result def unique_hierarchical_prefix_numbering(entries, start_point=0): if len(entries) == 1 and not NUMBER_REGEX.search(nname(entries[0])): return {None: entries} debug(f'Finding unique hierarchical prefix ordering from start point {start_point} for {entries}') longest_entry = max(entries, key=lambda e: len(nname(e))) matches = reversed(list(NUMBER_REGEX.finditer(nname(longest_entry)))) for m in matches: pos = m.start() if pos < start_point: return None prefix = nname(longest_entry)[:pos] debug(f'Checking prefix {prefix}') if all(nname(e).startswith(prefix) or prefix.startswith(nfc(e.stem)) for e in entries): numbering = {} for e in entries: if pos >= len(nfc(e.stem)): i = 0 else: n = NUMBER_REGEX.match(nname(e)[pos:]) if n is None: return None i = int(n.group()) numbering.setdefault((i,), []).append(e) indices = list(numbering.keys()) for idx in indices: if len(numbering[idx]) > 1: ents_idx = numbering.pop(idx) debug(f'Index {idx} has multiple entries') longest = max(ents_idx, key=lambda e: len(nname(e))) next_layer_start = pos + NUMBER_REGEX.match(nname(longest)[pos:]).end() sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start) or alphabetic_numbering(ents_idx, next_layer_start) if not sub_numbering: return None for sub_idx in sub_numbering: numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx] return numbering return None def alphabetic_numbering(entries, start_point): debug(f'Finding alphabetic numbering from start point {start_point} for {entries}') alphabetized = {} for entry in entries: ending = nfc(entry.stem)[start_point:].strip(' -_()') debug(f'{entry} has ending {ending}') if len(ending) > 1: debug('Ending is more than one character, giving up') return None index = 0 if ending == '' else ord(ending.lower()) - ord('a') + 1 if index < 0 or index > 26: debug('Ending is not a letter, giving up') return None if (index,) in alphabetized: debug(f'Index value {index} is already present, giving up') return None alphabetized[(index,)] = [entry] return alphabetized def check_extension(path, exts): return path.suffix.lower() in exts def is_pdf(path): return check_extension(path, ['.pdf']) def is_image(path): return check_extension(path, IMAGE_FILE_EXTENSIONS) def ignoreable(path): return path.name in IGNOREABLE_FILES or check_extension(path, IGNOREABLE_EXTENSIONS) def ls_ignore(directory, exclude): return [ path for path in directory.iterdir() if not ignoreable(path) and path not in exclude ] def descendant_files_ignore(path, exclude): if path.is_file(): return [path] result = [] for item in ls_ignore(path, exclude): if item.is_dir(): result.extend(descendant_files_ignore(item, exclude)) else: result.append(item) return result def standalone_image_size(filepath): with Image.open(filepath) as im: return im.size def pdf_image_sizes(filepath): sizes_by_xref = {} with fitz.open(filepath) as pdf: for page in pdf: for (xref, _, width, height, *_) in page.get_images(): if xref in sizes_by_xref: continue sizes_by_xref[xref] = (width, height) return list(sizes_by_xref.values()) def median(items): if len(items) == 0: return None items.sort() return items[len(items) // 2] def mean(items): if len(items) == 0: return None return sum(items) / len(items) def superior_or_equal(a, b): return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b))) def parse_expressions(tokens): groups = [] exclusions = [] while tokens: token = tokens.pop(0) if token == '!': exclusions.extend(parse_exclusion(tokens)) elif token == '(': groups.append(parse_group(tokens)) else: groups.append([token]) return (groups, exclusions) def parse_exclusion(tokens): token = tokens.pop(0) if token == '(': return parse_group(tokens) else: return [token] def parse_group(tokens): items = [] while True: token = tokens.pop(0) if token == ')': return items else: items.append(token) def normalize_to(path, ref): return ref / Path(relpath(path, ref)) def fmt_size(s): return f'{s[0]}x{s[1]}px' def analyze(args): extract_dir = args.destdir / 'extract' files = descendant_files_ignore(extract_dir / args.work_id, []) files.sort() for f in files: print(f'{relpath(f, extract_dir)}', end='') if is_image(f): size = standalone_image_size(f) print(f'\t{fmt_size(size)}') elif is_pdf(f): sizes = pdf_image_sizes(f) if len(sizes) == 0: print('\tContains no images') else: print(f'\t{len(sizes)} images, median {fmt_size(median(sizes))}, min {fmt_size(min(sizes))}, max {fmt_size(max(sizes))}') else: print() def metadata(args): con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() if args.virtual is not None: cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id)) con.commit() res = cur.execute( "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?", (args.work_id,), ).fetchone() if res is None: print(f'Work id {args.work_id} not found!') return (title, circle, date, description, series, virtual) = res print(f'Work ID: {args.work_id}') print(f'Title: {title}') print(f'Circle: {circle}') print(f'Pub date: {date}') print(f'Description: {description}') print(f'Series: {series}') print(f'Virtual: {"Yes" if virtual == 1 else "No"}') con.close() def copy_recursive(src, dest): dest.mkdir(parents=True, exist_ok=True) for item in src.iterdir(): if item.is_dir() and not item.is_symlink(): copy_recursive(item, dest / item.name) else: shutil.copyfile(item, dest / item.name) memoized_similarities = {} def similarity(a, b): if len(a) < len(b) or (len(a) == len(b) and a < b): shorter = a longer = b else: shorter = b longer = a if len(shorter) == 0: return 0 if (shorter, longer) in memoized_similarities: return memoized_similarities[(shorter, longer)] options = [similarity(shorter[1:], longer)] for i in range(1, len(shorter)+1): match_idx = longer.find(shorter[:i]) if match_idx == -1: break options.append(i*i + similarity(shorter[i:], longer[match_idx+i:])) result = max(options) memoized_similarities[(shorter, longer)] = result return result def top(items, n, key, overflow=0): winners = [] for item in items: score = key(item) if len(winners) < n or score >= winners[-1][1]: for i in range(len(winners) + 1): if i == len(winners) or score >= winners[i][1]: winners.insert(i, (item, score)) break while len(winners) > n and winners[-1][1] < winners[n-1][1]: winners.pop() # shuffle followed by stable sort to randomly shuffle within each score tier random.shuffle(winners) winners.sort(key=lambda w: w[1], reverse=True) return [item for (item, score) in winners[:n+overflow]] def generate(args): jenv = Environment( loader=PackageLoader("dlibrary"), autoescape=select_autoescape() ) viewer_template = jenv.get_template("viewer.html") list_template = jenv.get_template("list.html") categorization_template = jenv.get_template("categorization.html") work_template = jenv.get_template("work.html") index_template = jenv.get_template("index.html") con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() site_dir = args.destdir / 'site' collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()} works = [] for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall(): if work_id not in collated_work_ids: continue authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))] tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))] images = [path.name for path in (site_dir / 'images' / work_id).iterdir()] images.sort() try: thumbnail_path = relpath(next( f for f in (site_dir / 'thumbnails').iterdir() if f.stem == work_id ), site_dir) except StopIteration: thumbnail_path = f'images/{work_id}/{images[0]}' work = { 'id': work_id, 'title': title, 'circle': circle, 'date': date, 'description': description, 'series': series, 'authors': authors, 'tags': tags, 'thumbnail_path': thumbnail_path, 'images': images, } works.append(work) for (idx, work) in enumerate(works): def suggestion_priority(other_work): if other_work is work: return -2 if work['series'] and work['series'] == other_work['series']: return -1 return similarity(work['title'], other_work['title']) suggested = top(works, SUGGESTED_WORKS_COUNT, suggestion_priority) work_dir = site_dir / 'works' / work['id'] viewer_dir = work_dir / 'view' viewer_dir.mkdir(parents=True, exist_ok=True) with open(work_dir / 'index.html', 'w') as f: f.write(work_template.render(depth=2, work=work, title=work['title'], suggested=suggested)) with open(viewer_dir / 'index.html', 'w') as f: f.write(viewer_template.render(depth=3, work=work, title=work['title'])) print(f'\x1b[2K\r{idx+1}/{len(works)} works processed...', end=('' if idx+1 < len(works) else '\n')) def make_categorization(categorization, query, work_filter, work_style_cards=False): categorization_dir = site_dir / categorization cats = [cat for (cat,) in cur.execute(query)] cat_samples = {} for cat in cats: cat_works = list(filter(work_filter(cat), works)) cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None safeish_cat = cat.replace('/', ' ') cat_dir = categorization_dir / safeish_cat cat_dir.mkdir(parents=True, exist_ok=True) with open(cat_dir / 'index.html', 'w') as f: f.write(list_template.render( depth=2, works=cat_works, title=cat, categorization=categorization, )) categorization_dir.mkdir(parents=True, exist_ok=True) with open(categorization_dir / 'index.html', 'w') as f: f.write(categorization_template.render( depth=1, categorization=categorization, categories=cats, samples=cat_samples, work_style_cards=work_style_cards, )) make_categorization( 'authors', 'SELECT DISTINCT author FROM authors ORDER BY author', lambda author: lambda work: author in work['authors'], ) make_categorization( 'tags', 'SELECT DISTINCT tag FROM tags ORDER BY tag', lambda tag: lambda work: tag in work['tags'], ) make_categorization( 'circles', 'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle', lambda circle: lambda work: work['circle'] == circle, ) make_categorization( 'series', 'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series', lambda series: lambda work: work['series'] == series, work_style_cards=True, ) with resources.as_file(resources.files("dlibrary")) as r: copy_recursive(r / 'static', site_dir / 'static') with open(site_dir / 'index.html', 'w') as f: f.write(index_template.render(depth=0, works=works)) con.close() argparser = argparse.ArgumentParser( prog='dlibrary', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ Organize DRM-free works purchased from DLSite into a library that can be viewed in a web browser. Intended workflow: - `extract` a collection of archive files into DLibrary's data directory, automatically giving each work its own subfolder. - `fetch` metadata and thumbnail images for extracted works from DLSite. - `collate` extracted works, producing a single sequence of image files (or symlinks into the extracted data, when possible) for each work. - Manually adjust works' `metadata` when necessary. - `generate` a static website providing a catalog and viewer for all collated works. """), ) argparser.add_argument( '-d', '--destdir', type=Path, default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')), help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)', ) argparser.add_argument( '-D', '--debug', action='store_true', help='print out debugging info', ) argparser.add_argument( '-l', '--locale', type=str, default=os.getenv('DLIBRARY_LOCALE', 'en_US'), help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). ' 'May still fall back to Japanese if other languages are unavailable. ' '(default: $DLIBRARY_LOCALE or en_US)'), ) argparser.add_argument( '-a', '--auto', action='store_true', help='automatically continue the extract->fetch->collate->generate pipeline starting from whatever subcommand is being run', ) subparsers = argparser.add_subparsers(title="subcommands", required=True) parser_extract = subparsers.add_parser('extract', aliases=['x'], help='extract archive files') parser_extract.add_argument( '-r', '--remove', action='store_true', help='remove original archive files after extraction', ) parser_extract.add_argument( 'archives', metavar='FILE', type=Path, nargs='+', help='archive files to extract', ) parser_extract.set_defaults(func=extract) parser_fetch = subparsers.add_parser('fetch', aliases=['f'], help='fetch metadata and thumbnails') parser_fetch.set_defaults(func=fetch) parser_collate = subparsers.add_parser( 'collate', aliases=['c'], help='collate works into sequences of image files', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ For each extracted work that has not already been collated, DLibrary will attempt to intuit its structure and create a single ordered list of image files in the site data directory. Each image will either be a symlink to an image file in the extraction folder, or a single page extracted from a PDF file. DLibrary may fail to automatically collate a work if its files and subdirectories are not named in a way that indicates a clear linear ordering. In order to assist with collation, you can provide a list of expressions specifying where to start traversing the directory structure, what files to include in what order, and/or what files to ignore entirely. An expression can be: PATH A single path. If this is an image, it will be appended to the sequence of collated images for the work it belongs to; if this is a PDF, images will be extracted from it and concatenated to the sequence; if this is a directory, the contents of the directory will be automatically collated using DLibrary's default heuristics, and concatenated to the sequence. ( PATH [PATH ...] ) A group of paths contained in parentheses. You may need to escape the parentheses to avoid them getting parsed by your shell. All the paths in this group will be considered together, and automatically collated using the default heuristics, regardless of what order the paths are provided in. ! PATH ! ( PATH [PATH ...] ) A path or group of paths to exclude from collation. You may need to escape the !. If an excluded path appears within any of the other specified paths, it will be skipped by the collation heuristics. If the only expressions provided are negations, then auto-collation will start from the top level of the extracted work while skipping the excluded paths. All provided paths must be under $DLIBRARY_DIR/extract/[work id]/ for some not-yet-collated work. Paths belonging to multiple different works can all be provided on the same command line, and expressions will be clustered together by work id while otherwise preserving the order they were provided in. A parenthesized group expression must only contain paths belonging to a single work. By default, DLibrary will attempt to collate every not-yet-collated work (excluding "virtual" works), using the provided expressions to assist in collation when available. The `-o` flag will direct DLibrary to *only* collate works included in the provided expressions, even if other uncollated works are present. """), ) parser_collate.add_argument( '-o', '--only-specified-works', action='store_true', help="only collate works that are explicitly specified", ) parser_collate.add_argument( '-p', '--pdf-strategy', choices=[ 'ask', '?', 'show-ask', 's', 'convert', 'c', 'extract', 'x', 'drop', 'd', 'nope', 'n' ], default='show-ask', help="how to handle PDF pages that aren't a single image with no text", ) parser_collate.add_argument( 'expression', nargs='*', help='expressions indicating paths to collate or skip', ) parser_collate.set_defaults(func=collate) parser_analyze = subparsers.add_parser('analyze', aliases=['a'], help='analyze an extracted folder to assist in collation') parser_analyze.add_argument('work_id') parser_analyze.set_defaults(func=analyze) parser_metadata = subparsers.add_parser('metadata', aliases=['m'], help='view or modify metadata for a work') parser_metadata.add_argument('work_id') parser_metadata.add_argument( '--virtual', action=argparse.BooleanOptionalAction, help='set work as virtual', ) parser_metadata.set_defaults(func=metadata) parser_generate = subparsers.add_parser( 'generate', aliases=['g'], help='generate HTML/CSS/JS for library site', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ The static site will be generated under $DLIBRARY_DIR/site/ and can be served by pointing an HTTP server at that directory. Note that some files inside the static site hierarchy will be symlinks into $DLIBRARY_DIR/extract/ outside the site hierarchy, so make sure your HTTP server will allow those symlinks to be read. """), ) parser_generate.set_defaults(func=generate) def main(): args = argparser.parse_args() global debug_mode debug_mode = args.debug args.func(args) if __name__ == "__main__": main()