Skip to content
Snippets Groups Projects

#36

Merged Tobias Elvermann requested to merge #36 into main
1 file
+ 219
170
Compare changes
  • Side-by-side
  • Inline
import argparse
import base64
import glob
import json
import logging
import os
import re
import subprocess
import threading
from typing import Dict, Optional

import yaml
from kubernetes import client, config
 
# Configure the root logger once at import time: INFO level with
# timestamp and file:line context so every module log line is traceable.
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(filename)s:%(lineno)d\t- %(levelname)s - %(message)s')
# Module-level Kubernetes API clients shared by DockerInfo, Deployment
# and KubernetesSecret.
# NOTE(review): load_kube_config() runs at import time and requires a
# reachable kubeconfig — confirm this is intended for every import context.
config.load_kube_config()
corev1api = client.CoreV1Api()
appsv1api = client.AppsV1Api()
class DockerInfo:
    """Maintains the docker-info JSON file: rewrites container ports and
    lower-cases container names / IP addresses in place."""

    def update_node_port(self, ports_mapping, filename):
        """Rewrite the docker info JSON file with the node ports.

        Args:
            ports_mapping: dict mapping lower-cased container name -> port.
            filename: path of the docker-info JSON file, read and rewritten
                in place.

        Raises:
            KeyError: when a container name is missing from ports_mapping
                or the file lacks a 'docker_info_list' key.
        """
        logging.info("Start updating the docker info JSON.")
        with open(filename, "r") as jsonFile:
            data = json.load(jsonFile)
        self._update_docker_info_list(data["docker_info_list"], ports_mapping)
        logging.info(f"update_node_port: {data['docker_info_list']}")
        with open(filename, "w") as jsonFile:
            json.dump(data, jsonFile)
        logging.info("Docker info file successfully updated.")

    def _update_docker_info_list(self, docker_info_list: list, ports_mapping: dict):
        """Updates the port and standardizes naming in the docker info list.

        Mutates each entry: lower-cases 'container_name' and 'ip_address',
        and sets 'port' from ports_mapping keyed by the lower-cased name.
        """
        for info in docker_info_list:
            container_name = info["container_name"].lower()
            info["port"] = ports_mapping[container_name]
            info["container_name"] = container_name
            info["ip_address"] = info["ip_address"].lower()
 
class Deployment:
    """Applies Kubernetes manifests (Deployments, Services, PVCs) for one
    namespace, generating companion web-UI services for each Service."""

    WEBUI_PORT_NAME = "webui"      # port name and service-name suffix for web-UI services
    WEBUI_TARGET_PORT = 8062       # container port the generated web-UI service targets
    WEBUI_SUFFIX = "_webui.yaml"   # filename suffix for generated web-UI YAML files
def __init__(self, namespace, start_port=30000, end_port=32767, base_path=""):
def __init__(self, namespace, start_port=30000, end_port=32767, base_path=""):
self.namespace = namespace
self.namespace = namespace
self.base_path = base_path
self.base_path = base_path
@@ -49,20 +58,13 @@ class Deployment:
@@ -49,20 +58,13 @@ class Deployment:
if not self.is_valid_namespace():
if not self.is_valid_namespace():
raise("deployment is invalid")
raise("deployment is invalid")
if not os.path.isdir(self.get_deployment_dir()):
if not os.path.isdir(self.get_deployment_dir()):
print(base_path)
logging.error(base_path)
print(self.get_deployment_dir())
logging.error(self.get_deployment_dir())
raise("Path to the target directory is invalid : ")
raise("Path to the target directory is invalid : ")
def get_deployment_dir(self):
    """Return the 'deployments' directory under the configured base path."""
    return os.path.join(self.base_path, "deployments")
def get_next_free_port(self):
    """Return the next sequential port, starting at 8061 on the first call.

    NOTE(review): despite the name, the port's availability on the host is
    not actually checked — this is a simple counter.
    """
    self.port = 8061 if self.port is None else self.port + 1
    return self.port
def get_current_dir(self):
    """Return the process's current working directory."""
    return os.getcwd()
