Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.imageworks.spcue;
Comment thread
DiegoTavares marked this conversation as resolved.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
Expand All @@ -8,6 +10,7 @@
import com.imageworks.spcue.dispatcher.DispatchQueue;
import com.imageworks.spcue.dispatcher.HostReportHandler;
import com.imageworks.spcue.dispatcher.HostReportQueue;
import com.imageworks.spcue.service.HostManager;

import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
Expand Down Expand Up @@ -151,6 +154,22 @@ public class PrometheusMetricsCollector {
.name("cue_host_reports_received_total").help("Total number of host reports received")
.labelNames("env", "cuebot_host", "facility").register();

// Memory-stranded cores: idle cores that cannot be booked because their host is out of memory.
// Reported per allocation through the "alloc" label. The values set on these gauges are the
// stored core units divided by 100, so 1.0 corresponds to one physical core.
// NOTE(review): these gauges name the cuebot label "cuebot_hosts" while
// cue_host_reports_received_total above uses "cuebot_host" -- confirm which spelling the
// dashboards and alert rules expect before joining across metrics.
private static final Gauge coresTotal =
Gauge.build().name("cue_cores_total").help("Total cores on UP and OPEN hosts")
.labelNames("env", "cuebot_hosts", "alloc").register();
private static final Gauge coresIdle =
Gauge.build().name("cue_cores_idle").help("Idle cores on UP and OPEN hosts")
.labelNames("env", "cuebot_hosts", "alloc").register();
private static final Gauge coresMemoryStranded = Gauge.build().name("cue_cores_memory_stranded")
.help("Idle cores on UP and OPEN hosts stranded by insufficient host memory")
.labelNames("env", "cuebot_hosts", "alloc").register();

private static final Logger logger = LogManager.getLogger(PrometheusMetricsCollector.class);

// Source of the per-allocation stranded-core statistics. Assigned through setHostManager; while
// it is null the memory-stranded core gauges are skipped during metric collection.
private HostManager hostManager;

// Label values attached to the metrics published by this class (environment and cuebot host).
private String deployment_environment;
private String cuebot_host;

Expand Down Expand Up @@ -251,6 +270,31 @@ public void collectPrometheusMetrics() {
.set(reportQueue.getTaskCount());
reportQueueRejectedTotal.labels(this.deployment_environment, this.cuebot_host)
.set(reportQueue.getRejectedTaskCount());

// Memory-stranded cores, per allocation. Wrapped separately so a query failure does
// not prevent the queue metrics above from being collected. Core counts are stored in
// units of 100 (100 == one physical core), so divide to report whole cores. The gauges
// are cleared first so allocations that no longer have UP and OPEN hosts do not linger
// as stale series.
if (hostManager != null) {
try {
java.util.List<StrandedCoreStats> strandedStats =
hostManager.getStrandedCoreStats();
coresTotal.clear();
coresIdle.clear();
coresMemoryStranded.clear();
for (StrandedCoreStats stats : strandedStats) {
coresTotal.labels(this.deployment_environment, this.cuebot_host,
stats.allocName).set(stats.totalCores / 100.0);
coresIdle.labels(this.deployment_environment, this.cuebot_host,
stats.allocName).set(stats.idleCores / 100.0);
coresMemoryStranded.labels(this.deployment_environment, this.cuebot_host,
stats.allocName).set(stats.strandedCores / 100.0);
}
} catch (Exception e) {
logger.error("Failed to collect memory-stranded core metrics", e);
}
}
}
}

