Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 232 additions & 4 deletions src/metrics/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,208 @@ use nix::sys::resource::{Resource, getrlimit};
use prometheus_client::collector::Collector;
use prometheus_client::encoding::{DescriptorEncoder, EncodeMetric};
use prometheus_client::metrics;
use prometheus_client::registry::Unit;
use std::io::{Read, Seek, SeekFrom};
use tracing::error;

// Track open fds
// NOTE(review): this field-less definition looks like the pre-change side of the diff;
// a `ProcessMetrics` struct with fields is declared further down under the same name.
// Only one of the two can exist in the real file — confirm which one applies.
#[derive(Debug)]
pub struct ProcessMetrics {}
// Directory of per-process file descriptors.
// NOTE(review): presumably listed by `encode_open_fds` to count open fds; that body is not
// visible here — confirm.
const FD_PATH: &str = "/dev/fd";
// Stat file of the current process; opened once in `ProcessMetrics::new` and re-read on every
// `encode_proc_stat` call.
const PROC_STAT_PATH: &str = "/proc/self/stat";
// System-wide stat file; read once in `ProcessMetrics::new` to extract `btime`.
const SYSTEM_STAT_PATH: &str = "/proc/stat";

/// Subset of the system-wide `/proc/stat` data that this module consumes.
#[derive(Debug)]
struct SystemStat {
    /// Value of the `btime` line, parsed as an unsigned integer.
    btime: u64,
}

impl SystemStat {
    /// Extracts the `btime` value from `/proc/stat`-style `contents`.
    ///
    /// The search is anchored to the start of a line, so a `btime ` substring in the middle of
    /// another line is not mistaken for the field, and the line may be the last one in the
    /// input without a trailing newline.
    ///
    /// Returns `None` when no line starts with `btime ` or when its value is not a valid `u64`.
    fn parse(contents: &str) -> Option<Self> {
        const BTIME: &str = "btime ";
        let value = contents
            .lines()
            .find_map(|line| line.strip_prefix(BTIME))?;
        Some(Self {
            btime: value.trim().parse().ok()?,
        })
    }
}

/// Fields of `/proc/self/stat` that this module consumes. Names follow proc(5).
#[derive(Debug)]
struct ProcessStat {
    /// Field 22 of the stat line.
    starttime: u64,
    /// Field 23 of the stat line.
    vsize: u64,
    /// Field 24 of the stat line.
    rss: u64, // This is %ld formatted, but that's for historical reasons and should be positive
}

impl ProcessStat {
    /// Parses the contents of a `/proc/<pid>/stat` file.
    ///
    /// Returns `None` (never panics) when the input has no `)`, has too few fields, or when any
    /// of the three fields is not a valid `u64`.
    fn parse(contents: &str) -> Option<ProcessStat> {
        // `contents` is a space delimited array of values that look like
        //
        // 167211 (ztunnel) S 55841 167211 ...
        //
        // Since the process name (ztunnel) could contain spaces or parentheses, we do
        // `split_whitespace` on everything after the last `)`. `starttime`, `vsize`, and `rss`
        // are the 22nd, 23rd, and 24th elements of `contents` using 1-indexing per proc(5).
        // Since we are skipping the first two elements and have 0-indexing, `starttime` is at
        // index 19 of the remaining items.
        //
        // `)` is a single byte, so `+ 1` is always a valid boundary; `get` keeps this
        // panic-free even if the input ends right at the parenthesis. The separating space is
        // consumed by `split_whitespace`.
        let after_comm = contents.get(contents.rfind(')')? + 1..)?;
        let mut items = after_comm.split_whitespace();

        let starttime: u64 = items.nth(19)?.parse().ok()?;
        let vsize: u64 = items.next()?.parse().ok()?;
        let rss: u64 = items.next()?.parse().ok()?;

        Some(ProcessStat {
            starttime,
            vsize,
            rss,
        })
    }
}

/// Collector state for process-level metrics.
///
/// See `man 5 proc` for descriptions of the underlying `/proc` fields.
#[derive(Debug)]
pub struct ProcessMetrics {
// These should never be `None`, but we still try to handle parsing/syscall
// errors gracefully so it is impossible to panic.
/// Open handle on `PROC_STAT_PATH`; rewound and re-read on each `encode_proc_stat` call.
process_stat: Option<std::fs::File>,
/// Result of `sysconf(PAGE_SIZE)`; multiplied with `rss` to report resident memory.
page_size: Option<u64>,
/// Result of `sysconf(CLK_TCK)`; `new` only stores it when it is greater than zero.
clock_ticks_per_second: Option<u64>,
/// `btime` parsed from `SYSTEM_STAT_PATH` once, in `new`.
system_stat: Option<SystemStat>,
}

