nym_node_reward_tracker
  1. Snapshot tracker
  • Nym node reward tracker (docs index)
  • Common utilities
  • Cosmos Tx Parsing Helpers
  • Snapshot tracker
  • Cache: chain-native event cache for epoch-by-epoch rewards
  • Epoch-by-epoch reward replay
  • Reward transaction export
  • CLI
  • Node Interest Rates Walkthrough on Nyx (last 120 hours)
  • Notebook tests

On this page

  • Imports + endpoints + constants
  • Data model
    • WalletSnapshot
    • BalanceBreakdown
  • Wallet → node metadata (node_id, identity_key)
    • get_owned_node_details
  • Source 1: Spectre balance parser
    • balances_from_spectre_json
  • Source 1: Spectre fetcher
    • fetch_balances_spectre
  • Source 2: Cosmos snapshot builder (best-effort, chain-native)
    • fetch_spendable_unym_bank
    • sum_delegated_unym
    • get_delegator_delegations_current
    • get_pending_delegator_reward_unym
    • get_pending_operator_reward_unym
  • Cosmos snapshot assembly
    • fetch_balances_cosmos
  • Snapshot orchestration
    • build_wallet_snapshot
  • Persistence and history
    • scaled_window_change_unym
    • record_history_point
    • prune_history_older_than
  • Tabular output (latest snapshot)
    • log_snapshot_table
    • write_latest_snapshot_csv
  • Main entry point: run_snapshot
    • run_snapshot
  • Report an issue

Other Formats

  • CommonMark

Snapshot tracker

This notebook implements the snapshot CLI command.

It records wallet-level balances in micro-NYM (unym) and can fetch balances from: * SpectreDAO aggregator (/api/v1/balances/{address}) * Nyx chain APIs (Cosmos REST + mixnet contract smart queries)

It produces: * a “latest snapshot” table file (node-balances.csv by default, or .xlsx by extension) * a rolling history YAML (data.yaml by default) to estimate 7d/30d deltas

Imports + endpoints + constants

Intent: define endpoints and defaults in one place, and keep the rest of the module pure and testable.

Data model

Intent: normalize both sources to a single internal representation, in unym ints (lossless).


WalletSnapshot


def WalletSnapshot(
    data:Any
)->None:

Single timestamped snapshot row for one wallet.


BalanceBreakdown


def BalanceBreakdown(
    data:Any
)->None:

Normalized wallet balance components in unym.

b = BalanceBreakdown(
    delegated_unym="1",
    self_bonded_unym=2,
    spendable_unym=3,
    rewards_operator_commissions_unym="4",
)
assert b.total_unym() == 10

w = WalletSnapshot(
    timestamp_utc="2026-01-01T00:00:00+00:00",
    wallet_address="n1wallet",
    tag="test",
    node_id="2196",
    identity_key="identity",
    balances=b,
    source="spectre",
)
assert w.to_latest_csv_row()["node_id"] == "2196"
b
BalanceBreakdown(delegated_unym=1, self_bonded_unym=2, spendable_unym=3, locked_unym=0, rewards_operator_commissions_unym=4, rewards_staking_rewards_unym=0, rewards_unlocked_unym=0)
w
WalletSnapshot(timestamp_utc='2026-01-01T00:00:00+00:00', wallet_address='n1wallet', tag='test', node_id=2196, identity_key='identity', balances=BalanceBreakdown(delegated_unym=1, self_bonded_unym=2, spendable_unym=3, locked_unym=0, rewards_operator_commissions_unym=4, rewards_staking_rewards_unym=0, rewards_unlocked_unym=0), source='spectre')

Wallet → node metadata (node_id, identity_key)

Intent: enrich snapshots with node identity when the wallet owns a node.

This uses get_owned_nym_node.

