dlibrary/dlibrary/dlibrary.py

1089 lines
38 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
import argparse
import asyncio
import importlib_resources as resources
from pathlib import Path
import os
from os.path import relpath, splitext
import re
import readline
import shutil
import sqlite3
import textwrap
import unicodedata
from urllib.parse import urlparse
import zipfile
from dlsite_async import DlsiteAPI
import fitz
from PIL import Image
from jinja2 import Environment, PackageLoader, select_autoescape
import requests
# Matches a run of digits and/or hyphens; used to find page numbers in file names.
NUMBER_REGEX = re.compile('[0-9-]+')
# Work-id shapes; fetch_async uses these to decide where metadata and
# thumbnails come from.
DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$')
FANZA_ID_REGEX = re.compile('^d_[0-9]+$')
FAKKU_ID_REGEX = re.compile('.*_FAKKU$')
# Name fragments describing image quality variants:
# high resolution / without screentone or grayscale / with screentone /
# color / monochrome.
HI_RES_REGEX = re.compile('高解像度', re.I)
NO_TONE_REGEX = re.compile('トーン(効果)?[な無]し|グレースケール', re.I)
TONE_REGEX = re.compile('トーン(版|(効果)?[有あ]り)', re.I)
COLOR_REGEX = re.compile('カラー', re.I)
MONOCHROME_REGEX = re.compile('モノクロ', re.I)
# Each entry names the pattern marking the preferred ('better') variant and,
# optionally, the pattern marking the inferior ('worse') one. Consulted by
# Collator.collate_from_paths when exactly two directories are offered.
IMAGE_QUALITY_REGEXES = [
{ 'better': HI_RES_REGEX },
{ 'better': NO_TONE_REGEX, 'worse': TONE_REGEX },
{ 'better': COLOR_REGEX, 'worse': MONOCHROME_REGEX },
]
# Locale code -> pattern recognizing that language in a file or folder name.
LANGUAGE_REGEXES = {
'en_US': re.compile('english|英語', re.I),
'ja_JP': re.compile('日本語', re.I),
# The '(^|[^體])' guard keeps '繁體中文' (handled by zh_TW) from matching here.
'zh_CN': re.compile('(^|[^體])中文|中国語', re.I),
'zh_TW': re.compile('繁體中文', re.I),
'ko_KR': re.compile('한국어', re.I),
}
# Names indicating a version with the dialogue/text removed.
TEXTLESS_REGEX = re.compile('(台詞|セリフ|せりふ|テキスト|文字)((な|無)し|抜き)|notext|textless', re.I)
# Front cover; the '(^|[^裏])' guard excludes '裏表紙' (back cover).
FRONT_COVER_REGEX = re.compile('(^|[^裏])表紙|cover|hyoushi', re.I)
BACK_COVER_REGEX = re.compile('裏表紙', re.I)
# Bonus material (design notes, character sheets, extras, posters).
BONUS_REGEX = re.compile('設定|キャラ|特典|ポスター', re.I)
# Epilogue / extra chapters.
EPILOGUE_REGEX = re.compile('after|後日談|おまけ', re.I)
# Keyword arguments for Collator.try_collate_split_regex, tried in this order:
# sources matching 'earlier' are collated first, sources matching 'later' last.
SPLITS = [
{ 'later': TEXTLESS_REGEX },
{ 'earlier': FRONT_COVER_REGEX, 'later': BACK_COVER_REGEX },
{ 'later': BONUS_REGEX },
{ 'later': EPILOGUE_REGEX },
]
# Substrings marking an alternate version of a page. The position in this list
# is the bit number used for the version code in complete_prefix_number_ordering.
ALT_VERSIONS = [
'褐色',
'日焼け',
'pink',
'金髪',
'白肌',
'うつろ目',
'dark skin',
'ラバー',
'ゾンビ肌',
'マスク',
'アヘ顔',
]
# Lower-case suffixes treated as page images (see is_image).
IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff', '.bmp']
# Exact names and lower-case suffixes skipped when listing directories (see ignoreable).
IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd', '.mp4']
def open_zipfile_with_encoding(path):
try:
return zipfile.ZipFile(path, metadata_encoding="utf-8")
except UnicodeDecodeError:
pass
try:
return zipfile.ZipFile(path, metadata_encoding="shift-jis")
except UnicodeDecodeError:
pass
return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")
def extract(args):
    """Unpack every archive in ``args.zipfiles`` under ``<destdir>/extract/``.

    Each archive is extracted into a folder named after the archive's stem.
    The folder is created without ``exist_ok``, so an already-existing folder
    raises FileExistsError. When ``args.remove`` is set, the archive is
    deleted after extraction.
    """
    for archive in args.zipfiles:
        target = args.destdir / 'extract' / archive.stem
        target.mkdir(parents=True)

        print(f'Extracting {archive} to {target}')

        with open_zipfile_with_encoding(archive) as bundle:
            bundle.extractall(path=target)

        if not args.remove:
            continue
        archive.unlink()
def manual_input_metadata(work_id):
    """Prompt on the terminal for the metadata of *work_id* and return it.

    The result is a dict with the keys ``id``, ``title``, ``circle``,
    ``authors``, ``tags``, ``date``, ``description`` and ``series``.
    An empty answer for circle or series becomes ``None``; authors and tags
    are comma-separated lists with blanks dropped.
    """
    def ask_list(prompt):
        # Split on commas, trim each piece, and drop the empty ones.
        return [piece.strip() for piece in input(prompt).split(',') if piece.strip()]

    print(f"Don't know how to fetch metadata for {work_id}, input manually:")

    # Prompts are issued in the same order as the keys are filled in.
    record = {"id": work_id}
    record["title"] = input('Title: ')
    record["circle"] = input('Circle [None]: ') or None
    record["authors"] = ask_list('Authors (comma-separated): ')
    record["tags"] = ask_list('Tags (comma-separated): ')
    record["date"] = input('Pub date (yyyy-mm-dd): ')
    record["description"] = input('Description: ')
    record["series"] = input('Series [None]: ') or None
    return record
