Chapter 27
Nitro Espresso Streamer

Requirements: Nitro Espresso Streamer Requirements


Contents



27.1 Overview
27.2 Parameters
27.3 Streaming Messages
27.3.1 Peek
27.3.2 Next
27.3.3 NextEspressoBlock
27.4 Checkpoints
27.4.1 Proof Of Checkpoint Consistency
27.5 Low Level Functions
27.5.1 Check if the msgBuf array contains the message
27.5.2 Fetch the transactions in a block from all the clients



27.1 Overview

This page specifies the design of the Nitro Espresso streamer component of the Arbitrum Nitro integration. This component is responsible for fetching messages from the query nodes and adding them to the messagesWithMetadataAndPos array, so that these messages can be processed by the Nitro caffeinated node and the Nitro batcher.

27.2 Parameters

The streamer requires these parameters, which help define the behavior of the chain derivation from Espresso and thus, conceptually, must be part of the onchain configuration of the rollup:

In practice these parameters are committed onchain indirectly, because they will be part of the TEE configuration for running the Nitro batcher, and this configuration is committed to in the TEE verifier contract via the MR_ENCLAVE hash.

In addition, the streamer takes the following parameters which are needed for performance/liveness only, but are not used in defining the chain derivation:

And it uses these state variables:
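For orientation, the following is a minimal sketch of the streamer state as it is used by the listings in this chapter. The field types, and the exact split between parameters and state variables, are assumptions inferred from the code below rather than the canonical definition.

// Sketch only: types and grouping are inferred from the listings in this chapter.
type EspressoStreamer struct {
    // Chain-derivation parameters (committed via the TEE configuration).
    namespace    uint64 // Espresso namespace of the rollup (assumed type)
    sequencerKey []byte // key used to verify sequencer feed signatures (assumed type)

    // Performance/liveness parameters.
    espressoClients []EspressoClient // redundant query-node clients (assumed type)

    // State variables.
    msgBuf              []MessageWithMetadataAndPos // buffered messages, sorted by Pos
    currentMessagePos   uint64                      // position of the next message to yield
    nextHotshotBlockNum uint64                      // next Espresso (HotShot) block to consume
}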

27.3 Streaming Messages

The primary interface to the streamer is a pair of functions to view and consume the next message:

func (s *EspressoStreamer) Peek() (*MessageWithMetadataAndPos, error)
func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error)

Peek is used when processing of the message might fail and need to be retried. For example, the Nitro caffeinated node uses Peek to get the next message, then processes it and stores the resulting state in the database, which might fail; it calls Next to advance the stream only after the current message has been processed successfully.
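For illustration, here is a minimal sketch of how a consumer such as the caffeinated node might drive this interface; processMessage and the retry delay are hypothetical stand-ins, not part of the specified API.

func consume(streamer *EspressoStreamer, retryDelay time.Duration) error {
    for {
        msg, err := streamer.Peek()
        if err != nil {
            // No message is available yet: back off and retry.
            time.Sleep(retryDelay)
            continue
        }
        // processMessage stands in for whatever the consumer does with the
        // message, e.g. executing it and persisting the resulting state.
        if err := processMessage(msg); err != nil {
            // Processing failed: do not advance; Peek will return the same message.
            continue
        }
        // Advance the stream only after the message has been processed successfully.
        if _, err := streamer.Next(); err != nil {
            return err
        }
    }
}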

Tech Debt: 

Currently, only Next is implemented, so the Nitro caffeinated node optimistically advances the stream when it gets the next message, and then uses a Reset interface to move the stream back to the prior position in case processing of that message fails. This is a clunkier and more error-prone API than Peek/Next.
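For comparison, a sketch of the current optimistic flow; the Reset signature shown here is a hypothetical illustration, not the actual interface.

func consumeOnce(streamer *EspressoStreamer) error {
    // The stream is advanced optimistically, before processing succeeds.
    msg, err := streamer.Next()
    if err != nil {
        return err
    }
    if err := processMessage(&msg); err != nil {
        // Roll the stream back so the same message is retried later.
        // The Reset signature here is assumed for illustration.
        streamer.Reset(msg.Pos, msg.HotshotHeight)
        return err
    }
    return nil
}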

27.3.1 Peek

Peek works by checking whether the next message is already available in the buffer and, if not, consuming blocks from Espresso until either the message is buffered or there is no more data available from Espresso. In the latter case it returns an error indicating that no message is available at this time, and the caller should retry after some delay.

Listing 27.1: Looping to find the next message in Peek
// Consume Espresso blocks until the message at currentMessagePos is at the
// front of the buffer (the buffer may initially be empty).
for len(s.msgBuf) == 0 || s.msgBuf[0].Pos != s.currentMessagePos {
    if err := s.nextEspressoBlock(); err != nil {
        return nil, err
    }
}
return &s.msgBuf[0], nil

Tech Debt: 

In the current implementation, polling Espresso for new blocks is not triggered by calls to Peek, but instead happens asynchronously in a polling task. Peek/Next only check for messages already in the buffer.

The design specified here is better because

27.3.2 Next

Next uses Peek to check if the next message is available and, if so, removes it from the buffer, advancing the streamer state.

func (s *EspressoStreamer) Next() (MessageWithMetadataAndPos, error) {
    next, err := s.Peek()
    if err != nil {
        return MessageWithMetadataAndPos{}, err
    }
    msg := *next
    s.msgBuf = s.msgBuf[1:]
    s.currentMessagePos++
    return msg, nil
}

27.3.3 NextEspressoBlock

This is an internal function used to consume the next block from Espresso and add any relevant messages to the buffer. It is used by Peek when the next Nitro message is not yet buffered.

Its job is to fetch the transactions in the next Espresso block from the query nodes, verify the sequencer signature on each message, discard messages that are stale or duplicates, insert the remaining messages into the buffer in ascending position order, and advance nextHotshotBlockNum.

Tech Debt: 

Currently we incur O(N) work to check for duplicates and O(N log N) work to sort the list every time we receive a new message. Both can be handled with just O(N) per message by doing a binary-search insert into the already-sorted list, and simply aborting if there is already a message with the same pos at the insertion site.
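The following sketch illustrates the suggested improvement, using sort.Search to find the insertion point in the already-sorted buffer and aborting on a duplicate position; this is not the current implementation, and the helper name is illustrative.

// insertMessageSorted inserts msg into s.msgBuf, which is kept sorted by Pos,
// and returns false without modifying the buffer if a message with the same
// position is already present.
func (s *EspressoStreamer) insertMessageSorted(msg MessageWithMetadataAndPos) bool {
    // Binary search for the first buffered message with Pos >= msg.Pos.
    i := sort.Search(len(s.msgBuf), func(j int) bool {
        return s.msgBuf[j].Pos >= msg.Pos
    })
    if i < len(s.msgBuf) && s.msgBuf[i].Pos == msg.Pos {
        return false // duplicate position at the insertion site: abort
    }
    // Shift the tail right by one slot and place msg at the insertion point.
    s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{})
    copy(s.msgBuf[i+1:], s.msgBuf[i:])
    s.msgBuf[i] = msg
    return true
}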

Listing 27.2: nextEspressoBlock Method
func (s *EspressoStreamer) nextEspressoBlock() error {
    arbTxns, err := s.fetchTransactionsInBlockFromAllClients()
    /* propagate errors */

    for _, tx := range arbTxns {
        indices, messages, signatures := arbutil.ParseHotShotPayload(tx)
        for i, message := range messages {
            // Skip messages that are not signed by the centralized sequencer.
            if !VerifySequencerSignature(s.sequencerKey, message, indices[i], signatures[i]) {
                continue
            }

            // Skip messages that are before the current message position.
            if indices[i] < s.currentMessagePos {
                continue
            }

            // Only append to the buffer if the message is not a duplicate.
            if s.findMessageIndexByPos(indices[i]) == -1 {
                s.msgBuf = append(s.msgBuf, MessageWithMetadataAndPos{
                    MessageWithMeta: message,
                    Pos:             indices[i],
                    HotshotHeight:   s.nextHotshotBlockNum,
                })
            }
        }
    }

    // Sort the buffered messages in ascending position order.
    // This is to ensure that we process messages in the correct order.
    sort.SliceStable(s.msgBuf, func(i, j int) bool {
        return s.msgBuf[i].Pos < s.msgBuf[j].Pos
    })

    s.nextHotshotBlockNum += 1
    return nil
}

Sequencing Sovereignty and Permissionless Batching

Note the importance of the sequencer signature check, especially as it relates to the Permissionless Batching requirement. In the vanilla Nitro stack, the power to sequence batches is actually vested in the batcher. The batcher holds the key controlling the unique L1 EOA which is authorized by the inbox contract to post batches, and whatever is posted to this contract ultimately controls the sequence of batches which the rollup will finalize. In the usual configuration, the batcher and sequencer are operated by the same party, so this "batcher sovereignty" property extends to "sequencer sovereignty".

Our integration removes enforcement at the inbox contract that only one batcher key is allowed to post batches, but we want to retain centralized sequencing sovereignty, so that any of the permissionless set of batchers is bound to post only batches which have been produced and authorized by the centralized sequencer. The sequencer signature check solves this, since it is performed using the key pair that is actually controlled by the sequencer, as part of the sequencer feed mechanism.

Thus, we have this chain of sovereignty from sequencing to settlement:

1. The sequencer creates a message, signs it, and publishes it on the sequencer feed.
2. A permissionless batcher relays the message to Espresso, along with the signature.
3. Anyone else can post any message to the Espresso namespace, but the Espresso Streamer will ignore any message not signed by the sequencer.
4. A permissionless batcher reads messages using the Espresso Streamer, and thus processes only messages signed by the sequencer.
5. The inbox contract allows any batcher to post, but requires the batcher to produce a TEE attestation showing that it is running the Espresso Streamer code, and therefore only including sequencer-signed messages.
6. The rollup only finalizes and settles transactions which are posted to the inbox contract.

27.4 Checkpoints

A streamer checkpoint consists of just two fields:

type Checkpoint struct {
    messagePos     uint64
    espressoHeight uint64
}

This makes them easy to persist and load, whether on L1 where storage is expensive, or in a database. The two fields represent, respectively:

1. The Nitro message position last yielded by the stream when the checkpoint was created
2. The lowest Espresso block number corresponding to any message in the buffer at the time the checkpoint was created

Loading a checkpoint means creating a new streamer that replays HotShot blocks from espressoHeight. Setting espressoHeight to the lowest block containing any buffered message at checkpoint time ensures we will not miss any important messages on replay, while saving the Nitro messagePos ensures we will correctly ignore any old Nitro messages included in the replay that the stream had already yielded before the checkpoint.
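For concreteness, here is a sketch of how a checkpoint could be taken from the streamer state; createCheckpoint is an illustrative name, and the fallback to nextHotshotBlockNum when the buffer is empty is an assumption.

// createCheckpoint captures the last yielded message position and the lowest
// Espresso block number of any buffered message (falling back to the next
// block to consume when the buffer is empty, an assumed convention).
func (s *EspressoStreamer) createCheckpoint() Checkpoint {
    espressoHeight := s.nextHotshotBlockNum
    for _, msg := range s.msgBuf {
        if msg.HotshotHeight < espressoHeight {
            espressoHeight = msg.HotshotHeight
        }
    }
    return Checkpoint{
        messagePos:     s.currentMessagePos - 1, // last position yielded by Next
        espressoHeight: espressoHeight,
    }
}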

Specifically, when creating a streamer from a checkpoint, we simply set the state variables as follows:

1. nextHotshotBlockNum: checkpoint.espressoHeight
2. currentMessagePos: checkpoint.messagePos + 1
3. msgBuf: nil

Note that this is not necessarily equivalent to the state the streamer was in when the checkpoint was created. If messages were committed on Espresso out of order, it is possible that the checkpoint was created while the streamer had a non-empty msgBuf and a nextHotshotBlockNum greater than the value we set here. However, the restored streamer should yield the same sequence of messages starting from checkpoint.messagePos + 1 as the original streamer would. We now give a sketch of a proof of this important property.
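For completeness, a sketch of restoring a streamer from a checkpoint according to these rules; the constructor name, and the idea of copying the parameters from an existing configuration, are illustrative assumptions.

// newEspressoStreamerFromCheckpoint builds a streamer that replays the Espresso
// block stream from checkpoint.espressoHeight and skips any message at or
// before checkpoint.messagePos.
func newEspressoStreamerFromCheckpoint(base EspressoStreamer, checkpoint Checkpoint) *EspressoStreamer {
    s := base // copy the parameters (clients, namespace, sequencer key, ...)
    s.nextHotshotBlockNum = checkpoint.espressoHeight
    s.currentMessagePos = checkpoint.messagePos + 1
    s.msgBuf = nil
    return &s
}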

27.4.1 Proof Of Checkpoint Consistency

In this proof sketch, we assume the streamer is broadly correct apart from restarts/restoration from a checkpoint. That is, assume a streamer which is started with an Espresso height h and Nitro message position p will yield, for each position i ≥ p, the earliest valid message with position i in the Espresso block stream starting at h. Read Streaming Messages and try to convince yourself this is true. Our goal is to show that restoring a streamer from a checkpoint does not disrupt this property with respect to the messages yielded after the checkpoint.

Consider an original streamer started from genesis, with a checkpoint taken at Espresso height HC and message position PC, and a second, restored streamer started from this checkpoint. We aim to prove that, for all p > PC (1), the restored streamer and the original streamer yield the same message at position p.

Suppose the restored streamer yields message M at position p. The original streamer only yields a different message M′ with position p if M′ appears earlier in the Espresso stream than M. This must occur at an Espresso block h < HC (2), or else the restored streamer would also yield M′.

However, this is impossible: were it so, then the original streamer would have had M′ buffered when the checkpoint was taken at position PC: M′ would have already been buffered, because the checkpoint was taken at Espresso block HC > h (2), and would not yet have been yielded because the checkpoint was taken at position PC < p (1). But the checkpoint uses Espresso height HC, by definition the lowest Espresso height in the buffer at checkpoint time, so an Espresso height h < HC could not have been in the buffer. Therefore, the original streamer must also yield M.

27.5 Low Level Functions

27.5.1 Check if the msgBuf array contains the message

This function checks whether the msgBuf array contains a message with the given position and returns the index of the message if it exists; otherwise it returns -1.

Listing 27.3: findMessageIndexByPos Method
func (s *EspressoStreamer) findMessageIndexByPos(pos uint64) int {
    for i, msg := range s.msgBuf {
        if msg.Pos == pos {
            return i
        }
    }
    return -1
}

27.5.2 Fetch the transactions in a block from all the clients

This function fetches the transactions in a block from all the query node clients and returns the transactions.

Tech Debt: 

Eventually we should replace this "majority rules" approach with verifying the proofs directly, so that we only need to talk to one query service. Currently we don't do this because we need the query service to provide us with the latest header and a proof that the specific header is finalized.

Listing 27.4: fetchTransactionsInBlockFromAllClients Method
func (s *EspressoStreamer) fetchTransactionsInBlockFromAllClients(ctx context.Context) ([]espressoTypes.Bytes, error) {

    transactionsMap := make(map[string]TransactionsInBlockWithCount)

    for _, client := range s.espressoClients {
        txs, err := client.FetchTransactionsInBlock(ctx, s.nextHotshotBlockNum, s.namespace)
        if err != nil {
            return nil, err
        }
        txsHash := arrayToHash(txs.Transactions)
        if transactionsMap[txsHash].count > 0 {
            transactionsMap[txsHash] = TransactionsInBlockWithCount{
                count:        transactionsMap[txsHash].count + 1,
                transactions: transactionsMap[txsHash].transactions,
            }
        } else {
            transactionsMap[txsHash] = TransactionsInBlockWithCount{
                count:        1,
                transactions: txs.Transactions,
            }
        }
    }

    // Return the response reported by a strict majority of the query nodes.
    for _, txInBlockWithCount := range transactionsMap {
        if txInBlockWithCount.count > len(s.espressoClients)/2 {
            return txInBlockWithCount.transactions, nil
        }
    }

    return nil, fmt.Errorf("unable to find the transactions which have a majority")
}

Note that because an honest committee of Espresso nodes will always return the same response for a given block number, results of this function are cacheable, using s.nextHotshotBlockNum as a caching key. Generally, this function will only be called once for each Espresso block. However, in case an Espresso streamer needs to be reset to a checkpoint, caching the relevant transactions in each Espresso block can greatly speed up replay from that earlier checkpoint, because on replay the cache will be hit and we will skip multiple HTTP requests to query nodes for each Espresso block. Such caching can be added transparently without complicating the streamer logic at all, since fetchTransactionsInBlockFromAllClients presents the same interface regardless of whether the response is cached.
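As an illustration of this point, such a cache could transparently wrap the fetch function, keyed by the Espresso block number; the wrapper name and the in-memory map are assumptions made for the sketch.

// cachedFetch returns the transactions for block s.nextHotshotBlockNum,
// consulting a per-block cache before querying the nodes. Because an honest
// committee returns the same response for a given block number, cached
// entries never need to be invalidated.
func (s *EspressoStreamer) cachedFetch(ctx context.Context, cache map[uint64][]espressoTypes.Bytes) ([]espressoTypes.Bytes, error) {
    if txs, ok := cache[s.nextHotshotBlockNum]; ok {
        return txs, nil
    }
    txs, err := s.fetchTransactionsInBlockFromAllClients(ctx)
    if err != nil {
        return nil, err
    }
    cache[s.nextHotshotBlockNum] = txs
    return txs, nil
}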