Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
CMDT
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Eclipse Research Labs
NEMO Project
NEMO Kernel
Cybersecure Microservices’ Digital Twin
CMDT
Merge requests
!1
pull changes from dev
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
pull changes from dev
dev
into
main
Overview
0
Commits
6
Pipelines
0
Changes
23
Merged
Gregor Cerar
requested to merge
dev
into
main
6 months ago
Overview
0
Commits
6
Pipelines
0
Changes
23
Expand
0
0
Merge request reports
Compare
main
main (base)
and
latest version
latest version
a2b2c536
6 commits,
6 months ago
23 files
+
3326
−
11
Side-by-side
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
23
Search (e.g. *.vue) (Ctrl+P)
deprecated/app.py
0 → 100644
+
318
−
0
Options
import
json
import
logging
import
os
import
time
from
datetime
import
datetime
from
pathlib
import
Path
from
typing
import
Literal
,
Optional
,
Type
,
TypeVar
,
cast
from
urllib.parse
import
urljoin
import
httpx
import
kubernetes
as
k8s
import
yaml
from
dotenv
import
load_dotenv
from
kubernetes.client
import
CoreV1Api
,
V1Pod
,
V1PodList
from
pydantic
import
BaseModel
from
pydantic.alias_generators
import
to_snake
# Define a TypeVar for a Pydantic class
class
TrafficStats
(
BaseModel
):
req_rate
:
Optional
[
float
]
res_rate
:
Optional
[
float
]
res_rate_by_code
:
dict
[
int
,
float
]
res_time_quantiles_ms
:
dict
[
str
,
float
]
def
is_empty
(
self
)
->
bool
:
return
len
(
self
.
res_rate_by_code
)
==
0
or
len
(
self
.
res_time_quantiles_ms
)
==
0
class
PodStatus
(
BaseModel
):
status_type
:
str
status
:
Literal
[
"
True
"
,
"
False
"
,
"
Unknown
"
]
last_transition_time
:
datetime
message
:
Optional
[
str
]
class
PodStats
(
BaseModel
):
pod_name
:
str
timestamp
:
datetime
workload_id
:
str
status
:
list
[
PodStatus
]
labels
:
dict
[
str
,
str
]
traffic_stats
:
Optional
[
TrafficStats
]
class
WorkloadStats
(
BaseModel
):
timestamp
:
datetime
workload_id
:
str
pods
:
list
[
PodStats
]
class
IntentConfig
(
BaseModel
):
base_url
:
str
@property
def
login_url
(
self
)
->
str
:
return
urljoin
(
self
.
base_url
,
"
./auth/login/
"
)
class
PromConfig
(
BaseModel
):
# base_url: str = "http://prometheus.linkerd.svc.cluster.local:9090/api/v1" # /api/v1/metrics
base_url
:
str
@property
def
query_url
(
self
)
->
str
:
return
urljoin
(
self
.
base_url
,
"
./api/v1/query
"
)
class
RabbitConfig
(
BaseModel
):
base_url
:
str
username
:
str
password
:
str
channel
:
str
class
Config
(
BaseModel
):
intent
:
IntentConfig
prometheus
:
PromConfig
rabbitmq
:
RabbitConfig
# create logger
logger
=
logging
.
getLogger
(
__name__
)
logger
.
setLevel
(
logging
.
DEBUG
)
# create console handler and set level to debug
ch
=
logging
.
StreamHandler
()
ch
.
setLevel
(
logging
.
DEBUG
)
# create formatter
# formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
formatter
=
logging
.
Formatter
(
"
%(asctime)s [%(filename)s:%(lineno)s - %(funcName)20s() ] %(message)s
"
)
# add formatter to ch
ch
.
setFormatter
(
formatter
)
# add ch to logger
logger
.
addHandler
(
ch
)
def
is_kubernetes
()
->
bool
:
return
Path
(
"
/var/run/secrets/kubernetes.io/serviceaccount/namespace
"
).
exists
()
or
(
Path
(
"
/proc/self/cgroup
"
).
is_file
()
and
any
(
"
kubepods
"
in
line
for
line
in
open
(
"
/proc/self/cgroup
"
))
)
def
get_pods_list
(
api
:
CoreV1Api
,
label_selector
:
Optional
[
str
]
=
"
nemo.eu/workload
"
)
->
tuple
[
V1PodList
,
list
[
V1Pod
]]:
pods_list
=
api
.
list_pod_for_all_namespaces
(
watch
=
False
,
label_selector
=
label_selector
)
assert
isinstance
(
pods_list
,
V1PodList
),
f
"
Invalid data type
{
type
(
pods_list
)
}
"
_pods_list
=
list
(
pods_list
.
items
)
return
pods_list
,
_pods_list
def
_get_pod_inbound_traffic_rate
(
pod_name
:
str
,
timestamp
:
float
,
cfg
:
PromConfig
)
->
Optional
[
float
]:
template
=
'
sum(rate(request_total{{pod=
"
{pod}
"
, direction=
"
inbound
"
}}[15m] @ {ts}))
'
query
=
template
.
format
(
pod
=
pod_name
,
ts
=
timestamp
)
logger
.
debug
(
f
"
query=
{
'
'
.
join
(
query
.
split
())
}
"
)
res
=
httpx
.
get
(
cfg
.
query_url
,
params
=
{
"
query
"
:
query
})
traffic_req_rate
:
Optional
[
float
]
=
None
if
res
.
status_code
==
200
:
data
=
res
.
json
()
logger
.
debug
(
f
"
inbound traffic rate query response:
\n
{
json
.
dumps
(
data
,
indent
=
2
)
}
"
)
if
data
.
get
(
"
status
"
)
==
"
success
"
:
results
=
data
.
get
(
"
data
"
,
{}).
get
(
"
result
"
,
[])
for
result
in
results
:
metric
=
result
.
get
(
"
metric
"
,
{})
value
=
result
.
get
(
"
value
"
,
[])
logger
.
debug
(
f
"
Metric:
{
metric
}
Value:
{
value
}
"
)
traffic_req_rate
=
cast
(
float
,
value
[
1
])
return
traffic_req_rate
def
_get_pod_outbound_traffic_rate
(
pod_name
:
str
,
timestamp
:
float
,
cfg
:
PromConfig
)
->
Optional
[
float
]:
template
=
'
sum(rate(response_total{{pod=
"
{pod}
"
, direction=
"
outbound
"
}}[15m] @ {ts}))
'
query
=
template
.
format
(
pod
=
pod_name
,
ts
=
timestamp
)
logger
.
debug
(
f
"
query=
{
'
'
.
join
(
query
.
split
())
}
"
)
res
=
httpx
.
get
(
cfg
.
query_url
,
params
=
{
"
query
"
:
query
})
traffic_res_rate
:
Optional
[
float
]
=
None
if
res
.
status_code
==
200
:
data
=
res
.
json
()
logger
.
debug
(
f
"
outbound traffic rate query response:
\n
{
json
.
dumps
(
data
,
indent
=
2
)
}
"
)
if
data
.
get
(
"
status
"
)
==
"
success
"
:
results
=
data
.
get
(
"
data
"
,
{}).
get
(
"
result
"
,
[])
for
result
in
results
:
metric
=
result
.
get
(
"
metric
"
,
{})
value
=
result
.
get
(
"
value
"
,
[])
logger
.
debug
(
f
"
Metric:
{
metric
}
Value:
{
value
}
"
)
traffic_res_rate
=
cast
(
float
,
value
[
1
])
return
traffic_res_rate
def
_get_outbound_traffic_rate_by_status_code
(
pod_name
:
str
,
timestamp
:
float
,
cfg
:
PromConfig
)
->
Optional
[
float
]:
template
=
'
sum by (status_code) (rate(response_total{{pod=
"
{pod}
"
, direction=
"
outbound
"
}}[15m] @ {ts}))
'
query
=
template
.
format
(
pod
=
pod_name
,
ts
=
timestamp
)
logger
.
debug
(
f
"
query=
{
'
'
.
join
(
query
.
split
())
}
"
)
res
=
httpx
.
get
(
cfg
.
query_url
,
params
=
{
"
query
"
:
query
})
traffic_res_rate_by_code
:
dict
[
int
,
float
]
=
{}
if
res
.
status_code
==
200
:
data
=
res
.
json
()
logger
.
debug
(
f
"
outbound traffic rate by status code query reponse:
'
{
pod_name
}
'
\n
{
json
.
dumps
(
data
,
indent
=
2
)
}
"
)
if
data
.
get
(
"
status
"
)
==
"
success
"
:
results
=
data
.
get
(
"
data
"
,
{}).
get
(
"
result
"
,
[])
for
result
in
results
:
metric
=
result
.
get
(
"
metric
"
,
{})
value
=
result
.
get
(
"
value
"
,
[])
logger
.
debug
(
f
"
Metric:
{
metric
}
Value:
{
value
}
"
)
status_code
=
cast
(
int
,
metric
[
"
status_code
"
])
status_code_rate
=
cast
(
float
,
value
[
1
])
traffic_res_rate_by_code
[
status_code
]
=
status_code_rate
return
traffic_res_rate_by_code
def
_get_pod_outbound_traffic_latency
(
pod_name
:
str
,
timestamp
:
float
,
config
:
PromConfig
)
->
dict
[
str
,
float
]:
template
=
"""
label_replace(
histogram_quantile(
{phi},
sum(
rate(
response_latency_ms_bucket{{pod=
"
{pod}
"
, direction=
"
outbound
"
}}[15m] @ {timestamp}
)
)
by (le)
),
"
quantile
"
,
"
{label}
"
,
""
,
""
)
"""
queries
:
list
[
str
]
=
[]
for
phi
,
label
in
[(
0.99
,
"
p99
"
),
(
0.95
,
"
p95
"
),
(
0.75
,
"
p75
"
),
(
0.50
,
"
p50
"
)]:
queries
.
append
(
template
.
format
(
phi
=
phi
,
pod
=
pod_name
,
timestamp
=
timestamp
,
label
=
label
))
query
=
"
\n
or
\n
"
.
join
(
queries
)
logger
.
debug
(
f
"
query=
{
'
'
.
join
(
query
.
split
())
}
"
)
res
=
httpx
.
get
(
config
.
query_url
,
params
=
{
"
query
"
:
query
})
logger
.
debug
(
res
)
traffic_res_quantiles_ms
:
dict
[
str
,
float
]
=
{}
if
res
.
status_code
==
200
:
data
=
res
.
json
()
logger
.
debug
(
f
"
traffic response times quantile for pod
'
{
pod_name
}
'
\n
{
json
.
dumps
(
data
,
indent
=
2
)
}
"
)
if
data
.
get
(
"
status
"
)
==
"
success
"
:
results
=
data
.
get
(
"
data
"
,
{}).
get
(
"
result
"
,
[])
for
result
in
results
:
metric
=
result
.
get
(
"
metric
"
,
{})
value
=
result
.
get
(
"
value
"
,
[])
logger
.
debug
(
f
"
Metric:
{
metric
}
Value:
{
value
}
"
)
quantile
=
cast
(
str
,
metric
[
"
quantile
"
])
response_ms
=
cast
(
float
,
value
[
1
])
traffic_res_quantiles_ms
[
quantile
]
=
response_ms
return
traffic_res_quantiles_ms
def
get_pod_stats
(
pod
:
V1Pod
,
config
:
PromConfig
)
->
PodStats
:
time_start
=
time
.
monotonic
()
pod_name
=
cast
(
str
,
pod
.
metadata
.
name
)
timestamp
=
cast
(
float
,
datetime
.
now
().
timestamp
())
# make a list of pod status changes, filter, and sort them by most recent
pod_status_changes
:
list
[
PodStatus
]
=
[]
for
condition
in
pod
.
status
.
conditions
or
[]:
pod_status_changes
.
append
(
PodStatus
(
status_type
=
condition
.
type
,
status
=
condition
.
status
,
last_transition_time
=
condition
.
last_transition_time
,
message
=
condition
.
message
,
)
)
pod_status_changes
.
sort
(
key
=
lambda
x
:
x
.
last_transition_time
,
reverse
=
True
)
# get pod's labels
labels
=
pod
.
metadata
.
labels
or
{}
# Get labels (or empty dict if none)
traffic_stats
:
TrafficStats
=
TrafficStats
(
req_rate
=
_get_pod_inbound_traffic_rate
(
pod_name
,
timestamp
,
config
),
res_rate
=
_get_pod_outbound_traffic_rate
(
pod_name
,
timestamp
,
config
),
res_rate_by_code
=
_get_outbound_traffic_rate_by_status_code
(
pod_name
,
timestamp
,
config
),
res_time_quantiles_ms
=
_get_pod_outbound_traffic_latency
(
pod_name
,
timestamp
,
config
),
)
time_end
=
time
.
monotonic
()
logger
.
info
(
f
"
Queries took
{
time_end
-
time_start
:
.
3
f
}
s
"
)
return
PodStats
(
pod_name
=
pod_name
,
timestamp
=
timestamp
,
status
=
pod_status_changes
,
labels
=
labels
,
traffic_stats
=
traffic_stats
,
workload_id
=
labels
.
get
(
"
nemo.eu/workload
"
,
""
),
)
def
main
(
config
:
Config
)
->
None
:
try
:
k8s
.
config
.
load_incluster_config
()
except
k8s
.
config
.
config_exception
.
ConfigException
:
logger
.
warning
(
"
App is not running in K8s pod, or cannot obtain config
"
)
try
:
k8s
.
config
.
load_kube_config
()
except
k8s
.
config
.
config_exception
.
ConfigException
:
logger
.
error
(
"
Cannot load config for Kubernetes.
"
,
exc_info
=
True
)
raise
RuntimeError
(
"
Cannot access Kubernetes API.
"
)
from
None
api
=
CoreV1Api
()
_
,
pods_list
=
get_pods_list
(
api
,
label_selector
=
"
nemo.eu/workload
"
)
logger
.
debug
(
f
"
Found
{
len
(
pods_list
)
}
pods.
"
)
for
pod
in
pods_list
:
logger
.
debug
(
f
"
{
pod
.
metadata
.
name
=
}
"
)
pod_stats
=
get_pod_stats
(
pod
,
config
.
prometheus
)
if
not
pod_stats
.
traffic_stats
.
is_empty
():
logger
.
info
(
f
"
Pod stats
\n
{
pod_stats
.
model_dump_json
(
indent
=
2
)
}
"
)
raise
NotImplementedError
()
if
__name__
==
"
__main__
"
:
if
not
is_kubernetes
():
load_dotenv
()
config
=
Config
(
intent
=
IntentConfig
(
base_url
=
os
.
environ
.
get
(
"
INTENT_API_BASE_URL
"
),
),
prometheus
=
PromConfig
(
base_url
=
os
.
environ
.
get
(
"
THANOS_BASE_URL
"
),
),
rabbitmq
=
RabbitConfig
(
base_url
=
os
.
environ
.
get
(
"
NEMO_RABBITMQ_HOST
"
),
username
=
os
.
environ
.
get
(
"
RABBITMQ_USERNAME
"
),
password
=
os
.
environ
.
get
(
"
RABBITMQ_PASSWORD
"
),
channel
=
os
.
environ
.
get
(
"
RABBITMQ_CHANNEL
"
),
),
)
main
(
config
)
Loading