def reward_query_window(h_start: int, h_end: int) -> str:
    """Build the tx-search query for reward events in heights [h_start, h_end].

    Combines an inclusive block-height range filter with a filter on the
    mixnet contract address of the reward event type. Bounds are coerced
    to int so the emitted query is always well-formed.
    """
    lo = int(h_start)
    hi = int(h_end)
    height_clause = f"tx.height>={lo} AND tx.height<={hi}"
    contract_clause = f"{REWARD_EVENT_TYPE}._contract_address='{MIXNET_CONTRACT}'"
    return f"{height_clause} AND {contract_clause}"
def extract_reward_rows(txr: dict[str, Any]) -> list[dict[str, Any]]:
    """
    Extract reward events from a single tx_response.
    Returns one row per reward event inside the tx.

    Events whose attributes fail to parse are skipped (best-effort),
    as are events of the wrong type or from a different contract.
    """
    rows: list[dict[str, Any]] = []
    # tx-level fields shared by every row produced from this tx
    block_height = int(txr.get("height") or 0)
    tx_hash = str(txr.get("txhash") or "")
    tx_timestamp = str(txr.get("timestamp") or "")
    for event in extract_events(txr):
        if event.get("type") != REWARD_EVENT_TYPE:
            continue
        attrs = event.get("attributes") or {}
        if attrs.get("_contract_address") != MIXNET_CONTRACT:
            continue
        try:
            node = int(attrs.get("node_id") or 0)
            interval = int(attrs.get("interval_details") or 0)
            prior = Decimal(attrs.get("prior_delegates") or "0")
            delegates = Decimal(attrs.get("delegates_reward") or "0")
            operator = Decimal(attrs.get("operator_reward") or "0")
            midx = int(attrs.get("msg_index") or -1)
        except Exception:
            # malformed attribute values -> drop this event, keep scanning
            continue
        row: dict[str, Any] = {
            "node_id": node,
            "epoch": interval,
            "height": block_height,
            "txhash": tx_hash,
            "timestamp": tx_timestamp,
            "msg_index": midx,
            # store as strings for stable csv roundtrips
            "prior_delegates_unym": str(prior),
            "delegates_reward_unym": str(delegates),
            "operator_reward_unym": str(operator),
        }
        rows.append(row)
    return rows
def scan_reward_events(session, start_height: int, end_height: int) -> pd.DataFrame:
    """
    Scan reward events in [start_height, end_height], windowed by HEIGHT_WINDOW_SIZE.
    Canonicalizes to one row per (node_id, epoch).

    Walks the height range backwards (newest window first), collecting
    reward rows from every matching tx via extract_reward_rows, then picks
    the canonical row per (node_id, epoch): smallest (height, msg_index, txhash).

    Parameters
    ----------
    session : HTTP session object passed through to tx_search_all_pages.
    start_height, end_height : inclusive block-height bounds to scan.

    Returns
    -------
    pd.DataFrame with one row per (node_id, epoch); an empty frame with the
    expected columns when no events are found.
    """
    columns = [
        "node_id",
        "epoch",
        "height",
        "txhash",
        "timestamp",
        "msg_index",
        "prior_delegates_unym",
        "delegates_reward_unym",
        "operator_reward_unym",
    ]
    rows: list[dict[str, Any]] = []
    h_low = int(start_height)  # hoisted: loop bound is invariant
    h_end = int(end_height)
    total_h = max(0, h_end - h_low + 1)
    # Context manager guarantees the progress bar is closed even if a
    # window fetch raises mid-scan (the original leaked it on exception).
    with tqdm(total=total_h, desc="scan reward windows", unit="height") as pbar:
        while h_end >= h_low:
            h_start = max(h_low, h_end - HEIGHT_WINDOW_SIZE + 1)
            query = reward_query_window(h_start, h_end)
            txs = tx_search_all_pages(
                session,
                nyx_tx_rest=NYX_TX_REST,
                query=query,
                page_size=TX_PAGE_SIZE,
                max_pages=TX_MAX_PAGES,
                timeout=HTTP_TIMEOUT_S,
                strict=False,
            )
            for txr in txs:
                rows.extend(extract_reward_rows(txr))
            pbar.update(h_end - h_start + 1)
            h_end = h_start - 1
    if not rows:
        return pd.DataFrame(columns=columns)
    # drop exact duplicates that can appear when windows overlap / pages repeat
    df = pd.DataFrame(rows).drop_duplicates(
        subset=["node_id", "epoch", "height", "txhash", "msg_index"]
    )
    # normalize dtypes: nullable Int64 tolerates coercion failures as <NA>
    for col in ("node_id", "epoch", "height", "msg_index"):
        df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")
    df = df.dropna(subset=["node_id", "epoch", "height"]).copy()
    # canonical per (node_id, epoch): smallest (height, msg_index, txhash)
    df = df.sort_values(["node_id", "epoch", "height", "msg_index", "txhash"])
    df = df.drop_duplicates(subset=["node_id", "epoch"], keep="first").reset_index(drop=True)
    return df