Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redis/src/main/scala/zio/redis/ClusterExecutor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package zio.redis

import zio._
import zio.redis.ClusterExecutor._
import zio.redis.api.Cluster.AskingCommand
import zio.redis.codecs.StringUtf8Codec
import zio.redis.commands.Cluster.askingCommand
import zio.redis.options.Cluster._
import zio.schema.codec.BinaryCodec

Expand All @@ -42,7 +42,7 @@ final case class ClusterExecutor(
def executeAsk(address: RedisUri) =
for {
executor <- executor(address)
_ <- executor.execute(AskingCommand(StringUtf8Codec, this).resp(()))
_ <- executor.execute(askingCommand(StringUtf8Codec, this).resp(()))
res <- executor.execute(command)
} yield res

Expand Down
89 changes: 82 additions & 7 deletions redis/src/main/scala/zio/redis/Output.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
package zio.redis

import zio._
import zio.redis.Output.TransactionOutput
import zio.redis.options.Cluster.{Node, Partition, SlotRange}
import zio.schema.Schema
import zio.schema.codec.BinaryCodec

sealed trait Output[+A] {
self =>

sealed trait Output[+A] { self =>
private[redis] final def unsafeDecode(respValue: RespValue)(implicit codec: BinaryCodec): A =
respValue match {
case error: RespValue.Error => throw error.toRedisError
Expand All @@ -32,9 +31,17 @@ sealed trait Output[+A] {

protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): A

final def map[B](f: A => B): Output[B] =
def map[B](f: A => B): Output[B] =
new Output[B] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): B = f(self.tryDecode(respValue))

override protected def count: Int = self.count
}

protected def count: Int =
self match {
case value: TransactionOutput[_, _, _] => value.count
case _ => 1
}

}
Expand Down Expand Up @@ -369,7 +376,7 @@ object Output {
}

case object StreamGroupsInfoOutput extends Output[Chunk[StreamGroupsInfo]] {
override protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamGroupsInfo] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamGroupsInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
Expand Down Expand Up @@ -408,7 +415,7 @@ object Output {
}

case object StreamConsumersInfoOutput extends Output[Chunk[StreamConsumersInfo]] {
override protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamConsumersInfo] =
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Chunk[StreamConsumersInfo] =
respValue match {
case RespValue.NullArray => Chunk.empty
case RespValue.Array(messages) =>
Expand Down Expand Up @@ -446,7 +453,7 @@ object Output {

final case class StreamInfoFullOutput[I: Schema, K: Schema, V: Schema]()
extends Output[StreamInfoWithFull.FullStreamInfo[I, K, V]] {
override protected def tryDecode(
protected def tryDecode(
respValue: RespValue
)(implicit codec: BinaryCodec): StreamInfoWithFull.FullStreamInfo[I, K, V] = {
var streamInfoFull: StreamInfoWithFull.FullStreamInfo[I, K, V] = StreamInfoWithFull.FullStreamInfo.empty
Expand Down Expand Up @@ -827,4 +834,72 @@ object Output {
case other => throw ProtocolError(s"$other isn't an array")
}
}

case object QueuedOutput extends Output[Unit] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): Unit =
respValue match {
case RespValue.SimpleString("QUEUED") => ()
case other => throw ProtocolError(s"$other isn't queued")
}
}

trait TransactionOutput[A, B, Out] extends Output[Out] { self =>
def left: Output[A]
def right: Output[B]

override final def map[C](f: Out => C): Output[C] =
new TransactionOutput[A, B, C] {
def left: Output[A] = self.left
def right: Output[B] = self.right

protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): C = f(self.tryDecode(respValue))
}

override def count: Int = left.count + right.count
}

final case class Zip[A, B](left: Output[A], right: Output[B]) extends TransactionOutput[A, B, (A, B)] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): (A, B) =
respValue match {
case RespValue.Array(values) =>
(left, right) match {
case (left: TransactionOutput[_, _, _], right: TransactionOutput[_, _, _]) =>
(
left.tryDecode(RespValue.Array(values.take(left.count))),
right.tryDecode(RespValue.Array(values.drop(left.count).take(right.count)))
)
case (_, right: TransactionOutput[_, _, _]) =>
(left.tryDecode(values.head), right.tryDecode(RespValue.Array(values.tail)))
case (left: TransactionOutput[_, _, _], _) =>
(left.tryDecode(RespValue.Array(values.init)), right.tryDecode(values.last))
case _ =>
(left.tryDecode(values.head), right.tryDecode(values.last))
}
case other => throw ProtocolError(s"$other is not an array")
}
}

final case class ZipLeft[A, B](left: Output[A], right: Output[B]) extends TransactionOutput[A, B, A] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): A =
respValue match {
case RespValue.Array(values) =>
left match {
case _: TransactionOutput[_, _, _] => left.tryDecode(RespValue.Array(values.take(left.count)))
case _ => left.tryDecode(values.head)
}
case other => throw ProtocolError(s"$other is not an array")
}
}

