-
Notifications
You must be signed in to change notification settings - Fork 68
Implement PubSub api #714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PubSub api #714
Changes from 43 commits
a280504
cd4a3f3
a01213d
9541519
ba89a59
9723182
65b524b
892ba20
43edb47
a6f74fa
c7decb7
b6438f0
5fb75e4
ce97f95
a25805b
fd0b303
7cb1491
2ab2db1
87eee71
f532e4b
e9e2a3c
014d2e4
49c141b
8b6ce73
f44c7fc
758ea9b
9febcf8
92aea5e
3fc77c0
3c8a307
0ea75a1
ef3b9f1
478577b
c4105ff
f432815
149ae4b
67652e6
8606409
6a3f68f
1a0862f
92457c4
bf52e26
c027295
a154683
592d327
386fefd
415f75e
1699211
be11d67
552e002
e218b08
ac85cb9
dadb5e7
88c3f4e
6e50d4d
5ca98e7
539ef3f
63cc263
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ package zio.redis | |
| import zio._ | ||
| import zio.redis.internal.RespValue | ||
| import zio.redis.options.Cluster.{Node, Partition, SlotRange} | ||
| import zio.redis.options.PubSub.{NumberOfSubscribers, PushProtocol} | ||
| import zio.schema.Schema | ||
| import zio.schema.codec.BinaryCodec | ||
|
|
||
|
|
@@ -639,6 +640,50 @@ object Output { | |
| } | ||
| } | ||
|
|
||
| case object PushProtocolOutput extends Output[PushProtocol] { | ||
| protected def tryDecode(respValue: RespValue): PushProtocol = | ||
| respValue match { | ||
| case RespValue.NullArray => throw ProtocolError(s"Array must not be empty") | ||
| case RespValue.Array(values) => | ||
| val name = MultiStringOutput.unsafeDecode(values(0)) | ||
| val key = MultiStringOutput.unsafeDecode(values(1)) | ||
| name match { | ||
| case "subscribe" => | ||
| val num = LongOutput.unsafeDecode(values(2)) | ||
| PushProtocol.Subscribe(key, num) | ||
| case "psubscribe" => | ||
| val num = LongOutput.unsafeDecode(values(2)) | ||
| PushProtocol.PSubscribe(key, num) | ||
| case "unsubscribe" => | ||
| val num = LongOutput.unsafeDecode(values(2)) | ||
| PushProtocol.Unsubscribe(key, num) | ||
| case "punsubscribe" => | ||
| val num = LongOutput.unsafeDecode(values(2)) | ||
| PushProtocol.PUnsubscribe(key, num) | ||
| case "message" => | ||
| PushProtocol.Message(key, values(2)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would assign |
||
| case "pmessage" => | ||
| val channel = MultiStringOutput.unsafeDecode(values(2)) | ||
| PushProtocol.PMessage(key, channel, values(3)) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would assign |
||
| case other => throw ProtocolError(s"$other isn't a pushed message") | ||
| } | ||
| case other => throw ProtocolError(s"$other isn't an array") | ||
| } | ||
| } | ||
|
|
||
| case object NumSubResponseOutput extends Output[Chunk[NumberOfSubscribers]] { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know that there is an order in the response, but maybe you should use a |
||
| protected def tryDecode(respValue: RespValue): Chunk[NumberOfSubscribers] = | ||
| respValue match { | ||
| case RespValue.Array(values) => | ||
| Chunk.fromIterator(values.grouped(2).map { chunk => | ||
| val channel = MultiStringOutput.unsafeDecode(chunk(0)) | ||
| val numOfSubs = LongOutput.unsafeDecode(chunk(1)) | ||
| NumberOfSubscribers(channel, numOfSubs) | ||
| }) | ||
| case other => throw ProtocolError(s"$other isn't an array") | ||
| } | ||
| } | ||
|
|
||
| private def decodeDouble(bytes: Chunk[Byte]): Double = { | ||
| val text = new String(bytes.toArray, StandardCharsets.UTF_8) | ||
| try text.toDouble | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| /* | ||
| * Copyright 2021 John A. De Goes and the ZIO contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package zio.redis | ||
|
|
||
| import zio._ | ||
|
|
||
| trait RedisSubscription extends api.Subscription | ||
|
mijicd marked this conversation as resolved.
|
||
|
|
||
| object RedisSubscription { | ||
| lazy val local: ZLayer[CodecSupplier, RedisError.IOError, RedisSubscription] = | ||
| SubscriptionExecutor.local >>> makeLayer | ||
|
|
||
| lazy val singleNode: ZLayer[CodecSupplier & RedisConfig, RedisError.IOError, RedisSubscription] = | ||
| SubscriptionExecutor.layer >>> makeLayer | ||
|
|
||
| private def makeLayer: URLayer[CodecSupplier & SubscriptionExecutor, RedisSubscription] = | ||
| ZLayer.fromFunction(Live.apply _) | ||
|
|
||
| private final case class Live(codecSupplier: CodecSupplier, executor: SubscriptionExecutor) extends RedisSubscription | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,219 @@ | ||
| /* | ||
| * Copyright 2021 John A. De Goes and the ZIO contributors | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package zio.redis | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It belongs to |
||
|
|
||
| import zio.redis.Input.{CommandNameInput, StringInput} | ||
| import zio.redis.Output.PushProtocolOutput | ||
| import zio.redis.SingleNodeSubscriptionExecutor.{Request, RequestQueueSize, True} | ||
| import zio.redis.api.Subscription | ||
| import zio.redis.internal._ | ||
| import zio.redis.options.PubSub.PushProtocol | ||
| import zio.stream._ | ||
| import zio.{Chunk, ChunkBuilder, IO, Promise, Queue, Ref, Schedule, Scope, URIO, ZIO} | ||
|
|
||
| private[redis] final class SingleNodeSubscriptionExecutor private ( | ||
| channelSubsRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]], | ||
| patternSubsRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]], | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't we use hubs?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is to ensure the callback is invoked. response messages can be delivered from Redis before target streams are consumed. if we use hub then response messages can be loss that have to invoke callbacks
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but maybe It seems like we could use a promise queue for handling response messages (like
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added promise queue for waiting subscribe responses and let them use |
||
| reqQueue: Queue[Request], | ||
| connection: RedisConnection | ||
| ) extends SubscriptionExecutor { | ||
| def execute(command: RespCommand): Stream[RedisError, PushProtocol] = | ||
| ZStream | ||
| .fromZIO( | ||
| for { | ||
| commandName <- | ||
| ZIO | ||
| .fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => name }) | ||
| .orElseFail(RedisError.CommandNameNotFound(command.args.toString())) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's adjust the error constructor to avoid invoking |
||
| stream <- commandName match { | ||
| case Subscription.Subscribe => ZIO.succeed(subscribe(channelSubsRef, command)) | ||
| case Subscription.PSubscribe => ZIO.succeed(subscribe(patternSubsRef, command)) | ||
| case Subscription.Unsubscribe => ZIO.succeed(unsubscribe(command)) | ||
| case Subscription.PUnsubscribe => ZIO.succeed(unsubscribe(command)) | ||
| case other => ZIO.fail(RedisError.InvalidPubSubCommand(other)) | ||
| } | ||
| } yield stream | ||
| ) | ||
| .flatten | ||
|
|
||
| private def subscribe( | ||
| subscriptionRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]], | ||
| command: RespCommand | ||
| ): Stream[RedisError, PushProtocol] = | ||
| ZStream | ||
| .fromZIO( | ||
| for { | ||
| queues <- | ||
| ZIO.foreach(command.args.collect { case key: RespCommandArgument.Key => key.value.asString })(key => | ||
| Queue | ||
| .unbounded[Take[RedisError, PushProtocol]] | ||
| .tap(queue => | ||
| subscriptionRef.update(subscription => | ||
| subscription.updated(key, subscription.getOrElse(key, Chunk.empty) ++ Chunk.single(queue)) | ||
| ) | ||
| ) | ||
| .map(key -> _) | ||
| ) | ||
| promise <- Promise.make[RedisError, Unit] | ||
| _ <- reqQueue.offer( | ||
| Request( | ||
| command.args.map(_.value), | ||
| promise | ||
| ) | ||
| ) | ||
| streams = queues.map { case (key, queue) => | ||
| ZStream | ||
| .fromQueueWithShutdown(queue) | ||
| .ensuring( | ||
| subscriptionRef.update(subscription => | ||
| subscription.get(key) match { | ||
| case Some(queues) => subscription.updated(key, queues.filterNot(_ == queue)) | ||
| case None => subscription | ||
| } | ||
| ) | ||
| ) | ||
| } | ||
| _ <- promise.await.tapError(_ => ZIO.foreachDiscard(queues) { case (_, queue) => queue.shutdown }) | ||
| } yield streams.fold(ZStream.empty)(_ merge _) | ||
| ) | ||
| .flatten | ||
| .flattenTake | ||
|
|
||
| private def unsubscribe(command: RespCommand): Stream[RedisError, PushProtocol] = | ||
| ZStream | ||
| .fromZIO( | ||
| for { | ||
| promise <- Promise.make[RedisError, Unit] | ||
| _ <- reqQueue.offer(Request(command.args.map(_.value), promise)) | ||
| _ <- promise.await | ||
| } yield ZStream.empty | ||
| ) | ||
| .flatten | ||
|
|
||
| private def send = | ||
| reqQueue.takeBetween(1, RequestQueueSize).flatMap { reqs => | ||
| val buffer = ChunkBuilder.make[Byte]() | ||
| val it = reqs.iterator | ||
|
|
||
| while (it.hasNext) { | ||
| val req = it.next() | ||
| buffer ++= RespValue.Array(req.command).asBytes | ||
| } | ||
|
|
||
| val bytes = buffer.result() | ||
|
|
||
| connection | ||
| .write(bytes) | ||
| .mapError(RedisError.IOError(_)) | ||
| .tapBoth( | ||
| e => ZIO.foreachDiscard(reqs.map(_.promise))(_.fail(e)), | ||
| _ => ZIO.foreachDiscard(reqs.map(_.promise))(_.succeed(())) | ||
| ) | ||
| } | ||
|
|
||
| private def receive: IO[RedisError, Unit] = { | ||
| def offerMessage( | ||
| subscriptionRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]], | ||
| key: String, | ||
| msg: PushProtocol | ||
| ) = for { | ||
| subscription <- subscriptionRef.get | ||
| _ <- ZIO.foreachDiscard(subscription.get(key))( | ||
| ZIO.foreachDiscard(_)(queue => queue.offer(Take.single(msg)).unlessZIO(queue.isShutdown)) | ||
| ) | ||
| } yield () | ||
|
|
||
| def releaseStream(subscriptionRef: Ref[Map[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]], key: String) = | ||
| for { | ||
| subscription <- subscriptionRef.getAndUpdate(_ - key) | ||
| _ <- ZIO.foreachDiscard(subscription.get(key))( | ||
| ZIO.foreachDiscard(_)(queue => queue.offer(Take.end).unlessZIO(queue.isShutdown)) | ||
| ) | ||
| } yield () | ||
|
|
||
| connection.read | ||
| .mapError(RedisError.IOError(_)) | ||
| .via(RespValue.Decoder) | ||
| .collectSome | ||
| .mapZIO(resp => ZIO.attempt(PushProtocolOutput.unsafeDecode(resp))) | ||
| .refineToOrDie[RedisError] | ||
| .foreach { | ||
| case msg @ PushProtocol.Subscribe(channel, _) => offerMessage(channelSubsRef, channel, msg) | ||
| case msg @ PushProtocol.Unsubscribe(channel, _) => | ||
| offerMessage(channelSubsRef, channel, msg) *> releaseStream(channelSubsRef, channel) | ||
| case msg @ PushProtocol.Message(channel, _) => offerMessage(channelSubsRef, channel, msg) | ||
| case msg @ PushProtocol.PSubscribe(pattern, _) => offerMessage(patternSubsRef, pattern, msg) | ||
| case msg @ PushProtocol.PUnsubscribe(pattern, _) => | ||
| offerMessage(patternSubsRef, pattern, msg) *> releaseStream(patternSubsRef, pattern) | ||
| case msg @ PushProtocol.PMessage(pattern, _, _) => offerMessage(patternSubsRef, pattern, msg) | ||
| } | ||
| } | ||
|
|
||
| private def resubscribe: IO[RedisError, Unit] = { | ||
| def makeCommand(name: String, keys: Chunk[String]) = | ||
| if (keys.isEmpty) | ||
| Chunk.empty | ||
| else | ||
| RespValue | ||
| .Array((CommandNameInput.encode(name) ++ Input.Varargs(StringInput).encode(keys)).args.map(_.value)) | ||
| .asBytes | ||
|
|
||
| for { | ||
| channels <- channelSubsRef.get.map(_.keys) | ||
| patterns <- patternSubsRef.get.map(_.keys) | ||
| commands = makeCommand(Subscription.Subscribe, Chunk.fromIterable(channels)) ++ | ||
| makeCommand(Subscription.PSubscribe, Chunk.fromIterable(patterns)) | ||
| _ <- connection | ||
| .write(commands) | ||
| .when(commands.nonEmpty) | ||
| .mapError(RedisError.IOError(_)) | ||
| .retryWhile(True) | ||
| } yield () | ||
| } | ||
|
|
||
| /** | ||
| * Opens a connection to the server and launches receive operations. All failures are retried by opening a new | ||
| * connection. Only exits by interruption or defect. | ||
| */ | ||
| val run: IO[RedisError, AnyVal] = | ||
| ZIO.logTrace(s"$this PubSub sender and reader has been started") *> | ||
| (send.repeat(Schedule.forever) race receive) | ||
| .tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> resubscribe) | ||
| .retryWhile(True) | ||
| .tapError(e => ZIO.logError(s"Executor exiting: $e")) | ||
| } | ||
|
|
||
| private[redis] object SingleNodeSubscriptionExecutor { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole module if full of copy-paste. Let's try to reduce that.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made |
||
| private final case class Request( | ||
| command: Chunk[RespValue.BulkString], | ||
| promise: Promise[RedisError, Unit] | ||
| ) | ||
|
|
||
| private final val True: Any => Boolean = _ => true | ||
|
|
||
| private final val RequestQueueSize = 16 | ||
|
|
||
| def create(conn: RedisConnection): URIO[Scope, SubscriptionExecutor] = | ||
| for { | ||
| reqQueue <- Queue.bounded[Request](RequestQueueSize) | ||
| channelRef <- Ref.make(Map.empty[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]) | ||
| patternRef <- Ref.make(Map.empty[String, Chunk[Queue[Take[RedisError, PushProtocol]]]]) | ||
| pubSub = new SingleNodeSubscriptionExecutor(channelRef, patternRef, reqQueue, conn) | ||
| _ <- pubSub.run.forkScoped | ||
| _ <- logScopeFinalizer(s"$pubSub Subscription Node is closed") | ||
| } yield pubSub | ||
| } | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,42 @@ | ||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||
| * Copyright 2021 John A. De Goes and the ZIO contributors | ||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||
| * you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||
| * You may obtain a copy of the License at | ||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||
| * Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||
| * See the License for the specific language governing permissions and | ||||||||||||||||||||||||||
| * limitations under the License. | ||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| package zio.redis | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| import zio.redis.internal.{RedisConnection, RespCommand} | ||||||||||||||||||||||||||
| import zio.redis.options.PubSub.PushProtocol | ||||||||||||||||||||||||||
| import zio.stream._ | ||||||||||||||||||||||||||
| import zio.{Layer, ZIO, ZLayer} | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| trait SubscriptionExecutor { | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't belong here. Similarly to all other executors, it should go to |
||||||||||||||||||||||||||
| private[redis] def execute(command: RespCommand): Stream[RedisError, PushProtocol] | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| object SubscriptionExecutor { | ||||||||||||||||||||||||||
| lazy val layer: ZLayer[RedisConfig, RedisError.IOError, SubscriptionExecutor] = | ||||||||||||||||||||||||||
| RedisConnection.layer.fresh >>> pubSublayer | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as I know, when a client subscribes to channels, it is restricted from sending most commands except for a few that are allowed in a subscribing connection
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so it's for separating connection between |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| lazy val local: Layer[RedisError.IOError, SubscriptionExecutor] = | ||||||||||||||||||||||||||
| RedisConnection.local.fresh >>> pubSublayer | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here. |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| private lazy val pubSublayer: ZLayer[RedisConnection, RedisError.IOError, SubscriptionExecutor] = | ||||||||||||||||||||||||||
| ZLayer.scoped( | ||||||||||||||||||||||||||
| for { | ||||||||||||||||||||||||||
| conn <- ZIO.service[RedisConnection] | ||||||||||||||||||||||||||
| pubSub <- SingleNodeSubscriptionExecutor.create(conn) | ||||||||||||||||||||||||||
| } yield pubSub | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rename accordingly (see the remark about
PushProtocolbelow). Another question is how visible it should be.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PushMessageOutputis only used ininternalpackage so I addedprivate[redis]to this objecta154683