impl ProcessMetrics {
/// Encodes the gauges derived from `/proc/self/stat`: virtual memory size, resident memory
/// size, and process start time.
///
/// This is best effort: if the stat file was never opened, or cannot be rewound, read, or
/// parsed, a warning is logged where applicable and `Ok(())` is returned without emitting
/// anything. The resident-memory and start-time gauges are skipped individually when the
/// page size, clock tick rate, or system boot time are unavailable.
///
/// # Errors
///
/// Returns `std::fmt::Error` only when the encoder itself fails.
fn encode_proc_stat(&self, encoder: &mut DescriptorEncoder) -> Result<(), std::fmt::Error> {
    // `Read` and `Seek` are implemented for `&File`, so a shared reference is enough.
    let mut fd = match &self.process_stat {
        Some(fd) => fd,
        None => return Ok(()),
    };

    let mut contents = String::new();
    // We want to seek to the start of the file and reread as
    // contents might have changed.
    if let Err(e) = fd.seek(SeekFrom::Start(0)) {
        tracing::warn!("Failed to seek {}: {}", PROC_STAT_PATH, e);
        return Ok(());
    }
    if let Err(e) = fd.read_to_string(&mut contents) {
        tracing::warn!("Failed to read {}: {}", PROC_STAT_PATH, e);
        return Ok(());
    }

    let stat = match ProcessStat::parse(&contents) {
        Some(stat) => stat,
        None => {
            tracing::warn!("Failed to parse stat file.");
            tracing::debug!(
                "Failed to parse {} file. Contents: {}",
                PROC_STAT_PATH,
                contents
            );
            return Ok(());
        }
    };

    {
        let gauge = metrics::gauge::ConstGauge::new(stat.vsize);
        let metric_encoder = encoder.encode_descriptor(
            "process_virtual_memory",
            "unstable: Virtual memory size in bytes.",
            Some(&Unit::Bytes),
            gauge.metric_type(),
        )?;
        gauge.encode(metric_encoder)?;
    }

    if let Some(page_size) = self.page_size {
        // Both operands come from outside the program; saturate rather than overflow, which
        // would panic in debug builds and silently wrap in release builds.
        let resident_bytes = stat.rss.saturating_mul(page_size);
        let gauge = metrics::gauge::ConstGauge::new(resident_bytes);
        let metric_encoder = encoder.encode_descriptor(
            "process_resident_memory",
            "unstable: Resident memory size in bytes.",
            Some(&Unit::Bytes),
            gauge.metric_type(),
        )?;
        gauge.encode(metric_encoder)?;
    }

    if let (Some(system_stat), Some(clock_ticks_per_second)) =
        (&self.system_stat, self.clock_ticks_per_second)
    {
        // `new` only stores a tick rate that is greater than zero, so the division is safe.
        // The addition saturates for the same reason as the multiplication above.
        let start_secs = system_stat
            .btime
            .saturating_add(stat.starttime / clock_ticks_per_second);
        let gauge = metrics::gauge::ConstGauge::new(start_secs);
        let metric_encoder = encoder.encode_descriptor(
            "process_start_time",
            "unstable: Start time of the process since unix epoch in seconds.",
            Some(&Unit::Seconds),
            gauge.metric_type(),
        )?;
        gauge.encode(metric_encoder)?;
    }

    Ok(())
}

/// Builds the collector, gathering everything that only needs to be looked up once.
///
/// Opens `PROC_STAT_PATH` (kept open for later re-reads), parses `btime` out of
/// `SYSTEM_STAT_PATH`, and queries `sysconf` for the page size and clock tick rate.
/// Every step is best effort: a failure is logged as a warning and the corresponding field is
/// left as `None`, so construction itself never fails.
pub fn new() -> Self {
    let process_stat = match std::fs::File::open(PROC_STAT_PATH) {
        Ok(fd) => Some(fd),
        Err(e) => {
            tracing::warn!("Failed to open {}: {}", PROC_STAT_PATH, e);
            None
        }
    };

    let system_stat = match std::fs::File::open(SYSTEM_STAT_PATH) {
        Ok(mut fd) => {
            let mut contents = String::new();
            match fd.read_to_string(&mut contents) {
                Err(e) => {
                    tracing::warn!("Failed to read {}: {}", SYSTEM_STAT_PATH, e);
                    None
                }
                Ok(_) => {
                    let system_stat = SystemStat::parse(&contents);
                    if system_stat.is_none() {
                        tracing::warn!("Failed to parse {}", SYSTEM_STAT_PATH);
                        tracing::debug!("Failed to parse {}: {}", SYSTEM_STAT_PATH, contents);
                    }
                    system_stat
                }
            }
        }
        Err(e) => {
            tracing::warn!("Failed to open {}: {}", SYSTEM_STAT_PATH, e);
            None
        }
    };

    let page_size = match nix::unistd::sysconf(nix::unistd::SysconfVar::PAGE_SIZE) {
        Ok(Some(s)) => s.try_into().ok(),
        Err(e) => {
            tracing::warn!("Failed to get page size: {}", e);
            None
        }
        _ => {
            tracing::warn!("Failed to get page size");
            None
        }
    };

    // The `s > 0` guard matters: this value is later used as a divisor.
    let clock_ticks_per_second = match nix::unistd::sysconf(nix::unistd::SysconfVar::CLK_TCK) {
        Ok(Some(s)) if s > 0 => s.try_into().ok(),
        Err(e) => {
            tracing::warn!("Failed to get clock ticks per second: {}", e);
            None
        }
        _ => {
            tracing::warn!("Failed to get clock ticks per second");
            None
        }
    };

    Self {
        process_stat,
        system_stat,
        page_size,
        clock_ticks_per_second,
    }
}

fn encode_open_fds(&self, encoder: &mut DescriptorEncoder) -> Result<(), std::fmt::Error> {
Expand Down Expand Up @@ -67,6 +259,28 @@ impl ProcessMetrics {
gauge.encode(metric_encoder)?;
Ok(())
}

/// Encodes the soft `RLIMIT_AS` limit as the `process_virtual_memory_max` gauge.
///
/// If the limit cannot be queried, the error is logged and `Ok(())` is returned without
/// emitting the metric. Only encoder failures are propagated.
fn encode_max_vmem(&self, encoder: &mut DescriptorEncoder) -> Result<(), std::fmt::Error> {
    let (soft_limit, _hard_limit) = match getrlimit(Resource::RLIMIT_AS) {
        Ok(limits) => limits,
        Err(e) => {
            error!("Failed to get rlimit: {}", e);
            return Ok(());
        }
    };

    // The limit is frequently "unlimited", reported as a soft limit of 0xFFFFFFFFFFFFFFFF.
    // Handing that to prometheus as an integer yields a format error even though it is a
    // valid u64, so the value is exported as an f64 instead.
    let gauge = metrics::gauge::ConstGauge::new(soft_limit as f64);
    let descriptor = encoder.encode_descriptor(
        "process_virtual_memory_max",
        "unstable: Maximum amount of virtual memory available in bytes.",
        Some(&Unit::Bytes),
        gauge.metric_type(),
    )?;
    gauge.encode(descriptor)
}
}

impl Default for ProcessMetrics {
Expand All @@ -77,6 +291,20 @@ impl Default for ProcessMetrics {

impl Collector for ProcessMetrics {
fn encode(&self, mut encoder: DescriptorEncoder) -> Result<(), std::fmt::Error> {
match self.encode_proc_stat(&mut encoder) {
Ok(_) => {}
Err(e) => {
error!("Failed to encode open process stats: {}", e);
return Ok(());
}
}
match self.encode_max_vmem(&mut encoder) {
Ok(_) => {}
Err(e) => {
error!("Failed to encode max vmem: {}", e);
return Ok(());
}
}
match self.encode_open_fds(&mut encoder) {
Ok(_) => {}
Err(e) => {
Expand Down
52 changes: 41 additions & 11 deletions tests/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ async fn test_quit_lifecycle() {
}

fn process_metrics_assertions(metrics: &ParsedMetrics) {
for metric in ["process_open_fds", "process_max_fds"] {
let metric = &(metric);
fn get_gauge(metrics: &ParsedMetrics, metric: &str) -> f64 {
let m = metrics.query(metric, &Default::default());
assert!(m.is_some(), "expected metric {metric}");
assert!(
Expand All @@ -209,17 +208,48 @@ fn process_metrics_assertions(metrics: &ParsedMetrics) {
);
let value = m.unwrap()[0].value.clone();
match value {
prometheus_parse::Value::Gauge(v) => {
assert!(
v > 0.0,
"expected metric {metric} to be positive, was {value:?}",
);
}
_ => {
panic!("unexpected metric type");
}
prometheus_parse::Value::Gauge(v) => v,
_ => panic!("unexpected metric type for {metric}: {value:?}"),
}
}

for metric in ["process_open_fds", "process_max_fds"] {
let v = get_gauge(metrics, metric);
assert!(v > 0.0, "expected metric {metric} to be positive, was {v}");
}

let vmem = get_gauge(metrics, "process_virtual_memory_bytes");
assert!(
vmem > 0.0,
"expected virtual memory to be positive, was {vmem}"
);

let rss = get_gauge(metrics, "process_resident_memory_bytes");
assert!(
rss > 0.0,
"expected resident memory to be positive, was {rss}"
);
assert!(
rss <= vmem,
"expected resident memory <= virtual memory, got rss={rss} vmem={vmem}"
);

let start = get_gauge(metrics, "process_start_time_seconds");
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs_f64();
assert!(start > 0.0, "expected start time to be set, was {start}");
assert!(
start <= now + 5.0,
"expected start time to be <= now (+5s skew), got start={start} now={now}"
);

let max_vmem = get_gauge(metrics, "process_virtual_memory_max_bytes");
assert!(
max_vmem >= vmem,
"expected max vmem >= vmem, got max_vmem={max_vmem} vmem={vmem}"
);
}

fn base_metrics_assertions(metrics: ParsedMetrics) {
Expand Down