Chapter 22
Nitro Batcher

Requirements: Nitro Integration Requirements


Contents



22.1 Overview
22.2 Key Management
22.3 Startup
22.4 SequencerInbox
22.5 Task 1: Sequencer Espresso
22.5.1 High Level Flow
Transaction Submission Process
Finality Polling Process
22.5.2 Transaction Submission Process
Low level functions
submitTransactionsToEspresso task
BuildRawHotShotPayload
Finality Polling Process
SubmittedEspressoTx:
22.5.3 CheckEspressoQueryNodesForTransaction
ResubmitTransactionIfPastDelay
22.6 Task 2: Espresso → L1
22.6.1 MaybePostSequencerBatch
22.6.2 Submission To Sequencer Inbox
Signing
Checkpoint
22.6.3 Resetting



22.1 # Overview

This page documents the technical design of the batcher component of the Arbitrum Nitro integration.

The batcher consists of two tasks, which form a pipeline: Task 1 reads messages from the sequencer and sends them to Espresso. Task 2 reads messages from Espresso and sends them to L1.

Note that Task 1 need not be trusted for safety/integrity: neither the existing Nitro stack nor our added requirements make any guarantees about what happens prior to a valid confirmation being posted to Espresso (i.e. the sequencer is already untrusted).

We thus focus most of our attention on Task 2, which is designed to be stateless, self-contained, and maximally simple while upholding all the requirements save Liveness and Sequencer Sovereignty, which are instead left up to the rollup sequencer and Task 1.

22.2 # Key Management

The batcher maintains two sets of keys with different purposes:

Batcher Key

The key which is registered with the rollup chain config as the centralized batcher key. In this key is vested the authority to add batches to the L1, and thus the ultimate authority to determine the sequence of inputs processed by the rollup. Thus, this key also acts as a centralized sequencing key.

This key may exist outside of the TEE enclave running the batcher, although the private key will need to be passed into the enclave in order for it to function.

Ephemeral Key

A key generated inside the enclave which never leaves it. Thus, signatures from this key must originate inside the enclave. This is a way of proving some data originated from or was endorsed by the code running in the enclave. This is similar to producing a TEE attestation, but these signatures are cheaper to verify than the full TEE attestation.

The batcher must have both sets of keys in order to successfully post a batch; the former proves to the derivation pipeline that a batch is originating with the centralized sequencer, while the latter proves to the inbox contract that the batch is originating from within the TEE enclave.

22.3 # Startup

On startup the Nitro batcher will generate its ephemeral key, a random secp256k1 private and public key pair. The batcher will then send the public key of this keypair to the SequencerInbox contract, along with an attestation quote over the public key. This will be used by the sequencer inbox to skip validating the attestation quote with every batch.

Tech Debt:

The assumption that fetching from a majority of the query nodes is a valid way to determine finality only holds if the majority of the query service nodes are honest. In the future we should verify the namespace and Merkle proofs here.

The batch poster's startup method will use the geth crypto library to generate this key like so:
Listing 22.1: Key creation process
privateKey, err := crypto.GenerateKey()
if err != nil {
    return err
}
publicKey := privateKey.Public()
ecdsaPubKey, ok := publicKey.(*ecdsa.PublicKey)
if !ok {
    return errors.New("unexpected public key type")
}
publicKeyAddress := crypto.PubkeyToAddress(*ecdsaPubKey).Hex()

This key will then be sent to the sequencer inbox along with an attestation quote over the key.
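For illustration, a minimal sketch of this registration step is given below. RegisterBatchPosterKey and getAttestationQuote are hypothetical names for the inbox binding and the enclave quote API; the real interfaces may differ.

// Sketch of registering the ephemeral key with the SequencerInbox on startup.
// RegisterBatchPosterKey and getAttestationQuote are hypothetical names.
func (b *BatchPoster) registerEphemeralKey(ctx context.Context) error {
    pubKeyAddress := crypto.PubkeyToAddress(b.ephemeralKey.PublicKey)

    // Ask the TEE for an attestation quote over the public key address.
    quote, err := b.getAttestationQuote(pubKeyAddress.Bytes())
    if err != nil {
        return fmt.Errorf("failed to produce attestation quote: %w", err)
    }

    // Submit the key and quote to the inbox contract; the contract verifies
    // the quote once and stores the key, so later batches only need a cheap
    // signature check.
    tx, err := b.seqInbox.RegisterBatchPosterKey(b.txOpts, pubKeyAddress, quote)
    if err != nil {
        return err
    }
    _, err = bind.WaitMined(ctx, b.l1Client, tx)
    return err
}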

22.4 # SequencerInbox

In the SequencerInbox contract we will verify the attestation to this public key, and store it in an array of attested keys that may post batches.

After this, posting a batch will require that the batcher include a signature over the batch data produced with one of these attested keys; otherwise posting the batch should revert.
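Conceptually, this check is ordinary ECDSA signature recovery against the stored key set. A minimal Go sketch of the equivalent off-chain check using geth's crypto package (on-chain the contract would perform the same recovery with ecrecover):

// Sketch: verify that a batch signature was produced by one of the attested keys.
// attestedKeys mirrors the contract's array of registered key addresses.
func isFromAttestedKey(digest []byte, sig []byte, attestedKeys []common.Address) (bool, error) {
    pubKey, err := crypto.SigToPub(digest, sig)
    if err != nil {
        return false, err
    }
    signer := crypto.PubkeyToAddress(*pubKey)
    for _, key := range attestedKeys {
        if key == signer {
            return true, nil
        }
    }
    return false, nil
}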

22.5 # Task 1: Sequencer Espresso

This task has two subprocesses: one which handles submitting transactions to Espresso, and one which polls submitted transactions for finality and resubmits any that have not been finalized before a resubmission deadline. These processes will run in the current transaction streamer.

22.5.1 # High Level Flow

The overall flow of a transaction through Task 1, from sequencing to confirmation on HotShot, is as follows:

1. The sequencer creates a message, signs it, and publishes it on the sequencer feed.
2. The batcher's transaction streamer receives the message, verifies the signature, and stores both the message and the signature in its database.
3. The submission task picks up the message from the database and creates a HotShot transaction which includes this message. It submits the transaction to HotShot and stores the transaction hash in the database.
4. The resubmission task picks up the pending HotShot transaction from the database and queries for finality. If the message is not finalized quickly enough, the resubmission task will submit it to HotShot again.


Figure 22.1: Transaction flow

# Transaction Submission Process

This process will be responsible for consuming the backlog of message positions enqueued by the sequencer, crafting the payload to be sent to HotShot, and then recording the message as submitted in the database.

# Finality Polling Process

This process will handle polling the transactions marked as submitted in the database for finality. In this implementation, finality only means that they can be fetched by hash from a majority of nodes on the query service.

Once a transaction reaches this notion of finality we remove it from the submitted transactions queue; otherwise we resubmit it.

22.5.2 # Transaction Submission Process

The transaction streamer will now store an arbutil.MessageIndex in its database to track the most recently submitted message. However, we need to ensure that messages to be submitted are not too large for the Espresso network; consequently, oversized messages are intentionally not stored in the database. While the Nitro codebase already enforces size limits, this additional check protects against malicious sequencers submitting oversized messages that exceed Espresso network limits. With this approach, should a message be too large, we throw an error, as implemented in the writeMessages function; because the message content is never modified, the sequencer's signature remains valid. Liveness may be compromised if the next set of transactions due for submission cannot be formed into a message that satisfies the size limit; such a compromise is an acceptable outcome in the case of a malicious sequencer.

Listing 22.2: Function to write messages to the database
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
    if batch == nil {
        batch = s.db.NewBatch()
    }
    for i, msg := range messages {
        if len(msg.MessageWithMeta.Message.L2msg) > arbostypes.MaxL2MessageSize {
            log.Warn("L2 message is too large", "pos", pos+arbutil.MessageIndex(i), "size", len(msg.MessageWithMeta.Message.L2msg))
            return fmt.Errorf("L2 message is too large")
        }
        ...
    }
    ...
}

# Low level functions

# submitTransactionsToEspresso task

This task runs as a polled function that is started when the transaction streamer is started.

Listing 22.3: Start function of the Transaction Streamer
func (s *TransactionStreamer) Start(ctxIn context.Context) error {
    s.StopWaiter.Start(ctxIn, s)

    if s.lightClientReader != nil && s.espressoClient != nil {
        err := stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.submitTransactionsToEspresso, s.newSovereignTxNotifier)
        if err != nil {
            return err
        }
        err = stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.pollSubmittedTransactionForFinality, s.newSovereignTxNotifier)
        if err != nil {
            return err
        }
    }
    ...
}

submitTransactionsToEspresso exists mainly as a wrapper function for submitEspressoTransactions that handles logging errors and returning a polling interval for the StopWaiter.

Listing 22.4: StopWaiter wrapper for submitEspressoTransactions
func (s *TransactionStreamer) submitTransactionsToEspresso(ctx context.Context, ignored struct{}) time.Duration {
    retryRate := s.espressoTxnsPollingInterval * 50
    err := s.submitEspressoTransactions(ctx)
    if err != nil {
        log.Error("failed to submit espresso transactions", "err", err)
        return retryRate
    }
    return s.espressoTxnsPollingInterval
}

The main logic for submission lives in the inner function submitEspressoTransactions. This function uses the previously stored position and iterates from that position + 1 up to the current message count in the transaction streamer, submitting all messages to Espresso and then updating the latest submitted position in the database.

Listing 22.5: Main logic for transaction submission
func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) error {
    /* unless otherwise noted, all errors in this listing are propagated to the caller,
       but this code is omitted for brevity. */

    lastSubmittedTxnsPos, err := s.getLastSubmittedTxnPos()
    if lastSubmittedTxnsPos == nil && err == nil {
        // We are initializing and haven't recently submitted anything.
        // Initialize last submitted as the head of the database, and start submitting txns to Espresso.
        lastSubmittedTxnsPos = s.getMessageCount() - 1
    }
    // Build the list of positions for batching transactions in the Espresso submission.
    payloadPositions := []arbutil.MessageIndex{}
    for position := lastSubmittedTxnsPos + 1; position < s.getMessageCount(); position++ {
        payloadPositions = append(payloadPositions, position)
    }
    if len(payloadPositions) == 0 {
        return nil
    }
    payload, msgCnt := s.BuildRawHotShotPayload(payloadPositions)

    // Note: the same key should not be used for two namespaces for this to work
    hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
        Payload:   payload,
        Namespace: s.chainConfig.ChainID.Uint64(),
    })
    s.espressoTxnsStateInsertionMutex.Lock()
    defer s.espressoTxnsStateInsertionMutex.Unlock()
    batch := s.db.NewBatch()
    submittedTxns, err := s.getEspressoSubmittedTxns()
    tx := SubmittedEspressoTx{
        Hash: hash.String(),
        // Record the positions actually included in the payload.
        Pos:         payloadPositions[:msgCnt],
        Payload:     payload,
        SubmittedAt: time.Now(),
    }
    if submittedTxns == nil {
        submittedTxns = []SubmittedEspressoTx{tx}
    } else {
        submittedTxns = append(submittedTxns, tx)
    }

    err = s.setEspressoSubmittedTxns(batch, submittedTxns)
    err = s.setLastSubmittedTxnPos(batch, payloadPositions[msgCnt-1])
    err = batch.Write()
    return nil
}

# BuildRawHotShotPayload

BuildRawHotShotPayload takes a list of message positions and constructs an Espresso transaction payload for submitting these to HotShot. For each message position, it reads from the database the message bytes and the sequencer signature. It then concatenates these for each message into a single buffer which forms the payload of the Espresso transaction, returning the payload together with the number of messages actually included (the loop stops early once the payload would exceed the maximum transaction size).

Listing 22.6: Building the payload
func (s *TransactionStreamer) BuildRawHotShotPayload(msgPositions []arbutil.MessageIndex) ([]byte, int) {
    payload := []byte{}
    msgCnt := 0

    for _, p := range msgPositions {
        msgBytes, err := s.fetchMessage(p)
        if err != nil {
            break
        }

        sigBytes, err := s.fetchSequencerSignature(p)
        if err != nil {
            break
        }

        sizeBuf := make([]byte, LEN_SIZE)
        positionBuf := make([]byte, INDEX_SIZE)

        if len(payload)+len(sizeBuf)+len(msgBytes)+len(sigBytes)+len(positionBuf) > int(s.espressoMaxTransactionSize) {
            break
        }
        binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgBytes)))
        binary.BigEndian.PutUint64(positionBuf, uint64(p))

        // Add the submitted txn position and the size of the message along with the signature and message
        payload = append(payload, positionBuf...)
        payload = append(payload, sizeBuf...)
        payload = append(payload, sigBytes...)
        payload = append(payload, msgBytes...)
        msgCnt += 1
    }
    return payload, msgCnt
}
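For reference, the inverse operation, which a consumer of the namespace payload (such as the Espresso streamer) would perform to read messages back out, might look like the following sketch. It assumes the same LEN_SIZE and INDEX_SIZE constants, plus a fixed-length sequencer signature; SIG_SIZE (e.g. 65 bytes for a secp256k1 signature) is an assumption of this sketch.

// Sketch: parse one (position, size, signature, message) record from a payload
// built by BuildRawHotShotPayload, returning the remaining unparsed bytes.
func parsePayloadRecord(payload []byte) (pos uint64, sig []byte, msg []byte, rest []byte, err error) {
    if len(payload) < INDEX_SIZE+LEN_SIZE+SIG_SIZE {
        return 0, nil, nil, nil, fmt.Errorf("payload too short")
    }
    pos = binary.BigEndian.Uint64(payload[:INDEX_SIZE])
    msgLen := binary.BigEndian.Uint64(payload[INDEX_SIZE : INDEX_SIZE+LEN_SIZE])
    start := INDEX_SIZE + LEN_SIZE
    sig = payload[start : start+SIG_SIZE]
    start += SIG_SIZE
    if uint64(len(payload)-start) < msgLen {
        return 0, nil, nil, nil, fmt.Errorf("truncated message")
    }
    msg = payload[start : start+int(msgLen)]
    rest = payload[start+int(msgLen):]
    return pos, sig, msg, rest, nil
}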

# Finality Polling Process

The primary change will reside in checkSubmittedTransactionForFinality.

This function will validate that the payload is in the block using the response from the FetchTransactionsInBlock call. Additionally, we can improve the submission and resubmission process by storing a timestamp when we first attempt to submit the transaction to Espresso. This involves changing the SubmittedEspressoTx type to include a timestamp, as shown below.

# SubmittedEspressoTx:

type SubmittedEspressoTx struct {
    Hash        string
    Pos         []arbutil.MessageIndex
    Payload     []byte
    SubmittedAt time.Time
}

When we poll for finality, we resubmit any transaction that has not been finalized within a certain interval after its submission timestamp. We then update the message's submitted timestamp in the database with the timestamp created when we resubmit the transaction, which results in a checkSubmittedTransactionForFinality that looks like this:

// Check whether each submitted transaction has been finalized on Espresso and verify it.
func (s *TransactionStreamer) checkSubmittedTransactionForFinality(ctx context.Context) error {
    submittedTxns, err := s.getEspressoSubmittedTxns()
    if err != nil {
        return fmt.Errorf("submitted txns not found: %w", err)
    }
    if len(submittedTxns) == 0 {
        return nil // no submitted transactions, treated as successful
    }
    // Transactions that are still pending after this pass.
    resultTxns := []SubmittedEspressoTx{}
    for _, submittedTxn := range submittedTxns {
        hash := submittedTxn.Hash

        submittedTxHash, err := tagged_base64.Parse(hash)
        if err != nil || submittedTxHash == nil {
            return fmt.Errorf("invalid hotshot tx hash, failed to parse hash %s: %w", hash, err)
        }
        data, err := s.checkEspressoQueryNodesForTransaction(ctx, submittedTxHash)
        if err != nil {
            // We haven't seen this txn on HotShot; resubmit if it is past the delay threshold.
            log.Warn("Unable to fetch transaction by hash, checking if we need to resubmit", "hash", hash)
            // If we are past the delay, the lower level function resubmits the transaction and
            // returns a new SubmittedEspressoTx; if not, we keep the old entry and retry on the
            // next iteration.
            if resubmittedTxn := s.resubmitTransactionIfPastDelay(ctx, submittedTxn); resubmittedTxn != nil {
                resultTxns = append(resultTxns, *resubmittedTxn)
            } else {
                resultTxns = append(resultTxns, submittedTxn)
            }
            continue
        }
        height := data.BlockHeight

        // Validate that the transaction payload is in this block.
        resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64())
        if err != nil {
            log.Warn("Failed to fetch transactions in block referenced in fetch transaction by hash", "height", height, "error", err)
            resultTxns = append(resultTxns, submittedTxn)
            continue
        }

        validated := validateIfPayloadIsInBlock(submittedTxn.Payload, resp.Transactions)
        if !validated {
            // In theory this case should not happen, but it will not resolve naturally,
            // so we attempt to resubmit.
            log.Warn("submitted payload was not in block that had this transaction, attempting to resubmit")
            if resubmittedTxn := s.resubmitTransactionIfPastDelay(ctx, submittedTxn); resubmittedTxn != nil {
                resultTxns = append(resultTxns, *resubmittedTxn)
            } else {
                resultTxns = append(resultTxns, submittedTxn)
            }
            continue
        }
        // The transaction is finalized: drop it by not carrying it over into resultTxns.
    }
    // We have checked all transactions for finality and resubmitted those that needed it.
    // Persist the still-pending transactions in a db batch.
    s.espressoTxnsStateInsertionMutex.Lock()
    defer s.espressoTxnsStateInsertionMutex.Unlock()

    batch := s.db.NewBatch()
    if err := s.setEspressoSubmittedTxns(batch, resultTxns); err != nil {
        return fmt.Errorf("failed to set the espresso submitted txns: %w", err)
    }

    if err := batch.Write(); err != nil {
        return fmt.Errorf("failed to write to db: %w", err)
    }

    return nil
}

22.5.3 # CheckEspressoQueryNodesForTransaction

func (s *TransactionStreamer) checkEspressoQueryNodesForTransaction(ctx context.Context, hash *types.TaggedBase64) (*espressoTypes.TransactionQueryData, error) {
    numNodesWithTransaction := 0
    // TransactionQueryData is assumed to be the query-service response type carrying BlockHeight.
    var data *espressoTypes.TransactionQueryData
    for _, queryUrl := range s.queryUrls {
        // Each query node gets its own client so that we actually fan out across nodes
        // (assuming the Espresso client can be constructed per query node URL).
        client := espressoClient.NewClient(queryUrl)
        res, err := client.FetchTransactionByHash(ctx, hash)
        if err != nil {
            continue
        }
        data = &res
        numNodesWithTransaction += 1
    }
    if numNodesWithTransaction > len(s.queryUrls)/2 {
        return data, nil
    }
    return nil, fmt.Errorf("wasn't able to fetch transaction from the majority of query nodes")
}

This function requires that we also add some state to the transaction streamer in the form of an array of query node URLs. This can be read in from the config when the transaction streamer struct is built, as sketched below.
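A sketch of what these additions could look like, assuming Nitro's usual koanf-tagged config style (the field names are illustrative):

// Illustrative config additions for the transaction streamer.
type TransactionStreamerConfig struct {
    // ... existing fields ...

    // URLs of the Espresso query nodes polled for finality.
    EspressoQueryUrls []string `koanf:"espresso-query-urls"`
    // How long to wait before resubmitting an unfinalized transaction.
    ResubmitEspressoTxDeadline time.Duration `koanf:"resubmit-espresso-tx-deadline"`
}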

# ResubmitTransactionIfPastDelay

This function takes in the previously submitted txn and resubmits it like so:

func (s *TransactionStreamer) resubmitTransactionIfPastDelay(ctx context.Context, submittedTxn SubmittedEspressoTx) *SubmittedEspressoTx {
    timeSinceSubmission := time.Since(submittedTxn.SubmittedAt)
    if timeSinceSubmission > s.resubmitEspressoTxDeadline {
        hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
            Payload:   submittedTxn.Payload,
            Namespace: s.chainConfig.ChainID.Uint64(),
        })
        if err != nil {
            log.Warn("Failed to resubmit transaction", "error", err)
            return nil
        }
        // If we were successful in resubmitting, build the new submitted tx object
        // with a fresh timestamp and return it to the caller.
        return &SubmittedEspressoTx{
            Hash:        hash.String(),
            Pos:         submittedTxn.Pos,
            Payload:     submittedTxn.Payload,
            SubmittedAt: time.Now(),
        }
    }
    // Not past the resubmission deadline yet; nothing to do.
    return nil
}

22.6 # Task 2: Espresso → L1

The batch poster will use the Nitro Espresso streamer, which is analogous to the upstream transaction streamer, to read an ordered stream of messages from Espresso. The task of relaying messages from HotShot to L1 then becomes a straightforward adaptation of maybePostSequencerBatch.

22.6.1 # MaybePostSequencerBatch

maybePostSequencerBatch will use the EspressoStreamer instead of the TransactionStreamer.

In order to account for the batch cache being reset by any errors in maybePostSequencerBatch, we will check if it is nil at the start of maybePostSequencerBatch. If it is, we will reset the Espresso streamer to the last checkpoint, so that our state is constantly resyncing with L1.
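A sketch of this guard at the top of maybePostSequencerBatch, assuming a Reset method on the Espresso streamer and a hypothetical fetchLastCheckpointFromL1 helper that recovers the checkpoint described in the Resetting section:

// Sketch: resync the streamer with L1 whenever the building cache was cleared.
if b.building == nil {
    // Recover the last checkpoint posted to L1 (hypothetical helper; see the
    // Resetting section for how the checkpoint is found).
    checkpoint, err := b.fetchLastCheckpointFromL1(ctx)
    if err != nil {
        return false, fmt.Errorf("failed to fetch checkpoint from L1: %w", err)
    }
    if err := b.espressoStreamer.Reset(checkpoint); err != nil {
        return false, fmt.Errorf("failed to reset espresso streamer: %w", err)
    }
    // b.building is then re-initialized as in the upstream batch poster.
}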

Listing 22.7: Batch building loop utilizing the Espresso streamer
for {
    msg, err := b.espressoStreamer.Next()
    if err != nil {
        return false, fmt.Errorf("error getting message from streamer: %w", err)
    }
    success, err := b.building.segments.AddMessage(msg)
    if err != nil {
        // Clear our cache
        b.building = nil
        return false, fmt.Errorf("error adding message to batch: %w", err)
    }
    if !success {
        // this batch is full
        if !config.WaitForMaxDelay {
            forcePostBatch = true
        }
        b.building.haveUsefulMessage = true
        if b.building.firstUsefulMsg == nil {
            b.building.firstUsefulMsg = msg
        }
        break
    }
    // isPastMaxDelay (name illustrative) compares the first useful message's
    // timestamp against the configured max delay.
    if b.building.firstUsefulMsg != nil && isPastMaxDelay(b.building.firstUsefulMsg) {
        // batch has a message older than the max delay, we should attempt to post
        break
    }
}

Where b.building.segments.AddMessage() is the vanilla Nitro function that adds messages to the batch currently being built.

b.fetchBatchPositions will be a function that queries the inbox tracker's database to see what the latest message position was, and filters logs emitted by the sequencer inbox contract for events related to new HotShot heights it has seen.

When the batch is closed, we will send it to the SequencerInbox.

22.6.2 # Submission To Sequencer Inbox

# Signing

Batches will carry an additional signature produced with the secp256k1 keypair generated by the batch poster process at startup, as outlined in the startup section. This signature will be verified in the Nitro sequencer inbox as outlined in Arbitrum Nitro sequencer inbox.

To do so, we will use the crypto.Sign() function provided by the crypto package of geth. The digest provided to Sign() will be the result of calling keccak256() on the calldata and metadata provided to addSequencerL2Batch(). In the case of blobs, the digest will be the result of calling keccak256() on the blobHashes and metadata provided to addSequencerL2BatchFromBlobs().

Listing 22.8: Attested key signature
func (b *BatchPoster) signBatch(data []byte) ([]byte, error) {
    // The digest is the keccak256 hash of the batch data described above.
    digest := crypto.Keccak256(data)
    // Sign with the ephemeral key attested to at startup.
    return crypto.Sign(digest, b.attestedPrivateKey)
}

# Checkpoint

Along with the batch and signature, we also persist on L1 a checkpoint of the Espresso streamer state, allowing the batcher after a restart, or any other batcher running concurrently, to sync its own streamer with the state of the L1 after every batch post.

A checkpoint consists of the Nitro message position and an Espresso block number (specifically, the lowest block number associated with any message in the streamer's buffer). The message position of the last message in the batch is already encoded in the batch and recorded by the Arbitrum Nitro sequencer inbox. We get the appropriate Espresso block number from the Espresso streamer.

The Espresso block number is prepended to the byte array containing the signature. This is to avoid having to change the ABI of the SequencerInbox.
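Concretely, the encoding might look like the following sketch, with the Espresso block number written as an 8-byte big-endian integer ahead of the 65-byte secp256k1 signature; the exact widths are an assumption of this sketch.

// Sketch: prepend the Espresso block number to the signature so the existing
// bytes argument of the SequencerInbox ABI can carry both values.
func encodeCheckpointAndSignature(espressoBlockNum uint64, sig []byte) []byte {
    buf := make([]byte, 8, 8+len(sig))
    binary.BigEndian.PutUint64(buf, espressoBlockNum)
    return append(buf, sig...)
}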

22.6.3 # Resetting

It is possible that the Nitro Espresso streamer becomes out of sync with the state of the inbox contract on L1. That is, the message position of the streamer may differ from the last message sent to L1. In this case, the batcher needs to resync its streamer state to a snapshot consistent with what is on L1. Notably, this is necessary every time the batcher starts up, but can also occur at runtime due to various exceptional cases.

This is possible, for example, if the batcher has just started up, if an error in maybePostSequencerBatch has cleared the building batch cache, or if another batcher running concurrently has posted a batch since we last did.

All of these cases are handled by resetting the Nitro Espresso streamer to a checkpoint which was posted to L1 along with the last successful batch posted. This ensures that the streamer is now in sync with L1, and we can continue batch posting as normal from there.

Finding this checkpoint requires finding the last time a batch was committed to L1 via the Arbitrum Nitro sequencer inbox. This is done by scanning backwards from the latest L1 block looking for an event emitted from the inbox contract with the signature LastHotshotHeight(uint256). This event is emitted each time a new batch is committed, and it gives us the Espresso block height to use in constructing the Espresso streamer snapshot. The other piece of the snapshot is the Nitro message position, which can be read from the contract storage at the L1 block number where the event was emitted.
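A sketch of this backwards scan using go-ethereum's log filtering is given below; the chunked scan window and helper name are illustrative.

// The event topic is the keccak256 hash of the event signature.
var lastHotshotHeightTopic = crypto.Keccak256Hash([]byte("LastHotshotHeight(uint256)"))

// Sketch: scan backwards from the latest L1 block for the most recent
// LastHotshotHeight event emitted by the SequencerInbox.
func findLastCheckpointEvent(ctx context.Context, client *ethclient.Client, inbox common.Address, latest uint64) (*types.Log, error) {
    const chunk = uint64(1000) // illustrative scan window
    for end := latest; ; {
        start := uint64(0)
        if end > chunk {
            start = end - chunk
        }
        logs, err := client.FilterLogs(ctx, ethereum.FilterQuery{
            FromBlock: new(big.Int).SetUint64(start),
            ToBlock:   new(big.Int).SetUint64(end),
            Addresses: []common.Address{inbox},
            Topics:    [][]common.Hash{{lastHotshotHeightTopic}},
        })
        if err != nil {
            return nil, err
        }
        if len(logs) > 0 {
            return &logs[len(logs)-1], nil // most recent event in this window
        }
        if start == 0 {
            return nil, nil // no event found: fall back to the genesis snapshot
        }
        end = start - 1
    }
}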

It is possible that no such event will be found, if the batcher is starting for the first time since the rollup enabled the Espresso integration. In this case the batcher config contains the Espresso streamer snapshot from the "genesis" of the Espresso integration.