dlibrary/dlibrary.py
2024-01-21 17:45:56 -05:00

178 lines
6 KiB
Python

#!/usr/bin/env python3
import asyncio
import os
import os.path
import re
import sqlite3
from urllib.parse import urlparse
import zipfile
from dlsite_async import DlsiteAPI
# import fitz
import requests
ZIP_DIR = "./zip"
EXTRACT_DIR = "./extract"
SITE_DIR = "./site"
DB_PATH = "./dlibrary.db"
NUMBER_REGEX = re.compile('[0-9]+')
def open_zipfile_with_encoding(path):
try:
return zipfile.ZipFile(path, metadata_encoding="utf-8")
except UnicodeDecodeError:
pass
try:
return zipfile.ZipFile(path, metadata_encoding="shift-jis")
except UnicodeDecodeError:
pass
return zipfile.ZipFile(path, metadata_encoding="shift-jisx0213")
def extract(zip_path, remove=False):
work_id = os.path.splitext(os.path.basename(zip_path))[0]
work_extract_path = os.path.join(EXTRACT_DIR, work_id)
os.makedirs(work_extract_path)
with open_zipfile_with_encoding(zip_path) as z:
z.extractall(path=work_extract_path)
if remove:
os.remove(zip_path)
def extract_all(remove=False):
for f in os.listdir(ZIP_DIR):
if f.endswith('.zip'):
print(f'Extracting {f}')
extract(os.path.join(ZIP_DIR, f), remove=remove)
async def populate_db(refresh=False):
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.execute("CREATE TABLE IF NOT EXISTS works(id TEXT PRIMARY KEY, title TEXT, circle TEXT, date TEXT, description TEXT, thumbnail_url TEXT)")
cur.execute("CREATE TABLE IF NOT EXISTS authors(author TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
cur.execute("CREATE TABLE IF NOT EXISTS tags(tag TEXT, work TEXT, FOREIGN KEY(work) REFERENCES works(id))")
async with DlsiteAPI() as api:
for work_id in os.listdir(EXTRACT_DIR):
if not refresh:
res = cur.execute("SELECT id FROM works WHERE id = ?", (work_id,))
if res.fetchone() is not None:
print(f'Metadata for {work_id} is already cached, skipping')
continue
print(f'Fetching metadata for {work_id}')
metadata = await api.get_work(work_id)
cur.execute(
"INSERT INTO works VALUES(:id, :title, :circle, :date, :description, :thumbnail_url)",
{
"id": work_id,
"title": metadata.work_name,
"circle": metadata.circle,
"date": metadata.regist_date.date().isoformat(),
"description": metadata.description,
"thumbnail_url": metadata.work_image,
},
)
cur.executemany(
"INSERT INTO authors VALUES(:author, :work)",
[{ "author": author, "work": work_id } for author in (metadata.author or [])],
)
cur.executemany(
"INSERT INTO tags VALUES(:tag, :work)",
[{ "tag": tag, "work": work_id } for tag in (metadata.genre or [])],
)
con.commit()
con.close()
def url_file_ext(url):
return os.path.splitext(urlparse(url).path)[1]
def get_thumbnails(refresh=False):
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
for (work_id, thumbnail_url) in cur.execute("SELECT id, thumbnail_url FROM works"):
if thumbnail_url.startswith('//'):
thumbnail_url = 'https:' + thumbnail_url
ext = url_file_ext(thumbnail_url)
dest_file = os.path.join(SITE_DIR, 'thumbnails', work_id + ext)
if not refresh:
if os.path.exists(dest_file):
print(f'Thumbnail for {work_id} is already cached, skipping')
continue
print(f'Downloading thumbnail for {work_id} from {thumbnail_url}')
with open(dest_file, 'wb') as fd:
with requests.get(thumbnail_url, stream=True) as r:
for chunk in r.iter_content(chunk_size=16384):
fd.write(chunk)
def link_files(work_id):
work_site_dir = os.path.join(SITE_DIR, "works", work_id)
work_images_dir = os.path.join(work_site_dir, "images")
os.makedirs(work_images_dir)
search_dir = os.path.join(EXTRACT_DIR, work_id)
while True:
entries = os.listdir(search_dir)
if len(entries) == 1:
entry_path = os.path.join(search_dir, entries[0])
if os.path.isdir(entry_path):
search_dir = entry_path
continue
break
if len(entries) == 1 and os.path.splitext(entry_path)[1] == ".pdf":
link_pdf(entry_path, work_images_dir)
return
if len(entries) == 0:
print(f'{work_id} contains no files? Skipping')
return
if all(os.path.isfile(os.path.join(search_dir, entry)) for entry in entries):
ordering = complete_prefix_number_ordering(entries)
if ordering:
link_ordered_files(search_dir, ordering, work_images_dir)
return
print(f'Unable to deduce file structure for {work_id}, skipping')
def link_pdf(src, dest):
pass
def complete_prefix_number_ordering(entries):
matches = reversed(list(NUMBER_REGEX.finditer(entries[0])))
for m in matches:
pos = m.start()
prefix = entries[0][:pos]
if all(e.startswith(prefix) for e in entries):
entries_with_indices = []
indices = set()
for e in entries:
n = NUMBER_REGEX.match(e[pos:])
if n is None:
return None
i = int(n.group())
if i in indices:
return None
indices.add(i)
entries_with_indices.append((e, i))
entries_with_indices.sort(key=lambda ei: ei[1])
return [e for (e, i) in entries_with_indices]
return None
def link_ordered_files(srcdir, ordering, dest):
for (idx, item) in enumerate(ordering):
ext = os.path.splitext(item)[1]
target = os.path.join(dest, f'{idx:04d}{ext}')
os.link(os.path.join(srcdir, item), target)
def gen_site():
pass