Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package zio.redis

import com.dimafeng.testcontainers.DockerComposeContainer
import zio._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.TimeUnit

object DeadlockReproductionSpec extends IntegrationSpec {

  /**
   * Integration tests that try to reproduce client-side deadlocks under heavy
   * concurrent load: queue backpressure, connection stress, massive
   * concurrency, racing fibers, and fiber interruption. Every scenario joins
   * its fibers under an explicit timeout so that a hang surfaces as a test
   * failure instead of blocking the build indefinitely.
   */
  def spec: Spec[TestEnvironment, Any] =
    suite("Redis deadlock reproduction tests")(
      suite("Queue backpressure deadlock")(
        test("concurrent requests exceed queue capacity") {
          for {
            redis <- ZIO.service[Redis]
            _     <- ZIO.logInfo("Starting queue backpressure test")

            // Flood the request queue with concurrent GETs.
            fibers <- ZIO.foreach(1 to 1000) { i =>
                        redis.get(s"key-$i").returning[String].fork
                      }

            // Join all fibers — a deadlock manifests as this timeout firing.
            results <- ZIO
                         .collectAllPar(fibers.map(_.join))
                         .timeoutFail(new Exception("Deadlock detected: requests timed out"))(30.seconds)

            _ <- ZIO.logInfo(s"Completed ${results.length} requests")
          } yield assertTrue(results.length == 1000)
        },
        test("mixed read/write operations under backpressure") {
          for {
            redis <- ZIO.service[Redis]

            // Rapid concurrent mixed operations: 500 writers, 500 readers.
            writeFibers <- ZIO.foreach(1 to 500) { i =>
                             redis.set(s"key-$i", s"value-$i").fork
                           }

            readFibers <- ZIO.foreach(1 to 500) { i =>
                            redis.get(s"key-$i").returning[String].fork
                          }

            // Join both groups under a timeout to surface deadlocks.
            writeResults <- ZIO
                              .collectAllPar(writeFibers.map(_.join))
                              .timeoutFail(new Exception("Write operations deadlocked"))(60.seconds)

            readResults <- ZIO
                             .collectAllPar(readFibers.map(_.join))
                             .timeoutFail(new Exception("Read operations deadlocked"))(60.seconds)

            // Assert on the actual result counts; the original ended with a
            // vacuous `assertTrue(true)` that could never fail.
          } yield assertTrue(writeResults.length == 500 && readResults.length == 500)
        }
      ),
      suite("Connection failure scenarios")(
        test("simulated connection stress") {
          for {
            redis <- ZIO.service[Redis]

            // Background write/read loops that run forever until interrupted.
            backgroundFibers <- ZIO.foreach(1 to 100) { i =>
                                  (for {
                                    _      <- ZIO.sleep(100.millis)
                                    _      <- redis.set(s"bg-key-$i", s"bg-value-$i")
                                    result <- redis.get(s"bg-key-$i").returning[String]
                                  } yield result).timeout(5.seconds).forever.fork
                                }

            // Give the background load time to ramp up.
            _ <- ZIO.sleep(2.seconds)

            // New operations issued under load must still make progress.
            newOperations <- ZIO.foreach(1 to 50) { i =>
                               redis.get(s"test-key-$i").returning[String].timeout(10.seconds).fork
                             }

            results <- ZIO
                         .collectAllPar(newOperations.map(_.join))
                         .timeoutFail(new Exception("Operations deadlocked under stress"))(30.seconds)

            // Clean up background fibers once the assertions' data is collected.
            _ <- ZIO.foreachDiscard(backgroundFibers)(_.interrupt)

          } yield assertTrue(results.length == 50)
        } @@ flaky // timing-dependent by design
      ),
      suite("High concurrency stress test")(
        test("massive concurrent get operations") {
          for {
            redis <- ZIO.service[Redis]

            // Pre-populate the keys the readers will hit.
            _ <- ZIO.foreachDiscard(1 to 100) { i =>
                   redis.set(s"stress-key-$i", s"stress-value-$i")
                 }

            // Launch a large number of concurrent GETs over the 100 keys.
            startTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
            fibers    <- ZIO.foreach(1 to 2000) { i =>
                           redis.get(s"stress-key-${i % 100}").returning[String].fork
                         }

            // Collect results with a timeout to detect deadlocks.
            results <- ZIO
                         .collectAllPar(fibers.map(_.join))
                         .timeoutFail(new Exception("Massive concurrency caused deadlock"))(120.seconds)

            endTime <- Clock.currentTime(TimeUnit.MILLISECONDS)
            duration = endTime - startTime

            _ <- ZIO.logInfo(s"Completed ${results.length} operations in ${duration}ms")

          } yield assertTrue(results.length == 2000)
        },
        test("interleaved operations with fiber racing") {
          for {
            redis <- ZIO.service[Redis]

            // Race SET/GET/DEL on deliberately colliding keys (i % 10).
            racingOperations <- ZIO.foreach(1 to 100) { i =>
                                  val key = s"race-key-${i % 10}" // intentional key collision

                                  ZIO
                                    .raceAll(
                                      redis.set(key, s"value-$i"),
                                      List(
                                        redis.get(key).returning[String].map(_.getOrElse("not-found")),
                                        redis.del(key),
                                        redis.set(key, s"updated-value-$i")
                                      )
                                    )
                                    .fork
                                }

            // All races must complete; a hang here indicates a deadlock.
            results <- ZIO
                         .collectAllPar(racingOperations.map(_.join))
                         .timeoutFail(new Exception("Racing operations caused deadlock"))(60.seconds)

          } yield assertTrue(results.length == 100)
        }
      ),
      suite("Resource cleanup and error handling")(
        test("proper cleanup on fiber interruption") {
          for {
            redis <- ZIO.service[Redis]

            // Long-running operations that will be interrupted mid-sleep.
            longRunningFibers <- ZIO.foreach(1 to 50) { i =>
                                   (for {
                                     _ <- ZIO.sleep(10.seconds) // long delay
                                     _ <- redis.get(s"cleanup-key-$i").returning[String]
                                   } yield ()).fork
                                 }

            // Let them start, then interrupt all of them.
            _ <- ZIO.sleep(1.second)
            _ <- ZIO.foreachDiscard(longRunningFibers)(_.interrupt)

            // New operations must still proceed after the interruptions.
            newOperations <- ZIO.foreach(1 to 20) { i =>
                               redis.set(s"after-interrupt-$i", s"value-$i").fork
                             }

            results <- ZIO
                         .collectAllPar(newOperations.map(_.join))
                         .timeoutFail(new Exception("Operations failed after fiber interruption"))(30.seconds)

          } yield assertTrue(results.length == 20)
        }
      )
    ).provideSomeShared[TestEnvironment](
      Redis.singleNode,
      singleNodeConfig(IntegrationSpec.SingleNode0),
      ZLayer.succeed(ProtobufCodecSupplier),
      compose(
        service(IntegrationSpec.SingleNode0, ".*Ready to accept connections.*")
      )
    ) @@ sequential @@ withLiveEnvironment
}
152 changes: 152 additions & 0 deletions modules/redis-it/src/test/scala/zio/redis/ExecutorDeadlockSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
package zio.redis

