510 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			510 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import functools
 | |
| import configparser
 | |
| import subprocess
 | |
| import dataclasses
 | |
| import json
 | |
| import datetime
 | |
| import collections
 | |
| import asyncio
 | |
| import threading
 | |
| import re
 | |
| import inspect
 | |
| import pathlib
 | |
| import logging
 | |
| import fnmatch
 | |
| import vim
 | |
| 
 | |
| from typing import (
 | |
| 	Optional,
 | |
| 	ClassVar,
 | |
| 	Self,
 | |
| 	Any,
 | |
| 	Callable,
 | |
| )
 | |
| 
 | |
| from .utils import Vim
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| MODULE_NAME = 'online_fxreader_pr34_vim'
 | |
| 
 | |
| 
 | |
| def future_dump_exception(future: Any) -> None:
 | |
| 	try:
 | |
| 		future.result()
 | |
| 	except:
 | |
| 		logger.exception('')
 | |
| 
 | |
| 
 | |
| class FastSelect:
 | |
| 	_instance: ClassVar[Optional['FastSelect']] = None
 | |
| 
 | |
| 	def __init__(self) -> None:
 | |
| 		self.loop = asyncio.new_event_loop()
 | |
| 
 | |
| 		self.thread = threading.Thread(
 | |
| 			target=self.loop.run_forever,
 | |
| 		)
 | |
| 
 | |
| 		self._buffer_frequency: dict[int, int] = dict()
 | |
| 		self._buffer_last_used: dict[int, int] = dict()
 | |
| 
 | |
| 		self._filter_pattern: Optional[str] = None
 | |
| 		self._include_git : Optional[bool] = False
 | |
| 		self._filtered_ids: Optional[set[int]] = None
 | |
| 
 | |
| 		self._items: Optional[list['self.entry_t']] = None
 | |
| 		self._buffers: Optional[list['self.entry_t']] = None
 | |
| 		self._tracked_files: Optional[list['self.entry_t']] = None
 | |
| 
 | |
| 		self._queue: collections.deque[Callable[[], None]] = collections.deque()
 | |
| 		self._lock = threading.Lock()
 | |
| 
 | |
| 		self.popup_id: Optional[int] = None
 | |
| 
 | |
| 		self.thread.start()
 | |
| 		self._option_id: asyncio.Future[Optional[int]] = None
 | |
| 		self._options: list[str] = None
 | |
| 
 | |
| 		auto_group = '{}_{}_{}'.format(
 | |
| 			MODULE_NAME,
 | |
| 			type(self).__name__.lower(),
 | |
| 			'close',
 | |
| 		).capitalize()
 | |
| 
 | |
| 		vim.command(r"""
 | |
| 			func! UIThread(timer_id)
 | |
| 				python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().ui_thread()
 | |
| 			endfunc
 | |
| 		""")
 | |
| 		Vim.run_command(r"""
 | |
| 			call timer_start(100, 'UIThread', {'repeat': -1})
 | |
| 		""")
 | |
| 		Vim.run_command(
 | |
| 			r"""
 | |
| augroup {auto_group}
 | |
|   autocmd!
 | |
|   autocmd VimLeavePre * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().close()
 | |
|   autocmd BufEnter * python3 online_fxreader_pr34_vim.beta.FastSelect.singleton().on_buf_enter()
 | |
| augroup END
 | |
| 		""".format(
 | |
| 				auto_group=auto_group,
 | |
| 			)
 | |
| 		)
 | |
| 
 | |
| 	def __del__(self) -> None:
 | |
| 		self.close()
 | |
| 
 | |
| 	def close(self) -> None:
 | |
| 		logger.info(dict(msg='close started'))
 | |
| 		self.loop.call_soon_threadsafe(self.loop.stop)
 | |
| 		self.thread.join()
 | |
| 
 | |
| 		logger.info(dict(msg='close done'))
 | |
| 
 | |
| 	@classmethod
 | |
| 	def singleton(cls) -> Self:
 | |
| 		if cls._instance is None:
 | |
