freelance-project-34-market.../dotfiles/.vim/online_fxreader_pr34_vim/beta.py
Siarhei Siniak ecab00f4aa [+] update vim modules
1. use python modules
    for separate logic;
  2. update popup_menu
    parameters for fast select;
2025-10-14 18:13:20 +03:00

473 lines
9.9 KiB
Python

import functools
import configparser
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
from .utils import Vim
logger = logging.getLogger(__name__)
MODULE_NAME = 'online_fxreader_pr34_vim'
def f1():
    """Open an empty vertical split and keep focus on the original window.

    After ``:vnew`` the window that is current at that point is resized to
    one third of the width the original window had before the split, and
    the original window is then made current again.
    """
    origin = vim.current.window
    # Width is sampled before the split so the new window is sized against
    # the un-split layout.
    third_of_width = origin.width // 3
    vim.command('vnew')
    vim.command('vertical resize %d' % third_of_width)
    vim.current.window = origin
def f2():
    """Read the output of ``tmux show-buffer`` into a new vertical split.

    ``splitright`` is switched on while the split is created and restored
    to its previous value afterwards, even if one of the Vim commands
    fails. On success the originally focused window is made current again.
    """
    saved_splitright = vim.options['splitright']
    try:
        origin = vim.current.window
        vim.options['splitright'] = True
        for ex_command in ('vnew', 'r! tmux show-buffer'):
            vim.command(ex_command)
        vim.current.window = origin
    finally:
        vim.options['splitright'] = saved_splitright
def f5_1(pattern, flags, info):
    """Search the repository with ``git grep -n -P`` and return the matches.

    NOTE(review): the ``(pattern, flags, info)`` signature and the
    ``name`` / ``filename`` / ``cmd`` result keys look like Vim's
    ``tagfunc`` contract -- confirm against the caller.

    When ``flags`` contains ``b'r'`` the pattern may start with
    comma-terminated option prefixes, which are stripped before searching:

    * the three characters ``\\r,`` enable ``--recurse-submodules``;
    * any other ``<one char><text>,`` prefix adds ``<text>`` as a file-name
      suffix filter (pathspec ``**/*<text>``).

    Args:
        pattern: search expression, optionally carrying the prefixes above.
        flags: bytes-like flag set; only ``b'r'`` is inspected.
        info: unused.

    Returns:
        A list of ``dict(name=<matched line, stripped>, filename=<path>,
        cmd=<line number as str>)``, one per parsed output line. The list
        is empty when nothing matched or when git could not be run.
    """
    import logging
    import re
    import subprocess
    import traceback

    options = dict(
        recursive=False,
        ext=[],
    )

    if b'r' in flags:
        ext_re = re.compile(r'^.([^\,]+),(.*)$')
        while True:
            # The recursive marker is tested first because it would also
            # satisfy the generic extension-prefix pattern.
            if pattern[:3] in [r'\r,']:
                options['recursive'] = True
                pattern = pattern[3:]
                continue
            ext_m = ext_re.match(pattern)
            if ext_m is None:
                break
            options['ext'].append(ext_m[1])
            pattern = ext_m[2]

    # NOTE(review): indentation was lost in the reviewed copy; this trace is
    # assumed to run on every call, not only when b'r' is set -- confirm.
    print(
        [
            flags,
            pattern,
            options,
        ]
    )

    git_cmd = ['git', 'grep', '-n']
    if options['recursive']:
        git_cmd.append('--recurse-submodules')
    git_cmd.extend(['-P', pattern])
    if options['ext']:
        git_cmd.extend(['--', *['**/*%s' % o for o in options['ext']]])

    try:
        completed_process = subprocess.run(
            git_cmd,
            capture_output=True,
        )
    except Exception:
        # Best effort: a missing git binary or a malformed argument must not
        # propagate to the caller; report it and return no matches.
        logging.error(traceback.format_exc())
        return []

    # A non-zero exit with empty stdout (no match, not a repository, ...) is
    # treated as "no results". Output combined with a failure is unexpected.
    if completed_process.returncode != 0 and completed_process.stdout != b'':
        logging.error(
            ''.join(
                [
                    'git grep exited with %d\n' % completed_process.returncode,
                    # errors='replace' so reporting never fails on non-UTF-8
                    # output.
                    completed_process.stdout.decode('utf-8', errors='replace'),
                    completed_process.stderr.decode('utf-8', errors='replace'),
                ]
            )
        )
        return []

    line_re = re.compile(r'^([^\:\=]+)[\:\=](\d+)[\:\=](.*)$')
    matches = []
    for raw_line in completed_process.stdout.splitlines():
        try:
            text = raw_line.decode('utf-8')
        except UnicodeDecodeError:
            # Lines that are not valid UTF-8 are skipped.
            continue
        parsed = line_re.match(text)
        if parsed is not None:
            matches.append(
                dict(
                    name=parsed[3].strip(),
                    filename=parsed[1],
                    cmd=parsed[2],
                )
            )
    return matches
