[+] update vim, pydantic

1. refactor python vim module;
  1.1. experiment
    with fast select based on popup in vim,
    and multi threaded app in python;
  1.2. TODO,
    figure out some thread safe way
    to call vim.command from python side threads;
  1.3. update pydantic validate params;
This commit is contained in:
Siarhei Siniak 2025-10-12 00:29:01 +03:00
parent cc4a703bcb
commit cf2476ec28
9 changed files with 753 additions and 232 deletions

@ -86,6 +86,7 @@ dotfiles_vim_put:
cp dotfiles/.vimrc ~/.vimrc
cp dotfiles/.py3.vimrc ~/.py3.vimrc
cp dotfiles/.module.vimrc.py ~/.module.vimrc.py
PLATFORM ?= macbook_air_2012
PLATFORM_TMP ?= tmp/platform_dotfiles/$(PLATFORM)

495
dotfiles/.beta.vimrc.py Normal file

@ -0,0 +1,495 @@
import functools
import configparser
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)
MODULE_NAME = 'online_fxreader_pr34_vim'
def f1():
t1 = vim.current.window
t2 = t1.width
vim.command('vnew')
t3 = t2 // 3
vim.command('vertical resize %d' % t3)
vim.current.window = t1
def f2():
context = {k: vim.options['splitright'] for k in ['splitright']}
try:
current_window = vim.current.window
vim.options['splitright'] = True
vim.command('vnew')
vim.command('r! tmux show-buffer')
vim.current.window = current_window
finally:
for k, v in context.items():
vim.options[k] = v
def f5_1(pattern, flags, info):
import subprocess
import io
import re
import tempfile
import traceback
import logging
# print([pattern, flags, info])
completed_process = None
options = dict(
recursive=False,
ext=[],
)
# print('fuck')
if b'r' in flags:
while True:
ext_m = re.compile(r'^.([^\,]+),(.*)$').match(pattern)
if pattern[:3] in [r'\r,']:
options['recursive'] = True
pattern = pattern[3:]
elif not ext_m is None:
options['ext'].append(ext_m[1])
pattern = ext_m[2]
else:
break
print(
[
flags,
pattern,
options,
]
)
try:
git_cmd = [
'git',
'grep',
'-n',
]
if options['recursive']:
git_cmd.append('--recurse-submodules')
git_cmd.extend(['-P', pattern])
if len(options['ext']) > 0:
git_cmd.extend(['--', *['**/*%s' % o for o in options['ext']]])
completed_process = subprocess.run(
git_cmd,
capture_output=True,
)
assert completed_process.returncode == 0 or (
completed_process.stdout == b''
# completed_process.stdout == b'' and
# completed_process.stderr == b''
)
t1 = completed_process.stdout
except:
logging.error(
''.join(
[
traceback.format_exc(),
getattr(completed_process, 'stdout', b'').decode('utf-8'),
getattr(completed_process, 'stderr', b'').decode('utf-8'),
]
)
)
t1 = b''
def watch(data):
with tempfile.NamedTemporaryFile(suffix='.txt') as f:
with io.open(f.name, 'wb') as f2:
f2.write(data)
vim.command('!less %s' % f.name)
# watch(t1)
t2 = []
for o in t1.splitlines():
try:
# watch(o.encode('utf-8'))
t3 = o.decode('utf-8')
t4 = re.compile(r'^([^\:\=]+)[\:\=](\d+)[\:\=](.*)$').match(t3)
if not t4 is None:
t2.append(
dict(
name=t4[3].strip(),
filename=t4[1],
cmd=t4[2],
)
)
except:
pass
# print(t2)
# return [{'name': 'blah', 'filename': 'docker-compose.yml', 'cmd': '23'}]
return t2
class EditorConfigModeline:
_instance: ClassVar[Optional['EditorConfigModeline']] = None
def __init__(self) -> None:
self.configs: dict[
pathlib.Path,
dict[str, str],
] = dict()
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load_config(self) -> Optional[dict[str, str]]:
cwd = pathlib.Path.cwd()
if not cwd in self.configs:
config_path = cwd / '.editorconfig'
if not config_path.exists():
return None
parser = configparser.ConfigParser()
parser.optionxform = str # keep case
parser.read(str(config_path))
config: dict[str, str] = dict()
for section in parser.sections():
logger.info(dict(section=section))
if len(section) > 0:
# pattern = section[1:-1]
pattern = section
if not parser[section].get('vim_modeline') is None:
config[pattern] = parser[section].get('vim_modeline')
self.validate_modeline(config[pattern])
self.configs[cwd] = config
return self.configs[cwd]
@classmethod
def validate_modeline(cls, modeline: str) -> None:
pattern = re.compile(r'^set(\s+(noet|sts|ts|et|ai|ci|noai|noci|sw)(\=\w)?)+$')
assert pattern.match(modeline), 'invalid modeline %s' % modeline
@classmethod
def find_entry(
cls,
file_path: pathlib.Path,
config: Optional[dict[str, str]] = None,
) -> Optional[str]:
if config is None:
return None
project_root = pathlib.Path.cwd()
if file_path.is_relative_to(project_root):
rel_path = file_path.relative_to(pathlib.Path.cwd())
else:
rel_path = file_path
for pattern, modeline in config.items():
if fnmatch.fnmatch(str(rel_path), pattern):
return modeline
return None
def on_buffer(self) -> None:
config = self.load_config()
logger.info(dict(config=config))
buf_name = vim.current.buffer.name
file_path = pathlib.Path(buf_name).resolve()
entry = self.find_entry(file_path, config=config)
logger.info(dict(modeline=entry))
vim.command('silent! {}'.format(entry))
# vim.command("echo '{}'".format('applied %s' % entry))
# raise NotImplementedError
class _Vim:
@classmethod
def run_command(cls, cmd) -> list[str]:
logger.info(dict(cmd=cmd))
output: list[str] = []
for line in cmd.splitlines():
if line.strip() == '':
continue
output.append(
vim.command(line)
)
return output
def future_dump_exception(future: Any) -> None:
try:
future.result()
except:
logger.exception('')
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._queue : collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.thread.start()
self._option_id : asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r'''
func! UIThread(timer_id)
python3 FastSelect.singleton().ui_thread()
endfunc
''')
_Vim.run_command(r'''
call timer_start(1000, 'UIThread', {'repeat': -1})
''')
_Vim.run_command(r'''
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 FastSelect.singleton().close()
augroup END
'''.format(
auto_group=auto_group,
))
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(
lambda: self._option_id.set_result(option_id)
)
async def _switch_buffer(self) -> None:
buffers_future : asyncio.Future[list[tuple[str, int]]] = asyncio.Future()
def get_buffers() -> list[tuple[str, int]]:
res = [
(o.name, o.number)
for o in vim.buffers
]
self.loop.call_soon_threadsafe(
lambda: buffers_future.set_result(res)
)
with self._lock:
self._queue.append(get_buffers)
buffers = await buffers_future
logger.info(dict(buffers=buffers[:3]))
selected_id = await self._pick_option_from_popup(
[o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
nonlocal buffers
logger.warning(dict(buffers=list(vim.buffers), id=selected_id))
# print(vim.buffers, selected_id)
if selected_id >= 0:
vim.current.buffer = vim.buffers[buffers[selected_id][1]]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(
self._switch_buffer(),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
def pick_file_from_popup(
self,
paths: list[pathlib.Path],
) -> None:
_Vim.run_command(r'''
call popup_menu(['asdasdfasdfasdfasdfasdfasdfasdff', 'adfadf'], {'title': '!!!!sdfasdf'})
''')
return
logger.info(dict(msg='before pick started'))
result = asyncio.run_coroutine_threadsafe(
self._pick_option_from_popup(
options=[str(o) for o in paths]
),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after pick started'))
async def _pick_option_from_popup(
self,
options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return option_id
else:
return None
def ui_thread(self):
with self._lock:
while len(self._queue) > 0:
cmd = self._queue.pop();
logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd)))
try:
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
))
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
with self._lock:
self._queue.append(
lambda : popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name
}
)
#lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
#)
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))