| 			cls._instance = cls()
 | |
| 
 | |
| 		return cls._instance
 | |
| 
 | |
| 	def pick_option_put_id(self, option_id: int) -> None:
 | |
| 		self.loop.call_soon_threadsafe(lambda: self._option_id.set_result(option_id))
 | |
| 
 | |
| 	@dataclasses.dataclass
 | |
| 	class entry_t:
 | |
| 		path: pathlib.Path
 | |
| 		buf_number: Optional[int] = None
 | |
| 
 | |
| 	def _get_buffers(
 | |
| 		self,
 | |
| 		res_future: Optional[asyncio.Future[
 | |
| 			list[entry_t]
 | |
| 		]] = None,
 | |
| 	) -> list[entry_t]:
 | |
| 		res = [
 | |
| 			self.entry_t(
 | |
| 				buf_number=o.number,
 | |
| 				path=pathlib.Path(o.name).absolute(),
 | |
| 			)
 | |
| 			for o in vim.buffers
 | |
| 		]
 | |
| 
 | |
| 		if res_future:
 | |
| 			self.loop.call_soon_threadsafe(lambda: res_future.set_result(res))
 | |
| 
 | |
| 		return res
 | |
| 
 | |
| 	async def _switch_buffer(self) -> None:
 | |
| 		with self._lock:
 | |
| 			self._reset_items()
 | |
| 
 | |
| 		await self._sync_task(self._update_items)
 | |
| 
 | |
| 		# self._items = buffers
 | |
| 
 | |
| 		with self._lock:
 | |
| 			self._set_filter_pattern('')
 | |
| 
 | |
| 		selected_id = await self._pick_option_from_popup(
 | |
| 			# [o[0] for o in buffers]
 | |
| 		)
 | |
| 
 | |
| 		logger.info(dict(selected_id=selected_id))
 | |
| 
 | |
| 		def ui_switch_buffer():
 | |
| 			nonlocal selected_id
 | |
| 			# nonlocal buffers
 | |
| 
 | |
| 			logger.warning(dict(
 | |
| 				buffers=self._items[:3],
 | |
| 				id=selected_id,
 | |
| 			))
 | |
| 
 | |
| 			# print(vim.buffers, selected_id)
 | |
| 
 | |
| 			if not selected_id is None:
 | |
| 				selected_item = self._items[selected_id]
 | |
| 				if selected_item.buf_number is None:
 | |
| 					Vim.run_command('badd %s' % json.dumps(str(selected_item.path))[1:-1])
 | |
| 					Vim.run_command('e %s' % json.dumps(str(selected_item.path))[1:-1])
 | |
| 				else:
 | |
| 					vim.current.buffer = vim.buffers[selected_item.buf_number]
 | |
| 
 | |
| 		with self._lock:
 | |
| 			self._queue.append(ui_switch_buffer)
 | |
| 
 | |
| 	def switch_buffer(self) -> None:
 | |
| 		logger.info(dict(msg='before switch_buffer started'))
 | |
| 		result = asyncio.run_coroutine_threadsafe(self._switch_buffer(), self.loop)
 | |
| 
 | |
| 		result.add_done_callback(future_dump_exception)
 | |
| 
 | |
| 		logger.info(dict(msg='after switch_buffer started'))
 | |
| 
 | |
| 	async def _pick_option_from_popup(
 | |
| 		self,
 | |
| 		# options: list[str],
 | |
| 	) -> Optional[int]:
 | |
| 		logger.info(dict(msg='started'))
 | |
| 
 | |
| 		self._filter_pattern = ''
 | |
| 		self._popup_id = None
 | |
| 
 | |
| 		# self._options = options
 | |
| 
 | |
| 		self._option_id = asyncio.Future[int]()
 | |
| 
 | |
| 		await self._pick_option_start_popup()
 | |
| 
 | |
| 		option_id = await self._option_id
 | |
| 
 | |
| 		logger.info(dict(option_id=option_id))
 | |
| 
 | |
| 		self._options = None
 | |