@@ -80,146 +82,189 @@ class Deployment:
@@ -80,146 +82,189 @@ class Deployment:
for container in containers:
for container in containers:
old_policy = container.get('imagePullPolicy', None)
old_policy = container.get('imagePullPolicy', None)
if old_policy is not None and old_policy != new_policy:
if old_policy is not None and old_policy != new_policy:
print("set_image_pull_policy changing imagePullPolicy from", old_policy, "to", new_policy)
logging.info(f"set_image_pull_policy changing imagePullPolicy from {old_policy} to {new_policy}")
elif old_policy is None:
elif old_policy is None:
print("set_image_pull_policy setting imagePullPolicy to", new_policy)
logging.info(f"set_image_pull_policy setting imagePullPolicy to {new_policy}")
container['imagePullPolicy'] = new_policy
container['imagePullPolicy'] = new_policy
with open(deployment_file_name, "w") as f:
with open(deployment_file_name, "w") as f:
yaml.dump(doc, f)
yaml.dump(doc, f)
except Exception:
except Exception:
# if we process a file that is not a deployment - warn
# if we process a file that is not a deployment - warn
print("WARNING: set_image_pull_policy encountered incompatible input file", deployment_file_name)
logging.warning(f"Set_image_pull_policy encountered incompatible input file {deployment_file_name}")
def get_node_port(self, service_name: str) -> Optional[int]:
    """Return the first nodePort exposed by the named Service, or None.

    None is returned both when no port of the service carries a nodePort
    and when the API call fails (the failure is logged, not raised).
    """
    try:
        service = corev1api.read_namespaced_service(service_name, self.namespace)
        for port in service.spec.ports:
            if port.node_port:
                return port.node_port
    except client.exceptions.ApiException as e:
        logging.error(f"An error occurred while fetching node port for {service_name}: {e}")
    return None
def update_yaml_ports(self, file_name: str) -> None:
    """Sync the cluster-assigned nodePort back into a Service YAML file.

    Loads the YAML, looks up the live nodePort for the named service, and
    when found writes it into both 'nodePort' and 'port' of the first port
    entry and records it in self.port_mapping. The (possibly unchanged)
    document is then written back to disk.
    """
    doc = self._load_yaml(file_name)
    if not doc:
        # _load_yaml already logged the failure.
        return
    service_name = doc['metadata']['name']
    node_port = self.get_node_port(service_name)
    if node_port:
        logging.info(f"Setting node_port in {file_name} to {node_port}")
        doc['spec']['ports'][0]['nodePort'] = node_port
        doc['spec']['ports'][0]['port'] = node_port
        self.port_mapping[service_name] = node_port
    # Write unconditionally — a no-op when no nodePort was found.
    # NOTE(review): confirm the write should not be limited to the found case.
    self._write_yaml(file_name, doc)
with open(file_name, "w") as f:
def _load_yaml(self, file_name: str) -> Optional[dict]:
yaml.dump(doc, f)
try:
 
with open(file_name) as f:
 
return yaml.safe_load(f)
 
except Exception as e:
 
logging.error(f"An error occurred while loading {file_name}: {e}")
 
return None
def get_node_port(self, service):
def _write_yaml(self, file_name: str, doc: dict) -> None:
process = subprocess.run([
try:
'kubectl', '-n', self.namespace, 'get', 'svc', service, '-o',
with open(file_name, "w") as f:
'go-template={{range .spec.ports}}{{if .nodePort}}{{.nodePort}}{{end}}{{end}}'],
yaml.dump(doc, f)
check=True, stdout=subprocess.PIPE, universal_newlines=True)
except Exception as e:
 
logging.error(f"An error occurred while writing to {file_name}: {e}")
return int(process.stdout)
def apply_yaml_process(self, file_name: str):
    """Load a manifest file and dispatch it to the handler for its kind.

    Raises:
        ValueError: for kinds other than Deployment, Service or
            PersistentVolumeClaim.
    """
    yaml_content = self._load_yaml(file_name)
    if not yaml_content:
        # _load_yaml logs and returns None on failure; nothing to apply.
        return
    kind = yaml_content.get('kind')
    name = yaml_content.get('metadata', {}).get('name')
    handlers = {
        "Deployment": self._handle_deployment,
        "Service": self._handle_service,
        "PersistentVolumeClaim": self._handle_pvc,
    }
    handler = handlers.get(kind)
    if not handler:
        raise ValueError(f"Unsupported kind: {kind}")
    handler(name, yaml_content)
