Chapter 22
Nitro Caffeinated Node

Requirements: Nitro Caff Node Requirements


Contents



22.1 Overview
22.2 Initialization
22.3 Starting Caff Node
22.4 Create Block
22.5 NextMessage()
22.6 Publish Transaction



22.1 # Overview

This page documents the technical design of the Caff node component of the Arbitrum Nitro integration. Intent bridges use this node to listen to blocks that have been finalized by Hotshot. It uses the Nitro Espresso streamer to fetch finalized messages and then runs them through Nitro's state transition function to verify their validity.

22.2 # Initialization

The struct for CaffNode contains the following fields:

22.3 # Starting Caff Node

When the Start function of the Caff node is called, it starts the Espresso streamer, positions it according to the node's current state, and starts a polling function that creates blocks.

Listing 22.1: Start Method of CaffNode
    func (n *CaffNode) Start(ctx context.Context) error {
        n.StopWaiter.Start(ctx, n)
        n.espressoStreamer.Start(ctx)
        n.caffNodeWatchDog.Start(ctx)

        // Start the streamer at positions based on our node's state
        streamerStartBlockNum := n.executionEngine.Bc().CurrentBlock().Number.Uint64() + 1
        streamerStartMessagePosition := n.executionEngine.BlockNumberToMessageIndex(streamerStartBlockNum)
        nextHotshotBlock := n.GetNextHotshotBlockFromDb()
        n.espressoStreamer.Reset(streamerStartMessagePosition, nextHotshotBlock)

        // Poll createBlock: retry immediately after a block is made,
        // otherwise wait retryTime before the next attempt.
        err := n.CallIterativelySafe(func(ctx context.Context) time.Duration {
            madeBlock := n.createBlock()
            if madeBlock {
                return 0
            }
            return retryTime
        })
        if err != nil {
            return fmt.Errorf("failed to start node, error in createBlock: %w", err)
        }
        return nil
    }

22.4 # Create Block

The createBlock method attempts to create a single block from the next available message and reports whether it succeeded.

This function must store a checkpoint of the Espresso streamer state in its database before it attempts to add the produced block to the queue. If the process crashes before the node's state is successfully updated, the node fetches the same message on startup and reprocesses it so that it can be added to the node's state. If instead the produced block is successfully added to the state and the process then crashes, the node skips this message position in the Espresso streamer on restart.

Listing 22.2: createBlock Method
    func (n *CaffNode) createBlock() (returnValue bool) {
        lastBlockHeader := n.executionEngine.Bc().CurrentBlock()

        messageWithMetadataAndPos, err := n.NextMessage()
        if err != nil || messageWithMetadataAndPos == nil {
            return false
        }
        // Store the height of the last processed block
        n.prevHotshotBlockNum = n.espressoStreamer.CheckpointEspressoHeight()

        messageWithMetadata := messageWithMetadataAndPos.MessageWithMeta

        // Get the state of the database at the last block
        statedb, err := n.executionEngine.Bc().StateAt(lastBlockHeader.Root)
        if err != nil {
            return false
        }

        startTime := time.Now()

        // Run the ProduceBlock function in replay mode.
        // This is the core function that is used by replay.wasm to validate the block.
        block, receipts, err := arbos.ProduceBlock(messageWithMetadata.Message,
            messageWithMetadata.DelayedMessagesRead,
            lastBlockHeader,
            statedb,
            n.executionEngine.Bc(),
            n.executionEngine.Bc().Config(),
            false,
            core.MessageReplayMode)
        if err != nil {
            return false
        }

        // Handle data storage before we alter the state of the node, just in case
        // any errors happen and we need to retry.
        // HotshotHeight is read from the position wrapper, as in NextMessage().
        err = n.storeHotshotBlockInDb(messageWithMetadataAndPos.HotshotHeight)
        if err != nil {
            return false
        }

        blockCalcTime := time.Since(startTime)
        err = n.executionEngine.appendBlock(block, statedb, receipts, blockCalcTime)
        if err != nil {
            return false
        }
        // Advance the streamer only after the block has been appended.
        n.espressoStreamer.Next()
        return true
    }
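The two crash scenarios described for createBlock can be illustrated with a toy in-memory model. Everything here is hypothetical (the `message` and `node` types, `process`, `resume`, `firstAfterRestart`); the model only mirrors the ordering in the listing, where the Hotshot height is persisted before the block is appended, and the restart rule from the Start method, where the message position is derived from the chain state.

```go
package main

import "fmt"

// message is a toy stand-in for a sequenced message read from Hotshot.
type message struct {
	pos           uint64 // message position in the chain's sequence
	hotshotHeight uint64 // Hotshot block that carried the message
}

// node models only the two pieces of persisted state relevant here.
type node struct {
	checkpoint uint64   // Hotshot height to resume reading from
	chain      []uint64 // positions of messages already turned into blocks
}

// process persists the checkpoint first and only then appends the block.
// crashBeforeAppend simulates the process dying between the two steps.
func (n *node) process(m message, crashBeforeAppend bool) bool {
	n.checkpoint = m.hotshotHeight
	if crashBeforeAppend {
		return false
	}
	n.chain = append(n.chain, m.pos)
	return true
}

// resume returns the messages still to be processed after a restart: reading
// restarts at the checkpoint, and positions already in the chain are skipped.
func (n *node) resume(stream []message) []message {
	next := uint64(len(n.chain))
	var todo []message
	for _, m := range stream {
		if m.hotshotHeight >= n.checkpoint && m.pos >= next {
			todo = append(todo, m)
		}
	}
	return todo
}

// firstAfterRestart processes message 0, then message 1 with or without a crash
// before the append, and reports which position is handled first on restart.
func firstAfterRestart(crashBeforeAppend bool) uint64 {
	stream := []message{{0, 10}, {1, 11}, {2, 12}}
	n := &node{}
	n.process(stream[0], false)
	n.process(stream[1], crashBeforeAppend)
	return n.resume(stream)[0].pos
}

func main() {
	fmt.Println("crash before append, resume at position:", firstAfterRestart(true))
	fmt.Println("crash after append, resume at position:", firstAfterRestart(false))
}
```

In this model a crash before the append makes the node reprocess position 1, while a crash after the append makes it continue at position 2.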

createBlock uses the Caff node's NextMessage() method, which wraps the Espresso streamer, to fetch messages and filter delayed messages based on their finalization status. The node operator can configure this filter to require fully finalized messages or messages with some number of confirmations. If a message does not meet the configured parameters, the node resets the Espresso streamer and waits for a message that does meet them.

22.5 # NextMessage()

NextMessage uses a DelayedMessageFetcher component to fetch delayed messages whenever a message posted to Espresso increments the delayed message count. Once the delayed message has been fetched from L1, the function isDelayedMessageWithinSafetyTolerance determines whether the delayed message should be produced at this time. If the message is not within the safety tolerance, the Caff node resets the streamer so that its next message is for the same position, and repeats this until it determines that the message is within the safety tolerance.

Listing 22.3: NextMessage() Method
    func (n *CaffNode) NextMessage() (*espressostreamer.MessageWithMetadataAndPos, error) {
        messageWithMetadataAndPos, err := n.espressoStreamer.Peek()
        if err != nil {
            return nil, err
        }
        // 0 is the position of the first (init) delayed message; we want to skip it.
        if n.delayedCount > 0 && messageWithMetadataAndPos.MessageWithMeta.DelayedMessagesRead == n.nextDelayedCount {
            // This is a delayed message: fetch it from L1 and use it to replace
            // the message in messageWithMetadataAndPos.
            message, err := n.delayedMessageFetcher.getDelayedMessage(n.nextDelayedCount)
            if err != nil {
                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
                return nil, err
            }
            messageWithMetadataAndPos.MessageWithMeta.Message = message
            withinSafetyTolerance, err := isDelayedMessageWithinSafetyTolerance(messageWithMetadataAndPos)
            if err != nil {
                log.Error("error checking delayed message safety tolerance", "err", err)
                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
                return nil, fmt.Errorf("failed to check delayed message safety tolerance: %w", err)
            }
            if !withinSafetyTolerance {
                // Retry the same position later, once the message is safe to process.
                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, messageWithMetadataAndPos.HotshotHeight)
                return nil, fmt.Errorf("delayed message not within safety tolerance, waiting until it is")
            }
            n.nextDelayedCount++
        }
        return messageWithMetadataAndPos, nil
    }

isDelayedMessageWithinSafetyTolerance returns true only when these conditions are satisfied. Otherwise, it returns false with no error if the message is not within the safety tolerance, or false with an error if an inner function produces an error.
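A minimal sketch of such a check is shown below, assuming the two operator settings mentioned earlier (fully finalized, or a number of confirmations). The names `toleranceConfig`, `l1View`, and `withinSafetyTolerance` are hypothetical, as is the definition of confirmations as the number of L1 blocks built on top of the block containing the message; the actual function takes the message itself and may apply different rules.

```go
package main

import (
	"errors"
	"fmt"
)

// toleranceConfig is a hypothetical operator setting.
type toleranceConfig struct {
	requireFinalized bool   // only accept messages in finalized L1 blocks
	minConfirmations uint64 // otherwise: L1 blocks required on top of the message's block
}

// l1View is a hypothetical snapshot of the L1 chain as seen by the node.
type l1View struct {
	latestBlock    uint64
	finalizedBlock uint64
}

// withinSafetyTolerance reports whether a delayed message included in L1 block
// msgBlock may be processed now. It returns (false, nil) when the message simply
// has to wait, and a non-nil error when the inputs are inconsistent.
func withinSafetyTolerance(cfg toleranceConfig, view l1View, msgBlock uint64) (bool, error) {
	if msgBlock > view.latestBlock {
		return false, errors.New("delayed message is ahead of the latest known L1 block")
	}
	if cfg.requireFinalized {
		return msgBlock <= view.finalizedBlock, nil
	}
	return view.latestBlock-msgBlock >= cfg.minConfirmations, nil
}

func main() {
	view := l1View{latestBlock: 100, finalizedBlock: 90}

	// Block 95 is not finalized yet, so the message must wait.
	ok, err := withinSafetyTolerance(toleranceConfig{requireFinalized: true}, view, 95)
	fmt.Println(ok, err)

	// Block 95 has 5 blocks on top of it, which satisfies a 3-confirmation setting.
	ok, err = withinSafetyTolerance(toleranceConfig{minConfirmations: 3}, view, 95)
	fmt.Println(ok, err)
}
```

Keeping "not yet safe" (false, nil) separate from "could not decide" (false, error) lets the caller log real failures while treating the waiting case as routine.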

22.6 # Publish Transaction

This function publishes a transaction from the user to the centralized sequencer. Publishing is handled by the transactionForwarder struct, which the Node holds when it is constructed without a sequencer, as the Caff node will be.
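The selection between sequencing locally and forwarding can be sketched as follows. This is a hypothetical illustration of the pattern only: `txPublisher`, `localSequencer`, `forwarder`, `newPublisher`, and the string-typed transactions are invented for the example and do not reflect Nitro's actual types or signatures.

```go
package main

import (
	"errors"
	"fmt"
)

// txPublisher is a hypothetical interface for anything that accepts user transactions.
type txPublisher interface {
	PublishTransaction(tx string) error
}

// localSequencer stands in for a node that sequences transactions itself.
type localSequencer struct{ queue []string }

func (s *localSequencer) PublishTransaction(tx string) error {
	s.queue = append(s.queue, tx)
	return nil
}

// forwarder stands in for the transactionForwarder: it relays each
// transaction to the centralized sequencer instead of sequencing it.
type forwarder struct {
	send func(tx string) error // hypothetical transport to the sequencer
}

func (f *forwarder) PublishTransaction(tx string) error {
	if f.send == nil {
		return errors.New("no sequencer endpoint configured")
	}
	return f.send(tx)
}

// newPublisher picks the forwarder when the node has no local sequencer,
// which is the configuration the Caff node runs in.
func newPublisher(seq *localSequencer, send func(string) error) txPublisher {
	if seq != nil {
		return seq
	}
	return &forwarder{send: send}
}

func main() {
	var forwarded []string
	p := newPublisher(nil, func(tx string) error {
		forwarded = append(forwarded, tx)
		return nil
	})
	if err := p.PublishTransaction("0xabc"); err != nil {
		fmt.Println("publish failed:", err)
		return
	}
	fmt.Println("forwarded:", forwarded)
}
```

Because both implementations satisfy the same interface, callers publish transactions the same way regardless of whether the node sequences locally or forwards.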