Skip to content

Commit 35f2c18

Browse files
pwalczaksmiklosovic
authored andcommitted
CASSANDRA-21321: Add RowsRead and RowsMutated counters to TableMetrics for accurate per-table row throughput tracking
1 parent c84f3d4 commit 35f2c18

4 files changed

Lines changed: 68 additions & 0 deletions

File tree

src/java/org/apache/cassandra/db/Keyspace.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,8 @@ else if (isDeferrable)
577577
}
578578

579579
cfs.getWriteHandler().write(upd, ctx, updateIndexes);
580+
int rowCount = upd.affectedRowCount();
581+
cfs.metric.rowsMutated.inc(rowCount);
580582

581583
if (requiresViewUpdate)
582584
baseComplete.set(currentTimeMillis());

src/java/org/apache/cassandra/db/ReadCommand.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,7 @@ public void onClose()
718718

719719
metric.tombstoneScannedHistogram.update(tombstones);
720720
metric.liveScannedHistogram.update(liveRows);
721+
metric.rowsRead.inc(liveRows);
721722

722723
boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
723724
if (warnTombstones)

src/java/org/apache/cassandra/metrics/TableMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ public class TableMetrics
166166
public final TableHistogram purgeableTombstoneScannedHistogram;
167167
/** Live rows scanned in queries on this CF */
168168
public final TableHistogram liveScannedHistogram;
169+
/** Total number of live rows read from this CF (cumulative counter, suitable for rate/windowed calculations) */
170+
public final Counter rowsRead;
171+
/** Total number of rows mutated in writes to this CF (cumulative counter, suitable for rate/windowed calculations) */
172+
public final Counter rowsMutated;
169173
/** Column update time delta on this CF */
170174
public final TableHistogram colUpdateTimeDeltaHistogram;
171175
/** time taken acquiring the partition lock for materialized view updates for this table */
@@ -812,6 +816,8 @@ public Long getValue()
812816
tombstoneScannedHistogram = createTableHistogram("TombstoneScannedHistogram", cfs.keyspace.metric.tombstoneScannedHistogram, false);
813817
purgeableTombstoneScannedHistogram = createTableHistogram("PurgeableTombstoneScannedHistogram", cfs.keyspace.metric.purgeableTombstoneScannedHistogram, true);
814818
liveScannedHistogram = createTableHistogram("LiveScannedHistogram", cfs.keyspace.metric.liveScannedHistogram, false);
819+
rowsRead = createTableCounter("RowsRead");
820+
rowsMutated = createTableCounter("RowsMutated");
815821
colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false);
816822
coordinatorReadLatency = createTableTimer("CoordinatorReadLatency");
817823
coordinatorScanLatency = createTableTimer("CoordinatorScanLatency");

test/unit/org/apache/cassandra/metrics/TableMetricsTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,65 @@ public void testPreparedStatementsExecuted()
213213
assertGreaterThan(cfs.metric.coordinatorWriteLatency.getMeanRate(), 0);
214214
}
215215

216+
@Test
217+
public void testRowsMutatedCounter()
218+
{
219+
ColumnFamilyStore cfs = recreateTable();
220+
assertEquals(0, cfs.metric.rowsMutated.getCount());
221+
222+
// Each INSERT touches exactly one row
223+
session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (1, 'a', 'b')", KEYSPACE, TABLE));
224+
assertEquals(1, cfs.metric.rowsMutated.getCount());
225+
226+
session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (2, 'c', 'd')", KEYSPACE, TABLE));
227+
assertEquals(2, cfs.metric.rowsMutated.getCount());
228+
229+
// Batch of 3 rows — counter should jump by 3
230+
executeBatch(false, 3, 1);
231+
assertEquals(5, cfs.metric.rowsMutated.getCount());
232+
233+
assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.table_group"),
234+
row("org.apache.cassandra.metrics.Table.RowsMutated.junit.tablemetricstest",
235+
"junit.tablemetricstest",
236+
"counter",
237+
String.valueOf(cfs.metric.rowsMutated.getCount())));
238+
assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.column_family_group"),
239+
row("org.apache.cassandra.metrics.ColumnFamily.RowsMutated.junit.tablemetricstest",
240+
"junit.tablemetricstest",
241+
"counter",
242+
String.valueOf(cfs.metric.rowsMutated.getCount())));
243+
}
244+
245+
@Test
246+
public void testRowsReadCounter()
247+
{
248+
ColumnFamilyStore cfs = recreateTable();
249+
assertEquals(0, cfs.metric.rowsRead.getCount());
250+
251+
// Seed some rows
252+
for (int i = 0; i < 5; i++)
253+
session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (%d, 'v%d', 'x')", KEYSPACE, TABLE, i, i));
254+
255+
// Full-table scan should touch all 5 rows
256+
session.execute(String.format("SELECT * FROM %s.%s", KEYSPACE, TABLE));
257+
assertEquals(5, cfs.metric.rowsRead.getCount());
258+
259+
// Single-partition read touches 1 row
260+
session.execute(String.format("SELECT * FROM %s.%s WHERE id = 0", KEYSPACE, TABLE));
261+
assertEquals(6, cfs.metric.rowsRead.getCount());
262+
263+
assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.table_group"),
264+
row("org.apache.cassandra.metrics.Table.RowsRead.junit.tablemetricstest",
265+
"junit.tablemetricstest",
266+
"counter",
267+
String.valueOf(cfs.metric.rowsRead.getCount())));
268+
assertRowsContains(cluster, session.execute("SELECT * FROM system_metrics.column_family_group"),
269+
row("org.apache.cassandra.metrics.ColumnFamily.RowsRead.junit.tablemetricstest",
270+
"junit.tablemetricstest",
271+
"counter",
272+
String.valueOf(cfs.metric.rowsRead.getCount())));
273+
}
274+
216275
@Test
217276
public void testLoggedPartitionsPerBatch()
218277
{

0 commit comments

Comments
 (0)