Skip to content

Commit 45bbdfd

Browse files
committed
Added replication tests
1 parent f525efd commit 45bbdfd

3 files changed

Lines changed: 70 additions & 28 deletions

File tree

src/DotNext.Tests/Net/Cluster/Consensus/Raft/NetworkTransport/ConnectionOriented/Tcp/TcpTransportTests.cs

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,16 @@ namespace DotNext.Net.Cluster.Consensus.Raft.NetworkTransport.ConnectionOriented
1010

1111
using Membership;
1212
using NetworkTransport;
13+
using Replication;
1314
using static DotNext.Extensions.Logging.TestLoggers;
1415

1516
[Collection(TestCollections.Raft)]
1617
public sealed class TcpTransportTests : TransportTestSuite
1718
{
19+
private const int Host1Port = 3362;
20+
private const int Host2Port = 3363;
21+
private const int Host3Port = 3364;
22+
1823
private static X509Certificate2 LoadCertificate()
1924
{
2025
using var rawCertificate = Assembly.GetCallingAssembly().GetManifestResourceStream(typeof(Test), "node.pfx");
@@ -267,13 +272,9 @@ public Task RequestSynchronization()
267272
[Fact]
268273
public static async Task ConcurrentElection() // https://github.com/dotnet/dotNext/issues/168
269274
{
270-
const int host1Port = 3362;
271-
const int host2Port = 3363;
272-
const int host3Port = 3364;
273-
274-
await using var host1 = new RaftCluster(CreateConfiguration(host1Port)) { AuditTrail = new ConsensusOnlyState() };
275-
await using var host2 = new RaftCluster(CreateConfiguration(host2Port)) { AuditTrail = new ConsensusOnlyState() };
276-
await using var host3 = new RaftCluster(CreateConfiguration(host3Port)) { AuditTrail = new ConsensusOnlyState() };
275+
await using var host1 = new RaftCluster(CreateConfiguration(Host1Port)) { AuditTrail = new ConsensusOnlyState() };
276+
await using var host2 = new RaftCluster(CreateConfiguration(Host2Port)) { AuditTrail = new ConsensusOnlyState() };
277+
await using var host3 = new RaftCluster(CreateConfiguration(Host3Port)) { AuditTrail = new ConsensusOnlyState() };
277278

278279
// start in parallel
279280
await Task.WhenAll(
@@ -303,28 +304,55 @@ await Task.WhenAll(
303304
await host1.StopAsync(TestToken);
304305
await host2.StopAsync(TestToken);
305306
await host3.StopAsync(TestToken);
307+
}
308+
309+
[Fact]
310+
public static async Task ReplicateLogEntry()
311+
{
312+
await using var wal1 = CreateWal();
313+
await using var host1 = new RaftCluster(CreateConfiguration(Host1Port)) { AuditTrail = wal1 };
314+
await host1.StartAsync(TestToken);
315+
316+
await using var wal2 = CreateWal();
317+
await using var host2 = new RaftCluster(CreateConfiguration(Host2Port)) { AuditTrail = wal2 };
318+
await host2.StartAsync(TestToken);
319+
320+
await using var wal3 = CreateWal();
321+
await using var host3 = new RaftCluster(CreateConfiguration(Host3Port)) { AuditTrail = wal3 };
322+
await host3.StartAsync(TestToken);
323+
324+
await AssertLeadershipAsync(EqualityComparer<EndPoint>.Default, host1, host2, host3);
325+
var leader = await FindLeaderAsync(host1, host2, host3);
306326

307-
static RaftCluster.TcpConfiguration CreateConfiguration(int port)
327+
// Expected index is 2, because index 1 is added by the leader as a write barrier
328+
Equal(2L, await leader.ReplicateAsync(new EmptyLogEntry { Term = leader.Term }, TestToken));
329+
await leader.As<IReplicationCluster<IRaftLogEntry>>().ReplicateAsync(new EmptyLogEntry { Term = leader.Term }, TestToken);
330+
331+
await host1.StopAsync(TestToken);
332+
await host2.StopAsync(TestToken);
333+
await host3.StopAsync(TestToken);
334+
}
335+
336+
private static RaftCluster.TcpConfiguration CreateConfiguration(int port)
337+
{
338+
var result = new RaftCluster.TcpConfiguration(new IPEndPoint(IPAddress.Loopback, port))
308339
{
309-
var result = new RaftCluster.TcpConfiguration(new IPEndPoint(IPAddress.Loopback, port))
310-
{
311-
// LowerElectionTimeout = 1000,
312-
// UpperElectionTimeout = 2000,
313-
ColdStart = false,
314-
LoggerFactory = CreateDebugLoggerFactory(port.ToString(), static builder => builder.SetMinimumLevel(LogLevel.Debug)),
315-
ConfigurationStorage = null,
316-
};
340+
// LowerElectionTimeout = 1000,
341+
// UpperElectionTimeout = 2000,
342+
ColdStart = false,
343+
LoggerFactory = CreateDebugLoggerFactory(port.ToString(), static builder => builder.SetMinimumLevel(LogLevel.Debug)),
344+
ConfigurationStorage = null,
345+
};
317346

318-
var configuration = result.ConfigurationStorage as InMemoryClusterConfigurationStorage<EndPoint>;
319-
NotNull(configuration);
320-
var builder = configuration.CreateInitialConfigurationBuilder();
347+
var configuration = result.ConfigurationStorage as InMemoryClusterConfigurationStorage<EndPoint>;
348+
NotNull(configuration);
349+
var builder = configuration.CreateInitialConfigurationBuilder();
321350

322-
builder.Add(new IPEndPoint(IPAddress.Loopback, host1Port));
323-
builder.Add(new IPEndPoint(IPAddress.Loopback, host2Port));
324-
builder.Add(new IPEndPoint(IPAddress.Loopback, host3Port));
351+
builder.Add(new IPEndPoint(IPAddress.Loopback, Host1Port));
352+
builder.Add(new IPEndPoint(IPAddress.Loopback, Host2Port));
353+
builder.Add(new IPEndPoint(IPAddress.Loopback, Host3Port));
325354

326-
builder.Build();
327-
return result;
328-
}
355+
builder.Build();
356+
return result;
329357
}
330358
}

src/DotNext.Tests/Net/Cluster/Consensus/Raft/NetworkTransport/TransportTestSuite.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -496,6 +496,6 @@ private protected async Task LeadershipCore(Func<int, bool, IPersistentState, Ra
496496
await host1.StopAsync();
497497
}
498498

499-
private static WriteAheadLog CreateWal()
499+
protected static WriteAheadLog CreateWal()
500500
=> new(new() { Location = GetTempPath() }, IStateMachine.CreateNoOp());
501501
}

src/DotNext.Tests/Net/Cluster/Consensus/Raft/RaftTest.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ internal void OnLeaderChanged(ICluster sender, IClusterMember leader)
2222
}
2323
}
2424

