[+] clean async_api

1. comment out not used logic;
  2. remove app.py and websocket_api.py;
This commit is contained in:
Siarhei Siniak 2025-07-18 10:07:43 +03:00
parent c81ee3c4ec
commit 207a8737ba
5 changed files with 94 additions and 287 deletions

@ -1,68 +0,0 @@
import asyncio
import datetime
import logging
# import os
from ..tickers_retrieval.emcont import Emcont
from ..tickers.models import Ticker
from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol
from .db import create_engine
import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from typing import Any
logger = logging.getLogger(__name__)
async def run() -> None:
async_session = create_engine()
async def store_cb(
rates: list[Emcont.rates_get_t.data_t.rate_t],
timestamp: datetime.datetime,
session: 'async_sessionmaker[AsyncSession]',
) -> None:
logger.info(dict(
msg='before markets',
))
markets = await markets_get_by_symbol(
session,
set([
rate.symbol
for rate in rates
]),
)
logger.info(dict(
msg='after markets',
))
await ticker_store_multiple(
session,
[
Ticker(
id=markets[rate.symbol],
timestamp=timestamp,
value=rate.value,
)
for rate in rates
]
)
logger.info(dict(
rates=rates,
timestamp=timestamp.isoformat()
))
await Emcont.worker(
only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'},
session=async_session,
store_cb=store_cb,
request_timeout=2,
store_timeout=0.5,
)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(run())

@ -1,4 +1,4 @@
from ..tickers.settings import Settings as ModelsSettings from ..payloads.settings import Settings as ModelsSettings
import sqlalchemy.ext.asyncio import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession

@ -9,49 +9,49 @@ import sys
from .settings import Settings as APISettings from .settings import Settings as APISettings
from .db import create_engine from .db import create_engine
from .websocket_api import WebsocketAPI # from .websocket_api import WebsocketAPI
from typing import (Any, Optional, Literal, Annotated,) from typing import (Any, Optional, Literal, Annotated,)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def websocket_tickers( # async def websocket_tickers(
websocket: fastapi.WebSocket, # websocket: fastapi.WebSocket,
websocket_api: WebsocketAPI, # websocket_api: WebsocketAPI,
) -> None: # ) -> None:
try: # try:
await websocket_api.connect(websocket) # await websocket_api.connect(websocket)
#
while True: # while True:
msg = await websocket.receive_text() # msg = await websocket.receive_text()
await websocket_api.on_message(websocket, msg) # await websocket_api.on_message(websocket, msg)
except fastapi.WebSocketDisconnect: # except fastapi.WebSocketDisconnect:
pass # pass
# websocket_api.disconnect(websocket) # # websocket_api.disconnect(websocket)
except: # except:
logger.exception('') # logger.exception('')
raise # raise
finally: # finally:
await websocket_api.disconnect(websocket) # await websocket_api.disconnect(websocket)
def create_app() -> fastapi.FastAPI: def create_app() -> fastapi.FastAPI:
async_session = create_engine() async_session = create_engine()
websocket_api = WebsocketAPI( # websocket_api = WebsocketAPI(
session=async_session, # session=async_session,
) # )
app = fastapi.FastAPI() app = fastapi.FastAPI()
app.websocket( # app.websocket(
'/tickers/', # '/tickers/',
)( # )(
functools.partial( # functools.partial(
websocket_tickers, # websocket_tickers,
websocket_api=fastapi.Depends(lambda : websocket_api), # websocket_api=fastapi.Depends(lambda : websocket_api),
) # )
) # )
return app return app

