From 2a21f3099aa76b9a225559ab88169c38f1aa3660 Mon Sep 17 00:00:00 2001 From: Legrandin Date: Sun, 27 Dec 2009 17:26:59 +0100 Subject: [PATCH] Add ability to export and import RSA keys in DER and PEM format. Typical usage for importing an RSA key: f = file("ssl.pem") key = RSA.importKey(f.read()) f.close() key.verify(hash, signature) Typical usage for exporting an RSA public key: key = RSA.generate(512, randfunc) f = file("ssl.der","w") f.write(key.publickey.exportKey('DER')) f.close() I confirm I am eligible for submitting code to pycrypto according to http://www.dlitz.net/software/pycrypto/submission-requirements/ fetched on 27 December 2009. Committer: Legrandin --- lib/Crypto/PublicKey/RSA.py | 73 ++++++- lib/Crypto/SelfTest/PublicKey/__init__.py | 1 + lib/Crypto/SelfTest/PublicKey/test_importKey.py | 142 +++++++++++ lib/Crypto/SelfTest/Util/test_asn1.py | 287 +++++++++++++++++++++++ lib/Crypto/Util/__init__.py | 3 +- lib/Crypto/Util/asn1.py | 162 +++++++++++++ 6 files changed, 666 insertions(+), 2 deletions(-) create mode 100644 lib/Crypto/SelfTest/PublicKey/test_importKey.py create mode 100644 lib/Crypto/SelfTest/Util/test_asn1.py create mode 100644 lib/Crypto/Util/asn1.py diff --git a/lib/Crypto/PublicKey/RSA.py b/lib/Crypto/PublicKey/RSA.py index fd8678d..363bb12 100644 --- a/lib/Crypto/PublicKey/RSA.py +++ b/lib/Crypto/PublicKey/RSA.py @@ -26,13 +26,17 @@ __revision__ = "$Id$" -__all__ = ['generate', 'construct', 'error'] +__all__ = ['generate', 'construct', 'error', 'importKey' ] from Crypto.Util.python_compat import * from Crypto.PublicKey import _RSA, _slowmath, pubkey from Crypto import Random +from Crypto.Util.asn1 import DerObject, DerSequence +from textwrap import fill +import base64 + try: from Crypto.PublicKey import _fastmath except ImportError: @@ -128,6 +132,36 @@ class _RSAobj(pubkey.pubkey): attrs.append("private") return "<%s @0x%x %s>" % (self.__class__.__name__, id(self), ",".join(attrs)) + def exportKey(self, format='PEM'): + """Export the RSA key. A string is returned + with the encoded public or the private half + under the selected format. + + format: 'DER' (PKCS#1) or 'PEM' (RFC1421) + """ + der = DerSequence() + if self.has_private(): + keyType = "RSA PRIVATE" + der[:] = [ 0, self.n, self.e, self.d, self.p, self.q, + self.d % (self.p-1), self.d % (self.q-1), + self.u ] + else: + keyType = "PUBLIC" + der.append('\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00') + bitmap = DerObject('BIT STRING') + derPK = DerSequence() + derPK[:] = [ self.n, self.e ] + bitmap.payload = '\x00' + derPK.encode() + der.append(bitmap.encode()) + if format=='DER': + return der.encode() + if format=='PEM': + pem = "-----BEGIN %s KEY-----\n" % keyType + pem += fill(base64.b64encode(der.encode()),64) + pem += "\n-----END %s KEY-----" % keyType + return pem + return ValueError("") + class RSAImplementation(object): def __init__(self, **kwargs): # 'use_fast_math' parameter: @@ -175,9 +209,46 @@ class RSAImplementation(object): key = self._math.rsa_construct(*tup) return _RSAobj(self, key) + def _importKeyDER(self, externKey): + der = DerSequence() + der.decode(externKey, True) + if len(der)==9 and der.hasOnlyInts() and der[0]==0: + # ASN.1 RSAPrivateKey element + del der[6:8] # Remove d mod (p-1) and d mod (q-1) + del der[0] # Remove version + return self.construct(der[:]) + if len(der)==2: + # ASN.1 SubjectPublicKeyInfo element + if der[0]=='\x30\x0D\x06\x09\x2A\x86\x48\x86\xF7\x0D\x01\x01\x01\x05\x00': + bitmap = DerObject() + bitmap.decode(der[1], True) + if bitmap.typeTag=='\x03' and bitmap.payload[0]=='\x00': + der.decode(bitmap.payload[1:], True) + if len(der)==2 and der.hasOnlyInts(): + return self.construct(der[:]) + raise ValueError("RSA key format is not supported") + + def importKey(self, externKey): + """Import an RSA key (public or private half). + + externKey: the RSA key to import, encoded as a string. + The key can be in DER (PKCS#1) or in unencrypted + PEM format (RFC1421). + """ + if externKey.startswith('-----'): + # This is probably a PEM encoded key + lines = externKey.replace(" ",'').split() + der = base64.b64decode(''.join(lines[1:-1])) + return self._importKeyDER(der) + if externKey[0]=='\x30': + # This is probably a DER encoded key + return self._importKeyDER(externKey) + raise ValueError("RSA key format is not supported") + _impl = RSAImplementation() generate = _impl.generate construct = _impl.construct +importKey = _impl.importKey error = _impl.error # vim:set ts=4 sw=4 sts=4 expandtab: diff --git a/lib/Crypto/SelfTest/PublicKey/__init__.py b/lib/Crypto/SelfTest/PublicKey/__init__.py index 4695a40..f29ae51 100644 --- a/lib/Crypto/SelfTest/PublicKey/__init__.py +++ b/lib/Crypto/SelfTest/PublicKey/__init__.py @@ -32,6 +32,7 @@ def get_tests(config={}): tests = [] import test_DSA; tests += test_DSA.get_tests(config=config) import test_RSA; tests += test_RSA.get_tests(config=config) + import test_importKey; tests += test_importKey.get_tests(config=config) return tests if __name__ == '__main__': diff --git a/lib/Crypto/SelfTest/PublicKey/test_importKey.py b/lib/Crypto/SelfTest/PublicKey/test_importKey.py new file mode 100644 index 0000000..0b69192 --- /dev/null +++ b/lib/Crypto/SelfTest/PublicKey/test_importKey.py @@ -0,0 +1,142 @@ +# -*- coding: utf-8 -*- +# +# SelfTest/PublicKey/test_importKey.py: Self-test for importing RSA keys +# +# =================================================================== +# The contents of this file are dedicated to the public domain. To +# the extent that dedication to the public domain is not available, +# everyone is granted a worldwide, perpetual, royalty-free, +# non-exclusive license to exercise all rights associated with the +# contents of this file for any purpose whatsoever. +# No rights are reserved. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# =================================================================== + +__revision__ = "$Id$" + +import unittest + +from Crypto.PublicKey import RSA +from Crypto.SelfTest.st_common import * +from Crypto.SelfTest.st_common import list_test_cases, a2b_hex, b2a_hex + +class ImportKeyTests(unittest.TestCase): + + # 512-bit RSA key generated with openssl + rsaKeyPEM = '''-----BEGIN RSA PRIVATE KEY----- +MIIBOwIBAAJBAL8eJ5AKoIsjURpcEoGubZMxLD7+kT+TLr7UkvEtFrRhDDKMtuII +q19FrL4pUIMymPMSLBn3hJLe30Dw48GQM4UCAwEAAQJACUSDEp8RTe32ftq8IwG8 +Wojl5mAd1wFiIOrZ/Uv8b963WJOJiuQcVN29vxU5+My9GPZ7RA3hrDBEAoHUDPrI +OQIhAPIPLz4dphiD9imAkivY31Rc5AfHJiQRA7XixTcjEkojAiEAyh/pJHks/Mlr ++rdPNEpotBjfV4M4BkgGAA/ipcmaAjcCIQCHvhwwKVBLzzTscT2HeUdEeBMoiXXK +JACAr3sJQJGxIQIgarRp+m1WSKV1MciwMaTOnbU7wxFs9DP1pva76lYBzgUCIQC9 +n0CnZCJ6IZYqSt0H5N7+Q+2Ro64nuwV/OSQfM6sBwQ== +-----END RSA PRIVATE KEY-----''' + + rsaPublicKeyPEM = '''-----BEGIN PUBLIC KEY----- +MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAL8eJ5AKoIsjURpcEoGubZMxLD7+kT+T +Lr7UkvEtFrRhDDKMtuIIq19FrL4pUIMymPMSLBn3hJLe30Dw48GQM4UCAwEAAQ== +-----END PUBLIC KEY-----''' + + rsaKeyDER = a2b_hex( + '''3082013b020100024100bf1e27900aa08b23511a5c1281ae6d93312c3efe + 913f932ebed492f12d16b4610c328cb6e208ab5f45acbe2950833298f312 + 2c19f78492dedf40f0e3c190338502030100010240094483129f114dedf6 + 7edabc2301bc5a88e5e6601dd7016220ead9fd4bfc6fdeb75893898ae41c + 54ddbdbf1539f8ccbd18f67b440de1ac30440281d40cfac839022100f20f + 2f3e1da61883f62980922bd8df545ce407c726241103b5e2c53723124a23 + 022100ca1fe924792cfcc96bfab74f344a68b418df578338064806000fe2 + a5c99a023702210087be1c3029504bcf34ec713d877947447813288975ca + 240080af7b094091b12102206ab469fa6d5648a57531c8b031a4ce9db53b + c3116cf433f5a6f6bbea5601ce05022100bd9f40a764227a21962a4add07 + e4defe43ed91a3ae27bb057f39241f33ab01c1 + '''.replace(" ","")) + + rsaPublicKeyDER = a2b_hex( + '''305c300d06092a864886f70d0101010500034b003048024100bf1e27900a + a08b23511a5c1281ae6d93312c3efe913f932ebed492f12d16b4610c328c + b6e208ab5f45acbe2950833298f3122c19f78492dedf40f0e3c190338502 + 03010001 + '''.replace(" ","")) + + n = long('BF 1E 27 90 0A A0 8B 23 51 1A 5C 12 81 AE 6D 93 31 2C 3E FE 91 3F 93 2E BE D4 92 F1 2D 16 B4 61 0C 32 8C B6 E2 08 AB 5F 45 AC BE 29 50 83 32 98 F3 12 2C 19 F7 84 92 DE DF 40 F0 E3 C1 90 33 85'.replace(" ",""),16) + e = 65537L + d = long('09 44 83 12 9F 11 4D ED F6 7E DA BC 23 01 BC 5A 88 E5 E6 60 1D D7 01 62 20 EA D9 FD 4B FC 6F DE B7 58 93 89 8A E4 1C 54 DD BD BF 15 39 F8 CC BD 18 F6 7B 44 0D E1 AC 30 44 02 81 D4 0C FA C8 39'.replace(" ",""),16) + p = long('00 F2 0F 2F 3E 1D A6 18 83 F6 29 80 92 2B D8 DF 54 5C E4 07 C7 26 24 11 03 B5 E2 C5 37 23 12 4A 23'.replace(" ",""),16) + q = long('00 CA 1F E9 24 79 2C FC C9 6B FA B7 4F 34 4A 68 B4 18 DF 57 83 38 06 48 06 00 0F E2 A5 C9 9A 02 37'.replace(" ",""),16) + coeff = long('00 BD 9F 40 A7 64 22 7A 21 96 2A 4A DD 07 E4 DE FE 43 ED 91 A3 AE 27 BB 05 7F 39 24 1F 33 AB 01 C1'.replace(" ",""),16) + + def testImportKey1(self): + key = RSA.importKey(self.rsaKeyDER) + self.assertTrue(key.has_private()) + self.assertEqual(key.n, self.n) + self.assertEqual(key.e, self.e) + self.assertEqual(key.d, self.d) + self.assertEqual(key.p, self.p) + self.assertEqual(key.q, self.q) + self.assertEqual(key.u, self.coeff) + + def testImportKey2(self): + key = RSA.importKey(self.rsaPublicKeyDER) + self.assertFalse(key.has_private()) + self.assertEqual(key.n, self.n) + self.assertEqual(key.e, self.e) + + def testImportKey3(self): + key = RSA.importKey(self.rsaKeyPEM) + self.assertTrue(key.has_private()) + self.assertEqual(key.n, self.n) + self.assertEqual(key.e, self.e) + self.assertEqual(key.d, self.d) + self.assertEqual(key.p, self.p) + self.assertEqual(key.q, self.q) + self.assertEqual(key.u, self.coeff) + + def testImportKey4(self): + key = RSA.importKey(self.rsaPublicKeyPEM) + self.assertFalse(key.has_private()) + self.assertEqual(key.n, self.n) + self.assertEqual(key.e, self.e) + + ### + def testExportKey1(self): + key = RSA.construct([self.n, self.e, self.d, self.p, self.q, self.coeff]) + derKey = key.exportKey("DER") + self.assertEqual(derKey, self.rsaKeyDER) + + def testExportKey2(self): + key = RSA.construct([self.n, self.e]) + derKey = key.exportKey("DER") + self.assertEqual(derKey, self.rsaPublicKeyDER) + + def testExportKey3(self): + key = RSA.construct([self.n, self.e, self.d, self.p, self.q, self.coeff]) + pemKey = key.exportKey("PEM") + self.assertEqual(pemKey, self.rsaKeyPEM) + + def testExportKey4(self): + key = RSA.construct([self.n, self.e]) + pemKey = key.exportKey("PEM") + self.assertEqual(pemKey, self.rsaPublicKeyPEM) + +if __name__ == '__main__': + unittest.main() + +def get_tests(config={}): + tests = [] + tests += list_test_cases(ImportKeyTests) + return tests + +if __name__ == '__main__': + suite = lambda: unittest.TestSuite(get_tests()) + unittest.main(defaultTest='suite') + +# vim:set ts=4 sw=4 sts=4 expandtab: diff --git a/lib/Crypto/SelfTest/Util/test_asn1.py b/lib/Crypto/SelfTest/Util/test_asn1.py new file mode 100644 index 0000000..64facb2 --- /dev/null +++ b/lib/Crypto/SelfTest/Util/test_asn1.py @@ -0,0 +1,287 @@ +# -*- coding: utf-8 -*- +# +# SelfTest/Util/test_asn.py: Self-test for the Crypto.Util.asn1 module +# +# =================================================================== +# The contents of this file are dedicated to the public domain. To +# the extent that dedication to the public domain is not available, +# everyone is granted a worldwide, perpetual, royalty-free, +# non-exclusive license to exercise all rights associated with the +# contents of this file for any purpose whatsoever. +# No rights are reserved. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# =================================================================== + +"""Self-tests for Crypto.Util.asn1""" + +__revision__ = "$Id$" + +import unittest +import sys + +from Crypto.Util.asn1 import DerSequence, DerObject + +class DerObjectTests(unittest.TestCase): + + def testObjEncode1(self): + # No payload + der = DerObject('\x33') + self.assertEquals(der.encode(), '\x33\x00') + # Small payload + der.payload = '\x45' + self.assertEquals(der.encode(), '\x33\x01\x45') + # Invariant + self.assertEquals(der.encode(), '\x33\x01\x45') + + def testObjEncode2(self): + # Known types + der = DerObject('SEQUENCE') + self.assertEquals(der.encode(), '\x30\x00') + der = DerObject('BIT STRING') + self.assertEquals(der.encode(), '\x03\x00') + + def testObjEncode3(self): + # Long payload + der = DerObject('\x34') + der.payload = "0"*128 + self.assertEquals(der.encode(), '\x34\x81\x80' + ("0"*128)) + + def testObjDecode1(self): + # Decode short payload + der = DerObject() + der.decode('\x20\x02\x01\x02') + self.assertEquals(der.payload, "\x01\x02") + self.assertEquals(der.typeTag, "\x20") + + def testObjDecode2(self): + # Decode short payload + der = DerObject() + der.decode('\x22\x81\x80' + ("1"*128)) + self.assertEquals(der.payload, "1"*128) + self.assertEquals(der.typeTag, "\x22") + +class DerSequenceTests(unittest.TestCase): + + def testEncode1(self): + # Empty sequence + der = DerSequence() + self.assertEquals(der.encode(), '0\x00') + self.assertFalse(der.hasOnlyInts()) + # One single-byte integer (zero) + der.append(0) + self.assertEquals(der.encode(), '0\x03\x02\x01\x00') + self.assertTrue(der.hasOnlyInts()) + # Invariant + self.assertEquals(der.encode(), '0\x03\x02\x01\x00') + + def testEncode2(self): + # One single-byte integer (non-zero) + der = DerSequence() + der.append(127) + self.assertEquals(der.encode(), '0\x03\x02\x01\x7f') + # Indexing + der[0] = 1 + self.assertEquals(len(der),1) + self.assertEquals(der[0],1) + self.assertEquals(der[-1],1) + self.assertEquals(der.encode(), '0\x03\x02\x01\x01') + # + der[:] = [1] + self.assertEquals(len(der),1) + self.assertEquals(der[0],1) + self.assertEquals(der.encode(), '0\x03\x02\x01\x01') + + def testEncode3(self): + # One multi-byte integer (non-zero) + der = DerSequence() + der.append(0x180L) + self.assertEquals(der.encode(), '0\x04\x02\x02\x01\x80') + + def testEncode4(self): + # One very long integer + der = DerSequence() + der.append(2**2048) + self.assertEquals(der.encode(), '0\x82\x01\x05' + '\x02\x82\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00') + + def testEncode5(self): + # One single-byte integer (looks negative) + der = DerSequence() + der.append(0xFFL) + self.assertEquals(der.encode(), '0\x04\x02\x02\x00\xff') + + def testEncode6(self): + # Two integers + der = DerSequence() + der.append(0x180L) + der.append(0xFFL) + self.assertEquals(der.encode(), '0\x08\x02\x02\x01\x80\x02\x02\x00\xff') + self.assertTrue(der.hasOnlyInts()) + # + der.append(0x01) + der[1:] = [9,8] + self.assertEquals(len(der),3) + self.assertEqual(der[1:],[9,8]) + self.assertEqual(der[1:-1],[9]) + self.assertEquals(der.encode(), '0\x0A\x02\x02\x01\x80\x02\x01\x09\x02\x01\x08') + + def testEncode6(self): + # One integer and another type (no matter what it is) + der = DerSequence() + der.append(0x180L) + der.append('\x00\x02\x00\x00') + self.assertEquals(der.encode(), '0\x08\x02\x02\x01\x80\x00\x02\x00\x00') + self.assertFalse(der.hasOnlyInts()) + + #### + + def testDecode1(self): + # Empty sequence + der = DerSequence() + der.decode('0\x00') + self.assertEquals(len(der),0) + # One single-byte integer (zero) + der.decode('0\x03\x02\x01\x00') + self.assertEquals(len(der),1) + self.assertEquals(der[0],0) + # Invariant + der.decode('0\x03\x02\x01\x00') + self.assertEquals(len(der),1) + self.assertEquals(der[0],0) + + def testDecode2(self): + # One single-byte integer (non-zero) + der = DerSequence() + der.decode('0\x03\x02\x01\x7f') + self.assertEquals(len(der),1) + self.assertEquals(der[0],127) + + def testDecode3(self): + # One multi-byte integer (non-zero) + der = DerSequence() + der.decode('0\x04\x02\x02\x01\x80') + self.assertEquals(len(der),1) + self.assertEquals(der[0],0x180L) + + def testDecode4(self): + # One very long integer + der = DerSequence() + der.decode('0\x82\x01\x05' + '\x02\x82\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + '\x00\x00\x00\x00\x00\x00\x00\x00\x00') + self.assertEquals(len(der),1) + self.assertEquals(der[0],2**2048) + + def testDecode5(self): + # One single-byte integer (looks negative) + der = DerSequence() + der.decode('0\x04\x02\x02\x00\xff') + self.assertEquals(len(der),1) + self.assertEquals(der[0],0xFFL) + + def testDecode6(self): + # Two integers + der = DerSequence() + der.decode('0\x08\x02\x02\x01\x80\x02\x02\x00\xff') + self.assertEquals(len(der),2) + self.assertEquals(der[0],0x180L) + self.assertEquals(der[1],0xFFL) + + def testDecode7(self): + # One integer and 2 other types + der = DerSequence() + der.decode('0\x0A\x02\x02\x01\x80\x24\x02\xb6\x63\x12\x00') + self.assertEquals(len(der),3) + self.assertEquals(der[0],0x180L) + self.assertEquals(der[1],'\x24\x02\xb6\x63') + self.assertEquals(der[2],'\x12\x00') + + def testDecode8(self): + # Only 2 other types + der = DerSequence() + der.decode('0\x06\x24\x02\xb6\x63\x12\x00') + self.assertEquals(len(der),2) + self.assertEquals(der[0],'\x24\x02\xb6\x63') + self.assertEquals(der[1],'\x12\x00') + + def testErrDecode1(self): + # Not a sequence + der = DerSequence() + self.assertRaises(ValueError, der.decode, '') + self.assertRaises(ValueError, der.decode, '\x00') + self.assertRaises(ValueError, der.decode, '\x30') + + def testErrDecode2(self): + # Wrong payload type + der = DerSequence() + self.assertRaises(ValueError, der.decode, '\x30\x00\x00', True) + + def testErrDecode3(self): + # Wrong length format + der = DerSequence() + self.assertRaises(ValueError, der.decode, '\x30\x04\x02\x01\x01\x00') + self.assertRaises(ValueError, der.decode, '\x30\x81\x03\x02\x01\x01') + self.assertRaises(ValueError, der.decode, '\x30\x04\x02\x81\x01\x01') + + def testErrDecode4(self): + # Wrong integer format + der = DerSequence() + # Multi-byte encoding for zero + #self.assertRaises(ValueError, der.decode, '\x30\x04\x02\x02\x00\x00') + # Negative integer + self.assertRaises(ValueError, der.decode, '\x30\x04\x02\x01\xFF') + +def get_tests(config={}): + from Crypto.SelfTest.st_common import list_test_cases + return list_test_cases(CounterTests) + +if __name__ == '__main__': + suite = lambda: unittest.TestSuite(get_tests()) + unittest.main(defaultTest='suite') + +# vim:set ts=4 sw=4 sts=4 expandtab: + + diff --git a/lib/Crypto/Util/__init__.py b/lib/Crypto/Util/__init__.py index aecd539..a3bef8a 100644 --- a/lib/Crypto/Util/__init__.py +++ b/lib/Crypto/Util/__init__.py @@ -27,10 +27,11 @@ Crypto.Util.number Number-theoretic functions (primality testing, etc.) Crypto.Util.randpool Random number generation Crypto.Util.RFC1751 Converts between 128-bit keys and human-readable strings of words. +Crypto.Util.asn1 Minimal support for ASN.1 DER encoding """ -__all__ = ['randpool', 'RFC1751', 'number', 'strxor'] +__all__ = ['randpool', 'RFC1751', 'number', 'strxor', 'asn1' ] __revision__ = "$Id$" diff --git a/lib/Crypto/Util/asn1.py b/lib/Crypto/Util/asn1.py new file mode 100644 index 0000000..7e3041b --- /dev/null +++ b/lib/Crypto/Util/asn1.py @@ -0,0 +1,162 @@ +# -*- coding: ascii -*- +# +# Util/asn1.py : Minimal support for ASN.1 DER binary encoding. +# +# =================================================================== +# The contents of this file are dedicated to the public domain. To +# the extent that dedication to the public domain is not available, +# everyone is granted a worldwide, perpetual, royalty-free, +# non-exclusive license to exercise all rights associated with the +# contents of this file for any purpose whatsoever. +# No rights are reserved. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# =================================================================== + +from Crypto.Util.number import long_to_bytes, bytes_to_long + +__all__ = [ 'DerObject', 'DerInteger', 'DerSequence' ] + +class DerObject(object): + typeTags = { 'SEQUENCE':'\x30', 'BIT STRING':'\x03', 'INTEGER':'\x02' } + + def __init__(self, ASN1Type=None): + self.typeTag = self.typeTags.get(ASN1Type, ASN1Type) + self.payload = '' + + def _lengthOctets(self, payloadLen): + ''' + Return an octet string that is suitable for the BER/DER + length element if the relevant payload is of the given + size (in bytes). + ''' + if payloadLen>127: + encoding = long_to_bytes(payloadLen) + return chr(len(encoding)+128) + encoding + return chr(payloadLen) + + def encode(self): + return self.typeTag + self._lengthOctets(len(self.payload)) + self.payload + + def _decodeLen(self, idx, str): + ''' + Given a string and an index to a DER LV, + this function returns a tuple with the length of V + and an index to the first byte of it. + ''' + length = ord(str[idx]) + if length<=127: + return (length,idx+1) + else: + payloadLength = bytes_to_long(str[idx+1:idx+1+(length & 0x7F)]) + if payloadLength<=127: + raise ValueError("Not a DER length tag.") + return (payloadLength, idx+1+(length & 0x7F)) + + def decode(self, input, noLeftOvers=False): + try: + self.typeTag = input[0] + if (ord(self.typeTag) & 0x1F)==0x1F: + raise ValueError("Unsupported DER tag") + (length,idx) = self._decodeLen(1,input) + if noLeftOvers and len(input) != (idx+length): + raise ValueError("Not a DER structure") + self.payload = input[idx:idx+length] + except IndexError: + raise ValueError("Not a valid DER SEQUENCE.") + return idx+length + +class DerInteger(DerObject): + def __init__(self, value = 0): + DerObject.__init__(self, 'INTEGER') + self.value = value + + def encode(self): + self.payload = long_to_bytes(self.value) + if ord(self.payload[0])>127: + self.payload = '\x00' + self.payload + return DerObject.encode(self) + + def decode(self, input, noLeftOvers=False): + tlvLength = DerObject.decode(self, input,noLeftOvers) + if ord(self.payload[0])>127: + raise ValueError ("Negative INTEGER.") + self.value = bytes_to_long(self.payload) + return tlvLength + +class DerSequence(DerObject): + def __init__(self): + DerObject.__init__(self, 'SEQUENCE') + self._seq = [] + def __delitem__(self, n): + del self._seq[n] + def __getitem__(self, n): + return self._seq[n] + def __setitem__(self, key, value): + self._seq[key] = value + def __setslice__(self,i,j,sequence): + self._seq[i:j] = sequence + def __delslice__(self,i,j): + del self._seq[i:j] + def __len__(self): + return len(self._seq) + def append(self, item): + return self._seq.append(item) + + def hasOnlyInts(self): + if not self._seq: return False + for item in self._seq: + if not isinstance(item,(int, long)): + return False + return True + + def encode(self): + ''' + Return the DER encoding for the ASN.1 SEQUENCE containing + the non-negative integers and longs added to this object. + ''' + self.payload = '' + for item in self._seq: + if isinstance(item,(long, int)): + self.payload += DerInteger(item).encode() + elif isinstance(item,basestring): + self.payload += item + else: + raise ValueError("Trying to DER encode an unknown object") + return DerObject.encode(self) + + def decode(self, input,noLeftOvers=False): + ''' + This function decodes the given string into a sequence of + ASN.1 objects. Yet, we only know about unsigned INTEGERs. + Any other type is stored as its rough TLV. In the latter + case, the correctectness of the TLV is not checked. + ''' + self._seq = [] + try: + tlvLength = DerObject.decode(self, input,noLeftOvers) + if self.typeTag!=self.typeTags['SEQUENCE']: + raise ValueError("Not a DER SEQUENCE.") + # Scan one TLV at once + idx = 0 + while idx