Skip to content
Snippets Groups Projects

pull changes from dev

Merged Gregor Cerar requested to merge dev into main
23 files
+ 3326
11
Compare changes
  • Side-by-side
  • Inline
Files
23
deprecated/app.py 0 → 100644
+ 318
0
 
import json
 
import logging
 
import os
 
import time
 
from datetime import datetime
 
from pathlib import Path
 
from typing import Literal, Optional, Type, TypeVar, cast
 
from urllib.parse import urljoin
 
 
import httpx
 
import kubernetes as k8s
 
import yaml
 
from dotenv import load_dotenv
 
from kubernetes.client import CoreV1Api, V1Pod, V1PodList
 
from pydantic import BaseModel
 
from pydantic.alias_generators import to_snake
 
 
# NOTE(review): this comment announces a TypeVar for a Pydantic class, but no
# TypeVar is defined below — the `TypeVar` import appears unused; confirm and
# either restore the definition or drop the import.
 
 
 
 
class TrafficStats(BaseModel):
    """Prometheus/Linkerd traffic metrics gathered for a single pod."""

    # Inbound request rate; None when the query returned no sample.
    req_rate: Optional[float]
    # Outbound response rate; None when the query returned no sample.
    res_rate: Optional[float]
    # Outbound response rate broken down by HTTP status code.
    res_rate_by_code: dict[int, float]
    # Outbound latency quantiles (keys like "p99") in milliseconds.
    res_time_quantiles_ms: dict[str, float]

    def is_empty(self) -> bool:
        """Return True when either per-code rates or latency quantiles are missing."""
        return not self.res_rate_by_code or not self.res_time_quantiles_ms
 
 
 
class PodStatus(BaseModel):
    """One Kubernetes pod condition (built from ``status.conditions`` in get_pod_stats)."""

    # The condition's ``type`` field from the Kubernetes API.
    status_type: str
    # Kubernetes reports condition status as one of these literal strings.
    status: Literal["True", "False", "Unknown"]
    # When the condition last changed; used to sort most-recent-first.
    last_transition_time: datetime
    # Optional human-readable detail from the condition.
    message: Optional[str]
 
 
 
class PodStats(BaseModel):
    """Snapshot of a single pod: status history, labels, and traffic metrics."""

    pod_name: str
    # Snapshot time (get_pod_stats passes an epoch float; pydantic coerces it).
    timestamp: datetime
    # Value of the "nemo.eu/workload" label, "" when the label is missing.
    workload_id: str
    # Condition transitions, sorted most recent first by get_pod_stats.
    status: list[PodStatus]
    labels: dict[str, str]
    # Traffic metrics; declared optional, though get_pod_stats always sets it.
    traffic_stats: Optional[TrafficStats]
 
 
 
class WorkloadStats(BaseModel):
    """Stats for all pods belonging to one workload (not constructed in this module)."""

    timestamp: datetime
    # Workload identifier — presumably the "nemo.eu/workload" label value; confirm with producers.
    workload_id: str
    pods: list[PodStats]
 
 
 
class IntentConfig(BaseModel):
    """Connection settings for the Intent API."""

    base_url: str

    @property
    def login_url(self) -> str:
        """Absolute URL of the login endpoint, resolved against ``base_url``."""
        endpoint = "./auth/login/"
        return urljoin(self.base_url, endpoint)
 
 
 
class PromConfig(BaseModel):
    """Connection settings for the Prometheus-compatible HTTP API."""

    # Typical in-cluster value:
    # http://prometheus.linkerd.svc.cluster.local:9090
    base_url: str

    @property
    def query_url(self) -> str:
        """Absolute URL of the instant-query endpoint, resolved against ``base_url``."""
        endpoint = "./api/v1/query"
        return urljoin(self.base_url, endpoint)
 
 
 
class RabbitConfig(BaseModel):
    """RabbitMQ connection settings (filled from environment variables at startup)."""

    base_url: str
    username: str
    password: str
    # Channel name; read from RABBITMQ_CHANNEL but not otherwise used in this module.
    channel: str
 
 
 
