Chapter 25
Nitro Caffeinated Node

Requirements: Nitro Caff Node Requirements


Contents



25.1 Overview
25.2 Initialization
25.3 Starting Caff Node
25.4 Delayed Message Fetcher
25.5 Backfill
25.6 Get Messages In Range
25.7 Start Watch Delayed Messages
25.8 Create Block
25.9 NextMessage()
25.10 Publish Transaction



25.1 # Overview

This page documents the technical design of the Caff node component of the Arbitrum Nitro integration . This node is used by intent bridges to listen to blocks that have been finalized by Hotshot. It uses Nitro Espresso streamer to fetch finalized messages and then runs them through the state transition function of Nitro to verify their validity.

25.2 # Initialization

The struct for CaffNode contains the following fields:

25.3 # Starting Caff Node

When the Start function of Caff Node is called, it starts a polling function to create blocks.

Listing 25.1: Start Method of CaffNode
1    func (n *CaffNode) Start(ctx context.Context) error { 
2        n.StopWaiter.Start(ctx, n) 
3        n.espressoStreamer.Start(ctx) 
4        n.caffNodeWatchDog.Start(ctx) 
5 
6        // Start the Streamer at positions based on our nodes state 
7        streamerStartBlockNum := n.executionEngine.Bc().CurrentBlock().Number.Uint64() + 1 
8        streamerStartMessagePosition := n.executionEngine.BlockNumberToMessageIndex(streamerStartBlockNum) 
9        nextHotshotBlock := n.GetNextHotshotBlockFromDb() 
10        n.espressoStreamer.Reset(streamerStartMessagePosition, nextHotshotBlock) 
11 
12        % Blocking call, dont start building the blocks until delayed messages are all backfilled 
13         n.delayedMessageFetcher.Start(ctx) 
14 
15        err := n.CallIterativelySafe(func(ctx context.Context) time.Duration { 
16            madeBlock := n.createBlock() 
17            if madeBlock { 
18                return 0 
19            } 
20            return retryTime 
21        }) 
22        if err != nil { 
23            return fmt.Errorf("failed to start node, error in createBlock: %w", err) 
24        } 
25        return nil 
26    }

25.4 # Delayed Message Fetcher

The Delayed Message Fetcher is responsible for retrieving delayed messages from L1 and storing them in the database. It first runs a backfill to fetch all the delayed messages from the user specified fromBlock (or, if unset, the rollup’s deployment block or the fromBlock saved in the database) to the current block within the safety tolerance window. Then it starts a websocket connection to subscribe to new block headers, determines which blocks fall within the configured safety tolerance, filters those blocks logs for the MessageDelivered event, and saves each message.

Listing 25.2: Delayed Message Fetcher
1    func (d *DelayedMessageFetcher) Start(ctx context.Context, delayedMessageCountInDb uint64) bool { 
2        d.backFill() 
3                // Once the backfil finishes we start the watch delayed messages 
4        err = d.StartWatchDelayedMessages(ctx, matureBlock) 
5 
6                // Only return true if the backfill was successful and subscribed to new headers has started 
7        return true 
8    }

25.5 # Backfill

The role of the backfill is to fetch all the delayed messages from the user specified fromBlock (or, if unset, the rollup’s deployment block or the fromBlock saved in the database) to the current block within the safety tolerance window. At the end it updates the fromBlock so that the websocket subscription can start from that block to avoid missing any messages.

Listing 25.3: backfil
1    func (d *DelayedMessageFetcher) backFill() { 
2        // Note this function already exists in the delayed message fetcher code: 
3        // https://github.com/EspressoSystems/nitro-espresso-integration/blob/integration/arbnode/espresso_delayed_message_fetcher.go#L344 
4        delayedMessageCountInDB := readDelayedMessageCount(db) 
5 
6        currentBlock := l1Client.GetCurrentBlock() 
7        matureBlock := currentBlock - config.safetyTolerance 
8 
9        // Get the from block from the user/db and set that as the starting block to backfill 
10        fromBlock := d.fromBlock 
11 
12        // Parse all the messages till the mature block 
13        for fromBlock <= matureBlock { 
14 
15            if (matureBlock - fromBlock > config.MaxBlocksToFetch){ 
16                getMessagesInRange(fromBlock, fromBlock + config.MaxBlocksToFetch) 
17                d.fromBlock = fromBlock + config.MaxBlocksToFetch 
18            } else { 
19                getMessagesInRange(fromBlock, matureBlock) 
20                d.fromBlock = matureBlock 
21            } 
22        } 
23        % If we reach here this means the backfill was successful 
24        saveToDB(fromBlock) 
25    }

