Overview
The Cellframe Node extensions provide tools for interacting with networks, chains, ledgers, and mempool. These extensions allow you to retrieve network data, manage states, handle datums and atoms, process transactions, and enable notifications for various events. More details about networks can be found in the architecture overview.
Basic Functionality
This example demonstrates how to use these Python extensions to:
- Retrieve network information using the `Net` class.
- Process atoms and datums from a chain.
- Interact with the ledger to get transaction and emission information.
Below is the complete code for these operations:
from CellFrame.Network import Net
from DAP.Core import logIt
from CellFrame.Consensus import DAG, Block
from CellFrame.Common import DatumEmission
from CellFrame.Chain import ChainAtomPtr, Chain
from itertools import islice, chain as itertools_chain
# Name of the network this example works with.
net_name = "riemann"
# Net instance looked up by name; this runs when the module is loaded.
net = Net.byName(net_name)
# Sample size for atoms/datums. Note that init() defines a local with the
# same name and value and passes that local to the helper functions.
NUM_OF_DATUMS_ATOMS = 5
def net_info(net) -> None:
    """Log a summary of *net* through ``logIt.notice``.

    The report covers the node address, network ID, chain list, GDB group
    alias, fee settings and the native ticker, one log line per item.
    """

    def report_lines():
        # Lines are produced lazily, so every attribute is read right
        # before its own line is logged.
        yield f"The Network {net!s} data:"
        yield f"My addr: {net.getCurAddr()}"
        # net.id is a NetID instance; its `long` attribute exposes the value.
        yield f"ID: {net.id.long!s}"
        yield f"Chains: {[str(item) for item in net.chains]}"
        yield f"GDB group alias: {net.gdb_group_alias}"
        # The same fee data is available from the CLI:
        #   cellframe-node-cli net -net <name of network> get fee
        # While the network is offline the values below are shown as
        # zeros or nones.
        yield f"Transaction fee: {net.txFee}"
        yield f"Transaction fee address: {net.txFeeAddr}"
        yield f"The average validator fee: {net.validatorAverageFee}"
        yield f"The maximum validator fee: {net.validatorMaxFee}"
        yield f"The minimum validator fee: {net.validatorMinFee}"
        yield f"The native ticker: {net.nativeTicker}"

    for line in report_lines():
        logIt.notice(line)
def get_atom_hash(atom: ChainAtomPtr) -> str | None:
    """Return the hash of *atom* as a string, or ``None`` on failure.

    To obtain the hash, the atom is transformed into a ``Block`` or, if
    that yields nothing, into a ``DAG`` event using their ``fromAtom``
    methods; the hash is then read from the resulting object.

    Args:
        atom: The chain atom to resolve.

    Returns:
        ``str(obj.hash)`` of the resolved block/event, or ``None`` when the
        atom resolves to neither or a ``ValueError`` is raised on the way.
        In both failure cases the reason is reported via ``logIt.error``.
    """
    try:
        # Prefer the block form; the DAG conversion runs only when the
        # block conversion produced nothing.
        resolved = Block.fromAtom(atom) or DAG.fromAtom(atom)
        if resolved:
            return str(resolved.hash)
    except ValueError as err:
        logIt.error(str(err))
        return None
    logIt.error("Not found block or event from atom")
    return None
def get_first_n_atoms(chain: Chain, n: int):
    """Collect up to *n* ``(atom, size)`` pairs from the start of *chain*.

    Returns a list of ``(atom, size)`` tuples. When the chain yields no
    atoms, a notice is logged and an empty list is returned.
    """
    # Passing False (with_treshold) makes the iterator skip thresholds,
    # i.e. the temporary storage for events that do not yet meet the
    # criteria to be included in the main list.
    cursor = chain.createAtomIter(False)

    def walk():
        # The first step reads the head of the chain, every later step
        # advances the same iterator.
        fetch = chain.atomIterGetFirst
        while True:
            atom, size = fetch(cursor)
            if not (atom and size):
                return
            yield atom, size
            fetch = chain.atomIterGetNext

    # islice stops pulling from the generator after n items.
    # See https://docs.python.org/3/library/itertools
    collected = list(islice(walk(), n))
    if collected:
        return collected
    logIt.notice("No atoms found.")
    return []