@ -3,65 +3,66 @@ import decimal
from typing import (Literal, Annotated,) from typing import (Literal, Annotated,)
class SubscribeAction(pydantic.BaseModel): # class SubscribeAction(pydantic.BaseModel):
action: Literal['subscribe'] # action: Literal['subscribe']
class message_t(pydantic.BaseModel): # class message_t(pydantic.BaseModel):
asset_id: Annotated[ # asset_id: Annotated[
int, # int,
pydantic.Field(alias='assetId') # pydantic.Field(alias='assetId')
] # ]
#
message: message_t # message: message_t
#
class AssetsAction(pydantic.BaseModel): # class AssetsAction(pydantic.BaseModel):
action: Literal['assets'] # action: Literal['assets']
class message_t(pydantic.BaseModel): # class message_t(pydantic.BaseModel):
pass # pass
#
message: Annotated[ # message: Annotated[
message_t, # message_t,
pydantic.Field( # pydantic.Field(
default_factory=message_t, # default_factory=message_t,
) # )
] # ]
#
Action = pydantic.RootModel[ # Action = pydantic.RootModel[
AssetsAction | SubscribeAction # AssetsAction | SubscribeAction
] # ]
#
class AssetHistoryResponse(pydantic.BaseModel): # class AssetHistoryResponse(pydantic.BaseModel):
action: Literal['asset_history'] = 'asset_history' # action: Literal['asset_history'] = 'asset_history'
#
class message_t(pydantic.BaseModel): # class message_t(pydantic.BaseModel):
class point_t(pydantic.BaseModel): # class point_t(pydantic.BaseModel):
asset_name : Annotated[ # asset_name : Annotated[
str, # str,
pydantic.Field( # pydantic.Field(
alias='assetName', # alias='assetName',
) # )
] # ]
time: int # time: int
asset_id : Annotated[ # asset_id : Annotated[
int, # int,
pydantic.Field(alias='assetId') # pydantic.Field(alias='assetId')
] # ]
value: decimal.Decimal # value: decimal.Decimal
#
points: list[point_t] # points: list[point_t]
message: message_t # message: message_t
#
class AssetTickerResponse(pydantic.BaseModel): # class AssetTickerResponse(pydantic.BaseModel):
action: Literal['point'] = 'point' # action: Literal['point'] = 'point'
#
message: 'AssetHistoryResponse.message_t.point_t' # message: 'AssetHistoryResponse.message_t.point_t'
#
class AssetsResponse(pydantic.BaseModel): # class AssetsResponse(pydantic.BaseModel):
action: Literal['assets'] = 'assets' # action: Literal['assets'] = 'assets'
#
class message_t(pydantic.BaseModel): # class message_t(pydantic.BaseModel):
class asset_t(pydantic.BaseModel): # class asset_t(pydantic.BaseModel):
id: int # id: int
name: str # name: str
#
assets: list[asset_t] # assets: list[asset_t]
message: message_t # message: message_t
#

@ -1,126 +0,0 @@
import fastapi
import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from . import schema
from ..tickers.logic import tickers_get_by_period, markets_all
from typing import (Optional, Literal)
class WebsocketAPI:
def __init__(
self,
session: 'async_sessionmaker[AsyncSession]',
) -> None:
self.connections : set[
fastapi.WebSocket,
] = set()
self.subscriptions_by_asset_id : dict[
int, set[fastapi.WebSocket]
] = dict()
self.subscriptions_by_client : dict[
fastapi.WebSocket,
int,
] = dict()
self.session = session
async def connect(self, client: fastapi.WebSocket) -> None:
assert not client in self.connections
await client.accept()
self.connections.add(client)
async def subscribe(
self,
client: fastapi.WebSocket,
asset_id: int
) -> None:
if client in self.subscriptions_by_client:
last_asset_id = self.subscriptions_by_client[client]
del self.subscriptions_by_asset_id[last_asset_id]
del self.subscriptions_by_client[client]
if not asset_id in self.subscriptions_by_asset_id:
self.subscriptions_by_asset_id[asset_id] = set()
self.subscriptions_by_asset_id[asset_id].add(client)
self.subscriptions_by_client[client] = asset_id
await self.asset_last_period(client, asset_id)
async def asset_last_period(
self,
client: fastapi.WebSocket,
asset_id: int,
) -> None:
tickers = await tickers_get_by_period(
self.session,
period=datetime.timedelta(minutes=30),
market_id=asset_id,
)
await client.send_text(
schema.AssetHistoryResponse(
message=schema.AssetHistoryResponse.message_t(
points=[
schema.AssetHistoryResponse.message_t.point_t.model_construct(
asset_name=o.market.name,
asset_id=o.market.id,
time=int(o.timestamp.timestamp()),
value=o.value,
)
for o in tickers
]
)
).json(by_alias=True,),
)
async def assets_index(
self,
client: fastapi.WebSocket,
) -> None:
markets = await markets_all(
self.session,
)
await client.send_text(
schema.AssetsResponse(
message=schema.AssetsResponse.message_t(
assets=[
schema.AssetsResponse.message_t.asset_t.model_construct(
name=o.name,
id=o.id,
)
for o in markets
]
)
).json(by_alias=True,),
)
async def on_message(
self,
client: fastapi.WebSocket,
msg_raw: str
) -> None:
msg = schema.Action.model_validate_json(
msg_raw
).root
if isinstance(msg, schema.SubscribeAction):
await self.subscribe(
client,
msg.message.asset_id
)
elif isinstance(msg, schema.AssetsAction):
await self.assets_index(
client,
)
else:
raise NotImplementedError
async def disconnect(self, client: fastapi.WebSocket) -> None:
assert client in self.connections
self.connections.remove(client)