243
dotfiles/.module.vimrc.py Normal file

@ -0,0 +1,243 @@
import functools
import configparser
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)
MODULE_NAME = 'online_fxreader_pr34_vim'
def f1():
t1 = vim.current.window
t2 = t1.width
vim.command('vnew')
t3 = t2 // 3
vim.command('vertical resize %d' % t3)
vim.current.window = t1
def f2():
context = {k: vim.options['splitright'] for k in ['splitright']}
try:
current_window = vim.current.window
vim.options['splitright'] = True
vim.command('vnew')
vim.command('r! tmux show-buffer')
vim.current.window = current_window
finally:
for k, v in context.items():
vim.options[k] = v
def f5_1(pattern, flags, info):
import subprocess
import io
import re
import tempfile
import traceback
import logging
# print([pattern, flags, info])
completed_process = None
options = dict(
recursive=False,
ext=[],
)
# print('fuck')
if b'r' in flags:
while True:
ext_m = re.compile(r'^.([^\,]+),(.*)$').match(pattern)
if pattern[:3] in [r'\r,']:
options['recursive'] = True
pattern = pattern[3:]
elif not ext_m is None:
options['ext'].append(ext_m[1])
pattern = ext_m[2]
else:
break
print(
[
flags,
pattern,
options,
]
)
try:
git_cmd = [
'git',
'grep',
'-n',
]
if options['recursive']:
git_cmd.append('--recurse-submodules')
git_cmd.extend(['-P', pattern])
if len(options['ext']) > 0:
git_cmd.extend(['--', *['**/*%s' % o for o in options['ext']]])
completed_process = subprocess.run(
git_cmd,
capture_output=True,
)
assert completed_process.returncode == 0 or (
completed_process.stdout == b''
# completed_process.stdout == b'' and
# completed_process.stderr == b''
)
t1 = completed_process.stdout
except:
logging.error(
''.join(
[
traceback.format_exc(),
getattr(completed_process, 'stdout', b'').decode('utf-8'),
getattr(completed_process, 'stderr', b'').decode('utf-8'),
]
)
)
t1 = b''
def watch(data):
with tempfile.NamedTemporaryFile(suffix='.txt') as f:
with io.open(f.name, 'wb') as f2:
f2.write(data)
vim.command('!less %s' % f.name)
# watch(t1)
t2 = []
for o in t1.splitlines():
try:
# watch(o.encode('utf-8'))
t3 = o.decode('utf-8')
t4 = re.compile(r'^([^\:\=]+)[\:\=](\d+)[\:\=](.*)$').match(t3)
if not t4 is None:
t2.append(
dict(
name=t4[3].strip(),
filename=t4[1],
cmd=t4[2],
)
)
except:
pass
# print(t2)
# return [{'name': 'blah', 'filename': 'docker-compose.yml', 'cmd': '23'}]
return t2
class EditorConfigModeline:
_instance: ClassVar[Optional['EditorConfigModeline']] = None
def __init__(self) -> None:
self.configs: dict[
pathlib.Path,
dict[str, str],
] = dict()
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load_config(self) -> Optional[dict[str, str]]:
cwd = pathlib.Path.cwd()
if not cwd in self.configs:
config_path = cwd / '.editorconfig'
if not config_path.exists():
return None
parser = configparser.ConfigParser()
parser.optionxform = str # keep case
parser.read(str(config_path))
config: dict[str, str] = dict()
for section in parser.sections():
logger.info(dict(section=section))
if len(section) > 0:
# pattern = section[1:-1]
pattern = section
if not parser[section].get('vim_modeline') is None:
config[pattern] = parser[section].get('vim_modeline')
self.validate_modeline(config[pattern])
self.configs[cwd] = config
return self.configs[cwd]
@classmethod
def validate_modeline(cls, modeline: str) -> None:
pattern = re.compile(r'^set(\s+(noet|sts|ts|et|ai|ci|noai|noci|sw)(\=\w)?)+$')
assert pattern.match(modeline), 'invalid modeline %s' % modeline
@classmethod
def find_entry(
cls,
file_path: pathlib.Path,
config: Optional[dict[str, str]] = None,
) -> Optional[str]:
if config is None:
return None
project_root = pathlib.Path.cwd()
if file_path.is_relative_to(project_root):
rel_path = file_path.relative_to(pathlib.Path.cwd())
else:
rel_path = file_path
for pattern, modeline in config.items():
if fnmatch.fnmatch(str(rel_path), pattern):
return modeline
return None
def on_buffer(self) -> None:
config = self.load_config()
logger.info(dict(config=config))
buf_name = vim.current.buffer.name
file_path = pathlib.Path(buf_name).resolve()
entry = self.find_entry(file_path, config=config)
logger.info(dict(modeline=entry))
vim.command('silent! {}'.format(entry))
# vim.command("echo '{}'".format('applied %s' % entry))
# raise NotImplementedError

