[+] add Emcont.worker

This commit is contained in:
Siarhei Siniak 2025-07-08 13:08:37 +03:00
parent ac23cc9397
commit 38c0b9ba87

@ -1,9 +1,20 @@
import aiohttp import aiohttp
import asyncio
import decimal import decimal
import logging
import datetime
import pydantic import pydantic
import json import json
from typing import (Any, Annotated, Optional,) from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from typing import (
Any, Annotated, Optional, Awaitable, Callable,
Protocol,
)
logger = logging.getLogger(__name__)
class Emcont: class Emcont:
class rates_get_t: class rates_get_t:
@ -60,3 +71,45 @@ class Emcont:
] ]
return data return data
class store_cb_t(Protocol):
async def __call__(
self,
rates: list['Emcont.rates_get_t.data_t.rate_t'],
timestamp: datetime.datetime,
session: 'async_sessionmaker[AsyncSession]',
) -> None: ...
@classmethod
async def worker(
cls,
session: 'async_sessionmaker[AsyncSession]',
store_cb: 'Emcont.store_cb_t',
only_symbols: Optional[set[str]] = None,
) -> None:
last_retrieval = datetime.datetime.now()
while True:
try:
entries = await cls.rates_get(
only_symbols=only_symbols,
)
await store_cb(
rates=entries.rates,
timestamp=last_retrieval,
session=session,
)
except:
logger.exception('')
next_retrieval = last_retrieval + datetime.timedelta(seconds=1)
wait_interval = (
datetime.datetime.now() - next_retrieval
).total_seconds()
if wait_interval > 0:
await asyncio.sleep(wait_interval)
last_retrieval = next_retrieval