def _handle_deployment(self, name: str, yaml_content: Dict):
    """Create or replace a Deployment in the target namespace."""
    try:
        # Read first: raises a 404 ApiException when the deployment is absent.
        appsv1api.read_namespaced_deployment(name, namespace=self.namespace)
        appsv1api.replace_namespaced_deployment(name, namespace=self.namespace, body=yaml_content)
        logging.info(f"Deployment {name} updated.")
    except client.exceptions.ApiException as e:
        if e.status == 404:  # Not Found
            appsv1api.create_namespaced_deployment(body=yaml_content, namespace=self.namespace)
            logging.info(f"Deployment {name} created.")
        else:
            raise
def _handle_service(self, name: str, yaml_content: Dict):
    """Create or replace a Service in the target namespace.

    On replace, the live resourceVersion (and clusterIP, when the manifest
    leaves it empty) is carried over — the API rejects replacements
    without them.
    """
    try:
        existing_service = corev1api.read_namespaced_service(name, namespace=self.namespace)
        yaml_content['metadata']['resourceVersion'] = existing_service.metadata.resource_version
        if not yaml_content['spec'].get('clusterIP'):
            yaml_content['spec']['clusterIP'] = existing_service.spec.cluster_ip
        corev1api.replace_namespaced_service(name, namespace=self.namespace, body=yaml_content)
        logging.info(f"Service {name} updated.")
    except client.exceptions.ApiException as e:
        if e.status == 404:  # Not Found
            corev1api.create_namespaced_service(body=yaml_content, namespace=self.namespace)
            logging.info(f"Service {name} created.")
        else:
            raise
def _handle_pvc(self, name: str, yaml_content: Dict):
    """Create or replace a PersistentVolumeClaim in the target namespace."""
    try:
        existing_pvc = corev1api.read_namespaced_persistent_volume_claim(name, namespace=self.namespace)
        # Replacement requires the current resourceVersion.
        yaml_content['metadata']['resourceVersion'] = existing_pvc.metadata.resource_version
        corev1api.replace_namespaced_persistent_volume_claim(name, namespace=self.namespace, body=yaml_content)
        logging.info(f"PersistentVolumeClaim {name} updated.")
    except client.exceptions.ApiException as e:
        if e.status == 404:  # Not Found
            corev1api.create_namespaced_persistent_volume_claim(body=yaml_content, namespace=self.namespace)
            logging.info(f"PersistentVolumeClaim {name} created.")
        else:
            raise
def apply_yaml_process(self, file_name):
process = subprocess.run(['kubectl', '-n', self.namespace, 'apply', '-f', file_name], check=True,
stdout=subprocess.PIPE,
universal_newlines=True)
return process
def apply_yaml(self, file_name, image_pull_policy):
    """Apply a single YAML manifest to the cluster.

    Non-service manifests get their imagePullPolicy normalized first.
    Service manifests are applied, their assigned nodePort is written back
    into the file, then the file is re-applied so the pinned nodePort
    reaches the cluster.
    """
    logging.info(f"apply_yaml: {file_name}")
    if not self.is_service(file_name):
        self.set_image_pull_policy(deployment_file_name=file_name, new_policy=image_pull_policy)
    self.apply_yaml_process(file_name=file_name)
    if self.is_service(file_name=file_name):
        self.update_yaml_ports(file_name=file_name)
        # Re-apply so the nodePort written into the file is pinned server-side.
        self.apply_yaml_process(file_name=file_name)
