#!/usr/bin/env python3

import argparse
import asyncio
import importlib_resources as resources
from pathlib import Path
import os
from os.path import relpath, splitext
import re
import readline  # imported for its side effect: line editing/history for input()
import shutil
import sqlite3
import textwrap
from urllib.parse import urlparse
import zipfile

from dlsite_async import DlsiteAPI
import fitz  # PyMuPDF
from PIL import Image
from jinja2 import Environment, PackageLoader, select_autoescape
import requests

NUMBER_REGEX = re.compile('[0-9]+')

DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$')
FANZA_ID_REGEX = re.compile('^d_[0-9]+$')
FAKKU_ID_REGEX = re.compile('.*_FAKKU$')

# Filename markers: "no dialogue/text" pages (台詞/セリフ/テキスト/文字 + なし/無し),
# epilogue pages (後日談), high-resolution folders (高解像度), and covers (表紙).
TEXTLESS_REGEX = re.compile('(台詞|セリフ|テキスト|文字)(な|無)し|notext|textless')
EPILOGUE_REGEX = re.compile('after|後日談')
HI_RES_REGEX = re.compile('高解像度')
COVER_REGEX = re.compile('表紙')
# Markers indicating an alternate version of a page (tanned/blonde/pale-skin
# recolors, rubber, zombie skin, mask, ahegao, etc.).
ALT_VERSIONS = [
    '褐色',
    '日焼け',
    'pink',
    '金髪',
    '白肌',
    'うつろ目',
    'dark skin',
    'ラバー',
    'ゾンビ肌',
    'マスク',
    'アヘ顔',
]

IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff']

IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd', '.mp4']

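# Zip filename metadata may be encoded as UTF-8 or as a Shift-JIS variant
# depending on where the archive was produced; try each encoding in turn.
# (ZipFile's metadata_encoding parameter requires Python 3.11+.)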
def open_zipfile_with_encoding(path):
    try:
        return zipfile.ZipFile(path, metadata_encoding="utf-8")
    except UnicodeDecodeError:
        pass

    try:
        return zipfile.ZipFile(path, metadata_encoding="shift-jis")
    except UnicodeDecodeError:
        pass

    return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")

def extract(args):
    for zip_path in args.zipfiles:
        work_id = zip_path.stem
        work_extract_path = args.destdir / 'extract' / work_id
        work_extract_path.mkdir(parents=True)

        print(f'Extracting {zip_path} to {work_extract_path}')

        with open_zipfile_with_encoding(zip_path) as z:
            z.extractall(path=work_extract_path)

        if args.remove:
            zip_path.unlink()


def manual_input_metadata(work_id):
    print(f"Don't know how to fetch metadata for {work_id}, input manually:")

    title = input('Title: ')
    circle = input('Circle [None]: ') or None
    authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()]
    tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()]
    date = input('Pub date (yyyy-mm-dd): ')
    description = input('Description: ')
    series = input('Series [None]: ') or None

    return {
        "id": work_id,
        "title": title,
        "circle": circle,
        "authors": authors,
        "tags": tags,
        "date": date,
        "description": description,
        "series": series,
    }

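# Fetch metadata for every extracted work not yet present in meta.db.
# DLSite ids (RJ/BJ...) are looked up via the dlsite_async API; FANZA and
# FAKKU ids get manually-entered metadata with a guessed (or absent)
# thumbnail URL; anything else is entered entirely by hand.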
async def fetch_async(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
    cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(author, work))")
    cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id), PRIMARY KEY(tag, work))")

    thumbnails_dir = args.destdir / 'site' / 'thumbnails'
    thumbnails_dir.mkdir(parents=True, exist_ok=True)

    async with DlsiteAPI(locale=args.locale) as api:
        for work_path in (args.destdir / 'extract').iterdir():
            work_id = work_path.name

            res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
            if res.fetchone() is not None:
                continue

            if DLSITE_ID_REGEX.fullmatch(work_id):
                print(f'Fetching DLSite metadata for {work_id}')
                dlsite_metadata = await api.get_work(work_id)
                db_row = {
                    "id": work_id,
                    "title": dlsite_metadata.work_name,
                    "circle": dlsite_metadata.circle,
                    "date": dlsite_metadata.regist_date.date().isoformat(),
                    "description": dlsite_metadata.description,
                    "series": dlsite_metadata.series,
                }
                authors = dlsite_metadata.author or []
                tags = dlsite_metadata.genre or []
                thumbnail_url = dlsite_metadata.work_image
                if thumbnail_url.startswith('//'):
                    thumbnail_url = 'https:' + thumbnail_url
            else:
                db_row = manual_input_metadata(work_id)
                authors = db_row.pop('authors')
                tags = db_row.pop('tags')
                if FANZA_ID_REGEX.fullmatch(work_id):
                    thumbnail_url = f'https://doujin-assets.dmm.co.jp/digital/comic/{work_id}/{work_id}pl.jpg'
                elif FAKKU_ID_REGEX.fullmatch(work_id):
                    thumbnail_url = None
                else:
                    thumbnail_url = input('Thumbnail image URL [default: first page]: ')

            cur.execute(
                "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
                db_row,
            )
            cur.executemany(
                "INSERT INTO authors VALUES(:author, :work)",
                [{ "author": author, "work": work_id } for author in authors],
            )
            cur.executemany(
                "INSERT INTO tags VALUES(:tag, :work)",
                [{ "tag": tag, "work": work_id } for tag in tags],
            )

            if thumbnail_url:
                ext = url_file_ext(thumbnail_url)
                dest_file = thumbnails_dir / (work_id + ext)
                print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
                with open(dest_file, 'wb') as fd:
                    with requests.get(thumbnail_url, stream=True) as r:
                        for chunk in r.iter_content(chunk_size=16384):
                            fd.write(chunk)

            con.commit()

    con.close()

def url_file_ext(url):
    return splitext(urlparse(url).path)[1]

def fetch(args):
    asyncio.run(fetch_async(args))


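# Return the image xref for each page of the PDF, assuming every page
# consists of a single embedded image; return None if any page breaks
# that assumption.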
def image_xrefs(pdf):
    images_by_page = [page.get_images() for page in pdf]
    if all(len(images) == 1 for images in images_by_page):
        return [images[0][0] for images in images_by_page]

    print("Checking PDF images the quick way failed, trying the slow way")
    xrefs = []
    for (idx, page) in enumerate(pdf):
        print(f'\x1b[2K\r{idx}/{pdf.page_count} pages processed...', end='')
        images = page.get_image_info(xrefs=True)
        if len(images) != 1 or images[0]['xref'] == 0:
            print('\nFailed')
            return None
        xrefs.append(images[0]['xref'])

    print('\nSuccess')
    return xrefs

def link_pdf(src, dest, start_index):
    with fitz.open(src) as pdf:
        xrefs = image_xrefs(pdf)
        if xrefs is None:
            print(f'Support for weirder PDFs not yet implemented, skipping {src}')
            return None

        dest.mkdir(parents=True, exist_ok=True)
        for (idx, xref) in enumerate(xrefs, start=start_index):
            image = pdf.extract_image(xref)
            file_path = dest / f'{idx:04d}.{image["ext"]}'
            with open(file_path, 'wb') as f:
                f.write(image["image"])

        return pdf.page_count

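# Try to derive a single reading order for a set of image files. Entries are
# first grouped by which ALT_VERSIONS markers appear in their names, each
# group is numbered independently, and the groups are then recombined:
# full-length alternate sets are appended after the main run, while small
# variant sets are interleaved at the matching page indices.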
def complete_prefix_number_ordering(entries):
    if len(entries) == 1:
        return entries

    entries_by_version = {}
    for entry in entries:
        version_code = 0
        for (i, version) in enumerate(ALT_VERSIONS):
            if version in entry.name:
                version_code |= (1 << i)
        entries_by_version.setdefault(version_code, []).append(entry)

    numberings_by_version = {ver: unique_hierarchical_prefix_numbering(entries_by_version[ver]) for ver in entries_by_version}

    unified_indices = set()
    for numbering in numberings_by_version.values():
        if numbering is None:
            return None
        unified_indices |= set(numbering.keys())
    unified_indices.discard(None)
    unified_indices = list(unified_indices)
    unified_indices.sort()

    if len(unified_indices) > 1:
        for i in range(1, len(unified_indices)):
            cur = unified_indices[i]
            prev = unified_indices[i-1]
            for level in range(min(len(cur), len(prev))):
                if cur[level] != prev[level]:
                    if cur[level] - prev[level] > 2:
                        return None
                    break

    unified_indices.append(None)

    versions = list(numberings_by_version.keys())
    versions.sort()

    version_lengths = {ver: len(numberings_by_version[ver]) for ver in numberings_by_version}
    inner_versions = []
    outer_versions = [versions[0]]
    for ver in versions[1:]:
        if version_lengths[ver] >= version_lengths[versions[0]] - 2:
            outer_versions.append(ver)
        else:
            inner_versions.append(ver)

    result = []
    for out_ver in outer_versions:
        for i in unified_indices:
            for ver in ([out_ver] + (inner_versions if out_ver == versions[0] else [])):
                result += numberings_by_version[ver].get(i, [])
    return result

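# Map each entry to a tuple index derived from numbers in its filename: find
# a number in the longest name such that all entries share the prefix before
# it, use that number as the first index level, and resolve collisions by
# recursing on later numbers or falling back to single-letter suffixes
# (e.g. "05.png" vs "05a.png"; filenames here are hypothetical). Returns
# {index_tuple: [entries]} or None if no consistent numbering exists.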
def unique_hierarchical_prefix_numbering(entries, start_point=0):
    if len(entries) == 1 and not NUMBER_REGEX.search(entries[0].name):
        return {None: entries}

    longest_entry = max(entries, key=lambda e: len(e.name))
    matches = reversed(list(NUMBER_REGEX.finditer(longest_entry.name)))
    for m in matches:
        pos = m.start()
        if pos < start_point:
            return None
        prefix = longest_entry.name[:pos]
        if all(e.name.startswith(prefix) or prefix.startswith(e.stem) for e in entries):
            numbering = {}
            for e in entries:
                if pos >= len(e.stem):
                    i = 0
                else:
                    n = NUMBER_REGEX.match(e.name[pos:])
                    if n is None:
                        return None
                    i = int(n.group())
                numbering.setdefault((i,), []).append(e)

            indices = list(numbering.keys())
            for idx in indices:
                if len(numbering[idx]) > 1:
                    ents_idx = numbering.pop(idx)
                    longest = max(ents_idx, key=lambda e: len(e.name))
                    next_layer_start = pos + NUMBER_REGEX.match(longest.name[pos:]).end()
                    sub_numbering = unique_hierarchical_prefix_numbering(ents_idx, start_point=next_layer_start) or alphabetic_numbering(ents_idx, next_layer_start)
                    if not sub_numbering:
                        return None
                    for sub_idx in sub_numbering:
                        numbering[(*idx, *sub_idx)] = sub_numbering[sub_idx]

            return numbering

    return None

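# Fallback for entries distinguished only by a single trailing letter
# (e.g. "12.png", "12a.png", "12b.png"; hypothetical names): a bare stem maps
# to index 0, 'a' to 1, 'b' to 2, and the indices must form a contiguous
# range starting at 0 or the attempt is rejected.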
def alphabetic_numbering(entries, start_point):
    alphabetized = {}
    for entry in entries:
        ending = entry.stem[start_point:]
        if len(ending) > 1:
            return None
        index = 0 if ending == '' else ord(ending.lower()) - ord('a') + 1
        if (index,) in alphabetized:
            return None
        alphabetized[(index,)] = [entry]
    indices = list(alphabetized.keys())
    indices.sort()
    if indices != [(i,) for i in range(len(indices))]:
        return None
    return alphabetized

def link_ordered_files(ordering, dest, start_index):
    dest.mkdir(parents=True, exist_ok=True)

    for (idx, src_path) in enumerate(ordering, start=start_index):
        ext = src_path.suffix.lower()
        link_path = dest / f'{idx:04d}{ext}'
        link_path.symlink_to(relpath(src_path, dest))

def check_extension(path, exts):
    return path.suffix.lower() in exts

def is_pdf(path):
    return check_extension(path, ['.pdf'])

def is_image(path):
    return check_extension(path, IMAGE_FILE_EXTENSIONS)

def ignoreable(path):
    return path.name in IGNOREABLE_FILES or check_extension(path, IGNOREABLE_EXTENSIONS)

def ls_ignore(directory, exclude):
    return [
        path for path in directory.iterdir()
        if not ignoreable(path) and path not in exclude
    ]

def descendant_files_ignore(path, exclude):
    if path.is_file():
        return [path]

    result = []
    for item in ls_ignore(path, exclude):
        if item.is_dir():
            result.extend(descendant_files_ignore(item, exclude))
        else:
            result.append(item)

    return result

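# Auto-collate every extracted work that has not been collated yet and is not
# marked virtual: each page sequence is built under site/images-staging/ and
# renamed into site/images/<work id>/ only if collation succeeds.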
def collate(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    extraction_dir = args.destdir / 'extract'
    hint_map = {Path(relpath(hint, extraction_dir)).parents[-2].name: hint for hint in args.hints}

    collation_staging_area = args.destdir / 'site' / 'images-staging'
    collation_staging_area.mkdir(parents=True)

    for work_path in extraction_dir.iterdir():
        work_id = work_path.name

        collation_dir = args.destdir / 'site' / 'images' / work_id
        if collation_dir.exists():
            continue

        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
        if virtual == (1,):
            continue

        work_staging_dir = collation_staging_area / work_id

        pages_collated = collate_from_paths([hint_map.get(work_id, work_path)], work_staging_dir, 0, [])
        if pages_collated:
            print(f'Collated {pages_collated} pages for {work_id}')
            work_staging_dir.rename(collation_dir)
        else:
            if work_staging_dir.is_dir():
                for f in work_staging_dir.iterdir():
                    f.unlink()
                work_staging_dir.rmdir()

            if pages_collated == 0:
                print(f'{work_id} contains no files? skipping')
            elif pages_collated is None:
                print(f'Unable to deduce file structure for {work_id}, skipping')

    collation_staging_area.rmdir()
    con.close()

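# Split sources into "earlier", "middle", and "later" buckets based on the
# given filename regexes (e.g. covers first, textless or epilogue variants
# last) and collate each bucket in order. Returns False if the split does not
# actually separate anything, None if any bucket fails to collate, or the
# total page count on success.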
def try_collate_split_regex(srcs, dest, start_index, exclude, earlier=None, later=None):
    early_srcs = []
    middle_srcs = []
    late_srcs = []
    for src in srcs:
        if earlier and earlier.search(src.name):
            early_srcs.append(src)
        elif later and later.search(src.name):
            late_srcs.append(src)
        else:
            middle_srcs.append(src)

    if sum(1 for l in [early_srcs, middle_srcs, late_srcs] if l) <= 1:
        return False

    early_page_count = collate_from_paths(early_srcs, dest, start_index, exclude)
    if early_page_count is None:
        return None
    start_index += early_page_count

    middle_page_count = collate_from_paths(middle_srcs, dest, start_index, exclude)
    if middle_page_count is None:
        return None
    start_index += middle_page_count

    late_page_count = collate_from_paths(late_srcs, dest, start_index, exclude)
    if late_page_count is None:
        return None

    return early_page_count + middle_page_count + late_page_count

def standalone_image_size(filepath):
    with Image.open(filepath) as im:
        return im.size

def pdf_image_sizes(filepath):
    sizes_by_xref = {}

    with fitz.open(filepath) as pdf:
        for page in pdf:
            for (xref, _, width, height, *_) in page.get_images():
                if xref in sizes_by_xref:
                    continue
                sizes_by_xref[xref] = (width, height)

    return list(sizes_by_xref.values())

def median(items):
    if len(items) == 0:
        return None

    items.sort()
    return items[len(items) // 2]

def superior_or_equal(a, b):
    return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))

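# Handle works that ship both loose image files and a PDF containing the same
# pages: compare median image dimensions and collate whichever side has the
# higher resolution. Returns False when this situation does not apply.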
def try_collate_images_vs_pdf(srcs, dest, start_index, exclude):
    pdfs = [src for src in srcs if 'pdf' in src.name.lower()]
    if len(pdfs) != 1:
        return False
    outer_pdf = pdfs[0]

    inner_pdfs = [f for f in descendant_files_ignore(outer_pdf, exclude) if is_pdf(f)]
    if len(inner_pdfs) != 1:
        return False
    inner_pdf = inner_pdfs[0]

    non_pdf_srcs = [src for src in srcs if src != outer_pdf]
    images = []
    non_images = []
    descendant_files = [f for src in non_pdf_srcs for f in descendant_files_ignore(src, exclude)]
    for f in descendant_files:
        if is_image(f):
            images.append(f)
        else:
            non_images.append(f)
            break

    if len(non_images) != 0 or len(images) == 0:
        return False

    pdf_sizes = pdf_image_sizes(inner_pdf)
    standalone_sizes = [standalone_image_size(f) for f in images]
    if abs(len(pdf_sizes) - len(standalone_sizes)) > 2:
        return False

    median_pdf_size = median(pdf_sizes)
    median_standalone_size = median(standalone_sizes)
    if not (median_pdf_size and median_standalone_size):
        return False

    if superior_or_equal(median_standalone_size, median_pdf_size):
        return collate_from_paths(non_pdf_srcs, dest, start_index, exclude)
    elif superior_or_equal(median_pdf_size, median_standalone_size):
        return collate_from_paths([outer_pdf], dest, start_index, exclude)
    else:
        return False

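# Core collation heuristics. Tries, in order: descending into a lone
# directory, extracting a lone PDF, preferring a hi-res duplicate directory,
# cover/textless/epilogue splits, plain numbered image files, and
# images-vs-PDF arbitration. Returns the number of pages produced under dest,
# 0 for an empty source list, or None if the structure cannot be deduced.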
def collate_from_paths(srcs, dest, start_index, exclude):
    if len(srcs) == 1 and srcs[0].is_dir():
        return collate_from_paths(ls_ignore(srcs[0], exclude), dest, start_index, exclude)

    if len(srcs) == 1 and is_pdf(srcs[0]):
        print(f'Extracting images from {srcs[0]}')
        return link_pdf(srcs[0], dest, start_index)

    if len(srcs) == 0:
        return 0

    if len(srcs) == 2 and all(src.is_dir() for src in srcs):
        hi_res_dirs = [src for src in srcs if HI_RES_REGEX.search(src.name)]
        if len(hi_res_dirs) == 1:
            hi_res_dir = hi_res_dirs[0]
            lo_res_dir = next(src for src in srcs if src != hi_res_dir)
            if len(descendant_files_ignore(lo_res_dir, exclude)) == len(descendant_files_ignore(hi_res_dir, exclude)):
                return collate_from_paths([hi_res_dir], dest, start_index, exclude)

    textless_split = try_collate_split_regex(srcs, dest, start_index, exclude, later=TEXTLESS_REGEX)
    if textless_split != False:
        return textless_split

    epilogue_split = try_collate_split_regex(srcs, dest, start_index, exclude, later=EPILOGUE_REGEX)
    if epilogue_split != False:
        return epilogue_split

    cover_split = try_collate_split_regex(srcs, dest, start_index, exclude, earlier=COVER_REGEX)
    if cover_split != False:
        return cover_split

    if all(src.is_file() and is_image(src) for src in srcs):
        ordering = complete_prefix_number_ordering(srcs)
        if ordering:
            print(f'Symlinking image files: {ordering[0]}...')
            link_ordered_files(ordering, dest, start_index)
            return len(ordering)
        else:
            return None

    images_vs_pdf = try_collate_images_vs_pdf(srcs, dest, start_index, exclude)
    if images_vs_pdf != False:
        return images_vs_pdf

    return None

def self_and_parents(path):
    return [path] + list(path.parents)

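# Minimal parser for the manual-collate expression language: the flat token
# list from argv becomes a list of path groups plus a list of exclusions.
# '(' ... ')' groups paths, and '!' marks the next path or group as excluded.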
def parse_expressions(tokens):
    groups = []
    exclusions = []

    while tokens:
        token = tokens.pop(0)
        if token == '!':
            exclusions.extend(parse_exclusion(tokens))
        elif token == '(':
            groups.append(parse_group(tokens))
        else:
            groups.append([token])

    return (groups, exclusions)

def parse_exclusion(tokens):
    token = tokens.pop(0)

    if token == '(':
        return parse_group(tokens)
    else:
        return [token]

def parse_group(tokens):
    items = []

    while True:
        token = tokens.pop(0)
        if token == ')':
            return items
        else:
            items.append(token)

def normalize_to(path, ref):
    return ref / Path(relpath(path, ref))

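# Manually collate a single work from the path groups given on the command
# line. All paths are expected to live under <destdir>/extract/<work id>/;
# the work id is deduced from the first path mentioned.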
def manual_collate(args):
    (raw_groups, raw_exclusions) = parse_expressions(args.expression)

    extraction_dir = args.destdir / 'extract'

    sample_path = next(path for group in (raw_groups + [raw_exclusions]) for path in group)
    work_id = Path(relpath(sample_path, extraction_dir)).parents[-2].name

    exclusions = [normalize_to(item, args.destdir) for item in raw_exclusions]

    if raw_groups:
        groups = [[normalize_to(item, args.destdir) for item in group] for group in raw_groups]
    else:
        groups = [[extraction_dir / work_id]]

    collation_dir = args.destdir / 'site' / 'images' / work_id
    if collation_dir.exists():
        if len(list(collation_dir.iterdir())) > 0:
            print(f'Collation directory already exists!')
            return
        else:
            collation_dir.rmdir()

    nonexistent = [path for group in (groups + [exclusions]) for path in group if not path.exists()]
    if len(nonexistent) > 0:
        print(f'Nonexistent paths: {nonexistent}')
        return

    collation_staging_area = args.destdir / 'site' / 'images-staging'
    work_staging_dir = collation_staging_area / work_id
    work_staging_dir.mkdir(parents=True)

    pages_collated = 0
    for group in groups:
        pages_added = collate_from_paths(
            [item for item in group if item not in exclusions],
            work_staging_dir,
            pages_collated,
            exclusions,
        )
        if pages_added is None:
            print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
            pages_collated = None
            break

        pages_collated += pages_added

    if pages_collated:
        print(f'Collated {pages_collated} pages for {work_id}')
        work_staging_dir.rename(collation_dir)
    else:
        for f in work_staging_dir.iterdir():
            f.unlink()
        work_staging_dir.rmdir()

        if pages_collated == 0:
            print(f'No files found for {work_id}')

    collation_staging_area.rmdir()


def fmt_size(s):
    return f'{s[0]}x{s[1]}px'

def analyze(args):
    extract_dir = args.destdir / 'extract'
    files = descendant_files_ignore(extract_dir / args.work_id, [])
    files.sort()

    for f in files:
        print(f'{relpath(f, extract_dir)}', end='')
        if is_image(f):
            size = standalone_image_size(f)
            print(f'\t{fmt_size(size)}')
        elif is_pdf(f):
            sizes = pdf_image_sizes(f)
            if len(sizes) == 0:
                print(f'\tContains no images')
            else:
                print(f'\t{len(sizes)} images, median {fmt_size(median(sizes))}, min {fmt_size(min(sizes))}, max {fmt_size(max(sizes))}')
        else:
            print()

def metadata(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    if args.virtual is not None:
        cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
        con.commit()

    res = cur.execute(
        "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
        (args.work_id,),
    ).fetchone()

    if res is None:
        print(f'Work id {args.work_id} not found!')
        return

    (title, circle, date, description, series, virtual) = res
    print(f'Work ID: {args.work_id}')
    print(f'Title: {title}')
    print(f'Circle: {circle}')
    print(f'Pub date: {date}')
    print(f'Description: {description}')
    print(f'Series: {series}')
    print(f'Virtual: {"Yes" if virtual == 1 else "No"}')

    con.close()

def copy_recursive(src, dest):
    dest.mkdir(parents=True, exist_ok=True)
    for item in src.iterdir():
        if item.is_dir() and not item.is_symlink():
            copy_recursive(item, dest / item.name)
        else:
            shutil.copyfile(item, dest / item.name)

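# Render the static site: one catalog page and one reader view per collated
# work, plus index pages for authors, tags, circles, and series, using the
# Jinja templates and static assets bundled with the dlibrary package.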
def generate(args):
    jenv = Environment(
        loader=PackageLoader("dlibrary"),
        autoescape=select_autoescape()
    )
    viewer_template = jenv.get_template("viewer.html")
    list_template = jenv.get_template("list.html")
    categorization_template = jenv.get_template("categorization.html")
    work_template = jenv.get_template("work.html")
    index_template = jenv.get_template("index.html")

    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    site_dir = args.destdir / 'site'

    collated_work_ids = {p.name for p in (site_dir / 'images').iterdir()}

    actual_series = {series for (series,) in cur.execute('SELECT series FROM works GROUP BY series HAVING count(series) > 1')}

    works = []
    for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall():
        if work_id not in collated_work_ids:
            continue
        authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
        tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]

        images = [path.name for path in (site_dir / 'images' / work_id).iterdir()]
        images.sort()

        try:
            thumbnail_path = relpath(next(
                f for f in (site_dir / 'thumbnails').iterdir() if f.stem == work_id
            ), site_dir)
        except StopIteration:
            thumbnail_path = f'images/{work_id}/{images[0]}'
        work = {
            'id': work_id,
            'title': title,
            'circle': circle,
            'date': date,
            'description': description,
            'series': series,
            'authors': authors,
            'tags': tags,
            'thumbnail_path': thumbnail_path,
        }
        works.append(work)

        work_dir = site_dir / 'works' / work_id
        viewer_dir = work_dir / 'view'
        viewer_dir.mkdir(parents=True, exist_ok=True)
        with open(work_dir / 'index.html', 'w') as f:
            f.write(work_template.render(depth=2, work=work, title=title, images=images))
        with open(viewer_dir / 'index.html', 'w') as f:
            f.write(viewer_template.render(depth=3, work=work, title=title, images=images))

    def make_categorization(categorization, query, work_filter, work_style_cards=False):
        categorization_dir = site_dir / categorization

        cats = [cat for (cat,) in cur.execute(query)]
        cat_samples = {}
        for cat in cats:
            cat_works = list(filter(work_filter(cat), works))
            cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None

            safeish_cat = cat.replace('/', ' ')
            cat_dir = categorization_dir / safeish_cat
            cat_dir.mkdir(parents=True, exist_ok=True)
            with open(cat_dir / 'index.html', 'w') as f:
                f.write(list_template.render(
                    depth=2,
                    works=cat_works,
                    title=cat,
                    categorization=categorization,
                ))

        categorization_dir.mkdir(parents=True, exist_ok=True)
        with open(categorization_dir / 'index.html', 'w') as f:
            f.write(categorization_template.render(
                depth=1,
                categorization=categorization,
                categories=cats,
                samples=cat_samples,
                work_style_cards=work_style_cards,
            ))

    make_categorization(
        'authors',
        'SELECT DISTINCT author FROM authors ORDER BY author',
        lambda author: lambda work: author in work['authors'],
    )
    make_categorization(
        'tags',
        'SELECT DISTINCT tag FROM tags ORDER BY tag',
        lambda tag: lambda work: tag in work['tags'],
    )
    make_categorization(
        'circles',
        'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle',
        lambda circle: lambda work: work['circle'] == circle,
    )
    make_categorization(
        'series',
        'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series',
        lambda series: lambda work: work['series'] == series,
        work_style_cards=True,
    )

    with resources.as_file(resources.files("dlibrary")) as r:
        copy_recursive(r / 'static', site_dir / 'static')

    with open(site_dir / 'index.html', 'w') as f:
        f.write(index_template.render(depth=0, works=works))

    con.close()


argparser = argparse.ArgumentParser(
    prog='dlibrary',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        Organize DRM-free works purchased from DLSite into a library
        that can be viewed in a web browser.

        Intended workflow:
        - `extract` a collection of zipfiles downloaded from DLSite
          into DLibrary's data directory, giving each work its own
          subfolder.
        - `fetch` metadata and thumbnail images for extracted works
          from DLSite.
        - `collate` and/or `manual-collate` extracted works,
          producing a single sequence of image files (or symlinks
          into the extracted data, when possible) for each work.
        - Manually adjust works' `metadata` when necessary.
        - `generate` a static website providing a catalog and viewer
          for all collated works.
    """),
)

argparser.add_argument(
    '-d', '--destdir',
    type=Path,
    default=Path(os.getenv('DLIBRARY_DIR', './dlibrary')),
    help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
)
subparsers = argparser.add_subparsers(title="subcommands", required=True)

parser_extract = subparsers.add_parser('extract', help='extract zipfiles')
parser_extract.add_argument(
    '-r', '--remove',
    action='store_true',
    help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
    'zipfiles',
    metavar='FILE',
    type=Path,
    nargs='+',
    help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)

parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails')
parser_fetch.add_argument(
    '-l', '--locale',
    type=str,
    default=os.getenv('DLIBRARY_LOCALE', 'en_US'),
    help=('locale to use when requesting metadata (e.g. "ja_JP", "en_US"). '
          'May still fall back to Japanese if metadata in other languages is unavailable. '
          '(default: $DLIBRARY_LOCALE or en_US)'),
)
parser_fetch.set_defaults(func=fetch)

parser_collate = subparsers.add_parser(
    'collate',
    help='collate each work into a sequence of image files',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        For each extracted work that has not already been collated,
        DLibrary will attempt to intuit its structure as follows:

        - Enter the work's directory. If the directory contains
          nothing except a single subdirectory (ignoring a few types
          of files that are definitely not relevant), traverse
          downwards repeatedly.
        - If the current directory contains nothing except a single
          PDF (again, ignoring irrelevant files), attempt to extract
          a series of images from the PDF. This process expects that
          each page of the PDF consists of a single embedded image,
          which will be extracted at full resolution. Support for
          more complex PDFs is not yet implemented.
        - If the current directory contains nothing except image
          files, and the image files are named in a way that clearly
          indicates a complete numerical order (each filename
          consists of a shared prefix followed by a distinct
          number), symlink files in the inferred order.
        - Otherwise, skip processing this work for now.

        DLibrary can be given "collation hints" which provide
        alternative starting points for this search process. A hint
        is a path under $DLIBRARY_DIR/extract/[work id]/
        indicating a different directory or PDF file to begin the
        search process for that work, rather than starting at the
        top level of the extracted data. There can be at most one
        hint per work; for more complicated scenarios where a work
        includes multiple folders that need to be collated together,
        or where filenames do not clearly indicate an ordering, use
        `manual-collate` instead.
    """),
)
parser_collate.add_argument(
    'hints',
    metavar='PATH',
    type=Path,
    nargs='*',
    help='paths within extraction folders as collation hints'
)
parser_collate.set_defaults(func=collate)

parser_manual_collate = subparsers.add_parser(
    'manual-collate',
    help='collate a single work manually',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        Provide an expression or sequence of expressions specifying groups
        of paths to collate or skip. An expression can be:

        PATH
            A single path. If this is an image, it will be appended to
            the sequence of collated images; if this is a PDF, images will be
            extracted from it and concatenated to the sequence; if this is a
            directory, the contents of the directory will be collated based on
            the normal heuristics and concatenated to the sequence.

        ( PATH [PATH ...] )
            A group of paths contained in parentheses. You may need to escape
            the parentheses to avoid them getting parsed by your shell.
            All the paths in this group will be considered together, and
            collated based on the normal heuristics, regardless of what
            order the paths are provided in.

        ! PATH
        ! ( PATH [PATH ...] )
            A path or group of paths to exclude from collation. You may
            need to escape the !. If an excluded path appears within any
            of the other specified paths, it will be ignored.

        If the only expressions provided are negations, then auto-collation
        will start from the top level of the extracted work while excluding
        the negated paths.

        All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
        for the work being manually collated. `manual-collate` can
        only handle one work at a time.
    """),
)
parser_manual_collate.add_argument(
    'expression',
    nargs='+',
    help='expressions indicating paths to collate or skip',
)
parser_manual_collate.set_defaults(func=manual_collate)

parser_analyze = subparsers.add_parser('analyze', help='analyze an extracted folder to assist in collation')
parser_analyze.add_argument('work_id')
parser_analyze.set_defaults(func=analyze)

parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
    '--virtual',
    action=argparse.BooleanOptionalAction,
    help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)

parser_generate = subparsers.add_parser(
    'generate',
    help='generate HTML/CSS/JS for library site',
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent("""\
        The static site will be generated under $DLIBRARY_DIR/site/
        and can be served by pointing an HTTP server at that
        directory. Note that some files inside the static site
        hierarchy will be symlinks into $DLIBRARY_DIR/extract/
        outside the site hierarchy, so make sure your HTTP server
        will allow those symlinks to be read.
    """),
)
parser_generate.set_defaults(func=generate)

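# Typical command sequence, following the intended workflow described above
# (the work id and path are illustrative):
#   dlibrary extract ~/Downloads/RJ123456.zip
#   dlibrary fetch
#   dlibrary collate
#   dlibrary generate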
def main():
    args = argparser.parse_args()
    args.func(args)

if __name__ == "__main__":
    main()