Rewrite 'generate_sslroots' w/o OpenSSL.

OpenSSL removed the ability to generate C code (the `openssl x509 -C` option the old script relied on):
a18cf8fc63

This CL rewrites the generation script to use the pure-Python asn1crypto library.

The changes to the generated code lead to a huge diff in the generated file:
- Certificate array names are based on certificate fingerprints instead
of semi-human readable names, which were not referenced externally;
- The order of the arrays in the generated file matches the order in which
the certificates appear in the source PEM file. Previously they were
re-ordered as a side effect of writing temporary files to disk;


Bug: webrtc:11710
Change-Id: Ie7a97b3658f6ccb397f0fd0c21d341934a2cc12e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/304642
Commit-Queue: Yury Yarashevich <yura.yaroshevich@gmail.com>
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#40039}
This commit is contained in:
Yury Yarashevich 2023-05-09 17:03:48 +02:00 committed by WebRTC LUCI CQ
parent 2d7424305d
commit c7ff896999
2 changed files with 213 additions and 191 deletions

View file

@ -86,3 +86,10 @@ wheel: <
name: "infra/python/wheels/requests-py2_py3"
version: "version:2.13.0"
>
# Used by:
# tools_webrtc/sslroots
wheel: <
name: "infra/python/wheels/asn1crypto-py2_py3"
version: "version:1.0.1"
>

View file