async def fetch_async(args):
    """Fetch metadata and a thumbnail for every extracted work not yet in the DB.

    Works are the entries of ``<destdir>/extract``. Works whose id looks like
    a DLSite id are looked up through DlsiteAPI; for anything else the user
    is prompted via manual_input_metadata. Rows are written to the ``works``,
    ``authors`` and ``tags`` tables of ``<destdir>/meta.db`` and thumbnails
    are saved under ``<destdir>/site/thumbnails``.

    Each work is committed on its own, so an interruption keeps the works
    that were already fetched. The database connection is closed even when
    an exception escapes.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    try:
        cur = con.cursor()

        cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
        cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(author, work))")
        cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(tag, work))")

        thumbnails_dir = args.destdir / 'site' / 'thumbnails'
        thumbnails_dir.mkdir(parents=True, exist_ok=True)

        async with DlsiteAPI(locale=args.locale) as api:
            for work_path in (args.destdir / 'extract').iterdir():
                work_id = work_path.name

                # Skip works that already have a metadata row.
                res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
                if res.fetchone() is not None:
                    continue

                if DLSITE_ID_REGEX.fullmatch(work_id):
                    print(f'Fetching DLSite metadata for {work_id}')
                    dlsite_metadata = await api.get_work(work_id)
                    db_row = {
                        "id": work_id,
                        "title": dlsite_metadata.work_name,
                        "circle": dlsite_metadata.circle,
                        "date": dlsite_metadata.regist_date.date().isoformat(),
                        "description": dlsite_metadata.description,
                        "series": dlsite_metadata.series,
                    }
                    authors = dlsite_metadata.author or []
                    tags = dlsite_metadata.genre or []
                    thumbnail_url = dlsite_metadata.work_image
                    # Guard against a missing image URL before treating it as a string.
                    if thumbnail_url and thumbnail_url.startswith('//'):
                        thumbnail_url = 'https:' + thumbnail_url
                else:
                    db_row = manual_input_metadata(work_id)
                    authors = db_row.pop('authors')
                    tags = db_row.pop('tags')
                    if FANZA_ID_REGEX.fullmatch(work_id):
                        thumbnail_url = f'https://doujin-assets.dmm.co.jp/digital/comic/{work_id}/{work_id}pl.jpg'
                    elif FAKKU_ID_REGEX.fullmatch(work_id):
                        thumbnail_url = None
                    else:
                        thumbnail_url = input('Thumbnail image URL [default: first page]: ')

                cur.execute(
                    "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
                    db_row,
                )
                cur.executemany(
                    "INSERT INTO authors VALUES(:author, :work)",
                    [{ "author": author, "work": work_id } for author in authors],
                )
                cur.executemany(
                    "INSERT INTO tags VALUES(:tag, :work)",
                    [{ "tag": tag, "work": work_id } for tag in tags],
                )

                if thumbnail_url:
                    ext = url_file_ext(thumbnail_url)
                    dest_file = thumbnails_dir / (work_id + ext)
                    print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
                    with requests.get(thumbnail_url, stream=True) as r:
                        if r.ok:
                            # Only create the file once the server has agreed
                            # to send the image, so an error page is never
                            # saved as a thumbnail.
                            with open(dest_file, 'wb') as fd:
                                for chunk in r.iter_content(chunk_size=16384):
                                    fd.write(chunk)
                        else:
                            print(f'Failed to download thumbnail for {work_id}: HTTP {r.status_code}')

                con.commit()
    finally:
        con.close()
def url_file_ext(url):
    """Return the file extension of the path part of *url*.

    The extension includes its leading dot; the query string and fragment
    are ignored. An empty string is returned when the path has no extension.
    """
    _, extension = splitext(urlparse(url).path)
    return extension
def fetch(args):
    """Run the asynchronous fetch_async(args) to completion."""
    pending = fetch_async(args)
    asyncio.run(pending)
def collate(args):
    """Automatically collate every extracted work that has not been collated yet.

    For each entry of ``<destdir>/extract`` that has no folder under
    ``<destdir>/site/images`` and is not flagged ``virtual`` in the database,
    a Collator builds the page sequence in a staging folder. The staging
    folder is moved into place on success and removed otherwise.

    ``args.hints`` are paths inside a work's extraction folder (or the work
    folder itself) from which collation of that work should start. The work
    is identified by the first path component below ``extract``.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    try:
        cur = con.cursor()

        extraction_dir = args.destdir / 'extract'
        # parts[0] is the work folder even when the hint *is* the work folder,
        # where indexing parents[-2] would raise IndexError.
        hint_map = {Path(relpath(hint, extraction_dir)).parts[0]: hint for hint in args.hints}

        collation_staging_area = args.destdir / 'site' / 'images-staging'
        collation_staging_area.mkdir(parents=True)

        collation_area = args.destdir / 'site' / 'images'
        collation_area.mkdir(parents=True, exist_ok=True)

        for work_path in extraction_dir.iterdir():
            work_id = work_path.name

            work_collation_dir = collation_area / work_id
            if work_collation_dir.exists():
                continue

            virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
            if virtual == (1,):
                continue

            work_staging_dir = collation_staging_area / work_id

            collator = Collator(work_staging_dir, [], args.locale)
            collation_result = collator.collate_from_paths([hint_map.get(work_id, work_path)])
            if collation_result and collator.index > 0:
                print(f'Collated {collator.index} pages for {work_id}')
                work_staging_dir.rename(work_collation_dir)
            else:
                # Discard whatever a partial attempt left in staging.
                if work_staging_dir.is_dir():
                    for f in work_staging_dir.iterdir():
                        f.unlink()
                    work_staging_dir.rmdir()

                if not collation_result:
                    print(f'Unable to deduce file structure for {work_id}, skipping')
                elif collator.index == 0:
                    print(f'{work_id} contains no files? skipping')

        collation_staging_area.rmdir()
    finally:
        con.close()