MIXNET_CONTRACT='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr'
WALLET='n127c69pasr35p76amfczemusnutr8mtw78s8xl7'
Q=$(printf '%s' '{"get_owned_nym_node":{"address":"'"${WALLET}"'"}}' | base64 -w 0)
curl -s "https://api.nymtech.net/cosmwasm/wasm/v1/contract/${MIXNET_CONTRACT}/smart/${Q}" | jq '.'
{
  "data": {
    "address": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
    "details": {
      "bond_information": {
        "node_id": 2196,
        "owner": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
        "original_pledge": {
          "denom": "unym",
          "amount": "15000000000"
        },
        "bonding_height": 16380988,
        "is_unbonding": false,
        "node": {
          "host": "94.143.231.195",
          "custom_http_port": 8080,
          "identity_key": "E67dRcrMNsEpNvRAxvFTkvMyqigTYpRWUYYPm25rDuGQ"
        }
      },
      "rewarding_details": {
        "cost_params": {
          "profit_margin_percent": "0.2",
          "interval_operating_cost": {
            "denom": "unym",
            "amount": "80000000"
          }
        },
        "operator": "17460861134.830936160640492906",
        "delegates": "257508697946.918840668874723276",
        "total_unit_reward": "66985424.766269536600989211",
        "unit_delegation": "1000000000",
        "last_rewarded_epoch": 28429,
        "unique_delegations": 4
      },
      "pending_changes": {
        "pledge_change": null,
        "cost_params_change": 2291
      }
    }
  }
}

Intent: parse owned-node details from smart-query response for a wallet.


get_owned_node_details


def get_owned_node_details(
    session:Any, wallet_address:str, nyx_wasm_rest:str, mixnet_contract:str, timeout:int=20, logger:NoneType=None
)->Tuple[Optional[int], str, int]:

Returns (node_id, identity_key, self_bonded_unym_from_original_pledge).

If wallet has no owned node: (None, ““, 0).

fake = _FakeSession([_FakeResp({"data": {"ok": True}})])
# This fake returns no bond information, so node metadata should be empty.
nid, ik, pledge = get_owned_node_details(fake, "n1", nyx_wasm_rest="x", mixnet_contract="c")
assert nid is None and ik == "" and pledge == 0

Source 1: Spectre balance parser

Intent: parse Spectre response into our normalized BalanceBreakdown (unym ints).

We keep this pure: “JSON in → dataclass out”.

WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
URL="https://api.nym.spectredao.net/api/v1/balances/${WALLET}"
curl -s "$URL" | jq '.'
{
  "delegated": {
    "amount": 200977025706,
    "denom": "unym"
  },
  "locked": {
    "amount": 0,
    "denom": "unym"
  },
  "rewards": {
    "operator_commissions": {
      "amount": 2460861134,
      "denom": "unym"
    },
    "staking_rewards": {
      "amount": 524574066,
      "denom": "unym"
    },
    "unlocked": {
      "amount": 0,
      "denom": "unym"
    }
  },
  "self_bonded": {
    "amount": 15000000000,
    "denom": "unym"
  },
  "spendable": {
    "amount": 60639227,
    "denom": "unym"
  },
  "total": {
    "amount": 219023100133,
    "denom": "unym"
  }
}

Intent: normalize Spectre numeric amount fields to integer unym.


balances_from_spectre_json


def balances_from_spectre_json(
    js:Dict[str, Any]
)->BalanceBreakdown:
example = {
  "delegated": {"amount": 200977025706, "denom": "unym"},
  "locked": {"amount": 0, "denom": "unym"},
  "rewards": {
      "operator_commissions": {"amount": 2215364099, "denom": "unym"},
      "staking_rewards": {"amount": 280778666, "denom": "unym"},
      "unlocked": {"amount": 0, "denom": "unym"}
  },
  "self_bonded": {"amount": 15000000000, "denom": "unym"},
  "spendable": {"amount": 60639227, "denom": "unym"},
  "total": {"amount": 218533807698, "denom": "unym"},
}

b = balances_from_spectre_json(example)
assert b.total_unym() == 218533807698

Source 1: Spectre fetcher

Intent: fetch Spectre balance JSON and convert it into BalanceBreakdown.


fetch_balances_spectre


def fetch_balances_spectre(
    session:Any, wallet_address:str,
    spectre_balance_url:str='https://api.nym.spectredao.net/api/v1/balances/{address}', timeout:int=20,
    logger:NoneType=None
)->BalanceBreakdown:
# offline verify: we only test the parser above.
assert callable(fetch_balances_spectre)

Intent: run the same helper against the real API so notebook readers can inspect actual values (without failing tests if network is unavailable).

live_session = build_session("nym-node-reward-tracker/notebook-live")
live_wallet = "n127c69pasr35p76amfczemusnutr8mtw78s8xl7"

live_spectre = fetch_balances_spectre(live_session, live_wallet)
assert isinstance(live_spectre, BalanceBreakdown)
live_spectre
BalanceBreakdown(delegated_unym=200977025706, self_bonded_unym=15000000000, spendable_unym=60639227, locked_unym=0, rewards_operator_commissions_unym=2460861134, rewards_staking_rewards_unym=524574066, rewards_unlocked_unym=0)

