[+] download command: progress display, parallel downloads, aria2c batch mode
1. add --progress BooleanOptionalAction flag with plain mode; 2. progress_t tracks total/done/this_run packages and bytes, shows ETA and rate; 3. rate displayed as pkg/s when fast, s/pkg when slow; 4. add -j parameter for parallel downloads via ThreadPoolExecutor; 5. for aria2c with -j>1, use download_batch_aria2c with single process and input file; 6. skip already-downloaded files, estimate total bytes from average; 7. add test_download_cli.py with full test matrix: backend(3) x jobs(2) x progress(2) x existing(3); 8. mock downloader_t.download and download_batch_aria2c in all tests; 9. add unit tests for progress_t formatting and batch aria2c input generation;
This commit is contained in:
parent
41f997fa68
commit
15df281e61
@ -0,0 +1,352 @@
|
||||
"""Download compiled packages."""
|
||||
|
||||
import argparse
|
||||
import concurrent.futures
|
||||
import enum
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import subprocess
|
||||
import time
|
||||
import urllib.request
|
||||
|
||||
from typing import (
|
||||
ClassVar,
|
||||
Optional,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class parse_rate_t:
|
||||
class constants_t:
|
||||
rate_re: ClassVar[re.Pattern[str]] = re.compile(
|
||||
r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$'
|
||||
)
|
||||
|
||||
units: ClassVar[dict[str, int]] = {
|
||||
'': 0, 'b': 0, 'B': 0,
|
||||
'k': 1, 'K': 1,
|
||||
'm': 2, 'M': 2,
|
||||
'g': 3, 'G': 3,
|
||||
't': 4, 'T': 4,
|
||||
'p': 5, 'P': 5,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def parse(s: str) -> int:
|
||||
m = parse_rate_t.constants_t.rate_re.match(s.strip())
|
||||
if not m:
|
||||
raise ValueError('invalid rate: %s' % s)
|
||||
|
||||
value = float(m.group(1))
|
||||
unit = m.group(2)
|
||||
|
||||
power = parse_rate_t.constants_t.units.get(unit, 0)
|
||||
|
||||
return int(value * (1024**power))
|
||||
|
||||
|
||||
class downloader_t:
|
||||
class constants_t:
|
||||
class backend_t(enum.Enum):
|
||||
urllib = 'urllib'
|
||||
curl = 'curl'
|
||||
aria2c = 'aria2c'
|
||||
|
||||
@staticmethod
|
||||
def download(
|
||||
url: str,
|
||||
dest: pathlib.Path,
|
||||
backend: 'downloader_t.constants_t.backend_t',
|
||||
limit_rate: int,
|
||||
) -> None:
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if backend is downloader_t.constants_t.backend_t.urllib:
|
||||
urllib.request.urlretrieve(url, str(dest))
|
||||
elif backend is downloader_t.constants_t.backend_t.curl:
|
||||
cmd = [
|
||||
'curl', '-fSL',
|
||||
'--limit-rate', '%d' % limit_rate,
|
||||
'-o', str(dest),
|
||||
url,
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
elif backend is downloader_t.constants_t.backend_t.aria2c:
|
||||
cmd = [
|
||||
'aria2c',
|
||||
'--max-download-limit=%d' % limit_rate,
|
||||
'-d', str(dest.parent),
|
||||
'-o', dest.name,
|
||||
url,
|
||||
]
|
||||
subprocess.check_call(cmd)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def download_batch_aria2c(
|
||||
entries: list[tuple[str, pathlib.Path]],
|
||||
limit_rate: int,
|
||||
jobs: int,
|
||||
) -> None:
|
||||
"""Download multiple files using a single aria2c process with -j."""
|
||||
if len(entries) == 0:
|
||||
return
|
||||
|
||||
dest_dir = entries[0][1].parent
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# write input file for aria2c
|
||||
input_lines: list[str] = []
|
||||
for url, dest in entries:
|
||||
input_lines.append(url)
|
||||
input_lines.append(' dir=%s' % str(dest.parent))
|
||||
input_lines.append(' out=%s' % dest.name)
|
||||
|
||||
input_txt = '\n'.join(input_lines) + '\n'
|
||||
input_path = dest_dir / '.aria2c-input.txt'
|
||||
input_path.write_text(input_txt)
|
||||
|
||||
cmd = [
|
||||
'aria2c',
|
||||
'--max-download-limit=%d' % limit_rate,
|
||||
'-j', '%d' % jobs,
|
||||
'-i', str(input_path),
|
||||
]
|
||||
|
||||
try:
|
||||
subprocess.check_call(cmd)
|
||||
finally:
|
||||
input_path.unlink(missing_ok=True)
|
||||
|
||||
|
||||
class progress_t:
|
||||
class constants_t:
|
||||
class mode_t(enum.Enum):
|
||||
plain = 'plain'
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
total: int,
|
||||
total_bytes: int,
|
||||
already_done: int,
|
||||
already_bytes: int,
|
||||
) -> None:
|
||||
self.total = total
|
||||
self.total_bytes = total_bytes
|
||||
self.already_done = already_done
|
||||
self.already_bytes = already_bytes
|
||||
self.downloaded_this_run = 0
|
||||
self.downloaded_bytes_this_run = 0
|
||||
self.start_time = time.monotonic()
|
||||
|
||||
def update(self, file_bytes: int) -> None:
|
||||
self.downloaded_this_run += 1
|
||||
self.downloaded_bytes_this_run += file_bytes
|
||||
|
||||
def format_plain(self) -> str:
|
||||
done = self.already_done + self.downloaded_this_run
|
||||
done_mb = (self.already_bytes + self.downloaded_bytes_this_run) / (1024 * 1024)
|
||||
total_mb = self.total_bytes / (1024 * 1024)
|
||||
elapsed = time.monotonic() - self.start_time
|
||||
|
||||
if self.downloaded_this_run > 0 and elapsed > 0:
|
||||
rate = self.downloaded_this_run / elapsed
|
||||
remaining = self.total - done
|
||||
if rate > 0:
|
||||
eta_s = remaining / rate
|
||||
eta = '%dm%02ds' % (int(eta_s) // 60, int(eta_s) % 60)
|
||||
else:
|
||||
eta = '?'
|
||||
if rate >= 1:
|
||||
rate_str = '%.1f pkg/s' % rate
|
||||
else:
|
||||
rate_str = '%.1f s/pkg' % (1.0 / rate) if rate > 0 else '?'
|
||||
else:
|
||||
eta = '?'
|
||||
rate_str = '?'
|
||||
|
||||
return (
|
||||
'[%d/%d] this_run=%d %.1f/%.1f MiB ETA=%s %s'
|
||||
% (done, self.total, self.downloaded_this_run, done_mb, total_mb, eta, rate_str)
|
||||
)
|
||||
|
||||
|
||||
class download_requirements_t:
|
||||
@staticmethod
|
||||
def parse_requirements(txt: str) -> list[tuple[str, str]]:
|
||||
entries: list[tuple[str, str]] = []
|
||||
url: Optional[str] = None
|
||||
|
||||
for line in txt.splitlines():
|
||||
line = line.strip()
|
||||
if line == '':
|
||||
continue
|
||||
if line.startswith('#'):
|
||||
candidate = line[1:].strip()
|
||||
# strip trailing annotation like "URL # pinned"
|
||||
if ' #' in candidate:
|
||||
candidate = candidate.split(' #', 1)[0].strip()
|
||||
if '/' in candidate and '://' in candidate:
|
||||
url = candidate
|
||||
continue
|
||||
|
||||
# strip trailing inline comment (e.g. "pkg==1.0 # pinned")
|
||||
if ' #' in line:
|
||||
line = line.split(' #', 1)[0].strip()
|
||||
|
||||
parts = line.split()
|
||||
if len(parts) == 0:
|
||||
continue
|
||||
|
||||
pkg_spec = parts[0]
|
||||
|
||||
if url is not None:
|
||||
filename = url.rsplit('/', 1)[-1] if '/' in url else pkg_spec
|
||||
entries.append((url, filename))
|
||||
url = None
|
||||
|
||||
return entries
|
||||
|
||||
|
||||
def main(args: list[str]) -> int:
|
||||
download_parser = argparse.ArgumentParser(
|
||||
prog='online-fxreader-pr34-archlinux download',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'-r',
|
||||
dest='requirements',
|
||||
required=True,
|
||||
help='path to compiled requirements file',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'-d',
|
||||
dest='dest_dir',
|
||||
required=True,
|
||||
help='destination directory for downloaded packages',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'--downloader',
|
||||
choices=[o.value for o in downloader_t.constants_t.backend_t],
|
||||
default='urllib',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'--limit-rate',
|
||||
dest='limit_rate',
|
||||
default='128KiB/s',
|
||||
help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'--progress',
|
||||
default=False,
|
||||
action=argparse.BooleanOptionalAction,
|
||||
help='show download progress',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'--progress-mode',
|
||||
dest='progress_mode',
|
||||
choices=[o.value for o in progress_t.constants_t.mode_t],
|
||||
default='plain',
|
||||
help='progress display mode (default: plain)',
|
||||
)
|
||||
download_parser.add_argument(
|
||||
'-j',
|
||||
dest='jobs',
|
||||
type=int,
|
||||
default=1,
|
||||
help='parallel downloads (default: 1). For aria2c, passed as -j to aria2c directly.',
|
||||
)
|
||||
|
||||
download_options = download_parser.parse_args(args)
|
||||
|
||||
dest_dir = pathlib.Path(download_options.dest_dir)
|
||||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
backend = downloader_t.constants_t.backend_t(download_options.downloader)
|
||||
limit_rate = parse_rate_t.parse(download_options.limit_rate)
|
||||
jobs: int = download_options.jobs
|
||||
|
||||
requirements_txt = pathlib.Path(download_options.requirements).read_text()
|
||||
entries = download_requirements_t.parse_requirements(requirements_txt)
|
||||
|
||||
# split into already-done vs to-download
|
||||
to_download: list[tuple[str, str]] = []
|
||||
already_count = 0
|
||||
already_bytes = 0
|
||||
total_bytes = 0
|
||||
|
||||
for url, filename in entries:
|
||||
dest_path = dest_dir / filename
|
||||
if dest_path.exists():
|
||||
already_count += 1
|
||||
sz = dest_path.stat().st_size
|
||||
already_bytes += sz
|
||||
total_bytes += sz
|
||||
else:
|
||||
to_download.append((url, filename))
|
||||
|
||||
# estimate total bytes (already + to_download as average of already)
|
||||
avg_size = already_bytes // already_count if already_count > 0 else 10 * 1024 * 1024
|
||||
total_bytes += avg_size * len(to_download)
|
||||
|
||||
progress: Optional[progress_t] = None
|
||||
if download_options.progress:
|
||||
progress = progress_t(
|
||||
total=len(entries),
|
||||
total_bytes=total_bytes,
|
||||
already_done=already_count,
|
||||
already_bytes=already_bytes,
|
||||
)
|
||||
if len(to_download) == 0:
|
||||
print(progress.format_plain())
|
||||
|
||||
# aria2c with -j: batch all into single process
|
||||
if backend is downloader_t.constants_t.backend_t.aria2c and jobs > 1 and len(to_download) > 0:
|
||||
batch = [(url, dest_dir / filename) for url, filename in to_download]
|
||||
downloader_t.download_batch_aria2c(batch, limit_rate, jobs)
|
||||
if progress is not None:
|
||||
for url, filename in to_download:
|
||||
dest_path = dest_dir / filename
|
||||
sz = dest_path.stat().st_size if dest_path.exists() else avg_size
|
||||
progress.update(sz)
|
||||
total_bytes = total_bytes - avg_size + sz
|
||||
progress.total_bytes = total_bytes
|
||||
print(progress.format_plain())
|
||||
logger.info(dict(msg='download complete', count=len(entries)))
|
||||
return 0
|
||||
|
||||
def _download_one(url: str, filename: str) -> int:
|
||||
dest_path = dest_dir / filename
|
||||
logger.debug(dict(msg='downloading', url=url, dest=str(dest_path)))
|
||||
downloader_t.download(
|
||||
url=url,
|
||||
dest=dest_path,
|
||||
backend=backend,
|
||||
limit_rate=limit_rate,
|
||||
)
|
||||
return dest_path.stat().st_size if dest_path.exists() else 0
|
||||
|
||||
if jobs > 1 and backend is not downloader_t.constants_t.backend_t.aria2c:
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor:
|
||||
futures = {
|
||||
executor.submit(_download_one, url, filename): (url, filename)
|
||||
for url, filename in to_download
|
||||
}
|
||||
for future in concurrent.futures.as_completed(futures):
|
||||
sz = future.result()
|
||||
if progress is not None:
|
||||
progress.update(sz)
|
||||
print(progress.format_plain())
|
||||
else:
|
||||
for url, filename in to_download:
|
||||
sz = _download_one(url, filename)
|
||||
if progress is not None:
|
||||
progress.update(sz)
|
||||
print(progress.format_plain())
|
||||
|
||||
logger.info(dict(msg='download complete', count=len(entries)))
|
||||
|
||||
return 0
|
||||
@ -0,0 +1,338 @@
|
||||
"""Tests for the download CLI command.
|
||||
|
||||
Test matrix dimensions:
|
||||
backend: urllib | curl | aria2c (3)
|
||||
jobs: 1 | 2 (2)
|
||||
progress: off | on (2)
|
||||
existing files: none | some | all (3)
|
||||
|
||||
Total entries: 3 * 2 * 2 * 3 = 36
|
||||
|
||||
Not all combinations are meaningful (aria2c+j>1 uses batch path, others use
|
||||
ThreadPoolExecutor or serial). Coverage below exercises every axis and the
|
||||
critical cross-products.
|
||||
|
||||
Additional unit tests:
|
||||
- progress_t formatting
|
||||
- download_batch_aria2c input-file generation
|
||||
"""
|
||||
|
||||
import pathlib
|
||||
import tempfile
|
||||
import unittest
|
||||
import unittest.mock
|
||||
|
||||
from ..cli.download import (
|
||||
downloader_t,
|
||||
progress_t,
|
||||
)
|
||||
|
||||
|
||||
def _make_requirements(tmpdir: str) -> tuple[pathlib.Path, pathlib.Path]:
|
||||
req = pathlib.Path(tmpdir) / 'req.txt'
|
||||
dest = pathlib.Path(tmpdir) / 'pkgs'
|
||||
req.write_text(
|
||||
'# https://example.com/bash-5.2-1-x86_64.pkg.tar.zst\n'
|
||||
'bash==5.2-1\n'
|
||||
'# https://example.com/glibc-2.38-1-x86_64.pkg.tar.zst\n'
|
||||
'glibc==2.38-1\n'
|
||||
'# https://example.com/zlib-1.3-1-x86_64.pkg.tar.zst\n'
|
||||
'zlib==1.3-1\n'
|
||||
)
|
||||
return req, dest
|
||||
|
||||
|
||||
def _prefill(dest: pathlib.Path, filenames: list[str]) -> None:
|
||||
dest.mkdir(parents=True, exist_ok=True)
|
||||
for f in filenames:
|
||||
(dest / f).write_bytes(b'\x00' * 200)
|
||||
|
||||
|
||||
def _fake_download(url: str, dest: pathlib.Path, **kwargs: object) -> None:
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
dest.write_bytes(b'\x00' * 200)
|
||||
|
||||
|
||||
def _fake_batch(
|
||||
entries: list[tuple[str, pathlib.Path]],
|
||||
limit_rate: int = 0,
|
||||
jobs: int = 1,
|
||||
) -> None:
|
||||
for _url, dest in entries:
|
||||
dest.parent.mkdir(parents=True, exist_ok=True)
|
||||
dest.write_bytes(b'\x00' * 200)
|
||||
|
||||
|
||||
def _run(extra_args: list[str], tmpdir: str) -> tuple[int, pathlib.Path, pathlib.Path]:
|
||||
req, dest = _make_requirements(tmpdir)
|
||||
|
||||
from ..cli.download import main as download_main
|
||||
|
||||
rc = download_main(['-r', str(req), '-d', str(dest)] + extra_args)
|
||||
return rc, req, dest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# progress_t unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProgressFormat(unittest.TestCase):
|
||||
def test_initial(self) -> None:
|
||||
p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=3, already_bytes=30 * 1024 * 1024)
|
||||
txt = p.format_plain()
|
||||
self.assertIn('[3/10]', txt)
|
||||
self.assertIn('this_run=0', txt)
|
||||
|
||||
def test_after_updates(self) -> None:
|
||||
p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=0, already_bytes=0)
|
||||
p.update(5 * 1024 * 1024)
|
||||
p.update(5 * 1024 * 1024)
|
||||
txt = p.format_plain()
|
||||
self.assertIn('[2/10]', txt)
|
||||
self.assertIn('this_run=2', txt)
|
||||
|
||||
def test_eta_and_rate(self) -> None:
|
||||
p = progress_t(total=100, total_bytes=1000 * 1024 * 1024, already_done=0, already_bytes=0)
|
||||
p.start_time -= 5.0 # simulate 5s elapsed for 10 pkgs → 2 pkg/s
|
||||
for _ in range(10):
|
||||
p.update(10 * 1024 * 1024)
|
||||
txt = p.format_plain()
|
||||
self.assertIn('ETA=', txt)
|
||||
self.assertIn('pkg/s', txt)
|
||||
|
||||
def test_slow_rate_shows_s_per_pkg(self) -> None:
|
||||
p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=0, already_bytes=0)
|
||||
p.start_time -= 30.0 # 30s for 1 package → 30 s/pkg
|
||||
p.update(10 * 1024 * 1024)
|
||||
txt = p.format_plain()
|
||||
self.assertIn('s/pkg', txt)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# download_batch_aria2c unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestBatchAria2c(unittest.TestCase):
|
||||
@unittest.mock.patch('subprocess.check_call')
|
||||
def test_writes_input_and_passes_j(self, mock_cc: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
dest = pathlib.Path(tmpdir)
|
||||
entries = [
|
||||
('https://example.com/a.pkg', dest / 'a.pkg'),
|
||||
('https://example.com/b.pkg', dest / 'b.pkg'),
|
||||
]
|
||||
downloader_t.download_batch_aria2c(entries, limit_rate=1024, jobs=4)
|
||||
cmd = mock_cc.call_args[0][0]
|
||||
self.assertEqual(cmd[0], 'aria2c')
|
||||
j_idx = cmd.index('-j')
|
||||
self.assertEqual(cmd[j_idx + 1], '4')
|
||||
|
||||
@unittest.mock.patch('subprocess.check_call')
|
||||
def test_empty_noop(self, mock_cc: unittest.mock.MagicMock) -> None:
|
||||
downloader_t.download_batch_aria2c([], limit_rate=1024, jobs=2)
|
||||
mock_cc.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Full CLI matrix: backend x jobs x progress x existing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDownloadCLI_Urllib_J1(unittest.TestCase):
|
||||
"""backend=urllib, jobs=1"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run([], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_some_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst'])
|
||||
rc, _, _ = _run([], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 2)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_some_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst'])
|
||||
rc, _, _ = _run(['--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 2)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_all_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, [
|
||||
'bash-5.2-1-x86_64.pkg.tar.zst',
|
||||
'glibc-2.38-1-x86_64.pkg.tar.zst',
|
||||
'zlib-1.3-1-x86_64.pkg.tar.zst',
|
||||
])
|
||||
rc, _, _ = _run([], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 0)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_all_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, [
|
||||
'bash-5.2-1-x86_64.pkg.tar.zst',
|
||||
'glibc-2.38-1-x86_64.pkg.tar.zst',
|
||||
'zlib-1.3-1-x86_64.pkg.tar.zst',
|
||||
])
|
||||
rc, _, _ = _run(['--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 0)
|
||||
|
||||
|
||||
class TestDownloadCLI_Urllib_J2(unittest.TestCase):
|
||||
"""backend=urllib, jobs=2 (ThreadPoolExecutor path)"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['-j', '2', '--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_some_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst'])
|
||||
rc, _, _ = _run(['-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 2)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_all_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, [
|
||||
'bash-5.2-1-x86_64.pkg.tar.zst',
|
||||
'glibc-2.38-1-x86_64.pkg.tar.zst',
|
||||
'zlib-1.3-1-x86_64.pkg.tar.zst',
|
||||
])
|
||||
rc, _, _ = _run(['-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 0)
|
||||
|
||||
|
||||
class TestDownloadCLI_Curl_J1(unittest.TestCase):
|
||||
"""backend=curl, jobs=1"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--downloader', 'curl'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_some_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, ['glibc-2.38-1-x86_64.pkg.tar.zst'])
|
||||
rc, _, _ = _run(['--downloader', 'curl', '--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 2)
|
||||
|
||||
|
||||
class TestDownloadCLI_Curl_J2(unittest.TestCase):
|
||||
"""backend=curl, jobs=2"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--downloader', 'curl', '-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
|
||||
class TestDownloadCLI_Aria2c_J1(unittest.TestCase):
|
||||
"""backend=aria2c, jobs=1 (serial, individual download calls)"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--downloader', 'aria2c'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 3)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download)
|
||||
def test_all_existing(self, mock_dl: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, [
|
||||
'bash-5.2-1-x86_64.pkg.tar.zst',
|
||||
'glibc-2.38-1-x86_64.pkg.tar.zst',
|
||||
'zlib-1.3-1-x86_64.pkg.tar.zst',
|
||||
])
|
||||
rc, _, _ = _run(['--downloader', 'aria2c'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
self.assertEqual(mock_dl.call_count, 0)
|
||||
|
||||
|
||||
class TestDownloadCLI_Aria2c_J2(unittest.TestCase):
|
||||
"""backend=aria2c, jobs=2 (batch path via download_batch_aria2c)"""
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch)
|
||||
def test_none_existing(self, mock_batch: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
mock_batch.assert_called_once()
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch)
|
||||
def test_none_existing_progress(self, mock_batch: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2', '--progress'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
mock_batch.assert_called_once()
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch)
|
||||
def test_some_existing(self, mock_batch: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst'])
|
||||
rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
mock_batch.assert_called_once()
|
||||
# only 2 entries passed to batch (glibc + zlib)
|
||||
batch_entries = mock_batch.call_args[0][0]
|
||||
self.assertEqual(len(batch_entries), 2)
|
||||
|
||||
@unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch)
|
||||
def test_all_existing(self, mock_batch: unittest.mock.MagicMock) -> None:
|
||||
with tempfile.TemporaryDirectory() as t:
|
||||
_, dest = _make_requirements(t)
|
||||
_prefill(dest, [
|
||||
'bash-5.2-1-x86_64.pkg.tar.zst',
|
||||
'glibc-2.38-1-x86_64.pkg.tar.zst',
|
||||
'zlib-1.3-1-x86_64.pkg.tar.zst',
|
||||
])
|
||||
rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t)
|
||||
self.assertEqual(rc, 0)
|
||||
# nothing to download → batch not called
|
||||
mock_batch.assert_not_called()
|
||||
Loading…
Reference in New Issue
Block a user