def get_first_n_datums(chain, n: int) -> list:
    """Return up to *n* datums taken from the first *n* atoms of *chain*.

    Only the first *n* atoms (as returned by ``get_first_n_atoms``) are
    inspected; their datums are concatenated in atom order and the first
    *n* of them are returned.

    Args:
        chain: Chain to read atoms and datums from.
        n: Maximum number of atoms to scan and of datums to return.

    Returns:
        A list with at most *n* datums; empty (with a logged notice) when
        the scanned atoms contain no datums.
    """
    atoms = get_first_n_atoms(chain, n)
    # Flatten the per-atom datum collections into one lazy stream.
    all_datums = itertools_chain.from_iterable(
        chain.atomGetDatums(atom) for atom, _size in atoms
    )
    first_n_datums = list(islice(all_datums, n))
    if not first_n_datums:
        # This function deals with datums in general, so the notice says so
        # (the previous text spoke of transactions).
        logIt.notice("No datums found.")
    return first_n_datums
def get_first_n_transactions(chain, n: int) -> list:
    """Return up to *n* transactions found in the first *n* atoms of *chain*.

    Only the first *n* atoms (as returned by ``get_first_n_atoms``) are
    scanned. Each datum for which ``isDatumTX()`` is true is converted with
    ``getDatumTX()``; scanning stops as soon as *n* transactions are found.

    Args:
        chain: Chain to read atoms and datums from.
        n: Maximum number of atoms to scan and of transactions to return.

    Returns:
        A list with at most *n* transaction objects; empty (with a logged
        notice) when the scanned atoms contain no transaction datum.
    """
    atoms = get_first_n_atoms(chain, n)
    # Flatten the per-atom datum collections into one lazy stream.
    all_datums = itertools_chain.from_iterable(
        chain.atomGetDatums(atom) for atom, _size in atoms
    )
    tx_stream = (
        datum.getDatumTX() for datum in all_datums if datum.isDatumTX()
    )
    # islice enforces the limit exactly; the previous `<= n` check let
    # n + 1 transactions through.
    transactions = list(islice(tx_stream, n))
    if not transactions:
        logIt.notice("No transactions found.")
    return transactions
def get_datum_emission(chain, max_atoms: int = 100) -> DatumEmission | None:
    """Return the first token emission found near the start of *chain*.

    The first *max_atoms* atoms (as returned by ``get_first_n_atoms``) are
    scanned in order, and the first datum for which
    ``isDatumTokenEmission()`` is true is converted with
    ``getDatumTokenEmission()``.

    Args:
        chain: Chain to read atoms and datums from.
        max_atoms: How many atoms from the start of the chain to scan.
            Defaults to 100, the depth that used to be hard-coded.

    Returns:
        The emission object, or ``None`` when no emission datum is found
        within the scanned atoms.
    """
    atoms = get_first_n_atoms(chain, max_atoms)
    # Flatten the per-atom datum collections into one lazy stream, so the
    # scan stops at the first emission instead of reading every datum.
    all_datums = itertools_chain.from_iterable(
        chain.atomGetDatums(atom) for atom, _size in atoms
    )
    return next(
        (
            datum.getDatumTokenEmission()
            for datum in all_datums
            if datum.isDatumTokenEmission()
        ),
        None,
    )
def get_transaction_info(tx):
    """Log the hash and creation date of *tx* via ``logIt.notice``."""
    tx_hash, created_at = tx.hash, tx.dateCreated
    logIt.notice(f"Transaction with hash: {tx_hash}, created at {created_at}")
def chain_info(chain, num) -> None:
    """Log an overview of *chain*: consensus, atom count and samples.

    Up to *num* atoms are listed twice (once via ``getAtoms`` and once via
    manual iteration), followed by up to *num* datums and transactions.
    """
    notice = logIt.notice

    notice(f"The Chain {chain!s} data")
    notice(f"Consensus type: {chain.getCSName()}")
    notice(f"Numder of atoms in chain: {chain.countAtom()}")

    # Atoms can be obtained in two ways.
    #
    # 1) getAtoms(count: int, page: int, reverse: bool)
    #    count   - number of atoms per page,
    #    page    - number of the page to fetch atoms from,
    #    reverse - order in which the atoms are returned.
    #    All arguments must be positional, not keyword arguments.
    notice(f"First {num} atoms:")
    notice(f"Next {num} atoms received by `getAtoms` method")
    for paged_atom in chain.getAtoms(num, 1, False):
        notice(f"Atom hash: {get_atom_hash(paged_atom)}")

    # 2) Iterating over the atoms one by one; see get_first_n_atoms for
    #    the implementation.
    notice(f"Next {num} atoms received by using iterations")
    for iterated_atom, atom_size in get_first_n_atoms(chain, num):
        notice(f"Atom hash: {get_atom_hash(iterated_atom)}, size: {atom_size}")

    notice(f"First {num} datums:")
    for item in get_first_n_datums(chain, num):
        notice(f"Datum hash: {item.hash}")

    notice(f"First {num} transactions:")
    for transaction in get_first_n_transactions(chain, num):
        get_transaction_info(transaction)
