Cache: chain-native event cache for epoch-by-epoch rewards

This notebook builds a local SQLite cache that makes epoch-by-epoch reward reconstruction feasible and repeatable.

Why it exists:

  • CosmWasm smart queries (/cosmwasm/wasm/v1/contract/.../smart/...) only expose current state.
  • Epoch-by-epoch reconstruction needs historical events, which come from Cosmos tx search (/cosmos/tx/v1beta1/txs).
  • Tx search must be queried in height slices; large queries time out.

What the cache stores:

  • Node-level reward events (wasm-v2_node_rewarding)
  • Node-level delegation/undelegation and withdrawal events
  • Wallet-level “actions” (delegation, undelegation, withdraw…) so we can discover nodes a wallet interacted with in the past
  • A scan manifest table so scans are incremental and idempotent

Design rules:

  • Idempotent inserts via UNIQUE constraints
  • Store big numbers as TEXT to avoid float surprises
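
The two design rules can be illustrated with a minimal, self-contained sketch. The table demo_reward_events and its columns are hypothetical and do not reproduce the notebook's schema; INSERT OR IGNORE is assumed here as one common way to get idempotency from a UNIQUE constraint.

```python
import sqlite3
from decimal import Decimal

conn = sqlite3.connect(":memory:")
# Hypothetical table: amounts are TEXT so 18-decimal values survive unchanged.
conn.execute(
    """
    CREATE TABLE demo_reward_events (
        node_id INTEGER NOT NULL,
        txhash TEXT NOT NULL,
        msg_index INTEGER NOT NULL,
        operator_reward TEXT NOT NULL,
        UNIQUE (node_id, txhash, msg_index)
    )
    """
)


def insert_demo_row(conn, row):
    """Insert one row; return 1 if it was new, 0 if the UNIQUE constraint skipped it."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO demo_reward_events VALUES (?, ?, ?, ?)",
        (row["node_id"], row["txhash"], row["msg_index"], row["operator_reward"]),
    )
    return cur.rowcount


row = {
    "node_id": 14,
    "txhash": "H",
    "msg_index": 0,
    "operator_reward": "4272973.556427493591543445",
}
first = insert_demo_row(conn, row)   # new row
second = insert_demo_row(conn, row)  # same key again, ignored

stored = conn.execute("SELECT operator_reward FROM demo_reward_events").fetchone()[0]
exact = Decimal(stored)       # keeps every digit
lossy = repr(float(stored))   # a float cannot hold all 25 significant digits
```

Reading the TEXT value back through Decimal keeps the on-chain precision, while a float round trip silently shortens it.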

Imports and constants

Intent: define endpoints, defaults, and event type taxonomy once, at the top.

API “shape” probes (bash cells)

Intent: demonstrate the Cosmos tx search query format and the shape of tx_responses[].events[].

NYX_TX="https://api.nymtech.net/cosmos/tx/v1beta1/txs"
NODE_ID="14"
START="21312000"
END="21314000"

curl -sG "$NYX_TX"   --data-urlencode "query=tx.height>=$START AND tx.height<=$END AND wasm-v2_node_rewarding.node_id='$NODE_ID'"   --data-urlencode "pagination.limit=1"   --data-urlencode "pagination.offset=0"   --data-urlencode "order_by=ORDER_BY_DESC" | jq '.tx_responses[0] | {height, txhash, timestamp, rewarding_events: ([.events[] | select(.type=="wasm-v2_node_rewarding")][0:1])}'
{
  "height": "21313861",
  "txhash": "B734F05F2CD884206FAB4A51AEBCBD185D421F51736E82D4AE7D98263159FDFA",
  "timestamp": "2025-12-04T18:26:53Z",
  "rewarding_events": [
    {
      "type": "wasm-v2_node_rewarding",
      "attributes": [
        {
          "key": "_contract_address",
          "value": "n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr",
          "index": true
        },
        {
          "key": "interval_details",
          "value": "26760",
          "index": true
        },
        {
          "key": "prior_delegates",
          "value": "1073730365040.330027110287110781",
          "index": true
        },
        {
          "key": "prior_unit_reward",
          "value": "270818456.314079051771011094",
          "index": true
        },
        {
          "key": "node_id",
          "value": "14",
          "index": true
        },
        {
          "key": "operator_reward",
          "value": "4272973.556427493591543445",
          "index": true
        },
        {
          "key": "delegates_reward",
          "value": "11419372.22422543896658992",
          "index": true
        },
        {
          "key": "msg_index",
          "value": "0",
          "index": true
        }
      ]
    }
  ]
}
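
The same query can be assembled in Python. This is a sketch that mirrors the curl parameters above; the function names build_reward_query and tx_search_params are hypothetical and are not the notebook's helpers.

```python
from typing import Dict
from urllib.parse import urlencode

NYX_TX = "https://api.nymtech.net/cosmos/tx/v1beta1/txs"


def build_reward_query(node_id: int, start_height: int, end_height: int) -> str:
    """Build the tx search filter for one node and one height slice."""
    return (
        f"tx.height>={start_height} AND tx.height<={end_height} "
        f"AND wasm-v2_node_rewarding.node_id='{node_id}'"
    )


def tx_search_params(query: str, limit: int = 1, offset: int = 0) -> Dict[str, str]:
    """Return the query-string parameters used by the curl example."""
    return {
        "query": query,
        "pagination.limit": str(limit),
        "pagination.offset": str(offset),
        "order_by": "ORDER_BY_DESC",
    }


query = build_reward_query(14, 21312000, 21314000)
params = tx_search_params(query)
# urlencode percent-encodes the operators, like curl's --data-urlencode.
url = f"{NYX_TX}?{urlencode(params)}"
```

The params dict can also be passed directly to an HTTP client that encodes query strings itself.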

Intent: demonstrate a wallet-driven query (message.sender + contract) for discovering historical delegation/withdraw actions.

NYX_TX="https://api.nymtech.net/cosmos/tx/v1beta1/txs"
WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
START="22000000"
END="22300000"

curl -sG "$NYX_TX"   --data-urlencode "query=tx.height>=$START AND tx.height<=$END AND message.sender='$WALLET'"   --data-urlencode "pagination.limit=1"   --data-urlencode "pagination.offset=0"   --data-urlencode "order_by=ORDER_BY_DESC" | jq '{total: .pagination.total, sample: (.tx_responses[0] | {height, txhash, wallet_events: ([.events[] | select((.type|startswith("wasm-v2_")) or .type=="transfer" or .type=="coin_spent" or .type=="coin_received")][0:6])})}'
{
  "total": null,
  "sample": {
    "height": "22250074",
    "txhash": "C2AFFF3A8A967B59EB18B4C8EDF4A180688D7EA5E8B1BE3F37916B98723DC420",
    "wallet_events": [
      {
        "type": "coin_spent",
        "attributes": [
          {
            "key": "spender",
            "value": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
            "index": true
          },
          {
            "key": "amount",
            "value": "4874unym",
            "index": true
          }
        ]
      },
      {
        "type": "coin_received",
        "attributes": [
          {
            "key": "receiver",
            "value": "n17xpfvakm2amg962yls6f84z3kell8c5lza5z5c",
            "index": true
          },
          {
            "key": "amount",
            "value": "4874unym",
            "index": true
          }
        ]
      },
      {
        "type": "transfer",
        "attributes": [
          {
            "key": "recipient",
            "value": "n17xpfvakm2amg962yls6f84z3kell8c5lza5z5c",
            "index": true
          },
          {
            "key": "sender",
            "value": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
            "index": true
          },
          {
            "key": "amount",
            "value": "4874unym",
            "index": true
          }
        ]
      },
      {
        "type": "wasm-v2_pending_cost_params_update",
        "attributes": [
          {
            "key": "_contract_address",
            "value": "n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr",
            "index": true
          },
          {
            "key": "node_id",
            "value": "2196",
            "index": true
          },
          {
            "key": "updated_mixnode_cost_params",
            "value": "{\"profit_margin_percent\":\"0.2\",\"interval_operating_cost\":{\"denom\":\"unym\",\"amount\":\"400000000\"}}",
            "index": true
          },
          {
            "key": "msg_index",
            "value": "0",
            "index": true
          }
        ]
      }
    ]
  }
}

Shared tx parsing imports

Intent: reuse shared tx-shape parsing helpers from 00_cosmos_tx_parsing.ipynb instead of maintaining duplicate local parsers in cache and reward notebooks.

sample = extract_events({"events": [{"type": "x", "attributes": [{"key": "a", "value": "1"}]}]})
assert sample[0]["attributes"]["a"] == "1"
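
For orientation, the flattening that the assertion above relies on could look like the following sketch. extract_events_sketch is a hypothetical stand-in written for this page; the real extract_events lives in 00_cosmos_tx_parsing.ipynb and may handle more cases (for example, events nested under logs).

```python
from typing import Any, Dict, List


def extract_events_sketch(tx_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Turn each event's attribute list into a plain key -> value dict."""
    out: List[Dict[str, Any]] = []
    for ev in tx_response.get("events") or []:
        attrs = {
            a["key"]: a.get("value")
            for a in ev.get("attributes") or []
            if a.get("key") is not None
        }
        # If a key repeats within one event, the last value wins in this sketch.
        out.append({"type": ev.get("type"), "attributes": attrs})
    return out


demo = extract_events_sketch(
    {"events": [{"type": "x", "attributes": [{"key": "a", "value": "1"}]}]}
)
```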

Row extractors (node-centric and wallet-centric)

Intent: standardize how we read height/txhash/timestamp and how we derive node_id from event attributes.

assert _node_id_from_attrs({"node_id": "14"}) == 14
assert _node_id_from_attrs({"delegation_target": "2933"}) == 2933
assert _node_id_from_attrs({}) is None
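
A sketch of a lookup that would satisfy these assertions is shown below. The function name and the key order are assumptions; mix_id is included because it appears in the withdraw example on this page, and the notebook's _node_id_from_attrs may check other keys.

```python
from typing import Mapping, Optional

# Assumed candidate keys, tried in order.
NODE_ID_KEYS = ("node_id", "mix_id", "delegation_target")


def node_id_from_attrs_sketch(attrs: Mapping[str, str]) -> Optional[int]:
    """Return the first attribute that parses as an integer node id, else None."""
    for key in NODE_ID_KEYS:
        raw = attrs.get(key)
        if raw is None:
            continue
        try:
            return int(raw)
        except (TypeError, ValueError):
            # Skip malformed values and try the next candidate key.
            continue
    return None
```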

Reward rows (node_rewarding)

Intent: extract only the minimal fields needed for epoch-by-epoch replay from wasm-v2_node_rewarding events for a given node_id.


reward_rows_from_tx


def reward_rows_from_tx(
    tx_response:Dict[str, Any], node_id:int,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr'
)->List[Dict[str, Any]]:

synth = {
    "height": "10",
    "txhash": "H",
    "timestamp": "2026-01-01T00:00:00Z",
    "events": [
        {
            "type": "wasm-v2_node_rewarding",
            "attributes": [
                {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
                {"key": "node_id", "value": "14"},
                {"key": "interval_details", "value": "2"},
                {"key": "prior_unit_reward", "value": "2.0"},
                {"key": "prior_delegates", "value": "1.0"},
                {"key": "delegates_reward", "value": "3.0"},
                {"key": "operator_reward", "value": "4.0"},
                {"key": "msg_index", "value": "0"},
            ],
        }
    ],
}
rows = reward_rows_from_tx(synth, 14)
assert rows and rows[0]["epoch"] == 2 and rows[0]["operator_reward"] == "4.0"

Delegation rows (node-centric)

Intent: cache delegation and undelegation events per node, including a signed delta where available.

Note: some undelegation events may not include an amount; replay logic can treat them as “full removal”.
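
The signed delta could be derived as in the sketch below. The helper names are hypothetical, and matching on the substring "undelegation" in the event type is an assumption about event naming rather than the notebook's rule.

```python
import re
from typing import Optional

_UNYM_RE = re.compile(r"^(\d+)unym$")


def parse_unym(amount: Optional[str]) -> Optional[str]:
    """Return the digits of an amount like '10unym' as text, or None."""
    if not amount:
        return None
    match = _UNYM_RE.match(amount.strip())
    return match.group(1) if match else None


def signed_delta_unym(event_type: str, amount: Optional[str]) -> Optional[str]:
    """Positive for delegations, negative for undelegations, None if no amount."""
    digits = parse_unym(amount)
    if digits is None:
        # Replay logic can treat a missing amount on undelegation as full removal.
        return None
    return f"-{digits}" if "undelegation" in event_type else digits
```

The result stays a string so it can be stored as TEXT, in line with the design rules.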


delegation_rows_from_tx


def delegation_rows_from_tx(
    tx_response:Dict[str, Any], node_id:int,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr'
)->List[Dict[str, Any]]:

# verify_delegation_rows_from_tx
synth = {
    "height": "1",
    "txhash": "D",
    "timestamp": "2026-01-01T00:00:00Z",
    "events": [{"type": "wasm-v2_delegation", "attributes": [
        {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
        {"key": "delegation_target", "value": "14"},
        {"key": "owner", "value": "n1"},
        {"key": "amount", "value": "10unym"},
    ]}],
}
rows = delegation_rows_from_tx(synth, 14)
assert len(rows) == 1
assert rows[0]["delegator"] == "n1"

Withdraw rows (node-centric)

Intent: cache reward withdrawals per node (delegator + operator withdrawals) so replay can reset bookmarks appropriately.


withdraw_rows_from_tx


def withdraw_rows_from_tx(
    tx_response:Dict[str, Any], node_id:int,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr'
)->List[Dict[str, Any]]:

# verify_withdraw_rows_from_tx
synth = {
    "height": "1",
    "txhash": "W",
    "timestamp": "2026-01-01T00:00:00Z",
    "events": [{"type": "wasm-v2_withdraw_operator_reward", "attributes": [
        {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
        {"key": "mix_id", "value": "14"},
        {"key": "owner", "value": "n1"},
        {"key": "amount", "value": "9unym"},
    ]}],
}
rows = withdraw_rows_from_tx(synth, 14)
assert len(rows) == 1
assert rows[0]["amount_unym"] == "9"

Wallet action rows

Intent: cache “wallet actions” so we can answer: “which nodes has this wallet ever interacted with?”

Without it, the cache only knows about nodes the wallet currently delegates to and cannot reconstruct historical wallet income flows.


wallet_action_rows_from_tx


def wallet_action_rows_from_tx(
    tx_response:Dict[str, Any], wallet_address:str,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr'
)->List[Dict[str, Any]]:

Extract wallet-centric actions from tx events.

Output rows are minimal and stable:

  • wallet_address, action_kind, event_type
  • node_id (if any)
  • amount_unym (if any)
  • height/txhash/timestamp

Notes:

  • mixnet contract events are filtered by _contract_address == contract
  • generic bank events (transfer/coin_spent/coin_received/message) are wallet-scoped and do not require mixnet contract filtering

wallet = "n1wallet"
synth_wallet_tx = {
    "height": "50",
    "txhash": "TX",
    "timestamp": "2026-01-01T00:00:00Z",
    "events": [
        {"type": "wasm-v2_pending_delegation", "attributes": [
            {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
            {"key": "delegator", "value": wallet},
            {"key": "delegation_target", "value": "2933"},
            {"key": "amount", "value": "123unym"},
        ]},
        {"type": "wasm-v2_withdraw_delegator_reward", "attributes": [
            {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
            {"key": "delegator", "value": wallet},
            {"key": "delegation_target", "value": "2933"},
            {"key": "amount", "value": "77unym"},
        ]},
        {"type": "transfer", "attributes": [
            {"key": "sender", "value": "n1other"},
            {"key": "recipient", "value": wallet},
            {"key": "amount", "value": "5unym"},
        ]},
    ],
}
rows = wallet_action_rows_from_tx(synth_wallet_tx, wallet)
assert {r["action_kind"] for r in rows} == {"delegation_pending", "withdraw_delegator_reward", "transfer_receive"}
assert next(r for r in rows if r["action_kind"] == "delegation_pending")["node_id"] == 2933
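
How a wallet-scoped bank event maps to an action kind can be sketched as follows. The label transfer_receive comes from the assertion above; transfer_send and the function name classify_transfer are assumptions made for illustration.

```python
from typing import Mapping, Optional


def classify_transfer(attrs: Mapping[str, str], wallet: str) -> Optional[str]:
    """Classify a bank `transfer` event from the wallet's point of view."""
    if attrs.get("recipient") == wallet:
        return "transfer_receive"
    if attrs.get("sender") == wallet:
        return "transfer_send"  # assumed label, not confirmed by the notebook
    # The event does not involve this wallet, so no row is produced.
    return None


incoming = classify_transfer(
    {"sender": "n1other", "recipient": "n1wallet", "amount": "5unym"}, "n1wallet"
)
```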

SQLite schema (with a dedicated wallet-actions table)

Intent: define a schema that is:

  • minimal (stores only what replay needs),
  • idempotent (UNIQUE constraints),
  • incrementally scannable (scan windows manifest),
  • and wallet-history aware (wallet actions table).


connect_db


def connect_db(
    path:str | Path='data/nym_cache.sqlite'
)->sqlite3.Connection:

Open a SQLite DB with sane defaults for a write-heavy cache:

  • creates parent dir
  • row_factory=sqlite3.Row for dict(row)
  • WAL mode for better concurrent read/write

import tempfile
with tempfile.TemporaryDirectory() as td:
    c = connect_db(Path(td) / "t.sqlite")
    assert isinstance(c, sqlite3.Connection)
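
The three defaults listed in the docstring can be reproduced with the standard library alone. The sketch below is an approximation under that assumption; connect_db_sketch is a hypothetical name and the real connect_db may set additional pragmas.

```python
import sqlite3
import tempfile
from pathlib import Path


def connect_db_sketch(path) -> sqlite3.Connection:
    """Open a SQLite file with parent-dir creation, Row factory, and WAL mode."""
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)  # create parent dir if missing
    conn = sqlite3.connect(str(p))
    conn.row_factory = sqlite3.Row               # rows support dict(row)
    conn.execute("PRAGMA journal_mode=WAL")      # readers do not block the writer
    return conn


with tempfile.TemporaryDirectory() as td:
    conn = connect_db_sketch(Path(td) / "nested" / "demo.sqlite")
    journal_mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
    row_as_dict = dict(conn.execute("SELECT 1 AS one").fetchone())
    conn.close()
```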

init_schema


def init_schema(
    conn:sqlite3.Connection
)->None:

Create or migrate schema.

Migration policy (simple, notebook-friendly):

  • version 0: create fresh schema
  • version 1: attempt to migrate scan_windows(node_id,…) -> scan_windows(entity_type, entity_id,…)
  • then ensure wallet_actions exists

import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "unit.sqlite")
    init_schema(conn)
    v = conn.execute("SELECT value FROM meta WHERE key='schema_version'").fetchone()[0]
    assert int(v) == SCHEMA_VERSION

Idempotent insert helpers

Intent: small, obvious helpers that insert derived rows and rely on UNIQUE constraints for idempotency.


insert_wallet_action_rows


def insert_wallet_action_rows(
    conn:sqlite3.Connection, rows:List[Dict[str, Any]]
)->int:

insert_withdraw_rows


def insert_withdraw_rows(
    conn:sqlite3.Connection, rows:List[Dict[str, Any]]
)->int:

insert_delegation_rows


def insert_delegation_rows(
    conn:sqlite3.Connection, rows:List[Dict[str, Any]]
)->int:

insert_reward_rows


def insert_reward_rows(
    conn:sqlite3.Connection, rows:List[Dict[str, Any]]
)->int:

replace_current_pending_reward_state


def replace_current_pending_reward_state(
    conn:sqlite3.Connection, node_id:int, rows:List[Dict[str, Any]]
)->None:

replace_current_delegation_state


def replace_current_delegation_state(
    conn:sqlite3.Connection, node_id:int, rows:List[Dict[str, Any]]
)->None:

upsert_current_rewarding_state


def upsert_current_rewarding_state(
    conn:sqlite3.Connection, node_id:int, total_unit_reward:str, unique_delegations:int
)->None:

upsert_wallet_node_link


def upsert_wallet_node_link(
    conn:sqlite3.Connection, wallet_address:str, node_id:int, relation:str, start_height:Optional[int]
)->None:

upsert_node_metadata


def upsert_node_metadata(
    conn:sqlite3.Connection, node_id:int, bonding_height:int, unit_delegation:str, contract:str
)->None:

import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)

    local_tx = {
        "height": "10",
        "txhash": "H",
        "timestamp": "2026-01-01T00:00:00Z",
        "events": [{
            "type": "wasm-v2_node_rewarding",
            "attributes": [
                {"key": "_contract_address", "value": DEFAULT_MIXNET_CONTRACT},
                {"key": "node_id", "value": "14"},
                {"key": "interval_details", "value": "2"},
                {"key": "prior_unit_reward", "value": "2.0"},
                {"key": "prior_delegates", "value": "1.0"},
                {"key": "delegates_reward", "value": "3.0"},
                {"key": "operator_reward", "value": "4.0"},
                {"key": "msg_index", "value": "0"},
            ],
        }],
    }

    r = reward_rows_from_tx(local_tx, 14)
    assert insert_reward_rows(conn, r) == 1
    assert insert_reward_rows(conn, r) == 0  # idempotent

Scan manifest helpers (generic entity_type/entity_id)

Intent: record which height windows have been scanned successfully, so cache updates are incremental and restartable.


min_wallet_scanned_start_height


def min_wallet_scanned_start_height(
    conn:sqlite3.Connection, wallet_address:str, event_kind:str
)->Optional[int]:

max_wallet_scanned_end_height


def max_wallet_scanned_end_height(
    conn:sqlite3.Connection, wallet_address:str, event_kind:str
)->Optional[int]:

record_wallet_scan_window


def record_wallet_scan_window(
    conn:sqlite3.Connection, wallet_address:str, event_kind:str, start_height:int, end_height:int, status:str,
    endpoint:str, note:str=''
)->None:

wallet_window_done


def wallet_window_done(
    conn:sqlite3.Connection, wallet_address:str, event_kind:str, start_height:int, end_height:int
)->bool:

min_scanned_start_height


def min_scanned_start_height(
    conn:sqlite3.Connection, node_id:int, event_kind:str
)->Optional[int]:

max_scanned_end_height


def max_scanned_end_height(
    conn:sqlite3.Connection, node_id:int, event_kind:str
)->Optional[int]:

record_scan_window


def record_scan_window(
    conn:sqlite3.Connection, node_id:int, event_kind:str, start_height:int, end_height:int, status:str, endpoint:str,
    note:str=''
)->None:

window_done


def window_done(
    conn:sqlite3.Connection, node_id:int, event_kind:str, start_height:int, end_height:int
)->bool:

import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "w.sqlite")
    init_schema(conn)
    record_wallet_scan_window(conn, wallet_address="n1", event_kind="wallet_actions", start_height=1, end_height=10, status="ok", endpoint=DEFAULT_NYX_TX_REST)
    conn.commit()
    assert wallet_window_done(conn, wallet_address="n1", event_kind="wallet_actions", start_height=1, end_height=10)
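
The manifest idea can be shown in isolation with a small sketch. The table demo_scan_windows, its columns, and the rule that only status "ok" counts as done are assumptions for illustration; the notebook's scan_windows table also records an endpoint and a note.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE demo_scan_windows (
        entity_type TEXT NOT NULL,
        entity_id TEXT NOT NULL,
        event_kind TEXT NOT NULL,
        start_height INTEGER NOT NULL,
        end_height INTEGER NOT NULL,
        status TEXT NOT NULL,
        UNIQUE (entity_type, entity_id, event_kind, start_height, end_height)
    )
    """
)


def record_window(conn, entity_type, entity_id, event_kind, start_height, end_height, status):
    """Record (or overwrite) the outcome of scanning one height window."""
    conn.execute(
        "INSERT OR REPLACE INTO demo_scan_windows VALUES (?, ?, ?, ?, ?, ?)",
        (entity_type, str(entity_id), event_kind, start_height, end_height, status),
    )


def is_window_done(conn, entity_type, entity_id, event_kind, start_height, end_height):
    """A window is done only if it was recorded with status 'ok'."""
    row = conn.execute(
        "SELECT status FROM demo_scan_windows WHERE entity_type = ? AND entity_id = ? "
        "AND event_kind = ? AND start_height = ? AND end_height = ?",
        (entity_type, str(entity_id), event_kind, start_height, end_height),
    ).fetchone()
    return row is not None and row[0] == "ok"


record_window(conn, "wallet", "n1", "wallet_actions", 1, 10, "error")
done_after_error = is_window_done(conn, "wallet", "n1", "wallet_actions", 1, 10)
record_window(conn, "wallet", "n1", "wallet_actions", 1, 10, "ok")  # retry succeeded
done_after_ok = is_window_done(conn, "wallet", "n1", "wallet_actions", 1, 10)
row_count = conn.execute("SELECT COUNT(*) FROM demo_scan_windows").fetchone()[0]
```

Keying the manifest on (entity_type, entity_id) lets node scans and wallet scans share one table, which is what the generic helpers in this section are named for.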

Window scanners

Intent: a single generic window scanning loop that:

  • enforces height slicing,
  • supports multiple queries per window,
  • deduplicates txhash across queries,
  • performs idempotent inserts,
  • records scan status.
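
Two of these responsibilities, height slicing and txhash deduplication, are easy to show on their own. The helpers below are hypothetical sketches and not the internals of _scan_windows_generic; inclusive window bounds are assumed.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def height_windows(start_height: int, end_height: int, window_size: int) -> Iterator[Tuple[int, int]]:
    """Yield inclusive (lo, hi) slices covering [start_height, end_height]."""
    if window_size <= 0:
        raise ValueError("window_size must be positive")
    lo = start_height
    while lo <= end_height:
        hi = min(lo + window_size - 1, end_height)
        yield lo, hi
        lo = hi + 1


def dedupe_by_txhash(batches: Iterable[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Merge tx lists from several queries, keeping the first copy of each txhash."""
    seen = set()
    out: List[Dict[str, Any]] = []
    for batch in batches:
        for tx in batch:
            txhash = tx.get("txhash")
            if txhash in seen:
                continue
            seen.add(txhash)
            out.append(tx)
    return out


windows = list(height_windows(1, 10, 4))
merged = dedupe_by_txhash([[{"txhash": "A"}, {"txhash": "B"}], [{"txhash": "B"}, {"txhash": "C"}]])
```

An empty range (start_height greater than end_height) yields no windows, so no request is issued for it.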

# verify_scan_windows_generic
import tempfile
class _NoCallSession:
    def get(self, *args, **kwargs):
        raise AssertionError("no network call expected")

with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)

    def _queries(_s, _e):
        return []

    def _extract(_tx, _entity):
        return []

    def _insert(_conn, rows):
        assert rows == []
        return 0

    stats = _scan_windows_generic(
        conn,
        _NoCallSession(),
        entity_type="node",
        entity_id="14",
        event_kind="reward",
        start_height=10,
        end_height=9,
        window_size=1000,
        nyx_tx_rest=DEFAULT_NYX_TX_REST,
        queries_for_window=_queries,
        extract_rows=_extract,
        insert_rows=_insert,
        force=False,
        page_size=100,
        max_pages=5,
        timeout=5,
        logger=logging.getLogger("test"),
    )
    assert stats["windows"] == 0
    assert stats["errors"] == 0
reward:14: 0window [00:00, ?window/s]

Concrete scanners


scan_node_rewards


def scan_node_rewards(
    conn:sqlite3.Connection, session:Any, node_id:int, start_height:int, end_height:int, window_size:int=2000,
    force:bool=False, page_size:int=100, max_pages:int=200, timeout:int=60,
    nyx_tx_rest:str='https://api.nymtech.net/cosmos/tx/v1beta1/txs',
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    logger:Optional[logging.Logger]=None
)->Dict[str, int]:

# verify_scan_node_rewards
import tempfile
class _NoCallSession:
    def get(self, *args, **kwargs):
        raise AssertionError("no network call expected")

with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)
    stats = scan_node_rewards(
        conn,
        _NoCallSession(),
        node_id=14,
        start_height=10,
        end_height=9,
        logger=logging.getLogger("test"),
    )
    assert stats["windows"] == 0
    assert stats["errors"] == 0
reward:14: 0window [00:00, ?window/s]

Intent: scan delegation-related tx events for one node across height windows.


scan_node_delegations


def scan_node_delegations(
    conn:sqlite3.Connection, session:Any, node_id:int, start_height:int, end_height:int, window_size:int=10000,
    force:bool=False, page_size:int=100, max_pages:int=200, timeout:int=60,
    nyx_tx_rest:str='https://api.nymtech.net/cosmos/tx/v1beta1/txs',
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    logger:Optional[logging.Logger]=None
)->Dict[str, int]:

# verify_scan_node_delegations
import tempfile
class _NoCallSession:
    def get(self, *args, **kwargs):
        raise AssertionError("no network call expected")

with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)
    stats = scan_node_delegations(
        conn,
        _NoCallSession(),
        node_id=14,
        start_height=10,
        end_height=9,
        logger=logging.getLogger("test"),
    )
    assert stats["windows"] == 0
    assert stats["errors"] == 0
delegation:14: 0window [00:00, ?window/s]

Intent: scan withdraw-related tx events for one node across height windows.


scan_node_withdrawals


def scan_node_withdrawals(
    conn:sqlite3.Connection, session:Any, node_id:int, start_height:int, end_height:int, window_size:int=20000,
    force:bool=False, page_size:int=100, max_pages:int=200, timeout:int=60,
    nyx_tx_rest:str='https://api.nymtech.net/cosmos/tx/v1beta1/txs',
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    logger:Optional[logging.Logger]=None
)->Dict[str, int]:

# verify_scan_node_withdrawals
import tempfile
class _NoCallSession:
    def get(self, *args, **kwargs):
        raise AssertionError("no network call expected")

with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)
    stats = scan_node_withdrawals(
        conn,
        _NoCallSession(),
        node_id=14,
        start_height=10,
        end_height=9,
        logger=logging.getLogger("test"),
    )
    assert stats["windows"] == 0
    assert stats["errors"] == 0
withdraw:14: 0window [00:00, ?window/s]

Intent: scan wallet-originated and wallet-received tx actions for discovery and history.


scan_wallet_actions


def scan_wallet_actions(
    conn:sqlite3.Connection, session:Any, wallet_address:str, start_height:int, end_height:int,
    window_size:int=500000, force:bool=False, page_size:int=100, max_pages:int=200, timeout:int=60,
    nyx_tx_rest:str='https://api.nymtech.net/cosmos/tx/v1beta1/txs',
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    logger:Optional[logging.Logger]=None
)->Dict[str, int]:

# verify_scan_wallet_actions
import tempfile
class _NoCallSession:
    def get(self, *args, **kwargs):
        raise AssertionError("no network call expected")

with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)
    stats = scan_wallet_actions(
        conn,
        _NoCallSession(),
        wallet_address="n1",
        start_height=10,
        end_height=9,
        logger=logging.getLogger("test"),
    )
    assert stats["windows"] == 0
    assert stats["errors"] == 0
wallet_actions:n1: 0window [00:00, ?window/s]

Cached read helpers


get_cached_wallet_actions


def get_cached_wallet_actions(
    conn:sqlite3.Connection, wallet_address:str, height_min:Optional[int]=None, height_max:Optional[int]=None
)->Any:

get_cached_current_pending_reward_state


def get_cached_current_pending_reward_state(
    conn:sqlite3.Connection, node_id:int
)->Any:

get_cached_current_rewarding_state


def get_cached_current_rewarding_state(
    conn:sqlite3.Connection, node_id:int
)->Any:

get_cached_current_delegation_state


def get_cached_current_delegation_state(
    conn:sqlite3.Connection, node_id:int
)->Any:

get_cached_withdraw_events


def get_cached_withdraw_events(
    conn:sqlite3.Connection, node_id:int, height_min:Optional[int]=None, height_max:Optional[int]=None
)->Any:

get_cached_delegation_events


def get_cached_delegation_events(
    conn:sqlite3.Connection, node_id:int, height_min:Optional[int]=None, height_max:Optional[int]=None
)->Any:

get_cached_reward_events


def get_cached_reward_events(
    conn:sqlite3.Connection, node_id:int, height_min:Optional[int]=None, height_max:Optional[int]=None
)->Any:

# verify_query_cached_rows
import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "q.sqlite")
    init_schema(conn)
    insert_reward_rows(conn, [{
        "node_id": 14,
        "height": 1,
        "txhash": "R",
        "timestamp": "",
        "epoch": 1,
        "prior_unit_reward": "1",
        "prior_delegates": "1",
        "delegates_reward": "1",
        "operator_reward": "1",
        "msg_index": 0,
        "contract_address": DEFAULT_MIXNET_CONTRACT,
    }])
    out = _query_cached_rows(conn, "reward_events", 14)
    assert len(out) == 1

Derive “historical node set” per wallet from wallet_actions

Intent: from cached wallet actions, produce the set of node_ids a wallet interacted with, including the earliest observed height per (wallet,node_id). This is used to ensure we scan nodes that were delegated to in the past, not only those currently delegated to.


wallet_nodes_from_cache


def wallet_nodes_from_cache(
    conn:sqlite3.Connection, wallet_address:str
)->Dict[int, int]:

Returns {node_id: first_seen_height} for nodes discovered via wallet_actions.

import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "x.sqlite")
    init_schema(conn)
    insert_wallet_action_rows(conn, [{
        "wallet_address": "n1",
        "height": 10,
        "txhash": "t",
        "timestamp": "",
        "action_kind": "delegation",
        "event_type": "wasm-v2_delegation",
        "node_id": 2933,
        "amount_unym": "1",
        "msg_index": 0,
        "contract_address": DEFAULT_MIXNET_CONTRACT,
    }])
    conn.commit()
    assert wallet_nodes_from_cache(conn, "n1")[2933] == 10
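
The earliest-height-per-node derivation maps naturally onto a GROUP BY query. The sketch below assumes a simplified table named demo_wallet_actions with three columns; it is not the notebook's wallet_actions schema or query.

```python
import sqlite3
from typing import Dict

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE demo_wallet_actions (wallet_address TEXT, node_id INTEGER, height INTEGER)"
)
conn.executemany(
    "INSERT INTO demo_wallet_actions VALUES (?, ?, ?)",
    [
        ("n1", 2933, 50),
        ("n1", 2933, 10),  # earlier interaction with the same node
        ("n1", None, 5),   # bank transfer: no node involved
        ("n1", 14, 30),
        ("n2", 14, 1),     # different wallet, must not leak into n1's result
    ],
)


def first_seen_heights(conn: sqlite3.Connection, wallet_address: str) -> Dict[int, int]:
    """Return {node_id: earliest height} for one wallet, skipping node-less rows."""
    rows = conn.execute(
        "SELECT node_id, MIN(height) FROM demo_wallet_actions "
        "WHERE wallet_address = ? AND node_id IS NOT NULL GROUP BY node_id",
        (wallet_address,),
    ).fetchall()
    return {int(node_id): int(height) for node_id, height in rows}


nodes_for_n1 = first_seen_heights(conn, "n1")
```

The first-seen height gives a scanner a lower bound for where that node's history becomes relevant to the wallet.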

Current-state snapshots

This section follows a define -> use -> verify pattern for the live-state helpers used by run_cache:

  • latest height fetch,
  • node metadata,
  • current node delegations,
  • pending delegator rewards,
  • wallet->node discovery from owned + delegated + cached historical links.

Important semantic note:

  • these helpers are evaluated individually below,
  • and scanner/orchestration behavior is documented based on observed runtime logs (not only intended behavior).

API Quick Intro (Current-State Helpers)

These %%bash examples mirror the REST calls used by the helper functions below. They show request shape and the key JSON fields to inspect before reading the Python wrappers.

Latest height (fetch_latest_height): inspect block.header.height and block.header.time.

TM="https://api.nymtech.net/cosmos/base/tendermint/v1beta1"
curl -s "$TM/blocks/latest" | jq '{height:.block.header.height, time:.block.header.time}'
{
  "height": "22371647",
  "time": "2026-02-12T12:36:52.454968768Z"
}

Node metadata (get_node_metadata): inspect bonding height, identity, and rewarding fields.

WASM="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"
NODE_ID=2196

Q_DETAILS=$(jq -cn --argjson node_id "$NODE_ID" '{"get_nym_node_details":{"node_id":$node_id}}' | base64 | tr -d '\n')
Q_REWARD=$(jq -cn --argjson node_id "$NODE_ID" '{"get_node_rewarding_details":{"node_id":$node_id}}' | base64 | tr -d '\n')

DETAILS=$(curl -s "$WASM/$CONTRACT/smart/$Q_DETAILS")
REWARD=$(curl -s "$WASM/$CONTRACT/smart/$Q_REWARD")

jq -n --argjson d "$DETAILS" --argjson r "$REWARD" '{
  bonding_height: $d.data.details.bond_information.bonding_height,
  identity_key: $d.data.details.bond_information.identity_key,
  total_unit_reward: $r.data.rewarding_details.total_unit_reward,
  unique_delegations: $r.data.rewarding_details.unique_delegations
}'
{
  "bonding_height": 16380988,
  "identity_key": null,
  "total_unit_reward": "66985424.766269536600989211",
  "unique_delegations": 4
}
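
The base64 step in these bash cells has a direct Python equivalent. The sketch below uses hypothetical helper names; note that jq appends a trailing newline before base64 encodes its output, so the encoded string differs from the bash one while decoding to the same JSON message.

```python
import base64
import json
from typing import Any, Dict

WASM = "https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT = "n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"


def encode_smart_query(msg: Dict[str, Any]) -> str:
    """Serialize the query message compactly and base64-encode it."""
    compact = json.dumps(msg, separators=(",", ":"))
    return base64.b64encode(compact.encode("utf-8")).decode("ascii")


def smart_query_url(wasm_rest: str, contract: str, msg: Dict[str, Any]) -> str:
    """Build the REST path used for a CosmWasm smart query."""
    return f"{wasm_rest}/{contract}/smart/{encode_smart_query(msg)}"


msg = {"get_nym_node_details": {"node_id": 2196}}
encoded = encode_smart_query(msg)
url = smart_query_url(WASM, CONTRACT, msg)
```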

Current node delegations (get_node_delegations_current): inspect delegations[] and start_next_after for pagination.

WASM="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"
NODE_ID=2196

Q=$(jq -cn --argjson node_id "$NODE_ID" '{"get_node_delegations":{"node_id":$node_id,"start_after":null,"limit":2}}' | base64 | tr -d '\n')

curl -s "$WASM/$CONTRACT/smart/$Q" | jq '{count:(.data.delegations|length), next_key:.data.start_next_after, sample:(.data.delegations[0] // {})}'
{
  "count": 2,
  "next_key": "n1hhl9jd3rwk63sdkqjq58le7677lemtkgnq7mmh",
  "sample": {
    "owner": "n1gx3s4zenfs7qz0m742xvte2m0nh86rkz9ygq0q",
    "node_id": 2196,
    "cumulative_reward_ratio": "44429242.232656255693475965",
    "amount": {
      "denom": "unym",
      "amount": "5000146592"
    },
    "height": 20917079,
    "proxy": null
  }
}
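
Following start_next_after until the contract stops returning results can be sketched with an injected page fetcher, which keeps the loop testable offline. collect_paginated and fake_fetch are hypothetical names; stopping on an empty page is an assumption, made because a non-null key does not by itself prove that more rows exist.

```python
from typing import Any, Callable, Dict, List, Optional


def collect_paginated(
    fetch_page: Callable[[Optional[Any]], Dict[str, Any]],
    max_pages: int = 200,
) -> List[Dict[str, Any]]:
    """Collect `delegations` across pages by following `start_next_after`."""
    out: List[Dict[str, Any]] = []
    start_after: Optional[Any] = None
    for _ in range(max_pages):
        data = fetch_page(start_after)
        page = data.get("delegations") or []
        if not page:
            break  # nothing returned: no further pages
        out.extend(page)
        start_after = data.get("start_next_after")
        if start_after is None:
            break  # contract reports no continuation key
    return out


# Offline stand-in for the REST call, keyed by the start_after value.
_PAGES = {
    None: {"delegations": [{"owner": "a"}, {"owner": "b"}], "start_next_after": "b"},
    "b": {"delegations": [{"owner": "c"}], "start_next_after": "c"},
    "c": {"delegations": [], "start_next_after": None},
}
calls: List[Optional[str]] = []


def fake_fetch(start_after):
    calls.append(start_after)
    return _PAGES[start_after]


owners = [d["owner"] for d in collect_paginated(fake_fetch)]
```

The max_pages bound mirrors the parameter of the same name on the scanners and guards against a key that never terminates.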

Owned node lookup (get_owned_node_id): inspect data.details.bond_information.node_id.

WASM="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"
WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"

Q=$(jq -cn --arg wallet "$WALLET" '{"get_owned_nym_node":{"address":$wallet}}' | base64 | tr -d '\n')

curl -s "$WASM/$CONTRACT/smart/$Q" | jq '{node_id: .data.details.bond_information.node_id, details_present:(.data.details != null)}'
{
  "node_id": 2196,
  "details_present": true
}

Delegator delegations (get_delegator_delegations): inspect delegations[] and start_next_after.

WASM="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"
WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"

Q=$(jq -cn --arg wallet "$WALLET" '{"get_delegator_delegations":{"delegator":$wallet,"start_after":null,"limit":2}}' | base64 | tr -d '\n')

curl -s "$WASM/$CONTRACT/smart/$Q" | jq '{count:(.data.delegations|length), next_key:.data.start_next_after, sample:(.data.delegations[0] // {})}'
{
  "count": 1,
  "next_key": [
    2933,
    "n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
  ],
  "sample": {
    "owner": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
    "node_id": 2933,
    "cumulative_reward_ratio": "669788.748216958693826745",
    "amount": {
      "denom": "unym",
      "amount": "200977025706"
    },
    "height": 22222929,
    "proxy": null
  }
}

Pending delegator reward (get_pending_delegator_reward): inspect amount_staked, amount_earned, and bond-state flags.

Null values mean the wallet currently has no active delegation for that node, or the node is not fully bonded.

WASM="https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT="n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"
WALLET="n127c69pasr35p76amfczemusnutr8mtw78s8xl7"
NODE_ID=2933

Q=$(jq -cn --arg wallet "$WALLET" --argjson node_id "$NODE_ID" '{"get_pending_delegator_reward":{"address":$wallet,"node_id":$node_id}}' | base64 | tr -d '\n')

curl -s "$WASM/$CONTRACT/smart/$Q" | jq '{amount_staked:.data.amount_staked.amount, amount_earned:.data.amount_earned.amount, node_still_fully_bonded:.data.node_still_fully_bonded}'
{
  "amount_staked": "200977025706",
  "amount_earned": "524574066",
  "node_still_fully_bonded": true
}
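The probes above all build the request the same way: serialize the query message as compact JSON, base64-encode it, and append it to `$WASM/$CONTRACT/smart/`. A minimal Python sketch of that encoding follows. `build_smart_query_url` is a hypothetical helper name used only for illustration, not a function from this module.

```python
import base64
import json

WASM = "https://api.nymtech.net/cosmwasm/wasm/v1/contract"
CONTRACT = "n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr"

def build_smart_query_url(msg: dict, wasm: str = WASM, contract: str = CONTRACT) -> str:
    """Hypothetical helper mirroring the `jq -cn ... | base64` step of the bash probes."""
    compact = json.dumps(msg, separators=(",", ":"))  # compact JSON, like jq -c
    encoded = base64.b64encode(compact.encode("utf-8")).decode("ascii")  # one line, no newlines
    return f"{wasm}/{contract}/smart/{encoded}"

msg = {
    "get_pending_delegator_reward": {
        "address": "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
        "node_id": 2933,
    }
}
url = build_smart_query_url(msg)
```

Standard base64 can emit `+` and `/`; the bash probes pass the encoded string unescaped, and this sketch does the same.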

Intent: fetch the latest chain height from Tendermint REST so scanners can anchor window ranges to current chain state.


fetch_latest_height


def fetch_latest_height(
    session:Any, nyx_tm_rest:str='https://api.nymtech.net/cosmos/base/tendermint/v1beta1', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->int:

# verify_fetch_latest_height
js = {"block": {"header": {"height": "42"}}}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
assert fetch_latest_height(_Session(js), nyx_tm_rest="https://x", timeout=1) == 42
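The body of `fetch_latest_height` is not shown on this page. The sketch below is one way such a helper could look, assuming it reads the Cosmos SDK `blocks/latest` route under `nyx_tm_rest` and parses `block.header.height`, which is the shape the verification cell above feeds in. The name `sketch_fetch_latest_height` and the fake session classes are illustrative only.

```python
from typing import Any

def sketch_fetch_latest_height(session: Any, nyx_tm_rest: str, timeout: int = 60) -> int:
    """Hypothetical sketch; not the module's actual implementation."""
    resp = session.get(f"{nyx_tm_rest.rstrip('/')}/blocks/latest", timeout=timeout)
    resp.raise_for_status()
    # The height arrives as a JSON string, so convert it explicitly.
    return int(resp.json()["block"]["header"]["height"])

class _FakeResp:
    def raise_for_status(self): return None
    def json(self): return {"block": {"header": {"height": "42"}}}

class _FakeSession:
    """Records requested URLs and returns one canned response."""
    def __init__(self): self.urls = []
    def get(self, url, **kwargs):
        self.urls.append(url)
        return _FakeResp()

fake = _FakeSession()
height = sketch_fetch_latest_height(fake, "https://x/")
```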

Intent: fetch current node metadata and rewarding state via smart queries.


get_node_metadata


def get_node_metadata(
    session:Any, node_id:int, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Dict[str, Any]:

# verify_get_node_metadata
payload = {
    "data": {
        "details": {"bond_information": {"bonding_height": 123}},
        "rewarding_details": {"cost_params": {"profit_margin_percent": "0.1"}, "operator": "5"},
    }
}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
out = get_node_metadata(_Session(payload), 14, nyx_wasm_rest="https://x")
assert out["bonding_height"] == 123

Intent: fetch the current delegation set for a node using paginated smart queries.


get_node_delegations_current


def get_node_delegations_current(
    session:Any, node_id:int, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', page_limit:int=200, timeout:int=60,
    max_pages:int=30, logger:Optional[logging.Logger]=None
)->List[Dict[str, Any]]:

# verify_get_node_delegations_current
payload = {"data": {"delegations": [{"owner": "n1", "amount": {"amount": "1"}}], "start_next_after": None}}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
rows = get_node_delegations_current(_Session(payload), 14, nyx_wasm_rest="https://x")
assert len(rows) == 1
assert rows[0]["owner"] == "n1"
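The pagination loop itself is not shown here, so the following is a sketch under stated assumptions rather than the module's code. It passes `start_next_after` back unchanged as `start_after`, which works whether the cursor is a string (node probe) or a list (delegator probe). Because the delegator probe above returns a non-null cursor on a short page (`count: 1` with `limit: 2`), the sketch also stops on a short page instead of waiting for a null cursor. `sketch_paginate_node_delegations` and the fake session are hypothetical names.

```python
import base64
import json
from typing import Any, Dict, List

def sketch_paginate_node_delegations(
    session: Any, node_id: int, smart_base: str,
    page_limit: int = 200, max_pages: int = 30, timeout: int = 60,
) -> List[Dict[str, Any]]:
    """Hypothetical pagination loop over get_node_delegations."""
    rows: List[Dict[str, Any]] = []
    start_after = None  # first page: no cursor
    for _ in range(max_pages):  # hard cap so a misbehaving cursor cannot loop forever
        msg = {"get_node_delegations": {"node_id": node_id, "start_after": start_after, "limit": page_limit}}
        q = base64.b64encode(json.dumps(msg).encode("utf-8")).decode("ascii")
        resp = session.get(f"{smart_base}/smart/{q}", timeout=timeout)
        resp.raise_for_status()
        data = resp.json().get("data") or {}
        page = data.get("delegations") or []
        rows.extend(page)
        start_after = data.get("start_next_after")  # opaque cursor, passed back unchanged
        if start_after is None or len(page) < page_limit:
            break  # no cursor or short page: assume this was the last page
    return rows

class _FakeResp:
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload

class _FakeSession:
    """Serves canned pages in order and counts requests."""
    def __init__(self, pages): self.pages, self.calls = list(pages), 0
    def get(self, url, **kwargs):
        self.calls += 1
        return _FakeResp(self.pages.pop(0))

pages = [
    {"data": {"delegations": [{"owner": "n1a"}, {"owner": "n1b"}], "start_next_after": "n1b"}},
    {"data": {"delegations": [{"owner": "n1c"}], "start_next_after": "n1c"}},
]
fake = _FakeSession(pages)
rows = sketch_paginate_node_delegations(fake, 14, "https://x/contract", page_limit=2)
```

With `page_limit=2`, the second page holds one row, so the loop ends after two requests even though the cursor is still set.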

Intent: resolve a wallet’s currently owned node id from contract state.


get_owned_node_id


def get_owned_node_id(
    session:Any, wallet_address:str, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Optional[int]:

# verify_get_owned_node_id
payload = {"data": {"details": {"bond_information": {"node_id": 77}}}}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
assert get_owned_node_id(_Session(payload), "n1", nyx_wasm_rest="https://x") == 77

Intent: fetch a wallet’s current delegations with pagination support.


get_delegator_delegations


def get_delegator_delegations(
    session:Any, wallet_address:str, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', page_limit:int=200, timeout:int=60,
    max_pages:int=30, logger:Optional[logging.Logger]=None
)->List[Dict[str, Any]]:

# verify_get_delegator_delegations
payload = {"data": {"delegations": [{"node_id": 14, "height": 10}], "start_next_after": None}}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
rows = get_delegator_delegations(_Session(payload), "n1", nyx_wasm_rest="https://x")
assert rows[0]["node_id"] == 14

Intent: combine cached historical links with current contract state to discover relevant node ids.


discover_wallet_nodes


def discover_wallet_nodes(
    conn:sqlite3.Connection, session:Any, wallets:List[Tuple[str, str]],
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Dict[int, Dict[str, Any]]:

# verify_discover_wallet_nodes
import tempfile
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def get(self, *args, **kwargs):
        # get_owned_nym_node and get_delegator_delegations shape
        return _Resp({"data": {"details": {}, "delegations": [], "start_next_after": None}})
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "t.sqlite")
    init_schema(conn)
    out = discover_wallet_nodes(conn, _Session(), [("n1", "tag")], nyx_wasm_rest="https://x")
    assert isinstance(out, dict)
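The verification cell only checks that a dict comes back, so the merge rule is not visible here. As an illustration of the idea of combining cached history with current contract state, the sketch below unions three sources of node ids for one wallet. The function name and the inner `wallets`/`sources` fields are assumptions made for this example; the real value layout of `discover_wallet_nodes` may differ.

```python
from typing import Any, Dict, Iterable, Optional

def sketch_merge_wallet_nodes(
    wallet: str,
    cached_node_ids: Iterable[int],
    owned_node_id: Optional[int],
    current_delegations: Iterable[Dict[str, Any]],
) -> Dict[int, Dict[str, Any]]:
    """Hypothetical merge of cached links and current state, keyed by node id."""
    out: Dict[int, Dict[str, Any]] = {}

    def add(node_id: int, source: str) -> None:
        # One entry per node id; remember which wallet and which source produced it.
        entry = out.setdefault(int(node_id), {"wallets": set(), "sources": set()})
        entry["wallets"].add(wallet)
        entry["sources"].add(source)

    for nid in cached_node_ids:           # history derived from cached wallet actions
        add(nid, "cache")
    if owned_node_id is not None:         # result of an owned-node lookup
        add(owned_node_id, "owner")
    for d in current_delegations:         # rows from a delegator-delegations query
        if d.get("node_id") is not None:
            add(d["node_id"], "delegator")
    return out

nodes = sketch_merge_wallet_nodes("n1", [14, 2933], 2196, [{"node_id": 2933, "height": 10}])
```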

Intent: fetch pending delegator reward details for one (node, delegator) pair.


get_pending_delegator_reward


def get_pending_delegator_reward(
    session:Any, node_id:int, delegator:str,
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
    logger:Optional[logging.Logger]=None
)->Dict[str, Any]:

# verify_get_pending_delegator_reward
payload = {"data": {"amount_staked": {"amount": "1"}, "amount_earned": {"amount": "2"}}}
class _Resp:
    status_code = 200
    text = "{}"
    def __init__(self, payload): self.payload = payload
    def raise_for_status(self): return None
    def json(self): return self.payload
class _Session:
    def __init__(self, payload): self.payload = payload
    def get(self, *args, **kwargs): return _Resp(self.payload)
out = get_pending_delegator_reward(_Session(payload), 14, "n1", nyx_wasm_rest="https://x")
assert out["amount_staked"]["amount"] == "1"
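The returned dict is nested, while the pending-reward state rows used in the offline checks on this page are flat (`amount_staked_unym`, `amount_earned_unym`, and so on). A minimal sketch of that flattening is shown below. `sketch_pending_reward_row` is a hypothetical helper, and the assumption that `amount_earned_detailed` and `mixnode_still_fully_bonded` are read from the same response is not confirmed by this page.

```python
from typing import Any, Dict, Optional

def sketch_pending_reward_row(delegator: str, data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical flattening of a pending-reward response into one state row."""
    data = data or {}
    staked = data.get("amount_staked") or {}   # null when there is no active delegation
    earned = data.get("amount_earned") or {}
    return {
        "delegator": delegator,
        "amount_staked_unym": staked.get("amount"),
        "amount_earned_unym": earned.get("amount"),
        "amount_earned_detailed": data.get("amount_earned_detailed"),
        "node_still_fully_bonded": data.get("node_still_fully_bonded"),
        "mixnode_still_fully_bonded": data.get("mixnode_still_fully_bonded"),
    }

row = sketch_pending_reward_row(
    "n127c69pasr35p76amfczemusnutr8mtw78s8xl7",
    {"amount_staked": {"amount": "200977025706"}, "amount_earned": {"amount": "524574066"}, "node_still_fully_bonded": True},
)
empty = sketch_pending_reward_row("n1none", {"amount_staked": None, "amount_earned": None})
```

Null amounts are carried through as `None` rather than coerced to `"0"`, so a missing delegation stays distinguishable from a zero balance.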

run_cache orchestration (wallet actions first)

Intent: update the cache end-to-end, incrementally and with explicit tradeoffs:

  • scan wallet actions for each wallet incrementally (with backfill if an older range is missing),
  • discover node ids from wallet history plus current contract state (owned nodes and current delegations),
  • refresh current-state snapshot tables per node,
  • scan reward/delegation/withdraw windows per node incrementally, with strict tx paging.

Important semantic notes:

  • adaptive reward window sizing can reduce the request count, but it changes window boundaries,
  • changed boundaries may reduce scan-window reuse against earlier fixed-window runs,
  • large windows can trigger payload-limit errors; treat the window cap and the logs as operational safety signals.
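The incremental window scans and the note about window boundaries can be illustrated with a small sketch. It splits a height range into inclusive windows and skips windows already recorded as done. `iter_windows` and `pending_windows` are hypothetical names, and exact-boundary matching is an assumption about how reuse is decided.

```python
from typing import Iterator, List, Set, Tuple

Window = Tuple[int, int]

def iter_windows(start_height: int, end_height: int, window_size: int) -> Iterator[Window]:
    """Yield inclusive (start, end) height windows covering [start_height, end_height]."""
    lo = start_height
    while lo <= end_height:
        hi = min(lo + window_size - 1, end_height)  # last window may be shorter
        yield lo, hi
        lo = hi + 1

def pending_windows(windows: List[Window], done: Set[Window]) -> List[Window]:
    """Keep only windows whose exact boundaries are not already recorded (assumed rule)."""
    return [w for w in windows if w not in done]

done = {(21312000, 21312999)}  # one window recorded by an earlier run with size 1000
same_size = pending_windows(list(iter_windows(21312000, 21314499, 1000)), done)
new_size = pending_windows(list(iter_windows(21312000, 21314499, 2000)), done)
```

With the same window size, the recorded window is skipped. With a size of 2000, the boundaries no longer line up, so the already scanned range is fetched again.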

run_cache


def run_cache(
    data_dir:str='data', wallets_csv:str='wallet-addresses.csv', db_path:str='data/nym_cache.sqlite',
    contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
    nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract',
    nyx_tx_rest:str='https://api.nymtech.net/cosmos/tx/v1beta1/txs',
    nyx_tm_rest:str='https://api.nymtech.net/cosmos/base/tendermint/v1beta1',
    window_size:Optional[int]=None, # window sizing
    window_size_reward:int=10000, window_size_delegation:int=10000, window_size_withdraw:int=20000,
    wallet_window_size:int=100000, page_size:int=100, # paging
    max_pages:int=200, timeout:int=60, force:bool=False, # behavior
    wallet_start_height:int=0, log_file:Optional[str]='nym_cache.log', log_level:str='INFO'
)->int:

# verify_run_cache_helpers
assert _adaptive_reward_window_size(start_height=1, end_height=100, base_window_size=10000) == 10000
assert _adaptive_reward_window_size(start_height=1, end_height=5_000_000, base_window_size=10000) <= 20000

Deterministic unit checks (offline)

This section performs deterministic offline checks:

  • creates a temp DB and initializes schema,
  • inserts one row into each event table (including wallet_actions),
  • records node + wallet scan windows,
  • verifies idempotency for inserts and window checks.

import tempfile
with tempfile.TemporaryDirectory() as td:
    conn = connect_db(Path(td) / "unit.sqlite")
    init_schema(conn)

    reward_row = {
        "node_id": 14,
        "height": 21312001,
        "txhash": "R1",
        "timestamp": "2026-01-01T00:00:00Z",
        "epoch": 1,
        "prior_unit_reward": "1",
        "prior_delegates": "1",
        "delegates_reward": "1",
        "operator_reward": "1",
        "msg_index": 0,
        "contract_address": DEFAULT_MIXNET_CONTRACT,
    }
    delegation_row = {
        "node_id": 14,
        "height": 21312002,
        "txhash": "D1",
        "timestamp": "2026-01-01T00:00:01Z",
        "event_type": "wasm-v2_delegation",
        "delegator": "n1deleg",
        "delta_amount_unym": "100",
        "contract_address": DEFAULT_MIXNET_CONTRACT,
    }
    withdraw_row = {
        "node_id": 14,
        "height": 21312003,
        "txhash": "W1",
        "timestamp": "2026-01-01T00:00:02Z",
        "event_type": "wasm-v2_withdraw_delegator_reward",
        "delegator": "n1deleg",
        "amount_unym": "10",
        "contract_address": DEFAULT_MIXNET_CONTRACT,
    }
    wallet_row = {
        "wallet_address": "n1wallet",
        "height": 21312004,
        "txhash": "T1",
        "timestamp": "2026-01-01T00:00:03Z",
        "action_kind": "transfer_receive",
        "event_type": "transfer",
        "node_id": None,
        "amount_unym": "7",
        "msg_index": -1,
        "contract_address": "",
    }

    assert insert_reward_rows(conn, [reward_row]) == 1
    assert insert_reward_rows(conn, [reward_row]) == 0

    assert insert_delegation_rows(conn, [delegation_row]) == 1
    assert insert_delegation_rows(conn, [delegation_row]) == 0

    assert insert_withdraw_rows(conn, [withdraw_row]) == 1
    assert insert_withdraw_rows(conn, [withdraw_row]) == 0

    assert insert_wallet_action_rows(conn, [wallet_row]) == 1
    assert insert_wallet_action_rows(conn, [wallet_row]) == 0

    upsert_node_metadata(conn, node_id=14, bonding_height=21300000, unit_delegation="1", contract=DEFAULT_MIXNET_CONTRACT)
    upsert_current_rewarding_state(conn, node_id=14, total_unit_reward="12", unique_delegations=2)
    replace_current_delegation_state(conn, node_id=14, rows=[{"owner": "n1deleg", "amount": {"amount": "100"}, "cumulative_reward_ratio": "0"}])
    replace_current_pending_reward_state(conn, node_id=14, rows=[{
        "delegator": "n1deleg",
        "amount_staked_unym": "100",
        "amount_earned_unym": "1",
        "amount_earned_detailed": "1.0",
        "node_still_fully_bonded": True,
        "mixnode_still_fully_bonded": True,
    }])
    upsert_wallet_node_link(conn, wallet_address="n1wallet", node_id=14, relation="delegator", start_height=21312002)

    record_scan_window(conn, node_id=14, event_kind="reward", start_height=21312000, end_height=21312999, status="ok", endpoint=DEFAULT_NYX_TX_REST)
    record_wallet_scan_window(conn, wallet_address="n1wallet", event_kind="wallet_actions", start_height=21312000, end_height=21312999, status="ok", endpoint=DEFAULT_NYX_TX_REST)

    conn.commit()

    assert window_done(conn, node_id=14, event_kind="reward", start_height=21312000, end_height=21312999)
    assert wallet_window_done(conn, wallet_address="n1wallet", event_kind="wallet_actions", start_height=21312000, end_height=21312999)
    assert max_scanned_end_height(conn, node_id=14, event_kind="reward") == 21312999
    assert max_wallet_scanned_end_height(conn, wallet_address="n1wallet", event_kind="wallet_actions") == 21312999

    assert len(_query_cached_rows(conn, "reward_events", 14)) == 1
    assert len(_query_cached_rows(conn, "delegation_events", 14)) == 1
    assert len(_query_cached_rows(conn, "withdraw_events", 14)) == 1
    assert len([dict(r) for r in conn.execute("SELECT * FROM wallet_actions").fetchall()]) == 1
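The insert helpers above return 1 on the first insert and 0 on a repeat. One common way to get that behavior in SQLite is a uniqueness constraint combined with `INSERT OR IGNORE`, counting inserted rows through `Connection.total_changes`. The sketch below shows the pattern on a throwaway table. The table name `demo_events`, its columns, and the unique key are assumptions for illustration; the real schema is defined by `init_schema`.

```python
import sqlite3
from typing import Any, Dict, List

def insert_rows_idempotent(conn: sqlite3.Connection, rows: List[Dict[str, Any]]) -> int:
    """Insert rows, skipping duplicates; return how many rows were actually added."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO demo_events (node_id, height, txhash, msg_index) "
        "VALUES (:node_id, :height, :txhash, :msg_index)",
        rows,
    )
    return conn.total_changes - before  # ignored duplicates do not count as changes

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE demo_events ("
    "node_id INTEGER, height INTEGER, txhash TEXT, msg_index INTEGER, "
    "UNIQUE (txhash, msg_index, node_id))"  # hypothetical dedup key
)
row = {"node_id": 14, "height": 21312001, "txhash": "R1", "msg_index": 0}
first = insert_rows_idempotent(conn, [row])
second = insert_rows_idempotent(conn, [row])
total = conn.execute("SELECT COUNT(*) FROM demo_events").fetchone()[0]
```

Re-running a scan over an already cached window then leaves the table unchanged, which is what makes window rescans safe.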