[~] Refactor

This commit is contained in:
Siarhei Siniak 2024-11-10 12:28:27 +03:00
parent 560e748867
commit 81bf066e82
15 changed files with 595 additions and 71 deletions

3
.gitignore vendored

@ -3,3 +3,6 @@ __pycache__
d2/book1/books d2/book1/books
.DS_Store .DS_Store
.vim .vim
*.so
.mypy_cache
.ruff_cache

5
.mypy.ini Normal file

@ -0,0 +1,5 @@
[mypy]
mypy_path =
python/stubs
plugins =
numpy.typing.mypy_plugin

@ -1,5 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import argparse
import enum import enum
import threading import threading
import shutil import shutil
@ -21,7 +22,7 @@ import sys
import tempfile import tempfile
import time import time
import traceback import traceback
from typing import (Literal, Optional,) from typing import (Literal, Optional, Iterable,)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -108,7 +109,10 @@ def intercept_output(
if len(t2) == 1 and (t2[0][1] & select.POLLIN) > 0 and \ if len(t2) == 1 and (t2[0][1] & select.POLLIN) > 0 and \
not (isinstance(last_data, bytes) and len(last_data) == 0): not (isinstance(last_data, bytes) and len(last_data) == 0):
t3 = current_subprocess.stdout.peek() t3 = current_subprocess.stdout.peek()
t4 = current_subprocess.stdout.read(len(t3)) t4 = current_subprocess.stdout.read(len(t3))
assert isinstance(t4, int)
last_data = t3 last_data = t3
output.append(t3) output.append(t3)
if need_lines: if need_lines:
@ -168,7 +172,7 @@ def player_metadata():
} }
return '%s - %s' % (metadata['artist'], metadata['title']) return '%s - %s' % (metadata['artist'], metadata['title'])
time.sleep(1.0) time.sleep(1.0)
except: except Exception:
continue continue
def memory_stats(): def memory_stats():
@ -346,7 +350,7 @@ def eternal_oom(argv):
t1 = subprocess.check_output([ t1 = subprocess.check_output([
'pgrep', '-a', '-f', p[0] 'pgrep', '-a', '-f', p[0]
]).decode('utf-8') ]).decode('utf-8')
except: except Exception:
continue continue
t2 = t1.splitlines() t2 = t1.splitlines()
if not p[1] is None: if not p[1] is None:
@ -674,7 +678,7 @@ def eternal_oom(argv):
pid, pid,
)) ))
os.kill(pid, signal.SIGKILL) os.kill(pid, signal.SIGKILL)
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
custom_notify( custom_notify(
msg='oom_kill, failed to kill pid %d' % pid msg='oom_kill, failed to kill pid %d' % pid
@ -876,7 +880,7 @@ def resilient_vlc(stream=None):
t2 = p.wait(timeout=1) t2 = p.wait(timeout=1)
print(t2) print(t2)
break break
except: except Exception:
print('shit') print('shit')
pass pass
time.sleep(1.0) time.sleep(1.0)
@ -1021,7 +1025,7 @@ def eternal_firefox(
swaymsg '[pid="%d"] kill' swaymsg '[pid="%d"] kill'
''' % (p.pid,)) == 0 ''' % (p.pid,)) == 0
break break
except: except Exception:
import traceback import traceback
import pprint import pprint
pprint.pprint(traceback.format_exc()) pprint.pprint(traceback.format_exc())
@ -1118,7 +1122,7 @@ def http_server(argv):
'ping', '-w', '1', 'ping', '-w', '1',
options.host options.host
]) ])
except: except Exception:
raise RuntimeError('invalid ip address %s' % options.host) raise RuntimeError('invalid ip address %s' % options.host)
@ -1373,7 +1377,7 @@ def pass_ssh_osx(argv):
t3 = int(t2) t3 = int(t2)
assert t3 >= 0 and t3 < len(t1) assert t3 >= 0 and t3 < len(t1)
break break
except: except AssertionError:
continue continue
command = r''' command = r'''
@ -1516,7 +1520,7 @@ def pass_ssh_osx(argv):
print('\rcleared cliboard\n', end='') print('\rcleared cliboard\n', end='')
def player_v1(folder_url, item_id): def player_v1(folder_url, item_id):
import sys #import sys
import urllib.parse import urllib.parse
import re import re
import subprocess import subprocess
@ -1537,10 +1541,16 @@ def player_v1(folder_url, item_id):
t7 = t5[k] t7 = t5[k]
t9 = urllib.parse.unquote(os.path.split(t7)[1]) t9 = urllib.parse.unquote(os.path.split(t7)[1])
progress_bar.set_description('%03d %s' % (k, t9)) progress_bar.set_description('%03d %s' % (k, t9))
with subprocess.Popen(['ffprobe', '-hide_banner', '-i', t7], stderr=subprocess.PIPE, stdout=subprocess.PIPE) as p: with subprocess.Popen([
'ffprobe',
'-hide_banner',
'-i',
t7
], stderr=subprocess.PIPE, stdout=subprocess.PIPE) as p:
p.wait() p.wait()
assert p.returncode == 0 assert p.returncode == 0
t8 = p.stderr.read().decode('utf-8') t8 = p.stderr.read().decode('utf-8')
assert isinstance(t8, str)
#print(t8) #print(t8)
with subprocess.Popen(['ffplay', '-hide_banner', '-nodisp', '-autoexit', '-loop', '1', t7], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) as p: with subprocess.Popen(['ffplay', '-hide_banner', '-nodisp', '-autoexit', '-loop', '1', t7], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) as p:
p.wait() p.wait()
@ -1627,6 +1637,7 @@ def pm_service(argv):
r'service:AppleMultitouchDevice product:Apple Internal Keyboard / Trackpad ' r'service:AppleMultitouchDevice product:Apple Internal Keyboard / Trackpad '
r'eventType:11" age:00:00:00 id:38654742889 [System: PrevIdle DeclUser kDisp]' r'eventType:11" age:00:00:00 id:38654742889 [System: PrevIdle DeclUser kDisp]'
) )
assert isinstance(sample, str)
action = None action = None
with subprocess.Popen(['log', 'stream'], stdout=subprocess.PIPE) as p: with subprocess.Popen(['log', 'stream'], stdout=subprocess.PIPE) as p:
@ -1800,7 +1811,7 @@ def scrap_yt_music(argv: Iterable[str]) -> None:
stderr=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
) )
logging.info(json.dumps(dict(output_name=output_name))) logging.info(json.dumps(dict(output_name=output_name)))
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
finally: finally:
if not p is None: if not p is None:
@ -1983,14 +1994,14 @@ def desktop_services(argv):
def vlc_is_playing_fullscreen(cls): def vlc_is_playing_fullscreen(cls):
import subprocess import subprocess
import json import json
import sys #import sys
import pprint #import pprint
t2 = [] t2 = []
try: try:
t1 = subprocess.check_output(['swaymsg', '-t', 'get_tree']).decode('utf-8') t1 = subprocess.check_output(['swaymsg', '-t', 'get_tree']).decode('utf-8')
t2 = json.loads(t1) t2 = json.loads(t1)
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
def walk(o, cb): def walk(o, cb):
@ -2024,7 +2035,7 @@ def desktop_services(argv):
t4 = subprocess.check_output([ t4 = subprocess.check_output([
'playerctl', '-p', 'vlc', 'status' 'playerctl', '-p', 'vlc', 'status'
], timeout=1,).decode('utf-8').strip() == 'Playing' ], timeout=1,).decode('utf-8').strip() == 'Playing'
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
#pprint.pprint(t3) #pprint.pprint(t3)
@ -2118,7 +2129,7 @@ def desktop_services(argv):
), ),
end='' end=''
) )
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
class Backlight: class Backlight:
@ -2154,7 +2165,7 @@ def desktop_services(argv):
] ]
return any([o['dpms'] for o in t4]) return any([o['dpms'] for o in t4])
except: except Exception:
return True return True
def check(self): def check(self):
@ -2183,7 +2194,7 @@ def desktop_services(argv):
))) )))
Backlight.disable() Backlight.disable()
self.dpms = new_dpms self.dpms = new_dpms
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
@classmethod @classmethod
@ -2862,14 +2873,14 @@ echo 6500 > /sys/bus/platform/devices/applesmc.768/fan1_output;
o.check() o.check()
time.sleep(0.1) time.sleep(0.1)
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
finally: finally:
for o in services: for o in services:
try: try:
o.terminate() o.terminate()
o.wait(timeout=10) o.wait(timeout=10)
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
logging.error('killed %s' % str(o.__dict__)) logging.error('killed %s' % str(o.__dict__))
o.kill() o.kill()
@ -2879,7 +2890,7 @@ def suspend_timer(argv):
import datetime; import datetime;
import subprocess; import subprocess;
import time; import time;
import sys; #import sys;
if len(argv) == 0: if len(argv) == 0:
print("enter HH:MM"); print("enter HH:MM");
t3 = input().strip() t3 = input().strip()
@ -3119,7 +3130,7 @@ def socat_ssh(argv):
try: try:
p.terminate() p.terminate()
p.wait(10) p.wait(10)
except: except Exception:
p.kill() p.kill()
restart = False restart = False
@ -3183,10 +3194,10 @@ def share_wifi(argv):
shell=True shell=True
).decode('utf-8') ).decode('utf-8')
if len(pw) == 0: if len(pw) == 0:
p2 = subprocess.check_output( pw = subprocess.check_output(
'pwgen -syn 20 1', 'pwgen -syn 20 1',
shell=True shell=True
).decode('utf-8') ).decode('utf-8').strip()
with subprocess.Popen( with subprocess.Popen(
['qrencode', '-t', 'UTF8'], ['qrencode', '-t', 'UTF8'],
stdin=subprocess.PIPE stdin=subprocess.PIPE
@ -3207,6 +3218,8 @@ def share_wifi(argv):
def on_interrupt(sig, *args, **kwargs): def on_interrupt(sig, *args, **kwargs):
nonlocal restart nonlocal restart
nonlocal shutdown
if sig == signal.SIGINT: if sig == signal.SIGINT:
restart = True restart = True
elif sig == signal.SIGTERM: elif sig == signal.SIGTERM:
@ -3248,7 +3261,7 @@ def share_wifi(argv):
) )
try: try:
hostapd.wait(20) hostapd.wait(20)
except: except Exception:
hostapd.terminate() hostapd.terminate()
restart = False restart = False
@ -3264,7 +3277,7 @@ def share_wifi(argv):
restart = True restart = True
last_timestamp = datetime.datetime.now() last_timestamp = datetime.datetime.now()
except: except Exception:
print(traceback.format_exc()) print(traceback.format_exc())
restart = True restart = True
finally: finally:
@ -3340,7 +3353,7 @@ printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep
config.update( config.update(
json.load(f) json.load(f)
) )
except: except Exception:
logging.error(traceback.format_exc()) logging.error(traceback.format_exc())
pass pass
@ -3370,7 +3383,7 @@ printf '% 3.0f%%' $(upower -d | grep -Po 'percentage:\\s+\\d+(\\.\\d+)?%' | grep
timeout=timeout2, timeout=timeout2,
).decode('utf-8').strip() ).decode('utf-8').strip()
) )
except: except Exception:
t1.append('fail %d' % sh_index) t1.append('fail %d' % sh_index)
t3 = ' | '.join(t1).replace('\n\r', '') t3 = ' | '.join(t1).replace('\n\r', '')
@ -3400,15 +3413,16 @@ def custom_translate(current_string, check, none_char=None,):
def media_keys(argv): def media_keys(argv):
assert isinstance(argv, list) and all([isinstance(o, str) for o in argv]) assert isinstance(argv, list) and all([isinstance(o, str) for o in argv])
parser = optparse.OptionParser() parser = argparse.ArgumentParser()
parser.add_option( parser.add_argument(
'--command', #'-c', '--command',
dest='command', 'command',
#dest='command',
type=str, type=str,
default=None, default=None,
) )
options, args = parser.parse_args(argv) options, args = parser.parse_known_args(argv)
if options.command is None and len(args) > 0: if options.command is None and len(args) > 0:
assert len(args) == 1 assert len(args) == 1
@ -3417,6 +3431,8 @@ def media_keys(argv):
assert options.command in [ assert options.command in [
'media-play-pause', 'media-play-pause',
'media-next', 'media-next',
'media-forward-seconds',
'media-backward-seconds',
'media-prev', 'media-prev',
'media-lower-volume', 'media-lower-volume',
'media-raise-volume', 'media-raise-volume',
@ -3465,6 +3481,32 @@ def media_keys(argv):
msg = player_metadata() msg = player_metadata()
else: else:
raise NotImplementedError raise NotImplementedError
elif options.command == 'media-backward-seconds':
if mode == 'mocp':
raise NotImplementedError
elif mode == 'playerctl':
pos = float(subprocess.check_output(['playerctl', 'position']).decode('utf-8'))
subprocess.check_call([
'playerctl',
'position',
'%f' % (pos - float(args[0]))
])
#msg = player_metadata()
else:
raise NotImplementedError
elif options.command == 'media-forward-seconds':
if mode == 'mocp':
raise NotImplementedError
elif mode == 'playerctl':
pos = float(subprocess.check_output(['playerctl', 'position']).decode('utf-8'))
subprocess.check_call([
'playerctl',
'position',
'%f' % (pos + float(args[0]))
])
#msg = player_metadata()
else:
raise NotImplementedError
elif options.command == 'media-prev': elif options.command == 'media-prev':
if mode == 'mocp': if mode == 'mocp':
subprocess.check_call(['mocp', '--previous']) subprocess.check_call(['mocp', '--previous'])
@ -3593,7 +3635,7 @@ def commands_cli() -> None:
raise NotImplementedError raise NotImplementedError
except SystemExit: except SystemExit:
pass pass
except: except Exception:
msg = 'not implemented\n%s' % traceback.format_exc() msg = 'not implemented\n%s' % traceback.format_exc()
logging.error(msg) logging.error(msg)

0
dotfiles/m.py Normal file

37
m

@ -1,37 +0,0 @@
#!/usr/bin/env python3
#vim: set filetype=python
import sys
import optparse
import subprocess
import os
def js(argv):
return subprocess.check_call([
'sudo',
'docker-compose',
'--project-directory',
os.path.abspath(
os.path.dirname(__file__),
),
'-f',
os.path.abspath(
os.path.join(
os.path.dirname(__file__),
'docker', 'js',
'docker-compose.yml',
)
),
*argv,
])
if len(sys.argv) > 1 and sys.argv[1] == 'js':
js(sys.argv[2:])
elif len(sys.argv) > 1 and sys.argv[1] == 'mypy':
subprocess.check_call([
'mypy',
'--strict',
'dotfiles/.local/bin/commands',
])
else:
raise NotImplementedError

1
m Symbolic link

@ -0,0 +1 @@
m.py

195
m.py Executable file

@ -0,0 +1,195 @@
#!/usr/bin/env python3
#vim: set filetype=python
import logging
import json
import enum
import pathlib
import sys
import argparse
#import optparse
import subprocess
import os
from typing import (Optional, Any,)
logger = logging.getLogger()
def js(argv):
return subprocess.check_call([
'sudo',
'docker-compose',
'--project-directory',
os.path.abspath(
os.path.dirname(__file__),
),
'-f',
os.path.abspath(
os.path.join(
os.path.dirname(__file__),
'docker', 'js',
'docker-compose.yml',
)
),
*argv,
])
def env(
argv: Optional[list[str]] = None,
**kwargs: Any,
) -> Optional[subprocess.CompletedProcess]:
env_path = pathlib.Path(__file__).parent / 'tmp' / 'env3'
if not env_path.exists():
subprocess.check_call([
sys.executable, '-m', 'venv',
'--system-site-packages',
str(env_path)
])
subprocess.check_call([
env_path / 'bin' / 'python3',
'-m', 'pip',
'install', '-r', 'requirements.txt',
])
if not argv is None:
return subprocess.run([
str(env_path / 'bin' / 'python3'),
*argv,
], **kwargs)
return None
def ruff(args: list[str]) -> None:
res = env([
'-m',
'ruff',
'check',
*args,
'--output-format', 'json',
'--ignore', ','.join([
'E731',
'E713',
'E714',
'E703',
]),
'.',
'dotfiles/.local/bin/commands',
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert not res is None
errors = json.loads(res.stdout.decode('utf-8'))
g: dict[str, Any] = dict()
for o in errors:
if not o['filename'] in g:
g[o['filename']] = []
g[o['filename']].append(o)
h = {
k : len(v)
for k, v in g.items()
}
logger.info(json.dumps(errors, indent=4))
logger.info(json.dumps(h, indent=4))
def mypy(args: list[str]) -> None:
res = env([
'-m',
'mypy',
'--strict',
'-O',
'json',
*args,
'dotfiles/.local/bin/commands',
'python',
'm.py',
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
assert not res is None
errors = [
json.loads(o)
for o in res.stdout.decode('utf-8').splitlines()
]
g : dict[str, Any] = dict()
for o in errors:
if not o['file'] in g:
g[o['file']] = []
g[o['file']].append(o)
h = {
k : len(v)
for k, v in g.items()
}
logger.info(json.dumps(errors, indent=4))
logger.info(json.dumps(h, indent=4))
def inside_env() -> bool:
try:
import numpy
return True
except Exception:
return False
def run(argv: Optional[list[str]] = None) -> None:
logging.basicConfig(
level=logging.INFO,
format=(
'%(levelname)s:%(name)s:%(message)s'
':%(process)d'
':%(asctime)s'
':%(pathname)s:%(funcName)s:%(lineno)s'
),
)
if argv is None:
argv = sys.argv[1:]
class Commands(enum.StrEnum):
js = 'js'
mypy = 'mypy'
env = 'env'
ruff = 'ruff'
m2 = 'm2'
parser = argparse.ArgumentParser()
parser.add_argument(
#'command',
'_command',
choices=[
o.value
for o in Commands
],
#required=True,
)
options, args = parser.parse_known_args(argv)
options.command = Commands(options._command)
if options.command is Commands.js:
js(args)
elif options.command is Commands.env:
env(args)
elif options.command is Commands.mypy:
mypy(args)
elif options.command is Commands.ruff:
ruff(args)
elif options.command is Commands.m2:
if not inside_env():
return env(['--', 'm.py', 'm2', *args])
import python.tasks.cython
python.tasks.cython.mypyc_build(pathlib.Path('m.py'))
else:
raise NotImplementedError
if __name__ == '__main__':
run(sys.argv[1:])

@ -0,0 +1,3 @@
import distutils.command
def _get_build_extension() -> distutils.command.build_ext: ...

@ -0,0 +1,17 @@
import setuptools.extension
from typing import (Iterable,)
def cythonize(
module_list: str | Iterable[str]
#module_list,
#exclude=None,
#nthreads=0,
#aliases=None,
#quiet=False,
#force=None,
#language=None,
#exclude_failures=False,
#show_all_warnings=False,
#**options
) -> setuptools.extension.Extension: ...

@ -0,0 +1,7 @@
from typing import (Type, Any,)
class NoGIL:
def __enter__(self): ...
def __exit__(self, exc_class: Type[Exception], exc: Exception, tb: Any) -> None: ...
nogil = NoGIL()

@ -0,0 +1,12 @@
import setuptools.extension
import pathlib
class build_ext:
extensions : list[setuptools.extension.Extension]
build_temp : pathlib.Path
build_lib: pathlib.Path
def run(self) -> None:
...
...

260
python/tasks/cython.py Normal file

@ -0,0 +1,260 @@
import time
import glob
import io
import os
import numpy
import numpy.typing
import functools
import pathlib
import threading
import cython
import datetime
from typing import (Any, Optional,)
# from scoping import scoping as s
def test(
_id: int,
T: float,
a: numpy.ndarray[Any, numpy.dtype[numpy.int32]],
) -> None:
with cython.nogil:
#if True:
started_at = datetime.datetime.now()
print('started')
elapsed = lambda : (datetime.datetime.now() - started_at).total_seconds()
#a = 0
while elapsed() < T:
#a += 1
for k in range(1024 * 1024):
a[_id] += 1
print(['done', started_at, elapsed(), a[_id]])
def build(content: str) -> Any:
import pathlib
import tempfile
import hashlib
import Cython.Build.Inline
sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()
output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()
if not output_dir.exists() or True:
os.makedirs(str(output_dir), exist_ok=True)
source_path = output_dir / ('_%s.pyx' % sha256sum)
if not source_path.exists():
with io.open(str(source_path), 'w') as f:
f.write(content)
t1 = Cython.Build.Inline._get_build_extension()
t1.extensions = [Cython.Build.cythonize(str(source_path))]
t1.build_temp = pathlib.Path('/')
t1.build_lib = output_dir
#t2 = Cython.Build.Inline.Extension(
# name=sha256sum,
#)
t1.run()
return Cython.Build.Inline.load_dynamic(
'_%s' % sha256sum,
glob.glob(
str(output_dir / ('_%s*.so' % sha256sum))
)[0]
)
raise NotImplementedError
def mypyc_build(file_path: pathlib.Path) -> Any:
import pathlib
import tempfile
import hashlib
import mypyc.build
import Cython.Build.Inline
assert isinstance(file_path, pathlib.Path)
#sha256sum = hashlib.sha256(content.encode('utf-8')).digest().hex()
#output_dir = (pathlib.Path('.') / 'tmp' / 'cython' / sha256sum).absolute()
output_dir = pathlib.Path('.') / 'tmp' / 'mypyc'
sha256sum = file_path.stem
lib_pattern = file_path.parent / ('%s.cpython*.so' % sha256sum)
lib_dir = pathlib.Path('.')
def lib_path_glob(path: str | pathlib.Path) -> Optional[pathlib.Path]:
res : list[str] = glob.glob(str(path))
if len(res) == 0:
return None
else:
return pathlib.Path(res[0])
need_build : bool = False
lib_path : Optional[pathlib.Path] = None
lib_path = lib_path_glob(lib_pattern)
if not lib_path is None:
t2 = file_path.stat()
t3 = lib_path.stat()
if t3.st_mtime < t2.st_mtime:
need_build = True
del t2
del t3
else:
need_build = True
if need_build:
for o in [
output_dir,
output_dir / 'build' / file_path.parent,
]:
os.makedirs(
str(o),
exist_ok=True
)
#source_path = output_dir / ('_%s.py' % sha256sum)
source_path = file_path
#with io.open(str(source_path), 'w') as f:
# f.write(content)
t1 = Cython.Build.Inline._get_build_extension()
t1.extensions = mypyc.build.mypycify(
[str(source_path)],
target_dir=str(output_dir / 'build')
)
t1.build_temp = str(output_dir)
t1.build_lib = lib_dir
#t2 = Cython.Build.Inline.Extension(
# name=sha256sum,
#)
t1.run()
lib_path = lib_path_glob(lib_pattern)
return Cython.Build.Inline.load_dynamic(
#'_%s' % sha256sum,
#t1.extensions[0].name,
file_path.stem,
str(lib_path),
)
raise NotImplementedError
source = build(r'''
cimport cython
@cython.boundscheck(False)
@cython.wraparound(False)
def test4(int[:] a, int[:] b):
cdef int N = a.shape[0]
assert N == b.shape[0]
with cython.nogil:
for i in range(N):
a[i] += b[i]
return N
import datetime
def elapsed(started_at: datetime.datetime):
res = (datetime.datetime.now() - started_at).total_seconds()
return res
@cython.boundscheck(False) # Deactivate bounds checking
@cython.wraparound(False) # Deactivate negative indexing.
def has_time(started_at: datetime.datetime, T: float):
t1 = elapsed(started_at)
res = t1 < T
return res
@cython.boundscheck(False)
@cython.wraparound(False)
def test2(long long [:] _a, int _id, double T=16) -> int:
started_at = datetime.datetime.now()
print('started')
cdef int C = 1;
cdef int cond;
with cython.nogil:
#if True:
#a = 0
while True:
with cython.gil:
cond = has_time(started_at, T)
#cond = 0
if cond != 1:
break
#a += 1
for k in range(1024 * 1024 * 1024):
_a[_id] += C
print(['done', started_at, elapsed(started_at), _a[_id]])
return _a[_id]
''')
def test_cython(N: int=4, T:int=16):
#a = [0] * N
a = numpy.zeros((N,), dtype=numpy.int64)
t = [
threading.Thread(
target=functools.partial(
source.test2,
a,
k,
T,
)
)
for k in range(N)
]
for o in t:
o.start()
for o in t:
o.join()
#cython_module['test2'](a, 0)
def test_mypyc(N: int=4, W:int=35):
cython2 = mypyc_build(
(pathlib.Path(__file__).parent / 'cython2.py').relative_to(
pathlib.Path.cwd()
)
)
# from .cython2 import fib
#a = [0] * N
t = [
threading.Thread(
target=functools.partial(
cython2.fib,
W,
)
)
for k in range(N)
]
for o in t:
o.start()
for o in t:
o.join()

11
python/tasks/cython2.py Normal file

@ -0,0 +1,11 @@
import time
def fib(n: int) -> int:
if n <= 1:
return n
else:
return fib(n - 2) + fib(n - 1)
t0 = time.time()
fib(32)
print(time.time() - t0)

@ -7,3 +7,8 @@ youtube-dl
gdown gdown
aiohttp aiohttp
mypy mypy
ruff
cython
numpy
scoping
types-setuptools