Source 2: Cosmos snapshot builder (best-effort, chain-native)

Intent: query the bank module for unym spendable balance.

WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
NYX_BANK_REST="https://api.nymtech.net/cosmos/bank/v1beta1"
URL="${NYX_BANK_REST}/balances/${WALLET}"

curl -s "$URL" | jq '.'
{
  "balances": [
    {
      "denom": "unym",
      "amount": "60639227"
    }
  ],
  "pagination": {
    "next_key": null,
    "total": "1"
  }
}

Intent: fetch spendable unym from bank balances endpoint.


fetch_spendable_unym_bank


def fetch_spendable_unym_bank(
    session:Any, wallet_address:str, nyx_bank_rest:str='https://api.nymtech.net/cosmos/bank/v1beta1', timeout:int=20,
    logger:NoneType=None
)->int:
sample_bank = {"balances": [{"denom": "unym", "amount": "123"}]}
class _BankSession:
    def get(self, url, params=None, timeout=0):
        return _FakeResp(sample_bank)

assert fetch_spendable_unym_bank(_BankSession(), "n1", nyx_bank_rest="https://x") == 123

Intent: list current delegations for a wallet (smart query), and sum principal amounts in unym.

WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
NYX_WASM_REST="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
MIXNET_CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"

# Single-page example of get_delegator_delegations (like one call inside get_delegator_delegations_current)
PAYLOAD=$(jq -cn --arg w "$WALLET" '{get_delegator_delegations:{delegator:$w, limit:5}}')
echo $PAYLOAD
Q=$(printf '%s' "$PAYLOAD" | base64 -w 0)

curl -s "${NYX_WASM_REST}/${MIXNET_CONTRACT}/smart/${Q}" | jq '.'
{"get_delegator_delegations":{"delegator":"n127c69pasr35p76amfczemusnutr8mtw78s8xl7","limit":5}}
{
  "data": {
    "delegations": [
      {
        "owner": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
        "node_id": 2933,
        "cumulative_reward_ratio": "669788.748216958693826745",
        "amount": {
          "denom": "unym",
          "amount": "200977025706"
        },
        "height": 22222929,
        "proxy": null
      }
    ],
    "start_next_after": [
      2933,
      "n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
    ]
  }
}

Intent: fetch current delegations and provide a principal summation helper.


sum_delegated_unym


def sum_delegated_unym(
    delegations:List[Dict[str, Any]]
)->int:

get_delegator_delegations_current


def get_delegator_delegations_current(
    session:Any, wallet_address:str, nyx_wasm_rest:str, mixnet_contract:str, limit:int=200, max_pages:int=50,
    timeout:int=20, logger:NoneType=None
)->List[Dict[str, Any]]:

Returns a flat list of delegations from contract query get_delegator_delegations, handling pagination via start_after if present.

assert sum_delegated_unym([{"amount": {"amount": "10", "denom": "unym"}}, {"amount": {"amount": "2"}}]) == 12

Intent: fetch pending delegator reward for a (delegator,node_id) pair and return unym int.

WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
NODE_ID="2933"
NYX_WASM_REST="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
MIXNET_CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"

PAYLOAD=$(jq -cn --arg w "$WALLET" --argjson n "$NODE_ID" '{get_pending_delegator_reward:{address:$w,node_id:$n}}')
echo $PAYLOAD
Q=$(printf '%s' "$PAYLOAD" | base64 -w 0)

curl -s "${NYX_WASM_REST}/${MIXNET_CONTRACT}/smart/${Q}" | jq '.'
{"get_pending_delegator_reward":{"address":"n127c69pasr35p76amfczemusnutr8mtw78s8xl7","node_id":2933}}
{
  "data": {
    "amount_staked": {
      "denom": "unym",
      "amount": "200977025706"
    },
    "amount_earned": {
      "denom": "unym",
      "amount": "524574066"
    },
    "amount_earned_detailed": "524574066.198570905667208663",
    "mixnode_still_fully_bonded": true,
    "node_still_fully_bonded": true
  }
}

Intent: fetch pending delegator reward amount for a node and wallet.


get_pending_delegator_reward_unym


def get_pending_delegator_reward_unym(
    session:Any, nyx_wasm_rest:str, mixnet_contract:str, node_id:int, delegator:str, timeout:int=20,
    logger:NoneType=None
)->int:

Intent: provide a tiny fake session for deterministic verification of pending-reward query parsing.

