Nitro Caff Node Requirements
This page documents the technical design of the Caff node component of the Arbitrum Nitro integration. Intent bridges use this node to listen for blocks that have been finalized by HotShot. The node uses the Nitro Espresso streamer to fetch finalized messages and then runs them through Nitro's state transition function (STF) to verify their validity.
The CaffNode struct contains the following fields (a sketch follows the list):
config: The configuration for the node.
executionEngine: Client responsible for writing blocks to the database.
espressoStreamer: Client responsible for fetching messages from the query nodes.
caffNodeWatchDog: Client responsible for checking whether a delayed message was posted at least 20 hours ago and so has the potential to be force included, or whether some state on the L1 does not match the state of the Caff node.
skippedBlockPos: Pointer to the last block that was skipped. Used to skip invalid blocks (those that produce an error during the state transition).
L2SequencerClient: The L2 sequencer client.
db: A database that stores the most recently considered HotShot block.
fromBlock: The block number to start fetching delayed messages from (by default, the rollup's deployment block).
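For illustration, these fields might come together as follows. This is a minimal sketch: the concrete types are assumptions drawn from the descriptions above and the surrounding Nitro codebase, not the final definition.

// Sketch of the CaffNode struct based on the field descriptions above.
// The concrete types here are assumptions for illustration only.
type CaffNode struct {
	stopwaiter.StopWaiter

	config            *CaffConfig                        // configuration for the node
	executionEngine   *ExecutionEngine                   // writes produced blocks to the database
	espressoStreamer  *espressostreamer.EspressoStreamer // fetches messages from the query nodes
	caffNodeWatchDog  *CaffNodeWatchDog                  // monitors force-inclusion risk and L1/node state mismatches
	skippedBlockPos   *arbutil.MessageIndex              // last block skipped due to a state transition error
	l2SequencerClient *ethclient.Client                  // client for the L2 sequencer
	db                ethdb.Database                     // stores the most recently considered HotShot block
	fromBlock         uint64                             // first L1 block to fetch delayed messages from

	// Additional fields referenced by the code sketches later in this document:
	delayedMessageFetcher *DelayedMessageFetcher
	prevHotshotBlockNum   uint64
	nextDelayedCount      uint64
}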
When the Start function of the Caff node is called, it starts a polling function to create blocks.
Start: This function starts the EspressoStreamer.
createBlock: This function runs the state transition function on the messages and writes the resulting blocks to the database.
func (n *CaffNode) Start(ctx context.Context) error {
	n.StopWaiter.Start(ctx, n)
	n.espressoStreamer.Start(ctx)
	n.caffNodeWatchDog.Start(ctx)

	// Start the streamer at positions based on our node's state
	streamerStartBlockNum := n.executionEngine.Bc().CurrentBlock().Number.Uint64() + 1
	streamerStartMessagePosition := n.executionEngine.BlockNumberToMessageIndex(streamerStartBlockNum)
	nextHotshotBlock := n.GetNextHotshotBlockFromDb()
	n.espressoStreamer.Reset(streamerStartMessagePosition, nextHotshotBlock)

	// Blocking call: don't start building blocks until delayed messages are all backfilled
	n.delayedMessageFetcher.Start(ctx)

	err := n.CallIterativelySafe(func(ctx context.Context) time.Duration {
		madeBlock := n.createBlock()
		if madeBlock {
			return 0
		}
		return retryTime
	})
	if err != nil {
		return fmt.Errorf("failed to start node, error in createBlock: %w", err)
	}
	return nil
}
The DelayedMessageFetcher is responsible for retrieving delayed messages from L1 and storing them in the database. It first runs a backfill to fetch all the delayed messages from the user-specified fromBlock (or, if unset, the rollup's deployment block or the fromBlock saved in the database) up to the current block within the safety tolerance window. It then opens a websocket connection to subscribe to new block headers, determines which blocks fall within the configured safety tolerance, filters those blocks' logs for the MessageDelivered event, and saves each message.
func (d *DelayedMessageFetcher) Start(ctx context.Context) bool {
	d.backFill(ctx)

	// Once the backfill finishes, start watching for new delayed messages
	// from the block the backfill stopped at
	err := d.StartWatchDelayedMessages(ctx, d.fromBlock)

	// Only return true if the backfill was successful and the subscription
	// to new headers has started
	return err == nil
}
The role of the backfill is to fetch all the delayed messages from the user-specified fromBlock (or, if unset, the rollup's deployment block or the fromBlock saved in the database) up to the current block within the safety tolerance window. At the end it updates the fromBlock so that the websocket subscription can start from that block without missing any messages.
func (d *DelayedMessageFetcher) backFill(ctx context.Context) {
	// Note: this function already exists in the delayed message fetcher code:
	// https://github.com/EspressoSystems/nitro-espresso-integration/blob/integration/arbnode/espresso_delayed_message_fetcher.go#L344
	currentBlock := l1Client.GetCurrentBlock()
	matureBlock := currentBlock - config.safetyTolerance

	// Get the from block from the user/db and use it as the starting block for the backfill
	fromBlock := d.fromBlock

	// Parse all the messages up to the mature block, fetching at most
	// config.MaxBlocksToFetch blocks at a time
	for fromBlock <= matureBlock {
		toBlock := min(fromBlock+config.MaxBlocksToFetch, matureBlock)
		d.getMessagesInRange(ctx, fromBlock, toBlock)
		fromBlock = toBlock + 1
		d.fromBlock = fromBlock
	}

	// If we reach here, the backfill was successful
	saveToDB(d.fromBlock)
}
getMessagesInRange is a helper function that fetches all the messages in the range fromBlock to toBlock, parses them, and saves them to the database.
func (d *DelayedMessageFetcher) getMessagesInRange(ctx context.Context, fromBlock, toBlock uint64) {
	// Note: this function borrows its design from the inbox reader's message lookup:
	// https://github.com/EspressoSystems/nitro-espresso-integration/blob/integration/arbnode/inbox_reader.go#L464-L474
	delayedMessageCountInDb := readDelayedMessageCount(db)

	sequencerBatches, err := d.sequencerInbox.LookupBatchesInRange(ctx, fromBlock, toBlock)
	if err != nil {
		return
	}
	logs, err := client.LookupMessagesInRange(ctx, "MessageDelivered", fromBlock, toBlock, func(batchNum uint64) ([]byte, error) {
		if len(sequencerBatches) > 0 && batchNum >= sequencerBatches[0].SequenceNumber {
			idx := batchNum - sequencerBatches[0].SequenceNumber
			if idx < uint64(len(sequencerBatches)) {
				return sequencerBatches[idx].Serialize(ctx, d.l1Reader.Client())
			}
			log.Warn("missing mentioned batch in L1 message lookup", "batch", batchNum)
		}
		data, _, err := d.GetSequencerMessageBytes(ctx, batchNum)
		return data, err
	})
	if err != nil {
		return
	}
	for _, msgLog := range logs {
		// Skip messages that are already stored in the database
		if msgLog.delayedMessage.number < delayedMessageCountInDb {
			continue
		}
		// The next message must be exactly the one the database expects;
		// a gap means something has gone seriously wrong, so crash the app
		if msgLog.delayedMessage.number != delayedMessageCountInDb {
			log.Crit("gap in delayed message sequence", "expected", delayedMessageCountInDb, "got", msgLog.delayedMessage.number)
		}
		saveToDB(parse(msgLog))
		delayedMessageCountInDb++
	}
}
StartWatchDelayedMessages starts a websocket subscription to new head events. For each new head, it fetches all the delayed messages in the range from the stored fromBlock to the mature block.
func (d *DelayedMessageFetcher) StartWatchDelayedMessages(ctx context.Context, matureFromBlock uint64) error {
	headsCh := make(chan *types.Header)
	headSub, err := client.SubscribeNewHead(ctx, headsCh)
	if err != nil {
		return err
	}
	defer headSub.Unsubscribe()

	for {
		select {
		case head := <-headsCh:
			matureBlock := head.Number.Uint64() - safetyTolerance
			fromBlockInDb := readFromBlock(db)

			// Fetch at most config.MaxBlocksToFetch blocks at a time
			if matureBlock-fromBlockInDb > config.MaxBlocksToFetch {
				d.getMessagesInRange(ctx, fromBlockInDb, fromBlockInDb+config.MaxBlocksToFetch)
				d.fromBlock = fromBlockInDb + config.MaxBlocksToFetch
			} else {
				d.getMessagesInRange(ctx, fromBlockInDb, matureBlock)
				d.fromBlock = matureBlock
			}

			// Save the from block to the db so a restart resumes from here
			saveToDB(d.fromBlock)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
The createBlock method creates a block:
It starts by retrieving the lastBlockHeader, which is the current header stored in the database, and its corresponding state.
It fetches the next message from the queue using the Peek function of the Nitro Espresso streamer.
It calls the STF ProduceBlock, which is the same function that replay.wasm calls, but the sequencer runs it in native Go.
If the STF returns no error, the function creates a block.
It appends the state of this block to the database.
Finally, if everything succeeded, it calls the Next method of the Nitro Espresso streamer to advance to the next message.
This function needs to store a checkpoint of the Espresso streamer state in its database before it attempts to add the produced block to the queue. If the process crashes before the node's state is successfully updated, the node will fetch the same message on startup and reprocess it so that it can be added to the node's state. Conversely, if the produced block was successfully added to the state and the process then crashes, on restart the Espresso streamer will skip past this message position.
func (n *CaffNode) createBlock() (returnValue bool) {
	lastBlockHeader := n.executionEngine.Bc().CurrentBlock()

	messageWithMetadataAndPos, err := n.NextMessage()
	if err != nil || messageWithMetadataAndPos == nil {
		return false
	}
	// Store the height of the last processed block
	n.prevHotshotBlockNum = n.espressoStreamer.CheckpointEspressoHeight()

	messageWithMetadata := messageWithMetadataAndPos.MessageWithMeta

	// Get the state of the database at the last block
	statedb, err := n.executionEngine.Bc().StateAt(lastBlockHeader.Root)
	if err != nil {
		return false
	}

	startTime := time.Now()

	// Run the ProduceBlock function in replay mode.
	// This is the core function that replay.wasm uses to validate the block.
	block, receipts, err := arbos.ProduceBlock(messageWithMetadata.Message,
		messageWithMetadata.DelayedMessagesRead,
		lastBlockHeader,
		statedb,
		n.executionEngine.Bc(),
		n.executionEngine.Bc().Config(),
		false,
		core.MessageReplayMode)
	if err != nil {
		return false
	}

	// Handle data storage before we alter the state of the node, just in case
	// an error happens and we need to retry
	err = n.storeHotshotBlockInDb(messageWithMetadataAndPos.HotshotHeight)
	if err != nil {
		return false
	}

	blockCalcTime := time.Since(startTime)
	err = n.executionEngine.appendBlock(block, statedb, receipts, blockCalcTime)
	if err != nil {
		return false
	}
	n.espressoStreamer.Next()
	return true
}
createBlock uses the NextMessage() method on the Caff node, which wraps the Espresso streamer to fetch and filter delayed messages based on their finalization status. The node operator can configure this filter to require fully finalized messages, or messages with some number of confirmations. If a message does not meet the configured parameters, we reset the Espresso streamer and wait for a message to appear that does.
NextMessage uses a DelayedMessageFetcher component to fetch delayed messages when the messages posted to Espresso increment the delayed message count. Once we get the delayed message from the L1, we use the function isDelayedMessageWithinSafetyTolerance to determine whether we should produce the delayed message at this time. If the message is not within the safety tolerance, the Caff node resets the streamer, so that its next message is for the same position, until the Caff node determines the message is within the safety tolerance.
func (n *CaffNode) NextMessage() (*espressostreamer.MessageWithMetadataAndPos, error) {
	messageWithMetadataAndPos, err := n.espressoStreamer.Peek()
	if err != nil {
		return nil, err
	}
	// 0 is the position of the first (init) delayed message; we want to skip it
	if n.nextDelayedCount > 0 && messageWithMetadataAndPos.MessageWithMeta.DelayedMessagesRead == n.nextDelayedCount {
		// Get the delayed message from the database; if it is not present, retry later
		message, err := n.delayedMessageFetcher.GetDelayedMessageFromDatabase(n.nextDelayedCount)
		if err != nil {
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, err
		}
		messageWithMetadataAndPos.MessageWithMeta.Message = message
		withinSafetyTolerance, err := isDelayedMessageWithinSafetyTolerance(messageWithMetadataAndPos)
		if err != nil {
			log.Error("error checking delayed message safety tolerance", "err", err)
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, err
		}
		if !withinSafetyTolerance {
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, fmt.Errorf("delayed message not within safety tolerance, waiting until it is")
		}
		n.nextDelayedCount++
	}
	return messageWithMetadataAndPos, nil
}
isDelayedMessageWithinSafetyTolerance takes a messageWithMetadataAndPos and, based on configuration values, checks that the message is within the safety tolerance.
First, it calculates the safeBlockNumber. If configured to wait for finalization, it queries the delayed bridge for the delayed message count at the finalized parent chain block number. If configured for a number of confirmations, it queries the delayed bridge for the delayed message count at (the latest parent chain block number - config.requiredBlockDepth).
Second, the function checks that the delayed count in the message passed in is below the remote delayed count for the specified block range. Additionally, in the case of confirmations, it checks that the delayed message comes from a block number less than or equal to the block at the required depth in the parent chain.
Only when these conditions are satisfied does the function return true. Otherwise, it returns false with no error if the message is not within the safety tolerance, or false with an error if an inner function produces an error.
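A minimal sketch of this check follows, assuming hypothetical names for the pieces the text references (an l1Client, a delayedBridge with a GetMessageCount lookup, config.WaitForFinalization and config.RequiredBlockDepth settings, and an L1BlockNumber field on the message); these stand in for the real API.

// Sketch of the safety tolerance check described above. Helper and config
// names are assumptions for illustration.
func (n *CaffNode) isDelayedMessageWithinSafetyTolerance(
	ctx context.Context,
	msg *espressostreamer.MessageWithMetadataAndPos,
) (bool, error) {
	// First, calculate the safe parent chain block number
	var safeBlockNumber uint64
	if n.config.WaitForFinalization {
		// Finalization mode: use the finalized parent chain block
		header, err := n.l1Client.HeaderByNumber(ctx, big.NewInt(rpc.FinalizedBlockNumber.Int64()))
		if err != nil {
			return false, err
		}
		safeBlockNumber = header.Number.Uint64()
	} else {
		// Confirmations mode: latest block minus the required depth
		latest, err := n.l1Client.BlockNumber(ctx)
		if err != nil {
			return false, err
		}
		safeBlockNumber = latest - n.config.RequiredBlockDepth
	}

	// Query the delayed bridge for the delayed message count at the safe block
	remoteCount, err := n.delayedBridge.GetMessageCount(ctx, safeBlockNumber)
	if err != nil {
		return false, err
	}

	// The message's delayed count must be below the remote count at the safe block
	if msg.MessageWithMeta.DelayedMessagesRead >= remoteCount {
		return false, nil
	}
	// In confirmations mode, the delayed message must also come from a block
	// at or below the required depth in the parent chain
	if !n.config.WaitForFinalization && msg.L1BlockNumber > safeBlockNumber {
		return false, nil
	}
	return true, nil
}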
Publishing a transaction from the user to the centralized sequencer will be handled by the transactionForwarder struct held by the node when it is constructed without a sequencer, as the Caff node will be.
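A minimal sketch of that wiring, with a hypothetical forwarder interface standing in for Nitro's actual TxForwarder API:

// Hypothetical forwarder interface for this sketch; Nitro's actual
// forwarder has a richer API.
type transactionForwarder interface {
	PublishTransaction(ctx context.Context, tx *types.Transaction) error
}

func (n *CaffNode) PublishTransaction(ctx context.Context, tx *types.Transaction) error {
	// The Caff node is constructed without a sequencer, so user transactions
	// are forwarded to the centralized sequencer rather than sequenced locally
	return n.forwarder.PublishTransaction(ctx, tx)
}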