Skip to content

Commit e3e02e3

Browse files
authored
Include support for x509 certificate chains in DER format (#1822)
* Fix certificate chain handling in x509_from_der function This change enhances the x509_from_der function to properly handle certificate chains by extracting and using the first certificate when a chain is detected. When a certificate chain is received, the cryptography library raises a ValueError with 'extradata' in the error message. This fix catches that specific error and implements DER parsing to extract the first certificate from the chain. This allows the library to work with certificate chains that some OPC UA servers might provide, improving compatibility. Fixes #1148 and #1245 * Update CHANGELOG.md with certificate chain fix * Improve exception handling comment in x509_from_der function * Add tests for certificate chain handling * Fix code formatting issues * Apply Ruff formatting changes * Remove unused os import from test file
1 parent 4a11975 commit e3e02e3

File tree

3 files changed

+161
-1
lines changed

3 files changed

+161
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919
[#1277](https://github.com/FreeOpcUa/opcua-asyncio/pull/1277)
2020
- Fixed incorrect function signature in Node and it's Sync wrapper
2121
[#1690](https://github.com/FreeOpcUa/opcua-asyncio/pull/1690)
22+
- Fix certificate chain handling in x509_from_der function
23+
Fixes [#1148](https://github.com/FreeOpcUa/opcua-asyncio/issues/1148) and [#1245](https://github.com/FreeOpcUa/opcua-asyncio/issues/1245)
2224

2325
## [1.0.2] - 2022-04-05
2426

asyncua/crypto/uacrypto.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,56 @@ async def load_certificate(path_or_content: Union[bytes, str, Path], extension:
5757

5858

5959
def x509_from_der(data):
60+
"""Load X.509 certificate from DER data.
61+
62+
This function handles both single certificates and certificate chains.
63+
When a certificate chain is detected, it extracts and returns the first
64+
certificate in the chain.
65+
66+
Args:
67+
data (bytes): DER-encoded certificate data
68+
69+
Returns:
70+
x509.Certificate or None: Loaded certificate object or None if data is empty
71+
"""
6072
if not data:
6173
return None
62-
return x509.load_der_x509_certificate(data, default_backend())
74+
try:
75+
# First try to load as a single certificate
76+
return x509.load_der_x509_certificate(data, default_backend())
77+
except ValueError as e:
78+
# The specific error 'ParseError { kind: ExtraData }' indicates we might have a certificate chain
79+
# This happens when the cryptography library encounters additional data after a valid certificate
80+
if "extradata" not in str(e).lower():
81+
raise
82+
83+
# Try parsing as certificate chain by manually extracting the first certificate
84+
# using DER encoding rules: https://en.wikipedia.org/wiki/X.690#DER_encoding
85+
offset = 0
86+
while offset < len(data):
87+
if data[offset] != 0x30: # 0x30 is the ASN.1 SEQUENCE tag that starts a certificate
88+
break
89+
90+
# Parse the length field according to DER rules
91+
length_byte = data[offset + 1]
92+
if length_byte < 128: # Short form length
93+
cert_len = length_byte
94+
header_len = 2 # Tag (1 byte) + length (1 byte)
95+
else: # Long form length
96+
# First byte indicates how many bytes are used to represent the length
97+
num_len_bytes = length_byte & 0x7F
98+
# Read the actual length from the following bytes
99+
cert_len = int.from_bytes(data[offset + 2 : offset + 2 + num_len_bytes], "big")
100+
header_len = 2 + num_len_bytes # Tag (1) + length indicator (1) + length bytes
101+
102+
# Extract just the first certificate from the chain
103+
total_len = header_len + cert_len
104+
cert_data = data[offset : offset + total_len]
105+
# Return only the first certificate in the chain
106+
return x509.load_der_x509_certificate(cert_data, default_backend())
107+
108+
# If we get here, we have a certificate chain but no valid certificates
109+
raise ValueError("No valid certificates found in the certificate chain")
63110

64111

65112
async def load_private_key(

tests/test_cert_chain.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
"""Tests for certificate chain handling in the uacrypto module.
2+
3+
This module tests the enhanced x509_from_der function which now supports:
4+
1. Loading single certificates (original functionality)
5+
2. Loading the first certificate from a certificate chain
6+
3. Proper error handling for invalid certificate data
7+
8+
These tests ensure that the function correctly handles certificate chains
9+
that some OPC UA servers might provide, improving compatibility.
10+
"""
11+
12+
import pytest
13+
from pathlib import Path
14+
15+
from asyncua.crypto import uacrypto
16+
from cryptography import x509
17+
from cryptography.hazmat.backends import default_backend
18+
from cryptography.hazmat.primitives import serialization
19+
20+
pytestmark = pytest.mark.asyncio
21+
22+
BASE_DIR = Path(__file__).parent.parent
23+
EXAMPLE_CERT_PATH = BASE_DIR / "examples" / "certificate-example.der"
24+
EXAMPLE_CERT_3072_PATH = BASE_DIR / "examples" / "certificate-3072-example.der"
25+
PEER_CERT_PATH = BASE_DIR / "examples" / "certificates" / "peer-certificate-example-1.der"
26+
27+
28+
def create_cert_chain():
29+
"""Create a certificate chain for testing.
30+
31+
This function creates a simulated certificate chain by concatenating
32+
two different certificates. In a real-world scenario, certificate chains
33+
would contain a server certificate followed by intermediate CA certificates.
34+
35+
Returns:
36+
tuple: (first_cert_data, cert_chain_data)
37+
- first_cert_data: The DER-encoded data of the first certificate
38+
- cert_chain_data: The DER-encoded data of the entire certificate chain
39+
"""
40+
# Load the example certificates
41+
with open(EXAMPLE_CERT_PATH, "rb") as f:
42+
cert1_data = f.read()
43+
44+
with open(PEER_CERT_PATH, "rb") as f:
45+
cert2_data = f.read()
46+
47+
# Create a certificate chain by concatenating two different certificates
48+
cert_chain = cert1_data + cert2_data
49+
50+
return cert1_data, cert_chain
51+
52+
53+
def test_x509_from_der_single_cert():
54+
"""Test that x509_from_der works with a single certificate."""
55+
# Test with the standard example certificate
56+
with open(EXAMPLE_CERT_PATH, "rb") as f:
57+
cert_data = f.read()
58+
59+
cert = uacrypto.x509_from_der(cert_data)
60+
assert cert is not None
61+
assert isinstance(cert, x509.Certificate)
62+
63+
# Test with the 3072-bit example certificate
64+
with open(EXAMPLE_CERT_3072_PATH, "rb") as f:
65+
cert_data_3072 = f.read()
66+
67+
cert_3072 = uacrypto.x509_from_der(cert_data_3072)
68+
assert cert_3072 is not None
69+
assert isinstance(cert_3072, x509.Certificate)
70+
71+
# Test with the peer certificate
72+
with open(PEER_CERT_PATH, "rb") as f:
73+
peer_cert_data = f.read()
74+
75+
peer_cert = uacrypto.x509_from_der(peer_cert_data)
76+
assert peer_cert is not None
77+
assert isinstance(peer_cert, x509.Certificate)
78+
79+
80+
def test_x509_from_der_cert_chain():
81+
"""Test that x509_from_der works with a certificate chain."""
82+
first_cert_data, cert_chain = create_cert_chain()
83+
84+
# Load the certificate chain using x509_from_der
85+
cert = uacrypto.x509_from_der(cert_chain)
86+
87+
# Verify that the certificate was loaded correctly
88+
assert cert is not None
89+
assert isinstance(cert, x509.Certificate)
90+
91+
# Verify that the loaded certificate is the first one in the chain
92+
# by comparing it with the original certificate
93+
original_cert = x509.load_der_x509_certificate(first_cert_data, default_backend())
94+
assert cert.public_bytes(serialization.Encoding.DER) == original_cert.public_bytes(serialization.Encoding.DER)
95+
96+
97+
def test_x509_from_der_invalid_data():
98+
"""Test that x509_from_der handles invalid data correctly."""
99+
# Test with None
100+
assert uacrypto.x509_from_der(None) is None
101+
102+
# Test with empty bytes
103+
assert uacrypto.x509_from_der(b"") is None
104+
105+
# Test with invalid data that doesn't start with a SEQUENCE tag
106+
with pytest.raises(ValueError):
107+
uacrypto.x509_from_der(b"invalid data")
108+
109+
# Test with data that starts with a SEQUENCE tag but is otherwise invalid
110+
with pytest.raises(ValueError):
111+
uacrypto.x509_from_der(b"\x30\x03\x01\x02\x03")

0 commit comments

Comments
 (0)