1. make sure sessions
    use the directory vim was opened in;
  2. update git ls-files
    to apply --recurse-submodules
    when choosing a buffer by name;
		
	
			
		
			
				
	
	
		
			515 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			515 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import functools
 | 
						|
import configparser
 | 
						|
import subprocess
 | 
						|
import dataclasses
 | 
						|
import json
 | 
						|
import datetime
 | 
						|
import collections
 | 
						|
import asyncio
 | 
						|
import threading
 | 
						|
import re
 | 
						|
import inspect
 | 
						|
import pathlib
 | 
						|
import logging
 | 
						|
import fnmatch
 | 
						|
import vim
 | 
						|
 | 
						|
from typing import (
 | 
						|
	Optional,
 | 
						|
	ClassVar,
 | 
						|
	Self,
 | 
						|
	Any,
 | 
						|
	Callable,
 | 
						|
)
 | 
						|
 | 
						|
from .utils import Vim
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
MODULE_NAME = 'online_fxreader_pr34_vim'
 | 
						|
 | 
						|
 | 
						|
def future_dump_exception(future: Any) -> None:
	"""Log the exception a finished future carries, if any.

	Written to be used as a done-callback: ``future.result()`` is called only
	so that a failure is re-raised here and its traceback is written to the
	module logger instead of being lost.

	Args:
		future: any object exposing a ``result()`` method.
	"""
	try:
		future.result()
	except Exception:
		# Only ordinary errors are swallowed; KeyboardInterrupt, SystemExit
		# and other BaseException subclasses keep propagating.
		logger.exception('')
 | 
						|
 | 
						|
 | 
						|