class Collator:
    """Builds one numbered page sequence for a work inside a destination folder.

    The ``collate_*`` methods share a three-valued convention visible in how
    their results are checked: ``True`` means the sources were collated,
    ``None`` means collation was attempted and failed, and the ``try_*``
    helpers return ``False`` when their heuristic does not apply.
    """

    def __init__(self, dest, exclude, locale):
        # Directory that receives the numbered files / symlinks.
        self.dest = dest
        # Paths passed on to ls_ignore / descendant_files_ignore to be skipped.
        self.exclude = exclude
        # Key into LANGUAGE_REGEXES used to choose between language variants.
        self.locale = locale
        # Number of pages placed so far; also the number of the next page.
        self.index = 0

    def collate_from_paths(self, srcs):
        """Collate *srcs* (a list of paths), trying each heuristic in order.

        Returns True on success (an empty list counts as success) and None
        when no heuristic can establish a page order.
        """
        # A lone directory: descend into its (non-ignored) contents.
        if len(srcs) == 1 and srcs[0].is_dir():
            return self.collate_from_paths(ls_ignore(srcs[0], self.exclude))

        # A lone PDF: pull its images out.
        if len(srcs) == 1 and is_pdf(srcs[0]):
            print(f'Extracting images from {srcs[0]}')
            return self.link_pdf(srcs[0])

        if len(srcs) == 0:
            return True

        select_language = self.try_collate_select_language(srcs)
        if select_language is not False:
            return select_language

        # Two directories that differ only in a quality marker: keep the better one.
        if len(srcs) == 2 and all(src.is_dir() for src in srcs):
            for quality in IMAGE_QUALITY_REGEXES:
                # A source counts as kind `a` when its name matches quality[a];
                # if this entry has no pattern for `a`, it counts when its name
                # does not match quality[b].
                def a_not_b(a, b, src):
                    if a in quality:
                        return quality[a].search(nname(src))
                    else:
                        return not quality[b].search(nname(src))
                better_srcs = [src for src in srcs if a_not_b('better', 'worse', src)]
                worse_srcs = [src for src in srcs if a_not_b('worse', 'better', src)]
                if len(better_srcs) == 1 and len(worse_srcs) == 1 and better_srcs[0] != worse_srcs[0]:
                    better = better_srcs[0]
                    worse = worse_srcs[0]
                    # Only trust the choice when both variants hold the same number of files.
                    if len(descendant_files_ignore(better, self.exclude)) == len(descendant_files_ignore(worse, self.exclude)):
                        return self.collate_from_paths([better])

        images_vs_pdf = self.try_collate_images_vs_pdf(srcs)
        if images_vs_pdf is not False:
            return images_vs_pdf

        for regexes in SPLITS:
            split_attempt = self.try_collate_split_regex(srcs, **regexes)
            if split_attempt is not False:
                return split_attempt

        # Plain image files: accept them only if their names give a complete order.
        if all(src.is_file() and is_image(src) for src in srcs):
            ordering = complete_prefix_number_ordering(srcs)
            if ordering:
                print(f'Symlinking image files: {ordering[0]}...')
                return self.link_ordered_files(ordering)
            else:
                return None

        return None

    def link_pdf(self, src):
        """Write the images of the PDF *src* into dest as numbered files.

        Returns True on success, or None when image_xrefs cannot determine
        the images of the PDF.
        """
        with fitz.open(src) as pdf:
            xrefs = image_xrefs(pdf)
            if xrefs is None:
                print(f'Support for weirder PDFs not yet implemented, skipping {src}')
                return None

            self.dest.mkdir(parents=True, exist_ok=True)
            for (idx, xref) in enumerate(xrefs, start=self.index):
                image = pdf.extract_image(xref)
                file_path = self.dest / f'{idx:04d}.{image["ext"]}'
                with open(file_path, 'wb') as f:
                    f.write(image["image"])

            # image_xrefs yields one xref per page, so this advances by the
            # number of files written above.
            self.index += pdf.page_count
            return True

    def link_ordered_files(self, ordering):
        """Symlink the files of *ordering* into dest, numbered from self.index.

        Each link is named with a zero-padded four-digit number plus the
        lower-cased source suffix, and points at the source via a path
        relative to dest. Always returns True.
        """
        self.dest.mkdir(parents=True, exist_ok=True)

        for (idx, src_path) in enumerate(ordering, start=self.index):
            ext = src_path.suffix.lower()
            link_path = self.dest / f'{idx:04d}{ext}'
            link_path.symlink_to(relpath(src_path, self.dest))

        self.index += len(ordering)
        return True

    def try_collate_split_regex(self, srcs, earlier=None, later=None):
        """Split *srcs* into early / middle / late groups by name and collate each.

        Sources whose name matches *earlier* go first, those matching *later*
        go last, everything else stays in the middle. Returns False when the
        split puts everything into a single group, None when any group fails
        to collate, and True otherwise.
        """
        early_srcs = []
        middle_srcs = []
        late_srcs = []
        for src in srcs:
            if earlier and earlier.search(nname(src)):
                early_srcs.append(src)
            elif later and later.search(nname(src)):
                late_srcs.append(src)
            else:
                middle_srcs.append(src)

        # Fewer than two non-empty groups means the patterns did not separate anything.
        if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
            return False

        early_page_collation = self.collate_from_paths(early_srcs)
        if early_page_collation is None:
            return None

        middle_page_collation = self.collate_from_paths(middle_srcs)
        if middle_page_collation is None:
            return None

        late_page_collation = self.collate_from_paths(late_srcs)
        if late_page_collation is None:
            return None

        return True

    def try_collate_images_vs_pdf(self, srcs):
        """Choose between a PDF copy and a loose-image copy of the same work.

        Applies when exactly one source has 'pdf' in its name and contains
        exactly one PDF file, and every other descendant file is an image.
        Returns False when the heuristic does not apply or cannot decide;
        otherwise returns the result of collating the chosen copy.
        """
        pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
        if len(pdfs) != 1:
            return False
        outer_pdf = pdfs[0]

        inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, self.exclude) if is_pdf(f)]
        if len(inner_pdfs) != 1:
            return False
        inner_pdf = inner_pdfs[0]

        non_pdf_srcs = [src for src in srcs if src != outer_pdf]
        images = []
        non_images = []
        descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, self.exclude)]
        for f in descendant_files:
            if is_image(f):
                images.append(f)
            else:
                # One non-image is enough to rule the heuristic out.
                non_images.append(f)
                break

        if len(non_images) != 0 or len(images) == 0:
            return False

        pdf_sizes = pdf_image_sizes(inner_pdf)
        standalone_sizes = [standalone_image_size(f) for f in images]

        median_pdf_size = median(pdf_sizes)
        median_standalone_size = median(standalone_sizes)
        if not (median_pdf_size and median_standalone_size):
            return False

        # The image counts differ noticeably. Accept the loose images only if
        # the PDF has about as many pages as there are loose images, its
        # images have the same median width, and scaling the PDF image count
        # by the ratio of mean heights gives about the loose image count.
        # NOTE(review): presumably this covers PDFs whose pages are cut into
        # several strips - confirm.
        if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
            with fitz.open(inner_pdf) as pdf:
                pdf_page_count = len(pdf)
            height_adjusted_pdf_image_count = (
                len(pdf_sizes) *
                mean([size[1] for size in pdf_sizes]) / mean([size[1] for size in standalone_sizes])
            )
            if (
                abs(pdf_page_count - len(standalone_sizes)) <= 2 and
                len(pdf_sizes) > len(standalone_sizes) and
                median_pdf_size[0] == median_standalone_size[0] and
                abs(height_adjusted_pdf_image_count - len(standalone_sizes)) <= 2
            ):
                return self.collate_from_paths(non_pdf_srcs)
            else:
                return False

        # Similar counts: take whichever copy has the larger-or-equal median size,
        # preferring the loose images on a tie.
        if superior_or_equal(median_standalone_size, median_pdf_size):
            return self.collate_from_paths(non_pdf_srcs)
        elif superior_or_equal(median_pdf_size, median_standalone_size):
            return self.collate_from_paths([outer_pdf])
        else:
            return False

    def try_collate_select_language(self, srcs):
        """Keep only the sources named for the configured locale's language.

        Applies only when the locale has an entry in LANGUAGE_REGEXES, every
        source name matches some language pattern, and the locale's pattern
        matches some but not all of them. Returns False otherwise.
        """
        if self.locale not in LANGUAGE_REGEXES:
            return False
        if not all(any(lang.search(nname(src)) for lang in LANGUAGE_REGEXES.values()) for src in srcs):
            return False

        srcs_matching_language = [src for src in srcs if LANGUAGE_REGEXES[self.locale].search(nname(src))]
        if len(srcs_matching_language) == len(srcs) or len(srcs_matching_language) == 0:
            return False

        return self.collate_from_paths(srcs_matching_language)
