This article is a continuation of the previous article in this series.
If you have not already read that article, you should read it first, and we'll be here waiting for you!
One of the limitations of that article is that we omitted proper certificate verification using ssl.CERT_NONE
.
That setting allowed the python code to connect to a Symbol node without first verifying its certificate chain.
In this article, we'll implement the proper certificate verification.
First, we need create an SSL Context and load the appropriate (local) certificates used to connect with the peer. We're disabling hostname check because we allow certificates to be portable. For our purposes, it's more important that they're tied to a specific Symbol account as opposed to a IP/hostname. Also, notice that we're not setting a verification step yet.
self.ssl_context = ssl.create_default_context()
self.ssl_context.check_hostname = False
self.ssl_context.load_cert_chain(
certificate_directory / 'node.full.crt.pem',
keyfile=certificate_directory / 'node.key.pem')
Second, we need to use some advanced wizardry to get a pointer to the underlying OpenSSL handle (SSL_CTX
) because the python ssl
module does not support custom verification.
# get python wrapper object address (SSL_CTX* is offset 16 bytes)
ssl_context_object_address = id(self.ssl_context)
ssl_context_raw_address = ctypes.cast(ssl_context_object_address, ctypes.POINTER(ctypes.c_uint64))[2]
ssl_context_pointer = ffi.cast('SSL_CTX*', ssl_context_raw_address)
Third, we call OpenSSL functions via our custom OpenSSL CFFI bindings module.
Importantly, we need to call SSL_CTX_set_verify
.
SSL_VERIFY_PEER
indicates we want to verify the server certificates, and SSL_VERIFY_FAIL_IF_NO_PEER_CERT
indicates we want to fail verification if the server doesn't send any certificates.
The most important part is specifying a custom verify callback that will process the server certificates.
self._verify_callback_wrapper = ffi.callback('int (*)(int, X509_STORE_CTX *)', self._verify_callback)
lib.SSL_CTX_set_verify(
ssl_context_pointer,
lib.SSL_VERIFY_PEER | lib.SSL_VERIFY_FAIL_IF_NO_PEER_CERT,
self._verify_callback_wrapper)
ℹ️ Although the
cryptography
package provides an OpenSSL CFFI module, it doesn't expose everything we need to be able to implement a custom verification procedure. Instead, we will build a custom CFFI package exposing the minimum set of OpenSSL functions that we need to do so. Alternatively, building a CFFI package with only the missing functions and using it alongside the one fromcryptography
would work. The downside with this approach is that it would require a lot of casting because python views the types from different FFI modules as distinct even when they are the same underlying C type.
Each Symbol node uses a two level certificate chain for peer communication.
The top level certificate is self-signed by the node's main account private key. This allows the client to associate a node with a XYM balance, which is used to weight nodes for time synchronization and peer selection. By convention, top level certificates have a duration of ~10 years.
The second level certificate is signed by the node's main account private key and is used as the node's public key for establishing SSL sessions and decrypting harvest request messages. By convention, second level certificates have a duration of ~1 year.
An alternative way to think about this is that the main public key is the issuer of both certificates but they have different subjects - the main public key and the node public key respectively.
The verify callback accepts two parameters:
preverified
- True
if verification of the active certificate has passed; False
otherwisecertificate_store_context
- Context containing information about the entire certificate chain being verifiedDue to the composition of certificate chains used by Symbol, this callback is called three times when verifying a peer:
The top level (main) certificate initially fails default validation with a self signed error X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN
.
As a result, the callback is first called with the top level certificate active and preverified
is False
.
First, we check that the certificate chain is composed of two certificates:
chain = lib.X509_STORE_CTX_get0_chain(certificate_store_context)
chain_size = lib.sk_X509_num(chain)
if 2 != chain_size:
print(f'rejecting certificate chain with size {chain_size}')
return False
Second, we get the active (current) certificate from the context:
certificate = lib.X509_STORE_CTX_get_current_cert(certificate_store_context)
if not certificate:
raise RuntimeError('rejecting certificate chain with no active certificate')
Third, we confirm that the preverified failure is due to a self-signed error:
if lib.X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN != error_code:
print(f'rejecting certificate chain with unverified unexpected error {error_code}')
return False
Finally, we verify that the self signed signature is valid:
if not verify_self_signed(certificate):
print('rejecting certificate chain with improperly self-signed root certificate')
return False
Assuming all of these checks pass, we return True
from the verify callback.
The following steps are required to check a self signed certificate signature.
First, we create a new (empty) certificate store and add the certificate to it:
certificate_store = ffi.gc(lib.X509_STORE_new(), lib.X509_STORE_free)
if not lib.X509_STORE_add_cert(certificate_store, certificate):
return False
Second, we create a new certificate store context around the certificate store:
certificate_store_context = ffi.gc(lib.X509_STORE_CTX_new(), lib.X509_STORE_CTX_free)
if not lib.X509_STORE_CTX_init(certificate_store_context, certificate_store, certificate, ffi.cast('Cryptography_STACK_OF_X509 *', 0)):
return False
Finally, we check that the self signed signature can be verified:
lib.X509_STORE_CTX_set_flags(certificate_store_context, lib.X509_V_FLAG_CHECK_SS_SIGNATURE)
return 1 == lib.X509_verify_cert(certificate_store_context)
Now that we have verified the top level certificate as being properly self signed, the verify callback is called again with the same top level certificate but this time with preverified
set to True
.
First, we extract the certificate's subject and convert to a human readable string:
subject_x509_name = lib.X509_get_subject_name(certificate)
subject = _extract_one_line(subject_x509_name)
Second, we extract the certificate's public key and ensure it is an ED25519 key:
certificate_public_key = lib.X509_get0_pubkey(certificate)
if not certificate_public_key:
return None
if lib.EVP_PKEY_ED25519 != lib.EVP_PKEY_id(ffi.cast('EVP_PKEY *', certificate_public_key)):
return None
public_key = PublicKey(bytes(PublicKey.SIZE))
key_size_pointer = ffi.new('size_t *', PublicKey.SIZE)
if not lib.EVP_PKEY_get_raw_public_key(certificate_public_key, public_key.bytes, key_size_pointer):
return None
if PublicKey.SIZE != key_size_pointer[0]:
return None
Finally, we store the extracted parts in a tuple and add it to a (parsed) certificate stack:
self.certificate_infos.append(CertificateInfo(subject, public_key))
ℹ️ As part of this extraction routine, any other certificate information can be inspected and extracted as well, including the issuer, expiration date, etc.
The node certificate is automatically preverified because its signer (the self signed certificate) has already been preverified, so it is allowed in the certificate chain.
Like in the previous call, we simply extract the subject and public key and add it to the (parsed) certificate stack.
At this point, the (parsed) certificate stack has two entries:
The main public key is used to validate that a node doesn't lie about its identity.
If the main public key extracted from the certificate chain does not match the main public key reported by the node via /node/info
the node is rejected as a partner.
It is, at best, misconfigured and, at worst, malicious.
Install the symbol-lightapi package, and use the SymbolPeerConnector
to connect to peer nodes:
import asyncio
from pathlib import Path
from symbollightapi.connector.SymbolPeerConnector import SymbolPeerConnector
async def main():
connector = SymbolPeerConnector('xymharvesting.net', 7900, Path('/to/cert/directory'))
connector.timeout_seconds = 2
node_info = await connector.node_info()
print(f' network_identifier: {node_info.network_identifier}')
print(f'network_generation_hash_seed: {node_info.network_generation_hash_seed}')
print(f' main_public_key: {node_info.main_public_key}')
print(f' node_public_key: {node_info.node_public_key}')
print(f' endpoint: {node_info.endpoint}')
print(f' name: {node_info.name}')
print(f' version: {node_info.version}')
print(f' roles: {node_info.roles}')
if '__main__' == __name__:
asyncio.run(main())