Akka Streams [4]: Flow Control

分享 未结
1 1 1 86
小编 2019-12-24发布
收藏 点赞
来源: 刘颖

Akka Streams 提供了丰富的流量控制功能来处理各种生产者和消费者的速度不匹配情况。流量控制包括了 backpressure(回压),缓存,阀门,以及溢出策略等。本文首先介绍了 async boundary(异步边界)的概念,然后给出了各种流量控制的用例。

1 Async Boundar

Akka Streams 的一个流处理中的 Operators 可以在运行在不同的线程甚至不同的 JVM 里面,一个线程就是一个 async boundary(异步边界)。从 Akka Streams 2.5 开始,当组合 operators 时,如果不调用 async() 方法,这些 operators 会运行在同一个 async boundary,也就是同一个线程里面。这种优化 Akka Streams 称其为 Fusion (聚合)。Fusion 模式时上下游 Operators 之间直接用共享内存访问 elements,每一个 element 处理完成后再处理下一个 element,不要流量控制机制。比如下面的例子:

object HelloWorld extends App {
  implicit val system = ActorSystem("testStreams")
  implicit val ec = scala.concurrent.ExecutionContext.global
  val dataFormat = DateTimeFormatter.ofPattern("hh:mm:ss:SSS")

  val source = Source(1 to 3)

  private def getTime() = {
    LocalTime.now().format(dataFormat)
  }

  private def flowFactory(name: String) = {
    Flow[Int].map(element => {
      Thread.sleep(1000)
      val threadName = Thread.currentThread().getName()
      val now = getTime()
      println(s"Flow-${name}: ${threadName} [${element}] ${now}")
      element
    })
  }

  val flowA = flowFactory("A")
  val flowB = flowFactory("B")
  val flowC = flowFactory("C")

  println(s"Start at ${getTime()}")
  val result = source
    .via(flowA)
    .via(flowB)
    .via(flowC)
    .runWith(Sink.ignore)

  result.onComplete(_ => {
    println(s"End at ${getTime()}")
    system.terminate()
  })
}

其输出为:

Start at 07:20:47:823
Flow-A: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:48:847
Flow-B: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:49:852
Flow-C: testStreams-akka.actor.default-dispatcher-7 [1] 07:20:50:855
Flow-A: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:51:858
Flow-B: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:52:862
Flow-C: testStreams-akka.actor.default-dispatcher-7 [2] 07:20:53:862
Flow-A: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:54:864
Flow-B: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:55:865
Flow-C: testStreams-akka.actor.default-dispatcher-7 [3] 07:20:56:869
End at 07:20:56:871

上例中用 Thread.sleep 来模拟长时间的处理,生产代码中应该尽可能避免这种 blocking call 阻塞调用。可以看到所有 Operators 运行在同一个线程,处理完一个 element 再处理下一个,没有流量控制的必要。对于程序中模拟的长时间处理,由于只用到一个线程,一个接一个串行处理每个元素。处理一个需要 3 秒,总耗时 9 秒。如果各个 Operator 运行在不同的异步边界,则各个 Operator 异步并发在不同的线程执行。创建异步边界非常简单,只需要在 Operator 后面调用 async 即可。如下面程序所示:

val result = source
  .via(flowA)
  .async
  .via(flowB)
  .async
  .via(flowC)
  .async
  .runWith(Sink.ignore)

此时的运行结果如下:

Start at 07:24:42:062
Flow-A: testStreams-akka.actor.default-dispatcher-9 [1] 07:24:43:091
Flow-A: testStreams-akka.actor.default-dispatcher-9 [2] 07:24:44:097
Flow-B: testStreams-akka.actor.default-dispatcher-7 [1] 07:24:44:097
Flow-B: testStreams-akka.actor.default-dispatcher-7 [2] 07:24:45:101
Flow-C: testStreams-akka.actor.default-dispatcher-8 [1] 07:24:45:101
Flow-A: testStreams-akka.actor.default-dispatcher-9 [3] 07:24:45:101
Flow-C: testStreams-akka.actor.default-dispatcher-8 [2] 07:24:46:101
Flow-B: testStreams-akka.actor.default-dispatcher-6 [3] 07:24:46:102
Flow-C: testStreams-akka.actor.default-dispatcher-7 [3] 07:24:47:105
End at 07:24:47:106

每一个 async 调用创建一个新的异步边界,底层的实现是运行在不同的 Akka Actor 上。可以看到每个 Operator 运行在不同的线程并发处理收到的数据。总耗时从大约 9 秒降低到大约 5 秒。值得说明的是,如同其它的多线程编程,需要仔细衡量异步边界切换的开销和任务的性质决定是否采用异步并发。比如上面例子中,当去掉 Thread.sleep 和打印,对一百万 elements,在我的笔记本电脑上,Fusion 单线程费时 0.22 秒而多线程版本费时约 2 秒。

重要的是,对 Akka Streams 而言,当不同 Operators 在不同的异步边界时,Operator 之间的连接会引入 buffer 和 backpressure。

2 Buffer 和 Backpressure

Akka Streams 中每个异步边界运行在不同的 Akka Actor 上。二个异步边界的连接在下游一侧有一个缺省值为 16 的 buffer。这个值是 element 的个数,大的 element 数据类型会占用更多内存。可以通过参数设置改变这个值 akka.stream.materializer.max-input-buffer-size = 16

在开始运行时,上游的 Operator 会等待下游的 Operator 给出明确的 Pull 特定数目的 element 请求时才 Push 推送相应数目的 elements。初始时下游会请求其缓存大小的 elements,只有当缓存有空余时(比如空出一半了)才会向上游发出新的请求。这种回压控制也是通过异步消息方式进行的。


回帖
  • 2019-12-24

    Akka Streams也是akka其他模块的基础,多搞些文章或者翻译文档不错。

    0