diff --git a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py index 2ada068..0b2f0a8 100644 --- a/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py +++ b/deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py @@ -1,9 +1,20 @@ import aiohttp +import asyncio import decimal +import logging +import datetime import pydantic import json -from typing import (Any, Annotated, Optional,) +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import async_sessionmaker + +from typing import ( + Any, Annotated, Optional, Awaitable, Callable, + Protocol, +) + +logger = logging.getLogger(__name__) class Emcont: class rates_get_t: @@ -60,3 +71,45 @@ class Emcont: ] return data + + class store_cb_t(Protocol): + async def __call__( + self, + rates: list['Emcont.rates_get_t.data_t.rate_t'], + timestamp: datetime.datetime, + session: 'async_sessionmaker[AsyncSession]', + ) -> None: ... + + @classmethod + async def worker( + cls, + session: 'async_sessionmaker[AsyncSession]', + store_cb: 'Emcont.store_cb_t', + only_symbols: Optional[set[str]] = None, + ) -> None: + last_retrieval = datetime.datetime.now() + + while True: + try: + entries = await cls.rates_get( + only_symbols=only_symbols, + ) + + await store_cb( + rates=entries.rates, + timestamp=last_retrieval, + session=session, + ) + except: + logger.exception('') + + next_retrieval = last_retrieval + datetime.timedelta(seconds=1) + + wait_interval = ( + datetime.datetime.now() - next_retrieval + ).total_seconds() + + if wait_interval > 0: + await asyncio.sleep(wait_interval) + + last_retrieval = next_retrieval