class _PendingSession:
    def get(self, url, params=None, timeout=0):
        return _FakeResp({"data": {"amount_earned": {"amount": "77", "denom": "unym"}}})

assert get_pending_delegator_reward_unym(
    _PendingSession(),
    nyx_wasm_rest="https://x",
    mixnet_contract="c",
    node_id=1,
    delegator="n1",
) == 77

Intent: fetch pending operator reward if available. Because the exact query variant may differ across contract versions, we implement a conservative fallback strategy:

  1. try get_pending_operator_reward by owner address
  2. try get_pending_node_operator_reward by node_id
  3. else return 0 (and let higher-level code report “unavailable” if desired)
WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
NODE_ID="2933"
NYX_WASM_REST="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
MIXNET_CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"

# Attempt 1 (same as function): address-based get_pending_operator_reward
PAYLOAD=$(jq -cn --arg w "$WALLET" '{get_pending_operator_reward:{address:$w}}')
echo $PAYLOAD
Q=$(printf '%s' "$PAYLOAD" | base64 -w 0)

curl -s "${NYX_WASM_REST}/${MIXNET_CONTRACT}/smart/${Q}" | jq '.'
{"get_pending_operator_reward":{"address":"n127c69pasr35p76amfczemusnutr8mtw78s8xl7"}}
{
  "data": {
    "amount_staked": {
      "denom": "unym",
      "amount": "15000000000"
    },
    "amount_earned": {
      "denom": "unym",
      "amount": "2460861134"
    },
    "amount_earned_detailed": "2460861134.830936160640492906",
    "mixnode_still_fully_bonded": true,
    "node_still_fully_bonded": true
  }
}

Intent: fetch pending operator reward amount for wallet-owned node when available.


get_pending_operator_reward_unym


def get_pending_operator_reward_unym(
    session:Any, nyx_wasm_rest:str, mixnet_contract:str, owner_address:str, node_id:Optional[int], timeout:int=20,
    logger:NoneType=None
)->int:
# verify_get_pending_operator_reward_unym
class _OpSession:
    def get(self, *args, **kwargs):
        class R:
            status_code = 200
            text = "{}"
            def raise_for_status(self): return None
            def json(self):
                return {"data": {"amount_earned": {"amount": "7"}}}
        return R()
out = get_pending_operator_reward_unym(
    _OpSession(),
    nyx_wasm_rest="https://x",
    mixnet_contract="c",
    owner_address="n1",
    node_id=14,
)
assert out == 7
assert callable(get_pending_operator_reward_unym)

Cosmos snapshot assembly

Intent: build the full BalanceBreakdown from Cosmos endpoints + smart queries.


fetch_balances_cosmos


def fetch_balances_cosmos(
    session:Any, wallet_address:str, nyx_wasm_rest:str, nyx_bank_rest:str, mixnet_contract:str, timeout:int=20,
    logger:NoneType=None
)->Tuple[BalanceBreakdown, Optional[int], str]:
assert callable(fetch_balances_cosmos)

Intent: run the cosmos snapshot helper against real APIs so readers can inspect chain-native values without breaking tests when network is unavailable.

live_session = build_session("nym-node-reward-tracker/notebook-live")
live_wallet = "n127c69pasr35p76amfczemusnutr8mtw78s8xl7"

live_cosmos_balances, live_cosmos_node_id, live_cosmos_identity_key = fetch_balances_cosmos(
    live_session,
    live_wallet,
    nyx_wasm_rest=DEFAULT_NYX_WASM_REST,
    nyx_bank_rest=DEFAULT_NYX_BANK_REST,
    mixnet_contract=DEFAULT_MIXNET_CONTRACT,
)
assert isinstance(live_cosmos_balances, BalanceBreakdown)
live_cosmos_balances
BalanceBreakdown(delegated_unym=200977025706, self_bonded_unym=15000000000, spendable_unym=60639227, locked_unym=0, rewards_operator_commissions_unym=2460861134, rewards_staking_rewards_unym=524574066, rewards_unlocked_unym=0)

Snapshot orchestration

Intent: for each wallet, pick source → fetch balances → emit WalletSnapshot.

All network calls are inside fetch functions; this orchestration is “pure glue”.


build_wallet_snapshot