class Config(BaseModel):
    """Top-level application configuration aggregating all service settings."""

    intent: IntentConfig
    prometheus: PromConfig
    rabbitmq: RabbitConfig
 
 
 
# ---------------------------------------------------------------------------
# Logging: module-level logger that emits everything (DEBUG and above) to the
# console, with a verbose format pinpointing the emitting file/function/line.
# ---------------------------------------------------------------------------
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(
    logging.Formatter("%(asctime)s [%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s")
)

logger.addHandler(ch)
 
 
 
def is_kubernetes() -> bool:
 
return Path("/var/run/secrets/kubernetes.io/serviceaccount/namespace").exists() or (
 
Path("/proc/self/cgroup").is_file() and any("kubepods" in line for line in open("/proc/self/cgroup"))
 
)
 
 
 
def get_pods_list(api: CoreV1Api, label_selector: Optional[str] = "nemo.eu/workload") -> tuple[V1PodList, list[V1Pod]]:
    """List pods across all namespaces matching *label_selector*.

    Args:
        api: Kubernetes Core V1 API client.
        label_selector: Label selector string; defaults to the NEMO workload label.

    Returns:
        The raw ``V1PodList`` response and its items as a plain list.

    Raises:
        TypeError: if the client returns an unexpected response type.
    """
    pods_list = api.list_pod_for_all_namespaces(watch=False, label_selector=label_selector)
    # Validate with a real exception rather than ``assert``: asserts are
    # stripped under ``python -O`` and must not guard runtime invariants.
    if not isinstance(pods_list, V1PodList):
        raise TypeError(f"Invalid data type {type(pods_list)}")
    return pods_list, list(pods_list.items)
 
 
 
def _get_pod_inbound_traffic_rate(pod_name: str, timestamp: float, cfg: PromConfig) -> Optional[float]:
    """Query Prometheus for the pod's inbound request rate (requests/s).

    Uses a 15-minute rate window anchored at *timestamp* (``@`` modifier).
    Returns ``None`` when the request fails or the query yields no samples;
    with multiple result series, the last one wins (matches original behavior).
    """
    template = 'sum(rate(request_total{{pod="{pod}", direction="inbound"}}[15m] @ {ts}))'
    query = template.format(pod=pod_name, ts=timestamp)
    logger.debug(f"query={' '.join(query.split())}")

    res = httpx.get(cfg.query_url, params={"query": query})

    traffic_req_rate: Optional[float] = None
    if res.status_code == 200:
        data = res.json()
        logger.debug(f"inbound traffic rate query response:\n{json.dumps(data, indent=2)}")
        if data.get("status") == "success":
            for result in data.get("data", {}).get("result", []):
                metric = result.get("metric", {})
                value = result.get("value", [])
                logger.debug(f"Metric: {metric} Value: {value}")
                # Prometheus encodes sample values as strings; ``cast`` is a
                # static no-op, so convert explicitly to get a real float.
                traffic_req_rate = float(value[1])

    return traffic_req_rate
 
 
 
def _get_pod_outbound_traffic_rate(pod_name: str, timestamp: float, cfg: PromConfig) -> Optional[float]:
    """Query Prometheus for the pod's outbound response rate (responses/s).

    Uses a 15-minute rate window anchored at *timestamp* (``@`` modifier).
    Returns ``None`` when the request fails or the query yields no samples;
    with multiple result series, the last one wins (matches original behavior).
    """
    template = 'sum(rate(response_total{{pod="{pod}", direction="outbound"}}[15m] @ {ts}))'
    query = template.format(pod=pod_name, ts=timestamp)
    logger.debug(f"query={' '.join(query.split())}")

    res = httpx.get(cfg.query_url, params={"query": query})

    traffic_res_rate: Optional[float] = None
    if res.status_code == 200:
        data = res.json()
        logger.debug(f"outbound traffic rate query response:\n{json.dumps(data, indent=2)}")
        if data.get("status") == "success":
            for result in data.get("data", {}).get("result", []):
                metric = result.get("metric", {})
                value = result.get("value", [])
                logger.debug(f"Metric: {metric} Value: {value}")
                # Prometheus encodes sample values as strings; ``cast`` is a
                # static no-op, so convert explicitly to get a real float.
                traffic_res_rate = float(value[1])

    return traffic_res_rate
 
 
 