| 		self._option_id = None
 | |
| 
 | |
| 		logger.info(dict(msg='done'))
 | |
| 
 | |
| 		if option_id >= 0:
 | |
| 			return self._filtered_ids[option_id]
 | |
| 		else:
 | |
| 			return None
 | |
| 
 | |
| 	def ui_thread(self):
 | |
| 		with self._lock:
 | |
| 			# Vim.run_command(r'''
 | |
| 			# set laststatus=2
 | |
| 			# set statusline={}
 | |
| 			#'''.format(datetime.datetime.now().isoformat()))
 | |
| 
 | |
| 			while len(self._queue) > 0:
 | |
| 				cmd = self._queue.pop()
 | |
| 
 | |
| 				try:
 | |
| 					cmd_str = inspect.getsource(cmd)
 | |
| 				except:
 | |
| 					cmd_str = str(cmd)
 | |
| 
 | |
| 				try:
 | |
| 					logger.warning(dict(msg='start command', cmd=cmd_str))
 | |
| 
 | |
| 					cmd()
 | |
| 				except:
 | |
| 					logger.exception('')
 | |
| 				# self._result.append(
 | |
| 				# 	vim.command(cmd)
 | |
| 				# )
 | |
| 
 | |
| 	def on_buf_enter(self) -> None:
 | |
| 		result = asyncio.run_coroutine_threadsafe(
 | |
| 			self._on_buf_enter(
 | |
| 				buf_number=vim.current.buffer.number,
 | |
| 				buf_name=pathlib.Path(vim.current.buffer.name),
 | |
| 			),
 | |
| 			self.loop,
 | |
| 		)
 | |
| 
 | |
| 		result.add_done_callback(future_dump_exception)
 | |
| 
 | |
| 	def on_filter_key(self, key: str) -> None:
 | |
| 		logger.info(dict(msg='got key', key=key))
 | |
| 
 | |
| 		if key == bytes([27]):
 | |
| 			logger.info(dict(msg='closing popup'))
 | |
| 
 | |
| 			vim.Function('popup_close')(self._popup_id)
 | |
| 			return 1
 | |
| 
 | |
| 		if key == b'\x80kb':
 | |
| 			logger.info(dict(msg='backspace'))
 | |
| 
 | |
| 			with self._lock:
 | |
| 				self._set_filter_pattern(self._filter_pattern[:-1])
 | |
| 
 | |
| 		# C-g
 | |
| 		elif key == b'\x07':
 | |
| 			with self._lock:
 | |
| 				self._include_git = not self._include_git
 | |
| 				self._update_items()
 | |
| 				self._update_filtered()
 | |
| 				# self._update_popup()
 | |
| 		else:
 | |
| 			try:
 | |
| 				key_str = key.decode('utf-8')
 | |
| 			except:
 | |
| 				return vim.Function('popup_filter_menu')(
 | |
| 					self._popup_id, key
 | |
| 				)
 | |
| 				# return 0
 | |
| 
 | |
| 			if not key_str.isprintable():
 | |
| 				return vim.Function('popup_filter_menu')(
 | |
| 					self._popup_id, key
 | |
| 				)
 | |
| 
 | |
| 				# return 0
 | |
| 			else:
 | |
| 				with self._lock:
 | |
| 					self._set_filter_pattern(self._filter_pattern + key_str)
 | |
| 
 | |
| 		self._update_popup()
 | |
| 
 | |
| 		return 1
 | |
| 
 | |
| 	async def _sync_task(
 | |
| 		self,
 | |
| 		cb: Callable[[], None],
 | |
| 		# future: asyncio.Future[bool]
 | |
| 	) -> None:
 | |
| 		res_future: asyncio.Future[bool] = asyncio.Future()
 | |
| 
 | |
| 		def wrapper():
 | |
| 			res : bool = True
 | |
| 
 | |
| 			try:
 | |
| 				cb()
 | |
| 			except:
 | |
| 				logger.exception('')
 | |
| 				res = False
 | |
| 
 | |
