freelance-project-34-market.../dotfiles/.beta.vimrc.py
Siarhei Siniak 3a76eab761 [+] improve vim py3 modules
1. switch to file logging
    with rotation;
  2. test that at 100ms callback
    execution UI looks fast;
  3. test that <C-p> hotkey
    makes the switcher popup;
  3.1. TODO,
    select some better
    switching UI component;
    should support a list of more buffers
    than fit into the screen;
    should support some real time
    filtering via regex;
2025-10-13 18:39:44 +03:00

499 lines
10 KiB
Python

import functools
import configparser
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Prefix used below when building the names of the Vim autocommand group
# and of the popup callback function.
MODULE_NAME = 'online_fxreader_pr34_vim'
def f1():
    """Open an empty vertical split sized to a third of the current window.

    The width is measured on the window that is active when the function is
    called; focus is handed back to that window afterwards.
    """
    origin = vim.current.window
    # Measure before splitting so the size is based on the pre-split width.
    target_width = origin.width // 3

    vim.command('vnew')
    vim.command('vertical resize %d' % target_width)

    vim.current.window = origin
def f2():
    """Read the tmux paste buffer into a new vertical split.

    'splitright' is switched on while the split is created and put back to
    its previous value afterwards, even if one of the Vim commands fails.
    Focus returns to the window that was active before the split.
    """
    saved_splitright = vim.options['splitright']

    try:
        origin = vim.current.window
        vim.options['splitright'] = True
        for ex_command in ('vnew', 'r! tmux show-buffer'):
            vim.command(ex_command)
        vim.current.window = origin
    finally:
        vim.options['splitright'] = saved_splitright
def f5_1(pattern, flags, info):
    r"""Search tracked files with ``git grep`` and return tag-like matches.

    Args:
        pattern: Expression handed to ``git grep -P``. When ``flags``
            contains ``b'r'`` the pattern may start with comma-terminated
            prefixes, which are stripped before searching:
            ``\r,`` enables ``--recurse-submodules``; any other prefix of
            the form ``<one char><ext>,`` restricts the search to the
            pathspec ``**/*<ext>``.
        flags: Bytes-like set of flags; only the presence of ``b'r'`` is
            inspected.
        info: Not used by this function.

    Returns:
        A list of dicts with the keys ``name`` (the matching line,
        stripped), ``filename`` and ``cmd`` (the line number as a string).
        The list is empty when nothing matched or when git failed; failures
        are logged rather than raised.
    """
    import subprocess
    import re
    import traceback
    import logging

    options = dict(
        recursive=False,
        ext=[],
    )

    if b'r' in flags:
        # NOTE(review): the leading `.` matches ANY single character, so a
        # pattern such as 'foo,bar' is also read as "extension oo" -
        # confirm whether a literal dot/backslash was intended.
        ext_re = re.compile(r'^.([^\,]+),(.*)$')

        while True:
            if pattern[:3] == r'\r,':
                options['recursive'] = True
                pattern = pattern[3:]
                continue

            ext_m = ext_re.match(pattern)
            if ext_m is None:
                break

            options['ext'].append(ext_m[1])
            pattern = ext_m[2]

    print(
        [
            flags,
            pattern,
            options,
        ]
    )

    git_cmd = ['git', 'grep', '-n']
    if options['recursive']:
        git_cmd.append('--recurse-submodules')
    # `-e` marks the next argument as the pattern, so an expression that
    # starts with `-` is not mistaken for a command-line option.
    git_cmd.extend(['-P', '-e', pattern])
    if options['ext']:
        git_cmd.extend(['--', *['**/*%s' % o for o in options['ext']]])

    completed_process = None
    try:
        completed_process = subprocess.run(
            git_cmd,
            capture_output=True,
        )
        # A non-zero exit with empty stdout (e.g. no matches) is treated as
        # an empty result; a non-zero exit WITH output is a failure.
        if completed_process.returncode != 0 and completed_process.stdout != b'':
            raise RuntimeError(
                'git grep exited with code %d' % completed_process.returncode
            )
        raw_output = completed_process.stdout
    except Exception:
        # Best effort: report what we know and fall back to "no matches".
        # errors='replace' keeps the handler itself from raising on
        # non-UTF-8 output.
        captured = [
            getattr(completed_process, stream, b'').decode('utf-8', errors='replace')
            for stream in ('stdout', 'stderr')
        ]
        logging.error(''.join([traceback.format_exc(), *captured]))
        raw_output = b''

    # <filename><sep><line number><sep><text>, where <sep> is ':' or '='.
    line_re = re.compile(r'^([^\:\=]+)[\:\=](\d+)[\:\=](.*)$')

    matches = []
    for raw_line in raw_output.splitlines():
        try:
            text = raw_line.decode('utf-8')
        except UnicodeDecodeError:
            # Lines that are not valid UTF-8 are skipped on purpose.
            continue

        parsed = line_re.match(text)
        if parsed is not None:
            matches.append(
                dict(
                    name=parsed[3].strip(),
                    filename=parsed[1],
                    cmd=parsed[2],
                )
            )

    return matches
