Chapter 25
Nitro Espresso Streamer

Requirements: Nitro Espresso Streamer Requirements


Contents



25.1 Overview
25.2 Parameters
25.3 Streaming Messages
25.3.1 Peek
25.3.2 Next
25.3.3 NextEspressoBlock
25.3.4 ParseEspressoPayload
DereferenceChunks
Spam Prevention
Chunk Fetching and Reconstruction
25.4 Checkpoints
25.4.1 Proof Of Checkpoint Consistency
25.5 Low Level Functions
25.5.1 Check if the msgBuf array contains the message
25.5.2 Fetch the transactions in a block from all the clients
25.5.3 Fetch and verify the transactions in a block



25.1 Overview

This page specifies the design of the Nitro Espresso streamer component of the Arbitrum Nitro integration. This component is responsible for fetching messages from the query nodes and adding them to the messagesWithMetadataAndPos array, so that these messages can be processed by the Nitro caffeinated node and the Nitro batcher.

25.2 Parameters

The streamer requires these parameters, which help define the behavior of the chain derivation from Espresso and thus, conceptually, must be part of the onchain configuration of the rollup:

In practice these parameters are committed onchain indirectly, because they will be part of the TEE configuration for running the Nitro batcher, and this configuration is committed to in the TEE verifier contract via the MR_ENCLAVE hash.

In addition, the streamer takes the following parameters which are needed for performance/liveness only, but are not used in defining the chain derivation:

And it uses these state variables:

25.3 Streaming Messages

The primary interface to the streamer is a pair of functions to view and consume the next message:

func (s *EspressoStreamer) Peek() (*MessageWithMetadataAndPos, error)
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error)

Peek is used when processing of the message might fail and need to be retried. For example, the Nitro caffeinated node uses Peek to get the next message, processes it, and stores the resulting state in the database (which might fail); it calls Next to advance the stream only after the current message has been processed successfully.
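
For illustration, here is a minimal sketch of this consumption pattern; processMessage and pollInterval are hypothetical stand-ins for the caffeinated node's own processing logic and retry delay, not part of the streamer API.

// Sketch of a consumer built on Peek/Next. processMessage stands in for
// executing the message and persisting the resulting state, which may fail.
func consume(s *EspressoStreamer, processMessage func(*MessageWithMetadataAndPos) error, pollInterval time.Duration) error {
    for {
        msg, err := s.Peek()
        if err != nil {
            // No message available yet; wait and poll again.
            time.Sleep(pollInterval)
            continue
        }
        if err := processMessage(msg); err != nil {
            // Processing failed; do not advance, so the same message is retried.
            continue
        }
        // Advance the stream only after the message was processed successfully.
        if _, err := s.Next(); err != nil {
            return err
        }
    }
}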

Tech Debt: 

Currently, only Next is implemented, so the Nitro caffeinated node optimistically advances the stream when it gets the next message, and then uses a Reset interface to move the stream back to the prior position in case processing of that message fails. This is a clunkier and more error-prone API than Peek/Next.

25.3.1 Peek

Peek works by checking if the next message is already available in the buffer and, if not, consuming the next block from Espresso until either the message is buffered or there is no more data available from Espresso, in which case it returns an error indicating there is no message available at this time. In this case, the caller should retry after some delay.

Listing 25.1: Looping to find the next message in Peek
// Consume Espresso blocks until the next expected message is at the front of
// the buffer. An empty buffer means the next message is not yet available.
for len(s.msgBuf) == 0 || s.msgBuf[0].Pos != s.currentMessagePos {
    if err := s.nextEspressoBlock(); err != nil {
        return nil, err
    }
}
return &s.msgBuf[0], nil

Tech Debt: 

In the current implementation, polling Espresso for new blocks is not triggered by calls to Peek, but instead happens asynchronously in a polling task. Peek/Next only check for messages already in the buffer.

The design specified here is better because

25.3.2 Next

Next uses Peek to check if the next message is available and, if so, removes it from the buffer, advancing the streamer state.

func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error) {
    msg, err := s.Peek()
    if err != nil {
        return MessageWithMetadataAndPos{}, err
    }
    // Copy the message before removing it from the buffer.
    next := *msg
    s.msgBuf = s.msgBuf[1:]
    s.currentMessagePos++
    return next, nil
}

25.3.3 NextEspressoBlock

This is an internal function used to consume the next block from Espresso and add any relevant messages to the buffer. It is used by Peek when the next Nitro message is not yet buffered.

Its job is to

Tech Debt: 

Currently we incur O(N) work to check for duplicates and O(N log N) work to sort the list every time we receive a new message. Both can be handled in O(N) total with a binary-search insert into the already-sorted list, simply aborting if a message with the same pos is already present at the insertion site.
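
For illustration, a sketch of such an insert using the standard library's sort.Search, with the buffer and field names used in the listings below; this is a possible optimization, not the current implementation.

// Sketch: insert message into the already-sorted msgBuf, keyed by Pos,
// aborting if a message with the same Pos is already present.
// sort.Search returns the leftmost index i with msgBuf[i].Pos >= message.Pos.
i := sort.Search(len(s.msgBuf), func(i int) bool {
    return s.msgBuf[i].Pos >= message.Pos
})
if i < len(s.msgBuf) && s.msgBuf[i].Pos == message.Pos {
    // Duplicate position: keep the earliest occurrence, drop this message.
} else {
    // Shift the tail right by one and place the message at the insertion site.
    s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{})
    copy(s.msgBuf[i+1:], s.msgBuf[i:])
    s.msgBuf[i] = message
}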

Listing 25.2: nextEspressoBlock Method
func (s *EspressoStreamer) nextEspressoBlock() error {
    header, arbTxns, err := s.fetchTransactionsInBlockFromAllClients()
    /* propagate errors */

    for _, tx := range arbTxns {
        messages, err := s.parseEspressoPayload(header, tx.Data)
        /* propagate errors */

        for _, message := range messages {
            // Skip messages that are before the current message position
            if message.Pos < s.currentMessagePos {
                continue
            }

            // Only append to the buffer if the message is not a duplicate
            if s.findMessageIndexByPos(message.Pos) == -1 {
                s.msgBuf = append(s.msgBuf, message)
            }
        }
    }

    // Sort msgBuf by ascending message position.
    // This is to ensure that we process messages in the correct order.
    sort.SliceStable(s.msgBuf, func(i, j int) bool {
        return s.msgBuf[i].Pos < s.msgBuf[j].Pos
    })

    s.nextHotshotBlockNum += 1
    return nil
}

25.3.4 ParseEspressoPayload

This is an internal function used to reconstruct a list of Nitro messages from data committed to Espresso, validating and rejecting invalid messages. It works through the payload byte by byte extracting zero or more well-formed messages. Note that each message encodes its own length, so that multiple messages can be concatenated in one payload, and the end of each message and the start of the next can be found deterministically.

This function is greedy. If at any point in the payload it is impossible to continue extracting a well-formed message (e.g. not enough payload data remains, or the data in a message is inconsistent, such as an incorrect length), the entire remainder of the payload is considered invalid, but well-formed messages that have already been parsed are still used.

A well-formed message may nonetheless be invalid (e.g. if it is not signed by the rollup’s sequencer). Such messages are ignored, but subsequent messages in the payload may still be processed.

The presence of invalid messages or even a complete lack of valid messages is not considered an error, though an empty list may be returned. This function will only return an error if it was unable to determine if a certain message is valid or not, and the caller should retry. For example, if a message contains chunks, those chunks need to be fetched from Espresso. If this fails due to a network error, this function will return an error. It cannot skip such messages, because this would be non-deterministic.

func (s *EspressoStreamer) parseEspressoPayload(header EspressoHeader, payload []byte) ([]MessageWithMetadataAndPos, error) {
    var msgs []MessageWithMetadataAndPos
    offset := 0
    for offset < len(payload) {
        msg, position, signature, newOffset, err := /* parse the next message starting at payload[offset:] */
        if err != nil {
            return nil, err
        }
        // Advance past the parsed message before validating it, so that an
        // invalid message does not block the rest of the payload.
        offset = newOffset

        /* Skip messages that are not signed by the centralized sequencer. */
        if !VerifySequencerSignature(s.sequencerKey, msg, signature) {
            continue
        }
        msgs = append(msgs, MessageWithMetadataAndPos{
            MessageWithMeta: msg,
            Pos:             position,
            HotshotHeight:   header.Height,
        })
    }

    return msgs, nil
}

Sequencing Sovereignty and Permissionless Batching Note the importance of the sequencer signature check, especially as it relates to the Permissionless Batching requirement. In the vanilla Nitro stack, the power to sequence batches is actually vested in the batcher. The batcher holds the key controlling the unique L1 EOA which is authorized by the inbox contract to post batches, and whatever is posted to this contract ultimately controls the sequence of batches which the rollup will finalize. In the usual configuration, the batcher and sequencer are operated by the same party, so this “batcher sovereignty” property extends to “sequencer sovereignty”.

Our integration removes enforcement at the inbox contract that only one batcher key is allowed to post batches, but we want to retain centralized sequencing sovereignty, so that any of the permissionless set of batchers is bound to post only batches which have been produced and authorized by the centralized sequencer. The sequencer signature check solves this, since it is performed using the key pair that is actually controlled by the sequencer, as part of the sequencer feed mechanism.

Thus, we have this chain of sovereignty from sequencing to settlement:

1. The sequencer creates a message, signs it, and publishes it on the sequencer feed.
2. A permissionless batcher relays the message to Espresso, along with the signature.
3. Anyone else can post any message to the Espresso namespace, but the Espresso Streamer will ignore any message not signed by the sequencer.
4. A permissionless batcher reads messages using the Espresso Streamer, and thus processes only messages signed by the sequencer.
5. The inbox contract allows any batcher to post, but requires the batcher to produce a TEE attestation showing that it is running the Espresso Streamer code, and therefore only including sequencer-signed messages.
6. The rollup only finalizes and settles transactions which are posted to the inbox contract.

The parsing of each individual message starts by consuming a single byte, which indicates the type of message. If the message type is not one of the defined message types, the message is considered malformed and the rest of the payload is skipped (offset is set to len(payload)). Once the type of message is known, the fields for that message type are extracted one by one from the payload following the format.
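
For illustration, a minimal sketch of this dispatch; the type constants and the per-type parsing steps are assumptions based on the description above, and the exact field layout is defined by the message format, which is not reproduced here.

// Sketch: dispatch on the leading type byte of the next message in the payload.
const (
    msgTypeNormal  byte = 1 // complete Nitro message with metadata and signature
    msgTypeChunked byte = 2 // message with chunk references (see dereferenceChunks)
)

switch payload[offset] {
case msgTypeNormal:
    // Extract the message fields one by one, following the format.
case msgTypeChunked:
    // Extract a MessageWithChunkRefs, then reconstruct it via dereferenceChunks.
default:
    // Unknown type: the remainder of the payload is considered malformed.
    offset = len(payload)
}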

For normal messages (type 1) this yields a complete Nitro message with metadata (message position) and signature, which can be verified and appended directly.

For messages with chunk references (type 2), there is additional validation required, then the chunks must be fetched from Espresso and concatenated to the payload data in the message itself to form the complete Nitro message. This can finally be validated and appended as with type 1 messages. The job of converting a type 2 message (represented in code as a struct MessageWithChunkRefs) to a Nitro message with metadata and signature belongs to a self-contained function dereferenceChunks.

DereferenceChunks

func (s *EspressoStreamer) dereferenceChunks(header EspressoHeader, msg MessageWithChunkRefs) (*MessageWithMetadata, error)

This function validates the message with chunk references (to prevent spam, since chunk processing can be costly), fetches the referenced chunks, reconstructs the full payload, and deserializes the Nitro message. If any step fails because the message is not valid, it returns nil indicating the message should be skipped. As with parseEspressoPayload, it only fails if it is unable to validate or reconstruct the message due to a transient error fetching data from Espresso.

Spam Prevention Before fetching chunks, this function performs some checks to reduce the rate of spam and to limit the amount of work required of the Espresso streamer to fetch large chunks.

First, there is a maximum number of chunk references allowed in any given message (which should be set high enough so that any valid Nitro message can fit in the given number of chunks):

if msg.NumChunks > s.maxChunks {
    return nil, nil
}

Next, we validate a proof-of-work challenge completed by the sender. This prevents a DoS attack in which the attacker would otherwise have an asymmetric advantage. We assume that in the normal message case, the cost of posting data on Espresso is already sufficient to make infeasible a DoS attack where the attacker sends large amounts of nonsense data to Espresso, forcing the Espresso streamer to download and verify all of it. Chunks allow the attacker to circumvent the cost of posting to Espresso: they can post a single large transaction to Espresso, and then spam many small type 2 transactions which all reference the same chunk. These transactions are cheap to post on Espresso, but expensive for the streamer to deal with, because for each one it needs to fetch the large chunk and then verify the (invalid) sequencer signature over a large amount of data.

Proof of work levels the playing field by forcing the attacker to expend a non-trivial amount of computation (configurable, but ideally in the ballpark of a second or two) each time it posts one of the otherwise cheap chunk-containing transactions to Espresso. The attacker must try (using brute force) to find a value for the PoW field of the MessageWithChunkRefs such that the hash of the entire message has a sufficient number of trailing zeros (the difficulty of the problem):

if Keccak(msg) & ((1 << s.PowDifficulty) - 1) != 0 {
    return nil, nil
}
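
For illustration, a sketch of the corresponding sender-side search, under the assumptions of the check above: a PoW field on MessageWithChunkRefs (assumed here to be a uint64 nonce) and a Keccak helper whose result can be treated as an integer, as in the streamer's check.

// Sketch: brute-force a PoW witness so that the low PowDifficulty bits of the
// message hash are zero, matching the streamer's acceptance check above.
func solvePow(msg MessageWithChunkRefs, difficulty uint) MessageWithChunkRefs {
    mask := (uint64(1) << difficulty) - 1
    for nonce := uint64(0); ; nonce++ {
        msg.PoW = nonce
        if Keccak(msg)&mask == 0 {
            return msg
        }
    }
}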

Note that there is still an attack where the attacker finds a single working proof-of-work witness and then spams the cheap type 2 transactions with the same chunk references over and over again. This attack is easily mitigated by caching: after hashing the message and checking the proof-of-work solution, the streamer only needs to check the cache for an entry with the same hash to see if it has already determined that the fully reconstructed message is invalid.
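
A minimal sketch of that cache check, assuming a hypothetical invalidMsgCache field on the streamer (a set of hashes of messages already reconstructed and found invalid):

// Sketch: after the proof-of-work check passes, consult a cache of message
// hashes that were previously reconstructed and rejected. invalidMsgCache is
// a hypothetical map[string]bool field on the streamer.
key := fmt.Sprintf("%x", Keccak(msg))
if s.invalidMsgCache[key] {
    // This exact message was already reconstructed and rejected; skip it
    // without fetching its chunks again.
    return nil, nil
}
// ... fetch chunks, reconstruct the full message, and verify the signature ...
// If the reconstructed message turns out to be invalid, remember that:
//     s.invalidMsgCache[key] = true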

Tech Debt: 

The initial version does not use proof of work to prevent DoS attacks. Instead, the ability to send to Espresso is permissioned based on a simple signature on the submitted payload, so only payloads submitted by parties trusted not to attack the liveness of the system will be processed. These signatures are not trusted for safety purposes.

Chunk Fetching and Reconstruction Once we have eliminated spam, we need to fetch the referenced chunks and reconstruct the message. To fetch a chunk reference to Espresso block h, transaction index i:

1. Check that h is less than header.Height; that is, the referenced chunks must be earlier in the Espresso chain than the message being reconstructed. If not, ignore the entire message.
2. Fetch the header of the block containing the chunk, using chunkHeader := s.espressoClient.FetchHeaderByHeight(h).
3. Get a proof that this is the correct header for height h relative to the original header header, using s.espressoClient.FetchBlockMerkleProof(header.Height, h). Verify this proof against header.GetBlockMerkleTreeRoot using espressocrypto.VerifyMerkleProof.
4. Get the transactions in the s.Namespace namespace of chunkHeader using s.fetchTransactionsInBlock(chunkHeader).
5. Get the ith transaction from this list (if it does not exist, ignore the entire message).
6. The first byte of this transaction payload must be 3 (a type 3 chunk transaction). Extract the remaining bytes.

After successfully extracting chunk data from each chunk reference, we concatenate the Data field of the original MessageWithChunkRefs followed by the data from each referenced chunk, in the order they were referenced. This must deserialize as a Nitro MessageWithMetadata. If it does, return it. Otherwise, return nil.
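
For illustration, a sketch of these steps for a single chunk reference, using the client and helper names that appear elsewhere in this chapter; ChunkRef (with Height and Index fields) and the exact argument order of VerifyMerkleProof are assumptions.

// Sketch of steps 1-6 above. A (nil, nil) return means the whole message
// should be ignored; an error indicates a transient fetch failure.
func (s *EspressoStreamer) fetchChunk(header EspressoHeader, ref ChunkRef) ([]byte, error) {
    // 1. Referenced chunks must be earlier in the Espresso chain.
    if ref.Height >= header.Height {
        return nil, nil
    }
    // 2. Fetch the header of the block containing the chunk.
    chunkHeader, err := s.espressoClient.FetchHeaderByHeight(ref.Height)
    if err != nil {
        return nil, err
    }
    // 3. Prove chunkHeader is the correct header for ref.Height relative to header.
    proof, err := s.espressoClient.FetchBlockMerkleProof(header.Height, ref.Height)
    if err != nil {
        return nil, err
    }
    if !espressocrypto.VerifyMerkleProof(proof, chunkHeader, header.GetBlockMerkleTreeRoot()) {
        return nil, nil
    }
    // 4. Fetch the rollup-namespace transactions of the chunk block.
    txs, err := s.fetchTransactionsInBlock(chunkHeader)
    if err != nil {
        return nil, err
    }
    // 5. The referenced transaction index must exist.
    if int(ref.Index) >= len(txs) {
        return nil, nil
    }
    // 6. It must be a type 3 chunk transaction; the chunk data follows the type byte.
    tx := txs[ref.Index]
    if len(tx) == 0 || tx[0] != 3 {
        return nil, nil
    }
    return tx[1:], nil
}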

25.4 Checkpoints

A streamer checkpoint consists of just two fields:

type Checkpoint struct {
    messagePos     uint64
    espressoHeight uint64
}

This makes them easy to persist and load, whether on L1 where storage is expensive, or in a database. The two fields represent, respectively:

1. The Nitro message position last yielded by the stream when the checkpoint was created
2. The lowest Espresso block number corresponding to any message in the buffer at the time the checkpoint was created

Loading a checkpoint means creating a new streamer to replay HotShot blocks from espressoHeight. Setting espressoHeight to the lowest block containing any buffered messages at checkpoint time ensures we will not miss any important messages on replay, while saving the Nitro messagePos ensures we will correctly ignore any old Nitro messages that also get included in the replay that the stream had already yielded before the checkpoint.
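
For illustration, a sketch of how a checkpoint could be taken from a running streamer; the method name and the fallback to nextHotshotBlockNum when the buffer is empty are assumptions.

// Sketch: snapshot the streamer state into a checkpoint. messagePos is the
// last position already yielded, and espressoHeight is the lowest HotShot
// height among buffered messages (or the next block to consume if the buffer
// is empty).
func (s *EspressoStreamer) checkpoint() Checkpoint {
    height := s.nextHotshotBlockNum
    for _, msg := range s.msgBuf {
        if msg.HotshotHeight < height {
            height = msg.HotshotHeight
        }
    }
    return Checkpoint{
        messagePos:     s.currentMessagePos - 1,
        espressoHeight: height,
    }
}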

Specifically, when creating a streamer from a checkpoint, we simply set the state variables as follows (a sketch follows the list):

1. nextHotshotBlockNum: checkpoint.espressoHeight
2. currentMessagePos: checkpoint.messagePos + 1
3. msgBuf: nil
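
A minimal sketch of this restoration; the constructor name is illustrative, and the streamer's other configuration fields (clients, namespace, keys) are omitted.

// Sketch: build a streamer that replays from a checkpoint. It re-consumes
// Espresso blocks starting at espressoHeight and yields messages starting at
// messagePos + 1, ignoring older positions encountered during replay.
func NewEspressoStreamerFromCheckpoint(cp Checkpoint) *EspressoStreamer {
    return &EspressoStreamer{
        nextHotshotBlockNum: cp.espressoHeight,
        currentMessagePos:   cp.messagePos + 1,
        msgBuf:              nil,
    }
}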

Note that this is not necessarily equivalent to the state the streamer was in when the checkpoint was created. If messages were committed on Espresso out of order, it is possible that the checkpoint was created while the streamer had msgBuf non-empty and nextHotshotBlockNum greater than we are setting it here. However, the restored streamer should yield the same sequence of messages starting from checkpoint.messagePos + 1 as the original streamer would. Now we give a sketch of a proof of this important property.

25.4.1 Proof Of Checkpoint Consistency

In this proof sketch, we assume the streamer is broadly correct apart from restarts/restoration from a checkpoint. That is, assume a streamer which is started with an Espresso height h and Nitro message position p will yield, for each position i ≥ p, the earliest valid message with position i in the Espresso block stream starting at h. Read Streaming Messages and try to convince yourself this is true. Our goal is to show that restoring a streamer from a checkpoint does not disrupt this property with respect to the messages yielded after the checkpoint.

Consider an original streamer started from genesis, with a checkpoint taken at Espresso height H_C and message position P_C, and a second restored streamer started from this checkpoint. We aim to prove that for all p > P_C (1), the restored streamer and the original streamer yield the same message at position p.

Suppose the restored streamer yields message M at position p. The original streamer only yields a different message M′ with position p if M′ appears earlier in the Espresso stream than M. This must occur at an Espresso block h < H_C (2), or else the restored streamer would also yield M′.

However, this is impossible: were it so, then the original streamer would have had M′ buffered when the checkpoint was taken at position P_C. M′ would have already been buffered, because the checkpoint was taken at Espresso block H_C > h (2), and would not yet have been yielded, because the checkpoint was taken at position P_C < p (1). But the checkpoint uses Espresso height H_C, by definition the lowest Espresso height in the buffer at checkpoint time, so an Espresso height h < H_C could not have been in the buffer. Therefore, the original streamer must also yield M.

25.5 Low Level Functions

25.5.1 Check if the msgBuf array contains the message

This function checks if the msgBuf array contains a message with the given position and returns the index of the message if it exists; otherwise it returns -1.

Listing 25.3: findMessageIndexByPos Method
func (s *EspressoStreamer) findMessageIndexByPos(pos uint64) int {
    for i, msg := range s.msgBuf {
        if msg.Pos == pos {
            return i
        }
    }
    return -1
}

25.5.2 Fetch the transactions in a block from all the clients

This function fetches the next Espresso header and the transactions in the corresponding block from the rollup’s namespace.

To defend against a malicious Espresso query node, it first fetches the header from a list of query nodes, taking the majority response as authoritative. Once a header is obtained, the transactions can be fetched from a single node and validated against the header, which saves latency and bandwidth over fetching the full transaction data from all query nodes, in case the transaction payloads are large.

Tech Debt: 

Eventually we should replace this “majority rules” approach with verifying the proofs directly, so that we only need to talk to one query service. Currently we don’t do this because we need the query service to provide us with the latest header and a proof that the specific header is finalized.

Listing 25.4: fetchTransactionsInBlockFromAllClients Method
func (s *EspressoStreamer) fetchTransactionsInBlockFromAllClients(ctx context.Context) (EspressoHeader, []espressoTypes.Bytes, error) {
    /* unless otherwise noted, all errors in this listing are propagated to the caller, but this code is omitted for brevity. */
    headersMap := make(map[string]HeaderWithCount)

    for _, client := range s.espressoClients {
        header, err := client.FetchHeaderByHeight(ctx, s.nextHotshotBlockNum)
        /* propagate errors */

        // Use the header commitment as the map key when counting votes.
        hash := fmt.Sprintf("%x", header.Commit())
        if entry, ok := headersMap[hash]; ok {
            entry.count += 1
            headersMap[hash] = entry
        } else {
            headersMap[hash] = HeaderWithCount{
                count:  1,
                header: header,
            }
        }
    }

    // Take the header returned by a majority of the clients as authoritative.
    var header EspressoHeader
    foundMajority := false
    for _, headerWithCount := range headersMap {
        if headerWithCount.count > len(s.espressoClients)/2 {
            header = headerWithCount.header
            foundMajority = true
            break
        }
    }
    if !foundMajority {
        return EspressoHeader{}, nil, fmt.Errorf("unable to find a header which has a majority")
    }

    txs, err := s.fetchTransactionsInBlock(ctx, header)
    return header, txs, err
}

Note that because an honest committee of Espresso nodes will always return the same response for a given block number, results of this function are cacheable, using s.nextHotshotBlockNum as a caching key. Generally, this function will only be called once for each Espresso block. However, in case an Espresso streamer needs to be reset to a checkpoint, caching the relevant transactions in each Espresso block can greatly speed up replay from that earlier checkpoint, because on replay the cache will be hit and we will skip multiple HTTP requests to query nodes for each Espresso block. Such caching can be added transparently without complicating the streamer logic at all, since fetchTransactionsInBlockFromAllClients presents the same interface regardless of whether the response is cached.
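
For illustration, a sketch of such a transparent cache, assuming a hypothetical blockCache map on the streamer keyed by HotShot block number; the wrapper keeps the same interface as the underlying fetch.

// Sketch: memoize fetchTransactionsInBlockFromAllClients by block number.
// cachedBlock and s.blockCache (a map[uint64]cachedBlock) are hypothetical.
type cachedBlock struct {
    header EspressoHeader
    txs    []espressoTypes.Bytes
}

func (s *EspressoStreamer) fetchTransactionsInBlockCached(ctx context.Context) (EspressoHeader, []espressoTypes.Bytes, error) {
    if entry, ok := s.blockCache[s.nextHotshotBlockNum]; ok {
        return entry.header, entry.txs, nil
    }
    header, txs, err := s.fetchTransactionsInBlockFromAllClients(ctx)
    if err != nil {
        return EspressoHeader{}, nil, err
    }
    s.blockCache[s.nextHotshotBlockNum] = cachedBlock{header: header, txs: txs}
    return header, txs, nil
}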

25.5.3 Fetch and verify the transactions in a block

Given an Espresso header, this function returns the list of transactions in the corresponding Espresso block associated with the namespace s.Namespace. It verifies the list against the header. If there is a network error when fetching data from the Espresso query service, or if the query service is malicious (i.e. returns a list of transactions which cannot be verified against the header) it returns an error.

func (s *EspressoStreamer) fetchTransactionsInBlock(header EspressoHeader) ([]Bytes, error) {
    /* unless otherwise noted, all errors in this listing are propagated to the caller, but this code is omitted for brevity. */
    resp, err := s.espressoClients[0].FetchTransactionsInBlock(header.Height, s.Namespace)
    namespaceOk := espressocrypto.VerifyNamespace(
        s.Namespace,
        resp.Proof,
        *header.GetPayloadCommitment(),
        *header.GetNsTable(),
        resp.Transactions,
        resp.VidCommon,
    )
    if !namespaceOk {
        return nil, fmt.Errorf("error validating namespace proof")
    }
    return resp.Transactions, nil
}