| 			self.loop.call_soon_threadsafe(lambda: res_future.set_result(res))
 | |
| 
 | |
| 		with self._lock:
 | |
| 			self._queue.append(wrapper)
 | |
| 
 | |
| 		return await res_future
 | |
| 
 | |
| 	def _update_items(self) -> None:
 | |
| 		known_files: dict[str, int] = dict()
 | |
| 
 | |
| 		if self._buffers is None:
 | |
| 			self._buffers = self._get_buffers()
 | |
| 
 | |
| 			logger.info(dict(buffers=self._buffers[:3]))
 | |
| 
 | |
| 		if self._include_git:
 | |
| 			if self._tracked_files is None:
 | |
| 				for o in self._buffers:
 | |
| 					assert o.buf_number
 | |
| 
 | |
| 					known_files[str(o.path)] = o.buf_number
 | |
| 
 | |
| 				ls_files_output = [
 | |
| 					o.strip()
 | |
| 					for o in subprocess.check_output(
 | |
| 						['git', 'ls-files']
 | |
| 					).decode('utf-8').splitlines()
 | |
| 				]
 | |
| 
 | |
| 				self._tracked_files = []
 | |
| 
 | |
| 				for o in ls_files_output:
 | |
| 					path = pathlib.Path(
 | |
| 						o,
 | |
| 					).absolute()
 | |
| 
 | |
| 					entry = self.entry_t(
 | |
| 						path=path,
 | |
| 						buf_number=known_files.get(str(path)),
 | |
| 					)
 | |
| 
 | |
| 					if entry.buf_number:
 | |
| 						continue
 | |
| 
 | |
| 					self._tracked_files.append(entry)
 | |
| 
 | |
| 				logger.info(dict(tracked_files=self._tracked_files[:3]))
 | |
| 
 | |
| 			self._items = self._buffers + self._tracked_files
 | |
| 		else:
 | |
| 			self._items = self._buffers
 | |
| 
 | |
| 		self._items = sorted(
 | |
| 				self._items,
 | |
| 				# key=lambda x:  -self._buffer_frequency.get(x[1], 0)
 | |
| 				key=lambda x: -self._buffer_last_used.get(x.buf_number, 0),
 | |
| 		)
 | |
| 
 | |
| 	def _reset_items(self) -> None:
 | |
| 		self._buffers = None
 | |
| 		self._tracked_files = None
 | |
| 		self._items = None
 | |
| 
 | |
| 	def _update_filtered(self) -> None:
 | |
| 		pattern = re.compile(self._filter_pattern)
 | |
| 
 | |
| 		self._filtered_ids = [
 | |
| 		    i for i, o in enumerate(self._items) if not pattern.search(str(o.path)) is None
 | |
| 		]
 | |
| 
 | |
| 		self._options = [str(self._items[o].path) for o in self._filtered_ids]
 | |
| 
 | |
| 	def _set_filter_pattern(self, filter_pattern: str) -> None:
 | |
| 		self._filter_pattern = filter_pattern
 | |
| 
 | |
| 		self._update_filtered()
 | |
| 
 | |
| 	def _update_popup(self) -> None:
 | |
| 		vim.Function('popup_settext')(
 | |
| 			self._popup_id,
 | |
| 			self._options,
 | |
| 		)
 | |
| 		vim.Function('popup_setoptions')(self._popup_id, {'title': 'Select a file, [%s]' % self._filter_pattern})
 | |
| 
 | |
| 	async def _on_buf_enter(
 | |
| 		self,
 | |
| 		buf_number: int,
 | |
| 		buf_name: pathlib.Path,
 | |
| 	) -> None:
 | |
| 		# logger.info(dict(msg='waiting'))
 | |
| 
 | |
| 		with self._lock:
 | |
| 			# buf_number = vim.current.buffer.number
 | |
| 
 | |
| 			if not buf_number in self._buffer_frequency:
 | |
| 				self._buffer_frequency[buf_number] = 0
 | |
| 
 | |
