Nitro Espresso Streamer Requirements
This page specifies the design of the Nitro Espresso streamer component of the Arbitrum Nitro integration. This component is responsible for fetching messages from the query nodes and adding them to the messagesWithMetadataAndPos buffer, so that these messages can be processed by the Nitro caffeinated node and the Nitro batcher.
The streamer requires these parameters, which help define the behavior of the chain derivation from Espresso and thus, conceptually, must be part of the onchain configuration of the rollup:
Namespace: The namespace for the chain.
sequencerKey: The public key of the rollup’s centralized sequencer, which it uses to sign messages in the sequencer feed.
maxChunks: The maximum number of chunks a message can reference.
Note:
This ensures that there is a hard upper bound on the amount of work required to process any given message. It is possible to place an upper bound on the number of chunks a valid message may reference without changing Nitro semantics, because Nitro has an implicit maximum message size, even if this size is greater than that of a single Espresso block. The maximum Nitro message size is determined by the chain’s gas limit. EVM semantics are such that, for any given amount of gas, there is a finite maximum block size which uses less than that amount of gas.
powDifficulty: The difficulty of the proof of work problem required to post a message with chunks.
In practice these parameters are committed onchain indirectly, because they will be part of the TEE configuration for running the Nitro batcher, and this configuration is committed to in the TEE verifier contract via the MR_ENCLAVE hash.
In addition, the streamer takes the following parameters which are needed for performance/liveness only, but are not used in defining the chain derivation:
Hotshot URLs: The URLs of the query nodes.
L1 Reader: The L1 reader for the chain.
And it uses these state variables:
nextHotshotBlockNum: The next block number to fetch from the query nodes.
currentMessagePos: The current message position that needs to be processed.
msgBuf: Buffered messages with metadata (message position and the HotShot block number the message came from). These are stored in position order, but may have gaps as messages are received out of order.
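For concreteness, the parameters and state variables above can be pictured as fields of the streamer type. The following is a hedged sketch: the field types (SequencerPublicKey, EspressoClient, L1Reader) and field visibility are assumptions, while the field names follow the code snippets later on this page.

// EspressoStreamer reconstructs the rollup's message stream from Espresso.
// Field types here are illustrative, not the actual implementation.
type EspressoStreamer struct {
    // Chain derivation parameters (conceptually part of the onchain configuration).
    Namespace     uint64
    sequencerKey  SequencerPublicKey // type is an assumption
    maxChunks     uint64
    PowDifficulty uint64

    // Performance/liveness parameters (not part of chain derivation).
    espressoClients []EspressoClient // one client per query node URL; type is an assumption
    l1Reader        L1Reader         // type is an assumption

    // State variables.
    nextHotshotBlockNum uint64
    currentMessagePos   uint64
    msgBuf              []MessageWithMetadataAndPos // ordered by Pos, possibly with gaps
}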
The primary interface to the streamer is a pair of functions to view and consume the next message:
func (s *EspressoStreamer) Peek() (*MessageWithMetadataAndPos, error)
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error)
Peek is used when processing of the message might fail and need to be retried. For example, the Nitro caffeinated node uses Peek to get the next message, processes it and stores the resulting state in the database, which might fail, and then calls Next to advance the stream only after the current message has been processed successfully.
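As a hedged illustration of this intended pattern, the following sketch shows a caller driving the streamer. The processMessage callback and pollInterval parameter are placeholders introduced here for illustration, not part of the streamer API.

// Sketch of a caller using the intended Peek/Next pattern.
func run(streamer *EspressoStreamer, processMessage func(*MessageWithMetadataAndPos) error, pollInterval time.Duration) error {
    for {
        msg, err := streamer.Peek()
        if err != nil {
            // No message available yet; retry after a delay.
            time.Sleep(pollInterval)
            continue
        }
        if err := processMessage(msg); err != nil {
            // Processing failed; do not advance the stream, so the same
            // message will be returned by the next Peek.
            continue
        }
        // Advance the stream only after the message has been processed successfully.
        if _, err := streamer.Next(); err != nil {
            return err
        }
    }
}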
Tech Debt:
Currently, only Next is implemented, so the Nitro caffeinated node optimistically advances the stream when it gets the next message, and then uses a Reset interface to move the stream back to the prior position in case processing of that message fails. This is a clunkier and more error-prone API than Peek/Next.
Peek works by checking if the next message is already available in the buffer and, if not, consuming the next block from Espresso until either the message is buffered or there is no more data available from Espresso, in which case it returns an error indicating there is no message available at this time. In this case, the caller should retry after some delay.
Peek
for len(s.msgBuf) == 0 || s.msgBuf[0].Pos != s.currentMessagePos {
    if err := s.nextEspressoBlock(); err != nil {
        return nil, err
    }
}
return &s.msgBuf[0], nil
Tech Debt:
In the current implementation, polling Espresso for new blocks is not triggered by calls to Peek, but instead happens asynchronously in a polling task. Peek/Next only check for messages already in the buffer.
The design specified here is better because:
Determinism is a primary responsibility of the Espresso streamer, and it is much easier to reason about this when there are no internal asynchronous tasks where timing could creep in as a source of non-determinism.
A polling task adds an unnecessary configuration parameter (how frequently to poll), on which overall latency becomes somewhat sensitive.
A long term goal is to authenticate the output of the Espresso streamer with a SNARK. This is much easier to do if the streamer is written as a simple function of inputs and outputs, rather than as an autonomous task with a reliance on an asynchronous runtime.
Next uses Peek to check if the next message is available and, if so, removes it from the buffer, advancing the streamer state.
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error) {
    msgPtr, err := s.Peek()
    if err != nil {
        return MessageWithMetadataAndPos{}, err
    }
    msg := *msgPtr
    s.msgBuf = s.msgBuf[1:]
    s.currentMessagePos++
    return msg, nil
}
nextEspressoBlock is an internal function used to consume the next block from Espresso and add any relevant messages to the buffer. It is used by Peek when the next Nitro message is not yet buffered.
Its job is to
Fetch the next block from HotShot. If no new block is available, or if some error is encountered when fetching the block, it returns an error without modifying the streamer state, indicating that the caller should try again later.
Parse messages intended for this rollup from the payload of the block
Verify messages; ignore any without a valid sequencer signature
Filter out duplicate messages: any message with a position that we have already processed or already buffered from an earlier HotShot block is ignored. It does this in two ways:
New messages (with positions at or after s.currentMessagePos) are checked against the messages already in s.msgBuf to see if there is already a message with the same position.
Messages with positions before s.currentMessagePos are dropped immediately; any position already yielded is less than s.currentMessagePos, so the streamer is not required to retain large amounts of old history.
Buffer messages in order by position.
Tech Debt:
Currently we incur O(N) work to check for duplicates and O(N log N) work to sort the list every time we receive a new message. Both can be handled with O(N) total work by doing a binary-search insertion into the already-sorted list, and simply aborting if there is already a message with the same position at the insertion site.
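A minimal sketch of that insertion, assuming msgBuf is kept sorted by Pos; the method name insertMessage is illustrative, and sort.Search is the standard library binary search.

// insertMessage inserts msg into the position-sorted buffer, skipping duplicates.
// This replaces the append-then-sort pattern with a single O(N) insertion.
func (s *EspressoStreamer) insertMessage(msg MessageWithMetadataAndPos) {
    // Find the first index whose position is >= msg.Pos (O(log N) comparisons).
    i := sort.Search(len(s.msgBuf), func(i int) bool {
        return s.msgBuf[i].Pos >= msg.Pos
    })
    // Abort if a message with this position is already buffered.
    if i < len(s.msgBuf) && s.msgBuf[i].Pos == msg.Pos {
        return
    }
    // Shift the tail right by one and place the new message (O(N) copying).
    s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{})
    copy(s.msgBuf[i+1:], s.msgBuf[i:])
    s.msgBuf[i] = msg
}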
func (s *EspressoStreamer) nextEspressoBlock() error {
    header, arbTxns, err := s.fetchTransactionsInBlockFromAllClients()
    /* propagate errors */

    for _, tx := range arbTxns {
        messages, err := s.parseEspressoPayload(header, tx.Data)
        /* propagate errors */

        for _, message := range messages {
            // Skip messages that are before the current message position
            if message.Pos < s.currentMessagePos {
                continue
            }

            // Only append to the buffer if the message is not a duplicate
            if s.findMessageIndexByPos(message.Pos) == -1 {
                s.msgBuf = append(s.msgBuf, message)
            }
        }
    }

    // Sort msgBuf in ascending order of position
    // to ensure that we process messages in the correct order
    sort.SliceStable(s.msgBuf, func(i, j int) bool {
        return s.msgBuf[i].Pos < s.msgBuf[j].Pos
    })

    s.nextHotshotBlockNum += 1
    return nil
}
parseEspressoPayload is an internal function used to reconstruct a list of Nitro messages from data committed to Espresso, validating and rejecting invalid messages. It works through the payload byte by byte, extracting zero or more well-formed messages. Note that each message encodes its own length, so that multiple messages can be concatenated in one payload, and the end of each message and the start of the next can be found deterministically.
This function is greedy. If at any point in the payload it is impossible to continue extracting a well-formed message (e.g. not enough payload data remains, or the data in a message is inconsistent, such as an incorrect length), the entire remainder of the payload is considered invalid, but well-formed messages that have already been parsed are still used.
A well-formed message may be invalid (i.e. if it is not signed by the rollup’s sequencer). Such messages are ignored, but subsequent messages in the payload may still be processed.
The presence of invalid messages or even a complete lack of valid messages is not considered an error, though an empty list may be returned. This function will only return an error if it was unable to determine if a certain message is valid or not, and the caller should retry. For example, if a message contains chunks, those chunks need to be fetched from Espresso. If this fails due to a network error, this function will return an error. It cannot skip such messages, because this would be non-deterministic.
func (s *EspressoStreamer) parseEspressoPayload(header EspressoHeader, payload []byte) ([]MessageWithMetadataAndPos, error) {
    var msgs []MessageWithMetadataAndPos
    offset := 0
    for offset < len(payload) {
        offset, msg, position, signature, err := /* parse the next message */
        if err != nil {
            return nil, err
        }

        /* Skip messages that are not signed by the centralized sequencer. */
        if !VerifySequencerSignature(s.sequencerKey, msg, signature) {
            continue
        }
        msgs = append(msgs, MessageWithMetadataAndPos{
            MessageWithMeta: msg,
            Pos:             position,
            HotshotHeight:   header.Height,
        })
    }

    return msgs, nil
}
Sequencing Sovereignty and Permissionless Batching
Note the importance of the sequencer signature check, especially as it relates to the Permissionless Batching requirement. In the vanilla Nitro stack, the power to sequence batches is actually vested in the batcher. The batcher holds the key controlling the unique L1 EOA which is authorized by the inbox contract to post batches, and whatever is posted to this contract ultimately controls the sequence of batches which the rollup will finalize. In the usual configuration, the batcher and sequencer are operated by the same party, so this "batcher sovereignty" property extends to "sequencer sovereignty".
Our integration removes enforcement at the inbox contract that only one batcher key is allowed to post batches, but we want to retain centralized sequencing sovereignty, so that any of the permissionless set of batchers is bound to post only batches which have been produced and authorized by the centralized sequencer. The sequencer signature check solves this, since it is performed using the key pair that is actually controlled by the sequencer, as part of the sequencer feed mechanism.
Thus, we have this chain of sovereignty from sequencing to settlement:
The parsing of each individual message starts by consuming a single byte, which indicates the type of message. If the message type is not one of the defined message types, the message is considered malformed and the rest of the payload is skipped (offset is set to len(payload)). Once the type of message is known, the fields for that message type are extracted one by one from the payload following the format.
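A hedged sketch of this type-byte dispatch follows. The constant names are illustrative; the type values 1 and 2 are those described below for normal and chunk-referencing messages.

// Type values for the leading byte of each message.
const (
    messageTypeNormal        byte = 1 // complete Nitro message with metadata and signature
    messageTypeWithChunkRefs byte = 2 // message referencing chunks posted separately
)

// readMessageType consumes the leading type byte of the next message. For an
// unknown type it signals that the rest of the payload should be skipped by
// returning newOffset == len(payload).
func readMessageType(payload []byte, offset int) (msgType byte, newOffset int) {
    msgType = payload[offset]
    newOffset = offset + 1
    if msgType != messageTypeNormal && msgType != messageTypeWithChunkRefs {
        // Malformed: skip the remainder of the payload.
        return msgType, len(payload)
    }
    return msgType, newOffset
}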
For normal messages (type 1) this yields a complete Nitro message with metadata (message position) and signature, which can be verified and appended directly.
For messages with chunk references (type 2), there is additional validation required, then the chunks must be fetched from Espresso and concatenated to the payload data in the message itself to form the complete Nitro message. This can finally be validated and appended as with type 1 messages. The job of converting a type 2 message (represented in code as a struct MessageWithChunkRefs) to a Nitro message with metadata and signature belongs to a self-contained function dereferenceChunks.
func (s *EspressoStreamer) dereferenceChunks(header EspressoHeader, msg MessageWithChunkRefs) (*MessageWithMetadata, error)
This function validates the message with chunk references (to prevent spam, since chunk processing can be costly), fetches the referenced chunks, reconstructs the full payload, and deserializes the Nitro message. If any step fails because the message is not valid, it returns nil, indicating the message should be skipped. As with parseEspressoPayload, it only fails if it is unable to validate or reconstruct the message due to a transient error fetching data from Espresso.
Spam Prevention
Before fetching chunks, this function performs some checks to reduce the rate of spam and limit the amount of work required for the Espresso streamer to fetch large chunks.
First, there is a maximum number of chunk references allowed in any given message (which should be set high enough so that any valid Nitro message can fit in the given number of chunks):
if msg.NumChunks > s.maxChunks {
    return nil, nil
}
Next, we validate a proof-of-work challenge completed by the sender. This prevents a DOS attack in which the attacker would otherwise have an asymmetric advantage. We assume that in the normal message case, the cost of posting data on Espresso is already sufficient to make infeasible a DOS attack where the attacker sends large amounts of nonsense data to Espresso, forcing the Espresso streamer to download and verify all of it. Chunks allow the attacker to circumvent the cost of posting to Espresso: they can post one single large transaction to Espresso, and then spam many small type 2 transactions which all reference the same chunk. These transactions are cheap to post on Espresso, but expensive for the streamer to deal with, because for each one, it needs to fetch the large chunk and then verify the (invalid) sequencer signature over a large amount of data.
Proof of work levels the playing field by forcing the attacker to expend a non-trivial amount of computation (configurable, but ideally in the ballpark of a second or two) each time it posts one of the otherwise cheap chunk-containing transactions to Espresso. The attacker must try (using brute force) to find a value for the PoW field of the MessageWithChunkRefs such that the hash of the entire message has a sufficient number of trailing zeros (the difficulty of the problem):
if Keccak(msg) & ((1 << s.PowDifficulty) - 1) != 0 {
    return nil, nil
}
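For illustration, a minimal sketch of how an honest submitter would satisfy this check by brute force; it follows the same pseudocode Keccak and mask convention as the check above, and the assumption that the PoW field holds a uint64 nonce is illustrative.

// solvePoW brute-forces a PoW value such that the hash of the entire message
// has powDifficulty trailing zero bits, matching the streamer's check.
func solvePoW(msg MessageWithChunkRefs, powDifficulty uint64) MessageWithChunkRefs {
    mask := (uint64(1) << powDifficulty) - 1
    for nonce := uint64(0); ; nonce++ {
        msg.PoW = nonce
        if Keccak(msg)&mask == 0 {
            return msg // this message now passes the proof-of-work check
        }
    }
}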
Note that there is still an attack where the attacker finds a single working proof-of-work witness and then spams the cheap type 2 transactions with the same chunk references over and over again. This attack is easily mitigated by caching. After hashing the message and checking the proof-of-work solution, the streamer only needs to check the cache for an entry with the same hash to see if it has already determined that the fully reconstructed message is invalid.
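A minimal sketch of that mitigation, assuming a hypothetical invalidMsgCache streamer field keyed by the message hash; the field, key type, and helper names are illustrative and not part of the current design.

// alreadyKnownInvalid reports whether a chunk message with this hash has
// previously been reconstructed and found invalid.
// invalidMsgCache is a hypothetical field: map[MessageHash]struct{}.
func (s *EspressoStreamer) alreadyKnownInvalid(hash MessageHash) bool {
    _, seen := s.invalidMsgCache[hash]
    return seen
}

// rememberInvalid records an invalid reconstruction, so future copies of the
// same message (same hash, hence same chunk references and PoW witness)
// can be skipped without fetching their chunks again.
func (s *EspressoStreamer) rememberInvalid(hash MessageHash) {
    s.invalidMsgCache[hash] = struct{}{}
}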
Tech Debt:
The initial version does not use proof of work to prevent DOS attacks. Instead, the ability to send to Espresso is permissioned based on a simple signature on the submitted payload, so only payloads submitted by parties trusted not to attack the liveness of the system will be processed. These signatures are not trusted for safety purposes.
Chunk Fetching and Reconstruction
Once we have eliminated spam, we need to fetch the referenced chunks and reconstruct the message. To fetch a chunk reference to Espresso block h, transaction index i (a code sketch combining these steps appears after the following paragraph):
Check that h is less than header.Height; that is, the referenced chunks must be earlier in the Espresso chain than the message being reconstructed. If not, ignore the entire message.
Fetch the header of the referenced block: chunkHeader := s.espressoClient.FetchHeaderByHeight(h).
Fetch a Merkle proof for the header at height h relative to the original header header, using s.espressoClient.FetchBlockMerkleProof(header.Height, h). Verify this proof against header.GetBlockMerkleTreeRoot using espressocrypto.VerifyMerkleProof.
Fetch the transactions in the s.Namespace namespace of chunkHeader using s.fetchTransactionsInBlock(chunkHeader).
Take the i-th transaction from this list (if it does not exist, ignore the entire message).
After successfully extracting chunk data from each chunk reference, we concatenate the Data field of the original MessageWithChunkRefs followed by the data from each referenced chunk, in the order they were referenced. This must deserialize as a Nitro MessageWithMetadata. If it does, return it. Otherwise, return nil.
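Putting the steps above together with this reconstruction, the following is a hedged sketch of the chunk dereferencing loop. The chunk reference field names (ChunkRefs, BlockHeight, TxIndex), the DeserializeMessageWithMetadata helper, and the exact argument lists of the Espresso client and espressocrypto calls are assumptions, not the actual implementation.

// reconstructFromChunks fetches each referenced chunk, verifies it against the
// original header, and reconstructs the full Nitro message.
func (s *EspressoStreamer) reconstructFromChunks(header EspressoHeader, msg MessageWithChunkRefs) (*MessageWithMetadata, error) {
    data := msg.Data
    for _, ref := range msg.ChunkRefs {
        // Referenced chunks must come from earlier Espresso blocks.
        if ref.BlockHeight >= header.Height {
            return nil, nil // invalid message: skip
        }
        chunkHeader, err := s.espressoClient.FetchHeaderByHeight(ref.BlockHeight)
        if err != nil {
            return nil, err // transient error: caller should retry
        }
        proof, err := s.espressoClient.FetchBlockMerkleProof(header.Height, ref.BlockHeight)
        if err != nil {
            return nil, err
        }
        // Verify that chunkHeader really is the header at this height in the chain
        // that header belongs to.
        if !espressocrypto.VerifyMerkleProof(proof, chunkHeader, header.GetBlockMerkleTreeRoot()) {
            return nil, fmt.Errorf("query node returned an unverifiable block Merkle proof")
        }
        txs, err := s.fetchTransactionsInBlock(chunkHeader)
        if err != nil {
            return nil, err
        }
        if int(ref.TxIndex) >= len(txs) {
            return nil, nil // referenced transaction does not exist: skip
        }
        data = append(data, txs[ref.TxIndex]...)
    }
    // The concatenated data must deserialize as a Nitro MessageWithMetadata.
    reconstructed, err := DeserializeMessageWithMetadata(data)
    if err != nil {
        return nil, nil // malformed message: skip
    }
    return reconstructed, nil
}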
A streamer checkpoint consists of just two fields:
type Checkpoint struct {
    messagePos     uint64
    espressoHeight uint64
}
This makes them easy to persist and load, whether on L1 where storage is expensive, or in a database. The two fields represent, respectively:
messagePos: the position of the last message yielded by the streamer before the checkpoint was taken.
espressoHeight: the lowest Espresso block height containing any message that was still buffered when the checkpoint was taken.
Loading a checkpoint means creating a new streamer to replay HotShot blocks from espressoHeight. Setting espressoHeight to the lowest block containing any buffered messages at checkpoint time ensures we will not miss any important messages on replay, while saving the Nitro messagePos ensures we will correctly ignore any old Nitro messages that also get included in the replay but that the stream had already yielded before the checkpoint.
Specifically, when creating a streamer from a checkpoint, we simply set the state variables as follows:
nextHotshotBlockNum: checkpoint.espressoHeight
currentMessagePos: checkpoint.messagePos + 1
msgBuf: nil
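For concreteness, a minimal sketch of taking and restoring a checkpoint under these rules; the method names and the fallback to nextHotshotBlockNum when the buffer is empty are illustrative assumptions.

// checkpoint captures the state needed to resume the message stream later.
func (s *EspressoStreamer) checkpoint() Checkpoint {
    // espressoHeight is the lowest Espresso block still referenced by the buffer,
    // or simply the next block to fetch when the buffer is empty (assumption).
    height := s.nextHotshotBlockNum
    for _, msg := range s.msgBuf {
        if msg.HotshotHeight < height {
            height = msg.HotshotHeight
        }
    }
    return Checkpoint{
        messagePos:     s.currentMessagePos - 1, // last position already yielded
        espressoHeight: height,
    }
}

// restoreFromCheckpoint resets the streamer to replay from the checkpoint.
func (s *EspressoStreamer) restoreFromCheckpoint(c Checkpoint) {
    s.nextHotshotBlockNum = c.espressoHeight
    s.currentMessagePos = c.messagePos + 1
    s.msgBuf = nil
}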
Note that this is not necessarily equivalent to the state the streamer was in when the checkpoint was created. If messages were committed on Espresso out of order, it is possible that the checkpoint was created while the streamer had msgBuf non-empty and nextHotshotBlockNum greater than we are setting it here. However, the restored streamer should yield the same sequence of messages starting from checkpoint.messagePos + 1 as the original streamer would. Now we give a sketch of a proof of this important property.
In this proof sketch, we assume the streamer is broadly correct apart from restarts/restoration from a checkpoint. That is, assume a streamer which is started with an Espresso height h and Nitro message position p will yield, for each position i ≥ p, the earliest valid message with position i in the Espresso block stream starting at h. Read Streaming Messages and try to convince yourself this is true. Our goal is to show that restoring a streamer from a checkpoint does not disrupt this property with respect to the messages yielded after the checkpoint.
Consider an original streamer started from genesis, with a checkpoint taken at Espresso height HC and message position PC, and a second restored streamer started from this checkpoint. We aim to prove for all p > PC (1), the restored streamer and the original streamer yield the same message at position p.
Suppose the restored streamer yields message M at position p. The original streamer only yields a different message M′ with position p if M′ appears earlier in the Espresso stream than M. This must occur at an Espresso block h < HC (2), or else the restored streamer would also yield M′.
However, this is impossible: were it so, then the original streamer would have had M′ buffered when the checkpoint was taken at position PC: M′ would have already been buffered, because the checkpoint was taken at Espresso block HC > h (2), and would not yet have been yielded because the checkpoint was taken at position PC < p (1). But the checkpoint uses Espresso height HC, by definition the lowest Espresso height in the buffer at checkpoint time, so an Espresso height h < HC could not have been in the buffer. Therefore, the original streamer must also yield M.
findMessageIndexByPos checks if the msgBuf array contains a message with the given position and returns the index of the message if it exists, otherwise -1.
func (s *EspressoStreamer) findMessageIndexByPos(pos uint64) int {
    for i, msg := range s.msgBuf {
        if msg.Pos == pos {
            return i
        }
    }
    return -1
}
fetchTransactionsInBlockFromAllClients fetches the next Espresso header and the transactions in the corresponding block from the rollup's namespace.
To defend against a malicious Espresso query node, it first fetches the header from a list of query nodes, taking the majority response as authoritative. Once a header is obtained, the transactions can be fetched from a single node and validated against the header, which saves latency and bandwidth over fetching the full transaction data from all query nodes, in case the transaction payloads are large.
Tech Debt:
Eventually we should replace this "majority rules" approach with directly verifying proofs, so that we only need to talk to one query service. Currently we don't do this because we need the query service to provide us with the latest header and a proof that the specific header is finalized.
func (s *EspressoStreamer) fetchTransactionsInBlockFromAllClients(ctx context.Context) (EspressoHeader, []espressoTypes.Bytes, error) {
    /* Unless otherwise noted, all errors in this listing are propagated to the caller, but this code is omitted for brevity. */
    headersMap := make(map[string]HeaderWithCount)

    for _, client := range s.espressoClients {
        header, err := client.FetchHeaderByHeight(ctx, s.nextHotshotBlockNum)
        // Use the header commitment as a comparable map key.
        hash := string(header.Commit())
        if entry, ok := headersMap[hash]; ok {
            entry.count += 1
            headersMap[hash] = entry
        } else {
            headersMap[hash] = HeaderWithCount{
                count:  1,
                header: header,
            }
        }
    }

    var header *EspressoHeader
    for _, headerWithCount := range headersMap {
        if headerWithCount.count > len(s.espressoClients)/2 {
            header = &headerWithCount.header
            break
        }
    }
    if header == nil {
        return EspressoHeader{}, nil, fmt.Errorf("unable to find a header with a majority")
    }

    txs, err := s.fetchTransactionsInBlock(*header)
    return *header, txs, err
}
Note that because an honest committee of Espresso nodes will always return the same response for a given block number, results of this function are cacheable, using s.nextHotshotBlockNum as a caching key. Generally, this function will only be called once for each Espresso block. However, in case an Espresso streamer needs to be reset to a checkpoint, caching the relevant transactions in each Espresso block can greatly speed up replay from that earlier checkpoint, because on replay the cache will be hit and we will skip multiple HTTP requests to query nodes for each Espresso block. Such caching can be added transparently without complicating the streamer logic at all, since fetchTransactionsInBlockFromAllClients presents the same interface regardless of whether the response is cached.
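As a hedged illustration of such a transparent cache, the following sketch assumes a hypothetical blockCache field and cachedFetch wrapper; neither is part of the current design, and the entry type is illustrative.

// blockCacheEntry is a hypothetical cache entry keyed by Espresso block number.
type blockCacheEntry struct {
    header EspressoHeader
    txs    []espressoTypes.Bytes
}

// cachedFetch wraps fetchTransactionsInBlockFromAllClients with a lookup in a
// hypothetical s.blockCache map[uint64]blockCacheEntry, keyed by block number.
// Callers see the same interface whether or not the response was cached.
func (s *EspressoStreamer) cachedFetch(ctx context.Context) (EspressoHeader, []espressoTypes.Bytes, error) {
    if entry, ok := s.blockCache[s.nextHotshotBlockNum]; ok {
        return entry.header, entry.txs, nil
    }
    header, txs, err := s.fetchTransactionsInBlockFromAllClients(ctx)
    if err != nil {
        return EspressoHeader{}, nil, err
    }
    s.blockCache[s.nextHotshotBlockNum] = blockCacheEntry{header: header, txs: txs}
    return header, txs, nil
}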
Given an Espresso header, fetchTransactionsInBlock returns the list of transactions in the corresponding Espresso block associated with the namespace s.Namespace. It verifies the list against the header. If there is a network error when fetching data from the Espresso query service, or if the query service is malicious (i.e. returns a list of transactions which cannot be verified against the header), it returns an error.
func (s *EspressoStreamer) fetchTransactionsInBlock(header EspressoHeader) ([]Bytes, error) {
    /* Unless otherwise noted, all errors in this listing are propagated to the caller, but this code is omitted for brevity. */
    resp, err := s.espressoClients[0].FetchTransactionsInBlock(header.Height, s.Namespace)
    namespaceOk := espressocrypto.VerifyNamespace(
        s.Namespace,
        resp.Proof,
        *header.GetPayloadCommitment(),
        *header.GetNsTable(),
        resp.Transactions,
        resp.VidCommon,
    )
    if !namespaceOk {
        return nil, fmt.Errorf("error validating namespace proof")
    }
    return resp.Transactions, nil
}