# For the record, this is the proposed unit test to be added, since the
# pastebin is set to expire after one year.

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
from socket import gethostname

# NOTE: pyOpenSSL (``from OpenSSL import crypto``) is imported lazily inside
# create_self_signed_cert() so the tests that need no TLS material still run
# when pyOpenSSL is not installed.
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base

# Files written by create_self_signed_cert(), relative to the working
# directory of the test run.
CERT_FILE = 'cert.crt'
KEY_FILE = 'test.key'
CA_FILE = 'test.ca'


def _set_test_subject(subject):
    """Populate *subject* with the fixed identity used by these tests.

    *subject* is whatever ``get_subject()`` returns on the request or the
    certificate; the common name is taken from the local host name.
    """
    subject.C = "US"
    subject.ST = "Boston"
    subject.L = "Boston"
    subject.O = "Test Company Ltd"
    subject.OU = "Test Company Ltd"
    subject.CN = gethostname()


def _write_pem(path, pem_bytes):
    """Write PEM-encoded *pem_bytes* to *path* as UTF-8 text."""
    with open(path, 'w') as handle:
        handle.write(pem_bytes.decode("utf-8"))


def _remove_if_exists(path):
    """Delete *path*, ignoring the case where it was never created."""
    if os.path.exists(path):
        os.remove(path)


def create_self_signed_cert():
    """Generate throw-away TLS material in the current working directory.

    Creates a 2048-bit RSA key pair, a certificate signing request and a
    self-signed certificate valid for ten years, then writes:

    * ``cert.crt`` -- the PEM-encoded certificate
    * ``test.key`` -- the PEM-encoded private key
    * ``test.ca``  -- the PEM-encoded certificate signing request

    The caller is responsible for removing the files afterwards.
    """
    # Local import: the original module-level import was commented out,
    # which left ``crypto`` undefined when this helper ran.
    from OpenSSL import crypto

    # Create a key pair.
    key_pair = crypto.PKey()
    key_pair.generate_key(crypto.TYPE_RSA, 2048)

    # Create a CSR.
    csr = crypto.X509Req()
    _set_test_subject(csr.get_subject())
    csr.set_pubkey(key_pair)
    csr.sign(key_pair, "sha256")

    # Create a self-signed cert: issuer and subject are the same name and
    # the certificate is signed with its own key.
    cert = crypto.X509()
    _set_test_subject(cert.get_subject())
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key_pair)
    cert.sign(key_pair, "sha256")

    _write_pem(CERT_FILE,
               crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    _write_pem(KEY_FILE,
               crypto.dump_privatekey(crypto.FILETYPE_PEM, key_pair))
    # NOTE(review): the file used as the CA bundle receives the CSR, not a
    # certificate. That is enough for the attribute checks below, but it
    # would presumably not verify a real TLS connection -- confirm before
    # reusing this helper for an end-to-end test.
    _write_pem(CA_FILE,
               crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr))


class TestEtcd3Gateway(base.TestCase):
    """URL construction and TLS session configuration of Etcd3Client."""

    def test_client_default(self):
        client = Etcd3Client()
        self.assertEqual("http://localhost:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv4(self):
        client = Etcd3Client(host="127.0.0.1")
        self.assertEqual("http://127.0.0.1:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv6(self):
        client = Etcd3Client(host="::1")
        self.assertEqual("http://[::1]:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_tls(self):
        # Register cleanup before generating anything so the files are
        # removed even when generation or an assertion fails.
        for path in (CERT_FILE, KEY_FILE, CA_FILE):
            self.addCleanup(_remove_if_exists, path)
        create_self_signed_cert()

        # Pass filesystem paths, not open file objects: the client hands
        # these values to the HTTP session's ``cert``/``verify`` settings.
        client = Etcd3Client(host="127.0.0.1", protocol="https",
                             ca_cert=CA_FILE,
                             cert_key=KEY_FILE,
                             cert_cert=CERT_FILE,
                             timeout=10)
        self.assertEqual((CERT_FILE, KEY_FILE), client.session.cert)
        self.assertEqual(CA_FILE, client.session.verify)