[+] add simple fastapi template

1. rename basic package name;
  2. rewrite requirements into docs/readme.md;
This commit is contained in:
Siarhei Siniak 2025-07-17 21:14:07 +03:00
parent 8385eff636
commit d0ffbeef0f
28 changed files with 1200 additions and 0 deletions

@ -0,0 +1,5 @@
.venv
tmp
.git
.env
build

@ -0,0 +1 @@
releases/whl/** filter=lfs diff=lfs merge=lfs -text

@ -0,0 +1,6 @@
!.tmuxp/
!python
.env/
releases/tar
build
!releases/whl/**

@ -0,0 +1,9 @@
session_name: test-task-2025-07-17-v2
start_directory: ${PWD}/deps/test-task-2025-07-17-v2
windows:
- focus: 'true'
layout: 5687,98x12,0,0,18
options: {}
panes:
- pane
window_name: zsh

62
deps/test-task-2025-07-17-v2/Makefile vendored Normal file

@ -0,0 +1,62 @@
# Development/build helpers for the fastapi template.
#
# User-tunable knobs (override on the command line,
# e.g. `make PYTHON_VERSION=3.13 venv`):
ENV_PATH ?= .venv
PYTHON_PATH = $(ENV_PATH)/bin/python3
PYTHON_VERSION ?= 3.10
UV_ARGS ?= --offline
DOCKER ?= podman
COMPOSE ?= podman compose
# alembic subcommand, e.g. `make alembic ALEMBIC_CMD='upgrade head'`
ALEMBIC_CMD ?= --help

# None of these targets create a file named after themselves,
# so declare them phony to avoid stale-file surprises.
.PHONY: venv_extract_requirements venv_compile venv pyright \
	compose_env compose_build_web git-release alembic deploy_wheel

# Dump [project].dependencies from pyproject.toml into requirements.in.
venv_extract_requirements:
	$(ENV_PATH)/bin/tomlq \
		-r '.project.dependencies | join("\n")' \
		pyproject.toml > requirements.in

# Pin requirements.in into a fully hashed requirements.txt.
venv_compile:
	uv pip compile \
		$(UV_ARGS) \
		-p $(PYTHON_VERSION) \
		--generate-hashes \
		requirements.in > \
		requirements.txt

# Create the virtualenv and install pinned requirements into it.
# Uses $(PYTHON_VERSION) (was hard-coded 3.13, which could diverge from
# the version requirements.txt was compiled against).
venv:
	uv \
		venv \
		-p $(PYTHON_VERSION) \
		$(UV_ARGS) \
		--seed \
		$(ENV_PATH)
	uv \
		pip install \
		$(UV_ARGS) \
		-p $(ENV_PATH) \
		-r requirements.txt

# Static type check via pyright installed in the venv.
pyright:
	$(ENV_PATH)/bin/python3 -m pyright \
		-p pyproject.toml \
		--pythonpath $(PYTHON_PATH)

# Merge tracked defaults with local overrides into *.patched.env files
# consumed by compose services.
compose_env:
	cat docker/postgresql/.env .env/postgresql.env > .env/postgresql.patched.env
	cat docker/web/.env .env/web.env > .env/web.patched.env

compose_build_web:
	$(COMPOSE) build web

# Archive the current HEAD as releases/tar/repo-<tag>.tar.
git-release:
	git archive \
		--format=tar \
		-o "releases/tar/repo-$$(git describe --tags).tar" \
		HEAD

alembic:
	$(ENV_PATH)/bin/alembic \
		-c pyproject.toml \
		$(ALEMBIC_CMD)

# Type-check first, then build the wheel into releases/whl.
# $(MAKE) (not bare `make`) propagates -j/-n and the jobserver.
deploy_wheel:
	$(MAKE) pyright
	$(PYTHON_PATH) -m build -o releases/whl -w -n

@ -0,0 +1,76 @@
# Compose stack for the fastapi template: redis cache, the web app (plus a
# dev variant and a worker), postgresql, and an adminer DB UI.
# All published ports bind to 127.0.0.1 only.
services:
  redis:
    # NOTE(review): the `latest-alpine` tag is unusual (official redis tags
    # are `alpine`, `7-alpine`, ...); the sha256 digest is what actually
    # pins the image — confirm the tag exists upstream.
    image: docker.io/redis:latest-alpine@sha256:e71b4cb00ea461ac21114cff40ff12fb8396914238e1e9ec41520b2d5a4d3423
    ports:
      - 127.0.0.1:9004:6379
  # &web anchor: shared base definition for web, web-dev, and payloads.
  web: &web
    # NOTE(review): image/module names still say test_task_2025_07_17_v1
    # while pyproject.toml packages test_task_2025_07_17_v2 — confirm
    # against the rename in this commit.
    image: online.fxreader.pr34.test_task_2025_07_17_v1:dev
    build:
      context: .
      dockerfile: ./docker/web/Dockerfile
      target: web
    env_file: .env/web.env
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  web-dev:
    <<: *web
    # source mounted read-only for live development; cache dir writable
    volumes:
      - .:/app:ro
      - ./tmp/cache:/app/tmp/cache:rw
  payloads:
    <<: *web
    image: online.fxreader.pr34.test_task_2025_07_17_v1:dev
    # NOTE(review): empty `environment:` key resolves to null — presumably
    # a leftover; either populate it or remove it.
    environment:
    command:
      - python3
      - -m
      - online.fxreader.pr34.test_task_2025_07_17_v1.async_api.app
  postgresql:
    image: docker.io/postgres:14.18-bookworm@sha256:c0aab7962b283cf24a0defa5d0d59777f5045a7be59905f21ba81a20b1a110c9
    # restart: always
    # set shared memory limit when using docker compose
    shm_size: 128mb
    volumes:
      - postgresql_data:/var/lib/postgresql/data/:rw
    # or set shared memory limit when deploy via swarm stack
    #volumes:
    #  - type: tmpfs
    #    target: /dev/shm
    #    tmpfs:
    #      size: 134217728 # 128*2^20 bytes = 128Mb
    # generated by `make compose_env` (defaults + local overrides)
    env_file: .env/postgresql.patched.env
    # environment:
    #   POSTGRES_PASSWORD: example
    ports:
      - 127.0.0.1:9002:5432
    logging:
      driver: "json-file"
      options:
        max-size: 10m
        max-file: "3"
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 128M
  adminer:
    image: docker.io/adminer:standalone@sha256:730215fe535daca9a2f378c48321bc615c8f0d88668721e0eff530fa35b6e8f6
    ports:
      - 127.0.0.1:9001:8080
volumes:
  postgresql_data:

@ -0,0 +1,3 @@
PGDATA=/var/lib/postgresql/data/pgdata
POSTGRES_USER=tickers
POSTGRES_DB=tickers

@ -0,0 +1 @@
# DB_URL=

@ -0,0 +1,50 @@
FROM docker.io/library/python:3.12@sha256:6121c801703ec330726ebf542faab113efcfdf2236378c03df8f49d80e7b4180 AS base

ENV DEBIAN_FRONTEND=noninteractive

WORKDIR /app

# System packages live in a separate file so the apt layer is cached
# independently of the python requirements.
COPY docker/web/apt.requirements.txt docker/web/apt.requirements.txt
RUN apt-get update \
	&& apt-get install -y $(cat docker/web/apt.requirements.txt)

RUN \
	pip3 install \
	--break-system-packages uv

COPY requirements.txt requirements.txt

# Install pinned third-party requirements; local wheels are bind-mounted so
# they never bloat the image layer, and pip/uv caches persist across builds.
RUN \
	--mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	uv pip \
	install \
	--system \
	--break-system-packages \
	-f releases/whl \
	-r requirements.txt

# tini runs as PID 1 so signals reach the python process.
RUN apt-get update -yy && apt-get install -yy tini

FROM base AS web

# Renamed from test_task_2025_06_30_v1==0.1: this repository builds
# online.fxreader.pr34.test_task_2025_07_17_v2 version 0.1.1 (see
# pyproject.toml); the stale name cannot resolve from releases/whl.
RUN \
	--mount=type=bind,source=releases/whl,target=/app/releases/whl \
	--mount=type=cache,target=/root/.cache/pip \
	--mount=type=cache,target=/root/.cache/uv \
	uv pip \
	install \
	--system \
	--break-system-packages \
	--no-index \
	-f releases/whl \
	'online.fxreader.pr34.test_task_2025_07_17_v2==0.1.1'

ENTRYPOINT ["tini", "--"]

CMD [ \
	"python3", \
	"-m", \
	"online.fxreader.pr34.test_task_2025_07_17_v2.async_api.app" \
	]

@ -0,0 +1 @@
wget tar git curl

@ -0,0 +1,34 @@
# Requirements
1. FastAPI Microservice // Caching service
1.1. endpoints
1.1.1. POST:/payload `payload_create`
1.1.2. GET:/payload/<payload_id>/ `payload_read`
1.2. tech specs
1.2.1. fastapi for rest api
1.2.2. sqlite/postgresql for DB that caches LLM replies;
1.2.3. the LLM transform can be stubbed with a simple placeholder;
1.2.4. docker-compose for services
1.2.5. add pytest based tests;
1.2.6. add some linters for code style, and type checking;
# Schemas
```yaml
endpoints:
payload:
create:
request:
list_1: list[str]
list_2: list[str]
response:
payload:
id: int
output: list[str]
read:
request:
id: int
response:
id: int
output: list[str]
```

@ -0,0 +1,229 @@
[project]
description = 'test task for LLM replies caching'
requires-python = '>= 3.10'
maintainers = [
	{ name = 'Siarhei Siniak', email = 'siarheisiniak@gmail.com' },
]
classifiers = [
	'Programming Language :: Python',
]
name = 'online.fxreader.pr34.test_task_2025_07_17_v2'
version = '0.1.1'
# NOTE: 'tomlq' and 'mypy' were each listed twice; duplicates removed.
dependencies = [
	'alembic',
	'fastapi',
	'uvicorn',
	'websockets',
	'uvloop',
	'tomlq',
	'mypy',
	'marisa-trie',
	'pydantic',
	'asyncpg',
	'pydantic-settings',
	'tomlkit',
	'numpy',
	'cryptography',
	'pyright',
	'ruff',
	'ipython',
	'ipdb',
	'requests',
	'types-requests',
	'aiohttp',
	'build',
	'wheel',
	'setuptools',
	'setuptools-scm',
]

[build-system]
requires = ['build', 'wheel', 'setuptools', 'setuptools-scm']
build-backend = 'setuptools.build_meta'

[tool.setuptools]
include-package-data = false

[tool.setuptools.package-dir]
'online.fxreader.pr34.test_task_2025_07_17_v2' = 'python/online/fxreader/pr34/test_task_2025_07_17_v2'

[tool.alembic]
script_location = 'python/online/fxreader/pr34/test_task_2025_07_17_v2/tickers/alembic'
prepend_sys_path = ['python']
# sqlalchemy.url = 'asdfasdf:/asdfasdfa'
[tool.ruff]
line-length = 160
target-version = 'py310'
# builtins = ['_', 'I', 'P']
include = [
# 'follow_the_leader/**/*.py',
#'*.py',
# '*.recipe',
'python/**/*.py',
'python/**/*.pyi',
]
exclude = [
'.venv',
]
[tool.ruff.format]
quote-style = 'single'
indent-style = 'tab'
skip-magic-trailing-comma = false
[tool.ruff.lint]
ignore = [
'E402', 'E722', 'E741', 'W191', 'E101', 'E501', 'I001', 'F401', 'E714',
'E713',
# remove lambdas later on
'E731',
# fix this too
'E712',
'E703',
# remove unused variables, or fix a bug
'F841',
# fix * imports
'F403',
# don't care about trailing new lines
'W292',
]
select = ['E', 'F', 'I', 'W', 'INT']
[tool.ruff.lint.isort]
detect-same-package = true
# extra-standard-library = ["aes", "elementmaker", "encodings"]
# known-first-party = ["calibre_extensions", "calibre_plugins", "polyglot"]
# known-third-party = ["odf", "qt", "templite", "tinycss", "css_selectors"]
relative-imports-order = "closest-to-furthest"
split-on-trailing-comma = true
section-order = [
# '__python__',
"future",
"standard-library", "third-party", "first-party", "local-folder"
]
force-wrap-aliases = true
# [tool.ruff.lint.isort.sections]
# '__python__' = ['__python__']
[tool.pylsp-mypy]
enabled = false
[tool.pyright]
include = [
#'../../../../../follow_the_leader/views2/payments.py',
#'../../../../../follow_the_leader/logic/payments.py',
#'../../../../../follow_the_leader/logic/paypal.py',
'python/**/*.py',
'python/**/*.pyi',
]
# stubPath = '../mypy-stubs'
extraPaths = [
]
#strict = ["src"]
analyzeUnannotatedFunctions = true
disableBytesTypePromotions = true
strictParameterNoneValue = true
enableTypeIgnoreComments = true
enableReachabilityAnalysis = true
strictListInference = true
strictDictionaryInference = true
strictSetInference = true
deprecateTypingAliases = false
enableExperimentalFeatures = false
reportMissingTypeStubs ="error"
reportMissingModuleSource = "warning"
reportInvalidTypeForm = "error"
reportMissingImports = "error"
reportUndefinedVariable = "error"
reportAssertAlwaysTrue = "error"
reportInvalidStringEscapeSequence = "error"
reportInvalidTypeVarUse = "error"
reportSelfClsParameterName = "error"
reportUnsupportedDunderAll = "error"
reportUnusedExpression = "error"
reportWildcardImportFromLibrary = "error"
reportAbstractUsage = "error"
reportArgumentType = "error"
reportAssertTypeFailure = "error"
reportAssignmentType = "error"
reportAttributeAccessIssue = "error"
reportCallIssue = "error"
reportGeneralTypeIssues = "error"
reportInconsistentOverload = "error"
reportIndexIssue = "error"
reportInvalidTypeArguments = "error"
reportNoOverloadImplementation = "error"
reportOperatorIssue = "error"
reportOptionalSubscript = "error"
reportOptionalMemberAccess = "error"
reportOptionalCall = "error"
reportOptionalIterable = "error"
reportOptionalContextManager = "error"
reportOptionalOperand = "error"
reportRedeclaration = "error"
reportReturnType = "error"
reportTypedDictNotRequiredAccess = "error"
reportPrivateImportUsage = "error"
reportUnboundVariable = "error"
reportUnhashable = "error"
reportUnusedCoroutine = "error"
reportUnusedExcept = "error"
reportFunctionMemberAccess = "error"
reportIncompatibleMethodOverride = "error"
reportIncompatibleVariableOverride = "error"
reportOverlappingOverload = "error"
reportPossiblyUnboundVariable = "error"
reportConstantRedefinition = "error"
#reportDeprecated = "error"
reportDeprecated = "warning"
reportDuplicateImport = "error"
reportIncompleteStub = "error"
reportInconsistentConstructor = "error"
reportInvalidStubStatement = "error"
reportMatchNotExhaustive = "error"
reportMissingParameterType = "error"
reportMissingTypeArgument = "error"
reportPrivateUsage = "error"
reportTypeCommentUsage = "error"
reportUnknownArgumentType = "error"
reportUnknownLambdaType = "error"
reportUnknownMemberType = "error"
reportUnknownParameterType = "error"
reportUnknownVariableType = "error"
#reportUnknownVariableType = "warning"
reportUnnecessaryCast = "error"
reportUnnecessaryComparison = "error"
reportUnnecessaryContains = "error"
#reportUnnecessaryIsInstance = "error"
reportUnnecessaryIsInstance = "warning"
reportUnusedClass = "error"
#reportUnusedImport = "error"
reportUnusedImport = "none"
# reportUnusedFunction = "error"
reportUnusedFunction = "warning"
#reportUnusedVariable = "error"
reportUnusedVariable = "warning"
reportUntypedBaseClass = "error"
reportUntypedClassDecorator = "error"
reportUntypedFunctionDecorator = "error"
reportUntypedNamedTuple = "error"
reportCallInDefaultInitializer = "none"
reportImplicitOverride = "none"
reportImplicitStringConcatenation = "none"
reportImportCycles = "none"
reportMissingSuperCall = "none"
reportPropertyTypeMismatch = "none"
reportShadowedImports = "none"
reportUninitializedInstanceVariable = "none"
reportUnnecessaryTypeIgnoreComment = "none"
reportUnusedCallResult = "none"

