transbeam/cli/transbeam-cli

112 lines
4.6 KiB
Python
Executable file

#!/usr/bin/env python3
import argparse
import asyncio
import getpass
import json
import math
import pathlib
import sys
import time
from unicodedata import east_asian_width
import aiofiles
from tqdm import tqdm
from websockets import connect
FILE_CHUNK_SIZE = 16384
FILE_NAME_DISPLAY_WIDTH = 24
FILE_NAME_DISPLAY_PADDING = 3
FILE_NAME_DISPLAY_UPDATE_PERIOD = 0.2
def rotating_segment(name, pos):
output = ''
total_width = 0
rotating_name = name + (' ' * FILE_NAME_DISPLAY_PADDING) + name
for char in rotating_name[pos:]:
char_width = 2 if east_asian_width(char) in ['F', 'W'] else 1
if char_width == 2 and total_width == FILE_NAME_DISPLAY_WIDTH - 1:
output += ' '
total_width += 1
else:
output += char
total_width += char_width
if total_width >= FILE_NAME_DISPLAY_WIDTH:
return output
async def file_loader(files):
with tqdm(desc="Total", total=sum(size for (path, size) in files), unit='B', unit_scale=True, leave=True, position=1) as total_progress:
for (path, size) in files:
if len(path.name) > FILE_NAME_DISPLAY_WIDTH:
file_name_display_pos = 0
last_desc_update_time = time.time()
desc = rotating_segment(path.name, 0)
else:
desc = path.name
with tqdm(desc=desc, total=size, unit='B', unit_scale=True, leave=True, position=0) as file_progress:
async with aiofiles.open(path, mode='rb') as f:
while True:
pos = await f.tell()
if pos >= size:
break
data = await f.read(min(FILE_CHUNK_SIZE, size - pos))
if data == b'':
tqdm.write("file ended early!")
exit(1)
if len(path.name) > FILE_NAME_DISPLAY_WIDTH and (now := time.time()) - last_desc_update_time > FILE_NAME_DISPLAY_UPDATE_PERIOD:
file_name_display_pos = (file_name_display_pos + 1) % (len(path.name) + FILE_NAME_DISPLAY_PADDING)
last_desc_update_time = now
file_progress.set_description(
desc=rotating_segment(path.name, file_name_display_pos),
refresh=False,
)
total_progress.update(len(data))
file_progress.update(len(data))
yield data
async def send(paths, host, password, lifetime, collection_name=None, relpaths=False):
paths = [path for path in paths if path.is_file()]
fileMetadata = [
{
"name": str(path) if relpaths else path.name,
"size": path.stat().st_size,
"modtime": math.floor(path.stat().st_mtime * 1000),
} for path in paths
]
manifest = {
"files": fileMetadata,
"lifetime": lifetime,
"password": password,
}
if collection_name is not None:
manifest["collection_name"] = collection_name
async with connect("wss://{}/upload".format(host)) as ws:
await ws.send(json.dumps(manifest))
resp = json.loads(await ws.recv())
if resp["type"] != "ready":
print("unexpected response: {}".format(resp))
exit(1)
print("Download: https://{}/download?code={}".format(host, resp["code"]))
loader = file_loader([(paths[i], fileMetadata[i]["size"]) for i in range(len(paths))])
async for data in loader:
await ws.send(data)
parser = argparse.ArgumentParser(description="Upload files to transbeam")
parser.add_argument("-l", "--lifetime", type=int, default=7, help="Lifetime in days for files (default 7)")
parser.add_argument("-H", "--host", type=str, default="transbeam.link", help="transbeam host (default transbeam.link)")
parser.add_argument("-n", "--collection-name", type=str, help="Name for a collection of multiple files")
parser.add_argument("-R", "--relative-paths", action="store_true", help="Preserve file paths relative to working directory")
parser.add_argument("files", type=pathlib.Path, nargs="+", help="Files to upload")
async def main():
args = parser.parse_args()
if len(args.files) == 1 and args.collection_name is not None:
print("--collection-name is only applicable when multiple files are being uploaded")
exit(1)
password = getpass.getpass()
await send(args.files, args.host, password, args.lifetime, args.collection_name, args.relative_paths)
asyncio.run(main())