def create_web_ui_service_yaml(self, file_service):
    """Derive a companion web-UI Service YAML from an existing service YAML.

    Renames the service to '<name>webui', points the first port at the
    web-UI target port, and writes the result next to the input file with
    the '_webui.yaml' suffix.

    Returns:
        The path of the generated file, or None when the input YAML cannot
        be loaded or lacks required fields (each failure is logged).
    """
    logging.info(f"File name of service yaml = {file_service}")
    doc = self._load_yaml(file_name=file_service)
    if not doc:
        # _load_yaml already logged the failure.
        return None
    # Modify the loaded YAML
    metadata = doc.get('metadata', {})
    if not metadata:
        logging.error("Metadata not found in YAML")
        return None
    service_name = metadata.get('name')
    if not service_name:
        logging.error("Service name not found in YAML metadata")
        return None
    webui_name = service_name + self.WEBUI_PORT_NAME
    doc['metadata']['name'] = webui_name
    ports = doc.get('spec', {}).get('ports')
    if not ports or not isinstance(ports, list) or len(ports) == 0:
        logging.error("Ports not found or invalid in service spec")
        return None
    ports[0]['name'] = self.WEBUI_PORT_NAME
    ports[0]['targetPort'] = self.WEBUI_TARGET_PORT
    # Save the modified YAML as a new file
    if not file_service.endswith('.yaml'):
        logging.error("Provided file doesn't have a .yaml extension")
        return None
    file_service_web_ui = file_service[:-len('.yaml')] + self.WEBUI_SUFFIX
    self._write_yaml(file_name=file_service_web_ui, doc=doc)
    return file_service_web_ui
def get_namespaces(self) -> Dict[str, bool]:
    """Return a mapping of namespace name -> True when its phase is Active."""
    namespaces = corev1api.list_namespace()
    # Dictionary to hold namespace names and their active status.
    return {ns.metadata.name: ns.status.phase == "Active" for ns in namespaces.items}
def get_service_ip_address(self, namespce, service_name):
    """Return the service address column parsed from `kubectl get` output.

    NOTE(review): this relies on fixed column positions in kubectl's table
    output (the 8th non-empty space-separated token) — verify it still
    matches the kubectl version in use.
    """
    completed = subprocess.run(
        ['kubectl', '-n', namespce, 'get', service_name],
        check=True, stdout=subprocess.PIPE, universal_newlines=True)
    tokens = completed.stdout.split(" ")
    fields = [token for token in tokens if token]
    return fields[7]
def get_node_ip_address(self):
    """Return the first ExternalIP or InternalIP found among cluster nodes.

    Iterates nodes in API order and returns whichever of the two address
    types appears first; returns None when no node exposes either.
    """
    node_info = corev1api.list_node(watch=False)
    for node in node_info.items:
        for address in node.status.addresses:
            if address.type in ("ExternalIP", "InternalIP"):
                return address.address
    return None
def is_valid_namespace(self):
    """Return True when self.namespace exists in the cluster and is Active."""
    # get_namespaces() yields a name -> is_active mapping.
    existing_namespaces = self.get_namespaces()
    if self.namespace in existing_namespaces:
        if existing_namespaces[self.namespace]:
            logging.info("Given namespace is active ")
            return True
        else:
            logging.info("Given namespace is inactive ")
            return False
    else:
        logging.error("Name of your given namespace is invalid")
        logging.error(f"Existing namespaces are: {existing_namespaces}")
        return False
def is_orchestrator_present(self, path):
def is_orchestrator_present(self, path):
@@ -227,74 +272,78 @@ class Deployment:
@@ -227,74 +272,78 @@ class Deployment:
for root, dirs, files in os.walk(path):
for root, dirs, files in os.walk(path):
if orchestrator_client in files:
if orchestrator_client in files:
return True
return True
 
return False
class KubernetesSecret:
    """Creates kubernetes.io/dockerconfigjson image-pull secrets in a namespace."""

    def __init__(self, namespace: str):
        """Initialize the KubernetesSecret class."""
        self.namespace = namespace

    def _get_secret_data(self, path_docker_config: str) -> dict:
        """Read and base64-encode the docker config file."""
        with open(path_docker_config, 'r') as docker_config_file:
            docker_config_json = docker_config_file.read()
        return {
            ".dockerconfigjson": base64.b64encode(docker_config_json.encode()).decode()
        }

    def _get_secret_metadata(self, name_secret: str) -> "client.V1ObjectMeta":
        """Create secret metadata."""
        return client.V1ObjectMeta(name=name_secret)

    def _configure_secret(self, metadata: "client.V1ObjectMeta", secret_data: dict) -> "client.V1Secret":
        """Assemble the V1Secret object of type dockerconfigjson."""
        return client.V1Secret(
            api_version="v1",
            kind="Secret",
            data=secret_data,
            metadata=metadata,
            type="kubernetes.io/dockerconfigjson"
        )

    def _get_secret(self, path_docker_config: str, name_secret: str) -> "client.V1Secret":
        """Prepare the Kubernetes secret from the docker config."""
        metadata = self._get_secret_metadata(name_secret)
        secret_data = self._get_secret_data(path_docker_config)
        return self._configure_secret(metadata, secret_data)

    def _create_secret(self, secret: "client.V1Secret"):
        """Create the secret in the cluster; failures are logged, not raised."""
        try:
            api_response = corev1api.create_namespaced_secret(
                namespace=self.namespace,
                body=secret,
            )
            logging.info(f"Secret {api_response.metadata.name} created in the namespace {api_response.metadata.namespace}")
        except Exception as e:
            logging.error(f"Error creating the secret: {e}")

    def create_secret(self, path_docker_config: str, name_secret: str = "my-secret"):
        """Public method to create a Kubernetes secret from a docker config."""
        secret = self._get_secret(path_docker_config, name_secret)
        self._create_secret(secret)
