# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ssl
import socket
import threading
from OpenSSL import crypto
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base
from future.backports.http.server import (HTTPServer as _HTTPServer, SimpleHTTPRequestHandler, BaseHTTPRequestHandler)
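# Write the certificate, the private key and the signing request as PEM text.
# cert_file, key_file, ca_file, cert, pub_key and csr are not defined in this
# excerpt; they must already be in scope when it runs.
with open(cert_file, 'w') as crt:
    # open() returns a file object or raises, so this guard is always true.
    if crt is not None:
        crt.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
# NOTE(review): dump_privatekey() is called without a cipher or passphrase, so
# the key is stored unencrypted, and the file mode is whatever the process
# umask leaves. Treat the file as a throwaway test secret.
with open(key_file, 'w') as key:
    if key is not None:
        key.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pub_key).decode("utf-8"))
# The file named by ca_file receives the certificate signing request, not a
# certificate.
with open(ca_file, 'w') as ca:
    if ca is not None:
        ca.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr).decode("utf-8"))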
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import ssl
import socket
import threading
from OpenSSL import crypto
from etcd3gw.client import Etcd3Client
from etcd3gw.tests import base
from future.backports.http.server import (HTTPServer as _HTTPServer,
                                          SimpleHTTPRequestHandler,
                                          BaseHTTPRequestHandler)
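class ETCDMock(_HTTPServer):
    """HTTP server that wraps every accepted connection with a TLS context.

    The TLS policy (protocol versions, certificates, whether a client
    certificate is requested) is entirely that of the ``context`` object
    handed to the constructor; this class adds none of its own.
    """

    def __init__(self, server_address, handler_class, context):
        """Bind the server and remember the TLS context.

        Args:
            server_address: ``(host, port)`` pair passed to the base server.
            handler_class: request handler class passed to the base server.
            context: object whose ``wrap_socket(sock, server_side=True)``
                is called on each accepted connection.
        """
        _HTTPServer.__init__(self, server_address, handler_class)
        self.context = context

    def __str__(self):
        """Return ``<ClassName server_name:server_port>``."""
        return ('<%s %s:%s>' %
                (self.__class__.__name__,
                 self.server_name,
                 self.server_port))

    def get_request(self):
        """Accept one connection and return ``(tls_socket, client_address)``.

        The wrap happens here, in the thread that runs the accept loop, so
        the next connection is not accepted until this one is wrapped.
        """
        try:
            sock, addr = self.socket.accept()
            sslconn = self.context.wrap_socket(sock, server_side=True)
            # Keep a reference to the most recent plain socket.
            self.sock = sock
        except socket.error as e:
            # On Python 3 socket.error is OSError, which ssl.SSLError derives
            # from, so a failed TLS handshake also lands here.
            print("failure in etcdservermock: %s" % e)
            # exit() raises SystemExit in the calling thread: the first
            # failed accept or handshake ends the mock's serving loop.
            exit(1)
        return sslconn, addr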
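class ETCDMockRequestHandler(SimpleHTTPRequestHandler):
    """Request handler that answers two etcd endpoints with a canned body.

    ``GET /health`` and ``POST /maintenance/status`` both receive the same
    fixed 200 response.
    """
    protocol_version = "HTTP/1.0"

    def _send_health(self):
        """Send the fixed 200 response shared by the mocked endpoints."""
        # NOTE(review): the body is labelled application/json but its key is
        # unquoted, so a strict JSON parser rejects it. Left as-is because the
        # TLS test in this file catches ValueError around the client call.
        example_response = b"{health:true}"
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", len(example_response))
        self.end_headers()
        self.wfile.write(example_response)

    def do_GET(self):
        """Answer ``/health`` with the canned body; delegate everything else."""
        if self.path == "/health":
            self._send_health()
        else:
            # NOTE(review): SimpleHTTPRequestHandler serves files and directory
            # listings from the process working directory, which is also where
            # create_self_signed_cert() writes test.key. The server in this
            # file binds 127.0.0.1 only; keep it that way, or answer 404 here.
            super().do_GET()

    def do_POST(self):
        """Answer ``/maintenance/status`` with the canned body, else 404."""
        if self.path == "/maintenance/status":
            self._send_health()
        else:
            # SimpleHTTPRequestHandler defines no do_POST, so delegating to
            # super() raised AttributeError; reply with an explicit error.
            self.send_error(404, "Not Found")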
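class ETCDServerThread(threading.Thread):
    """Daemon thread that runs an ETCDMock server on 127.0.0.1:2379."""

    def __init__(self, context):
        """Create the mock server and prepare the thread.

        Args:
            context: TLS context handed to ETCDMock unchanged.
        """
        self.flag = None
        # The address is fixed: loopback only, on port 2379. Binding happens
        # here, so construction fails if that port is already in use.
        self.server = ETCDMock(('127.0.0.1', 2379),
                               ETCDMockRequestHandler,
                               context)
        self.port = self.server.server_port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        """Return ``<ClassName server>``."""
        return "<%s %s>" % (self.__class__.__name__, self.server)

    def start(self, flag=None):
        """Start the thread; ``flag``, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Signal readiness, serve until shutdown, then close the socket."""
        if self.flag:
            self.flag.set()
        try:
            # 0.05 is the poll interval passed to serve_forever().
            self.server.serve_forever(0.05)
        finally:
            self.server.server_close()

    def stop(self):
        """Ask the serving loop to exit."""
        self.server.shutdown()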
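def create_self_signed_cert():
    """Generate a throwaway RSA key and self-signed certificate on disk.

    Writes three PEM files into the current working directory: ``test.crt``
    (certificate), ``test.key`` (private key) and ``test.ca`` (signing
    request).

    Returns:
        ``(cert_file, key_file, cert_file)``. The certificate path is
        returned a second time in the CA position, because a self-signed
        certificate is its own issuer. The path of ``test.ca`` is not
        returned.
    """
    # create a key pair
    # Despite the name, pub_key holds the whole key pair, private half included.
    pub_key = crypto.PKey()
    pub_key.generate_key(crypto.TYPE_RSA, 2048)

    # create a csr
    csr = crypto.X509Req()
    csr.get_subject().C = "US"
    csr.get_subject().ST = "Boston"
    csr.get_subject().L = "Boston"
    csr.get_subject().O = "Test Company Ltd"
    csr.get_subject().OU = "Test Company Ltd"
    csr.get_subject().CN = "127.0.0.1"
    csr.set_pubkey(pub_key)
    csr.sign(pub_key, "sha256")

    # create a self-signed cert
    # Only the subject CN carries the address; no subjectAltName extension is
    # added, so a verifier that requires one will not match 127.0.0.1.
    cert = crypto.X509()
    cert.get_subject().C = "US"
    cert.get_subject().ST = "Boston"
    cert.get_subject().L = "Boston"
    cert.get_subject().O = "Test Company Ltd"
    cert.get_subject().OU = "Test Company Ltd"
    cert.get_subject().CN = "127.0.0.1"
    cert.set_serial_number(1000)
    cert.gmtime_adj_notBefore(0)
    # Valid for ten years from now, expressed in seconds.
    cert.gmtime_adj_notAfter(10 * 365 * 24 * 60 * 60)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(pub_key)
    cert.sign(pub_key, "sha256")

    cert_file = 'test.crt'
    key_file = 'test.key'
    ca_file = 'test.ca'
    # open() returns a file object or raises, so the former
    # "is not None" guards around each write were always true.
    with open(cert_file, 'w') as crt:
        crt.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert).decode("utf-8"))
    # NOTE(review): the key is dumped without a cipher or passphrase and the
    # file mode is whatever the process umask leaves; treat test.key as a
    # throwaway secret and remove it when the test finishes.
    with open(key_file, 'w') as key:
        key.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pub_key).decode("utf-8"))
    # NOTE(review): test.ca holds the CSR, not a certificate. Callers never
    # receive its path, so nothing that relies on the return value deletes it.
    with open(ca_file, 'w') as ca:
        ca.write(crypto.dump_certificate_request(crypto.FILETYPE_PEM, csr).decode("utf-8"))
    return cert_file, key_file, cert_file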
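class TestEtcd3Gateway(base.TestCase):
    """Checks on the URLs Etcd3Client builds."""

    def test_client_default(self):
        """With no arguments the client targets plain http on localhost:2379."""
        client = Etcd3Client()
        self.assertEqual("http://localhost:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv4(self):
        """An IPv4 host is placed in the URL unchanged."""
        client = Etcd3Client(host="127.0.0.1")
        self.assertEqual("http://127.0.0.1:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))

    def test_client_ipv6(self):
        """An IPv6 host is wrapped in square brackets in the URL."""
        client = Etcd3Client(host="::1")
        self.assertEqual("http://[::1]:2379/v3alpha/lease/grant",
                         client.get_url("/lease/grant"))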
def test_client_ tls(self) : self_signed_ cert()
cert_file, key_file, ca_file = create_
ctx = ssl.SSLContext()
ctx.load_ cert_chain( certfile= cert_file, keyfile=key_file)
ctx.load_ verify_ locations( cafile= ca_file)
server = ETCDServerThrea d(ctx)
server. start(flag) host="127. 0.0.1", protocol="https", ca_cert=ca_file,
cert_ key=key_ file,
cert_ cert=cert_ file, timeout=10)
flag = threading.Event()
try:
client = Etcd3Client(
try:
except ValueError as e:
print( e, "Connection failure to TLS etcd")
os. remove( cert_file)
os. remove( key_file)
os.remove( ca_file)
client. session. close()
server. stop()
finally:
if ca_file != cert_file: