Conversation
|
It might be better to get the subscription state for connection recovery 🤔 |
|
@mijicd I think I've done for the first step. please give me a feedback about this PR when you are ready 😃 |
| ) extends RedisExecutor { | ||
| scope: Scope.Closeable, | ||
| codec: BinaryCodec | ||
| ) extends RedisExecutor |
There was a problem hiding this comment.
Don't think that cluster executor should inherit RedisPubSub.
I think it's more logic if it would be other way, there are would be some RedisPubSubClusterExecutor that is going to inherit ClusterExecuter, like SingleNodeRedisPubSubExecutor
And there is going to be some consistency.
|
@0pg please rebase and address the change requested by @anatolysergeev 🙏 |
| } | ||
| } | ||
|
|
||
| final def subscribe(channel: String, channels: String*): ZStream[Redis, RedisError, PushProtocol] = |
There was a problem hiding this comment.
In case of multiple channels passed, we should return a List/Chunk of streams. Also, I think that we should hide PushProtocol from the end user.
There was a problem hiding this comment.
In case of multiple channels passed, we should return a List/Chunk of streams.
okay then it'd be better to get api with a single param and the callers have to handle multiple params using ZIO.foreach
I think that we should hide PushProtocol from the end user.
hmm... meaning that defines separate data types containing pushed message content for only end users or registers callbacks for each pubsub command for the users who want to handle subscribe/unsubscribe messages?
There was a problem hiding this comment.
or using ResultBuilder that ask to give callbacks of sub/unsub when calls returning
There was a problem hiding this comment.
def subscribe(channel: String): ResultOutputStreamBuilder =
new ResultOutputStreamBuilder(
override def returning[R: Schema]: ZStream[Redis, RedisError, R] =
ZStream.serviceWithStream[Redis] { redis =>
RedisPubSubCommand
.run(RedisPubSubCommand.Subscribe(channel))
.collect { case t: PushProtocol.Message => t.message }
.mapZIO { resp =>
ZIO
.attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
.refineToOrDie[RedisError]
}
}
)
def subscribe(channels: Chunk[String]): ResultOutputMultiStreamBuilder =
new ResultOutputMultiStreamBuilder(
override def returning[R: Schema]: UIO[Chunk[ZStream[Redis, RedisError, R]]] =
ZIO.serviceWithZIO[Redis] { redis =>
RedisPubSubCommand
.run(RedisPubSubCommand.Subscribe(channels.head, channels.tail))
.collect { case t: PushProtocol.Message => t.message }
.broadcast(channels.length, maximumLag)
.map(streams =>
streams.zipWith(channels) { (stream, channel) =>
stream
.filter(_.key.value == channel)
.mapZIO { resp =>
ZIO
.attempt(ArbitraryOutput[R]().unsafeDecode(resp)(redis.codec))
.refineToOrDie[RedisError]
}
}
)
}This is just a sketch. I hope that I managed to explain what's the base idea through this code. Of course, something like ResultOutputMultiStreamBuilder should be added for multi-stream scenarios.
There was a problem hiding this comment.
yeah that's almost same what I though and mentioned above 👍
Fix RedisExecutor structure Add PubSub api Fix broken compile Add PushProtocolOutput suite Fix message field type as generic Fix broken output Fix PushProtocol output spec Add key property in PushProtocol Add test implementation Add PubSub integration test Fix formatting Refactor RedisPubSub Apply RedisPubSub refactoring Apply RedisPubSub refactoring to t/c Remove unused file Fix logic bugs Fix broken t/c Fix unsubscribe process Fix pubSubSpec Add request message broker in SingleNodeRedisPubSub Simplify RedisPubSub's public api Revert unrelated changes
|
@anovakovic01 Hi there are some changes please check 🙏
Cluster PubSubExecutor will be worked by another PR |
| case class Pattern(value: String) extends SubscriptionKey | ||
| } | ||
|
|
||
| case class NumSubResponse(channel: String, subscriberCount: Long) |
There was a problem hiding this comment.
Maybe it would be better to name this class NumberOfSubscribers, or NumOfSubs. Also, you should place it in the options/PubSub.scala file and make it final.
There was a problem hiding this comment.
I renamed and moved it inside to companion object of options/PubSub
|
|
||
| object PushProtocol { | ||
| case class Subscribe(channel: String, numOfSubscription: Long) extends PushProtocol { | ||
| def key: SubscriptionKey = SubscriptionKey.Channel(channel) |
There was a problem hiding this comment.
Do you need SubscriptionKey? Can you use String instead?
There was a problem hiding this comment.
you mean that gets rid of SubscriptionKey? or changes return type of def key to String?
I changed return type to String
There was a problem hiding this comment.
Sorry for not being clear. I meant to remove SubscriptionKey entirely and replace it with a plain String.
There was a problem hiding this comment.
ah I moved SubscriptionKey into SingleNodeRedisPubSub as a private case class
|
|
||
| final def subscribeWithCallback( | ||
| channel: String, | ||
| channels: List[String] |
| final def subscribe(channel: String): ResultStreamBuilder[Id] = | ||
| subscribeWithCallback(channel)(emptyCallback) | ||
|
|
||
| final def subscribe(channel: String, channels: List[String]): ResultStreamBuilder[List] = |
| ZIO | ||
| .attempt(ArbitraryOutput[R]().unsafeDecode(msg)(codec)) | ||
| .refineToOrDie[RedisError] | ||
| .asSome |
There was a problem hiding this comment.
I think that this can be part of the run implementation in the RedisPubSubCommand.
There was a problem hiding this comment.
I made RedisPubSubCommand takes responsibilities of transforming message types and invoking callback
| sealed trait PubSubCommand | ||
|
|
||
| object PubSubCommand { | ||
| case class Subscribe( | ||
| channel: String, | ||
| channels: List[String], | ||
| onSubscribe: PubSubCallback | ||
| ) extends PubSubCommand | ||
| case class PSubscribe( | ||
| pattern: String, | ||
| patterns: List[String], | ||
| onSubscribe: PubSubCallback | ||
| ) extends PubSubCommand | ||
| case class Unsubscribe(channels: List[String]) extends PubSubCommand | ||
| case class PUnsubscribe(patterns: List[String]) extends PubSubCommand | ||
| } |
There was a problem hiding this comment.
We should hide these from the end user. These should be in a dedicated PubSubCommand file.
There was a problem hiding this comment.
I moved it to its own file but do we need to add package private access modifier?
There was a problem hiding this comment.
I think that we do if it's not part of the api interface.
There was a problem hiding this comment.
I added package private accessor
Co-authored-by: Aleksandar Novaković <anovakovic01@gmail.com>
Co-authored-by: Aleksandar Novaković <anovakovic01@gmail.com>
|
@anovakovic01 I fixed some cross compile issues 🙏 |
|
@anovakovic01 this PR needs to address a few things before merging (compilation, linting, sync with master) |
|
Is there anything to do about test failures? it seems like non deterministic 🤔 |
|
It looks like these aren't directly related to your change, so I'd say let's not touch any of them in this PR. |
mijicd
left a comment
There was a problem hiding this comment.
Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.
During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.
One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.
| case class Subscribe(channel: String, numOfSubs: Long) extends PushProtocol | ||
| case class PSubscribe(pattern: String, numOfSubs: Long) extends PushProtocol | ||
| case class Unsubscribe(channel: String, numOfSubs: Long) extends PushProtocol | ||
| case class PUnsubscribe(pattern: String, numOfSubs: Long) extends PushProtocol | ||
| case class Message(channel: String, message: RespValue) extends PushProtocol | ||
| case class PMessage(pattern: String, channel: String, message: RespValue) extends PushProtocol |
There was a problem hiding this comment.
Please make the case classes final. Besides that, types look odd, and there's a lot of duplication.
There was a problem hiding this comment.
I grouped message types using new SubscriptionKey type that contains context of channel or pattern
a154683
There was a problem hiding this comment.
it also changes SingleNodeSubscriptionExecutor's member Ref that manages subscription state
| object PubSub { | ||
| type PubSubCallback = (String, Long) => UIO[Unit] | ||
|
|
||
| private[redis] sealed trait PushProtocol |
There was a problem hiding this comment.
Let's rename this to PushMessage. Also, this doesn't really look like an option, it's very much internal thing.
| } | ||
| } | ||
|
|
||
| case object PushProtocolOutput extends Output[PushProtocol] { |
There was a problem hiding this comment.
Please rename accordingly (see the remark about PushProtocol below). Another question is how visible it should be.
There was a problem hiding this comment.
PushMessageOutput is only used in internal package so I added private[redis] to this object
a154683
| commandName <- | ||
| ZIO | ||
| .fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => name }) | ||
| .orElseFail(RedisError.CommandNameNotFound(command.args.toString())) |
There was a problem hiding this comment.
Let's adjust the error constructor to avoid invoking toString explicitly.
| new ResultStreamBuilder1[({ type lambda[x] = (String, x) })#lambda] { | ||
| def returning[R: Schema]: Stream[RedisError, (String, R)] = | ||
| RedisSubscriptionCommand(executor).subscribe( | ||
| Chunk.single(channel) ++ Chunk.fromIterable(channels), |
| pattern: String, | ||
| patterns: String* |
There was a problem hiding this comment.
Are you suggesting that defines the parameter types for channels and patterns?
| } | ||
|
|
||
| object Subscription { | ||
| private lazy val emptyCallback = (_: String, _: Long) => ZIO.unit |
There was a problem hiding this comment.
Let's follow the constants definition guideline. Also no need to make it lazy.
There was a problem hiding this comment.
I removed this value and replaced to using Option
| import zio.schema.Schema | ||
| import zio.schema.codec.BinaryCodec | ||
|
|
||
| private[redis] trait SubscribeEnvironment { |
There was a problem hiding this comment.
We might be able to generalize the environments, though I'm fine with exploring that possibility in a follow-up.
|
|
||
| import zio._ | ||
| import zio.redis.Output._ | ||
| import zio.redis.Output.{PushProtocolOutput, _} |
mijicd
left a comment
There was a problem hiding this comment.
Thank you for taking care of this. It's a massive chunk of work, and it's going in the right direction.
During the first review pass, I focused on the package layout, top-level abstractions, and protocol correctness. Once all of them are in place, I'll do a more detailed review of its internals, specifically the streaming part.
One general remark is that we can improve the test suite to use property tests and cover more than just a "happy path". Note that this remark applies to most tests, but we must start improving their state somewhere.
|
@mijicd Thank you for your review. I worked some parts based on comments. please to check changes and reply comments when you're fine. 😃 |
mijicd
left a comment
There was a problem hiding this comment.
@anovakovic01 @anatolysergeev Please take a look!
|
|
||
| package zio.redis.internal | ||
|
|
||
| object PubSub { |
There was a problem hiding this comment.
Just mark the whole object as private[redis]. Everything in internal is marked that way.
|
|
||
| private[redis] object RequestQueue { | ||
| private final val RequestQueueSize = 16 | ||
| def create[A]: UIO[Queue[A]] = Queue.bounded[A](RequestQueueSize) |
There was a problem hiding this comment.
I would drop this and move constant to package object.
There was a problem hiding this comment.
I moved constant to package object and removed this object
| val run: IO[RedisError, AnyVal] = | ||
| ZIO.logTrace(s"$this sender and reader has been started") *> | ||
| (send.repeat(Schedule.forever) race receive) | ||
| .tapError(e => ZIO.logWarning(s"Reconnecting due to error: $e") *> onError(e)) | ||
| .retryWhile(True) | ||
| .tapError(e => ZIO.logError(s"Executor exiting: $e")) |
There was a problem hiding this comment.
It should likely be private and final.
There was a problem hiding this comment.
I marked it as (it broke compile on scala3) protected finalprivate[internal] final
| import zio.{Chunk, ChunkBuilder, Hub, IO, Promise, Queue, Ref, Scope, UIO, URIO, ZIO} | ||
|
|
||
| private[redis] final class SingleNodeSubscriptionExecutor private ( | ||
| subsRef: Ref[Map[SubscriptionKey, Hub[Take[RedisError, PushMessage]]]], |
There was a problem hiding this comment.
It might be worth using ConcurrentMap.
There was a problem hiding this comment.
I replaced Ref[Map[...]] to ConcurrentMap
| def getHub(key: SubscriptionKey): IO[RedisError, Hub[Take[RedisError, PushMessage]]] = | ||
| subsRef.get | ||
| .map(_.get(key)) | ||
| .flatMap(ZIO.fromOption(_)) |
There was a problem hiding this comment.
I think you can go without (_). Do that on all applicable places.
There was a problem hiding this comment.
I changed to use .apply because 3.2.2 version requires it
…criptionExecutor.scala Co-authored-by: Dejan Mijić <dmijic@acm.org>
| } | ||
| } | ||
|
|
||
| case object NumSubResponseOutput extends Output[Chunk[NumberOfSubscribers]] { |
There was a problem hiding this comment.
I know that there is an order in the response, but maybe you should use a Map instead (for easier access to the value of the specific channel).
| .fromOption(command.args.collectFirst { case RespCommandArgument.CommandName(name) => | ||
| name | ||
| }) | ||
| .orElseFail(RedisError.CommandNameNotFound(command)) |
There was a problem hiding this comment.
I think that ProtocolError should be thrown instead, but I'm not sure how you can do it from here. @mijicd maybe we can update RespCommand to receive a NonEmptyChunk because it always has to have a name (or add a name parameter). We could do this in a dedicated PR.
There was a problem hiding this comment.
Thrown ProtocolError more makes sense 🤔 I'll try to this using NonEmptyChunk approach in another PR
There was a problem hiding this comment.
I made PR that gives name separated field
#852
| onSubscribe: Option[PubSubCallback], | ||
| onUnsubscribe: Option[PubSubCallback] |
There was a problem hiding this comment.
What do you think about giving default values here and removing Option by introducing NoopCallback:
| onSubscribe: Option[PubSubCallback], | |
| onUnsubscribe: Option[PubSubCallback] | |
| onSubscribe: PubSubCallback = NoopCallback, | |
| onUnsubscribe: PubSubCallback = NoopCallback |
The same suggestion applies to other methods with optional callback parameters.
There was a problem hiding this comment.
yeah it seems to make more ergonomic api. I changed to use NoopCallback.
I changed function name subscribe to subscribeSingle for only single channel subscription call because it's more clear to distinct multiple channels subscription call (also they have different return type signature)
5ca98e7 (#714)
| }.map(SubscriptionKey.Pattern.apply) | ||
|
|
||
| def send: IO[RedisError.IOError, Unit] = | ||
| requests.takeAll.flatMap { reqs => |
There was a problem hiding this comment.
Use takeBetween(1, RequestQueueSize) just like in the existing send implementation.
|
|
||
| trait Subscription extends SubscribeEnvironment { | ||
|
|
||
| final def subscribe(channel: String): ResultStreamBuilder1[Id] = |
There was a problem hiding this comment.
Add doc comments for the public API.
There was a problem hiding this comment.
I added doc Subscription and Publishing apis
|
@0pg Great work! You're almost there, these are some minor changes that I've commented on, but nothing major. |
resolves #160
Added
RedisSubscriptionandSubscriptionExecutorSubscriptionExecutorStream[RedisError, (String, A)]that tuple is (key, value) pair