nym_node_reward_tracker
  1. Common utilities
  • Nym node reward tracker (docs index)
  • Common utilities
  • Cosmos Tx Parsing Helpers
  • Snapshot tracker
  • Cache: chain-native event cache for epoch-by-epoch rewards
  • Epoch-by-epoch reward replay
  • Reward transaction export
  • CLI
  • Node Interest Rates Walkthrough on Nyx (last 120 hours)
  • Notebook tests

On this page

  • Common utilities
    • Path helpers
      • ensure_parent_dir
      • resolve_under_dir
    • Logging
      • setup_logging
    • Wallet CSV
      • read_wallets_csv
    • YAML + JSON I/O
      • write_yaml
      • read_yaml
      • write_json
      • read_json
    • HTTP session + JSON request
      • request_json
      • build_session
    • Smart query helper
      • smart_query
    • Parsing helpers
      • b64_json
      • parse_unym_amount
      • to_decimal
      • to_int
      • utc_now_iso
    • Date + wallet tx-query helpers
      • iter_txs_by_query
      • build_tx_query
      • parse_date_filters
      • parse_rfc3339_to_utc
    • Cosmos tx search + pagination and window iteration
      • tx_search_all_pages
      • tx_search
      • iter_windows
  • Report an issue

Other Formats

  • CommonMark

Common utilities

Common utilities

This notebook defines shared helpers used by all commands and notebooks:

  • consistent logging
  • stable filesystem handling under data/
  • small input parsers
  • HTTP JSON fetchers with retry/backoff
  • base64-encoded contract smart-query support

The rule: anything used by ≥2 modules belongs here.

Intent: import only what we need and define stable constants used everywhere.

Path helpers

Intent: resolve “inputs/outputs under data/” consistently.

Rule:

  • if the user passes an absolute path → use it
  • if the user passes a bare filename → place under data_dir
  • if the user passes a relative path with a parent (e.g. data/foo.csv or tmp/foo.csv) → keep as-is

ensure_parent_dir


def ensure_parent_dir(
    path:str | Path
)->Path:

resolve_under_dir


def resolve_under_dir(
    base_dir:str | Path, path:str | Path
)->Path:

Resolve path relative to base_dir only if it is a bare filename.

Examples: resolve_under_dir(“data”, “wallet-addresses.csv”) -> data/wallet-addresses.csv resolve_under_dir(“data”, “data/wallet-addresses.csv”) -> data/wallet-addresses.csv resolve_under_dir(“data”, “/abs/x.csv”) -> /abs/x.csv

from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
    base = Path(td) / "data"
    assert resolve_under_dir(base, "x.csv") == base / "x.csv"
    assert resolve_under_dir(base, "data/x.csv") == Path("data/x.csv")
    assert resolve_under_dir(base, base / "x.csv") == base / "x.csv"

with TemporaryDirectory() as td:
    out = ensure_parent_dir(Path(td) / "a" / "b" / "c.txt")
    assert out.parent.exists()

Logging

Intent: create a logger that does not accumulate duplicate handlers when cells re-run.

Design:

  • setup_logging(name, ...) returns a configured logger
  • safe to call multiple times
  • optional file handler + console handler
  • propagate=False to avoid double logs

setup_logging


def setup_logging(
    name:str, log_file:str | Path | None=None, level:str='INFO'
)->logging.Logger:
log1 = setup_logging("test_common_logger", level="DEBUG")
log2 = setup_logging("test_common_logger", level="INFO")
assert log1 is log2
assert len(log1.handlers) >= 1
2026-02-12T13:36:54 | DEBUG | test_common_logger | Logger initialized

Wallet CSV

Intent: parse wallet-addresses.csv with columns address,tag into a stable list.

Rules: * skip empty addresses * tag is optional * preserve file order (helps stable output)


read_wallets_csv


