Compare commits

..

No commits in common. "f9f18df7371fba2f16ed1bf42666ed63ca90b074" and "560e74886773281b0a0e6da8202d374eb441f498" have entirely different histories.

19 changed files with 101 additions and 784 deletions

3
.gitignore vendored

@ -3,6 +3,3 @@ __pycache__
d2/book1/books
.DS_Store
.vim
*.so
.mypy_cache
.ruff_cache

@ -1,5 +0,0 @@
[mypy]
mypy_path =
python/stubs
plugins =
numpy.typing.mypy_plugin

246
_m.py

@ -1,246 +0,0 @@
#!/usr/bin/env python3
#vim: set filetype=python
import logging
import json
import enum
import pathlib
import sys
import argparse
#import optparse
import subprocess
import os
from typing import (Optional, Any, TypeAlias, Literal, cast,)
logger = logging.getLogger()
def js(argv):
return subprocess.check_call([
'sudo',
'docker-compose',
'--project-directory',
os.path.abspath(
os.path.dirname(__file__),
),
'-f',
os.path.abspath(
os.path.join(
os.path.dirname(__file__),
'docker', 'js',
'docker-compose.yml',
)
),
*argv,
])
def env(
argv: Optional[list[str]] = None,
**kwargs: Any,
) -> Optional[subprocess.CompletedProcess]:
env_path = pathlib.Path(__file__).parent / 'tmp' / 'env3'
if not env_path.exists():
subprocess.check_call([
sys.executable, '-m', 'venv',
'--system-site-packages',
str(env_path)
])
subprocess.check_call([
env_path / 'bin' / 'python3',
'-m', 'pip',
'install', '-r', 'requirements.txt',
])
if not argv is None:
return subprocess.run([
str(env_path / 'bin' / 'python3'),
*argv,
], **kwargs)
return None
def ruff(argv: list[str]) -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
dest='paths',
help='specify paths to check',
default=[],
action='append',
)
parser.add_argument(
'-e',
dest='exclude',
help='rules to ignore',
default=[],
action='append',
)
options, args = parser.parse_known_args(argv)
if len(options.paths) == 0:
options.paths.extend([
'.',
'dotfiles/.local/bin/commands',
])
if len(options.exclude) == 0:
options.exclude.extend([
'E731',
'E713',
'E714',
'E703',
])
res = env([
'-m',
'ruff',
'check',
*args,
'--output-format', 'json',
'--ignore', ','.join(options.exclude),
*options.paths,
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert not res is None
errors = json.loads(res.stdout.decode('utf-8'))
g: dict[str, Any] = dict()
for o in errors:
if not o['filename'] in g:
g[o['filename']] = []
g[o['filename']].append(o)
h = {
k : len(v)
for k, v in g.items()
}
logger.info(json.dumps(errors, indent=4))
logger.info(json.dumps(h, indent=4))
def mypy(argv: list[str]) -> None:
parser = argparse.ArgumentParser()
parser.add_argument(
'-i',
dest='paths',
help='specify paths to check',
default=[],
action='append',
)
options, args = parser.parse_known_args(argv)
if len(options.paths) == 0:
options.paths.extend([
'dotfiles/.local/bin/commands',
'python',
'm.py',
])
res = env([
'-m',
'mypy',
'--strict',
'-O',
'json',
*args,
*options.paths,
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert not res is None
errors = [
json.loads(o)
for o in res.stdout.decode('utf-8').splitlines()
]
g : dict[str, Any] = dict()
for o in errors:
if not o['file'] in g:
g[o['file']] = []
g[o['file']].append(o)
h = {
k : len(v)
for k, v in g.items()
}
logger.info(json.dumps(errors, indent=4))
logger.info(json.dumps(h, indent=4))
def inside_env() -> bool:
try:
import numpy
return True
except Exception:
return False
#class Commands(enum.StrEnum):
# js = 'js'
# mypy = 'mypy'
# env = 'env'
# ruff = 'ruff'
# m2 = 'm2'
Command_args = ['js', 'mypy', 'env', 'ruff', 'm2']
Command : TypeAlias = Literal['js', 'mypy', 'env', 'ruff', 'm2']
def run(argv: Optional[list[str]] = None) -> None:
logging.basicConfig(
level=logging.INFO,
format=(
'%(levelname)s:%(name)s:%(message)s'
':%(process)d'
':%(asctime)s'
':%(pathname)s:%(funcName)s:%(lineno)s'
),
)
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser()
parser.add_argument(
'command',
#'_command',
choices=[
o
for o in Command_args
],
#required=True,
)
options, args = parser.parse_known_args(argv)
assert options.command in cast(list[str], Command_args)
#options.command = Commands(options._command)
if options.command == 'js':
js(args)
elif options.command == 'env':
env(args)
elif options.command == 'mypy':
mypy(args)
elif options.command == 'ruff':
ruff(args)
elif options.command == 'm2':
if not inside_env():
env(['--', '_m.py', 'm2', *args])
return
import python.tasks.cython
python.tasks.cython.mypyc_build(
pathlib.Path('_m.py')
)
else:
raise NotImplementedError
if __name__ == '__main__':
run(sys.argv[1:])

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import asyncio
import argparse
import enum
import threading
import shutil
@ -22,8 +21,7 @@ import sys
import tempfile
import time
import traceback
from typing import (Literal, Optional, TypedDict, Callable, Generator, TypeAlias, Any,)
from typing import (Literal, Optional,)
logger = logging.getLogger(__name__)
@ -73,30 +71,14 @@ def custom_notify(
msg[-128:]
])
class intercept_output_t:
class line_res_t(TypedDict):
aggregated: bool
line: bytes
class realtime_res_t(TypedDict):
aggregated: bool
data: bytes
class aggregated_res_t(TypedDict):
aggregated: bool
data: bytes
returncode: Optional[int]
res_t: TypeAlias = line_res_t | realtime_res_t | aggregated_res_t
def intercept_output(
current_subprocess: subprocess.Popen[bytes],
return_aggregated: Optional[bool]=None,
transform_callback: Optional[Callable[[bytes], Optional[bytes]]] =None,
real_time: Optional[bool]=None,
timeout: Optional[float]=None,
need_lines: Optional[bool]=None,
) -> Generator[intercept_output_t.res_t, None, None,]:
current_subprocess,
return_aggregated=None,
transform_callback=None,
real_time=None,
timeout=None,
need_lines=None,
):
if real_time is None:
real_time = False
@ -106,16 +88,11 @@ def intercept_output(
return_aggregated = False
t1 = select.poll()
assert not current_subprocess.stdout is None
assert isinstance(current_subprocess.stdout, io.BufferedReader)
t1.register(current_subprocess.stdout, select.POLLIN)
#print([current_subprocess, current_subprocess.poll()])
output: list[bytes] = []
buffer: collections.deque[bytes] = collections.deque()
buffer_lines: collections.deque[bytes] = collections.deque()
output = []
buffer = collections.deque()
buffer_lines = collections.deque()
last_data = None
@ -131,10 +108,7 @@ def intercept_output(
if len(t2) == 1 and (t2[0][1] & select.POLLIN) > 0 and \
not (isinstance(last_data, bytes) and len(last_data) == 0):
t3 = current_subprocess.stdout.peek()
t4 = current_subprocess.stdout.read(len(t3))
assert isinstance(t4, int)
last_data = t3
output.append(t3)
if need_lines:
@ -185,7 +159,7 @@ def intercept_output(
returncode=current_subprocess.poll(),
)
def player_metadata() -> Optional[str]:
def player_metadata():
for k in range(20):
try:
metadata = {
@ -194,17 +168,10 @@ def player_metadata() -> Optional[str]:
}
return '%s - %s' % (metadata['artist'], metadata['title'])
time.sleep(1.0)
except Exception:
except:
continue
return None
class memory_stats_t:
class res_t(TypedDict):
mem_total: int
mem_used: int
def memory_stats() -> memory_stats_t.res_t:
def memory_stats():
if sys.platform == 'linux':
with io.BytesIO(
subprocess.check_output(
@ -258,8 +225,8 @@ def memory_stats() -> memory_stats_t.res_t:
raise NotImplementedError
def chrome(
argv: list[str]
) -> int:
argv
):
assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
parser = optparse.OptionParser()
parser.add_option(
@ -281,7 +248,7 @@ def chrome(
#assert os.path.exists(options.user_data_dir)
if sys.platform == 'linux':
return subprocess.check_call([
subprocess.check_call([
'google-chrome-stable',
'--enable-features=useOzonePlatform',
'--ozone-platform=wayland',
@ -292,7 +259,7 @@ def chrome(
else:
raise NotImplementedError
def eternal_oom(argv: list[str]) -> None:
def eternal_oom(argv):
import signal
import re
import time
@ -379,7 +346,7 @@ def eternal_oom(argv: list[str]) -> None:
t1 = subprocess.check_output([
'pgrep', '-a', '-f', p[0]
]).decode('utf-8')
except Exception:
except:
continue
t2 = t1.splitlines()
if not p[1] is None:
@ -412,15 +379,12 @@ def eternal_oom(argv: list[str]) -> None:
)
return
cpu_count = os.cpu_count()
assert isinstance(cpu_count, int)
if options.period is None:
options.period = 1
if options.memory_limit is None:
options.memory_limit = 3 * 1024 * 1024
if options.cpu_limit is None:
options.cpu_limit = 0.6 * cpu_count
options.cpu_limit = 0.6 * os.cpu_count()
if options.cpu_wait is None:
options.cpu_wait = 10
if options.mean_size is None:
@ -434,40 +398,24 @@ def eternal_oom(argv: list[str]) -> None:
and options.memory_limit > 512 * 1024
assert isinstance(options.cpu_limit, float) \
and options.cpu_limit > 0.2 * cpu_count and \
options.cpu_limit < cpu_count * 0.95
and options.cpu_limit > 0.2 * os.cpu_count() and \
options.cpu_limit < os.cpu_count() * 0.95
assert options.period >= 1
assert options.cpu_wait >= 10
assert options.mean_size >= 16
def pandas_data_frame(
lines: list[str],
groups_regex: re.Pattern[str],
header_regex: re.Pattern[str],
extra_columns: dict[
str,
Callable[
[dict[str, str]],
Any
def pandas_data_frame(lines, groups_regex, header_regex, extra_columns):
header = re.compile(header_regex).search(lines[0]).groups()
rows = [
re.compile(groups_regex).search(row).groups()
for row in lines[1:]
]
],
) -> dict[str, list[Any]]:
header_match = re.compile(header_regex).search(lines[0])
assert not header_match is None
header = header_match.groups()
rows = []
for line in lines[1:]:
row_match = re.compile(groups_regex).search(line)
assert not row_match is None
rows.append(row_match.groups())
columns: dict[str, list[Any]] = {
columns = {
column: []
for column in header
}
for row in rows:
for value, column in zip(row, header):
columns[column].append(value)
@ -726,7 +674,7 @@ def eternal_oom(argv: list[str]) -> None:
pid,
))
os.kill(pid, signal.SIGKILL)
except Exception:
except:
logging.error(traceback.format_exc())
custom_notify(
msg='oom_kill, failed to kill pid %d' % pid
@ -928,7 +876,7 @@ def resilient_vlc(stream=None):
t2 = p.wait(timeout=1)
print(t2)
break
except Exception:
except:
print('shit')
pass
time.sleep(1.0)
@ -1073,7 +1021,7 @@ def eternal_firefox(
swaymsg '[pid="%d"] kill'
''' % (p.pid,)) == 0
break
except Exception:
except:
import traceback
import pprint
pprint.pprint(traceback.format_exc())
@ -1170,7 +1118,7 @@ def http_server(argv):
'ping', '-w', '1',
options.host
])
except Exception:
except:
raise RuntimeError('invalid ip address %s' % options.host)
@ -1425,7 +1373,7 @@ def pass_ssh_osx(argv):
t3 = int(t2)
assert t3 >= 0 and t3 < len(t1)
break
except AssertionError:
except:
continue
command = r'''
@ -1568,7 +1516,7 @@ def pass_ssh_osx(argv):
print('\rcleared cliboard\n', end='')
def player_v1(folder_url, item_id):
#import sys
import sys
import urllib.parse
import re
import subprocess
@ -1589,16 +1537,10 @@ def player_v1(folder_url, item_id):
t7 = t5[k]
t9 = urllib.parse.unquote(os.path.split(t7)[1])
progress_bar.set_description('%03d %s' % (k, t9))
with subprocess.Popen([
'ffprobe',
'-hide_banner',
'-i',
t7
], stderr=subprocess.PIPE, stdout=subprocess.PIPE) as p:
with subprocess.Popen(['ffprobe', '-hide_banner', '-i', t7], stderr=subprocess.PIPE, stdout=subprocess.PIPE) as p:
p.wait()
assert p.returncode == 0
t8 = p.stderr.read().decode('utf-8')
assert isinstance(t8, str)
#print(t8)
with subprocess.Popen(['ffplay', '-hide_banner', '-nodisp', '-autoexit', '-loop', '1', t7], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) as p:
p.wait()
@ -1685,7 +1627,6 @@ def pm_service(argv):
r'service:AppleMultitouchDevice product:Apple Internal Keyboard / Trackpad '
r'eventType:11" age:00:00:00 id:38654742889 [System: PrevIdle DeclUser kDisp]'
)
assert isinstance(sample, str)
action = None
with subprocess.Popen(['log', 'stream'], stdout=subprocess.PIPE) as p:
@ -1743,7 +1684,7 @@ def pm_service(argv):
print('')
def scrap_yt_music(argv: list[str]) -> None:
def scrap_yt_music(argv: Iterable[str]) -> None:
parser = optparse.OptionParser()
parser.add_option(
'--verbose',
@ -1859,21 +1800,13 @@ def scrap_yt_music(argv: list[str]) -> None:
stderr=subprocess.DEVNULL,
)
logging.info(json.dumps(dict(output_name=output_name)))
except Exception:
except:
logging.error(traceback.format_exc())
finally:
if not p is None:
p.terminate()
class Context(TypedDict):
http_on_event: Callable[..., None]
shutdown: bool
workers: list[threading.Thread]
track_cv: threading.Condition
main_cv: threading.Condition
track_name: Optional[str]
context: Context = dict(
context = dict(
http_on_event=lambda *args, **kwargs: None,
shutdown=False,
workers=[],
@ -2050,14 +1983,14 @@ def desktop_services(argv):
def vlc_is_playing_fullscreen(cls):
import subprocess
import json
#import sys
#import pprint
import sys
import pprint
t2 = []
try:
t1 = subprocess.check_output(['swaymsg', '-t', 'get_tree']).decode('utf-8')
t2 = json.loads(t1)
except Exception:
except:
logging.error(traceback.format_exc())
def walk(o, cb):
@ -2091,7 +2024,7 @@ def desktop_services(argv):
t4 = subprocess.check_output([
'playerctl', '-p', 'vlc', 'status'
], timeout=1,).decode('utf-8').strip() == 'Playing'
except Exception:
except:
logging.error(traceback.format_exc())
#pprint.pprint(t3)
@ -2185,7 +2118,7 @@ def desktop_services(argv):
),
end=''
)
except Exception:
except:
logging.error(traceback.format_exc())
class Backlight:
@ -2221,7 +2154,7 @@ def desktop_services(argv):
]
return any([o['dpms'] for o in t4])
except Exception:
except:
return True
def check(self):
@ -2250,7 +2183,7 @@ def desktop_services(argv):
)))
Backlight.disable()
self.dpms = new_dpms
except Exception:
except:
logging.error(traceback.format_exc())
@classmethod
@ -2929,14 +2862,14 @@ echo 6500 > /sys/bus/platform/devices/applesmc.768/fan1_output;
o.check()
time.sleep(0.1)
except Exception:
except:
logging.error(traceback.format_exc())
finally:
for o in services:
try:
o.terminate()
o.wait(timeout=10)
except Exception:
except:
logging.error(traceback.format_exc())
logging.error('killed %s' % str(o.__dict__))
o.kill()
@ -2946,7 +2879,7 @@ def suspend_timer(argv):
import datetime;
import subprocess;
import time;
#import sys;
import sys;
if len(argv) == 0:
print("enter HH:MM");
t3 = input().strip()
@ -3186,7 +3119,7 @@ def socat_ssh(argv):
try:
p.terminate()
p.wait(10)
except Exception:
except:
p.kill()
restart = False
@ -3250,10 +3183,10 @@ def share_wifi(argv):
shell=True
).decode('utf-8')
if len(pw) == 0:
pw = subprocess.check_output(
p2 = subprocess.check_output(
'pwgen -syn 20 1',
shell=True
).decode('utf-8').strip()
).decode('utf-8')
with subprocess.Popen(
['qrencode', '-t', 'UTF8'],
stdin=subprocess.PIPE
@ -3274,8 +3207,6 @@ def share_wifi(argv):
def on_interrupt(sig, *args, **kwargs):
nonlocal restart
nonlocal shutdown
if sig == signal.SIGINT:
restart = True
elif sig == signal.SIGTERM:
@ -3317,7 +3248,7 @@ def share_wifi(argv):
)
try:
hostapd.wait(20)
except Exception:
except:
hostapd.terminate()
restart = False
@ -3333,7 +3264,7 @@ def share_wifi(argv):
restart = True
last_timestamp = datetime.datetime.now()
except Exception:
except:
print(traceback.format_exc())
restart = True
finally:
@ -3409,7 +3340,7 @@ printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep
config.update(
json.load(f)
)
except Exception:
except:
logging.error(traceback.format_exc())
pass
@ -3439,7 +3370,7 @@ printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep
timeout=timeout2,
).decode('utf-8').strip()
)
except Exception:
except:
t1.append('fail %d' % sh_index)
t3 = ' | '.join(t1).replace('\n\r', '')
@ -3469,16 +3400,15 @@ def custom_translate(current_string, check, none_char=None,):
def media_keys(argv):
assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
parser = argparse.ArgumentParser()
parser.add_argument(
#'-c', '--command',
'command',
#dest='command',
parser = optparse.OptionParser()
parser.add_option(
'--command',
dest='command',
type=str,
default=None,
)
options, args = parser.parse_known_args(argv)
options, args = parser.parse_args(argv)
if options.command is None and len(args) > 0:
assert len(args) == 1
@ -3487,8 +3417,6 @@ def media_keys(argv):
assert options.command in [
'media-play-pause',
'media-next',
'media-forward-seconds',
'media-backward-seconds',
'media-prev',
'media-lower-volume',
'media-raise-volume',
@ -3508,12 +3436,10 @@ def media_keys(argv):
def mocp_info() -> str:
t1 = subprocess.check_output(['mocp', '-i'])
t3 = t1.decode('utf-8')
t2 : dict[str, str] = dict([
(lambda o2: (o2[0], o2[1]))(o.split(':'))
#tuple(o.split(':')[:2])
t2 = dict([
tuple(o.split(':')[:2])
for o in t3.splitlines()
])
return t2['Title'].strip()[:128]
if is_mocp():
@ -3539,32 +3465,6 @@ def media_keys(argv):
msg = player_metadata()
else:
raise NotImplementedError
elif options.command == 'media-backward-seconds':
if mode == 'mocp':
raise NotImplementedError
elif mode == 'playerctl':
pos = float(subprocess.check_output(['playerctl', 'position']).decode('utf-8'))
subprocess.check_call([
'playerctl',
'position',
'%f' % (pos - float(args[0]))
])
#msg = player_metadata()
else:
raise NotImplementedError
elif options.command == 'media-forward-seconds':
if mode == 'mocp':
raise NotImplementedError
elif mode == 'playerctl':
pos = float(subprocess.check_output(['playerctl', 'position']).decode('utf-8'))
subprocess.check_call([
'playerctl',
'position',
'%f' % (pos + float(args[0]))
])
#msg = player_metadata()
else:
raise NotImplementedError
elif options.command == 'media-prev':
if mode == 'mocp':
subprocess.check_call(['mocp', '--previous'])
@ -3693,7 +3593,7 @@ def commands_cli() -> None:
raise NotImplementedError
except SystemExit:
pass
except Exception:
except:
msg = 'not implemented\n%s' % traceback.format_exc()
logging.error(msg)

1
m

@ -1 +0,0 @@
m.py

37
m Executable file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
#vim: set filetype=python
import sys
import optparse
import subprocess
import os
def js(argv):
return subprocess.check_call([
'sudo',
'docker-compose',
'--project-directory',
os.path.abspath(
os.path.dirname(__file__),
),
'-f',
os.path.abspath(
os.path.join(
os.path.dirname(__file__),
'docker', 'js',
'docker-compose.yml',
)
),
*argv,
])
if len(sys.argv) > 1 and sys.argv[1] == 'js':
js(sys.argv[2:])
elif len(sys.argv) > 1 and sys.argv[1] == 'mypy':
subprocess.check_call([
'mypy',
'--strict',
'dotfiles/.local/bin/commands',
])
else:
raise NotImplementedError

3
m.py

@ -1,3 +0,0 @@
#!/usr/bin/env python3
import _m
_m.run()

@ -1,6 +0,0 @@
import distutils.command
from typing import (Any,)
def _get_build_extension() -> distutils.command.build_ext: ...
def load_dynamic(name: str, path: str) -> Any: ...

@ -1,17 +0,0 @@
import setuptools.extension
from typing import (Iterable,)
def cythonize(
module_list: str | Iterable[str]
#module_list,
#exclude=None,
#nthreads=0,
#aliases=None,
#quiet=False,
#force=None,
#language=None,
#exclude_failures=False,
#show_all_warnings=False,
#**options
) -> list[setuptools.extension.Extension]: ...

@ -1,7 +0,0 @@
from typing import (Type, Any,)
class NoGIL:
def __enter__(self): ...
def __exit__(self, exc_class: Type[Exception], exc: Exception, tb: Any) -> None: ...
nogil = NoGIL()

@ -1,14 +0,0 @@
import setuptools.extension
import pathlib
class build_ext:
extensions : list[setuptools.extension.Extension]
#build_temp : pathlib.Path
#build_lib: pathlib.Path
build_temp: str
build_lib: str
def run(self) -> None:
...
...

@ -1,18 +0,0 @@
import setuptools.extension
from typing import (Any, Iterable,)
def mypycify(
paths: 'list[str]',
*,
only_compile_paths: 'Iterable[str] | None' = None,
verbose: 'bool' = False,
opt_level: 'str' = '3',
debug_level: 'str' = '1',
strip_asserts: 'bool' = False,
multi_file: 'bool' = False,
separate: 'bool | list[tuple[list[str], str | None]]' = False,
skip_cgen_input: 'Any | None' = None,
target_dir: 'str | None' = None,
include_runtime_files: 'bool | None' = None,
) -> 'list[setuptools.extension.Extension]': ...

@ -1,8 +0,0 @@
from typing import (Self, Any)
class tqdm:
def __enter__(self) -> Self: ...
def __exit__(self, args: Any) -> None: ...
def update(self, delta: int) -> None: ...
def set_description(self, description: str) -> None: ...

@ -1,276 +0,0 @@
import time
import glob
import io
import os
import numpy
import numpy.typing
import functools
import pathlib
import threading
import cython
import datetime
from typing import (Any, Optional, TypeVar, Type, cast)
# from scoping import scoping as s
def test(
_id: int,
T: float,
a: numpy.ndarray[Any, numpy.dtype[numpy.int32]],
) -> None:
with cython.nogil:
#if True:
started_at = datetime.datetime.now()
print('started')
elapsed = lambda : (datetime.datetime.now() - started_at).total_seconds()
#a = 0
while elapsed() < T:
#a += 1
for k in range(1024 * 1024):
a[_id] += 1
print(['done', started_at, elapsed(), a[_id]])
M = TypeVar('M', bound=Type[Any])
def build(content: str, module: M) -> M:
import pathlib
import tempfile
import hashlib
import Cython.Build.Inline
sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()
output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()
if not output_dir.exists() or True:
os.makedirs(str(output_dir), exist_ok=True)
source_path = output_dir / ('_%s.pyx' % sha256sum)
if not source_path.exists():
with io.open(str(source_path), 'w') as f:
f.write(content)
t1 = Cython.Build.Inline._get_build_extension()
t1.extensions = Cython.Build.cythonize(str(source_path))
t1.build_temp = str(pathlib.Path('/'))
t1.build_lib = str(output_dir)
#t2 = Cython.Build.Inline.Extension(
# name=sha256sum,
#)
t1.run()
return cast(
M,
Cython.Build.Inline.load_dynamic(
'_%s' % sha256sum,
glob.glob(
str(output_dir / ('_%s*.so' % sha256sum))
)[0]
)
)
raise NotImplementedError
def mypyc_build(file_path: pathlib.Path) -> Any:
import pathlib
import tempfile
import hashlib
import mypyc.build
import Cython.Build.Inline
assert isinstance(file_path, pathlib.Path)
#sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()
#output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()
output_dir = pathlib.Path('.') / 'tmp' / 'mypyc'
sha256sum = file_path.stem
lib_pattern = file_path.parent / ('%s.cpython*.so' % sha256sum)
lib_dir = pathlib.Path('.')
def lib_path_glob(path: str | pathlib.Path) -> Optional[pathlib.Path]:
res : list[str] = glob.glob(str(path))
if len(res) == 0:
return None
else:
return pathlib.Path(res[0])
need_build : bool = False
lib_path : Optional[pathlib.Path] = None
lib_path = lib_path_glob(lib_pattern)
if not lib_path is None:
t2 = file_path.stat()
t3 = lib_path.stat()
if t3.st_mtime < t2.st_mtime:
need_build = True
del t2
del t3
else:
need_build = True
if need_build:
for o in [
output_dir,
output_dir / 'build' / file_path.parent,
]:
os.makedirs(
str(o),
exist_ok=True
)
#source_path = output_dir / ('_%s.py' % sha256sum)
source_path = file_path
#with io.open(str(source_path), 'w') as f:
# f.write(content)
t1 = Cython.Build.Inline._get_build_extension()
t1.extensions = mypyc.build.mypycify(
[str(source_path)],
target_dir=str(output_dir / 'build')
)
t1.build_temp = str(output_dir)
t1.build_lib = str(lib_dir)
#t2 = Cython.Build.Inline.Extension(
# name=sha256sum,
#)
t1.run()
lib_path = lib_path_glob(lib_pattern)
return Cython.Build.Inline.load_dynamic(
#'_%s' % sha256sum,
#t1.extensions[0].name,
file_path.stem,
str(lib_path),
)
raise NotImplementedError
class Source:
@staticmethod
def test2(
_a : numpy.ndarray[Any, numpy.dtype[numpy.int64]],
_id : numpy.dtype[numpy.int32] | int,
T : float=16
) -> int:
raise NotImplementedError
source = build(r'''
cimport cython
@cython.boundscheck(False)
@cython.wraparound(False)
def test4(int[:] a, int[:] b):
cdef int N = a.shape[0]
assert N == b.shape[0]
with cython.nogil:
for i in range(N):
a[i] += b[i]
return N
import datetime
def elapsed(started_at: datetime.datetime):
res = (datetime.datetime.now() - started_at).total_seconds()
return res
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing.
def has_time(started_at: datetime.datetime, T: float):
t1 = elapsed(started_at)
res = t1 < T
return res
@cython.boundscheck(False)
@cython.wraparound(False)
def test2(long long [:] _a, int _id, double T=16) -> int:
started_at = datetime.datetime.now()
print('started')
cdef int C = 1;
cdef int cond;
with cython.nogil:
#if True:
#a = 0
while True:
with cython.gil:
cond = has_time(started_at, T)
#cond = 0
if cond != 1:
break
#a += 1
for k in range(1024 * 1024 * 1024):
_a[_id] += C
print(['done', started_at, elapsed(started_at), _a[_id]])
return _a[_id]
''', Source)
def test_cython(N: int=4, T:int=16):
#a = [0] * N
a = numpy.zeros((N,), dtype=numpy.int64)
t = [
threading.Thread(
target=functools.partial(
source.test2,
a,
k,
T,
)
)
for k in range(N)
]
for o in t:
o.start()
for o in t:
o.join()
#cython_module['test2'](a, 0)
def test_mypyc(N: int=4, W:int=35):
cython2 = mypyc_build(
(pathlib.Path(__file__).parent / 'cython2.py').relative_to(
pathlib.Path.cwd()
)
)
# from .cython2 import fib
#a = [0] * N
t = [
threading.Thread(
target=functools.partial(
cython2.fib,
W,
)
)
for k in range(N)
]
for o in t:
o.start()
for o in t:
o.join()

@ -1,11 +0,0 @@
import time
def fib(n: int) -> int:
if n <= 1:
return n
else:
return fib(n - 2) + fib(n - 1)
t0 = time.time()
fib(32)
print(time.time() - t0)

@ -7,8 +7,3 @@ youtube-dl
gdown
aiohttp
mypy
ruff
cython
numpy
scoping
types-setuptools