diff --git a/.vpython3 b/.vpython3
index 31a2c59009..96feab5c86 100644
--- a/.vpython3
+++ b/.vpython3
@@ -86,3 +86,10 @@ wheel: <
   name: "infra/python/wheels/requests-py2_py3"
   version: "version:2.13.0"
 >
+
+# Used by:
+#   tools_webrtc/sslroots
+wheel: <
+  name: "infra/python/wheels/asn1crypto-py2_py3"
+  version: "version:1.0.1"
+>
diff --git a/tools_webrtc/sslroots/generate_sslroots.py b/tools_webrtc/sslroots/generate_sslroots.py
index 291c3ce3c8..14acff92fb 100644
--- a/tools_webrtc/sslroots/generate_sslroots.py
+++ b/tools_webrtc/sslroots/generate_sslroots.py
@@ -1,195 +1,245 @@
 #!/usr/bin/env vpython3
 
 # -*- coding:utf-8 -*-
-# Copyright (c) 2015 The WebRTC project authors. All Rights Reserved.
+# Copyright (c) 2023 The WebRTC project authors. All Rights Reserved.
 #
 # Use of this source code is governed by a BSD-style license
 # that can be found in the LICENSE file in the root of the source
 # tree. An additional intellectual property rights grant can be found
 # in the file PATENTS. All contributing project authors may
 # be found in the AUTHORS file in the root of the source tree.
-"""This is a tool to transform a crt file into a C/C++ header.
-
-Usage:
-python3 generate_sslroots.py certfile.pem [--verbose | -v] [--full_cert | -f]
-
-Arguments:
-  -v Print output while running.
-  -f Add public key and certificate name. Default is to skip and reduce
-     generated file size.
-
-The supported cert files are:
-  - Google: https://pki.goog/roots.pem
-  - Mozilla: https://curl.se/docs/caextract.html
-"""
-
-import subprocess
-from optparse import OptionParser
-import os
-import re
+import argparse
+import logging
+from pathlib import Path
+import tempfile
+from typing import Tuple, Any, List, ByteString
+from datetime import datetime, timezone
+from hashlib import sha256
+from urllib.request import urlopen
+from asn1crypto import pem, x509
 
 _GENERATED_FILE = 'ssl_roots.h'
-_PREFIX = '__generated__'
-_EXTENSION = '.crt'
-_SUBJECT_NAME_ARRAY = 'subject_name'
-_SUBJECT_NAME_VARIABLE = 'SubjectName'
-_PUBLIC_KEY_ARRAY = 'public_key'
-_PUBLIC_KEY_VARIABLE = 'PublicKey'
-_CERTIFICATE_ARRAY = 'certificate'
-_CERTIFICATE_VARIABLE = 'Certificate'
-_CERTIFICATE_SIZE_VARIABLE = 'CertificateSize'
-_INT_TYPE = 'size_t'
-_CHAR_TYPE = 'unsigned char* const'
-_VERBOSE = 'verbose'
-_MOZILLA_BUNDLE_CHECK = '## Certificate data from Mozilla as of:'
-
 
 def main():
