seed = SeedWallet(address="n1abc", tag="demo")
assert seed.address == "n1abc"
assert _dec_str(Decimal("10.5000")) == "10.5"
assert _to_decimal("12") == Decimal("12")
Epoch-by-epoch reward replay
Imports + constants
Intent: define shared constants, typed row models, and reusable parsing helpers for replay and export.
NodeEpochTotalRow
def NodeEpochTotalRow(
data:Any
)->None:
Node-level epoch totals aggregated from replayed reward events.
EpochEarningRow
def EpochEarningRow(
data:Any
)->None:
Per-wallet, per-node, per-epoch replayed earning breakdown.
SeedWallet
def SeedWallet(
data:Any
)->None:
Wallet seed row used to start cache-backed epoch replay.
Cache-first seed wallet loading
Intent: load seed wallets from CSV and keep wallet_tag mapping stable for export sheets.
seed_wallet_tag_map
def seed_wallet_tag_map(
seed_wallets:Sequence[SeedWallet]
)->Dict[str, str]:
load_seed_wallets
def load_seed_wallets(
wallets_csv:str | Path
)->List[SeedWallet]:
ws = [SeedWallet(address="n1a", tag="A"), SeedWallet(address="n1b", tag="")]
assert seed_wallet_tag_map(ws)["n1a"] == "A"
Watch-node discovery (cache first)
Intent: discover watch nodes using cache tables first, then fall back to get_owned_nym_node only when the operator relation is missing.
discover_watch_nodes
def discover_watch_nodes(
conn:sqlite3.Connection, seed_wallets:Sequence[SeedWallet], session:Any,
contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
logger:Optional[logging.Logger]=None
)->Tuple[set[int], Dict[int, str]]:
resolve_owner_wallet_for_node
def resolve_owner_wallet_for_node(
session:Any, node_id:int, contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract', timeout:int=60,
logger:Optional[logging.Logger]=None
)->str:
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE wallet_nodes (wallet_address TEXT, node_id INTEGER, relation TEXT, start_height INTEGER, updated_at TEXT)")
conn.execute("CREATE TABLE delegation_events (node_id INTEGER, delegator TEXT)")
conn.execute("CREATE TABLE withdraw_events (node_id INTEGER, delegator TEXT)")
conn.execute("CREATE TABLE wallet_actions (wallet_address TEXT, node_id INTEGER, height INTEGER)")
conn.execute("INSERT INTO wallet_nodes VALUES ('n1a', 14, 'delegator', 1, '')")
conn.execute("INSERT INTO wallet_nodes VALUES ('n1a', 9, 'operator', 1, '')")
conn.execute("INSERT INTO delegation_events VALUES (2933, 'n1a')")
conn.execute("INSERT INTO withdraw_events VALUES (2196, 'n1a')")
conn.commit()
nodes, owners = discover_watch_nodes(conn, [SeedWallet(address='n1a', tag='')], session=object())
assert {9, 14, 2933, 2196}.issubset(nodes)
assert owners[9] == 'n1a'
conn.close()
Canonical reward-event selection
Intent: enforce one canonical reward event per (node_id, epoch) using (height, msg_index, txhash) ordering, and keep traceability fields for every epoch row.
rows = [
{"epoch": 7, "height": 20, "msg_index": 1, "txhash": "b"},
{"epoch": 7, "height": 20, "msg_index": 0, "txhash": "c"},
{"epoch": 7, "height": 19, "msg_index": 9, "txhash": "a"},
]
out = _canonical_reward_rows(rows)
assert len(out) == 1
assert out[0]["txhash"] == "a"
Replay math helpers
Intent: apply event-sourced delegation state transitions and reward-split formulas exactly at reward heights.
pending_unym
def pending_unym(
a:Decimal, c:Decimal, U:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:
delegator_reward_epoch
def delegator_reward_epoch(
a:Decimal, c:Decimal, dU:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:
delta_u
def delta_u(
R:Decimal, U:Decimal, D:Decimal, P:Decimal
)->Decimal:
stake_value_unym
def stake_value_unym(
a:Decimal, c:Decimal, U:Decimal, D:Decimal=Decimal('1000000000')
)->Decimal:
a = Decimal("100")
c = Decimal("0")
U = Decimal("0")
R = Decimal("10")
P = Decimal("100")
dU = delta_u(R, U, DELEGATION_DENOMINATOR, P)
assert delegator_reward_epoch(a, c, dU) == Decimal("10")
assert pending_unym(a, c, U) == Decimal("0")
Node replay
Intent: replay one node from cache events and produce wallet-epoch rows + node totals from the same canonical reward stream.
replay_node_epoch_rows
def replay_node_epoch_rows(
node_id:int, owner_wallet:str, reward_rows:Sequence[Dict[str, Any]], delegation_rows:Sequence[Dict[str, Any]],
withdraw_rows:Sequence[Dict[str, Any]], current_total_unit_reward:Decimal, D:Decimal=Decimal('1000000000')
)->Dict[str, Any]:
demo = replay_node_epoch_rows(
node_id=1,
owner_wallet="n1owner",
reward_rows=[
{"epoch": 1, "height": 10, "timestamp": "2026-01-01T00:00:00Z", "txhash": "tx1", "msg_index": 0,
"prior_unit_reward": "0", "prior_delegates": "100", "delegates_reward": "10", "operator_reward": "2"}
],
delegation_rows=[
{"height": 5, "event_type": "wasm-v2_delegation", "delegator": "n1d", "delta_amount_unym": "100"}
],
withdraw_rows=[],
current_total_unit_reward=Decimal("0"),
)
assert len(demo["wallet_rows"]) == 2
assert any(r["path"] == "operator_reward" for r in demo["wallet_rows"])
assert any(r["path"] == "delegator_reward" for r in demo["wallet_rows"])
Involved-wallet closure and consistency checks
Intent: derive full wallet scope for Sheet 2 from watch nodes and enforce node-epoch reconciliation (delegator splits ~= delegates_reward).
check_node_epoch_consistency
def check_node_epoch_consistency(
all_wallet_rows:Sequence[Dict[str, Any]], node_totals:Sequence[Dict[str, Any]], logger:logging.Logger,
rel_tol:Decimal=Decimal('1E-8'), abs_tol:Decimal=Decimal('0.000001')
)->None:
involved_wallets_for_watch_nodes
def involved_wallets_for_watch_nodes(
conn:sqlite3.Connection, watch_nodes:Sequence[int], owner_by_node:Dict[int, str]
)->set[str]:
rows = [{"node_id": 1, "epoch": 1, "path": "delegator_reward", "amount_unym": "10"}]
node = [{"node_id": 1, "epoch": 1, "delegates_reward_unym": "10"}]
logger = logging.getLogger("verify")
check_node_epoch_consistency(rows, node, logger=logger)
assert callable(involved_wallets_for_watch_nodes)
assert callable(check_node_epoch_consistency)
Pending-value comparison against API + cache
Intent: compare replay-derived pending values against live smart-query values for all involved wallets on watched nodes, and against cached pending state for traceability. Always log INFO summaries and emit WARNING on tolerance mismatches.
compare_replay_vs_live_pending
def compare_replay_vs_live_pending(
session:Any, conn:sqlite3.Connection, seed_wallets:Sequence[SeedWallet], watch_nodes:Sequence[int],
owner_by_node:Dict[int, str], replay_pending_delegator:Dict[Tuple[str, int], Decimal],
replay_pending_operator:Dict[str, Decimal], contract:str, nyx_wasm_rest:str, timeout:int, logger:logging.Logger
)->None:
assert callable(_cached_pending_delegator_totals)
assert callable(compare_replay_vs_live_pending)
Excel writer
Intent: write all required sheets from one replay result set without float coercion for amount_unym.
write_epoch_by_epoch_excel
def write_epoch_by_epoch_excel(
out_path:str | Path, wallet_seed_rows:Sequence[Dict[str, Any]], wallet_all_rows:Sequence[Dict[str, Any]],
node_total_rows:Sequence[Dict[str, Any]]
)->None:
assert callable(write_epoch_by_epoch_excel)
Main entrypoint
Intent: run cache refresh first, replay watch nodes from cached events, emit three-sheet Excel output, and warn on replay-vs-live mismatches.
run_epoch_by_epoch
def run_epoch_by_epoch(
data_dir:str='data', wallets_csv:str='wallet-addresses.csv', out_xlsx:str='epoch_by_epoch_rewards.xlsx',
db_path:str='data/nym_cache.sqlite', contract:str='n17srjznxl9dvzdkpwpw24gg668wc73val88a6m5ajg6ankwvz9wtst0cznr',
nyx_wasm_rest:str='https://api.nymtech.net/cosmwasm/wasm/v1/contract',
window_size:Optional[int]=None, # cache refresh knobs
window_size_reward:int=10000, window_size_delegation:int=10000, window_size_withdraw:int=20000,
wallet_window_size:int=100000, wallet_start_height:int=0, page_size:int=100, max_pages:int=200, timeout:int=60,
force:bool=False, log_file:Optional[str]='nym_epoch_by_epoch.log', log_level:str='INFO'
)->int:
assert callable(run_epoch_by_epoch)
Live run example
This cell demonstrates a real cache-refresh + replay invocation against project data.
# run_epoch_by_epoch(
# data_dir="data",
# wallets_csv="wallet-addresses.csv",
# out_xlsx="epoch_by_epoch_rewards.xlsx",
# )