@ -1,176 +1,217 @@
#!/usr/bin/env vpython3
# -*- coding:utf-8 -*-
# Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
# Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
"""This is a tool to transform a crt file into a C/C++ header.
Usage:
python3 generate_sslroots.py certfile.pem [--verbose | -v] [--full_cert | -f]
Arguments:
-v Print output while running.
-f Add public key and certificate name. Default is to skip and reduce
generated file size.
The supported cert files are:
- Google: https://pki.goog/roots.pem
- Mozilla: https://curl.se/docs/caextract.html
"""
import subprocess
from optparse import OptionParser
import os
import re
import argparse
import logging
from pathlib import Path
import tempfile
from typing import Tuple, Any, List, ByteString
from datetime import datetime, timezone
from hashlib import sha256
from urllib.request import urlopen
from asn1crypto import pem, x509
# Name of the generated C/C++ header file.
_GENERATED_FILE = 'ssl_roots.h'
# Prefix and extension of the temporary per-certificate files.
_PREFIX = '__generated__'
_EXTENSION = '.crt'
# `*_ARRAY` values are the suffixes of the per-certificate C arrays;
# `*_VARIABLE` values are inserted into the `kSSLCert<...>List` list names.
_SUBJECT_NAME_ARRAY = 'subject_name'
_SUBJECT_NAME_VARIABLE = 'SubjectName'
_PUBLIC_KEY_ARRAY = 'public_key'
_PUBLIC_KEY_VARIABLE = 'PublicKey'
_CERTIFICATE_ARRAY = 'certificate'
_CERTIFICATE_VARIABLE = 'Certificate'
_CERTIFICATE_SIZE_VARIABLE = 'CertificateSize'
# C element types used for the generated list arrays.
_INT_TYPE = 'size_t'
_CHAR_TYPE = 'unsigned char* const'
_VERBOSE = 'verbose'
# A source line starting with this text marks the bundle as a Mozilla bundle.
_MOZILLA_BUNDLE_CHECK = '## Certificate data from Mozilla as of:'
def main():
"""The main entrypoint."""
parser = OptionParser('usage %prog FILE')
parser.add_option('-v', '--verbose', dest='verbose', action='store_true')
parser.add_option('-f', '--full_cert', dest='full_cert', action='store_true')
options, args = parser.parse_args()
if len(args) < 1:
parser.error('No crt file specified.')
return
root_dir, bundle_type = _SplitCrt(args[0], options)
_GenCFiles(root_dir, options, bundle_type)
_Cleanup(root_dir)
parser = argparse.ArgumentParser(
description='This is a tool to transform a crt file '
f'into a C/C++ header: {_GENERATED_FILE}.')
parser.add_argument('source_path_or_url',
help='File path or URL to PEM storage file. '
'The supported cert files are: '
'- Google: https://pki.goog/roots.pem; '
'- Mozilla: https://curl.se/ca/cacert.pem')
parser.add_argument('-v',
'--verbose',
dest='verbose',
action='store_true',
help='Print output while running')
parser.add_argument('-f',
'--full_cert',
dest='full_cert',
action='store_true',
help='Add public key and certificate name. '
'Default is to skip and reduce generated file size.')
args = parser.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
def _SplitCrt(source_file, options):
sub_file_blocks = []
label_name = ''
prev_line = None
root_dir = os.path.dirname(os.path.abspath(source_file)) + '/'
_PrintOutput(root_dir, options)
lines = None
with open(source_file) as f:
lines = f.readlines()
mozilla_bundle = any(l.startswith(_MOZILLA_BUNDLE_CHECK) for l in lines)
for line in lines:
if line.startswith('#'):
if mozilla_bundle:
continue
if line.startswith('# Label: '):
sub_file_blocks.append(line)
label = re.search(r'\".*\"', line)
temp_label = label.group(0)
end = len(temp_label) - 1
label_name = _SafeName(temp_label[1:end])
if mozilla_bundle and line.startswith('==='):
sub_file_blocks.append(line)
label_name = _SafeName(prev_line)
elif line.startswith('-----END CERTIFICATE-----'):
sub_file_blocks.append(line)
new_file_name = root_dir + _PREFIX + label_name + _EXTENSION
_PrintOutput('Generating: ' + new_file_name, options)
new_file = open(new_file_name, 'w')
for out_line in sub_file_blocks:
new_file.write(out_line)
new_file.close()
sub_file_blocks = []
with tempfile.TemporaryDirectory() as temp_dir:
cert_file = Path(temp_dir) / "cacert.pem"
if args.source_path_or_url.startswith(
'https://') or args.source_path_or_url.startswith('http://'):
_DownloadCertificatesStore(args.source_path_or_url, cert_file)
destination_dir = Path.cwd()
else:
sub_file_blocks.append(line)
prev_line = line
return root_dir, 'Mozilla' if mozilla_bundle else 'Google'
source_path = Path(args.source_path_or_url)
cert_file.write_bytes(source_path.read_bytes())
destination_dir = source_path.parent
logging.debug('Stored certificate from %s into %s', args.source_path_or_url,
cert_file)
header_file = destination_dir / _GENERATED_FILE
digest, certificates = _LoadCertificatesStore(cert_file)
_GenerateCHeader(header_file, args.source_path_or_url, digest, certificates,
args.full_cert)
logging.debug('Did generate %s from %s [%s]', header_file,
args.source_path_or_url, digest)
def _GenCFiles(root_dir, options, bundle_type):
output_header_file = open(root_dir + _GENERATED_FILE, 'w')
output_header_file.write(_CreateOutputHeader(bundle_type))
if options.full_cert:
subject_name_list = _CreateArraySectionHeader(_SUBJECT_NAME_VARIABLE,
_CHAR_TYPE, options)
public_key_list = _CreateArraySectionHeader(_PUBLIC_KEY_VARIABLE,
_CHAR_TYPE, options)
certificate_list = _CreateArraySectionHeader(_CERTIFICATE_VARIABLE,
_CHAR_TYPE, options)
certificate_size_list = _CreateArraySectionHeader(_CERTIFICATE_SIZE_VARIABLE,
_INT_TYPE, options)
def _DownloadCertificatesStore(pem_url: str, destination_file: Path):
with urlopen(pem_url) as response:
pem_file = response.read()
logging.info('Got response with status [%d]: %s', response.status, pem_url)
for _, _, files in os.walk(root_dir):
for current_file in files:
if current_file.startswith(_PREFIX):
prefix_length = len(_PREFIX)
length = len(current_file) - len(_EXTENSION)
label = current_file[prefix_length:length]
filtered_output, cert_size = _CreateCertSection(root_dir, current_file,
label, options)
output_header_file.write(filtered_output + '\n\n\n')
if options.full_cert:
subject_name_list += _AddLabelToArray(label, _SUBJECT_NAME_ARRAY)
public_key_list += _AddLabelToArray(label, _PUBLIC_KEY_ARRAY)
certificate_list += _AddLabelToArray(label, _CERTIFICATE_ARRAY)
certificate_size_list += (' %s,\n') % (cert_size)
if destination_file.parent.exists():
logging.debug('Creating directory and it\'s parents %s',
destination_file.parent)
destination_file.parent.mkdir(parents=True, exist_ok=True)
if destination_file.exists():
logging.debug('Unlink existing file %s', destination_file)
destination_file.unlink(missing_ok=True)
if options.full_cert:
subject_name_list += _CreateArraySectionFooter()
output_header_file.write(subject_name_list)
public_key_list += _CreateArraySectionFooter()
output_header_file.write(public_key_list)
certificate_list += _CreateArraySectionFooter()
output_header_file.write(certificate_list)
certificate_size_list += _CreateArraySectionFooter()
output_header_file.write(certificate_size_list)
output_header_file.write(_CreateOutputFooter())
output_header_file.close()
destination_file.write_bytes(pem_file)
logging.info('Stored downloaded %d bytes pem file to `%s`', len(pem_file),
destination_file)
def _Cleanup(root_dir):
    """Removes the temporary per-certificate files from `root_dir`.

    Every directory entry whose name starts with `_PREFIX` is deleted.

    Args:
      root_dir: Directory that holds the temporary files. A trailing path
        separator is accepted but not required.
    """
    for entry in os.listdir(root_dir):
        if entry.startswith(_PREFIX):
            # os.path.join yields the same path as plain concatenation when
            # root_dir already ends with a separator, and a correct one when
            # it does not.
            os.remove(os.path.join(root_dir, entry))
