Comment 31 for bug 1820083

Revision history for this message
Heather Lemon (hypothetical-lemon) wrote :

For the record, this is the proposed unit test to be added, since the pastebin is set to expire after one year.

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
from socket import gethostname

# from OpenSSL import crypto
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base

def _set_test_subject(subject):
    """Fill an X509 name object with the fixed identity used by the tests."""
    subject.C = "US"
    subject.ST = "Boston"
    subject.L = "Boston"
    subject.O = "Test Company Ltd"
    subject.OU = "Test Company Ltd"
    subject.CN = gethostname()


def _write_pem(path, pem_bytes):
    """Write already PEM-encoded bytes to *path*, replacing any old file."""
    # Binary mode: the dump_* helpers return bytes, so there is no need to
    # decode them to text only to have them encoded again on write.
    with open(path, 'wb') as pem_file:
        pem_file.write(pem_bytes)


def create_self_signed_cert():
    """Generate throw-away TLS material in the current working directory.

    Three PEM files are written:

    * ``cert.crt`` - a self-signed certificate valid for ten years
    * ``test.key`` - the private key of the generated RSA key pair
    * ``test.ca``  - the certificate signing request

    The caller is responsible for removing the files afterwards.
    """
    # The module-level import of ``crypto`` is commented out, so without
    # this function-scope import every ``crypto.*`` reference below would
    # raise NameError.
    from OpenSSL import crypto

    # create a key pair
    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    # create a csr
    csr = crypto.X509Req()
    _set_test_subject(csr.get_subject())
    csr.set_pubkey(key_pair)
    csr.sign(key_pair, "sha256")

    # create a self-signed cert
    cert = crypto.X509()
    _set_test_subject(cert.get_subject())
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)  # ten years, in seconds
    # Self-signed: issuer and subject are the same name.
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key_pair)
    cert.sign(key_pair, "sha256")

    _write_pem('cert.crt',
               crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    _write_pem('test.key',
               crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair))
    # NOTE(review): 'test.ca' receives the certificate *request*, not a CA
    # certificate. Kept as in the original; confirm this is intended.
    _write_pem('test.ca',
               crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr))

class TestEtcd3Gateway(base.TestCase):
    """Checks how Etcd3Client builds URLs and stores its TLS settings."""

    def test_client_default(self):
        """With no arguments the client targets http://localhost:2379."""
        client = Etcd3Client()
        self.assertEqual("http://localhost:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv4(self):
        """An IPv4 host is placed in the URL unchanged."""
        client = Etcd3Client(host="127.0.0.1")
        self.assertEqual("http://127.0.0.1:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv6(self):
        """An IPv6 host is wrapped in square brackets in the URL."""
        client = Etcd3Client(host="::1")
        self.assertEqual("http://[::1]:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_tls(self):
        """The TLS arguments end up on the client's session object."""
        cert_path, key_path, ca_path = 'cert.crt', 'test.key', 'test.ca'
        try:
            create_self_signed_cert()
            # File paths are passed rather than open file objects.
            # NOTE(review): assumes client.session is a requests-style
            # session, whose cert/verify settings take paths - confirm.
            client = Etcd3Client(host="127.0.0.1", protocol="https",
                                 ca_cert=ca_path, cert_key=key_path,
                                 cert_cert=cert_path, timeout=10)
            self.assertEqual(client.session.cert, (cert_path, key_path))
            self.assertEqual(client.session.verify, ca_path)
        finally:
            # Clean up even when an assertion (or the generation itself)
            # fails, so no key material is left in the working directory.
            for path in (cert_path, key_path, ca_path):
                if os.path.exists(path):
                    os.remove(path)