diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py index a9b8b40..8c980e9 100644 --- a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py @@ -4,8 +4,8 @@ import logging # import os from ..tickers_retrieval.emcont import Emcont from ..tickers.models import Ticker -from ..tickers.settings import Settings as ModelsSettings from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol +from .db import create_engine import sqlalchemy.ext.asyncio from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import async_sessionmaker @@ -15,12 +15,7 @@ from typing import Any logger = logging.getLogger(__name__) async def run() -> None: - engine = sqlalchemy.ext.asyncio.create_async_engine( - ModelsSettings.singleton().db_url - ) - async_session = sqlalchemy.ext.asyncio.async_sessionmaker( - engine - ) + async_session = create_engine() async def store_cb( rates: list[Emcont.rates_get_t.data_t.rate_t], diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/db.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/db.py new file mode 100644 index 0000000..2f16629 --- /dev/null +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/db.py @@ -0,0 +1,15 @@ +from ..tickers.settings import Settings as ModelsSettings + +import sqlalchemy.ext.asyncio +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import async_sessionmaker + +def create_engine() -> 'async_sessionmaker[AsyncSession]': + engine = sqlalchemy.ext.asyncio.create_async_engine( + ModelsSettings.singleton().db_url + ) + async_session = sqlalchemy.ext.asyncio.async_sessionmaker( + engine + ) + + return async_session diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/fastapi.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/fastapi.py index f6a95a0..74f58ba 100644 --- a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/fastapi.py +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/fastapi.py @@ -8,68 +8,13 @@ import uvicorn.config import sys from .settings import Settings as APISettings +from .db import create_engine +from .websocket_api import WebsocketAPI from typing import (Any, Optional, Literal, Annotated,) logger = logging.getLogger(__name__) -class WebsocketAPI: - def __init__( - self, - ) -> None: - self.connections : set[ - fastapi.WebSocket, - ] = set() - self.subscriptionss : dict[str, set[fastapi.WebSocket]] = dict() - - async def connect(self, client: fastapi.WebSocket) -> None: - assert not client in self.connections - - await client.accept() - - self.connections.add(client) - - class Subscribe(pydantic.BaseModel): - action: Literal['subscribe'] - class message_t(pydantic.BaseModel): - asset_id: Annotated[ - int, - pydantic.Field(alias='assetId') - ] - - message: message_t - - class Assets(pydantic.BaseModel): - action: Literal['assets'] - class message_t(pydantic.BaseModel): - pass - - message: Annotated[ - message_t, - pydantic.Field( - default_factory=message_t, - ) - ] - - Message = pydantic.RootModel[ - Assets | Subscribe - ] - - async def on_message( - self, - client: fastapi.WebSocket, - msg_raw: str - ) -> None: - msg = self.Message.model_validate_json( - msg_raw - ).root - - raise NotImplementedError - - async def disconnect(self, client: fastapi.WebSocket) -> None: - assert client in self.connections - - self.connections.remove(client) async def websocket_tickers( websocket: fastapi.WebSocket, @@ -91,7 +36,11 @@ async def websocket_tickers( await websocket_api.disconnect(websocket) def create_app() -> fastapi.FastAPI: - websocket_api = WebsocketAPI() + async_session = create_engine() + + websocket_api = WebsocketAPI( + session=async_session, + ) app = fastapi.FastAPI() diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/schema.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/schema.py new file mode 100644 index 0000000..85339bd --- /dev/null +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/schema.py @@ -0,0 +1,62 @@ +import pydantic +import decimal + +from typing import (Literal, Annotated,) + +class SubscribeAction(pydantic.BaseModel): + action: Literal['subscribe'] + class message_t(pydantic.BaseModel): + asset_id: Annotated[ + int, + pydantic.Field(alias='assetId') + ] + + message: message_t + +class AssetsAction(pydantic.BaseModel): + action: Literal['assets'] + class message_t(pydantic.BaseModel): + pass + + message: Annotated[ + message_t, + pydantic.Field( + default_factory=message_t, + ) + ] + +Action = pydantic.RootModel[ + AssetsAction | SubscribeAction +] + +class AssetHistoryResponse(pydantic.BaseModel): + action: Literal['asset_history'] + class message_t(pydantic.BaseModel): + class point_t(pydantic.BaseModel): + asset_name : Annotated[ + str, + pydantic.Field(alias='assetName') + ] + time: int + asset_id : Annotated[ + int, + pydantic.Field(alias='assetId') + ] + value: decimal.Decimal + + points: list[point_t] + message: message_t + +class AssetTickerResponse(pydantic.BaseModel): + action: Literal['point'] + message: 'AssetHistoryResponse.message_t.point_t' + +class AssetsResponse(pydantic.BaseModel): + action: Literal['asset_history'] + class message_t(pydantic.BaseModel): + class asset_t(pydantic.BaseModel): + id: int + name: str + + assets: list[asset_t] + message: message_t diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/websocket_api.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/websocket_api.py new file mode 100644 index 0000000..5877b3e --- /dev/null +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/websocket_api.py @@ -0,0 +1,107 @@ +import fastapi +import datetime +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import async_sessionmaker +from . import schema +from ..tickers.logic import tickers_get_by_period + +from typing import (Optional, Literal) + +class WebsocketAPI: + def __init__( + self, + session: 'async_sessionmaker[AsyncSession]', + ) -> None: + self.connections : set[ + fastapi.WebSocket, + ] = set() + self.subscriptions_by_asset_id : dict[ + int, set[fastapi.WebSocket] + ] = dict() + self.subscriptions_by_client : dict[ + fastapi.WebSocket, + int, + ] = dict() + self.session = session + + async def connect(self, client: fastapi.WebSocket) -> None: + assert not client in self.connections + + await client.accept() + + self.connections.add(client) + + + async def subscribe( + self, + client: fastapi.WebSocket, + asset_id: int + ) -> None: + if client in self.subscriptions_by_client: + last_asset_id = self.subscriptions_by_client[client] + del self.subscriptions_by_asset_id[last_asset_id] + del self.subscriptions_by_client[client] + + self.subscriptions_by_asset_id[asset_id].add(client) + self.subscriptions_by_client[client] = asset_id + + await self.asset_last_period(client, asset_id) + + async def asset_last_period( + self, + client: fastapi.WebSocket, + asset_id: int, + ) -> None: + tickers = await tickers_get_by_period( + self.session, + period=datetime.timedelta(minutes=30), + market_id=asset_id, + ) + + await client.send_text( + schema.AssetHistoryResponse.model_validate(dict( + message=dict( + points=[ + dict( + asset_name=o.asset_name, + asset_id=o.asset_id, + time=o.timestamp.timestamp(), + value=o.value, + ) + for o in tickers + ] + ) + )).json(), + ) + + async def assets_index( + self, + client: fastapi.WebSocket, + ) -> None: + raise NotImplementedError + + async def on_message( + self, + client: fastapi.WebSocket, + msg_raw: str + ) -> None: + msg = schema.Action.model_validate_json( + msg_raw + ).root + + if isinstance(msg, schema.SubscribeAction): + await self.subscribe( + client, + msg.message.asset_id + ) + elif isinstance(msg, schema.AssetsAction): + await self.assets_index( + client, + ) + else: + raise NotImplementedError + + async def disconnect(self, client: fastapi.WebSocket) -> None: + assert client in self.connections + + self.connections.remove(client) diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers/logic.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers/logic.py index 80ecf75..82e0180 100644 --- a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers/logic.py +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers/logic.py @@ -1,3 +1,5 @@ +import datetime + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import async_sessionmaker @@ -31,3 +33,12 @@ async def ticker_store_multiple( active_session.add_all( tickers, ) + +async def tickers_get_by_period( + session: 'async_sessionmaker[AsyncSession]', + market_id: int, + period: datetime.timedelta, +) -> None: + async with session() as active_session: + async with active_session.begin() as transaction: + raise NotImplementedError