freelance-project-34-market.../dotfiles/.vim/online_fxreader_pr34_vim/beta.py
Siarhei Siniak f657d63522 [+] update vim, python3 plugin
1. handle escape character;
  2. handle items filtering
    based on entered letters;
  3. redraw UI when the filter
    changes;
  4. TODO
    handle selection changes via arrows, hjkl;
  5. TODO
    handle selection after Enter has been pressed;
2025-10-16 17:13:50 +03:00

379 lines
8.4 KiB
Python

import functools
import configparser
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
from .utils import Vim
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Package name; FastSelect uses it as the prefix of the Vim autocommand group
# and of the Vim function names it defines.
MODULE_NAME = 'online_fxreader_pr34_vim'
def future_dump_exception(future: Any) -> None:
    """Log the exception stored in a finished future, if there is one.

    Used as a done-callback for the futures returned by
    ``asyncio.run_coroutine_threadsafe``, so that a failure inside a
    coroutine shows up in the log instead of being lost.

    Args:
        future: a completed future exposing ``result()``.
    """
    try:
        future.result()
    except (Exception, asyncio.CancelledError):
        # asyncio.CancelledError derives from BaseException, so it is listed
        # explicitly; KeyboardInterrupt / SystemExit are deliberately not
        # swallowed here.
        logger.exception('')
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._buffer_frequency: dict[int, int] = dict()
self._buffer_last_used: dict[int, int] = dict()
self._filter_pattern: Optional[str] = None
self._items: Optional[list[tuple[str, int]]] = None
self._filtered_ids: Optional[set[int]] = None
self._queue: collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.popup_id: Optional[int] = None
self.thread.start()
self._option_id: asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r"""
func! UIThread(timer_id)
python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
endfunc
""")
Vim.run_command(r"""
call timer_start(100, 'UIThread', {'repeat': -1})
""")
Vim.run_command(
r"""
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
augroup END
""".format(
auto_group=auto_group,
)
)
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(lambda: self._option_id.set_result(option_id))
async def _switch_buffer(self) -> None:
buffers_future: asyncio.Future[list[tuple[str, int]]] = asyncio.Future()
def get_buffers() -> list[tuple[str, int]]:
res = [(o.name, o.number) for o in vim.buffers]
res_sorted = sorted(
res,
# key=lambda x: -self._buffer_frequency.get(x[1], 0)
key=lambda x: -self._buffer_last_used.get(x[1], 0),
)
self.loop.call_soon_threadsafe(lambda: buffers_future.set_result(res_sorted))
with self._lock:
self._queue.append(get_buffers)
buffers = await buffers_future
logger.info(dict(buffers=buffers[:3]))
self._items = buffers
with self._lock:
self._set_filter_pattern('')
selected_id = await self._pick_option_from_popup(
# [o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
nonlocal buffers
logger.warning(dict(buffers=list(vim.buffers), id=selected_id))
# print(vim.buffers, selected_id)
if not selected_id is None:
vim.current.buffer = vim.buffers[buffers[selected_id][1]]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(self._switch_buffer(), self.loop)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
async def _pick_option_from_popup(
self,
# options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._filter_pattern = ''
self._popup_id = None
# self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return option_id
else:
return None
def ui_thread(self):
with self._lock:
# Vim.run_command(r'''
# set laststatus=2
# set statusline={}
#'''.format(datetime.datetime.now().isoformat()))
while len(self._queue) > 0:
cmd = self._queue.pop()
logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd)))
try:
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
def on_buf_enter(self) -> None:
result = asyncio.run_coroutine_threadsafe(
self._on_buf_enter(
buf_number=vim.current.buffer.number,
buf_name=pathlib.Path(vim.current.buffer.name),
),
self.loop,
)
result.add_done_callback(future_dump_exception)
def on_filter_key(self, key: str) -> None:
logger.info(dict(msg='got key', key=key))
if key == bytes([27]):
logger.info(dict(msg='closing popup'))
vim.Function('popup_close')(self._popup_id)
return 1
if key == b'\x80kb':
logger.info(dict(msg='backspace'))
with self._lock:
self._set_filter_pattern(self._filter_pattern[:-1])
else:
try:
key_str = key.decode('utf-8')
except:
return 0
if not key_str.isprintable():
return 0
else:
with self._lock:
self._set_filter_pattern(self._filter_pattern + key_str)
self._update_popup()
return 1
def _set_filter_pattern(self, filter_pattern: str) -> None:
self._filter_pattern = filter_pattern
pattern = re.compile(self._filter_pattern)
self._filtered_ids = [i for i, o in enumerate(self._items) if not pattern.search(o[0]) is None]
self._options = [self._items[o][0] for o in self._filtered_ids]
def _update_popup(self) -> None:
vim.Function('popup_settext')(
self._popup_id,
self._options,
)
vim.Function('popup_setoptions')(self._popup_id, {'title': 'Select a file, [%s]' % self._filter_pattern})
async def _on_buf_enter(
self,
buf_number: int,
buf_name: pathlib.Path,
) -> None:
# logger.info(dict(msg='waiting'))
with self._lock:
# buf_number = vim.current.buffer.number
if not buf_number in self._buffer_frequency:
self._buffer_frequency[buf_number] = 0
self._buffer_frequency[buf_number] += 1
self._buffer_last_used[buf_number] = datetime.datetime.now().timestamp()
logger.info(
dict(
msg='updated',
buf_path=str(buf_name),
frequency=self._buffer_frequency[buf_number],
buf_number=buf_number,
)
)
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
filter_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_filter',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(
r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
)
)
vim.command(
r"""
function! {filter_name}(win_id, key)
return py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().on_filter_key(key)', #{key: a:key})
endfunction
""".replace(
'{filter_name}',
filter_name,
)
)
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
def create_popup():
self._popup_id = popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name,
'filter': filter_name,
'wrap': 1,
'maxwidth': 80,
'close': 'button',
'resize': 1,
'drag': 1,
'maxheight': '16',
},
)
with self._lock:
self._queue.append(
create_popup,
# lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
# )
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))
def init() -> None:
    """Plugin entry point: make sure the shared FastSelect instance exists."""
    FastSelect.singleton()