def image_xrefs(pdf):
    """Return one image xref per page of *pdf*, or None if that is not possible.

    The quick path uses ``page.get_images()`` and succeeds when every page
    lists exactly one image. Otherwise each page is inspected with
    ``page.get_image_info(xrefs=True)``; a page that does not show exactly
    one image, or whose image has xref 0, makes the whole lookup fail.
    """
    per_page = [page.get_images() for page in pdf]
    if not any(len(found) != 1 for found in per_page):
        return [found[0][0] for found in per_page]

    print("Checking PDF images the quick way failed, trying the slow way")
    collected = []
    for page_number, page in enumerate(pdf):
        # Rewrite the same terminal line with the running progress.
        print(f'\x1b[2K\r{page_number}/{pdf.page_count} pages processed...', end='')
        info = page.get_image_info(xrefs=True)
        sole_xref = info[0]['xref'] if len(info) == 1 else 0
        if sole_xref == 0:
            print('\nFailed')
            return None
        collected.append(sole_xref)

    print('\nSuccess')
    return collected
def nfc(s):
    """Return the string *s* in Unicode normalization form NFC."""
    composed = unicodedata.normalize('NFC', s)
    return composed
def nname(entry):
    """Return the NFC-normalized ``name`` attribute of *entry*."""
    raw_name = entry.name
    return nfc(raw_name)
def complete_prefix_number_ordering(entries):
    """Return *entries* in page order deduced from their names, or None.

    Entries are grouped by which ALT_VERSIONS markers their names contain,
    and each group is numbered with unique_hierarchical_prefix_numbering.
    None is returned when any group cannot be numbered, or when the combined
    numbering has a level whose smallest step is greater than 2.
    A single entry is returned unchanged.
    """
    if len(entries) == 1:
        return entries

    # Bucket entries by a bitmask with one bit per ALT_VERSIONS marker found in the name.
    entries_by_version = {}
    for entry in entries:
        version_code = 0
        for (i, version) in enumerate(ALT_VERSIONS):
            if version in nname(entry):
                version_code |= (1 << i)
        entries_by_version.setdefault(version_code, []).append(entry)

    numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version}

    # Union of all index tuples across versions; the None key (an entry
    # without any number) is set aside here and re-appended at the end below.
    unified_indices = set()
    for numbering in numberings_by_version.values():
        if numbering is None:
            return None
        unified_indices |= set(numbering.keys())
    unified_indices.discard(None)
    unified_indices = list(unified_indices)
    unified_indices.sort()

    # For each tuple position, record the smallest difference between
    # neighbouring indices that differ there; a minimum step above 2 is
    # treated as an incomplete numbering.
    min_delta_by_level = {}
    if len(unified_indices) > 1:
        for i in range(1, len(unified_indices)):
            cur = unified_indices[i]
            prev = unified_indices[i-1]
            for level in range(min(len(cur), len(prev))):
                if cur[level] != prev[level]:
                    delta = cur[level] - prev[level]
                    min_delta_by_level[level] = min(min_delta_by_level.get(level, delta), delta)
        if any(delta > 2 for delta in min_delta_by_level.values()):
            return None

    unified_indices.append(None)

    versions = list(numberings_by_version.keys())
    versions.sort()

    # Versions with about as many numbered slots as the first (lowest-code)
    # version are "outer": each is emitted as its own full run. Smaller
    # versions are "inner": their pages are interleaved into the first
    # version's run, right after the page with the same index.
    version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version}
    inner_versions = []
    outer_versions = [versions[0]]
    for ver in versions[1:]:
        if version_lengths[ver] >= version_lengths[versions[0]] - 2:
            outer_versions.append(ver)
        else:
            inner_versions.append(ver)

    result = []
    for out_ver in outer_versions:
        for i in unified_indices:
            for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])):
                result += numberings_by_version[ver].get(i, [])
    return result
