#!/usr/bin/env python3 import argparse import asyncio import importlib_resources as resources from pathlib import Path from os import getenv from os.path import relpath, splitext import re import shutil import sqlite3 import textwrap from urllib.parse import urlparse import zipfile from dlsite_async import DlsiteAPI import fitz from jinja2 import Environment, PackageLoader, select_autoescape import requests NUMBER_REGEX = re.compile('[0-9]+') DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$') FANZA_ID_REGEX = re.compile('^d_[0-9]+$') FAKKU_ID_REGEX = re.compile('.*_FAKKU$') TEXTLESS_REGEX = re.compile('(台詞|セリフ)(な|無)し|notext|textless') EPILOGUE_REGEX = re.compile('after|後日談') ALT_VERSIONS = ['褐色', '日焼け', 'pink', '金髪', '白肌', 'うつろ目'] IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff'] IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store'] IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd'] def open_zipfile_with_encoding(path): try: return zipfile.ZipFile(path, metadata_encoding="utf-8") except UnicodeDecodeError: pass try: return zipfile.ZipFile(path, metadata_encoding="shift-jis") except UnicodeDecodeError: pass return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213") def extract(args): for zip_path in args.zipfiles: work_id = zip_path.stem work_extract_path = args.destdir / 'extract' / work_id work_extract_path.mkdir(parents=True) print(f'Extracting {zip_path} to {work_extract_path}') with open_zipfile_with_encoding(zip_path) as z: z.extractall(path=work_extract_path) if args.remove: zip_path.unlink() def manual_input_metadata(work_id): print(f"Don't know how to fetch metadata for {work_id}, input manually:") title = input('Title: ') circle = input('Circle [None]: ') or None authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()] tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()] date = input('Pub date (yyyy-mm-dd): ') description = input('Description: ') series = input('Series [None]: ') or None return { "id": work_id, "title": title, "circle": circle, "authors": authors, "tags": tags, "date": date, "description": description, "series": series, } async def fetch_async(args): con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)") cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))") cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))") thumbnails_dir = args.destdir / 'site' / 'thumbnails' thumbnails_dir.mkdir(parents=True, exist_ok=True) async with DlsiteAPI(locale=args.locale) as api: for work_path in (args.destdir / 'extract').iterdir(): work_id = work_path.name res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,)) if res.fetchone() is not None: continue if DLSITE_ID_REGEX.fullmatch(work_id): print(f'Fetching DLSite metadata for {work_id}') dlsite_metadata = await api.get_work(work_id) db_row = { "id": work_id, "title": dlsite_metadata.work_name, "circle": dlsite_metadata.circle, "date": dlsite_metadata.regist_date.date().isoformat(), "description": dlsite_metadata.description, "series": dlsite_metadata.series, } authors = dlsite_metadata.author or [] tags = dlsite_metadata.genre or [] thumbnail_url = dlsite_metadata.work_image if thumbnail_url.startswith('//'): thumbnail_url = 'https:' + thumbnail_url else: db_row = manual_input_metadata(work_id) authors = db_row.pop('authors') tags = db_row.pop('tags') if FANZA_ID_REGEX.fullmatch(work_id): thumbnail_url = f'https://doujin-assets.dmm.co.jp/digital/comic/{work_id}/{work_id}pl.jpg' elif FAKKU_ID_REGEX.fullmatch(work_id): thumbnail_url = None else: thumbnail_url = input('Thumbnail image URL [default: first page]: ') cur.execute( "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)", db_row, ) cur.executemany( "INSERT INTO authors VALUES(:author, :work)", [{ "author": author, "work": work_id } for author in authors], ) cur.executemany( "INSERT INTO tags VALUES(:tag, :work)", [{ "tag": tag, "work": work_id } for tag in tags], ) if thumbnail_url: ext = url_file_ext(thumbnail_url) dest_file = thumbnails_dir / (work_id + ext) print(f'Downloading thumbnail for {work_id} from {thumbnail_url}') with open(dest_file, 'wb') as fd: with requests.get(thumbnail_url, stream=True) as r: for chunk in r.iter_content(chunk_size=16384): fd.write(chunk) con.commit() con.close() def url_file_ext(url): return splitext(urlparse(url).path)[1] def fetch(args): asyncio.run(fetch_async(args)) def image_xrefs(pdf): images_by_page = [page.get_images() for page in pdf] if all(len(images) == 1 for images in images_by_page): return [images[0][0] for images in images_by_page] print("Checking PDF images the quick way failed, trying the slow way") xrefs = [] for (idx, page) in enumerate(pdf): print(f'\x1b[2K\r{idx}/{pdf.page_count} pages processed...', end='') images = page.get_image_info(xrefs=True) if len(images) != 1 or images[0]['xref'] == 0: print('\nFailed') return None xrefs.append(images[0]['xref']) print('\nSuccess') return xrefs def link_pdf(src, dest, start_index): with fitz.open(src) as pdf: xrefs = image_xrefs(pdf) if xrefs is None: print(f'Support for weirder PDFs not yet implemented, skipping {src}') return None dest.mkdir(parents=True, exist_ok=True) for (idx, xref) in enumerate(xrefs, start=start_index): image = pdf.extract_image(xref) file_path = dest / f'{idx:04d}.{image["ext"]}' with open(file_path, 'wb') as f: f.write(image["image"]) return pdf.page_count def complete_prefix_number_ordering(entries): if len(entries) == 1: return entries entries_by_version = {} for entry in entries: version_code = 0 for (i, version) in enumerate(ALT_VERSIONS): if version in entry.name: version_code |= (1 << i) entries_by_version.setdefault(version_code, []).append(entry) numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version} unified_indices = set() for numbering in numberings_by_version.values(): if numbering is None: return None unified_indices |= set(numbering.keys()) unified_indices.discard(None) unified_indices = list(unified_indices) unified_indices.sort() if len(unified_indices) > 1: for i in range(1, len(unified_indices)): cur = unified_indices[i] prev = unified_indices[i-1] for level in range(min(len(cur), len(prev))): if cur[level] != prev[level]: if cur[level] - prev[level] > 2: return None break unified_indices.append(None) versions = list(numberings_by_version.keys()) versions.sort() version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version} inner_versions = [] outer_versions = [versions[0]] for ver in versions[1:]: if version_lengths[ver] >= version_lengths[versions[0]] - 2: outer_versions.append(ver) else: inner_versions.append(ver) result = [] for out_ver in outer_versions: for i in unified_indices: for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])): result += numberings_by_version[ver].get(i, []) return result def unique_hierarchical_prefix_numbering(entries, start_point=0): if len(entries) == 1 and not NUMBER_REGEX.search(entries[0].name): return {None: entries} longest_entry = max(entries, key=lambda e: len(e.name)) matches = reversed(list(NUMBER_REGEX.finditer(longest_entry.name))) for m in matches: pos = m.start() if pos < start_point: return None prefix = longest_entry.name[:pos] if all(e.name.startswith(prefix) or prefix.startswith(e.stem) for e in entries): numbering = {} for e in entries: if pos >= len(e.stem): i = 0 else: n = NUMBER_REGEX.match(e.name[pos:]) if n is None: return None i = int(n.group()) numbering.setdefault((i,), []).append(e) indices = list(numbering.keys()) for idx in indices: if len(numbering[idx]) > 1: ents_idx = numbering.pop(idx) longest = max(ents_idx, key=lambda e: len(e.name)) next_layer_start = pos + NUMBER_REGEX.match(longest.name[pos:]).end() sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start) or alphabetic_numbering(ents_idx, next_layer_start) if not sub_numbering: return None for sub_idx in sub_numbering: numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx] return numbering return None def alphabetic_numbering(entries, start_point): alphabetized = {} for entry in entries: ending = entry.stem[start_point:] if len(ending) > 1: return None index = 0 if ending == '' else ord(ending.lower()) - ord('a') if index in alphabetized: return None alphabetized[(index,)] = [entry] indices = list(alphabetized.keys()) indices.sort() if indices != [(i,) for i in range(len(indices))]: return None return alphabetized def link_ordered_files(ordering, dest, start_index): dest.mkdir(parents=True, exist_ok=True) for (idx, src_path) in enumerate(ordering, start=start_index): ext = src_path.suffix.lower() link_path = dest / f'{idx:04d}{ext}' link_path.symlink_to(relpath(src_path, dest)) def ls_ignore(directory): return [ path for path in directory.iterdir() if path.name not in IGNOREABLE_FILES and path.suffix.lower() not in IGNOREABLE_EXTENSIONS ] def collate(args): con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() extraction_dir = args.destdir / 'extract' hint_map = {hint.relative_to(extraction_dir).parents[-2].name: hint for hint in args.hints} collation_staging_area = args.destdir / 'site' / 'images-staging' collation_staging_area.mkdir(parents=True) for work_path in extraction_dir.iterdir(): work_id = work_path.name collation_dir = args.destdir / 'site' / 'images' / work_id if collation_dir.exists(): continue virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone() if virtual == (1,): continue work_staging_dir = collation_staging_area / work_id pages_collated = collate_from_paths([hint_map.get(work_id, work_path)], work_staging_dir, 0) if pages_collated: print(f'Collated {pages_collated} pages for {work_id}') work_staging_dir.rename(collation_dir) else: if work_staging_dir.is_dir(): for f in work_staging_dir.iterdir(): f.unlink() work_staging_dir.rmdir() if pages_collated == 0: print(f'{work_id} contains no files? skipping') elif pages_collated is None: print(f'Unable to deduce file structure for {work_id}, skipping') collation_staging_area.rmdir() con.close() def collate_regex_later(srcs, dest, regex, start_index): matching = [] nonmatching = [] for src in srcs: if regex.search(src.name): matching.append(src) else: nonmatching.append(src) if not (matching and nonmatching): return False nonmatching_pages = collate_from_paths(nonmatching, dest, start_index) if nonmatching_pages is None: return None matching_pages = collate_from_paths(matching, dest, start_index+nonmatching_pages) if matching_pages is None: return None return nonmatching_pages + matching_pages def collate_from_paths(srcs, dest, start_index): if len(srcs) == 1 and srcs[0].is_dir(): return collate_from_paths(ls_ignore(srcs[0]), dest, start_index) if len(srcs) == 1 and srcs[0].suffix.lower() == '.pdf': print(f'Extracting images from {srcs[0]}') return link_pdf(srcs[0], dest, start_index) if len(srcs) == 0: return 0 textless_split = collate_regex_later(srcs, dest, TEXTLESS_REGEX, start_index) if textless_split != False: return textless_split epilogue_split = collate_regex_later(srcs, dest, EPILOGUE_REGEX, start_index) if epilogue_split != False: return epilogue_split if all(src.is_file() and src.suffix.lower() in IMAGE_FILE_EXTENSIONS for src in srcs): ordering = complete_prefix_number_ordering(srcs) if ordering: print(f'Symlinking image files: {ordering[0]}...') link_ordered_files(ordering, dest, start_index) return len(ordering) else: return None return None def self_and_parents(path): return [path] + list(path.parents) def manual_collate(args): work_id = self_and_parents(args.paths[0].relative_to(args.destdir / 'extract'))[-2].name collation_dir = args.destdir / 'site' / 'images' / work_id if collation_dir.exists() and len(list(collation_dir.iterdir())) > 0: print(f'Collation directory already exists!') return nonexistent = [path for path in args.paths if not path.exists()] if len(nonexistent) > 0: print(f'Nonexistent paths: {nonexistent}') return collation_dir.mkdir(parents=True, exist_ok=True) index = 0 for path in args.paths: if path.is_dir(): entries = [p for p in path.iterdir() if p.suffix.lower() in IMAGE_FILE_EXTENSIONS] ordering = complete_prefix_number_ordering(entries) if ordering is None: ordering = entries ordering.sort() link_ordered_files(ordering, collation_dir, index) index += len(ordering) elif path.suffix.lower() in IMAGE_FILE_EXTENSIONS: link_ordered_files([path], collation_dir, index) index += 1 elif path.suffix.lower() == ".pdf": pdf_page_count = link_pdf(path, collation_dir, index) if pdf_page_count is None: return index += pdf_page_count else: print(f'Unknown file type {path}, stopping') return def metadata(args): con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() if args.virtual is not None: cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id)) con.commit() res = cur.execute( "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?", (args.work_id,), ).fetchone() if res is None: print(f'Work id {args.work_id} not found!') return (title, circle, date, description, series, virtual) = res print(f'Work ID: {args.work_id}') print(f'Title: {title}') print(f'Circle: {circle}') print(f'Pub date: {date}') print(f'Description: {description}') print(f'Series: {series}') print(f'Virtual: {"Yes" if virtual == 1 else "No"}') con.close() def copy_recursive(src, dest): dest.mkdir(parents=True, exist_ok=True) for item in src.iterdir(): if item.is_dir() and not item.is_symlink(): copy_recursive(item, dest / item.name) else: shutil.copyfile(item, dest / item.name) def generate(args): jenv = Environment( loader=PackageLoader("dlibrary"), autoescape=select_autoescape() ) viewer_template = jenv.get_template("viewer.html") list_template = jenv.get_template("list.html") categorization_template = jenv.get_template("categorization.html") work_template = jenv.get_template("work.html") index_template = jenv.get_template("index.html") con = sqlite3.connect(args.destdir / 'meta.db') cur = con.cursor() site_dir = args.destdir / 'site' collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()} actual_series = {series for (series,) in cur.execute('SELECT series FROM works GROUP BY series HAVING count(series) > 1')} works = [] for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall(): if work_id not in collated_work_ids: continue authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))] tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))] images = [path.name for path in (site_dir / 'images' / work_id).iterdir()] images.sort() try: thumbnail_path = relpath(next( f for f in (site_dir / 'thumbnails').iterdir() if f.stem == work_id ), site_dir) except StopIteration: thumbnail_path = f'images/{work_id}/{images[0]}' work = { 'id': work_id, 'title': title, 'circle': circle, 'date': date, 'description': description, 'series': series, 'authors': authors, 'tags': tags, 'thumbnail_path': thumbnail_path, } works.append(work) work_dir = site_dir / 'works' / work_id viewer_dir = work_dir / 'view' viewer_dir.mkdir(parents=True, exist_ok=True) with open(work_dir / 'index.html', 'w') as f: f.write(work_template.render(depth=2, work=work, title=title, images=images)) with open(viewer_dir / 'index.html', 'w') as f: f.write(viewer_template.render(depth=3, work=work, title=title, images=images)) def make_categorization(categorization, query, work_filter, work_style_cards=False): categorization_dir = site_dir / categorization cats = [cat for (cat,) in cur.execute(query)] cat_samples = {} for cat in cats: cat_works = list(filter(work_filter(cat), works)) cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None safeish_cat = cat.replace('/', ' ') cat_dir = categorization_dir / safeish_cat cat_dir.mkdir(parents=True, exist_ok=True) with open(cat_dir / 'index.html', 'w') as f: f.write(list_template.render( depth=2, works=cat_works, title=cat, categorization=categorization, )) categorization_dir.mkdir(parents=True, exist_ok=True) with open(categorization_dir / 'index.html', 'w') as f: f.write(categorization_template.render( depth=1, categorization=categorization, categories=cats, samples=cat_samples, work_style_cards=work_style_cards, )) make_categorization( 'authors', 'SELECT DISTINCT author FROM authors ORDER BY author', lambda author: lambda work: author in work['authors'], ) make_categorization( 'tags', 'SELECT DISTINCT tag FROM tags ORDER BY tag', lambda tag: lambda work: tag in work['tags'], ) make_categorization( 'circles', 'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle', lambda circle: lambda work: work['circle'] == circle, ) make_categorization( 'series', 'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series', lambda series: lambda work: work['series'] == series, work_style_cards=True, ) with resources.as_file(resources.files("dlibrary")) as r: copy_recursive(r / 'static', site_dir / 'static') with open(site_dir / 'index.html', 'w') as f: f.write(index_template.render(depth=0, works=works)) con.close() argparser = argparse.ArgumentParser( prog='dlibrary', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ Organize DRM-free works purchased from DLSite into a library that can be viewed in a web browser. Intended workflow: - `extract` a collection of zipfiles downloaded from DLSite into DLibrary's data directory, giving each work its own subfolder. - `fetch` metadata and thumbnail images for extracted works from DLSite. - `collate` and/or `manual-collate` extracted works, producing a single sequence of image files (or symlinks into the extracted data, when possible) for each work. - Manually adjust works' `metadata` when necessary. - `generate` a static website providing a catalog and viewer for all collated works. """), ) argparser.add_argument( '-d', '--destdir', type=Path, default=Path(getenv('DLIBRARY_DIR', './dlibrary')), help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)', ) subparsers = argparser.add_subparsers(title="subcommands", required=True) parser_extract = subparsers.add_parser('extract', help='extract zipfiles') parser_extract.add_argument( '-r', '--remove', action='store_true', help='remove original zipfiles after extraction', ) parser_extract.add_argument( 'zipfiles', metavar='FILE', type=Path, nargs='+', help='zipfiles to extract', ) parser_extract.set_defaults(func=extract) parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails') parser_fetch.add_argument( '-l', '--locale', type=str, default=getenv('DLIBRARY_LOCALE', 'en_US'), help=('locale to use when requesting metadata (e.g. "ja_JP", "en_US"). ' 'May still fall back to Japanese if metadata in other languages is unavailable. ' '(default: $DLIBRARY_LOCALE or en_US)'), ) parser_fetch.set_defaults(func=fetch) parser_collate = subparsers.add_parser( 'collate', help='collate each work into a sequence of image files', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ For each extracted work that has not already been collated, DLibrary will attempt to intuit its structure as follows: - Enter the work's directory. If the directory contains nothing except a single subdirectory (ignoring a few types of files that are definitely not relevant), traverse downwards repeatedly. - If the current directory contains nothing except a single PDF (again, ignoring irrelevant files), attempt to extract a series of images from the PDF. This process expects that each page of the PDF consists of a single embedded image, which will be extracted at full resolution. Support for more complex PDFs is not yet implemented. - If the current directory contains nothing except image files, and the image files are named in a way that clearly indicates a complete numerical order (each filename consists of a shared prefix followed by a distinct number), symlink files in the inferred order. - Otherwise, skip processing this work for now. DLibrary can be given "collation hints" which provide alternative starting points for this search process. A hint is a path under $DLIBRARY_DIR/extract/[work id]/ indicating a different directory or PDF file to begin the search process for that work, rather than starting at the top level of the extracted data. There can be at most one hint per work; for more complicated scenarios where a work includes multiple folders that need to be collated together, or where filenames do not clearly indicate an ordering, use `manual-collate` instead. """), ) parser_collate.add_argument( 'hints', metavar='PATH', type=Path, nargs='*', help='paths within extraction folders as collation hints' ) parser_collate.set_defaults(func=collate) parser_manual_collate = subparsers.add_parser( 'manual-collate', help='collate a single work manually', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ All provided paths must be under $DLIBRARY_DIR/extract/[work id]/ for the work being manually collated. `manual-collate` can only handle one work at a time. Paths are used as follows: - If a path is a directory, all *image files* immediately inside that directory will be appended to the sequence. If files are named in a way which indicates a clear ordering, that ordering will be used. Otherwise, filenames will be sorted lexicographically. Non-image files and subdirectories will be ignored. - If a path is an image file, that image file will be appended to the sequence. - If a path is a PDF file, page images will be extracted from that PDF and appended to the sequence. """), ) parser_manual_collate.add_argument( 'paths', metavar='PATH', type=Path, nargs='+', help='paths within a single work to be collated in sequence', ) parser_manual_collate.set_defaults(func=manual_collate) parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work') parser_metadata.add_argument('work_id') parser_metadata.add_argument( '--virtual', action=argparse.BooleanOptionalAction, help='set work as virtual', ) parser_metadata.set_defaults(func=metadata) parser_generate = subparsers.add_parser( 'generate', help='generate HTML/CSS/JS for library site', formatter_class=argparse.RawDescriptionHelpFormatter, description=textwrap.dedent("""\ The static site will be generated under $DLIBRARY_DIR/site/ and can be served by pointing an HTTP server at that directory. Note that some files inside the static site hierarchy will be symlinks into $DLIBRARY_DIR/extract/ outside the site hierarchy, so make sure your HTTP server will allow those symlinks to be read. """), ) parser_generate.set_defaults(func=generate) def main(): args = argparser.parse_args() args.func(args) if __name__ == "__main__": main()