[+] add terminal.py adaptive renderer, rename tests.py to test_utils.py, bump pr34 v0.1.5.68
1. add commands_typed/terminal.py with field_t, priority_t, line_formatter_t, renderer_t; 2. adaptive line formatting: fields sorted by priority, low-priority dropped on narrow terminal; 3. render modes: plain (newline), interactive (\r overwrite), multiline (ANSI cursor); 4. terminal_t.width() via os.get_terminal_size with fallback; 5. rename tests.py to test_utils.py to avoid conflict with tests/ package; 6. update commands_typed/cli.py import from tests to test_utils; 7. add commands_typed/tests/test_terminal.py with 19 tests; 8. add [tool.online-fxreader-pr34.tests] config to pyproject.common.toml; 9. bump pr34 to v0.1.5.68;
This commit is contained in:
parent
687df29dfe
commit
b98173511e
@ -220,7 +220,7 @@ class CLI(abc.ABC):
|
||||
) -> None:
|
||||
from . import argparse as pr34_argparse
|
||||
from . import cli_bootstrap
|
||||
from . import tests as _tests
|
||||
from . import test_utils as _tests
|
||||
|
||||
tests_parser = argparse.ArgumentParser()
|
||||
tests_parser.add_argument(
|
||||
|
||||
201
python/online/fxreader/pr34/commands_typed/terminal.py
Normal file
201
python/online/fxreader/pr34/commands_typed/terminal.py
Normal file
@ -0,0 +1,201 @@
|
||||
"""Terminal-aware rendering utilities.
|
||||
|
||||
Supports adaptive single-line (\r), multi-line (ANSI), and plain (newline)
|
||||
output modes. Fields are prioritized and hidden/ellipsized based on
|
||||
terminal width.
|
||||
"""
|
||||
|
||||
import dataclasses
|
||||
import enum
|
||||
import os
|
||||
import sys
|
||||
|
||||
from typing import (
|
||||
Optional,
|
||||
TextIO,
|
||||
)
|
||||
|
||||
|
||||
class priority_t(enum.IntEnum):
|
||||
"""Field display priority. Lower = more important = shown first."""
|
||||
|
||||
critical = 0
|
||||
high = 1
|
||||
normal = 2
|
||||
low = 3
|
||||
|
||||
|
||||
class render_mode_t(enum.Enum):
|
||||
plain = 'plain'
|
||||
interactive = 'interactive'
|
||||
multiline = 'multiline'
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class field_t:
|
||||
"""A named field for adaptive terminal rendering."""
|
||||
|
||||
name: str
|
||||
value: str
|
||||
priority: priority_t = priority_t.normal
|
||||
min_width: int = 0
|
||||
max_width: int = 0
|
||||
ellipsize: bool = True
|
||||
separator: str = ' '
|
||||
|
||||
|
||||
class terminal_t:
|
||||
"""Detect terminal dimensions."""
|
||||
|
||||
@staticmethod
|
||||
def width(stream: Optional[TextIO] = None) -> int:
|
||||
if stream is None:
|
||||
stream = sys.stdout
|
||||
try:
|
||||
columns = os.get_terminal_size(stream.fileno()).columns
|
||||
except (AttributeError, ValueError, OSError):
|
||||
columns = int(os.environ.get('COLUMNS', '80'))
|
||||
return columns
|
||||
|
||||
@staticmethod
|
||||
def is_tty(stream: Optional[TextIO] = None) -> bool:
|
||||
if stream is None:
|
||||
stream = sys.stdout
|
||||
try:
|
||||
return stream.isatty()
|
||||
except (AttributeError, ValueError):
|
||||
return False
|
||||
|
||||
|
||||
class line_formatter_t:
|
||||
"""Render a list of fields into a fixed-width terminal line.
|
||||
|
||||
Fields are sorted by priority. If the line doesn't fit, lower-priority
|
||||
fields are hidden first, then remaining fields are ellipsized if allowed.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def format(
|
||||
fields: list[field_t],
|
||||
width: int,
|
||||
) -> str:
|
||||
if width <= 0 or len(fields) == 0:
|
||||
return ''
|
||||
|
||||
# sort by priority (critical first), preserve insertion order within same priority
|
||||
sorted_fields = sorted(fields, key=lambda f: f.priority)
|
||||
|
||||
# phase 1: try to fit all fields
|
||||
result = line_formatter_t._try_fit(sorted_fields, width)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
# phase 2: drop low-priority fields until it fits
|
||||
for drop_priority in reversed(list(priority_t)):
|
||||
candidates = [f for f in sorted_fields if f.priority <= drop_priority]
|
||||
if len(candidates) < len(sorted_fields):
|
||||
result = line_formatter_t._try_fit(candidates, width)
|
||||
if result is not None:
|
||||
return result
|
||||
|
||||
# phase 3: ellipsize remaining fields
|
||||
candidates = [f for f in sorted_fields if f.priority == priority_t.critical]
|
||||
if len(candidates) == 0:
|
||||
candidates = sorted_fields[:1]
|
||||
return line_formatter_t._ellipsize_fit(candidates, width)
|
||||
|
||||
@staticmethod
|
||||
def _try_fit(fields: list[field_t], width: int) -> Optional[str]:
|
||||
parts: list[str] = []
|
||||
total = 0
|
||||
|
||||
for i, f in enumerate(fields):
|
||||
text = '%s=%s' % (f.name, f.value) if f.name else f.value
|
||||
if f.max_width > 0 and len(text) > f.max_width:
|
||||
if f.ellipsize:
|
||||
text = text[: f.max_width - 1] + '\u2026'
|
||||
else:
|
||||
text = text[: f.max_width]
|
||||
sep = f.separator if i > 0 else ''
|
||||
total += len(sep) + len(text)
|
||||
parts.append(sep + text)
|
||||
|
||||
if total <= width:
|
||||
return ''.join(parts)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _ellipsize_fit(fields: list[field_t], width: int) -> str:
|
||||
parts: list[str] = []
|
||||
remaining = width
|
||||
|
||||
for i, f in enumerate(fields):
|
||||
text = '%s=%s' % (f.name, f.value) if f.name else f.value
|
||||
sep = f.separator if i > 0 else ''
|
||||
available = remaining - len(sep)
|
||||
if available <= 0:
|
||||
break
|
||||
if len(text) > available:
|
||||
if available > 1 and f.ellipsize:
|
||||
text = text[: available - 1] + '\u2026'
|
||||
else:
|
||||
text = text[:available]
|
||||
remaining -= len(sep) + len(text)
|
||||
parts.append(sep + text)
|
||||
|
||||
return ''.join(parts)
|
||||
|
||||
|
||||
class renderer_t:
|
||||
"""Manages terminal output for progress rendering.
|
||||
|
||||
Supports three modes:
|
||||
- plain: each emit() prints a new line
|
||||
- interactive: each emit() overwrites the current line with \r
|
||||
- multiline: manages N lines using ANSI cursor movement
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mode: render_mode_t = render_mode_t.plain,
|
||||
num_lines: int = 1,
|
||||
stream: Optional[TextIO] = None,
|
||||
) -> None:
|
||||
self.mode = mode
|
||||
self.num_lines = num_lines
|
||||
self.stream = stream or sys.stdout
|
||||
self._lines: list[str] = [''] * num_lines
|
||||
self._first_emit = True
|
||||
|
||||
def emit(self, fields: list[field_t], line_index: int = 0) -> None:
|
||||
width = terminal_t.width(self.stream)
|
||||
text = line_formatter_t.format(fields, width)
|
||||
|
||||
if self.mode is render_mode_t.plain:
|
||||
self.stream.write(text + '\n')
|
||||
self.stream.flush()
|
||||
elif self.mode is render_mode_t.interactive:
|
||||
padded = text.ljust(width - 1)[: width - 1]
|
||||
self.stream.write('\r' + padded)
|
||||
self.stream.flush()
|
||||
elif self.mode is render_mode_t.multiline:
|
||||
self._lines[line_index] = text
|
||||
self._render_multiline(width)
|
||||
|
||||
def _render_multiline(self, width: int) -> None:
|
||||
if not self._first_emit:
|
||||
# move cursor up to first line
|
||||
self.stream.write('\033[%dA' % self.num_lines)
|
||||
self._first_emit = False
|
||||
|
||||
for line in self._lines:
|
||||
padded = line.ljust(width - 1)[: width - 1]
|
||||
self.stream.write('\r' + padded + '\n')
|
||||
self.stream.flush()
|
||||
|
||||
def finish(self) -> None:
|
||||
if self.mode is render_mode_t.interactive:
|
||||
self.stream.write('\n')
|
||||
self.stream.flush()
|
||||
elif self.mode is render_mode_t.multiline:
|
||||
pass # lines already end with \n
|
||||
@ -0,0 +1,162 @@
|
||||
import io
|
||||
import unittest
|
||||
|
||||
from ..terminal import (
|
||||
field_t,
|
||||
line_formatter_t,
|
||||
priority_t,
|
||||
render_mode_t,
|
||||
renderer_t,
|
||||
terminal_t,
|
||||
)
|
||||
|
||||
|
||||
class TestTerminalWidth(unittest.TestCase):
|
||||
def test_returns_int(self) -> None:
|
||||
w = terminal_t.width()
|
||||
self.assertIsInstance(w, int)
|
||||
self.assertGreater(w, 0)
|
||||
|
||||
def test_is_tty(self) -> None:
|
||||
self.assertIsInstance(terminal_t.is_tty(), bool)
|
||||
|
||||
def test_non_tty_stream(self) -> None:
|
||||
buf = io.StringIO()
|
||||
self.assertFalse(terminal_t.is_tty(buf))
|
||||
|
||||
|
||||
class TestLineFormatter(unittest.TestCase):
|
||||
def test_empty(self) -> None:
|
||||
self.assertEqual(line_formatter_t.format([], 80), '')
|
||||
|
||||
def test_single_field(self) -> None:
|
||||
fields = [field_t(name='', value='hello', priority=priority_t.critical)]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
self.assertEqual(result, 'hello')
|
||||
|
||||
def test_named_field(self) -> None:
|
||||
fields = [field_t(name='pkg', value='42', priority=priority_t.critical)]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
self.assertEqual(result, 'pkg=42')
|
||||
|
||||
def test_multiple_fields(self) -> None:
|
||||
fields = [
|
||||
field_t(name='', value='[1/10]', priority=priority_t.critical),
|
||||
field_t(name='speed', value='1.5M/s', priority=priority_t.high),
|
||||
field_t(name='ETA', value='2m30s', priority=priority_t.critical),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
self.assertIn('[1/10]', result)
|
||||
self.assertIn('speed=1.5M/s', result)
|
||||
self.assertIn('ETA=2m30s', result)
|
||||
|
||||
def test_narrow_drops_low_priority(self) -> None:
|
||||
fields = [
|
||||
field_t(name='', value='[1/10]', priority=priority_t.critical),
|
||||
field_t(name='extra', value='some long detail here', priority=priority_t.low),
|
||||
field_t(name='ETA', value='2m', priority=priority_t.critical),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 25)
|
||||
self.assertIn('[1/10]', result)
|
||||
self.assertIn('ETA=2m', result)
|
||||
self.assertNotIn('extra', result)
|
||||
|
||||
def test_very_narrow_ellipsizes(self) -> None:
|
||||
fields = [
|
||||
field_t(
|
||||
name='',
|
||||
value='very long value that does not fit',
|
||||
priority=priority_t.critical,
|
||||
ellipsize=True,
|
||||
),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 15)
|
||||
self.assertLessEqual(len(result), 15)
|
||||
|
||||
def test_max_width_truncates(self) -> None:
|
||||
fields = [
|
||||
field_t(
|
||||
name='',
|
||||
value='abcdefghijklmnop',
|
||||
priority=priority_t.critical,
|
||||
max_width=8,
|
||||
ellipsize=True,
|
||||
),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
self.assertLessEqual(len(result), 8)
|
||||
|
||||
def test_separator(self) -> None:
|
||||
fields = [
|
||||
field_t(name='', value='A', priority=priority_t.critical, separator=' | '),
|
||||
field_t(name='', value='B', priority=priority_t.critical, separator=' | '),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
self.assertEqual(result, 'A | B')
|
||||
|
||||
def test_zero_width(self) -> None:
|
||||
fields = [field_t(name='', value='x', priority=priority_t.critical)]
|
||||
self.assertEqual(line_formatter_t.format(fields, 0), '')
|
||||
|
||||
def test_priority_ordering(self) -> None:
|
||||
fields = [
|
||||
field_t(name='lo', value='X', priority=priority_t.low),
|
||||
field_t(name='hi', value='Y', priority=priority_t.high),
|
||||
field_t(name='cr', value='Z', priority=priority_t.critical),
|
||||
]
|
||||
result = line_formatter_t.format(fields, 80)
|
||||
pos_cr = result.index('cr=Z')
|
||||
pos_hi = result.index('hi=Y')
|
||||
pos_lo = result.index('lo=X')
|
||||
self.assertLess(pos_cr, pos_hi)
|
||||
self.assertLess(pos_hi, pos_lo)
|
||||
|
||||
|
||||
class TestRendererPlain(unittest.TestCase):
|
||||
def test_plain_emits_newline(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.plain, stream=buf)
|
||||
r.emit([field_t(name='', value='hello', priority=priority_t.critical)])
|
||||
self.assertEqual(buf.getvalue(), 'hello\n')
|
||||
|
||||
def test_plain_multiple(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.plain, stream=buf)
|
||||
r.emit([field_t(name='', value='line1', priority=priority_t.critical)])
|
||||
r.emit([field_t(name='', value='line2', priority=priority_t.critical)])
|
||||
self.assertEqual(buf.getvalue(), 'line1\nline2\n')
|
||||
|
||||
|
||||
class TestRendererInteractive(unittest.TestCase):
|
||||
def test_interactive_uses_cr(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.interactive, stream=buf)
|
||||
r.emit([field_t(name='', value='progress', priority=priority_t.critical)])
|
||||
output = buf.getvalue()
|
||||
self.assertTrue(output.startswith('\r'))
|
||||
self.assertIn('progress', output)
|
||||
|
||||
def test_finish_adds_newline(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.interactive, stream=buf)
|
||||
r.emit([field_t(name='', value='done', priority=priority_t.critical)])
|
||||
r.finish()
|
||||
self.assertTrue(buf.getvalue().endswith('\n'))
|
||||
|
||||
|
||||
class TestRendererMultiline(unittest.TestCase):
|
||||
def test_multiline_emits_lines(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.multiline, num_lines=2, stream=buf)
|
||||
r.emit([field_t(name='', value='line0', priority=priority_t.critical)], line_index=0)
|
||||
output = buf.getvalue()
|
||||
self.assertIn('line0', output)
|
||||
|
||||
def test_multiline_update_moves_cursor(self) -> None:
|
||||
buf = io.StringIO()
|
||||
r = renderer_t(mode=render_mode_t.multiline, num_lines=2, stream=buf)
|
||||
r.emit([field_t(name='', value='first', priority=priority_t.critical)], line_index=0)
|
||||
r.emit([field_t(name='', value='second', priority=priority_t.critical)], line_index=1)
|
||||
output = buf.getvalue()
|
||||
# second emit should have ANSI cursor-up
|
||||
self.assertIn('\033[', output)
|
||||
@ -9,7 +9,7 @@ classifiers = [
|
||||
]
|
||||
|
||||
name = 'online.fxreader.pr34'
|
||||
version = '0.1.5.67'
|
||||
version = '0.1.5.68'
|
||||
dynamic = []
|
||||
|
||||
dependencies = [
|
||||
@ -82,6 +82,13 @@ modules = [
|
||||
{ name = 'online.fxreader.pr34', tool = { 'online-fxreader-pr34' = { early_features = ['default', 'early', 'lint'] } } },
|
||||
]
|
||||
|
||||
[tool.online-fxreader-pr34.tests]
|
||||
search_paths = ['.']
|
||||
test_names = [
|
||||
'online.fxreader.pr34.commands_typed.tests',
|
||||
]
|
||||
discovery_paths = ['.']
|
||||
|
||||
[build-system]
|
||||
requires = ["meson-python", "pybind11"]
|
||||
build-backend = "mesonpy"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user