#!/usr/bin/env python3

import argparse
import asyncio
import importlib_resources as resources
from io import BytesIO
from pathlib import Path
import os
from os.path import relpath, splitext, join
import random
import re
import readline
import shutil
import sqlite3
import stat
import string
import textwrap
import time
import unicodedata
from urllib.parse import urlparse
import zipfile

import dlsite_async
import fitz
from libsixel import *
from PIL import Image, UnidentifiedImageError
import PIL.ImageFile
from jinja2 import Environment, PackageLoader, select_autoescape
import pyuca
import rarfile
import requests

# These character classes match both ASCII and fullwidth (全角) digits/letters,
# since archive filenames frequently use the fullwidth forms.
NUMBER_REGEX = re.compile('[0-9０-９]+')
ALPHABETIC_NUMBERING_REGEX = re.compile('^(?P<prefix>[^a-zａ-ｚ0-9０-９]*)((?P<letter>[a-zａ-ｚ])(?P<suffix>[^a-zａ-ｚ0-9０-９]*))?$', re.I)

STRING_TOKENIZE_REGEX = re.compile('(?P<str>[^0-9０-９]+)|(?P<num>[0-9０-９]+)')

EXTRA_NORMALIZATION_TABLE = str.maketrans({'\u301c': '\uff5e'})

DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$')
FANZA_ID_REGEX = re.compile('^d_[0-9]+$')
FAKKU_ID_REGEX = re.compile('.*_FAKKU$')

HI_RES_REGEX = re.compile('高解像度|原寸|実寸|大サイズ', re.I)
NO_TONE_REGEX = re.compile('トーン(効果)?[な無]し|グレー?スケ', re.I)
TONE_REGEX = re.compile('トーン($|版|(効果)?[有あ]り)', re.I)
COLOR_REGEX = re.compile('カラー', re.I)
MONOCHROME_REGEX = re.compile('モノクロ', re.I)
MOSAIC_REGEX = re.compile('モザイク', re.I)
BLACKBAR_REGEX = re.compile('墨消し', re.I)
IMAGE_QUALITY_REGEXES = [
    { 'better': HI_RES_REGEX },
    { 'better': re.compile('^大|L|L$'), 'worse': re.compile('^小|S|S$') },
    { 'better': NO_TONE_REGEX, 'worse': TONE_REGEX },
    { 'better': COLOR_REGEX, 'worse': MONOCHROME_REGEX },
    { 'better': BLACKBAR_REGEX, 'worse': MOSAIC_REGEX },
]
IMAGE_RESOLUTION_REGEX = re.compile('^(?P<x>[0-9]+)x(?P<y>[0-9]+)$')

LANGUAGE_REGEXES = {
    'en_US': re.compile('english|英語', re.I),
    'ja_JP': re.compile('日本語', re.I),
    'zh_CN': re.compile('(^|[^體])中文|中国語', re.I),
    'zh_TW': re.compile('繁體中文', re.I),
    'ko_KR': re.compile('한국어', re.I),
}

TEXTLESS_REGEX = re.compile('(台詞|セリフ|せりふ|テキスト|文字|文章)((な|無)し|抜き)|notext|textless', re.I)
FXLESS_REGEX = re.compile('効果音(な|無)し', re.I)
FRONT_COVER_REGEX = re.compile('(?<!裏)表紙(?!裏)|(?<!back[-_ ])(?<!back)cover|(?<!ura[-_ ])(?<!ura)hyou?sh?i(?![-_ ]?ura)', re.I)
BACK_COVER_REGEX = re.compile('裏表紙|hyou?sh?i[-_ ]?ura|ura[-_ ]?hyou?sh?i', re.I)
BONUS_REGEX = re.compile('設定|キャラ|特典|ポスター|bonus', re.I)
EPILOGUE_REGEX = re.compile('after|後日談|おまけ|omake|オマケ', re.I)
AFTERWORD_REGEX = re.compile('あとがき', re.I)
SPLITS = [
    { 'later': TEXTLESS_REGEX },
    { 'later': FXLESS_REGEX },
    { 'earlier': FRONT_COVER_REGEX, 'later': BACK_COVER_REGEX },
    { 'later': BONUS_REGEX },
    { 'later': AFTERWORD_REGEX },
    { 'later': EPILOGUE_REGEX },
]

ALT_VERSIONS = [
    '褐色',
    '日焼け',
    'pink',
    '金髪',
    '白肌',
    'うつろ目',
    'dark skin',
    'ラバー',
    'ゾンビ肌',
    'マスク',
    'アヘ顔',
]

IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff', '.bmp']

IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store', 'desktop.ini']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd', '.mp4']

PDF_CONVERSION_DPI = 300
PDF_PREVIEW_DPI = 72

IRRELEVANT_PDF_BLOCK_REGEX = re.compile(r'\bTCPDF\b', re.I)

MULTIPART_RAR_HEAD_REGEX = re.compile(r'^(.+)\.part0*1\.exe$', re.I)
MULTIPART_RAR_TAIL_REGEX = re.compile(r'^(.+)\.part0*([^1]|[^0].+)\.rar$', re.I)

PDF_REFERENCED_IMAGE_REGEX = re.compile(r'(^|(?<=\s))/(?P<ref_name>\S+)\s+Do($|(?=\s))')
PDF_INLINE_IMAGE_REGEX = re.compile(r'(^|\s)(BI|ID|EI)($|\s)')

SUGGESTED_WORKS_COUNT = 10

READONLY_FILE = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
READONLY_DIR = READONLY_FILE | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

ANSI_RED = '\x1b[1;31m'
ANSI_GREEN = '\x1b[1;32m'
ANSI_YELLOW = '\x1b[1;33m'
ANSI_GRAY = '\x1b[1;90m'
ANSI_NORMAL = '\x1b[0m'
ANSI_LINECLEAR = '\x1b[2K\r'

debug_mode = False
def debug(s):
    if debug_mode:
        print(f'{ANSI_GRAY}{time.strftime("%Y-%m-%d %H:%M:%S")} - {s}{ANSI_NORMAL}')

def succ(s):
    print(f'{ANSI_GREEN}{s}{ANSI_NORMAL}')

def warn(s):
    print(f'{ANSI_YELLOW}{s}{ANSI_NORMAL}')

def err(s):
    print(f'{ANSI_RED}{s}{ANSI_NORMAL}')

def count_progress(idx, count, thing):
    if idx + 1 < count:
        pref = ''
        suf = '...'
        end = ''
    else:
        pref = ANSI_GREEN
        suf = ANSI_NORMAL
        end = '\n'
    print(f'{ANSI_LINECLEAR}{pref}{idx+1}/{count} {thing}{suf}', end=end)

def open_zipfile_with_encoding(path):
    for enc in ["utf-8", "shift-jis", "shift-jisx0213"]:
        try:
            return zipfile.ZipFile(path, metadata_encoding=enc)
        except UnicodeDecodeError:
            pass

    err(f'{path} contains filenames with unknown character encoding!')
    exit(1)

def open_rarfile_with_encoding(path):
    for enc in ["utf-8", "shift-jis", "shift-jisx0213"]:
        rf = rarfile.RarFile(path, charset=enc)
        # rarfile substitutes U+FFFD for bytes it cannot decode, so a listing
        # free of replacement characters means this charset worked
        if all('\ufffd' not in info.filename for info in rf.infolist()):
            return rf

    err(f'{path} contains filenames with unknown character encoding!')
    exit(1)

def readonly(path):
    for parentdir, dirs, files in os.walk(path, topdown=False):
        for f in files:
            os.chmod(join(parentdir, f), READONLY_FILE, follow_symlinks=False)
        os.chmod(parentdir, READONLY_DIR, follow_symlinks=False)

def extract(args):
    absolute_archive_paths = set(path.resolve(strict=True) for path in args.archives)

    any_skipped = False

    for archive_path in args.archives:
        if archive_path.suffix.lower() == '.zip':
            work_id = archive_path.stem
            work_extract_path = args.destdir / 'extract' / work_id

            print(f'Extracting {archive_path} to {work_extract_path}')

            with open_zipfile_with_encoding(archive_path) as z:
                work_extract_path.mkdir(parents=True)
                z.extractall(path=work_extract_path)

            readonly(work_extract_path)

            if args.remove:
                archive_path.unlink()

        elif rar_match := MULTIPART_RAR_HEAD_REGEX.fullmatch(archive_path.name):
            work_id = rar_match.group(1)
            work_extract_path = args.destdir / 'extract' / work_id

            print(f'Extracting multipart RAR archive beginning with {archive_path} to {work_extract_path}')

            with open_rarfile_with_encoding(archive_path) as r:
                volumes = [Path(vol).resolve(strict=True) for vol in r.volumelist()]
                if any(vol not in absolute_archive_paths for vol in volumes):
                    print(f'Multipart RAR archive starting with {archive_path} contains volume files not listed on command-line, skipping')
                    any_skipped = True
                    continue
                work_extract_path.mkdir(parents=True)
                r.extractall(path=work_extract_path)

            readonly(work_extract_path)

            if args.remove:
                for vol in volumes:
                    vol.unlink()

        elif MULTIPART_RAR_TAIL_REGEX.fullmatch(archive_path.name):
            # tail volumes are extracted together with their .part1.exe head volume
            pass
        else:
            print(f'Unknown archive file type {archive_path}, skipping')
            any_skipped = True

    if args.auto and not any_skipped:
        parser_fetch.parse_args(args=[], namespace=args)
        fetch(args)


def manual_input_metadata(work_id):
    print(f"Don't know how to fetch metadata for {work_id}, input manually:")

    title = input('Title: ')
    circle = input('Circle [None]: ') or None
    authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()]
    tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()]
    date = input('Pub date (yyyy-mm-dd): ')
    description = input('Description: ')
    series = input('Series [None]: ') or None

    return {
        "id": work_id,
        "title": title,
        "circle": circle,
        "authors": authors,
        "tags": tags,
        "date": date,
        "description": description,
        "series": series,
    }

async def fetch_async(args):
    any_warnings = False

    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
    cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id) ON DELETE CASCADE, PRIMARY KEY(author, work))")
    cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id) ON DELETE CASCADE, PRIMARY KEY(tag, work))")

    thumbnails_dir = args.destdir / 'site' / 'thumbnails'
    thumbnails_dir.mkdir(parents=True, exist_ok=True)

    async with dlsite_async.DlsiteAPI(locale=args.locale) as api:
        for work_path in (args.destdir / 'extract').iterdir():
            work_id = work_path.name

            res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
            if res.fetchone() is not None:
                continue

            if DLSITE_ID_REGEX.fullmatch(work_id):
                print(f'Fetching DLSite metadata for {work_id}')
                dlsite_metadata = await api.get_work(work_id)
                if dlsite_metadata.work_type not in [dlsite_async.WorkType.MANGA, dlsite_async.WorkType.CG_ILLUSTRATIONS]:
                    warn(f'Work {work_id} is not a manga or CG set, skipping')
                    any_warnings = True
                    continue
                db_row = {
                    "id": work_id,
                    "title": dlsite_metadata.work_name,
                    "circle": dlsite_metadata.circle,
                    "date": dlsite_metadata.regist_date.date().isoformat(),
                    "description": dlsite_metadata.description,
                    "series": dlsite_metadata.title_name,
                }
                authors = dlsite_metadata.author or []
                tags = dlsite_metadata.genre or []
                thumbnail_url = dlsite_metadata.work_image
                if thumbnail_url.startswith('//'):
                    thumbnail_url = 'https:' + thumbnail_url
            else:
                db_row = manual_input_metadata(work_id)
                authors = db_row.pop('authors')
                tags = db_row.pop('tags')
                if FANZA_ID_REGEX.fullmatch(work_id):
                    candidate_urls = [
                        f'https://doujin-assets.dmm.co.jp/digital/{work_type}/{work_id}/{work_id}pl.jpg'
                        for work_type in ['comic', 'cg']
                    ]
                    thumbnail_url = None
                    for url in candidate_urls:
                        h = requests.head(url, allow_redirects=False)
                        if h.status_code == 200:
                            thumbnail_url = url
                            break
                elif FAKKU_ID_REGEX.fullmatch(work_id):
                    thumbnail_url = None
                else:
                    thumbnail_url = input('Thumbnail image URL [default: first page]: ')

            cur.execute(
                "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
                db_row,
            )
            cur.executemany(
                "INSERT INTO authors VALUES(:author, :work)",
                [{ "author": author, "work": work_id } for author in authors],
            )
            cur.executemany(
                "INSERT INTO tags VALUES(:tag, :work)",
                [{ "tag": tag, "work": work_id } for tag in tags],
            )

            if thumbnail_url:
                ext = url_file_ext(thumbnail_url)
                dest_file = thumbnails_dir / (work_id + ext)
                print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
                with open(dest_file, 'wb') as fd:
                    with requests.get(thumbnail_url, stream=True) as r:
                        for chunk in r.iter_content(chunk_size=16384):
                            fd.write(chunk)

            con.commit()

    con.close()

    return any_warnings

def url_file_ext(url):
    return splitext(urlparse(url).path)[1]

def fetch(args):
    any_warnings = asyncio.run(fetch_async(args))
    if args.auto and not any_warnings:
        parser_collate.parse_args(args=[], namespace=args)
        collate(args)


def self_and_parents(path):
    return [path] + list(path.parents)

def collate(args):
    extraction_dir = args.destdir / 'extract'

    def extracted_path_work_id(path):
        trail = self_and_parents(Path(relpath(path, extraction_dir)))
        if len(trail) < 2:
            return None
        result = trail[-2].name
        if result == '..':
            return None
        return result

    (raw_groups, raw_exclusions) = parse_expressions(args.expression)

    specified_works = set()
    works_groups = {}
    for group in raw_groups:
        if len(group) == 0:
            continue
        work_id = extracted_path_work_id(group[0])
        if not work_id:
            print(f'Group {group} contains paths outside an extracted work!')
            exit(1)
        if not all(extracted_path_work_id(item) == work_id for item in group[1:]):
            print(f'Group {group} contains paths from multiple works!')
            exit(1)
        specified_works.add(work_id)
        if work_id not in works_groups:
            works_groups[work_id] = []
        normalized_paths = [normalize_to(item, args.destdir) for item in group]
        if not all(path.exists() for path in normalized_paths):
            print(f'Group {group} contains nonexistent paths!')
            exit(1)
        works_groups[work_id].append(normalized_paths)

    exclusions = []
    for exclusion in raw_exclusions:
        work_id = extracted_path_work_id(exclusion)
        if not work_id:
            print(f'Excluded path {exclusion} does not belong to an extracted work!')
            exit(1)
        specified_works.add(work_id)
        normalized_path = normalize_to(exclusion, args.destdir)
        if not normalized_path.exists():
            print(f'Excluded path {exclusion} does not exist!')
            exit(1)
        exclusions.append(normalized_path)

    collation_staging_area = args.destdir / 'site' / 'images-staging'
    collation_staging_area.mkdir(parents=True)

    collation_area = args.destdir / 'site' / 'images'
    collation_area.mkdir(parents=True, exist_ok=True)

    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    any_warnings = False

    for work_path in extraction_dir.iterdir():
        work_id = work_path.name

        if args.only_specified_works and work_id not in specified_works:
            continue

        work_collation_dir = collation_area / work_id
        if work_collation_dir.exists():
            if work_id not in specified_works:
                continue
            if len(list(work_collation_dir.iterdir())) > 0:
                print(f'Collation directory for work {work_id} already exists!')
                any_warnings = True
                break
            else:
                work_collation_dir.rmdir()

        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
        if virtual in [(1,), None]:
            if work_id in specified_works:
                print(f'Work {work_id} {"is virtual" if virtual == (1,) else "has no metadata"}!')
                any_warnings = True
                break
            continue

        work_staging_dir = collation_staging_area / work_id

        collator = Collator(work_staging_dir, exclusions, args)
        for group in works_groups.get(work_id, [[work_path]]):
            collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
            if not collation_result:
                print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
                break

        if collation_result and collator.index > 0:
            succ(f'Collated {collator.index} pages for {work_id}')
            work_staging_dir.rename(work_collation_dir)
        else:
            if work_staging_dir.is_dir():
                for f in work_staging_dir.iterdir():
                    f.unlink()
                work_staging_dir.rmdir()

            if not collation_result:
                warn(f'Unable to deduce file structure for {work_id}, skipping')
            elif collator.index == 0:
                warn(f'No files found for {work_id}, skipping')

            any_warnings = True

    collation_staging_area.rmdir()
    con.close()

    if args.auto and not any_warnings:
        parser_generate.parse_args(args=[], namespace=args)
        generate(args)

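# The Collator tries a series of heuristics, in order, to turn an arbitrary
# set of extracted paths into one linear sequence of page images:
#   1. descend through a lone directory or extract a lone PDF;
#   2. merge sibling directories whose names only differ before Unicode
#      normalization;
#   3. keep only the variant matching the configured locale when the sources
#      look like per-language versions (try_collate_select_language);
#   4. when exactly two parallel directories exist, keep the higher-quality
#      one (resolution-style names, then IMAGE_QUALITY_REGEXES);
#   5. choose between a PDF and loose images that appear to contain the same
#      pages (try_collate_images_vs_pdf);
#   6. split sources into front matter / body / back matter using SPLITS
#      (try_collate_split_regex);
#   7. finally, order plain image files by their numbering, or by a
#      best-effort sort when --sort was given.
# A return value of None means "give up on this work"; False from the try_*
# helpers means "this heuristic does not apply, try the next one".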
class Collator:
    def __init__(self, dest, exclude, args):
        self.dest = dest
        self.exclude = exclude
        self.args = args
        self.index = 0

    def collate_from_paths(self, srcs):
        srcs = [src for src in srcs if len(descendant_files_ignore(src, self.exclude)) > 0]

        if len(srcs) == 1 and srcs[0].is_dir():
            return self.collate_from_paths(ls_ignore(srcs[0], self.exclude))

        if len(srcs) == 1 and is_pdf(srcs[0]):
            print(f'Extracting images from {srcs[0]}')
            return self.link_pdf(srcs[0])

        if len(srcs) == 0:
            return True

        if all(src.is_dir() and nname(src) == nname(srcs[0]) for src in srcs):
            debug(f'Merging unicode-fucked directories for {srcs[0]}')
            return self.collate_from_paths([item for src in srcs for item in ls_ignore(src, self.exclude)])

        debug(f'Auto-collating {srcs}')

        select_language = self.try_collate_select_language(srcs)
        if select_language is not False:
            return select_language

        dirs = [src for src in srcs if src.is_dir()]
        non_dirs = [src for src in srcs if not src.is_dir()]
        if len(dirs) == 2 and len(descendant_files_ignore(dirs[0], self.exclude)) == len(descendant_files_ignore(dirs[1], self.exclude)):
            debug(f'Checking for image quality references between dirs {dirs[0]} and {dirs[1]}')
            resolution_matches = [IMAGE_RESOLUTION_REGEX.match(nname(src)) for src in dirs]
            if all(resolution_matches):
                debug(f'Directory names are resolutions')
                pairs = [(int(m.group('x')), int(m.group('y'))) for m in resolution_matches]
                for i in range(2):
                    if pairs[i][0] > pairs[1-i][0] and pairs[i][1] > pairs[1-i][1]:
                        return self.collate_from_paths([dirs[i]] + non_dirs)
            debug(f'Checking image quality regexes')
            for quality in IMAGE_QUALITY_REGEXES:
                def a_not_b(a, b, src):
                    if a in quality:
                        return quality[a].search(nname(src))
                    else:
                        return not quality[b].search(nname(src))
                better_dirs = [src for src in dirs if a_not_b('better', 'worse', src)]
                worse_dirs = [src for src in dirs if a_not_b('worse', 'better', src)]
                debug('Regex results')
                debug(f'Better: {better_dirs}')
                debug(f'Worse: {worse_dirs}')
                if len(better_dirs) == 1 and len(worse_dirs) == 1 and better_dirs[0] != worse_dirs[0]:
                    return self.collate_from_paths(better_dirs + non_dirs)

        images_vs_pdf = self.try_collate_images_vs_pdf(srcs)
        if images_vs_pdf is not False:
            return images_vs_pdf

        for regexes in SPLITS:
            split_attempt = self.try_collate_split_regex(srcs, **regexes)
            if split_attempt is not False:
                return split_attempt

        if all(src.is_file() and is_image(src) for src in srcs):
            debug('Attempting to detect ordering for image files')
            ordering = complete_prefix_number_ordering(srcs)
            if not ordering and self.args.sort:
                ordering = srcs.copy()
                ordering.sort(key=best_effort_sort_key)
                debug(f'Applying best-effort sort: {ordering}')
            if ordering:
                print(f'Symlinking image files: {ordering[0]}...')
                return self.link_ordered_files(ordering)
            else:
                return None

        debug('Unable to collate available file types:')
        debug(f'Images: {[src for src in srcs if src.is_file() and is_image(src)]}')
        debug(f'PDFs: {[src for src in srcs if src.is_file() and is_pdf(src)]}')
        debug(f'Directories: {[src for src in srcs if src.is_dir()]}')
        debug(f'Unknown files: {[src for src in srcs if src.is_file() and not is_image(src) and not is_pdf(src)]}')
        return None

    def link_pdf(self, src):
        with fitz.open(src) as pdf:
            image_extractors = pdf_image_extractors(pdf, self.args.pdf_strategy)
            if image_extractors is None:
                print(f'Failed to enumerate page images in PDF {src}')
                return None

            self.dest.mkdir(parents=True, exist_ok=True)

            print(f'0 pages collated...', end='')
            for (idx, extractor) in enumerate(image_extractors, start=self.index):
                image = extractor()
                file_path = self.dest / f'{idx:04d}.{image["ext"]}'
                with open(file_path, 'wb') as f:
                    f.write(image["image"])
                count_progress(idx - self.index, len(image_extractors), 'pages collated')

            self.index += pdf.page_count
            return True

    def link_ordered_files(self, ordering):
        self.dest.mkdir(parents=True, exist_ok=True)

        for (idx, src_path) in enumerate(ordering, start=self.index):
            ext = src_path.suffix.lower()
            link_path = self.dest / f'{idx:04d}{ext}'
            link_path.symlink_to(relpath(src_path, self.dest))

        self.index += len(ordering)
        return True

    def try_collate_split_regex(self, srcs, earlier=None, later=None):
        early_srcs = []
        middle_srcs = []
        late_srcs = []
        for src in srcs:
            if earlier and earlier.search(nname(src)):
                early_srcs.append(src)
            elif later and later.search(nname(src)):
                late_srcs.append(src)
            else:
                middle_srcs.append(src)

        if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
            return False

        debug(f'Splitting sources based on regex: {[early_srcs, middle_srcs, late_srcs]}')

        early_page_collation = self.collate_from_paths(early_srcs)
        if early_page_collation is None:
            return None

        middle_page_collation = self.collate_from_paths(middle_srcs)
        if middle_page_collation is None:
            return None

        late_page_collation = self.collate_from_paths(late_srcs)
        if late_page_collation is None:
            return None

        return True

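    # If the sources contain exactly one PDF plus a set of loose image files,
    # compare median image dimensions on both sides and keep whichever copy is
    # at least as large in both width and height.  The count-mismatch branch
    # below handles PDFs that store each page as several horizontal strips,
    # using an image count adjusted by the ratio of mean image heights.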
    def try_collate_images_vs_pdf(self, srcs):
        pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
        if len(pdfs) != 1:
            return False
        outer_pdf = pdfs[0]

        inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)]
        if len(inner_pdfs) != 1:
            return False
        inner_pdf = inner_pdfs[0]

        non_pdf_srcs = [src for src in srcs if src != outer_pdf]
        images = []
        non_images = []
        descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)]
        for f in descendant_files:
            if is_image(f):
                images.append(f)
            else:
                non_images.append(f)
                break

        if len(non_images) != 0 or len(images) == 0:
            return False

        debug(f'Comparing PDF {inner_pdf} and images {images}')

        pdf_sizes = pdf_image_sizes(inner_pdf)
        standalone_sizes = [standalone_image_size(f) for f in images]

        median_pdf_size = median(pdf_sizes)
        median_standalone_size = median(standalone_sizes)
        if not (median_pdf_size and median_standalone_size):
            return False

        debug(f'PDF: {len(pdf_sizes)} images, {median_pdf_size}; standalone: {len(standalone_sizes)} images, median {median_standalone_size}')

        if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
            with fitz.open(inner_pdf) as pdf:
                pdf_page_count = len(pdf)
            height_adjusted_pdf_image_count = (
                len(pdf_sizes) *
                mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
            )
            if (
                    abs(pdf_page_count - len(standalone_sizes)) <= 2 and
                    len(pdf_sizes) > len(standalone_sizes) and
                    median_pdf_size[0] == median_standalone_size[0] and
                    abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
            ):
                return self.collate_from_paths(non_pdf_srcs)
            else:
                return False

        if superior_or_equal(median_standalone_size, median_pdf_size):
            return self.collate_from_paths(non_pdf_srcs)
        elif superior_or_equal(median_pdf_size, median_standalone_size):
            return self.collate_from_paths([outer_pdf])
        else:
            return False

    def try_collate_select_language(self, srcs):
        if self.args.locale not in LANGUAGE_REGEXES:
            return False
        if not all(any(lang.search(nname(src)) for lang in LANGUAGE_REGEXES.values()) for src in srcs):
            return False

        debug('Detected multiple language options, selecting preferred language')
        srcs_matching_language = [src for src in srcs if LANGUAGE_REGEXES[self.args.locale].search(nname(src))]
        if len(srcs_matching_language) == len(srcs) or len(srcs_matching_language) == 0:
            return False

        return self.collate_from_paths(srcs_matching_language)

def block_is_image(block):
    return block[6] == 1

def block_text(block):
    return block[4]

def block_relevant(block):
    return block_is_image(block) or not IRRELEVANT_PDF_BLOCK_REGEX.search(block_text(block))

def relevant_blocks(page):
    blocks = page.get_text('blocks')
    return [block for block in blocks if block_relevant(block)]

def is_single_image(page):
    blocks = relevant_blocks(page)
    return len(blocks) == 1 and block_is_image(blocks[0])

def extract_image(pdf, xref):
    image = pdf.extract_image(xref)
    if f'.{image["ext"]}' in IMAGE_FILE_EXTENSIONS:
        return image
    print(f'Converting image from {image["ext"]} to png')
    pix = fitz.Pixmap(pdf, xref)
    return { 'ext': 'png', 'image': pix.tobytes('png') }

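# A PDF page draws a referenced image with a content-stream operator of the
# form "/Name Do"; inline images are delimited by BI ... ID ... EI instead.
# This returns the xref of the single image XObject a page displays, or None
# if the page is anything more complicated than that.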
def get_displayed_image_xref(page):
    ref_names = []
    for content_xref in page.get_contents():
        content = page.parent.xref_stream(content_xref).decode('ascii', 'replace')
        if PDF_INLINE_IMAGE_REGEX.search(content):
            debug('Inline image detected')
            return None
        for m in PDF_REFERENCED_IMAGE_REGEX.finditer(content):
            ref_names.append(m.group('ref_name'))

    if len(ref_names) == 0:
        debug('Page does not reference any xobjects')
        return None
    if len(ref_names) > 1:
        debug(f'Page references multiple xobjects: {ref_names}')
        return None

    image_xrefs = [image[0] for image in page.get_images() if image[7] == ref_names[0]]
    if len(image_xrefs) == 1:
        return image_xrefs[0]

    if len(image_xrefs) == 0:
        debug(f'No images found matching ref name {ref_names[0]}')
    else:
        debug(f"Multiple images found matching ref name {ref_names[0]}, that probably shouldn't happen")
    return None

def display_sixel_pixmap(pixmap_bytes):
    s = BytesIO()
    image = Image.open(BytesIO(pixmap_bytes))
    width, height = image.size

    try:
        data = image.tobytes()
    except NotImplementedError:
        data = image.tostring()
    output = sixel_output_new(lambda data, s: s.write(data), s)

    try:
        if image.mode == 'RGBA':
            dither = sixel_dither_new(256)
            sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGBA8888)
        elif image.mode == 'RGB':
            dither = sixel_dither_new(256)
            sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGB888)
        elif image.mode == 'P':
            palette = image.getpalette()
            dither = sixel_dither_new(256)
            sixel_dither_set_palette(dither, palette)
            sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_PAL8)
        elif image.mode == 'L':
            dither = sixel_dither_get(SIXEL_BUILTIN_G8)
            sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G8)
        elif image.mode == '1':
            dither = sixel_dither_get(SIXEL_BUILTIN_G1)
            sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G1)
        else:
            raise RuntimeError('unexpected image mode')
        try:
            sixel_encode(data, width, height, 1, dither, output)
            print(s.getvalue().decode('ascii'))
        finally:
            sixel_dither_unref(dither)
    finally:
        sixel_output_unref(output)

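# For pages that aren't a single clean image, the interactive prompt (and the
# --pdf-strategy option it is seeded from) accepts: n(ope) to abort the whole
# PDF, c(onvert) to rasterize the page, x/extract to pull out the referenced
# image anyway, d(rop) to skip the page, and s(how) to preview it as sixels
# before deciding.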
def pdf_image_extractors(pdf, strategy):
    print(f'0/{pdf.page_count} pages analyzed...', end='')
    image_extractors = []
    for (idx, page) in enumerate(pdf):
        xref = get_displayed_image_xref(page)
        if xref is not None and is_single_image(page):
            image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
        else:
            page_images = page.get_image_info()
            print(f'\nPage {idx+1}: {len(page_images)} images, {len(relevant_blocks(page))} total relevant objects')
            choice = strategy
            while True:
                if choice.lower().startswith('n'):
                    return None
                if choice.lower().startswith('c'):
                    if choice == strategy:
                        print(f'Converting page {idx+1}')
                    image_extractors.append(lambda p=page: { 'ext': 'png', 'image': p.get_pixmap(dpi=PDF_CONVERSION_DPI).tobytes('png') })
                    break
                if xref is not None and (choice.lower().startswith('x') or choice.lower() == 'extract'):
                    if choice == strategy:
                        print(f'Extracting image from page {idx+1} without text')
                    image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
                    break
                if choice.lower().startswith('d'):
                    if choice == strategy:
                        print(f'Dropping page {idx+1}')
                    break

                if choice.lower().startswith('s'):
                    display_sixel_pixmap(page.get_pixmap(dpi=PDF_PREVIEW_DPI).tobytes('png'))
                    if xref is not None:
                        pixmap = fitz.Pixmap(pdf, xref)
                        pixmap.shrink(2)
                        display_sixel_pixmap(pixmap.tobytes('png'))

                choice = input(f'[N]ope out / [c]onvert page{"" if xref is None else " / e[x]tract image"} / [d]rop page / [s]how page? [n/c{"" if xref is None else "/x"}/d/s] ')
        count_progress(idx, pdf.page_count, 'pages analyzed')

    return image_extractors

def normalize_string(s):
    return unicodedata.normalize('NFKC', s.translate(EXTRA_NORMALIZATION_TABLE))

def nname(entry):
    return normalize_string(entry.name)

def nstem(entry):
    return normalize_string(entry.stem)

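# Sort key that compares runs of digits numerically and everything else as
# text, so e.g. 'page2' sorts before 'page10'.  Illustrative examples
# (hypothetical filenames): a stem 'page10' becomes ['page', 10], and a stem
# starting with digits like '10bonus' becomes ['', 10, 'bonus'] — the leading
# '' keeps int/str comparisons from mixing across files.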
def best_effort_sort_key(entry):
    result = []
    for token in STRING_TOKENIZE_REGEX.finditer(nstem(entry)):
        if token.lastgroup == 'num':
            if len(result) == 0:
                result.append('') # to prevent failed int/string comparisons against other files
            result.append(int(token.group()))
        else:
            result.append(token.group())

    debug(f"Tokenized {entry} as {result}")
    return result

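# Orders image files by their numbering while grouping alternate versions
# (ALT_VERSIONS keywords in the filename, e.g. 褐色 or 日焼け) into a bitmask
# "version code".  Variant sets with roughly as many pages as the base set
# are appended after it as whole blocks; small variant sets (just a few
# alternate pages) are interleaved right after the matching base page.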
def complete_prefix_number_ordering(entries):
    if len(entries) == 1:
        return entries

    entries_by_version = {}
    for entry in entries:
        version_code = 0
        for (i, version) in enumerate(ALT_VERSIONS):
            if version in nname(entry):
                version_code |= (1 << i)
        entries_by_version.setdefault(version_code, []).append(entry)

    numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version}

    unified_indices = set()
    for numbering in numberings_by_version.values():
        if numbering is None:
            return None
        unified_indices |= set(numbering.keys())
    unified_indices.discard(None)
    unified_indices = list(unified_indices)
    unified_indices.sort()

    min_delta_by_level = {}
    if len(unified_indices) > 1:
        for i in range(1, len(unified_indices)):
            cur = unified_indices[i]
            prev = unified_indices[i-1]
            for level in range(min(len(cur), len(prev))):
                if cur[level] != prev[level] and not (cur[level] == 5 and prev[level] == 0):
                    delta = cur[level] - prev[level]
                    min_delta_by_level[level] = min(min_delta_by_level.get(level, delta), delta)
    for level, delta in min_delta_by_level.items():
        if delta > 2:
            debug(f'Found a minimum delta of {delta} at level {level}, this might not be a numbering')
            return None

    unified_indices.append(None)

    versions = list(numberings_by_version.keys())
    versions.sort()

    version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version}
    inner_versions = []
    outer_versions = [versions[0]]
    for ver in versions[1:]:
        if version_lengths[ver] >= version_lengths[versions[0]] - 2:
            outer_versions.append(ver)
        else:
            inner_versions.append(ver)

    result = []
    for out_ver in outer_versions:
        for i in unified_indices:
            for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])):
                result += numberings_by_version[ver].get(i, [])
    return result

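# Maps entries to hierarchical index tuples based on a shared filename prefix
# followed by a number, recursing when several files share the same number.
# Illustrative example (hypothetical filenames):
#   [c1_1.png, c1_2.png, c2.png]  ->  {(1, 1): [...], (1, 2): [...], (2,): [...]}
# Returns None if no consistent numbering can be found.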
def unique_hierarchical_prefix_numbering(entries, start_point=0):
    if len(entries) == 1 and not NUMBER_REGEX.search(nname(entries[0])):
        return {None: entries}

    debug(f'Finding unique hierarchical prefix ordering from start point {start_point} for {entries}')

    longest_entry = max(entries, key=lambda e: len(nname(e)))
    matches = reversed(list(NUMBER_REGEX.finditer(nname(longest_entry))))
    for m in matches:
        pos = m.start()
        if pos < start_point:
            break
        prefix = nname(longest_entry)[:pos]
        debug(f'Checking prefix {prefix}')
        if all(nname(e).startswith(prefix) or prefix.startswith(nstem(e)) for e in entries):
            numbering = {}
            for e in entries:
                if pos >= len(nstem(e)):
                    i = 0
                else:
                    n = NUMBER_REGEX.match(nname(e)[pos:])
                    if n is None:
                        debug(f'Entry {e} does not have a number after prefix')
                        return None
                    i = int(n.group())
                numbering.setdefault((i,), []).append(e)

            debug(f'Numbering found for prefix {prefix}')

            indices = list(numbering.keys())
            for idx in indices:
                if len(numbering[idx]) > 1:
                    ents_idx = numbering.pop(idx)
                    debug(f'Index {idx} has multiple entries')
                    longest = max(ents_idx, key=lambda e: len(nname(e)))
                    next_match = NUMBER_REGEX.match(nname(longest)[pos:])
                    if not next_match:
                        return None
                    next_layer_start = pos + next_match.end()
                    sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start)
                    if not sub_numbering:
                        return None
                    for sub_idx in sub_numbering:
                        numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx]

            return numbering

    return alphabetic_numbering(entries, start_point)

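# Fallback for suffix-letter numbering: endings that differ only by a single
# trailing letter are ordered with the bare name first.  Illustrative example
# (hypothetical stems, start_point=4):
#   page.png, pagea.png, pageb.png  ->  {(0,): [...], (1,): [...], (2,): [...]}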
def alphabetic_numbering(entries, start_point):
    debug(f'Finding alphabetic numbering from start point {start_point} for {entries}')
    alphabetized = {}
    prefix_suffix = None
    for entry in entries:
        ending = nstem(entry)[start_point:]
        debug(f'{entry} has ending {ending}')

        ending_match = ALPHABETIC_NUMBERING_REGEX.fullmatch(ending)
        if not ending_match:
            debug('Ending has more than one letter, giving up')
            return None

        current_prefix_suffix = (ending_match.group('prefix'), ending_match.group('suffix') or '')
        if prefix_suffix is None:
            prefix_suffix = current_prefix_suffix
        elif current_prefix_suffix != prefix_suffix:
            debug(f'Ending prefix/suffix does not match {prefix_suffix}, giving up')
            return None

        ending_letter = (ending_match.group('letter') or '').lower()
        if ending_letter == '':
            index = 0
        elif ending_letter >= 'a' and ending_letter <= 'z':
            index = ord(ending_letter) - ord('a') + 1
        elif ending_letter >= 'ａ' and ending_letter <= 'ｚ':
            # same as above, but for fullwidth letters
            index = ord(ending_letter) - ord('ａ') + 1
        else:
            debug('Ending is not a letter, giving up')
            return None

        if (index,) in alphabetized:
            debug(f'Index value {index} is already present, giving up')
            return None
        alphabetized[(index,)] = [entry]

    return alphabetized

def check_extension(path, exts):
    return path.suffix.lower() in exts

def is_pdf(path):
    return check_extension(path, ['.pdf'])

def is_image(path):
    return check_extension(path, IMAGE_FILE_EXTENSIONS)

def ignoreable(path):
    return path.name in IGNOREABLE_FILES or check_extension(path, IGNOREABLE_EXTENSIONS)

def ls_ignore(directory, exclude):
    return [
        path for path in directory.iterdir()
        if not ignoreable(path) and path not in exclude
    ]

def descendant_files_ignore(path, exclude):
    if path.is_file():
        return [path]

    result = []
    for item in ls_ignore(path, exclude):
        if item.is_dir():
            result.extend(descendant_files_ignore(item, exclude))
        else:
            result.append(item)

    return result

def standalone_image_size(filepath):
    try:
        with Image.open(filepath) as im:
            return im.size
    except UnidentifiedImageError:
        warn(f'PIL failed to load image {filepath}! Retrying with less strict settings')
        PIL.ImageFile.LOAD_TRUNCATED_IMAGES = True
        try:
            with Image.open(filepath) as im:
                return im.size
        finally:
            PIL.ImageFile.LOAD_TRUNCATED_IMAGES = False

def pdf_image_sizes(filepath):
    sizes_by_xref = {}

    with fitz.open(filepath) as pdf:
        for page in pdf:
            for (xref, _, width, height, *_) in page.get_images():
                if xref in sizes_by_xref:
                    continue
                sizes_by_xref[xref] = (width, height)

    return list(sizes_by_xref.values())

def median(items):
    if len(items) == 0:
        return None

    items.sort()
    return items[len(items) // 2]

def mean(items):
    if len(items) == 0:
        return None

    return sum(items) / len(items)

def superior_or_equal(a, b):
    return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))

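# Recursive-descent parser for the collate subcommand's expression language
# (see the `collate` help text below).  Illustrative example with hypothetical
# tokens:
#   parse_expressions(['a', '!', 'b', '(', 'c', 'd', ')'])
#   -> ([['a'], ['c', 'd']], ['b'])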
def parse_expressions(tokens):
    groups = []
    exclusions = []

    while tokens:
        token = tokens.pop(0)
        if token == '!':
            exclusions.extend(parse_exclusion(tokens))
        elif token == '(':
            groups.append(parse_group(tokens))
        else:
            groups.append([token])

    return (groups, exclusions)

def parse_exclusion(tokens):
    token = tokens.pop(0)

    if token == '(':
        return parse_group(tokens)
    else:
        return [token]

def parse_group(tokens):
    items = []

    while True:
        token = tokens.pop(0)
        if token == ')':
            return items
        else:
            items.append(token)

def normalize_to(path, ref):
    return ref / Path(relpath(path, ref))

def fmt_size(s):
    return f'{s[0]}x{s[1]}px'

def analyze(args):
    extract_dir = args.destdir / 'extract'
    files = descendant_files_ignore(extract_dir / args.work_id, [])
    files.sort()

    for f in files:
        print(f'{relpath(f, extract_dir)}', end='')
        if is_image(f):
            size = standalone_image_size(f)
            print(f'\t{fmt_size(size)}')
        elif is_pdf(f):
            sizes = pdf_image_sizes(f)
            if len(sizes) == 0:
                print('\tContains no images')
            else:
                print(f'\t{len(sizes)} images, median {fmt_size(median(sizes))}, min {fmt_size(min(sizes))}, max {fmt_size(max(sizes))}')
        else:
            print()

def metadata(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    if args.virtual is not None:
        cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
        con.commit()

    res = cur.execute(
        "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
        (args.work_id,),
    ).fetchone()

    if res is None:
        print(f'Work id {args.work_id} not found!')
        return

    (title, circle, date, description, series, virtual) = res
    print(f'Work ID: {args.work_id}')
    print(f'Title: {title}')
    print(f'Circle: {circle}')
    print(f'Pub date: {date}')
    print(f'Description: {description}')
    print(f'Series: {series}')
    print(f'Virtual: {"Yes" if virtual == 1 else "No"}')

    con.close()

def copy_recursive(src, dest):
    dest.mkdir(parents=True, exist_ok=True)
    for item in src.iterdir():
        if item.is_dir() and not item.is_symlink():
            copy_recursive(item, dest / item.name)
        else:
            shutil.copyfile(item, dest / item.name)

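# Similarity score between two titles: greedily matches prefixes of the
# shorter string inside the longer one and scores each matched chunk by the
# square of its length, so longer contiguous matches dominate.  For example,
# string_similarity('abc', 'xabcy') == 9 (one 3-character match).  Results are
# memoized globally because generate() compares every pair of titles.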
memoized_similarities = {}
def string_similarity(a, b):
    if len(a) < len(b) or (len(a) == len(b) and a < b):
        shorter = a
        longer = b
    else:
        shorter = b
        longer = a
    if len(shorter) == 0:
        return 0

    if (shorter, longer) in memoized_similarities:
        return memoized_similarities[(shorter, longer)]

    options = [string_similarity(shorter[1:], longer)]
    for i in range(1, len(shorter)+1):
        match_idx = longer.find(shorter[:i])
        if match_idx == -1:
            break
        options.append(i*i + string_similarity(shorter[i:], longer[match_idx+i:]))
    result = max(options)

    memoized_similarities[(shorter, longer)] = result
    return result

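# Keeps the `limit` highest-scoring items, breaking ties randomly when
# iterated.  insert() returns the items that fell out of the list, so the
# caller can delete their suggestion-cache rows.  Illustrative usage:
#   top = TopScoreList(2)
#   top.insert('a', 5)   # -> []
#   top.insert('b', 3)   # -> []
#   top.insert('c', 4)   # -> ['b']  ('b' no longer makes the cut)
#   list(top)            # -> ['a', 'c']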
class TopScoreList:
    def __init__(self, limit):
        self.limit = limit
        self.items_with_scores = []
        self.randomized = True

    def insert(self, item, score):
        if len(self.items_with_scores) >= self.limit and score < self.items_with_scores[-1][1]:
            return [item]

        self.randomized = False
        for i in range(len(self.items_with_scores) + 1):
            if i == len(self.items_with_scores) or score >= self.items_with_scores[i][1]:
                self.items_with_scores.insert(i, (item, score))
                break
        removed_items = []
        while len(self.items_with_scores) > self.limit and self.items_with_scores[-1][1] < self.items_with_scores[self.limit-1][1]:
            removed_items.append(self.items_with_scores.pop()[0])
        return removed_items

    def _randomize(self):
        if self.randomized:
            return

        # shuffle followed by stable sort to randomly shuffle within each score tier
        random.shuffle(self.items_with_scores)
        self.items_with_scores.sort(key=lambda i: i[1], reverse=True)
        self.randomized = True

    def __iter__(self):
        self._randomize()
        return (item for (item, _) in self.items_with_scores[:self.limit])

def generate(args):
    debug('loading templates')
    jenv = Environment(
        loader=PackageLoader("dlibrary"),
        autoescape=select_autoescape()
    )
    viewer_template = jenv.get_template("viewer.html")
    list_template = jenv.get_template("list.html")
    categorization_template = jenv.get_template("categorization.html")
    work_template = jenv.get_template("work.html")
    index_template = jenv.get_template("index.html")
    store_template = jenv.get_template("store.html")
    debug('templates loaded')

    store_token = ''.join(random.choices(string.ascii_letters + string.digits, k=32))

    debug('opening main database')
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()
    debug('main database open')

    debug('opening suggestion cache database')
    cache_con = sqlite3.connect(args.destdir / 'cache.db')
    cache_cur = cache_con.cursor()
    cache_cur.execute("CREATE TABLE IF NOT EXISTS suggestions(work TEXT, suggested TEXT, similarity INT, PRIMARY KEY(work, suggested))")
    debug('suggestion cache database open')
    cached_suggestions = {}
    for (work, suggested, similarity) in cache_cur.execute('SELECT work, suggested, similarity FROM suggestions'):
        cached_suggestions.setdefault(work, TopScoreList(SUGGESTED_WORKS_COUNT)).insert(suggested, similarity)
    debug('cached suggestions loaded')

    site_dir = args.destdir / 'site'

    collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()}

    works = {}
    debug('checking thumbnail files')
    thumbnail_files = {f.stem: f for f in (site_dir / 'thumbnails').iterdir()}
    debug(f'{len(thumbnail_files)} thumbnail files found')
    debug('running database query for works')
    for (idx, (work_id, title, circle, date, description, series)) in enumerate(cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall()):
        if work_id not in collated_work_ids:
            continue
        authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
        tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]

        images = [path.name for path in (site_dir / 'images' / work_id).iterdir()]
        images.sort()

        thumbnail_path = relpath(thumbnail_files.get(work_id, site_dir / 'images' / work_id / images[0]), site_dir)
        work = {
            'id': work_id,
            'title': title,
            'circle': circle,
            'date': date,
            'description': description,
            'series': series,
            'authors': authors,
            'tags': tags,
            'thumbnail_path': thumbnail_path,
            'images': images,
        }
        works[work_id] = work

        print(f'{ANSI_LINECLEAR}{idx+1} database entries read...', end='')
    print()

    for work in works.values():
        if work['id'] in cached_suggestions:
            continue
        debug(f'Computing suggestions for new work {work["title"]}')
        cached_suggestions[work['id']] = TopScoreList(SUGGESTED_WORKS_COUNT)
        for other_work in works.values():
            if other_work is work:
                continue
            if work['series'] and work['series'] == other_work['series']:
                continue
            if other_work['id'] not in cached_suggestions:
                continue # we'll get to it later

            similarity = string_similarity(work['title'], other_work['title'])
            cached_suggestions[work['id']].insert(other_work['id'], similarity)
            removed = cached_suggestions[other_work['id']].insert(work['id'], similarity)
            if removed != [work['id']]:
                cache_cur.executemany(
                    'DELETE FROM suggestions WHERE work = :work AND suggested = :suggested',
                    [{ "work": other_work['id'], "suggested": item } for item in removed],
                )
                cache_cur.execute(
                    'INSERT INTO suggestions(work, suggested, similarity) VALUES(:work, :suggested, :similarity)',
                    { "work": other_work['id'], "suggested": work['id'], "similarity": similarity },
                )
        cache_cur.executemany(
            'INSERT INTO suggestions(work, suggested, similarity) VALUES(:work, :suggested, :similarity)',
            [{ "work": work['id'], "suggested": suggested, "similarity": similarity } for (suggested, similarity) in cached_suggestions[work['id']].items_with_scores],
        )
        cache_con.commit()

    for (idx, work) in enumerate(works.values()):
        work_dir = site_dir / 'works' / work['id']
        viewer_dir = work_dir / 'view'
        viewer_dir.mkdir(parents=True, exist_ok=True)
        with open(work_dir / 'index.html', 'w') as f:
            f.write(work_template.render(
                depth=2, work=work, title=work['title'],
                suggested=[works[suggested] for suggested in cached_suggestions[work['id']]],
            ))
        with open(viewer_dir / 'index.html', 'w') as f:
            f.write(viewer_template.render(depth=3, work=work, title=work['title'], token=store_token))

        count_progress(idx, len(works), 'works processed')

    uca = pyuca.Collator().sort_key
    def make_categorization(categorization, query, work_filter, work_style_cards=False):
        categorization_dir = site_dir / categorization

        cats = sorted((cat for (cat,) in cur.execute(query)), key=uca)
        cat_samples = {}
        for (idx, cat) in enumerate(cats):
            cat_works = list(filter(work_filter(cat), works.values()))
            cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None

            safeish_cat = cat.replace('/', ' ')
            cat_dir = categorization_dir / safeish_cat
            cat_dir.mkdir(parents=True, exist_ok=True)
            with open(cat_dir / 'index.html', 'w') as f:
                f.write(list_template.render(
                    depth=2,
                    works=cat_works,
                    title=cat,
                    categorization=categorization,
                ))
            count_progress(idx, len(cats), f'{categorization} processed')

        categorization_dir.mkdir(parents=True, exist_ok=True)
        with open(categorization_dir / 'index.html', 'w') as f:
            f.write(categorization_template.render(
                depth=1,
                categorization=categorization,
                categories=cats,
                samples=cat_samples,
                work_style_cards=work_style_cards,
            ))

    make_categorization(
        'authors',
        'SELECT DISTINCT author FROM authors',
        lambda author: lambda work: author in work['authors'],
    )
    make_categorization(
        'tags',
        'SELECT DISTINCT tag FROM tags',
        lambda tag: lambda work: tag in work['tags'],
    )
    make_categorization(
        'circles',
        'SELECT DISTINCT circle FROM works WHERE circle NOT NULL',
        lambda circle: lambda work: work['circle'] == circle,
    )
    make_categorization(
        'series',
        'SELECT DISTINCT series FROM works WHERE series NOT NULL',
        lambda series: lambda work: work['series'] == series,
        work_style_cards=True,
    )

    debug('copying static files')
    with resources.as_file(resources.files("dlibrary")) as r:
        copy_recursive(r / 'static', site_dir / 'static')
    debug('static files copied')

    debug('writing index page')
    with open(site_dir / 'index.html', 'w') as f:
        f.write(index_template.render(depth=0, works=list(works.values()), token=store_token))
    debug('index page written')

    debug('writing store iframe page')
    with open(site_dir / 'store.html', 'w') as f:
        f.write(store_template.render(depth=0, token=store_token))
    debug('store iframe page written')

    debug('closing cache database')
    cache_con.close()
    debug('cache database closed')

    debug('closing main database')
    con.close()
    debug('main database closed')


argparser = argparse.ArgumentParser(
    prog='dlibrary',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        Organize DRM-free works purchased from DLSite into a library
        that can be viewed in a web browser.

        Intended workflow:
        - `extract` a collection of archive files into DLibrary's data
          directory, automatically giving each work its own subfolder.
        - `fetch` metadata and thumbnail images for extracted works
          from DLSite.
        - `collate` extracted works, producing a single sequence of
          image files (or symlinks into the extracted data, when
          possible) for each work.
        - Manually adjust works' `metadata` when necessary.
        - `generate` a static website providing a catalog and viewer
          for all collated works.
        """),
)

argparser.add_argument(
    '-d', '--destdir',
    type=Path,
    default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
    help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
)
argparser.add_argument(
    '-D', '--debug',
    action='store_true',
    help='print out debugging info',
)
argparser.add_argument(
    '-l', '--locale',
    type=str,
    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
    help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). '
          'May still fall back to Japanese if other languages are unavailable. '
          '(default: $DLIBRARY_LOCALE or en_US)'),
)
argparser.add_argument(
    '-a', '--auto',
    action='store_true',
    help='automatically continue the extract->fetch->collate->generate pipeline starting from whatever subcommand is being run',
)
subparsers = argparser.add_subparsers(title="subcommands", required=True)

parser_extract = subparsers.add_parser('extract', aliases=['x'], help='extract archive files')
parser_extract.add_argument(
    '-r', '--remove',
    action='store_true',
    help='remove original archive files after extraction',
)
parser_extract.add_argument(
    'archives',
    metavar='FILE',
    type=Path,
    nargs='+',
    help='archive files to extract',
)
parser_extract.set_defaults(func=extract)

parser_fetch = subparsers.add_parser('fetch', aliases=['f'], help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)

parser_collate = subparsers.add_parser(
    'collate',
    aliases=['c'],
    help='collate works into sequences of image files',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        For each extracted work that has not already been collated,
        DLibrary will attempt to intuit its structure and create
        a single ordered list of image files in the site data
        directory. Each image will either be a symlink to an image
        file in the extraction folder, or a single page extracted
        from a PDF file.

        DLibrary may fail to automatically collate a work if its
        files and subdirectories are not named in a way that
        indicates a clear linear ordering. In order to assist with
        collation, you can provide a list of expressions specifying
        where to start traversing the directory structure, what
        files to include in what order, and/or what files to ignore
        entirely.

        An expression can be:

        PATH
          A single path. If this is an image, it will be appended to
          the sequence of collated images for the work it belongs to;
          if this is a PDF, images will be extracted from it and
          concatenated to the sequence; if this is a directory, the
          contents of the directory will be automatically collated
          using DLibrary's default heuristics, and concatenated
          to the sequence.

        ( PATH [PATH ...] )
          A group of paths contained in parentheses. You may need to escape
          the parentheses to avoid them getting parsed by your shell.
          All the paths in this group will be considered together, and
          automatically collated using the default heuristics, regardless
          of what order the paths are provided in.

        ! PATH
        ! ( PATH [PATH ...] )
          A path or group of paths to exclude from collation. You may
          need to escape the !. If an excluded path appears within any
          of the other specified paths, it will be skipped by the collation
          heuristics.

        If the only expressions provided are negations, then auto-collation
        will start from the top level of the extracted work while skipping
        the excluded paths.

        All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
        for some not-yet-collated work. Paths belonging to multiple
        different works can all be provided on the same command line, and
        expressions will be clustered together by work id while otherwise
        preserving the order they were provided in. A parenthesized group
        expression must only contain paths belonging to a single work.

        By default, DLibrary will attempt to collate every not-yet-collated
        work (excluding "virtual" works), using the provided expressions
        to assist in collation when available. The `-o` flag will direct
        DLibrary to *only* collate works included in the provided expressions,
        even if other uncollated works are present.
        """),
)
parser_collate.add_argument(
    '-o', '--only-specified-works',
    action='store_true',
    help="only collate works that are explicitly specified",
)
parser_collate.add_argument(
    '-s', '--sort',
    action='store_true',
    help="apply a best-effort sorting algorithm when the ordering of image files is unclear",
)
parser_collate.add_argument(
    '-p', '--pdf-strategy',
    choices=[
        'ask', '?',
        'show-ask', 's',
        'convert', 'c',
        'extract', 'x',
        'drop', 'd',
        'nope', 'n'
    ],
    default='show-ask',
    help="how to handle PDF pages that aren't a single image with no text",
)
parser_collate.add_argument(
    'expression',
    nargs='*',
    help='expressions indicating paths to collate or skip',
)
parser_collate.set_defaults(func=collate)
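# Purely illustrative invocation (the work id and paths are hypothetical):
#   dlibrary collate 'dlibrary/extract/RJ000000/本編' '!' 'dlibrary/extract/RJ000000/本編/omake.png'
# would collate the 本編 directory for that work while excluding the omake image.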

parser_analyze = subparsers.add_parser('analyze', aliases=['a'], help='analyze an extracted folder to assist in collation')
parser_analyze.add_argument('work_id')
parser_analyze.set_defaults(func=analyze)

parser_metadata = subparsers.add_parser('metadata', aliases=['m'], help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
    '--virtual',
    action=argparse.BooleanOptionalAction,
    help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)

parser_generate = subparsers.add_parser(
    'generate',
    aliases=['g'],
    help='generate HTML/CSS/JS for library site',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        The static site will be generated under $DLIBRARY_DIR/site/
        and can be served by pointing an HTTP server at that
        directory. Note that some files inside the static site
        hierarchy will be symlinks into $DLIBRARY_DIR/extract/
        outside the site hierarchy, so make sure your HTTP server
        will allow those symlinks to be read.
        """),
)
parser_generate.set_defaults(func=generate)


def main():
    args = argparser.parse_args()

    global debug_mode
    debug_mode = args.debug

    args.func(args)

if __name__ == "__main__":
    main()