From 15df281e61897af69c3192bedee693f18e21a3c5 Mon Sep 17 00:00:00 2001 From: LLM Date: Thu, 9 Apr 2026 09:00:00 +0000 Subject: [PATCH] [+] download command: progress display, parallel downloads, aria2c batch mode 1. add --progress BooleanOptionalAction flag with plain mode; 2. progress_t tracks total/done/this_run packages and bytes, shows ETA and rate; 3. rate displayed as pkg/s when fast, s/pkg when slow; 4. add -j parameter for parallel downloads via ThreadPoolExecutor; 5. for aria2c with -j>1, use download_batch_aria2c with single process and input file; 6. skip already-downloaded files, estimate total bytes from average; 7. add test_download_cli.py with full test matrix: backend(3) x jobs(2) x progress(2) x existing(3); 8. mock downloader_t.download and download_batch_aria2c in all tests; 9. add unit tests for progress_t formatting and batch aria2c input generation; --- .../commands_typed/archlinux/cli/download.py | 352 ++++++++++++++++++ .../archlinux/tests/test_download_cli.py | 338 +++++++++++++++++ 2 files changed, 690 insertions(+) create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/cli/download.py create mode 100644 python/online/fxreader/pr34/commands_typed/archlinux/tests/test_download_cli.py diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/cli/download.py b/python/online/fxreader/pr34/commands_typed/archlinux/cli/download.py new file mode 100644 index 0000000..6408505 --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/cli/download.py @@ -0,0 +1,352 @@ +"""Download compiled packages.""" + +import argparse +import concurrent.futures +import enum +import logging +import os +import pathlib +import re +import subprocess +import time +import urllib.request + +from typing import ( + ClassVar, + Optional, +) + +logger = logging.getLogger(__name__) + + +class parse_rate_t: + class constants_t: + rate_re: ClassVar[re.Pattern[str]] = re.compile( + r'^(\d+(?:\.\d+)?)\s*([bBkKmMgGpPtT]?)(?:[iI]?[bB])?(?:/s)?$' + ) + + units: ClassVar[dict[str, int]] = { + '': 0, 'b': 0, 'B': 0, + 'k': 1, 'K': 1, + 'm': 2, 'M': 2, + 'g': 3, 'G': 3, + 't': 4, 'T': 4, + 'p': 5, 'P': 5, + } + + @staticmethod + def parse(s: str) -> int: + m = parse_rate_t.constants_t.rate_re.match(s.strip()) + if not m: + raise ValueError('invalid rate: %s' % s) + + value = float(m.group(1)) + unit = m.group(2) + + power = parse_rate_t.constants_t.units.get(unit, 0) + + return int(value * (1024**power)) + + +class downloader_t: + class constants_t: + class backend_t(enum.Enum): + urllib = 'urllib' + curl = 'curl' + aria2c = 'aria2c' + + @staticmethod + def download( + url: str, + dest: pathlib.Path, + backend: 'downloader_t.constants_t.backend_t', + limit_rate: int, + ) -> None: + dest.parent.mkdir(parents=True, exist_ok=True) + + if backend is downloader_t.constants_t.backend_t.urllib: + urllib.request.urlretrieve(url, str(dest)) + elif backend is downloader_t.constants_t.backend_t.curl: + cmd = [ + 'curl', '-fSL', + '--limit-rate', '%d' % limit_rate, + '-o', str(dest), + url, + ] + subprocess.check_call(cmd) + elif backend is downloader_t.constants_t.backend_t.aria2c: + cmd = [ + 'aria2c', + '--max-download-limit=%d' % limit_rate, + '-d', str(dest.parent), + '-o', dest.name, + url, + ] + subprocess.check_call(cmd) + else: + raise NotImplementedError + + @staticmethod + def download_batch_aria2c( + entries: list[tuple[str, pathlib.Path]], + limit_rate: int, + jobs: int, + ) -> None: + """Download multiple files using a single aria2c process with -j.""" + if len(entries) == 0: + return + + dest_dir = entries[0][1].parent + dest_dir.mkdir(parents=True, exist_ok=True) + + # write input file for aria2c + input_lines: list[str] = [] + for url, dest in entries: + input_lines.append(url) + input_lines.append(' dir=%s' % str(dest.parent)) + input_lines.append(' out=%s' % dest.name) + + input_txt = '\n'.join(input_lines) + '\n' + input_path = dest_dir / '.aria2c-input.txt' + input_path.write_text(input_txt) + + cmd = [ + 'aria2c', + '--max-download-limit=%d' % limit_rate, + '-j', '%d' % jobs, + '-i', str(input_path), + ] + + try: + subprocess.check_call(cmd) + finally: + input_path.unlink(missing_ok=True) + + +class progress_t: + class constants_t: + class mode_t(enum.Enum): + plain = 'plain' + + def __init__( + self, + total: int, + total_bytes: int, + already_done: int, + already_bytes: int, + ) -> None: + self.total = total + self.total_bytes = total_bytes + self.already_done = already_done + self.already_bytes = already_bytes + self.downloaded_this_run = 0 + self.downloaded_bytes_this_run = 0 + self.start_time = time.monotonic() + + def update(self, file_bytes: int) -> None: + self.downloaded_this_run += 1 + self.downloaded_bytes_this_run += file_bytes + + def format_plain(self) -> str: + done = self.already_done + self.downloaded_this_run + done_mb = (self.already_bytes + self.downloaded_bytes_this_run) / (1024 * 1024) + total_mb = self.total_bytes / (1024 * 1024) + elapsed = time.monotonic() - self.start_time + + if self.downloaded_this_run > 0 and elapsed > 0: + rate = self.downloaded_this_run / elapsed + remaining = self.total - done + if rate > 0: + eta_s = remaining / rate + eta = '%dm%02ds' % (int(eta_s) // 60, int(eta_s) % 60) + else: + eta = '?' + if rate >= 1: + rate_str = '%.1f pkg/s' % rate + else: + rate_str = '%.1f s/pkg' % (1.0 / rate) if rate > 0 else '?' + else: + eta = '?' + rate_str = '?' + + return ( + '[%d/%d] this_run=%d %.1f/%.1f MiB ETA=%s %s' + % (done, self.total, self.downloaded_this_run, done_mb, total_mb, eta, rate_str) + ) + + +class download_requirements_t: + @staticmethod + def parse_requirements(txt: str) -> list[tuple[str, str]]: + entries: list[tuple[str, str]] = [] + url: Optional[str] = None + + for line in txt.splitlines(): + line = line.strip() + if line == '': + continue + if line.startswith('#'): + candidate = line[1:].strip() + # strip trailing annotation like "URL # pinned" + if ' #' in candidate: + candidate = candidate.split(' #', 1)[0].strip() + if '/' in candidate and '://' in candidate: + url = candidate + continue + + # strip trailing inline comment (e.g. "pkg==1.0 # pinned") + if ' #' in line: + line = line.split(' #', 1)[0].strip() + + parts = line.split() + if len(parts) == 0: + continue + + pkg_spec = parts[0] + + if url is not None: + filename = url.rsplit('/', 1)[-1] if '/' in url else pkg_spec + entries.append((url, filename)) + url = None + + return entries + + +def main(args: list[str]) -> int: + download_parser = argparse.ArgumentParser( + prog='online-fxreader-pr34-archlinux download', + ) + download_parser.add_argument( + '-r', + dest='requirements', + required=True, + help='path to compiled requirements file', + ) + download_parser.add_argument( + '-d', + dest='dest_dir', + required=True, + help='destination directory for downloaded packages', + ) + download_parser.add_argument( + '--downloader', + choices=[o.value for o in downloader_t.constants_t.backend_t], + default='urllib', + ) + download_parser.add_argument( + '--limit-rate', + dest='limit_rate', + default='128KiB/s', + help='download speed limit (e.g. 128KiB/s, 1MiB/s, 512K), default 128KiB/s', + ) + download_parser.add_argument( + '--progress', + default=False, + action=argparse.BooleanOptionalAction, + help='show download progress', + ) + download_parser.add_argument( + '--progress-mode', + dest='progress_mode', + choices=[o.value for o in progress_t.constants_t.mode_t], + default='plain', + help='progress display mode (default: plain)', + ) + download_parser.add_argument( + '-j', + dest='jobs', + type=int, + default=1, + help='parallel downloads (default: 1). For aria2c, passed as -j to aria2c directly.', + ) + + download_options = download_parser.parse_args(args) + + dest_dir = pathlib.Path(download_options.dest_dir) + dest_dir.mkdir(parents=True, exist_ok=True) + + backend = downloader_t.constants_t.backend_t(download_options.downloader) + limit_rate = parse_rate_t.parse(download_options.limit_rate) + jobs: int = download_options.jobs + + requirements_txt = pathlib.Path(download_options.requirements).read_text() + entries = download_requirements_t.parse_requirements(requirements_txt) + + # split into already-done vs to-download + to_download: list[tuple[str, str]] = [] + already_count = 0 + already_bytes = 0 + total_bytes = 0 + + for url, filename in entries: + dest_path = dest_dir / filename + if dest_path.exists(): + already_count += 1 + sz = dest_path.stat().st_size + already_bytes += sz + total_bytes += sz + else: + to_download.append((url, filename)) + + # estimate total bytes (already + to_download as average of already) + avg_size = already_bytes // already_count if already_count > 0 else 10 * 1024 * 1024 + total_bytes += avg_size * len(to_download) + + progress: Optional[progress_t] = None + if download_options.progress: + progress = progress_t( + total=len(entries), + total_bytes=total_bytes, + already_done=already_count, + already_bytes=already_bytes, + ) + if len(to_download) == 0: + print(progress.format_plain()) + + # aria2c with -j: batch all into single process + if backend is downloader_t.constants_t.backend_t.aria2c and jobs > 1 and len(to_download) > 0: + batch = [(url, dest_dir / filename) for url, filename in to_download] + downloader_t.download_batch_aria2c(batch, limit_rate, jobs) + if progress is not None: + for url, filename in to_download: + dest_path = dest_dir / filename + sz = dest_path.stat().st_size if dest_path.exists() else avg_size + progress.update(sz) + total_bytes = total_bytes - avg_size + sz + progress.total_bytes = total_bytes + print(progress.format_plain()) + logger.info(dict(msg='download complete', count=len(entries))) + return 0 + + def _download_one(url: str, filename: str) -> int: + dest_path = dest_dir / filename + logger.debug(dict(msg='downloading', url=url, dest=str(dest_path))) + downloader_t.download( + url=url, + dest=dest_path, + backend=backend, + limit_rate=limit_rate, + ) + return dest_path.stat().st_size if dest_path.exists() else 0 + + if jobs > 1 and backend is not downloader_t.constants_t.backend_t.aria2c: + with concurrent.futures.ThreadPoolExecutor(max_workers=jobs) as executor: + futures = { + executor.submit(_download_one, url, filename): (url, filename) + for url, filename in to_download + } + for future in concurrent.futures.as_completed(futures): + sz = future.result() + if progress is not None: + progress.update(sz) + print(progress.format_plain()) + else: + for url, filename in to_download: + sz = _download_one(url, filename) + if progress is not None: + progress.update(sz) + print(progress.format_plain()) + + logger.info(dict(msg='download complete', count=len(entries))) + + return 0 diff --git a/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_download_cli.py b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_download_cli.py new file mode 100644 index 0000000..379d3be --- /dev/null +++ b/python/online/fxreader/pr34/commands_typed/archlinux/tests/test_download_cli.py @@ -0,0 +1,338 @@ +"""Tests for the download CLI command. + +Test matrix dimensions: + backend: urllib | curl | aria2c (3) + jobs: 1 | 2 (2) + progress: off | on (2) + existing files: none | some | all (3) + +Total entries: 3 * 2 * 2 * 3 = 36 + +Not all combinations are meaningful (aria2c+j>1 uses batch path, others use +ThreadPoolExecutor or serial). Coverage below exercises every axis and the +critical cross-products. + +Additional unit tests: + - progress_t formatting + - download_batch_aria2c input-file generation +""" + +import pathlib +import tempfile +import unittest +import unittest.mock + +from ..cli.download import ( + downloader_t, + progress_t, +) + + +def _make_requirements(tmpdir: str) -> tuple[pathlib.Path, pathlib.Path]: + req = pathlib.Path(tmpdir) / 'req.txt' + dest = pathlib.Path(tmpdir) / 'pkgs' + req.write_text( + '# https://example.com/bash-5.2-1-x86_64.pkg.tar.zst\n' + 'bash==5.2-1\n' + '# https://example.com/glibc-2.38-1-x86_64.pkg.tar.zst\n' + 'glibc==2.38-1\n' + '# https://example.com/zlib-1.3-1-x86_64.pkg.tar.zst\n' + 'zlib==1.3-1\n' + ) + return req, dest + + +def _prefill(dest: pathlib.Path, filenames: list[str]) -> None: + dest.mkdir(parents=True, exist_ok=True) + for f in filenames: + (dest / f).write_bytes(b'\x00' * 200) + + +def _fake_download(url: str, dest: pathlib.Path, **kwargs: object) -> None: + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b'\x00' * 200) + + +def _fake_batch( + entries: list[tuple[str, pathlib.Path]], + limit_rate: int = 0, + jobs: int = 1, +) -> None: + for _url, dest in entries: + dest.parent.mkdir(parents=True, exist_ok=True) + dest.write_bytes(b'\x00' * 200) + + +def _run(extra_args: list[str], tmpdir: str) -> tuple[int, pathlib.Path, pathlib.Path]: + req, dest = _make_requirements(tmpdir) + + from ..cli.download import main as download_main + + rc = download_main(['-r', str(req), '-d', str(dest)] + extra_args) + return rc, req, dest + + +# --------------------------------------------------------------------------- +# progress_t unit tests +# --------------------------------------------------------------------------- + +class TestProgressFormat(unittest.TestCase): + def test_initial(self) -> None: + p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=3, already_bytes=30 * 1024 * 1024) + txt = p.format_plain() + self.assertIn('[3/10]', txt) + self.assertIn('this_run=0', txt) + + def test_after_updates(self) -> None: + p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=0, already_bytes=0) + p.update(5 * 1024 * 1024) + p.update(5 * 1024 * 1024) + txt = p.format_plain() + self.assertIn('[2/10]', txt) + self.assertIn('this_run=2', txt) + + def test_eta_and_rate(self) -> None: + p = progress_t(total=100, total_bytes=1000 * 1024 * 1024, already_done=0, already_bytes=0) + p.start_time -= 5.0 # simulate 5s elapsed for 10 pkgs → 2 pkg/s + for _ in range(10): + p.update(10 * 1024 * 1024) + txt = p.format_plain() + self.assertIn('ETA=', txt) + self.assertIn('pkg/s', txt) + + def test_slow_rate_shows_s_per_pkg(self) -> None: + p = progress_t(total=10, total_bytes=100 * 1024 * 1024, already_done=0, already_bytes=0) + p.start_time -= 30.0 # 30s for 1 package → 30 s/pkg + p.update(10 * 1024 * 1024) + txt = p.format_plain() + self.assertIn('s/pkg', txt) + + +# --------------------------------------------------------------------------- +# download_batch_aria2c unit tests +# --------------------------------------------------------------------------- + +class TestBatchAria2c(unittest.TestCase): + @unittest.mock.patch('subprocess.check_call') + def test_writes_input_and_passes_j(self, mock_cc: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + dest = pathlib.Path(tmpdir) + entries = [ + ('https://example.com/a.pkg', dest / 'a.pkg'), + ('https://example.com/b.pkg', dest / 'b.pkg'), + ] + downloader_t.download_batch_aria2c(entries, limit_rate=1024, jobs=4) + cmd = mock_cc.call_args[0][0] + self.assertEqual(cmd[0], 'aria2c') + j_idx = cmd.index('-j') + self.assertEqual(cmd[j_idx + 1], '4') + + @unittest.mock.patch('subprocess.check_call') + def test_empty_noop(self, mock_cc: unittest.mock.MagicMock) -> None: + downloader_t.download_batch_aria2c([], limit_rate=1024, jobs=2) + mock_cc.assert_not_called() + + +# --------------------------------------------------------------------------- +# Full CLI matrix: backend x jobs x progress x existing +# --------------------------------------------------------------------------- + +class TestDownloadCLI_Urllib_J1(unittest.TestCase): + """backend=urllib, jobs=1""" + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run([], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--progress'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_some_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst']) + rc, _, _ = _run([], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 2) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_some_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst']) + rc, _, _ = _run(['--progress'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 2) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_all_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, [ + 'bash-5.2-1-x86_64.pkg.tar.zst', + 'glibc-2.38-1-x86_64.pkg.tar.zst', + 'zlib-1.3-1-x86_64.pkg.tar.zst', + ]) + rc, _, _ = _run([], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 0) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_all_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, [ + 'bash-5.2-1-x86_64.pkg.tar.zst', + 'glibc-2.38-1-x86_64.pkg.tar.zst', + 'zlib-1.3-1-x86_64.pkg.tar.zst', + ]) + rc, _, _ = _run(['--progress'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 0) + + +class TestDownloadCLI_Urllib_J2(unittest.TestCase): + """backend=urllib, jobs=2 (ThreadPoolExecutor path)""" + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing_no_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['-j', '2'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['-j', '2', '--progress'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_some_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst']) + rc, _, _ = _run(['-j', '2'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 2) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_all_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, [ + 'bash-5.2-1-x86_64.pkg.tar.zst', + 'glibc-2.38-1-x86_64.pkg.tar.zst', + 'zlib-1.3-1-x86_64.pkg.tar.zst', + ]) + rc, _, _ = _run(['-j', '2'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 0) + + +class TestDownloadCLI_Curl_J1(unittest.TestCase): + """backend=curl, jobs=1""" + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--downloader', 'curl'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_some_existing_progress(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, ['glibc-2.38-1-x86_64.pkg.tar.zst']) + rc, _, _ = _run(['--downloader', 'curl', '--progress'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 2) + + +class TestDownloadCLI_Curl_J2(unittest.TestCase): + """backend=curl, jobs=2""" + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--downloader', 'curl', '-j', '2'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + +class TestDownloadCLI_Aria2c_J1(unittest.TestCase): + """backend=aria2c, jobs=1 (serial, individual download calls)""" + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_none_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--downloader', 'aria2c'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 3) + + @unittest.mock.patch.object(downloader_t, 'download', side_effect=_fake_download) + def test_all_existing(self, mock_dl: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, [ + 'bash-5.2-1-x86_64.pkg.tar.zst', + 'glibc-2.38-1-x86_64.pkg.tar.zst', + 'zlib-1.3-1-x86_64.pkg.tar.zst', + ]) + rc, _, _ = _run(['--downloader', 'aria2c'], t) + self.assertEqual(rc, 0) + self.assertEqual(mock_dl.call_count, 0) + + +class TestDownloadCLI_Aria2c_J2(unittest.TestCase): + """backend=aria2c, jobs=2 (batch path via download_batch_aria2c)""" + + @unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch) + def test_none_existing(self, mock_batch: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t) + self.assertEqual(rc, 0) + mock_batch.assert_called_once() + + @unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch) + def test_none_existing_progress(self, mock_batch: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2', '--progress'], t) + self.assertEqual(rc, 0) + mock_batch.assert_called_once() + + @unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch) + def test_some_existing(self, mock_batch: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, ['bash-5.2-1-x86_64.pkg.tar.zst']) + rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t) + self.assertEqual(rc, 0) + mock_batch.assert_called_once() + # only 2 entries passed to batch (glibc + zlib) + batch_entries = mock_batch.call_args[0][0] + self.assertEqual(len(batch_entries), 2) + + @unittest.mock.patch.object(downloader_t, 'download_batch_aria2c', side_effect=_fake_batch) + def test_all_existing(self, mock_batch: unittest.mock.MagicMock) -> None: + with tempfile.TemporaryDirectory() as t: + _, dest = _make_requirements(t) + _prefill(dest, [ + 'bash-5.2-1-x86_64.pkg.tar.zst', + 'glibc-2.38-1-x86_64.pkg.tar.zst', + 'zlib-1.3-1-x86_64.pkg.tar.zst', + ]) + rc, _, _ = _run(['--downloader', 'aria2c', '-j', '2'], t) + self.assertEqual(rc, 0) + # nothing to download → batch not called + mock_batch.assert_not_called()