def read_wallets_csv(
    path:str | Path
)->List[Tuple[str, str]]:
from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
    p = Path(td) / "wallets.csv"
    p.write_text("address,tag\nn1abc,test\n,\n n1def ,  \n", encoding="utf-8")
    out = read_wallets_csv(p)
assert out == [("n1abc", "test"), ("n1def", "")]

YAML + JSON I/O

Intent: read/write YAML and JSON safely for history files and cached metadata.


write_yaml


def write_yaml(
    path:str | Path, obj:Any
)->None:

read_yaml


def read_yaml(
    path:str | Path, default:Any=None
)->Any:

write_json


def write_json(
    path:str | Path, obj:Any
)->None:

read_json


def read_json(
    path:str | Path, default:Any=None
)->Any:
from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
    jp = Path(td) / "x.json"
    write_json(jp, {"a": 1})
    assert read_json(jp) == {"a": 1}

with TemporaryDirectory() as td:
    yp = Path(td) / "x.yaml"
    write_yaml(yp, {"a": 1})
    assert read_yaml(yp) == {"a": 1}

HTTP session + JSON request

Intent: create a requests.Session with stable headers and a robust JSON fetcher with retry/backoff.

Important: keep it testable with “fake session” objects (so no reliance on urllib3 Retry internals).

Semantic note: some 429 responses are non-transient (for example: upstream grpc payload-size limits like “message larger than max”). In those cases retrying does not help and should fail fast; scanner window size/logs should be used to correct query shape.


request_json


def request_json(
    session:Any, url:str, params:Dict[str, Any] | Sequence[Tuple[str, str]] | None=None,
    headers:Dict[str, str] | None=None, timeout:int=60, retries:int=5, logger:Optional[logging.Logger]=None
)->Any:

GET JSON with exponential backoff retry for retryable transport/server errors.

Notes: - retries 429/5xx by default, - fails fast for known non-transient 429 payload-size errors, - raises RuntimeError after retries, - logs retry/success timing for diagnostics.


build_session


def build_session(
    user_agent:str='nym-node-reward-tracker/1.0'
)->requests.Session:
# verify_request_json_semantics
class _HdrResp:
    status_code = 200
    text = "{}"
    def raise_for_status(self):
        return None
    def json(self):
        return {"ok": True}

class _HdrSession:
    def __init__(self):
        self.headers_seen = None
    def get(self, url, params=None, headers=None, timeout=0):
        self.headers_seen = headers
        return _HdrResp()

hs = _HdrSession()
out = request_json(hs, "https://example", headers={"X-Test": "1"}, retries=1)
assert out["ok"] is True
assert hs.headers_seen == {"X-Test": "1"}

class _TooBigResp:
    status_code = 429
    text = 'grpc: received message larger than max (16281606 vs. 10485760)'
    def raise_for_status(self):
        raise RuntimeError("HTTP 429")
    def json(self):
        return {}

class _TooBigSession:
    def __init__(self):
        self.calls = 0
    def get(self, url, params=None, headers=None, timeout=0):
        self.calls += 1
        return _TooBigResp()

ts = _TooBigSession()
try:
    request_json(ts, "https://example", retries=5)
    raise AssertionError("expected failure")
except RuntimeError:
    pass
assert ts.calls == 1
request_json failed non_retryable timeout=60s elapsed=0.000s url=https://example err=HTTP 429 non-retryable payload-too-large: grpc: received message larger than max (16281606 vs. 10485760)
request_json failed retries_exhausted=5 timeout=60s url=https://example err=HTTP 429 non-retryable payload-too-large: grpc: received message larger than max (16281606 vs. 10485760)

Intent: provide tiny fake HTTP helpers for deterministic notebook tests of retry/pagination logic without real network calls.

f = _FakeSession([_FakeResp({"ok": True}), _FakeResp({"ok": False}, status_code=500)])
assert f.get("https://example").json() == {"ok": True}
try:
    f.get("https://example").raise_for_status()
    raise AssertionError("expected HTTP error")
except RuntimeError:
    pass

Smart query helper

