
Using SubPub (Optional)

Implementing your own Source can be tedious, especially when sources have to be created dynamically and each one has to fan out to multiple consumers.

A package called SubPub already handles this problem (I made this as well).

SubPub

SubPub uses an ActorSystem with the SpawnProtocol behavior, which is similar to how this package works.

SubPub can create a Source[T, NotUsed] dynamically, is safe to use concurrently, and can fan out to a dynamic set of downstream consumers, which is very useful for GraphQL subscriptions.

Simple example usage
Basic
import io.github.dexclaimation.subpub.SubPub
val sp = new SubPub()
// Non-blocking, async, thread-safe
val res0 = sp.source[Int]("counter-topic")
// >> akka.stream.Source[Int, NotUsed]
// runForeach needs an implicit Materializer (e.g. an implicit ActorSystem) in scope
res0.runForeach(x => println(s"res0: $x"))
sp.publish[Int]("counter-topic", 10)
// >> res0: 10
sp.publish[Int]("counter-topic", 9)
// >> res0: 9
sp.publish[Int]("counter-topic", 8)
// >> res0: 8
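
The basic example has a single consumer. To make "fan out to a dynamic set of consumers" more concrete, here is a toy, standard-library-only model of a topic registry. It is only a sketch of the idea and not how SubPub is implemented: `ToyPubSub`, `subscribe`, and the handler-based API are hypothetical names invented for this illustration, whereas SubPub itself hands out Akka Streams sources.

```scala
import scala.collection.mutable

// Toy model of topic-based fan-out. NOT SubPub's implementation;
// ToyPubSub and its methods are hypothetical names for illustration only.
final class ToyPubSub {
  // topic name -> handlers currently subscribed to it
  private val subscribers = mutable.Map.empty[String, Vector[Any => Unit]]

  // Register a handler for a topic; a topic comes into existence on first use.
  def subscribe[T](topic: String)(handler: T => Unit): Unit = synchronized {
    val erased: Any => Unit = value => handler(value.asInstanceOf[T])
    subscribers.update(topic, subscribers.getOrElse(topic, Vector.empty) :+ erased)
  }

  // Deliver a value to every handler subscribed to the topic at this moment.
  def publish[T](topic: String, value: T): Unit = {
    val targets = synchronized(subscribers.getOrElse(topic, Vector.empty))
    targets.foreach(handler => handler(value))
  }
}

object ToyPubSubDemo {
  def main(args: Array[String]): Unit = {
    val bus = new ToyPubSub
    bus.subscribe[Int]("counter-topic")(x => println(s"a: $x"))
    bus.publish("counter-topic", 10) // prints "a: 10" (only "a" is subscribed)
    bus.subscribe[Int]("counter-topic")(x => println(s"b: $x"))
    bus.publish("counter-topic", 9)  // prints "a: 9" then "b: 9"
    bus.publish("other-topic", 1)    // no subscribers, prints nothing
  }
}
```

The point to take away is that consumers can join a topic at any time and only see values published after they subscribed. Whether SubPub behaves the same way around subscription timing is not stated here, so treat that part as an assumption.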

Installing SubPub

build.sbt
libraryDependencies ++= {
  // ...
  Seq(
    // ..
    "io.github.d-exclaimation" %% "over-layer" % latestVersion,
    "io.github.d-exclaimation" %% "subpub" % ???
  )
}

Updating Schema and Context

Context

Context.scala
import io.github.dexclaimation.subpub.SubPub
import /* path to file */.MessageStore

case class Context(
  store: MessageStore,
  pubsub: SubPub
)

Mutation

Update the mutation object type to publish through SubPub instead.

Schema.scala
// ...
val MutationType = ObjectType(
  "Mutation",
  fields[Context, Unit](
    Field("send", ListType(Message.t),
      arguments = msgArg :: byArg :: Nil,
      resolve = { c =>
        val msg = Message(uuid(), c.arg(msgArg), c.arg(byArg))
        c.ctx.store.add(msg)
        // pubsub lives on the user Context, so it is reached through c.ctx
        c.ctx.pubsub.publish[Message]("chat", msg)
        msg
      }
    )
  )
)
// ...

Subscription

Update the subscription to use the stream from SubPub.

// ...
val SubscriptionType = ObjectType(
  "Subscription",
  fields[Context, Unit](
    Field.subs("roomLobby", Message.t,
      // Note that you can specify a topic, which would be useful for multiple rooms.
      resolve = _.ctx.pubsub.source[Message]("chat").map(Action(_))
    )
  )
)
// ...

Updating Server

Main.scala
import ...
import io.github.dexclaimation.subpub.SubPub

object Main extends SprayJsonSupport {
  // ...

  val transport = OverTransportLayer(Schema.t, ())

  val pubsub = SubPub()

  val route: Route = {
    (post & path("graphql") & entity(as[JsValue])) { req =>
      graphQLEndpoint(req, Context(store, pubsub))
    } ~ path("graphql" / "websocket") {
      transport.applyMiddleware(Context(store, pubsub))
    } ~ path(Remaining) { _ =>
      getFromResource("assets/playground.html")
    }
  }

  // ...
}

That's about it. It is a simple swap, and now you can have multiple rooms as opposed to a single lobby, but I'll leave that as an exercise for the reader.