@ -0,0 +1,68 @@
import asyncio
import datetime
import logging
# import os
from ..tickers_retrieval.emcont import Emcont
from ..tickers.models import Ticker
from ..tickers.logic import ticker_store_multiple, markets_get_by_symbol
from .db import create_engine
import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from typing import Any
logger = logging.getLogger(__name__)
async def run() -> None:
	'''Poll Emcont exchange rates forever and persist them as Ticker rows.'''
	async_session = create_engine()

	async def store_cb(
		rates: list[Emcont.rates_get_t.data_t.rate_t],
		timestamp: datetime.datetime,
		session: 'async_sessionmaker[AsyncSession]',
	) -> None:
		# Callback invoked by Emcont.worker for every fetched batch of rates.
		logger.info(dict(
			msg='before markets',
		))
		# Resolve each rate symbol to a market id (creating missing markets).
		markets = await markets_get_by_symbol(
			session,
			set([
				rate.symbol
				for rate in rates
			]),
		)
		logger.info(dict(
			msg='after markets',
		))
		await ticker_store_multiple(
			session,
			[
				Ticker(
					id=markets[rate.symbol],
					timestamp=timestamp,
					value=rate.value,
				)
				for rate in rates
			]
		)
		logger.info(dict(
			rates=rates,
			timestamp=timestamp.isoformat()
		))

	# NOTE(review): Emcont is defined elsewhere; presumably worker() polls
	# the listed symbols every `request_timeout` seconds and calls
	# store_cb with each batch — confirm against tickers_retrieval.emcont.
	await Emcont.worker(
		only_symbols={'EURUSD', 'USDJPY', 'GBPUSD', 'AUDUSD', 'USDCAD'},
		session=async_session,
		store_cb=store_cb,
		request_timeout=2,
		store_timeout=0.5,
	)