def build_wallet_snapshot(
    session:Any, wallet_address:str, tag:str, source:SnapshotSource, nyx_wasm_rest:str, nyx_bank_rest:str,
    mixnet_contract:str, spectre_balance_url:str, timeout:int, logger
)->WalletSnapshot:
# verify_build_wallet_snapshot
class _S:
    def get(self, *args, **kwargs):
        class R:
            status_code = 200
            text = "{}"
            def raise_for_status(self): return None
            def json(self):
                return {"data": {"details": None}, "balances": []}
        return R()
ws = build_wallet_snapshot(
    _S(),
    wallet_address="n1",
    tag="t",
    source="spectre",
    nyx_wasm_rest="https://x",
    nyx_bank_rest="https://x",
    mixnet_contract="c",
    spectre_balance_url="https://x/{address}",
    timeout=1,
    logger=None,
)
assert ws.wallet_address == "n1"
assert callable(build_wallet_snapshot)

Persistence and history

Intent: store a rolling per-wallet history (default 30 days) in YAML.

We store unym totals only (lossless, minimal). Later, if desired, extend with more fields.


scaled_window_change_unym


def scaled_window_change_unym(
    history:Dict[str, Any], wallet:str, now_ts:float, window_days:float, current_total_unym:int
)->Optional[Dict[str, Any]]:

record_history_point


def record_history_point(
    history:Dict[str, Any], wallet:str, ts:float, total_unym:int
)->None:

prune_history_older_than


def prune_history_older_than(
    history:Dict[str, Any], cutoff_ts:float
)->Dict[str, Any]:
h = {"w": [{"ts": 0.0, "total_unym": 10}]}
out = scaled_window_change_unym(h, wallet="w", now_ts=8 * 24 * 3600, window_days=7, current_total_unym=17)
assert out is not None

Tabular output (latest snapshot)

Intent: write a simple “latest snapshot” table file (overwritten each run) for quick inspection and downstream tooling. Format is selected by extension (.csv or .xlsx).


log_snapshot_table


def log_snapshot_table(
    rows:List[WalletSnapshot], history:Dict[str, Any], now_ts:float, logger
)->None:

write_latest_snapshot_csv


def write_latest_snapshot_csv(
    rows:List[WalletSnapshot], out_path:str | Path
)->None:
from tempfile import TemporaryDirectory
sample = WalletSnapshot(
    timestamp_utc="2026-01-01T00:00:00+00:00",
    wallet_address="n1wallet",
    tag="test",
    node_id=2196,
    identity_key="identity",
    balances=BalanceBreakdown(
        delegated_unym=1_500_000,
        self_bonded_unym=2_000_000,
        spendable_unym=250_000,
    ),
    source="spectre",
)

with TemporaryDirectory() as td:
    p_csv = Path(td) / "latest.csv"
    write_latest_snapshot_csv([sample], p_csv)
    assert p_csv.exists()

    p_xlsx = Path(td) / "latest.xlsx"
    write_latest_snapshot_csv([sample], p_xlsx)
    assert p_xlsx.exists()

assert callable(log_snapshot_table)

Main entry point: run_snapshot

Intent: the CLI-facing function run_snapshot(...):

  • loads wallets
  • fetches snapshots
  • updates history yaml
  • writes latest table output (.csv or .xlsx)
  • logs a compact report including 7d/30d scaled change

run_snapshot


def run_snapshot(
    data_dir:str | Path=Path('data'), wallets_csv:str | Path='wallet-addresses.csv',
    out_csv:str | Path='node-balances.csv', hist_file:str | Path='data.yaml', source:SnapshotSource='spectre',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract',
    nyx_bank_rest:str='https://api.nymtech.net/cosmos/bank/v1beta1',
    mixnet_contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    spectre_balance_url:str | None='https://api.nym.spectredao.net/api/v1/balances/{address}',
    spectre_nodes_url:str | None=None, validator_bonded_url:str | None=None, validator_described_url:str | None=None,
    history_days:int=30, timeout:int=20, log_file:str | None=None, log_level:str='INFO'
)->int:
# verify_run_snapshot_use
from tempfile import TemporaryDirectory
with TemporaryDirectory() as td:
    data_dir = Path(td)
    wallets = data_dir / "__missing__.csv"
    wallets.write_text("address,tag\n", encoding="utf-8")
    rc = run_snapshot(data_dir=str(data_dir), wallets_csv="__missing__.csv")
    assert isinstance(rc, int)
2026-02-12T13:37:00 | ERROR | nym_snapshot | No wallets found in /tmp/tmpwdesur12/__missing__.csv
# verify_run_snapshot_signature
assert run_snapshot.__name__ == "run_snapshot"
assert callable(run_snapshot)
  • Report an issue