def unique_hierarchical_prefix_numbering(entries, start_point=0):
    """Map index tuples to the entries they number, or return None.

    The longest entry name is scanned for NUMBER_REGEX matches from the last
    one backwards. The first match position whose preceding text is a prefix
    shared by all entries (or starts with an entry's whole stem) is taken as
    the page-number position. Entries that share a number are numbered again,
    one level deeper, from the text after that number - first recursively,
    then with alphabetic_numbering as a fallback.

    Returns a dict whose keys are tuples of ints (one element per level) and
    whose values are lists of entries. A single entry without any number
    yields ``{None: entries}``. None means no consistent numbering was found,
    including when a matched token is not a plain integer (NUMBER_REGEX also
    accepts hyphens, e.g. ``-`` or ``01-02``).
    """
    if len(entries) == 1 and not NUMBER_REGEX.search(nname(entries[0])):
        return {None: entries}

    longest_entry = max(entries, key=lambda e: len(nname(e)))
    matches = reversed(list(NUMBER_REGEX.finditer(nname(longest_entry))))
    for m in matches:
        pos = m.start()
        # Do not look at text already consumed by an outer level.
        if pos < start_point:
            return None
        prefix = nname(longest_entry)[:pos]
        if all(nname(e).startswith(prefix) or prefix.startswith(nfc(e.stem)) for e in entries):
            numbering = {}
            for e in entries:
                if pos >= len(nfc(e.stem)):
                    # The name ends before the number position: treat it as page 0.
                    i = 0
                else:
                    n = NUMBER_REGEX.match(nname(e)[pos:])
                    if n is None:
                        return None
                    try:
                        i = int(n.group())
                    except ValueError:
                        # Tokens such as '-' or '01-02' are not integers;
                        # report "cannot deduce" instead of crashing.
                        return None
                numbering.setdefault((i,), []).append(e)

            # Snapshot the keys: the dict is modified while duplicates are resolved.
            indices = list(numbering.keys())
            for idx in indices:
                if len(numbering[idx]) > 1:
                    ents_idx = numbering.pop(idx)
                    longest = max(ents_idx, key=lambda e: len(nname(e)))
                    number_at_pos = NUMBER_REGEX.match(nname(longest)[pos:])
                    if number_at_pos is None:
                        # The longest duplicate has no number here, so there
                        # is no point from which to start the next level.
                        return None
                    next_layer_start = pos + number_at_pos.end()
                    sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start) or alphabetic_numbering(ents_idx, next_layer_start)
                    if not sub_numbering:
                        return None
                    for sub_idx in sub_numbering:
                        numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx]

            return numbering

    return None
def alphabetic_numbering(entries, start_point):
    """Number *entries* by the single letter following *start_point* in their stem.

    An empty remainder maps to 0, 'a' to 1, 'b' to 2, and so on (case is
    ignored). Returns a dict from one-element index tuples to one-element
    entry lists, or None when a remainder is longer than one character, two
    entries get the same index, or the indices are not exactly 0..n-1.
    """
    by_letter = {}
    for item in entries:
        suffix = item.stem[start_point:]
        if len(suffix) > 1:
            return None
        if suffix == '':
            slot = 0
        else:
            slot = ord(suffix.lower()) - ord('a') + 1
        key = (slot,)
        if key in by_letter:
            return None
        by_letter[key] = [item]

    # The indices must form an unbroken run starting at 0.
    expected = [(n,) for n in range(len(by_letter))]
    if sorted(by_letter) != expected:
        return None
    return by_letter
def check_extension(path, exts):
    """Return whether the lower-cased suffix of *path* is one of *exts*."""
    suffix = path.suffix.lower()
    return suffix in exts
def is_pdf(path):
    """Return whether *path* has a ``.pdf`` suffix (case-insensitive)."""
    pdf_suffixes = ['.pdf']
    return check_extension(path, pdf_suffixes)
def is_image(path):
    """Return whether the suffix of *path* is listed in IMAGE_FILE_EXTENSIONS."""
    looks_like_image = check_extension(path, IMAGE_FILE_EXTENSIONS)
    return looks_like_image
def ignoreable(path):
    """Return whether *path* should be skipped during collation.

    A path is skipped when its name is listed in IGNOREABLE_FILES or its
    suffix is listed in IGNOREABLE_EXTENSIONS.
    """
    if path.name in IGNOREABLE_FILES:
        return True
    return check_extension(path, IGNOREABLE_EXTENSIONS)
def ls_ignore(directory, exclude):
    """List the entries of *directory* that are neither ignoreable nor in *exclude*."""
    kept = []
    for child in directory.iterdir():
        if ignoreable(child) or child in exclude:
            continue
        kept.append(child)
    return kept
def descendant_files_ignore(path, exclude):
    """Return all files at or below *path*, skipping ignoreable and excluded entries.

    A file path yields a one-element list containing itself. Directories are
    walked recursively through ls_ignore.
    """
    if path.is_file():
        return [path]

    collected = []
    for child in ls_ignore(path, exclude):
        if not child.is_dir():
            collected.append(child)
            continue
        collected.extend(descendant_files_ignore(child, exclude))
    return collected
def standalone_image_size(filepath):
    """Return the ``size`` reported by PIL for the image file at *filepath*."""
    with Image.open(filepath) as picture:
        dimensions = picture.size
    return dimensions
def pdf_image_sizes(filepath):
    """Return the (width, height) of each distinct image in the PDF at *filepath*.

    Images are identified by xref; an xref that appears on several pages is
    reported once, with the size seen at its first occurrence.
    """
    first_seen = {}
    with fitz.open(filepath) as document:
        for page in document:
            for (xref, _, width, height, *_) in page.get_images():
                # setdefault keeps the first size recorded for an xref.
                first_seen.setdefault(xref, (width, height))
    return list(first_seen.values())