final case class ZipRight[A, B](left: Output[A], right: Output[B]) extends TransactionOutput[A, B, B] {
protected def tryDecode(respValue: RespValue)(implicit codec: BinaryCodec): B =
respValue match {
case RespValue.Array(values) =>
left match {
case _: TransactionOutput[_, _, _] => right.tryDecode(RespValue.Array(values.drop(left.count)))
case _ => right.tryDecode(values.last)
}
case other => throw ProtocolError(s"$other is not an array")
}
}
}
4 changes: 2 additions & 2 deletions redis/src/main/scala/zio/redis/RedisEnvironment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ package zio.redis
import zio.schema.codec.BinaryCodec

private[redis] trait RedisEnvironment {
protected def codec: BinaryCodec
protected def executor: RedisExecutor
def codec: BinaryCodec
def executor: RedisExecutor
Comment on lines +22 to +23
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need them to be public?

}
67 changes: 12 additions & 55 deletions redis/src/main/scala/zio/redis/api/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,12 @@

package zio.redis.api

import zio.redis.Input._
import zio.redis.Output.{ChunkOutput, ClusterPartitionOutput, UnitOutput}
import zio.redis._
import zio.redis.api.Cluster.{AskingCommand, ClusterSetSlots, ClusterSlots}
import zio.redis.options.Cluster.SetSlotSubCommand._
import zio.redis.options.Cluster.{Partition, Slot}
import zio.schema.codec.BinaryCodec
import zio.{Chunk, IO}

trait Cluster extends RedisEnvironment {
trait Cluster extends commands.Cluster {

/**
* When a cluster client receives an -ASK redirect, the ASKING command is sent to the target node followed by the
Expand All @@ -34,19 +30,15 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def asking: IO[RedisError, Unit] =
AskingCommand(codec, executor).run(())
final def asking: IO[RedisError, Unit] = _asking.run(())

/**
* Returns details about which cluster slots map to which Redis instances.
*
* @return
* details about which cluster
*/
final def slots: IO[RedisError, Chunk[Partition]] = {
val command = RedisCommand(ClusterSlots, NoInput, ChunkOutput(ClusterPartitionOutput), codec, executor)
command.run(())
}
final def slots: IO[RedisError, Chunk[Partition]] = _slots.run(())

/**
* Clear any importing / migrating state from hash slot.
Expand All @@ -56,11 +48,8 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotStable(slot: Slot): IO[RedisError, Unit] = {
val command =
RedisCommand(ClusterSetSlots, Tuple2(LongInput, ArbitraryValueInput[String]()), UnitOutput, codec, executor)
command.run((slot.number, Stable.stringify))
}

final def setSlotStable(slot: Slot): IO[RedisError, Unit] = _setSlotStable.run((slot.number, Stable.stringify))

/**
* Set a hash slot in migrating state. Command should be executed on the node from which hash slot will be imported
Expand All @@ -73,16 +62,8 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
codec,
executor
)
command.run((slot.number, Migrating.stringify, nodeId))
}
final def setSlotMigrating(slot: Slot, nodeId: String): IO[RedisError, Unit] =
_setSlotMigrating.run((slot.number, Migrating.stringify, nodeId))

/**
* Set a hash slot in importing state. Command should be executed on the node where hash slot will be migrated
Expand All @@ -95,16 +76,9 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
codec,
executor
)
command.run((slot.number, Importing.stringify, nodeId))
}

final def setSlotImporting(slot: Slot, nodeId: String): IO[RedisError, Unit] =
_setSlotImporting.run((slot.number, Importing.stringify, nodeId))

/**
* Bind the hash slot to a different node. It associates the hash slot with the specified node, however the command
Expand All @@ -117,23 +91,6 @@ trait Cluster extends RedisEnvironment {
* @return
* the Unit value.
*/
final def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] = {
val command = RedisCommand(
ClusterSetSlots,
Tuple3(LongInput, ArbitraryValueInput[String](), ArbitraryValueInput[String]()),
UnitOutput,
codec,
executor
)
command.run((slot.number, Node.stringify, nodeId))
}
}

private[redis] object Cluster {
final val Asking = "ASKING"
final val ClusterSlots = "CLUSTER SLOTS"
final val ClusterSetSlots = "CLUSTER SETSLOT"

final val AskingCommand: (BinaryCodec, RedisExecutor) => RedisCommand[Unit, Unit] =
RedisCommand(Asking, NoInput, UnitOutput, _, _)
final def setSlotNode(slot: Slot, nodeId: String): IO[RedisError, Unit] =
_setSlotNode.run((slot.number, Node.stringify, nodeId))
}
Loading