I have some code that is similar to the following:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val input = builder.add(Balance[Int](1)) //Question 1) how to get rid of this input
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val balance = builder.add(Balance[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left right
}))
val flow3 = Flow[Int].map(_*2)
input ~> buffer ~> balance.in
balance.out(0) ~> flow1 ~> zip.in0
balance.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3
FlowShape(input.in, flow3) //Question 2) how to make an outlet here
})
}
Notice that I had to add a Balance called input, because I cannot retrieve an Inlet from the first Buffer of the FlowShape I want to create. Is there any other simpler way to solve this? Creating a Balance with 1 Outlet seems to be the wrong way to do this.
My second question is similar. I cannot retrieve an Outlet from flow3. The only way I know to solve this problem is to create yet another Balance, and expose its Outlet as the Outlet of the entire FlowShape. Any better way to solve this problem?
CodePudding user response:
A Balance is a fan-out shape that emits to the first available output. Considering you are zipping the flows in the next step, what you need is a Broadcast. It will fan-out to all outputs when all of them are available.
Also, the builder can add any of the shapes that are a Graph, this includes Flow. You don't have to use a custom shape for that.
The updated code:
object Test extends App {
val SomeComplicatedFlow: Flow[Int, Int, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val buffer = Flow[Int].buffer(12, OverflowStrategy.backpressure)
val input = builder.add(buffer)
val broadcast = builder.add(Broadcast[Int](2))
val flow1 = Flow[Int].map(_*2)
val flow2 = Flow[Int].map(_*2)
val zip = builder.add(ZipWith[Int, Int, Int]((left, right) => {
left right
}))
val flow3 = builder.add(Flow[Int].map(_*2))
input ~> broadcast.in
broadcast.out(0) ~> flow1 ~> zip.in0
broadcast.out(1) ~> flow2 ~> zip.in1
zip.out ~> flow3.in
FlowShape(input.in, flow3.out)
})
}
