
Using Streams

A proper Context

At the moment, the context is empty and doesn't do anything, so it makes sense to start filling in some logic.

For this example, I will make additional singleton classes instead of making the context itself a singleton. This helps when you want a request-specific context.

MessageStore

MessageStore.scala
import scala.jdk.CollectionConverters._
import java.util.concurrent.ConcurrentHashMap
import /* path to file */.Message

class MessageStore {
  private val store =
    new ConcurrentHashMap[String, Message]()
      .asScala

  def get: List[Message] = store
    .values
    .toList

  def add(msg: Message) = store
    .putIfAbsent(msg.id, msg)
}
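To make the store's behaviour concrete, here is a minimal sketch of how it could be exercised. The three-field Message case class is a hypothetical stand-in (the store itself only relies on `id`), and MessageStoreDemo is an illustrative name, not part of the project. Because add relies on putIfAbsent, a second message with an already-used id leaves the first one in place. Also note that ConcurrentHashMap makes no promise about iteration order, so get should not be assumed to return messages chronologically.

```scala
import scala.jdk.CollectionConverters._
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the project's Message type;
// field names other than `id` are assumptions.
final case class Message(id: String, text: String, authorName: String)

class MessageStore {
  private val store = new ConcurrentHashMap[String, Message]().asScala

  def get: List[Message] = store.values.toList

  // None when the message was stored, Some(existing) when the id was already taken.
  def add(msg: Message): Option[Message] = store.putIfAbsent(msg.id, msg)
}

object MessageStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new MessageStore
    val first = Message("1", "hello", "alice")

    println(store.add(first))                      // None
    println(store.add(Message("1", "dup", "bob"))) // Some(Message(1,hello,alice))
    println(store.get.size)                        // 1
  }
}
```

If duplicate ids should overwrite rather than be ignored, `put` could be used instead of `putIfAbsent`; which is preferable depends on how ids are generated.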

Channel

Channel is a very simplified PubSub-like system: it pushes data into a stream and also exposes that stream to subscribers.

The stream here is built from a publisher using the ActorSource.actorRef function; feel free to customize this.

SubPub

You can instead use SubPub, a fully concurrent, lightweight PubSub system built on Akka actors and source streams. SubPub has the same ActorSystem requirement as this package.

More info here

val pubsub = new SubPub()
pubsub.source[Message]("some-topic")
// >> Source[Message, NotUsed]
pubsub.publish[Message]("some-topic", Message(...))
Source timeout

It's a bad idea to use a single shared Source for multiple subscription operations, since a Source may be shut down on timeout and on unsubscribe operations to prevent memory leaks and waste. For more info, look here.

Channel
import akka.NotUsed
import akka.stream.scaladsl.{Keep, Source, BroadcastHub}
import akka.stream.typed.scaladsl.ActorSource
import akka.stream.{Materializer, OverflowStrategy}
import /* path to file */.Message

class Channel(implicit mat: Materializer) {
  private val (actorRef, source) = ActorSource
    .actorRef[Message](
      completionMatcher = PartialFunction.empty,
      failureMatcher = PartialFunction.empty,
      bufferSize = 100,
      overflowStrategy = OverflowStrategy.dropHead
    )
    .toMat(BroadcastHub.sink(256))(Keep.both) // Using the BroadcastHub API to allow dynamic sets of consumers
    .run()

  val subscribe: Source[Message, NotUsed] = source

  def publish(msg: Message) = actorRef ! msg
}

Adding to Context

Let's now add all of this to the context.

Context.scala
import /* path to file */.Channel
import /* path to file */.MessageStore

case class Context(store: MessageStore, channel: Channel)

Stream based Subscription

Let's update the schema to use the new context.

Query and Mutation

Starting with the Query and Mutation:

Schema.scala
import ...
object Schema {
  // ...

  val QueryType = ObjectType(
    "Query",
    fields[Context, Unit](
      Field("messages", ListType(Message.t),
        arguments = limitArg :: Nil,
        resolve = { c =>
          // Cap the requested limit at 100 messages
          val limit = math.min(c.arg(limitArg), 100)
          c.ctx.store.get.reverse.take(limit)
        }
      ),
      Field("messageBy", ListType(Message.t),
        arguments = authorNameArg :: Nil,
        resolve = c => c.ctx.store.get.reverse.filter(_.authorName == c.arg(authorNameArg))
      )
    )
  )

  // ...

  def uuid(): String = UUID.randomUUID().toString

  val MutationType = ObjectType(
    "Mutation",
    fields[Context, Unit](
      // The resolver returns a single message, so the field type is Message.t rather than a list
      Field("send", Message.t,
        arguments = msgArg :: byArg :: Nil,
        resolve = { c =>
          val msg = Message(uuid(), c.arg(msgArg), c.arg(byArg))
          c.ctx.store.add(msg)
          // This line here pushes the latest message to the Channel
          c.ctx.channel.publish(msg)
          msg
        }
      )
    )
  )

  // ...
}

Subscription Streaming API

Let's now tackle the subscription type. Sangria provides a special field constructor, Field.subs, for handling stream-based subscriptions, but it requires a bit more configuration, namely some implicits.

Let's start by adding a global package to store implicits including the ActorSystem and Materializer.

Implicits.scala (feel free to customize)
Implicits.scala
import akka.actor.typed.ActorSystem
import akka.stream.Materializer
import akka.actor.typed.scaladsl.Behaviors

object Implicits {
  implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "MainActorSystem")
  // Build the materializer from the actor system defined above
  implicit val materializer: Materializer = Materializer.createMaterializer(actorSystem)
}

And let's add that to Schema.scala and add the subscription.

Schema.scala
import ...
import /* path to file */.Implicits._
import sangria.streaming.akkaStreams._

object Schema {
  // ...
  val QueryType = ...
  // ...
  val MutationType = ...
  val SubscriptionType = ObjectType(
    "Subscription",
    fields[Context, Unit](
      Field.subs("roomLobby", Message.t,
        // Mapping the result into Action type to be understood by Sangria.
        resolve = _.ctx.channel.subscribe.map(Action(_))
      )
    )
  )

  // For accessing the schema using `.t`.
  val t = Schema(QueryType, Some(MutationType), Some(SubscriptionType))
}
Equivalent ObjectType (from the GraphQL SDL)
type Subscription {
  roomLobby: Message!
}