[+] improve timeouts handling
This commit is contained in:
parent
92a9f36acd
commit
52df4b54d5
12
deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py
vendored
12
deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/async_api/app.py
vendored
@ -27,6 +27,10 @@ async def run() -> None:
|
|||||||
timestamp: datetime.datetime,
|
timestamp: datetime.datetime,
|
||||||
session: 'async_sessionmaker[AsyncSession]',
|
session: 'async_sessionmaker[AsyncSession]',
|
||||||
) -> None:
|
) -> None:
|
||||||
|
logger.info(dict(
|
||||||
|
msg='before markets',
|
||||||
|
))
|
||||||
|
|
||||||
markets = await markets_get_by_symbol(
|
markets = await markets_get_by_symbol(
|
||||||
session,
|
session,
|
||||||
set([
|
set([
|
||||||
@ -35,6 +39,10 @@ async def run() -> None:
|
|||||||
]),
|
]),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
logger.info(dict(
|
||||||
|
msg='after markets',
|
||||||
|
))
|
||||||
|
|
||||||
await ticker_store_multiple(
|
await ticker_store_multiple(
|
||||||
session,
|
session,
|
||||||
[
|
[
|
||||||
@ -55,7 +63,9 @@ async def run() -> None:
|
|||||||
await Emcont.worker(
|
await Emcont.worker(
|
||||||
only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'},
|
only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'},
|
||||||
session=async_session,
|
session=async_session,
|
||||||
store_cb=store_cb
|
store_cb=store_cb,
|
||||||
|
request_timeout=2,
|
||||||
|
store_timeout=0.5,
|
||||||
)
|
)
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
40
deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py
vendored
40
deps/test-task-2025-06-30-v1/python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers_retrieval/emcont.py
vendored
@ -92,27 +92,45 @@ class Emcont:
|
|||||||
session: 'async_sessionmaker[AsyncSession]',
|
session: 'async_sessionmaker[AsyncSession]',
|
||||||
store_cb: 'Emcont.store_cb_t',
|
store_cb: 'Emcont.store_cb_t',
|
||||||
only_symbols: Optional[set[str]] = None,
|
only_symbols: Optional[set[str]] = None,
|
||||||
|
request_timeout: float | int = 0.5,
|
||||||
|
store_timeout: float | int = 0.5,
|
||||||
) -> None:
|
) -> None:
|
||||||
last_retrieval = datetime.datetime.now()
|
last_retrieval = datetime.datetime.now()
|
||||||
|
|
||||||
while True:
|
assert request_timeout >= 0
|
||||||
try:
|
assert store_timeout >= 0
|
||||||
entries = await cls.rates_get(
|
|
||||||
only_symbols=only_symbols,
|
|
||||||
)
|
|
||||||
|
|
||||||
await store_cb(
|
while True:
|
||||||
rates=entries.rates,
|
logger.info(dict(msg='started'))
|
||||||
timestamp=last_retrieval,
|
|
||||||
session=session,
|
entries : Optional[cls.rates_get_t.data_t] = None
|
||||||
)
|
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
async with asyncio.timeout(request_timeout):
|
||||||
|
entries = await cls.rates_get(
|
||||||
|
only_symbols=only_symbols,
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.exception('request timeout')
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with asyncio.timeout(store_timeout):
|
||||||
|
if entries:
|
||||||
|
await store_cb(
|
||||||
|
rates=entries.rates,
|
||||||
|
timestamp=last_retrieval,
|
||||||
|
session=session,
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.exception('store timeout')
|
||||||
except:
|
except:
|
||||||
logger.exception('')
|
logger.exception('')
|
||||||
|
|
||||||
next_retrieval = last_retrieval + datetime.timedelta(seconds=1)
|
next_retrieval = last_retrieval + datetime.timedelta(seconds=1)
|
||||||
|
|
||||||
wait_interval = (
|
wait_interval = (
|
||||||
datetime.datetime.now() - next_retrieval
|
next_retrieval - datetime.datetime.now()
|
||||||
).total_seconds()
|
).total_seconds()
|
||||||
|
|
||||||
if wait_interval > 0:
|
if wait_interval > 0:
|
||||||
|
Loading…
Reference in New Issue
Block a user