if __name__ == '__main__':
	logging.basicConfig(level=logging.INFO)
	asyncio.run(run())

@ -0,0 +1,15 @@
from ..tickers.settings import Settings as ModelsSettings
import sqlalchemy.ext.asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
def create_engine() -> 'async_sessionmaker[AsyncSession]':
	'''Build an async engine from the settings DB URL and wrap it
	in a session factory for callers to open sessions from.'''
	db_url = ModelsSettings.singleton().db_url

	engine = sqlalchemy.ext.asyncio.create_async_engine(db_url)

	return sqlalchemy.ext.asyncio.async_sessionmaker(engine)

@ -0,0 +1,71 @@
import fastapi
import pydantic
import functools
import logging
import copy
import uvicorn
import uvicorn.config
import sys
from .settings import Settings as APISettings
from .db import create_engine
from .websocket_api import WebsocketAPI
from typing import (Any, Optional, Literal, Annotated,)
logger = logging.getLogger(__name__)
async def websocket_tickers(
	websocket: fastapi.WebSocket,
	websocket_api: WebsocketAPI,
) -> None:
	'''Per-connection receive loop for the /tickers/ websocket endpoint.

	Registers the socket with `websocket_api`, then forwards every inbound
	text frame to on_message until the client disconnects; cleanup always
	runs in the `finally` block.
	'''
	try:
		await websocket_api.connect(websocket)

		while True:
			msg = await websocket.receive_text()

			await websocket_api.on_message(websocket, msg)
	except fastapi.WebSocketDisconnect:
		# normal client-initiated shutdown; disconnect happens in `finally`
		pass
		# websocket_api.disconnect(websocket)
	except:
		# log unexpected errors before re-raising to FastAPI
		logger.exception('')
		raise
	finally:
		await websocket_api.disconnect(websocket)
