#!/usr/bin/env python3

import argparse
import asyncio
from pathlib import Path
from os.path import relpath, splitext
import re
import shutil
import sqlite3
from urllib.parse import urlparse
import zipfile

from dlsite_async import DlsiteAPI
import fitz  # PyMuPDF
import importlib_resources as resources
from jinja2 import Environment, PackageLoader, select_autoescape
import requests

NUMBER_REGEX = re.compile('[0-9]+')

IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff']
IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm']

def open_zipfile_with_encoding(path):
    """Open a zip, trying UTF-8 filename metadata first, then Shift-JIS variants.

    Requires Python 3.11+ for ZipFile's metadata_encoding parameter.
    """
    try:
        return zipfile.ZipFile(path, metadata_encoding="utf-8")
    except UnicodeDecodeError:
        pass
    try:
        return zipfile.ZipFile(path, metadata_encoding="shift-jis")
    except UnicodeDecodeError:
        pass
    return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")
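
# A minimal usage sketch (hypothetical archive name): list the members of a
# zip whose filenames may be Shift-JIS rather than UTF-8:
#
#     with open_zipfile_with_encoding(Path('RJ123456.zip')) as z:
#         print(z.namelist())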

def extract(args):
    for zip_path in args.zipfiles:
        work_id = zip_path.stem
        work_extract_path = args.destdir / 'extract' / work_id
        work_extract_path.mkdir(parents=True)
        with open_zipfile_with_encoding(zip_path) as z:
            z.extractall(path=work_extract_path)
        if args.remove:
            zip_path.unlink()
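
# Typical invocation (illustrative filenames; assuming the `dlibrary` entry
# point is on PATH). Each zip is extracted to extract/<stem of zip name>/:
#
#     dlibrary extract RJ123456.zip RJ234567.zip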

async def fetch_async(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
    cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
    cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")

    thumbnails_dir = args.destdir / 'site' / 'thumbnails'
    thumbnails_dir.mkdir(parents=True, exist_ok=True)

    async with DlsiteAPI() as api:
        for work_path in (args.destdir / 'extract').iterdir():
            work_id = work_path.name

            # Skip works whose metadata has already been fetched.
            res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
            if res.fetchone() is not None:
                continue

            print(f'Fetching metadata for {work_id}')
            metadata = await api.get_work(work_id)
            cur.execute(
                "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
                {
                    "id": work_id,
                    "title": metadata.work_name,
                    "circle": metadata.circle,
                    "date": metadata.regist_date.date().isoformat(),
                    "description": metadata.description,
                    "series": metadata.series,
                },
            )
            cur.executemany(
                "INSERT INTO authors VALUES(:author, :work)",
                [{"author": author, "work": work_id} for author in (metadata.author or [])],
            )
            cur.executemany(
                "INSERT INTO tags VALUES(:tag, :work)",
                [{"tag": tag, "work": work_id} for tag in (metadata.genre or [])],
            )

            # The API may return protocol-relative URLs; normalize to https.
            thumbnail_url = metadata.work_image
            if thumbnail_url.startswith('//'):
                thumbnail_url = 'https:' + thumbnail_url
            ext = url_file_ext(thumbnail_url)
            dest_file = thumbnails_dir / (work_id + ext)
            print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
            with open(dest_file, 'wb') as fd:
                with requests.get(thumbnail_url, stream=True) as r:
                    for chunk in r.iter_content(chunk_size=16384):
                        fd.write(chunk)

            # Commit per work so an interrupted run can resume where it left off.
            con.commit()
    con.close()
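
# One way to spot-check the fetched metadata (a sketch, assuming the default
# --destdir of ./dlibrary):
#
#     sqlite3 dlibrary/meta.db 'SELECT id, title, circle FROM works'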

def url_file_ext(url):
    """Return the file extension (with dot) of a URL's path component."""
    return splitext(urlparse(url).path)[1]
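
# e.g. url_file_ext('https://example.com/img/RJ123456_img_main.jpg') == '.jpg'
# (hypothetical URL; only the path component matters, query strings are ignored).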

def fetch(args):
    asyncio.run(fetch_async(args))

def image_xrefs(pdf):
    """Return the image xref for each page, or None if any page doesn't have exactly one image."""
    # Fast path: every page lists exactly one referenced image.
    images_by_page = [page.get_images() for page in pdf]
    if all(len(images) == 1 for images in images_by_page):
        return [images[0][0] for images in images_by_page]

    # Slow path: inspect what each page actually renders.
    print("Checking PDF images the quick way failed, trying the slow way")
    xrefs = []
    for (idx, page) in enumerate(pdf):
        print(f'\x1b[2K\r{idx}/{pdf.page_count} pages processed...', end='')
        images = page.get_image_info(xrefs=True)
        if len(images) != 1 or images[0]['xref'] == 0:
            print('\nFailed')
            return None
        xrefs.append(images[0]['xref'])
    print('\nSuccess')
    return xrefs

def link_pdf(src, dest, start_index=0):
    with fitz.open(src) as pdf:
        xrefs = image_xrefs(pdf)
        if xrefs is None:
            print(f'Support for weirder PDFs not yet implemented, skipping {src}')
            return

        dest.mkdir(parents=True, exist_ok=True)
        for (idx, xref) in enumerate(xrefs, start=start_index):
            image = pdf.extract_image(xref)
            file_path = dest / f'{idx:04d}.{image["ext"]}'
            with open(file_path, 'wb') as f:
                f.write(image["image"])

def complete_prefix_number_ordering(entries):
    """Find a common filename prefix followed by distinct numbers, and return
    the entries in numeric order; None if no such ordering exists."""
    # Try number positions in the first entry's name, rightmost first.
    matches = reversed(list(NUMBER_REGEX.finditer(entries[0].name)))
    for m in matches:
        pos = m.start()
        prefix = entries[0].name[:pos]
        if all(e.name.startswith(prefix) for e in entries):
            entries_with_indices = []
            indices = set()
            for e in entries:
                n = NUMBER_REGEX.match(e.name[pos:])
                if n is None:
                    return None
                i = int(n.group())
                if i in indices:
                    # Duplicate numbers mean this isn't a usable ordering.
                    return None
                indices.add(i)
                entries_with_indices.append((e, i))
            entries_with_indices.sort(key=lambda ei: ei[1])
            return [e for (e, i) in entries_with_indices]
    return None
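
# Worked example (hypothetical filenames): for entries 'p_1.jpg', 'p_2.jpg',
# 'p_10.jpg', the common prefix 'p_' is found and the entries come back in
# numeric order 1, 2, 10 -- where a plain lexicographic sort would wrongly
# give 'p_1.jpg', 'p_10.jpg', 'p_2.jpg'.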

def link_ordered_files(ordering, dest, start_index=0):
    dest.mkdir(parents=True, exist_ok=True)
    for (idx, src_path) in enumerate(ordering, start=start_index):
        ext = src_path.suffix.lower()
        link_path = dest / f'{idx:04d}{ext}'
        link_path.symlink_to(relpath(src_path, dest))
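
# The symlinks are relative, so the library tree stays relocatable. A collated
# work ends up looking roughly like (illustrative paths):
#
#     site/images/RJ123456/0000.jpg -> ../../../extract/RJ123456/pages/01.jpg
#     site/images/RJ123456/0001.jpg -> ../../../extract/RJ123456/pages/02.jpg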

def ls_ignore(directory):
    return [
        path for path in directory.iterdir()
        if path.name not in IGNOREABLE_FILES and path.suffix.lower() not in IGNOREABLE_EXTENSIONS
    ]

def collate(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    extraction_dir = args.destdir / 'extract'
    # Key each hint by the top-level directory of its path under extract/,
    # i.e. the work id it applies to.
    hint_map = {hint.relative_to(extraction_dir).parents[-2].name: hint for hint in args.hints}

    for work_path in extraction_dir.iterdir():
        work_id = work_path.name

        collation_dir = args.destdir / 'site' / 'images' / work_id
        if collation_dir.exists():
            continue

        # Works flagged as virtual (row comes back as (1,)) are never collated.
        virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
        if virtual == (1,):
            continue

        if work_id in hint_map:
            hint = hint_map[work_id]
            entries = [hint] if hint.is_file() else ls_ignore(hint)
        else:
            # Descend through single-directory nesting until real content appears.
            search_dir = work_path
            while True:
                entries = ls_ignore(search_dir)
                if len(entries) == 1 and entries[0].is_dir():
                    search_dir = entries[0]
                else:
                    break

        if len(entries) == 1 and entries[0].suffix.lower() == '.pdf':
            print(f'Extracting images from {entries[0]} for {work_id}')
            link_pdf(entries[0], collation_dir)
            continue

        if len(entries) == 0:
            print(f'{work_id} contains no files? skipping')
            continue

        if all(entry.is_file() and entry.suffix.lower() in IMAGE_FILE_EXTENSIONS for entry in entries):
            ordering = complete_prefix_number_ordering(entries)
            if ordering:
                print(f'Symlinking image files for {work_id}')
                link_ordered_files(ordering, collation_dir)
                continue

        print(f'Unable to deduce file structure for {work_id}, skipping')

    con.close()
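
# Typical invocations (illustrative ids/paths): bare `collate` handles the
# common layouts, and a hint pins down the right subdirectory or PDF for a
# work it can't deduce on its own:
#
#     dlibrary collate
#     dlibrary collate dlibrary/extract/RJ123456/images_hq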

def self_and_parents(path):
    return [path] + list(path.parents)
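
# e.g. self_and_parents(Path('RJ123456/a/b')) ==
# [Path('RJ123456/a/b'), Path('RJ123456/a'), Path('RJ123456'), Path('.')],
# so element [-2] is the top-level directory -- the work id -- which
# manual_collate relies on below.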

def manual_collate(args):
    # All given paths must belong to one work; derive its id from the first path.
    work_id = self_and_parents(args.paths[0].relative_to(args.destdir / 'extract'))[-2].name

    collation_dir = args.destdir / 'site' / 'images' / work_id
    if collation_dir.exists() and len(list(collation_dir.iterdir())) > 0:
        print('Collation directory already exists!')
        return

    nonexistent = [path for path in args.paths if not path.exists()]
    if len(nonexistent) > 0:
        print(f'Nonexistent paths: {nonexistent}')
        return

    collation_dir.mkdir(parents=True, exist_ok=True)
    index = 0
    for path in args.paths:
        if path.is_dir():
            entries = [p for p in path.iterdir() if p.suffix.lower() in IMAGE_FILE_EXTENSIONS]
            ordering = complete_prefix_number_ordering(entries)
            if ordering is None:
                # Fall back to lexicographic order when no numbering is found.
                ordering = entries
                ordering.sort()
            link_ordered_files(ordering, collation_dir, start_index=index)
            index += len(ordering)
        elif path.suffix.lower() in IMAGE_FILE_EXTENSIONS:
            link_ordered_files([path], collation_dir, start_index=index)
            index += 1
        elif path.suffix.lower() == ".pdf":
            link_pdf(path, collation_dir, start_index=index)
            with fitz.open(path) as pdf:
                index += pdf.page_count
        else:
            print(f'Unknown file type {path}, stopping')
            return
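
# Example (illustrative paths), splicing a cover image, a directory of pages,
# and a bonus PDF into one sequence, in the order given on the command line:
#
#     dlibrary manual-collate dlibrary/extract/RJ123456/cover.jpg \
#         dlibrary/extract/RJ123456/pages dlibrary/extract/RJ123456/bonus.pdf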

def metadata(args):
    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    if args.virtual is not None:
        cur.execute("UPDATE works SET virtual = ? WHERE id = ?", (1 if args.virtual else 0, args.work_id))
        con.commit()

    res = cur.execute(
        "SELECT title, circle, date, description, series, virtual FROM works WHERE id = ?",
        (args.work_id,),
    ).fetchone()
    if res is None:
        print(f'Work id {args.work_id} not found!')
        return

    (title, circle, date, description, series, virtual) = res
    print(f'Work ID: {args.work_id}')
    print(f'Title: {title}')
    print(f'Circle: {circle}')
    print(f'Pub date: {date}')
    print(f'Description: {description}')
    print(f'Series: {series}')
    print(f'Virtual: {"Yes" if virtual == 1 else "No"}')
    con.close()
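
# --virtual is a BooleanOptionalAction, so the subcommand has three forms
# (illustrative id):
#
#     dlibrary metadata RJ123456               # display only
#     dlibrary metadata RJ123456 --virtual     # flag as virtual
#     dlibrary metadata RJ123456 --no-virtual  # clear the flag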

def copy_contents(src, dest):
    dest.mkdir(parents=True, exist_ok=True)
    for item in src.iterdir():
        shutil.copyfile(item, dest / item.name)

def publish(args):
    jenv = Environment(
        loader=PackageLoader("dlibrary"),
        autoescape=select_autoescape(),
    )
    viewer_template = jenv.get_template("viewer.html")

    con = sqlite3.connect(args.destdir / 'meta.db')
    cur = con.cursor()

    # Only works that have been collated get published.
    collated_work_ids = {p.name for p in (args.destdir / 'site' / 'images').iterdir()}

    works = []
    for (work_id, title, circle, date, description, series) in cur.execute('SELECT id, title, circle, date, description, series FROM works ORDER BY date DESC').fetchall():
        if work_id not in collated_work_ids:
            continue
        authors = [author for (author,) in cur.execute('SELECT author FROM authors WHERE work = ?', (work_id,))]
        tags = [tag for (tag,) in cur.execute('SELECT tag FROM tags WHERE work = ?', (work_id,))]
        work = {
            'id': work_id,
            'title': title,
            'circle': circle,
            'date': date,
            'description': description,
            'series': series,
            'authors': authors,
            'tags': tags,
        }
        works.append(work)

        # Render one viewer page per collated work.
        images = [path.name for path in (args.destdir / 'site' / 'images' / work_id).iterdir()]
        images.sort()
        work_dir = args.destdir / 'site' / 'works' / work_id
        work_dir.mkdir(parents=True, exist_ok=True)
        with open(work_dir / 'index.html', 'w') as f:
            f.write(viewer_template.render(depth=2, work=work, title=title, images=images))

    with resources.as_file(resources.files("dlibrary")) as r:
        copy_contents(r / 'static', args.destdir / 'site' / 'static')

    list_template = jenv.get_template("list.html")
    with open(args.destdir / 'site' / 'index.html', 'w') as f:
        f.write(list_template.render(depth=0, works=works))

    con.close()
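
# The generated site is fully static, so any file server can preview it; one
# option (assuming the default --destdir) is Python's built-in server:
#
#     python3 -m http.server --directory dlibrary/site 8000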

argparser = argparse.ArgumentParser(prog='dlibrary')
argparser.add_argument(
    '-d', '--destdir',
    type=Path,
    default=Path('./dlibrary'),
    help='directory to store dlibrary content and metadata to (default: ./dlibrary)',
)
# Require a subcommand so args.func is always set in main().
subparsers = argparser.add_subparsers(title="subcommands", required=True)

parser_extract = subparsers.add_parser('extract', help='extract zipfiles')
parser_extract.add_argument(
    '-r', '--remove',
    action='store_true',
    help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
    'zipfiles',
    metavar='FILE',
    type=Path,
    nargs='+',
    help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)

parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)

parser_collate = subparsers.add_parser('collate', help='collate a single sequence of image files for each work')
parser_collate.add_argument(
    'hints',
    metavar='PATH',
    type=Path,
    nargs='*',
    help='manually-specified paths of subdirectories or PDFs within extraction folders, at most one per work',
)
parser_collate.set_defaults(func=collate)

parser_manual_collate = subparsers.add_parser('manual-collate', help='collate a specific work manually, specifying all paths to include')
parser_manual_collate.add_argument(
    'paths',
    metavar='PATH',
    type=Path,
    nargs='+',
    help='paths of files (images to symlink, PDFs to extract) or directories (symlink all images in the directory, no recursion, best-effort sorting)',
)
parser_manual_collate.set_defaults(func=manual_collate)

parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
    '--virtual',
    action=argparse.BooleanOptionalAction,
    help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)

parser_publish = subparsers.add_parser('publish', help='generate HTML/CSS/JS for library site')
parser_publish.set_defaults(func=publish)

def main():
    args = argparser.parse_args()
    args.func(args)


if __name__ == "__main__":
    main()