@ -1,230 +1,4 @@
py3 << EOF
from typing import (Optional, ClassVar, Self,)
import configparser
import re
import pathlib
import logging
import fnmatch
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.WARNING)
def f1():
t1 = vim.current.window
t2 = t1.width
vim.command('vnew')
t3 = t2 // 3
vim.command('vertical resize %d' % t3)
vim.current.window = t1
def f2():
context = {
k : vim.options['splitright']
for k in ['splitright']
}
try:
current_window = vim.current.window
vim.options['splitright'] = True
vim.command('vnew')
vim.command('r! tmux show-buffer')
vim.current.window = current_window
finally:
for k, v in context.items():
vim.options[k] = v
def f5_1(pattern, flags, info):
import subprocess
import io
import re
import tempfile
import traceback
import logging
#print([pattern, flags, info])
completed_process = None
options = dict(
recursive=False,
ext=[],
)
#print('fuck')
if b'r' in flags:
while True:
ext_m = re.compile(r'^.([^\,]+),(.*)$').match(pattern)
if pattern[:3] in [r'\r,']:
options['recursive'] = True
pattern = pattern[3:]
elif not ext_m is None:
options['ext'].append(
ext_m[1]
)
pattern = ext_m[2]
else:
break
print([flags, pattern, options,])
try:
git_cmd = [
'git', 'grep',
'-n',
]
if options['recursive']:
git_cmd.append('--recurse-submodules')
git_cmd.extend(['-P', pattern])
if len(options['ext']) > 0:
git_cmd.extend(['--', *[
'**/*%s' % o
for o in options['ext']
]])
completed_process = subprocess.run(
git_cmd,
capture_output=True,
)
assert (
completed_process.returncode == 0 or
(
completed_process.stdout == b''
#completed_process.stdout == b'' and
#completed_process.stderr == b''
)
)
t1 = completed_process.stdout
except:
logging.error(''.join([
traceback.format_exc(),
getattr(completed_process, 'stdout', b'').decode('utf-8'),
getattr(completed_process, 'stderr', b'').decode('utf-8'),
]))
t1 = b''
def watch(data):
with tempfile.NamedTemporaryFile(suffix='.txt') as f:
with io.open(f.name, 'wb') as f2:
f2.write(data)
vim.command('!less %s' % f.name)
#watch(t1)
t2 = []
for o in t1.splitlines():
try:
#watch(o.encode('utf-8'))
t3 = o.decode('utf-8')
t4 = re.compile(r'^([^\:\=]+)[\:\=](\d+)[\:\=](.*)$').match(t3)
if not t4 is None:
t2.append(
dict(
name=t4[3].strip(),
filename=t4[1],
cmd=t4[2],
)
)
except:
pass
#print(t2)
#return [{'name': 'blah', 'filename': 'docker-compose.yml', 'cmd': '23'}]
return t2
class EditorConfigModeline:
_instance : ClassVar[Optional['EditorConfigModeline']] = None
def __init__(self) -> None:
self.configs : dict[
pathlib.Path,
dict[str, str],
] = dict()
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load_config(self) -> Optional[dict[str, str]]:
cwd = pathlib.Path.cwd()
if not cwd in self.configs:
config_path = cwd / '.editorconfig'
if not config_path.exists():
return None
parser = configparser.ConfigParser()
parser.optionxform = str # keep case
parser.read(str(config_path))
config : dict[str, str] = dict()
for section in parser.sections():
logger.info(dict(section=section))
if len(section) > 0:
# pattern = section[1:-1]
pattern = section
if not parser[section].get('vim_modeline') is None:
config[pattern] = parser[section].get('vim_modeline')
self.validate_modeline(config[pattern])
self.configs[cwd] = config
return self.configs[cwd]
@classmethod
def validate_modeline(cls, modeline: str) -> None:
pattern = re.compile(r'^set(\s+(noet|sts|ts|et|ai|ci|noai|noci|sw)(\=\w)?)+$')
assert pattern.match(modeline), 'invalid modeline %s' % modeline
@classmethod
def find_entry(
cls,
file_path: pathlib.Path,
config: Optional[dict[str, str]] = None,
) -> Optional[str]:
if config is None:
return None
project_root = pathlib.Path.cwd()
if file_path.is_relative_to(project_root):
rel_path = file_path.relative_to(pathlib.Path.cwd())
else:
rel_path = file_path
for pattern, modeline in config.items():
if fnmatch.fnmatch(str(rel_path), pattern):
return modeline
return None
def on_buffer(self) -> None:
config = self.load_config()
logger.info(dict(config=config))
buf_name = vim.current.buffer.name
file_path = pathlib.Path(buf_name).resolve()
entry = self.find_entry(file_path, config=config)
logger.info(dict(modeline=entry))
vim.command('silent! {}'.format(entry))
# vim.command("echo '{}'".format('applied %s' % entry))
# raise NotImplementedError
EOF
py3file ~/.module.vimrc.py
augroup EditorConfigModeline
autocmd!