def create_app() -> fastapi.FastAPI:
	'''Build the FastAPI app exposing the /tickers/ websocket endpoint.

	A single WebsocketAPI instance (holding the DB session factory and all
	subscription state) is shared across connections.
	'''
	async_session = create_engine()

	websocket_api = WebsocketAPI(
		session=async_session,
	)

	app = fastapi.FastAPI()

	# Bind the WebsocketAPI instance directly.  The original code bound
	# fastapi.Depends(lambda: websocket_api) through functools.partial;
	# FastAPI resolves Depends markers only from a route's *signature*,
	# so a partial-bound keyword is passed through verbatim and the
	# handler would have received the raw Depends object at runtime.
	app.websocket(
		'/tickers/',
	)(
		functools.partial(
			websocket_tickers,
			websocket_api=websocket_api,
		)
	)

	return app
def run(args: list[str]):
	'''Start uvicorn serving the app on the configured host/port.

	NOTE(review): `args` is currently unused — presumably reserved for
	future CLI flags; confirm before removing.
	'''
	# deep-copy so local tweaks never mutate uvicorn's module-level default
	log_config = copy.deepcopy(uvicorn.config.LOGGING_CONFIG)

	uvicorn.run(
		create_app(),
		host=APISettings.singleton().uvicorn_host,
		port=APISettings.singleton().uvicorn_port,
		# uvloop event loop for better asyncio throughput
		loop='uvloop',
		log_config=log_config,
		log_level=logging.INFO,
	)