25.6 # Get Messages In Range

This will be a helper function that will fetch all the messages in the range from the fromBlock to the toBlock. It also parses the messages and saves them to the database.

Listing 25.4: getMessagesInRange
1    func (d *DelayedMessageFetcher) getMessagesInRange(fromBlock, toBlock) { 
2        // Note this function borrows a similar design to get messages as inbox l1Reader 
3        // https://github.com/EspressoSystems/nitro-espresso-integration/blob/integration/arbnode/inbox_reader.go#L464-L474 
4 
5        sequencerBatches, err := r.sequencerInbox.LookupBatchesInRange(ctx, from, to) 
6        logs, err := client.LookupMessagesInRange(ctx, MessageDelivered, fromBlock: fromBlock, toBlock: toBlock, , func(batchNum uint64) ([]byte, error) { 
7            if len(sequencerBatches) > 0 && batchNum >= sequencerBatches[0].SequenceNumber { 
8                idx := batchNum - sequencerBatches[0].SequenceNumber 
9                if idx < uint64(len(sequencerBatches)) { 
10                    return sequencerBatches[idx].Serialize(ctx, r.l1Reader.Client()) 
11                } 
12                log.Warn("missing mentioned batch in L1 message lookup", "batch", batchNum) 
13            } 
14            data, _, err := r.GetSequencerMessageBytes(ctx, batchNum) 
15            return data, err 
16        }) 
17        for _, log := range logs { 
18            % Save to the database only if its not already stored and 
19            if delayedMessageCountInDb > log.delayedMessage.number { 
20                skip 
21            } else if delayedMessageCountInDb + 1 != log.delayedMessage.number { 
22                % Crash the app something gone seriously wrong 
23            } 
24            saveToDB(parse(log)) 
25            delayedMessageCountInDb++ 
26        } 
27    }

25.7 # Start Watch Delayed Messages

This function will start a websocket subscription to the new head event. It will then fetch all the delayed messages in the range from the fromBlock to the toBlock.

Listing 25.5: StartWatchDelayedMessages
1    func (d *DelayedSequencer) StartWatchDelayedMessages(ctx context.Context, matureFromBlock uint64) error { 
2 
3            select headSub, _ := client.SubscribeNewHead(ctx, headsCh) { 
4                case head := <-headsCh: 
5                    matureBlock := head.Number.Uint64() - safetyTolerance 
6                    fromBlockInDb := readFromBlock(db) 
7 
8                    if (matureBlock - fromBlockInDb > config.MaxBlocksToFetch){ 
9                        getMessagesInRange(fromBlockInDb, fromBlockInDb + config.MaxBlocksToFetch) 
10                        d.fromBlock = fromBlockInDb + config.MaxBlocksToFetch 
11                    } else { 
12                        getMessagesInRange(fromBlockInDb, matureBlock) 
13                        d.fromBlock = matureBlock 
14                    } 
15 
16                    // Save to db the from block 
17                    saveToDB(d.fromBlock) 
18            } 
19        }

25.8 # Create Block

The ‘createBlock‘ method creates a block.

This function needs to store a checkpoint of the espresso streamer state in its database before it attempts to add the produced block to the queue. In the event the process crashes before we successfully update the nodes state, the node will fetch the same message on startup, and then reprocess it so that it can be added to the nodes state. Otherwise, If it is successfully added the produced block to the state, and then the process crashes, on restart it will skip this message position with the espresso streamer.

