from tempfile import TemporaryDirectory
Common utilities
This notebook defines shared helpers used by all commands and notebooks:
- consistent logging
- stable filesystem handling under `data/`
- small input parsers
- HTTP JSON fetchers with retry/backoff
- base64-encoded contract smart-query support
The rule: anything used by ≥2 modules belongs here.
Intent: import only what we need and define stable constants used everywhere.
Path helpers
Intent: resolve "inputs/outputs under data/" consistently.
Rule:
- if the user passes an absolute path → use it
- if the user passes a bare filename → place under `data_dir`
- if the user passes a relative path with a parent (e.g. `data/foo.csv` or `tmp/foo.csv`) → keep as-is
ensure_parent_dir
def ensure_parent_dir(
path:str | Path
)->Path:
resolve_under_dir
def resolve_under_dir(
base_dir:str | Path, path:str | Path
)->Path:
Resolve path relative to base_dir only if it is a bare filename.
Examples:
resolve_under_dir("data", "wallet-addresses.csv") -> data/wallet-addresses.csv
resolve_under_dir("data", "data/wallet-addresses.csv") -> data/wallet-addresses.csv
resolve_under_dir("data", "/abs/x.csv") -> /abs/x.csv
with TemporaryDirectory() as td:
base = Path(td) / "data"
assert resolve_under_dir(base, "x.csv") == base / "x.csv"
assert resolve_under_dir(base, "data/x.csv") == Path("data/x.csv")
assert resolve_under_dir(base, base / "x.csv") == base / "x.csv"
with TemporaryDirectory() as td:
out = ensure_parent_dir(Path(td) / "a" / "b" / "c.txt")
assert out.parent.exists()
Logging
Intent: create a logger that does not accumulate duplicate handlers when cells re-run.
Design:
- `setup_logging(name, ...)` returns a configured logger
- safe to call multiple times
- optional file handler + console handler
- `propagate=False` to avoid double logs
setup_logging
def setup_logging(
name:str, log_file:str | Path | None=None, level:str='INFO'
)->logging.Logger:
log1 = setup_logging("test_common_logger", level="DEBUG")
log2 = setup_logging("test_common_logger", level="INFO")
assert log1 is log2
assert len(log1.handlers) >= 1
2026-02-12T13:36:54 | DEBUG | test_common_logger | Logger initialized
Wallet CSV
Intent: parse wallet-addresses.csv with columns address,tag into a stable list.
Rules: * skip empty addresses * tag is optional * preserve file order (helps stable output)
read_wallets_csv
def read_wallets_csv(
path:str | Path
)->List[Tuple[str, str]]:
from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
p = Path(td) / "wallets.csv"
p.write_text("address,tag\nn1abc,test\n,\n n1def , \n", encoding="utf-8")
out = read_wallets_csv(p)
assert out == [("n1abc", "test"), ("n1def", "")]
YAML + JSON I/O
Intent: read/write YAML and JSON safely for history files and cached metadata.
write_yaml
def write_yaml(
path:str | Path, obj:Any
)->None:
read_yaml
def read_yaml(
path:str | Path, default:Any=None
)->Any:
write_json
def write_json(
path:str | Path, obj:Any
)->None:
read_json
def read_json(
path:str | Path, default:Any=None
)->Any:
from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
jp = Path(td) / "x.json"
write_json(jp, {"a": 1})
assert read_json(jp) == {"a": 1}
with TemporaryDirectory() as td:
yp = Path(td) / "x.yaml"
write_yaml(yp, {"a": 1})
assert read_yaml(yp) == {"a": 1}
HTTP session + JSON request
Intent: create a requests.Session with stable headers and a robust JSON fetcher with retry/backoff.
Important: keep it testable with “fake session” objects (so no reliance on urllib3 Retry internals).
Semantic note: some 429 responses are non-transient (for example: upstream grpc payload-size limits like “message larger than max”). In those cases retrying does not help and should fail fast; scanner window size/logs should be used to correct query shape.
request_json
def request_json(
session:Any, url:str, params:Dict[str, Any] | Sequence[Tuple[str, str]] | None=None,
headers:Dict[str, str] | None=None, timeout:int=60, retries:int=5, logger:Optional[logging.Logger]=None
)->Any:
GET JSON with exponential backoff retry for retryable transport/server errors.
Notes: - retries 429/5xx by default, - fails fast for known non-transient 429 payload-size errors, - raises RuntimeError after retries, - logs retry/success timing for diagnostics.
build_session
def build_session(
user_agent:str='nym-node-reward-tracker/1.0'
)->requests.Session:
# verify_request_json_semantics
class _HdrResp:
status_code = 200
text = "{}"
def raise_for_status(self):
return None
def json(self):
return {"ok": True}
class _HdrSession:
def __init__(self):
self.headers_seen = None
def get(self, url, params=None, headers=None, timeout=0):
self.headers_seen = headers
return _HdrResp()
hs = _HdrSession()
out = request_json(hs, "https://example", headers={"X-Test": "1"}, retries=1)
assert out["ok"] is True
assert hs.headers_seen == {"X-Test": "1"}
class _TooBigResp:
status_code = 429
text = 'grpc: received message larger than max (16281606 vs. 10485760)'
def raise_for_status(self):
raise RuntimeError("HTTP 429")
def json(self):
return {}
class _TooBigSession:
def __init__(self):
self.calls = 0
def get(self, url, params=None, headers=None, timeout=0):
self.calls += 1
return _TooBigResp()
ts = _TooBigSession()
try:
request_json(ts, "https://example", retries=5)
raise AssertionError("expected failure")
except RuntimeError:
pass
assert ts.calls == 1
request_json failed non_retryable timeout=60s elapsed=0.000s url=https://example err=HTTP 429 non-retryable payload-too-large: grpc: received message larger than max (16281606 vs. 10485760)
request_json failed retries_exhausted=5 timeout=60s url=https://example err=HTTP 429 non-retryable payload-too-large: grpc: received message larger than max (16281606 vs. 10485760)
Intent: provide tiny fake HTTP helpers for deterministic notebook tests of retry/pagination logic without real network calls.
f = _FakeSession([_FakeResp({"ok": True}), _FakeResp({"ok": False}, status_code=500)])
assert f.get("https://example").json() == {"ok": True}
try:
f.get("https://example").raise_for_status()
raise AssertionError("expected HTTP error")
except RuntimeError:
pass
Smart query helper
Intent: perform CosmWasm smart queries via Nyx REST (/smart/{b64}).
We keep it small and let request_json handle retries.
smart_query
def smart_query(
session:Any, nyx_wasm_rest:str, contract:str, payload:Dict[str, Any], timeout:int=60,
logger:Optional[logging.Logger]=None
)->Dict[str, Any]:
Execute a CosmWasm smart query against the Nyx REST endpoint.
fake = _FakeSession([_FakeResp({"ok": True})])
assert request_json(fake, "http://example") == {"ok": True}
smart = _FakeSession([_FakeResp({"data": {"node_id": 2196}})])
out = smart_query(
smart,
nyx_wasm_rest="https://api.nymtech.net/cosmwasm/wasm/v1/contract",
contract="n1contract",
payload={"get_node_rewarding_details": {"node_id": 2196}},
)
assert out["data"]["node_id"] == 2196
Parsing helpers
Intent: normalize common “stringly typed” values from APIs and events: * to_int * to_decimal * parse_unym_amount strips trailing denom and leading +
b64_json
def b64_json(
payload:Dict[str, Any]
)->str:
parse_unym_amount
def parse_unym_amount(
value:Optional[str]
)->Optional[str]:
Parse strings like '123unym' or '+123unym' -> '123'. Returns None if empty or invalid.
to_decimal
def to_decimal(
value:Any, default:Optional[Decimal]=None
)->Optional[Decimal]:
to_int
def to_int(
value:Any, default:Optional[int]=None
)->Optional[int]:
utc_now_iso
def utc_now_iso(
)->str:
assert to_int("42") == 42
assert to_int("x", None) is None
assert to_decimal("1.25") == Decimal("1.25")
assert parse_unym_amount("+123unym") == "123"
assert b64_json({"a": 1}) == "eyJhIjoxfQ=="
Date + wallet tx-query helpers
Intent: keep date parsing and wallet tx query pagination in one shared place so reward/caching notebooks can reuse the exact same behavior.
iter_txs_by_query
def iter_txs_by_query(
session:Any, nyx_tx_rest:str, query:str, page_limit:int=100, order_by:str='ORDER_BY_ASC', timeout:int=60,
retries:int=5, logger:Optional[logging.Logger]=None, progress_desc:Optional[str]=None
)->Iterator[Tuple[Dict[str, Any], Dict[str, Any]]]:
Yield (tx, tx_response) tuples across all pages for a Cosmos tx search query.
- Uses `pagination.next_key` to page.
- De-duplicates by txhash defensively.
- If `progress_desc` is provided, shows a tqdm progress bar (uses `pagination.total` if available).
build_tx_query
def build_tx_query(
address:str, search_mode:str
)->str:
parse_date_filters
def parse_date_filters(
start:Optional[str], end:Optional[str]
)->tuple[Optional[datetime], Optional[datetime]]:
parse_rfc3339_to_utc
def parse_rfc3339_to_utc(
ts:str
)->datetime:
assert parse_rfc3339_to_utc("2025-12-04T16:31:51Z").tzinfo == timezone.utc
s, e = parse_date_filters("2025-01-01", "2025-01-01")
assert s.isoformat().startswith("2025-01-01T00:00:00")
assert e.isoformat().startswith("2025-01-02T00:00:00")
page1 = {
"txs": [{"body": {"messages": []}, "auth_info": {"fee": {"amount": []}}}],
"tx_responses": [{"txhash": "A", "timestamp": "2025-01-01T00:00:00Z"}],
"pagination": {"next_key": "K1", "total": "2"},
}
page2 = {
"txs": [{"body": {"messages": []}, "auth_info": {"fee": {"amount": []}}}],
"tx_responses": [{"txhash": "B", "timestamp": "2025-01-02T00:00:00Z"}],
"pagination": {"next_key": None, "total": "2"},
}
fake = _FakeSession([_FakeResp(page1), _FakeResp(page2)])
rows = list(iter_txs_by_query(fake, nyx_tx_rest="https://example", query="q", page_limit=1))
assert [r[1]["txhash"] for r in rows] == ["A", "B"]
assert build_tx_query("n1x", "sender") == "message.sender='n1x'"
assert build_tx_query("n1x", "recipient") == "transfer.recipient='n1x'"
Cosmos tx search + pagination and window iteration
Intent: provide a single, reusable Cosmos /cosmos/tx/v1beta1/txs search helper with safe pagination and deduplication. This is needed by both cache building and later epoch-by-epoch reward tooling.
tx_search_all_pages
def tx_search_all_pages(
session:Any, nyx_tx_rest:str, query:str, page_size:int=100, max_pages:int=200, timeout:int=60,
logger:Optional[logging.Logger]=None, strict:bool=False, retries:int=5
)->List[Dict[str, Any]]:
Fetch all pages using offset pagination.
Defensive features: - de-duplicates by txhash - stops on repeated page fingerprint (guards against buggy pagination) - with strict=True, raises on transport/shape errors and max_pages truncation
tx_search
def tx_search(
session:Any, nyx_tx_rest:str, query:str, limit:int=100, offset:int=0, timeout:int=60,
logger:Optional[logging.Logger]=None, strict:bool=False, retries:int=5
)->Dict[str, Any]:
Thin wrapper around Cosmos tx search endpoint.
Returns a dict that (on success) contains:
- tx_responses: list
- pagination: { total: "..." }
On error (strict=False) it returns:
- tx_responses: []
- pagination: { total: "0" }
- error: the error message
With strict=True, it raises RuntimeError on HTTP/shape errors.
# Define -> use -> verify: paging dedupe + strict error handling
# dedupe works
_fake_pages = [
{"tx_responses": [{"txhash": "A"}, {"txhash": "B"}], "pagination": {"total": "2"}},
{"tx_responses": [{"txhash": "A"}, {"txhash": "B"}], "pagination": {"total": "2"}},
]
fake = _FakeSession([_FakeResp(p) for p in _fake_pages])
out = tx_search_all_pages(fake, nyx_tx_rest="https://example", query="q", page_size=2, max_pages=5)
assert [r["txhash"] for r in out] == ["A", "B"]
# strict=True raises on transport errors
class BoomSession:
    """Fake session whose ``get()`` always fails at the transport layer."""

    def get(self, url: str, params=None, timeout: int = 0):
        # Simulate a hard transport error (connection reset, DNS failure, ...).
        raise RuntimeError("boom")
