freelance-project-34-market.../dotfiles/.vim/online_fxreader_pr34_vim/beta.py
Siarhei Siniak 68d1de72ec [+] improve vim, python plugin
1. add files selection
    from git ls-files;
2025-10-18 12:17:53 +03:00

510 lines
11 KiB
Python

import functools
import configparser
import subprocess
import dataclasses
import json
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch
import vim
from typing import (
Optional,
ClassVar,
Self,
Any,
Callable,
)
from .utils import Vim
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Package name; used as the prefix of the Vim augroup and of the popup
# callback/filter function names generated by FastSelect.
MODULE_NAME = 'online_fxreader_pr34_vim'
def future_dump_exception(future: Any) -> None:
    """Log whatever exception a finished future carries; never re-raise.

    The future's result is requested only to surface a failure; the value
    itself is discarded.  Anything raised by ``future.result()`` is written
    to the module logger with a traceback.
    """
    try:
        future.result()
    except BaseException:  # same reach as the bare ``except:`` it replaces
        logger.exception('')
    return None
class FastSelect:
_instance: ClassVar[Optional['FastSelect']] = None
def __init__(self) -> None:
self.loop = asyncio.new_event_loop()
self.thread = threading.Thread(
target=self.loop.run_forever,
)
self._buffer_frequency: dict[int, int] = dict()
self._buffer_last_used: dict[int, int] = dict()
self._filter_pattern: Optional[str] = None
self._include_git : Optional[bool] = False
self._filtered_ids: Optional[set[int]] = None
self._items: Optional[list['self.entry_t']] = None
self._buffers: Optional[list['self.entry_t']] = None
self._tracked_files: Optional[list['self.entry_t']] = None
self._queue: collections.deque[Callable[[], None]] = collections.deque()
self._lock = threading.Lock()
self.popup_id: Optional[int] = None
self.thread.start()
self._option_id: asyncio.Future[Optional[int]] = None
self._options: list[str] = None
auto_group = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'close',
).capitalize()
vim.command(r"""
func! UIThread(timer_id)
python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
endfunc
""")
Vim.run_command(r"""
call timer_start(100, 'UIThread', {'repeat': -1})
""")
Vim.run_command(
r"""
augroup {auto_group}
autocmd!
autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
augroup END
""".format(
auto_group=auto_group,
)
)
def __del__(self) -> None:
self.close()
def close(self) -> None:
logger.info(dict(msg='close started'))
self.loop.call_soon_threadsafe(self.loop.stop)
self.thread.join()
logger.info(dict(msg='close done'))
@classmethod
def singleton(cls) -> Self:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def pick_option_put_id(self, option_id: int) -> None:
self.loop.call_soon_threadsafe(lambda: self._option_id.set_result(option_id))
@dataclasses.dataclass
class entry_t:
path: pathlib.Path
buf_number: Optional[int] = None
def _get_buffers(
self,
res_future: Optional[asyncio.Future[
list[entry_t]
]] = None,
) -> list[entry_t]:
res = [
self.entry_t(
buf_number=o.number,
path=pathlib.Path(o.name).absolute(),
)
for o in vim.buffers
]
if res_future:
self.loop.call_soon_threadsafe(lambda: res_future.set_result(res))
return res
async def _switch_buffer(self) -> None:
with self._lock:
self._reset_items()
await self._sync_task(self._update_items)
# self._items = buffers
with self._lock:
self._set_filter_pattern('')
selected_id = await self._pick_option_from_popup(
# [o[0] for o in buffers]
)
logger.info(dict(selected_id=selected_id))
def ui_switch_buffer():
nonlocal selected_id
# nonlocal buffers
logger.warning(dict(
buffers=self._items[:3],
id=selected_id,
))
# print(vim.buffers, selected_id)
if not selected_id is None:
selected_item = self._items[selected_id]
if selected_item.buf_number is None:
Vim.run_command('badd %s' % json.dumps(str(selected_item.path))[1:-1])
Vim.run_command('e %s' % json.dumps(str(selected_item.path))[1:-1])
else:
vim.current.buffer = vim.buffers[selected_item.buf_number]
with self._lock:
self._queue.append(ui_switch_buffer)
def switch_buffer(self) -> None:
logger.info(dict(msg='before switch_buffer started'))
result = asyncio.run_coroutine_threadsafe(self._switch_buffer(), self.loop)
result.add_done_callback(future_dump_exception)
logger.info(dict(msg='after switch_buffer started'))
async def _pick_option_from_popup(
self,
# options: list[str],
) -> Optional[int]:
logger.info(dict(msg='started'))
self._filter_pattern = ''
self._popup_id = None
# self._options = options
self._option_id = asyncio.Future[int]()
await self._pick_option_start_popup()
option_id = await self._option_id
logger.info(dict(option_id=option_id))
self._options = None
self._option_id = None
logger.info(dict(msg='done'))
if option_id >= 0:
return self._filtered_ids[option_id]
else:
return None
def ui_thread(self):
with self._lock:
# Vim.run_command(r'''
# set laststatus=2
# set statusline={}
#'''.format(datetime.datetime.now().isoformat()))
while len(self._queue) > 0:
cmd = self._queue.pop()
try:
cmd_str = inspect.getsource(cmd)
except:
cmd_str = str(cmd)
try:
logger.warning(dict(msg='start command', cmd=cmd_str))
cmd()
except:
logger.exception('')
# self._result.append(
# vim.command(cmd)
# )
def on_buf_enter(self) -> None:
result = asyncio.run_coroutine_threadsafe(
self._on_buf_enter(
buf_number=vim.current.buffer.number,
buf_name=pathlib.Path(vim.current.buffer.name),
),
self.loop,
)
result.add_done_callback(future_dump_exception)
def on_filter_key(self, key: str) -> None:
logger.info(dict(msg='got key', key=key))
if key == bytes([27]):
logger.info(dict(msg='closing popup'))
vim.Function('popup_close')(self._popup_id)
return 1
if key == b'\x80kb':
logger.info(dict(msg='backspace'))
with self._lock:
self._set_filter_pattern(self._filter_pattern[:-1])
# C-g
elif key == b'\x07':
with self._lock:
self._include_git = not self._include_git
self._update_items()
self._update_filtered()
# self._update_popup()
else:
try:
key_str = key.decode('utf-8')
except:
return vim.Function('popup_filter_menu')(
self._popup_id, key
)
# return 0
if not key_str.isprintable():
return vim.Function('popup_filter_menu')(
self._popup_id, key
)
# return 0
else:
with self._lock:
self._set_filter_pattern(self._filter_pattern + key_str)
self._update_popup()
return 1
async def _sync_task(
self,
cb: Callable[[], None],
# future: asyncio.Future[bool]
) -> None:
res_future: asyncio.Future[bool] = asyncio.Future()
def wrapper():
res : bool = True
try:
cb()
except:
logger.exception('')
res = False
self.loop.call_soon_threadsafe(lambda: res_future.set_result(res))
with self._lock:
self._queue.append(wrapper)
return await res_future
def _update_items(self) -> None:
known_files: dict[str, int] = dict()
if self._buffers is None:
self._buffers = self._get_buffers()
logger.info(dict(buffers=self._buffers[:3]))
if self._include_git:
if self._tracked_files is None:
for o in self._buffers:
assert o.buf_number
known_files[str(o.path)] = o.buf_number
ls_files_output = [
o.strip()
for o in subprocess.check_output(
['git', 'ls-files']
).decode('utf-8').splitlines()
]
self._tracked_files = []
for o in ls_files_output:
path = pathlib.Path(
o,
).absolute()
entry = self.entry_t(
path=path,
buf_number=known_files.get(str(path)),
)
if entry.buf_number:
continue
self._tracked_files.append(entry)
logger.info(dict(tracked_files=self._tracked_files[:3]))
self._items = self._buffers + self._tracked_files
else:
self._items = self._buffers
self._items = sorted(
self._items,
# key=lambda x: -self._buffer_frequency.get(x[1], 0)
key=lambda x: -self._buffer_last_used.get(x.buf_number, 0),
)
def _reset_items(self) -> None:
self._buffers = None
self._tracked_files = None
self._items = None
def _update_filtered(self) -> None:
pattern = re.compile(self._filter_pattern)
self._filtered_ids = [
i for i, o in enumerate(self._items) if not pattern.search(str(o.path)) is None
]
self._options = [str(self._items[o].path) for o in self._filtered_ids]
def _set_filter_pattern(self, filter_pattern: str) -> None:
self._filter_pattern = filter_pattern
self._update_filtered()
def _update_popup(self) -> None:
vim.Function('popup_settext')(
self._popup_id,
self._options,
)
vim.Function('popup_setoptions')(self._popup_id, {'title': 'Select a file, [%s]' % self._filter_pattern})
async def _on_buf_enter(
self,
buf_number: int,
buf_name: pathlib.Path,
) -> None:
# logger.info(dict(msg='waiting'))
with self._lock:
# buf_number = vim.current.buffer.number
if not buf_number in self._buffer_frequency:
self._buffer_frequency[buf_number] = 0
self._buffer_frequency[buf_number] += 1
self._buffer_last_used[buf_number] = datetime.datetime.now().timestamp()
logger.info(
dict(
msg='updated',
buf_path=str(buf_name),
frequency=self._buffer_frequency[buf_number],
buf_number=buf_number,
)
)
async def _pick_option_start_popup(
self,
):
callback_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_callback',
).capitalize()
filter_name = '{}_{}_{}'.format(
MODULE_NAME,
type(self).__name__.lower(),
'popup_filter',
).capitalize()
if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
logger.warning(dict(msg='callback already defined, %s' % callback_name))
vim.command(
r"""
function! {callback_name}(id, result)
if a:result > 0
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
else
call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
endif
endfunction
""".format(
callback_name=callback_name,
)
)
vim.command(
r"""
function! {filter_name}(win_id, key)
return py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().on_filter_key(key)', #{key: a:key})
endfunction
""".replace(
'{filter_name}',
filter_name,
)
)
logger.info(dict(msg='before popup'))
popup_menu = vim.Function('popup_menu')
def create_popup():
self._popup_id = popup_menu(
self._options,
{
'title': 'Select a file',
'callback': callback_name,
'filter': filter_name,
'wrap': 1,
'maxwidth': 80,
'close': 'button',
'resize': 1,
'drag': 1,
'maxheight': '16',
},
)
with self._lock:
self._queue.append(
create_popup,
# lambda : vim.command(
# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
# '{options}', '[%s]' % ','.join([
# '\'%s\'' % o.replace('\'', '\\\'')
# for o in self._options
# ]),
# ).replace(
# '{title}', 'Select a file',
# ).replace(
# '{callback}',
# callback_name
# )
# )
)
# logger.info(dict(popup_id=popup_id))
# logger.info(dict(msg='after popup'))
def init():
    """Plugin entry point: make sure the FastSelect singleton exists.

    The first call constructs the instance; later calls only fetch it.
    """
    FastSelect.singleton()