class FastSelect:
 | 
						|
	_instance: ClassVar[Optional['FastSelect']] = None
 | 
						|
 | 
						|
	def __init__(self) -> None:
 | 
						|
		self.loop = asyncio.new_event_loop()
 | 
						|
 | 
						|
		self.thread = threading.Thread(
 | 
						|
			target=self.loop.run_forever,
 | 
						|
		)
 | 
						|
 | 
						|
		self._buffer_frequency: dict[int, int] = dict()
 | 
						|
		self._buffer_last_used: dict[int, int] = dict()
 | 
						|
 | 
						|
		self._filter_pattern: Optional[str] = None
 | 
						|
		self._include_git : Optional[bool] = False
 | 
						|
		self._filtered_ids: Optional[set[int]] = None
 | 
						|
 | 
						|
		self._items: Optional[list['self.entry_t']] = None
 | 
						|
		self._buffers: Optional[list['self.entry_t']] = None
 | 
						|
		self._tracked_files: Optional[list['self.entry_t']] = None
 | 
						|
 | 
						|
		self._queue: collections.deque[Callable[[], None]] = collections.deque()
 | 
						|
		self._lock = threading.Lock()
 | 
						|
 | 
						|
		self.popup_id: Optional[int] = None
 | 
						|
 | 
						|
		self.thread.start()
 | 
						|
		self._option_id: asyncio.Future[Optional[int]] = None
 | 
						|
		self._options: list[str] = None
 | 
						|
 | 
						|
		auto_group = '{}_{}_{}'.format(
 | 
						|
			MODULE_NAME,
 | 
						|
			type(self).__name__.lower(),
 | 
						|
			'close',
 | 
						|
		).capitalize()
 | 
						|
 | 
						|
		vim.command(r"""
 | 
						|
			func! UIThread(timer_id)
 | 
						|
				python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
 | 
						|
			endfunc
 | 
						|
		""")
 | 
						|
		Vim.run_command(r"""
 | 
						|
			call timer_start(100, 'UIThread', {'repeat': -1})
 | 
						|
		""")
 | 
						|
		Vim.run_command(
 | 
						|
			r"""
 | 
						|
augroup {auto_group}
 | 
						|
  autocmd!
 | 
						|
  autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
 | 
						|
  autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
 | 
						|
augroup END
 | 
						|
		""".format(
 | 
						|
				auto_group=auto_group,
 | 
						|
			)
 | 
						|
		)
 | 
						|
 | 
						|
	def __del__(self) -> None:
 | 
						|
		self.close()
 | 
						|
 | 
						|
	def close(self) -> None:
 | 
						|
		logger.info(dict(msg='close started'))
 | 
						|
		self.loop.call_soon_threadsafe(self.loop.stop)
 | 
						|
		self.thread.join()
 | 
						|
 | 
						|
		logger.info(dict(msg='close done'))
 | 
						|
 | 
						|
	@classmethod
 | 
						|
	def singleton(cls) -> Self:
 | 
						|
		if cls._instance is None:
 | 
						|
			cls._instance = cls()
 | 
						|
 | 
						|
		return cls._instance
 | 
						|
 | 
						|
	def pick_option_put_id(self, option_id: int) -> None:
 | 
						|
		self.loop.call_soon_threadsafe(lambda: self._option_id.set_result(option_id))
 | 
						|
 | 
						|
	@dataclasses.dataclass
 | 
						|
	class entry_t:
 | 
						|
		path: pathlib.Path
 | 
						|
		buf_number: Optional[int] = None
 | 
						|
 | 
						|
	def _get_buffers(
		self,
		res_future: Optional[asyncio.Future[
			list[entry_t]
		]] = None,
	) -> list[entry_t]:
		"""Snapshot every vim buffer as an ``entry_t``.

		Args:
			res_future: optional future; when given, it is resolved with the
				snapshot from a callback scheduled on the background loop.

		Returns:
			One entry per item of ``vim.buffers``, each with an absolute path
			and the buffer's number.
		"""
		entries = []

		for buf in vim.buffers:
			entries.append(
				self.entry_t(
					buf_number=buf.number,
					path=pathlib.Path(buf.name).absolute(),
				)
			)

		if res_future:
			def publish() -> None:
				res_future.set_result(entries)

			self.loop.call_soon_threadsafe(publish)

		return entries
 | 
						|
 | 
						|
	async def _switch_buffer(self) -> None:
		"""Run one interactive "pick a buffer" round on the background loop.

		Steps: drop the cached item lists, rebuild them through the UI command
		queue, reset the filter, show the popup and wait for the choice, then
		queue the actual buffer switch.
		"""
		with self._lock:
			self._reset_items()

		# _update_items reads vim state, so it goes through the command queue
		# that ui_thread() drains instead of being called here.
		await self._sync_task(self._update_items)

		with self._lock:
			self._set_filter_pattern('')

		selected_id = await self._pick_option_from_popup()

		logger.info(dict(selected_id=selected_id))

		def ui_switch_buffer() -> None:
			logger.warning(dict(
				buffers=self._items[:3],
				id=selected_id,
			))

			if selected_id is None:
				return

			selected_item = self._items[selected_id]

			if selected_item.buf_number is None:
				# fnameescape() produces a file name that is safe inside an Ex
				# command (spaces, '%', '#', '|', non-ASCII). The previous
				# json.dumps()[1:-1] turned non-ASCII characters into \uXXXX
				# and left spaces unescaped.
				escaped = vim.Function('fnameescape')(str(selected_item.path))

				if isinstance(escaped, bytes):
					escaped = escaped.decode('utf-8')

				Vim.run_command('badd %s' % escaped)
				Vim.run_command('e %s' % escaped)
			else:
				vim.current.buffer = vim.buffers[selected_item.buf_number]

		with self._lock:
			self._queue.append(ui_switch_buffer)
 | 
						|
 | 
						|
	def switch_buffer(self) -> None:
		"""Kick off the interactive buffer picker without blocking the caller.

		Submits ``_switch_buffer()`` to the background loop; once it finishes,
		``future_dump_exception`` logs any failure.
		"""
		logger.info({'msg': 'before switch_buffer started'})

		pending = asyncio.run_coroutine_threadsafe(self._switch_buffer(), self.loop)
		pending.add_done_callback(future_dump_exception)

		logger.info({'msg': 'after switch_buffer started'})
 | 
						|
 | 
						|
	async def _pick_option_from_popup(
 | 
						|
		self,
 | 
						|
		# options: list[str],
 | 
						|
	) -> Optional[int]:
 | 
						|
		logger.info(dict(msg='started'))
 | 
						|
 | 
						|
		self._filter_pattern = ''
 | 
						|
		self._popup_id = None
 | 
						|
 | 
						|
		# self._options = options
 | 
						|
 | 
						|
		self._option_id = asyncio.Future[int]()
 | 
						|
 | 
						|
		await self._pick_option_start_popup()
 | 
						|
 | 
						|
		option_id = await self._option_id
 | 
						|
 | 
						|
		logger.info(dict(option_id=option_id))
 | 
						|
 | 
						|
		self._options = None
 | 
						|
		self._option_id = None
 | 
						|
 | 
						|
		logger.info(dict(msg='done'))
 | 
						|
 | 
						|
		if option_id >= 0:
 | 
						|
			return self._filtered_ids[option_id]
 | 
						|
		else:
 | 
						|
			return None
 | 
						|
 | 
						|
	def ui_thread(self):
 | 
						|
		with self._lock:
 | 
						|
			# Vim.run_command(r'''
 | 
						|
			# set laststatus=2
 | 
						|
			# set statusline={}
 | 
						|
			#'''.format(datetime.datetime.now().isoformat()))
 | 
						|
 | 
						|
			while len(self._queue) > 0:
 | 
						|
				cmd = self._queue.pop()
 | 
						|
 | 
						|
				try:
 | 
						|
					cmd_str = inspect.getsource(cmd)
 | 
						|
				except:
 | 
						|
					cmd_str = str(cmd)
 | 
						|
 | 
						|
				try:
 | 
						|
					logger.warning(dict(msg='start command', cmd=cmd_str))
 | 
						|
 | 
						|
					cmd()
 | 
						|
				except:
 | 
						|
					logger.exception('')
 | 
						|
				# self._result.append(
 | 
						|
				# 	vim.command(cmd)
 | 
						|
				# )
 | 
						|
 | 
						|
	def on_buf_enter(self) -> None:
		"""BufEnter hook: record the visit of the current buffer on the background loop."""
		current = vim.current.buffer

		coro = self._on_buf_enter(
			buf_number=current.number,
			buf_name=pathlib.Path(current.name),
		)

		pending = asyncio.run_coroutine_threadsafe(coro, self.loop)
		pending.add_done_callback(future_dump_exception)
 | 
						|
 | 
						|
	def on_filter_key(self, key: bytes) -> Any:
		"""Popup filter: turn key presses into edits of the filter pattern.

		Args:
			key: the pressed key as raw bytes (it is compared against byte
				literals and decoded as UTF-8 below).

		Returns:
			``1`` when the key was handled here, otherwise whatever vim's
			``popup_filter_menu`` returns for the key.
		"""
		logger.info(dict(msg='got key', key=key))

		# Escape (byte 27) closes the popup.
		if key == bytes([27]):
			logger.info(dict(msg='closing popup'))

			vim.Function('popup_close')(self._popup_id)
			return 1

		if key == b'\x80kb':
			logger.info(dict(msg='backspace'))

			with self._lock:
				self._set_filter_pattern(self._filter_pattern[:-1])
		# C-g
		elif key == b'\x07':
			with self._lock:
				self._include_git = not self._include_git
				self._update_items()
				self._update_filtered()
		else:
			try:
				key_str = key.decode('utf-8')
			except (UnicodeDecodeError, AttributeError):
				# Not text (or not bytes at all): let the stock menu filter
				# deal with it.
				return vim.Function('popup_filter_menu')(
					self._popup_id, key
				)

			if not key_str.isprintable():
				return vim.Function('popup_filter_menu')(
					self._popup_id, key
				)

			with self._lock:
				self._set_filter_pattern(self._filter_pattern + key_str)

		self._update_popup()

		return 1
 | 
						|
 | 
						|
	async def _sync_task(
 | 
						|
		self,
 | 
						|
		cb: Callable[[], None],
 | 
						|
		# future: asyncio.Future[bool]
 | 
						|
	) -> None:
 | 
						|
		res_future: asyncio.Future[bool] = asyncio.Future()
 | 
						|
 | 
						|
		def wrapper():
 | 
						|
			res : bool = True
 | 
						|
 | 
						|
			try:
 | 
						|
				cb()
 | 
						|
			except:
 | 
						|
				logger.exception('')
 | 
						|
				res = False
 | 
						|
 | 
						|
			self.loop.call_soon_threadsafe(lambda: res_future.set_result(res))
 | 
						|
 | 
						|
		with self._lock:
 | 
						|
			self._queue.append(wrapper)
 | 
						|
 | 
						|
		return await res_future
 | 
						|
 | 
						|
	@classmethod
	def _cwd(cls) -> pathlib.Path:
		"""Return the directory reported by vim's ``getcwd()`` as a path.

		Declared as a classmethod to match its ``cls`` parameter; calling it
		on an instance (``self._cwd()``) keeps working.
		"""
		cwd = vim.Function('getcwd')()

		# The result was decoded unconditionally before; a str is tolerated
		# as well in case the interface ever returns one.
		if isinstance(cwd, bytes):
			cwd = cwd.decode('utf-8')

		return pathlib.Path(cwd)
 | 
						|
 | 
						|
	def _update_items(self) -> None:
 | 
						|
		known_files: dict[str, int] = dict()
 | 
						|
 | 
						|
		if self._buffers is None:
 | 
						|
			self._buffers = self._get_buffers()
 | 
						|
 | 
						|
			logger.info(dict(buffers=self._buffers[:3]))
 | 
						|
 | 
						|
		if self._include_git:
 | 
						|
			if self._tracked_files is None:
 | 
						|
				for o in self._buffers:
 | 
						|
					assert o.buf_number
 | 
						|
 | 
						|
					known_files[str(o.path)] = o.buf_number
 | 
						|
 | 
						|
				ls_files_output = [
 | 
						|
					o.strip()
 | 
						|
					for o in subprocess.check_output(
 | 
						|
						['git', 'ls-files', '--recurse-submodules',], cwd=self._cwd(),
 | 
						|
					).decode('utf-8').splitlines()
 | 
						|
				]
 | 
						|
 | 
						|
				self._tracked_files = []
 | 
						|
 | 
						|
				for o in ls_files_output:
 | 
						|
					path = pathlib.Path(
 | 
						|
						o,
 | 
						|
					).absolute()
 | 
						|
 | 
						|
					entry = self.entry_t(
 | 
						|
						path=path,
 | 
						|
						buf_number=known_files.get(str(path)),
 | 
						|
					)
 | 
						|
 | 
						|
					if entry.buf_number:
 | 
						|
						continue
 | 
						|
 | 
						|
					self._tracked_files.append(entry)
 | 
						|
 | 
						|
				logger.info(dict(tracked_files=self._tracked_files[:3]))
 | 
						|
 | 
						|
			self._items = self._buffers + self._tracked_files
 | 
						|
		else:
 | 
						|
			self._items = self._buffers
 | 
						|
 | 
						|
		self._items = sorted(
 | 
						|
				self._items,
 | 
						|
				# key=lambda x:  -self._buffer_frequency.get(x[1], 0)
 | 
						|
				key=lambda x: -self._buffer_last_used.get(x.buf_number, 0),
 | 
						|
		)
 | 
						|
 | 
						|
	def _reset_items(self) -> None:
 | 
						|
		self._buffers = None
 | 
						|
		self._tracked_files = None
 | 
						|
		self._items = None
 | 
						|
 | 
						|
	def _update_filtered(self) -> None:
 | 
						|
		pattern = re.compile(self._filter_pattern)
 | 
						|
 | 
						|
		self._filtered_ids = [
 | 
						|
		    i for i, o in enumerate(self._items) if not pattern.search(str(o.path)) is None
 | 
						|
		]
 | 
						|
 | 
						|
		self._options = [str(self._items[o].path) for o in self._filtered_ids]
 | 
						|
 | 
						|
	def _set_filter_pattern(self, filter_pattern: str) -> None:
 | 
						|
		self._filter_pattern = filter_pattern
 | 
						|
 | 
						|
		self._update_filtered()
 | 
						|
 | 
						|
	def _update_popup(self) -> None:
		"""Push the current options and the filter text into the popup."""
		popup_id = self._popup_id

		vim.Function('popup_settext')(popup_id, self._options)

		title = 'Select a file, [%s]' % self._filter_pattern
		vim.Function('popup_setoptions')(popup_id, {'title': title})
 | 
						|
 | 
						|
	async def _on_buf_enter(
 | 
						|
		self,
 | 
						|
		buf_number: int,
 | 
						|
		buf_name: pathlib.Path,
 | 
						|
	) -> None:
 | 
						|
		# logger.info(dict(msg='waiting'))
 | 
						|
 | 
						|
		with self._lock:
 | 
						|
			# buf_number = vim.current.buffer.number
 | 
						|
 | 
						|
			if not buf_number in self._buffer_frequency:
 | 
						|
				self._buffer_frequency[buf_number] = 0
 | 
						|
 | 
						|
			self._buffer_frequency[buf_number] += 1
 | 
						|
 | 
						|
			self._buffer_last_used[buf_number] = datetime.datetime.now().timestamp()
 | 
						|
 | 
						|
			logger.info(
 | 
						|
				dict(
 | 
						|
					msg='updated',
 | 
						|
					buf_path=str(buf_name),
 | 
						|
					frequency=self._buffer_frequency[buf_number],
 | 
						|
					buf_number=buf_number,
 | 
						|
				)
 | 
						|
			)
 | 
						|
 | 
						|
	async def _pick_option_start_popup(
		self,
	):
		"""Queue the creation of the selection popup.

		The popup's vim-side callback and filter functions are (re)defined and
		``popup_menu`` is called inside a single queued command, so every vim
		call made for the popup happens where ``ui_thread()`` drains the queue
		rather than on the asyncio loop thread. The id of the created popup
		is stored in ``self._popup_id``.
		"""
		callback_name = '{}_{}_{}'.format(
			MODULE_NAME,
			type(self).__name__.lower(),
			'popup_callback',
		).capitalize()

		filter_name = '{}_{}_{}'.format(
			MODULE_NAME,
			type(self).__name__.lower(),
			'popup_filter',
		).capitalize()

		def define_vim_functions() -> None:
			# exists() needs the "*" prefix to test for a function; without it
			# the name is looked up as a variable and the check never fires.
			# Logged at info level: `function!` below redefines on every
			# popup, so this is expected after the first one.
			if int(vim.eval('exists("*{}")'.format(callback_name))) == 1:
				logger.info(dict(msg='callback already defined, %s' % callback_name))

			# The popup reports a 1-based index; non-positive results (popup
			# closed without a choice) are forwarded as -1.
			vim.command(
				r"""
			function! {callback_name}(id, result)
				if a:result > 0
					call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result  - 1). ')')
				else
					call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
				endif
			endfunction
		""".format(
					callback_name=callback_name,
				)
			)

			# str.replace instead of str.format: the body contains a literal
			# `#{...}` vim dictionary.
			vim.command(
				r"""
			function! {filter_name}(win_id, key)
				return py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().on_filter_key(key)', #{key: a:key})
			endfunction
		""".replace(
					'{filter_name}',
					filter_name,
				)
			)

		def create_popup() -> None:
			define_vim_functions()

			logger.info(dict(msg='before popup'))

			self._popup_id = vim.Function('popup_menu')(
				self._options,
				{
					'title': 'Select a file',
					'callback': callback_name,
					'filter': filter_name,
					'wrap': 1,
					'maxwidth': 80,
					'close': 'button',
					'resize': 1,
					'drag': 1,
					'maxheight': 16,
				},
			)

		with self._lock:
			self._queue.append(create_popup)
 | 
						|
 | 
						|
		# logger.info(dict(popup_id=popup_id))
 | 
						|
 | 
						|
		# logger.info(dict(msg='after popup'))
 | 
						|
 | 
						|
 | 
						|
def init():
	"""Module entry point: make sure the ``FastSelect`` singleton exists."""
	FastSelect.singleton()
 |