def _get_outbound_traffic_rate_by_status_code(pod_name: str, timestamp: float, cfg: PromConfig) -> dict[int, float]:
    """Query per-HTTP-status-code outbound response rates for the pod.

    Uses a 15-minute rate window anchored at *timestamp* (``@`` modifier).

    Returns:
        Mapping of HTTP status code -> rate (responses/s); empty when the
        request fails or the query yields no samples.  (The original return
        annotation ``Optional[float]`` was wrong — a dict is always returned.)
    """
    template = 'sum by (status_code) (rate(response_total{{pod="{pod}", direction="outbound"}}[15m] @ {ts}))'
    query = template.format(pod=pod_name, ts=timestamp)
    logger.debug(f"query={' '.join(query.split())}")

    res = httpx.get(cfg.query_url, params={"query": query})

    traffic_res_rate_by_code: dict[int, float] = {}
    if res.status_code == 200:
        data = res.json()
        logger.debug(f"outbound traffic rate by status code query response:'{pod_name}'\n{json.dumps(data, indent=2)}")
        if data.get("status") == "success":
            for result in data.get("data", {}).get("result", []):
                metric = result.get("metric", {})
                value = result.get("value", [])
                logger.debug(f"Metric: {metric} Value: {value}")
                # Prometheus labels and sample values arrive as strings;
                # ``cast`` performs no runtime conversion, so convert for real
                # to honor the declared dict[int, float] shape.
                traffic_res_rate_by_code[int(metric["status_code"])] = float(value[1])

    return traffic_res_rate_by_code
 
 
 
def _get_pod_outbound_traffic_latency(pod_name: str, timestamp: float, config: PromConfig) -> dict[str, float]:
    """Query outbound response-latency quantiles (ms) for the pod.

    Computes p50/p75/p95/p99 from ``response_latency_ms_bucket`` over a
    15-minute window anchored at *timestamp*, combining the four queries with
    PromQL ``or``.

    Returns:
        Mapping of quantile label (e.g. "p99") -> latency in milliseconds;
        empty when the request fails or the query yields no samples.
    """
    # label_replace() tags each histogram_quantile() result with its quantile
    # label, so the merged "or" result stays distinguishable per series.
    template = """
    label_replace(
        histogram_quantile(
            {phi},
            sum(
                rate(
                    response_latency_ms_bucket{{pod="{pod}", direction="outbound"}}[15m] @ {timestamp}
                )
            )
            by (le)
        ),
        "quantile",
        "{label}",
        "",
        ""
    )
    """

    queries: list[str] = []
    for phi, label in [(0.99, "p99"), (0.95, "p95"), (0.75, "p75"), (0.50, "p50")]:
        queries.append(template.format(phi=phi, pod=pod_name, timestamp=timestamp, label=label))

    query = "\n or \n".join(queries)
    logger.debug(f"query={' '.join(query.split())}")

    res = httpx.get(config.query_url, params={"query": query})
    logger.debug(res)

    traffic_res_quantiles_ms: dict[str, float] = {}
    if res.status_code == 200:
        data = res.json()
        logger.debug(f"traffic response times quantile for pod '{pod_name}'\n{json.dumps(data, indent=2)}")
        if data.get("status") == "success":
            for result in data.get("data", {}).get("result", []):
                metric = result.get("metric", {})
                value = result.get("value", [])
                logger.debug(f"Metric: {metric} Value: {value}")
                # Sample values come back as strings; convert explicitly —
                # ``cast`` performs no runtime conversion, so the original
                # stored strings in a dict annotated as dict[str, float].
                traffic_res_quantiles_ms[cast(str, metric["quantile"])] = float(value[1])

    return traffic_res_quantiles_ms
 
 
 
