#!/usr/bin/env python3 import contextlib import glob import importlib import json import io import tempfile import dataclasses import pathlib import sys import subprocess import os import logging import re import typing from typing import ( Optional, Any, cast, Type, TypeVar, Callable, overload, ) if typing.TYPE_CHECKING: from typing_extensions import ( Self, BinaryIO, ) logger = logging.getLogger(__name__) def toml_load(f: 'BinaryIO') -> Any: try: tomllib = importlib.import_module('tomllib') return cast( Callable[[Any], Any], getattr( tomllib, 'load', ), )(f) except ModuleNotFoundError: pass try: import tomli return tomli.load(f) except ModuleNotFoundError: pass raise NotImplementedError @dataclasses.dataclass class PyProject: @dataclasses.dataclass class Module: name: str meson: Optional[pathlib.Path] = None tool: dict[str, Any] = dataclasses.field(default_factory=lambda: dict()) scripts: dict[str, str] = dataclasses.field(default_factory=lambda: dict()) project: dict[str, Any] = dataclasses.field(default_factory=lambda: dict()) path: pathlib.Path dependencies: dict[str, list[str]] name: Optional[str] = None version: Optional[str] = None early_features: Optional[list[str]] = None pip_find_links: Optional[list[pathlib.Path]] = None runtime_libdirs: Optional[list[pathlib.Path]] = None runtime_preload: Optional[list[pathlib.Path]] = None @dataclasses.dataclass class ThirdPartyRoot: package: Optional[str] = None module_root: Optional[str] = None path: Optional[str] = None third_party_roots: list[ThirdPartyRoot] = dataclasses.field( default_factory=lambda: [], ) requirements: dict[str, pathlib.Path] = dataclasses.field(default_factory=lambda: dict()) modules: list[Module] = dataclasses.field( default_factory=lambda: [], ) tool: dict[str, Any] = dataclasses.field( default_factory=lambda: dict(), ) Key = TypeVar('Key') Value = TypeVar('Value') @overload def check_dict( value: Any, KT: Type[Key], VT: Type[Value], ) -> dict[Key, Value]: ... @overload def check_dict( value: Any, KT: Type[Key], ) -> dict[Key, Any]: ... def check_dict( value: Any, KT: Type[Key], VT: Optional[Type[Value]] = None, ) -> dict[Key, Value]: assert isinstance(value, dict) value2 = cast(dict[Any, Any], value) VT_class: Optional[type[Any]] = None if not VT is None: if not typing.get_origin(VT) is None: VT_class = cast(type[Any], typing.get_origin(VT)) else: VT_class = VT assert all( [ isinstance(k, KT) and (VT_class is None or isinstance(v, VT_class)) for k, v in value2.items() ] ) if VT is None: return cast( dict[Key, Any], value, ) else: return cast( dict[Key, Value], value, ) @overload def check_list( value: Any, VT: Type[Value], ) -> list[Value]: ... @overload def check_list( value: Any, ) -> list[Any]: ... def check_list( value: Any, VT: Optional[Type[Value]] = None, ) -> list[Value] | list[Any]: assert isinstance(value, list) value2 = cast(list[Any], value) assert all([(VT is None or isinstance(o, VT)) for o in value2]) if VT is None: return cast( list[Any], value, ) else: return cast( list[Value], value, ) def check_type( value: Any, VT: Type[Value], attribute_name: Optional[str] = None, ) -> Value: if attribute_name: attribute_value = getattr(value, attribute_name) assert isinstance(attribute_value, VT) return attribute_value else: assert isinstance(value, VT) return value def pyproject_load( d: pathlib.Path, ) -> PyProject: with io.open(d, 'rb') as f: content = toml_load(f) assert isinstance(content, dict) dependencies: dict[str, list[str]] = dict() dependencies['default'] = content['project']['dependencies'] if 'optional-dependencies' in content['project']: assert isinstance(content['project']['optional-dependencies'], dict) for k, v in check_dict( check_dict( check_dict( content, str, # Any, )['project'], str, # Any, )['optional-dependencies'], str, list[Any], ).items(): # assert isinstance(v, list) # assert isinstance(k, str) dependencies[k] = v name: Optional[str] = None if 'name' in content.get('project', {}): name = content['project']['name'] version: Optional[str] = None if 'version' in content.get('project', {}): version = content['project']['version'] res = PyProject( path=d, dependencies=dependencies, name=name, version=version, ) tool_name = 'online.fxreader.pr34'.replace('.', '-') if 'tool' in content: res.tool = check_dict( content['tool'], str, ) if ( 'tool' in content and isinstance(content['tool'], dict) and tool_name in content['tool'] and isinstance(content['tool'][tool_name], dict) ): pr34_tool = check_dict( check_dict( content['tool'], str, )[tool_name], str, ) if 'early_features' in pr34_tool: res.early_features = pr34_tool['early_features'] if 'pip_find_links' in pr34_tool: res.pip_find_links = [d.parent / pathlib.Path(o) for o in pr34_tool['pip_find_links']] if 'runtime_libdirs' in pr34_tool: res.runtime_libdirs = [ d.parent / pathlib.Path(o) # pathlib.Path(o) for o in check_list(pr34_tool['runtime_libdirs'], str) ] if 'runtime_preload' in pr34_tool: res.runtime_preload = [ d.parent / pathlib.Path(o) # pathlib.Path(o) for o in check_list(pr34_tool['runtime_preload'], str) ] if 'third_party_roots' in pr34_tool: for o in check_list(pr34_tool['third_party_roots']): o2 = check_dict(o, str, str) assert all([k in {'package', 'module_root', 'path'} for k in o2]) res.third_party_roots.append( PyProject.ThirdPartyRoot( package=o2.get('package'), module_root=o2.get('module_root'), path=o2.get('path'), ) ) if 'requirements' in pr34_tool: res.requirements = { k: d.parent / pathlib.Path(v) # pathlib.Path(o) for k, v in check_dict(pr34_tool['requirements'], str, str).items() } if 'modules' in pr34_tool: modules = check_list(pr34_tool['modules']) # res.modules = [] for o in modules: assert isinstance(o, dict) assert 'name' in o and isinstance(o['name'], str) module = PyProject.Module( name=o['name'], ) if 'meson' in o: assert 'meson' in o and isinstance(o['meson'], str) module.meson = pathlib.Path(o['meson']) if 'tool' in o: module.tool.update( check_dict( o['tool'], str, ) ) if 'scripts' in o: module.scripts.update( check_dict( o['scripts'], str, str, ) ) if 'project' in o: module.project.update( check_dict( o['project'], str, ) ) res.modules.append(module) return res @dataclasses.dataclass class BootstrapSettings: env_path: pathlib.Path whl_cache_path: pathlib.Path python_path: pathlib.Path base_dir: pathlib.Path python_version: Optional[str] = dataclasses.field( default_factory=lambda: os.environ.get( 'PYTHON_VERSION', '%d.%d' % ( sys.version_info.major, sys.version_info.minor, ), ).strip() ) pip_check_conflicts: Optional[bool] = dataclasses.field( default_factory=lambda: os.environ.get('PIP_CHECK_CONFLICTS', json.dumps(True)) in [json.dumps(True)], ) uv_cache_dir: str = dataclasses.field( default_factory=lambda: os.environ.get( 'UV_CACHE_DIR', str(pathlib.Path.cwd() / '.uv-cache'), ) ) uv_args: list[str] = dataclasses.field( default_factory=lambda: os.environ.get( 'UV_ARGS', '--no-index -U', ).split(), ) whl_cache_update: Optional[bool] = dataclasses.field( default_factory=lambda: os.environ.get('WHL_CACHE_UPDATE', json.dumps(False)) in [json.dumps(True)] ) uv_compile_allow_index: bool = dataclasses.field( default_factory=lambda: os.environ.get('UV_COMPILE_ALLOW_INDEX', json.dumps(False)) in [json.dumps(True)] ) venv_partial: bool = dataclasses.field( default_factory=lambda: os.environ.get('VENV_PARTIAL', json.dumps(False)) in [json.dumps(True)] ) @classmethod def get( cls, base_dir: Optional[pathlib.Path] = None, ) -> 'Self': if base_dir is None: base_dir = pathlib.Path.cwd() env_path: Optional[pathlib.Path] = None if 'ENV_PATH' in os.environ: env_path = pathlib.Path(os.environ['ENV_PATH']) else: env_path = base_dir / '.venv' whl_cache_path = env_path.parent / '.venv-whl-cache' python_path = env_path / 'bin' / 'python3' return cls( base_dir=base_dir, env_path=env_path, whl_cache_path=whl_cache_path, python_path=python_path, ) class requirements_name_get_t: @dataclasses.dataclass class res_t: not_compiled: pathlib.Path compiled: pathlib.Path name: str def requirements_name_get( source_dir: pathlib.Path, python_version: Optional[str], features: list[str], requirements: dict[str, pathlib.Path], ) -> requirements_name_get_t.res_t: requirements_python_version: Optional[str] = None if not python_version is None: requirements_python_version = python_version.replace('.', '_') requirements_name = '_'.join(sorted(features)) if requirements_python_version: requirements_name += '_' + requirements_python_version requirements_path: Optional[pathlib.Path] = None if requirements_name in requirements: requirements_path = requirements[requirements_name] else: requirements_path = source_dir / 'requirements.txt' requirements_path_in = requirements_path.parent / (requirements_path.stem + '.in') requirements_in: list[str] = [] return requirements_name_get_t.res_t( not_compiled=requirements_path_in, compiled=requirements_path, name=requirements_name, ) class packaging_t: class constants_t: canonicalize_re: typing.ClassVar[re.Pattern[str]] = re.compile(r'[-_.]+') req_spec_re: typing.ClassVar[re.Pattern[str]] = re.compile(r'^([a-zA-Z0-9._-]+)==([^\s;]+)') @dataclasses.dataclass class pkg_id_t: name: str version: str python_tag: Optional[str] = None @staticmethod def canonicalize_name(name: str) -> str: return packaging_t.constants_t.canonicalize_re.sub('-', name).lower() @staticmethod def parse_whl_name_version(filename: str) -> Optional['packaging_t.pkg_id_t']: parts = filename.split('-') if len(parts) >= 5 and filename.endswith('.whl'): return packaging_t.pkg_id_t( name=packaging_t.canonicalize_name(parts[0]), version=parts[1], python_tag=parts[2], ) if len(parts) >= 3 and filename.endswith('.whl'): return packaging_t.pkg_id_t( name=packaging_t.canonicalize_name(parts[0]), version=parts[1], ) return None @staticmethod def parse_req_spec(line: str) -> Optional['packaging_t.pkg_id_t']: m = packaging_t.constants_t.req_spec_re.match(line) if m: return packaging_t.pkg_id_t( name=packaging_t.canonicalize_name(m.group(1)), version=m.group(2), ) return None @staticmethod def parse_req_name(spec: str) -> Optional[str]: """Extract canonical package name from a requirement spec like 'pip>=23' or 'librt>=0.8'.""" m = re.match(r'^([a-zA-Z0-9._-]+)', spec.strip()) if m: return packaging_t.canonicalize_name(m.group(1)) return None @staticmethod def apply_overrides_to_constraints( requirements_path: pathlib.Path, overrides: list[str], output: 'typing.IO[str]', ) -> None: """Copy requirements file to output, replacing blocks for overridden packages. Handles multi-line entries (continuations with \\ and --hash lines). For each overridden package, its entire block is replaced with the override spec. """ override_map: dict[str, str] = {} for ov in overrides: name = packaging_t.parse_req_name(ov) if name is not None: override_map[name] = ov with io.open(requirements_path, 'r') as f: skip_block = False current_override: Optional[str] = None for line in f: stripped = line.strip() if stripped.startswith('#') or stripped.startswith('--hash'): if skip_block: continue output.write(line) continue if stripped == '' or stripped == '\\': if skip_block: continue output.write(line) continue if stripped.endswith('\\'): spec_part = stripped.rstrip('\\').strip() else: spec_part = stripped parsed = packaging_t.parse_req_spec(spec_part) if parsed is not None and parsed.name in override_map: if not skip_block: skip_block = True current_override = override_map.pop(parsed.name) output.write(current_override + '\n') continue if skip_block and (stripped.startswith('--hash') or stripped.endswith('\\')): continue skip_block = False current_override = None output.write(line) for name, ov in override_map.items(): output.write(ov + '\n') def whl_cache_download( whl_cache_path: pathlib.Path, requirements_path: pathlib.Path, python_version: Optional[str], pip_find_links_args: list[str], ) -> None: whl_cache_path.mkdir(parents=True, exist_ok=True) py_tag_prefix = 'cp' + python_version.replace('.', '') if python_version else None cached_pkgs: set[tuple[str, str]] = set() for whl in whl_cache_path.glob('*.whl'): parsed = packaging_t.parse_whl_name_version(whl.name) if parsed is None: continue if py_tag_prefix is not None and parsed.python_tag is not None: if not parsed.python_tag.startswith(py_tag_prefix) and parsed.python_tag not in ( 'py3', 'py2.py3', ): continue cached_pkgs.add((parsed.name, parsed.version)) missing_reqs: list[str] = [] with io.open(requirements_path, 'r') as f: for line in f: stripped = line.strip() if not stripped or stripped.startswith('#') or stripped.startswith('--hash'): continue spec = stripped.rstrip(' \\') if spec.startswith('#'): continue parsed = packaging_t.parse_req_spec(spec) if parsed is not None and (parsed.name, parsed.version) in cached_pkgs: logger.info(dict(msg='cached', pkg='%s==%s' % (parsed.name, parsed.version))) continue missing_reqs.append(spec) if not missing_reqs: logger.info(dict(msg='all wheels cached, skipping pip download')) return logger.info(dict(msg='downloading missing wheels', count=len(missing_reqs), pkgs=missing_reqs)) with tempfile.NamedTemporaryFile( mode='w', prefix='requirements_missing_', suffix='.txt', delete=False ) as f: f.write('\n'.join(missing_reqs)) f.flush() missing_req_path = f.name pip_python_version_args: list[str] = [] if python_version is not None: pip_python_version_args = ['--python-version', python_version.replace('.', '')] try: cmd = [ sys.executable, '-m', 'pip', 'download', '--only-binary=:all:', *pip_python_version_args, *pip_find_links_args, '-r', missing_req_path, '-d', str(whl_cache_path), ] logger.info(dict(cmd=cmd)) subprocess.check_call(cmd) finally: os.unlink(missing_req_path) def check_host_prerequisites() -> None: for mod in ['pip', 'uv']: try: subprocess.check_call( [sys.executable, '-m', mod, '--version'], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) except (subprocess.CalledProcessError, FileNotFoundError): logger.error( '[bootstrap] %s -m %s is not available on the host system' % (sys.executable, mod) ) sys.exit(1) def env_bootstrap( bootstrap_settings: BootstrapSettings, pyproject: PyProject, overrides: Optional[list[str]] = None, ) -> None: check_host_prerequisites() pip_find_links: list[pathlib.Path] = [] if not pyproject.pip_find_links is None: pip_find_links.extend(pyproject.pip_find_links) pip_find_links_args = sum( [ [ '-f', str(o), ] for o in pip_find_links ], cast(list[str], []), ) features: list[str] = [] if pyproject.early_features: features.extend(pyproject.early_features) requirements_name_get_res = requirements_name_get( python_version=bootstrap_settings.python_version, features=features, requirements=pyproject.requirements, source_dir=pyproject.path.parent, ) requirements_path = requirements_name_get_res.compiled requirements_in: list[str] = [] requirements_in.extend(['uv', 'pip', 'build', 'setuptools', 'meson-python', 'pybind11']) if pyproject.early_features: early_dependencies = sum( [pyproject.dependencies[o] for o in pyproject.early_features], cast(list[str], []), ) logger.info( dict( requirements_name_get_res=requirements_name_get_res, early_dependencies=early_dependencies, ) ) requirements_in.extend(early_dependencies) # if len(early_dependencies) > 0: # subprocess.check_call([ # bootstrap_settings.python_path, # '-m', # 'uv', 'pip', 'install', # *pip_find_links_args, # # '-f', str(pathlib.Path(__file__).parent / 'deps' / 'dist'), # *bootstrap_settings.uv_args, # *early_dependencies, # ]) uv_python_version: list[str] = [] venv_python_version: list[str] = [] if not bootstrap_settings.python_version is None: uv_python_version.extend( [ # '-p', '--python-version', bootstrap_settings.python_version, ] ) venv_python_version.extend( [ '-p', # '--python-version', bootstrap_settings.python_version, ] ) logger.info('[bootstrap] step 1/5: compile requirements') needs_compile = not requirements_path.exists() constraint_args: list[str] = [] if bootstrap_settings.venv_partial and requirements_path.exists(): logger.info( '[bootstrap] VENV_PARTIAL: recompiling with existing requirements.txt as constraints' ) needs_compile = True constraint_args = ['-c', str(requirements_path)] cache_find_links_args: list[str] = [] if bootstrap_settings.whl_cache_path.exists(): cache_find_links_args = ['-f', str(bootstrap_settings.whl_cache_path)] if needs_compile: with ( tempfile.NamedTemporaryFile( mode='w', prefix='requirements', suffix='.in', ) as f_in, tempfile.NamedTemporaryFile( mode='w', prefix='requirements', suffix='.txt', dir=requirements_path.parent, delete=False, ) as f_out, ): f_in.write('\n'.join(requirements_in)) f_in.flush() uv_compile_args = bootstrap_settings.uv_args if bootstrap_settings.uv_compile_allow_index: uv_compile_args = [ o for o in uv_compile_args if o not in ('--no-index', '-U', '--upgrade') ] if len(constraint_args) > 0: uv_compile_args = [o for o in uv_compile_args if o not in ('-U', '--upgrade')] with contextlib.ExitStack() as stack: if overrides and len(constraint_args) > 0: patched = stack.enter_context( tempfile.NamedTemporaryFile(mode='w', prefix='constraints_', suffix='.txt') ) packaging_t.apply_overrides_to_constraints( requirements_path, overrides, patched ) patched.flush() constraint_args = ['-c', patched.name] cmd = [ 'uv', '--cache-dir', bootstrap_settings.uv_cache_dir, 'pip', 'compile', *uv_python_version, '--generate-hashes', '--no-annotate', '--no-header', *pip_find_links_args, *cache_find_links_args, *constraint_args, *uv_compile_args, '-o', f_out.name, f_in.name, ] logger.info(dict(cmd=cmd)) try: subprocess.check_call(cmd) os.replace(f_out.name, str(requirements_path)) except subprocess.CalledProcessError: os.unlink(f_out.name) raise if not bootstrap_settings.whl_cache_path.exists() or bootstrap_settings.whl_cache_update: whl_cache_download( whl_cache_path=bootstrap_settings.whl_cache_path, requirements_path=requirements_path, python_version=bootstrap_settings.python_version, pip_find_links_args=pip_find_links_args, ) if bootstrap_settings.whl_cache_path.exists(): cache_find_links_args = ['-f', str(bootstrap_settings.whl_cache_path)] if bootstrap_settings.venv_partial and bootstrap_settings.env_path.exists(): logger.info('[bootstrap] VENV_PARTIAL: skipping venv creation (already exists)') else: subprocess.check_call( [ 'uv', '--cache-dir', bootstrap_settings.uv_cache_dir, *[ o for o in bootstrap_settings.uv_args if o not in ['-U', '--upgrade', '--no-index'] ], 'venv', *venv_python_version, *cache_find_links_args, str(bootstrap_settings.env_path), ] ) cmd = [ 'uv', '--cache-dir', bootstrap_settings.uv_cache_dir, 'pip', 'install', *uv_python_version, *cache_find_links_args, '-p', str(bootstrap_settings.python_path), '--require-hashes', *bootstrap_settings.uv_args, '-r', str(requirements_path), ] logger.info(dict(cmd=cmd)) subprocess.check_call(cmd) if bootstrap_settings.pip_check_conflicts: subprocess.check_call( [ bootstrap_settings.python_path, '-m', 'online.fxreader.pr34.commands', 'pip_check_conflicts', ] ) def paths_equal(a: pathlib.Path | str, b: pathlib.Path | str) -> bool: return os.path.abspath(str(a)) == os.path.abspath(str(b)) import argparse as _argparse class argv_extract_t: """Extract known arguments from argv by scanning parser action definitions.""" @dataclasses.dataclass class res_t: namespace: _argparse.Namespace rest: list[str] @staticmethod def extract( parser: _argparse.ArgumentParser, argv: list[str], ) -> 'argv_extract_t.res_t': flag_map: dict[str, _argparse.Action] = {} for action in parser._actions: for opt in action.option_strings: flag_map[opt] = action matched_argv: list[str] = [] rest: list[str] = [] i = 0 while i < len(argv): action = flag_map.get(argv[i]) if action is not None: matched_argv.append(argv[i]) i += 1 if ( action.nargs in (None, 1) and action.const is None and not isinstance( action, ( _argparse._StoreTrueAction, _argparse._StoreFalseAction, _argparse._CountAction, _argparse._HelpAction, ), ) ): if i < len(argv): matched_argv.append(argv[i]) i += 1 else: rest.append(argv[i]) i += 1 namespace = parser.parse_args(matched_argv) return argv_extract_t.res_t( namespace=namespace, rest=rest, ) def run( d: Optional[pathlib.Path] = None, cli_path: Optional[pathlib.Path] = None, ) -> None: if cli_path is None: cli_path = pathlib.Path(__file__).parent / 'cli.py' if d is None: d = pathlib.Path(__file__).parent / 'pyproject.toml' bootstrap_parser = _argparse.ArgumentParser(add_help=False) bootstrap_parser.add_argument( '--bootstrap-help', action='help', help='show bootstrap help and exit', ) bootstrap_parser.add_argument( '--bootstrap-override', dest='overrides', action='append', default=[], help='override for uv pip compile (e.g. "librt>=0.8")', ) bootstrap_args = argv_extract_t.extract(bootstrap_parser, sys.argv[1:]) bootstrap_settings = BootstrapSettings.get() pyproject: PyProject = pyproject_load(d) logging.basicConfig( level=logging.INFO, format='%(levelname)s:%(name)s:%(message)s:%(process)d:%(asctime)s:%(pathname)s:%(funcName)s:%(lineno)s', ) if not bootstrap_settings.env_path.exists() or bootstrap_settings.venv_partial: env_bootstrap( bootstrap_settings=bootstrap_settings, pyproject=pyproject, overrides=bootstrap_args.namespace.overrides or None, ) logger.info([sys.executable, sys.argv, bootstrap_settings.python_path]) if not paths_equal(sys.executable, bootstrap_settings.python_path): os.execv( str(bootstrap_settings.python_path), [ str(bootstrap_settings.python_path), sys.argv[0], *bootstrap_args.rest, ], ) os.execv( str(bootstrap_settings.python_path), [ str(bootstrap_settings.python_path), str(cli_path), *bootstrap_args.rest, ], ) if __name__ == '__main__': run( d=pathlib.Path(__file__).parent / 'pyproject.common.toml', cli_path=pathlib.Path(__file__).parent / 'cli.py', )