Requirements: Nitro Integration Requirements
This page documents the technical design of the batcher component of the Arbitrum Nitro integration.
The batcher consists of two tasks, which form a pipeline: Task 1 reads messages from the sequencer and sends them to Espresso. Task 2 reads messages from Espresso and sends them to L1.
Note that Task 1 need not be trusted for safety/integrity: neither the existing Nitro stack nor our added requirements make any guarantees about what happens prior to a valid confirmation being posted to Espresso (i.e. the sequencer is already untrusted).
We thus focus most of our attention on Task 2, which is designed to be stateless, self-contained, and maximally simple while upholding all the requirements except Liveness and Sequencer Sovereignty, which are left to the rollup sequencer and Task 1.
The batcher maintains two sets of keys with different purposes:
The key which is registered with the rollup chain config as the centralized batcher key. In this key is vested the authority to add batches to the L1, and thus the ultimate authority to determine the sequence of inputs processed by the rollup. It therefore also acts as a centralized sequencing key.
This key may exist outside of the TEE enclave running the batcher, although the private key will need to be passed into the enclave in order for it to function.
A key generated inside the enclave which never leaves it. Thus, signatures from this key must originate inside the enclave. This is a way of proving some data originated from or was endorsed by the code running in the enclave. This is similar to producing a TEE attestation, but these signatures are cheaper to verify than the full TEE attestation.
The batcher must have both sets of keys in order to successfully post a batch; the former proves to the derivation pipeline that a batch is originating with the centralized sequencer, while the latter proves to the inbox contract that the batch is originating from within the TEE enclave.
On startup the Nitro batcher will generate its ephemeral key, a random secp256k1 private and public key pair. The batcher will then send the public key of this keypair to the SequencerInbox contract, along with an attestation quote over the public key. This allows the sequencer inbox to later verify only a signature on each batch, rather than validating an attestation quote with every batch.
Tech Debt:
The assumption that fetching from a majority of the query nodes is a valid way to determine finality only holds if the majority of the query service nodes are honest. In the future we should verify the namespace and Merkle proofs here.
privateKey, err := crypto.GenerateKey()
if err != nil {
	return err
}
ecdsaPubKey := privateKey.Public().(*ecdsa.PublicKey)
publicKeyAddress := crypto.PubkeyToAddress(*ecdsaPubKey).Hex()
This key will then be sent to the sequencer inbox along with an attestation quote over the key.
In the SequencerInbox contract we will verify the attestation to this public key, and store it in an array of attested keys that may post batches.
After this, posting batches will require that the batcher include a signature over the batch data produced with one of these attested keys; otherwise posting the batch should revert.
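As an illustration of this registration flow, a minimal sketch is given below. The helpers getAttestationQuote and registerAttestedKey are hypothetical names used only to show the shape of the flow; the real call goes through the SequencerInbox bindings, whose final interface may differ.

// Sketch of ephemeral key generation and registration at batcher startup.
// getAttestationQuote and registerAttestedKey are hypothetical helpers used
// only to illustrate the flow described above.
func setupEphemeralBatcherKey(ctx context.Context) (*ecdsa.PrivateKey, error) {
	// Generate the ephemeral secp256k1 keypair inside the enclave.
	privateKey, err := crypto.GenerateKey()
	if err != nil {
		return nil, err
	}
	pubKeyBytes := crypto.FromECDSAPub(&privateKey.PublicKey)

	// Ask the TEE for an attestation quote over the public key (hypothetical helper).
	quote, err := getAttestationQuote(pubKeyBytes)
	if err != nil {
		return nil, err
	}

	// Register the attested public key with the SequencerInbox so that later
	// batches signed by this key are accepted (hypothetical helper wrapping the
	// contract call).
	if err := registerAttestedKey(ctx, pubKeyBytes, quote); err != nil {
		return nil, err
	}
	return privateKey, nil
}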
This task will have two sub-processes: one which handles submitting the transactions to Espresso, and one which polls submitted transactions for finality and resubmits any that have not been finalized by a resubmission deadline. These processes will run in the current transaction streamer.
The overall flow of a transaction through Task 1, from sequencing to confirmation on HotShot, is as follows:
This process will be responsible for consuming the backlog of message positions enqueued by the sequencer, crafting the payload to be sent to HotShot, and then recording the message as submitted in the database.
This process will handle polling the transactions marked as submitted in the database for finality. In this implementation, finality only means that they can be fetched by hash from a majority of nodes on the query service.
Once a transaction is finalized, we can remove it from the submitted transactions queue; otherwise, we can resubmit it.
The transaction streamer will now store an arbutil.MessageIndex in its database to track the most recently submitted message. However, we need to ensure that messages to be submitted are not too large for the Espresso network; consequently, oversized messages are intentionally not stored in the database. While the Nitro codebase already enforces size limits, this additional check protects against a malicious sequencer submitting oversized messages that exceed Espresso network limits. With this approach, should a message be too large, we throw an error, as implemented in the writeMessages function, rather than modifying the message; this ensures the sequencer's signature remains valid over the unchanged message content. Liveness may be compromised if the next set of transactions due for submission cannot be formed into a message that satisfies the size limit. Such a liveness compromise is an acceptable outcome in the case of a malicious sequencer.
func (s *TransactionStreamer) writeMessages(pos arbutil.MessageIndex, messages []arbostypes.MessageWithMetadataAndBlockInfo, batch ethdb.Batch) error {
	if batch == nil {
		batch = s.db.NewBatch()
	}
	for i, msg := range messages {
		if len(msg.MessageWithMeta.Message.L2msg) > arbostypes.MaxL2MessageSize {
			log.Warn("L2 message is too large", "pos", pos+arbutil.MessageIndex(i), "size", len(msg.MessageWithMeta.Message.L2msg))
			return fmt.Errorf("L2 message is too large")
		}
		...
	}
	...
}

Low level functions

submitTransactionsToEspresso task

This task runs as a polled function that is started when the transaction streamer is started.

Start function of the Transaction Streamer:

func (s *TransactionStreamer) Start(ctxIn context.Context) error {
	s.StopWaiter.Start(ctxIn, s)

	if s.lightClientReader != nil && s.espressoClient != nil {
		err := stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.submitTransactionsToEspresso, s.newSovereignTxNotifier)
		if err != nil {
			return err
		}
		err = stopwaiter.CallIterativelyWith[struct{}](&s.StopWaiterSafe, s.pollSubmittedTransactionForFinality, s.newSovereignTxNotifier)
		if err != nil {
			return err
		}
		...
	}
	...
}
submitTransactionsToEspresso exists mainly as a wrapper function for submitEspressoTransactions that handles logging errors and returning a polling interval for the StopWaiter.
func (s *TransactionStreamer) submitTransactionsToEspresso(ctx context.Context, ignored struct{}) time.Duration {
	retryRate := s.espressoTxnsPollingInterval * 50
	err := s.submitEspressoTransactions(ctx)
	if err != nil {
		log.Error("failed to submit espresso transactions", "err", err)
		return retryRate
	}
	return s.espressoTxnsPollingInterval
}
The main logic for submission lives in the inner function submitEspressoTransactions. This function will read the previously stored position and iterate from that position + 1 up to the current message count in the transaction streamer, submitting all messages to Espresso and then updating the latest submitted position in the database.
func (s *TransactionStreamer) submitEspressoTransactions(ctx context.Context) error {
	/* unless otherwise noted, all errors in this listing are propagated to the caller,
	   but this code is omitted for brevity. */

	lastSubmittedTxnPos, err := s.getLastSubmittedTxnPos()
	if lastSubmittedTxnPos == nil && err == nil {
		// We are initializing and haven't recently submitted anything.
		// Initialize last submitted as the head of the database, and start submitting txns to espresso.
		head := s.getMessageCount() - 1
		lastSubmittedTxnPos = &head
	}
	// Build the list of positions for batching transactions in the espresso submission.
	payloadPositions := []arbutil.MessageIndex{}
	for position := *lastSubmittedTxnPos + 1; position < s.getMessageCount(); position++ {
		payloadPositions = append(payloadPositions, position)
	}
	if len(payloadPositions) == 0 {
		return nil
	}
	payload, msgCnt, err := s.buildRawHotShotPayload(payloadPositions)
	if msgCnt == 0 {
		return fmt.Errorf("unable to fit any messages into an Espresso transaction")
	}

	// Note: the same key should not be used for two namespaces for this to work
	hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
		Payload:   payload,
		Namespace: s.chainConfig.ChainID.Uint64(),
	})
	s.espressoTxnsStateInsertionMutex.Lock()
	defer s.espressoTxnsStateInsertionMutex.Unlock()
	batch := s.db.NewBatch()
	submittedTxns, err := s.getEspressoSubmittedTxns()
	submittedAt := time.Now()
	tx := SubmittedEspressoTx{
		Hash:        hash.String(),
		Pos:         payloadPositions[:msgCnt],
		Payload:     payload,
		SubmittedAt: &submittedAt,
	}
	submittedTxns = append(submittedTxns, tx)

	err = s.setEspressoSubmittedTxns(batch, submittedTxns)
	err = s.setLastSubmittedTxnPos(batch, payloadPositions[msgCnt-1])
	err = batch.Write()
	return nil
}
buildRawHotShotPayload takes a list of message positions and constructs an Espresso transaction for submitting these to HotShot. For each message position, it reads from the database the message bytes and the sequencer signature. It then concatenates these for each message into a single buffer which forms the payload of the Espresso transaction.
func (s *TransactionStreamer) buildRawHotShotPayload(msgPositions []arbutil.MessageIndex) ([]byte, int, error) {
	payload := []byte{}
	msgCnt := 0

	for _, p := range msgPositions {
		msgBytes, err := s.fetchMessage(p)
		if err != nil {
			break
		}

		sigBytes, err := s.fetchSequencerSignature(p)
		if err != nil {
			break
		}

		sizeBuf := make([]byte, LEN_SIZE)
		positionBuf := make([]byte, INDEX_SIZE)

		if len(payload)+len(sizeBuf)+len(msgBytes)+len(sigBytes)+len(positionBuf) > int(s.espressoMaxTransactionSize) {
			break
		}
		binary.BigEndian.PutUint64(sizeBuf, uint64(len(msgBytes)))
		binary.BigEndian.PutUint64(positionBuf, uint64(p))

		// Add the submitted txn position and the size of the message along with the signature and message
		payload = append(payload, positionBuf...)
		payload = append(payload, sizeBuf...)
		payload = append(payload, sigBytes...)
		payload = append(payload, msgBytes...)
		msgCnt += 1
	}
	return payload, msgCnt, nil
}
checkSubmittedTransactionForFinality
This function will validate that the payload is in the block using the response from the FetchTransactionsInBlock call. Additionally, we can improve the submission and resubmission process by storing a timestamp when we first attempted to submit the transaction to Espresso. This involves changing the SubmittedEspressoTx type to include a timestamp, as shown below.
type SubmittedEspressoTx struct {
	Hash        string
	Pos         []arbutil.MessageIndex
	Payload     []byte
	SubmittedAt *time.Time
}
When we poll for finality, we resubmit any transactions that have not seen finality within a certain interval past the timestamp from when they were submitted. We then update the message's submitted timestamp in the database with the timestamp created when we resubmit the transaction, which results in checkSubmittedTransactionForFinality looking like this:
// Check whether submitted transactions have been finalized on Espresso and verify them.
// Finalized transactions are removed from the submitted list; stale ones are resubmitted.
func (s *TransactionStreamer) checkSubmittedTransactionForFinality(ctx context.Context) error {
	submittedTxns, err := s.getEspressoSubmittedTxns()
	if err != nil {
		return fmt.Errorf("submitted txns not found: %w", err)
	}
	if len(submittedTxns) == 0 {
		return nil // no submitted transaction, treated as successful
	}
	remainingTxns := []SubmittedEspressoTx{}
	for _, submittedTxn := range submittedTxns {
		hash := submittedTxn.Hash

		submittedTxHash, err := tagged_base64.Parse(hash)
		if err != nil || submittedTxHash == nil {
			return fmt.Errorf("invalid hotshot tx hash, failed to parse hash %s: %w", hash, err)
		}
		data, err := s.checkEspressoQueryNodesForTransaction(ctx, submittedTxHash)
		if err != nil {
			// We haven't seen this txn in HotShot; resubmit if it is past the delay threshold.
			log.Warn("Unable to fetch transaction by hash, checking if we need to resubmit", "hash", hash)
			// If we are past the delay, resubmit the transaction in the lower level function and
			// keep the new SubmittedEspressoTx in the remaining list.
			resubmittedTxn := s.resubmitTransactionIfPastDelay(ctx, submittedTxn)
			if resubmittedTxn != nil {
				// If we successfully resubmitted the tx, track the new submission;
				// if not, keep the old entry and retry on the next iteration.
				remainingTxns = append(remainingTxns, *resubmittedTxn)
			} else {
				remainingTxns = append(remainingTxns, submittedTxn)
			}
			continue
		}
		height := data.BlockHeight

		// Validate that the transaction payload is in this block
		resp, err := s.espressoClient.FetchTransactionsInBlock(ctx, height, s.chainConfig.ChainID.Uint64())
		if err != nil {
			log.Warn("Failed to fetch transactions in block referenced in fetch transaction by hash", "height", height, "error", err)
			remainingTxns = append(remainingTxns, submittedTxn)
			continue
		}

		submittedPayload := submittedTxn.Payload
		validated := validateIfPayloadIsInBlock(submittedPayload, resp.Transactions)
		if !validated {
			log.Warn("submitted payload was not in block that had this transaction, we should attempt to resubmit")
			// In theory this case shouldn't happen, but it won't resolve naturally, so we try to resubmit.
			resubmittedTxn := s.resubmitTransactionIfPastDelay(ctx, submittedTxn)
			if resubmittedTxn != nil {
				remainingTxns = append(remainingTxns, *resubmittedTxn)
			} else {
				remainingTxns = append(remainingTxns, submittedTxn)
			}
			continue
		}
		// If this transaction is validated it is simply dropped from the remaining transactions.
	}
	// We have checked all transactions for finality and resubmitted transactions that needed to be resubmitted. Update the db batch.
	s.espressoTxnsStateInsertionMutex.Lock()
	defer s.espressoTxnsStateInsertionMutex.Unlock()

	batch := s.db.NewBatch()
	if err := s.setEspressoSubmittedTxns(batch, remainingTxns); err != nil {
		return fmt.Errorf("failed to set the espresso submitted txns: %w", err)
	}

	if err := batch.Write(); err != nil {
		return fmt.Errorf("failed to write to db: %w", err)
	}

	return nil
}
func (s *TransactionStreamer) checkEspressoQueryNodesForTransaction(ctx context.Context, hash *types.TaggedBase64) (*espressoTypes.TransactionQueryData, error) {
	numNodesWithTransaction := 0
	var data *espressoTypes.TransactionQueryData
	for _, queryUrl := range s.queryUrls {
		// Assumes a query-service client can be constructed from a bare node URL.
		node := espressoClient.NewClient(queryUrl)
		nodeData, err := node.FetchTransactionByHash(ctx, hash)
		if err != nil {
			continue
		}
		data = &nodeData
		numNodesWithTransaction += 1
	}
	if numNodesWithTransaction > len(s.queryUrls)/2 {
		return data, nil
	}
	return nil, fmt.Errorf("wasn't able to fetch transaction from the majority of query nodes")
}
This function also requires that we add some state to the transaction streamer in the form of an array of query node URLs. This can be read in from the config when the transaction streamer struct is built.
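For illustration, the extra state and configuration could be sketched as follows; the field names and koanf tags here are assumptions made for this sketch, not the final configuration surface.

// Hypothetical additions to the transaction streamer configuration: the list of
// Espresso query node URLs used for the majority finality check, and the
// deadline after which an unfinalized transaction is resubmitted.
type TransactionStreamerConfig struct {
	// ... existing fields ...
	EspressoQueryNodeUrls      []string      `koanf:"espresso-query-node-urls"`
	ResubmitEspressoTxDeadline time.Duration `koanf:"resubmit-espresso-tx-deadline"`
}

// When the transaction streamer struct is built, it copies these values:
//   s.queryUrls = config.EspressoQueryNodeUrls
//   s.resubmitEspressoTxDeadline = config.ResubmitEspressoTxDeadline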
resubmitTransactionIfPastDelay will take in the previously submitted txn and resubmit it like so:
func (s *TransactionStreamer) resubmitTransactionIfPastDelay(ctx context.Context, submittedTxn SubmittedEspressoTx) *SubmittedEspressoTx {
	timeSinceSubmission := time.Since(*submittedTxn.SubmittedAt)
	if timeSinceSubmission > s.resubmitEspressoTxDeadline {
		hash, err := s.espressoClient.SubmitTransaction(ctx, espressoTypes.Transaction{
			Payload:   submittedTxn.Payload,
			Namespace: s.chainConfig.ChainID.Uint64(),
		})
		if err != nil {
			log.Warn("Failed to resubmit transaction with error", "error", err)
			return nil
		}
		submittedAt := time.Now()
		// If we were successful in resubmitting, build the new submitted tx object and return it to the caller.
		resubmittedTxn := SubmittedEspressoTx{
			Hash:        hash.String(),
			Pos:         submittedTxn.Pos,
			Payload:     submittedTxn.Payload,
			SubmittedAt: &submittedAt,
		}
		return &resubmittedTxn
	}
	return nil
}
The batch poster will use the Nitro Espresso streamer, which is analogous to the upstream transaction streamer, to read an ordered stream of messages from Espresso. The task to relay messages from HotShot to L1 becomes a straightforward adaptation of maybePostSequencerBatch.
maybePostSequencerBatch will use the EspressoStreamer instead of the TransactionStreamer. In order to account for the batch cache being reset by any errors in maybePostSequencerBatch, we will check if it is nil at the start of maybePostSequencerBatch. If it is, we will reset the Espresso streamer to the last checkpoint, so that our state is constantly resyncing with L1.
for {
	msg, err := b.espressoStreamer.Next()
	if err != nil {
		return false, fmt.Errorf("error getting message from streamer: %w", err)
	}
	success, err := b.building.segments.AddMessage(msg)
	if err != nil {
		// Clear our cache
		b.building = nil
		return false, fmt.Errorf("error adding message to batch: %w", err)
	}
	if !success {
		// this batch is full
		if !config.WaitForMaxDelay {
			forcePostBatch = true
		}
		b.building.haveUsefulMessage = true
		if b.building.firstUsefulMsg == nil {
			b.building.firstUsefulMsg = msg
		}
		break
	}
	if b.building.firstUsefulMsg != nil &&
		time.Since(time.Unix(int64(b.building.firstUsefulMsg.Message.Header.Timestamp), 0)) >= config.MaxDelay {
		// The batch has a message older than the max delay, we should attempt to post.
		break
	}
}
Where b.building.segments.AddMessage() is the vanilla Nitro function that adds messages to the batch currently being built.
b.fetchBatchPositions will be a function that queries the inbox tracker's database to see what the latest message position was, and filters logs emitted by the sequencer inbox contract for events related to new HotShot heights it has seen.
When the batch is closed, we will send it to the SequencerInbox.
To do so, we will use the crypto.Sign() function provided by the crypto package of geth. The digest provided to Sign() will be the result of calling keccak256() on the calldata and metadata provided to addSequencerL2Batch(). In the case of blobs, the digest provided to Sign() will be the result of calling keccak256() on the blobHashes and metadata provided to addSequencerL2BatchFromBlobs().
func (b *BatchPoster) SignBatch(data []byte) ([]byte, error) {
	digest := crypto.Keccak256(data)

	signature, err := crypto.Sign(digest, b.attestedPrivateKey)
	if err != nil {
		return nil, err
	}
	return signature, nil
}
Along with the batch and signature, we also persist on L1 a checkpoint of the Espresso streamer state, allowing the batcher after a restart, or any other batcher running concurrently, to sync their own streamer with the state of the L1 after every batch post.
A checkpoint consists of the Nitro message position and an Espresso block number (specifically, the lowest block number associated with any message in the streamer's buffer). The message position of the last message in the batch is already encoded in the batch and recorded by the Arbitrum Nitro sequencer inbox. We get the appropriate Espresso block number from the Espresso streamer.
The Espresso block number is prepended to the byte array containing the signature. This is to avoid having to change the ABI of the SequencerInbox.
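A minimal sketch of this encoding, assuming the block number is packed as an 8-byte big-endian integer (the exact width and the helper names are assumptions of this sketch):

// Prepend the Espresso block number of the streamer checkpoint to the signature
// bytes, so the extra data rides along without changing the SequencerInbox ABI.
func encodeSignatureWithHotshotHeight(hotshotHeight uint64, signature []byte) []byte {
	heightBuf := make([]byte, 8)
	binary.BigEndian.PutUint64(heightBuf, hotshotHeight)
	return append(heightBuf, signature...)
}

// The contract, or another batcher resyncing from L1, can split the two pieces back apart.
func decodeSignatureWithHotshotHeight(data []byte) (uint64, []byte, error) {
	if len(data) < 8 {
		return 0, nil, fmt.Errorf("data too short to contain a hotshot height")
	}
	return binary.BigEndian.Uint64(data[:8]), data[8:], nil
}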
It is possible that the Nitro Espresso streamer becomes out of sync with the state of the inbox contract on L1. That is, the message position of the streamer may differ from the last message sent to L1. In this case, the batcher needs to resync its streamer state to a snapshot consistent with what is on L1. Notably, this is necessary every time the batcher starts up, but can also occur at runtime due to various exceptional cases.
This is possible, for example, if
The L1 has a reorg causing a batch which was previously sent to L1 to be removed. In this case the state of the Nitro Espresso streamer may be ahead of what is actually on L1.
Another batch poster may post a batch that we haven’t gotten to yet. In this case the Nitro Espresso streamer may be behind L1.
The batcher may restart. In this case it loses the state of the Nitro Espresso streamer and needs to resync with L1. Note that handling this means we handle normal startup exactly the same way we handle various issues that come up during runtime, which is a nice property.
All of these cases are handled by resetting the Nitro Espresso streamer to a checkpoint which was posted to L1 along with the last successful batch posted. This ensures that the streamer is now in sync with L1, and we can continue batch posting as normal from there.
Finding this checkpoint requires finding the last time a batch was committed to L1 via the Arbitrum Nitro sequencer inbox. This is done by scanning backwards from the latest L1 block looking for an event emitted from the inbox contract with the signature LastHotshotHeight(uint256). This event is emitted each time a new batch is committed, and it gives us the Espresso block height to use in constructing the Espresso streamer snapshot. The other piece of the snapshot is the Nitro message position, which can be read from the contract storage at the L1 block number where the event was emitted.
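A rough sketch of this backwards scan using go-ethereum's log filtering is shown below. The chunked block-range size, the assumption that the height is carried in the non-indexed event data, and the way the message position is recovered afterwards are all assumptions of this sketch, not the final implementation.

// Scan backwards from the latest L1 block for the most recent LastHotshotHeight
// event emitted by the sequencer inbox, in fixed-size block ranges.
func findLastCheckpointHeight(ctx context.Context, l1 *ethclient.Client, inboxAddr common.Address) (uint64, uint64, error) {
	eventTopic := crypto.Keccak256Hash([]byte("LastHotshotHeight(uint256)"))
	latest, err := l1.BlockNumber(ctx)
	if err != nil {
		return 0, 0, err
	}
	const chunk = uint64(1000) // assumed scan window per query
	end := latest
	for {
		var start uint64
		if end > chunk {
			start = end - chunk
		}
		logs, err := l1.FilterLogs(ctx, ethereum.FilterQuery{
			FromBlock: new(big.Int).SetUint64(start),
			ToBlock:   new(big.Int).SetUint64(end),
			Addresses: []common.Address{inboxAddr},
			Topics:    [][]common.Hash{{eventTopic}},
		})
		if err != nil {
			return 0, 0, err
		}
		if len(logs) > 0 {
			last := logs[len(logs)-1]
			hotshotHeight := new(big.Int).SetBytes(last.Data).Uint64()
			// The Nitro message position is then read from contract storage at
			// last.BlockNumber (omitted here).
			return hotshotHeight, last.BlockNumber, nil
		}
		if start == 0 {
			return 0, 0, fmt.Errorf("no LastHotshotHeight event found")
		}
		end = start - 1
	}
}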
It is possible that no such event will be found, if the batcher is starting for the first time since the rollup enabled the Espresso integration. In this case the batcher config contains the Espresso streamer snapshot from the "genesis" of the Espresso integration.
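For example, the batcher configuration might carry a snapshot along these lines; the field names and tags are assumptions for this sketch.

// Hypothetical config entry describing the Espresso streamer snapshot at the
// "genesis" of the Espresso integration, used only when no LastHotshotHeight
// event has been emitted yet.
type EspressoGenesisSnapshot struct {
	HotshotBlock    uint64 `koanf:"hotshot-block"`    // first Espresso block to read from
	MessagePosition uint64 `koanf:"message-position"` // corresponding Nitro message position
}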