From 207a8737ba20bbf6fbdd234430177533e3703a3c Mon Sep 17 00:00:00 2001 From: Siarhei Siniak Date: Fri, 18 Jul 2025 10:07:43 +0300 Subject: [PATCH] [+] clean async_api 1. comment out not used logic; 2. remove app.py and websocket_api.py; --- .../test_task_2025_07_17_v2/async_api/app.py | 68 ---------- .../test_task_2025_07_17_v2/async_api/db.py | 2 +- .../async_api/fastapi.py | 60 ++++----- .../async_api/schema.py | 125 ++++++++--------- .../async_api/websocket_api.py | 126 ------------------ 5 files changed, 94 insertions(+), 287 deletions(-) delete mode 100644 deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/app.py delete mode 100644 deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/websocket_api.py diff --git a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/app.py b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/app.py deleted file mode 100644 index 8c980e9..0000000 --- a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/app.py +++ /dev/null @@ -1,68 +0,0 @@ -import asyncio -import datetime -import logging -# import os -from ..tickers_retrieval.emcont import Emcont -from ..tickers.models import Ticker -from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol -from .db import create_engine -import sqlalchemy.ext.asyncio -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.ext.asyncio import async_sessionmaker - -from typing import Any - -logger = logging.getLogger(__name__) - -async def run() -> None: - async_session = create_engine() - - async def store_cb( - rates: list[Emcont.rates_get_t.data_t.rate_t], - timestamp: datetime.datetime, - session: 'async_sessionmaker[AsyncSession]', - ) -> None: - logger.info(dict( - msg='before markets', - )) - - markets = await markets_get_by_symbol( - session, - set([ - rate.symbol - for rate in rates - ]), - ) - - logger.info(dict( - msg='after markets', - )) - - await ticker_store_multiple( - session, - [ - Ticker( - id=markets[rate.symbol], - timestamp=timestamp, - value=rate.value, - ) - for rate in rates - ] - ) - - logger.info(dict( - rates=rates, - timestamp=timestamp.isoformat() - )) - - await Emcont.worker( - only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'}, - session=async_session, - store_cb=store_cb, - request_timeout=2, - store_timeout=0.5, - ) - -if __name__ == '__main__': - logging.basicConfig(level=logging.INFO) - asyncio.run(run()) diff --git a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/db.py b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/db.py index 2f16629..9c0075a 100644 --- a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/db.py +++ b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/db.py @@ -1,4 +1,4 @@ -from ..tickers.settings import Settings as ModelsSettings +from ..payloads.settings import Settings as ModelsSettings import sqlalchemy.ext.asyncio from sqlalchemy.ext.asyncio import AsyncSession diff --git a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/fastapi.py b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/fastapi.py index 74f58ba..b9043dc 100644 --- a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/fastapi.py +++ b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/fastapi.py @@ -9,49 +9,49 @@ import sys from .settings import Settings as APISettings from .db import create_engine -from .websocket_api import WebsocketAPI +# from .websocket_api import WebsocketAPI from typing import (Any, Optional, Literal, Annotated,) logger = logging.getLogger(__name__) -async def websocket_tickers( - websocket: fastapi.WebSocket, - websocket_api: WebsocketAPI, -) -> None: - try: - await websocket_api.connect(websocket) - - while True: - msg = await websocket.receive_text() - await websocket_api.on_message(websocket, msg) - except fastapi.WebSocketDisconnect: - pass - # websocket_api.disconnect(websocket) - except: - logger.exception('') - raise - finally: - await websocket_api.disconnect(websocket) +# async def websocket_tickers( +# websocket: fastapi.WebSocket, +# websocket_api: WebsocketAPI, +# ) -> None: +# try: +# await websocket_api.connect(websocket) +# +# while True: +# msg = await websocket.receive_text() +# await websocket_api.on_message(websocket, msg) +# except fastapi.WebSocketDisconnect: +# pass +# # websocket_api.disconnect(websocket) +# except: +# logger.exception('') +# raise +# finally: +# await websocket_api.disconnect(websocket) def create_app() -> fastapi.FastAPI: async_session = create_engine() - websocket_api = WebsocketAPI( - session=async_session, - ) + # websocket_api = WebsocketAPI( + # session=async_session, + # ) app = fastapi.FastAPI() - app.websocket( - '/tickers/', - )( - functools.partial( - websocket_tickers, - websocket_api=fastapi.Depends(lambda : websocket_api), - ) - ) + # app.websocket( + # '/tickers/', + # )( + # functools.partial( + # websocket_tickers, + # websocket_api=fastapi.Depends(lambda : websocket_api), + # ) + # ) return app diff --git a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/schema.py b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/schema.py index fd0d50b..499b84b 100644 --- a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/schema.py +++ b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/schema.py @@ -3,65 +3,66 @@ import decimal from typing import (Literal, Annotated,) -class SubscribeAction(pydantic.BaseModel): - action: Literal['subscribe'] - class message_t(pydantic.BaseModel): - asset_id: Annotated[ - int, - pydantic.Field(alias='assetId') - ] - - message: message_t - -class AssetsAction(pydantic.BaseModel): - action: Literal['assets'] - class message_t(pydantic.BaseModel): - pass - - message: Annotated[ - message_t, - pydantic.Field( - default_factory=message_t, - ) - ] - -Action = pydantic.RootModel[ - AssetsAction | SubscribeAction -] - -class AssetHistoryResponse(pydantic.BaseModel): - action: Literal['asset_history'] = 'asset_history' - - class message_t(pydantic.BaseModel): - class point_t(pydantic.BaseModel): - asset_name : Annotated[ - str, - pydantic.Field( - alias='assetName', - ) - ] - time: int - asset_id : Annotated[ - int, - pydantic.Field(alias='assetId') - ] - value: decimal.Decimal - - points: list[point_t] - message: message_t - -class AssetTickerResponse(pydantic.BaseModel): - action: Literal['point'] = 'point' - - message: 'AssetHistoryResponse.message_t.point_t' - -class AssetsResponse(pydantic.BaseModel): - action: Literal['assets'] = 'assets' - - class message_t(pydantic.BaseModel): - class asset_t(pydantic.BaseModel): - id: int - name: str - - assets: list[asset_t] - message: message_t +# class SubscribeAction(pydantic.BaseModel): +# action: Literal['subscribe'] +# class message_t(pydantic.BaseModel): +# asset_id: Annotated[ +# int, +# pydantic.Field(alias='assetId') +# ] +# +# message: message_t +# +# class AssetsAction(pydantic.BaseModel): +# action: Literal['assets'] +# class message_t(pydantic.BaseModel): +# pass +# +# message: Annotated[ +# message_t, +# pydantic.Field( +# default_factory=message_t, +# ) +# ] +# +# Action = pydantic.RootModel[ +# AssetsAction | SubscribeAction +# ] +# +# class AssetHistoryResponse(pydantic.BaseModel): +# action: Literal['asset_history'] = 'asset_history' +# +# class message_t(pydantic.BaseModel): +# class point_t(pydantic.BaseModel): +# asset_name : Annotated[ +# str, +# pydantic.Field( +# alias='assetName', +# ) +# ] +# time: int +# asset_id : Annotated[ +# int, +# pydantic.Field(alias='assetId') +# ] +# value: decimal.Decimal +# +# points: list[point_t] +# message: message_t +# +# class AssetTickerResponse(pydantic.BaseModel): +# action: Literal['point'] = 'point' +# +# message: 'AssetHistoryResponse.message_t.point_t' +# +# class AssetsResponse(pydantic.BaseModel): +# action: Literal['assets'] = 'assets' +# +# class message_t(pydantic.BaseModel): +# class asset_t(pydantic.BaseModel): +# id: int +# name: str +# +# assets: list[asset_t] +# message: message_t +# diff --git a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/websocket_api.py b/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/websocket_api.py deleted file mode 100644 index e7c53dd..0000000 --- a/deps/test-task-2025-07-17-v2/python/online/fxreader/pr34/test_task_2025_07_17_v2/async_api/websocket_api.py +++ /dev/null @@ -1,126 +0,0 @@ -import fastapi -import datetime -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.ext.asyncio import async_sessionmaker -from . import schema -from ..tickers.logic import tickers_get_by_period, markets_all - -from typing import (Optional, Literal) - -class WebsocketAPI: - def __init__( - self, - session: 'async_sessionmaker[AsyncSession]', - ) -> None: - self.connections : set[ - fastapi.WebSocket, - ] = set() - self.subscriptions_by_asset_id : dict[ - int, set[fastapi.WebSocket] - ] = dict() - self.subscriptions_by_client : dict[ - fastapi.WebSocket, - int, - ] = dict() - self.session = session - - async def connect(self, client: fastapi.WebSocket) -> None: - assert not client in self.connections - - await client.accept() - - self.connections.add(client) - - - async def subscribe( - self, - client: fastapi.WebSocket, - asset_id: int - ) -> None: - if client in self.subscriptions_by_client: - last_asset_id = self.subscriptions_by_client[client] - del self.subscriptions_by_asset_id[last_asset_id] - del self.subscriptions_by_client[client] - - if not asset_id in self.subscriptions_by_asset_id: - self.subscriptions_by_asset_id[asset_id] = set() - - self.subscriptions_by_asset_id[asset_id].add(client) - self.subscriptions_by_client[client] = asset_id - - await self.asset_last_period(client, asset_id) - - async def asset_last_period( - self, - client: fastapi.WebSocket, - asset_id: int, - ) -> None: - tickers = await tickers_get_by_period( - self.session, - period=datetime.timedelta(minutes=30), - market_id=asset_id, - ) - - await client.send_text( - schema.AssetHistoryResponse( - message=schema.AssetHistoryResponse.message_t( - points=[ - schema.AssetHistoryResponse.message_t.point_t.model_construct( - asset_name=o.market.name, - asset_id=o.market.id, - time=int(o.timestamp.timestamp()), - value=o.value, - ) - for o in tickers - ] - ) - ).json(by_alias=True,), - ) - - async def assets_index( - self, - client: fastapi.WebSocket, - ) -> None: - markets = await markets_all( - self.session, - ) - - await client.send_text( - schema.AssetsResponse( - message=schema.AssetsResponse.message_t( - assets=[ - schema.AssetsResponse.message_t.asset_t.model_construct( - name=o.name, - id=o.id, - ) - for o in markets - ] - ) - ).json(by_alias=True,), - ) - - async def on_message( - self, - client: fastapi.WebSocket, - msg_raw: str - ) -> None: - msg = schema.Action.model_validate_json( - msg_raw - ).root - - if isinstance(msg, schema.SubscribeAction): - await self.subscribe( - client, - msg.message.asset_id - ) - elif isinstance(msg, schema.AssetsAction): - await self.assets_index( - client, - ) - else: - raise NotImplementedError - - async def disconnect(self, client: fastapi.WebSocket) -> None: - assert client in self.connections - - self.connections.remove(client)