Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package zio.redis

import zio._
import zio.test.TestAspect._
import zio.test._

/**
* Simple test to validate diagnostic logging is working
*/
object DiagnosticLoggingSpec extends IntegrationSpec {

def spec: Spec[TestEnvironment, Any] =
suite("Diagnostic logging validation")(
test("basic redis operations with diagnostic logging") {
for {
redis <- ZIO.service[Redis]
_ <- ZIO.logInfo("Starting diagnostic logging test")

// Simple get/set operations to trigger logging
_ <- redis.set("diagnostic-key", "diagnostic-value")
result <- redis.get("diagnostic-key").returning[String]

_ <- ZIO.logInfo(s"Diagnostic test completed, result: $result")
} yield assertTrue(result == Some("diagnostic-value"))
},

test("multiple concurrent operations to trigger detailed logging") {
for {
redis <- ZIO.service[Redis]

// Run multiple operations concurrently to see queue logging
fibers <- ZIO.foreach(1 to 10) { i =>
redis.set(s"concurrent-key-$i", s"value-$i").fork
}

_ <- ZIO.collectAllPar(fibers.map(_.join))

// Read them back
results <- ZIO.foreach(1 to 10) { i =>
redis.get(s"concurrent-key-$i").returning[String]
}

} yield assertTrue(results.flatten.length == 10)
}
).provideSomeShared[TestEnvironment](
Redis.singleNode,
singleNodeConfig(IntegrationSpec.SingleNode0),
ZLayer.succeed(ProtobufCodecSupplier),
compose(
service(IntegrationSpec.SingleNode0, ".*Ready to accept connections.*")
)
) @@ sequential @@ withLiveEnvironment
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,70 @@ private[redis] final class SingleNodeExecutor private (

// TODO NodeExecutor doesn't throw connection errors, timeout errors, it is hanging forever
def execute(command: RespCommand): UIO[IO[RedisError, RespValue]] =
Promise
.make[RedisError, RespValue]
.flatMap(promise => requests.offer(Request(command.args.map(_.value), promise)).as(promise.await))
for {
promiseId <- Random.nextUUID.map(_.toString.take(8))
_ <- ZIO.logDebug(s"[REDIS-EXEC-$promiseId] Creating promise for command: ${command.args.head.value}")
promise <- Promise.make[RedisError, RespValue]
_ <- requests.size.flatMap(size => ZIO.logDebug(s"[REDIS-EXEC-$promiseId] Current requests queue size: $size/$requestQueueSize"))
_ <- requests.offer(Request(command.args.map(_.value), promise, promiseId))
_ <- requests.size.flatMap(size => ZIO.logDebug(s"[REDIS-EXEC-$promiseId] After offer, requests queue size: $size/$requestQueueSize"))
_ <- ZIO.logDebug(s"[REDIS-EXEC-$promiseId] Request queued, starting promise.await")
result = promise.await.tap(_ => ZIO.logDebug(s"[REDIS-EXEC-$promiseId] Promise completed successfully"))
.tapError(e => ZIO.logWarning(s"[REDIS-EXEC-$promiseId] Promise failed: $e"))
} yield result

def onError(e: RedisError): UIO[Unit] = responses.takeAll.flatMap(ZIO.foreachDiscard(_)(_.fail(e)))
def onError(e: RedisError): UIO[Unit] =
for {
_ <- ZIO.logWarning(s"[REDIS-ERROR] SingleNodeExecutor error occurred: $e")
pendingResponses <- responses.takeAll
_ <- ZIO.logWarning(s"[REDIS-ERROR] Failing ${pendingResponses.length} pending responses")
_ <- ZIO.foreachDiscard(pendingResponses)(_.fail(e))
_ <- ZIO.logWarning(s"[REDIS-ERROR] Error handling completed")
} yield ()

def send: IO[RedisError.IOError, Unit] =
requests.takeBetween(1, requestQueueSize).flatMap { requests =>
val bytes =
requests
.foldLeft(new ChunkBuilder.Byte())((builder, req) => builder ++= RespValue.Array(req.command).asBytes)
.result()

connection
.write(bytes)
.mapError(RedisError.IOError(_))
.tapBoth(
e => ZIO.foreachDiscard(requests)(_.promise.fail(e)),
_ => ZIO.foreachDiscard(requests)(req => responses.offer(req.promise))
)
.unit
}
for {
queueSizeBefore <- requests.size
_ <- ZIO.logDebug(s"[REDIS-SEND] About to take requests from queue (current size: $queueSizeBefore)")
requestsToSend <- requests.takeBetween(1, requestQueueSize)
queueSizeAfter <- requests.size
_ <- ZIO.logDebug(s"[REDIS-SEND] Processing ${requestsToSend.length} requests (queue size: $queueSizeBefore -> $queueSizeAfter): ${requestsToSend.map(_.promiseId).mkString(",")}")
bytes = requestsToSend
.foldLeft(new ChunkBuilder.Byte())((builder, req) => builder ++= RespValue.Array(req.command).asBytes)
.result()
_ <- ZIO.logDebug(s"[REDIS-SEND] Writing ${bytes.length} bytes to connection")
result <- connection
.write(bytes)
.mapError(RedisError.IOError(_))
.tapBoth(
e => ZIO.logError(s"[REDIS-SEND] Write failed, failing ${requestsToSend.length} promises: $e") *>
ZIO.foreachDiscard(requestsToSend)(_.promise.fail(e)),
_ => responses.size.flatMap(responseQueueSize =>
ZIO.logDebug(s"[REDIS-SEND] Write successful, moving ${requestsToSend.length} promises to response queue (current response queue size: $responseQueueSize)")
) *>
ZIO.foreachDiscard(requestsToSend)(req =>
responses.offer(req.promise).tap(_ => ZIO.logDebug(s"[REDIS-SEND] Promise ${req.promiseId} moved to response queue"))
)
)
.unit
} yield result

def receive: IO[RedisError, Unit] =
connection.read
.mapError(RedisError.IOError(_))
.via(RespValue.Decoder)
.collectSome
.foreach(response => responses.take.flatMap(_.succeed(response)))
.foreach { response =>
responses.size.flatMap(responseQueueSize =>
ZIO.logDebug(s"[REDIS-RECV] Received response, taking promise from queue (current response queue size: $responseQueueSize)")
) *>
responses.take.tap(_ => ZIO.logDebug(s"[REDIS-RECV] Completing promise with response"))
.flatMap(_.succeed(response))
.tap(_ => ZIO.logDebug(s"[REDIS-RECV] Promise completed successfully")) *>
responses.size.flatMap(responseQueueSize =>
ZIO.logDebug(s"[REDIS-RECV] After completion, response queue size: $responseQueueSize")
)
}

}

Expand All @@ -72,14 +107,21 @@ private[redis] object SingleNodeExecutor {
def create(connection: RedisConnection): URIO[Scope & RedisConfig, SingleNodeExecutor] =
for {
requestQueueSize <- ZIO.serviceWith[RedisConfig](_.requestQueueSize)
_ <- ZIO.logInfo(s"[REDIS-INIT] Creating SingleNodeExecutor with queue size: $requestQueueSize")
requests <- Queue.bounded[Request](requestQueueSize)
responses <- Queue.unbounded[Promise[RedisError, RespValue]]
executor = new SingleNodeExecutor(connection, requests, responses, requestQueueSize)
_ <- ZIO.logInfo(s"[REDIS-INIT] Starting executor run fiber")
_ <- executor.run.forkScoped
_ <- ZIO.logInfo(s"[REDIS-INIT] SingleNodeExecutor initialized successfully")
_ <- logScopeFinalizer(s"$executor Node Executor is closed")
} yield executor

private final case class Request(command: Chunk[RespValue.BulkString], promise: Promise[RedisError, RespValue])
private final case class Request(
command: Chunk[RespValue.BulkString],
promise: Promise[RedisError, RespValue],
promiseId: String
)

private def makeLayer: ZLayer[RedisConnection & RedisConfig, RedisError.IOError, RedisExecutor] =
ZLayer.scoped(ZIO.serviceWithZIO[RedisConnection](create))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ private[redis] trait SingleNodeRunner {
* connection. Only exits by interruption or defect.
*/
private[internal] final val run: IO[RedisError, AnyVal] =
ZIO.logTrace(s"$this sender and reader has been started") *>
ZIO.logInfo(s"[REDIS-RUNNER] Starting sender and receiver fibers") *>
(send.repeat(Schedule.forever) race receive)
.tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> onError(e))
.tapError(e => ZIO.logError(s"[REDIS-RUNNER] Fiber failed, triggering error handling and reconnect: $e") *> onError(e))
.retryWhile(True)
.tapError(e => ZIO.logError(s"Executor exiting: $e"))
.tapError(e => ZIO.logError(s"[REDIS-RUNNER] Executor fiber exiting permanently: $e"))
}

private[redis] object SingleNodeRunner {
Expand Down
Loading