freelance-project-34-market.../dotfiles/.vim/online_fxreader_pr34_vim/beta.py
Siarhei Siniak f59e0bbe6c [+] improve vim, beta
1. add filter callback;
  1.1. partially
    added recording of pressed keys;
  1.2. TODO,
    handle esape to close the window;
    handle to select the entry;
    handle hjkl, arrows
    to change the selection;
    handle pressed keys otherwise, ideally
    after i has been pressed,
    or handle hjkl after escape has been presed;
    to restrict entries to the matching pattern
    visualize current filter, and ins/normal mode
    in the popup title;
2025-10-15 21:48:30 +03:00

335 lines
7.3 KiB
Python

import functools
import configparser
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
from .utils import Vim
logger = logging.getLogger(__name__)
MODULE_NAME = 'online_fxreader_pr34_vim'
def future_dump_exception(future: Any) -> None:
try:
future.result()
except:
logger.exception('')
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._buffer_frequency : dict[int, int] = dict()
self._buffer_last_used : dict[int, int] = dict()
self._filter_pattern : Optional[str] = None
self._queue : collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.thread.start()
self._option_id : asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r'''
func! UIThread(timer_id)
python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
endfunc
''')
Vim.run_command(r'''
call timer_start(100, 'UIThread', {'repeat': -1})
''')
Vim.run_command(r'''
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
augroup END
'''.format(
auto_group=auto_group,
))
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(
lambda: self._option_id.set_result(option_id)
)
async def _switch_buffer(self) -> None:
buffers_future : asyncio.Future[list[tuple[str, int]]] = asyncio.Future()
def get_buffers() -> list[tuple[str, int]]:
res = [
(o.name, o.number)
for o in vim.buffers
]
res_sorted = sorted(
res,
# key=lambda x: -self._buffer_frequency.get(x[1], 0)
key=lambda x: -self._buffer_last_used.get(x[1], 0)
)
self.loop.call_soon_threadsafe(
lambda: buffers_future.set_result(res_sorted)
)
with self._lock:
self._queue.append(get_buffers)
buffers = await buffers_future
logger.info(dict(buffers=buffers[:3]))
selected_id = await self._pick_option_from_popup(
[o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
nonlocal buffers
logger.warning(dict(buffers=list(vim.buffers), id=selected_id))
# print(vim.buffers, selected_id)
if not selected_id is None:
vim.current.buffer = vim.buffers[buffers[selected_id][1]]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(
self._switch_buffer(),
self.loop
)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
async def _pick_option_from_popup(
self,
options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._filter_pattern = ''
self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return option_id
else:
return None
def ui_thread(self):
with self._lock:
#Vim.run_command(r'''
# set laststatus=2
# set statusline={}
#'''.format(datetime.datetime.now().isoformat()))
while len(self._queue) > 0:
cmd = self._queue.pop();
logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd)))
try:
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
def on_buf_enter(self) -> None:
result = asyncio.run_coroutine_threadsafe(
self._on_buf_enter(
buf_number=vim.current.buffer.number,
buf_name=pathlib.Path(vim.current.buffer.name),
),
self.loop
)
result.add_done_callback(future_dump_exception)
def on_filter_key(self, key: str) -> None:
logger.info(dict(msg='got key', key=key))
try:
key_str = key.decode('utf-8')
except:
return 0
if not key_str.isprintable():
return 0
with self._lock:
self._filter_pattern += key_str
return 1
async def _on_buf_enter(
self,
buf_number: int,
buf_name: pathlib.Path,
) -> None:
# logger.info(dict(msg='waiting'))
with self._lock:
# buf_number = vim.current.buffer.number
if not buf_number in self._buffer_frequency:
self._buffer_frequency[buf_number] = 0
self._buffer_frequency[buf_number] += 1
self._buffer_last_used[buf_number] = datetime.datetime.now().timestamp()
logger.info(dict(
msg='updated',
buf_path=str(buf_name),
frequency=self._buffer_frequency[buf_number],
buf_number=buf_number,
))
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
filter_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_filter',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
))
vim.command(r"""
function! {filter_name}(win_id, key)
return py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().on_filter_key(key)', #{key: a:key})
endfunction
""".replace(
'{filter_name}', filter_name,
))
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
with self._lock:
self._queue.append(
lambda : popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name,
'filter': filter_name,
'wrap': 1,
'maxwidth': 80,
'close': 'button',
'resize': 1,
'drag': 1,
'maxheight': '16',
}
)
#lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
#)
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))
def init():
FastSelect.singleton()