Compare commits

...

3 commits

3 changed files with 202 additions and 168 deletions

View file

@ -3,6 +3,7 @@
import argparse
import asyncio
import importlib_resources as resources
from io import BytesIO
from pathlib import Path
import os
from os.path import relpath, splitext
@ -17,6 +18,7 @@ import zipfile
from dlsite_async import DlsiteAPI
import fitz
from libsixel import *
from PIL import Image
from jinja2 import Environment, PackageLoader, select_autoescape
import requests
@ -77,7 +79,8 @@ IMAGE_FILE_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.gif', '.tiff', '.bmp']
IGNOREABLE_FILES = ['Thumbs.db', '__MACOSX', '.DS_Store']
IGNOREABLE_EXTENSIONS = ['.txt', '.html', '.htm', '.psd', '.mp4']
PDF_FALLBACK_DPI = 300
PDF_CONVERSION_DPI = 300
PDF_PREVIEW_DPI = 72
IRRELEVANT_PDF_BLOCK_REGEX = re.compile(r'\bTCPDF\b', re.I)
@ -214,11 +217,52 @@ def self_and_parents(path):
return [path] + list(path.parents)
def collate(args):
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
extraction_dir = args.destdir / 'extract'
hint_map = {self_and_parents(Path(relpath(hint, extraction_dir)))[-2].name: hint for hint in args.hints}
def extracted_path_work_id(path):
trail = self_and_parents(Path(relpath(path, extraction_dir)))
if len(trail) < 2:
return None
result = trail[-2].name
if result == '..':
return None
return result
(raw_groups, raw_exclusions) = parse_expressions(args.expression)
specified_works = set()
works_groups = {}
for group in raw_groups:
if len(group) == 0:
continue
work_id = extracted_path_work_id(group[0])
if not work_id:
print(f'Group {group} contains paths outside an extracted work!')
exit(1)
if not all(extracted_path_work_id(item) == work_id for item in group[1:]):
print(f'Group {group} contains paths from multiple works!')
exit(1)
specified_works.add(work_id)
if work_id not in works_groups:
works_groups[work_id] = []
normalized_paths = [normalize_to(item, args.destdir) for item in group]
if not all(path.exists() for path in normalized_paths):
print(f'Group {group} contains nonexistent paths!')
exit(1)
works_groups[work_id].append(normalized_paths)
exclusions = []
for exclusion in raw_exclusions:
work_id = extracted_path_work_id(exclusion)
if not work_id:
print(f'Excluded path {exclusion} does not belong to an extracted work!')
exit(1)
specified_works.add(work_id)
normalized_path = normalize_to(exclusion, args.destdir)
if not normalized_path.exists():
print(f'Excluded path {exclusion} does not exist!')
exit(1)
exclusions.append(normalized_path)
collation_staging_area = args.destdir / 'site' / 'images-staging'
collation_staging_area.mkdir(parents=True)
@ -226,21 +270,41 @@ def collate(args):
collation_area = args.destdir / 'site' / 'images'
collation_area.mkdir(parents=True, exist_ok=True)
con = sqlite3.connect(args.destdir / 'meta.db')
cur = con.cursor()
for work_path in extraction_dir.iterdir():
work_id = work_path.name
if args.only_specified_works and work_id not in specified_works:
continue
work_collation_dir = collation_area / work_id
if work_collation_dir.exists():
continue
if work_id not in specified_works:
continue
if len(list(work_collation_dir.iterdir())) > 0:
print(f'Collation directory for work {work_id} already exists!')
break
else:
work_collation_dir.rmdir()
virtual = cur.execute("SELECT virtual FROM works WHERE id = ?", (work_id,)).fetchone()
if virtual == (1,):
if work_id in specified_works:
print(f'Work {work_id} is virtual!')
break
continue
work_staging_dir = collation_staging_area / work_id
collator = Collator(work_staging_dir, [], args)
collation_result = collator.collate_from_paths([hint_map.get(work_id, work_path)])
collator = Collator(work_staging_dir, exclusions, args)
for group in works_groups.get(work_id, [[work_path]]):
collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
if not collation_result:
print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
break
if collation_result and collator.index > 0:
print(f'Collated {collator.index} pages for {work_id}')
work_staging_dir.rename(work_collation_dir)
@ -253,7 +317,7 @@ def collate(args):
if not collation_result:
print(f'Unable to deduce file structure for {work_id}, skipping')
elif collator.index == 0:
print(f'{work_id} contains no files? skipping')
print(f'No files found for {work_id}, skipping')
collation_staging_area.rmdir()
con.close()
@ -466,43 +530,84 @@ def extract_image(pdf, xref):
pix = fitz.Pixmap(pdf, xref)
return { 'ext': 'png', 'image': pix.tobytes('png') }
def display_sixel_page(page):
s = BytesIO()
image = Image.open(BytesIO(page.get_pixmap(dpi=PDF_PREVIEW_DPI).tobytes('png')))
width, height = image.size
try:
data = image.tobytes()
except NotImplementedError:
data = image.tostring()
output = sixel_output_new(lambda data, s: s.write(data), s)
try:
if image.mode == 'RGBA':
dither = sixel_dither_new(256)
sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGBA8888)
elif image.mode == 'RGB':
dither = sixel_dither_new(256)
sixel_dither_initialize(dither, data, width, height, SIXEL_PIXELFORMAT_RGB888)
elif image.mode == 'P':
palette = image.getpalette()
dither = sixel_dither_new(256)
sixel_dither_set_palette(dither, palette)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_PAL8)
elif image.mode == 'L':
dither = sixel_dither_get(SIXEL_BUILTIN_G8)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G8)
elif image.mode == '1':
dither = sixel_dither_get(SIXEL_BUILTIN_G1)
sixel_dither_set_pixelformat(dither, SIXEL_PIXELFORMAT_G1)
else:
raise RuntimeError('unexpected image mode')
try:
sixel_encode(data, width, height, 1, dither, output)
print(s.getvalue().decode('ascii'))
finally:
sixel_dither_unref(dither)
finally:
sixel_output_unref(output)
def pdf_images(pdf, force=False):
images_by_page = [(page.get_images(), is_single_image(page)) for page in pdf]
if all(len(images) == 1 and single for (images, single) in images_by_page):
return (extract_image(pdf, images[0][0]) for (images, _) in images_by_page)
print("Checking PDF images the quick way failed, trying the slow way")
def xref_or_image_generator():
xref_mode = not force
for (idx, page) in enumerate(pdf):
page_images = page.get_image_info(xrefs=True)
if len(page_images) == 1 and page_images[0]['xref'] != 0 and is_single_image(page):
xref = page_images[0]['xref']
if xref_mode:
yield xref
else:
yield extract_image(pdf, xref)
print(f'0/{pdf.page_count} pages processed...', end='')
image_extractors = []
for (idx, page) in enumerate(pdf):
page_images = page.get_image_info(xrefs=True)
if len(page_images) == 1 and page_images[0]['xref'] != 0:
xref = page_images[0]['xref']
else:
xref = None
if xref is not None and is_single_image(page):
image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
else:
print(f'\nPage {idx+1}: {len(page_images)} images, {len([img for img in page_images if img["xref"] == 0])} non-xref images, {len(relevant_blocks(page))} total relevant objects')
if force:
print(f'Converting page {idx+1}')
choice = 'c'
else:
print(f'\nPage {idx+1}: {len(page_images)} images, {len([img for img in page_images if img["xref"] == 0])} non-xref images, {len(relevant_blocks(page))} total relevant objects')
if xref_mode:
raise ValueError
else:
print(f'Generating pixmap for page {idx+1}')
pix = page.get_pixmap(dpi=PDF_FALLBACK_DPI)
yield { 'ext': 'png', 'image': pix.tobytes('png') }
print(f'\x1b[2K\r{idx+1}/{pdf.page_count} pages processed...', end='')
print('')
shown = False
while True:
choice = input(f'[N]ope out of this PDF / [c]onvert page to image{"" if xref is None else " / e[x]tract embedded image without text"}{"" if shown else " / [s]how page before deciding"}? [N/c{"" if xref is None else "/x"}{"" if shown else "/s"}] ')
if not shown and choice != '' and choice[0].lower() == 's':
display_sixel_page(page)
shown = True
else:
break
if xref is not None and choice != '' and choice[0].lower() == 'x':
image_extractors.append(lambda p=pdf, x=xref: extract_image(p, x))
elif choice != '' and choice[0].lower() == 'c':
image_extractors.append(lambda p=page: { 'ext': 'png', 'image': p.get_pixmap(dpi=PDF_CONVERSION_DPI).tobytes('png') })
else:
return None
print(f'\x1b[2K\r{idx+1}/{pdf.page_count} pages processed...', end=('' if idx+1 < pdf.page_count else '\n'))
if force:
return xref_or_image_generator()
try:
xrefs = list(xref_or_image_generator())
except ValueError:
print('\nFailed')
return None
print('Success')
return (extract_image(pdf, xref) for xref in xrefs)
return (extractor() for extractor in image_extractors)
def nfc(s):
return unicodedata.normalize('NFC', s)
@ -688,9 +793,6 @@ def superior_or_equal(a, b):
return len(a) >= len(b) and all(a[i] >= b[i] for i in range(len(b)))
def self_and_parents(path):
return [path] + list(path.parents)
def parse_expressions(tokens):
groups = []
exclusions = []
@ -727,61 +829,6 @@ def parse_group(tokens):
def normalize_to(path, ref):
return ref / Path(relpath(path, ref))
def manual_collate(args):
(raw_groups, raw_exclusions) = parse_expressions(args.expression)
extraction_dir = args.destdir / 'extract'
sample_path = next(path for group in (raw_groups + [raw_exclusions]) for path in group)
work_id = self_and_parents(Path(relpath(sample_path, extraction_dir)))[-2].name
exclusions = [normalize_to(item, args.destdir) for item in raw_exclusions]
if raw_groups:
groups = [[normalize_to(item, args.destdir) for item in group] for group in raw_groups]
else:
groups = [[extraction_dir / work_id]]
collation_area = args.destdir / 'site' / 'images'
collation_area.mkdir(parents=True, exist_ok=True)
work_collation_dir = collation_area / work_id
if work_collation_dir.exists():
if len(list(work_collation_dir.iterdir())) > 0:
print('Collation directory already exists!')
return
else:
work_collation_dir.rmdir()
nonexistent = [path for group in (groups + [exclusions]) for path in group if not path.exists()]
if len(nonexistent) > 0:
print(f'Nonexistent paths: {nonexistent}')
return
collation_staging_area = args.destdir / 'site' / 'images-staging'
work_staging_dir = collation_staging_area / work_id
work_staging_dir.mkdir(parents=True)
collator = Collator(work_staging_dir, exclusions, args)
for group in groups:
collation_result = collator.collate_from_paths([item for item in group if item not in exclusions])
if collation_result is None:
print(f'Unable to deduce file structure for {work_id} subgroup {[str(path) for path in group]}')
break
if collation_result and collator.index > 0:
print(f'Collated {collator.index} pages for {work_id}')
work_staging_dir.rename(work_collation_dir)
else:
for f in work_staging_dir.iterdir():
f.unlink()
work_staging_dir.rmdir()
if collation_result and collator.index == 0:
print(f'No files found for {work_id}')
collation_staging_area.rmdir()
def fmt_size(s):
return f'{s[0]}x{s[1]}px'
@ -1022,9 +1069,9 @@ argparser = argparse.ArgumentParser(
subfolder.
- `fetch` metadata and thumbnail images for extracted works
from DLSite.
- `collate` and/or `manual-collate` extracted works,
producing a single sequence of image files (or symlinks
into the extracted data, when possible) for each work.
- `collate` extracted works, producing a single sequence of
image files (or symlinks into the extracted data, when
possible) for each work.
- Manually adjust works' `metadata` when necessary.
- `generate` a static website providing a catalog and viewer
for all collated works.
@ -1047,7 +1094,7 @@ argparser.add_argument(
)
subparsers = argparser.add_subparsers(title="subcommands", required=True)
parser_extract = subparsers.add_parser('extract', aliases=['x', 'ex'], help='extract zipfiles')
parser_extract = subparsers.add_parser('extract', aliases=['x'], help='extract zipfiles')
parser_extract.add_argument(
'-r', '--remove',
action='store_true',
@ -1062,111 +1109,96 @@ parser_extract.add_argument(
)
parser_extract.set_defaults(func=extract)
parser_fetch = subparsers.add_parser('fetch', aliases=['f', 'fet'], help='fetch metadata and thumbnails')
parser_fetch = subparsers.add_parser('fetch', aliases=['f'], help='fetch metadata and thumbnails')
parser_fetch.set_defaults(func=fetch)
parser_collate = subparsers.add_parser(
'collate',
aliases=['c', 'co', 'col'],
help='collate each work into a sequence of image files',
aliases=['c'],
help='collate works into sequences of image files',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
For each extracted work that has not already been collated,
DLibrary will attempt to intuit its structure as follows:
DLibrary will attempt to intuit its structure and create
a single ordered list of image files in the site data
directory. Each image will either be a symlink to an image
file in the extraction folder, or a single page extracted
from a PDF file.
- Enter the work's directory. If the directory contains
nothing except a single subdirectory (ignoring a few types
of files that are definitely not relevant), traverse
downwards repeatedly.
- If the current directory contains nothing except a single
PDF (again, ignoring irrelevant files), attempt to extract
a series of images from the PDF. This process expects that
each page of the PDF consists of a single embedded image,
which will be extracted at full resolution. Support for
more complex PDFs is not yet implemented.
- If the current directory contains nothing except image
files, and the image files are named in a way that clearly
indicates a complete numerical order (each filename
consists of a shared prefix followed by a distinct
number), symlink files in the inferred order.
- Otherwise, skip processing this work for now.
DLibrary may fail to automatically collate a work if its
files and subdirectories are not named in a way that
indicates a clear linear ordering. In order to assist with
collation, you can provide a list of expressions specifying
where to start traversing the directory structure, what
files to include in what order, and/or what files to ignore
entirely.
DLibrary can be given "collation hints" which provide
alternative starting points for this search process. A hint
is a path under $DLIBRARY_DIR/extract/[work id]/
indicating a different directory or PDF file to begin the
search process for that work, rather than starting at the
top level of the extracted data. There can be at most one
hint per work; for more complicated scenarios where a work
includes multiple folders that need to be collated together,
or where filenames do not clearly indicate an ordering, use
`manual-collate` instead.
"""),
)
parser_collate.add_argument(
'hints',
metavar='PATH',
type=Path,
nargs='*',
help='paths within extraction folders as collation hints'
)
parser_collate.set_defaults(func=collate, force_convert_pdf=False)
parser_manual_collate = subparsers.add_parser(
'manual-collate',
aliases=['mc', 'man', 'manual'],
help='collate a single work manually',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\
Provide an expression or sequence of expressions specifying groups
of paths to collate or skip. An expression can be:
An expression can be:
PATH
A single path. If this is an image, it will be appended to
the sequence of collated images; if this is a PDF, images will be
extracted from it and concatenated to the sequence; if this is a
directory, the contents of the directory will be collated based on
the normal heuristics and concatenated to the sequence.
the sequence of collated images for the work it belongs to;
if this is a PDF, images will be extracted from it and
concatenated to the sequence; if this is a directory, the
contents of the directory will be automatically collated
using DLibrary's default heuristics, and concatenated
to the sequence.
( PATH [PATH ...] )
A group of paths contained in parentheses. You may need to escape
the parentheses to avoid them getting parsed by your shell.
All the paths in this group will be considered together, and
collated based on the normal heuristics, regardless of what
order the paths are provided in.
automatically collated using the default heuristics, regardless
of what order the paths are provided in.
! PATH
! ( PATH [PATH ...] )
A path or group of paths to exclude from collation. You may
need to escape the !. If an excluded path appears within any
of the other specified paths, it will be ignored.
of the other specified paths, it will be skipped by the collation
heuristics.
If the only expressions provided are negations, then auto-collation
will start from the top level of the extracted work while excluding
the negated paths.
will start from the top level of the extracted work while skipping
the excluded paths.
All provided paths must be under $DLIBRARY_DIR/extract/[work id]/
for the work being manually collated. `manual-collate` can
only handle one work at a time.
"""),
for some not-yet-collated work. Paths belonging to multiple
different works can all be provided on the same command line, and
expressions will be clustered together by work id while otherwise
preserving the order they were provided in. A parenthesized group
expression must only contain paths belonging to a single work.
By default, DLibrary will attempt to collate every not-yet-collated
work (excluding "virtual" works), using the provided expressions
to assist in collation when available. The `-o` flag will direct
DLibrary to *only* collate works included in the provided expressions,
even if other uncollated works are present.
"""),
)
parser_manual_collate.add_argument(
parser_collate.add_argument(
'-o', '--only-specified-works',
action='store_true',
help="only collate works that are explicitly specified",
)
parser_collate.add_argument(
'--force-convert-pdf',
action='store_true',
help="convert a PDF page to a 300dpi image if there isn't a single image we can extract directly",
)
parser_manual_collate.add_argument(
parser_collate.add_argument(
'expression',
nargs='+',
nargs='*',
help='expressions indicating paths to collate or skip',
)
parser_manual_collate.set_defaults(func=manual_collate)
parser_collate.set_defaults(func=collate)
parser_analyze = subparsers.add_parser('analyze', aliases=['a', 'an', 'anal'], help='analyze an extracted folder to assist in collation')
parser_analyze = subparsers.add_parser('analyze', aliases=['a'], help='analyze an extracted folder to assist in collation')
parser_analyze.add_argument('work_id')
parser_analyze.set_defaults(func=analyze)
parser_metadata = subparsers.add_parser('metadata', aliases=['m', 'me', 'meta'], help='view or modify metadata for a work')
parser_metadata = subparsers.add_parser('metadata', aliases=['m'], help='view or modify metadata for a work')
parser_metadata.add_argument('work_id')
parser_metadata.add_argument(
'--virtual',
@ -1177,7 +1209,7 @@ parser_metadata.set_defaults(func=metadata)
parser_generate = subparsers.add_parser(
'generate',
aliases=['g', 'gen'],
aliases=['g'],
help='generate HTML/CSS/JS for library site',
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent("""\

View file

@ -57,6 +57,7 @@
jinja2
importlib-resources
setuptools
libsixel
];
src = ./.;
};

View file

@ -11,6 +11,7 @@ dependencies = [
"dlsite-async",
"jinja2",
"importlib_resources",
"libsixel",
]
[project.scripts]