Intent: perform CosmWasm smart queries via Nyx REST (/smart/{b64}).

We keep it small and let request_json handle retries.


smart_query


def smart_query(
    session:Any, nyx_wasm_rest:str, contract:str, payload:Dict[str, Any], timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Dict[str, Any]:

Execute a CosmWasm smart query against the Nyx REST endpoint.

fake = _FakeSession([_FakeResp({"ok": True})])
assert request_json(fake, "http://example") == {"ok": True}

smart = _FakeSession([_FakeResp({"data": {"node_id": 2196}})])
out = smart_query(
    smart,
    nyx_wasm_rest="https://api.nymtech.net/cosmwasm/wasm/v1/contract",
    contract="n1contract",
    payload={"get_node_rewarding_details": {"node_id": 2196}},
)
assert out["data"]["node_id"] == 2196

Parsing helpers

Intent: normalize common “stringly typed” values from APIs and events: * to_int * to_decimal * parse_unym_amount strips trailing denom and leading +


b64_json


def b64_json(
    payload:Dict[str, Any]
)->str:

parse_unym_amount


def parse_unym_amount(
    value:Optional[str]
)->Optional[str]:

Parse strings like ‘123unym’ or ‘+123unym’ -> ‘123’ Returns None if empty or invalid.


to_decimal


def to_decimal(
    value:Any, default:Optional[Decimal]=None
)->Optional[Decimal]:

to_int


def to_int(
    value:Any, default:Optional[int]=None
)->Optional[int]:

utc_now_iso


def utc_now_iso(
    
)->str:
assert to_int("42") == 42
assert to_int("x", None) is None
assert to_decimal("1.25") == Decimal("1.25")
assert parse_unym_amount("+123unym") == "123"
assert b64_json({"a": 1}) == "eyJhIjoxfQ=="

Date + wallet tx-query helpers

Intent: keep date parsing and wallet tx query pagination in one shared place so reward/caching notebooks can reuse the exact same behavior.


iter_txs_by_query


def iter_txs_by_query(
    session:Any, nyx_tx_rest:str, query:str, page_limit:int=100, order_by:str='ORDER_BY_ASC', timeout:int=60,
    retries:int=5, logger:Optional[logging.Logger]=None, progress_desc:Optional[str]=None
)->Iterator[Tuple[Dict[str, Any], Dict[str, Any]]]:

Yield (tx, tx_response) tuples across all pages for a Cosmos tx search query.

  • Uses pagination.next_key to page.
  • De-duplicates by txhash defensively.
  • If progress_desc is provided, shows a tqdm progress bar (uses pagination.total if available).

build_tx_query


def build_tx_query(
    address:str, search_mode:str
)->str:

parse_date_filters


def parse_date_filters(
    start:Optional[str], end:Optional[str]
)->tuple[Optional[datetime], Optional[datetime]]:

parse_rfc3339_to_utc


def parse_rfc3339_to_utc(
    ts:str
)->datetime:
assert parse_rfc3339_to_utc("2025-12-04T16:31:51Z").tzinfo == timezone.utc

s, e = parse_date_filters("2025-01-01", "2025-01-01")
assert s.isoformat().startswith("2025-01-01T00:00:00")
assert e.isoformat().startswith("2025-01-02T00:00:00")

page1 = {
    "txs": [{"body": {"messages": []}, "auth_info": {"fee": {"amount": []}}}],
    "tx_responses": [{"txhash": "A", "timestamp": "2025-01-01T00:00:00Z"}],
    "pagination": {"next_key": "K1", "total": "2"},
}
page2 = {
    "txs": [{"body": {"messages": []}, "auth_info": {"fee": {"amount": []}}}],
    "tx_responses": [{"txhash": "B", "timestamp": "2025-01-02T00:00:00Z"}],
    "pagination": {"next_key": None, "total": "2"},
}

fake = _FakeSession([_FakeResp(page1), _FakeResp(page2)])
rows = list(iter_txs_by_query(fake, nyx_tx_rest="https://example", query="q", page_limit=1))
assert [r[1]["txhash"] for r in rows] == ["A", "B"]

assert build_tx_query("n1x", "sender") == "message.sender='n1x'"
assert build_tx_query("n1x", "recipient") == "transfer.recipient='n1x'"

Cosmos tx search + pagination and window iteration

Intent: provide a single, reusable Cosmos /cosmos/tx/v1beta1/txs search helper with safe pagination and deduplication. This is needed by both cache building and later epoch-by-epoch reward tooling.


tx_search_all_pages


def tx_search_all_pages(
    session:Any, nyx_tx_rest:str, query:str, page_size:int=100, max_pages:int=200, timeout:int=60,
    logger:Optional[logging.Logger]=None, strict:bool=False, retries:int=5
)->List[Dict[str, Any]]:

Fetch all pages using offset pagination.

Defensive features: - de-duplicates by txhash - stops on repeated page fingerprint (guards against buggy pagination) - with strict=True, raises on transport/shape errors and max_pages truncation


tx_search


def tx_search(
    session:Any, nyx_tx_rest:str, query:str, limit:int=100, offset:int=0, timeout:int=60,
    logger:Optional[logging.Logger]=None, strict:bool=False, retries:int=5
)->Dict[str, Any]:

Thin wrapper around Cosmos tx search endpoint.

Returns a dict that (on success) contains: - tx_responses: list - pagination: { total: “…” }

On error (strict=False) it returns: - tx_responses: [] - pagination: { total: “0” } - error:

With strict=True, it raises RuntimeError on HTTP/shape errors.

# Define -> use -> verify: paging dedupe + strict error handling

# dedupe works
_fake_pages = [
    {"tx_responses": [{"txhash": "A"}, {"txhash": "B"}], "pagination": {"total": "2"}},
    {"tx_responses": [{"txhash": "A"}, {"txhash": "B"}], "pagination": {"total": "2"}},
]
fake = _FakeSession([_FakeResp(p) for p in _fake_pages])
out = tx_search_all_pages(fake, nyx_tx_rest="https://example", query="q", page_size=2, max_pages=5)
assert [r["txhash"] for r in out] == ["A", "B"]

# strict=True raises on transport errors
class BoomSession:
    def get(self, url: str, params=None, timeout: int = 0):
        raise RuntimeError("boom")

try:
    tx_search(BoomSession(), nyx_tx_rest="https://example", query="q", strict=True, retries=1)
    raise AssertionError("expected strict tx_search to raise")
except RuntimeError:
    pass

non_strict = tx_search(BoomSession(), nyx_tx_rest="https://example", query="q", strict=False, retries=1)
assert non_strict["tx_responses"] == []
assert "error" in non_strict

# strict=True raises when max_pages truncation is reached
class EndlessSession:
    def get(self, url: str, params=None, timeout: int = 0):
        lim = int((params or {}).get("pagination.limit", 2))
        off = int((params or {}).get("pagination.offset", 0))
        page = {
            "tx_responses": [{"txhash": f"T{off+n}"} for n in range(lim)],
            "pagination": {"total": "999999"},
        }
        return _FakeResp(page)

try:
    tx_search_all_pages(EndlessSession(), nyx_tx_rest="https://example", query="q", page_size=2, max_pages=2, strict=True, retries=1)
    raise AssertionError("expected strict max_pages truncation to raise")
except RuntimeError:
    pass
request_json failed retries_exhausted=1 timeout=60s url=https://example err=boom
request_json failed retries_exhausted=1 timeout=60s url=https://example err=boom

Intent: deterministic height slicing used across scanners.


iter_windows


def iter_windows(
    start_height:int, end_height:int, window_size:int
)->List[Tuple[int, int]]:

Inclusive height windows: (start, end) with end <= end_height.

assert iter_windows(1, 5, 2) == [(1, 2), (3, 4), (5, 5)]
  • Report an issue