refactor as a CLI program with nice subcommands, fuck GUIs, we hate GUIs

This commit is contained in:
xenofem 2024-01-22 02:16:06 -05:00
parent 8089a9e55a
commit e907deda75

237
dlibrary.py Normal file → Executable file
View file

@ -1,24 +1,22 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse
import asyncio import asyncio
import os from pathlib import Path
import os.path from os.path import relpath, splitext
import re import re
import sqlite3 import sqlite3
from urllib.parse import urlparse from urllib.parse import urlparse
import zipfile import zipfile
from dlsite_async import DlsiteAPI from dlsite_async import DlsiteAPI
# import fitz import fitz
import requests import requests
ZIP_DIR = "./zip"
EXTRACT_DIR = "./extract"
SITE_DIR = "./site"
DB_PATH = "./dlibrary.db"
NUMBER_REGEX = re.compile('[0-9]+') NUMBER_REGEX = re.compile('[0-9]+')
IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff']
def open_zipfile_with_encoding(path): def open_zipfile_with_encoding(path):
try: try:
return zipfile.ZipFile(path, metadata_encoding="utf-8") return zipfile.ZipFile(path, metadata_encoding="utf-8")
@ -32,50 +30,49 @@ def open_zipfile_with_encoding(path):
return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213") return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")
def extract(zip_path, remove=False): def extract(args):
work_id = os.path.splitext(os.path.basename(zip_path))[0] for zip_path in args.zipfiles:
work_extract_path = os.path.join(EXTRACT_DIR, work_id) work_id = zip_path.stem
os.makedirs(work_extract_path) work_extract_path = args.destdir / 'extract' / work_id
work_extract_path.mkdir(parents=True)
with open_zipfile_with_encoding(zip_path) as z: with open_zipfile_with_encoding(zip_path) as z:
z.extractall(path=work_extract_path) z.extractall(path=work_extract_path)
if remove: if args.remove:
os.remove(zip_path) zip_path.unlink()
def extract_all(remove=False): async def fetch_async(args):
for f in os.listdir(ZIP_DIR): con = sqlite3.connect(args.destdir / 'meta.db')
if f.endswith('.zip'):
print(f'Extracting {f}')
extract(os.path.join(ZIP_DIR, f), remove=remove)
async def populate_db(refresh=False):
con = sqlite3.connect(DB_PATH)
cur = con.cursor() cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, thumbnail_url TEXT)") cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, series TEXT, virtual INT)")
cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))") cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))") cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
thumbnails_dir = args.destdir / 'site' / 'thumbnails'
thumbnails_dir.mkdir(parents=True, exist_ok=True)
async with DlsiteAPI() as api: async with DlsiteAPI() as api:
for work_id in os.listdir(EXTRACT_DIR): for work_path in (args.destdir / 'extract').iterdir():
if not refresh: work_id = work_path.name
res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
if res.fetchone() is not None: res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
print(f'Metadata for {work_id} is already cached, skipping') if res.fetchone() is not None:
continue continue
print(f'Fetching metadata for {work_id}') print(f'Fetching metadata for {work_id}')
metadata = await api.get_work(work_id) metadata = await api.get_work(work_id)
cur.execute( cur.execute(
"INSERT INTO works VALUES(:id, :title, :circle, :date, :description, :thumbnail_url)", "INSERT INTO works(id, title, circle, date, description, series) VALUES(:id, :title, :circle, :date, :description, :series)",
{ {
"id": work_id, "id": work_id,
"title": metadata.work_name, "title": metadata.work_name,
"circle": metadata.circle, "circle": metadata.circle,
"date": metadata.regist_date.date().isoformat(), "date": metadata.regist_date.date().isoformat(),
"description": metadata.description, "description": metadata.description,
"thumbnail_url": metadata.work_image, "series": metadata.series,
}, },
) )
cur.executemany( cur.executemany(
@ -86,76 +83,85 @@ async def populate_db(refresh=False):
"INSERT INTO tags VALUES(:tag, :work)", "INSERT INTO tags VALUES(:tag, :work)",
[{ "tag": tag, "work": work_id } for tag in (metadata.genre or [])], [{ "tag": tag, "work": work_id } for tag in (metadata.genre or [])],
) )
thumbnail_url = metadata.work_image
if thumbnail_url.startswith('//'):
thumbnail_url = 'https:' + thumbnail_url
ext = url_file_ext(thumbnail_url)
dest_file = thumbnails_dir / (work_id + ext)
print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
with open(dest_file, 'wb') as fd:
with requests.get(thumbnail_url, stream=True) as r:
for chunk in r.iter_content(chunk_size=16384):
fd.write(chunk)
con.commit() con.commit()
con.close() con.close()
def url_file_ext(url): def url_file_ext(url):
return os.path.splitext(urlparse(url).path)[1] return splitext(urlparse(url).path)[1]
def get_thumbnails(refresh=False): def fetch(args):
con = sqlite3.connect(DB_PATH) asyncio.run(fetch_async(args))
def collate(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor() cur = con.cursor()
for (work_id, thumbnail_url) in cur.execute("SELECT id, thumbnail_url FROM works"): for work_path in (args.destdir / 'extract').iterdir():
if thumbnail_url.startswith('//'): work_id = work_path.name
thumbnail_url = 'https:' + thumbnail_url
ext = url_file_ext(thumbnail_url) collation_dir = args.destdir / 'site' / 'works' / work_id
dest_file = os.path.join(SITE_DIR, 'thumbnails', work_id + ext) if collation_dir.exists():
if not refresh: continue
if os.path.exists(dest_file):
print(f'Thumbnail for {work_id} is already cached, skipping') virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
if virtual == (1,):
continue
search_dir = work_path
while True:
entries = list(search_dir.iterdir())
if len(entries) == 1 and entries[0].is_dir():
search_dir = entries[0]
else:
break
if len(entries) == 1 and entries[0].suffix.lower() == '.pdf':
print(f'Extracting images from {entries[0].name} for {work_id}')
link_pdf(entries[0], collation_dir)
continue
if len(entries) == 0:
print(f'{work_id} contains no files? skipping')
continue
if all(entry.is_file() and entry.suffix.lower() in IMAGE_FILE_EXTENSIONS for entry in entries):
ordering = complete_prefix_number_ordering(entries)
if ordering:
print(f'Symlinking image files for {work_id}')
link_ordered_files(ordering, collation_dir)
continue continue
print(f'Downloading thumbnail for {work_id} from {thumbnail_url}') print(f'Unable to deduce file structure for {work_id}, skipping')
with open(dest_file, 'wb') as fd:
with requests.get(thumbnail_url, stream=True) as r:
for chunk in r.iter_content(chunk_size=16384):
fd.write(chunk)
def link_files(work_id): con.close()
work_site_dir = os.path.join(SITE_DIR, "works", work_id)
work_images_dir = os.path.join(work_site_dir, "images")
os.makedirs(work_images_dir)
search_dir = os.path.join(EXTRACT_DIR, work_id)
while True:
entries = os.listdir(search_dir)
if len(entries) == 1:
entry_path = os.path.join(search_dir, entries[0])
if os.path.isdir(entry_path):
search_dir = entry_path
continue
break
if len(entries) == 1 and os.path.splitext(entry_path)[1] == ".pdf":
link_pdf(entry_path, work_images_dir)
return
if len(entries) == 0:
print(f'{work_id} contains no files? Skipping')
return
if all(os.path.isfile(os.path.join(search_dir, entry)) for entry in entries):
ordering = complete_prefix_number_ordering(entries)
if ordering:
link_ordered_files(search_dir, ordering, work_images_dir)
return
print(f'Unable to deduce file structure for {work_id}, skipping')
def link_pdf(src, dest): def link_pdf(src, dest):
pass print(f'PDF support not yet implemented, skipping {src}')
def complete_prefix_number_ordering(entries): def complete_prefix_number_ordering(entries):
matches = reversed(list(NUMBER_REGEX.finditer(entries[0]))) matches = reversed(list(NUMBER_REGEX.finditer(entries[0].name)))
for m in matches: for m in matches:
pos = m.start() pos = m.start()
prefix = entries[0][:pos] prefix = entries[0].name[:pos]
if all(e.startswith(prefix) for e in entries): if all(e.name.startswith(prefix) for e in entries):
entries_with_indices = [] entries_with_indices = []
indices = set() indices = set()
for e in entries: for e in entries:
n = NUMBER_REGEX.match(e[pos:]) n = NUMBER_REGEX.match(e.name[pos:])
if n is None: if n is None:
return None return None
i = int(n.group()) i = int(n.group())
@ -167,11 +173,62 @@ def complete_prefix_number_ordering(entries):
return [e for (e, i) in entries_with_indices] return [e for (e, i) in entries_with_indices]
return None return None
def link_ordered_files(srcdir, ordering, dest): def link_ordered_files(ordering, dest):
for (idx, item) in enumerate(ordering): dest.mkdir(parents=True)
ext = os.path.splitext(item)[1]
target = os.path.join(dest, f'{idx:04d}{ext}')
os.link(os.path.join(srcdir, item), target)
def gen_site(): for (idx, src_path) in enumerate(ordering):
ext = src_path.suffix.lower()
link_path = dest / f'{idx:04d}{ext}'
link_path.symlink_to(relpath(src_path, dest))
def metadata(args):
pass pass
def publish(args):
pass
argparser = argparse.ArgumentParser(prog='dlibrary')
argparser.add_argument(
'-d', '--destdir',
type=Path,
default=Path('./dlibrary'),
help='directory to store dlibrary content and metadata to (default: ./dlibrary)',
)
subparsers = argparser.add_subparsers(title="subcommands")
parser_extract = subparsers.add_parser('extract', help='extract zipfiles')
parser_extract.add_argument(
'-r', '--remove',
action='store_true',
help='remove original zipfiles after extraction',
)
parser_extract.add_argument(
'zipfiles',
metavar='FILE',
type=Path,
nargs='+',
help='zipfiles to extract',
)
parser_extract.set_defaults(func=extract)
parser_fetch = subparsers.add_parser('fetch', help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)
parser_collate = subparsers.add_parser('collate', help='collate a single sequence of image files for each work')
parser_collate.set_defaults(func=collate)
parser_metadata = subparsers.add_parser('metadata', help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
'--virtual',
action=argparse.BooleanOptionalAction,
help='set work as virtual',
)
parser_metadata.set_defaults(func=metadata)
parser_publish = subparsers.add_parser('publish', help='generate HTML/CSS/JS for library site')
parser_publish.set_defaults(func=publish)
if __name__ == "__main__":
args = argparser.parse_args()
args.func(args)