-  """The main entrypoint."""
-  parser = OptionParser('usage %prog FILE')
-  parser.add_option('-v', '--verbose', dest='verbose', action='store_true')
-  parser.add_option('-f', '--full_cert', dest='full_cert', action='store_true')
-  options, args = parser.parse_args()
-  if len(args) < 1:
-    parser.error('No crt file specified.')
-    return
-  root_dir, bundle_type = _SplitCrt(args[0], options)
-  _GenCFiles(root_dir, options, bundle_type)
-  _Cleanup(root_dir)
+  parser = argparse.ArgumentParser(
+      description='This is a tool to transform a crt file '
+      f'into a C/C++ header: {_GENERATED_FILE}.')
+  parser.add_argument('source_path_or_url',
+                      help='File path or URL to PEM storage file. '
+                      'The supported cert files are: '
+                      '- Google: https://pki.goog/roots.pem; '
+                      '- Mozilla: https://curl.se/ca/cacert.pem')
+  parser.add_argument('-v',
+                      '--verbose',
+                      dest='verbose',
+                      action='store_true',
+                      help='Print output while running')
+  parser.add_argument('-f',
+                      '--full_cert',
+                      dest='full_cert',
+                      action='store_true',
+                      help='Add public key and certificate name. '
+                      'Default is to skip and reduce generated file size.')
+  args = parser.parse_args()
+  logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING)
 
 
-def _SplitCrt(source_file, options):
-  sub_file_blocks = []
-  label_name = ''
-  prev_line = None
-  root_dir = os.path.dirname(os.path.abspath(source_file)) + '/'
-  _PrintOutput(root_dir, options)
-  lines = None
-  with open(source_file) as f:
-    lines = f.readlines()
-  mozilla_bundle = any(l.startswith(_MOZILLA_BUNDLE_CHECK) for l in lines)
-  for line in lines:
-    if line.startswith('#'):
-      if mozilla_bundle:
-        continue
-      if line.startswith('# Label: '):
-        sub_file_blocks.append(line)
-        label = re.search(r'\".*\"', line)
-        temp_label = label.group(0)
-        end = len(temp_label) - 1
-        label_name = _SafeName(temp_label[1:end])
-    if mozilla_bundle and line.startswith('==='):
-      sub_file_blocks.append(line)
-      label_name = _SafeName(prev_line)
-    elif line.startswith('-----END CERTIFICATE-----'):
-      sub_file_blocks.append(line)
-      new_file_name = root_dir + _PREFIX + label_name + _EXTENSION
-      _PrintOutput('Generating: ' + new_file_name, options)
-      new_file = open(new_file_name, 'w')
-      for out_line in sub_file_blocks:
-        new_file.write(out_line)
-      new_file.close()
-      sub_file_blocks = []
+  with tempfile.TemporaryDirectory() as temp_dir:
+    cert_file = Path(temp_dir) / "cacert.pem"
+
+    if args.source_path_or_url.startswith(
+        'https://') or args.source_path_or_url.startswith('http://'):
+      _DownloadCertificatesStore(args.source_path_or_url, cert_file)
+      destination_dir = Path.cwd()
     else:
-      sub_file_blocks.append(line)
-    prev_line = line
-  return root_dir, 'Mozilla' if mozilla_bundle else 'Google'
+      source_path = Path(args.source_path_or_url)
+      cert_file.write_bytes(source_path.read_bytes())
+      destination_dir = source_path.parent
+
+    logging.debug('Stored certificate from %s into %s', args.source_path_or_url,
+                  cert_file)
+
+    header_file = destination_dir / _GENERATED_FILE
+
+    digest, certificates = _LoadCertificatesStore(cert_file)
+    _GenerateCHeader(header_file, args.source_path_or_url, digest, certificates,
+                     args.full_cert)
+
+    logging.debug('Did generate %s from %s [%s]', header_file,
+                  args.source_path_or_url, digest)
 
 
-def _GenCFiles(root_dir, options, bundle_type):
-  output_header_file = open(root_dir + _GENERATED_FILE, 'w')
-  output_header_file.write(_CreateOutputHeader(bundle_type))
-  if options.full_cert:
-    subject_name_list = _CreateArraySectionHeader(_SUBJECT_NAME_VARIABLE,
-                                                  _CHAR_TYPE, options)
-    public_key_list = _CreateArraySectionHeader(_PUBLIC_KEY_VARIABLE,
-                                                _CHAR_TYPE, options)
-  certificate_list = _CreateArraySectionHeader(_CERTIFICATE_VARIABLE,
-                                               _CHAR_TYPE, options)
-  certificate_size_list = _CreateArraySectionHeader(_CERTIFICATE_SIZE_VARIABLE,
-                                                    _INT_TYPE, options)
+def _DownloadCertificatesStore(pem_url: str, destination_file: Path):
+  """
+  Downloads PEM file from `pem_url` and writes it into `destination_file`
+  """
+  with urlopen(pem_url) as response:
+    pem_file = response.read()
+    logging.info('Got response with status [%d]: %s', response.status, pem_url)
 
