Newer
Older
import kopf
import os
import sys
import json
import subprocess
import secrets
import kubernetes
from subprocess import CalledProcessError
from random import randrange
from kubernetes import client, config
import pymysql
import random
import time
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import requests
import re
import sys
from requests.auth import HTTPBasicAuth
databaseIP = "127.0.0.1"
baseControllerUrl = 'http://' + os.environ['CONTROLLER_IP'] + ':8181' + '/onos/v1'
def beginSessionController(baseControllerUrl,username,password):
# Create a session with basic authentication
auth = HTTPBasicAuth(username, password)
session = requests.Session()
session.auth = auth
#Check if connection is possible
response = session.get(baseControllerUrl + '/l2sm/networks/status')
if response.status_code == 200:
# Successful request
print("Initialized session between operator and controller.")
return session
else:
print("Could not initialize session with l2sm-controller")
sys.exit()
return None
session = beginSessionController(baseControllerUrl,"karaf","karaf")
def getSwitchId(cur, node):
switchQuery = "SELECT * FROM switches WHERE node = '%s'" % (node)
cur.execute(switchQuery)
switchRecord = cur.fetchone()
if switchRecord is not None:
switchId = switchRecord[0]
if switchId is not None:
return switchId # If openflowId is already set, return it
# If openflowId is not set, make a request to get the information from the API
response = session.get(baseControllerUrl + '/devices')
devices = response.json().get('devices', [])
for device in devices:
if 'id' in device and 'annotations' in device and 'managementAddress' in device['annotations']:
if device['annotations']['managementAddress'] == switchRecord[1]:
openflowId = device['id']
switchId = openflowId
# Save the openflowId in the database
updateQuery = "UPDATE switches SET openflowId = '%s' WHERE node = '%s'" % (switchId, node)
cur.execute(updateQuery)
return switchId # Return the openflowId
return None # Return None if no matching device is found
#POPULATE DATABASE ENTRIES WHEN A NEW L2SM POD IS CREATED (A NEW NODE APPEARS)
@kopf.on.create('pods.v1', labels={'l2sm-component': 'l2sm-switch'})
def build_db(body, logger, annotations, **kwargs):
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
#CREATE TABLES IF THEY DO NOT EXIST
table1 = "CREATE TABLE IF NOT EXISTS networks (network TEXT NOT NULL, id TEXT NOT NULL);"
table2 = "CREATE TABLE IF NOT EXISTS interfaces (interface TEXT NOT NULL, node TEXT NOT NULL, network TEXT, pod TEXT);"
table3 = "CREATE TABLE IF NOT EXISTS switches (openflowId TEXT, ip TEXT, node TEXT NOT NULL);"
cur.execute(table1)
cur.execute(table2)
#MODIFY THE END VALUE TO ADD MORE INTERFACES
Alex ubuntu vm
committed
values.append(['veth'+str(i), body['spec']['nodeName'], '-1', ''])
sqlInterfaces = "INSERT INTO interfaces (interface, node, network, pod) VALUES (%s, %s, %s, %s)"
cur.executemany(sqlInterfaces, values)
db.commit()
#ADD The switch identification to the database, without the of13 id yet, as it may not be connected yet.
sqlSwitch = "INSERT INTO switches (node) VALUES ('" + body['spec']['nodeName'] + "')"
cur.execute(sqlSwitch)
db.close()
logger.info(f"Node {body['spec']['nodeName']} has been registered in the operator")
Alex ubuntu vm
committed
@kopf.on.field('pods.v1', labels={'l2sm-component': 'l2sm-switch'}, field='status.podIP')
def update_db(body, logger, annotations, **kwargs):
if 'status' in body and 'podIP' in body['status']:
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
cur = db.cursor()
updateQuery = "UPDATE switches SET ip = '%s', OpenFlowId = NULL WHERE node = '%s'" % (body['status']['podIP'], body['spec']['nodeName'])
cur.execute(updateQuery)
db.commit()
db.close()
logger.info(f"Updated switch ip")
#UPDATE DATABASE WHEN NETWORK IS CREATED, I.E: IS A MULTUS CRD WITH OUR DUMMY INTERFACE PRESENT IN ITS CONFIG
#@kopf.on.create('NetworkAttachmentDefinition', field="spec.config['device']", value='l2sm-vNet')
@kopf.on.create('NetworkAttachmentDefinition', when=lambda spec, **_: '"device": "l2sm-vNet"' in spec['config'])
def create_vn(spec, name, namespace, logger, **kwargs):
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
cur = db.cursor()
id = secrets.token_hex(32)
sql = "INSERT INTO networks (network, id) VALUES ('%s', '%s')" % (name.strip(), id.strip())
cur.execute(sql)
db.commit()
db.close()
# Create the network in the controller, using a post request
data = {
"networkId": name.strip()
}
# json_payload = {
# "Content-Type": "application/json",
# "data": payload
# }
response = session.post(baseControllerUrl + '/l2sm/networks', json=data)
# Check the response status
if response.status_code == 204:
logger.info(f"Network has been created")
#print("Response:", response.json())
else:
# Handle errors
logger.info(f"Network could not be created, check controller status")
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#ASSIGN POD TO NETWORK (TRIGGERS ONLY IF ANNOTATION IS PRESENT)
@kopf.on.create('pods.v1', annotations={'k8s.v1.cni.cncf.io/networks': kopf.PRESENT})
def pod_vn(body, name, namespace, logger, annotations, **kwargs):
#GET MULTUS INTERFACES IN THE DESCRIPTOR
#IN QUARANTINE: SLOWER THAN MULTUS!!!!!
time.sleep(random.uniform(0,0.8)) #Make sure the database is not consulted at the same time to avoid overlaping
multusInt = annotations.get('k8s.v1.cni.cncf.io/networks').split(",")
#VERIFY IF NETWORK IS PRESENT IN THE CLUSTER
api = client.CustomObjectsApi()
items = api.list_namespaced_custom_object('k8s.cni.cncf.io', 'v1', namespace, 'network-attachment-definitions').get('items')
resources = []
# NETWORK POSITION IN ANNOTATION
network = []
#FIND OUR NETWORKS IN MULTUS
for i in items:
if '"device": "l2sm-vNet"' in i['spec']['config']:
resources.append(i['metadata']['name'])
for k in range(len(multusInt)):
multusInt[k] = multusInt[k].strip()
if multusInt[k] in resources:
network.append(k)
#IF THERE ARE NO NETWORKS, LET MULTUS HANDLE THIS
if not network:
return
#CHECK IF NODE HAS FREE VIRTUAL INTERFACES LEFT
v1 = client.CoreV1Api()
ret = v1.read_namespaced_pod(name, namespace)
node = body['spec']['nodeName']
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
nsql = "SELECT * FROM interfaces WHERE node = '%s' AND network = '-1'" % (node.strip())
cur = db.cursor()
cur.execute(nsql)
data = cur.fetchall()
if not data or len(data)<len(network):
db.close()
raise kopf.PermanentError("l2sm could not deploy the pod: Node " + node.strip() + "has no free interfaces left")
#IF THERE IS ALREADY A MULTUS ANNOTATION, APPEND IT TO THE END.
interface_to_attach = []
network_array = []
j = 0
for interface in data[0:len(network)]:
network_array.append(multusInt[network[j]])
multusInt[network[j]] = interface[0].strip()
interface_to_attach.append(interface[0].strip())
j = j + 1
ret.metadata.annotations['k8s.v1.cni.cncf.io/networks'] = ', '.join(multusInt)
#GET NETWORK ID'S
#for j in items:
# if network in j['metadata']['name']:
# idsql = "SELECT id FROM networks WHERE network = '%s'" % (network.strip())
# cur.execute(idsql)
# retrieve = cur.fetchone()
# networkN = retrieve[0].strip()
# break
Alex ubuntu vm
committed
switchId = getSwitchId(cur, node) # TODO: diferenciar caso en el que es un veth el conectado y el de que es una red de vdd.
if switchId is None:
logger.info(f"The l2sm switch is not connected to controller. Not connecting the pod")
return
Alex ubuntu vm
committed
vethPattern = re.compile(r'\d+$')
portNumbers = [int(vethPattern.search(interface).group()) for interface in interface_to_attach]
for m in range(len(network)):
sql = "UPDATE interfaces SET network = '%s', pod = '%s' WHERE interface = '%s' AND node = '%s'" % (network_array[m], name, interface_to_attach[m], node)
cur.execute(sql)
payload = {
"networkId": network_array[m],
"networkEndpoints": [switchId + '/' + str(portNumbers[m])]
}
response = session.post(baseControllerUrl + '/l2sm/networks/port', json=payload)
#PATCH NETWORK WITH ANNOTATION
v1.patch_namespaced_pod(name, namespace, ret)
# Check the response status
if response.status_code == 200:
# Successful request
print("Request successful!")
else:
# Handle errors
print(f"Error: {response.status_code}")
logger.info(f"Pod {name} attached to network {network_array}")
#UPDATE DATABASE WHEN POD IS DELETED
@kopf.on.delete('pods.v1', annotations={'k8s.v1.cni.cncf.io/networks': kopf.PRESENT})
def dpod_vn(name, logger, **kwargs):
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
cur = db.cursor()
sql = "UPDATE interfaces SET network = '-1', pod = '' WHERE pod = '%s'" % (name)
cur.execute(sql)
db.commit()
db.close()
logger.info(f"Pod {name} removed")
#UPDATE DATABASE WHEN NETWORK IS DELETED
@kopf.on.delete('NetworkAttachmentDefinition', when=lambda spec, **_: '"device": "l2sm-vNet"' in spec['config'])
def delete_vn(spec, name, logger, **kwargs):
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
cur = db.cursor()
sql = "DELETE FROM networks WHERE network = '%s'" % (name)
cur.execute(sql)
response = session.delete(baseControllerUrl + '/l2sm/networks/' + name)
if response.status_code == 204:
# Successful request
logger.info(f"Network has been deleted")
db.commit()
else:
# Handle errors
logger.info(f"Error: {response.status_code}")
#DELETE DATABASE ENTRIES WHEN A NEW L2SM SWITCH IS DELETED (A NEW NODE GETS OUT OF THE CLUSTER)
@kopf.on.delete('pods.v1', labels={'l2sm-component': 'l2sm-switch'})
def remove_node(body, logger, annotations, **kwargs):
db = pymysql.connect(host=databaseIP,user="l2sm",password="l2sm;",db="L2SM")
cur = db.cursor()
sql = "DELETE FROM interfaces WHERE node = '%s'" % (body['spec']['nodeName'])
switchSql = "DELETE FROM switches WHERE node = '%s'" % (body['spec']['nodeName'])
cur.execute(switchSql)
db.commit()
db.close()
logger.info(f"Node {body['spec']['nodeName']} has been deleted from the cluster")