if __name__ == '__main__':
	run(sys.argv[1:])

@ -0,0 +1,67 @@
import pydantic
import decimal
from typing import (Literal, Annotated,)
class SubscribeAction(pydantic.BaseModel):
	'''Inbound frame: subscribe this client to one asset's ticker stream.'''
	action: Literal['subscribe']

	class message_t(pydantic.BaseModel):
		# wire field is camelCase `assetId`
		asset_id: Annotated[
			int,
			pydantic.Field(alias='assetId')
		]

	message: message_t

class AssetsAction(pydantic.BaseModel):
	'''Inbound frame: request the asset catalogue; message body is empty.'''
	action: Literal['assets']

	class message_t(pydantic.BaseModel):
		pass

	# default_factory lets clients omit `message` entirely
	message: Annotated[
		message_t,
		pydantic.Field(
			default_factory=message_t,
		)
	]

# Union of every inbound message type; pydantic picks the variant whose
# `action` literal validates.
Action = pydantic.RootModel[
	AssetsAction | SubscribeAction
]

class AssetHistoryResponse(pydantic.BaseModel):
	'''Outbound frame: recent history of points for one asset.'''
	action: Literal['asset_history'] = 'asset_history'

	class message_t(pydantic.BaseModel):
		class point_t(pydantic.BaseModel):
			# serialized with camelCase aliases (assetName / assetId)
			asset_name : Annotated[
				str,
				pydantic.Field(
					alias='assetName',
				)
			]
			# unix timestamp, whole seconds
			time: int
			asset_id : Annotated[
				int,
				pydantic.Field(alias='assetId')
			]
			value: decimal.Decimal

		points: list[point_t]

	message: message_t

class AssetTickerResponse(pydantic.BaseModel):
	'''Outbound frame: one live point; same shape as a history point.'''
	action: Literal['point'] = 'point'
	message: 'AssetHistoryResponse.message_t.point_t'

class AssetsResponse(pydantic.BaseModel):
	'''Outbound frame: the id/name catalogue of all known assets.'''
	action: Literal['assets'] = 'assets'

	class message_t(pydantic.BaseModel):
		class asset_t(pydantic.BaseModel):
			id: int
			name: str

		assets: list[asset_t]

	message: message_t

@ -0,0 +1,18 @@
import pydantic
import pydantic_settings
from typing import (ClassVar, Optional,)
class Settings(pydantic_settings.BaseSettings):
	'''Async-API runtime settings, populated from environment variables.'''

	# uvicorn bind address; defaults suit local use behind a proxy
	uvicorn_port : int = 80
	uvicorn_host : str = '127.0.0.1'

	# lazily created, process-wide shared instance
	_singleton : ClassVar[Optional['Settings']] = None

	@classmethod
	def singleton(cls) -> 'Settings':
		'''Return the shared Settings, creating it on first access.'''
		existing = cls._singleton
		if existing is not None:
			return existing

		created = Settings.model_validate({})
		cls._singleton = created
		return created

