import functools
import configparser
import datetime
import collections
import asyncio
import threading
import re
import inspect
import pathlib
import logging
import fnmatch

import vim

from typing import (
    Optional,
    ClassVar,
    Self,
    Any,
    Callable,
)

from .utils import Vim

logger = logging.getLogger(__name__)

MODULE_NAME = 'online_fxreader_pr34_vim'


def future_dump_exception(future: Any) -> None:
    """Log the exception (if any) stored in a finished future.

    Intended as a ``add_done_callback`` hook so that failures of
    background coroutines are written to the log instead of being lost.
    """
    try:
        future.result()
    except Exception:
        logger.exception('')


class FastSelect:
    """Popup-based buffer picker driven by a background asyncio loop.

    Coroutines run on ``self.loop`` inside ``self.thread``. Anything that
    has to touch Vim is appended to ``self._queue`` as a zero-argument
    callable; a repeating Vim timer calls :meth:`ui_thread`, which drains
    and executes the queue. Results travel back to the coroutines through
    ``loop.call_soon_threadsafe``.
    """

    _instance: ClassVar[Optional['FastSelect']] = None

    def __init__(self) -> None:
        """Start the loop thread and register the Vim timer and autocmds."""
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(
            target=self.loop.run_forever,
        )

        # buffer number -> how many times BufEnter fired for it
        self._buffer_frequency: dict[int, int] = dict()
        # commands waiting to be executed by ui_thread()
        self._queue: collections.deque[Callable[[], Any]] = collections.deque()
        self._lock = threading.Lock()

        self.thread.start()

        # state of the selection currently in progress (None when idle)
        self._option_id: Optional[asyncio.Future[int]] = None
        self._options: Optional[list[str]] = None

        auto_group = '{}_{}_{}'.format(
            MODULE_NAME,
            type(self).__name__.lower(),
            'close',
        ).capitalize()

        vim.command(r'''
            func! UIThread(timer_id)
                python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
            endfunc
        ''')

        Vim.run_command(r'''
            call timer_start(100, 'UIThread', {'repeat': -1})
        ''')

        Vim.run_command(r'''
            augroup {auto_group}
                autocmd!
                autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
                autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
            augroup END
        '''.format(
            auto_group=auto_group,
        ))

    def __del__(self) -> None:
        self.close()

    def close(self) -> None:
        """Stop the event loop and wait for the loop thread to finish.

        Safe to call more than once (it is invoked both by the
        ``VimLeavePre`` autocmd and by ``__del__``): the stop/join is
        skipped when the thread is no longer alive.
        """
        logger.info(dict(msg='close started'))

        if self.thread.is_alive():
            self.loop.call_soon_threadsafe(self.loop.stop)
            self.thread.join()

        logger.info(dict(msg='close done'))

    @classmethod
    def singleton(cls) -> Self:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()

        return cls._instance

    def pick_option_put_id(self, option_id: int) -> None:
        """Deliver the popup result to the coroutine awaiting a selection.

        Called from the Vim popup callback. The future is resolved on the
        loop thread. A call that arrives when no selection is pending, or
        after it was already resolved, is logged and ignored instead of
        raising inside the loop callback.
        """
        def resolve() -> None:
            pending = self._option_id

            if pending is None or pending.done():
                logger.warning(dict(
                    msg='no pending selection',
                    option_id=option_id,
                ))
                return

            pending.set_result(option_id)

        self.loop.call_soon_threadsafe(resolve)

    async def _switch_buffer(self) -> None:
        """List buffers, let the user pick one, then switch to it.

        Runs on the loop thread; every Vim access is queued for the UI
        thread.
        """
        buffers_future: asyncio.Future[list[tuple[str, int]]] = (
            self.loop.create_future()
        )

        def get_buffers() -> None:
            # Executed by ui_thread(): snapshot (name, number) of all
            # buffers, most frequently entered first.
            try:
                listed = [
                    (o.name, o.number)
                    for o in vim.buffers
                ]
                ranked = sorted(
                    listed,
                    key=lambda x: -self._buffer_frequency.get(x[1], 0),
                )
            except Exception as exc:
                # Hand the failure to the waiting coroutine so it does
                # not hang forever on buffers_future.
                self.loop.call_soon_threadsafe(
                    buffers_future.set_exception, exc
                )
                raise

            self.loop.call_soon_threadsafe(
                buffers_future.set_result, ranked
            )

        with self._lock:
            self._queue.append(get_buffers)

        buffers = await buffers_future

        logger.info(dict(buffers=buffers[:3]))

        selected_id = await self._pick_option_from_popup(
            [o[0] for o in buffers]
        )

        logger.info(dict(selected_id=selected_id))

        def ui_switch_buffer() -> None:
            logger.warning(dict(buffers=list(vim.buffers), id=selected_id))

            if selected_id is not None:
                # vim.buffers is indexed here by buffer number
                vim.current.buffer = vim.buffers[buffers[selected_id][1]]

        with self._lock:
            self._queue.append(ui_switch_buffer)

    def switch_buffer(self) -> None:
        """Schedule :meth:`_switch_buffer` on the loop thread and return."""
        logger.info(dict(msg='before switch_buffer started'))

        result = asyncio.run_coroutine_threadsafe(
            self._switch_buffer(),
            self.loop,
        )
        result.add_done_callback(future_dump_exception)

        logger.info(dict(msg='after switch_buffer started'))

    async def _pick_option_from_popup(
        self,
        options: list[str],
    ) -> Optional[int]:
        """Show ``options`` in a popup and return the chosen index.

        Returns the zero-based index delivered through
        :meth:`pick_option_put_id`, or ``None`` when a negative id
        (cancelled popup) was delivered.
        """
        logger.info(dict(msg='started'))

        self._options = options
        self._option_id = self.loop.create_future()

        try:
            await self._pick_option_start_popup()

            option_id = await self._option_id

            logger.info(dict(option_id=option_id))
        finally:
            # Always return to the idle state, even on failure.
            self._options = None
            self._option_id = None

        logger.info(dict(msg='done'))

        if option_id >= 0:
            return option_id
        else:
            return None

    @staticmethod
    def _describe_command(cmd: Callable[[], Any]) -> str:
        """Return the source of ``cmd`` for logging, or its repr."""
        try:
            return inspect.getsource(cmd)
        except (OSError, TypeError):
            return repr(cmd)

    def ui_thread(self) -> None:
        """Run every queued UI command; called from the Vim timer.

        The queue is drained while holding the lock, but the commands are
        executed after it is released: a command may trigger ``BufEnter``
        (and therefore :meth:`on_buf_enter`, which takes the same
        non-reentrant lock) on this very thread. Commands run in the
        order they were queued.
        """
        with self._lock:
            pending = list(self._queue)
            self._queue.clear()

        for cmd in pending:
            logger.warning(dict(
                msg='start command',
                cmd=self._describe_command(cmd),
            ))

            try:
                cmd()
            except Exception:
                logger.exception('')

    def on_buf_enter(self) -> None:
        """Count one more visit of the current buffer (``BufEnter`` hook)."""
        with self._lock:
            buf_number = vim.current.buffer.number

            self._buffer_frequency[buf_number] = (
                self._buffer_frequency.get(buf_number, 0) + 1
            )

            logger.info(dict(
                msg='updated',
                buf_path=vim.current.buffer.name,
                frequency=self._buffer_frequency[buf_number],
                buf_number=buf_number,
            ))

    async def _pick_option_start_popup(
        self,
    ) -> None:
        """Queue a UI command that opens the selection popup.

        The Vim callback function is (re)defined and ``popup_menu`` is
        called inside the queued command, so no Vim API is touched from
        the loop thread. The callback forwards ``result - 1`` for a
        positive result and ``-1`` otherwise to
        :meth:`pick_option_put_id`.
        """
        callback_name = '{}_{}_{}'.format(
            MODULE_NAME,
            type(self).__name__.lower(),
            'popup_callback',
        ).capitalize()

        # Snapshot now: self._options is reset once the selection ends.
        options = self._options

        def ui_show_popup() -> None:
            try:
                # `function!` makes the redefinition idempotent.
                vim.command(r"""
                    function! {callback_name}(id, result)
                        if a:result > 0
                            call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result - 1). ')')
                        else
                            call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
                        endif
                    endfunction
                """.format(
                    callback_name=callback_name,
                ))

                popup_menu = vim.Function('popup_menu')

                popup_menu(
                    options,
                    {
                        'title': 'Select a file',
                        'callback': callback_name,
                        'wrap': 1,
                        'maxwidth': 80,
                        'close': 'button',
                        'resize': 1,
                        'drag': 1,
                        'maxheight': '16',
                    }
                )
            except Exception:
                # No popup means no callback: resolve the selection as
                # cancelled so the awaiting coroutine does not hang.
                self.pick_option_put_id(-1)
                raise

        logger.info(dict(msg='before popup'))

        with self._lock:
            self._queue.append(ui_show_popup)


def init():
    """Create the FastSelect singleton (starts its thread and Vim hooks)."""
    FastSelect.singleton()