Requirements: Nitro Caff Node Requirements
This page documents the technical design of the Caff node component of the Arbitrum Nitro integration. This node is used by intent bridges to listen to blocks that have been finalized by HotShot. It uses the Nitro Espresso streamer to fetch finalized messages and then runs them through Nitro's state transition function (STF) to verify their validity.
The struct for CaffNode contains the following fields:
config: The configuration for the node.
executionEngine: Client responsible for writing blocks to the database.
espressoStreamer: Client responsible for fetching messages from the query nodes.
caffNodeWatchDog: Client responsible for checking whether a delayed message was posted at least 20 hours ago and could therefore be force included, or whether some state on the L1 does not match the state of the Caff node.
skippedBlockPos: Pointer to the last block that was skipped. Used to skip invalid blocks (those that produce an error during state transition).
L2SequencerClient: The L2 sequencer client.
Db: The database the node maintains to store the most recently considered HotShot block.
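To illustrate the role of the Db field, here is a minimal sketch of persisting and reloading a HotShot block height. Everything in it is an assumption made for illustration: the kvStore interface, the in-memory memStore, the key name, and the helper names are hypothetical and are not taken from the Nitro codebase.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// kvStore is a hypothetical minimal key-value interface.
type kvStore interface {
	Put(key, value []byte) error
	Get(key []byte) ([]byte, bool)
}

// memStore is an in-memory kvStore used only for this sketch.
type memStore map[string][]byte

func (m memStore) Put(key, value []byte) error {
	m[string(key)] = append([]byte(nil), value...) // store a copy
	return nil
}

func (m memStore) Get(key []byte) ([]byte, bool) {
	v, ok := m[string(key)]
	return v, ok
}

// hotshotBlockKey is a hypothetical key name.
var hotshotBlockKey = []byte("caff/next-hotshot-block")

// storeHotshotBlock writes the height as 8 big-endian bytes.
func storeHotshotBlock(db kvStore, height uint64) error {
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, height)
	return db.Put(hotshotBlockKey, buf)
}

// loadHotshotBlock returns the stored height, or fallback when nothing
// usable has been stored yet (for example on first startup).
func loadHotshotBlock(db kvStore, fallback uint64) uint64 {
	raw, ok := db.Get(hotshotBlockKey)
	if !ok || len(raw) != 8 {
		return fallback
	}
	return binary.BigEndian.Uint64(raw)
}

func main() {
	db := memStore{}
	fmt.Println(loadHotshotBlock(db, 0)) // nothing stored yet
	if err := storeHotshotBlock(db, 42); err != nil {
		fmt.Println("store failed:", err)
		return
	}
	fmt.Println(loadHotshotBlock(db, 0))
}
```

A fixed-width encoding keeps the stored value unambiguous; the fallback argument makes the first-startup case explicit rather than relying on a zero value.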
When the Start function of Caff Node is called, it starts a polling function to create blocks.
Start: This function starts the EspressoStreamer.
createBlock: This function runs the State Transition Function on the messages and writes them to the database.
func (n *CaffNode) Start(ctx context.Context) error {
	n.StopWaiter.Start(ctx, n)
	n.espressoStreamer.Start(ctx)
	n.caffNodeWatchDog.Start(ctx)

	// Start the streamer at positions based on our node's state
	streamerStartBlockNum := n.executionEngine.Bc().CurrentBlock().Number.Uint64() + 1
	streamerStartMessagePosition := n.executionEngine.BlockNumberToMessageIndex(streamerStartBlockNum)
	nextHotshotBlock := n.GetNextHotshotBlockFromDb()
	n.espressoStreamer.Reset(streamerStartMessagePosition, nextHotshotBlock)

	err := n.CallIterativelySafe(func(ctx context.Context) time.Duration {
		madeBlock := n.createBlock()
		if madeBlock {
			return 0
		}
		return retryTime
	})
	if err != nil {
		return fmt.Errorf("failed to start node, error in createBlock: %w", err)
	}
	return nil
}
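The polling pattern in Start can be illustrated with a small standalone sketch. It assumes that the iterative caller invokes the callback repeatedly and waits for the returned duration before the next call; pollUntilDone and runDemo are hypothetical helpers written for this example and are not the StopWaiter implementation.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntilDone is a hypothetical stand-in for an iterative caller: it
// invokes fn repeatedly, waits for the duration fn returns before the
// next call, and stops once ctx is cancelled.
func pollUntilDone(ctx context.Context, fn func(context.Context) time.Duration) {
	for {
		wait := fn(ctx)
		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}
	}
}

// runDemo simulates a node with `pending` messages available. It returns
// the number of blocks made and the number of idle polls seen before the
// demo cancels itself after maxIdle empty polls.
func runDemo(pending, retryMillis, maxIdle int) (blocks, idle int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	retryTime := time.Duration(retryMillis) * time.Millisecond
	pollUntilDone(ctx, func(context.Context) time.Duration {
		if pending > 0 {
			pending--
			blocks++
			return 0 // a block was made: poll again immediately
		}
		idle++
		if idle >= maxIdle {
			cancel() // stop the demo after a few empty polls
		}
		return retryTime // nothing to do: back off before retrying
	})
	return blocks, idle
}

func main() {
	blocks, idle := runDemo(3, 1, 2)
	fmt.Println("blocks:", blocks, "idle polls:", idle)
}
```

Returning 0 after a successful block lets the node drain a backlog without sleeping, while the retry delay avoids a busy loop when no message is available.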
The createBlock method creates a block.
It starts by retrieving the lastBlockHeader, which is the current header stored in the database, and its corresponding state.
It fetches the next message from the queue using the Peek function of the Nitro Espresso streamer.
It calls the STF ProduceBlock, which is the same function that replay.wasm calls, but the sequencer runs it in native Go.
If the STF returns no error, the function creates a block.
It appends the state of this block to the database.
Finally, if everything succeeded, it calls the Next method of the Nitro Espresso streamer to advance to the next message.
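The peek, process, advance sequence above can be sketched in isolation. This is a simplified illustration under stated assumptions: memStreamer is a hypothetical in-memory stand-in for the streamer, messages and blocks are plain strings, and the state transition function is passed in as a parameter. It omits the skipping of invalid blocks.

```go
package main

import (
	"errors"
	"fmt"
)

// errInvalid is a placeholder error for a failed state transition.
var errInvalid = errors.New("state transition failed")

// memStreamer is a hypothetical in-memory stand-in for the streamer:
// Peek returns the current message without consuming it, Next advances.
type memStreamer struct {
	msgs []string
	pos  int
}

func (s *memStreamer) Peek() (string, bool) {
	if s.pos >= len(s.msgs) {
		return "", false
	}
	return s.msgs[s.pos], true
}

func (s *memStreamer) Next() { s.pos++ }

// createBlock peeks the next message, runs the supplied state transition
// function on it, and advances the streamer only if every step succeeded.
func createBlock(s *memStreamer, chain *[]string, stf func(string) (string, error)) bool {
	msg, ok := s.Peek()
	if !ok {
		return false
	}
	block, err := stf(msg)
	if err != nil {
		return false // position unchanged, so the same message is seen again
	}
	*chain = append(*chain, block)
	s.Next()
	return true
}

func main() {
	s := &memStreamer{msgs: []string{"a", "b"}}
	var chain []string
	failOnce := true
	stf := func(m string) (string, error) {
		if m == "b" && failOnce {
			failOnce = false
			return "", errInvalid
		}
		return "block(" + m + ")", nil
	}
	for i := 0; i < 4; i++ {
		made := createBlock(s, &chain, stf)
		fmt.Println(made, s.pos, chain)
	}
}
```

Because Peek does not consume the message, a failure at any later step leaves the streamer pointing at the same message for the next attempt.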
This function needs to store a checkpoint of the Espresso streamer state in its database before it attempts to add the produced block to the node's state. If the process crashes before the node's state is successfully updated, the node will fetch the same message on startup and reprocess it so that it can be added to the node's state. If instead the produced block was successfully added to the state and the process then crashes, on restart the node will skip this message position with the Espresso streamer.
func (n *CaffNode) createBlock() (returnValue bool) {
	lastBlockHeader := n.executionEngine.Bc().CurrentBlock()

	messageWithMetadataAndPos, err := n.NextMessage()
	if err != nil || messageWithMetadataAndPos == nil {
		return false
	}
	// Store the height of the last processed block
	n.prevHotshotBlockNum = n.espressoStreamer.CheckpointEspressoHeight()

	messageWithMetadata := messageWithMetadataAndPos.MessageWithMeta

	// Get the state of the database at the last block
	statedb, err := n.executionEngine.Bc().StateAt(lastBlockHeader.Root)
	if err != nil {
		return false
	}

	startTime := time.Now()

	// Run the ProduceBlock function in replay mode
	// This is the core function that is used by replay.wasm to validate the block
	block, receipts, err := arbos.ProduceBlock(messageWithMetadata.Message,
		messageWithMetadata.DelayedMessagesRead,
		lastBlockHeader,
		statedb,
		n.executionEngine.Bc(),
		n.executionEngine.Bc().Config(),
		false,
		core.MessageReplayMode)
	if err != nil {
		return false
	}

	// Handle data storage before we alter the state of the node, just in case any errors happen and we need to retry.
	err = n.storeHotshotBlockInDb(messageWithMetadata.HotshotHeight)
	if err != nil {
		return false
	}

	blockCalcTime := time.Since(startTime)
	err = n.executionEngine.appendBlock(block, statedb, receipts, blockCalcTime)
	if err != nil {
		return false
	}
	n.espressoStreamer.Next()
	return true
}
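The crash-recovery property of writing the checkpoint before appending the block can be shown with a small simulation. This is a sketch under simplifying assumptions, not the node's actual recovery code: msg, disk, resume, and processOne are hypothetical names, and the persisted state is modelled as a HotShot height plus the list of message positions already turned into blocks.

```go
package main

import "fmt"

// msg is a hypothetical message: its position in the message sequence
// and the HotShot block height it was read from.
type msg struct {
	pos    uint64
	height uint64
}

// disk models the state that survives a crash.
type disk struct {
	checkpointHeight uint64   // HotShot height to resume reading from
	blocks           []uint64 // positions of messages already turned into blocks
}

// resume rebuilds the streamer position from disk: it rescans the feed
// from the checkpointed height and skips positions already in the chain.
func resume(feed []msg, d *disk) (msg, bool) {
	startPos := uint64(len(d.blocks))
	for _, m := range feed {
		if m.height >= d.checkpointHeight && m.pos >= startPos {
			return m, true
		}
	}
	return msg{}, false
}

// processOne stores the checkpoint first and only then appends the block.
// crashBeforeAppend simulates the process dying between the two writes.
func processOne(feed []msg, d *disk, crashBeforeAppend bool) {
	m, ok := resume(feed, d)
	if !ok {
		return
	}
	d.checkpointHeight = m.height // persisted before the node state changes
	if crashBeforeAppend {
		return
	}
	d.blocks = append(d.blocks, m.pos)
}

func main() {
	// Two messages share HotShot height 10, so the height alone is not
	// enough to know where to resume; the chain length supplies the position.
	feed := []msg{{pos: 0, height: 10}, {pos: 1, height: 10}, {pos: 2, height: 12}}
	d := &disk{}

	processOne(feed, d, true) // crash after the checkpoint, before the append
	m, _ := resume(feed, d)
	fmt.Println("after crash before append, next pos:", m.pos)

	processOne(feed, d, false) // completes normally, then the process restarts
	m, _ = resume(feed, d)
	fmt.Println("after successful append, next pos:", m.pos)
}
```

In this model the checkpoint is never ahead of the chain by more than the message being processed, so a restart either replays that message or moves past it, and never loses one.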
createBlock uses the NextMessage() method on the Caff node, which wraps the Espresso streamer, to fetch and filter delayed messages based on their finalization status. The node operator can configure this to require fully finalized messages or messages with some number of confirmations. If a message does not meet the configured parameters, the node resets the Espresso streamer and waits until the message does meet them.
NextMessage uses a component, DelayedMessageFetcher, to fetch delayed messages when the messages posted to Espresso increment the delayed message count. Once we get the delayed message from the L1, we use the function isDelayedMessageWithinSafetyTolerance to determine whether we should produce the delayed message at this time. If the message is not within the safety tolerance, the Caff node resets the streamer so that its next message is for the same position, and repeats this until it determines that the message is within the safety tolerance.
// NextMessage returns a pointer so that callers such as createBlock can check the result against nil.
func (n *CaffNode) NextMessage() (*espressostreamer.MessageWithMetadataAndPos, error) {
	messageWithMetadataAndPos, err := n.espressoStreamer.Peek()
	if err != nil {
		return nil, err
	}
	// 0 is the position of the first and init delayed message, we want to skip this.
	if n.delayedCount > 0 && messageWithMetadataAndPos.MessageWithMeta.DelayedMessagesRead == n.nextDelayedCount {
		// If this is a delayed message, we need to get the message from L1
		// and replace the message in messageWithMetadataAndPos
		message, err := n.delayedMessageFetcher.getDelayedMessage(n.nextDelayedCount)
		if err != nil {
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, err
		}
		messageWithMetadataAndPos.MessageWithMeta.Message = message
		withinTolerance, err := isDelayedMessageWithinSafetyTolerance(messageWithMetadataAndPos)
		if err != nil {
			log.Error("error checking delayed message safety tolerance", "err", err)
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, fmt.Errorf("failed to check delayed message safety tolerance: %w", err)
		}
		if !withinTolerance {
			n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
			return nil, fmt.Errorf("delayed message not within safety tolerance, waiting until it is")
		}
		n.nextDelayedCount++
	}
	return messageWithMetadataAndPos, nil
}
isDelayedMessageWithinSafetyTolerance takes a messageWithMetadataAndPos and, based on configuration values, checks that the message is within the safety tolerance.
First, it calculates the safeBlockNumber. If configured to wait for finalization, it queries the delayed bridge for the delayed message count at the finalized parent chain block number. If configured for a number of confirmations, it queries the delayed bridge for the delayed message count at (the latest parent chain block number - config.requiredBlockDepth).
Second, the function checks that the delayed count in the message passed in is below the remote delayed count for the specified block range. Additionally, in the case of confirmations, it checks that the delayed message is from a block number less than or equal to the block with the required depth in the parent chain.
Only when these conditions are satisfied does the function return true. Otherwise, it returns false with no error if the message is not within the safety tolerance, or false with an error if an inner function produces an error.
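The two-step check described above can be sketched as a pure function. This is an illustration under stated assumptions rather than the node's implementation: safetyConfig, parentChainView, and the helper names are hypothetical, the parent chain values are passed in instead of being queried from the L1, the delayed message is identified by a zero-based index that must be strictly below the remote count, and error handling for failed queries is left out.

```go
package main

import "fmt"

// safetyConfig mirrors the two modes described in the text.
// The field names are hypothetical.
type safetyConfig struct {
	waitForFinalization bool
	requiredBlockDepth  uint64
}

// parentChainView holds the parent chain values the check needs.
// A real node would query these from the L1; here they are inputs.
type parentChainView struct {
	latestBlock    uint64
	finalizedBlock uint64
	delayedCountAt func(block uint64) uint64 // delayed message count at a block
}

// countFromBlocks builds a delayedCountAt function from the parent chain
// block numbers at which each delayed message was posted.
func countFromBlocks(posted []uint64) func(uint64) uint64 {
	return func(block uint64) uint64 {
		var n uint64
		for _, b := range posted {
			if b <= block {
				n++
			}
		}
		return n
	}
}

// safeBlockNumber picks the parent chain block used for the check.
func safeBlockNumber(cfg safetyConfig, pc parentChainView) uint64 {
	if cfg.waitForFinalization {
		return pc.finalizedBlock
	}
	if pc.latestBlock < cfg.requiredBlockDepth {
		return 0 // avoid unsigned underflow on a very short chain
	}
	return pc.latestBlock - cfg.requiredBlockDepth
}

// withinSafetyTolerance reports whether the delayed message with the given
// zero-based index, posted in parent chain block msgBlock, may be produced.
func withinSafetyTolerance(cfg safetyConfig, pc parentChainView, delayedIndex, msgBlock uint64) bool {
	safe := safeBlockNumber(cfg, pc)
	if delayedIndex >= pc.delayedCountAt(safe) {
		return false // not yet visible at the safe block
	}
	if !cfg.waitForFinalization && msgBlock > safe {
		return false // not buried under enough confirmations
	}
	return true
}

func main() {
	pc := parentChainView{
		latestBlock:    105,
		finalizedBlock: 90,
		delayedCountAt: countFromBlocks([]uint64{50, 95, 100}),
	}
	confirmations := safetyConfig{requiredBlockDepth: 6}
	finalized := safetyConfig{waitForFinalization: true}

	fmt.Println(withinSafetyTolerance(confirmations, pc, 1, 95))
	fmt.Println(withinSafetyTolerance(confirmations, pc, 2, 100))
	fmt.Println(withinSafetyTolerance(finalized, pc, 1, 95))
}
```

Keeping the check free of I/O makes the two configurations easy to compare: the same delayed message can be acceptable with a confirmation depth of 6 and still be rejected when the node waits for finalization.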
This function will be used to publish a transaction from the user to the centralized sequencer. This will be handled by the transactionForwarder struct held by the Node when constructed without a sequencer, as the Caff node will be.