Expand Down Expand Up @@ -393,4 +437,8 @@ public void setDispatchQueue(DispatchQueue dispatchQueue) {
public void setReportQueue(HostReportQueue reportQueue) {
this.reportQueue = reportQueue;
}

/**
 * Set the host manager used to query per-allocation stranded-core statistics.
 *
 * @param hostManager the host manager to query; if it is never set (left null), the
 *        memory-stranded core gauges are not collected
 */
public void setHostManager(HostManager hostManager) {
this.hostManager = hostManager;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@

Comment thread
DiegoTavares marked this conversation as resolved.
package com.imageworks.spcue;

/**
* Per-allocation core counts used to measure memory-stranded cores: idle cores that cannot be
* booked because their host has run out of memory.
*
* All core values are in units of 100 (the same units stored in the host table, where 100 == one
* physical core).
*/
public class StrandedCoreStats {

/** Name of the allocation these counts belong to. */
public final String allocName;

/** Total cores on UP and OPEN hosts in the allocation. */
public final long totalCores;

/** Idle cores on UP and OPEN hosts in the allocation. */
public final long idleCores;

/**
* Idle cores on UP and OPEN hosts in the allocation whose idle memory is at or below
* {@code Dispatcher.MEM_STRANDED_THRESHHOLD}, i.e. cores stranded by memory exhaustion.
*/
public final long strandedCores;

public StrandedCoreStats(String allocName, long totalCores, long idleCores,
long strandedCores) {
this.allocName = allocName;
this.totalCores = totalCores;
this.idleCores = idleCores;
this.strandedCores = strandedCores;
}
}
11 changes: 11 additions & 0 deletions cuebot/src/main/java/com/imageworks/spcue/dao/HostDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package com.imageworks.spcue.dao;

import java.sql.Timestamp;
import java.util.List;

import com.imageworks.spcue.AllocationInterface;
import com.imageworks.spcue.DispatchHost;
import com.imageworks.spcue.HostEntity;
import com.imageworks.spcue.HostInterface;
import com.imageworks.spcue.LocalHostAssignment;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.StrandedCoreStats;
import com.imageworks.spcue.grpc.host.HardwareState;
import com.imageworks.spcue.grpc.host.HostTagType;
import com.imageworks.spcue.grpc.host.LockState;
Expand Down Expand Up @@ -279,6 +281,15 @@ void updateHostStats(HostInterface host, long totalMemory, long freeMemory, long
*/
int getStrandedCoreUnits(HostInterface h);

/**
 * Return per-allocation core counts (total, idle, memory-stranded) across all UP and OPEN
 * hosts, one entry per allocation. Stranded cores are idle cores on hosts whose idle memory is
 * at or below Dispatcher.MEM_STRANDED_THRESHHOLD. Used to expose memory-stranded core metrics.
 *
 * Counts are expressed in core units (100 == one physical core), not whole cores. An
 * allocation that currently has no UP and OPEN host does not appear in the result.
 *
 * @return List of StrandedCoreStats, one per allocation
 */
List<StrandedCoreStats> getStrandedCoreStats();

/**
* Return the number of whole stranded gpus on this host. The host must have less than
* Dispatcher.MEM_STRANDED_THRESHHOLD for the gpus to be considered stranded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand All @@ -41,6 +42,7 @@
import com.imageworks.spcue.HostInterface;
import com.imageworks.spcue.LocalHostAssignment;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.StrandedCoreStats;
import com.imageworks.spcue.dao.HostDao;
import com.imageworks.spcue.dispatcher.Dispatcher;
import com.imageworks.spcue.dispatcher.ResourceReservationFailureException;
Expand Down Expand Up @@ -595,6 +597,31 @@ public int getStrandedCoreUnits(HostInterface h) {
}
}

// spotless:off
// Aggregates core counts per allocation name over the hosts that match the bound states.
// Bind parameters, in order:
//   1. idle-memory threshold: a host's idle cores count as stranded when int_mem_idle <= this
//   2. host_stat.str_state to match
//   3. host.str_lock_state to match
// Sums use the raw host.int_cores / host.int_cores_idle values. Because of the inner joins and
// the WHERE filter, an allocation without any matching host produces no row.
private static final String GET_STRANDED_CORE_STATS =
"SELECT "
+ " alloc.str_name AS alloc_name, "
+ " COALESCE(SUM(host.int_cores), 0) AS total_cores, "
+ " COALESCE(SUM(host.int_cores_idle), 0) AS idle_cores, "
+ " COALESCE(SUM(CASE WHEN host.int_mem_idle <= ? "
+ " THEN host.int_cores_idle ELSE 0 END), 0) AS stranded_cores "
+ "FROM host "
+ "INNER JOIN host_stat ON host.pk_host = host_stat.pk_host "
+ "INNER JOIN alloc ON host.pk_alloc = alloc.pk_alloc "
+ "WHERE host_stat.str_state = ? AND host.str_lock_state = ? "
+ "GROUP BY alloc.str_name";
// spotless:on

/**
 * Run GET_STRANDED_CORE_STATS for hosts that are UP and OPEN, using
 * Dispatcher.MEM_STRANDED_THRESHHOLD as the idle-memory cutoff, and map each result row to a
 * StrandedCoreStats.
 */
@Override
public List<StrandedCoreStats> getStrandedCoreStats() {
    final String upState = HardwareState.UP.toString();
    final String openLock = LockState.OPEN.toString();

    return getJdbcTemplate().query(GET_STRANDED_CORE_STATS, (row, index) -> {
        final String alloc = row.getString("alloc_name");
        final long total = row.getLong("total_cores");
        final long idle = row.getLong("idle_cores");
        final long stranded = row.getLong("stranded_cores");
        return new StrandedCoreStats(alloc, total, idle, stranded);
    }, Dispatcher.MEM_STRANDED_THRESHHOLD, upState, openLock);
}

@Override
public int getStrandedGpus(HostInterface h) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.imageworks.spcue.ProcInterface;
import com.imageworks.spcue.ShowInterface;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.StrandedCoreStats;
import com.imageworks.spcue.VirtualProc;
import com.imageworks.spcue.dao.criteria.FrameSearchInterface;
import com.imageworks.spcue.dao.criteria.ProcSearchInterface;
Expand Down Expand Up @@ -200,6 +201,12 @@ void setHostStatistics(HostInterface host, long totalMemory, long freeMemory, lo
*/
int getStrandedCoreUnits(HostInterface h);

/**
 * Return per-allocation core counts (total, idle, memory-stranded) across all UP and OPEN
 * hosts, one entry per allocation.
 *
 * @return list of StrandedCoreStats, one per allocation
 */
List<StrandedCoreStats> getStrandedCoreStats();

/**
* Return the number of stranded cores on the host.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.imageworks.spcue.ProcInterface;
import com.imageworks.spcue.ShowInterface;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.StrandedCoreStats;
import com.imageworks.spcue.VirtualProc;
import com.imageworks.spcue.dao.AllocationDao;
import com.imageworks.spcue.dao.FacilityDao;
Expand Down Expand Up @@ -255,6 +256,12 @@ public int getStrandedCoreUnits(HostInterface h) {
return hostDao.getStrandedCoreUnits(h);
}

/**
 * Fetch the per-allocation stranded-core statistics from the host DAO inside a read-only
 * transaction.
 */
@Override
@Transactional(readOnly = true, propagation = Propagation.REQUIRED)
public List<StrandedCoreStats> getStrandedCoreStats() {
    final List<StrandedCoreStats> perAllocationStats = hostDao.getStrandedCoreStats();
    return perAllocationStats;
}

@Override
@Transactional(propagation = Propagation.REQUIRED, readOnly = true)
public int getStrandedGpuUnits(HostInterface h) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@
<property name="manageQueue" ref="manageQueue" />
<property name="dispatchQueue" ref="dispatchQueue" />
<property name="reportQueue" ref="reportQueue" />
<property name="hostManager" ref="hostManager" />
</bean>

<bean id="historicalManager" class="com.imageworks.spcue.service.HistoricalManagerService">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.imageworks.spcue.test.dao.postgres;

import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

Expand All @@ -34,6 +35,7 @@
import com.imageworks.spcue.HostEntity;
import com.imageworks.spcue.HostInterface;
import com.imageworks.spcue.Source;
import com.imageworks.spcue.StrandedCoreStats;
import com.imageworks.spcue.config.TestAppConfig;
import com.imageworks.spcue.dao.AllocationDao;
import com.imageworks.spcue.dao.FacilityDao;
Expand Down Expand Up @@ -532,6 +534,45 @@ public void testGetStrandedCoreUnits() {
assertEquals(100, hostDao.getStrandedCoreUnits(host));
}

/**
 * Checks the stats reported for the allocation of a newly created host: its idle cores are
 * reported as stranded while idle memory is low, and are no longer stranded once idle memory
 * is raised, while total and idle core counts stay the same.
 */
@Test
@Transactional
@Rollback(true)
public void testGetStrandedCoreStats() {
    final String allocNameQuery =
            "SELECT alloc.str_name FROM host JOIN alloc ON host.pk_alloc = alloc.pk_alloc "
                    + "WHERE host.pk_host = ?";
    final String setIdleMemory = "UPDATE host SET int_mem_idle = ? WHERE pk_host = ?";

    DispatchHost host = hostManager.createHost(buildRenderHost(TEST_HOST));
    String allocName =
            jdbcTemplate.queryForObject(allocNameQuery, String.class, host.getHostId());

    // Low idle memory: every idle core on the host should be reported as stranded.
    jdbcTemplate.update(setIdleMemory, CueUtil.GB, host.getHostId());
    StrandedCoreStats lowMemory = findAllocStats(hostDao.getStrandedCoreStats(), allocName);
    assertEquals(host.cores, lowMemory.totalCores);
    assertEquals(host.idleCores, lowMemory.idleCores);
    assertEquals(host.idleCores, lowMemory.strandedCores);
    assertEquals(allocName, lowMemory.allocName);

    // Ample idle memory: nothing is stranded, yet the cores still count as total and idle.
    jdbcTemplate.update(setIdleMemory, CueUtil.GB2, host.getHostId());
    StrandedCoreStats ampleMemory = findAllocStats(hostDao.getStrandedCoreStats(), allocName);
    assertEquals(host.cores, ampleMemory.totalCores);
    assertEquals(host.idleCores, ampleMemory.idleCores);
    assertEquals(0, ampleMemory.strandedCores);
}

/**
 * Pick the first entry whose allocation name equals {@code allocName}.
 *
 * @param stats entries to search, in order
 * @param allocName allocation name to look for
 * @return the first matching entry
 * @throws AssertionError if no entry matches
 */
private static StrandedCoreStats findAllocStats(List<StrandedCoreStats> stats,
        String allocName) {
    for (StrandedCoreStats candidate : stats) {
        if (allocName.equals(candidate.allocName)) {
            return candidate;
        }
    }
    throw new AssertionError("No StrandedCoreStats returned for allocation " + allocName);
}

@Test
@Transactional
@Rollback(true)
Expand Down
Loading