Epoch-by-epoch reward replay

Imports + constants

Intent: define shared constants, typed row models, and reusable parsing helpers for replay and export.


NodeEpochTotalRow


def NodeEpochTotalRow(
    data:Any
)->None:

Node-level epoch totals aggregated from replayed reward events.


EpochEarningRow


def EpochEarningRow(
    data:Any
)->None:

Per-wallet, per-node, per-epoch replayed earning breakdown.


SeedWallet


def SeedWallet(
    data:Any
)->None:

Wallet seed row used to start cache-backed epoch replay.

seed = SeedWallet(address="n1abc", tag="demo")
assert seed.address == "n1abc"
assert _dec_str(Decimal("10.5000")) == "10.5"
assert _to_decimal("12") == Decimal("12")

Cache-first seed wallet loading

Intent: load seed wallets from CSV and keep wallet_tag mapping stable for export sheets.


seed_wallet_tag_map


def seed_wallet_tag_map(
    seed_wallets:Sequence[SeedWallet]
)->Dict[str, str]:

load_seed_wallets


def load_seed_wallets(
    wallets_csv:str | Path
)->List[SeedWallet]:

ws = [SeedWallet(address="n1a", tag="A"), SeedWallet(address="n1b", tag="")]
assert seed_wallet_tag_map(ws)["n1a"] == "A"
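
The loading step can be sketched as follows. This is a minimal illustration, not the module's implementation: it assumes the CSV has a header row with `address` and `tag` columns (the real column names are not shown on this page), and `SeedWalletSketch`, `load_seed_wallets_sketch`, and `seed_wallet_tag_map_sketch` are hypothetical stand-ins.

```python
import csv
import io
from dataclasses import dataclass
from typing import Dict, List, Sequence, TextIO


@dataclass(frozen=True)
class SeedWalletSketch:
    """Hypothetical stand-in for SeedWallet."""
    address: str
    tag: str = ""


def load_seed_wallets_sketch(handle: TextIO) -> List[SeedWalletSketch]:
    """Read wallets from CSV text, skipping blank and duplicate addresses."""
    seen = set()
    wallets = []
    for row in csv.DictReader(handle):
        address = (row.get("address") or "").strip()  # assumed column name
        if not address or address in seen:
            continue
        seen.add(address)
        tag = (row.get("tag") or "").strip()  # assumed column name
        wallets.append(SeedWalletSketch(address, tag))
    return wallets


def seed_wallet_tag_map_sketch(wallets: Sequence[SeedWalletSketch]) -> Dict[str, str]:
    """Map address -> tag; dicts keep insertion order, so CSV order is preserved."""
    return {w.address: w.tag for w in wallets}


sample = io.StringIO("address,tag\nn1a,A\nn1b,\nn1a,duplicate\n")
wallets = load_seed_wallets_sketch(sample)
tags = seed_wallet_tag_map_sketch(wallets)
```

Keeping the first occurrence of each address is one way to keep the mapping stable across runs; whether the real loader deduplicates is an assumption here.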

Watch-node discovery (cache first)

Intent: discover watch nodes from the cache tables first, and fall back to get_owned_nym_node only when the operator relation is missing from the cache.


discover_watch_nodes


def discover_watch_nodes(
    conn:sqlite3.Connection, seed_wallets:Sequence[SeedWallet], session:Any,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Tuple[set[int], Dict[int, str]]:

resolve_owner_wallet_for_node


def resolve_owner_wallet_for_node(
    session:Any, node_id:int, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->str:

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wallet_nodes (wallet_address TEXT, node_id INTEGER, relation TEXT, start_height INTEGER, updated_at TEXT)")
conn.execute("CREATE TABLE delegation_events (node_id INTEGER, delegator TEXT)")
conn.execute("CREATE TABLE withdraw_events (node_id INTEGER, delegator TEXT)")
conn.execute("CREATE TABLE wallet_actions (wallet_address TEXT, node_id INTEGER, height INTEGER)")
conn.execute("INSERT INTO wallet_nodes VALUES ('n1a', 14, 'delegator', 1, '')")
conn.execute("INSERT INTO wallet_nodes VALUES ('n1a', 9, 'operator', 1, '')")
conn.execute("INSERT INTO delegation_events VALUES (2933, 'n1a')")
conn.execute("INSERT INTO withdraw_events VALUES (2196, 'n1a')")
conn.commit()

nodes, owners = discover_watch_nodes(conn, [SeedWallet(address='n1a', tag='')], session=object())
assert {9, 14, 2933, 2196}.issubset(nodes)
assert owners[9] == 'n1a'
conn.close()
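
The cache-only part of discovery might look like the sketch below. It reuses the table and column names from the example above but is otherwise an assumption: `discover_from_cache_sketch` is a hypothetical name, the wallet_actions table is left out, and the network fallback for nodes without a cached operator is not shown.

```python
import sqlite3
from typing import Dict, Sequence, Set, Tuple


def discover_from_cache_sketch(
    conn: sqlite3.Connection, addresses: Sequence[str]
) -> Tuple[Set[int], Dict[int, str]]:
    """Collect node ids touched by the given wallets, plus cached operators."""
    nodes: Set[int] = set()
    owners: Dict[int, str] = {}
    if not addresses:
        return nodes, owners
    marks = ",".join("?" for _ in addresses)  # one placeholder per address
    params = list(addresses)

    # wallet_nodes carries the relation, so it can also name the operator.
    query = (
        "SELECT wallet_address, node_id, relation FROM wallet_nodes "
        f"WHERE wallet_address IN ({marks})"
    )
    for wallet, node_id, relation in conn.execute(query, params):
        nodes.add(int(node_id))
        if relation == "operator":
            owners[int(node_id)] = wallet

    # Event tables only show that a wallet interacted with a node.
    for table in ("delegation_events", "withdraw_events"):
        query = f"SELECT DISTINCT node_id FROM {table} WHERE delegator IN ({marks})"
        for (node_id,) in conn.execute(query, params):
            nodes.add(int(node_id))
    return nodes, owners


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wallet_nodes (wallet_address TEXT, node_id INTEGER, relation TEXT)")
conn.execute("CREATE TABLE delegation_events (node_id INTEGER, delegator TEXT)")
conn.execute("CREATE TABLE withdraw_events (node_id INTEGER, delegator TEXT)")
conn.executemany(
    "INSERT INTO wallet_nodes VALUES (?, ?, ?)",
    [("n1a", 14, "delegator"), ("n1a", 9, "operator")],
)
conn.execute("INSERT INTO delegation_events VALUES (2933, 'n1a')")
conn.execute("INSERT INTO withdraw_events VALUES (2196, 'n1a')")
nodes, owners = discover_from_cache_sketch(conn, ["n1a"])
# Nodes without a cached operator are the ones that would need the fallback query.
missing_owner = sorted(nodes - set(owners))
conn.close()
```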

Canonical reward-event selection

Intent: enforce one canonical reward event per (node_id, epoch) by taking the first event under ascending (height, msg_index, txhash) ordering, and keep traceability fields for every epoch row.

rows = [
    {"epoch": 7, "height": 20, "msg_index": 1, "txhash": "b"},
    {"epoch": 7, "height": 20, "msg_index": 0, "txhash": "c"},
    {"epoch": 7, "height": 19, "msg_index": 9, "txhash": "a"},
]
out = _canonical_reward_rows(rows)
assert len(out) == 1
assert out[0]["txhash"] == "a"
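
One way to implement this selection is sketched below. `canonical_reward_rows_sketch` is a hypothetical name, and the choice of the earliest row per group is inferred from the example above, where the height-19 row wins.

```python
from typing import Any, Dict, List, Sequence, Tuple


def canonical_reward_rows_sketch(rows: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the earliest row per (node_id, epoch) under (height, msg_index, txhash)."""

    def order_key(row: Dict[str, Any]) -> Tuple[int, int, str]:
        return (int(row["height"]), int(row["msg_index"]), str(row["txhash"]))

    best: Dict[Tuple[Any, int], Dict[str, Any]] = {}
    for row in rows:
        # node_id is optional here because the example rows above omit it.
        group = (row.get("node_id"), int(row["epoch"]))
        if group not in best or order_key(row) < order_key(best[group]):
            best[group] = row
    # Return survivors in chain order so a replay can consume them directly.
    return sorted(best.values(), key=order_key)


rows = [
    {"epoch": 7, "height": 20, "msg_index": 1, "txhash": "b"},
    {"epoch": 7, "height": 20, "msg_index": 0, "txhash": "c"},
    {"epoch": 7, "height": 19, "msg_index": 9, "txhash": "a"},
    {"epoch": 8, "height": 30, "msg_index": 0, "txhash": "d"},
]
picked = canonical_reward_rows_sketch(rows)
```

Including txhash as the last key component makes the ordering total, so the result does not depend on the order in which rows were read from the cache.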

Replay math helpers

Intent: apply event-sourced delegation state transitions and reward-split formulas exactly at reward heights.


pending_unym


def pending_unym(
    a:Decimal, c:Decimal, U:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:

delegator_reward_epoch


def delegator_reward_epoch(
    a:Decimal, c:Decimal, dU:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:

delta_u


def delta_u(
    R:Decimal, U:Decimal, D:Decimal, P:Decimal
)->Decimal:

stake_value_unym


def stake_value_unym(
    a:Decimal, c:Decimal, U:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:

a = Decimal("100")
c = Decimal("0")
U = Decimal("0")
R = Decimal("10")
P = Decimal("100")
dU = delta_u(R, U, DELEGATION_DENOMINATOR, P)
assert delegator_reward_epoch(a, c, dU) == Decimal("10")
assert pending_unym(a, c, U) == Decimal("0")
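
The function bodies are not shown on this page. The sketch below uses cumulative-unit-reward formulas that are an assumption, chosen because they reproduce the assertions above; the `_sketch` names are hypothetical. Here `a` is the delegated amount, `c` the unit reward recorded when the delegation was made, `U` the node's current total unit reward, `R` the delegates' reward for the epoch, and `P` the delegates' total stake before the reward.

```python
from decimal import Decimal

D = Decimal("1000000000")  # unit-delegation denominator, matching the defaults above


def delta_u_sketch(R: Decimal, U: Decimal, D: Decimal, P: Decimal) -> Decimal:
    """Assumed increase of the unit reward when R is paid on prior stake P."""
    return R * (U + D) / P


def delegator_reward_epoch_sketch(a: Decimal, c: Decimal, dU: Decimal, D: Decimal = D) -> Decimal:
    """Assumed share of one epoch's reward for a delegation (a, c)."""
    return a * dU / (c + D)


def pending_unym_sketch(a: Decimal, c: Decimal, U: Decimal, D: Decimal = D) -> Decimal:
    """Assumed total unclaimed reward accumulated since the delegation."""
    return a * (U - c) / (c + D)


def stake_value_unym_sketch(a: Decimal, c: Decimal, U: Decimal, D: Decimal = D) -> Decimal:
    """Assumed current value: principal plus pending reward."""
    return a + pending_unym_sketch(a, c, U, D)


a, c = Decimal("100"), Decimal("0")
# Epoch 1: 10 unym paid on 100 unym of delegated stake.
dU1 = delta_u_sketch(Decimal("10"), Decimal("0"), D, Decimal("100"))
r1 = delegator_reward_epoch_sketch(a, c, dU1)
# Epoch 2: the stake has compounded to 110 unym, and another 10 unym is paid.
dU2 = delta_u_sketch(Decimal("10"), dU1, D, Decimal("110"))
r2 = delegator_reward_epoch_sketch(a, c, dU2)
U = dU1 + dU2
```

Under these assumed formulas the per-epoch rewards add up to the pending value, which is the property the replay relies on when it is compared against live pending amounts.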

Node replay

Intent: replay one node from cache events and produce wallet-epoch rows + node totals from the same canonical reward stream.


replay_node_epoch_rows


def replay_node_epoch_rows(
    node_id:int, owner_wallet:str, reward_rows:Sequence[Dict[str, Any]], delegation_rows:Sequence[Dict[str, Any]],
    withdraw_rows:Sequence[Dict[str, Any]], current_total_unit_reward:Decimal, D:Decimal=Decimal('1000000000')
)->Dict[str, Any]:

demo = replay_node_epoch_rows(
    node_id=1,
    owner_wallet="n1owner",
    reward_rows=[
        {"epoch": 1, "height": 10, "timestamp": "2026-01-01T00:00:00Z", "txhash": "tx1", "msg_index": 0,
         "prior_unit_reward": "0", "prior_delegates": "100", "delegates_reward": "10", "operator_reward": "2"}
    ],
    delegation_rows=[
        {"height": 5, "event_type": "wasm-v2_delegation", "delegator": "n1d", "delta_amount_unym": "100"}
    ],
    withdraw_rows=[],
    current_total_unit_reward=Decimal("0"),
)
assert len(demo["wallet_rows"]) == 2
assert any(r["path"] == "operator_reward" for r in demo["wallet_rows"])
assert any(r["path"] == "delegator_reward" for r in demo["wallet_rows"])
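
A heavily simplified sketch of the replay loop is given below. It is not the module's implementation: `replay_node_sketch` is a hypothetical name, the split formula is an assumption, withdrawals and repeat delegations are ignored, and it assumes a delegation becomes active only for rewards at a strictly greater height.

```python
from decimal import Decimal
from typing import Any, Dict, List, Sequence

D = Decimal("1000000000")  # unit-delegation denominator, matching the defaults above


def replay_node_sketch(
    owner_wallet: str,
    reward_rows: Sequence[Dict[str, Any]],
    delegation_rows: Sequence[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Replay delegations and rewards in height order; emit per-wallet rows."""
    # Per delegator: amount `a` and the unit reward `c` seen at delegation time.
    state: Dict[str, Dict[str, Decimal]] = {}
    queue = sorted(delegation_rows, key=lambda r: int(r["height"]))
    out: List[Dict[str, Any]] = []
    i = 0
    ordered = sorted(reward_rows, key=lambda r: (int(r["height"]), int(r["msg_index"])))
    for rw in ordered:
        U = Decimal(rw["prior_unit_reward"])
        # Assumption: only delegations below the reward height are active.
        while i < len(queue) and int(queue[i]["height"]) < int(rw["height"]):
            ev = queue[i]
            state[ev["delegator"]] = {"a": Decimal(ev["delta_amount_unym"]), "c": U}
            i += 1
        P = Decimal(rw["prior_delegates"])
        R = Decimal(rw["delegates_reward"])
        dU = R * (U + D) / P if P else Decimal(0)  # assumed unit-reward increase
        base = {"epoch": int(rw["epoch"]), "height": int(rw["height"]), "txhash": rw["txhash"]}
        out.append({**base, "wallet": owner_wallet, "path": "operator_reward",
                    "amount_unym": Decimal(rw["operator_reward"])})
        for wallet, s in state.items():
            out.append({**base, "wallet": wallet, "path": "delegator_reward",
                        "amount_unym": s["a"] * dU / (s["c"] + D)})
    return out


rows = replay_node_sketch(
    owner_wallet="n1owner",
    reward_rows=[{"epoch": 1, "height": 10, "txhash": "tx1", "msg_index": 0,
                  "prior_unit_reward": "0", "prior_delegates": "100",
                  "delegates_reward": "10", "operator_reward": "2"}],
    delegation_rows=[{"height": 5, "delegator": "n1d", "delta_amount_unym": "100"}],
)
```

Every emitted row carries epoch, height, and txhash, so each amount can be traced back to the reward event that produced it.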

Involved-wallet closure and consistency checks

Intent: derive full wallet scope for Sheet 2 from watch nodes and enforce node-epoch reconciliation (delegator splits ~= delegates_reward).


check_node_epoch_consistency


def check_node_epoch_consistency(
    all_wallet_rows:Sequence[Dict[str, Any]], node_totals:Sequence[Dict[str, Any]], logger:logging.Logger,
    rel_tol:Decimal=Decimal('1E-8'), abs_tol:Decimal=Decimal('0.000001')
)->None:

involved_wallets_for_watch_nodes


def involved_wallets_for_watch_nodes(
    conn:sqlite3.Connection, watch_nodes:Sequence[int], owner_by_node:Dict[int, str]
)->set[str]:

rows = [{"node_id": 1, "epoch": 1, "path": "delegator_reward", "amount_unym": "10"}]
node = [{"node_id": 1, "epoch": 1, "delegates_reward_unym": "10"}]
logger = logging.getLogger("verify")
check_node_epoch_consistency(rows, node, logger=logger)
assert callable(involved_wallets_for_watch_nodes)
assert callable(check_node_epoch_consistency)
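
The reconciliation can be sketched as a sum-and-compare over (node_id, epoch). In this sketch `node_epoch_mismatches_sketch` is a hypothetical name, it returns the mismatches instead of logging them, and the way rel_tol and abs_tol are combined (a difference passes if it is within either bound) is an assumption.

```python
from collections import defaultdict
from decimal import Decimal
from typing import Any, Dict, List, Sequence, Tuple


def node_epoch_mismatches_sketch(
    wallet_rows: Sequence[Dict[str, Any]],
    node_totals: Sequence[Dict[str, Any]],
    rel_tol: Decimal = Decimal("1E-8"),
    abs_tol: Decimal = Decimal("0.000001"),
) -> List[Tuple[Tuple[int, int], Decimal, Decimal]]:
    """Return (key, expected, actual) for node-epochs whose splits do not add up."""
    sums: Dict[Tuple[int, int], Decimal] = defaultdict(Decimal)
    for row in wallet_rows:
        if row["path"] == "delegator_reward":
            sums[(row["node_id"], row["epoch"])] += Decimal(str(row["amount_unym"]))

    mismatches = []
    for total in node_totals:
        key = (total["node_id"], total["epoch"])
        expected = Decimal(str(total["delegates_reward_unym"]))
        actual = sums.get(key, Decimal(0))
        # Pass if within the absolute bound or the relative bound, whichever is larger.
        allowed = max(abs_tol, rel_tol * max(abs(actual), abs(expected)))
        if abs(actual - expected) > allowed:
            mismatches.append((key, expected, actual))
    return mismatches


wallet_rows = [
    {"node_id": 1, "epoch": 1, "path": "delegator_reward", "amount_unym": "6.0000001"},
    {"node_id": 1, "epoch": 1, "path": "delegator_reward", "amount_unym": "3.9999999"},
    {"node_id": 1, "epoch": 1, "path": "operator_reward", "amount_unym": "2"},
    {"node_id": 1, "epoch": 2, "path": "delegator_reward", "amount_unym": "9.5"},
]
node_totals = [
    {"node_id": 1, "epoch": 1, "delegates_reward_unym": "10"},
    {"node_id": 1, "epoch": 2, "delegates_reward_unym": "10"},
]
bad = node_epoch_mismatches_sketch(wallet_rows, node_totals)
```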

Pending-value comparison against API + cache

Intent: compare replay-derived pending values against live smart-query values for all involved wallets on watched nodes, and against cached pending state for traceability. Always log INFO summaries and emit WARNING on tolerance mismatches.


compare_replay_vs_live_pending


def compare_replay_vs_live_pending(
    session:Any, conn:sqlite3.Connection, seed_wallets:Sequence[SeedWallet], watch_nodes:Sequence[int],
    owner_by_node:Dict[int, str], replay_pending_delegator:Dict[Tuple[str, int], Decimal],
    replay_pending_operator:Dict[str, Decimal], contract:str, nyx_wasm_rest:str, timeout:int, logger:logging.Logger
)->None:

assert callable(_cached_pending_delegator_totals)
assert callable(compare_replay_vs_live_pending)
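
The logging behaviour described in the intent can be illustrated with the sketch below. It takes already-fetched live values instead of querying the contract, and `compare_pending_sketch` and its 1 unym tolerance are hypothetical choices made for illustration.

```python
import logging
from decimal import Decimal
from typing import Dict, Tuple

Key = Tuple[str, int]  # (wallet_address, node_id)


def compare_pending_sketch(
    replay: Dict[Key, Decimal],
    live: Dict[Key, Decimal],
    logger: logging.Logger,
    abs_tol: Decimal = Decimal("1"),  # hypothetical tolerance in unym
) -> int:
    """Log a WARNING per out-of-tolerance key and one INFO summary; return the count."""
    keys = sorted(set(replay) | set(live))
    mismatches = 0
    for key in keys:
        # A key missing on one side is compared against zero.
        r = replay.get(key, Decimal(0))
        v = live.get(key, Decimal(0))
        if abs(r - v) > abs_tol:
            mismatches += 1
            logger.warning("pending mismatch %s: replay=%s live=%s", key, r, v)
    logger.info("compared %d pending values, %d mismatches", len(keys), mismatches)
    return mismatches


log = logging.getLogger("pending_sketch")
log.addHandler(logging.NullHandler())
replay = {("n1a", 9): Decimal("1500.4"), ("n1b", 9): Decimal("20")}
live = {("n1a", 9): Decimal("1500"), ("n1b", 9): Decimal("35"), ("n1c", 9): Decimal("0")}
count = compare_pending_sketch(replay, live, log)
```

Taking the union of keys means a wallet that appears only in the live data, or only in the replay, still gets compared and can surface as a mismatch.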

Excel writer

Intent: write all required sheets from one replay result set without float coercion for amount_unym.


write_epoch_by_epoch_excel


def write_epoch_by_epoch_excel(
    out_path:str | Path, wallet_seed_rows:Sequence[Dict[str, Any]], wallet_all_rows:Sequence[Dict[str, Any]],
    node_total_rows:Sequence[Dict[str, Any]]
)->None:

assert callable(write_epoch_by_epoch_excel)
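
The reason for avoiding float coercion can be shown with the standard library alone. The value below is illustrative, not real data, and `amount_cell_text` is a hypothetical formatter; how the actual writer passes values to the workbook is not shown on this page.

```python
from decimal import Decimal

amount = Decimal("123456789012345678.123456")  # illustrative value only

# A float keeps roughly 15 to 17 significant digits, so the tail is lost.
via_float = Decimal(repr(float(amount)))

# A text round trip keeps every digit.
via_text = Decimal(format(amount, "f"))


def amount_cell_text(value: Decimal) -> str:
    """Render a Decimal in plain notation with trailing zeros trimmed."""
    # normalize() alone can yield exponent form such as 1E+2; the "f" format undoes that.
    return format(value.normalize(), "f")


cells = [amount_cell_text(Decimal(s)) for s in ("10.5000", "100", "0.000001")]
```

Writing such strings into the sheet keeps amount_unym exact, at the cost that spreadsheet formulas see text cells and not numbers.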

Main entrypoint

Intent: run cache refresh first, replay watch nodes from cached events, emit three-sheet Excel output, and warn on replay-vs-live mismatches.


run_epoch_by_epoch


def run_epoch_by_epoch(
    data_dir:str='data', wallets_csv:str='wallet-addresses.csv', out_xlsx:str='epoch_by_epoch_rewards.xlsx',
    db_path:str='data/nym_cache.sqlite', contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract',
    window_size:Optional[int]=None, # cache refresh knobs
    window_size_reward:int=10000, window_size_delegation:int=10000, window_size_withdraw:int=20000,
    wallet_window_size:int=100000, wallet_start_height:int=0, page_size:int=100, max_pages:int=200, timeout:int=60,
    force:bool=False, log_file:Optional[str]='nym_epoch_by_epoch.log', log_level:str='INFO'
)->int:

assert callable(run_epoch_by_epoch)

Live run example

This cell shows a real cache-refresh + replay invocation against project data. The call is left commented out; uncomment it to run.

# run_epoch_by_epoch(
#     data_dir="data",
#     wallets_csv="wallet-addresses.csv",
#     out_xlsx="epoch_by_epoch_rewards.xlsx",
# )