@ -0,0 +1,126 @@
import fastapi
import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from . import schema
from ..tickers.logic import tickers_get_by_period, markets_all
from typing import (Optional, Literal)
class WebsocketAPI:
	'''Tracks websocket clients and their per-asset ticker subscriptions.

	Each client may be subscribed to at most one asset at a time; several
	clients may share the same asset.
	'''

	def __init__(
		self,
		session: 'async_sessionmaker[AsyncSession]',
	) -> None:
		# every accepted (and not yet disconnected) websocket
		self.connections : set[
			fastapi.WebSocket,
		] = set()
		# asset id -> the set of clients subscribed to that asset
		self.subscriptions_by_asset_id : dict[
			int, set[fastapi.WebSocket]
		] = dict()
		# client -> the single asset id it is currently subscribed to
		self.subscriptions_by_client : dict[
			fastapi.WebSocket,
			int,
		] = dict()
		self.session = session

	async def connect(self, client: fastapi.WebSocket) -> None:
		'''Accept the websocket handshake and start tracking the client.'''
		assert not client in self.connections

		await client.accept()
		self.connections.add(client)

	def _unsubscribe(self, client: fastapi.WebSocket) -> None:
		'''Detach `client` from its current asset, if it has one.

		Fix: the previous code deleted the whole per-asset subscriber set,
		which silently unsubscribed every OTHER client watching the same
		asset; now only this client is removed.
		'''
		if client not in self.subscriptions_by_client:
			return

		last_asset_id = self.subscriptions_by_client.pop(client)

		subscribers = self.subscriptions_by_asset_id.get(last_asset_id)
		if subscribers is not None:
			subscribers.discard(client)
			if not subscribers:
				# drop empty sets so the dict doesn't accumulate dead keys
				del self.subscriptions_by_asset_id[last_asset_id]

	async def subscribe(
		self,
		client: fastapi.WebSocket,
		asset_id: int
	) -> None:
		'''Re-point the client's single subscription to `asset_id` and
		immediately push the recent history for that asset.'''
		self._unsubscribe(client)

		if not asset_id in self.subscriptions_by_asset_id:
			self.subscriptions_by_asset_id[asset_id] = set()

		self.subscriptions_by_asset_id[asset_id].add(client)
		self.subscriptions_by_client[client] = asset_id

		await self.asset_last_period(client, asset_id)

	async def asset_last_period(
		self,
		client: fastapi.WebSocket,
		asset_id: int,
	) -> None:
		'''Send the last 30 minutes of tickers for `asset_id` to `client`.'''
		tickers = await tickers_get_by_period(
			self.session,
			period=datetime.timedelta(minutes=30),
			market_id=asset_id,
		)

		await client.send_text(
			schema.AssetHistoryResponse(
				message=schema.AssetHistoryResponse.message_t(
					points=[
						schema.AssetHistoryResponse.message_t.point_t.model_construct(
							asset_name=o.market.name,
							asset_id=o.market.id,
							time=int(o.timestamp.timestamp()),
							value=o.value,
						)
						for o in tickers
					]
				)
			).json(by_alias=True,),
		)

	async def assets_index(
		self,
		client: fastapi.WebSocket,
	) -> None:
		'''Send the full id/name asset catalogue to `client`.'''
		markets = await markets_all(
			self.session,
		)

		await client.send_text(
			schema.AssetsResponse(
				message=schema.AssetsResponse.message_t(
					assets=[
						schema.AssetsResponse.message_t.asset_t.model_construct(
							name=o.name,
							id=o.id,
						)
						for o in markets
					]
				)
			).json(by_alias=True,),
		)

	async def on_message(
		self,
		client: fastapi.WebSocket,
		msg_raw: str
	) -> None:
		'''Parse one inbound frame and dispatch on its action type.'''
		msg = schema.Action.model_validate_json(
			msg_raw
		).root

		if isinstance(msg, schema.SubscribeAction):
			await self.subscribe(
				client,
				msg.message.asset_id
			)
		elif isinstance(msg, schema.AssetsAction):
			await self.assets_index(
				client,
			)
		else:
			raise NotImplementedError

	async def disconnect(self, client: fastapi.WebSocket) -> None:
		'''Forget the client entirely: its connection AND its subscription.

		Fix: previously only `connections` was cleaned, leaking closed
		sockets in both subscription dicts.
		'''
		assert client in self.connections

		self._unsubscribe(client)
		self.connections.remove(client)

@ -0,0 +1,117 @@
import asyncio
import logging
from logging.config import fileConfig

from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from sqlalchemy.engine.base import Connection

