dlibrary/dlibrary/dlibrary.py

638 lines
23 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import asyncio
import importlib_resources as resources
from pathlib import Path
from os import getenv
from os.path import relpath, splitext
import re
import shutil
import sqlite3
import textwrap
from urllib.parse import urlparse
import zipfile
from dlsite_async import DlsiteAPI
import fitz
from jinja2 import Environment, PackageLoader, select_autoescape
import requests
NUMBER_REGEX = re.compile('[0-9]+')
DLSITE_ID_REGEX = re.compile('^[BR]J[0-9]+$')
FANZA_ID_REGEX = re.compile('^d_[0-9]+$')
FAKKU_ID_REGEX = re.compile('.*_FAKKU$')
IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff']
IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm']
def open_zipfile_with_encoding(path):
try:
return zipfile.ZipFile(path, metadata_encoding="utf-8")
except UnicodeDecodeError:
pass
try:
return zipfile.ZipFile(path, metadata_encoding="shift-jis")
except UnicodeDecodeError:
pass
return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")
def extract(args):
for zip_path in args.zipfiles:
work_id = zip_path.stem
work_extract_path = args.destdir / 'extract' / work_id
work_extract_path.mkdir(parents=True)
print(f'Extracting {zip_path} to {work_extract_path}')
with open_zipfile_with_encoding(zip_path) as z:
z.extractall(path=work_extract_path)
if args.remove:
zip_path.unlink()
def manual_input_metadata(work_id):
print(f"Don't know how to fetch metadata for {work_id}, input manually:")
title = input('Title: ')
circle = input('Circle [None]: ') or None
authors = [author.strip() for author in input('Authors (comma-separated): ').split(',') if author.strip()]
tags = [tag.strip() for tag in input('Tags (comma-separated): ').split(',') if tag.strip()]
date = input('Pub date (yyyy-mm-dd): ')
description = input('Description: ')
series = input('Series [None]: ') or None
return {
"id": work_id,
"title": title,
"circle": circle,
"authors": authors,
"tags": tags,
"date": date,
"description": description,
"series": series,
}
async def fetch_async(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
thumbnails_dir = args.destdir / 'site' / 'thumbnails'
thumbnails_dir.mkdir(parents=True, exist_ok=True)
async with DlsiteAPI(locale=args.locale) as api:
for work_path in (args.destdir / 'extract').iterdir():
work_id = work_path.name
res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
if res.fetchone() is not None:
continue
if DLSITE_ID_REGEX.fullmatch(work_id):
print(f'Fetching DLSite metadata for {work_id}')
dlsite_metadata = await api.get_work(work_id)
db_row = {
"id": work_id,
"title": dlsite_metadata.work_name,
"circle": dlsite_metadata.circle,
"date": dlsite_metadata.regist_date.date().isoformat(),
"description": dlsite_metadata.description,
"series": dlsite_metadata.series,
}
authors = dlsite_metadata.author or []
tags = dlsite_metadata.genre or []
thumbnail_local = False
thumbnail_url = dlsite_metadata.work_image
if thumbnail_url.startswith('//'):
thumbnail_url = 'https:' + thumbnail_url
else:
db_row = manual_input_metadata(work_id)
authors = db_row.pop('authors')
tags = db_row.pop('tags')
if FANZA_ID_REGEX.fullmatch(work_id):
thumbnail_local = False
thumbnail_url = f'https://doujin-assets.dmm.co.jp/digital/comic/{work_id}/{work_id}pl.jpg'
elif FAKKU_ID_REGEX.fullmatch(work_id):
thumbnail_local = True
thumbnail_path = min(work_path.iterdir())
if not thumbnail_path.is_file():
print(f'Fakku thumbnail path {thumbnail_path} is not a file! Skipping {work_id}')
continue
else:
thumbnail_local = False
thumbnail_url = input('Thumbnail image URL: ')
cur.execute(
"INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
db_row,
)
cur.executemany(
"INSERT INTO authors VALUES(:author, :work)",
[{ "author": author, "work": work_id } for author in authors],
)
cur.executemany(
"INSERT INTO tags VALUES(:tag, :work)",
[{ "tag": tag, "work": work_id } for tag in tags],
)
if thumbnail_local:
ext = thumbnail_path.suffix
dest_path = thumbnails_dir / (work_id + ext)
dest_path.symlink_to(relpath(thumbnail_path, thumbnails_dir))
else:
ext = url_file_ext(thumbnail_url)
dest_file = thumbnails_dir / (work_id + ext)
print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
with open(dest_file, 'wb') as fd:
with requests.get(thumbnail_url, stream=True) as r:
for chunk in r.iter_content(chunk_size=16384):
fd.write(chunk)
con.commit()
con.close()
def url_file_ext(url):
return splitext(urlparse(url).path)[1]
def fetch(args):
asyncio.run(fetch_async(args))
def image_xrefs(pdf):
images_by_page = [page.get_images() for page in pdf]
if all(len(images) == 1 for images in images_by_page):
return [images[0][0] for images in images_by_page]
print("Checking PDF images the quick way failed, trying the slow way")
xrefs = []
for (idx, page) in enumerate(pdf):
print(f'\x1b[2K\r{idx}/{pdf.page_count} pages processed...', end='')
images = page.get_image_info(xrefs=True)
if len(images) != 1 or images[0]['xref'] == 0:
print('\nFailed')
return None
xrefs.append(images[0]['xref'])
print('\nSuccess')
return xrefs
def link_pdf(src, dest, start_index=0):
with fitz.open(src) as pdf:
xrefs = image_xrefs(pdf)
if xrefs is None:
print(f'Support for weirder PDFs not yet implemented, skipping {src}')
return
dest.mkdir(parents=True, exist_ok=True)
for (idx, xref) in enumerate(xrefs, start=start_index):
image = pdf.extract_image(xref)
file_path = dest / f'{idx:04d}.{image["ext"]}'
with open(file_path, 'wb') as f:
f.write(image["image"])
def complete_prefix_number_ordering(entries):
matches = reversed(list(NUMBER_REGEX.finditer(entries[0].name)))
for m in matches:
pos = m.start()
prefix = entries[0].name[:pos]
if all(e.name.startswith(prefix) for e in entries):
entries_with_indices = []
indices = set()
for e in entries:
n = NUMBER_REGEX.match(e.name[pos:])
if n is None:
return None
i = int(n.group())
if i in indices:
return None
indices.add(i)
entries_with_indices.append((e, i))
entries_with_indices.sort(key=lambda ei: ei[1])
return [e for (e, i) in entries_with_indices]
return None
def link_ordered_files(ordering, dest, start_index=0):
dest.mkdir(parents=True, exist_ok=True)
for (idx, src_path) in enumerate(ordering, start=start_index):
ext = src_path.suffix.lower()
link_path = dest / f'{idx:04d}{ext}'
link_path.symlink_to(relpath(src_path, dest))
def ls_ignore(directory):
return [
path for path in directory.iterdir()
if path.name not in IGNOREABLE_FILES and path.suffix.lower() not in IGNOREABLE_EXTENSIONS
]
def collate(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
extraction_dir = args.destdir / 'extract'
hint_map = {hint.relative_to(extraction_dir).parents[-2].name: hint for hint in args.hints}
for work_path in extraction_dir.iterdir():
work_id = work_path.name
collation_dir = args.destdir / 'site' / 'images' / work_id
if collation_dir.exists():
continue
virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
if virtual == (1,):
continue
if work_id in hint_map:
hint = hint_map[work_id]
entries = [hint] if hint.is_file() else ls_ignore(hint)
else:
search_dir = work_path
while True:
entries = ls_ignore(search_dir)
if len(entries) == 1 and entries[0].is_dir():
search_dir = entries[0]
else:
break
if len(entries) == 1 and entries[0].suffix.lower() == '.pdf':
print(f'Extracting images from {entries[0]} for {work_id}')
link_pdf(entries[0], collation_dir)
continue
if len(entries) == 0:
print(f'{work_id} contains no files? skipping')
continue
if all(entry.is_file() and entry.suffix.lower() in IMAGE_FILE_EXTENSIONS for entry in entries):
ordering = complete_prefix_number_ordering(entries)
if ordering:
print(f'Symlinking image files for {work_id}')
link_ordered_files(ordering, collation_dir)
continue
print(f'Unable to deduce file structure for {work_id}, skipping')
con.close()
def self_and_parents(path):
return [path] + list(path.parents)
def manual_collate(args):
work_id = self_and_parents(args.paths[0].relative_to(args.destdir / 'extract'))[-2].name
collation_dir = args.destdir / 'site' / 'images' / work_id
if collation_dir.exists() and len(list(collation_dir.iterdir())) > 0:
print(f'Collation directory already exists!')
return
nonexistent = [path for path in args.paths if not path.exists()]
if len(nonexistent) > 0:
print(f'Nonexistent paths: {nonexistent}')
return
collation_dir.mkdir(parents=True, exist_ok=True)
index = 0
for path in args.paths:
if path.is_dir():
entries = [p for p in path.iterdir() if p.suffix.lower() in IMAGE_FILE_EXTENSIONS]
ordering = complete_prefix_number_ordering(entries)
if ordering is None:
ordering = entries
ordering.sort()
link_ordered_files(ordering, collation_dir, start_index=index)
index += len(ordering)
elif path.suffix.lower() in IMAGE_FILE_EXTENSIONS:
link_ordered_files([path], collation_dir, start_index=index)
index += 1
elif path.suffix.lower() == ".pdf":
link_pdf(path, collation_dir, start_index=index)
with fitz.open(path) as pdf:
index += pdf.page_count
else:
print(f'Unknown file type {path}, stopping')
return
def metadata(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
if args.virtual is not None:
cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
con.commit()
res = cur.execute(
"SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
(args.work_id,),
).fetchone()
if res is None:
print(f'Work id {args.work_id} not found!')
return
(title, circle, date, description, series, virtual) = res
print(f'Work ID: {args.work_id}')
print(f'Title: {title}')
print(f'Circle: {circle}')
print(f'Pub date: {date}')
print(f'Description: {description}')
print(f'Series: {series}')
print(f'Virtual: {"Yes" if virtual == 1 else "No"}')
con.close()
def copy_recursive(src, dest):
dest.mkdir(parents=True, exist_ok=True)
for item in src.iterdir():
if item.is_dir() and not item.is_symlink():
copy_recursive(item, dest / item.name)
else:
shutil.copyfile(item, dest / item.name)
def generate(args):
jenv = Environment(
loader=PackageLoader("dlibrary"),
autoescape=select_autoescape()
)
viewer_template = jenv.get_template("viewer.html")
list_template = jenv.get_template("list.html")
categorization_template = jenv.get_template("categorization.html")
work_template = jenv.get_template("work.html")
index_template = jenv.get_template("index.html")
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
collated_work_ids = {p.name for p in (args.destdir / 'site' / 'images').iterdir()}
actual_series = {series for (series,) in cur.execute('SELECT series FROM works GROUP BY series HAVING count(series) > 1')}
works = []
for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall():
if work_id not in collated_work_ids:
continue
authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]
thumbnail_filename = next(
f for f in (args.destdir / 'site' / 'thumbnails').iterdir() if f.stem == work_id
).name
work = {
'id': work_id,
'title': title,
'circle': circle,
'date': date,
'description': description,
'series': series,
'authors': authors,
'tags': tags,
'thumbnail_filename': thumbnail_filename,
}
works.append(work)
images = [path.name for path in (args.destdir / 'site' / 'images' / work_id).iterdir()]
images.sort()
work_dir = args.destdir / 'site' / 'works' / work_id
viewer_dir = work_dir / 'view'
viewer_dir.mkdir(parents=True, exist_ok=True)
with open(work_dir / 'index.html', 'w') as f:
f.write(work_template.render(depth=2, work=work, title=title, images=images))
with open(viewer_dir / 'index.html', 'w') as f:
f.write(viewer_template.render(depth=3, work=work, title=title, images=images))
def make_categorization(categorization, query, work_filter, work_style_cards=False):
categorization_dir = args.destdir / 'site' / categorization
cats = [cat for (cat,) in cur.execute(query)]
cat_samples = {}
for cat in cats:
cat_works = list(filter(work_filter(cat), works))
cat_samples[cat] = cat_works[0] if len(cat_works) > 0 else None
safeish_cat = cat.replace('/', ' ')
cat_dir = categorization_dir / safeish_cat
cat_dir.mkdir(parents=True, exist_ok=True)
with open(cat_dir / 'index.html', 'w') as f:
f.write(list_template.render(
depth=2,
works=cat_works,
title=cat,
categorization=categorization,
))
categorization_dir.mkdir(parents=True, exist_ok=True)
with open(categorization_dir / 'index.html', 'w') as f:
f.write(categorization_template.render(
depth=1,
categorization=categorization,
categories=cats,
samples=cat_samples,
work_style_cards=work_style_cards,
))
make_categorization(
'authors',
'SELECT DISTINCT author FROM authors ORDER BY author',
lambda author: lambda work: author in work['authors'],
)
make_categorization(
'tags',
'SELECT DISTINCT tag FROM tags ORDER BY tag',
lambda tag: lambda work: tag in work['tags'],
)
make_categorization(
'circles',
'SELECT DISTINCT circle FROM works WHERE circle NOT NULL ORDER BY circle',
lambda circle: lambda work: work['circle'] == circle,
)
make_categorization(
'series',
'SELECT DISTINCT series FROM works WHERE series NOT NULL ORDER BY series',
lambda series: lambda work: work['series'] == series,
work_style_cards=True,
)
with resources.as_file(resources.files("dlibrary")) as r:
copy_recursive(r / 'static', args.destdir / 'site' / 'static')
with open(args.destdir / 'site' / 'index.html', 'w') as f:
f.write(index_template.render(depth=0, works=works))
con.close()
argparser = argparse.ArgumentParser(
prog='dlibrary',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Organize DRM-free works purchased from DLSite into a library
that can be viewed in a web browser.
Intended workflow:
- `extract` a collection of zipfiles downloaded from DLSite
into DLibrary's data directory, giving each work its own
subfolder.
- `fetch` metadata and thumbnail images for extracted works
from DLSite.
- `collate` and/or `manual-collate` extracted works,
producing a single sequence of image files (or symlinks
into the extracted data, when possible) for each work.
- Manually adjust works' `metadata` when necessary.
- `generate` a static website providing a catalog and viewer
for all collated works.
"""),
)
argparser.add_argument(
'-d', '--destdir',
type=Path,
default=Path(getenv('DLIBRARY_DIR', './dlibrary')),
help='directory to store dlibrary content and metadata to (default: $DLIBRARY_DIR or ./dlibrary)',
)
subparsers = argparser.add_subparsers(title="subcommands", required=True)
parser_extract = subparsers.add_parser('extract', help='extract zipfiles')
parser_extract.add_argument(
'-r', '--remove',
action='store_true',
help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
'zipfiles',
metavar='FILE',
type=Path,
nargs='+',
help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)
parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails')
parser_fetch.add_argument(
'-l', '--locale',
type=str,
default=getenv('DLIBRARY_LOCALE', 'en_US'),
help=('locale to use when requesting metadata (e.g. "ja_JP", "en_US"). '
'May still fall back to Japanese if metadata in other languages is unavailable. '
'(default: $DLIBRARY_LOCALE or en_US)'),
)
parser_fetch.set_defaults(func=fetch)
parser_collate = subparsers.add_parser(
'collate',
help='collate each work into a sequence of image files',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
For each extracted work that has not already been collated,
DLibrary will attempt to intuit its structure as follows:
- Enter the work's directory. If the directory contains
nothing except a single subdirectory (ignoring a few types
of files that are definitely not relevant), traverse
downwards repeatedly.
- If the current directory contains nothing except a single
PDF (again, ignoring irrelevant files), attempt to extract
a series of images from the PDF. This process expects that
each page of the PDF consists of a single embedded image,
which will be extracted at full resolution. Support for
more complex PDFs is not yet implemented.
- If the current directory contains nothing except image
files, and the image files are named in a way that clearly
indicates a complete numerical order (each filename
consists of a shared prefix followed by a distinct
number), symlink files in the inferred order.
- Otherwise, skip processing this work for now.
DLibrary can be given "collation hints" which provide
alternative starting points for this search process. A hint
is a path under $DLIBRARY_DIR/extract/[work id]/
indicating a different directory or PDF file to begin the
search process for that work, rather than starting at the
top level of the extracted data. There can be at most one
hint per work; for more complicated scenarios where a work
includes multiple folders that need to be collated together,
or where filenames do not clearly indicate an ordering, use
`manual-collate` instead.
"""),
)
parser_collate.add_argument(
'hints',
metavar='PATH',
type=Path,
nargs='*',
help='paths within extraction folders as collation hints'
)
parser_collate.set_defaults(func=collate)
parser_manual_collate = subparsers.add_parser(
'manual-collate',
help='collate a single work manually',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
for the work being manually collated. `manual-collate` can
only handle one work at a time. Paths are used as follows:
- If a path is a directory, all *image files* immediately
inside that directory will be appended to the sequence. If
files are named in a way which indicates a clear ordering,
that ordering will be used. Otherwise, filenames will be
sorted lexicographically. Non-image files and
subdirectories will be ignored.
- If a path is an image file, that image file will be
appended to the sequence.
- If a path is a PDF file, page images will be extracted
from that PDF and appended to the sequence.
"""),
)
parser_manual_collate.add_argument(
'paths',
metavar='PATH',
type=Path,
nargs='+',
help='paths within a single work to be collated in sequence',
)
parser_manual_collate.set_defaults(func=manual_collate)
parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
'--virtual',
action=argparse.BooleanOptionalAction,
help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)
parser_generate = subparsers.add_parser(
'generate',
help='generate HTML/CSS/JS for library site',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
The static site will be generated under $DLIBRARY_DIR/site/
and can be served by pointing an HTTP server at that
directory. Note that some files inside the static site
hierarchy will be symlinks into $DLIBRARY_DIR/extract/
outside the site hierarchy, so make sure your HTTP server
will allow those symlinks to be read.
"""),
)
parser_generate.set_defaults(func=generate)
def main():
args = argparser.parse_args()
args.func(args)
if __name__ == "__main__":
main()