def ledger_info(chain, ledger, num):
    """Log transaction, token-signature and emission data from *ledger*.

    Steps:
      1. Fetch transactions from the ledger and log up to *num* of them
         together with their main ticker and ledger response code.
      2. For the ticker of the first transaction, log the number of valid
         and total authorization signatures and the hashes of the public
         keys used for signing.
      3. Look for an emission datum in *chain* (see ``get_datum_emission``)
         and log whether the ledger knows it.

    Args:
        chain: Chain searched for an emission datum.
        ledger: Ledger object to query.
        num: Maximum number of fetched transactions to log.

    Returns:
        An empty list when the ledger returns no transactions; otherwise
        ``None``.
    """
    logIt.notice("The Ledger data")
    # NOTE(review): the first argument is fixed at 3, so if it is the page
    # size (as with Chain.getAtoms) at most 3 transactions are shown no
    # matter how large `num` is. Confirm whether `num` was intended here.
    transactions = ledger.getTransactions(3, 1, False, True)
    logIt.notice("Transactions from Ledger")
    if not transactions:
        logIt.notice("No transactions found.")
        return []

    for tx in transactions[:num]:
        get_transaction_info(tx)
        # Query the ledger once per transaction and read both fields
        # from the same result.
        ticker_and_rc = ledger.txGetMainTickerAndLedgerRc(tx)
        ledger_tx_ticker = ticker_and_rc[0]
        ledger_tx_responce = ticker_and_rc[1]
        logIt.notice(f"Ticker: {ledger_tx_ticker}, Ledger responce: {ledger_tx_responce}")

    one_tx = transactions[0]
    ticker = ledger.txGetMainTickerAndLedgerRc(one_tx)[0]
    pkeys_hashes = ledger.tokenAuthPkeysHashes(ticker)  # list[HashFast]
    signs_valid = ledger.tokenAuthSignsValid(ticker)  # int
    auth_signs_total = ledger.tokenAuthSignsTotal(ticker)  # int

    # Report the hash of the transaction whose ticker was inspected. The
    # loop variable `tx` must not be used here: it refers to the last
    # logged transaction and is unbound when `num` is 0.
    logIt.notice(f"Check for signature signs for {one_tx.hash}")
    logIt.notice(f"The number of valid signature signs: {signs_valid}")
    logIt.notice(f"The number of total signature signs: {auth_signs_total}")
    logIt.notice("The hashes of public keys for signing ticker emission:")
    for pkey_hash in pkeys_hashes:
        logIt.notice(f"{pkey_hash}")

    emission = get_datum_emission(chain)
    if emission is None:
        logIt.notice("No emission found")
    else:
        logIt.notice(f"Emission with hash: {emission.hash}")
        if ledger.has_emission(emission.hash):
            logIt.notice("exists in the ledger")
        else:
            logIt.notice("does not exist in the ledger")
def init():
    """Run the whole example against the "riemann" network and return 0.

    Logs the network summary, then the overview of the "main" and
    "zerochain" chains, and finally the ledger data.
    """
    network = Net.byName("riemann")

    # A Ledger can be obtained in several ways: from a Net instance via
    # getLedger(), as done here, or through the static method
    # Net.ledgerByNetName("<network name>").
    ledger = network.getLedger()

    main_chain, zero_chain = (
        network.getChainByName(chain_name)
        for chain_name in ("main", "zerochain")
    )
    sample_size = 5

    net_info(network)
    for current_chain in (main_chain, zero_chain):
        chain_info(current_chain, sample_size)
    ledger_info(main_chain, ledger, sample_size)
    return 0