Requirements: Query Service Requirements
The public interface to Espresso, on which all applications and integrations are built, is a set of HTTP (and WebSockets) APIs which allow observers to access the state and history of the Espresso network. These APIs cover a broad range of use cases including rollups sending data to Espresso for finalization, applications reading confirmations from Espresso for low-latency user experiences, and even operators of Espresso nodes monitoring the health and performance of the system. These APIs can be provided, in various combinations, by any node running the Espresso protocol, and in some cases by nodes that are not participating in the protocol but are gathering data from other nodes running these services. Collectively, we call these APIs the query service.
The query service is divided into a collection of API modules. These modules can be toggled on and off (mostly) independently, depending on the resources available to the node and the use cases it aims to support.
Availability API: provides consistent and reliable access to objects that have been agreed upon by consensus (e.g. decided blocks and leaves). Strives for strongly consistent and deterministic semantics.
Node API: complements the availability API by providing insight into objects and statistics known to a particular node, not necessarily agreed upon by consensus. Eventually consistent semantics.
Status API: provides metrics useful for monitoring the performance of consensus within an individual node.
Merklized state API: provides storage and access for various pieces of consensus state which take the form of Merkle trees.
Explorer API: serves as the backend for the Espresso block explorer.
As the lowest abstraction layer of the query service, the schema does not follow the high-level separation into various modules of the APIs. Instead, there is one schema which is shared by all API modules, which ensures deduplication of data and the best possible efficiencies for queries. Still, the schema can be conceptually broken into various independent collections of tables.
The core of the schema is a collection of consensus artifacts tables, which store the raw outputs of consensus, like blocks and leaves.
Aggregated statistics computed from these artifacts are stored in separate tables.
The confirmation layer state is stored in a collection of tables which are optimized for efficiently storing and querying Merkle trees in a relational database setting.
There are a number of singleton tables which store the persistent state of the query service itself, used to implement various higher level operations rather than being directly surfaced to clients.
This section describes the storage layout, possible queries, and indexing of each. However, a full specification of the schema is beyond the scope of the Wiki; the set of database migrations in the code repository is the authoritative spec of the schema. This section only describes the salient points that are relevant to understanding the rest of the design of the query service.
Consensus artifacts make up the core of the query service. This includes:
Leaves
Headers
Blocks
Payloads
Transactions
VID artifacts (VID common and VID shares)
State certificates
Of these, only leaves, payloads, VID artifacts, transactions, and state certificates have their own dedicated storage. Headers live inside of leaves, while blocks are composed of a leaf and a payload, so while they are thought of as their own distinct object types at the higher levels of abstraction, they are not represented explicitly in the schema.
Tech Debt:
This is not true. The current implementation does have a separate header table, for no reason. We could likely save a lot of storage by consolidating the header and leaf tables.
There is a separate table for each object type, with the exception of VID common and VID shares, which are stored together in a VID table. This gives us the following consensus artifacts tables:
header
leaf
payload
vid
transactions
finalized_state_cert
All of these tables represent complete consensus objects (be they headers, payloads, etc.) except the transactions table, which carries metadata only, since the full contents of each transaction are already represented in the payload table. Thus, loading a transaction means loading the entire payload plus the metadata in the corresponding row of the transactions table, which tells you where in the payload to find the transaction.
All of the tables are indexed by block height, except finalized_state_cert, which is indexed by epoch number, as state certificates are produced only once per epoch. For everything else, the block height columns all have a foreign key relation to the height column of the header table. This foreign key relation is indicative of an important invariant. For a given block height, some of these tables may have a complete row, while others are missing certain fields within that row, and others are missing the row entirely:
A row in the payload table exists if and only if the corresponding leaf and header rows exist, although the payload row may be a placeholder with NULL for the data.
A transaction is present if and only if the full payload for the corresponding block is also present (payload.data is not NULL).
If a row exists in the vid table, the corresponding rows exist in the header and leaf tables. The share in the vid row may be NULL, but the VID common will never be.
The reason for the asymmetry (where missing payloads are represented with a NULL placeholder column, while the VID table is allowed to have entirely missing rows) is mostly historical. An equivalent design is possible where payload entries are missing outright until the payload is available in full. The important thing to keep in mind is that if we have a payload, VID common, or VID share, we must also have the corresponding leaf and header, since these contain information required to interpret the payload and VID.
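To make these relationships and nullability rules concrete, here is a minimal sketch of how they might be expressed in Postgres. This is illustrative only: column names and types are assumptions, the blob columns holding the complete serialized objects are discussed below, and the authoritative schema is the set of migrations in the code repository.

```sql
-- Illustrative sketch only; the real schema is defined by the migrations in the
-- code repository. Column names and types are assumptions.
CREATE TABLE header (
    height       BIGINT PRIMARY KEY,
    hash         TEXT   NOT NULL,   -- block hash
    payload_hash TEXT   NOT NULL,
    timestamp    BIGINT NOT NULL,
    data         JSONB  NOT NULL    -- complete serialized header
);

CREATE TABLE leaf (
    height     BIGINT PRIMARY KEY REFERENCES header (height),
    hash       TEXT   NOT NULL,     -- leaf hash
    block_hash TEXT   NOT NULL,
    data       JSONB  NOT NULL      -- complete serialized leaf
);

CREATE TABLE payload (
    -- A payload row exists if and only if the header and leaf rows exist...
    height           BIGINT PRIMARY KEY REFERENCES header (height),
    size             BIGINT,
    num_transactions BIGINT,
    data             BYTEA          -- ...but data may be NULL until the full payload is available
);

CREATE TABLE vid (
    -- A vid row may be missing entirely, but if it exists, so do header and leaf.
    height BIGINT PRIMARY KEY REFERENCES header (height),
    common BYTEA  NOT NULL,         -- VID common is never NULL
    share  BYTEA                    -- the share may be NULL
);
```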
Often, not every field of a consensus object needs to be directly accessed or queried by the query service. We generally only care about fields that index the data in some way, like heights and hashes. Other fields, like payload data, are relevant only in that we must return them when the client asks to download a complete object.
To keep the schema simple, a pattern that is employed across all of these tables is to represent only the fields we actually need (e.g. fields that appear in WHERE or ORDER BY clauses) as actual columns in the schema, and to store the complete object in a single column as a serialized blob. Depending on the object, this blob uses either the JSONB data type (serialized using serde_json) or the BYTEA data type (serialized using bincode). The former has the advantage that we can query individual fields of the blob using JSON queries if we need to. The latter has the advantage of being more compact on disk.
| Table | Explicit Columns | Blob Type |
|---|---|---|
| Leaf | height, hash, block hash | JSON |
| Header | height, hash, payload hash, timestamp | JSON |
| Payload | height, size, transaction count | binary |
| VID | height | binary (for both common and shares) |
| Transaction | height, index | N/A |
| State Certificate | epoch | binary |
All objects are indexed by block height, except state certificates, which are indexed by epoch number. This index also serves as the primary key for all tables except transactions. Transactions are special because they are uniquely identified neither by height (because a single block can have multiple transactions) nor by hash (because the same transaction can be confirmed multiple times in different blocks). Instead, transactions have a combined primary key on block height and index within the block. The index is stored as a string to easily support complex indexing types (like a pair of namespace ID and offset within namespace) where the query service itself only cares about equality comparisons.
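As a concrete illustration of the transaction keying scheme, continuing the hypothetical tables sketched above (a sketch only, with assumed column names):

```sql
-- Sketch only: the index within the block is stored as an opaque string so that
-- structured positions (e.g. a namespace ID plus an offset within the namespace)
-- can be supported with simple equality comparisons.
CREATE TABLE transactions (
    height BIGINT NOT NULL REFERENCES header (height),
    idx    TEXT   NOT NULL,
    PRIMARY KEY (height, idx)
);
```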
As a secondary indexing mechanism, we have three types of hash:
leaf is explicitly indexed by its own hash (the leaf hash).
header and leaf are explicitly indexed by block hash. Other objects can be indexed this way by joining with one of these tables on block height.
header is explicitly indexed by payload hash. Other objects can be indexed this way by joining with header on block height.
Finally, header is also indexed by timestamp, which allows querying ranges of consensus objects by real-world time in addition to block height.
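Under the hypothetical schema sketched earlier, these secondary indexes might look roughly like the following; the index names and exact definitions are assumptions, not the real migrations.

```sql
-- Sketch only; index names and exact definitions are assumptions.
CREATE INDEX header_hash_idx         ON header (hash);          -- block hash
CREATE INDEX header_payload_hash_idx ON header (payload_hash);
CREATE INDEX header_timestamp_idx    ON header (timestamp);     -- real-world time ranges
CREATE INDEX leaf_hash_idx           ON leaf (hash);            -- leaf hash
CREATE INDEX leaf_block_hash_idx     ON leaf (block_hash);
```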
The table and index structure described above allows for a very general and efficient query mechanism over consensus objects. The primary classes of queries supported and utilized by the query service are:
Selecting a single object by unique index (block height, or hash in cases where the hash is unique)
Selecting a single object by non-unique hash. In this case the first object (by block height) is returned in case of duplicates.
Selecting a range of consecutive objects by block height.
Selecting a range of consecutive objects where the bounds are defined by timestamp or a combination of timestamp for one bound and block height for the other.
All of these queries are straightforward to implement with combinations of WHERE, ORDER BY, and JOIN clauses on the various indexed columns. Leaves, headers, and state certificates are stored in self-contained tables with all relevant metadata, and thus can be selected directly without joining.
Queries for all other consensus objects start by selecting from the header table, which enables flexible indexing on all the metadata columns of that table, like block height and various hashes. The header results can then be joined with the table of interest on the block height key.
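For example, under the same hypothetical column names as above, fetching a full block by payload hash might look roughly like this sketch:

```sql
-- Sketch: index into header by payload hash, then join the payload on block height.
-- If the payload hash is not unique, the earliest matching block is returned.
SELECT h.data AS header, p.data AS payload
FROM header  AS h
JOIN payload AS p ON p.height = h.height
WHERE h.payload_hash = $1
ORDER BY h.height ASC
LIMIT 1;
```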
The only query that is more complex than this is the query to load an individual transaction, which joins header to transactions and payload. In the results, the index of the transaction within the payload can be extracted from the transactions table and used to index into the data from the payload table to recover the individual transaction.
This is not optimally efficient, as it involves loading the entire payload into memory just to extract a single transaction. However, queries for individual transactions are not common and are not on the critical path of any high-throughput integration. Robust integrations download whole payloads or namespaces at once. For this reason, it is more important to have the entire payload stored in a convenient way, and the functionality of extracting a single transaction from that payload becomes just an additional convenience provided by the server.
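A hedged sketch of that three-way join, again using the assumed column names from the earlier sketches:

```sql
-- Sketch: load the header, the full payload, and the transaction's position metadata.
-- The server then uses the idx value to slice the individual transaction out of the
-- payload blob in memory.
SELECT h.data AS header, p.data AS payload, t.idx AS position
FROM transactions AS t
JOIN header       AS h ON h.height = t.height
JOIN payload      AS p ON p.height = t.height
WHERE t.height = $1 AND t.idx = $2;
```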
There are some object types which are derived from other primitive consensus artifacts, and are not stored directly in the database, but which present a query interface at the database layer as if they were stored in the database. These are:
PayloadMetadata: provides payload metadata (height and hashes, plus size and transaction count) but does not include the full payload data.
VidCommonMetadata: provides VID common metadata (height and hashes) but does not include the actual VID common data.
The advantage of these pseudo types is that they are considerably smaller than their full counterparts, and can be loaded from the database with significantly less I/O. For example, they are used by the scanner task to check if an object exists in the database (or trigger a fetch if not) without unnecessarily loading the whole object.
These pseudo objects have their own lighter-weight queries, which look just like the queries for the corresponding "full" objects, with the full payload blob excluded from the select list. However, they do not have their own fetching or storage statements. If a request for a pseudo object triggers a fetch, the full object will be fetched and stored, implicitly making the pseudo object available as well.
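For instance, a PayloadMetadata query under the hypothetical schema above is just the payload query with the data blob dropped from the select list:

```sql
-- Sketch: load payload metadata without touching the potentially large data blob.
SELECT h.height, h.hash AS block_hash, h.payload_hash, p.size, p.num_transactions
FROM header  AS h
JOIN payload AS p ON p.height = h.height
WHERE h.height = $1;
```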
We aggregate transaction counts and total payload bytes for efficient queries over various ranges of history. These are stored in a separate aggregate table. This table stores one row for each position in the chain, representing the total value of each statistic computed from genesis to the block corresponding to that row, inclusive. The primary key is block height, which references the header table as a foreign key. Each row has cumulative num_transactions and payload_size columns.
This layout permits very flexible queries:
We can get the total count of transactions or total number of bytes committed over all time simply by querying the last row in this table.
We can get the totals up to a given block by looking at the corresponding row.
We can get the number of transactions or number of bytes committed in a specified block range by reading the statistics from the row corresponding to the last block in the range, and subtracting the statistics from the row just prior to the first block in the range.
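Because each row stores running totals from genesis, a range query reduces to a subtraction between two rows. A sketch, assuming the num_transactions and payload_size column names above:

```sql
-- Sketch: statistics for blocks in the inclusive range [$1, $2].
-- COALESCE handles the case where the range starts at genesis (no prior row).
SELECT hi.num_transactions - COALESCE(lo.num_transactions, 0) AS num_transactions,
       hi.payload_size     - COALESCE(lo.payload_size,     0) AS payload_size
FROM aggregate AS hi
LEFT JOIN aggregate AS lo ON lo.height = $1 - 1
WHERE hi.height = $2;
```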
This table is populated asynchronously as new blocks are received, by the aggregator task.
TODO:
Document Merklized state storage
The query service has its own state, which is not surfaced to users, but is used by the various asynchronous streaming and fetching tasks. We want this state to persist so the tasks can resume properly after the query service restarts. Thus, some tasks store state in singleton tables.
A singleton table stores a single value which is updated by the query service as state changes. Each singleton table has an id column, used as the primary key, which has only one valid value (usually 0). This allows upserts to be performed to maintain a single value at any given time in the other column of the table, which holds the state of interest.
The singleton internal state tables are:
pruned_height: stores last_height, the height of the highest block deleted by the pruning task, which allows the fetcher to avoid fetching already pruned data.
last_merklized_state_height: stores the height of the highest available confirmation layer state populated by the state update task. Allows state queries to fail gracefully when asked for a state that is unavailable.
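A minimal sketch of the singleton pattern, using pruned_height as an example (column types are assumptions; the real definitions live in the migrations):

```sql
-- Sketch: the table holds at most one row, keyed by the constant id 0.
CREATE TABLE pruned_height (
    id          INT    PRIMARY KEY,  -- only valid value is 0
    last_height BIGINT NOT NULL
);

-- Upsert the single row whenever the pruner deletes more data.
INSERT INTO pruned_height (id, last_height)
VALUES (0, $1)
ON CONFLICT (id) DO UPDATE SET last_height = excluded.last_height;
```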
TODO:
Document SQL migrations, differences between Postgres and SQLite, and ad hoc migrations (e.g. Leaf2)
The query service has several tasks which run asynchronously to pull in new data from various sources and populate the corresponding tables. The primary one of these populates Consensus Artifacts by listening to a HotShot event stream for newly decided objects. The rest of the tasks derive additional data from these core artifacts, such as computing the confirmation layer state in the state update task and calculating aggregate statistics in the aggregator.
The aggregator task is responsible for populating aggregate statistics, which are derived from a sequence of blocks populated by the Consensus Artifacts update task or the fetcher. For each block, it computes the sum of transactions and payload bytes confirmed by that block and all blocks under it.
This necessarily means that the aggregator task cannot do its job for a given block until it has done its job for all previous blocks. Thus, the aggregator task looks like a loop that starts from genesis (or from wherever it last left off) and consumes blocks in sequence. The fetching stream automatically takes care of blocking until the next block in the sequence is available, which makes the aggregator’s job simple.
Throughout its loop, the aggregator maintains the following state in memory, corresponding to the previous set of aggregated statistics:
prev.num_transactions
prev.payload_size
These are updated each iteration of the loop so they always correspond to the last block processed by the aggregator. This reduces the overhead of running the task, since the processing of each new block does not need to load the previous statistics from the database for the parent block; it already has them in memory.
On startup, the task initializes prev by selecting from the database the row in the aggregate table with the highest height. It also finds the height of the next block to process, start, which is one more than the height in this row. If there are no rows in the aggregate table, it starts from a genesis state where start, prev.num_transactions, and prev.payload_size are all 0.
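A sketch of the query that could recover this starting state, assuming the aggregate columns described earlier:

```sql
-- Sketch: resume from the most recently aggregated block, if any.
SELECT height, num_transactions, payload_size
FROM aggregate
ORDER BY height DESC
LIMIT 1;
```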
The task then subscribes to a stream of PayloadMetadata objects starting from height start. PayloadMetadata is a pseudo consensus object which includes only the metadata about a payload (e.g. its size and transaction count). Since the aggregator task does not need the full payload, this makes it more efficient when loading an existing payload from the database.
This stream is consumed via ready_chunks, which transforms a stream to yield fixed-size chunks when many objects are already buffered, but to yield smaller chunks (potentially containing only a single item) when few objects are available and waiting for new objects would mean blocking. This method strikes a good balance between performance (when the aggregator task has to process many blocks at once to work through a backlog) and low latency (when the aggregator is processing blocks as they are produced by consensus, which happens relatively slowly, so that the overhead of the aggregator task is negligible anyways).
For each block in a chunk, the new running totals are computed by adding that block's transaction count and payload size to prev, updating prev so it always corresponds to the previous block. The resulting rows for the chunk are then written to the aggregate table in a single upsert transaction.
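As a rough sketch only (not the actual statement used by the implementation), the write for a chunk might be a batched upsert along these lines:

```sql
-- Sketch: write the cumulative totals for every block in the chunk in one statement.
INSERT INTO aggregate (height, num_transactions, payload_size)
VALUES ($1, $2, $3),
       ($4, $5, $6)   -- ...one tuple per block in the chunk
ON CONFLICT (height) DO UPDATE
SET num_transactions = excluded.num_transactions,
    payload_size     = excluded.payload_size;
```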
The asynchronous nature of this task means that it may lag behind the other APIs. Consensus artifacts may be available up to some height h while the aggregated statistics reported include only blocks up to some height less than h, because the aggregator task is still syncing, or because it is blocked waiting for some missing object. For this reason, aggregate statistics are exposed via the node API (which is eventually consistent) rather than the availability API.
Still, these discrepancies should be very rare, occurring only when some old data has been missing for an unusually long time, or shortly after the node has started. In the common case, the aggregator task keeps up with the Consensus Artifacts task and processes each new block as a singleton chunk immediately after it is produced by consensus.
For monitoring the progress of the aggregator task in syncing with the blockchain, a metric called aggregator_height is exposed by the status API. This metric is updated by the aggregator task itself every time it finishes processing a chunk.