class EditorConfigModeline:
_instance: ClassVar[Optional['EditorConfigModeline']] = None
def __init__(self) -> None:
self.configs: dict[
pathlib.Path,
dict[str, str],
] = dict()
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load_config(self) -> Optional[dict[str, str]]:
cwd = pathlib.Path.cwd()
if not cwd in self.configs:
config_path = cwd / '.editorconfig'
if not config_path.exists():
return None
parser = configparser.ConfigParser()
parser.optionxform = str # keep case
parser.read(str(config_path))
config: dict[str, str] = dict()
for section in parser.sections():
logger.info(dict(section=section))
if len(section) > 0:
# pattern = section[1:-1]
pattern = section
if not parser[section].get('vim_modeline') is None:
config[pattern] = parser[section].get('vim_modeline')
self.validate_modeline(config[pattern])
self.configs[cwd] = config
return self.configs[cwd]
@classmethod
def validate_modeline(cls, modeline: str) -> None:
pattern = re.compile(r'^set(\s+(noet|sts|ts|et|ai|ci|noai|noci|sw)(\=\w)?)+$')
assert pattern.match(modeline), 'invalid modeline %s' % modeline
@classmethod
def find_entry(
cls,
file_path: pathlib.Path,
config: Optional[dict[str, str]] = None,
) -> Optional[str]:
if config is None:
return None
project_root = pathlib.Path.cwd()
if file_path.is_relative_to(project_root):
rel_path = file_path.relative_to(pathlib.Path.cwd())
else:
rel_path = file_path
for pattern, modeline in config.items():
if fnmatch.fnmatch(str(rel_path), pattern):
return modeline
return None
def on_buffer(self) -> None:
config = self.load_config()
logger.info(dict(config=config))
buf_name = vim.current.buffer.name
file_path = pathlib.Path(buf_name).resolve()
entry = self.find_entry(file_path, config=config)
logger.info(dict(modeline=entry))
vim.command('silent! {}'.format(entry))
# vim.command("echo '{}'".format('applied %s' % entry))
# raise NotImplementedError
class _Vim:
    """Small helpers on top of the embedded ``vim`` module."""

    @classmethod
    def run_command(cls, cmd) -> list[str]:
        """Execute every non-blank line of *cmd* as its own Vim command.

        Lines are run in order through ``vim.command``; the values it
        returns are collected and handed back as a list.
        """
        ex_lines = (text for text in cmd.splitlines() if text.strip() != '')
        return [vim.command(text) for text in ex_lines]
def future_dump_exception(future: Any) -> None:
    """Log the exception stored in a finished future, if there is one.

    Intended as a done-callback: the future's result is requested only to
    surface a stored failure, which is written to the module logger with
    its traceback. Nothing is re-raised.

    Args:
        future: Any object exposing ``result()``.
    """
    try:
        future.result()
    except (Exception, asyncio.CancelledError):
        # asyncio.CancelledError derives from BaseException, so it is named
        # explicitly to keep cancellations in the log, while
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        logger.exception('')
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._queue : collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.thread.start()
self._option_id : asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r'''
func! UIThread(timer_id)
python3 FastSelect.singleton().ui_thread()
endfunc
''')
_Vim.run_command(r'''
call timer_start(100, 'UIThread', {'repeat': -1})
''')
_Vim.run_command(r'''
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 FastSelect.singleton().close()
augroup END
'''.format(
auto_group=auto_group,
))
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(
lambda: self._option_id.set_result(option_id)
)
async def _switch_buffer(self) -> None:
buffers_future : asyncio.Future[list[tuple[str, int]]] = asyncio.Future()
def get_buffers() -> list[tuple[str, int]]:
res = [
(o.name, o.number)
for o in vim.buffers
]
self.loop.call_soon_threadsafe(
lambda: buffers_future.set_result(res)
)
with self._lock:
self._queue.append(get_buffers)
buffers = await buffers_future
logger.info(dict(buffers=buffers[:3]))
selected_id = await self._pick_option_from_popup(
[o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
nonlocal buffers
logger.warning(dict(buffers=list(vim.buffers), id=selected_id))
# print(vim.buffers, selected_id)
if selected_id:
vim.current.buffer = vim.buffers[buffers[selected_id][1]]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(
self._switch_buffer(),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
def pick_file_from_popup(
self,
paths: list[pathlib.Path],
) -> None:
_Vim.run_command(r'''
call popup_menu(['asdasdfasdfasdfasdfasdfasdfasdff', 'adfadf'], {'title': '!!!!sdfasdf'})
''')
return
logger.info(dict(msg='before pick started'))
result = asyncio.run_coroutine_threadsafe(
self._pick_option_from_popup(
options=[str(o) for o in paths]
),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after pick started'))
async def _pick_option_from_popup(
self,
options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return option_id
else:
return None
def ui_thread(self):
with self._lock:
#_Vim.run_command(r'''
# set laststatus=2
# set statusline={}
#'''.format(datetime.datetime.now().isoformat()))
while len(self._queue) > 0:
cmd = self._queue.pop();
logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd)))
try:
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
))
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
with self._lock:
self._queue.append(
lambda : popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name
}
)
#lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
#)
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))