-
Notifications
You must be signed in to change notification settings - Fork 68
Implement PubSub api #714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PubSub api #714
Changes from 12 commits
a280504
cd4a3f3
a01213d
9541519
ba89a59
9723182
65b524b
892ba20
43edb47
a6f74fa
c7decb7
b6438f0
5fb75e4
ce97f95
a25805b
fd0b303
7cb1491
2ab2db1
87eee71
f532e4b
e9e2a3c
014d2e4
49c141b
8b6ce73
f44c7fc
758ea9b
9febcf8
92aea5e
3fc77c0
3c8a307
0ea75a1
ef3b9f1
478577b
c4105ff
f432815
149ae4b
67652e6
8606409
6a3f68f
1a0862f
92457c4
bf52e26
c027295
a154683
592d327
386fefd
415f75e
1699211
be11d67
552e002
e218b08
ac85cb9
dadb5e7
88c3f4e
6e50d4d
5ca98e7
539ef3f
63cc263
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| package zio.redis | ||
|
|
||
| import zio.schema.codec.BinaryCodec | ||
| import zio.stream._ | ||
| import zio.{IO, ZLayer} | ||
|
|
||
| final case class RedisPubSubCommand(command: PubSubCommand, codec: BinaryCodec, executor: RedisPubSub) { | ||
| def run: IO[RedisError, List[Stream[RedisError, PushProtocol]]] = { | ||
| val codecLayer = ZLayer.succeed(codec) | ||
| executor.execute(command).provideLayer(codecLayer) | ||
| } | ||
| } | ||
|
|
||
| sealed trait PubSubCommand | ||
|
|
||
| object PubSubCommand { | ||
| case class Subscribe( | ||
| channel: String, | ||
| channels: List[String], | ||
| onSubscribe: PubSubCallback | ||
| ) extends PubSubCommand | ||
| case class PSubscribe( | ||
| pattern: String, | ||
| patterns: List[String], | ||
| onSubscribe: PubSubCallback | ||
| ) extends PubSubCommand | ||
| case class Unsubscribe(channels: List[String]) extends PubSubCommand | ||
| case class PUnsubscribe(patterns: List[String]) extends PubSubCommand | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should hide these from the end user. These should be in a dedicated
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved it to its own file but do we need to add package private access modifier?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that we do if it's not part of the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added package private accessor |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| package zio.redis | ||
|
|
||
| import zio.schema.codec.BinaryCodec | ||
| import zio.stream._ | ||
| import zio.{ZIO, ZLayer} | ||
|
|
||
| trait RedisPubSub { | ||
| def execute(command: PubSubCommand): ZIO[BinaryCodec, RedisError, List[Stream[RedisError, PushProtocol]]] | ||
| } | ||
|
|
||
| object RedisPubSub { | ||
| lazy val layer: ZLayer[RedisConfig with BinaryCodec, RedisError.IOError, RedisPubSub] = | ||
| RedisConnectionLive.layer.fresh >>> pubSublayer | ||
|
|
||
| lazy val local: ZLayer[BinaryCodec, RedisError.IOError, RedisPubSub] = | ||
| RedisConnectionLive.default.fresh >>> pubSublayer | ||
|
|
||
| private lazy val pubSublayer: ZLayer[RedisConnection with BinaryCodec, RedisError.IOError, RedisPubSub] = | ||
| ZLayer.scoped( | ||
| ZIO.service[RedisConnection].flatMap(SingleNodeRedisPubSub.create(_)) | ||
| ) | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.