def get_pod_stats(pod: V1Pod, config: PromConfig) -> PodStats:
    """Collect status history, labels, and Prometheus traffic metrics for one pod.

    Args:
        pod: Kubernetes pod object to inspect.
        config: Prometheus connection settings for the metric queries.

    Returns:
        A fully-populated ``PodStats`` snapshot for the pod.
    """
    time_start = time.monotonic()
    pod_name = cast(str, pod.metadata.name)
    # ``datetime.timestamp()`` already returns a float — the original
    # cast(float, ...) was a no-op.  NOTE(review): this is a naive local-time
    # epoch; confirm the cluster runs in UTC if that matters downstream.
    timestamp = datetime.now().timestamp()

    # Collect the pod's condition transitions and order them most recent first.
    pod_status_changes: list[PodStatus] = []
    for condition in pod.status.conditions or []:
        pod_status_changes.append(
            PodStatus(
                status_type=condition.type,
                status=condition.status,
                last_transition_time=condition.last_transition_time,
                message=condition.message,
            )
        )
    pod_status_changes.sort(key=lambda x: x.last_transition_time, reverse=True)

    # Pod labels (empty dict when the pod has none).
    labels = pod.metadata.labels or {}

    traffic_stats = TrafficStats(
        req_rate=_get_pod_inbound_traffic_rate(pod_name, timestamp, config),
        res_rate=_get_pod_outbound_traffic_rate(pod_name, timestamp, config),
        res_rate_by_code=_get_outbound_traffic_rate_by_status_code(pod_name, timestamp, config),
        res_time_quantiles_ms=_get_pod_outbound_traffic_latency(pod_name, timestamp, config),
    )

    time_end = time.monotonic()
    logger.info(f"Queries took {time_end - time_start:.3f}s")

    return PodStats(
        pod_name=pod_name,
        timestamp=timestamp,
        status=pod_status_changes,
        labels=labels,
        traffic_stats=traffic_stats,
        workload_id=labels.get("nemo.eu/workload", ""),
    )
 
 
 
def main(config: Config) -> None:
    """Load Kubernetes config, then log stats for every NEMO-labelled pod.

    Args:
        config: Application configuration (only ``config.prometheus`` is used).

    Raises:
        RuntimeError: when neither in-cluster nor local kube config can load.
        NotImplementedError: always, after the stats loop — the rest of the
            pipeline is unfinished (this module lives under ``deprecated/``).
    """
    try:
        k8s.config.load_incluster_config()
    except k8s.config.config_exception.ConfigException:
        logger.warning("App is not running in K8s pod, or cannot obtain config")
        # Fall back to a local kubeconfig ONLY when in-cluster config failed;
        # loading it unconditionally would clobber (or spuriously fail after)
        # a working in-cluster configuration.
        try:
            k8s.config.load_kube_config()
        except k8s.config.config_exception.ConfigException:
            logger.error("Cannot load config for Kubernetes.", exc_info=True)
            raise RuntimeError("Cannot access Kubernetes API.") from None

    api = CoreV1Api()
    _, pods_list = get_pods_list(api, label_selector="nemo.eu/workload")
    logger.debug(f"Found {len(pods_list)} pods.")

    for pod in pods_list:
        logger.debug(f"{pod.metadata.name=}")
        pod_stats = get_pod_stats(pod, config.prometheus)
        if not pod_stats.traffic_stats.is_empty():
            logger.info(f"Pod stats\n{pod_stats.model_dump_json(indent=2)}")

    # Deliberate sentinel: everything past stats collection is unimplemented.
    raise NotImplementedError()
 
 
 
if __name__ == "__main__":
    # Outside a cluster, pull the configuration from a local .env file.
    if not is_kubernetes():
        load_dotenv()

    # Assemble the application config from environment variables.
    config = Config(
        intent=IntentConfig(base_url=os.environ.get("INTENT_API_BASE_URL")),
        prometheus=PromConfig(base_url=os.environ.get("THANOS_BASE_URL")),
        rabbitmq=RabbitConfig(
            base_url=os.environ.get("NEMO_RABBITMQ_HOST"),
            username=os.environ.get("RABBITMQ_USERNAME"),
            password=os.environ.get("RABBITMQ_PASSWORD"),
            channel=os.environ.get("RABBITMQ_CHANNEL"),
        ),
    )

    main(config)
Loading