def median(items):
    """Return the middle element of *items* in sorted order, or None if empty.

    For an even number of items the upper of the two middle elements is
    returned (no averaging), so the result is always one of the inputs.
    The input list is left untouched.
    """
    if len(items) == 0:
        return None
    # sorted() makes a copy; the caller's list must not be reordered.
    ordered = sorted(items)
    return ordered[len(ordered) // 2]
def mean(items):
    """Return the arithmetic mean of *items*, or None when *items* is empty."""
    count = len(items)
    return sum(items) / count if count else None
def superior_or_equal(a, b):
    """Return whether *a* is at least as long as *b* and >= *b* in every position of *b*."""
    if len(a) < len(b):
        return False
    # zip stops at the end of b, the shorter (or equal-length) sequence.
    return all(left >= right for left, right in zip(a, b))
def self_and_parents(path):
    """Return a list holding *path* followed by each of ``path.parents``."""
    return [path, *path.parents]
def parse_expressions(tokens):
    """Parse the ``manual-collate`` token list into (groups, exclusions).

    A bare token becomes a one-element group, ``( ... )`` becomes one group,
    and ``!`` followed by a token or a parenthesized group adds to the flat
    exclusion list. *tokens* is consumed in place.
    """
    groups, exclusions = [], []
    while tokens:
        head = tokens.pop(0)
        if head == '!':
            exclusions.extend(parse_exclusion(tokens))
            continue
        groups.append(parse_group(tokens) if head == '(' else [head])
    return (groups, exclusions)
def parse_exclusion(tokens):
    """Parse what follows a ``!``: one token, or a parenthesized group of tokens.

    Consumes the parsed tokens from the front of *tokens* and returns them
    as a list.
    """
    head = tokens.pop(0)
    return parse_group(tokens) if head == '(' else [head]
def parse_group(tokens):
    """Consume tokens up to and including the next ``)`` and return those before it.

    Raises IndexError (from ``list.pop``) when *tokens* runs out before a
    closing parenthesis is found.
    """
    members = []
    while (current := tokens.pop(0)) != ')':
        members.append(current)
    return members
def normalize_to(path, ref):
    """Return *path* re-expressed as *ref* joined with the path relative to *ref*."""
    relative = relpath(path, ref)
    return ref / Path(relative)
def manual_collate(args):
    """Collate a single work from explicitly given path groups.

    ``args.expression`` is parsed with parse_expressions. Each group is
    collated in order by one shared Collator, with the exclusions skipped.
    When only exclusions are given, collation starts from the work's
    extraction folder. The work is identified by the first path component
    below ``<destdir>/extract`` of the first path mentioned.

    Nothing is done when the work already has a non-empty collation folder
    or when any given path does not exist. Pages are built in a staging
    folder that is moved into place on success and removed on failure.
    """
    (raw_groups, raw_exclusions) = parse_expressions(args.expression)

    extraction_dir = args.destdir / 'extract'

    sample_path = next(path for group in (raw_groups + [raw_exclusions]) for path in group)
    # parts[0] is the work folder even when the path given *is* the work
    # folder, where indexing parents[-2] would raise IndexError.
    work_id = Path(relpath(sample_path, extraction_dir)).parts[0]

    exclusions = [normalize_to(item, args.destdir) for item in raw_exclusions]

    if raw_groups:
        groups = [[normalize_to(item, args.destdir) for item in group] for group in raw_groups]
    else:
        groups = [[extraction_dir / work_id]]

    collation_area = args.destdir / 'site' / 'images'
    collation_area.mkdir(parents=True, exist_ok=True)

    work_collation_dir = collation_area / work_id
    if work_collation_dir.exists():
        if len(list(work_collation_dir.iterdir())) > 0:
            print(f'Collation directory already exists!')
            return
        else:
            # An empty leftover folder would block the final rename.
            work_collation_dir.rmdir()

    nonexistent = [path for group in (groups + [exclusions]) for path in group if not path.exists()]
    if len(nonexistent) > 0:
        print(f'Nonexistent paths: {nonexistent}')
        return

    collation_staging_area = args.destdir / 'site' / 'images-staging'
    work_staging_dir = collation_staging_area / work_id
    work_staging_dir.mkdir(parents=True)

    collator = Collator(work_staging_dir, exclusions, args.locale)
    for group in groups:
        collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
        if collation_result is None:
            print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
            break

    if collation_result and collator.index > 0:
        print(f'Collated {collator.index} pages for {work_id}')
        work_staging_dir.rename(work_collation_dir)
    else:
        # Discard whatever a partial attempt left in staging.
        for f in work_staging_dir.iterdir():
            f.unlink()
        work_staging_dir.rmdir()

        if collation_result and collator.index == 0:
            print(f'No files found for {work_id}')

    collation_staging_area.rmdir()
def fmt_size(s):
    """Format the first two items of *s* as ``'<s[0]>x<s[1]>px'``."""
    first, second = s[0], s[1]
    return f'{first}x{second}px'
def analyze(args):
    """Print every non-ignored file of an extracted work with its image sizes.

    Files are listed in sorted order, relative to ``<destdir>/extract``.
    Images are followed by their pixel size; PDFs by the number of embedded
    images and their median, minimum and maximum size; other files by nothing.
    """
    extract_dir = args.destdir / 'extract'
    listing = sorted(descendant_files_ignore(extract_dir / args.work_id, []))

    for entry in listing:
        print(f'{relpath(entry, extract_dir)}', end='')
        if is_image(entry):
            print(f'\t{fmt_size(standalone_image_size(entry))}')
            continue
        if not is_pdf(entry):
            # Neither image nor PDF: just finish the line.
            print()
            continue
        sizes = pdf_image_sizes(entry)
        if not sizes:
            print(f'\tContains no images')
        else:
            print(f'\t{len(sizes)} images, median {fmt_size(median(sizes))}, min {fmt_size(min(sizes))}, max {fmt_size(max(sizes))}')
def metadata(args):
    """Optionally update, then print, the stored metadata of ``args.work_id``.

    When ``args.virtual`` is not None, the work's ``virtual`` flag is set to
    1 or 0 and committed first. If the work has no row in ``works``, a
    message is printed instead of the metadata. The database connection is
    closed on every path, including the "not found" return.
    """
    con = sqlite3.connect(args.destdir / 'meta.db')
    try:
        cur = con.cursor()

        if args.virtual is not None:
            cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
            con.commit()

        res = cur.execute(
            "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
            (args.work_id,),
        ).fetchone()

        if res is None:
            print(f'Work id {args.work_id} not found!')
            return

        (title, circle, date, description, series, virtual) = res
        print(f'Work ID: {args.work_id}')
        print(f'Title: {title}')
        print(f'Circle: {circle}')
        print(f'Pub date: {date}')
        print(f'Description: {description}')
        print(f'Series: {series}')
        print(f'Virtual: {"Yes" if virtual == 1 else "No"}')
    finally:
        con.close()
def copy_recursive(src, dest):
    """Copy the directory tree *src* into *dest*, creating *dest* as needed.

    Real subdirectories are recursed into. Everything else, including
    symlinks to directories, is handed to ``shutil.copyfile``.
    """
    dest.mkdir(parents=True, exist_ok=True)
    for child in src.iterdir():
        target = dest / child.name
        is_real_dir = child.is_dir() and not child.is_symlink()
        if not is_real_dir:
            shutil.copyfile(child, target)
            continue
        copy_recursive(child, target)
def generate(args):
    """Render the static library site under ``<destdir>/site``.

    For every work in the database that has a folder under ``site/images``,
    a work page and a viewer page are written. Index pages are then written
    for authors, tags, circles and series, the packaged ``static`` folder is
    copied in, and the top-level index is written.

    All HTML is written as UTF-8 regardless of the locale's default
    encoding, and the database connection is closed even when rendering fails.
    """
    jenv = Environment(
        loader=PackageLoader("dlibrary"),
        autoescape=select_autoescape()
    )

    viewer_template = jenv.get_template("viewer.html")
    list_template = jenv.get_template("list.html")
    categorization_template = jenv.get_template("categorization.html")
    work_template = jenv.get_template("work.html")
    index_template = jenv.get_template("index.html")

    con = sqlite3.connect(args.destdir / 'meta.db')
    try:
        cur = con.cursor()

        site_dir = args.destdir / 'site'

        collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()}

        works = []
        for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall():
            if work_id not in collated_work_ids:
                continue
            authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
            tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]

            images = [path.name for path in (site_dir / 'images' / work_id).iterdir()]
            images.sort()

            # Use a downloaded thumbnail named after the work if there is
            # one, otherwise fall back to the first collated page.
            try:
                thumbnail_path = relpath(next(
                    f for f in (site_dir / 'thumbnails').iterdir() if f.stem == work_id
                ), site_dir)
            except StopIteration:
                thumbnail_path = f'images/{work_id}/{images[0]}'
            work = {
                'id': work_id,
                'title': title,
                'circle': circle,
                'date': date,
                'description': description,
                'series': series,
                'authors': authors,
                'tags': tags,
                'thumbnail_path': thumbnail_path,
            }
            works.append(work)

            work_dir = site_dir / 'works' / work_id
            viewer_dir = work_dir / 'view'
            viewer_dir.mkdir(parents=True, exist_ok=True)
            with open(work_dir / 'index.html', 'w', encoding='utf-8') as f:
                f.write(work_template.render(depth=2, work=work, title=title, images=images))
            with open(viewer_dir / 'index.html', 'w', encoding='utf-8') as f:
                f.write(viewer_template.render(depth=3, work=work, title=title, images=images))

        def make_categorization(categorization, query, work_filter, work_style_cards=False):
            # Writes one list page per category value returned by `query`,
            # plus an overview page for the categorization itself.
            # `work_filter(cat)` must return a predicate selecting the works
            # that belong to category `cat`.
            categorization_dir = site_dir / categorization

            cats = [cat for (cat,) in cur.execute(query)]
            cat_samples = {}
            for cat in cats:
                cat_works = list(filter(work_filter(cat), works))
                cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None

                # A slash in the category name would create nested folders.
                safeish_cat = cat.replace('/', ' ')
                cat_dir = categorization_dir / safeish_cat
                cat_dir.mkdir(parents=True, exist_ok=True)
                with open(cat_dir / 'index.html', 'w', encoding='utf-8') as f:
                    f.write(list_template.render(
                        depth=2,
                        works=cat_works,
                        title=cat,
                        categorization=categorization,
                    ))

            categorization_dir.mkdir(parents=True, exist_ok=True)
            with open(categorization_dir / 'index.html', 'w', encoding='utf-8') as f:
                f.write(categorization_template.render(
                    depth=1,
                    categorization=categorization,
                    categories=cats,
                    samples=cat_samples,
                    work_style_cards=work_style_cards,
                ))

        make_categorization(
            'authors',
            'SELECT DISTINCT author FROM authors ORDER BY author',
            lambda author: lambda work: author in work['authors'],
        )
        make_categorization(
            'tags',
            'SELECT DISTINCT tag FROM tags ORDER BY tag',
            lambda tag: lambda work: tag in work['tags'],
        )
        make_categorization(
            'circles',
            'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle',
            lambda circle: lambda work: work['circle'] == circle,
        )
        make_categorization(
            'series',
            'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series',
            lambda series: lambda work: work['series'] == series,
            work_style_cards=True,
        )

        with resources.as_file(resources.files("dlibrary")) as r:
            copy_recursive(r / 'static', site_dir / 'static')

        with open(site_dir / 'index.html', 'w', encoding='utf-8') as f:
            f.write(index_template.render(depth=0, works=works))
    finally:
        con.close()