25-
internal static async Task<EndPoint> AssertLeadershipAsync(IEqualityComparer<EndPoint> comparer, params IRaftCluster[] nodes)
25+
protected static async Task<EndPoint> AssertLeadershipAsync(IEqualityComparer<EndPoint> comparer, params IRaftCluster[] nodes)
2626
{
2727
EndPoint ep = null;
2828
var startTime = new Timestamp();
2929

3030
restart:
3131
foreach (var n in nodes)
3232
{
33-
var leader = await n.WaitForLeaderAsync(DefaultTimeout);
33+
var leader = await n.WaitForLeaderAsync(DefaultTimeout, TestToken);
3434
NotNull(leader);
3535

3636
if (ep is null)
@@ -54,4 +54,18 @@ internal static async Task<EndPoint> AssertLeadershipAsync(IEqualityComparer<End
5454
NotNull(ep);
5555
return ep;
5656
}
57+
58+
protected static async Task<RaftCluster> FindLeaderAsync(params RaftCluster[] candidates)
59+
{
60+
for (;; TestToken.ThrowIfCancellationRequested())
61+
{
62+
foreach (var candidate in candidates)
63+
{
64+
if (!candidate.LeadershipToken.IsCancellationRequested)
65+
return candidate;
66+
}
67+
68+
await Task.Yield();
69+
}
70+
}
5771
}

0 commit comments

Comments
 (0)