class EditorConfigModeline:
_instance: ClassVar[Optional['EditorConfigModeline']] = None
def __init__(self) -> None:
self.configs: dict[
pathlib.Path,
dict[str, str],
] = dict()
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def load_config(self) -> Optional[dict[str, str]]:
cwd = pathlib.Path.cwd()
if not cwd in self.configs:
config_path = cwd / '.editorconfig'
if not config_path.exists():
return None
parser = configparser.ConfigParser()
parser.optionxform = str # keep case
parser.read(str(config_path))
config: dict[str, str] = dict()
for section in parser.sections():
logger.info(dict(section=section))
if len(section) > 0:
# pattern = section[1:-1]
pattern = section
if not parser[section].get('vim_modeline') is None:
config[pattern] = parser[section].get('vim_modeline')
self.validate_modeline(config[pattern])
self.configs[cwd] = config
return self.configs[cwd]
@classmethod
def validate_modeline(cls, modeline: str) -> None:
pattern = re.compile(r'^set(\s+(noet|sts|ts|et|ai|ci|noai|noci|sw)(\=\w)?)+$')
assert pattern.match(modeline), 'invalid modeline %s' % modeline
@classmethod
def find_entry(
cls,
file_path: pathlib.Path,
config: Optional[dict[str, str]] = None,
) -> Optional[str]:
if config is None:
return None
project_root = pathlib.Path.cwd()
if file_path.is_relative_to(project_root):
rel_path = file_path.relative_to(pathlib.Path.cwd())
else:
rel_path = file_path
for pattern, modeline in config.items():
if fnmatch.fnmatch(str(rel_path), pattern):
return modeline
return None
def on_buffer(self) -> None:
config = self.load_config()
logger.info(dict(config=config))
buf_name = vim.current.buffer.name
file_path = pathlib.Path(buf_name).resolve()
entry = self.find_entry(file_path, config=config)
logger.info(dict(modeline=entry))
vim.command('silent! {}'.format(entry))
# vim.command("echo '{}'".format('applied %s' % entry))
# raise NotImplementedError
def future_dump_exception(future: Any) -> None:
    """Log, rather than propagate, whatever ``future.result()`` raises.

    Nothing is logged when the future completed with a result. The log
    message is empty; the traceback is attached by ``logger.exception``.
    """
    try:
        future.result()
    except BaseException:  # same breadth as a bare ``except:``
        logger.exception('')
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._queue : collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.thread.start()
self._option_id : asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r'''
func! UIThread(timer_id)
python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
endfunc
''')
Vim.run_command(r'''
call timer_start(100, 'UIThread', {'repeat': -1})
''')
Vim.run_command(r'''
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
augroup END
'''.format(
auto_group=auto_group,
))
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(
lambda: self._option_id.set_result(option_id)
)
async def _switch_buffer(self) -> None:
buffers_future : asyncio.Future[list[tuple[str, int]]] = asyncio.Future()
def get_buffers() -> list[tuple[str, int]]:
res = [
(o.name, o.number)
for o in vim.buffers
]
self.loop.call_soon_threadsafe(
lambda: buffers_future.set_result(res)
)
with self._lock:
self._queue.append(get_buffers)
buffers = await buffers_future
logger.info(dict(buffers=buffers[:3]))
selected_id = await self._pick_option_from_popup(
[o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
nonlocal buffers
logger.warning(dict(buffers=list(vim.buffers), id=selected_id))
# print(vim.buffers, selected_id)
if not selected_id is None:
vim.current.buffer = vim.buffers[buffers[selected_id][1]]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(
self._switch_buffer(),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
async def _pick_option_from_popup(
self,
options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return option_id
else:
return None
def ui_thread(self):
with self._lock:
#Vim.run_command(r'''
# set laststatus=2
# set statusline={}
#'''.format(datetime.datetime.now().isoformat()))
while len(self._queue) > 0:
cmd = self._queue.pop();
logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd)))
try:
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
))
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
with self._lock:
self._queue.append(
lambda : popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name,
'wrap': 1,
'maxwidth': 80,
'close': 'button',
'resize': 1,
'drag': 1,
'maxheight': '16',
}
)
#lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
#)
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))
def init() -> None:
    """Create the shared ``EditorConfigModeline`` instance if it does not exist yet."""
    EditorConfigModeline.singleton()