-  for _, _, files in os.walk(root_dir):
-    for current_file in files:
-      if current_file.startswith(_PREFIX):
-        prefix_length = len(_PREFIX)
-        length = len(current_file) - len(_EXTENSION)
-        label = current_file[prefix_length:length]
-        filtered_output, cert_size = _CreateCertSection(root_dir, current_file,
-                                                        label, options)
-        output_header_file.write(filtered_output + '\n\n\n')
-        if options.full_cert:
-          subject_name_list += _AddLabelToArray(label, _SUBJECT_NAME_ARRAY)
-          public_key_list += _AddLabelToArray(label, _PUBLIC_KEY_ARRAY)
-        certificate_list += _AddLabelToArray(label, _CERTIFICATE_ARRAY)
-        certificate_size_list += (' %s,\n') % (cert_size)
+  # Create the parent directory only when it does not exist yet.
+  if not destination_file.parent.exists():
+    logging.debug('Creating directory and it\'s parents %s',
+                  destination_file.parent)
+    destination_file.parent.mkdir(parents=True, exist_ok=True)
+  if destination_file.exists():
+    logging.debug('Unlink existing file %s', destination_file)
+    destination_file.unlink(missing_ok=True)
 
-  if options.full_cert:
-    subject_name_list += _CreateArraySectionFooter()
-    output_header_file.write(subject_name_list)
-    public_key_list += _CreateArraySectionFooter()
-    output_header_file.write(public_key_list)
-  certificate_list += _CreateArraySectionFooter()
-  output_header_file.write(certificate_list)
-  certificate_size_list += _CreateArraySectionFooter()
-  output_header_file.write(certificate_size_list)
-  output_header_file.write(_CreateOutputFooter())
-  output_header_file.close()
+  destination_file.write_bytes(pem_file)
+  logging.info('Stored downloaded %d bytes pem file to `%s`', len(pem_file),
+               destination_file)
 
 
-def _Cleanup(root_dir):
-  for f in os.listdir(root_dir):
-    if f.startswith(_PREFIX):
-      os.remove(root_dir + f)
+def _LoadCertificatesStore(
+    source_file: Path) -> Tuple[str, List[x509.Certificate]]:
+  """
+  Produces sha256 digest of PEM file and list of certificates parsed from it
+  """
+  pem_bytes = source_file.read_bytes()
+
+  certificates = [
+      x509.Certificate.load(der)
+      for kind, _, der in pem.unarmor(pem_bytes, True) if kind == 'CERTIFICATE'
+  ]
+  digest = f'sha256:{sha256(pem_bytes).hexdigest()}'
+  logging.debug('Loaded %d certificates from %s [%s] ', len(certificates),
+                source_file, digest)
+  return digest, certificates
 
 
-def _CreateCertSection(root_dir, source_file, label, options):
-  command = 'openssl x509 -in %s%s -noout -C' % (root_dir, source_file)
-  _PrintOutput(command, options)
-  output = subprocess.getstatusoutput(command)[1]
-  decl_block = 'unsigned char .*_(%s|%s|%s)' %\
-    (_SUBJECT_NAME_ARRAY, _PUBLIC_KEY_ARRAY, _CERTIFICATE_ARRAY)
-  prog = re.compile(decl_block, re.IGNORECASE)
-  renamed_output = prog.sub('const unsigned char ' + label + r'_\1', output)
+def _GenerateCHeader(header_file: Path, source: str, source_digest: str,
+                     certificates: List[x509.Certificate], full_cert: bool):
+  header_file.parent.mkdir(parents=True, exist_ok=True)
+  with header_file.open('w') as output:
+    output.write(_CreateOutputHeader(source, source_digest))
 