def _LoadCertificatesStore(
        source_file: Path) -> Tuple[str, List[x509.Certificate]]:
    """Reads a PEM bundle and parses every certificate it contains.

    Args:
      source_file: Path of the PEM file to read.

    Returns:
      A `(digest, certificates)` tuple. `digest` is the text `sha256:`
      followed by the hex SHA-256 of the raw file bytes; `certificates` holds
      the parsed certificates in the order `pem.unarmor` yields them.
    """
    raw_pem = source_file.read_bytes()

    certificates = []
    # multiple=True makes unarmor yield one (type, headers, der) tuple per
    # PEM block; only CERTIFICATE blocks are kept.
    for block_type, _, der_bytes in pem.unarmor(raw_pem, multiple=True):
        if block_type == 'CERTIFICATE':
            certificates.append(x509.Certificate.load(der_bytes))

    digest = f'sha256:{sha256(raw_pem).hexdigest()}'
    logging.debug('Loaded %d certificates from %s [%s] ', len(certificates),
                  source_file, digest)
    return digest, certificates
def _CreateCertSection(root_dir, source_file, label, options):
command = 'openssl x509 -in %s%s -noout -C' % (root_dir, source_file)
_PrintOutput(command, options)
output = subprocess.getstatusoutput(command)[1]
decl_block = 'unsigned char .*_(%s|%s|%s)' %\
(_SUBJECT_NAME_ARRAY, _PUBLIC_KEY_ARRAY, _CERTIFICATE_ARRAY)
prog = re.compile(decl_block, re.IGNORECASE)
renamed_output = prog.sub('const unsigned char ' + label + r'_\1', output)
def _GenerateCHeader(header_file: Path, source: str, source_digest: str,
certificates: List[x509.Certificate], full_cert: bool):
header_file.parent.mkdir(parents=True, exist_ok=True)
with header_file.open('w') as output:
output.write(_CreateOutputHeader(source, source_digest))
filtered_output = ''
cert_block = '^const unsigned char.*?};$'
prog2 = re.compile(cert_block, re.IGNORECASE | re.MULTILINE | re.DOTALL)
if not options.full_cert:
filtered_output = prog2.sub('', renamed_output, count=2)
else:
filtered_output = renamed_output
named_certificates = [(cert,
f'kCertificateWithFingerprint_{cert.sha256.hex()}')
for cert in certificates]
cert_size_block = r'\d\d\d+'
prog3 = re.compile(cert_size_block, re.MULTILINE | re.VERBOSE)
result = prog3.findall(renamed_output)
cert_size = result[len(result) - 1]
names = list(map(lambda x: x[1], named_certificates))
unique_names = list(set(names))
if len(names) != len(unique_names):
raise RuntimeError(
f'There are {len(names) - len(unique_names)} non-unique '
'certificate names generated. Generator script must be '
'fixed to handle collision.')
return filtered_output, cert_size
for cert, name in named_certificates:
output.write(_CreateCertificateMetadataHeader(cert))
if full_cert:
output.write(
_CArrayConstantDefinition('unsigned char',
f'{name}_subject_name',
_CreateHexList(cert.subject.dump()),
max_items_per_line=16))
output.write('\n')
output.write(
_CArrayConstantDefinition('unsigned char',
f'{name}_public_key',
_CreateHexList(cert.public_key.dump()),
max_items_per_line=16))
output.write('\n')
output.write(
_CArrayConstantDefinition('unsigned char',
f'{name}_certificate',
_CreateHexList(cert.dump()),
max_items_per_line=16))
output.write('\n\n')
if full_cert:
output.write(
_CArrayConstantDefinition('unsigned char* const',
'kSSLCertSubjectNameList',
[f'{name}_subject_name' for name in names]))
output.write('\n\n')
output.write(
_CArrayConstantDefinition('unsigned char* const',
'kSSLCertPublicKeyList',
[f'{name}_public_key' for name in names]))
output.write('\n\n')
output.write(
_CArrayConstantDefinition('unsigned char* const',
'kSSLCertCertificateList',
[f'{name}_certificate' for name in names]))
output.write('\n\n')
output.write(
_CArrayConstantDefinition(
'size_t', 'kSSLCertCertificateSizeList',
[f'{len(cert.dump())}' for cert, _ in named_certificates]))
output.write('\n\n')
output.write(_CreateOutputFooter())
def _CreateOutputHeader(bundle_type):
output = ('/*\n'
' * Copyright 2004 The WebRTC Project Authors. All rights '
def _CreateHexList(items: ByteString) -> List[str]:
"""
Produces list of strings each item is hex literal of byte of source sequence
"""
return [f'0x{item:02X}' for item in items]
def _CArrayConstantDefinition(type_name: str,
                              array_name: str,
                              items: List[Any],
                              max_items_per_line: int = 1) -> str:
    """Renders a sized constant C array definition.

    The result has the form
    `const <type_name> <array_name>[<len(items)>]=<initializer>;`, where the
    initializer text comes from `_CArrayInitializerList`.
    """
    declaration = f'const {type_name} {array_name}[{len(items)}]='
    initializer = _CArrayInitializerList(items, max_items_per_line)
    return f'{declaration}{initializer};'
def _CArrayInitializerList(items: List[Any],
max_items_per_line: int = 1) -> str:
"""
Produces C initializer list like: `{\\nitems[0], \\n ...}`
"""
return '{\n' + '\n'.join([
','.join(items[i:i + max_items_per_line]) + ','
for i in range(0, len(items), max_items_per_line)
]) + '\n}'
def _CreateCertificateMetadataHeader(cert: x509.Certificate) -> str:
return (f'/* subject: {cert.subject.human_friendly} */\n'
f'/* issuer: {cert.issuer.human_friendly} */\n'
f'/* link: https://crt.sh/?q={cert.sha256.hex()} */\n')
def _CreateOutputHeader(source_path_or_url: str, source_digest: str) -> str:
now_utc = datetime.now(timezone.utc).replace(microsecond=0)
output = (
'/*\n'
f' * Copyright {now_utc.year} The WebRTC Project Authors. All rights '
'reserved.\n'
' *\n'
' * Use of this source code is governed by a BSD-style license\n'
@ -184,12 +225,14 @@ def _CreateOutputHeader(bundle_type):
'#ifndef RTC_BASE_SSL_ROOTS_H_\n'
'#define RTC_BASE_SSL_ROOTS_H_\n\n'
'// This file is the root certificates in C form.\n\n'
'// It was generated with the following script:\n'
'// tools_webrtc/sslroots/generate_sslroots.py'
' %s_CA_bundle.pem\n\n'
f'// It was generated at {now_utc.isoformat()} by the following script:\n'
'// `tools_webrtc/sslroots/generate_sslroots.py '
f'{source_path_or_url}`\n\n'
'// clang-format off\n'
'// Don\'t bother formatting generated code,\n'
'// also it would breaks subject/issuer lines.\n\n' % bundle_type)
'// also it would breaks subject/issuer lines.\n\n'
f'// Source bundle `{source_path_or_url}` digest is [{source_digest}]\n\n'
)
return output
@ -197,33 +240,5 @@ def _CreateOutputFooter():
return '// clang-format on\n\n#endif // RTC_BASE_SSL_ROOTS_H_\n'
def _CreateArraySectionHeader(type_name, type_type, options):
    """Returns the opening line of a `kSSLCert<type_name>List` array.

    The line has the form `const <type_type> kSSLCert<type_name>List[] = {`
    and is also passed to `_PrintOutput` together with `options`.
    """
    header_fields = (type_type, type_name)
    header_line = 'const %s kSSLCert%sList[] = {\n' % header_fields
    _PrintOutput(header_line, options)
    return header_line
def _AddLabelToArray(label, type_name):
return ' %s_%s,\n' % (label, type_name)
def _CreateArraySectionFooter():
return '};\n\n'
def _SafeName(original_file_name):
bad_chars = ' -./\\()áéíőú\r\n'
replacement_chars = ''
for _ in bad_chars:
replacement_chars += '_'
translation_table = str.maketrans(bad_chars, replacement_chars)
return original_file_name.translate(translation_table)
def _PrintOutput(output, options):
if options.verbose:
print(output)
if __name__ == '__main__':
main()