Nitro Espresso Streamer Requirements
This page specifies the design of the Nitro Espresso streamer component of the Arbitrum Nitro integration. This component is responsible for fetching messages from the query nodes and adding them to the messagesWithMetadataAndPos array, so that these messages can be processed by the Nitro caffeinated node and the Nitro batcher.
The streamer requires these parameters, which help define the behavior of the chain derivation from Espresso and thus, conceptually, must be part of the onchain configuration of the rollup:
Namespace: The namespace for the chain.
sequencerKey: The public key of the rollup’s centralized sequencer, used to verify the signatures the sequencer places on messages in the sequencer feed.
In practice these parameters are committed onchain indirectly, because they will be part of the TEE configuration for running the Nitro batcher, and this configuration is committed to in the TEE verifier contract via the MR_ENCLAVE hash.
In addition, the streamer takes the following parameters which are needed for performance/liveness only, but are not used in defining the chain derivation:
Hotshot URLs: The URLs of the query nodes.
L1 Reader: The L1 reader for the chain.
And it uses these state variables (see the sketch after this list):
nextHotshotBlockNum: The next block number to fetch from the query nodes.
currentMessagePos: The current message position that needs to be processed.
msgBuf: Buffered messages with metadata (message position and the HotShot block number the message came from). These are stored in position order, but may have gaps as messages are received out of order.
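For orientation, the parameters and state described above might be grouped in a struct along these lines. This is a sketch only; the field names and types are illustrative assumptions rather than the actual implementation (in particular, EspressoClient, L1Reader, and the exact representation of the namespace are placeholders).

type EspressoStreamer struct {
	// Chain-derivation parameters (conceptually part of the onchain configuration).
	namespace    uint64 // Namespace: the namespace for the chain
	sequencerKey []byte // sequencerKey: verifies sequencer feed signatures

	// Performance/liveness parameters (not part of the chain derivation).
	espressoClients []EspressoClient // clients for the configured query node URLs
	l1Reader        L1Reader         // L1 reader for the chain

	// State variables.
	nextHotshotBlockNum uint64                      // next HotShot block to fetch
	currentMessagePos   uint64                      // next message position to yield
	msgBuf              []MessageWithMetadataAndPos // buffered messages, ordered by position, possibly with gaps
}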
The primary interface to the streamer is a pair of functions to view and consume the next message:
func (s *EspressoStreamer) Peek() (*MessageWithMetadataAndPos, error)
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error)
Peek is used when processing of the message might fail and need to be retried. For example, the Nitro caffeinated node uses Peek to get the next message, processes it and stores the resulting state in the database, which might fail, and then calls Next to advance the stream only after the current message has been processed successfully.
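The following sketch illustrates this pattern. It is not actual caffeinated node code; processMessage is a hypothetical placeholder for the node's real processing and state-storage step.

// Sketch of the Peek/Next retry pattern described above. processMessage is a
// hypothetical placeholder for the caffeinated node's real processing step.
func consumeMessages(streamer *EspressoStreamer, processMessage func(*MessageWithMetadataAndPos) error) {
	for {
		msg, err := streamer.Peek()
		if err != nil {
			// No message available yet; retry after a short delay.
			time.Sleep(time.Second)
			continue
		}
		if err := processMessage(msg); err != nil {
			// Processing failed; do not advance, so the same message is retried.
			continue
		}
		// Advance the stream only after the message was processed successfully.
		streamer.Next()
	}
}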
Tech Debt:
Currently, only Next is implemented, so the Nitro caffeinated node optimistically advances the stream when it gets the next message, and then uses a Reset interface to move the stream back to the prior position in case processing of that message fails. This is a clunkier and more error-prone API than Peek/Next.
Peek works by checking if the next message is already available in the buffer and, if not, consuming the next block from Espresso until either the message is buffered or there is no more data available from Espresso, in which case it returns an error indicating there is no message available at this time. In this case, the caller should retry after some delay.
func (s *EspressoStreamer) Peek() (*MessageWithMetadataAndPos, error) {
	// Consume Espresso blocks until the message at currentMessagePos is at the
	// front of the buffer. The length check avoids indexing an empty buffer.
	for len(s.msgBuf) == 0 || s.msgBuf[0].Pos != s.currentMessagePos {
		if err := s.nextEspressoBlock(); err != nil {
			return nil, err
		}
	}
	return &s.msgBuf[0], nil
}
Tech Debt:
In the current implementation, polling Espresso for new blocks is not triggered by calls to Peek, but instead happens asynchronously in a polling task. Peek/Next only check for messages already in the buffer.
The design specified here is better because:
Determinism is a primary responsibility of the Espresso streamer, and it is much easier to reason about this when there are no internal asynchronous tasks where timing could creep in as a source of non-determinism.
A polling task adds an unnecessary configuration parameter (how frequently to poll) on which overall latency becomes somewhat sensitive.
A long term goal is to authenticate the output of the Espresso streamer with a SNARK. This is much easier to do if the streamer is written as a simple function of inputs and outputs, rather than an autonomous task with a reliance on an asynchronous runtime.
Next uses Peek to check if the next message is available and, if so, removes it from the buffer, advancing the streamer state.
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error) {
	msgPtr, err := s.Peek()
	if err != nil {
		return MessageWithMetadataAndPos{}, err
	}
	// Copy the message, then remove it from the buffer and advance the position.
	msg := *msgPtr
	s.msgBuf = s.msgBuf[1:]
	s.currentMessagePos++
	return msg, nil
}
nextEspressoBlock is an internal function used to consume the next block from Espresso and add any relevant messages to the buffer. It is used by Peek when the next Nitro message is not yet buffered.
Its job is to:
Fetch the next block from HotShot. If no new block is available, or if some error is encountered when fetching the block, it returns an error without modifying the streamer state, indicating that the caller should try again later.
Parse messages intended for this rollup from the payload of the block.
Verify messages; ignore any without a valid sequencer signature.
Filter out duplicate messages: any message with a position that we have already processed or already buffered from an earlier HotShot block is ignored. It does this in two ways:
Messages with new positions (at or after s.currentMessagePos) are checked against the messages already in s.msgBuf to see if there is already a message with the same position.
Messages with positions before s.currentMessagePos are dropped, so the streamer is not required to retain large amounts of old history.
Buffer messages in order by position.
Tech Debt:
Currently we incur O(N) work to check for duplicates and O(N log N) to sort the list, every time we receive a new message. Both of these can be handled with just O(N) by doing a binary search insert-into-already-sorted list operation, and simply aborting if we find there is already a message with the same pos at the insertion site.
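A sketch of what that might look like, using Go's sort.Search to find the insertion point; this is illustrative only and not part of the current implementation.

// Sketch: insert msg into the already-sorted buffer in O(N), aborting if a
// message with the same position is already present.
func (s *EspressoStreamer) insertSorted(msg MessageWithMetadataAndPos) {
	// Binary search for the first index whose position is >= msg.Pos.
	i := sort.Search(len(s.msgBuf), func(j int) bool {
		return s.msgBuf[j].Pos >= msg.Pos
	})
	if i < len(s.msgBuf) && s.msgBuf[i].Pos == msg.Pos {
		return // a message with this position is already buffered; ignore
	}
	// Shift the tail right by one and insert (O(N) due to the copy).
	s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{})
	copy(s.msgBuf[i+1:], s.msgBuf[i:])
	s.msgBuf[i] = msg
}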
func (s *EspressoStreamer) nextEspressoBlock() error {
	arbTxns, err := s.fetchTransactionsInBlockFromAllClients()
	if err != nil {
		return err
	}

	for _, tx := range arbTxns {
		indices, messages, signatures := arbutil.ParseHotShotPayload(tx)
		for i, message := range messages {
			// Skip messages that are not signed by the centralized sequencer.
			if !VerifySequencerSignature(s.sequencerKey, message, indices[i], signatures[i]) {
				continue
			}

			// Skip messages whose position is before the current message position.
			if indices[i] < s.currentMessagePos {
				continue
			}

			// Only append to the buffer if the message is not already buffered.
			if s.findMessageIndexByPos(indices[i]) == -1 {
				s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{
					MessageWithMeta: message,
					Pos:             indices[i],
					HotshotHeight:   s.nextHotshotBlockNum,
				})
			}
		}
	}

	// Sort the buffered messages by ascending position so that
	// they are processed in the correct order.
	sort.SliceStable(s.msgBuf, func(i, j int) bool {
		return s.msgBuf[i].Pos < s.msgBuf[j].Pos
	})

	s.nextHotshotBlockNum++
	return nil
}
Sequencing Sovereignty and Permissionless Batching
Note the importance of the sequencer signature check, especially as it relates to the Permissionless Batching requirement. In the vanilla Nitro stack, the power to sequence batches is actually vested in the batcher. The batcher holds the key controlling the unique L1 EOA which is authorized by the inbox contract to post batches, and whatever is posted to this contract ultimately controls the sequence of batches which the rollup will finalize. In the usual configuration, the batcher and sequencer are operated by the same party, so this "batcher sovereignty" property extends to "sequencer sovereignty".
Our integration removes enforcement at the inbox contract that only one batcher key is allowed to post batches, but we want to retain centralized sequencing sovereignty, so that any of the permissionless set of batchers is bound to post only batches which have been produced and authorized by the centralized sequencer. The sequencer signature check solves this, since it is performed using the key pair that is actually controlled by the sequencer, as part of the sequencer feed mechanism.
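As a rough illustration of that check, a verification routine along these lines could be used. This is a hypothetical sketch only: the signed domain (position || message), the 65-byte recoverable secp256k1 signature format, and the use of go-ethereum's crypto package are assumptions, not the actual sequencer feed format.

import (
	"bytes"
	"encoding/binary"

	"github.com/ethereum/go-ethereum/crypto"
)

// Hypothetical sketch of the sequencer signature check used in nextEspressoBlock.
// The hashing and signature scheme here are assumptions; the real sequencer feed
// defines what is actually signed and how.
func VerifySequencerSignature(sequencerKey []byte, message []byte, pos uint64, sig []byte) bool {
	var posBytes [8]byte
	binary.BigEndian.PutUint64(posBytes[:], pos)
	digest := crypto.Keccak256(posBytes[:], message)
	recovered, err := crypto.SigToPub(digest, sig)
	if err != nil {
		return false
	}
	return bytes.Equal(crypto.CompressPubkey(recovered), sequencerKey)
}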
Thus, we have this chain of sovereignty from sequencing to settlement: the centralized sequencer produces and signs each message, the Espresso streamer (and therefore any permissionless batcher built on it) accepts only messages carrying a valid sequencer signature, and the batches posted to the inbox contract ultimately control the sequence which the rollup finalizes.
A streamer checkpoint consists of just two fields:
type Checkpoint struct {
	messagePos     uint64
	espressoHeight uint64
}
This makes them easy to persist and load, whether on L1 where storage is expensive, or in a database. The two fields represent, respectively:
messagePos: the position of the last message yielded by the stream before the checkpoint was taken.
espressoHeight: the lowest Espresso (HotShot) block height containing any message still buffered at checkpoint time.
Loading a checkpoint means creating a new streamer to replay HotShot blocks from espressoHeight. Setting espressoHeight to the lowest block containing any buffered messages at checkpoint time ensures we will not miss any important messages on replay, while saving the Nitro messagePos ensures we will correctly ignore any old Nitro messages included in the replay that the stream had already yielded before the checkpoint.
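A sketch of how a checkpoint might be taken under that definition follows. The initial fallback to nextHotshotBlockNum when the buffer is empty, and the assumption that at least one message has been yielded, are illustrative choices not specified above.

// Sketch: take a checkpoint of the streamer's current progress.
func (s *EspressoStreamer) Checkpoint() Checkpoint {
	// The last message yielded is one position before the next to be yielded
	// (assumes at least one message has been yielded).
	cp := Checkpoint{
		messagePos:     s.currentMessagePos - 1,
		espressoHeight: s.nextHotshotBlockNum, // fallback when the buffer is empty
	}
	// Use the lowest HotShot height among buffered messages, so replay from the
	// checkpoint cannot miss any message that was still buffered at this time.
	for _, msg := range s.msgBuf {
		if msg.HotshotHeight < cp.espressoHeight {
			cp.espressoHeight = msg.HotshotHeight
		}
	}
	return cp
}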
Specifically, when creating a streamer from a checkpoint, we simply set the state variables as follows (sketched in code after this list):
nextHotshotBlockNum: checkpoint.espressoHeight
currentMessagePos: checkpoint.messagePos + 1
msgBuf: nil
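A minimal sketch of restoring from a checkpoint, which just applies these assignments; NewEspressoStreamerFromCheckpoint, NewEspressoStreamer, and StreamerConfig are hypothetical names standing in for the parameters described earlier.

// Sketch: restore a streamer from a checkpoint by applying the assignments above.
func NewEspressoStreamerFromCheckpoint(cfg StreamerConfig, cp Checkpoint) *EspressoStreamer {
	s := NewEspressoStreamer(cfg) // hypothetical base constructor
	s.nextHotshotBlockNum = cp.espressoHeight
	s.currentMessagePos = cp.messagePos + 1
	s.msgBuf = nil
	return s
}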
Note that this is not necessarily equivalent to the state the streamer was in when the checkpoint was created. If messages were committed on Espresso out of order, it is possible that the checkpoint was created while the streamer had msgBuf non-empty and nextHotshotBlockNum greater than we are setting it here. However, the restored streamer should yield the same sequence of messages starting from checkpoint.messagePos + 1 as the original streamer would. Now we give a sketch of a proof of this important property.
In this proof sketch, we assume the streamer is broadly correct apart from restarts/ restoration from a checkpoint. That is, assume a streamer which is started with an Espresso height h and Nitro message position p will yield, for each position i ≥ p, the earliest valid message with position i in the Espresso block stream starting at h. Read Streaming Messages and try to convince yourself this is true. Our goal is to show that restoring a streamer from a checkpoint does not disrupt this property with respect to the messages yielded after the checkpoint.
Consider an original streamer started from genesis, with a checkpoint taken at Espresso height HC and message position PC, and a second, restored streamer started from this checkpoint. We aim to prove that for all p > PC (1), the restored streamer and the original streamer yield the same message at position p.
Suppose the restored streamer yields message M at position p. The original streamer only yields a different message M′ with position p if M′ appears earlier in the Espresso stream than M. This must occur at an Espresso block h < HC (2), or else the restored streamer would also yield M′.
However, this is impossible: were it so, then the original streamer would have had M′ buffered when the checkpoint was taken at position PC: M′ would have already been buffered, because the checkpoint was taken at Espresso block HC > h (2), and would not yet have been yielded because the checkpoint was taken at position PC < p (1). But the checkpoint uses Espresso height HC, by definition the lowest Espresso height in the buffer at checkpoint time, so an Espresso height h < HC could not have been in the buffer. Therefore, the original streamer must also yield M.
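The same argument can be restated compactly, writing yield_S(p) for the message streamer S yields at position p (notation introduced here only for convenience):

\begin{align*}
&\textbf{Claim: } \forall p > P_C:\ \mathrm{yield}_{\mathrm{restored}}(p) = \mathrm{yield}_{\mathrm{original}}(p).\\
&\text{Suppose instead } \mathrm{yield}_{\mathrm{restored}}(p) = M \text{ while } \mathrm{yield}_{\mathrm{original}}(p) = M' \ne M.\\
&\text{Then } M' \text{ appears at some Espresso height } h < H_C \quad (2).\\
&\text{At checkpoint time: } h < H_C \Rightarrow M' \text{ already buffered};\quad p > P_C \Rightarrow M' \text{ not yet yielded} \quad (1).\\
&\text{But } H_C = \min\{\text{HotShot heights of buffered messages}\} \le h,\ \text{a contradiction, so } M' = M.
\end{align*}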
findMessageIndexByPos checks whether the msgBuf array contains a message with the given position and returns the index of that message if it exists, otherwise -1.
func (s *EspressoStreamer) findMessageIndexByPos(pos uint64) int {
	for i, msg := range s.msgBuf {
		if msg.Pos == pos {
			return i
		}
	}
	return -1
}
fetchTransactionsInBlockFromAllClients fetches the transactions in a block from all of the query node clients and returns the transactions agreed on by a majority of them.
Tech Debt:
Eventually we should replace this "majority rules" approach with just verifying the proofs directly, so that we only need to talk to one query service. Currently we don't do this because we need the query service to provide us with the latest header and a proof that the specific header is finalized.
func (s *EspressoStreamer) fetchTransactionsInBlockFromAllClients(ctx context.Context) ([]espressoTypes.Bytes, error) {
	transactionsMap := make(map[string]TransactionsInBlockWithCount)

	for _, client := range s.espressoClients {
		txs, err := client.FetchTransactionsInBlock(ctx, s.nextHotshotBlockNum, s.namespace)
		if err != nil {
			return nil, err
		}
		txsHash := arrayToHash(txs.Transactions)
		if transactionsMap[txsHash].count > 0 {
			transactionsMap[txsHash] = TransactionsInBlockWithCount{
				count:        transactionsMap[txsHash].count + 1,
				transactions: transactionsMap[txsHash].transactions,
			}
		} else {
			transactionsMap[txsHash] = TransactionsInBlockWithCount{
				count:        1,
				transactions: txs.Transactions,
			}
		}
	}

	// Return the response that a strict majority of query nodes agree on.
	for _, txInBlockWithCount := range transactionsMap {
		if txInBlockWithCount.count > len(s.espressoClients)/2 {
			return txInBlockWithCount.transactions, nil
		}
	}

	return nil, fmt.Errorf("unable to find the transactions which have a majority")
}
Note that because an honest committee of Espresso nodes will always return the same response for a given block number, results of this function are cacheable, using s.nextHotshotBlockNum as a caching key. Generally, this function will only be called once for each Espresso block. However, in case an Espresso streamer needs to be reset to a checkpoint, caching the relevant transactions in each Espresso block can greatly speed up replay from that earlier checkpoint, because on replay the cache will be hit and we will skip multiple HTTP requests to query nodes for each Espresso block. Such caching can be added transparently without complicating the streamer logic at all, since fetchTransactionsInBlockFromAllClients presents the same interface regardless of whether the response is cached.
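As a sketch of how such a transparent cache might look; the txCache field and the wrapper name are assumptions, not part of the current design.

// Sketch: a transparent cache keyed by HotShot block number, consulted before
// hitting the query nodes. txCache (map[uint64][]espressoTypes.Bytes) is a
// hypothetical addition to the streamer state.
func (s *EspressoStreamer) fetchTransactionsInBlockCached(ctx context.Context) ([]espressoTypes.Bytes, error) {
	if txs, ok := s.txCache[s.nextHotshotBlockNum]; ok {
		// Cache hit: skip the HTTP round trips to the query nodes entirely.
		return txs, nil
	}
	txs, err := s.fetchTransactionsInBlockFromAllClients(ctx)
	if err != nil {
		return nil, err
	}
	s.txCache[s.nextHotshotBlockNum] = txs
	return txs, nil
}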