Using Streams
#
A proper ContextAt the moment, the context is empty and doesn't do anything, so it makes sense to start filling in some logic.
For this example, I will make additional singleton classes instead of making the context a singleton, this helps when you want a request specific context.
#
MessageStoreimport scala.jdk.CollectionConverters._import java.util.concurrent.ConcurrentHashMapimport /* path to file */.Message
class MessageStore { private val store = new ConcurrentHashMap[String, Message]() .asScala
def get: List[Message] = store .values .toList
def add(msg: Message) = store .putIfAbsent(msg.id, msg)}
#
ChannelChannel
is just a very simplified version of a PubSub like system to push data into a stream but also access a stream.
The stream here will be made from a publisher using the ActorSource.actorRef
function feel free to customize this.
SubPub
You can instead use SubPub which is a fully concurrent lightweight PubSub system using Akka actor and source stream. SubPub also use the same ActorSytem requirement as this package.
More info here
val pubsub = new SubPub()
pubsub.source[Message]("some-topic")// >> Source[Message, NotUsed]
pubsub.publish[Message]("some-topic", Message(...))
Source timeout
It's bad idea to use a single shared Source for multiple subscription operations, since Source
may be shutdown on timeout and unsubscribe operation to prevent memory leaks & waste. For more info look into here.
import akka.stream.scaladsl.{Keep, Source, BroadcastHub}import akka.stream.typed.ActorSourceimport akka.stream.{Materializer, OverflowStrategy}import /* path to file */.Message
class Channel(implicit mat: Materializer) { private val (actorRef, source) = ActorSource .actorRef[Message]( completionMatcher = PartialFunction.empty, failureMatcher = PartialFunction.empty, bufferSize = 100, overflowStrategy = OverflowStrategy.dropHead ) .toMat(BroadcastHub.sink(256))(Keep.both) // Using the BroadcastHub API to allow dynamic sets of consumers .run()
val subscribe: Source[Message, NotUsed] = source
def publish(msg: Message) = actorRef ! msg}
#
Adding to ContextLet's now add all of this to the context.
import /* path to file */.Channelimport /* path to file */.MessageStore
case class Context(store: MessageStore, channel: Channel)
#
Stream based SubscriptionLet's update the schema to use the new context.
#
Query and MutationStarting with the Query
and Mutation
import ...
object Schema { // ...
val QueryType = ObjectType( "Query", fields[Context, Unit]( Field("messages", ListType(Message.t), arguments = limitArg :: Nil, resolve = { c => val limit = min(c.arg(limitArg), 100) c.ctx.store.get.reverse.take(limit) } ), Field("messageBy", ListType(Message.t), arguments = authorNameArg :: Nil, resolve = c => c.ctx.store.get.reverse.filter(_.authorName == c.arg(authorNameArg)) ) ) )
// ...
def uuid(): String = UUID.randomUUID().toString
val MutationType = ObjectType( "Mutation", fields[Context, Unit]( Field("send", ListType(Message.t), arguments = msgArg :: byArg :: Nil, resolve = { c => val msg = Message(uuid(), c.arg(msgArg), c.arg(byArg)) c.ctx.store.add(msg) // This line here pushes the latest message to the Channel c.ctx.channel.publish(msg) msg } ) ) )
// ...}
#
Subscription Streaming APILet's now tackle the subscription type. Sangria provide a special field Field.subs
for handling stream based subscription but it require a bit more configuration namely some implicits.
Let's start by adding a global package to store implicits including the ActorSystem
and Materializer
.
Implicits.scala (feel free to customize)
import akka.actor.typed.ActorSystemimport akka.stream.Materializerimport akka.actor.typed.scaladsl.Behaviors
object Implicits { implicit val actorSystem: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, "MainActorSystem") implicit val materializer: Materializer = Materializer.createMaterializer(system)}
And let's add that to Schema.scala
and add the subscription.
import ...import /* path to file */.Implicits._import sangria.streaming.akkaStreams._
object Schema { // ...
val QueryType = ...
// ...
val MutationType = ...
val SubscriptionType = ObjectType( "Subscription", fields[Context, Unit]( Field.subs("roomLobby", Message.t, // Mapping the result into Action type to be understood by Sangria. resolve = _.ctx.channel.subscribe.map(Action(_)) ) ) )
// For accessing the schema using `.t`. val t = Schema(QueryType, Some(MutationType), Some(SubscriptionType))}
Equivalent ObjectType (from the GraphQL SDL)
type Subscription { roomLobby: Message!}