Skip to content
Snippets Groups Projects
Commit 433f1aed authored by Syed Mafooq Ull Hassan's avatar Syed Mafooq Ull Hassan
Browse files

Feat#3 Energy Efficiency Implemented

parent 690d00ec
No related branches found
No related tags found
No related merge requests found
......@@ -4,9 +4,10 @@ default_language_version:
default_stages: [pre-commit]
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
rev: v5.0.0
hooks:
- id: check-added-large-files
args: ['--maxkb=5000']
- id: check-case-conflict
- id: check-json
- id: check-merge-conflict
......@@ -16,13 +17,13 @@ repos:
- id: trailing-whitespace
- repo: https://github.com/asottile/pyupgrade
rev: v3.17.0
rev: v3.20.0
hooks:
- id: pyupgrade
args: [--py38-plus]
- repo: https://github.com/PyCQA/flake8
rev: 7.1.1
rev: 7.3.0
hooks:
- id: flake8
additional_dependencies:
......@@ -34,30 +35,30 @@ repos:
- flake8-no-pep420
- repo: https://github.com/psf/black
rev: 24.8.0
rev: 25.1.0
hooks:
- id: black
- repo: https://github.com/adamchainz/blacken-docs
rev: 1.18.0
rev: 1.19.1
hooks:
- id: blacken-docs
additional_dependencies:
- black==23.1.0
- repo: https://github.com/rstcheck/rstcheck
rev: v6.2.4
rev: v6.2.5
hooks:
- id: rstcheck
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.11.2
rev: v1.16.1
hooks:
- id: mypy
additional_dependencies: ["types-requests"]
- repo: https://github.com/asottile/reorder-python-imports
rev: v3.13.0
rev: v3.15.0
hooks:
- id: reorder-python-imports
args: ["--application-directories", "src"]
This diff is collapsed.
This diff is collapsed.
import logging
import re
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
def extract_numeric_ghz(text):
    """Pull the first numeric token (e.g. a clock speed) out of *text*.

    Returns the value as a float, or ``np.nan`` when no number is present
    or parsing fails.
    """
    try:
        first_number = re.search(r"[\d.]+", str(text))
        if first_number is None:
            return np.nan
        return float(first_number.group(0))
    except Exception:
        return np.nan
def extract_cache_mb(text):
    """Parse a cache-size string and return the size in megabytes.

    "12 MB" -> 12.0 and "2 GB" -> 2048.0.  Returns ``np.nan`` when the text
    carries no recognised unit or cannot be parsed.  (Bug fix: the original
    fell off the end and implicitly returned ``None`` for unit-less input,
    breaking the NaN convention used by the sibling extractors.)
    """
    try:
        text = str(text).upper()
        if "MB" in text:
            return float(re.findall(r"[\d.]+", text)[0])
        elif "GB" in text:
            return float(re.findall(r"[\d.]+", text)[0]) * 1024
        return np.nan  # explicit NaN instead of an implicit None
    except Exception:
        return np.nan
def extract_cores(text):
    """Parse a core-count field such as ``8`` or ``"8 / 16"`` (cores / threads).

    Returns the physical core count as an int, or ``np.nan`` on failure.
    """
    try:
        as_text = str(text)
        if "/" not in as_text:
            return int(text)
        cores_part = as_text.split("/")[0]
        return int(cores_part.strip())
    except Exception:
        return np.nan
# Train a TDP (thermal design power) regressor from the Intel/AMD CPU spec
# CSVs that live next to this script, and persist it with joblib so the
# runtime annotation code can fall back to it for unknown CPUs.

# Load the raw vendor spec sheets shipped alongside this script.
data_dir = Path(__file__).parent
intel_df = pd.read_csv(data_dir / "intel_cpus.csv")
amd_df = pd.read_csv(data_dir / "amd_cpus.csv")

# Normalise the Intel sheet into numeric feature columns.
# NOTE(review): assumes the CSV has "Processor Base Frequency", "Total Cores",
# "Cache" and "TDP" columns -- verify against the data source.
intel_clean = pd.DataFrame(
    {
        "Clock (GHz)": intel_df["Processor Base Frequency"].apply(extract_numeric_ghz),
        "Cores": intel_df["Total Cores"].apply(pd.to_numeric, errors="coerce"),
        "L3_Cache": intel_df["Cache"].apply(extract_cache_mb),
        # First integer out of strings like "65 W".
        "TDP": intel_df["TDP"].str.extract(r"(\d+)").iloc[:, 0].astype(float),
    }
)