-  filtered_output = ''
-  cert_block = '^const unsigned char.*?};$'
-  prog2 = re.compile(cert_block, re.IGNORECASE | re.MULTILINE | re.DOTALL)
-  if not options.full_cert:
-    filtered_output = prog2.sub('', renamed_output, count=2)
-  else:
-    filtered_output = renamed_output
+    named_certificates = [(cert,
+                           f'kCertificateWithFingerprint_{cert.sha256.hex()}')
+                          for cert in certificates]
 
-  cert_size_block = r'\d\d\d+'
-  prog3 = re.compile(cert_size_block, re.MULTILINE | re.VERBOSE)
-  result = prog3.findall(renamed_output)
-  cert_size = result[len(result) - 1]
+    names = list(map(lambda x: x[1], named_certificates))
+    unique_names = list(set(names))
+    if len(names) != len(unique_names):
+      raise RuntimeError(
+          f'There are {len(names) - len(unique_names)} non-unique '
+          'certificate names generated. Generator script must be '
+          'fixed to handle collision.')
 
-  return filtered_output, cert_size
+    for cert, name in named_certificates:
+
+      output.write(_CreateCertificateMetadataHeader(cert))
+
+      if full_cert:
+        output.write(
+            _CArrayConstantDefinition('unsigned char',
+                                      f'{name}_subject_name',
+                                      _CreateHexList(cert.subject.dump()),
+                                      max_items_per_line=16))
+        output.write('\n')
+        output.write(
+            _CArrayConstantDefinition('unsigned char',
+                                      f'{name}_public_key',
+                                      _CreateHexList(cert.public_key.dump()),
+                                      max_items_per_line=16))
+        output.write('\n')
+
+      output.write(
+          _CArrayConstantDefinition('unsigned char',
+                                    f'{name}_certificate',
+                                    _CreateHexList(cert.dump()),
+                                    max_items_per_line=16))
+      output.write('\n\n')
+
+    if full_cert:
+      output.write(
+          _CArrayConstantDefinition('unsigned char* const',
+                                    'kSSLCertSubjectNameList',
+                                    [f'{name}_subject_name' for name in names]))
+      output.write('\n\n')
+
+      output.write(
+          _CArrayConstantDefinition('unsigned char* const',
+                                    'kSSLCertPublicKeyList',
+                                    [f'{name}_public_key' for name in names]))
+      output.write('\n\n')
+
+    output.write(
+        _CArrayConstantDefinition('unsigned char* const',
+                                  'kSSLCertCertificateList',
+                                  [f'{name}_certificate' for name in names]))
+    output.write('\n\n')
+
+    output.write(
+        _CArrayConstantDefinition(
+            'size_t', 'kSSLCertCertificateSizeList',
+            [f'{len(cert.dump())}' for cert, _ in named_certificates]))
+    output.write('\n\n')
+
+    output.write(_CreateOutputFooter())
 
 
-def _CreateOutputHeader(bundle_type):
-  output = ('/*\n'
-            ' * Copyright 2004 The WebRTC Project Authors. All rights '
-            'reserved.\n'
-            ' *\n'
-            ' * Use of this source code is governed by a BSD-style license\n'
-            ' * that can be found in the LICENSE file in the root of the '
-            'source\n'
-            ' * tree. An additional intellectual property rights grant can be '
-            'found\n'
-            ' * in the file PATENTS. All contributing project authors may\n'
-            ' * be found in the AUTHORS file in the root of the source tree.\n'
-            ' */\n\n'
-            '#ifndef RTC_BASE_SSL_ROOTS_H_\n'
-            '#define RTC_BASE_SSL_ROOTS_H_\n\n'
-            '// This file is the root certificates in C form.\n\n'
-            '// It was generated with the following script:\n'
-            '// tools_webrtc/sslroots/generate_sslroots.py'
-            ' %s_CA_bundle.pem\n\n'
-            '// clang-format off\n'
-            '// Don\'t bother formatting generated code,\n'
-            '// also it would breaks subject/issuer lines.\n\n' % bundle_type)
+def _CreateHexList(items: ByteString) -> List[str]:
+  """
+  Produces list of strings each item is hex literal of byte of source sequence
+  """
+  return [f'0x{item:02X}' for item in items]
+
+
+def _CArrayConstantDefinition(type_name: str,
+                              array_name: str,
+                              items: List[Any],
+                              max_items_per_line: int = 1) -> str:
+  """
+  Produces C array definition like: `const type_name array_name = { items };`
+  """
+  return (f'const {type_name} {array_name}[{len(items)}]='
+          f'{_CArrayInitializerList(items, max_items_per_line)};')
+
+
+def _CArrayInitializerList(items: List[Any],
+                           max_items_per_line: int = 1) -> str:
+  """
+  Produces C initializer list like: `{\\nitems[0], \\n ...}`
+  """
+  return '{\n' + '\n'.join([
+      ','.join(items[i:i + max_items_per_line]) + ','
+      for i in range(0, len(items), max_items_per_line)
+  ]) + '\n}'
+
+
+def _CreateCertificateMetadataHeader(cert: x509.Certificate) -> str:
+  return (f'/* subject: {cert.subject.human_friendly} */\n'
+          f'/* issuer: {cert.issuer.human_friendly} */\n'
+          f'/* link: https://crt.sh/?q={cert.sha256.hex()} */\n')
+
+
+def _CreateOutputHeader(source_path_or_url: str, source_digest: str) -> str:
+  now_utc = datetime.now(timezone.utc).replace(microsecond=0)
+  output = (
+      '/*\n'
+      f' * Copyright {now_utc.year} The WebRTC Project Authors. All rights '
+      'reserved.\n'
+      ' *\n'
+      ' * Use of this source code is governed by a BSD-style license\n'
+      ' * that can be found in the LICENSE file in the root of the '
+      'source\n'
+      ' * tree. An additional intellectual property rights grant can be '
+      'found\n'
+      ' * in the file PATENTS. All contributing project authors may\n'
+      ' * be found in the AUTHORS file in the root of the source tree.\n'
+      ' */\n\n'
+      '#ifndef RTC_BASE_SSL_ROOTS_H_\n'
+      '#define RTC_BASE_SSL_ROOTS_H_\n\n'
+      '// This file is the root certificates in C form.\n\n'
+      f'// It was generated at {now_utc.isoformat()} by the following script:\n'
+      '// `tools_webrtc/sslroots/generate_sslroots.py '
+      f'{source_path_or_url}`\n\n'
+      '// clang-format off\n'
+      '// Don\'t bother formatting generated code,\n'
+      '// also it would breaks subject/issuer lines.\n\n'
+      f'// Source bundle `{source_path_or_url}` digest is [{source_digest}]\n\n'
+  )
   return output
 
 
@@ -197,33 +247,5 @@ def _CreateOutputFooter():
   return '// clang-format on\n\n#endif // RTC_BASE_SSL_ROOTS_H_\n'
 
 
-def _CreateArraySectionHeader(type_name, type_type, options):
-  output = ('const %s kSSLCert%sList[] = {\n') % (type_type, type_name)
-  _PrintOutput(output, options)
-  return output
-
-
-def _AddLabelToArray(label, type_name):
-  return ' %s_%s,\n' % (label, type_name)
-
-
-def _CreateArraySectionFooter():
-  return '};\n\n'
-
-
-def _SafeName(original_file_name):
-  bad_chars = ' -./\\()áéíőú\r\n'
-  replacement_chars = ''
-  for _ in bad_chars:
-    replacement_chars += '_'
-  translation_table = str.maketrans(bad_chars, replacement_chars)
-  return original_file_name.translate(translation_table)
-
-
-def _PrintOutput(output, options):
-  if options.verbose:
-    print(output)
-
-
 if __name__ == '__main__':
   main()