[+] partially add assets history endpoint

This commit is contained in:
Siarhei Siniak 2025-07-15 13:10:04 +03:00
parent 7d6ce1eaee
commit 181a9a5ce9
6 changed files with 204 additions and 65 deletions

@ -4,8 +4,8 @@ import logging
# import os # import os
from ..tickers_retrieval.emcont import Emcont from ..tickers_retrieval.emcont import Emcont
from ..tickers.models import Ticker from ..tickers.models import Ticker
from ..tickers.settings import Settings as ModelsSettings
from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol
from .db import create_engine
import sqlalchemy.ext.asyncio import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import async_sessionmaker
@ -15,12 +15,7 @@ from typing import Any
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def run() -> None: async def run() -> None:
engine = sqlalchemy.ext.asyncio.create_async_engine( async_session = create_engine()
ModelsSettings.singleton().db_url
)
async_session = sqlalchemy.ext.asyncio.async_sessionmaker(
engine
)
async def store_cb( async def store_cb(
rates: list[Emcont.rates_get_t.data_t.rate_t], rates: list[Emcont.rates_get_t.data_t.rate_t],

@ -0,0 +1,15 @@
from ..tickers.settings import Settings as ModelsSettings
import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
def create_engine() -> 'async_sessionmaker[AsyncSession]':
engine = sqlalchemy.ext.asyncio.create_async_engine(
ModelsSettings.singleton().db_url
)
async_session = sqlalchemy.ext.asyncio.async_sessionmaker(
engine
)
return async_session

@ -8,68 +8,13 @@ import uvicorn.config
import sys import sys
from .settings import Settings as APISettings from .settings import Settings as APISettings
from .db import create_engine
from .websocket_api import WebsocketAPI
from typing import (Any, Optional, Literal, Annotated,) from typing import (Any, Optional, Literal, Annotated,)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class WebsocketAPI:
def __init__(
self,
) -> None:
self.connections : set[
fastapi.WebSocket,
] = set()
self.subscriptionss : dict[str, set[fastapi.WebSocket]] = dict()
async def connect(self, client: fastapi.WebSocket) -> None:
assert not client in self.connections
await client.accept()
self.connections.add(client)
class Subscribe(pydantic.BaseModel):
action: Literal['subscribe']
class message_t(pydantic.BaseModel):
asset_id: Annotated[
int,
pydantic.Field(alias='assetId')
]
message: message_t
class Assets(pydantic.BaseModel):
action: Literal['assets']
class message_t(pydantic.BaseModel):
pass
message: Annotated[
message_t,
pydantic.Field(
default_factory=message_t,
)
]
Message = pydantic.RootModel[
Assets | Subscribe
]
async def on_message(
self,
client: fastapi.WebSocket,
msg_raw: str
) -> None:
msg = self.Message.model_validate_json(
msg_raw
).root
raise NotImplementedError
async def disconnect(self, client: fastapi.WebSocket) -> None:
assert client in self.connections
self.connections.remove(client)
async def websocket_tickers( async def websocket_tickers(
websocket: fastapi.WebSocket, websocket: fastapi.WebSocket,
@ -91,7 +36,11 @@ async def websocket_tickers(
await websocket_api.disconnect(websocket) await websocket_api.disconnect(websocket)
def create_app() -> fastapi.FastAPI: def create_app() -> fastapi.FastAPI:
websocket_api = WebsocketAPI() async_session = create_engine()
websocket_api = WebsocketAPI(
session=async_session,
)
app = fastapi.FastAPI() app = fastapi.FastAPI()

@ -0,0 +1,62 @@
import pydantic
import decimal
from typing import (Literal, Annotated,)
class SubscribeAction(pydantic.BaseModel):
action: Literal['subscribe']
class message_t(pydantic.BaseModel):
asset_id: Annotated[
int,
pydantic.Field(alias='assetId')
]
message: message_t
class AssetsAction(pydantic.BaseModel):
action: Literal['assets']
class message_t(pydantic.BaseModel):
pass
message: Annotated[
message_t,
pydantic.Field(
default_factory=message_t,
)
]
Action = pydantic.RootModel[
AssetsAction | SubscribeAction
]
class AssetHistoryResponse(pydantic.BaseModel):
action: Literal['asset_history']
class message_t(pydantic.BaseModel):
class point_t(pydantic.BaseModel):
asset_name : Annotated[
str,
pydantic.Field(alias='assetName')
]
time: int
asset_id : Annotated[
int,
pydantic.Field(alias='assetId')
]
value: decimal.Decimal
points: list[point_t]
message: message_t
class AssetTickerResponse(pydantic.BaseModel):
action: Literal['point']
message: 'AssetHistoryResponse.message_t.point_t'
class AssetsResponse(pydantic.BaseModel):
action: Literal['asset_history']
class message_t(pydantic.BaseModel):
class asset_t(pydantic.BaseModel):
id: int
name: str
assets: list[asset_t]
message: message_t

@ -0,0 +1,107 @@
import fastapi
import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from . import schema
from ..tickers.logic import tickers_get_by_period
from typing import (Optional, Literal)
class WebsocketAPI:
def __init__(
self,
session: 'async_sessionmaker[AsyncSession]',
) -> None:
self.connections : set[
fastapi.WebSocket,
] = set()
self.subscriptions_by_asset_id : dict[
int, set[fastapi.WebSocket]
] = dict()
self.subscriptions_by_client : dict[
fastapi.WebSocket,
int,
] = dict()
self.session = session
async def connect(self, client: fastapi.WebSocket) -> None:
assert not client in self.connections
await client.accept()
self.connections.add(client)
async def subscribe(
self,
client: fastapi.WebSocket,
asset_id: int
) -> None:
if client in self.subscriptions_by_client:
last_asset_id = self.subscriptions_by_client[client]
del self.subscriptions_by_asset_id[last_asset_id]
del self.subscriptions_by_client[client]
self.subscriptions_by_asset_id[asset_id].add(client)
self.subscriptions_by_client[client] = asset_id
await self.asset_last_period(client, asset_id)
async def asset_last_period(
self,
client: fastapi.WebSocket,
asset_id: int,
) -> None:
tickers = await tickers_get_by_period(
self.session,
period=datetime.timedelta(minutes=30),
market_id=asset_id,
)
await client.send_text(
schema.AssetHistoryResponse.model_validate(dict(
message=dict(
points=[
dict(
asset_name=o.asset_name,
asset_id=o.asset_id,
time=o.timestamp.timestamp(),
value=o.value,
)
for o in tickers
]
)
)).json(),
)
async def assets_index(
self,
client: fastapi.WebSocket,
) -> None:
raise NotImplementedError
async def on_message(
self,
client: fastapi.WebSocket,
msg_raw: str
) -> None:
msg = schema.Action.model_validate_json(
msg_raw
).root
if isinstance(msg, schema.SubscribeAction):
await self.subscribe(
client,
msg.message.asset_id
)
elif isinstance(msg, schema.AssetsAction):
await self.assets_index(
client,
)
else:
raise NotImplementedError
async def disconnect(self, client: fastapi.WebSocket) -> None:
assert client in self.connections
self.connections.remove(client)

@ -1,3 +1,5 @@
import datetime
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.ext.asyncio import async_sessionmaker
@ -31,3 +33,12 @@ async def ticker_store_multiple(
active_session.add_all( active_session.add_all(
tickers, tickers,
) )
async def tickers_get_by_period(
session: 'async_sessionmaker[AsyncSession]',
market_id: int,
period: datetime.timedelta,
) -> None:
async with session() as active_session:
async with active_session.begin() as transaction:
raise NotImplementedError