| 			self._buffer_frequency[buf_number] += 1
 | |
| 
 | |
| 			self._buffer_last_used[buf_number] = datetime.datetime.now().timestamp()
 | |
| 
 | |
| 			logger.info(
 | |
| 				dict(
 | |
| 					msg='updated',
 | |
| 					buf_path=str(buf_name),
 | |
| 					frequency=self._buffer_frequency[buf_number],
 | |
| 					buf_number=buf_number,
 | |
| 				)
 | |
| 			)
 | |
| 
 | |
| 	async def _pick_option_start_popup(
 | |
| 		self,
 | |
| 	):
 | |
| 		callback_name = '{}_{}_{}'.format(
 | |
| 			MODULE_NAME,
 | |
| 			type(self).__name__.lower(),
 | |
| 			'popup_callback',
 | |
| 		).capitalize()
 | |
| 
 | |
| 		filter_name = '{}_{}_{}'.format(
 | |
| 			MODULE_NAME,
 | |
| 			type(self).__name__.lower(),
 | |
| 			'popup_filter',
 | |
| 		).capitalize()
 | |
| 
 | |
| 		if int(vim.eval('exists("{}")'.format(callback_name))) == 1:
 | |
| 			logger.warning(dict(msg='callback already defined, %s' % callback_name))
 | |
| 
 | |
| 		vim.command(
 | |
| 			r"""
 | |
| 			function! {callback_name}(id, result)
 | |
| 				if a:result > 0
 | |
| 					call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(' . (a:result  - 1). ')')
 | |
| 				else
 | |
| 					call py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().pick_option_put_id(-1)')
 | |
| 				endif
 | |
| 			endfunction
 | |
| 		""".format(
 | |
| 				callback_name=callback_name,
 | |
| 			)
 | |
| 		)
 | |
| 
 | |
| 		vim.command(
 | |
| 			r"""
 | |
| 			function! {filter_name}(win_id, key)
 | |
| 				return py3eval('online_fxreader_pr34_vim.beta.FastSelect.singleton().on_filter_key(key)', #{key: a:key})
 | |
| 			endfunction
 | |
| 		""".replace(
 | |
| 				'{filter_name}',
 | |
| 				filter_name,
 | |
| 			)
 | |
| 		)
 | |
| 
 | |
| 		logger.info(dict(msg='before popup'))
 | |
| 
 | |
| 		popup_menu = vim.Function('popup_menu')
 | |
| 
 | |
| 		def create_popup():
 | |
| 			self._popup_id = popup_menu(
 | |
| 				self._options,
 | |
| 				{
 | |
| 					'title': 'Select a file',
 | |
| 					'callback': callback_name,
 | |
| 					'filter': filter_name,
 | |
| 					'wrap': 1,
 | |
| 					'maxwidth': 80,
 | |
| 					'close': 'button',
 | |
| 					'resize': 1,
 | |
| 					'drag': 1,
 | |
| 					'maxheight': '16',
 | |
| 				},
 | |
| 			)
 | |
| 
 | |
| 		with self._lock:
 | |
| 			self._queue.append(
 | |
| 				create_popup,
 | |
| 				# lambda : vim.command(
 | |
| 				# "call popup_menu({options}, {'title': '{title}', 'callback': '{callback}'})".replace(
 | |
| 				# '{options}', '[%s]' % ','.join([
 | |
| 				# '\'%s\'' % o.replace('\'', '\\\'')
 | |
| 				# for o in self._options
 | |
| 				# ]),
 | |
| 				# ).replace(
 | |
| 				# '{title}', 'Select a file',
 | |
| 				# ).replace(
 | |
| 				# '{callback}',
 | |
| 				# callback_name
 | |
| 				# )
 | |
| 				# )
 | |
| 			)
 | |
| 
 | |
| 		# logger.info(dict(popup_id=popup_id))
 | |
| 
 | |
| 		# logger.info(dict(msg='after popup'))
 | |
| 
 | |
| 
 | |
| def init():
 | |
| 	FastSelect.singleton()
 |