[+] reformat with ruff

Siarhei Siniak 2025-05-20 11:13:17 +03:00
parent 8510d49015
commit 0f17070c62
20 changed files with 7886 additions and 7960 deletions

@@ -1,5 +1,5 @@
#!/usr/bin/env python3
# vim: set filetype=python
import logging
import json
@@ -7,158 +7,184 @@ import enum
import pathlib
import sys
import argparse
# import optparse
import dataclasses
import subprocess
import os
from typing import (
    Optional,
    Any,
    TypeAlias,
    Literal,
    cast,
    BinaryIO,
    Generator,
    ClassVar,
    Self,
)
logger = logging.getLogger()
@dataclasses.dataclass
class Settings:
    project_root: pathlib.Path = pathlib.Path.cwd()
    env_path: pathlib.Path = project_root / 'tmp' / 'env3'

    _settings: ClassVar[Optional['Settings']] = None

    @classmethod
    def settings(cls) -> Self:
        if cls._settings is None:
            cls._settings = cls()

        return cls._settings
def js(argv: list[str]) -> int:
    return subprocess.check_call(
        [
            'sudo',
            'docker-compose',
            '--project-directory',
            Settings.settings().project_root,
            '-f',
            Settings.settings().project_root / 'docker' / 'js' / 'docker-compose.yml',
            *argv,
        ]
    )
def env(
    argv: Optional[list[str]] = None,
    mode: Literal['exec', 'subprocess'] = 'subprocess',
    **kwargs: Any,
) -> Optional[subprocess.CompletedProcess[bytes]]:
    env_path = Settings.settings().env_path

    if not env_path.exists():
        subprocess.check_call([sys.executable, '-m', 'venv', '--system-site-packages', str(env_path)])

        subprocess.check_call(
            [
                env_path / 'bin' / 'python3',
                '-m',
                'pip',
                'install',
                '-r',
                'requirements.txt',
            ]
        )

    if not argv is None:
        python_path = str(env_path / 'bin' / 'python3')

        if mode == 'exec':
            os.execv(
                python_path,
                [
                    python_path,
                    *argv,
                ],
            )
            return None
        elif mode == 'subprocess':
            return subprocess.run(
                [
                    python_path,
                    *argv,
                ],
                **kwargs,
            )
        else:
            raise NotImplementedError

    return None
def ruff(argv: list[str]) -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-i',
        dest='paths',
        help='specify paths to check',
        default=[],
        action='append',
    )
    parser.add_argument(
        '-e',
        dest='exclude',
        help='rules to ignore',
        default=[],
        action='append',
    )

    options, args = parser.parse_known_args(argv)

    if len(options.paths) == 0:
        options.paths.extend(
            [
                '.',
                'dotfiles/.local/bin/commands',
            ]
        )

    if len(options.exclude) == 0:
        options.exclude.extend(
            [
                'E731',
                'E713',
                'E714',
                'E703',
            ]
        )

    res = env(
        [
            '-m',
            'ruff',
            'check',
            *args,
            '--output-format',
            'json',
            '--ignore',
            ','.join(options.exclude),
            *options.paths,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    assert not res is None

    errors = json.loads(res.stdout.decode('utf-8'))

    g: dict[str, Any] = dict()

    for o in errors:
        if not o['filename'] in g:
            g[o['filename']] = []
        g[o['filename']].append(o)

    h = {k: len(v) for k, v in g.items()}

    logger.info(json.dumps(errors, indent=4))
    logger.info(json.dumps(h, indent=4))
def inside_env() -> bool:
    try:
        import numpy

        return True
    except Exception:
        return False


# class Commands(enum.StrEnum):
#     js = 'js'
#     mypy = 'mypy'
#     env = 'env'
@@ -172,83 +198,97 @@ def inside_env() -> bool:
# argv,
# )
def host_deps(argv: list[str]) -> None:
    if sys.platform in ['linux']:
        subprocess.check_call(
            r"""
exec yay -S $(cat requirements-archlinux.txt)
""",
            shell=True,
        )
    else:
        raise NotImplementedError
Command_args = [
    'js',
    'mypy',
    'env',
    'ruff',
    'm2',
    'host_deps',
]

Command: TypeAlias = Literal[
    'js',
    'mypy',
    'env',
    'ruff',
    'm2',
    'host_deps',
]
def run(argv: Optional[list[str]] = None) -> None:
    logging.basicConfig(
        level=logging.INFO,
        format=('%(levelname)s:%(name)s:%(message)s:%(process)d:%(asctime)s:%(pathname)s:%(funcName)s:%(lineno)s'),
    )

    if argv is None:
        argv = sys.argv[:]

    parser = argparse.ArgumentParser()
    parser.add_argument(
        'command',
        #'_command',
        choices=[o for o in Command_args],
        # required=True,
    )

    options, args = parser.parse_known_args(argv[1:])

    assert options.command in Command_args

    if len(args) > 0 and args[0] == '--':
        del args[0]

    # options.command = Commands(options._command)

    if options.command == 'js':
        js(args)
    elif options.command == 'host_deps':
        host_deps(args)
    elif options.command == 'env':
        env(
            args,
            mode='exec',
        )
    # elif options.command == 'mypy':
    #     if not inside_env():
    #         env(
    #             [
    #                 pathlib.Path(__file__).parent / 'm.py',
    #                 *argv[1:],
    #             ],
    #             mode='exec'
    #         )
    #     else:
    #         mypy(args)
    elif options.command == 'ruff':
        ruff(args)
    elif options.command == 'm2':
        if not inside_env():
            env(['--', '_m.py', 'm2', *args])
            return

        import python.tasks.cython

        python.tasks.cython.mypyc_build(pathlib.Path('_m.py'))
    else:
        raise NotImplementedError
if __name__ == '__main__':
    run()
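
For orientation, a sketch of how this dispatcher is driven (the file name m.py is an assumption based on paths referenced elsewhere in this commit; everything after `--` is forwarded verbatim to the chosen subcommand):

# equivalent to running `python3 m.py ruff -- --statistics` from the repo root;
# `--statistics` is just an illustrative extra flag passed through to ruff
run(['m.py', 'ruff', '--', '--statistics'])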

@@ -10,7 +10,10 @@ import enum
import argparse
import dataclasses
from typing import (
    Optional,
    override,
)
from online.fxreader.pr34.commands_typed.logging import setup as logging_setup
@@ -24,139 +27,134 @@ logger = logging.getLogger(__name__)
class Command(enum.StrEnum):
    mypy = 'mypy'
    deploy_wheel = 'deploy:wheel'
    tests = 'tests'
@dataclasses.dataclass
class Settings(
    _cli.DistSettings,
):
    base_dir: pathlib.Path = pathlib.Path(__file__).parent.parent
    build_dir: pathlib.Path = base_dir / 'tmp' / 'build'
    wheel_dir: pathlib.Path = base_dir / 'deps' / 'dist'
    env_path: pathlib.Path = cli_bootstrap.BootstrapSettings.get(base_dir).env_path
    python_path: pathlib.Path = cli_bootstrap.BootstrapSettings.get(base_dir).python_path
class CLI(_cli.CLI):
    def __init__(self) -> None:
        self.settings = Settings()
        self._projects: dict[str, _cli.Project] = {
            'online.fxreader.pr34': _cli.Project(
                source_dir=self.settings.base_dir / 'python',
                build_dir=self.settings.base_dir / 'tmp' / 'online' / 'fxreader' / 'pr34' / 'build',
                dest_dir=self.settings.base_dir / 'tmp' / 'online' / 'fxreader' / 'pr34' / 'install',
            )
        }

        self._dependencies: dict[str, _cli.Dependency] = dict()
    @override
    @property
    def dist_settings(self) -> _cli.DistSettings:
        return self.settings

    @override
    @property
    def projects(self) -> dict[str, _cli.Project]:
        return self._projects
    def mypy(
        self,
        argv: list[str],
    ) -> None:
        import online.fxreader.pr34.commands_typed.mypy as _mypy

        project = self._projects['online.fxreader.pr34']

        _mypy.run(
            argv,
            settings=_mypy.MypySettings(
                paths=[
                    # Settings.settings().project_root / 'dotfiles/.local/bin/commands',
                    # project.source_dir / 'm.py',
                    project.source_dir / '_m.py',
                    project.source_dir / 'online',
                    project.source_dir / 'cli.py',
                    self.settings.base_dir / 'm.py',
                    # Settings.settings().project_root / 'deps/com.github.aiortc.aiortc/src',
                    # Settings.settings().project_root / 'm.py',
                ],
                max_errors={
                    'python/online/fxreader/pr34/commands_typed': 0,
                    'python/cli.py': 0,
                    'm.py': 0,
                    'deps/com.github.aiortc.aiortc/src/online_fxreader': 0,
                    'deps/com.github.aiortc.aiortc/src/aiortc/contrib/signaling': 0,
                },
            ),
        )
    @override
    @property
    def dependencies(self) -> dict[str, _cli.Dependency]:
        return self._dependencies
    def run(self, argv: Optional[list[str]] = None) -> None:
        if argv is None:
            argv = copy.deepcopy(sys.argv)

        parser = argparse.ArgumentParser()
        parser.add_argument('command', choices=[o.value for o in Command])
        parser.add_argument('-p', '--project', choices=[o for o in self.projects])
        parser.add_argument(
            '-o',
            '--output_dir',
            default=None,
            help='wheel output dir for deploy:wheel',
        )
        parser.add_argument(
            '-f',
            '--force',
            default=False,
            action='store_true',
            help='remove install dir, before installing, default = false',
        )

        options, args = parser.parse_known_args(argv[1:])

        options.command = Command(options.command)

        if options.command is Command.deploy_wheel:
            assert not options.project is None

            self.deploy_wheel(
                project_name=options.project,
                argv=args,
                output_dir=options.output_dir,
                mypy=True,
            )
        elif options.command is Command.mypy:
            self.mypy(
                argv=args,
            )
        elif options.command is Command.tests:
            for k, v in self.projects.items():
                subprocess.check_call(
                    [
                        sys.executable,
                        '-m',
                        'unittest',
                        'online.fxreader.pr34.tests.test_crypto',
                        *args,
                    ],
                    cwd=str(v.source_dir),
                )
        else:
            raise NotImplementedError
if __name__ == '__main__':
    CLI().run()

File diff suppressed because it is too large.

@@ -1,27 +1,28 @@
__all__ = ('parse_args',)
import sys
import argparse
from typing import (
    Optional,
)
def parse_args(
    parser: argparse.ArgumentParser,
    args: Optional[list[str]] = None,
) -> tuple[argparse.Namespace, list[str]]:
    if args is None:
        args = sys.argv[1:]

    argv: list[str] = []

    for i, o in enumerate(args):
        if o == '--':
            argv.extend(args[i + 1 :])

            del args[i:]

            break

    return parser.parse_args(args), argv
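
A sketch of the contract (argument values are illustrative): everything after the first `--` is split off into a second list, and only the remainder reaches the parser.

parser = argparse.ArgumentParser()
parser.add_argument('--verbose', action='store_true')

options, rest = parse_args(parser, ['--verbose', '--', '-m', 'ruff'])
assert options.verbose is True
assert rest == ['-m', 'ruff']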

@@ -1,14 +1,23 @@
import logging
import asyncio
from typing import (
    Any,
)
logger = logging.getLogger(__name__)
def handle_task_result(fut: asyncio.Future[Any]) -> None:
    try:
        fut.result()

        logger.debug(
            dict(fut=fut, msg='done'),
            stacklevel=2,
        )
    except:
        logger.exception(
            '',
            stacklevel=2,
        )
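
A typical attachment point, as a sketch: registered as a done-callback, the handler logs exceptions of fire-and-forget tasks instead of letting them vanish.

async def main() -> None:
    task = asyncio.create_task(asyncio.sleep(0.1))
    # log the result (or the exception) once the task finishes
    task.add_done_callback(handle_task_result)
    await task

asyncio.run(main())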

@@ -12,467 +12,490 @@ import abc
from .os import shutil_which
from typing import (
    Optional,
    Literal,
    Any,
)
logger = logging.getLogger(__name__)
@dataclasses.dataclass
class Project:
    source_dir: pathlib.Path
    build_dir: pathlib.Path
    dest_dir: pathlib.Path
    meson_path: Optional[pathlib.Path] = None
@dataclasses.dataclass
class Dependency:
    name: str
    mode: Literal['pyproject', 'meson', 'meson-python', 'm']
    source_path: pathlib.Path
    args: Optional[list[str]] = None
@dataclasses.dataclass
class DistSettings:
    wheel_dir: pathlib.Path
    python_path: pathlib.Path
    env_path: pathlib.Path
class CLI(abc.ABC):
    @property
    @abc.abstractmethod
    def dist_settings(self) -> DistSettings:
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def projects(self) -> dict[str, Project]:
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def dependencies(self) -> dict[str, Dependency]:
        raise NotImplementedError

    def mypy(self, argv: list[str]) -> None:
        from . import mypy as _mypy

        _mypy.run(
            argv,
        )

    def ruff(
        self,
        project_name: str,
        argv: list[str],
    ) -> None:
        project = self.projects[project_name]

        if len(argv) == 0:
            argv = [
                'check',
                '.',
            ]

        subprocess.check_call(
            [
                self.dist_settings.python_path,
                '-m',
                'ruff',
                '--config',
                str(project.source_dir / 'pyproject.toml'),
                *argv,
            ]
        )

    def pyright(
        self,
        project_name: str,
        argv: list[str],
    ) -> None:
        project = self.projects[project_name]

        if len(argv) == 0:
            argv = ['--threads', '3']

        cmd = [
            str(self.dist_settings.python_path),
            '-m',
            'pyright',
            '--pythonpath',
            str(self.dist_settings.python_path),
            '-p',
            str(project.source_dir / 'pyproject.toml'),
            *argv,
        ]

        logger.info(cmd)

        subprocess.check_call(cmd)

    def pip_sync(
        self,
        project: str,
        features: list[str],
    ) -> None:
        from . import cli_bootstrap

        pyproject = cli_bootstrap.pyproject_load(self.projects[project].source_dir / 'pyproject.toml')

        dependencies = sum([pyproject.dependencies[o] for o in features], [])

        pip_find_links: list[pathlib.Path] = []

        if not pyproject.pip_find_links is None:
            pip_find_links.extend(pyproject.pip_find_links)

        logger.info(
            dict(
                dependencies=dependencies,
            )
        )

        if len(dependencies) > 0:
            subprocess.check_call(
                [
                    self.dist_settings.python_path,
                    '-m',
                    'uv',
                    'pip',
                    'install',
                    *sum(
                        [
                            [
                                '-f',
                                str(o),
                            ]
                            for o in pip_find_links
                        ],
                        [],
                    ),
                    # '-f', str(pathlib.Path(__file__).parent / 'deps' / 'dist'),
                    '--offline',
                    *dependencies,
                ]
            )

    def deploy_fetch_dist(
        self,
        force: bool,
    ) -> None:
        for k, d in self.dependencies.items():
            whl_glob = self.dist_settings.wheel_dir / ('*%s*.whl' % d.name.replace('.', '_'))
            if len(glob.glob(str(whl_glob))) == 0 or force:
                if d.source_path.exists():

                    def whl_files_get() -> list[dict[str, Any]]:
                        return [
                            dict(
                                path=o,
                                stat=os.stat(o).st_mtime,
                            )
                            for o in glob.glob(str(whl_glob))
                        ]

                    present_files = whl_files_get()

                    if d.mode == 'm':
                        if (d.source_path / 'm.py').exists():
                            cmd = [
                                sys.executable,
                                str(d.source_path / 'm.py'),
                                'deploy:wheel',
                                '-o',
                                str(self.dist_settings.wheel_dir),
                            ]

                            if not d.args is None:
                                cmd.extend(d.args)

                            subprocess.check_call(
                                cmd,
                                cwd=d.source_path,
                            )
                        else:
                            raise NotImplementedError

                    updated_files = whl_files_get()

                    def index_get(o: dict[str, Any]) -> tuple[Any, ...]:
                        return (o['path'], o['stat'])

                    present_files_index = {index_get(o): o for o in present_files}

                    new_files: list[dict[str, Any]] = []

                    for o in updated_files:
                        entry_index = index_get(o)

                        if not entry_index in present_files_index:
                            new_files.append(o)

                    if len(new_files) == 0:
                        raise NotImplementedError

                    latest_file = sorted(new_files, key=lambda x: x['stat'])[-1]

                    subprocess.check_call(
                        [
                            self.dist_settings.python_path,
                            '-m',
                            'pip',
                            'install',
                            latest_file['path'],
                        ]
                    )

    @property
    def pkg_config_path(
        self,
    ) -> set[pathlib.Path]:
        return {
            pathlib.Path(o)
            for o in glob.glob(
                str(self.dist_settings.env_path / 'lib' / 'python*' / '**' / 'pkgconfig'),
                recursive=True,
            )
        }

    def deploy_wheel(
        self,
        project_name: str,
        argv: Optional[list[str]] = None,
        output_dir: Optional[pathlib.Path] = None,
        force: Optional[bool] = None,
        env: Optional[dict[str, str]] = None,
        mypy: bool = False,
        tests: bool = False,
    ) -> None:
        project = self.projects[project_name]

        # subprocess.check_call([
        #     sys.argv[0],
        #     # sys.executable,
        #     '-p', options.project,
        #     Command.meson_setup.value,
        # ])

        if argv is None:
            argv = []

        # assert argv is None or len(argv) == 0

        if not project.meson_path is None:
            if tests:
                self.meson_test(
                    project_name=project_name,
                )

            self.meson_install(
                project_name=project_name,
                force=force,
            )

        if mypy:
            self.mypy([])

        if env is None:
            env = dict()

        extra_args: list[str] = []

        if len(self.third_party_roots) > 0:
            extra_args.extend(['-Csetup-args=%s' % ('-Dthird_party_roots=%s' % str(o.absolute())) for o in self.third_party_roots])

        cmd = [
            sys.executable,
            '-m',
            'build',
            '-w',
            '-n',
            *extra_args,
            '-Csetup-args=-Dmodes=pyproject',
            '-Cbuild-dir=%s' % str(project.build_dir / 'pyproject'),
            '-Csetup-args=-Dinstall_path=%s' % str(project.dest_dir),
            # '-Cbuild-dir=%s' % str(project.build_dir),
            str(project.source_dir),
            *argv,
        ]

        if not output_dir is None:
            cmd.extend(['-o', str(output_dir)])

        logger.info(dict(env=env))

        subprocess.check_call(
            cmd,
            env=dict(list(os.environ.items())) | env,
        )

        if not project.meson_path is None:
            if tests:
                subprocess.check_call(
                    [
                        'ninja',
                        '-C',
                        str(project.build_dir / 'pyproject'),
                        'test',
                    ]
                )

    def meson_install(
        self,
        project_name: str,
        force: Optional[bool] = None,
        argv: Optional[list[str]] = None,
    ) -> None:
        project = self.projects[project_name]

        if force is None:
            force = False

        if argv is None:
            argv = []

        if force and project.dest_dir.exists():
            shutil.rmtree(project.dest_dir)

        subprocess.check_call(
            [
                shutil_which(
                    'meson',
                    True,
                ),
                'install',
                '-C',
                project.build_dir / 'meson',
                '--destdir',
                project.dest_dir,
                *argv,
            ]
        )

        for o in glob.glob(
            str(project.dest_dir / 'lib' / 'pkgconfig' / '*.pc'),
            recursive=True,
        ):
            logger.info(
                dict(
                    path=o,
                    action='patch prefix',
                )
            )

            with io.open(o, 'r') as f:
                content = f.read()

            with io.open(o, 'w') as f:
                f.write(content.replace('prefix=/', 'prefix=${pcfiledir}/../../'))

    def ninja(
        self,
        project_name: str,
        argv: Optional[list[str]] = None,
        env: Optional[dict[str, str]] = None,
    ) -> None:
        project = self.projects[project_name]

        if argv is None:
            argv = []

        if env is None:
            env = dict()

        logger.info(dict(env=env))

        subprocess.check_call(
            [
                shutil_which('ninja', True),
                '-C',
                str(project.build_dir / 'meson'),
                *argv,
            ],
            env=dict(list(os.environ.items())) | env,
        )

    def meson_test(
        self,
        project_name: str,
        argv: Optional[list[str]] = None,
    ) -> None:
        project = self.projects[project_name]

        if argv is None:
            argv = []

        subprocess.check_call(
            [
                shutil_which(
                    'meson',
                    True,
                ),
                'test',
                '-C',
                project.build_dir / 'meson',
                *argv,
            ]
        )

    def meson_compile(
        self,
        project_name: str,
        argv: Optional[list[str]] = None,
    ) -> None:
        project = self.projects[project_name]

        if argv is None:
            argv = []

        subprocess.check_call(
            [
                shutil_which(
                    'meson',
                    True,
                ),
                'compile',
                '-C',
                project.build_dir / 'meson',
                *argv,
            ]
        )

    @property
    def third_party_roots(self) -> list[pathlib.Path]:
        return []

    def meson_setup(
        self,
        project_name: str,
        force: bool,
        argv: Optional[list[str]] = None,
        env: Optional[dict[str, str]] = None,
        # third_party_roots: Optional[list[pathlib.Path]] = None,
    ) -> None:
        project = self.projects[project_name]

        if argv is None:
            argv = []

        if env is None:
            env = dict()

        logger.info(dict(env=env))

        if force:
            if (project.build_dir / 'meson').exists():
                logger.info(dict(action='removing build dir', path=project.build_dir / 'meson'))
                shutil.rmtree(project.build_dir / 'meson')

        extra_args: list[str] = []

        if len(self.third_party_roots) > 0:
            extra_args.extend(['-Dthird_party_roots=%s' % str(o.absolute()) for o in self.third_party_roots])

        cmd = [
            shutil_which(
                'meson',
                True,
            ),
            'setup',
            str(project.source_dir),
            str(project.build_dir / 'meson'),
            '-Dmodes=["meson"]',
            *extra_args,
            # '-Dpkgconfig.relocatable=true',
            '-Dprefix=/',
            *argv,
        ]

        logger.info(dict(cmd=cmd))

        subprocess.check_call(
            cmd,
            env=dict(list(os.environ.items())) | env,
        )

@@ -10,327 +10,324 @@ import os
import logging
from typing import (
    Optional,
    Any,
)
from typing_extensions import (
    Self,
    BinaryIO,
)
logger = logging.getLogger(__name__)
def toml_load(f: BinaryIO) -> Any:
    try:
        import tomllib

        return tomllib.load(f)
    except:
        pass

    try:
        import tomli

        return tomli.load(f)
    except:
        pass

    raise NotImplementedError
@dataclasses.dataclass
class PyProject:
    path: pathlib.Path
    dependencies: dict[str, list[str]]
    early_features: Optional[list[str]] = None
    pip_find_links: Optional[list[pathlib.Path]] = None
    runtime_libdirs: Optional[list[pathlib.Path]] = None
    runtime_preload: Optional[list[pathlib.Path]] = None

    requirements: dict[str, pathlib.Path] = dataclasses.field(default_factory=lambda: dict())
def pyproject_load(
    d: pathlib.Path,
) -> PyProject:
    with io.open(d, 'rb') as f:
        content = toml_load(f)

    assert isinstance(content, dict)

    dependencies: dict[str, list[str]] = dict()

    dependencies['default'] = content['project']['dependencies']

    if 'optional-dependencies' in content['project']:
        assert isinstance(content['project']['optional-dependencies'], dict)

        for k, v in content['project']['optional-dependencies'].items():
            assert isinstance(v, list)
            assert isinstance(k, str)

            dependencies[k] = v

    res = PyProject(
        path=d,
        dependencies=dependencies,
    )

    tool_name = 'online.fxreader.pr34'.replace('.', '-')

    if 'tool' in content and isinstance(content['tool'], dict) and tool_name in content['tool'] and isinstance(content['tool'][tool_name], dict):
        if 'early_features' in content['tool'][tool_name]:
            res.early_features = content['tool'][tool_name]['early_features']

        if 'pip_find_links' in content['tool'][tool_name]:
            res.pip_find_links = [d.parent / pathlib.Path(o) for o in content['tool'][tool_name]['pip_find_links']]

        if 'runtime_libdirs' in content['tool'][tool_name]:
            res.runtime_libdirs = [
                d.parent / pathlib.Path(o)
                # pathlib.Path(o)
                for o in content['tool'][tool_name]['runtime_libdirs']
            ]

        if 'runtime_preload' in content['tool'][tool_name]:
            res.runtime_preload = [
                d.parent / pathlib.Path(o)
                # pathlib.Path(o)
                for o in content['tool'][tool_name]['runtime_preload']
            ]

        if 'requirements' in content['tool'][tool_name]:
            assert isinstance(content['tool'][tool_name]['requirements'], dict)

            res.requirements = {
                k: d.parent / pathlib.Path(v)
                # pathlib.Path(o)
                for k, v in content['tool'][tool_name]['requirements'].items()
            }

    return res
@dataclasses.dataclass
class BootstrapSettings:
    env_path: pathlib.Path
    python_path: pathlib.Path
    base_dir: pathlib.Path
    python_version: Optional[str] = dataclasses.field(
        default_factory=lambda: os.environ.get(
            'PYTHON_VERSION',
            '%d.%d'
            % (
                sys.version_info.major,
                sys.version_info.minor,
            ),
        ).strip()
    )
    uv_args: list[str] = dataclasses.field(
        default_factory=lambda: os.environ.get(
            'UV_ARGS',
            '--offline',
        ).split(),
    )

    @classmethod
    def get(
        cls,
        base_dir: Optional[pathlib.Path] = None,
    ) -> Self:
        if base_dir is None:
            base_dir = pathlib.Path.cwd()

        env_path = base_dir / '.venv'
        python_path = env_path / 'bin' / 'python3'

        return cls(
            base_dir=base_dir,
            env_path=env_path,
            python_path=python_path,
        )
def env_bootstrap(
    bootstrap_settings: BootstrapSettings,
    pyproject: PyProject,
) -> None:
    pip_find_links: list[pathlib.Path] = []

    if not pyproject.pip_find_links is None:
        pip_find_links.extend(pyproject.pip_find_links)

    pip_find_links_args = sum(
        [
            [
                '-f',
                str(o),
            ]
            for o in pip_find_links
        ],
        [],
    )

    features: list[str] = []

    if pyproject.early_features:
        features.extend(pyproject.early_features)

    requirements_python_version: Optional[str] = None
    if not bootstrap_settings.python_version is None:
        requirements_python_version = bootstrap_settings.python_version.replace('.', '_')

    requirements_name = '_'.join(sorted(features))

    if requirements_python_version:
        requirements_name += '_' + requirements_python_version

    requirements_path: Optional[pathlib.Path] = None

    if requirements_name in pyproject.requirements:
        requirements_path = pyproject.requirements[requirements_name]
    else:
        requirements_path = pyproject.path.parent / 'requirements.txt'

    requirements_in: list[str] = []

    requirements_in.extend(['uv', 'pip', 'build', 'setuptools', 'meson-python', 'pybind11'])

    if pyproject.early_features:
        early_dependencies = sum([pyproject.dependencies[o] for o in pyproject.early_features], [])

        logger.info(
            dict(
                requirements_name=requirements_name,
                early_dependencies=early_dependencies,
            )
        )

        requirements_in.extend(early_dependencies)
        # if len(early_dependencies) > 0:
        #     subprocess.check_call([
        #         bootstrap_settings.python_path,
        #         '-m',
        #         'uv', 'pip', 'install',
        #         *pip_find_links_args,
        #         # '-f', str(pathlib.Path(__file__).parent / 'deps' / 'dist'),
        #         *bootstrap_settings.uv_args,
        #         *early_dependencies,
        #     ])

    if not requirements_path.exists():
        with tempfile.NamedTemporaryFile(
            mode='w',
            prefix='requirements',
            suffix='.in',
        ) as f:
            f.write('\n'.join(requirements_in))
            f.flush()

            subprocess.check_call(
                [
                    'uv',
                    'pip',
                    'compile',
                    '--generate-hashes',
                    *pip_find_links_args,
                    # '-p',
                    # bootstrap_settings.python_path,
                    *bootstrap_settings.uv_args,
                    '-o',
                    str(requirements_path),
                    f.name,
                ]
            )

    uv_python_version: list[str] = []

    if not bootstrap_settings.python_version is None:
        uv_python_version.extend(
            [
                '-p',
                bootstrap_settings.python_version,
            ]
        )

    subprocess.check_call(
        [
            'uv',
            'venv',
            *uv_python_version,
            *pip_find_links_args,
            # '--seed',
            *bootstrap_settings.uv_args,
            str(bootstrap_settings.env_path),
        ]
    )

    subprocess.check_call(
        [
            'uv',
            'pip',
            'install',
            *pip_find_links_args,
            '-p',
            bootstrap_settings.python_path,
            '--require-hashes',
            *bootstrap_settings.uv_args,
            '-r',
            str(requirements_path),
        ]
    )


def paths_equal(a: pathlib.Path | str, b: pathlib.Path | str) -> bool:
    return os.path.abspath(str(a)) == os.path.abspath(str(b))
def run(
    d: Optional[pathlib.Path] = None,
    cli_path: Optional[pathlib.Path] = None,
) -> None:
    if cli_path is None:
        cli_path = pathlib.Path(__file__).parent / 'cli.py'

    if d is None:
        d = pathlib.Path(__file__).parent / 'pyproject.toml'

    bootstrap_settings = BootstrapSettings.get()

    pyproject: PyProject = pyproject_load(d)

    logging.basicConfig(level=logging.INFO)

    if not bootstrap_settings.env_path.exists():
        env_bootstrap(
            bootstrap_settings=bootstrap_settings,
            pyproject=pyproject,
        )

    logger.info([sys.executable, sys.argv, bootstrap_settings.python_path])

    if not paths_equal(sys.executable, bootstrap_settings.python_path):
        os.execv(
            str(bootstrap_settings.python_path),
            [
                str(bootstrap_settings.python_path),
                *sys.argv,
            ],
        )

    os.execv(
        str(bootstrap_settings.python_path),
        [
            str(bootstrap_settings.python_path),
            str(cli_path),
            *sys.argv[1:],
        ],
    )
if __name__ == '__main__':
    run()
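
A usage sketch (the module path is assumed from imports seen earlier in this commit): a thin m.py at a project root can delegate to this bootstrapper, which creates .venv via `uv venv` on first run, compiles hashed requirements, and re-execs into the venv interpreter before handing over to cli.py.

# hypothetical m.py next to pyproject.toml
import pathlib

from online.fxreader.pr34.commands_typed import cli_bootstrap

cli_bootstrap.run(
    d=pathlib.Path(__file__).parent / 'pyproject.toml',
    cli_path=pathlib.Path(__file__).parent / 'cli.py',
)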

@@ -3,88 +3,95 @@ import os
import cryptography.hazmat.primitives.kdf.scrypt
from typing import (
    Literal,
    overload,
    Optional,
)
class PasswordUtils:
    @overload
    @classmethod
    def secret_hash(
        cls,
        secret: str | bytes,
        mode: Literal['base64'],
        salt: Optional[bytes] = None,
    ) -> tuple[str, str]: ...

    @overload
    @classmethod
    def secret_hash(
        cls,
        secret: str | bytes,
        mode: Literal['bytes'],
        salt: Optional[bytes] = None,
    ) -> tuple[bytes, bytes]: ...

    @classmethod
    def secret_hash(
        cls,
        secret: str | bytes,
        mode: Literal['bytes', 'base64'],
        salt: Optional[bytes] = None,
    ) -> tuple[str, str] | tuple[bytes, bytes]:
        if salt is None:
            salt = os.urandom(16)

        if isinstance(secret, str):
            secret = secret.encode('utf-8')
        # derive
        kdf = cls._scrypt_init(salt=salt)

        hashed_secret = kdf.derive(secret)

        if mode == 'bytes':
            return (salt, hashed_secret)
        elif mode == 'base64':
            res_tuple = tuple(
                (
                    base64.b64encode(o).decode('utf-8')
                    for o in (
                        salt,
                        hashed_secret,
                    )
                )
            )
            return (res_tuple[0], res_tuple[1])
        else:
            raise NotImplementedError

    @classmethod
    def _scrypt_init(cls, salt: bytes) -> cryptography.hazmat.primitives.kdf.scrypt.Scrypt:
        return cryptography.hazmat.primitives.kdf.scrypt.Scrypt(
            salt=salt,
            length=32,
            n=2**14,
            r=8,
            p=1,
        )

    @classmethod
    def secret_check(
        cls,
        secret: str | bytes,
        salt: str | bytes,
        hashed_secret: str | bytes,
    ) -> bool:
        if isinstance(salt, str):
            salt = base64.b64decode(salt)

        if isinstance(secret, str):
            secret = secret.encode('utf-8')

        if isinstance(hashed_secret, str):
            hashed_secret = base64.b64decode(hashed_secret)

        kdf = cls._scrypt_init(salt=salt)

        try:
            kdf.verify(secret, hashed_secret)
            return True
        except cryptography.exceptions.InvalidKey:
            return False
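
A round-trip sketch of the API above (the secret value is illustrative): secret_hash derives a scrypt hash and returns it together with its salt, and secret_check re-derives for verification.

salt_b64, hash_b64 = PasswordUtils.secret_hash('hunter2', mode='base64')

# verification re-derives the hash with the stored salt
assert PasswordUtils.secret_check('hunter2', salt_b64, hash_b64)
assert not PasswordUtils.secret_check('wrong password', salt_b64, hash_b64)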

@@ -1,35 +1,39 @@
import os
import logging
from typing import (
    Optional,
)
logger = logging.getLogger(__name__)
class DebugPy:
    @classmethod
    def set_trace(
        cls,
        host: Optional[str] = None,
        port: Optional[int] = None,
        wait: Optional[bool] = None,
    ) -> None:
        if host is None:
            host = '127.0.0.1'
        if port is None:
            port = 4444
        if wait is None:
            wait = True

        import debugpy

        if os.environ.get('DEBUGPY_RUNNING') != 'true':
            logger.info('debugpy init')
            import debugpy

            debugpy.listen((host, port))
            os.environ['DEBUGPY_RUNNING'] = 'true'

            if wait:
                debugpy.wait_for_client()
                debugpy.breakpoint()

            logger.info('debugpy done')
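
A usage sketch (the import path is an assumption): with defaults, set_trace() listens on 127.0.0.1:4444, blocks until a debugpy client such as VS Code attaches, then breaks at the call site; the DEBUGPY_RUNNING environment variable guards against double initialization across re-execs.

from online.fxreader.pr34.commands_typed.debugpy import DebugPy  # assumed module path

DebugPy.set_trace()  # blocks until a client attaches, then breaks here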

@@ -1,16 +1,14 @@
import logging
from typing import (
    Optional,
)
def setup(level: Optional[int] = None) -> None:
    if level is None:
        level = logging.INFO

    logging.basicConfig(
        level=level,
        format=('%(levelname)s:%(name)s:%(message)s:%(process)d:%(asctime)s:%(pathname)s:%(funcName)s:%(lineno)s'),
    )

@@ -9,208 +9,232 @@ import logging
import sys
import argparse
from pydantic import (
    Field,
)
from typing import (
    ClassVar,
    Generator,
    Annotated,
    Optional,
    Any,
)
logger = logging.getLogger(__name__)
@pydantic.dataclasses.dataclass
class MypyFormatEntry:
    name: str
    value: str

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, type(self)):
            raise NotImplementedError

        return self.value == other.value
class MypyFormat:
    vscode: ClassVar[MypyFormatEntry] = MypyFormatEntry(name='vscode', value='vscode')
    json: ClassVar[MypyFormatEntry] = MypyFormatEntry(name='json', value='json')

    @classmethod
    def from_value(cls, value: str) -> MypyFormatEntry:
        for e in cls.entries():
            if value == e.value:
                return e

        raise NotImplementedError

    @classmethod
    def entries(
        cls,
    ) -> Generator[
        MypyFormatEntry,
        None,
        None,
    ]:
        for o in dir(cls):
            e = getattr(cls, o)
            if not isinstance(e, MypyFormatEntry):
                continue

            yield e
class MypySettings(pydantic_settings.BaseSettings):
    model_config = pydantic_settings.SettingsConfigDict(
        env_prefix='online_fxreader_pr34_mypy_',
        case_sensitive=False,
    )

    config_path: pathlib.Path = pathlib.Path.cwd() / '.mypy.ini'
    max_errors: dict[str, int] = dict()
    paths: Annotated[list[pathlib.Path], Field(default_factory=lambda: ['.'])]
def run(
    argv: Optional[list[str]] = None,
    settings: Optional[MypySettings] = None,
) -> None:
    if argv is None:
        argv = []

    if settings is None:
        settings = MypySettings()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-q',
        '--quiet',
        dest='quiet',
        action='store_true',
        help='do not print anything if the program is correct according to max_errors limits',
        default=False,
    )
    parser.add_argument(
        '-i',
        dest='paths',
        help='specify paths to check',
        default=[],
        action='append',
    )
    parser.add_argument(
        '-f',
        '--format',
        dest='_format',
        help='output format of errors',
        default=MypyFormat.json.value,
        choices=[o.value for o in MypyFormat.entries()],
    )

    options, args = parser.parse_known_args(argv)

    if len(args) > 0 and args[0] == '--':
        del args[0]

    options.format = MypyFormat.from_value(options._format)

    if len(options.paths) == 0:
        options.paths.extend(settings.paths)

    started_at = datetime.datetime.now()

    mypy_cmd = [
        sys.executable,
        '-m',
        'mypy',
        '--config-file',
        str(settings.config_path),
        '--strict',
        '-O',
        'json',
        *args,
        *options.paths,
    ]

    logger.info(dict(cmd=mypy_cmd))

    res = subprocess.run(
        mypy_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    done_at = datetime.datetime.now()

    try:
        assert not res.returncode is None

        errors = sorted(
            [json.loads(o) for o in res.stdout.decode('utf-8').splitlines() if not o.strip() == ''],
            key=lambda x: (
                x.get('file', ''),
                x.get('line', 0),
            ),
        )

        if not options.quiet:
            if (len(res.stderr)) > 0:
                logger.error(res.stderr.decode('utf-8'))
    except:
        logger.exception('')
        logger.error(res.stdout.decode('utf-8'))
        logger.error(res.stderr.decode('utf-8'))
        sys.exit(res.returncode)

    g: dict[str, Any] = dict()

    for o in errors:
        if not o['file'] in g:
            g[o['file']] = []
        g[o['file']].append(o)

    h = {
        k: len(v)
        for k, v in sorted(
            list(g.items()),
            key=lambda x: x[0],
        )
    }

    mentioned_paths = marisa_trie.Trie(list(h))

    violated_limits: dict[str, str] = dict()

    for k, v in settings.max_errors.items():
        matching_paths = mentioned_paths.keys(k)
        total_errors = sum([h[o] for o in matching_paths], 0)

        if total_errors > v:
            violated_limits[k] = '%s - [%s]: has %d errors > %d' % (
                k,
                ', '.join(matching_paths),
                total_errors,
                v,
            )

    if len(violated_limits) > 0 or not options.quiet:
        if options.format == MypyFormat.vscode:
            for o in errors:
                sys.stdout.write(
                    '[%s] %s:%d,%d %s - %s - %s\n'
                    % (
                        o['severity'],
                        o['file'],
                        o['line'],
                        o['column'],
                        o['message'],
                        o['hint'],
                        o['code'],
                    )
                )
            sys.stdout.flush()
            # logger.info(json.dumps(errors, indent=4))
        else:
            logger.info(json.dumps(errors, indent=4))

    # if len(violated_limits) > 0:
    #     logger.info(json.dumps(violated_limits, indent=4))

    logger.info(
        json.dumps(
            dict(
                max_errors=settings.max_errors,
                violated_limits=violated_limits,
                histogram=h,
                elapsed=(done_at - started_at).total_seconds(),
            ),
            indent=4,
        )
    )

    if len(violated_limits) > 0:
        sys.exit(1)
if __name__ == '__main__':
    from . import logging as _logging

    _logging.setup()
    run(sys.argv[1:])

@@ -11,112 +11,115 @@ import dataclasses
logger = logging.getLogger(__name__)
from typing import (
    overload,
    Optional,
    Literal,
    Any,
    Annotated,
)

from .cli_bootstrap import PyProject


@overload
def shutil_which(
    name: str,
    raise_on_failure: Literal[True],
) -> str: ...
@overload
def shutil_which(
    name: str,
    raise_on_failure: bool,
) -> Optional[str]: ...
def shutil_which(
    name: str,
    raise_on_failure: bool,
) -> Optional[str]:
    res = shutil.which(name)
    if res is None and raise_on_failure:
        raise NotImplementedError
    else:
        return res
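
# A minimal sketch (not part of the module) of what the overloads above buy:
# with a literal raise_on_failure=True a type checker resolves the first
# overload and narrows the result to str, so no Optional handling is needed.
#
# git_path: str = shutil_which('git', raise_on_failure=True)
# uv_path: Optional[str] = shutil_which('uv', raise_on_failure=False)
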
def runtime_libdirs_init(
    project: PyProject,
) -> None:
    if sys.platform == 'linux':
        ld_library_path: list[pathlib.Path] = [
            o
            for o in [
                *[o.absolute() for o in (project.runtime_libdirs if project.runtime_libdirs else [])],
                *[pathlib.Path(o) for o in os.environ.get('LD_LIBRARY_PATH', '').split(os.path.pathsep) if o != ''],
            ]
        ]

        ld_library_path_present: list[pathlib.Path] = []

        for o in ld_library_path:
            if not o.exists():
                logger.warning(
                    dict(
                        ld_library_path=o,
                        msg='not found',
                    )
                )

            ld_library_path_present.append(o)

        os.environ.update(LD_LIBRARY_PATH=os.path.pathsep.join([str(o) for o in ld_library_path_present]))

        for preload_path in project.runtime_preload or []:
            for preload_found in glob.glob(str(preload_path.parent / ('lib%s.so' % preload_path.name))):
                logger.info(
                    dict(
                        preload_path=preload_path,
                        preload_found=preload_found,
                        # lib_path=o,
                        msg='load_library',
                    )
                )

                ctypes.cdll.LoadLibrary(preload_found)
    else:
        raise NotImplementedError
class interfaces_index_t:
    @dataclasses.dataclass
    class Interface:
        @dataclasses.dataclass
        class AddrInfo:
            family: str
            local: str

        name: Annotated[
            str,
            pydantic.Field(
                alias='ifname',
            ),
        ]
        addr_info: list[AddrInfo]
def interfaces_index() -> list[interfaces_index_t.Interface]:
    res = (
        pydantic.RootModel[list[interfaces_index_t.Interface]]
        .model_validate_json(
            subprocess.check_output(
                [
                    'ip',
                    '-j',
                    'addr',
                ]
            ).decode('utf-8')
        )
        .root
    )

    return res
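
# A minimal usage sketch (not part of the module); it assumes a Linux host
# where `ip -j addr` is available, which is what interfaces_index() shells
# out to:
#
# for iface in interfaces_index():
#     print(iface.name, [o.local for o in iface.addr_info])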

@@ -11,514 +11,470 @@ import logging
import typing

if typing.TYPE_CHECKING:
    import pip._internal.commands.show
    import pip._internal.commands.download
    import pip._internal.cli.main_parser
    import pip._internal.models.index
    import pip._internal.utils.temp_dir
    import pip._internal.cli.main
    import pip._internal.network.download
    import pip._internal.resolution.base
    import pip._internal.resolution.resolvelib.resolver
    import pip._internal.operations.prepare

from typing import (
    Literal,
    Optional,
    Iterable,
    Any,
)
logger = logging.getLogger(__name__)
def pip_show(
    argv: list[str],
) -> list['pip._internal.commands.show._PackageInfo']:
    import pip._internal.commands.show

    return list(
        pip._internal.commands.show.search_packages_info(
            argv,
        )
    )
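
# A minimal usage sketch (not part of the module); search_packages_info()
# yields one _PackageInfo per installed distribution, carrying fields such
# as name and version:
#
# for info in pip_show(['pip']):
#     print(info.name, info.version)
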
class pip_resolve_t:
    class kwargs_t:
        class mode_t(enum.StrEnum):
            copy_paste = 'copy_paste'
            monkey_patch = 'monkey_patch'
            uv_pip_freeze = 'uv_pip_freeze'
            uv_pip_compile = 'uv_pip_compile'

    @dataclasses.dataclass
    class res_t:
        @dataclasses.dataclass
        class download_info_t:
            url: str
            sha256: str
            constraint: str

        txt: Optional[str] = None
        entries: Optional[list[download_info_t]] = None
def pip_resolve_entries_to_txt(entries: list[pip_resolve_t.res_t.download_info_t]) -> str:
    # each entry carries a single sha256 hex digest, so emit exactly one --hash
    # flag per constraint (iterating over the str would split it into characters)
    return '\n'.join(['#%s\n%s --hash=sha256:%s' % (o.url, o.constraint, o.sha256) for o in entries])
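
# A minimal sketch (not part of the module) of the requirements.txt shape
# produced above (URL and digest are placeholders, not real values):
#
# entry = pip_resolve_t.res_t.download_info_t(
#     url='https://example.org/example-1.0-py3-none-any.whl',
#     sha256='0' * 64,
#     constraint='example==1.0',
# )
# print(pip_resolve_entries_to_txt([entry]))
# # #https://example.org/example-1.0-py3-none-any.whl
# # example==1.0 --hash=sha256:000...0
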
def pip_resolve(
    argv: list[str],
    mode: pip_resolve_t.kwargs_t.mode_t,
    requirements: Optional[list[str]] = None,
) -> pip_resolve_t.res_t:
    if mode is pip_resolve_t.kwargs_t.mode_t.copy_paste:
        import pip._internal.commands.show
        import pip._internal.commands.download
        import pip._internal.cli.cmdoptions
        import pip._internal.cli.main_parser
        import pip._internal.models.index
        import pip._internal.utils.temp_dir
        import pip._internal.cli.main
        import pip._internal.network.download
        import pip._internal.resolution.base
        import pip._internal.req.req_install
        import pip._internal.resolution.resolvelib.resolver
        import pip._internal.operations.prepare
        import pip._internal.operations.build.build_tracker
        import pip._internal.models.direct_url

        with contextlib.ExitStack() as stack:
            stack.enter_context(pip._internal.utils.temp_dir.global_tempdir_manager())

            t2 = pip._internal.cli.main_parser.create_main_parser()

            t3 = t2.parse_args(['download'])
            t1 = pip._internal.commands.download.DownloadCommand('blah', 'shit')

            stack.enter_context(t1.main_context())

            # options = pip._internal.commands.download.Values()
            options = t3[0]
            options.python_version = None
            options.platforms = []
            options.abis = []
            options.implementation = []
            options.format_control = None
            options.ignore_dependencies = None
            options.index_url = pip._internal.models.index.PyPI.simple_url
            options.extra_index_urls = []
            options.no_index = None
            options.find_links = []
            options.pre = None
            options.prefer_binary = True
            options.only_binary = True
            options.constraints = []
            options.use_pep517 = None
            options.editables = []
            options.requirements = []
            options.src_dir = str(pathlib.Path(__file__).parent)
            options.build_isolation = None
            options.check_build_deps = None
            options.progress_bar = True
            options.require_hashes = None
            options.ignore_requires_python = False
            # options.cache_dir
            pip._internal.cli.cmdoptions.check_dist_restriction(options)
            # t1._in_main_context = True
            session = t1.get_default_session(options)
            target_python = pip._internal.cli.cmdoptions.make_target_python(options)
            finder = t1._build_package_finder(
                options=options,
                session=session,
                target_python=target_python,
                ignore_requires_python=options.ignore_requires_python,
            )
            build_tracker = t1.enter_context(pip._internal.operations.build.build_tracker.get_build_tracker())
            reqs = t1.get_requirements(
                [
                    #'pip', 'uv', 'ipython',
                    *argv,
                ],
                options,
                finder,
                session,
            )
            pip._internal.req.req_install.check_legacy_setup_py_options(options, reqs)
            directory = pip._internal.utils.temp_dir.TempDirectory(delete=True, kind='download', globally_managed=True)
            preparer = t1.make_requirement_preparer(
                temp_build_dir=directory,
                options=options,
                build_tracker=build_tracker,
                session=session,
                finder=finder,
                download_dir=None,
                use_user_site=False,
                verbosity=False,
            )
            resolver = t1.make_resolver(
                preparer=preparer,
                finder=finder,
                options=options,
                ignore_requires_python=options.ignore_requires_python,
                use_pep517=options.use_pep517,
                py_version_info=options.python_version,
            )
            t1.trace_basic_info(finder)
            requirement_set = resolver.resolve(reqs, check_supported_wheels=True)

            res = pip_resolve_t.res_t()

            res.entries = []

            for k, v in requirement_set.requirements.items():
                assert v.download_info is not None
                assert isinstance(
                    v.download_info.info,
                    pip._internal.models.direct_url.ArchiveInfo,
                )
                assert v.download_info.info.hashes is not None

                res.entries.append(
                    pip_resolve_t.res_t.download_info_t(
                        constraint=k,
                        sha256=v.download_info.info.hashes['sha256'],
                        url=v.download_info.url,
                    )
                )

            res.txt = pip_resolve_entries_to_txt(res.entries)
            return res
    elif mode is pip_resolve_t.kwargs_t.mode_t.monkey_patch:
        import pip._internal.commands.show
        import pip._internal.commands.download
        import pip._internal.cli.main_parser
        import pip._internal.models.index
        import pip._internal.models.link
        from pip._internal.models.link import (
            Link,
        )
        import pip._internal.utils.temp_dir
        from pip._internal.metadata.base import (
            BaseDistribution,
        )
        import pip._internal.cli.main
        import pip._internal.network.download
        import pip._internal.resolution.base
        import pip._internal.resolution.resolvelib.resolver
        import pip._internal.operations.prepare
        from pip._internal.network.download import (
            Downloader,
        )
        from pip._internal.operations.prepare import (
            File,
        )
        from pip._internal.req.req_set import RequirementSet
        from pip._internal.utils.hashes import Hashes
        from pip._internal.req.req_install import InstallRequirement

        downloader_call_def = pip._internal.network.download.Downloader.__call__
        def downloader_call(
            _self: pip._internal.network.download.Downloader,
            link: pip._internal.models.link.Link,
            location: str,
        ) -> tuple[str, str]:
            logger.info(
                dict(
                    url=link.url,
                )
            )

            return downloader_call_def(
                _self,
                link,
                location,
            )
        batch_downloader_call_def = pip._internal.network.download.BatchDownloader.__call__

        def batch_downloader_call(
            _self: pip._internal.network.download.BatchDownloader,
            links: Iterable[pip._internal.models.link.Link],
            location: str,
        ) -> Iterable[tuple[pip._internal.models.link.Link, tuple[str, str]]]:
            # print(args)

            logger.info(
                dict(
                    links=links,
                    location=location,
                )
            )

            return [(o, ('/dev/null', '')) for o in links]
        # base_resolver_resolve_def = pip._internal.resolution.base.BaseResolver.resolve
        base_resolver_resolve_def = pip._internal.resolution.resolvelib.resolver.Resolver.resolve

        result_requirements: list[RequirementSet | InstallRequirement] = []

        def base_resolver_resolve(
            _self: pip._internal.resolution.resolvelib.resolver.Resolver,
            root_reqs: list[InstallRequirement],
            check_supported_wheels: bool,
        ) -> RequirementSet:
            # print(args, kwargs)

            res = base_resolver_resolve_def(_self, root_reqs, check_supported_wheels)

            result_requirements.append(res)
            raise NotImplementedError
            return res
        get_http_url_def = pip._internal.operations.prepare.get_http_url

        def get_http_url(
            link: Link,
            download: Downloader,
            download_dir: Optional[str] = None,
            hashes: Optional[Hashes] = None,
        ) -> File:
            logger.info(
                dict(
                    url=link.url,
                    hashes=hashes,
                )
            )

            if link.url.endswith('.whl'):
                print('blah')
                hashes = None

                return File(
                    '/dev/null',
                    '',
                )
            else:
                return get_http_url_def(link, download, download_dir, hashes)
        prepare_linked_requirements_more_def = pip._internal.operations.prepare.RequirementPreparer.prepare_linked_requirements_more

        def prepare_linked_requirements_more(
            _self: pip._internal.resolution.resolvelib.resolver.Resolver,
            reqs: Iterable[InstallRequirement],
            parallel_builds: bool = False,
        ) -> None:
            result_requirements.extend(reqs)
            raise NotImplementedError

        _complete_partial_requirements_def = pip._internal.operations.prepare.RequirementPreparer._complete_partial_requirements

        def _complete_partial_requirements(
            _self: pip._internal.resolution.resolvelib.resolver.Resolver,
            partially_downloaded_reqs: Iterable[InstallRequirement],
            parallel_builds: bool = False,
        ) -> None:
            result_requirements.extend(partially_downloaded_reqs)
            raise NotImplementedError
        patches: list[Any] = []

        patches.append(unittest.mock.patch.object(pip._internal.network.download.Downloader, '__call__', downloader_call))
        # patches.append(
        #     unittest.mock.patch.object(
        #         pip._internal.network.download.BatchDownloader,
        #         '__call__',
        #         batch_downloader_call
        #     )
        # )
        # patches.append(
        #     unittest.mock.patch.object(
        #         pip._internal.resolution.base.BaseResolver, 'resolve', base_resolver_resolve))
        patches.append(
            unittest.mock.patch.object(
                pip._internal.resolution.resolvelib.resolver.Resolver,
                'resolve',
                base_resolver_resolve,
            )
        )
        patches.append(
            unittest.mock.patch.object(
                pip._internal.operations.prepare,
                'get_http_url',
                get_http_url,
            )
        )
        patches.append(
            unittest.mock.patch.object(
                pip._internal.operations.prepare.RequirementPreparer,
                'prepare_linked_requirements_more',
                prepare_linked_requirements_more,
            )
        )
        # patches.append(
        #     unittest.mock.patch.object(
        #         pip._internal.operations.prepare.RequirementPreparer,
        #         '_complete_partial_requirements',
        #         _complete_partial_requirements
        #     )
        # )
        with contextlib.ExitStack() as stack:
            for p in patches:
                stack.enter_context(p)

            pip._internal.cli.main.main(
                [
                    'download',
                    '-q',
                    '--no-cache',
                    '-d',
                    '/dev/null',
                    *argv,
                    # 'numpy',
                ]
            )
        # return sum([
        #     [
        #         pip_resolve_t.res_t.download_info_t(
        #             constraint=k,
        #             sha256=v.download_info.info.hashes['sha256'],
        #             url=v.download_info.url,
        #         )
        #         for k, v in o.requirements.items()
        #     ]
        #     for o in result_requirements
        # ], [])
        logger.warning(result_requirements)
        res = pip_resolve_t.res_t()

        res.entries = []

        for o in result_requirements:
            assert isinstance(o, InstallRequirement)

            sha256_hashes = o.hashes()._allowed['sha256']
            assert len(sha256_hashes) == 1
            assert o.link is not None

            res.entries.append(
                pip_resolve_t.res_t.download_info_t(
                    constraint=str(o.req),
                    sha256=sha256_hashes[0],
                    url=o.link.url,
                )
            )

        res.txt = pip_resolve_entries_to_txt(res.entries)

        return res
    elif mode is pip_resolve_t.kwargs_t.mode_t.uv_pip_freeze:
        assert len(argv) == 0

        pip_freeze = subprocess.check_output(
            [
                sys.executable,
                '-m',
                'uv',
                'pip',
                'freeze',
            ],
        ).decode('utf-8')
        pip_compile = subprocess.check_output(
            [
                sys.executable,
                '-m',
                'uv',
                'pip',
                'compile',
                '--generate-hashes',
                '-',
            ],
            input=pip_freeze.encode('utf-8'),
        ).decode('utf-8')

        return pip_resolve_t.res_t(
            txt=pip_compile,
        )
    elif mode is pip_resolve_t.kwargs_t.mode_t.uv_pip_compile:
        with contextlib.ExitStack() as stack:
            if requirements is not None:
                # assert len(argv) == 0

                f = stack.enter_context(
                    tempfile.NamedTemporaryFile(
                        suffix='.txt',
                    )
                )
                f.write(('\n'.join(requirements)).encode('utf-8'))
                f.flush()

                argv.append(f.name)

            if argv[0] == '--':
                del argv[0]

            pip_compile = subprocess.check_output(
                [
                    sys.executable,
                    '-m',
                    'uv',
                    'pip',
                    'compile',
                    '--generate-hashes',
                    *argv,
                ],
            ).decode('utf-8')

            return pip_resolve_t.res_t(
                txt=pip_compile,
            )
    else:
        raise NotImplementedError
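
# A minimal usage sketch (not part of the module), resolving one package to
# a hashed requirements.txt via the uv-backed mode defined above:
#
# res = pip_resolve(
#     ['requests'],
#     mode=pip_resolve_t.kwargs_t.mode_t.uv_pip_compile,
# )
# print(res.txt)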

@@ -6,22 +6,23 @@ from typing import Any
from typing_extensions import Protocol
from abc import abstractmethod

C = typing.TypeVar('C', bound='Comparable')


class Comparable(Protocol):
    @abstractmethod
    def __eq__(self, other: Any) -> bool:
        pass

    @abstractmethod
    def __lt__(self: C, other: C) -> bool:
        pass

    def __gt__(self: C, other: C) -> bool:
        return (not self < other) and self != other

    def __le__(self: C, other: C) -> bool:
        return self < other or self == other

    def __ge__(self: C, other: C) -> bool:
        return not self < other
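
# A minimal sketch (not part of the module): a concrete type only supplies
# __eq__ and __lt__, and the protocol's defaults derive >, <= and >= from them.
#
# class Version(Comparable):
#     def __init__(self, major: int) -> None:
#         self.major = major
#
#     def __eq__(self, other: Any) -> bool:
#         return isinstance(other, Version) and self.major == other.major
#
#     def __lt__(self, other: 'Version') -> bool:
#         return self.major < other.major
#
# assert Version(2) > Version(1)
# assert Version(1) >= Version(1)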

@@ -5,121 +5,107 @@ import pprint
async def f1():
    devices = await bleak.BleakScanner.discover()
    return devices


async def f2(device, timeout=None):
    if timeout is None:
        timeout = 1.0

    assert isinstance(timeout, float) and timeout >= 1e-8

    p = await bleak.BleakClient(
        device,
        timeout=timeout,
    ).__aenter__()
    return p


async def f3(client):
    t1 = [dict(service=o.__dict__, characteristics=[o2.__dict__ for o2 in o.characteristics]) for o in client.services]
    return t1


async def f5(
    name_check=None,
):
    t2 = []

    attempt = 0

    while True:
        t1 = await f1()
        pprint.pprint([o.__dict__ for o in t1])

        if name_check is not None:
            assert inspect.isfunction(name_check)

            t5 = {i: o.details[0].name() for i, o in enumerate(t1)}

            t2.extend([t1[k] for k, v in t5.items() if isinstance(v, str) and name_check(v)])
        else:
            t2.extend(t1)

        if len(t2) > 0:
            break

        attempt += 1
        print('\rattempt #%d' % attempt, end='')

    return t2


async def f4(
    timeout=None,
    characteristics=None,
    operations=None,
    name_check=None,
):
    if isinstance(name_check, str):
        assert name_check in [
            'watch fit',
        ]
        name_check2 = lambda current_name: name_check.lower() in current_name.lower()
    else:
        name_check2 = name_check

    assert name_check2 is not None

    if characteristics is None:
        characteristics = [
            '0000ffd1-0000-1000-8000-00805f9b34fb',
        ]

    t2 = await f5(
        name_check=name_check2,
    )

    if len(t2) == 0:
        print('not found')
        return

    t3 = None
    try:
        t3 = await f2(t2[0], timeout=timeout)
        t4 = await f3(t3)
        pprint.pprint(t4)

        if operations is not None and inspect.isfunction(operations):
            await operations(
                client=t3,
                t4=t4,
            )
        else:
            t6 = {}
            for o in characteristics:
                try:
                    t7 = await t3.read_gatt_char(o)
                except Exception:
                    print(traceback.format_exc())
                    t7 = None
                t6[o] = t7
            pprint.pprint(t6)
    finally:
        if t3 is not None:
            await t3.disconnect()
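
# A minimal usage sketch (not part of the module), driving the scan/connect/
# read flow defined above from a synchronous entry point:
#
# import asyncio
# asyncio.run(f4(timeout=5.0, name_check='watch fit'))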

@@ -10,162 +10,149 @@ import threading
import cython
import datetime
from typing import Any, Optional, TypeVar, Type, cast

# from scoping import scoping as s


def test(
    _id: int,
    T: float,
    a: numpy.ndarray[Any, numpy.dtype[numpy.int32]],
) -> None:
    with cython.nogil:
        # if True:
        started_at = datetime.datetime.now()
        print('started')

        def elapsed() -> float:
            return (datetime.datetime.now() - started_at).total_seconds()

        # a = 0
        while elapsed() < T:
            # a += 1
            for k in range(1024 * 1024):
                a[_id] += 1

        print(['done', started_at, elapsed(), a[_id]])
M = TypeVar('M', bound=Type[Any])


def build(content: str, module: M) -> M:
    import pathlib
    import tempfile
    import hashlib
    import Cython.Build.Inline

    sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()

    output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()

    if not output_dir.exists() or True:
        os.makedirs(str(output_dir), exist_ok=True)

        source_path = output_dir / ('_%s.pyx' % sha256sum)
        if not source_path.exists():
            with io.open(str(source_path), 'w') as f:
                f.write(content)

        t1 = Cython.Build.Inline._get_build_extension()
        t1.extensions = Cython.Build.cythonize(str(source_path))
        t1.build_temp = str(pathlib.Path('/'))
        t1.build_lib = str(output_dir)
        # t2 = Cython.Build.Inline.Extension(
        #     name=sha256sum,
        # )
        t1.run()

    return cast(M, Cython.Build.Inline.load_dynamic('_%s' % sha256sum, glob.glob(str(output_dir / ('_%s*.so' % sha256sum)))[0]))

    raise NotImplementedError
def mypyc_build(file_path: pathlib.Path) -> Any:
    import pathlib
    import tempfile
    import hashlib
    import mypyc.build
    import Cython.Build.Inline

    assert isinstance(file_path, pathlib.Path)

    # sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()
    # output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()
    output_dir = pathlib.Path('.') / 'tmp' / 'mypyc'
    sha256sum = file_path.stem
    lib_pattern = file_path.parent / ('%s.cpython*.so' % sha256sum)
    lib_dir = pathlib.Path('.')

    def lib_path_glob(path: str | pathlib.Path) -> Optional[pathlib.Path]:
        res: list[str] = glob.glob(str(path))

        if len(res) == 0:
            return None
        else:
            return pathlib.Path(res[0])

    need_build: bool = False

    lib_path: Optional[pathlib.Path] = None

    lib_path = lib_path_glob(lib_pattern)

    if lib_path is not None:
        t2 = file_path.stat()
        t3 = lib_path.stat()
        if t3.st_mtime < t2.st_mtime:
            need_build = True

        del t2
        del t3
    else:
        need_build = True

    if need_build:
        for o in [
            output_dir,
            output_dir / 'build' / file_path.parent,
        ]:
            os.makedirs(str(o), exist_ok=True)
        # source_path = output_dir / ('_%s.py' % sha256sum)
        source_path = file_path
        # with io.open(str(source_path), 'w') as f:
        #     f.write(content)
        t1 = Cython.Build.Inline._get_build_extension()
        t1.extensions = mypyc.build.mypycify([str(source_path)], target_dir=str(output_dir / 'build'))
        t1.build_temp = str(output_dir)
        t1.build_lib = str(lib_dir)
        # t2 = Cython.Build.Inline.Extension(
        #     name=sha256sum,
        # )
        t1.run()

        lib_path = lib_path_glob(lib_pattern)

    return Cython.Build.Inline.load_dynamic(
        # '_%s' % sha256sum,
        # t1.extensions[0].name,
        file_path.stem,
        str(lib_path),
    )

    raise NotImplementedError
class Source:
    @staticmethod
    def test2(_a: numpy.ndarray[Any, numpy.dtype[numpy.int64]], _id: numpy.dtype[numpy.int32] | int, T: float = 16) -> int:
        raise NotImplementedError


source = build(
    r"""
cimport cython

@cython.boundscheck(False)
@@ -226,52 +213,52 @@ def test2(long long [:] _a, int _id, double T=16) -> int:
    return _a[_id]
""",
    Source,
)
def test_cython(N: int = 4, T: int = 16) -> None:
    # a = [0] * N
    a = numpy.zeros((N,), dtype=numpy.int64)

    t = [
        threading.Thread(
            target=functools.partial(
                source.test2,
                a,
                k,
                T,
            )
        )
        for k in range(N)
    ]

    for o in t:
        o.start()
    for o in t:
        o.join()

    # cython_module['test2'](a, 0)


def test_mypyc(N: int = 4, W: int = 35) -> None:
    cython2 = mypyc_build((pathlib.Path(__file__).parent / 'cython2.py').relative_to(pathlib.Path.cwd()))

    # from .cython2 import fib
    # a = [0] * N
    t = [
        threading.Thread(
            target=functools.partial(
                cython2.fib,
                W,
            )
        )
        for k in range(N)
    ]

    for o in t:
        o.start()
    for o in t:
        o.join()

@@ -1,10 +1,12 @@
import time


def fib(n: int) -> int:
    if n <= 1:
        return n
    else:
        return fib(n - 2) + fib(n - 1)


t0 = time.time()
fib(32)
@@ -5,378 +5,334 @@ import os
def kernel_1_sample_scrap(
    max_articles=None,
):
    if max_articles is None:
        max_articles = 1

    with requests.get(
        'https://dev.to',
    ) as p:
        t10 = p.content.decode('utf-8')
        t11 = pyquery.PyQuery(t10)
        t13 = t11('.crayons-story__title > a')
        t12 = [pyquery.PyQuery(o).attr('href') for o in t13]
        pprint.pprint(t12)
        t14 = ['https://dev.to/%s' % o for o in t12]

    t8 = []
    for t7 in t14[:max_articles]:
        with requests.get(
            t7,
        ) as p:
            t1 = p.content.decode('utf-8')
            t2 = pyquery.PyQuery(t1)
            t3 = t2('.comment__content')

            t6 = []
            for o in t3:
                t4 = pyquery.PyQuery(o)
                t5 = t4('.comment__header > a').attr['href']
                t9 = t4('.comment__body').text()
                t6.append(
                    dict(
                        author=t5,
                        text=t9,
                    )
                )

            # pprint.pprint(t3)
            pprint.pprint(t6)

            t8.append(
                dict(
                    article=t7,
                    comments=t6,
                )
            )

    pprint.pprint(t8)

    return dict(
        t1=t1,
        t2=t2,
        t3=t3,
        t6=t6,
        t8=t8,
        t12=t12,
    )
def kernel_2():
    import numpy as np  # linear algebra
    import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)
    from tqdm import tqdm
    from sklearn.model_selection import train_test_split
    import tensorflow as tf
    from keras.models import Sequential
    from keras.layers.recurrent import LSTM, GRU, SimpleRNN
    from keras.layers.core import Dense, Activation, Dropout
    from keras.layers.embeddings import Embedding
    from keras.layers.normalization import BatchNormalization
    from keras.utils import np_utils
    from sklearn import preprocessing, decomposition, model_selection, metrics, pipeline
    from keras.layers import GlobalMaxPooling1D, Conv1D, MaxPooling1D, Flatten, Bidirectional, SpatialDropout1D
    from keras.preprocessing import sequence, text
    from keras.callbacks import EarlyStopping

    import matplotlib.pyplot as plt
    import seaborn as sns

    # %matplotlib inline
    from plotly import graph_objs as go
    import plotly.express as px
    import plotly.figure_factory as ff
    # %% [markdown]
    # # Configuring TPU's
    #
    # For this version of the Notebook we will be using TPU's, as we have to build a BERT Model

    # %% [code]
    # Detect hardware, return appropriate distribution strategy
    try:
        # TPU detection. No parameters necessary if TPU_NAME environment variable is
        # set: this is always the case on Kaggle.
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        print('Running on TPU ', tpu.master())
    except ValueError:
        tpu = None

    if tpu:
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        strategy = tf.distribute.experimental.TPUStrategy(tpu)
    else:
        # Default distribution strategy in Tensorflow. Works on CPU and single GPU.
        strategy = tf.distribute.get_strategy()

    print('REPLICAS: ', strategy.num_replicas_in_sync)
    # %% [code]
    train = pd.read_csv('/kaggle/input/jigsaw-multilingual-toxic-comment-classification/jigsaw-toxic-comment-train.csv')
    validation = pd.read_csv('/kaggle/input/jigsaw-multilingual-toxic-comment-classification/validation.csv')
    test = pd.read_csv('/kaggle/input/jigsaw-multilingual-toxic-comment-classification/test.csv')

    # %% [markdown]
    # We will drop the other columns and approach this problem as a Binary Classification Problem, and we will run the exercise on a smaller subsection of the dataset (only 12000 data points) to make it easier to train the models

    # %% [code]
    train.drop(['severe_toxic', 'obscene', 'threat', 'insult', 'identity_hate'], axis=1, inplace=True)

    # %% [code]
    train = train.loc[:12000, :]
    train.shape
    # %% [markdown]
    # We will check the maximum number of words that can be present in a comment; this will help us with padding later

    # %% [code]
    train['comment_text'].apply(lambda x: len(str(x).split())).max()

    # %% [markdown]
    # ### Data Preparation

    # %% [code]
    xtrain, xvalid, ytrain, yvalid = train_test_split(
        train.comment_text.values, train.toxic.values, stratify=train.toxic.values, random_state=42, test_size=0.2, shuffle=True
    )
    # %% [markdown]
    # # Before We Begin
    #
    # Before we begin: if you are a complete starter with NLP and have never worked with text data, I am attaching a few kernels that will serve as a starting point for your journey
    # * https://www.kaggle.com/arthurtok/spooky-nlp-and-topic-modelling-tutorial
    # * https://www.kaggle.com/abhishek/approaching-almost-any-nlp-problem-on-kaggle
    #
    # If you want a more basic dataset to practice with, here is another kernel which I wrote:
    # * https://www.kaggle.com/tanulsingh077/what-s-cooking
    #
    # Below are some resources to get started with basic-level Neural Networks; they will help us to easily understand the upcoming parts
    # * https://www.youtube.com/watch?v=aircAruvnKk&list=PL_h2yd2CGtBHEKwEH5iqTZH85wLS-eUzv
    # * https://www.youtube.com/watch?v=IHZwWFHWa-w&list=PL_h2yd2CGtBHEKwEH5iqTZH85wLS-eUzv&index=2
    # * https://www.youtube.com/watch?v=Ilg3gGewQ5U&list=PL_h2yd2CGtBHEKwEH5iqTZH85wLS-eUzv&index=3
    # * https://www.youtube.com/watch?v=tIeHLnjs5U8&list=PL_h2yd2CGtBHEKwEH5iqTZH85wLS-eUzv&index=4
    #
    # For learning how to visualize test data and what to use, view:
    # * https://www.kaggle.com/tanulsingh077/twitter-sentiment-extaction-analysis-eda-and-model
    # * https://www.kaggle.com/jagangupta/stop-the-s-toxic-comments-eda

    # %% [markdown]
    # # Simple RNN
    #
    # ## Basic Overview
    #
    # What is an RNN?
    #
    # Recurrent Neural Networks (RNN) are a type of Neural Network where the output from the previous step is fed as input to the current step. In traditional neural networks, all the inputs and outputs are independent of each other, but in cases like predicting the next word of a sentence, the previous words are required, and hence there is a need to remember them. Thus the RNN came into existence, which solved this issue with the help of a Hidden Layer.
    #
    # Why RNN's?
    #
    # https://www.quora.com/Why-do-we-use-an-RNN-instead-of-a-simple-neural-network
    #
    # ## In-Depth Understanding
    #
    # * https://medium.com/mindorks/understanding-the-recurrent-neural-network-44d593f112a2
    # * https://www.youtube.com/watch?v=2E65LDnM2cA&list=PL1F3ABbhcqa3BBWo170U4Ev2wfsF7FN8l
    # * https://www.d2l.ai/chapter_recurrent-neural-networks/rnn.html
    #
    # ## Code Implementation
    #
    # So first I will implement the model and then I will explain the code step by step
    # %% [code]
    # using keras tokenizer here
    token = text.Tokenizer(num_words=None)
    max_len = 1500

    token.fit_on_texts(list(xtrain) + list(xvalid))
    xtrain_seq = token.texts_to_sequences(xtrain)
    xvalid_seq = token.texts_to_sequences(xvalid)

    # zero pad the sequences
    xtrain_pad = sequence.pad_sequences(xtrain_seq, maxlen=max_len)
    xvalid_pad = sequence.pad_sequences(xvalid_seq, maxlen=max_len)

    word_index = token.word_index
    # %% [code]
    # %%time
    with strategy.scope():
        # A simpleRNN without any pretrained embeddings and one dense layer
        model = Sequential()
        model.add(Embedding(len(word_index) + 1, 300, input_length=max_len))
        model.add(SimpleRNN(100))
        model.add(Dense(1, activation='sigmoid'))
        model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

    model.summary()

    return dict(
        model=model,
        xtrain_pad=xtrain_pad,
        strategy=strategy,
        xvalid_pad=xvalid_pad,
        xtrain_seq=xtrain_seq,
        token=token,
        max_len=max_len,
        xtrain=xtrain,
        xvalid=xvalid,
        ytrain=ytrain,
        yvalid=yvalid,
    )
def kernel_3(
    o_2,
    nb_epochs=None,
):
    if nb_epochs is None:
        nb_epochs = 5

    # %% [markdown]
    # Writing a function for getting auc score for validation

    # %% [code]
    def roc_auc(predictions, target):
        import sklearn.metrics

        """
        This method returns the AUC Score when given the Predictions
        and Labels
        """

        fpr, tpr, thresholds = sklearn.metrics.roc_curve(target, predictions)
        roc_auc = sklearn.metrics.auc(fpr, tpr)
        return roc_auc
    # %% [code]
    if os.path.exists('model.h5'):
        o_2['model'].load_weights('model.h5')
    else:
        o_2['model'].fit(
            o_2['xtrain_pad'],
            o_2['ytrain'],
            epochs=nb_epochs,  # Keras renamed the legacy nb_epoch argument to epochs
            batch_size=64 * o_2['strategy'].num_replicas_in_sync,
        )  # Multiplying by Strategy to run on TPU's
        o_2['model'].save_weights('model.h5')
    # %% [code]
    scores = o_2['model'].predict(o_2['xvalid_pad'])
    print('Auc: %.2f%%' % (roc_auc(scores, o_2['yvalid'])))

    # %% [code]
    scores_model = []
    scores_model.append({'Model': 'SimpleRNN', 'AUC_Score': roc_auc(scores, o_2['yvalid'])})
    # %% [markdown]
    # ## Code Explanation
    # * Tokenization<br><br>
    # So if you have watched the videos and referred to the links, you will know that in an RNN we input a sentence word by word. We represent every word as a one-hot vector of dimensions: number of words in the vocab + 1. <br>
    # What the Keras Tokenizer does is take all the unique words in the corpus, form a dictionary with words as keys and their number of occurrences as values, and then sort the dictionary in descending order of counts. It then assigns the first value 1, the second value 2, and so on. So let's suppose the word 'the' occurred the most in the corpus; it will be assigned index 1, and the vector representing 'the' would be a one-hot vector with value 1 at position 1 and the rest zeroes.<br>
    # Try printing the first 2 elements of xtrain_seq and you will see every word is represented as a digit now; a toy illustration follows the cell below.

    # %% [code]
    o_2['xtrain_seq'][:1]
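
    # %% [markdown]
    # A toy illustration (not part of the original kernel) of the indexing
    # described above; it assumes the keras `text` module imported in kernel_2:

    # %% [code]
    # toy = text.Tokenizer(num_words=None)
    # toy.fit_on_texts(['the cat sat', 'the cat ran'])
    # toy.word_index  # e.g. {'the': 1, 'cat': 2, 'sat': 3, 'ran': 4}
    # toy.texts_to_sequences(['the cat sat'])  # [[1, 2, 3]]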
def kernel_4(
    o_2,
    input_texts=None,
):
    import keras.preprocessing.sequence

    if input_texts is None:
        input_texts = [
            'blahb blahb blah',
            'Hello World!',
            'This is very good!',
            'A very non toxic comment! This is so polite and polished one!',
        ]

    t6 = []
    for o in input_texts:
        t1 = o
        t2 = o_2['token'].texts_to_sequences(
            [t1],
        )
        t3 = keras.preprocessing.sequence.pad_sequences(t2, maxlen=o_2['max_len'])
        t4 = o_2['model'].predict(
            t3,
        )
        t6.append(
            dict(
                text=o,
                score=t4[0][0],
            )
        )
        pprint.pprint(
            dict(
                t1=t1,
                t2=t2,
                t3=t3,
                t4=t4,
            )
        )
    pprint.pprint(t6)

    return dict(
        t6=t6,
    )
def kernel_5(
    o_1=None,
    o_2=None,
):
    if o_1 is None:
        o_1 = kernel_1_sample_scrap(max_articles=50)

    if o_2 is None:
        o_2 = kernel_2()

    o_3 = kernel_3(o_2=o_2, nb_epochs=1)

    t1 = sum([[o['text'] for o in o2['comments']] for o2 in o_1['t8']], [])

    t2 = kernel_4(o_2=o_2, input_texts=t1)

    t3 = sorted(
        t2['t6'],
        key=lambda x: x['score'],
    )
    pprint.pprint(t3)

File diff suppressed because it is too large

@@ -3,34 +3,34 @@ import unittest
class TestCrypto(unittest.TestCase):
    def test_password_utils(self) -> None:
        salt = b'asdfasdfasdf'

        secret = 'blah'

        hash_res = crypto.PasswordUtils.secret_hash(
            secret,
            mode='bytes',
            salt=salt,
        )
        self.assertEqual(
            hash_res,
            (
                salt,
                b'\xdak\xd15\xfa\x8e\xc8\r\xc3\xd2c\xf1m\xb0\xbf\xe6\x98\x01$!j\xc8\xc0Hh\x84\xea,\x91\x8b\x08\xce',
            ),
        )

        check_res = crypto.PasswordUtils.secret_check(
            secret,
            *hash_res,
        )

        self.assertTrue(check_res)

        self.assertFalse(
            crypto.PasswordUtils.secret_check(
                secret + 'asdfasdfsdf',
                *hash_res,
            )
        )