import com.dimafeng.testcontainers.DockerComposeContainer
import zio._
import zio.test.TestAspect._
import zio.test._

/**
* Tests designed to reproduce the specific deadlock scenario where promises
* await indefinitely due to issues in the SingleNodeExecutor's send/receive loop.
*/
object ExecutorDeadlockSpec extends IntegrationSpec {

  /**
   * Reproduces executor-level promise deadlocks: each scenario stresses the
   * single-node executor's send/receive loop and joins its fibers under an
   * explicit timeout so a stuck promise shows up as a failure, not a hang.
   */
  def spec: Spec[TestEnvironment, Any] =
    suite("Executor promise deadlock reproduction")(
      test("promise await deadlock under stress") {
        for {
          redis <- ZIO.service[Redis]

          _ <- ZIO.logInfo("Testing promise await deadlock scenario")

          // Each fiber runs a set/get/del/get sequence to pressure the executor.
          burstSize        = 500
          concurrentFibers <- ZIO.foreach(1 to burstSize) { i =>
                                (for {
                                  _       <- redis.set(s"burst-$i", s"value-$i")
                                  result1 <- redis.get(s"burst-$i").returning[String]
                                  _       <- redis.del(s"burst-$i")
                                  result2 <- redis.get(s"burst-$i").returning[String]
                                } yield (result1, result2))
                                  .timeout(10.seconds) // per-fiber timeout
                                  .fork
                              }

          // Joining everything must finish; the global timeout flags a deadlock.
          results <- ZIO
                       .collectAllPar(concurrentFibers.map(_.join))
                       .timeoutFail(new Exception("EXECUTOR DEADLOCK: Promises stuck awaiting!"))(60.seconds)

          successful = results.count(_.isDefined)
          timedOut   = results.count(_.isEmpty)

          _ <- ZIO.logInfo(s"Burst test results: $successful successful, $timedOut timed out")

          // Individual timeouts are tolerated; a complete deadlock is not.
        } yield assertTrue(successful > 0)
      },
      test("rapid fire operations with connection stress") {
        for {
          redis <- ZIO.service[Redis]

          _ <- ZIO.logInfo("Testing rapid fire operations")

          // Sample the clock BEFORE forking: the original read it after the
          // fibers were already running, under-reporting the duration.
          startTime <- Clock.currentTime(java.util.concurrent.TimeUnit.MILLISECONDS)

          // Fire operations as fast as possible, no delays between forks.
          rapidOperations <- ZIO.foreach(1 to 1000) { _ =>
                               redis.ping().timeout(5.seconds).fork
                             }

          results <- ZIO
                       .collectAllPar(rapidOperations.map(_.join))
                       .timeoutFail(new Exception("Rapid fire operations caused deadlock"))(90.seconds)

          endTime <- Clock.currentTime(java.util.concurrent.TimeUnit.MILLISECONDS)
          duration = endTime - startTime

          successful = results.count(_.isDefined)
          _         <- ZIO.logInfo(s"Rapid fire: $successful/1000 operations in ${duration}ms")

        } yield assertTrue(successful >= 900) // allow some failures but not complete deadlock
      },
      test("interleaved long and short operations") {
        for {
          redis <- ZIO.service[Redis]

          // Quick, cheap operations.
          quickOps <- ZIO.foreach(1 to 100) { _ =>
                        redis.ping().timeout(1.second).fork
                      }

          // Slower operations on hash structures to provoke timing issues.
          complexOps <- ZIO.foreach(1 to 50) { i =>
                          (for {
                            _      <- redis.hSet(s"hash-$i", ("field1", "value1"), ("field2", "value2"), ("field3", "value3"))
                            result <- redis.hGetAll(s"hash-$i").returning[String, String]
                            _      <- redis.del(s"hash-$i")
                          } yield result).timeout(5.seconds).fork
                        }

          // Collect both groups under timeouts to detect deadlocks.
          quickResults <- ZIO
                            .collectAllPar(quickOps.map(_.join))
                            .timeoutFail(new Exception("Quick operations deadlocked"))(30.seconds)

          complexResults <- ZIO
                              .collectAllPar(complexOps.map(_.join))
                              .timeoutFail(new Exception("Complex operations deadlocked"))(45.seconds)

          _ <- ZIO.logInfo(s"Mixed operations: ${quickResults.length} quick, ${complexResults.length} complex")

        } yield assertTrue(quickResults.length == 100 && complexResults.length == 50)
      },
      test("stress test with error recovery") {
        for {
          redis <- ZIO.service[Redis]

          // Staggered background set/get pairs.
          backgroundOps <- ZIO.foreach(1 to 20) { i =>
                             (for {
                               _      <- ZIO.sleep((i * 100).millis) // stagger the operations
                               _      <- redis.set(s"bg-$i", s"background-$i")
                               result <- redis.get(s"bg-$i").returning[String]
                             } yield result).timeout(10.seconds).fork
                           }

          // Let the background operations get going.
          _ <- ZIO.sleep(1.second)

          // Additional stress operations issued while the background load runs.
          stressOps <- ZIO.foreach(1 to 30) { i =>
                         redis.set(s"stress-$i", s"value-$i").timeout(10.seconds).fork
                       }

          // Background results are best-effort: `timeout` SUCCEEDS with `None`
          // rather than failing, so the original `.orElse(...)` was dead code;
          // map the `None` case to an empty chunk instead.
          backgroundResults <- ZIO
                                 .collectAllPar(backgroundOps.map(_.join))
                                 .timeout(30.seconds)
                                 .map(_.getOrElse(Chunk.empty))

          // Stress operations must not deadlock outright.
          stressResults <- ZIO
                             .collectAllPar(stressOps.map(_.join))
                             .timeoutFail(new Exception("Operations deadlocked under stress"))(30.seconds)

          successfulStress = stressResults.count(_.isDefined)
          // Report the background count too; it was previously computed but unused.
          _               <- ZIO.logInfo(s"Background: ${backgroundResults.size} joined, stress: $successfulStress/30 successful")

        } yield assertTrue(successfulStress >= 20) // allow some failures
      } @@ flaky // timing-dependent by design
    ).provideSomeShared[TestEnvironment](
      Redis.singleNode,
      singleNodeConfig(IntegrationSpec.SingleNode0),
      ZLayer.succeed(ProtobufCodecSupplier),
      compose(
        service(IntegrationSpec.SingleNode0, ".*Ready to accept connections.*")
      )
    ) @@ sequential @@ withLiveEnvironment @@ timeout(5.minutes)
}
Loading
Loading