dlibrary/dlibrary/dlibrary.py

#!/usr/bin/env python3
import argparse
import asyncio
import importlib_resources as resources
from io import BytesIO
from pathlib import Path
import os
from os.path import relpath, splitext, join
import random
import re
import readline
import shutil
import sqlite3
import stat
import textwrap
import unicodedata
from urllib.parse import urlparse
import zipfile
from dlsite_async import DlsiteAPI
import fitz
from libsixel import *
from PIL import Image
from jinja2 import Environment, PackageLoader, select_autoescape
import rarfile
import requests
NUMBER_REGEX = re.compile('[0-9-]+')
DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$')
FANZA_ID_REGEX = re.compile('^d_[0-9]+$')
FAKKU_ID_REGEX = re.compile('.*_FAKKU$')
HI_RES_REGEX = re.compile('高解像度|原寸|大サイズ', re.I)
NO_TONE_REGEX = re.compile('トーン(効果)?[な無]し|グレースケール', re.I)
TONE_REGEX = re.compile('トーン(版|(効果)?[有あ]り)', re.I)
COLOR_REGEX = re.compile('カラー', re.I)
MONOCHROME_REGEX = re.compile('モノクロ', re.I)
IMAGE_QUALITY_REGEXES = [
{ 'better': HI_RES_REGEX },
{ 'better': NO_TONE_REGEX, 'worse': TONE_REGEX },
{ 'better': COLOR_REGEX, 'worse': MONOCHROME_REGEX },
]
LANGUAGE_REGEXES = {
'en_US': re.compile('english|英語', re.I),
'ja_JP': re.compile('日本語', re.I),
'zh_CN': re.compile('(^|[^體])中文|中国語', re.I),
'zh_TW': re.compile('繁體中文', re.I),
'ko_KR': re.compile('한국어', re.I),
}
TEXTLESS_REGEX = re.compile('(台詞|セリフ|せりふ|テキスト|文字)((な|無)し|抜き)|notext|textless', re.I)
FXLESS_REGEX = re.compile('効果音(な|無)し', re.I)
FRONT_COVER_REGEX = re.compile('(?<!裏)表紙(?!裏)|(?<!back[-_ ])(?<!back)cover|(?<!ura[-_ ])(?<!ura)hyou?sh?i(?![-_ ]?ura)', re.I)
BACK_COVER_REGEX = re.compile('裏表紙|hyou?sh?i[-_ ]?ura|ura[-_ ]?hyou?sh?i', re.I)
BONUS_REGEX = re.compile('設定|キャラ|特典|ポスター|bonus', re.I)
EPILOGUE_REGEX = re.compile('after|後日談|おまけ', re.I)
SPLITS = [
{ 'later': TEXTLESS_REGEX },
{ 'later': FXLESS_REGEX },
{ 'earlier': FRONT_COVER_REGEX, 'later': BACK_COVER_REGEX },
{ 'later': BONUS_REGEX },
{ 'later': EPILOGUE_REGEX },
]
ALT_VERSIONS = [
'褐色',
'日焼け',
'pink',
'金髪',
'白肌',
'うつろ目',
'dark skin',
'ラバー',
'ゾンビ肌',
'マスク',
'アヘ顔',
]
IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff', '.bmp']
IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store', 'desktop.ini']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd', '.mp4']
PDF_CONVERSION_DPI = 300
PDF_PREVIEW_DPI = 72
IRRELEVANT_PDF_BLOCK_REGEX = re.compile(r'\bTCPDF\b', re.I)
MULTIPART_RAR_HEAD_REGEX = re.compile(r'^(.+)\.part0*1\.exe$', re.I)
MULTIPART_RAR_TAIL_REGEX = re.compile(r'^(.+)\.part0*([^1]|[^0].+)\.rar$', re.I)
PDF_REFERENCED_IMAGE_REGEX = re.compile(r'(^|(?<=\s))/(?P<ref_name>\S+)\s+Do($|(?=\s))')
PDF_INLINE_IMAGE_REGEX = re.compile(r'(^|\s)(BI|ID|EI)($|\s)')
SUGGESTED_WORKS_COUNT = 10
READONLY_FILE = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
READONLY_DIR = READONLY_FILE | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
debug_mode = False
def debug(s):
if debug_mode:
print(s)
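# Archive filenames may be UTF-8 or one of the Shift-JIS variants; try each
# encoding and keep the first that decodes cleanly. zipfile raises
# UnicodeDecodeError on a bad decode, while rarfile silently substitutes
# U+FFFD, so the rar version probes for replacement characters instead.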
def open_zipfile_with_encoding(path):
for enc in ["utf-8", "shift-jis", "shift-jisx0213"]:
try:
return zipfile.ZipFile(path, metadata_encoding=enc)
except UnicodeDecodeError:
pass
print(f'{path} contains filenames with unknown character encoding!')
exit(1)
def open_rarfile_with_encoding(path):
for enc in ["utf-8", "shift-jis", "shift-jisx0213"]:
rf = rarfile.RarFile(path, charset=enc)
        if all('\ufffd' not in info.filename for info in rf.infolist()):  # no U+FFFD from a bad decode
return rf
print(f'{path} contains filenames with unknown character encoding!')
exit(1)
def readonly(path):
for parentdir, dirs, files in os.walk(path, topdown=False):
for f in files:
os.chmod(join(parentdir, f), READONLY_FILE, follow_symlinks=False)
os.chmod(parentdir, READONLY_DIR, follow_symlinks=False)
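# Extract each archive into destdir/extract/[work id]/, taking the work id
# from the archive filename. Zip archives extract directly; multipart RAR
# sets (a .part1.exe SFX head plus .partN.rar tails) extract from the head
# volume, and tail volumes are skipped since the head already covers them.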
def extract(args):
absolute_archive_paths = set(path.resolve(strict=True) for path in args.archives)
any_skipped = False
for archive_path in args.archives:
if archive_path.suffix.lower() == '.zip':
work_id = archive_path.stem
work_extract_path = args.destdir / 'extract' / work_id
print(f'Extracting {archive_path} to {work_extract_path}')
with open_zipfile_with_encoding(archive_path) as z:
work_extract_path.mkdir(parents=True)
z.extractall(path=work_extract_path)
readonly(work_extract_path)
if args.remove:
archive_path.unlink()
elif rar_match := MULTIPART_RAR_HEAD_REGEX.fullmatch(archive_path.name):
work_id = rar_match.group(1)
work_extract_path = args.destdir / 'extract' / work_id
print(f'Extracting multipart RAR archive beginning with {archive_path} to {work_extract_path}')
with open_rarfile_with_encoding(archive_path) as r:
volumes = [Path(vol).resolve(strict=True) for vol in r.volumelist()]
if any(vol not in absolute_archive_paths for vol in volumes):
print(f'Multipart RAR archive starting with {archive_path} contains volume files not listed on command-line, skipping')
any_skipped = True
continue
work_extract_path.mkdir(parents=True)
r.extractall(path=work_extract_path)
readonly(work_extract_path)
if args.remove:
for vol in volumes:
vol.unlink()
elif MULTIPART_RAR_TAIL_REGEX.fullmatch(archive_path.name):
pass
else:
print(f'Unknown archive file type {archive_path}, skipping')
any_skipped = True
if args.auto and not any_skipped:
parser_fetch.parse_args(args=[], namespace=args)
fetch(args)
def manual_input_metadata(work_id):
print(f"Don't know how to fetch metadata for {work_id}, input manually:")
title = input('Title: ')
circle = input('Circle [None]: ') or None
authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()]
tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()]
date = input('Pub date (yyyy-mm-dd): ')
description = input('Description: ')
series = input('Series [None]: ') or None
return {
"id": work_id,
"title": title,
"circle": circle,
"authors": authors,
"tags": tags,
"date": date,
"description": description,
"series": series,
}
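# Populate meta.db (works, authors, tags) for every extracted work that isn't
# already recorded. DLSite ids are fetched through the DLSite API; all other
# ids fall back to manual entry, with thumbnails probed from the dmm.co.jp CDN
# for FANZA ids.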
async def fetch_async(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(author, work))")
cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(tag, work))")
thumbnails_dir = args.destdir / 'site' / 'thumbnails'
thumbnails_dir.mkdir(parents=True, exist_ok=True)
async with DlsiteAPI(locale=args.locale) as api:
for work_path in (args.destdir / 'extract').iterdir():
work_id = work_path.name
res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
if res.fetchone() is not None:
continue
if DLSITE_ID_REGEX.fullmatch(work_id):
print(f'Fetching DLSite metadata for {work_id}')
dlsite_metadata = await api.get_work(work_id)
db_row = {
"id": work_id,
"title": dlsite_metadata.work_name,
"circle": dlsite_metadata.circle,
"date": dlsite_metadata.regist_date.date().isoformat(),
"description": dlsite_metadata.description,
"series": dlsite_metadata.series,
}
authors = dlsite_metadata.author or []
tags = dlsite_metadata.genre or []
thumbnail_url = dlsite_metadata.work_image
if thumbnail_url.startswith('//'):
thumbnail_url = 'https:' + thumbnail_url
else:
db_row = manual_input_metadata(work_id)
authors = db_row.pop('authors')
tags = db_row.pop('tags')
if FANZA_ID_REGEX.fullmatch(work_id):
candidate_urls = [
f'https://doujin-assets.dmm.co.jp/digital/{work_type}/{work_id}/{work_id}pl.jpg'
for work_type in ['comic', 'cg']
]
thumbnail_url = None
for url in candidate_urls:
h = requests.head(url, allow_redirects=False)
if h.status_code == 200:
thumbnail_url = url
break
elif FAKKU_ID_REGEX.fullmatch(work_id):
thumbnail_url = None
else:
thumbnail_url = input('Thumbnail image URL [default: first page]: ')
cur.execute(
"INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
db_row,
)
cur.executemany(
"INSERT INTO authors VALUES(:author, :work)",
[{ "author": author, "work": work_id } for author in authors],
)
cur.executemany(
"INSERT INTO tags VALUES(:tag, :work)",
[{ "tag": tag, "work": work_id } for tag in tags],
)
if thumbnail_url:
ext = url_file_ext(thumbnail_url)
dest_file = thumbnails_dir / (work_id + ext)
print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
with open(dest_file, 'wb') as fd:
with requests.get(thumbnail_url, stream=True) as r:
for chunk in r.iter_content(chunk_size=16384):
fd.write(chunk)
con.commit()
con.close()
def url_file_ext(url):
return splitext(urlparse(url).path)[1]
def fetch(args):
asyncio.run(fetch_async(args))
if args.auto:
parser_collate.parse_args(args=[], namespace=args)
collate(args)
def self_and_parents(path):
return [path] + list(path.parents)
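# Collate each extracted work into an ordered sequence of page images under
# site/images/[work id]/. Command-line expressions are parsed into per-work
# groups and exclusions; each work is collated into a staging directory that
# is renamed into place only once the whole work succeeds.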
def collate(args):
extraction_dir = args.destdir / 'extract'
def extracted_path_work_id(path):
trail = self_and_parents(Path(relpath(path, extraction_dir)))
if len(trail) < 2:
return None
result = trail[-2].name
if result == '..':
return None
return result
(raw_groups, raw_exclusions) = parse_expressions(args.expression)
specified_works = set()
works_groups = {}
for group in raw_groups:
if len(group) == 0:
continue
work_id = extracted_path_work_id(group[0])
if not work_id:
print(f'Group {group} contains paths outside an extracted work!')
exit(1)
if not all(extracted_path_work_id(item) == work_id for item in group[1:]):
print(f'Group {group} contains paths from multiple works!')
exit(1)
specified_works.add(work_id)
if work_id not in works_groups:
works_groups[work_id] = []
normalized_paths = [normalize_to(item, args.destdir) for item in group]
if not all(path.exists() for path in normalized_paths):
print(f'Group {group} contains nonexistent paths!')
exit(1)
works_groups[work_id].append(normalized_paths)
exclusions = []
for exclusion in raw_exclusions:
work_id = extracted_path_work_id(exclusion)
if not work_id:
print(f'Excluded path {exclusion} does not belong to an extracted work!')
exit(1)
specified_works.add(work_id)
normalized_path = normalize_to(exclusion, args.destdir)
if not normalized_path.exists():
print(f'Excluded path {exclusion} does not exist!')
exit(1)
exclusions.append(normalized_path)
collation_staging_area = args.destdir / 'site' / 'images-staging'
collation_staging_area.mkdir(parents=True)
collation_area = args.destdir / 'site' / 'images'
collation_area.mkdir(parents=True, exist_ok=True)
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
any_warnings = False
for work_path in extraction_dir.iterdir():
work_id = work_path.name
if args.only_specified_works and work_id not in specified_works:
continue
work_collation_dir = collation_area / work_id
if work_collation_dir.exists():
if work_id not in specified_works:
continue
if len(list(work_collation_dir.iterdir())) > 0:
print(f'Collation directory for work {work_id} already exists!')
any_warnings = True
break
else:
work_collation_dir.rmdir()
virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
if virtual in [(1,), None]:
if work_id in specified_works:
print(f'Work {work_id} {"is virtual" if virtual == (1,) else "has no metadata"}!')
any_warnings = True
break
continue
work_staging_dir = collation_staging_area / work_id
collator = Collator(work_staging_dir, exclusions, args)
for group in works_groups.get(work_id, [[work_path]]):
collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
if not collation_result:
print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
break
if collation_result and collator.index > 0:
print(f'Collated {collator.index} pages for {work_id}')
work_staging_dir.rename(work_collation_dir)
else:
if work_staging_dir.is_dir():
for f in work_staging_dir.iterdir():
f.unlink()
work_staging_dir.rmdir()
if not collation_result:
print(f'Unable to deduce file structure for {work_id}, skipping')
elif collator.index == 0:
print(f'No files found for {work_id}, skipping')
any_warnings = True
collation_staging_area.rmdir()
con.close()
if args.auto and not any_warnings:
parser_generate.parse_args(args=[], namespace=args)
generate(args)
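# Collation methods share a three-valued result convention:
#   True  - collation succeeded (or there was nothing to collate)
#   None  - the file structure could not be deduced; give up on this work
#   False - (try_collate_* only) the heuristic doesn't apply, so
#           collate_from_paths falls through to the next heuristic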
class Collator:
def __init__(self, dest, exclude, args):
self.dest = dest
self.exclude = exclude
self.args = args
self.index = 0
def collate_from_paths(self, srcs):
srcs = [src for src in srcs if len(descendant_files_ignore(src, self.exclude)) > 0]
if len(srcs) == 1 and srcs[0].is_dir():
return self.collate_from_paths(ls_ignore(srcs[0], self.exclude))
if len(srcs) == 1 and is_pdf(srcs[0]):
print(f'Extracting images from {srcs[0]}')
return self.link_pdf(srcs[0])
if len(srcs) == 0:
return True
debug(f'Auto-collating {srcs}')
select_language = self.try_collate_select_language(srcs)
if select_language is not False:
return select_language
dirs = [src for src in srcs if src.is_dir()]
if len(dirs) == 2:
for quality in IMAGE_QUALITY_REGEXES:
def a_not_b(a, b, src):
if a in quality:
return quality[a].search(nname(src))
else:
return not quality[b].search(nname(src))
better_srcs = [src for src in dirs if a_not_b('better', 'worse', src)]
worse_srcs = [src for src in dirs if a_not_b('worse', 'better', src)]
if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
better = better_srcs[0]
worse = worse_srcs[0]
if len(descendant_files_ignore(better, self.exclude)) == len(descendant_files_ignore(worse, self.exclude)):
non_dirs = [src for src in srcs if not src.is_dir()]
return self.collate_from_paths([better] + non_dirs)
images_vs_pdf = self.try_collate_images_vs_pdf(srcs)
if images_vs_pdf is not False:
return images_vs_pdf
for regexes in SPLITS:
split_attempt = self.try_collate_split_regex(srcs, **regexes)
if split_attempt is not False:
return split_attempt
if all(src.is_file() and is_image(src) for src in srcs):
ordering = complete_prefix_number_ordering(srcs)
if ordering:
print(f'Symlinking image files: {ordering[0]}...')
return self.link_ordered_files(ordering)
else:
return None
return None
def link_pdf(self, src):
with fitz.open(src) as pdf:
images = pdf_images(pdf, self.args.pdf_strategy)
if images is None:
print(f'Failed to enumerate page images in PDF {src}')
return None
self.dest.mkdir(parents=True, exist_ok=True)
        print('0 pages collated...', end='')
for (idx, image) in enumerate(images, start=self.index):
file_path = self.dest / f'{idx:04d}.{image["ext"]}'
with open(file_path, 'wb') as f:
f.write(image["image"])
print(f'\x1b[2K\r{idx+1-self.index} pages collated...', end='')
print()
self.index += pdf.page_count
return True
def link_ordered_files(self, ordering):
self.dest.mkdir(parents=True, exist_ok=True)
for (idx, src_path) in enumerate(ordering, start=self.index):
ext = src_path.suffix.lower()
link_path = self.dest / f'{idx:04d}{ext}'
link_path.symlink_to(relpath(src_path, self.dest))
self.index += len(ordering)
return True
def try_collate_split_regex(self, srcs, earlier=None, later=None):
early_srcs = []
middle_srcs = []
late_srcs = []
for src in srcs:
if earlier and earlier.search(nname(src)):
early_srcs.append(src)
elif later and later.search(nname(src)):
late_srcs.append(src)
else:
middle_srcs.append(src)
if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
return False
early_page_collation = self.collate_from_paths(early_srcs)
if early_page_collation is None:
return None
middle_page_collation = self.collate_from_paths(middle_srcs)
if middle_page_collation is None:
return None
late_page_collation = self.collate_from_paths(late_srcs)
if late_page_collation is None:
return None
return True
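    # When a work ships both loose image files and a PDF of the same pages,
    # keep whichever set has the higher median resolution. The height-adjusted
    # image count recognizes PDFs that store each page as several horizontal
    # strips, in which case the loose images are preferred.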
def try_collate_images_vs_pdf(self, srcs):
pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
if len(pdfs) != 1:
return False
outer_pdf = pdfs[0]
inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)]
if len(inner_pdfs) != 1:
return False
inner_pdf = inner_pdfs[0]
non_pdf_srcs = [src for src in srcs if src != outer_pdf]
images = []
non_images = []
descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)]
for f in descendant_files:
if is_image(f):
images.append(f)
else:
non_images.append(f)
break
if len(non_images) != 0 or len(images) == 0:
return False
debug(f'Comparing PDF {inner_pdf} and images {images}')
pdf_sizes = pdf_image_sizes(inner_pdf)
standalone_sizes = [standalone_image_size(f) for f in images]
median_pdf_size = median(pdf_sizes)
median_standalone_size = median(standalone_sizes)
if not (median_pdf_size and median_standalone_size):
return False
debug(f'PDF: {len(pdf_sizes)} images, {median_pdf_size}; standalone: {len(standalone_sizes)} images, median {median_standalone_size}')
if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
with fitz.open(inner_pdf) as pdf:
pdf_page_count = len(pdf)
height_adjusted_pdf_image_count = (
len(pdf_sizes) *
mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
)
if (
abs(pdf_page_count - len(standalone_sizes)) <= 2 and
len(pdf_sizes) > len(standalone_sizes) and
median_pdf_size[0] == median_standalone_size[0] and
abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
):
return self.collate_from_paths(non_pdf_srcs)
else:
return False
if superior_or_equal(median_standalone_size, median_pdf_size):
return self.collate_from_paths(non_pdf_srcs)
elif superior_or_equal(median_pdf_size, median_standalone_size):
return self.collate_from_paths([outer_pdf])
else:
return False
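    # If every source is labeled with some language, keep only the sources
    # matching the configured locale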
def try_collate_select_language(self, srcs):
if self.args.locale not in LANGUAGE_REGEXES:
return False
if not all(any(lang.search(nname(src)) for lang in LANGUAGE_REGEXES.values()) for src in srcs):
return False
srcs_matching_language = [src for src in srcs if LANGUAGE_REGEXES[self.args.locale].search(nname(src))]
if len(srcs_matching_language) == len(srcs) or len(srcs_matching_language) == 0:
return False
return self.collate_from_paths(srcs_matching_language)
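# page.get_text('blocks') yields (x0, y0, x1, y1, text, block_no, block_type)
# tuples, where block_type 1 marks an image block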
def block_is_image(block):
return block[6] == 1
def block_text(block):
return block[4]
def block_relevant(block):
return block_is_image(block) or not IRRELEVANT_PDF_BLOCK_REGEX.search(block_text(block))
def relevant_blocks(page):
blocks = page.get_text('blocks')
return [block for block in blocks if block_relevant(block)]
def is_single_image(page):
blocks = relevant_blocks(page)
return len(blocks) == 1 and block_is_image(blocks[0])
def extract_image(pdf, xref):
image = pdf.extract_image(xref)
if f'.{image["ext"]}' in IMAGE_FILE_EXTENSIONS:
return image
print(f'Converting image from {image["ext"]} to png')
pix = fitz.Pixmap(pdf, xref)
return { 'ext': 'png', 'image': pix.tobytes('png') }
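# Scan the page's content streams for xobject draw operators ('/name Do').
# Return the xref of the drawn image only if the page draws exactly one named
# xobject and no inline images; index 7 of a page.get_images() tuple is the
# xobject's reference name.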
def get_displayed_image_xref(page):
ref_names = []
for content_xref in page.get_contents():
content = page.parent.xref_stream(content_xref).decode('ascii', 'replace')
if PDF_INLINE_IMAGE_REGEX.search(content):
debug('Inline image detected')
return None
for m in PDF_REFERENCED_IMAGE_REGEX.finditer(content):
ref_names.append(m.group('ref_name'))
if len(ref_names) == 0:
debug('Page does not reference any xobjects')
return None
if len(ref_names) > 1:
debug(f'Page references multiple xobjects: {ref_names}')
return None
image_xrefs = [image[0] for image in page.get_images() if image[7] == ref_names[0]]
if len(image_xrefs) == 1:
return image_xrefs[0]
if len(image_xrefs) == 0:
debug(f'No images found matching ref name {ref_names[0]}')
else:
debug(f"Multiple images found matching ref name {ref_names[0]}, that probably shouldn't happen")
return None
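# Render a page preview directly into the terminal as sixel graphics,
# choosing the libsixel dither setup that matches the PIL image mode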
def display_sixel_page(page):
s = BytesIO()
image = Image.open(BytesIO(page.get_pixmap(dpi=PDF_PREVIEW_DPI).tobytes('png')))
width, height = image.size
try:
data = image.tobytes()
except NotImplementedError:
data = image.tostring()
output = sixel_output_new(lambda data, s: s.write(data), s)
try:
if image.mode == 'RGBA':
dither = sixel_dither_new(256)
sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGBA8888)
elif image.mode == 'RGB':
dither = sixel_dither_new(256)
sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGB888)
elif image.mode == 'P':
palette = image.getpalette()
dither = sixel_dither_new(256)
sixel_dither_set_palette(dither, palette)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_PAL8)
elif image.mode == 'L':
dither = sixel_dither_get(SIXEL_BUILTIN_G8)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G8)
elif image.mode == '1':
dither = sixel_dither_get(SIXEL_BUILTIN_G1)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G1)
else:
raise RuntimeError('unexpected image mode')
try:
sixel_encode(data, width, height, 1, dither, output)
print(s.getvalue().decode('ascii'))
finally:
sixel_dither_unref(dither)
finally:
sixel_output_unref(output)
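# Produce one image per PDF page. Pages consisting of a single bare image are
# extracted losslessly; for anything else the strategy decides: [n]ope aborts,
# [c]onvert rasterizes the page at PDF_CONVERSION_DPI, e[x]tract takes the
# displayed image and drops any overlaid text, [d]rop omits the page, and
# [s]how-ask displays a sixel preview before prompting ('ask' prompts
# without a preview).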
def pdf_images(pdf, strategy):
print(f'0/{pdf.page_count} pages analyzed...', end='')
image_extractors = []
for (idx, page) in enumerate(pdf):
xref = get_displayed_image_xref(page)
if xref is not None and is_single_image(page):
image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
else:
page_images = page.get_image_info()
print(f'\nPage {idx+1}: {len(page_images)} images, {len(relevant_blocks(page))} total relevant objects')
choice = strategy
while True:
if choice.lower().startswith('n'):
return None
if choice.lower().startswith('c'):
if choice == strategy:
print(f'Converting page {idx+1}')
image_extractors.append(lambda p=page: { 'ext': 'png', 'image': p.get_pixmap(dpi=PDF_CONVERSION_DPI).tobytes('png') })
break
if xref is not None and (choice.lower().startswith('x') or choice.lower() == 'extract'):
if choice == strategy:
print(f'Extracting image from page {idx+1} without text')
image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
break
if choice.lower().startswith('d'):
if choice == strategy:
print(f'Dropping page {idx+1}')
break
if choice.lower().startswith('s'):
display_sixel_page(page)
choice = input(f'[N]ope out / [c]onvert page{"" if xref is None else " / e[x]tract image"} / [d]rop page / [s]how page? [n/c{"" if xref is None else "/x"}/d/s] ')
print(f'\x1b[2K\r{idx+1}/{pdf.page_count} pages analyzed...', end=('' if idx+1 < pdf.page_count else '\n'))
return (extractor() for extractor in image_extractors)
def nfc(s):
return unicodedata.normalize('NFC', s)
def nname(entry):
return nfc(entry.name)
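# Order image files by the numbering in their names. Entries are first
# bucketed by ALT_VERSIONS keywords (e.g. a base set plus a 褐色 recolor),
# each bucket is numbered independently, and the buckets are then merged:
# near-complete buckets are emitted as full runs, while small partial buckets
# are spliced in beside the matching indices of the primary bucket.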
def complete_prefix_number_ordering(entries):
if len(entries) == 1:
return entries
entries_by_version = {}
for entry in entries:
version_code = 0
for (i, version) in enumerate(ALT_VERSIONS):
if version in nname(entry):
version_code |= (1 << i)
entries_by_version.setdefault(version_code, []).append(entry)
numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version}
unified_indices = set()
for numbering in numberings_by_version.values():
if numbering is None:
return None
unified_indices |= set(numbering.keys())
unified_indices.discard(None)
unified_indices = list(unified_indices)
unified_indices.sort()
min_delta_by_level = {}
if len(unified_indices) > 1:
for i in range(1, len(unified_indices)):
cur = unified_indices[i]
prev = unified_indices[i-1]
for level in range(min(len(cur), len(prev))):
if cur[level] != prev[level] and not (cur[level] == 5 and prev[level] == 0):
delta = cur[level] - prev[level]
min_delta_by_level[level] = min(min_delta_by_level.get(level, delta), delta)
if any(delta > 2 for delta in min_delta_by_level.values()):
return None
unified_indices.append(None)
versions = list(numberings_by_version.keys())
versions.sort()
version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version}
inner_versions = []
outer_versions = [versions[0]]
for ver in versions[1:]:
if version_lengths[ver] >= version_lengths[versions[0]] - 2:
outer_versions.append(ver)
else:
inner_versions.append(ver)
result = []
for out_ver in outer_versions:
for i in unified_indices:
for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])):
result += numberings_by_version[ver].get(i, [])
return result
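# Assign each entry a tuple index from a shared filename prefix followed by a
# number, recursing when several entries share a number, e.g.
#   p01.png, p02a.png, p02b.png -> {(1,): [p01], (2, 1): [p02a], (2, 2): [p02b]}
# Returns None when no consistent numbering exists.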
def unique_hierarchical_prefix_numbering(entries, start_point=0):
if len(entries) == 1 and not NUMBER_REGEX.search(nname(entries[0])):
return {None: entries}
debug(f'Finding unique hierarchical prefix ordering from start point {start_point} for {entries}')
longest_entry = max(entries, key=lambda e: len(nname(e)))
matches = reversed(list(NUMBER_REGEX.finditer(nname(longest_entry))))
for m in matches:
pos = m.start()
if pos < start_point:
return None
prefix = nname(longest_entry)[:pos]
debug(f'Checking prefix {prefix}')
if all(nname(e).startswith(prefix) or prefix.startswith(nfc(e.stem)) for e in entries):
numbering = {}
for e in entries:
if pos >= len(nfc(e.stem)):
i = 0
else:
n = NUMBER_REGEX.match(nname(e)[pos:])
if n is None:
return None
i = int(n.group())
numbering.setdefault((i,), []).append(e)
indices = list(numbering.keys())
for idx in indices:
if len(numbering[idx]) > 1:
ents_idx = numbering.pop(idx)
debug(f'Index {idx} has multiple entries')
longest = max(ents_idx, key=lambda e: len(nname(e)))
next_match = NUMBER_REGEX.match(nname(longest)[pos:])
if not next_match:
return None
next_layer_start = pos + next_match.end()
sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start) or alphabetic_numbering(ents_idx, next_layer_start)
if not sub_numbering:
return None
for sub_idx in sub_numbering:
numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx]
return numbering
return None
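# Fallback for levels without digits: map a bare name to 0 and a single
# trailing letter to its alphabet position (a=1, b=2, ...), rejecting
# anything longer or duplicated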
def alphabetic_numbering(entries, start_point):
debug(f'Finding alphabetic numbering from start point {start_point} for {entries}')
alphabetized = {}
for entry in entries:
ending = nfc(entry.stem)[start_point:].strip(' -_()')
debug(f'{entry} has ending {ending}')
if len(ending) > 1:
debug('Ending is more than one character, giving up')
return None
index = 0 if ending == '' else ord(ending.lower()) - ord('a') + 1
if index < 0 or index > 26:
debug('Ending is not a letter, giving up')
return None
if (index,) in alphabetized:
debug(f'Index value {index} is already present, giving up')
return None
alphabetized[(index,)] = [entry]
return alphabetized
def check_extension(path, exts):
return path.suffix.lower() in exts
def is_pdf(path):
return check_extension(path, ['.pdf'])
def is_image(path):
return check_extension(path, IMAGE_FILE_EXTENSIONS)
def ignoreable(path):
return path.name in IGNOREABLE_FILES or check_extension(path, IGNOREABLE_EXTENSIONS)
def ls_ignore(directory, exclude):
return [
path for path in directory.iterdir()
if not ignoreable(path) and path not in exclude
]
def descendant_files_ignore(path, exclude):
if path.is_file():
return [path]
result = []
for item in ls_ignore(path, exclude):
if item.is_dir():
result.extend(descendant_files_ignore(item, exclude))
else:
result.append(item)
return result
def standalone_image_size(filepath):
with Image.open(filepath) as im:
return im.size
def pdf_image_sizes(filepath):
sizes_by_xref = {}
with fitz.open(filepath) as pdf:
for page in pdf:
for (xref, _, width, height, *_) in page.get_images():
if xref in sizes_by_xref:
continue
sizes_by_xref[xref] = (width, height)
return list(sizes_by_xref.values())
def median(items):
if len(items) == 0:
return None
items.sort()
return items[len(items) // 2]
def mean(items):
if len(items) == 0:
return None
return sum(items) / len(items)
def superior_or_equal(a, b):
return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))
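# Parse collate expressions into (groups, exclusions), e.g.
#   a.png ( b.png c.png ) ! d.png
#   -> ([['a.png'], ['b.png', 'c.png']], ['d.png'])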
def parse_expressions(tokens):
groups = []
exclusions = []
while tokens:
token = tokens.pop(0)
if token == '!':
exclusions.extend(parse_exclusion(tokens))
elif token == '(':
groups.append(parse_group(tokens))
else:
groups.append([token])
return (groups, exclusions)
def parse_exclusion(tokens):
token = tokens.pop(0)
if token == '(':
return parse_group(tokens)
else:
return [token]
def parse_group(tokens):
items = []
while True:
token = tokens.pop(0)
if token == ')':
return items
else:
items.append(token)
def normalize_to(path, ref):
return ref / Path(relpath(path, ref))
def fmt_size(s):
return f'{s[0]}x{s[1]}px'
def analyze(args):
extract_dir = args.destdir / 'extract'
files = descendant_files_ignore(extract_dir / args.work_id, [])
files.sort()
for f in files:
print(f'{relpath(f, extract_dir)}', end='')
if is_image(f):
size = standalone_image_size(f)
print(f'\t{fmt_size(size)}')
elif is_pdf(f):
sizes = pdf_image_sizes(f)
if len(sizes) == 0:
print('\tContains no images')
else:
print(f'\t{len(sizes)} images, median {fmt_size(median(sizes))}, min {fmt_size(min(sizes))}, max {fmt_size(max(sizes))}')
else:
print()
def metadata(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
if args.virtual is not None:
cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
con.commit()
res = cur.execute(
"SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
(args.work_id,),
).fetchone()
if res is None:
print(f'Work id {args.work_id} not found!')
return
(title, circle, date, description, series, virtual) = res
print(f'Work ID: {args.work_id}')
print(f'Title: {title}')
print(f'Circle: {circle}')
print(f'Pub date: {date}')
print(f'Description: {description}')
print(f'Series: {series}')
print(f'Virtual: {"Yes" if virtual == 1 else "No"}')
con.close()
def copy_recursive(src, dest):
dest.mkdir(parents=True, exist_ok=True)
for item in src.iterdir():
if item.is_dir() and not item.is_symlink():
copy_recursive(item, dest / item.name)
else:
shutil.copyfile(item, dest / item.name)
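# Similarity score between two titles: recursively match common substrings,
# scoring n*n for each contiguous match of length n and taking the best
# total, so one long shared run outweighs many scattered short ones
# (identical strings of length n score n*n)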
memoized_similarities = {}
def similarity(a, b):
if len(a) < len(b) or (len(a) == len(b) and a < b):
shorter = a
longer = b
else:
shorter = b
longer = a
if len(shorter) == 0:
return 0
if (shorter, longer) in memoized_similarities:
return memoized_similarities[(shorter, longer)]
options = [similarity(shorter[1:], longer)]
for i in range(1, len(shorter)+1):
match_idx = longer.find(shorter[:i])
if match_idx == -1:
break
options.append(i*i + similarity(shorter[i:], longer[match_idx+i:]))
result = max(options)
memoized_similarities[(shorter, longer)] = result
return result
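# Return the n highest-scoring items, keeping up to `overflow` extra items
# when ties straddle the cutoff; the shuffle randomizes order within each
# score tier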
def top(items, n, key, overflow=0):
winners = []
for item in items:
score = key(item)
if len(winners) < n or score >= winners[-1][1]:
for i in range(len(winners) + 1):
if i == len(winners) or score >= winners[i][1]:
winners.insert(i, (item, score))
break
while len(winners) > n and winners[-1][1] < winners[n-1][1]:
winners.pop()
# shuffle followed by stable sort to randomly shuffle within each score tier
random.shuffle(winners)
winners.sort(key=lambda w: w[1], reverse=True)
return [item for (item, score) in winners[:n+overflow]]
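# Render the static site: a gallery page and a viewer page per work, plus
# category indexes for authors, tags, circles, and series. Suggested works
# are ranked by title similarity, with the work itself and other entries in
# its series demoted to the bottom.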
def generate(args):
jenv = Environment(
loader=PackageLoader("dlibrary"),
autoescape=select_autoescape()
)
viewer_template = jenv.get_template("viewer.html")
list_template = jenv.get_template("list.html")
categorization_template = jenv.get_template("categorization.html")
work_template = jenv.get_template("work.html")
index_template = jenv.get_template("index.html")
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
site_dir = args.destdir / 'site'
collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()}
works = []
for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall():
if work_id not in collated_work_ids:
continue
authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]
images = [path.name for path in (site_dir / 'images' / work_id).iterdir()]
images.sort()
try:
thumbnail_path = relpath(next(
f for f in (site_dir / 'thumbnails').iterdir() if f.stem == work_id
), site_dir)
except StopIteration:
thumbnail_path = f'images/{work_id}/{images[0]}'
work = {
'id': work_id,
'title': title,
'circle': circle,
'date': date,
'description': description,
'series': series,
'authors': authors,
'tags': tags,
'thumbnail_path': thumbnail_path,
'images': images,
}
works.append(work)
for (idx, work) in enumerate(works):
def suggestion_priority(other_work):
if other_work is work:
return -2
if work['series'] and work['series'] == other_work['series']:
return -1
return similarity(work['title'], other_work['title'])
suggested = top(works, SUGGESTED_WORKS_COUNT, suggestion_priority)
work_dir = site_dir / 'works' / work['id']
viewer_dir = work_dir / 'view'
viewer_dir.mkdir(parents=True, exist_ok=True)
with open(work_dir / 'index.html', 'w') as f:
f.write(work_template.render(depth=2, work=work, title=work['title'], suggested=suggested))
with open(viewer_dir / 'index.html', 'w') as f:
f.write(viewer_template.render(depth=3, work=work, title=work['title']))
print(f'\x1b[2K\r{idx+1}/{len(works)} works processed...', end=('' if idx+1 < len(works) else '\n'))
def make_categorization(categorization, query, work_filter, work_style_cards=False):
categorization_dir = site_dir / categorization
cats = [cat for (cat,) in cur.execute(query)]
cat_samples = {}
for cat in cats:
cat_works = list(filter(work_filter(cat), works))
cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None
safeish_cat = cat.replace('/', ' ')
cat_dir = categorization_dir / safeish_cat
cat_dir.mkdir(parents=True, exist_ok=True)
with open(cat_dir / 'index.html', 'w') as f:
f.write(list_template.render(
depth=2,
works=cat_works,
title=cat,
categorization=categorization,
))
categorization_dir.mkdir(parents=True, exist_ok=True)
with open(categorization_dir / 'index.html', 'w') as f:
f.write(categorization_template.render(
depth=1,
categorization=categorization,
categories=cats,
samples=cat_samples,
work_style_cards=work_style_cards,
))
make_categorization(
'authors',
'SELECT DISTINCT author FROM authors ORDER BY author',
lambda author: lambda work: author in work['authors'],
)
make_categorization(
'tags',
'SELECT DISTINCT tag FROM tags ORDER BY tag',
lambda tag: lambda work: tag in work['tags'],
)
make_categorization(
'circles',
'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle',
lambda circle: lambda work: work['circle'] == circle,
)
make_categorization(
'series',
'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series',
lambda series: lambda work: work['series'] == series,
work_style_cards=True,
)
with resources.as_file(resources.files("dlibrary")) as r:
copy_recursive(r / 'static', site_dir / 'static')
with open(site_dir / 'index.html', 'w') as f:
f.write(index_template.render(depth=0, works=works))
con.close()
argparser = argparse.ArgumentParser(
prog='dlibrary',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Organize DRM-free works purchased from DLSite into a library
that can be viewed in a web browser.
Intended workflow:
- `extract` a collection of archive files into DLibrary's data
directory, automatically giving each work its own subfolder.
- `fetch` metadata and thumbnail images for extracted works
from DLSite.
- `collate` extracted works, producing a single sequence of
image files (or symlinks into the extracted data, when
possible) for each work.
- Manually adjust works' `metadata` when necessary.
- `generate` a static website providing a catalog and viewer
for all collated works.
"""),
)
argparser.add_argument(
'-d', '--destdir',
type=Path,
default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
    help='directory in which to store dlibrary content and metadata (default: $DLIBRARY_DIR or ./dlibrary)',
)
argparser.add_argument(
'-D', '--debug',
action='store_true',
help='print out debugging info',
)
argparser.add_argument(
'-l', '--locale',
type=str,
default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). '
'May still fall back to Japanese if other languages are unavailable. '
'(default: $DLIBRARY_LOCALE or en_US)'),
)
argparser.add_argument(
'-a', '--auto',
action='store_true',
help='automatically continue the extract->fetch->collate->generate pipeline starting from whatever subcommand is being run',
)
subparsers = argparser.add_subparsers(title="subcommands", required=True)
parser_extract = subparsers.add_parser('extract', aliases=['x'], help='extract archive files')
parser_extract.add_argument(
'-r', '--remove',
action='store_true',
help='remove original archive files after extraction',
)
parser_extract.add_argument(
'archives',
metavar='FILE',
type=Path,
nargs='+',
help='archive files to extract',
)
parser_extract.set_defaults(func=extract)
parser_fetch = subparsers.add_parser('fetch', aliases=['f'], help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)
parser_collate = subparsers.add_parser(
'collate',
aliases=['c'],
help='collate works into sequences of image files',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
For each extracted work that has not already been collated,
DLibrary will attempt to intuit its structure and create
a single ordered list of image files in the site data
directory. Each image will either be a symlink to an image
file in the extraction folder, or a single page extracted
from a PDF file.
DLibrary may fail to automatically collate a work if its
files and subdirectories are not named in a way that
indicates a clear linear ordering. In order to assist with
collation, you can provide a list of expressions specifying
where to start traversing the directory structure, what
files to include in what order, and/or what files to ignore
entirely.
An expression can be:
PATH
A single path. If this is an image, it will be appended to
the sequence of collated images for the work it belongs to;
if this is a PDF, images will be extracted from it and
concatenated to the sequence; if this is a directory, the
contents of the directory will be automatically collated
using DLibrary's default heuristics, and concatenated
to the sequence.
( PATH [PATH ...] )
A group of paths contained in parentheses. You may need to escape
the parentheses to avoid them getting parsed by your shell.
All the paths in this group will be considered together, and
automatically collated using the default heuristics, regardless
of what order the paths are provided in.
! PATH
! ( PATH [PATH ...] )
A path or group of paths to exclude from collation. You may
need to escape the !. If an excluded path appears within any
of the other specified paths, it will be skipped by the collation
heuristics.
If the only expressions provided are negations, then auto-collation
will start from the top level of the extracted work while skipping
the excluded paths.
All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
for some not-yet-collated work. Paths belonging to multiple
different works can all be provided on the same command line, and
expressions will be clustered together by work id while otherwise
preserving the order they were provided in. A parenthesized group
expression must only contain paths belonging to a single work.
By default, DLibrary will attempt to collate every not-yet-collated
work (excluding "virtual" works), using the provided expressions
to assist in collation when available. The `-o` flag will direct
DLibrary to *only* collate works included in the provided expressions,
even if other uncollated works are present.
"""),
)
parser_collate.add_argument(
'-o', '--only-specified-works',
action='store_true',
help="only collate works that are explicitly specified",
)
parser_collate.add_argument(
'-p', '--pdf-strategy',
choices=[
'ask', '?',
'show-ask', 's',
'convert', 'c',
'extract', 'x',
'drop', 'd',
'nope', 'n'
],
default='show-ask',
help="how to handle PDF pages that aren't a single image with no text",
)
parser_collate.add_argument(
'expression',
nargs='*',
help='expressions indicating paths to collate or skip',
)
parser_collate.set_defaults(func=collate)
parser_analyze = subparsers.add_parser('analyze', aliases=['a'], help='analyze an extracted folder to assist in collation')
parser_analyze.add_argument('work_id')
parser_analyze.set_defaults(func=analyze)
parser_metadata = subparsers.add_parser('metadata', aliases=['m'], help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
'--virtual',
action=argparse.BooleanOptionalAction,
help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)
parser_generate = subparsers.add_parser(
'generate',
aliases=['g'],
help='generate HTML/CSS/JS for library site',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
The static site will be generated under $DLIBRARY_DIR/site/
and can be served by pointing an HTTP server at that
directory. Note that some files inside the static site
hierarchy will be symlinks into $DLIBRARY_DIR/extract/
outside the site hierarchy, so make sure your HTTP server
will allow those symlinks to be read.
"""),
)
parser_generate.set_defaults(func=generate)
def main():
args = argparser.parse_args()
global debug_mode
debug_mode = args.debug
args.func(args)
if __name__ == "__main__":
main()