try:
tx_search(BoomSession(), nyx_tx_rest="https://example", query="q", strict=True, retries=1)
raise AssertionError("expected strict tx_search to raise")
except RuntimeError:
pass
non_strict = tx_search(BoomSession(), nyx_tx_rest="https://example", query="q", strict=False, retries=1)
assert non_strict["tx_responses"] == []
assert "error" in non_strict
# strict=True raises when max_pages truncation is reached
class EndlessSession:
    """Fake session serving an endless stream of completely full pages.

    ``pagination.total`` is huge and every page is filled to the requested
    limit, so offset paging never terminates naturally — this exercises the
    strict max_pages truncation error in tx_search_all_pages.
    """

    def get(self, url: str, params=None, timeout: int = 0):
        query = params or {}
        limit = int(query.get("pagination.limit", 2))
        offset = int(query.get("pagination.offset", 0))
        rows = [{"txhash": f"T{i}"} for i in range(offset, offset + limit)]
        return _FakeResp({"tx_responses": rows, "pagination": {"total": "999999"}})
try:
tx_search_all_pages(EndlessSession(), nyx_tx_rest="https://example", query="q", page_size=2, max_pages=2, strict=True, retries=1)
raise AssertionError("expected strict max_pages truncation to raise")
except RuntimeError:
pass
request_json failed retries_exhausted=1 timeout=60s url=https://example err=boom
request_json failed retries_exhausted=1 timeout=60s url=https://example err=boom
Intent: deterministic height slicing used across scanners.
iter_windows
def iter_windows(
start_height:int, end_height:int, window_size:int
)->List[Tuple[int, int]]:
Inclusive height windows: (start, end) with end <= end_height.
assert iter_windows(1, 5, 2) == [(1, 2), (3, 4), (5, 5)]