# Top-level command-line parser. RawDescriptionHelpFormatter prints the
# description below as written instead of re-wrapping it.
# NOTE(review): every description line already starts at column 0 here, so
# textwrap.dedent has nothing to strip; confirm whether the help text was
# meant to have nested indentation.
argparser = argparse.ArgumentParser(
prog='dlibrary',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Organize DRM-free works purchased from DLSite into a library
that can be viewed in a web browser.
Intended workflow:
- `extract` a collection of zipfiles downloaded from DLSite
into DLibrary's data directory, giving each work its own
subfolder.
- `fetch` metadata and thumbnail images for extracted works
from DLSite.
- `collate` and/or `manual-collate` extracted works,
producing a single sequence of image files (or symlinks
into the extracted data, when possible) for each work.
- Manually adjust works' `metadata` when necessary.
- `generate` a static website providing a catalog and viewer
for all collated works.
"""),
)
# Global option: data directory; falls back to $DLIBRARY_DIR, then ./dlibrary.
argparser.add_argument(
'-d', '--destdir',
type=Path,
default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
)
# Global option: preferred locale; falls back to $DLIBRARY_LOCALE, then en_US.
argparser.add_argument(
'-l', '--locale',
type=str,
default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
help=('preferred locale for requesting metadata and collating (e.g. "ja_JP", "en_US"). '
'May still fall back to Japanese if other languages are unavailable. '
'(default: $DLIBRARY_LOCALE or en_US)'),
)
# A subcommand is mandatory. Each subparser stores its handler in `func`
# via set_defaults, and main() calls it with the parsed arguments.
subparsers = argparser.add_subparsers(title="subcommands", required=True)
# `extract`: unpack zip archives into the data directory.
parser_extract = subparsers.add_parser('extract', aliases=['x', 'ex'], help='extract zipfiles')
parser_extract.add_argument(
'-r', '--remove',
action='store_true',
help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
'zipfiles',
metavar='FILE',
type=Path,
nargs='+',
help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)
# `fetch`: takes no arguments of its own.
parser_fetch = subparsers.add_parser('fetch', aliases=['f', 'fet'], help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)
# `collate`: automatic collation, optionally guided by hint paths.
parser_collate = subparsers.add_parser(
'collate',
aliases=['c', 'co', 'col'],
help='collate each work into a sequence of image files',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
For each extracted work that has not already been collated,
DLibrary will attempt to intuit its structure as follows:
- Enter the work's directory. If the directory contains
nothing except a single subdirectory (ignoring a few types
of files that are definitely not relevant), traverse
downwards repeatedly.
- If the current directory contains nothing except a single
PDF (again, ignoring irrelevant files), attempt to extract
a series of images from the PDF. This process expects that
each page of the PDF consists of a single embedded image,
which will be extracted at full resolution. Support for
more complex PDFs is not yet implemented.
- If the current directory contains nothing except image
files, and the image files are named in a way that clearly
indicates a complete numerical order (each filename
consists of a shared prefix followed by a distinct
number), symlink files in the inferred order.
- Otherwise, skip processing this work for now.
DLibrary can be given "collation hints" which provide
alternative starting points for this search process. A hint
is a path under $DLIBRARY_DIR/extract/[work id]/
indicating a different directory or PDF file to begin the
search process for that work, rather than starting at the
top level of the extracted data. There can be at most one
hint per work; for more complicated scenarios where a work
includes multiple folders that need to be collated together,
or where filenames do not clearly indicate an ordering, use
`manual-collate` instead.
"""),
)
# Zero or more hint paths, parsed as Path objects.
parser_collate.add_argument(
'hints',
metavar='PATH',
type=Path,
nargs='*',
help='paths within extraction folders as collation hints'
)
parser_collate.set_defaults(func=collate)
# `manual-collate`: collate one work from an explicit expression of paths.
parser_manual_collate = subparsers.add_parser(
'manual-collate',
aliases=['mc', 'man', 'manual'],
help='collate a single work manually',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Provide an expression or sequence of expressions specifying groups
of paths to collate or skip. An expression can be:
PATH
A single path. If this is an image, it will be appended to
the sequence of collated images; if this is a PDF, images will be
extracted from it and concatenated to the sequence; if this is a
directory, the contents of the directory will be collated based on
the normal heuristics and concatenated to the sequence.
( PATH [PATH ...] )
A group of paths contained in parentheses. You may need to escape
the parentheses to avoid them getting parsed by your shell.
All the paths in this group will be considered together, and
collated based on the normal heuristics, regardless of what
order the paths are provided in.
! PATH
! ( PATH [PATH ...] )
A path or group of paths to exclude from collation. You may
need to escape the !. If an excluded path appears within any
of the other specified paths, it will be ignored.
If the only expressions provided are negations, then auto-collation
will start from the top level of the extracted work while excluding
the negated paths.
All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
for the work being manually collated. `manual-collate` can
only handle one work at a time.
"""),
)
# The expression is kept as raw string tokens; parse_expressions interprets
# '(', ')' and '!' later.
parser_manual_collate.add_argument(
'expression',
nargs='+',
help='expressions indicating paths to collate or skip',
)
parser_manual_collate.set_defaults(func=manual_collate)
# `analyze`: list the files of one extracted work together with image sizes.
parser_analyze = subparsers.add_parser('analyze', aliases=['a', 'an', 'anal'], help='analyze an extracted folder to assist in collation')
parser_analyze.add_argument('work_id')
parser_analyze.set_defaults(func=analyze)
# `metadata`: show a work's metadata, optionally changing its `virtual` flag.
parser_metadata = subparsers.add_parser('metadata', aliases=['m', 'me', 'meta'], help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
# BooleanOptionalAction provides both --virtual and --no-virtual. No default
# is given, so the value is None when neither is passed, which metadata()
# treats as "leave the flag unchanged".
parser_metadata.add_argument(
'--virtual',
action=argparse.BooleanOptionalAction,
help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)
# `generate`: build the static site; takes no arguments of its own.
parser_generate = subparsers.add_parser(
'generate',
aliases=['g', 'gen'],
help='generate HTML/CSS/JS for library site',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
The static site will be generated under $DLIBRARY_DIR/site/
and can be served by pointing an HTTP server at that
directory. Note that some files inside the static site
hierarchy will be symlinks into $DLIBRARY_DIR/extract/
outside the site hierarchy, so make sure your HTTP server
will allow those symlinks to be read.
"""),
)
parser_generate.set_defaults(func=generate)
def main():
    """Parse the command line and run the handler of the chosen subcommand."""
    parsed = argparser.parse_args()
    handler = parsed.func
    handler(parsed)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()