Using SubPub (Optional)
Obviously, implementing your own Source
can be annoying especially for ones that you need to create dynamically and have each of them fan out to multiple consumers.
A package called SubPub already handles this problem (I made this as well).
#
SubPubSubPub uses ActorSystem
that have the SpawnProtocol
behavior which is similar to this package.
SubPub can easily make Source[T, NotUsed]
dynamically, fully concurrent safe, and able to fan out to dynamic sets of downstream consumers which is very useful for GraphQL Subscriptions.
Simple example usage
import io.github.dexclaimation.subpub.SubPub
val sp = new SubPub()
val res0 = sp.source[Int]("counter-topic") // <- Non-blocking, Async, Thread-safe// >> akka.stream.Source[Int, NotUsed]
res0.runForEach(x => println(s"res0: $x"))
sp.publish[Int]("counter-topic", 10)// >> res0: 10
sp.publish[Int]("counter-topic", 9)// >> res0: 9
sp.publish[Int]("counter-topic", 8)// >> res0: 8
#
Installing SubPublibraryDependencies ++= { // ...
Seq( // .. "io.github.d-exclaimation" %% "over-layer" % latestVersion, "io.github.d-exclaimation" %% "subpub" % ??? )}
#
Updating Schema and Context#
Contextimport io.github.dexclaimation.subpub.SubPubimport /* path to file */.MessageStore
case class Context( store: MessageStore, pubsub: SubPub)
#
MutationUpdating the mutation object type to use SubPub instead.
// ...val MutationType = ObjectType( "Mutation", fields[Context, Unit]( Field("send", ListType(Message.t), arguments = msgArg :: byArg :: Nil, resolve = { c => val msg = Message(uuid(), c.arg(msgArg), c.arg(byArg)) c.ctx.store.add(msg) c.pubsub.publish[Message]("chat", msg) msg } ) ))// ...
#
SubscriptionUpdating the subscription to use the stream from the SubPub.
// ...val SubscriptionType = ObjectType( "Mutation", fields[Context, Unit]( Field.subs("roomLobby", Message.t, // You can see that you can specific a topic which would be useful for multiple rooms. resolve = _.ctx.pubsub.source[Message]("chat").map(Action(_)) ) ))// ...
#
Updating Serverimport ...import io.github.dexclaimation.subpub.SubPub
object Main extends SprayJsonSupport { // ...
val transport = OverTransportLayer(Schema.t, ())
val pubsub = SubPub()
val route: Route = { (post & path("graphql") & entity(as[JsValue])) { req => graphQLEndpoint(req, Context(store, pubsub)) } ~ path("graphql" / "websocket") { transport.applyMiddleware(Context(store, pubsub)) } ~ path(Remaining) { _ => getFromResource("assets/playground.html") } }
// ...}
That's about it, simple swap and now you can have multiple rooms as suppose to a single lobby, but I'll leave that as an exercise for the readers.