Compare commits

...

88 Commits

Author SHA1 Message Date
81be8abc50 [+] improve POST:/payload
1. fix get_or_create parameters;
  2. TODO: check existence before calling
    summarizer;
    return schema for rest api
    with result and its id;
2025-07-28 10:59:16 +03:00
a5b19cfe2a [+] add caching of generated payloads
1. fix session persistence
    by using a Generator as the Depends method;
  2. implement payload_get_or_create method;
2025-07-28 10:39:08 +03:00
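A minimal sketch of the Generator-as-Depends fix this commit describes, modelled on the create_engine dependency that appears in the second diff further down; the DSN here is a placeholder:

```python
import fastapi
import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from typing import Annotated, Generator

def create_engine() -> Generator['async_sessionmaker[AsyncSession]', None, None]:
	# yielding (rather than returning) makes this a generator dependency:
	# FastAPI caches the value for the duration of the request and can run
	# teardown code after the response is sent
	engine = sqlalchemy.ext.asyncio.create_async_engine('postgresql+asyncpg://payloads@postgresql/payloads')
	yield sqlalchemy.ext.asyncio.async_sessionmaker(engine)

AsyncSessionDep = Annotated['async_sessionmaker[AsyncSession]', fastapi.Depends(create_engine)]
```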
4491b1b05a [+] partially fix summarizer service env 2025-07-25 13:15:36 +03:00
a77df06bdd [+] update payload service 2025-07-25 13:11:44 +03:00
38c9c838ed [+] let payloads app reuse summarizer
1. add rest_client to transform app;
  2. wrap summarizer client inside payloads
    with domain and protocol configured via settings;
  3. update view to request summary generation for each
    input list inside POST:/payloads
2025-07-25 13:07:06 +03:00
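A hedged sketch of wrapping the summarizer behind a client whose endpoint is configured via settings; SummarizerClient and the /summarize route are illustrative, only the SUMMARIZER_DOMAIN / SUMMARIZER_PROTOCOL names come from the env files in this diff:

```python
import aiohttp
import pydantic_settings

class Settings(pydantic_settings.BaseSettings):
	summarizer_domain: str = 'summarizer'
	summarizer_protocol: str = 'http'

class SummarizerClient:
	def __init__(self, settings: Settings) -> None:
		self.base_url = '{}://{}'.format(settings.summarizer_protocol, settings.summarizer_domain)

	async def summarize(self, texts: list[str]) -> str:
		# one HTTP round trip to the summarizer service per input list
		async with aiohttp.ClientSession() as session:
			async with session.post(self.base_url + '/summarize', json={'texts': texts}) as resp:
				return (await resp.json())['summary']
```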
17f5d81953 [+] deploy payloads app
1. add app.py for payloads
    like the one in the summarizer app;
  2. check that the app service works;
2025-07-25 12:50:04 +03:00
468eac45a2 [+] fix singleton for summarizer
1. use explicit singleton class variable
    for Summarizer, instead of relying
    on fastapi Depends caching;
2025-07-25 12:40:06 +03:00
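The explicit class-variable singleton this commit refers to follows the same pattern as the Settings.singleton() helpers further down in this diff; a sketch with the Summarizer internals stubbed out:

```python
from typing import ClassVar, Optional

class Summarizer:
	_singleton: ClassVar[Optional['Summarizer']] = None

	@classmethod
	def singleton(cls) -> 'Summarizer':
		# constructed once per process, no matter how many
		# fastapi.Depends call sites resolve it
		if cls._singleton is None:
			cls._singleton = cls()
		return cls._singleton
```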
c0866ee863 [+] fix summarizer created upon every request
1. Reuse single Annotated instance;
2025-07-25 12:17:30 +03:00
5568c458c2 [+] improve rest api for summarizer
1. refactor out dependency logic into dependencies;
  2. fix input schema for summarizer;
  3. fix early app initialization before uvicorn sets logging;
  4. add manual basicConfig call;
  5. update default host and port for fastapi server;
  6. add pyright watch mode;
2025-07-25 12:05:20 +03:00
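A sketch of the fix in items 3-4, assuming uvicorn's factory mode: basicConfig is called manually, and uvicorn receives the create_app callable rather than an already-built instance, so the app is initialized only after uvicorn sets up logging (the run() entry point later in this diff does the same, without factory=True):

```python
import logging
import fastapi
import uvicorn

def create_app() -> fastapi.FastAPI:
	return fastapi.FastAPI()

if __name__ == '__main__':
	logging.basicConfig(level=logging.INFO)
	uvicorn.run(create_app, host='0.0.0.0', port=80, factory=True)
```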
e82586e08c [+] update service
1. adding initialization logging;
  2. try to fix requirements.txt not being cached;
2025-07-25 11:35:12 +03:00
520633f383 [+] rework docker for web
1. add separate requirements for heavy
    packages with custom index-url;
  2. rewrite docker file to make sure caching
    is not triggering reinstall of requirements
    all the time;
  3. make sure summarizer has transformers
    and torch modules;
2025-07-25 11:23:36 +03:00
491d272304 [+] add app component for transform
1. use FastAPI Depends to initialize
    Summarizer once for the whole transform app;
2025-07-25 10:40:17 +03:00
d6e1424d89 [+] remove duplicate views from app.py 2025-07-24 11:14:02 +03:00
9bd3c3e45b [+] partially deploy summarizer as service
1. add views, schema for transform module;
  2. partially added injection of singleton
    instances, like Summarizer;
2025-07-24 11:13:07 +03:00
7355552366 [+] add .whl 2025-07-24 11:03:45 +03:00
64cfccd353 [+] improve worker.py 2025-07-24 11:00:16 +03:00
97e0270550 [+] improve typing for worker.py 2025-07-24 10:26:47 +03:00
c4eb8b5568 [+] partially add worker based on huggingface
1. test huggingface manually from IPython;
  1.1. seems to work;
  1.2. put some long text from documentation,
    it has provided some summary;
  1.3. runs in about 40-60 seconds on 3-4 cores CPU;
  1.4. almost made it reuse the local cache;
    for some reason huggingface can still
    download LFS weights from the public repo;
  2. partially wrapped into worker
    which is to be run as a separate service in docker-compose;
2025-07-23 11:10:13 +03:00
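The manual IPython experiment in item 1 presumably boils down to something like the following, using the distilbart checkpoint researched in the commit below; the exact worker wiring in the repo differs:

```python
from transformers import pipeline

summarizer = pipeline('summarization', model='sshleifer/distilbart-cnn-12-6')

long_text = '...'  # some long passage from documentation
result = summarizer(long_text, max_length=130, min_length=30, do_sample=False)
print(result[0]['summary_text'])  # ~40-60 seconds on a 3-4 core CPU
```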
0b5971c4af [+] partially add transform mock
1. researching hugging face model
    https://huggingface.co/sshleifer/distilbart-cnn-12-6?library=transformers
2025-07-22 11:14:18 +03:00
80b5a6a64e [+] update Makefile 2025-07-21 11:25:06 +03:00
bcf300e0c0 [+] update docs, fix url ending 2025-07-21 11:21:44 +03:00
13f5bbc2dd [+] add return type for rest api payloads 2025-07-21 11:19:59 +03:00
ce50a74510 [+] partially add rest api views 2025-07-21 11:17:26 +03:00
f77399b1d2 [+] add ruff
1. add ruff recipes to Makefile;
  2. reformat source code with ruff;
2025-07-18 10:11:28 +03:00
207a8737ba [+] clean async_api
1. comment out not used logic;
  2. remove app.py and websocket_api.py;
2025-07-18 10:07:43 +03:00
c81ee3c4ec [+] fix postgresql environment 2025-07-18 10:04:55 +03:00
d185aaf329 [+] add initial payloads/models.py migration 2025-07-18 09:59:16 +03:00
fe33d5c7f6 [+] rename module folder
1. rename python module folder;
  2. clean up payloads/logic.py;
  3. update partially payloads/alembic/env.py;
2025-07-18 09:55:22 +03:00
0ee9e87b7f [+] update payloads.models 2025-07-18 09:51:05 +03:00
cfdd6b72f0 [+] add requirements 2025-07-17 21:17:41 +03:00
d0ffbeef0f [+] add simple fastapi template
1. rename basic package name;
  2. rewrite requirements into docs/readme.md;
2025-07-17 21:14:07 +03:00
8385eff636 [m] merge test task v1 2025-07-17 20:55:46 +03:00
b0dd2eb5cf [+] release .whl 2025-07-16 12:22:32 +03:00
f7abca1e1b [+] implement assets_index 2025-07-16 12:21:18 +03:00
e73f57670a [+] fix time serialization 2025-07-16 12:14:51 +03:00
f4831d5759 [+] improve asset_history 2025-07-16 11:04:17 +03:00
17bfb08e43 [+] fix typing 2025-07-16 10:52:37 +03:00
dda9c841fd [+] improve ticker orm logic 2025-07-15 14:05:45 +03:00
181a9a5ce9 [+] partially add assets history endpoint 2025-07-15 13:10:04 +03:00
7d6ce1eaee [+] improve websocket api 2025-07-14 11:41:02 +03:00
612d807bc4 [+] add websocket clients manager 2025-07-14 11:17:39 +03:00
25d5b34add [+] add log_level 2025-07-14 11:07:22 +03:00
0151e61cd6 [+] add websockets dependency 2025-07-14 11:05:26 +03:00
687dc4bb9b [+] fix typing 2025-07-14 11:03:10 +03:00
814fefd18b [+] add api settings 2025-07-14 10:59:19 +03:00
afdc7c17b6 [+] partially add fastapi 2025-07-14 10:53:57 +03:00
3cde36d8a7 [+] use timezone for timestamp 2025-07-11 11:34:11 +03:00
83ac7c3a66 [+] update env for emcont_worker 2025-07-11 11:21:14 +03:00
ad7bff67c4 [+] update docker for services 2025-07-11 11:17:48 +03:00
83a09207d6 [+] deploy .whl 2025-07-11 11:04:38 +03:00
7103f3a089 [+] fix periodic retrieval 2025-07-11 10:55:36 +03:00
10c012aba2 [+] improve periodic processing 2025-07-10 11:53:08 +03:00
52df4b54d5 [+] improve timeouts handling 2025-07-10 11:40:53 +03:00
92a9f36acd [+] fix not frozen Market rows 2025-07-10 11:27:58 +03:00
070a63222c [+] improve tickers storage in DB 2025-07-10 11:18:15 +03:00
027475e4b3 [+] remove alembic.ini 2025-07-09 12:08:08 +03:00
13e2bff324 [+] add get_or_create for sqlalchemy 2025-07-09 11:51:20 +03:00
731b9d384a [+] partially add storing of tickers 2025-07-09 11:12:56 +03:00
c8370f96ff [+] reuse settings from tickers 2025-07-09 10:55:10 +03:00
9aec75cdd7 [+] add basic entry point 2025-07-08 13:17:13 +03:00
38c0b9ba87 [+] add Emcont.worker 2025-07-08 13:08:37 +03:00
ac23cc9397 [+] improve requirements 2025-07-08 10:46:16 +03:00
acd34f2ca5 [+] improve emcont wrapper 2025-07-08 10:44:47 +03:00
eb32f27bad [+] partially add emcont
1. update dependencies;
  2. partially add tickers retrieval and parsing;
2025-07-07 10:45:03 +03:00
60ef0e386d [+] add Ticker table 2025-07-07 10:33:44 +03:00
3f1e8c57ac [+] update alembic
1. fix async connection for alembic;
  2. generate Market table migration;
  3. migrate database;
2025-07-04 11:33:33 +03:00
18449382e1 [+] update alembic 2025-07-04 10:56:27 +03:00
c42e39a8d5 [+] update alembic 2025-07-04 10:49:50 +03:00
f8eb591b05 [+] improve alembic 2025-07-04 10:41:27 +03:00
d9f5c20557 [+] add models partially 2025-07-03 11:29:50 +03:00
4440e084b9 [+] improve typing 2025-07-03 11:22:33 +03:00
2dec1e33c2 [+] improve alembic 2025-07-03 11:18:35 +03:00
eb2d630dd0 [+] add alembic 2025-07-03 11:06:50 +03:00
e4a02726e7 [+] improve deployment 2025-07-02 12:43:40 +03:00
a0a2248306 [+] improve deployment 2025-07-02 12:39:40 +03:00
fadfd8711c [+] improve compose 2025-07-02 12:36:55 +03:00
d01386a4dc [+] improve compose 2025-07-02 12:29:45 +03:00
c82107bad1 [+] add postgresql 2025-07-02 12:21:55 +03:00
976576f8c6 [+] add redis 2025-07-01 11:08:51 +03:00
c2fecdd87c [+] improve compose 2025-07-01 11:05:50 +03:00
a0f1654cf5 [+] improve project layout 2025-07-01 11:03:55 +03:00
7fb9aae90e [+] update requirements 2025-07-01 10:49:40 +03:00
1e141ce6fa [+] add partial documentation 2025-06-30 18:43:09 +03:00
4da19bb053 [+] improve pyright 2025-06-30 18:40:03 +03:00
e7bc75f0d8 [+] add basic python module layout 2025-06-30 18:38:14 +03:00
d03154314c [+] update dependencies 2025-06-30 18:35:18 +03:00
1a668159c7 [+] add basic commands 2025-06-30 18:31:22 +03:00
a33711cc49 [+] add tmuxp 2025-06-30 18:21:22 +03:00
97 changed files with 6819 additions and 0 deletions

@ -0,0 +1,5 @@
.venv
tmp
.git
.env
build

@ -0,0 +1 @@
releases/whl/** filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,6 @@
!.tmuxp/
!python
.env/
releases/tar
build
!releases/whl/**

@ -0,0 +1,9 @@
session_name: test-task-2025-06-30-v1
start_directory: ${PWD}/deps/test-task-2025-06-30-v1
windows:
- focus: 'true'
  layout: 5687,98x12,0,0,18
  options: {}
  panes:
  - pane
  window_name: zsh

deps/test-task-2025-06-30-v1/Makefile (vendored, new file, 62 lines)

@ -0,0 +1,62 @@
ENV_PATH ?= .venv
PYTHON_PATH = $(ENV_PATH)/bin/python3
PYTHON_VERSION ?= 3.10
UV_ARGS ?= --offline
DOCKER ?= podman
COMPOSE ?= podman compose

venv_extract_requirements:
	$(ENV_PATH)/bin/tomlq \
		-r '.project.dependencies | join("\n")' \
		pyproject.toml > requirements.in

venv_compile:
	uv pip compile \
		$(UV_ARGS) \
		-p $(PYTHON_VERSION) \
		--generate-hashes \
		requirements.in > \
		requirements.txt

venv:
	uv \
		venv \
		-p 3.13 \
		$(UV_ARGS) \
		--seed \
		$(ENV_PATH)
	uv \
		pip install \
		$(UV_ARGS) \
		-p $(ENV_PATH) \
		-r requirements.txt

pyright:
	$(ENV_PATH)/bin/python3 -m pyright \
		-p pyproject.toml \
		--pythonpath $(PYTHON_PATH)

compose_env:
	cat docker/postgresql/.env .env/postgresql.env > .env/postgresql.patched.env
	cat docker/web/.env .env/web.env > .env/web.patched.env

compose_build_web:
	$(COMPOSE) build web

git-release:
	git archive \
		--format=tar \
		-o "releases/tar/repo-$$(git describe --tags).tar" \
		HEAD

ALEMBIC_CMD ?= --help

alembic:
	$(ENV_PATH)/bin/alembic \
		-c pyproject.toml \
		$(ALEMBIC_CMD)

deploy_wheel:
	make pyright
	$(PYTHON_PATH) -m build -o releases/whl -w -n

@ -0,0 +1,76 @@
services:
  redis:
    image: docker.io/redis:latest-alpine@sha256:e71b4cb00ea461ac21114cff40ff12fb8396914238e1e9ec41520b2d5a4d3423
    ports:
    - 127.0.0.1:9004:6379
  web: &web
    image: online.fxreader.pr34.test_task_2025_06_30_v1:dev
    build:
      context: .
      dockerfile: ./docker/web/Dockerfile
      target: web
    env_file: .env/web.env
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  web-dev:
    <<: *web
    volumes:
    - .:/app:ro
    - ./tmp/cache:/app/tmp/cache:rw
  emcont_worker:
    <<: *web
    image: online.fxreader.pr34.test_task_2025_06_30_v1:dev
    environment:
    command:
    - python3
    - -m
    - online.fxreader.pr34.test_task_2025_06_30_v1.async_api.app
  postgresql:
    image: docker.io/postgres:14.18-bookworm@sha256:c0aab7962b283cf24a0defa5d0d59777f5045a7be59905f21ba81a20b1a110c9
    # restart: always
    # set shared memory limit when using docker compose
    shm_size: 128mb
    volumes:
    - postgresql_data:/var/lib/postgresql/data/:rw
    # or set shared memory limit when deploy via swarm stack
    #volumes:
    #  - type: tmpfs
    #    target: /dev/shm
    #    tmpfs:
    #      size: 134217728 # 128*2^20 bytes = 128Mb
    env_file: .env/postgresql.patched.env
    # environment:
    #   POSTGRES_PASSWORD: example
    ports:
    - 127.0.0.1:9002:5432
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  adminer:
    image: docker.io/adminer:standalone@sha256:730215fe535daca9a2f378c48321bc615c8f0d88668721e0eff530fa35b6e8f6
    ports:
    - 127.0.0.1:9001:8080
volumes:
  postgresql_data:

@ -0,0 +1,3 @@
PGDATA=/var/lib/postgresql/data/pgdata
POSTGRES_USER=tickers
POSTGRES_DB=tickers

@ -0,0 +1 @@
# DB_URL=

@ -0,0 +1,50 @@
FROM docker.io/library/python:3.12@sha256:6121c801703ec330726ebf542faab113efcfdf2236378c03df8f49d80e7b4180 AS base

ENV DEBIAN_FRONTEND=noninteractive

WORKDIR /app

COPY docker/web/apt.requirements.txt docker/web/apt.requirements.txt

RUN apt-get update \
	&& apt-get install -y $(cat docker/web/apt.requirements.txt)

RUN \
	pip3 install \
		--break-system-packages uv

COPY requirements.txt requirements.txt

RUN \
	--mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	uv pip \
		install \
		--system \
		--break-system-packages \
		-f releases/whl \
		-r requirements.txt

WORKDIR /app

RUN apt-get update -yy && apt-get install -yy tini

FROM base AS web

RUN \
	--mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	uv pip \
		install \
		--system \
		--break-system-packages \
		--no-index \
		-f releases/whl \
		'online.fxreader.pr34.test_task_2025_06_30_v1==0.1'

ENTRYPOINT ["tini", "--"]

CMD [ \
	"python3", \
	"-m", \
	"online.fxreader.pr34.test_task_2025_06_30_v1.async_api.app" \
]

@ -0,0 +1 @@
wget tar git curl

@ -0,0 +1,89 @@
# Requirements
Tickers of interest:
- EURUSD
- USDJPY
- GBPUSD
- AUDUSD
- USDCAD
REST API: https://rates.emcont.com

Scrape every second;

Schema:
  Ticker:
    id: foreign_key market
    timestamp: datetime
    # (ask + bid) / 2
    value: decimal

Store up to 30 minutes of recent tickers;
Return via websocket up to 30 minutes of recent tickers;
# AsyncAPI
```yaml
AsyncAPI:
  Endpoints:
    subscribe:
      Request: SubscribeAction
      Response: AssetHistoryResponse | AssetTickerResponse
    list:
      Request: AssetsAction
      Response: AssetsResponse
  Schema:
    SubscribeAction:
      action: Literal['subscribe']
      message:
        assetId: 1
    AssetHistoryResponse:
      action: Literal['asset_history']
      message:
        points:
        - assetName: EURUSD
          time: 1455883484
          assetId: 1
          value: 1.110481
        - assetName: EURUSD
          time: 1455883485
          assetId: 1
          value: 1.110948
        - assetName: EURUSD
          time: 1455883486
          assetId: 1
          value: 1.111122
    AssetTickerResponse:
      action: Literal['point']
      message:
        assetName: EURUSD
        time: 1455883484
        assetId: 1
        value: 1.110481
    AssetsAction:
      action: Literal['assets']
      message: {}
    AssetsResponse:
      action: Literal['assets']
      message:
        assets:
        - id: 1
          name: EURUSD
        - id: 2
          name: USDJPY
        - id: 3
          name: GBPUSD
        - id: 4
          name: AUDUSD
        - id: 5
          name: USDCAD
```
# Services
```yaml
web:
  ports:
  - 8080:80
```
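An illustrative client session against the AsyncAPI above, assuming the websocket endpoint is mounted at /tickers/ (as in async_api/app.py later in this diff) and the web port mapping shown here:

```python
import asyncio
import json
import websockets

async def main() -> None:
	async with websockets.connect('ws://localhost:8080/tickers/') as ws:
		await ws.send(json.dumps({'action': 'assets', 'message': {}}))
		print(await ws.recv())  # AssetsResponse
		await ws.send(json.dumps({'action': 'subscribe', 'message': {'assetId': 1}}))
		print(await ws.recv())  # AssetHistoryResponse, up to 30 minutes of points

asyncio.run(main())
```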

@ -0,0 +1,229 @@
[project]
description = 'test task for websocket with crypto tickers'
requires-python = '>= 3.10'
maintainers = [
{ name = 'Siarhei Siniak', email = 'siarheisiniak@gmail.com' },
]
classifiers = [
'Programming Language :: Python',
]
name = 'online.fxreader.pr34.test_task_2025_06_30_v1'
version = '0.1.1'
dependencies = [
'alembic',
'fastapi',
'uvicorn',
'websockets',
'uvloop',
'tomlq',
'mypy',
'marisa-trie',
'pydantic',
'asyncpg',
'pydantic-settings',
'tomlkit',
'tomlq',
'numpy',
'cryptography',
'mypy',
'pyright',
'ruff',
'ipython',
'ipdb',
'requests',
'types-requests',
'aiohttp',
'build',
'wheel',
'setuptools',
'setuptools-scm',
]
[build-system]
requires = ['build', 'wheel', 'setuptools', 'setuptools-scm']
build-backend = 'setuptools.build_meta'
[tool.setuptools]
include-package-data = false
[tool.setuptools.package-dir]
'online.fxreader.pr34.test_task_2025_06_30_v1' = 'python/online/fxreader/pr34/test_task_2025_06_30_v1'
[tool.alembic]
script_location = 'python/online/fxreader/pr34/test_task_2025_06_30_v1/tickers/alembic'
prepend_sys_path = ['python']
# sqlalchemy.url = 'asdfasdf:/asdfasdfa'
[tool.ruff]
line-length = 160
target-version = 'py310'
# builtins = ['_', 'I', 'P']
include = [
# 'follow_the_leader/**/*.py',
#'*.py',
# '*.recipe',
'python/**/*.py',
'python/**/*.pyi',
]
exclude = [
'.venv',
]
[tool.ruff.format]
quote-style = 'single'
indent-style = 'tab'
skip-magic-trailing-comma = false
[tool.ruff.lint]
ignore = [
'E402', 'E722', 'E741', 'W191', 'E101', 'E501', 'I001', 'F401', 'E714',
'E713',
# remove lambdas later on
'E731',
# fix this too
'E712',
'E703',
# remove unused variables, or fix a bug
'F841',
# fix * imports
'F403',
# don't care about trailing new lines
'W292',
]
select = ['E', 'F', 'I', 'W', 'INT']
[tool.ruff.lint.isort]
detect-same-package = true
# extra-standard-library = ["aes", "elementmaker", "encodings"]
# known-first-party = ["calibre_extensions", "calibre_plugins", "polyglot"]
# known-third-party = ["odf", "qt", "templite", "tinycss", "css_selectors"]
relative-imports-order = "closest-to-furthest"
split-on-trailing-comma = true
section-order = [
# '__python__',
"future",
"standard-library", "third-party", "first-party", "local-folder"
]
force-wrap-aliases = true
# [tool.ruff.lint.isort.sections]
# '__python__' = ['__python__']
[tool.pylsp-mypy]
enabled = false
[tool.pyright]
include = [
#'../../../../../follow_the_leader/views2/payments.py',
#'../../../../../follow_the_leader/logic/payments.py',
#'../../../../../follow_the_leader/logic/paypal.py',
'python/**/*.py',
'python/**/*.pyi',
]
# stubPath = '../mypy-stubs'
extraPaths = [
]
#strict = ["src"]
analyzeUnannotatedFunctions = true
disableBytesTypePromotions = true
strictParameterNoneValue = true
enableTypeIgnoreComments = true
enableReachabilityAnalysis = true
strictListInference = true
strictDictionaryInference = true
strictSetInference = true
deprecateTypingAliases = false
enableExperimentalFeatures = false
reportMissingTypeStubs = "error"
reportMissingModuleSource = "warning"
reportInvalidTypeForm = "error"
reportMissingImports = "error"
reportUndefinedVariable = "error"
reportAssertAlwaysTrue = "error"
reportInvalidStringEscapeSequence = "error"
reportInvalidTypeVarUse = "error"
reportSelfClsParameterName = "error"
reportUnsupportedDunderAll = "error"
reportUnusedExpression = "error"
reportWildcardImportFromLibrary = "error"
reportAbstractUsage = "error"
reportArgumentType = "error"
reportAssertTypeFailure = "error"
reportAssignmentType = "error"
reportAttributeAccessIssue = "error"
reportCallIssue = "error"
reportGeneralTypeIssues = "error"
reportInconsistentOverload = "error"
reportIndexIssue = "error"
reportInvalidTypeArguments = "error"
reportNoOverloadImplementation = "error"
reportOperatorIssue = "error"
reportOptionalSubscript = "error"
reportOptionalMemberAccess = "error"
reportOptionalCall = "error"
reportOptionalIterable = "error"
reportOptionalContextManager = "error"
reportOptionalOperand = "error"
reportRedeclaration = "error"
reportReturnType = "error"
reportTypedDictNotRequiredAccess = "error"
reportPrivateImportUsage = "error"
reportUnboundVariable = "error"
reportUnhashable = "error"
reportUnusedCoroutine = "error"
reportUnusedExcept = "error"
reportFunctionMemberAccess = "error"
reportIncompatibleMethodOverride = "error"
reportIncompatibleVariableOverride = "error"
reportOverlappingOverload = "error"
reportPossiblyUnboundVariable = "error"
reportConstantRedefinition = "error"
#reportDeprecated = "error"
reportDeprecated = "warning"
reportDuplicateImport = "error"
reportIncompleteStub = "error"
reportInconsistentConstructor = "error"
reportInvalidStubStatement = "error"
reportMatchNotExhaustive = "error"
reportMissingParameterType = "error"
reportMissingTypeArgument = "error"
reportPrivateUsage = "error"
reportTypeCommentUsage = "error"
reportUnknownArgumentType = "error"
reportUnknownLambdaType = "error"
reportUnknownMemberType = "error"
reportUnknownParameterType = "error"
reportUnknownVariableType = "error"
#reportUnknownVariableType = "warning"
reportUnnecessaryCast = "error"
reportUnnecessaryComparison = "error"
reportUnnecessaryContains = "error"
#reportUnnecessaryIsInstance = "error"
reportUnnecessaryIsInstance = "warning"
reportUnusedClass = "error"
#reportUnusedImport = "error"
reportUnusedImport = "none"
# reportUnusedFunction = "error"
reportUnusedFunction = "warning"
#reportUnusedVariable = "error"
reportUnusedVariable = "warning"
reportUntypedBaseClass = "error"
reportUntypedClassDecorator = "error"
reportUntypedFunctionDecorator = "error"
reportUntypedNamedTuple = "error"
reportCallInDefaultInitializer = "none"
reportImplicitOverride = "none"
reportImplicitStringConcatenation = "none"
reportImportCycles = "none"
reportMissingSuperCall = "none"
reportPropertyTypeMismatch = "none"
reportShadowedImports = "none"
reportUninitializedInstanceVariable = "none"
reportUnnecessaryTypeIgnoreComment = "none"
reportUnusedCallResult = "none"

@ -0,0 +1,68 @@
import asyncio
import datetime
import logging

# import os

from ..tickers_retrieval.emcont import Emcont
from ..tickers.models import Ticker
from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol
from .db import create_engine

import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from typing import Any

logger = logging.getLogger(__name__)

async def run() -> None:
	async_session = create_engine()

	async def store_cb(
		rates: list[Emcont.rates_get_t.data_t.rate_t],
		timestamp: datetime.datetime,
		session: 'async_sessionmaker[AsyncSession]',
	) -> None:
		logger.info(dict(
			msg='before markets',
		))
		markets = await markets_get_by_symbol(
			session,
			set([
				rate.symbol
				for rate in rates
			]),
		)
		logger.info(dict(
			msg='after markets',
		))

		await ticker_store_multiple(
			session,
			[
				Ticker(
					id=markets[rate.symbol],
					timestamp=timestamp,
					value=rate.value,
				)
				for rate in rates
			]
		)

		logger.info(dict(
			rates=rates,
			timestamp=timestamp.isoformat()
		))

	await Emcont.worker(
		only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'},
		session=async_session,
		store_cb=store_cb,
		request_timeout=2,
		store_timeout=0.5,
	)

if __name__ == '__main__':
	logging.basicConfig(level=logging.INFO)
	asyncio.run(run())

@ -0,0 +1,15 @@
from ..tickers.settings import Settings as ModelsSettings

import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

def create_engine() -> 'async_sessionmaker[AsyncSession]':
	engine = sqlalchemy.ext.asyncio.create_async_engine(
		ModelsSettings.singleton().db_url
	)
	async_session = sqlalchemy.ext.asyncio.async_sessionmaker(
		engine
	)

	return async_session

@ -0,0 +1,71 @@
import fastapi
import pydantic
import functools
import logging
import copy
import uvicorn
import uvicorn.config
import sys

from .settings import Settings as APISettings
from .db import create_engine
from .websocket_api import WebsocketAPI

from typing import (Any, Optional, Literal, Annotated,)

logger = logging.getLogger(__name__)

async def websocket_tickers(
	websocket: fastapi.WebSocket,
	websocket_api: WebsocketAPI,
) -> None:
	try:
		await websocket_api.connect(websocket)

		while True:
			msg = await websocket.receive_text()
			await websocket_api.on_message(websocket, msg)
	except fastapi.WebSocketDisconnect:
		pass
		# websocket_api.disconnect(websocket)
	except:
		logger.exception('')
		raise
	finally:
		await websocket_api.disconnect(websocket)

def create_app() -> fastapi.FastAPI:
	async_session = create_engine()
	websocket_api = WebsocketAPI(
		session=async_session,
	)

	app = fastapi.FastAPI()

	app.websocket(
		'/tickers/',
	)(
		functools.partial(
			websocket_tickers,
			websocket_api=fastapi.Depends(lambda: websocket_api),
		)
	)

	return app

def run(args: list[str]):
	log_config = copy.deepcopy(uvicorn.config.LOGGING_CONFIG)

	uvicorn.run(
		create_app(),
		host=APISettings.singleton().uvicorn_host,
		port=APISettings.singleton().uvicorn_port,
		loop='uvloop',
		log_config=log_config,
		log_level=logging.INFO,
	)

if __name__ == '__main__':
	run(sys.argv[1:])

@ -0,0 +1,67 @@
import pydantic
import decimal

from typing import (Literal, Annotated,)

class SubscribeAction(pydantic.BaseModel):
	action: Literal['subscribe']

	class message_t(pydantic.BaseModel):
		asset_id: Annotated[
			int,
			pydantic.Field(alias='assetId')
		]

	message: message_t

class AssetsAction(pydantic.BaseModel):
	action: Literal['assets']

	class message_t(pydantic.BaseModel):
		pass

	message: Annotated[
		message_t,
		pydantic.Field(
			default_factory=message_t,
		)
	]

Action = pydantic.RootModel[
	AssetsAction | SubscribeAction
]

class AssetHistoryResponse(pydantic.BaseModel):
	action: Literal['asset_history'] = 'asset_history'

	class message_t(pydantic.BaseModel):
		class point_t(pydantic.BaseModel):
			asset_name: Annotated[
				str,
				pydantic.Field(
					alias='assetName',
				)
			]
			time: int
			asset_id: Annotated[
				int,
				pydantic.Field(alias='assetId')
			]
			value: decimal.Decimal

		points: list[point_t]

	message: message_t

class AssetTickerResponse(pydantic.BaseModel):
	action: Literal['point'] = 'point'

	message: 'AssetHistoryResponse.message_t.point_t'

class AssetsResponse(pydantic.BaseModel):
	action: Literal['assets'] = 'assets'

	class message_t(pydantic.BaseModel):
		class asset_t(pydantic.BaseModel):
			id: int
			name: str

		assets: list[asset_t]

	message: message_t

@ -0,0 +1,18 @@
import pydantic
import pydantic_settings

from typing import (ClassVar, Optional,)

class Settings(pydantic_settings.BaseSettings):
	uvicorn_port: int = 80
	uvicorn_host: str = '127.0.0.1'

	_singleton: ClassVar[Optional['Settings']] = None

	@classmethod
	def singleton(cls) -> 'Settings':
		if cls._singleton is None:
			cls._singleton = Settings.model_validate({})

		return cls._singleton

@ -0,0 +1,126 @@
import fastapi
import datetime

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from . import schema
from ..tickers.logic import tickers_get_by_period, markets_all

from typing import (Optional, Literal)

class WebsocketAPI:
	def __init__(
		self,
		session: 'async_sessionmaker[AsyncSession]',
	) -> None:
		self.connections: set[
			fastapi.WebSocket,
		] = set()
		self.subscriptions_by_asset_id: dict[
			int, set[fastapi.WebSocket]
		] = dict()
		self.subscriptions_by_client: dict[
			fastapi.WebSocket,
			int,
		] = dict()
		self.session = session

	async def connect(self, client: fastapi.WebSocket) -> None:
		assert not client in self.connections

		await client.accept()
		self.connections.add(client)

	async def subscribe(
		self,
		client: fastapi.WebSocket,
		asset_id: int
	) -> None:
		if client in self.subscriptions_by_client:
			last_asset_id = self.subscriptions_by_client[client]

			del self.subscriptions_by_asset_id[last_asset_id]
			del self.subscriptions_by_client[client]

		if not asset_id in self.subscriptions_by_asset_id:
			self.subscriptions_by_asset_id[asset_id] = set()

		self.subscriptions_by_asset_id[asset_id].add(client)
		self.subscriptions_by_client[client] = asset_id

		await self.asset_last_period(client, asset_id)

	async def asset_last_period(
		self,
		client: fastapi.WebSocket,
		asset_id: int,
	) -> None:
		tickers = await tickers_get_by_period(
			self.session,
			period=datetime.timedelta(minutes=30),
			market_id=asset_id,
		)

		await client.send_text(
			schema.AssetHistoryResponse(
				message=schema.AssetHistoryResponse.message_t(
					points=[
						schema.AssetHistoryResponse.message_t.point_t.model_construct(
							asset_name=o.market.name,
							asset_id=o.market.id,
							time=int(o.timestamp.timestamp()),
							value=o.value,
						)
						for o in tickers
					]
				)
			).json(by_alias=True,),
		)

	async def assets_index(
		self,
		client: fastapi.WebSocket,
	) -> None:
		markets = await markets_all(
			self.session,
		)

		await client.send_text(
			schema.AssetsResponse(
				message=schema.AssetsResponse.message_t(
					assets=[
						schema.AssetsResponse.message_t.asset_t.model_construct(
							name=o.name,
							id=o.id,
						)
						for o in markets
					]
				)
			).json(by_alias=True,),
		)

	async def on_message(
		self,
		client: fastapi.WebSocket,
		msg_raw: str
	) -> None:
		msg = schema.Action.model_validate_json(
			msg_raw
		).root

		if isinstance(msg, schema.SubscribeAction):
			await self.subscribe(
				client,
				msg.message.asset_id
			)
		elif isinstance(msg, schema.AssetsAction):
			await self.assets_index(
				client,
			)
		else:
			raise NotImplementedError

	async def disconnect(self, client: fastapi.WebSocket) -> None:
		assert client in self.connections

		self.connections.remove(client)

@ -0,0 +1,117 @@
import asyncio
import logging
from logging.config import fileConfig

from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.engine.base import Connection

from alembic import context

from online.fxreader.pr34.test_task_2025_06_30_v1.tickers.settings import Settings
from online.fxreader.pr34.test_task_2025_06_30_v1.tickers.models import (
	Base,
	Market,
)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.set_main_option(
	'sqlalchemy.url',
	Settings.singleton().db_url
)

# Interpret the config file for Python logging.
# This line sets up loggers basically.
# if config.config_file_name is not None:
#     fileConfig(config.config_file_name)
# else:
if True:
	logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# target_metadata = None
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.

def do_run_migrations(
	connection: Connection,
):
	context.configure(connection=connection, target_metadata=target_metadata)

	with context.begin_transaction():
		context.run_migrations()

async def run_async_migrations():
	"""In this scenario we need to create an Engine
	and associate a connection with the context.
	"""
	logger.info(dict(msg='started'))

	connectable = async_engine_from_config(
		config.get_section(config.config_ini_section, {}),
		prefix="sqlalchemy.",
		poolclass=pool.NullPool,
	)

	async with connectable.connect() as connection:
		await connection.run_sync(do_run_migrations)

	await connectable.dispose()

	logger.info(dict(msg='done'))

def run_migrations_offline():
	"""Run migrations in 'offline' mode.

	This configures the context with just a URL
	and not an Engine, though an Engine is acceptable
	here as well. By skipping the Engine creation
	we don't even need a DBAPI to be available.

	Calls to context.execute() here emit the given string to the
	script output.
	"""
	url = config.get_main_option("sqlalchemy.url")
	context.configure(
		url=url,
		target_metadata=target_metadata,
		literal_binds=True,
		dialect_opts={"paramstyle": "named"},
	)

	with context.begin_transaction():
		context.run_migrations()

def run_migrations_online():
	"""Run migrations in 'online' mode."""
	asyncio.run(run_async_migrations())

if context.is_offline_mode():
	raise NotImplementedError
	# run_migrations_offline()
else:
	run_migrations_online()

@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

@ -0,0 +1,36 @@
"""add Market table
Revision ID: 335b4c4f052c
Revises:
Create Date: 2025-07-04 11:31:10.983947
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '335b4c4f052c'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('tickers_market',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=32), nullable=False),
sa.PrimaryKeyConstraint('id')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('tickers_market')
# ### end Alembic commands ###

@ -0,0 +1,38 @@
"""add timezone
Revision ID: 729afc7194c9
Revises: eb63f793db3a
Create Date: 2025-07-11 11:30:06.246152
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = '729afc7194c9'
down_revision: Union[str, Sequence[str], None] = 'eb63f793db3a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('tickers_ticker', 'timestamp',
existing_type=postgresql.TIMESTAMP(),
type_=sa.DateTime(timezone=True),
existing_nullable=False)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('tickers_ticker', 'timestamp',
existing_type=sa.DateTime(timezone=True),
type_=postgresql.TIMESTAMP(),
existing_nullable=False)
# ### end Alembic commands ###

@ -0,0 +1,38 @@
"""add Ticker table
Revision ID: eb63f793db3a
Revises: 335b4c4f052c
Create Date: 2025-07-07 10:32:49.812738
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'eb63f793db3a'
down_revision: Union[str, Sequence[str], None] = '335b4c4f052c'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('tickers_ticker',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('timestamp', sa.DateTime(), nullable=False),
sa.Column('value', sa.Numeric(precision=32, scale=6), nullable=False),
sa.ForeignKeyConstraint(['id'], ['tickers_market.id'], ondelete='CASCADE'),
sa.UniqueConstraint('id', 'timestamp')
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('tickers_ticker')
# ### end Alembic commands ###

@ -0,0 +1,83 @@
import datetime

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, make_transient
from sqlalchemy.future import select

from .models import Ticker, Market
from .utils import get_or_create

async def markets_get_by_symbol(
	session: 'async_sessionmaker[AsyncSession]',
	symbols: set[str],
) -> dict[str, int]:
	res: dict[str, int] = dict()

	async with session() as active_session:
		async with active_session.begin() as transaction:
			for o in symbols:
				m = (await get_or_create(
					active_session,
					Market,
					name=o,
				))[0]

				res[o] = m.id

	return res

async def ticker_store_multiple(
	session: 'async_sessionmaker[AsyncSession]',
	tickers: list[Ticker],
) -> None:
	async with session() as active_session:
		async with active_session.begin() as transaction:
			active_session.add_all(
				tickers,
			)

async def tickers_get_by_period(
	session: 'async_sessionmaker[AsyncSession]',
	market_id: int,
	period: datetime.timedelta,
) -> list[Ticker]:
	async with session() as active_session:
		async with active_session.begin() as transaction:
			q = select(
				Ticker
			).join(Ticker.market).where(
				Market.id == market_id,
				Ticker.timestamp >= datetime.datetime.now(
					tz=datetime.timezone.utc
				) - period
			).order_by(Ticker.timestamp.desc()).options(
				selectinload(Ticker.market)
			)

			res = await active_session.execute(q)
			rows = [o[0] for o in res]

			for o in rows:
				active_session.expunge(o)
				make_transient(o.market)

			return rows

async def markets_all(
	session: 'async_sessionmaker[AsyncSession]',
) -> list[Market]:
	async with session() as active_session:
		async with active_session.begin() as transaction:
			q = select(
				Market
			)

			res = await active_session.execute(q)
			rows = [o[0] for o in res]

			for o in rows:
				active_session.expunge(o)

			return rows

@ -0,0 +1,63 @@
import datetime
import decimal

from sqlalchemy.orm import (
	mapped_column,
	Mapped,
	DeclarativeBase,
	relationship,
)
from sqlalchemy import (
	String,
	ForeignKey,
	Numeric,
	DateTime,
	UniqueConstraint,
)

from typing import (Optional,)

class Base(DeclarativeBase):
	pass

class Market(Base):
	__tablename__ = 'tickers_market'

	id: Mapped[int] = mapped_column(primary_key=True)
	name: Mapped[str] = mapped_column(String(32))
	tickers: Mapped[list['Ticker']] = relationship(
		back_populates='market',
	)

	def __repr__(self) -> str:
		return f"Market(id={self.id!r}, name={self.name!r})"

class Ticker(Base):
	__tablename__ = 'tickers_ticker'

	id: Mapped[int] = mapped_column(ForeignKey(
		'tickers_market.id',
		ondelete='CASCADE',
	))
	market: Mapped['Market'] = relationship(
		back_populates='tickers'
	)
	timestamp: Mapped[datetime.datetime] = mapped_column(
		DateTime(timezone=True,)
	)
	value: Mapped[decimal.Decimal] = mapped_column(Numeric(
		precision=32, scale=6,
	))

	__table_args__ = (
		UniqueConstraint('id', 'timestamp'),
	)

	__mapper_args__ = dict(
		primary_key=('id', 'timestamp',)
	)

	def __repr__(self) -> str:
		return f"Ticker(id={self.id!r}, timestamp={self.timestamp!r}, value={self.value!r})"

@ -0,0 +1,17 @@
import pydantic
import pydantic_settings

from typing import (ClassVar, Optional,)

class Settings(pydantic_settings.BaseSettings):
	db_url: str

	_singleton: ClassVar[Optional['Settings']] = None

	@classmethod
	def singleton(cls) -> 'Settings':
		if cls._singleton is None:
			cls._singleton = Settings.model_validate({})

		return cls._singleton

@ -0,0 +1,50 @@
from typing import (TypeVar, Optional, Any, cast,)

from sqlalchemy.ext.asyncio import AsyncSessionTransaction, AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.exc import NoResultFound, IntegrityError

M = TypeVar('M', bound='DeclarativeBase')

async def get_or_create(
	session: AsyncSession,
	model: type[M],
	create_method: Optional[str] = None,
	create_method_kwargs: Optional[dict[str, Any]] = None,
	**kwargs: Any
) -> tuple[M, bool]:
	# select the single row matching all keyword filters
	async def select_row() -> M:
		res = await session.execute(
			select(model).where(
				*[
					getattr(model, k) == v
					for k, v in kwargs.items()
				]
			)
		)

		row = res.one()[0]
		assert isinstance(row, model)
		return row

	try:
		res = await select_row()
		return res, False
	except NoResultFound:
		if create_method_kwargs:
			kwargs.update(create_method_kwargs)

		if not create_method:
			created = model(**kwargs)
		else:
			created = getattr(model, create_method)(**kwargs)

		try:
			session.add(created)
			await session.flush()
			return created, True
		except IntegrityError:
			# another transaction inserted the row concurrently;
			# roll back and return the winner
			await session.rollback()
			return await select_row(), False
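Usage sketch, mirroring how markets_get_by_symbol in tickers/logic.py calls this helper (the Market import is assumed from the adjacent models module):

```python
from sqlalchemy.ext.asyncio import AsyncSession
from .models import Market

async def market_id(session: AsyncSession, symbol: str) -> int:
	# returns the existing Market row or inserts a new one; the bool
	# flag reports whether a row was created
	market, created = await get_or_create(session, Market, name=symbol)
	return market.id
```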

@ -0,0 +1,165 @@
import aiohttp
import asyncio
import decimal
import logging
import datetime
# import datetime.timezone
import pydantic
import json

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from typing import (
	Any, Annotated, Optional, Awaitable, Callable,
	Protocol,
)

logger = logging.getLogger(__name__)

class Emcont:
	class rates_get_t:
		class data_t(pydantic.BaseModel):
			class rate_t(pydantic.BaseModel):
				symbol: Annotated[
					str,
					pydantic.Field(
						alias='Symbol',
					)
				]
				bid: Annotated[
					decimal.Decimal,
					pydantic.Field(
						alias='Bid',
					)
				]
				ask: Annotated[
					decimal.Decimal,
					pydantic.Field(
						alias='Ask',
					)
				]

				@pydantic.computed_field
				def value(self) -> decimal.Decimal:
					return (self.ask + self.bid) / 2

			product_type: Annotated[
				str,
				pydantic.Field(
					alias='ProductType',
				)
			]
			rates: Annotated[
				list[rate_t],
				pydantic.Field(
					alias='Rates',
				)
			]

	@classmethod
	async def rates_get(
		cls,
		only_symbols: Optional[set[str]] = None,
	) -> Any:
		async with aiohttp.ClientSession() as session:
			async with session.get('https://rates.emcont.com') as response:
				data_json = await response.text()

				data = cls.rates_get_t.data_t.model_validate_json(
					data_json[5:-3],
				)

				if only_symbols:
					data.rates = [
						o
						for o in data.rates
						if o.symbol in only_symbols
					]

				return data

	class store_cb_t(Protocol):
		async def __call__(
			self,
			rates: list['Emcont.rates_get_t.data_t.rate_t'],
			timestamp: datetime.datetime,
			session: 'async_sessionmaker[AsyncSession]',
		) -> None: ...

	@classmethod
	async def worker(
		cls,
		session: 'async_sessionmaker[AsyncSession]',
		store_cb: 'Emcont.store_cb_t',
		only_symbols: Optional[set[str]] = None,
		request_timeout: float | int = 0.5,
		store_timeout: float | int = 0.5,
		request_period: float | int = 1,
	) -> None:
		last_retrieval = datetime.datetime.now(
			tz=datetime.timezone.utc,
		)

		assert request_timeout >= 0
		assert store_timeout >= 0

		request_period_timedelta = datetime.timedelta(
			seconds=request_period,
		)

		while True:
			logger.info(dict(msg='started'))

			entries: Optional['Emcont.rates_get_t.data_t'] = None

			try:
				try:
					async with asyncio.timeout(request_timeout):
						entries = await cls.rates_get(
							only_symbols=only_symbols,
						)
				except TimeoutError:
					logger.exception('request timeout')

				try:
					async with asyncio.timeout(store_timeout):
						if entries:
							await store_cb(
								rates=entries.rates,
								timestamp=last_retrieval,
								session=session,
							)
				except TimeoutError:
					logger.exception('store timeout')
			except:
				logger.exception('')

			next_retrieval = last_retrieval

			def wait_interval():
				nonlocal next_retrieval

				return (
					next_retrieval - datetime.datetime.now(
						tz=datetime.timezone.utc,
					)
				).total_seconds()

			while True:
				next_retrieval += request_period_timedelta

				if (
					wait_interval() > 0 or
					wait_interval() > -request_period_timedelta.total_seconds() / 4
				):
					break
				else:
					logger.warning(dict(
						msg='skip period due to huge lag',
					))

			if wait_interval() > 0:
				await asyncio.sleep(wait_interval())

			last_retrieval = next_retrieval

@ -0,0 +1,27 @@
alembic
fastapi
uvicorn
websockets
uvloop
tomlq
mypy
marisa-trie
pydantic
asyncpg
pydantic-settings
tomlkit
tomlq
numpy
cryptography
mypy
pyright
ruff
ipython
ipdb
requests
types-requests
aiohttp
build
wheel
setuptools
setuptools-scm

File diff suppressed because it is too large

@ -0,0 +1,5 @@
.venv
tmp
.git
.env
build

@ -0,0 +1 @@
releases/whl/** filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,6 @@
!.tmuxp/
!python
.env/
releases/tar
build
!releases/whl/**

@ -0,0 +1,9 @@
session_name: test-task-2025-07-17-v2
start_directory: ${PWD}/deps/test-task-2025-07-17-v2
windows:
- focus: 'true'
  layout: 5687,98x12,0,0,18
  options: {}
  panes:
  - pane
  window_name: zsh

deps/test-task-2025-07-17-v2/Makefile (vendored, new file, 99 lines)

@ -0,0 +1,99 @@
ENV_PATH ?= .venv
PYTHON_PATH = $(ENV_PATH)/bin/python3
PYTHON_VERSION ?= 3.12.9
UV_ARGS ?= --offline
DOCKER ?= podman
COMPOSE ?= podman compose

venv_extract_requirements:
	$(ENV_PATH)/bin/tomlq \
		-r '.project.dependencies | join("\n")' \
		pyproject.toml > requirements.in

venv_compile:
	for requirements_name in requirements requirements.torch; do\
		uv pip compile \
			$(UV_ARGS) \
			-p $(PYTHON_VERSION) \
			--generate-hashes \
			$$requirements_name.in > \
			$$requirements_name.txt; \
		cat $$requirements_name.in | grep 'index-url' >> $$requirements_name.txt; \
	done

venv:
	uv \
		venv \
		-p 3.13 \
		$(UV_ARGS) \
		--seed \
		$(ENV_PATH)
	uv \
		pip install \
		$(UV_ARGS) \
		-p $(ENV_PATH) \
		-r requirements.txt

PYRIGHT_ARGS ?= --threads 3

pyright:
	$(ENV_PATH)/bin/python3 -m pyright \
		-p pyproject.toml \
		--pythonpath $(PYTHON_PATH) \
		$(PYRIGHT_ARGS)

pyright_watch:
	make \
		PYRIGHT_ARGS=-w \
		pyright

ruff_check:
	$(ENV_PATH)/bin/python3 -m ruff \
		check

ruff_format_check:
	$(ENV_PATH)/bin/python3 -m ruff \
		format --check

ruff_format:
	$(ENV_PATH)/bin/python3 -m ruff \
		format

ruff: ruff_format_check ruff_check

compose_env:
	cat docker/postgresql/.env .env/postgresql.env > .env/postgresql.patched.env
	cat docker/web/.env .env/web.env > .env/web.patched.env
	for app in summarizer payloads; do \
		cat docker/web/$$app.env .env/$$app.env > .env/$$app.patched.env; \
	done

compose_build_web:
	$(COMPOSE) build web

compose_build_summarizer:
	$(COMPOSE) build summarizer

compose_build_payloads:
	$(COMPOSE) build payloads

git-release:
	mkdir -p releases/tar
	git archive \
		--format=tar \
		-o "releases/tar/repo-$$(git describe --tags).tar" \
		HEAD

ALEMBIC_CMD ?= --help

alembic:
	$(ENV_PATH)/bin/alembic \
		-c pyproject.toml \
		$(ALEMBIC_CMD)

deploy_wheel:
	make pyright
	make deploy_wheel_unsafe

deploy_wheel_unsafe:
	$(PYTHON_PATH) -m build -o releases/whl -w -n

@ -0,0 +1,88 @@
services:
  redis:
    image: docker.io/redis:latest-alpine@sha256:e71b4cb00ea461ac21114cff40ff12fb8396914238e1e9ec41520b2d5a4d3423
    ports:
    - 127.0.0.1:9004:6379
  web: &web
    image: online.fxreader.pr34.test_task_2025_07_17_v2.web:dev
    build:
      context: .
      dockerfile: ./docker/web/Dockerfile
      target: web
    env_file: .env/web.env
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  web-dev:
    <<: *web
    volumes:
    - .:/app:ro
    - ./tmp/cache:/app/tmp/cache:rw
  payloads:
    <<: *web
    image: online.fxreader.pr34.test_task_2025_07_17_v2.payloads:dev
    env_file: .env/payloads.patched.env
    ports:
    - 127.0.0.1:9003:80
  summarizer:
    <<: *web
    image: online.fxreader.pr34.test_task_2025_07_17_v2.summarizer:dev
    env_file: .env/summarizer.patched.env
    # ports:
    # - 127.0.0.1:9003:80
    deploy:
      resources:
        limits:
          cpus: '4'
          memory: 1500M
    volumes:
    - ~/.cache/huggingface/hub:/root/.cache/huggingface/hub:ro
  postgresql:
    image: docker.io/postgres:14.18-bookworm@sha256:c0aab7962b283cf24a0defa5d0d59777f5045a7be59905f21ba81a20b1a110c9
    # restart: always
    # set shared memory limit when using docker compose
    shm_size: 128mb
    volumes:
    - postgresql_data:/var/lib/postgresql/data/:rw
    # or set shared memory limit when deploy via swarm stack
    #volumes:
    #  - type: tmpfs
    #    target: /dev/shm
    #    tmpfs:
    #      size: 134217728 # 128*2^20 bytes = 128Mb
    env_file: .env/postgresql.patched.env
    # environment:
    #   POSTGRES_PASSWORD: example
    ports:
    - 127.0.0.1:9002:5432
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  adminer:
    image: docker.io/adminer:standalone@sha256:730215fe535daca9a2f378c48321bc615c8f0d88668721e0eff530fa35b6e8f6
    ports:
    - 127.0.0.1:9001:8080
volumes:
  postgresql_data:

@ -0,0 +1,3 @@
PGDATA=/var/lib/postgresql/data/pgdata
POSTGRES_USER=payloads
POSTGRES_DB=payloads

@ -0,0 +1 @@
# DB_URL=

@ -0,0 +1,55 @@
FROM docker.io/library/python:3.12@sha256:6121c801703ec330726ebf542faab113efcfdf2236378c03df8f49d80e7b4180 AS base

ENV DEBIAN_FRONTEND=noninteractive

WORKDIR /app

COPY docker/web/apt.requirements.txt docker/web/apt.requirements.txt

RUN apt-get update \
	&& apt-get install -y $(cat docker/web/apt.requirements.txt)

RUN \
	pip3 install \
		--break-system-packages uv

RUN apt-get update -yy && apt-get install -yy tini

COPY requirements.txt .
COPY requirements.torch.txt .

RUN \
	# --mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	( \
		for requirements in requirements*.txt; do \
			uv pip \
				install \
				--system \
				--break-system-packages \
				-r $requirements; \
		done; \
	)

WORKDIR /app

FROM base AS web

RUN \
	--mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	uv pip \
		install \
		--system \
		--break-system-packages \
		--no-index \
		-f releases/whl \
		'online.fxreader.pr34.test_task_2025_07_17_v2==0.1.15'

ENTRYPOINT ["tini", "--"]

CMD [ \
	"python3", \
	"-m", \
	"online.fxreader.pr34.test_task_2025_07_17_v2.async_api.fastapi" \
]

@ -0,0 +1 @@
wget tar git curl

@ -0,0 +1,5 @@
APPS=["online.fxreader.pr34.test_task_2025_07_17_v2.payloads.app:get_app_router:"]
UVICORN_HOST=0.0.0.0
UVICORN_PORT=80
SUMMARIZER_DOMAIN=summarizer
SUMMARIZER_PROTOCOL=http

@ -0,0 +1,5 @@
APPS=["online.fxreader.pr34.test_task_2025_07_17_v2.transform.app:get_app_router:"]
UVICORN_HOST=0.0.0.0
UVICORN_PORT=80
SUMMARIZER_DOMAIN=summarizer
SUMMARIZER_PROTOCOL=http

@ -0,0 +1,34 @@
# Requirements
1. FastAPI Microservice // Caching service
  1.1. endpoints
    1.1.1. POST:/payload `payload_create`
    1.1.2. GET:/payload/<payload_id> `payload_read`
  1.2. tech specs
    1.2.1. fastapi for rest api
    1.2.2. sqlite/postgresql for DB that caches LLM replies;
    1.2.3. LLM transform can be stubbed (e.g. try to find something simple);
    1.2.4. docker-compose for services
    1.2.5. add pytest based tests;
    1.2.6. add some linters for code style, and type checking;
# Schemas
```yaml
endpoints:
  payload:
    create:
      request:
        list_1: list[str]
        list_2: list[str]
      response:
        payload:
          id: int
          output: list[str]
    read:
      request:
        id: int
      response:
        id: int
        output: list[str]
```
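A minimal sketch of the two endpoints in this spec; the in-memory dict and the set-union "transform" are illustrative stand-ins for the repo's SQL-backed cache and stubbed LLM:

```python
import fastapi
import pydantic

class PayloadCreate(pydantic.BaseModel):
	list_1: list[str]
	list_2: list[str]

class PayloadRead(pydantic.BaseModel):
	id: int
	output: list[str]

app = fastapi.FastAPI()
cache: dict[int, list[str]] = {}

@app.post('/payload')
async def payload_create(body: PayloadCreate) -> PayloadRead:
	output = sorted(set(body.list_1) | set(body.list_2))  # stand-in for the stubbed LLM transform
	payload_id = hash(tuple(output)) & 0x7FFFFFFF  # stable for identical inputs within one process
	cache.setdefault(payload_id, output)
	return PayloadRead(id=payload_id, output=output)

@app.get('/payload/{payload_id}')
async def payload_read(payload_id: int) -> PayloadRead:
	if payload_id not in cache:
		raise fastapi.HTTPException(status_code=404)
	return PayloadRead(id=payload_id, output=cache[payload_id])
```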

@ -0,0 +1,230 @@
[project]
description = 'test task for LLM replies caching'
requires-python = '>= 3.10'
maintainers = [
{ name = 'Siarhei Siniak', email = 'siarheisiniak@gmail.com' },
]
classifiers = [
'Programming Language :: Python',
]
name = 'online.fxreader.pr34.test_task_2025_07_17_v2'
version = '0.1.15'
dependencies = [
'alembic',
'fastapi',
'uvicorn',
'websockets',
'uvloop',
'tomlq',
'mypy',
'marisa-trie',
'pydantic',
'asyncpg',
'pydantic-settings',
'tomlkit',
'tomlq',
'numpy',
'cryptography',
'mypy',
'pyright',
'ruff',
'ipython',
'ipdb',
'requests',
'types-requests',
'aiohttp',
'build',
'wheel',
'setuptools',
'setuptools-scm',
'transformers',
]
[build-system]
requires = ['build', 'wheel', 'setuptools', 'setuptools-scm']
build-backend = 'setuptools.build_meta'
[tool.setuptools]
include-package-data = false
[tool.setuptools.package-dir]
'online.fxreader.pr34.test_task_2025_07_17_v2' = 'python/online/fxreader/pr34/test_task_2025_07_17_v2'
[tool.alembic]
script_location = 'python/online/fxreader/pr34/test_task_2025_07_17_v2/payloads/alembic'
prepend_sys_path = ['python']
# sqlalchemy.url = 'asdfasdf:/asdfasdfa'
[tool.ruff]
line-length = 160
target-version = 'py310'
# builtins = ['_', 'I', 'P']
include = [
# 'follow_the_leader/**/*.py',
#'*.py',
# '*.recipe',
'python/**/*.py',
'python/**/*.pyi',
]
exclude = [
'.venv',
]
[tool.ruff.format]
quote-style = 'single'
indent-style = 'tab'
skip-magic-trailing-comma = false
[tool.ruff.lint]
ignore = [
'E402', 'E722', 'E741', 'W191', 'E101', 'E501', 'I001', 'F401', 'E714',
'E713',
# remove lambdas later on
'E731',
# fix this too
'E712',
'E703',
# remove unused variables, or fix a bug
'F841',
# fix * imports
'F403',
# don't care about trailing new lines
'W292',
]
select = ['E', 'F', 'I', 'W', 'INT']
[tool.ruff.lint.isort]
detect-same-package = true
# extra-standard-library = ["aes", "elementmaker", "encodings"]
# known-first-party = ["calibre_extensions", "calibre_plugins", "polyglot"]
# known-third-party = ["odf", "qt", "templite", "tinycss", "css_selectors"]
relative-imports-order = "closest-to-furthest"
split-on-trailing-comma = true
section-order = [
# '__python__',
"future",
"standard-library", "third-party", "first-party", "local-folder"
]
force-wrap-aliases = true
# [tool.ruff.lint.isort.sections]
# '__python__' = ['__python__']
[tool.pylsp-mypy]
enabled = false
[tool.pyright]
include = [
#'../../../../../follow_the_leader/views2/payments.py',
#'../../../../../follow_the_leader/logic/payments.py',
#'../../../../../follow_the_leader/logic/paypal.py',
'python/**/*.py',
'python/**/*.pyi',
]
# stubPath = '../mypy-stubs'
extraPaths = [
]
#strict = ["src"]
analyzeUnannotatedFunctions = true
disableBytesTypePromotions = true
strictParameterNoneValue = true
enableTypeIgnoreComments = true
enableReachabilityAnalysis = true
strictListInference = true
strictDictionaryInference = true
strictSetInference = true
deprecateTypingAliases = false
enableExperimentalFeatures = false
reportMissingTypeStubs = "error"
reportMissingModuleSource = "warning"
reportInvalidTypeForm = "error"
reportMissingImports = "error"
reportUndefinedVariable = "error"
reportAssertAlwaysTrue = "error"
reportInvalidStringEscapeSequence = "error"
reportInvalidTypeVarUse = "error"
reportSelfClsParameterName = "error"
reportUnsupportedDunderAll = "error"
reportUnusedExpression = "error"
reportWildcardImportFromLibrary = "error"
reportAbstractUsage = "error"
reportArgumentType = "error"
reportAssertTypeFailure = "error"
reportAssignmentType = "error"
reportAttributeAccessIssue = "error"
reportCallIssue = "error"
reportGeneralTypeIssues = "error"
reportInconsistentOverload = "error"
reportIndexIssue = "error"
reportInvalidTypeArguments = "error"
reportNoOverloadImplementation = "error"
reportOperatorIssue = "error"
reportOptionalSubscript = "error"
reportOptionalMemberAccess = "error"
reportOptionalCall = "error"
reportOptionalIterable = "error"
reportOptionalContextManager = "error"
reportOptionalOperand = "error"
reportRedeclaration = "error"
reportReturnType = "error"
reportTypedDictNotRequiredAccess = "error"
reportPrivateImportUsage = "error"
reportUnboundVariable = "error"
reportUnhashable = "error"
reportUnusedCoroutine = "error"
reportUnusedExcept = "error"
reportFunctionMemberAccess = "error"
reportIncompatibleMethodOverride = "error"
reportIncompatibleVariableOverride = "error"
reportOverlappingOverload = "error"
reportPossiblyUnboundVariable = "error"
reportConstantRedefinition = "error"
#reportDeprecated = "error"
reportDeprecated = "warning"
reportDuplicateImport = "error"
reportIncompleteStub = "error"
reportInconsistentConstructor = "error"
reportInvalidStubStatement = "error"
reportMatchNotExhaustive = "error"
reportMissingParameterType = "error"
reportMissingTypeArgument = "error"
reportPrivateUsage = "error"
reportTypeCommentUsage = "error"
reportUnknownArgumentType = "error"
reportUnknownLambdaType = "error"
reportUnknownMemberType = "error"
reportUnknownParameterType = "error"
reportUnknownVariableType = "error"
#reportUnknownVariableType = "warning"
reportUnnecessaryCast = "error"
reportUnnecessaryComparison = "error"
reportUnnecessaryContains = "error"
#reportUnnecessaryIsInstance = "error"
reportUnnecessaryIsInstance = "warning"
reportUnusedClass = "error"
#reportUnusedImport = "error"
reportUnusedImport = "none"
# reportUnusedFunction = "error"
reportUnusedFunction = "warning"
#reportUnusedVariable = "error"
reportUnusedVariable = "warning"
reportUntypedBaseClass = "error"
reportUntypedClassDecorator = "error"
reportUntypedFunctionDecorator = "error"
reportUntypedNamedTuple = "error"
reportCallInDefaultInitializer = "none"
reportImplicitOverride = "none"
reportImplicitStringConcatenation = "none"
reportImportCycles = "none"
reportMissingSuperCall = "none"
reportPropertyTypeMismatch = "none"
reportShadowedImports = "none"
reportUninitializedInstanceVariable = "none"
reportUnnecessaryTypeIgnoreComment = "none"
reportUnusedCallResult = "none"

@ -0,0 +1,25 @@
import fastapi

from ..payloads.settings import Settings as ModelsSettings

import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

from typing import (Annotated, Generator,)

def create_engine() -> Generator[
	'async_sessionmaker[AsyncSession]',
	None,
	None,
]:
	engine = sqlalchemy.ext.asyncio.create_async_engine(ModelsSettings.singleton().db_url)
	async_session = sqlalchemy.ext.asyncio.async_sessionmaker(engine)

	yield async_session

AsyncSessionDep = Annotated[
	'async_sessionmaker[AsyncSession]',
	fastapi.Depends(create_engine)
]

@ -0,0 +1,113 @@
import fastapi
import importlib
import pydantic
import functools
import logging
import copy
import uvicorn
import uvicorn.config
import sys

from .settings import Settings as APISettings

# from .db import create_engine, async_session
from ..payloads.views import router as payloads_router

# from .websocket_api import WebsocketAPI

from typing import (
	Any,
	Optional,
	Literal,
	Annotated,
	cast,
	Callable,
)

logger = logging.getLogger(__name__)

# async def websocket_tickers(
# 	websocket: fastapi.WebSocket,
# 	websocket_api: WebsocketAPI,
# ) -> None:
# 	try:
# 		await websocket_api.connect(websocket)
#
# 		while True:
# 			msg = await websocket.receive_text()
# 			await websocket_api.on_message(websocket, msg)
# 	except fastapi.WebSocketDisconnect:
# 		pass
# 		# websocket_api.disconnect(websocket)
# 	except:
# 		logger.exception('')
# 		raise
# 	finally:
# 		await websocket_api.disconnect(websocket)

def create_app() -> fastapi.FastAPI:
	# async_session = create_engine()
	# websocket_api = WebsocketAPI(
	# 	session=async_session,
	# )

	app = fastapi.FastAPI(
		# dependencies=[
		# 	fastapi.Depends(async_session),
		# ]
	)

	logger.info(dict(msg='started loading apps'))

	for app_config in APISettings.singleton().apps:
		logger.info(dict(msg='start loading app = {}'.format(app_config)))

		app_module, app_method, app_prefix = app_config.split(':')

		app_router = cast(
			Callable[[], Any],
			getattr(
				importlib.import_module(app_module),
				app_method
			)
		)()

		assert isinstance(app_router, fastapi.APIRouter)

		app.include_router(
			app_router,
			prefix=app_prefix,
			# prefix='/',
		)

		logger.info(dict(msg='done loading app = {}'.format(app_config)))

	logger.info(dict(msg='done loading apps'))

	# app.websocket(
	# 	'/tickers/',
	# )(
	# 	functools.partial(
	# 		websocket_tickers,
	# 		websocket_api=fastapi.Depends(lambda: websocket_api),
	# 	)
	# )

	return app

def run(args: list[str]):
	logging.basicConfig(level=logging.INFO)

	log_config = copy.deepcopy(uvicorn.config.LOGGING_CONFIG)

	uvicorn.run(
		# create_app(),
		create_app,
		host=APISettings.singleton().uvicorn_host,
		port=APISettings.singleton().uvicorn_port,
		loop='uvloop',
		log_config=log_config,
		log_level=logging.INFO,
	)

if __name__ == '__main__':
	run(sys.argv[1:])

@ -0,0 +1,71 @@
import pydantic
import decimal
from typing import (
Literal,
Annotated,
)
# class SubscribeAction(pydantic.BaseModel):
# action: Literal['subscribe']
# class message_t(pydantic.BaseModel):
# asset_id: Annotated[
# int,
# pydantic.Field(alias='assetId')
# ]
#
# message: message_t
#
# class AssetsAction(pydantic.BaseModel):
# action: Literal['assets']
# class message_t(pydantic.BaseModel):
# pass
#
# message: Annotated[
# message_t,
# pydantic.Field(
# default_factory=message_t,
# )
# ]
#
# Action = pydantic.RootModel[
# AssetsAction | SubscribeAction
# ]
#
# class AssetHistoryResponse(pydantic.BaseModel):
# action: Literal['asset_history'] = 'asset_history'
#
# class message_t(pydantic.BaseModel):
# class point_t(pydantic.BaseModel):
# asset_name : Annotated[
# str,
# pydantic.Field(
# alias='assetName',
# )
# ]
# time: int
# asset_id : Annotated[
# int,
# pydantic.Field(alias='assetId')
# ]
# value: decimal.Decimal
#
# points: list[point_t]
# message: message_t
#
# class AssetTickerResponse(pydantic.BaseModel):
# action: Literal['point'] = 'point'
#
# message: 'AssetHistoryResponse.message_t.point_t'
#
# class AssetsResponse(pydantic.BaseModel):
# action: Literal['assets'] = 'assets'
#
# class message_t(pydantic.BaseModel):
# class asset_t(pydantic.BaseModel):
# id: int
# name: str
#
# assets: list[asset_t]
# message: message_t
#

@@ -0,0 +1,29 @@
import pydantic
import pydantic_settings
from typing import (
ClassVar,
Optional,
Annotated,
)
class Settings(pydantic_settings.BaseSettings):
apps: Annotated[
list[str],
pydantic.Field(
default_factory=list,
)
]
uvicorn_port: int = 80
uvicorn_host: str = '127.0.0.1'
_singleton: ClassVar[Optional['Settings']] = None
@classmethod
def singleton(cls) -> 'Settings':
if cls._singleton is None:
cls._singleton = Settings.model_validate({})
return cls._singleton
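Since Settings is a pydantic-settings model, each field is read from a same-named environment variable (case-insensitive) and the list field is decoded as JSON; a hypothetical configuration (module path inferred from the alembic env.py imports below, the /payloads prefix is an assumption):
import os
os.environ['APPS'] = (
    '["online.fxreader.pr34.test_task_2025_07_17_v2'
    '.payloads.app:get_app_router:/payloads"]'
)
os.environ['UVICORN_HOST'] = '0.0.0.0'
os.environ['UVICORN_PORT'] = '8080'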

@@ -0,0 +1,114 @@
import asyncio
import logging
from logging.config import fileConfig
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy import pool
from sqlalchemy.engine.base import Connection
from alembic import context
from online.fxreader.pr34.test_task_2025_07_17_v2.payloads.settings import Settings
from online.fxreader.pr34.test_task_2025_07_17_v2.payloads.models import (
Base,
# Market,
)
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
config.set_main_option('sqlalchemy.url', Settings.singleton().db_url)
# Interpret the config file for Python logging.
# This line sets up loggers basically.
# if config.config_file_name is not None:
# fileConfig(config.config_file_name)
# else:
if True:
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# target_metadata = None
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def do_run_migrations(
    connection: Connection,
) -> None:
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
logger.info(dict(msg='started'))
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix='sqlalchemy.',
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
logger.info(dict(msg='done'))
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option('sqlalchemy.url')
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={'paramstyle': 'named'},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
raise NotImplementedError
# run_migrations_offline()
else:
run_migrations_online()

@@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

@@ -0,0 +1,42 @@
"""add payloads models
Revision ID: f7fa90d3339d
Revises:
Create Date: 2025-07-18 09:58:54.099010
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'f7fa90d3339d'
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'payloads_payload',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('output', sa.JSON(), nullable=False),
sa.Column('list_1', sa.JSON(), nullable=False),
sa.Column('list_2', sa.JSON(), nullable=False),
sa.Column('input_hash', sa.String(), nullable=False),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('input_hash'),
)
# ### end Alembic commands ###
def downgrade() -> None:
"""Downgrade schema."""
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('payloads_payload')
# ### end Alembic commands ###

@@ -0,0 +1,11 @@
import logging
import fastapi
logger = logging.getLogger(__name__)
from . import views
def get_app_router() -> fastapi.APIRouter:
return views.router

@@ -0,0 +1,112 @@
import datetime
import hashlib
import json
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, make_transient
from sqlalchemy.future import select
from .models import Payload
from .utils import get_or_create
from typing import (Optional,)
async def payload_get_or_create(
    session: AsyncSession,
    output: list[str],
    list_1: list[str],
    list_2: list[str],
    input_hash: Optional[str] = None,
) -> tuple[Payload, bool]:
if input_hash is None:
input_hash = hashlib.sha256(json.dumps(dict(
list_1=list_1,
list_2=list_2,
)).encode('utf-8')).digest().hex()
return await get_or_create(
session,
Payload,
create_method_kwargs=dict(
output=output,
list_1=list_1,
list_2=list_2,
),
input_hash=input_hash,
)
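For reference, the cache key above is deterministic because json.dumps preserves dict insertion order, and .digest().hex() equals .hexdigest(); a quick sketch of reproducing it:
import hashlib
import json
input_hash = hashlib.sha256(json.dumps(dict(
    list_1=['a', 'b'],
    list_2=['c'],
)).encode('utf-8')).hexdigest()
assert len(input_hash) == 64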
# async def markets_get_by_symbol(
# session: 'async_sessionmaker[AsyncSession]',
# symbols: set[str],
# ) -> dict[str, int]:
# res : dict[str, int] = dict()
#
# async with session() as active_session:
# async with active_session.begin() as transaction:
# for o in symbols:
# m = (await get_or_create(
# active_session,
# Market,
# name=o,
# ))[0]
# res[o] = m.id
#
# return res
#
# async def ticker_store_multiple(
# session: 'async_sessionmaker[AsyncSession]',
# tickers: list[Ticker],
# ) -> None:
# async with session() as active_session:
# async with active_session.begin() as transaction:
# active_session.add_all(
# tickers,
# )
#
# async def tickers_get_by_period(
# session: 'async_sessionmaker[AsyncSession]',
# market_id: int,
# period: datetime.timedelta,
# ) -> list[Ticker]:
# async with session() as active_session:
# async with active_session.begin() as transaction:
# q = select(
# Ticker
# ).join(Ticker.market).where(
# Market.id == market_id,
# Ticker.timestamp >= datetime.datetime.now(
# tz=datetime.timezone.utc
# ) - period
# ).order_by(Ticker.timestamp.desc()).options(
# selectinload(Ticker.market)
# )
#
# res = await active_session.execute(q)
#
# rows = [o[0] for o in res]
#
# for o in rows:
# active_session.expunge(o)
# make_transient(o.market)
#
# return rows
#
# async def markets_all(
# session: 'async_sessionmaker[AsyncSession]',
# ) -> list[Market]:
# async with session() as active_session:
# async with active_session.begin() as transaction:
# q = select(
# Market
# )
#
# res = await active_session.execute(q)
#
# rows = [o[0] for o in res]
#
# for o in rows:
# active_session.expunge(o)
#
# return rows
#

@@ -0,0 +1,50 @@
import json
from sqlalchemy.orm import (
    mapped_column,
    Mapped,
    DeclarativeBase,
)
from sqlalchemy import (
    UniqueConstraint,
    JSON,
)
class Base(DeclarativeBase):
pass
class Payload(Base):
__tablename__ = 'payloads_payload'
id: Mapped[int] = mapped_column(primary_key=True)
output: Mapped[list[str]] = mapped_column(JSON())
list_1: Mapped[list[str]] = mapped_column(JSON())
list_2: Mapped[list[str]] = mapped_column(JSON())
input_hash: Mapped[str] = mapped_column()
__table_args__ = (UniqueConstraint('input_hash'),)
def __repr__(self) -> str:
return json.dumps(
dict(
model=str(type(self)),
id=self.id,
output=self.output,
list_1=self.list_1,
list_2=self.list_2,
input_hash=self.input_hash,
)
)

@@ -0,0 +1,5 @@
import pydantic
class Payload(pydantic.BaseModel):
id: int
output: list[str]

@@ -0,0 +1,24 @@
import pydantic_settings
from typing import (
ClassVar,
Optional,
Literal,
)
class Settings(pydantic_settings.BaseSettings):
db_url: str
_singleton: ClassVar[Optional['Settings']] = None
summarizer_domain: str
summarizer_protocol: Literal['http', 'https']
@classmethod
def singleton(cls) -> 'Settings':
if cls._singleton is None:
cls._singleton = Settings.model_validate({})
return cls._singleton
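As with the API settings, these fields are read from same-named environment variables; a hypothetical configuration (asyncpg matches the requirements list below):
import os
os.environ['DB_URL'] = 'postgresql+asyncpg://user:password@db/payloads'
os.environ['SUMMARIZER_DOMAIN'] = 'summarizer:80'
os.environ['SUMMARIZER_PROTOCOL'] = 'http'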

@@ -0,0 +1,18 @@
from ..transform import rest_client as summarizer_rest_client
from ..transform.rest_client import SummaryRequest
from .settings import Settings as ModelSettings
from typing import (ClassVar, Optional,)
class SummarizerClient:
_summarizer_client : ClassVar[Optional[summarizer_rest_client.SummarizerClient]] = None
@classmethod
def singleton(cls) -> summarizer_rest_client.SummarizerClient:
if cls._summarizer_client is None:
cls._summarizer_client = summarizer_rest_client.SummarizerClient(
domain=ModelSettings.singleton().summarizer_domain,
protocol=ModelSettings.singleton().summarizer_protocol,
)
return cls._summarizer_client

@@ -0,0 +1,45 @@
from typing import (
    TypeVar,
    Optional,
    Any,
)
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.exc import NoResultFound, IntegrityError
M = TypeVar('M', bound='DeclarativeBase')
async def get_or_create(
    session: AsyncSession,
    model: type[M],
    create_method: Optional[str] = None,
    create_method_kwargs: Optional[dict[str, Any]] = None,
    **kwargs: Any,
) -> tuple[M, bool]:
async def select_row() -> M:
res = await session.execute(select(model).where(*[getattr(model, k) == v for k, v in kwargs.items()]))
row = res.one()[0]
assert isinstance(row, model)
return row
try:
res = await select_row()
return res, False
except NoResultFound:
if create_method_kwargs:
kwargs.update(create_method_kwargs)
if not create_method:
created = model(**kwargs)
else:
created = getattr(model, create_method)(**kwargs)
try:
session.add(created)
await session.flush()
return created, True
except IntegrityError:
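            # a concurrent insert won the race; roll back and fetch the winner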
await session.rollback()
return await select_row(), False
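A minimal usage sketch under an open transaction, reusing the Payload model from above (values hypothetical):
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from .models import Payload
async def demo(async_session: 'async_sessionmaker[AsyncSession]') -> None:
    async with async_session() as session:
        async with session.begin():
            # (row, True) on first insert, (row, False) once input_hash exists
            payload, created = await get_or_create(
                session,
                Payload,
                create_method_kwargs=dict(output=[], list_1=[], list_2=[]),
                input_hash='deadbeef',
            )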

@@ -0,0 +1,70 @@
import fastapi
import itertools
from typing import (Annotated,)
from . import schema
from .summarizer import SummarizerClient, SummaryRequest
from ..async_api.db import AsyncSessionDep
from . import logic
router = fastapi.APIRouter()
@router.post('/payload')
async def payload_create(
list_1: Annotated[
list[str],
fastapi.Body(),
],
list_2: Annotated[
list[str],
fastapi.Body(),
],
session: AsyncSessionDep
) -> schema.Payload:
data_1 = (await SummarizerClient.singleton().summarize_post(
request=SummaryRequest(
data=list_1
)
)).data
data_2 = (await SummarizerClient.singleton().summarize_post(
request=SummaryRequest(
data=list_2
)
)).data
    # interleave the two summaries, dropping the None padding that
    # zip_longest inserts for the shorter list
    output = [
        o
        for o in itertools.chain.from_iterable(
            itertools.zip_longest(data_1, data_2)
        )
        if o is not None
    ]
    async with session() as active_session:
        async with active_session.begin():
            payload, _created = await logic.payload_get_or_create(
                active_session,
                list_1=list_1,
                list_2=list_2,
                output=output,
            )
            # build the response while the row is attached to the session;
            # the transaction commits on leaving the begin() block (the
            # "return result and its id" TODO noted in the history)
            result = schema.Payload(
                id=payload.id,
                output=payload.output,
            )
    return result
@router.get('/payload/{payload_id}')
async def payload_read(
payload_id: int,
) -> schema.Payload:
raise NotImplementedError
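A request sketch against the POST endpoint above (host, port, and prefix are assumptions; the body keys follow the fastapi.Body parameters):
import asyncio
import aiohttp
async def main() -> None:
    async with aiohttp.ClientSession() as session:
        async with session.post(
            'http://127.0.0.1:80/payload',  # hypothetical host and prefix
            json=dict(
                list_1=['first', 'list', 'of', 'strings'],
                list_2=['second', 'list'],
            ),
        ) as response:
            print(await response.json())  # e.g. {'id': 1, 'output': [...]}
asyncio.run(main())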

@@ -0,0 +1,27 @@
import logging
import fastapi
logger = logging.getLogger(__name__)
from . import views
from .dependencies import summarizer_dependency
def get_app_router() -> fastapi.APIRouter:
logger.info(dict(msg='started'))
router = fastapi.APIRouter(
# dependencies=[
# fastapi.Depends(summarizer_dependency,)
# ]
)
router.include_router(
views.router,
prefix='',
)
logger.info(dict(msg='done'))
return router

@@ -0,0 +1,22 @@
import logging
import fastapi
logger = logging.getLogger(__name__)
from .worker import Summarizer
from typing import (Annotated,)
async def create_summarizer(
) -> Summarizer:
# return Summarizer()
return Summarizer.singleton()
AnnotatedSummarizer = Annotated[
Summarizer, fastapi.Depends(create_summarizer)
]
async def summarizer_dependency(
summarizer: AnnotatedSummarizer
) -> None:
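    # no-op body: depending on AnnotatedSummarizer above is enough to force
    # Summarizer.singleton() to initialize when the router is hit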
pass

@@ -0,0 +1,12 @@
async def transform(
list_1: list[str],
list_2: list[str]
) -> list[str]:
raise NotImplementedError
async def transform_api_post(
data: list[str],
) -> list[str]:
raise NotImplementedError

@@ -0,0 +1,36 @@
import aiohttp
import logging
from .schema import Summary, SummaryRequest
from typing import (Literal,)
logger = logging.getLogger(__name__)
class SummarizerClient:
def __init__(
self,
domain: str,
protocol: Literal['http', 'https'] = 'http',
) -> None:
self.domain = domain
self.protocol = protocol
async def summarize_post(
self,
request: SummaryRequest,
) -> Summary:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                '{}://{}/summarize'.format(self.protocol, self.domain),
                # request.json() is deprecated in pydantic v2;
                # model_dump(mode='json') produces the same payload
                json=request.model_dump(mode='json'),
            ) as response:
                # surface HTTP errors instead of parsing an error body
                response.raise_for_status()
                data_json = await response.text()
                data = Summary.model_validate_json(
                    data_json,
                )
                return data
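A direct usage sketch of the client (the domain value is a hypothetical host:port; compare the singleton wiring in the payloads summarizer module above):
import asyncio
async def main() -> None:
    client = SummarizerClient(
        domain='127.0.0.1:8080',  # hypothetical
        protocol='http',
    )
    summary = await client.summarize_post(
        request=SummaryRequest(
            data=['a long document to summarize'],
        ),
    )
    print(summary.data)
asyncio.run(main())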

@@ -0,0 +1,7 @@
import pydantic
class Summary(pydantic.BaseModel):
data: list[str]
class SummaryRequest(pydantic.BaseModel):
data: list[str]
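Both models share the same one-field wire format; for reference:
assert SummaryRequest(data=['a', 'b']).model_dump() == {'data': ['a', 'b']}
assert Summary.model_validate_json('{"data": ["c"]}').data == ['c']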

@@ -0,0 +1,25 @@
import fastapi
from typing import (Annotated,)
from . import schema
# from .worker import Summarizer
from .dependencies import AnnotatedSummarizer
router = fastapi.APIRouter()
@router.post(
'/summarize',
# response_model=schema.Summary,
)
async def summarize(
request: Annotated[
schema.SummaryRequest,
fastapi.Body(),
],
summarizer: AnnotatedSummarizer
) -> schema.Summary:
return schema.Summary(
data=summarizer.summarize(
request.data,
)
)

@@ -0,0 +1,80 @@
import transformers
import transformers.pipelines
import logging
logger = logging.getLogger(__name__)
from typing import (
Any, cast, Callable, Protocol, Literal, TypedDict,
TypeAlias, ClassVar, Optional,
)
class SummarizerPipeline(Protocol):
class predict_t:
class output_t(TypedDict):
summary_text: str
res_t : TypeAlias = list[output_t]
def predict(self, data: str) -> predict_t.res_t: ...
class Pipeline(Protocol):
def __call__(
self,
task: Literal['summarization'],
model: Any,
tokenizer: Any,
) -> 'SummarizerPipeline': ...
class Summarizer:
def __init__(self) -> None:
logger.info(dict(msg='started loading bart'))
self.tokenizer = cast(
Callable[[str], Any],
getattr(transformers.AutoTokenizer, 'from_pretrained')
)(
'sshleifer/distilbart-cnn-12-6',
)
self.model = cast(
Callable[[str], Any],
getattr(transformers.AutoModelForSeq2SeqLM, 'from_pretrained')
)(
'sshleifer/distilbart-cnn-12-6',
)
logger.info(dict(msg='done loading bart'))
self.summarizer = cast(
Pipeline,
# getattr(transformers.pipelines, 'pipeline')
getattr(transformers, 'pipeline')
)(
'summarization',
model=self.model,
tokenizer=self.tokenizer,
# framework='pt',
)
logger.info(dict(msg='created pipeline'))
def summarize(
self,
data: list[str]
) -> list[str]:
res = self.summarizer.predict(
' '.join(data)
)
assert len(res) == 1
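        # the pipeline returns one result per input; the single summary
        # string is split into words to match schema.Summary's list[str]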
return res[0]['summary_text'].split()
_singleton: ClassVar[Optional['Summarizer']] = None
@classmethod
def singleton(cls) -> 'Summarizer':
if cls._singleton is None:
cls._singleton = Summarizer()
return cls._singleton
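A manual smoke test, along the lines of the IPython session mentioned in the history (the first run downloads the model; CPU summarization takes tens of seconds):
summarizer = Summarizer.singleton()
words = summarizer.summarize([
    'Some long documentation text to compress.',
    'More text from the same document.',
])
print(' '.join(words))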

@@ -0,0 +1,28 @@
alembic
fastapi
uvicorn
websockets
uvloop
tomlq
mypy
marisa-trie
pydantic
asyncpg
pydantic-settings
tomlkit
numpy
cryptography
pyright
ruff
ipython
ipdb
requests
types-requests
aiohttp
build
wheel
setuptools
setuptools-scm
transformers

@@ -0,0 +1,2 @@
--extra-index-url https://download.pytorch.org/whl/cpu
torch

@@ -0,0 +1,86 @@
# This file was autogenerated by uv via the following command:
# uv pip compile -p 3.12.9 --generate-hashes requirements.torch.in
filelock==3.13.1 \
--hash=sha256:57dbda9b35157b05fb3e58ee91448612eb674172fab98ee235ccb0b5bee19a1c
# via torch
fsspec==2024.6.1 \
--hash=sha256:3cb443f8bcd2efb31295a5b9fdb02aee81d8452c80d28f97a6d0959e6cee101e
# via torch
jinja2==3.1.4 \
--hash=sha256:bc5dd2abb727a5319567b7a813e6a2e7318c39f4f487cfe6c89c6f9c7d25197d
# via torch
markupsafe==2.1.5 \
--hash=sha256:17b950fccb810b3293638215058e432159d2b71005c74371d784862b7e4683f3 \
--hash=sha256:1f3fbcb7ef1f16e48246f704ab79d79da8a46891e2da03f8783a5b6fa41a9532 \
--hash=sha256:2174c595a0d73a3080ca3257b40096db99799265e1c27cc5a610743acd86d62f \
--hash=sha256:2b7c57a4dfc4f16f7142221afe5ba4e093e09e728ca65c51f5620c9aaeb9a617 \
--hash=sha256:3c6b973f22eb18a789b1460b4b91bf04ae3f0c4234a0a6aa6b0a92f6f7b951d4 \
--hash=sha256:3e53af139f8579a6d5f7b76549125f0d94d7e630761a2111bc431fd820e163b8 \
--hash=sha256:4275d846e41ecefa46e2015117a9f491e57a71ddd59bbead77e904dc02b1bed2 \
--hash=sha256:4c31f53cdae6ecfa91a77820e8b151dba54ab528ba65dfd235c80b086d68a465 \
--hash=sha256:4f11aa001c540f62c6166c7726f71f7573b52c68c31f014c25cc7901deea0b52 \
--hash=sha256:5b7b716f97b52c5a14bffdf688f971b2d5ef4029127f1ad7a513973cfd818df2 \
--hash=sha256:619bc166c4f2de5caa5a633b8b7326fbe98e0ccbfacabd87268a2b15ff73a029 \
--hash=sha256:629ddd2ca402ae6dbedfceeba9c46d5f7b2a61d9749597d4307f943ef198fc1f \
--hash=sha256:656f7526c69fac7f600bd1f400991cc282b417d17539a1b228617081106feb4a \
--hash=sha256:6ec585f69cec0aa07d945b20805be741395e28ac1627333b1c5b0105962ffced \
--hash=sha256:72b6be590cc35924b02c78ef34b467da4ba07e4e0f0454a2c5907f473fc50ce5 \
--hash=sha256:7502934a33b54030eaf1194c21c692a534196063db72176b0c4028e140f8f32c \
--hash=sha256:7a68b554d356a91cce1236aa7682dc01df0edba8d043fd1ce607c49dd3c1edcf \
--hash=sha256:823b65d8706e32ad2df51ed89496147a42a2a6e01c13cfb6ffb8b1e92bc910bb \
--hash=sha256:8dec4936e9c3100156f8a2dc89c4b88d5c435175ff03413b443469c7c8c5f4d1 \
--hash=sha256:97cafb1f3cbcd3fd2b6fbfb99ae11cdb14deea0736fc2b0952ee177f2b813a46 \
--hash=sha256:a17a92de5231666cfbe003f0e4b9b3a7ae3afb1ec2845aadc2bacc93ff85febc \
--hash=sha256:a549b9c31bec33820e885335b451286e2969a2d9e24879f83fe904a5ce59d70a \
--hash=sha256:ac07bad82163452a6884fe8fa0963fb98c2346ba78d779ec06bd7a6262132aee \
--hash=sha256:ae2ad8ae6ebee9d2d94b17fb62763125f3f374c25618198f40cbb8b525411900 \
--hash=sha256:b91c037585eba9095565a3556f611e3cbfaa42ca1e865f7b8015fe5c7336d5a5 \
--hash=sha256:bf50cd79a75d181c9181df03572cdce0fbb75cc353bc350712073108cba98de5 \
--hash=sha256:c8b29db45f8fe46ad280a7294f5c3ec36dbac9491f2d1c17345be8e69cc5928f \
--hash=sha256:d283d37a890ba4c1ae73ffadf8046435c76e7bc2247bbb63c00bd1a709c6544b \
--hash=sha256:db0b55e0f3cc0be60c1f19efdde9a637c32740486004f20d1cff53c3c0ece4d2 \
--hash=sha256:e61659ba32cf2cf1481e575d0462554625196a1f2fc06a1c777d3f48e8865d46 \
--hash=sha256:ea3d8a3d18833cf4304cd2fc9cbb1efe188ca9b5efef2bdac7adc20594a0e46b \
--hash=sha256:ec6a563cff360b50eed26f13adc43e61bc0c04d94b8be985e6fb24b81f6dcfdf \
--hash=sha256:f5dfb42c4604dddc8e4305050aa6deb084540643ed5804d7455b5df8fe16f5e5 \
--hash=sha256:fa173ec60341d6bb97a89f5ea19c85c5643c1e7dedebc22f5181eb73573142c5 \
--hash=sha256:fa9db3f79de01457b03d4f01b34cf91bc0048eb2c3846ff26f66687c2f6d16ab \
--hash=sha256:ffee1f21e5ef0d712f9033568f8344d5da8cc2869dbd08d87c84656e6a2d2f68
# via jinja2
mpmath==1.3.0 \
--hash=sha256:a0b2b9fe80bbcd81a6647ff13108738cfb482d481d826cc0e02f5b35e5c88d2c
# via sympy
networkx==3.3 \
--hash=sha256:28575580c6ebdaf4505b22c6256a2b9de86b316dc63ba9e93abde3d78dfdbcf2
# via torch
setuptools==70.2.0 \
--hash=sha256:b8b8060bb426838fbe942479c90296ce976249451118ef566a5a0b7d8b78fb05
# via torch
sympy==1.13.3 \
--hash=sha256:54612cf55a62755ee71824ce692986f23c88ffa77207b30c1368eda4a7060f73
# via torch
torch==2.7.1+cpu \
--hash=sha256:0bc887068772233f532b51a3e8c8cfc682ae62bef74bf4e0c53526c8b9e4138f \
--hash=sha256:1f04a373a3f643821f721da9898ef77dce73b5b6bfc64486f0976f7fb5f90e83 \
--hash=sha256:355614185a2aea7155f9c88a20bfd49de5f3063866f3cf9b2f21b6e9e59e31e0 \
--hash=sha256:3bf2db5adf77b433844f080887ade049c4705ddf9fe1a32023ff84ff735aa5ad \
--hash=sha256:464bca1bc9452f2ccd676514688896e66b9488f2a0268ecd3ac497cf09c5aac1 \
--hash=sha256:56136a2aca6707df3c8811e46ea2d379eaafd18e656e2fd51e8e4d0ca995651b \
--hash=sha256:5fe6045b8f426bf2d0426e4fe009f1667a954ec2aeb82f1bd0bf60c6d7a85445 \
--hash=sha256:7b977eccbc85ae2bd19d6998de7b1f1f4bd3c04eaffd3015deb7934389783399 \
--hash=sha256:84ea1f6a1d15663037d01b121d6e33bb9da3c90af8e069e5072c30f413455a57 \
--hash=sha256:8f8b3cfc53010a4b4a3c7ecb88c212e9decc4f5eeb6af75c3c803937d2d60947 \
--hash=sha256:a1684793e352f03fa14f78857e55d65de4ada8405ded1da2bf4f452179c4b779 \
--hash=sha256:a2618775f32eb4126c5b2050686da52001a08cffa331637d9cf51c8250931e00 \
--hash=sha256:a4551cb97b83df5f93fc0d7538332535828581e1db2f179afc287027afbdd6e8 \
--hash=sha256:b4cc706973655151f198d027ed34c92ab31a3db55676b44251194e1280631426 \
--hash=sha256:b66f77f6f67317344ee083aa7ac4751a14395fcb38060d564bf513978d267153 \
--hash=sha256:c0df17cee97653d09a4e84488a33d21217f9b24208583c55cf28f0045aab0766 \
--hash=sha256:d205cac087d60bc176bdc0b63a1d00dc7a4ee5ac76fd20a2ca318ac65674167e \
--hash=sha256:d25435bdc4780d3cb512aad55142aca9584ae1fe8f8691cda6d32f19faf5d58e \
--hash=sha256:eb17646792ac4374ffc87e42369f45d21eff17c790868963b90483ef0b6db4ef
# via -r requirements.torch.in
typing-extensions==4.12.2 \
--hash=sha256:04e5ca0351e0f3f85c6853954072df659d0d13fac324d0072316b67d7794700d
# via torch
--extra-index-url https://download.pytorch.org/whl/cpu

File diff suppressed because it is too large