If you've used Symbol for any length of time or poked around its configuration files, you've probably come across the term "extension". What are extensions and what can you do with them? 🤔 This is the first part in a series of articles to help you understand extensions and eventually build your own.
Catapult was built from the ground up with extensibility in mind. Catapult is basically just an application shell that loads plugins and extensions, hydrates their services and lets them run. It does slightly more than that - for example, preparing the data directory and managing the loading of saved data on boot - but not much. Nearly all of its functionality resides in its plugins and extensions.
Plugins customize blockchain consensus. They can add transaction types, state changes and custom validation rules. All nodes in a network must load the same plugins with the same configuration.
Extensions customize client functionality. They can do anything except change consensus rules. Nodes in a network may load different extensions and/or the same extensions with different configurations. In fact, the only difference between the different types of nodes - Peer, API, Dual - is the set of extensions they load.
How does catapult know which extensions to load? 🤔 It's all through configuration. Extension configuration files are INI files with the name of the extension as the key and a boolean (true/false) as the value. If the value is true, the extension is loaded. Otherwise, it's not.
For example, in this snippet from config-extensions-server.properties, you can see that extension.filespooling is disabled and extension.harvesting is enabled.
[extensions]
# api extensions
extension.filespooling = false
extension.partialtransaction = false
# addressextraction must be first because mongo and zeromq depend on extracted addresses
extension.addressextraction = false
extension.mongo = false
extension.zeromq = false
# p2p extensions
extension.harvesting = true
extension.syncsource = true
...
Depending on node settings, there can be up to three different extensions configuration files:
config-extensions-server.properties
Extensions loaded by the catapult.server process, used to enforce the blockchain consensus.
config-extensions-broker.properties
Extensions loaded by the catapult.broker process, used to perform less urgent work.
config-extensions-recovery.properties
Extensions loaded by the catapult.recovery process, used to repair a broken node.
⚠️ Given the flexibility of catapult, it's possible to launch it with no extensions. However, just because it's possible doesn't mean it's a good idea. This would not be very useful because the node would not be able to do anything. 😬 Similarly, there are other extensions that should be enabled in nearly all useful cases. For example, without extension.sync loaded, the node will not be able to receive or process blocks from other nodes. Of course, even these could be replaced with custom implementations that provide similar functionality.
At this point you might be wondering: don't these various pieces need to communicate somehow? The answer, of course, is yes!
There are two primary ways for extensions to communicate: subscriptions and server hooks.
Catapult uses a bespoke pub/sub model for propagating events among extensions. All events are managed by the SubscriptionManager. Currently, events related to the following are supported: blocks, partial transactions, unconfirmed transactions, finalization, nodes, state transitions and transaction status transitions. For the curious, a custom pub/sub implementation was chosen due to the relatively small number of events and the benefits of having the events be strongly typed.
As an example of usage, here is a snippet from the Mongo extension where the subscriptions are made.
// register subscriptions
bootstrapper.subscriptionManager().addBlockChangeSubscriber(
        io::CreateBlockStorageChangeSubscriber(std::move(pMongoBlockStorage)));
bootstrapper.subscriptionManager().addPtChangeSubscriber(CreateMongoPtStorage(*pMongoContext, *pTransactionRegistry));
bootstrapper.subscriptionManager().addUtChangeSubscriber(
        CreateMongoTransactionStorage(*pMongoContext, *pTransactionRegistry, Ut_Collection_Name));
bootstrapper.subscriptionManager().addFinalizationSubscriber(CreateMongoFinalizationStorage(*pMongoContext));
bootstrapper.subscriptionManager().addStateChangeSubscriber(std::make_unique<ApiStateChangeSubscriber>(
        std::move(pChainScoreProvider),
        std::move(pExternalCacheStorage)));
bootstrapper.subscriptionManager().addTransactionStatusSubscriber(CreateMongoTransactionStatusStorage(*pMongoContext));
While this model is very simple, it is also very powerful. There is a good amount of functionality that can be built using nothing but subscriptions. Anything that needs to take data from the events and transform or forward it is a perfect candidate. Both the Mongo and ZeroMQ extensions are built around this subscription model. The Mongo extension extracts data from the events and inserts it into a NoSQL store for easier querying. The ZeroMQ extension extracts data from the events and publishes it to other external subscribers, like the catapult REST process.
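To make this concrete, here is a minimal sketch of what a subscription-only extension could look like: a subscriber that does nothing but log block events. It assumes the io::BlockChangeSubscriber interface (notifyBlock / notifyDropBlocksAfter) and the RegisterExtension entry point used by the bundled extensions; treat the exact names and includes as illustrative rather than copied from the codebase.
// sketch only - a do-nothing extension that logs block events
#include "catapult/extensions/ProcessBootstrapper.h"
#include "catapult/io/BlockChangeSubscriber.h"
#include "catapult/model/Elements.h"
#include "catapult/utils/Logging.h"
#include <memory>

namespace {
    class LoggingBlockSubscriber : public catapult::io::BlockChangeSubscriber {
    public:
        void notifyBlock(const catapult::model::BlockElement& blockElement) override {
            // called whenever a block is saved to the chain
            CATAPULT_LOG(info) << "saw block at height " << blockElement.Block.Height;
        }

        void notifyDropBlocksAfter(catapult::Height height) override {
            // called when blocks are rolled back
            CATAPULT_LOG(info) << "blocks dropped after height " << height;
        }
    };
}

extern "C" PLUGIN_API
void RegisterExtension(catapult::extensions::ProcessBootstrapper& bootstrapper) {
    // same registration pattern as the Mongo snippet above
    bootstrapper.subscriptionManager().addBlockChangeSubscriber(std::make_unique<LoggingBlockSubscriber>());
}
Enabling such an extension would then just be a matter of adding a line like extension.<name> = true to the appropriate config-extensions-*.properties file.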
Observant readers might have noticed that these extensions only read - but never publish - events. That's absolutely correct! This lets us do something really clever - process isolation! 🎊
If you've ever run an API or Dual node, you might have noticed a catapult.broker process running. If you have eyes like a hawk, you might also have noticed that it loads the Mongo and ZeroMQ extensions. Is that a coincidence? No!
There is a special filespooling extension that can run in the catapult.server process. It subscribes to events, serializes the event data and writes it out to disk. If you're familiar with message queues, you can think of this as writing to the queue. On the other side, the catapult.broker process reads these files, rehydrates the event data and forwards it to its loaded extensions. This is equivalent to reading from a message queue.
Like the catapult.server process, the catapult.broker process does little more than orchestration. The real heavy lifting is, again, done by the extensions.
Since all the event data is processed in a separate process - catapult.broker - the main node process is much more nimble and secure. If an attacker finds an exploit in MongoDB, they would only be able to take down the catapult.broker process, not the catapult.server process.
Since the catapult.server process runs all the node operations, the network is unaffected even if the catapult.broker process goes down. Additionally, if the MongoDB processing is very intensive and time consuming, it does not affect the catapult.server process, because it is done asynchronously in another process.
In fact, during periods of high activity, event data to process can build up on disk, such that the catapult.broker process falls behind the catapult.server process. In periods of lower activity, the catapult.broker process can catch up. This is all by design to streamline the critical operations - in catapult.server - and isolate them from the non-critical operations - in catapult.broker. Cool!
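If the message queue analogy helps, here is a tiny, self-contained sketch of the general idea: a producer appends length-prefixed messages to a file, and a consumer reads them back whenever it gets around to it. This is not catapult's actual spooling format or API - the file name, layout and event strings below are made up - it is only the pattern in miniature.
#include <cstdint>
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

// producer side (think catapult.server with filespooling enabled): append one serialized event
void writeEvent(const std::string& path, const std::string& payload) {
    std::ofstream out(path, std::ios::binary | std::ios::app);
    auto size = static_cast<uint64_t>(payload.size());
    out.write(reinterpret_cast<const char*>(&size), sizeof(size));
    out.write(payload.data(), static_cast<std::streamsize>(payload.size()));
}

// consumer side (think catapult.broker): rehydrate every event written so far
std::vector<std::string> readEvents(const std::string& path) {
    std::vector<std::string> events;
    std::ifstream in(path, std::ios::binary);
    uint64_t size = 0;
    while (in.read(reinterpret_cast<char*>(&size), sizeof(size))) {
        std::string payload(size, '\0');
        in.read(&payload[0], static_cast<std::streamsize>(size));
        events.push_back(payload);
    }
    return events;
}

int main() {
    // the two halves could run in completely different processes at completely different times
    writeEvent("spool.dat", "block saved at height 1000");
    writeEvent("spool.dat", "finalization reached at height 980");

    for (const auto& event : readEvents("spool.dat"))
        std::cout << event << "\n";
}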
Of course, not all extensions are so easy to isolate. Sometimes data needs to be pushed from one extension to another for processing. This is done via a predefined set of server hooks.
The easiest way to understand how this works is to walk through an example: the lifetime of a block. Let's start in ServerHooks.h and look at three relevant snippets:
...
/// Factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
using CompletionAwareBlockRangeConsumerFactoryFunc = RangeConsumerFactoryFunc<chain::CompletionAwareBlockRangeConsumerFunc>;
...
/// Sets the \a factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
void setCompletionAwareBlockRangeConsumerFactory(const CompletionAwareBlockRangeConsumerFactoryFunc& factory) {
    SetOnce(m_completionAwareBlockRangeConsumerFactory, factory);
}
...
/// Gets the factory for creating a CompletionAwareBlockRangeConsumerFunc bound to an input source.
const auto& completionAwareBlockRangeConsumerFactory() const {
    return Require(m_completionAwareBlockRangeConsumerFactory);
}
...
First, we declare a function prototype.
Second, we define a way to set it.
Finally, we define a way to access it.
Notice SetOnce and Require are used in the setter and accessor, respectively. This means that this hook must be set exactly once across all extensions. Other hooks have add semantics, where push_back and AggregateConsumers are used. As you might have guessed, these can be set multiple times, with the caveat that all adds must have taken place before the aggregation.
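For reference, the gist of those two helpers is roughly the following. This is a sketch of the semantics only; the real helpers in ServerHooks.h use catapult's own exception macros.
#include <stdexcept>

// allow a hook to be assigned exactly once
template<typename T>
void SetOnce(T& dest, const T& source) {
    if (dest)
        throw std::logic_error("hook has already been set");

    dest = source;
}

// fail fast if a required hook was never assigned
template<typename T>
const T& Require(const T& hook) {
    if (!hook)
        throw std::logic_error("hook has not been set");

    return hook;
}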
😵 What exactly is CompletionAwareBlockRangeConsumerFactoryFunc? It is a function prototype for a function that accepts an InputSource and returns a function that accepts a range of blocks (one or more blocks) and a function to be called when the processing of that range is complete. If you don't understand what you just read, don't worry; it's an example of functional programming in the wild. You should still be able to understand everything else!
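If it helps, here is one way to write that shape down using plain std::function and stand-in types. The real aliases live in catapult's chain and disruptor code and are more precise; BlockRange, InputSource and the bool success flag below are simplifications.
#include <cstdint>
#include <functional>

struct BlockRange {};                                         // stand-in for model::BlockRange (one or more blocks)
enum class InputSource { Local, Remote_Pull, Remote_Push };   // stand-in for disruptor::InputSource

// invoked when the dispatcher finishes processing a pushed range
using ProcessingCompleteFunc = std::function<void (uint64_t elementId, bool success)>;

// consumes a block range, returns a dispatcher element id and calls back on completion
using CompletionAwareBlockRangeConsumerFunc =
        std::function<uint64_t (BlockRange&&, const ProcessingCompleteFunc&)>;

// bound to an input source, produces a consumer for ranges arriving from that source
using CompletionAwareBlockRangeConsumerFactoryFunc =
        std::function<CompletionAwareBlockRangeConsumerFunc (InputSource)>;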
Great, now that we've seen what hooks are, let's see how they're used!
The DispatcherService sets the completion aware block range consumer factory:
state.hooks().setCompletionAwareBlockRangeConsumerFactory([&dispatcher = *pDispatcher, &nodes = state.nodes()](auto source) {
    return [&dispatcher, &nodes, source](auto&& range, const auto& processingComplete) {
        return disruptor::InputSource::Local == source || !nodes.view().isBanned(range.SourceIdentity)
                ? dispatcher.processElement(ConsumerInput(std::move(range), source), processingComplete)
                : 0;
    };
});
If the blocks (range) are from the local node - i.e. the local node harvested them - or from a remote node that isn't banned, they are passed to dispatcher.processElement via ConsumerInput. This begins the journey of these blocks through the multiple stages of block processing. If all stages pass, the blocks will be added to the blockchain. If any fail, they will be rejected.
Remember, since this is a set hook, it is the only one set in the whole process.
The completion aware block range consumer factory is accessed in two separate places.
First, in the harvesting service, it is used to push harvested blocks (with a local input source):
ScheduledHarvesterTaskOptions CreateHarvesterTaskOptions(extensions::ServiceState& state) {
    ScheduledHarvesterTaskOptions options;
    options.HarvestingAllowed = state.hooks().chainSyncedPredicate();
    options.LastBlockElementSupplier = [&storage = state.storage()]() {
        auto storageView = storage.view();
        return storageView.loadBlockElement(storageView.chainHeight());
    };
    options.TimeSupplier = state.timeSupplier();
    options.RangeConsumer = state.hooks().completionAwareBlockRangeConsumerFactory()(disruptor::InputSource::Local);
    return options;
}
😵 If you're curious, the harvested block is actually pushed here.
m_rangeConsumer(model::BlockRange::FromEntity(std::move(pBlock)), [pIsAnyBlockPending = m_pIsAnyHarvestedBlockPending](auto, auto) {
    *pIsAnyBlockPending = false;
});
Notice the usage of the callback and the flag m_pIsAnyHarvestedBlockPending to skip harvesting while a previously harvested block is still being processed.
Second, in the sync service, it is used to push pulled blocks (with a remote pull input source).
😵 If you're curious, the pulled blocks are actually pushed here.
// need to use shared_from_this because dispatcher can finish processing a block after
// scheduler is stopped (and owning DefaultChainSynchronizer is destroyed)
auto newId = m_blockRangeConsumer(std::move(range), [pThis = shared_from_this()](auto id, auto result) {
    pThis->remove(id, result.CompletionStatus);
});
Notice the usage of the callback to execute custom logic upon the completion of processing.
Finally, in ServerHooks, you might have noticed the similarly named blockRangeConsumerFactory hook. This hook is very similar to completionAwareBlockRangeConsumerFactory, except that it doesn't provide a notification upon completion of processing. Nevertheless, this hook is used to forward blocks pushed by other nodes (with a remote push input source) to the block dispatcher. The utility function CreateBlockPushEntityCallback creates an appropriate BlockRangeConsumerFunc that is used by the sync source service here.
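To see the difference between the two hooks in type terms, here is a sketch that builds on the stand-in types from the earlier snippet: the plain consumer simply has no completion callback, and one could be derived from the completion-aware variant by ignoring that callback. This is illustration only, not how catapult actually wires the two hooks up.
// continuing the simplified stand-in types from above
using BlockRangeConsumerFunc = std::function<void (BlockRange&&)>;
using BlockRangeConsumerFactoryFunc = std::function<BlockRangeConsumerFunc (InputSource)>;

// wrap a completion-aware consumer so callers that don't care about completion can use it
BlockRangeConsumerFactoryFunc MakeFireAndForgetFactory(const CompletionAwareBlockRangeConsumerFactoryFunc& factory) {
    return [factory](InputSource source) {
        auto consumer = factory(source);
        return BlockRangeConsumerFunc([consumer](BlockRange&& range) {
            // discard the element id and pass a no-op completion callback
            consumer(std::move(range), [](uint64_t, bool) {});
        });
    };
}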