@ -5,7 +5,7 @@ project(
).stdout().strip('\n'),
# 'online.fxreader.uv',
# ['c', 'cpp'],
version: '0.1.5.29',
version: '0.1.5.31',
# default_options: [
# 'cpp_std=c++23',
# # 'prefer_static=true',

@ -28,7 +28,7 @@ def validate_params(view: Callable[..., Awaitable[R]]) -> Callable[..., Awaitabl
def validate_params(view: Callable[..., R]) -> Callable[..., R]: ...
def validate_params(view: Callable[..., Awaitable[R]] | Callable[..., R]) -> Any:
def validate_params(view: Callable[..., Awaitable[R]] | Callable[..., R]) -> Callable[..., Awaitable[R]] | Callable[..., R]:
class Parameter:
kind: Any
annotation: Any
@ -89,13 +89,14 @@ def validate_params(view: Callable[..., Awaitable[R]] | Callable[..., R]) -> Any
if inspect.iscoroutinefunction(view):
async_view = cast(Callable[..., Awaitable[R]], view)
raise NotImplementedError
@functools.wraps(async_view)
async def wrapper(*args: Any, **kwargs: Any) -> R:
async def async_wrapper(*args: Any, **kwargs: Any) -> R:
validate_params(*args, **kwargs)
return await async_view(*args, **kwargs)
return async_wrapper
else:
sync_view = cast(Callable[..., R], view)

@ -78,9 +78,10 @@ include = [
# 'follow_the_leader/**/*.py',
#'*.py',
# '*.recipe',
'*.py',
'./*.py',
'online/**/*.py',
'online/**/*.pyi',
'../dotfiles/.module.vimrc.py',
]
exclude = [
'.venv',

BIN
releases/whl/online_fxreader_pr34-0.1.5.30-py3-none-any.whl (Stored with Git LFS) Normal file

Binary file not shown.

BIN
releases/whl/online_fxreader_pr34-0.1.5.31-py3-none-any.whl (Stored with Git LFS) Normal file

Binary file not shown.