Source for GraphQL
OverLayer uses stream-based subscriptions built on akka-streams. Akka Streams has the Source data structure to represent a stream of output.
There are few restrictions on how you build the Source, but there are some notable limitations and behaviors to be aware of.
Limitations
Given that this is an Akka-centered implementation, OverLayer does not support other streaming options such as monix, rxscala, and zio streams. Besides that obvious limitation, there are a few others.
NotUsed
Sangria requires the given Source to have a materialized value of NotUsed. Some of the built-in Source operators do not return a Source with NotUsed as the materialized value, so you will have to convert it beforehand.
Using .alsoToMat and Keep.right
Field.subs("ticking", IntType, resolve = { c =>
  val tickingSource = Source
    .tick(Duration.Zero, 250.milliseconds, 45) // Source[Int, Cancellable]
    .alsoToMat(Sink.onComplete(_ => ...))(Keep.right) // Source[Int, NotUsed]

  tickingSource.map(Action(_))
})
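Another way to do the same conversion, shown here as a minimal sketch, is Akka's mapMaterializedValue operator. The object name NotUsedExample and the helper firstThree are illustrative only, and the sketch assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of OverLayer.
object NotUsedExample {
  // Source.tick materializes a Cancellable; replace it with NotUsed.
  // This discards the Cancellable, so the tick can no longer be cancelled through it.
  def tickingSource: Source[Int, NotUsed] =
    Source
      .tick(Duration.Zero, 250.milliseconds, 45) // Source[Int, Cancellable]
      .mapMaterializedValue(_ => NotUsed)        // Source[Int, NotUsed]

  // Runs the source briefly to show that it still emits the same elements.
  def firstThree(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("not-used-example")
    try Await.result(tickingSource.take(3).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Compared with .alsoToMat and Keep.right, this variant does not attach an extra Sink, so it fits when no completion callback is needed.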
Action output type
Sangria also requires the output type to be Action[T] if the subscription type is T. This is easily done using .map or a custom Flow.
Convert using .map
Field.subs("notificationAdded", Notification.t, resolve = { _ =>
  Source
    .empty[Notification] // Source[Notification, NotUsed]
    .map(Action(_)) // Source[Action[Notification], NotUsed]
})
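The custom Flow option could look roughly like the sketch below. It assumes Sangria's two-parameter Action[Ctx, Val] and wraps each element in sangria.schema.Value explicitly instead of relying on the implicit conversion behind Action(_). The names ActionFlowExample, toAction and the Notification case class are hypothetical stand-ins.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import sangria.schema.{Action, Value}

// Hypothetical payload type standing in for the real Notification.
final case class Notification(message: String)

// Hypothetical example object, not part of OverLayer or Sangria.
object ActionFlowExample {
  // A reusable Flow that wraps every element in a Sangria Value, which is an Action.
  def toAction[Ctx, T]: Flow[T, Action[Ctx, T], NotUsed] =
    Flow[T].map(value => Value[Ctx, T](value))

  // Same shape as the .map version: the materialized value stays NotUsed.
  def notifications: Source[Action[Unit, Notification], NotUsed] =
    Source.empty[Notification].via(toAction[Unit, Notification])
}
```

A Flow like this is mostly useful when the same wrapping is shared by several subscription fields; for a single field, .map is shorter.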
Behaviors
Timeouts and Shutdowns
In many scenarios, subscriptions wait for values to be pushed from mutations. In that case, the Source is likely to run forever until it is explicitly stopped by the server or by a request from the client.
A stream that never stops on its own can cause problems, notably memory leaks. To prevent this, a subscription stream is shut down on a timeout or an unsubscribe, so that callbacks and any Sink stop running.
Given that, be aware that the original Graph may be shut down, and avoid using a singleton Source that is reused across multiple subscriptions.
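To make the two shutdown triggers concrete, here is a small sketch using plain Akka Streams operators: takeWithin for a timeout and a KillSwitch for an unsubscribe. It only illustrates the idea and is not OverLayer's actual implementation. The object name ShutdownExample and the durations are assumptions, as is Akka 2.6 or later for the implicit ActorSystem.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of OverLayer.
object ShutdownExample {
  // A never-ending stream, built fresh on every call.
  def endless: Source[String, NotUsed] =
    Source.repeat("Hello!!").throttle(1, 10.milliseconds)

  def run(): (Done, Done) = {
    implicit val system: ActorSystem = ActorSystem("shutdown-example")
    try {
      // Timeout: the stream completes by itself once the time window has passed.
      val timedOut = endless.takeWithin(200.milliseconds).runWith(Sink.ignore)

      // Unsubscribe: the kill switch lets outside code complete the stream.
      val (killSwitch, unsubscribed) = endless
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()
      killSwitch.shutdown()

      (Await.result(timedOut, 5.seconds), Await.result(unsubscribed, 5.seconds))
    } finally system.terminate()
  }
}
```

In both cases the whole running Graph completes, including any Sink attached to it.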
Fan out to multiple consumers
With GraphQL subscriptions, the number of consumers is dynamic, since it depends on the number of subscription requests coming in.
By default, a Source has no mechanism for fanning out to multiple consumers and treats every attached Sink and Flow as one Graph.
As mentioned above, on a timeout or an unsubscribe the original Graph is shut down, so new requests can no longer take the incoming data stream.
Solutions
TL;DR: Avoid sharing a single Source across multiple clients' subscriptions. Either compute a new Source for each subscription, or use a technique that fans out / broadcasts to multiple downstream consumers.
object Singleton {
  // Source will be shut down after unsubscribe and cannot be reused
  val source = Source.repeat("Hello!!")
}

object Schema {
  // Fully qualified, because the enclosing object is also named Schema
  val t = sangria.schema.Schema(
    query = Query.t,
    subscription = Some(
      ObjectType("Subscription",
        fields[Ctx, Unit](
          // The single source is shut down on timeout or unsubscribe, so new subscription requests will fail.
          Field.subs("helloSaid", StringType, resolve = _ => Singleton.source.map(Action(_)))
        )
      )
    )
  )
}
Compute on demand
By computing a new Source on each subscription request, you make sure that other streams are not abruptly shut down when one subscriber unsubscribes and stops its Graph.
However, this poses a challenge when connecting the stream to other parts of the application.
Make it a function, to compute on demand
object Singleton {
  def source: Source[String, NotUsed] = Source.repeat("Hello!!")
}
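As a rough illustration of why the function form helps, the sketch below runs two subscribers one after the other, each on its own freshly computed Source. The object name OnDemandExample and the element counts are made up for the example, and Akka 2.6 or later is assumed for the implicit ActorSystem.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of OverLayer.
object OnDemandExample {
  // A def, so every call builds a new Source.
  def source: Source[String, NotUsed] = Source.repeat("Hello!!")

  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("on-demand-example")
    try {
      // First subscriber takes two values and then stops, as on unsubscribe.
      val first = Await.result(source.take(2).runWith(Sink.seq), 5.seconds)
      // Second subscriber gets its own stream, unaffected by the first one stopping.
      val second = Await.result(source.take(3).runWith(Sink.seq), 5.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```

Each subscriber has its own independent stream here, so nothing is shared between them; that independence is also what makes it harder to push one mutation's value to all subscribers.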
Broadcast Hub
An alternative is to use the BroadcastHub, which abstracts away the process of fanning a stream out into multiple other streams, emitting each value to all downstream consumers. This allows each consumer Source to be shut down to free memory while the publisher stream stays alive and is reused.
Make it an upstream publisher that can fan out to multiple downstream consumers
object Singleton {
  val source = Source
    .repeat("Hello!!")
    .toMat(BroadcastHub.sink(256))(Keep.right)
    .run()
}
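The following sketch shows how consumers could attach to and detach from such a hub. It uses a ticking publisher instead of Source.repeat to keep the example gentle on the CPU. The object name BroadcastHubExample, the tick interval and the element counts are assumptions, and an implicit ActorSystem (Akka 2.6 or later) is needed to run the hub.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of OverLayer.
object BroadcastHubExample {
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-hub-example")
    try {
      // One long-lived publisher, materialized once.
      val hubSource: Source[String, NotUsed] = Source
        .tick(Duration.Zero, 10.milliseconds, "Hello!!")
        .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.right)
        .run()

      // First consumer attaches, takes two values, then stops (as on unsubscribe).
      val first = Await.result(hubSource.take(2).runWith(Sink.seq), 5.seconds)
      // The publisher is still alive, so a later consumer can attach to the same hub.
      val second = Await.result(hubSource.take(3).runWith(Sink.seq), 5.seconds)
      (first, second)
    } finally system.terminate()
  }
}
```

The buffer size passed to BroadcastHub.sink must be a power of two, which is why 256 is used.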
Using SubPub
If you find this part complicated, checking out SubPub is highly recommended.
Make it an upstream publisher that can fan out to multiple downstream consumers
import io.github.dexclaimation.subpub.SubPub

object Schema {
  val SubscriptionType = ObjectType("Subscription",
    fields[SubPub, Unit](
      Field.subs("helloSaid", StringType,
        resolve = _.ctx
          .source[String]("helloSaid")
          .map(Action(_))
      )
    )
  )

  // Fully qualified, because the enclosing object is also named Schema
  val t = sangria.schema.Schema(
    query = Query.t,
    subscription = Some(SubscriptionType)
  )
}