Source for GraphQL

OverLayer implements stream-based subscriptions on top of akka-streams. Akka Streams uses the Source data structure to represent a stream of output.

There are few restrictions on how you build the Source, but there are some notable limitations and behaviours to be aware of.

Limitations

Given that this is an Akka-centered implementation, OverLayer does not support other streaming options such as Monix, RxScala, or ZIO Streams. Besides that obvious limitation, there are a few others.

Jedi / Materialized value of NotUsed

Sangria requires that the given Source has a jedi / materialized value of NotUsed. Some of the built-in Source operators do not return a Source with NotUsed as the materialized value, so you will have to convert it beforehand.

Using .alsoToMat and Keep.right
Field.subs("ticking", IntType,
  resolve = { c =>
    val tickingSource = Source
      .tick(Duration.Zero, 250.milliseconds, 45)        // Source[Int, Cancellable]
      .alsoToMat(Sink.onComplete(_ => ...))(Keep.right) // Source[Int, NotUsed]

    tickingSource.map(Action(_))
  })
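When no completion callback is needed, the materialized value can also be replaced directly. The following is a minimal sketch, assuming akka-stream 2.6 on the classpath; the object name `NotUsedSketch` and its field names are illustrative and not part of OverLayer or Sangria.

```scala
import akka.NotUsed
import akka.actor.Cancellable
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// Hypothetical holder object, only here to make the sketch self-contained.
object NotUsedSketch {
  // Source.tick materializes a Cancellable handle, not NotUsed.
  val ticking: Source[Int, Cancellable] =
    Source.tick(Duration.Zero, 250.milliseconds, 45)

  // Replace the materialized value with NotUsed; the emitted elements are unchanged.
  val converted: Source[Int, NotUsed] =
    ticking.mapMaterializedValue(_ => NotUsed)
}
```

The trade-off is that the Cancellable handle is discarded, so the tick can then only be stopped by the downstream cancelling or by shutting down the stream some other way.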

Action output type

Sangria also requires the output type to be Action[T] if the subscription type is T. This is easily done using .map or a custom flow.

Convert using .map
Field.subs("notificationAdded", Notification.t,
  resolve = { _ =>
    Source
      .empty[Notification] // Source[Notification, NotUsed]
      .map(Action(_))      // Source[Action[Notification], NotUsed]
  })

Behaviors

Timeouts and Shutdowns

In many scenarios, subscriptions wait for values to be pushed from mutations. In such cases, the Source is likely to run forever until it is explicitly stopped by the server or by a request from the client.

A stream that never stops on its own can cause problems, notably memory leaks. To prevent this, a subscription stream is shut down on a timeout or an unsubscribe, so that its callbacks and any Sink no longer run.

Given that, be aware that the original Graph may be shut down, and avoid using a singleton source that is reused across multiple subscriptions.
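To make the shutdown behaviour concrete, here is a minimal sketch of how a never-ending stream can be stopped from the outside with plain akka-streams. It assumes akka-stream 2.6 on the classpath and is not OverLayer's actual implementation; the object name `ShutdownSketch`, the tick interval, and the 300 ms delay (a stand-in for a timeout or an unsubscribe) are all illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.KillSwitches
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical sketch: stop an endless stream from the outside.
object ShutdownSketch {
  // Returns how many elements were received before the stream was shut down.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("shutdown-sketch")
    try {
      // An endless tick source with a kill switch inserted into the graph.
      val (killSwitch, received) =
        Source
          .tick(Duration.Zero, 50.milliseconds, 1)
          .viaMat(KillSwitches.single)(Keep.right)
          .toMat(Sink.seq)(Keep.both)
          .run()

      // Stand-in for a timeout or an unsubscribe request.
      system.scheduler.scheduleOnce(300.milliseconds)(killSwitch.shutdown())(system.dispatcher)

      // The Sink completes once the switch is triggered.
      Await.result(received, 5.seconds).size
    } finally system.terminate()
  }
}
```

Once the switch has been triggered, that materialized graph is finished for good, which is why anything tied to it cannot serve a later subscription.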

Fan out to multiple consumers

With GraphQL subscriptions, the number of consumers is dynamic, since it depends on the number of subscription requests coming in.

By default, a Source has no mechanism for fanning out to multiple consumers, and it treats all of its Sinks and Flows as one Graph.

As mentioned above, on a timeout or an unsubscribe the original Graph is shut down, so new requests can no longer receive the incoming data stream.

Solutions

TL;DR: Avoid sharing a single Source across multiple clients' subscriptions. Always compute a new source for each subscription, or use a technique that allows fanning out / broadcasting to multiple downstream consumers.

Avoid / Bad
object Singleton {
  // Source will be shut down after unsubscribe and cannot be reused
  val source = Source.repeat("Hello!!")
}

object Schema {
  val t = Schema(
    query = Query.t,
    subscription = Some(
      ObjectType("Subscription",
        fields[Ctx, Unit](
          // The single source is shut down on timeout or unsubscribe,
          // so new subscription requests will fail.
          Field.subs("helloSaid", StringType, resolve = _ => Singleton.source.map(Action(_)))
        )
      )
    )
  )
}

Compute on demand

By computing a new source on each subscription request, you make sure that other streams are not abruptly shut down when one subscriber unsubscribes and stops its Graph.

However, this poses a challenge when connecting the stream to other parts of the application.

Turn it into a function, so it is computed on demand
Possible Fix #1
object Singleton {
  // def instead of val: each call returns a fresh Source
  def source: Source[String, NotUsed] = Source.repeat("Hello!!")
}

Broadcast Hub

An alternative is to use BroadcastHub, which abstracts away the process of fanning a stream out into multiple other streams, emitting each value to all downstream consumers. This allows a consumer Source to be shut down to free memory while the publisher stream stays alive and is reused.

Make into an upstream publisher that can fan out to multiple downstream consumers
Possible Fix #2
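object Singleton {
  // .run() needs an implicit Materializer (or ActorSystem) in scope
  val source = Source
    .repeat("Hello!!")
    .toMat(BroadcastHub.sink(256))(Keep.right) // materializes a reusable Source[String, NotUsed]
    .run()
}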
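The fan-out behaviour can be seen by attaching more than one consumer to the hub. This is a minimal sketch, assuming akka-stream 2.6 on the classpath; the object name `BroadcastHubSketch`, the tick interval, and the `take` counts are illustrative and not part of OverLayer. A tick source is used instead of `Source.repeat` only to keep the demo paced.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical sketch: one publisher, two independent consumers.
object BroadcastHubSketch {
  def collect(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")
    try {
      // Run the publisher once; the hub hands back a Source that can be
      // materialized any number of times.
      val hubSource: Source[String, NotUsed] =
        Source
          .tick(Duration.Zero, 100.milliseconds, "Hello!!")
          .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.right)
          .run()

      // Each consumer stands in for one subscription and stops on its own.
      val first = hubSource.take(3).runWith(Sink.seq)
      val second = hubSource.take(2).runWith(Sink.seq)

      (Await.result(first, 5.seconds), Await.result(second, 5.seconds))
    } finally system.terminate()
  }
}
```

Each consumer finishes after its own `take` without stopping the publisher, so a later subscription could still attach to `hubSource`.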

Using SubPub

If you find this part complicated, I highly recommend checking out SubPub.

Make into an upstream publisher that can fan out to multiple downstream consumers
SubPub's API
import io.github.dexclaimation.subpub.SubPub

object Schema {
  val SubscriptionType = ObjectType("Subscription",
    fields[SubPub, Unit](
      Field.subs("helloSaid", StringType,
        resolve = _.ctx
          .source[String]("helloSaid")
          .map(Action(_))
      )
    )
  )

  val t = Schema(
    query = Query.t,
    subscription = Some(SubscriptionType)
  )
}