Listing 25.6: createBlock Method
1    func (n *CaffNode) createBlock() (returnValue bool) { 
2 
3        lastBlockHeader := n.executionEngine.Bc().CurrentBlock() 
4 
5 
6        messageWithMetadataAndPos, err := n.NextMessage() 
7        if err != nil || messageWithMetadataAndPos == nil { 
8            return false 
9        } 
10        // Store the height of the last processed block 
11        n.prevHotshotBlockNum = n.espressoStreamer.CheckpointEspressoHeight() 
12 
13        messageWithMetadata := messageWithMetadataAndPos.MessageWithMeta 
14 
15        // Get the state of the database at the last block 
16        statedb, err := n.executionEngine.Bc().StateAt(lastBlockHeader.Root) 
17        if err != nil { 
18            return false 
19        } 
20 
21        startTime := time.Now() 
22 
23        // Run the Produce block function in replay mode 
24        // This is the core function that is used by replay.wasm to validate the block 
25        block, receipts, err := arbos.ProduceBlock(messageWithMetadata.Message, 
26            messageWithMetadata.DelayedMessagesRead, 
27            lastBlockHeader, 
28            statedb, 
29            n.executionEngine.Bc(), 
30            n.executionEngine.Bc().Config(), 
31            false, 
32            core.MessageReplayMode) 
33 
34        if err != nil { 
35            return false 
36        } 
37        // Handle data storage before we alter the state of the node, just incase any errors happen and we need to retry. 
38        err = n.storeHotshotBlockInDb(messageWithMetadata.HotshotHeight) 
39        if err != nil{ 
40            return false 
41        } 
42 
43        blockCalcTime := time.Since(startTime) 
44        err = n.executionEngine.appendBlock(block, statedb, receipts, blockCalcTime) 
45        if err != nil { 
46            return false 
47        } 
48        n.espressoStreamer.Next() 
49        return true 
50    }

Create block will use the NextMessage() method on the caff node to wrap the espresso streamer to fetch and filter delayed messages based on their finalization status. This can be configured by the node operator to be based on fully finalized messages, or messages with some number of confirmations. If the messages don’t meet the parameters, we will reset the espresso streamer and wait for a message to appear that does meet the parameters.

25.9 # NextMessage()

Next message will use a component DelayedMessageFetcher to fetch delayed messages when the messages posted to espresso increment the delayed message count. The once we get the delayed message from the l1, we will be able to use a function isDelayedMessageWithinSafetyTolerance) to determine if we should produce the delayed message at this time. If the message is not within the safety tolerance, the Caff Node resets the streamer, such that it’s next message should be for the same position, until the Caff Node determines the message is within the safety tolerance.

Listing 25.7: NextMessage() Method
1    func (n* CaffNode) nextMessage() (MessageWithMetadataAndPos, error){ 
2        messageWithMetadataAndPos, err := n.espressoStreamer.Peek() 
3        if err != nil { 
4            return espressostreamer.MessageWithMetadataAndPos{}, err 
5        } 
6        // 0 is the position of the first and init delayed message, we want to skip this. 
7        if n.delayedCount > 0 && messageWithMetadataAndPos.MessageWithMeta.DelayedMessagesRead == n.nextDelayedCount { 
8            % Get the delayed message from the database, if not present retry again 
9            message, err := n.delayedMessageFetcher.GetDelayedMessageFromDatabase(n.nextDelayedCount) 
10            if err != nil { 
11                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, MessageWithMetadataAndPos.HotshotHeight) 
12                return nil, err 
13            } 
14            messageWithMetadataAndPos.MessageWithMeta.Message = message 
15            isDelayedMessageWithinSafetyTolerance, err = isDelayedMessageWithinSafetyTolerance(messageWithMetadataAndPos) 
16            if err != nil{ 
17                log.Error("error fetching delayed message", "err", err) 
18                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, MessageWithMetadataAndPos.HotshotHeight) 
19                return nil, fmt.Errorf("Delayed current message not within safety tolerance, waiting till it is.") 
20            } 
21            if !isDelayedMessageWithinSafetyTolerance{ 
22                n.espressoStreamer.Reset(messageWithMetadataAndPos.Pos, MessageWithMetadataAndPos.HotshotHeight) 
23                return nil, fmt.Errorf("Delayed current message not within safety tolerance, waiting till it is.") 
24            } 
25            n.nextDelayedCount++ 
26        } 
27        return messageWithMetadataAndPos, nil 
28 
29    }

Only when these conditions are satisfied does the function return true. otherwise it returns false with no error if not in the safety tolerance, or false with an error if an inner function produces an error

25.10 # Publish Transaction

This function will be used to publish a transaction from the user to the centralized sequencer. This will be handled by the transactionForwarder struct held by the Node when constructed without a sequencer, as the Caff node will be.