# Normalise the AMD sheet into the same schema.
amd_clean = pd.DataFrame(
    {
        "Clock (GHz)": amd_df["Clock"].apply(extract_numeric_ghz),
        "Cores": amd_df["Cores"].apply(extract_cores),
        "L3_Cache": amd_df["L3 Cache"].apply(extract_cache_mb),
        "TDP": amd_df["TDP"].str.extract(r"(\d+)").iloc[:, 0].astype(float),
    }
)

# Pool both vendors and drop any row where a feature failed to parse.
df = pd.concat([intel_clean, amd_clean], ignore_index=True)
df.dropna(inplace=True)

# Predict TDP (watts) from clock speed, core count and L3 cache size.
X = df[["Clock (GHz)", "Cores", "L3_Cache"]]
y = df["TDP"]

# Scale features, then fit a random forest (fixed seed for reproducibility).
model = Pipeline(
    [("scaler", StandardScaler()), ("regressor", RandomForestRegressor(n_estimators=100, random_state=42))]
)

# Train on the full dataset (no holdout -- this is a lookup fallback, not a
# benchmarked model).
model.fit(X, y)

# Persist the fitted pipeline next to the data for the runtime fallback.
output_path = data_dir / "tdp_regressor.pkl"
joblib.dump(model, output_path)
logging.info(f"Model saved to {output_path}")
......@@ -36,16 +36,21 @@ def test_command() -> None:
def run(dry_run: bool, ip: str) -> None:
network_interfaces = utils.get_network_interfaces_annotations()
monetary_cost = utils.get_monetary_cost_annotation()
energy_efficiency = utils.get_energy_efficiency_annotation()
annotations = {
**network_interfaces,
**monetary_cost,
**energy_efficiency,
}
labels = {
**utils.get_geolocation(),
}
if ip:
annotations.update(utils.get_interface_name_and_type(ip))
if dry_run:
click.echo("[DRY-RUN] The following attributes would be applied: \n")
click.echo("Annotations:")
......@@ -54,7 +59,6 @@ def run(dry_run: bool, ip: str) -> None:
click.echo("Labels:")
for key, value in labels.items():
click.echo(f"\t{key}: {value}")
return
pass
......@@ -67,26 +71,29 @@ def start_daemon() -> None:
raise exceptions.HyperToolKubernetesError("Error loading in-cluster config")
v1 = client.CoreV1Api()
api_server_ip = os.environ.get("KUBERNETES_SERVICE_HOST")
while True:
network_interfaces = utils.get_network_interfaces_annotations()
monetary_cost = utils.get_monetary_cost_annotation()
energy_efficiency = utils.get_energy_efficiency_annotation()
interface_name_and_type = utils.get_interface_name_and_type(str(api_server_ip))
geolocation = utils.get_geolocation()
annotations = {
**network_interfaces,
**monetary_cost,
**energy_efficiency,
**interface_name_and_type,
}
labels = {
**utils.get_geolocation(),
**geolocation,
}
patch = {"metadata": {"labels": labels, "annotations": annotations}}
try:
v1.patch_node(name=NODE_NAME, body=patch)
except ApiException as e:
raise exceptions.HyperToolUpdateError(f"Error updating node: {e}")
......
File added
import json
import logging
import os
import platform
import re
import subprocess
import time
from enum import Enum
from pathlib import Path
import geocoder
import joblib
import netifaces
import numpy as np
import pandas as pd
import requests
from pyroute2 import IPRoute
......@@ -19,6 +24,7 @@ def get_network_interfaces_annotations():
return {"hyperai.eu/node-available-interfaces": interfaces}
# === MONETARY COST ===
def get_cpu_count():
    """Return the number of logical CPUs reported by the OS (minimum 1)."""
    count = os.cpu_count()
    if not count:
        return 1
    return count
