#!/usr/bin/env python3
"""dlibrary: manage a library of DLsite works.

Subcommands extract downloaded zipfiles, fetch metadata/thumbnails from
DLsite into a local sqlite database, collate each work into a single
ordered sequence of image files, and (eventually) publish a static site.
"""

import argparse
import asyncio
from pathlib import Path
from os.path import relpath, splitext
import re
import sqlite3
from urllib.parse import urlparse
import zipfile

from dlsite_async import DlsiteAPI
import fitz
import requests

# Matches runs of ASCII digits; used to find per-page numbering in filenames.
NUMBER_REGEX = re.compile('[0-9]+')

# File suffixes (lowercased) accepted as page images when collating.
IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff']


def open_zipfile_with_encoding(path):
    """Open *path* as a ZipFile, guessing the member-name encoding.

    Tries UTF-8, then Shift-JIS, then Shift-JIS X 0213 (common for
    Japanese archives).  The last attempt is unguarded: if it also
    fails, the UnicodeDecodeError propagates to the caller.
    """
    try:
        return zipfile.ZipFile(path, metadata_encoding="utf-8")
    except UnicodeDecodeError:
        pass

    try:
        return zipfile.ZipFile(path, metadata_encoding="shift-jis")
    except UnicodeDecodeError:
        pass

    return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")


def extract(args):
    """Extract each zipfile into destdir/extract/<work_id>/.

    The work id is taken from the zip filename stem.  Optionally removes
    the original zipfile afterwards (--remove).
    """
    for zip_path in args.zipfiles:
        work_id = zip_path.stem
        work_extract_path = args.destdir / 'extract' / work_id
        # No exist_ok: deliberately fail rather than silently merge a
        # second archive into an already-extracted work.
        work_extract_path.mkdir(parents=True)
        with open_zipfile_with_encoding(zip_path) as z:
            z.extractall(path=work_extract_path)
        if args.remove:
            zip_path.unlink()


async def fetch_async(args):
    """Fetch DLsite metadata and thumbnails for every extracted work.

    Creates the works/authors/tags tables on first run, then for each
    directory under destdir/extract/ not yet recorded: queries the
    DLsite API, inserts its metadata, and downloads its thumbnail into
    destdir/site/thumbnails/.  Already-recorded works are skipped, so
    re-running is safe and incremental.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
    cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
    cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")

    thumbnails_dir = args.destdir / 'site' / 'thumbnails'
    thumbnails_dir.mkdir(parents=True, exist_ok=True)

    async with DlsiteAPI() as api:
        for work_path in (args.destdir / 'extract').iterdir():
            work_id = work_path.name

            res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
            if res.fetchone() is not None:
                continue

            print(f'Fetching metadata for {work_id}')
            metadata = await api.get_work(work_id)
            cur.execute(
                "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
                {
                    "id": work_id,
                    "title": metadata.work_name,
                    "circle": metadata.circle,
                    "date": metadata.regist_date.date().isoformat(),
                    "description": metadata.description,
                    "series": metadata.series,
                },
            )
            cur.executemany(
                "INSERT INTO authors VALUES(:author, :work)",
                [{"author": author, "work": work_id} for author in (metadata.author or [])],
            )
            cur.executemany(
                "INSERT INTO tags VALUES(:tag, :work)",
                [{"tag": tag, "work": work_id} for tag in (metadata.genre or [])],
            )

            # Thumbnail URLs may be protocol-relative.
            thumbnail_url = metadata.work_image
            if thumbnail_url.startswith('//'):
                thumbnail_url = 'https:' + thumbnail_url
            ext = url_file_ext(thumbnail_url)
            dest_file = thumbnails_dir / (work_id + ext)
            print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
            with requests.get(thumbnail_url, stream=True) as r:
                # Fail loudly on HTTP errors; only create the destination
                # file once we have a successful response, so a failed
                # download never leaves an empty/corrupt thumbnail behind.
                r.raise_for_status()
                with open(dest_file, 'wb') as fd:
                    for chunk in r.iter_content(chunk_size=16384):
                        fd.write(chunk)

            # Commit per work so an interrupted run keeps its progress.
            con.commit()

    con.close()


def url_file_ext(url):
    """Return the file extension (including the dot) of a URL's path."""
    return splitext(urlparse(url).path)[1]


def fetch(args):
    """Synchronous entry point for the `fetch` subcommand."""
    asyncio.run(fetch_async(args))


def collate(args):
    """Build destdir/site/works/<work_id>/ as an ordered image sequence.

    For each extracted work not yet collated (and not marked virtual):
    descends through lone wrapper directories, then either extracts a
    single PDF's page images or symlinks a numbered set of image files.
    Works whose structure cannot be deduced are skipped with a message.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    for work_path in (args.destdir / 'extract').iterdir():
        work_id = work_path.name

        collation_dir = args.destdir / 'site' / 'works' / work_id
        if collation_dir.exists():
            continue

        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
        if virtual == (1,):
            continue

        # Drill down while the directory contains exactly one subdirectory.
        search_dir = work_path
        while True:
            entries = list(search_dir.iterdir())
            if len(entries) == 1 and entries[0].is_dir():
                search_dir = entries[0]
            else:
                break

        if len(entries) == 1 and entries[0].suffix.lower() == '.pdf':
            print(f'Extracting images from {entries[0].name} for {work_id}')
            link_pdf(entries[0], collation_dir)
            continue

        if len(entries) == 0:
            print(f'{work_id} contains no files? skipping')
            continue

        if all(entry.is_file() and entry.suffix.lower() in IMAGE_FILE_EXTENSIONS for entry in entries):
            ordering = complete_prefix_number_ordering(entries)
            if ordering:
                print(f'Symlinking image files for {work_id}')
                link_ordered_files(ordering, collation_dir)
                continue

        print(f'Unable to deduce file structure for {work_id}, skipping')

    con.close()


def link_pdf(src, dest):
    """Extract one image per PDF page from *src* into *dest*.

    Only handles the simple case where every page embeds exactly one
    image; anything else is reported and skipped.
    """
    with fitz.open(src) as pdf:
        images_by_page = [page.get_images() for page in pdf]
        if all(len(images) == 1 for images in images_by_page):
            dest.mkdir(parents=True)
            for (idx, images) in enumerate(images_by_page):
                xref = images[0][0]
                image = pdf.extract_image(xref)
                file_path = dest / f'{idx:04d}.{image["ext"]}'
                with open(file_path, 'wb') as f:
                    f.write(image["image"])
        else:
            print(f'Support for weirder PDFs not yet implemented, skipping {src}')


def complete_prefix_number_ordering(entries):
    """Order *entries* by a numeric index following a common prefix.

    Scans the numbers in the first entry's name from right to left; for
    the first position where every entry shares the preceding prefix and
    has a number, returns the entries sorted by that number.  Returns
    None if no such position exists or if any index is missing/duplicated
    (i.e. the numbering is not a complete, unambiguous ordering).
    """
    matches = reversed(list(NUMBER_REGEX.finditer(entries[0].name)))
    for m in matches:
        pos = m.start()
        prefix = entries[0].name[:pos]
        if all(e.name.startswith(prefix) for e in entries):
            entries_with_indices = []
            indices = set()
            for e in entries:
                n = NUMBER_REGEX.match(e.name[pos:])
                if n is None:
                    return None
                i = int(n.group())
                if i in indices:
                    return None
                indices.add(i)
                entries_with_indices.append((e, i))
            entries_with_indices.sort(key=lambda ei: ei[1])
            return [e for (e, i) in entries_with_indices]
    return None


def link_ordered_files(ordering, dest):
    """Symlink *ordering* into *dest* as 0000.ext, 0001.ext, ...

    Links are relative so the library tree stays relocatable.
    """
    dest.mkdir(parents=True)

    for (idx, src_path) in enumerate(ordering):
        ext = src_path.suffix.lower()
        link_path = dest / f'{idx:04d}{ext}'
        link_path.symlink_to(relpath(src_path, dest))


def metadata(args):
    """Show (and optionally modify) the stored metadata for one work.

    With --virtual/--no-virtual, updates the work's virtual flag first;
    then prints all recorded fields, or an error if the id is unknown.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    if args.virtual is not None:
        cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
        con.commit()

    res = cur.execute(
        "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
        (args.work_id,),
    ).fetchone()
    if res is None:
        print(f'Work id {args.work_id} not found!')
        return

    (title, circle, date, description, series, virtual) = res
    print(f'Work ID: {args.work_id}')
    print(f'Title: {title}')
    print(f'Circle: {circle}')
    print(f'Pub date: {date}')
    print(f'Description: {description}')
    print(f'Series: {series}')
    print(f'Virtual: {"Yes" if virtual == 1 else "No"}')

    con.close()


def publish(args):
    """Generate the static library site (not yet implemented)."""
    pass


argparser = argparse.ArgumentParser(prog='dlibrary')
argparser.add_argument(
    '-d', '--destdir',
    type=Path,
    default=Path('./dlibrary'),
    help='directory to store dlibrary content and metadata to (default: ./dlibrary)',
)
subparsers = argparser.add_subparsers(title="subcommands")

parser_extract = subparsers.add_parser('extract', help='extract zipfiles')
parser_extract.add_argument(
    '-r', '--remove',
    action='store_true',
    help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
    'zipfiles',
    metavar='FILE',
    type=Path,
    nargs='+',
    help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)

parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)

parser_collate = subparsers.add_parser('collate', help='collate a single sequence of image files for each work')
parser_collate.set_defaults(func=collate)

parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
    '--virtual',
    action=argparse.BooleanOptionalAction,
    help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)

parser_publish = subparsers.add_parser('publish', help='generate HTML/CSS/JS for library site')
parser_publish.set_defaults(func=publish)

if __name__ == "__main__":
    args = argparser.parse_args()
    if getattr(args, 'func', None) is None:
        # No subcommand given: show usage instead of crashing on args.func.
        argparser.print_help()
    else:
        args.func(args)