from alembic import context

# Renamed from ...test_task_2025_06_30_v1: this repository packages
# test_task_2025_07_17_v2 (see [tool.setuptools.package-dir] in
# pyproject.toml); the stale name would raise ImportError.
from online.fxreader.pr34.test_task_2025_07_17_v2.tickers.settings import Settings
from online.fxreader.pr34.test_task_2025_07_17_v2.tickers.models import (
	Base,
	Market,
)

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# The DB URL comes from the environment (pydantic settings), not the ini file.
config.set_main_option(
	'sqlalchemy.url',
	Settings.singleton().db_url
)

# Interpret the config file for Python logging.
# This line sets up loggers basically.
# if config.config_file_name is not None:
#     fileConfig(config.config_file_name)
# else:
if True:
	logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger(__name__)

# model MetaData for 'autogenerate' support
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def do_run_migrations(
	connection: Connection,
):
	'''Run migrations on an already-established (sync-facade) connection.'''
	context.configure(connection=connection, target_metadata=target_metadata)

	with context.begin_transaction():
		context.run_migrations()


async def run_async_migrations():
	"""In this scenario we need to create an Engine
	and associate a connection with the context.
	"""
	logger.info(dict(msg='started'))

	connectable = async_engine_from_config(
		config.get_section(config.config_ini_section, {}),
		prefix="sqlalchemy.",
		poolclass=pool.NullPool,
	)

	async with connectable.connect() as connection:
		await connection.run_sync(do_run_migrations)

	await connectable.dispose()

	logger.info(dict(msg='done'))


def run_migrations_offline():
	"""Run migrations in 'offline' mode.

	This configures the context with just a URL
	and not an Engine, though an Engine is acceptable
	here as well. By skipping the Engine creation
	we don't even need a DBAPI to be available.

	Calls to context.execute() here emit the given string to the
	script output.
	"""
	url = config.get_main_option("sqlalchemy.url")
	context.configure(
		url=url,
		target_metadata=target_metadata,
		literal_binds=True,
		dialect_opts={"paramstyle": "named"},
	)

	with context.begin_transaction():
		context.run_migrations()


def run_migrations_online():
	"""Run migrations in 'online' mode."""
	asyncio.run(run_async_migrations())


if context.is_offline_mode():
	# offline mode is intentionally unsupported for this async setup
	raise NotImplementedError
	# run_migrations_offline()
else:
	run_migrations_online()

@ -0,0 +1,28 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision: str = ${repr(up_revision)}
down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)}
branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
def upgrade() -> None:
"""Upgrade schema."""
${upgrades if upgrades else "pass"}
def downgrade() -> None:
"""Downgrade schema."""
${downgrades if downgrades else "pass"}

@ -0,0 +1,83 @@
import datetime
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, make_transient
from sqlalchemy.future import select
from .models import Ticker, Market
from .utils import get_or_create
async def markets_get_by_symbol(
	session: 'async_sessionmaker[AsyncSession]',
	symbols: set[str],
) -> dict[str, int]:
	'''Resolve each symbol to its Market id, creating missing markets.

	Runs inside one transaction; returns a mapping symbol -> market id.
	'''
	res : dict[str, int] = dict()

	async with session() as active_session:
		async with active_session.begin() as transaction:
			for o in symbols:
				# get_or_create returns (row, created); only the row is needed
				m = (await get_or_create(
					active_session,
					Market,
					name=o,
				))[0]
				res[o] = m.id

	return res
async def ticker_store_multiple(
	session: 'async_sessionmaker[AsyncSession]',
	tickers: list[Ticker],
) -> None:
	'''Persist a batch of Ticker rows in a single transaction.'''
	async with session() as active_session:
		# begin() commits on successful exit, rolls back on exception
		async with active_session.begin() as transaction:
			active_session.add_all(
				tickers,
			)
async def tickers_get_by_period(
	session: 'async_sessionmaker[AsyncSession]',
	market_id: int,
	period: datetime.timedelta,
) -> list[Ticker]:
	'''Fetch tickers of `market_id` newer than now - `period`, newest first.

	Returned rows (and their eagerly-loaded markets) are detached from the
	session so callers may keep using them after the session closes.
	'''
	async with session() as active_session:
		async with active_session.begin() as transaction:
			q = select(
				Ticker
			).join(Ticker.market).where(
				Market.id == market_id,
				Ticker.timestamp >= datetime.datetime.now(
					tz=datetime.timezone.utc
				) - period
			).order_by(Ticker.timestamp.desc()).options(
				# eager-load .market so detached access doesn't lazy-load
				selectinload(Ticker.market)
			)

			res = await active_session.execute(q)

			rows = [o[0] for o in res]

			# detach rows so they survive session close
			for o in rows:
				active_session.expunge(o)
				make_transient(o.market)

			return rows