......@@ -65,6 +71,7 @@ def get_monetary_cost_annotation():
return {"hyperai.eu/node-monetary-cost-category": label}
# === NETWORK TYPE ===
class NetworkType(Enum):
ETHERNET = 1
WIFI = 771
......@@ -99,6 +106,7 @@ def get_interface_name_and_type(ip: str):
}
# === GEOLOCATION ===
def get_geolocation():
try:
g = geocoder.ip("me")
......@@ -116,3 +124,104 @@ def get_geolocation():
"hyperai.eu/node-geolocation-region": g.state,
"hyperai.eu/node-geolocation-country": g.country,
}
# === ENERGY EFFICIENCY ===
def get_cpu_model():
    """Return a best-effort CPU model string.

    Prefers ``uname().processor``, then ``platform.processor()``, and falls
    back to the machine architecture string when neither is populated.
    """
    uname = platform.uname()
    for candidate in (uname.processor, platform.processor()):
        if candidate:
            return candidate
    return uname.machine
def extract_cpu_features():
try:
output = subprocess.check_output(["lscpu"], text=True)
lines = output.splitlines()
features = {}
for line in lines:
if "CPU(s):" in line and "NUMA" not in line:
features["Cores"] = int(line.split(":")[1].strip())
elif "CPU max MHz" in line:
features["Clock (GHz)"] = round(float(line.split(":")[1].strip()) / 1000, 2)
elif "L3 cache" in line:
value = line.split(":")[1].strip()
if value.endswith("K"):
features["L3_Cache"] = round(int(value[:-1]) / 1024, 2)
elif value.endswith("M"):
features["L3_Cache"] = float(value[:-1])
else:
features["L3_Cache"] = 0.0
features.setdefault("Cores", os.cpu_count() or 4)
features.setdefault("Clock (GHz)", 2.5)
features.setdefault("L3_Cache", 8.0)
return features
except Exception as e:
logging.warning(f"Failed to extract CPU features dynamically: {e}")
return {"Clock (GHz)": 2.5, "Cores": os.cpu_count() or 4, "L3_Cache": 8.0}
def get_tdp(cpu_model: str) -> float | None:
csv_dir = Path(__file__).parents[2] / "data_sources"
model_dir = Path(__file__).parent / "data"
try:
# Load CSVs
intel_df = pd.read_csv(csv_dir / "intel_cpus.csv")
match = intel_df[intel_df["Product Name"].str.contains(cpu_model, case=False, na=False)]
if not match.empty:
tdp_value = match.iloc[0]["TDP"]
return float(tdp_value.split()[0]) if isinstance(tdp_value, str) else float(tdp_value)
amd_df = pd.read_csv(csv_dir / "amd_cpus.csv")
match = amd_df[amd_df["Name"].str.contains(cpu_model, case=False, na=False)]
if not match.empty:
tdp_value = match.iloc[0]["TDP"]
return float(tdp_value.split()[0]) if isinstance(tdp_value, str) else float(tdp_value)
# Load regressor
logging.info("CPU not found in CSVs. Falling back to regressor.")
regressor = joblib.load(model_dir / "tdp_regressor.pkl")
features_dict = extract_cpu_features()
features_df = pd.DataFrame([features_dict], columns=["Clock (GHz)", "Cores", "L3_Cache"])
predicted_tdp = float(regressor.predict(features_df)[0])
return predicted_tdp
except Exception as e:
logging.error(f"TDP estimation failed: {e}")
return None
def get_energy_efficiency_annotation():
try:
cpu_model = get_cpu_model()
tdp = get_tdp(cpu_model)
if not tdp:
return {"hyperai.eu/node-energy-efficiency": "unknown"}
a = np.random.rand(1_000_000).astype(np.float32)
b = np.random.rand(1_000_000).astype(np.float32)
start = time.time()
for _ in range(10):
np.dot(a, b)
duration = time.time() - start
flops = 10 * 2 * len(a)
flops_per_sec = flops / duration
gflops_per_joule = (flops_per_sec / tdp) / 1e9
if gflops_per_joule < 0.5:
label = "very low"
elif gflops_per_joule < 1.0:
label = "low"
elif gflops_per_joule < 2.0:
label = "medium"
elif gflops_per_joule < 5.0:
label = "high"
else:
label = "very high"
return {"hyperai.eu/node-energy-efficiency": label}
except Exception as e:
logging.error(f"Failed energy estimation: {e}")
return {"hyperai.eu/node-energy-efficiency": "unknown"}
  • Developer

    Is it possible to also include documentation for the energy-efficiency feature in the functionality.rst file?

0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment