Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions src/Discord.Net.WebSocket/Audio/AudioClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,8 @@ internal async Task CreateInputStreamAsync(ulong userId)

AudioOutStream opusDecoder = new OpusDecodeStream(readerStream); //Passes header

if (_dave is not null)
var hasDave = _dave is not null;
if (hasDave)
{
opusDecoder = new DaveDecryptStream(
this,
Expand All @@ -396,6 +397,7 @@ internal async Task CreateInputStreamAsync(ulong userId)

_streams.TryAdd(userId, new StreamPair(readerStream, decryptStream));

await _audioLogger.DebugAsync($"Created input stream for user {userId} (dave={hasDave})");
await _streamCreatedEvent.InvokeAsync(userId, readerStream);
}
}
Expand Down Expand Up @@ -429,6 +431,35 @@ internal async Task ClearInputStreamsAsync()
_streams.Clear();
}

/// <summary>
/// Rebuilds all existing input streams to include the DAVE decrypt layer.
/// Called after the initial DAVE transition completes since streams may have
/// been created before the DaveSessionManager was initialized.
/// </summary>
internal async Task RebuildInputStreamsForDaveAsync()
{
if (_dave is null) return;

var existingUsers = _streams.Keys.ToArray();
if (existingUsers.Length == 0) return;

await _audioLogger.DebugAsync($"Rebuilding {existingUsers.Length} input stream(s) with DAVE decrypt layer");

foreach (var userId in existingUsers)
{
if (_streams.TryRemove(userId, out var pair))
{
await _streamDestroyedEvent.InvokeAsync(userId).ConfigureAwait(false);
await pair.Reader.DisposeAsync();
}
}

foreach (var userId in existingUsers)
{
await CreateInputStreamAsync(userId);
}
}

#endregion

private async Task ProcessMessageAsync(VoiceOpCode opCode, object payload)
Expand Down Expand Up @@ -599,18 +630,20 @@ private async Task ProcessPacketAsync(byte[] packet)
}

string ip;
ushort externalPort;
try
{
ip = Encoding.UTF8.GetString(packet, 8, 74 - 10).TrimEnd('\0');
externalPort = (ushort)((packet[72] << 8) | packet[73]);
}
catch (Exception ex)
{
await _audioLogger.DebugAsync("Malformed Packet", ex).ConfigureAwait(false);
return;
}

await _audioLogger.DebugAsync("Received Discovery").ConfigureAwait(false);
await ApiClient.SendSelectProtocol(ip).ConfigureAwait(false);
await _audioLogger.DebugAsync($"Received Discovery (ip={ip}, externalPort={externalPort}, localPort={ApiClient.UdpPort})").ConfigureAwait(false);
await ApiClient.SendSelectProtocol(ip, externalPort).ConfigureAwait(false);
}
else if (_connection.State == ConnectionState.Connected)
{
Expand Down
4 changes: 4 additions & 0 deletions src/Discord.Net.WebSocket/Audio/DaveSessionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ await _logger.DebugAsync(

if (protocolVersion is not Dave.DisabledProtocolVersion && !ratchet.IsNull)
Encryptor.Ratchet = ratchet;

// Streams created before DAVE was initialized lack the DaveDecryptStream layer.
// Rebuild them now that keys are ready.
await _client.RebuildInputStreamsForDaveAsync();
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/Discord.Net.WebSocket/DiscordVoiceApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -164,15 +164,15 @@ public Task SendIdentityAsync(ulong userId, string sessionId, string token, usho
});
}

public Task SendSelectProtocol(string externalIp)
public Task SendSelectProtocol(string externalIp, ushort externalPort)
{
return SendAsync(VoiceOpCode.SelectProtocol, new SelectProtocolParams
{
Protocol = "udp",
Data = new UdpProtocolInfo
{
Address = externalIp,
Port = UdpPort,
Port = externalPort,
Mode = Mode
}
});
Expand Down