[~] Refactor
This commit is contained in:
parent
1eecc616f5
commit
40350d128a
23
_m.py
23
_m.py
@ -123,7 +123,24 @@ def ruff(argv: list[str]) -> None:
|
||||
logger.info(json.dumps(errors, indent=4))
|
||||
logger.info(json.dumps(h, indent=4))
|
||||
|
||||
def mypy(args: list[str]) -> None:
|
||||
def mypy(argv: list[str]) -> None:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument(
|
||||
'-i',
|
||||
dest='paths',
|
||||
help='specify paths to check',
|
||||
default=[],
|
||||
action='append',
|
||||
)
|
||||
options, args = parser.parse_known_args(argv)
|
||||
|
||||
if len(options.paths) == 0:
|
||||
options.paths.extend([
|
||||
'dotfiles/.local/bin/commands',
|
||||
'python',
|
||||
'm.py',
|
||||
])
|
||||
|
||||
res = env([
|
||||
'-m',
|
||||
'mypy',
|
||||
@ -131,9 +148,7 @@ def mypy(args: list[str]) -> None:
|
||||
'-O',
|
||||
'json',
|
||||
*args,
|
||||
'dotfiles/.local/bin/commands',
|
||||
'python',
|
||||
'm.py',
|
||||
*options.paths,
|
||||
], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
|
||||
assert not res is None
|
||||
|
@ -23,7 +23,7 @@ import tempfile
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from typing import (Literal, Optional, TypedDict, Callable,)
|
||||
from typing import (Literal, Optional, TypedDict, Callable, Generator, TypeAlias,)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -73,14 +73,30 @@ def custom_notify(
|
||||
msg[-128:]
|
||||
])
|
||||
|
||||
class intercept_output_t:
|
||||
class line_res_t(TypedDict):
|
||||
aggregated: bool
|
||||
line: bytes
|
||||
|
||||
class realtime_res_t(TypedDict):
|
||||
aggregated: bool
|
||||
data: bytes
|
||||
|
||||
class aggregated_res_t(TypedDict):
|
||||
aggregated: bool
|
||||
data: bytes
|
||||
returncode: Optional[int]
|
||||
|
||||
res_t: TypeAlias = line_res_t | realtime_res_t | aggregated_res_t
|
||||
|
||||
def intercept_output(
|
||||
current_subprocess,
|
||||
return_aggregated=None,
|
||||
transform_callback=None,
|
||||
real_time=None,
|
||||
timeout=None,
|
||||
need_lines=None,
|
||||
):
|
||||
current_subprocess: subprocess.Popen[bytes],
|
||||
return_aggregated: Optional[bool]=None,
|
||||
transform_callback: Optional[Callable[[bytes], Optional[bytes]]] =None,
|
||||
real_time: Optional[bool]=None,
|
||||
timeout: Optional[float]=None,
|
||||
need_lines: Optional[bool]=None,
|
||||
) -> Generator[intercept_output_t.res_t]:
|
||||
if real_time is None:
|
||||
real_time = False
|
||||
|
||||
@ -90,11 +106,16 @@ def intercept_output(
|
||||
return_aggregated = False
|
||||
|
||||
t1 = select.poll()
|
||||
|
||||
assert not current_subprocess.stdout is None
|
||||
|
||||
assert isinstance(current_subprocess.stdout, io.BufferedReader)
|
||||
|
||||
t1.register(current_subprocess.stdout, select.POLLIN)
|
||||
#print([current_subprocess, current_subprocess.poll()])
|
||||
output = []
|
||||
buffer = collections.deque()
|
||||
buffer_lines = collections.deque()
|
||||
output: list[bytes] = []
|
||||
buffer: collections.deque[bytes] = collections.deque()
|
||||
buffer_lines: collections.deque[bytes] = collections.deque()
|
||||
|
||||
last_data = None
|
||||
|
||||
@ -164,7 +185,7 @@ def intercept_output(
|
||||
returncode=current_subprocess.poll(),
|
||||
)
|
||||
|
||||
def player_metadata():
|
||||
def player_metadata() -> Optional[str]:
|
||||
for k in range(20):
|
||||
try:
|
||||
metadata = {
|
||||
@ -176,7 +197,14 @@ def player_metadata():
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
def memory_stats():
|
||||
return None
|
||||
|
||||
class memory_stats_t:
|
||||
class res_t(TypedDict):
|
||||
mem_total: int
|
||||
mem_used: int
|
||||
|
||||
def memory_stats() -> memory_stats_t.res_t:
|
||||
if sys.platform == 'linux':
|
||||
with io.BytesIO(
|
||||
subprocess.check_output(
|
||||
|
Loading…
Reference in New Issue
Block a user