Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import net.minecraft.client.sounds.SoundEngine;
import org.jspecify.annotations.Nullable;
import org.lwjgl.BufferUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.sound.sampled.AudioFormat;
import java.nio.ByteBuffer;
Expand All @@ -32,6 +34,9 @@ class DfpwmStream implements AudioStream {

private static final AudioFormat MONO_8 = new AudioFormat(SpeakerPeripheral.SAMPLE_RATE, 8, 1, true, false);

private static final Logger LOG = LoggerFactory.getLogger(DfpwmStream.class);
private static final long DRIFT_THRESHOLD = (long) (0.5 * SpeakerPeripheral.SAMPLE_RATE); // 24000 samples = 500ms

private final Queue<ByteBuffer> buffers = new ArrayDeque<>(2);

/**
Expand All @@ -53,10 +58,15 @@ class DfpwmStream implements AudioStream {

private int lowPassCharge;

private long consumedSamples = 0;
private long bufferedSamples = 0;
private long streamBaseOffset = 0;
private boolean initialized = false;

DfpwmStream() {
}

void push(EncodedAudio audio) {
private ByteBuffer decodeDfpwm(EncodedAudio audio) {
var charge = audio.charge();
var strength = audio.strength();
var previousBit = audio.previousBit();
Expand Down Expand Up @@ -102,8 +112,45 @@ void push(EncodedAudio audio) {
}

output.flip();
return output;
}

void push(EncodedAudio audio) {
var decoded = decodeDfpwm(audio);
var sampleCount = decoded.remaining();

synchronized (this) {
buffers.add(output);
if (!initialized) {
streamBaseOffset = audio.sampleOffset();
consumedSamples = 0;
bufferedSamples = 0;
initialized = true;
buffers.add(decoded);
bufferedSamples += sampleCount;
return;
}

var expectedServerOffset = streamBaseOffset + consumedSamples + bufferedSamples;
var drift = audio.sampleOffset() - expectedServerOffset;

if (drift > DRIFT_THRESHOLD) {
// Client is behind — hard reset
LOG.debug("Speaker audio hard reset: drift={} samples ({}s)", drift,
String.format("%.2f", (double) drift / SpeakerPeripheral.SAMPLE_RATE));
buffers.clear();
streamBaseOffset = audio.sampleOffset();
consumedSamples = 0;
bufferedSamples = 0;
buffers.add(decoded);
bufferedSamples += sampleCount;
} else if (drift < -DRIFT_THRESHOLD) {
// Packet is stale — drop it
LOG.debug("Speaker audio dropping stale packet: drift={} samples", drift);
} else {
// Normal — enqueue
buffers.add(decoded);
bufferedSamples += sampleCount;
}
}
}

Expand Down Expand Up @@ -131,13 +178,23 @@ public synchronized ByteBuffer read(int capacity) {

result.flip();

// Track consumed samples for drift detection (MONO_8: 1 byte = 1 sample)
var bytesRead = result.remaining();
consumedSamples += bytesRead;
bufferedSamples -= bytesRead;

// This is naughty, but ensures we're not enqueuing empty buffers when the stream is exhausted.
return result.remaining() == 0 ? null : result;
return bytesRead == 0 ? null : result;
}

@Override
public void close() {
buffers.clear();
synchronized (this) {
buffers.clear();
consumedSamples = 0;
bufferedSamples = 0;
initialized = false;
}
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static dan200.computercraft.shared.peripheral.speaker.SpeakerPeripheral.SAMPLE_RATE;
import static dan200.computercraft.shared.peripheral.speaker.SpeakerPeripheral.clampVolume;

/**
Expand All @@ -22,11 +21,7 @@
class DfpwmState {
private static final long SECOND = TimeUnit.SECONDS.toNanos(1);

/**
* The minimum size of the client's audio buffer. Once we have less than this on the client, we should send another
* batch of audio.
*/
private static final long CLIENT_BUFFER = (long) (SECOND * 0.5);
private static final long STREAM_TIMEOUT = 2L * SECOND;

private static final int PREC = 10;

Expand All @@ -35,7 +30,8 @@ class DfpwmState {
private boolean previousBit = false;

private boolean unplayed = true;
private long clientEndTime = PauseAwareTimer.getTime();
private long sampleOffset = 0;
private long lastSendNanos;
private float pendingVolume = 1.0f;
private @Nullable EncodedAudio pendingAudio;

Expand Down Expand Up @@ -84,27 +80,29 @@ synchronized boolean pushBuffer(LuaTable<?, ?> table, int size, Optional<Double>

buffer.flip();

pendingAudio = new EncodedAudio(initialCharge, initialStrength, initialPreviousBit, buffer);
pendingAudio = new EncodedAudio(initialCharge, initialStrength, initialPreviousBit, buffer, 0L);
pendingVolume = (float) clampVolume(volume.orElse((double) pendingVolume));
return true;
}

boolean shouldSendPending(long now) {
return pendingAudio != null && now >= clientEndTime - CLIENT_BUFFER;
boolean shouldSendPending() {
return pendingAudio != null;
}

EncodedAudio pullPending(long now) {
var audio = pendingAudio;
if (audio == null) throw new IllegalStateException("Should not pull pending audio yet");
pendingAudio = null;
// Compute when we should consider sending the next packet.
clientEndTime = Math.max(now, clientEndTime) + (audio.audio().remaining() * SECOND * 8 / SAMPLE_RATE);
unplayed = false;
audio = audio.withSampleOffset(sampleOffset);
sampleOffset += audio.audio().remaining() * 8L;
lastSendNanos = now;
return audio;
}

boolean isPlaying() {
return unplayed || clientEndTime >= PauseAwareTimer.getTime();
return unplayed || pendingAudio != null
|| PauseAwareTimer.getTime() - lastSendNanos < STREAM_TIMEOUT;
}

float getVolume() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@
/**
* A chunk of encoded audio, along with the state required for the decoder to reproduce the original audio samples.
*
* @param charge The DFPWM charge.
* @param strength The DFPWM strength.
* @param previousBit The previous bit.
* @param audio The block of encoded audio.
* @param charge The DFPWM charge.
* @param strength The DFPWM strength.
* @param previousBit The previous bit.
* @param audio The block of encoded audio.
* @param sampleOffset The sample offset of this audio chunk, used for synchronisation.
*/
public record EncodedAudio(int charge, int strength, boolean previousBit, ByteBuffer audio) {
public record EncodedAudio(int charge, int strength, boolean previousBit, ByteBuffer audio, long sampleOffset) {
public void write(FriendlyByteBuf buf) {
buf.writeVarInt(charge());
buf.writeVarInt(strength());
buf.writeBoolean(previousBit());
buf.writeVarInt(audio.remaining());
buf.writeBytes(audio().duplicate());
buf.writeVarLong(sampleOffset());
}

public static EncodedAudio read(FriendlyByteBuf buf) {
Expand All @@ -34,6 +36,12 @@ public static EncodedAudio read(FriendlyByteBuf buf) {
var bytes = new byte[length];
buf.readBytes(bytes);

return new EncodedAudio(charge, strength, previousBit, ByteBuffer.wrap(bytes));
var sampleOffset = buf.readVarLong();

return new EncodedAudio(charge, strength, previousBit, ByteBuffer.wrap(bytes), sampleOffset);
}

public EncodedAudio withSampleOffset(long offset) {
return new EncodedAudio(charge(), strength(), previousBit(), audio(), offset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void update() {
(ServerLevel) level, pos, sound.volume * 16
);
syncedPosition(position);
} else if (dfpwmState != null && dfpwmState.shouldSendPending(now)) {
} else if (dfpwmState != null && dfpwmState.shouldSendPending()) {
// If clients need to receive another batch of audio, send it and then notify computers our internal buffer is
// free again.
ServerNetworking.sendToAllTracking(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@

import java.nio.ByteBuffer;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.*;

public class DfpwmStreamTest {
@Test
public void testDecodesBytes() {
var stream = new DfpwmStream();

var input = ByteBuffer.wrap(new byte[]{ 43, -31, 33, 44, 30, -16, -85, 23, -3, -55, 46, -70, 68, -67, 74, -96, -68, 16, 94, -87, -5, 87, 11, -16, 19, 92, 85, -71, 126, 5, -84, 64, 17, -6, 85, -11, -1, -87, -12, 1, 85, -56, 33, -80, 82, 104, -93, 17, 126, 23, 91, -30, 37, -32, 117, -72, -58, 11, -76, 19, -108, 86, -65, -10, -1, -68, -25, 10, -46, 85, 124, -54, 15, -24, 43, -94, 117, 63, -36, 15, -6, 88, 87, -26, -83, 106, 41, 13, -28, -113, -10, -66, 119, -87, -113, 68, -55, 40, -107, 62, 20, 72, 3, -96, 114, -87, -2, 39, -104, 30, 20, 42, 84, 24, 47, 64, 43, 61, -35, 95, -65, 42, 61, 42, -50, 4, -9, 81 });
stream.push(new EncodedAudio(0, 0, false, input));
stream.push(new EncodedAudio(0, 0, false, input, 0L));

var buffer = stream.read(1024 + 1);
assertEquals(1024, buffer.remaining(), "Must have read 1024 bytes");
Expand All @@ -29,4 +29,94 @@ public void testDecodesBytes() {

assertEquals(0, buffer.remaining(), "Must have read all bytes");
}

@Test
public void testNormalSequentialPush() {
var stream = new DfpwmStream();
stream.push(makeEncodedAudio(128, 0L));
stream.push(makeEncodedAudio(128, 1024L));
stream.push(makeEncodedAudio(128, 2048L));
assertFalse(stream.isEmpty());

var totalRead = 0;
ByteBuffer result;
while ((result = stream.read(1024)) != null) {
totalRead += result.remaining();
}
assertEquals(3072, totalRead);
}

@Test
public void testHardResetWhenClientBehind() {
var stream = new DfpwmStream();
stream.push(makeEncodedAudio(128, 0L));
// Don't read — simulating client stall
stream.push(makeEncodedAudio(128, 50000L));

var totalRead = 0;
ByteBuffer result;
while ((result = stream.read(1024)) != null) {
totalRead += result.remaining();
}
assertEquals(1024, totalRead);
}

@Test
public void testDropStalePacket() {
var stream = new DfpwmStream();
stream.push(makeEncodedAudio(128, 0L));
stream.read(1024);

stream.push(makeEncodedAudio(128, 50000L));
stream.read(1024);

// Stale packet — way behind current position
stream.push(makeEncodedAudio(128, 0L));
assertTrue(stream.isEmpty());
}

@Test
public void testSmallDriftSelfCorrects() {
var stream = new DfpwmStream();
stream.push(makeEncodedAudio(128, 0L));
stream.push(makeEncodedAudio(128, 1100L)); // slight drift

var totalRead = 0;
ByteBuffer result;
while ((result = stream.read(1024)) != null) {
totalRead += result.remaining();
}
assertEquals(2048, totalRead);
}

@Test
public void testReadStallTriggersReset() {
var stream = new DfpwmStream();
for (var i = 0; i < 30; i++) {
stream.push(makeEncodedAudio(128, (long) i * 1024));
}
// 30 packets * 1024 samples = 30720 buffered. Offset 60000 gives drift of 29280 > 24000 threshold.
stream.push(makeEncodedAudio(128, 60000L));

var totalRead = 0;
ByteBuffer result;
while ((result = stream.read(1024)) != null) {
totalRead += result.remaining();
}
assertEquals(1024, totalRead);
}

@Test
public void testStreamRestartWithHighOffset() {
var stream = new DfpwmStream();
stream.push(makeEncodedAudio(128, 960000L));
assertFalse(stream.isEmpty());
stream.push(makeEncodedAudio(128, 961024L));
assertFalse(stream.isEmpty());
}

private EncodedAudio makeEncodedAudio(int dfpwmBytes, long sampleOffset) {
var audio = ByteBuffer.allocate(dfpwmBytes);
return new EncodedAudio(0, 0, false, audio, sampleOffset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@

import dan200.computercraft.api.lua.LuaException;
import dan200.computercraft.api.lua.ObjectLuaTable;
import dan200.computercraft.shared.util.PauseAwareTimer;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.*;

class DfpwmStateTest {
@Test
Expand All @@ -32,4 +33,44 @@ public void testEncoder() throws LuaException {
contents
);
}

@Test
public void testSampleOffsetAdvances() throws LuaException {
var state = new DfpwmState();
// Create a table with 1024 PCM samples (all zeros for simplicity)
Map<Object, Object> inputTbl = new HashMap<>();
for (var i = 1; i <= 1024; i++) inputTbl.put((double) i, 0);

assertTrue(state.pushBuffer(new ObjectLuaTable(inputTbl), 1024, Optional.empty()));

// Pull first chunk — offset should be 0
var audio = state.pullPending(PauseAwareTimer.getTime());
assertEquals(0L, audio.sampleOffset());

// Push another buffer
assertTrue(state.pushBuffer(new ObjectLuaTable(inputTbl), 1024, Optional.empty()));
var audio2 = state.pullPending(PauseAwareTimer.getTime());
// 1024 samples encode to 128 DFPWM bytes, which represent 1024 samples
assertEquals(1024L, audio2.sampleOffset());
}

@Test
public void testShouldSendPendingWhenBufferAvailable() throws LuaException {
var state = new DfpwmState();
assertFalse(state.shouldSendPending());
Map<Object, Object> inputTbl = new HashMap<>();
for (var i = 1; i <= 1024; i++) inputTbl.put((double) i, 0);
state.pushBuffer(new ObjectLuaTable(inputTbl), 1024, Optional.empty());
assertTrue(state.shouldSendPending());
}

@Test
public void testIsPlayingAfterPull() throws LuaException {
var state = new DfpwmState();
Map<Object, Object> inputTbl = new HashMap<>();
for (var i = 1; i <= 1024; i++) inputTbl.put((double) i, 0);
state.pushBuffer(new ObjectLuaTable(inputTbl), 1024, Optional.empty());
state.pullPending(PauseAwareTimer.getTime());
assertTrue(state.isPlaying());
}
}
Loading
Loading