For the record, this is the proposed unit test to be added; it is reproduced here because the pastebin is set to expire after one year.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from socket import gethostname
# from OpenSSL import crypto
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base
# Write the certificate, the private key and the certificate request as PEM
# text into three files in the current working directory.
# NOTE(review): `crypto`, `cert`, `pub_key` and `csr` are not defined in this
# excerpt (the OpenSSL import above is commented out); they must already be in
# scope for these statements to run.
# `open()` raises on failure instead of returning None, so no None guard is
# needed around the writes.
with open('cert.crt', 'w') as crt:
    crt.write(
        crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
with open('test.key', 'w') as key:
    key.write(
        crypto.dump_privatekey(crypto.FILETYPE_PEM, pub_key).decode("utf-8"))
with open('test.ca', 'w') as ca:
    ca.write(
        crypto.dump_certificate_request(
            crypto.FILETYPE_PEM, csr).decode("utf-8"))
For the record, this is the proposed unit test to be added; it is reproduced here because the pastebin is set to expire after one year.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from socket import gethostname
# from OpenSSL import crypto
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base
def create_self_signed_cert():
    """Generate a throw-away key pair, CSR and self-signed certificate.

    Three PEM text files are written to the current working directory:

    * ``cert.crt`` -- the self-signed certificate
    * ``test.key`` -- the private key
    * ``test.ca``  -- the certificate signing request

    NOTE(review): this relies on ``crypto`` (pyOpenSSL) being in scope; the
    ``from OpenSSL import crypto`` line at the top of the file is currently
    commented out and has to be enabled for this function to run.
    """
    # create a key pair
    pub_key = crypto.PKey()
    pub_key.generate_key(crypto.TYPE_RSA, 2048)

    # create a csr
    csr = crypto.X509Req()
    csr.get_subject().C = "US"
    csr.get_subject().ST = "Boston"
    csr.get_subject().L = "Boston"
    csr.get_subject().O = "Test Company Ltd"
    csr.get_subject().OU = "Test Company Ltd"
    csr.get_subject().CN = gethostname()
    csr.set_pubkey(pub_key)
    csr.sign(pub_key, "sha256")

    # create a self-signed cert
    cert = crypto.X509()
    cert.get_subject().C = "US"
    cert.get_subject().ST = "Boston"
    cert.get_subject().L = "Boston"
    cert.get_subject().O = "Test Company Ltd"
    cert.get_subject().OU = "Test Company Ltd"
    cert.get_subject().CN = gethostname()
    cert.set_serial_number(1000)
    # Valid from now for ten (365-day) years, expressed in seconds.
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    # Issuer == subject: that is what makes the certificate self-signed.
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pub_key)
    cert.sign(pub_key, "sha256")

    # `open()` raises on failure rather than returning None, and each `with`
    # block closes its file on exit, so neither None guards nor explicit
    # close() calls are required.
    with open('cert.crt', 'w') as crt:
        crt.write(
            crypto.dump_certificate(
                crypto.FILETYPE_PEM, cert).decode("utf-8"))
    with open('test.key', 'w') as key:
        key.write(
            crypto.dump_privatekey(
                crypto.FILETYPE_PEM, pub_key).decode("utf-8"))
    with open('test.ca', 'w') as ca:
        ca.write(
            crypto.dump_certificate_request(
                crypto.FILETYPE_PEM, csr).decode("utf-8"))
class TestEtcd3Gateway(base.TestCase):
    """Checks of ``Etcd3Client`` URL building and TLS session settings."""

    def test_client_default(self):
        """A client built without arguments targets localhost:2379."""
        client = Etcd3Client()
        self.assertEqual("http://localhost:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv4(self):
        """An IPv4 host is placed verbatim in the URL."""
        client = Etcd3Client(host="127.0.0.1")
        self.assertEqual("http://127.0.0.1:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv6(self):
        """An IPv6 host is wrapped in square brackets in the URL."""
        client = Etcd3Client(host="::1")
        self.assertEqual("http://[::1]:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_tls(self):
        """TLS arguments are expected to show up on the client's session."""
        create_self_signed_cert()
        try:
            # NOTE(review): open file objects (not paths) are handed to the
            # client and compared against the session attributes -- confirm
            # this is what Etcd3Client expects.
            with open('cert.crt', 'r') as crt_file, \
                    open('test.key', 'r') as key_file, \
                    open('test.ca', 'r') as ca_file:
                client = Etcd3Client(host="127.0.0.1", protocol="https",
                                     ca_cert=ca_file, cert_key=key_file,
                                     cert_cert=crt_file, timeout=10)
                self.assertEqual(client.session.cert, (crt_file, key_file))
                self.assertEqual(client.session.verify, ca_file)
        finally:
            # Remove the generated files even when an assertion above fails,
            # so a failing run does not leave key material behind.
            os.remove("cert.crt")
            os.remove("test.key")
            os.remove("test.ca")