From 68d1de72ecd2dc07568c0ed86308614dcfdfd488 Mon Sep 17 00:00:00 2001 From: Siarhei Siniak Date: Sat, 18 Oct 2025 11:41:01 +0300 Subject: [PATCH] [+] improve vim, python plugin 1. add files selection from git ls-files; --- .../.vim/online_fxreader_pr34_vim/beta.py | 175 +++++++++++++++--- .../.vim/online_fxreader_pr34_vim/utils.py | 4 +- 2 files changed, 153 insertions(+), 26 deletions(-) diff --git a/dotfiles/.vim/online_fxreader_pr34_vim/beta.py b/dotfiles/.vim/online_fxreader_pr34_vim/beta.py index 475d626..3053c4a 100644 --- a/dotfiles/.vim/online_fxreader_pr34_vim/beta.py +++ b/dotfiles/.vim/online_fxreader_pr34_vim/beta.py @@ -1,5 +1,8 @@ import functools import configparser +import subprocess +import dataclasses +import json import datetime import collections import asyncio @@ -47,9 +50,13 @@ class FastSelect: self._buffer_last_used: dict[int, int] = dict() self._filter_pattern: Optional[str] = None - self._items: Optional[list[tuple[str, int]]] = None + self._include_git : Optional[bool] = False self._filtered_ids: Optional[set[int]] = None + self._items: Optional[list['self.entry_t']] = None + self._buffers: Optional[list['self.entry_t']] = None + self._tracked_files: Optional[list['self.entry_t']] = None + self._queue: collections.deque[Callable[[], None]] = collections.deque() self._lock = threading.Lock() @@ -105,28 +112,37 @@ augroup END def pick_option_put_id(self, option_id: int) -> None: self.loop.call_soon_threadsafe(lambda: self._option_id.set_result(option_id)) - async def _switch_buffer(self) -> None: - buffers_future: asyncio.Future[list[tuple[str, int]]] = asyncio.Future() + @dataclasses.dataclass + class entry_t: + path: pathlib.Path + buf_number: Optional[int] = None - def get_buffers() -> list[tuple[str, int]]: - res = [(o.name, o.number) for o in vim.buffers] - - res_sorted = sorted( - res, - # key=lambda x: -self._buffer_frequency.get(x[1], 0) - key=lambda x: -self._buffer_last_used.get(x[1], 0), + def _get_buffers( + self, + res_future: Optional[asyncio.Future[ + list[entry_t] + ]] = None, + ) -> list[entry_t]: + res = [ + self.entry_t( + buf_number=o.number, + path=pathlib.Path(o.name).absolute(), ) + for o in vim.buffers + ] - self.loop.call_soon_threadsafe(lambda: buffers_future.set_result(res_sorted)) + if res_future: + self.loop.call_soon_threadsafe(lambda: res_future.set_result(res)) + return res + + async def _switch_buffer(self) -> None: with self._lock: - self._queue.append(get_buffers) + self._reset_items() - buffers = await buffers_future + await self._sync_task(self._update_items) - logger.info(dict(buffers=buffers[:3])) - - self._items = buffers + # self._items = buffers with self._lock: self._set_filter_pattern('') @@ -139,12 +155,22 @@ augroup END def ui_switch_buffer(): nonlocal selected_id - nonlocal buffers + # nonlocal buffers + + logger.warning(dict( + buffers=self._items[:3], + id=selected_id, + )) - logger.warning(dict(buffers=list(vim.buffers), id=selected_id)) # print(vim.buffers, selected_id) + if not selected_id is None: - vim.current.buffer = vim.buffers[buffers[selected_id][1]] + selected_item = self._items[selected_id] + if selected_item.buf_number is None: + Vim.run_command('badd %s' % json.dumps(str(selected_item.path))[1:-1]) + Vim.run_command('e %s' % json.dumps(str(selected_item.path))[1:-1]) + else: + vim.current.buffer = vim.buffers[selected_item.buf_number] with self._lock: self._queue.append(ui_switch_buffer) @@ -195,8 +221,15 @@ augroup END while len(self._queue) > 0: cmd = self._queue.pop() - logger.warning(dict(msg='start command', cmd=inspect.getsource(cmd))) + try: + cmd_str = inspect.getsource(cmd) + except: + cmd_str = str(cmd) + + try: + logger.warning(dict(msg='start command', cmd=cmd_str)) + cmd() except: logger.exception('') @@ -229,6 +262,14 @@ augroup END with self._lock: self._set_filter_pattern(self._filter_pattern[:-1]) + + # C-g + elif key == b'\x07': + with self._lock: + self._include_git = not self._include_git + self._update_items() + self._update_filtered() + # self._update_popup() else: try: key_str = key.decode('utf-8') @@ -252,14 +293,98 @@ augroup END return 1 + async def _sync_task( + self, + cb: Callable[[], None], + # future: asyncio.Future[bool] + ) -> None: + res_future: asyncio.Future[bool] = asyncio.Future() + + def wrapper(): + res : bool = True + + try: + cb() + except: + logger.exception('') + res = False + + self.loop.call_soon_threadsafe(lambda: res_future.set_result(res)) + + with self._lock: + self._queue.append(wrapper) + + return await res_future + + def _update_items(self) -> None: + known_files: dict[str, int] = dict() + + if self._buffers is None: + self._buffers = self._get_buffers() + + logger.info(dict(buffers=self._buffers[:3])) + + if self._include_git: + if self._tracked_files is None: + for o in self._buffers: + assert o.buf_number + + known_files[str(o.path)] = o.buf_number + + ls_files_output = [ + o.strip() + for o in subprocess.check_output( + ['git', 'ls-files'] + ).decode('utf-8').splitlines() + ] + + self._tracked_files = [] + + for o in ls_files_output: + path = pathlib.Path( + o, + ).absolute() + + entry = self.entry_t( + path=path, + buf_number=known_files.get(str(path)), + ) + + if entry.buf_number: + continue + + self._tracked_files.append(entry) + + logger.info(dict(tracked_files=self._tracked_files[:3])) + + self._items = self._buffers + self._tracked_files + else: + self._items = self._buffers + + self._items = sorted( + self._items, + # key=lambda x: -self._buffer_frequency.get(x[1], 0) + key=lambda x: -self._buffer_last_used.get(x.buf_number, 0), + ) + + def _reset_items(self) -> None: + self._buffers = None + self._tracked_files = None + self._items = None + + def _update_filtered(self) -> None: + pattern = re.compile(self._filter_pattern) + + self._filtered_ids = [ + i for i, o in enumerate(self._items) if not pattern.search(str(o.path)) is None + ] + + self._options = [str(self._items[o].path) for o in self._filtered_ids] + def _set_filter_pattern(self, filter_pattern: str) -> None: self._filter_pattern = filter_pattern - pattern = re.compile(self._filter_pattern) - - self._filtered_ids = [i for i, o in enumerate(self._items) if not pattern.search(o[0]) is None] - - self._options = [self._items[o][0] for o in self._filtered_ids] + self._update_filtered() def _update_popup(self) -> None: vim.Function('popup_settext')( diff --git a/dotfiles/.vim/online_fxreader_pr34_vim/utils.py b/dotfiles/.vim/online_fxreader_pr34_vim/utils.py index ca4a712..02c49f5 100644 --- a/dotfiles/.vim/online_fxreader_pr34_vim/utils.py +++ b/dotfiles/.vim/online_fxreader_pr34_vim/utils.py @@ -1,10 +1,12 @@ import vim +import logging +logger = logging.getLogger(__name__) class Vim: @classmethod def run_command(cls, cmd) -> list[str]: - # logger.info(dict(cmd=cmd)) + logger.info(dict(cmd=cmd)) output: list[str] = [] for line in cmd.splitlines():