async def markets_all(
	session: 'async_sessionmaker[AsyncSession]',
) -> list[Market]:
	'''Return every Market row, detached from the session.'''
	async with session() as active_session:
		async with active_session.begin() as transaction:
			q = select(
				Market
			)

			res = await active_session.execute(q)

			rows = [o[0] for o in res]

			# detach rows so they survive session close
			for o in rows:
				active_session.expunge(o)

			return rows

@ -0,0 +1,63 @@
import datetime
import decimal
from sqlalchemy.orm import (
mapped_column,
Mapped,
DeclarativeBase,
relationship,
)
from sqlalchemy import (
String,
ForeignKey,
Numeric,
DateTime,
UniqueConstraint,
)
from typing import (Optional,)
class Base(DeclarativeBase):
	'''Declarative base shared by all ticker ORM models.'''
	pass

class Market(Base):
	'''A tradable symbol (e.g. EURUSD); one row per distinct name.'''
	__tablename__ = 'tickers_market'

	id: Mapped[int] = mapped_column(primary_key=True)
	name: Mapped[str] = mapped_column(String(32))

	tickers: Mapped[list['Ticker']] = relationship(
		back_populates='market',
	)

	def __repr__(self) -> str:
		return f"Market(id={self.id!r}, name={self.name!r})"

class Ticker(Base):
	'''A single rate observation for a market at a point in time.'''
	__tablename__ = 'tickers_ticker'

	# FK to the market; together with `timestamp` forms the logical key
	id: Mapped[int] = mapped_column(ForeignKey(
		'tickers_market.id',
		ondelete='CASCADE',
	))

	market: Mapped['Market'] = relationship(
		back_populates='tickers'
	)

	timestamp: Mapped[datetime.datetime] = mapped_column(
		DateTime(timezone=True,)
	)

	# exact decimal rate value
	value: Mapped[decimal.Decimal] = mapped_column(Numeric(
		precision=32, scale=6,
	))

	__table_args__ = (
		UniqueConstraint('id', 'timestamp'),
	)

	# NOTE(review): SQLAlchemy's mapper `primary_key` argument expects
	# Column objects rather than name strings — confirm this actually
	# maps the composite (id, timestamp) key; as written it may fail at
	# mapper-configuration time.
	__mapper_args__ = dict(
		primary_key=('id', 'timestamp',)
	)

	def __repr__(self) -> str:
		return f"Ticker(id={self.id!r}, timestamp={self.timestamp!r}, value={self.value!r})"

@ -0,0 +1,17 @@
import pydantic
import pydantic_settings
from typing import (ClassVar, Optional,)
class Settings(pydantic_settings.BaseSettings):
	'''Tickers DB settings, populated from environment variables.'''

	# required: SQLAlchemy connection URL, read from DB_URL
	db_url : str

	# lazily created, process-wide shared instance
	_singleton : ClassVar[Optional['Settings']] = None

	@classmethod
	def singleton(cls) -> 'Settings':
		'''Return the shared Settings, creating it on first access.'''
		existing = cls._singleton
		if existing is not None:
			return existing

		created = Settings.model_validate({})
		cls._singleton = created
		return created

@ -0,0 +1,50 @@
from typing import (TypeVar, Optional, Any, cast,)
from sqlalchemy.ext.asyncio import AsyncSessionTransaction, AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.exc import NoResultFound, IntegrityError
M = TypeVar('M', bound='DeclarativeBase')
async def get_or_create(
	session: AsyncSession,
	model: type[M],
	create_method: Optional[str] = None,
	create_method_kwargs: Optional[dict[str, Any]] = None,
	**kwargs: Any
) -> tuple[M, bool]:
	'''Fetch the single `model` row matching `kwargs`, creating it if absent.

	Returns (row, created) where `created` is True when a new row was
	inserted.  `create_method`, if given, names an alternative classmethod
	constructor on `model`; `create_method_kwargs` extends the constructor
	arguments without participating in the lookup.

	NOTE(review): on a concurrent-insert IntegrityError this calls
	session.rollback(), which rolls back the WHOLE outstanding transaction
	rather than just this insert — callers batching several creations in
	one transaction would lose earlier work; a SAVEPOINT (begin_nested)
	would be safer — confirm intended semantics.
	'''
	async def select_row() -> M:
		# equality filter on every kwarg, e.g. Model.name == value
		res = await session.execute(
			select(model).where(
				*[
					getattr(model, k) == v
					for k, v in kwargs.items()
				]
			)
		)
		row = res.one()[0]
		assert isinstance(row, model)
		return row

	try:
		res = await select_row()
		return res, False
	except NoResultFound:
		if create_method_kwargs:
			kwargs.update(create_method_kwargs)

		if not create_method:
			created = model(**kwargs)
		else:
			created = getattr(model, create_method)(**kwargs)

		try:
			session.add(created)
			await session.flush()
			return created, True
		except IntegrityError:
			# lost a create race: another transaction inserted the row first
			await session.rollback()
			return await select_row(), False