def apply_yamls(image_pull_policy, deployment):
    """Apply every non-webui YAML in the deployment directory, in parallel.

    For each Service manifest a companion web-UI service YAML is generated
    and applied as well. Each apply runs in its own thread; all threads are
    joined before the collected port mapping is logged.
    """
    threads = []
    yaml_files = glob.glob(deployment.get_deployment_dir() + "/*.yaml")
    for yaml_file in yaml_files:
        # Generated *_webui.yaml files are re-created below; skip stale ones.
        if yaml_file.endswith('webui.yaml'):
            continue
        if deployment.is_service(yaml_file):
            yaml_file_web_ui = deployment.create_web_ui_service_yaml(yaml_file)
            if yaml_file_web_ui:  # generation can fail and return None
                t = threading.Thread(target=deployment.apply_yaml,
                                     args=(yaml_file_web_ui, image_pull_policy))
                threads.append(t)
                t.start()
        t = threading.Thread(target=deployment.apply_yaml,
                             args=(yaml_file, image_pull_policy))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    logging.info(f"Port mapping: {deployment.port_mapping}")
def create_dockerinfo(base_path, deployment):
def create_dockerinfo(base_path, deployment):
dockerInfo = DockerInfo()
dockerInfo = DockerInfo()
@@ -308,11 +357,11 @@ def create_secret(namespace, path_docker_config, name_secret):
@@ -308,11 +357,11 @@ def create_secret(namespace, path_docker_config, name_secret):
def run_client(args):
def run_client(args):
namespace = args.namespace
namespace = args.namespace
print(f"namespace = {namespace}")
logging.info(f"namespace = {namespace}")
image_pull_policy=args.image_pull_policy
image_pull_policy=args.image_pull_policy
print(f"image_pull_policy = {image_pull_policy}")
logging.info(f"image_pull_policy = {image_pull_policy}")
base_path=args.base_path
base_path=args.base_path
print(f"base_path = {base_path}")
logging.info(f"base_path = {base_path}")
deployment = Deployment(namespace=namespace, base_path=base_path)
deployment = Deployment(namespace=namespace, base_path=base_path)
@@ -325,9 +374,9 @@ def run_client(args):
@@ -325,9 +374,9 @@ def run_client(args):
create_secret(namespace=namespace, path_docker_config=args.path_docker_secret, name_secret=args.secret_name)
create_secret(namespace=namespace, path_docker_config=args.path_docker_secret, name_secret=args.secret_name)
if deployment.is_orchestrator_present(base_path):
if deployment.is_orchestrator_present(base_path):
print("Node IP-address : " + deployment.get_node_ip_address())
logging.info(f"Node IP-address : {deployment.get_node_ip_address()}")
print("Orchestrator Port is : " + str(deployment.port_mapping.get('orchestrator')))
logging.info(f"Orchestrator Port is {str(deployment.port_mapping.get('orchestrator'))}")
print("Please run python orchestrator_client/orchestrator_client.py --endpoint=%s:%d --basepath=./" % (deployment.get_node_ip_address(), deployment.port_mapping.get('orchestrator')))
logging.info(f"Please run python orchestrator_client/orchestrator_client.py --endpoint={deployment.get_node_ip_address()}:{deployment.port_mapping.get('orchestrator')} --basepath=./")
def main():
def main():
Loading