Akka stream

  • Published on
    11-Jan-2017

Transcript

  • copyright Fringe81 Co.,Ltd.

    Akka Stream

    @mtoyoshi

  • [Diagram: AmazonKinesis with numbered elements 1 to 4, processed with Akka Actor]

  • Akka Actor

    OutOfMemory

  • Akka Stream?

    Version 1.0 was released by Typesafe in July 2015.

    Akka Actor

    Akka Stream

    Akka HTTP

  • Migration Guide 1.0 to 2.0

    https://github.com/drewhk/akka/pull/30/files

  • Reactive Streams (JEP 266)

    "a standard for asynchronous stream processing with non-blocking back pressure"

    RxJava, Vert.x, Akka Stream, Slick 3, MongoDB

    OSS

  • Reactive Streams DSL

  • API (1.0 -> 2.0)

  • Overview: Akka Stream

  • RunnableGraph, Source, Sink

  • RunnableGraph

    val source = Source(1 to 10)
    val filter = Flow[Int].filter(_ % 2 == 0)
    val map = Flow[Int].map(_ * 2)
    val sink = Sink.foreach[Int](println)

    val runnableGraph = source.via(filter).via(map).to(sink)

    runnableGraph.run()

  • RunnableGraph

    Source(1 to 10)
      .filter(_ % 2 == 0)
      .map(_ * 2)
      .runForeach(println)

    The same graph written out stage by stage:

    val source = Source(1 to 10)
    val filter = Flow[Int].filter(_ % 2 == 0)
    val map = Flow[Int].map(_ * 2)
    val sink = Sink.foreach[Int](println)

    val runnableGraph = source.via(filter).via(map).to(sink)

    runnableGraph.run()

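  • Source[Int] - Flow[Int, String] - Sink[String]    (types line up: compiles)

    Source[Int] - Flow[String, Long] - Sink[String]    (types do not line up: compile error)

    As with Function1, the In type of each stage must match the Out type of the stage before it.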
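
The type rule on this slide can be tried out with a small sketch. It assumes Akka 2.6.x (where an implicit ActorSystem is enough to run a stream), which is newer than the 1.0/2.0 API shown elsewhere in this deck; the object and value names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object TypeAlignmentSketch {
  // Flow[Int, String, _]: its In type (Int) matches the Out type of a Source[Int, _].
  val intToString = Flow[Int].map(n => s"#$n")

  // Flow[String, Long, _]: its In type (String) cannot follow a Source[Int, _].
  val stringToLong = Flow[String].map(_.length.toLong)

  def run(): Seq[String] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("type-alignment")
    try {
      // Source(1 to 3).via(stringToLong) // would not compile: Int is not String
      val done = Source(1 to 3).via(intToString).runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Uncommenting the `stringToLong` line should produce a type mismatch at compile time rather than a failure at run time.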

  • materializer

    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()

    val source = Source(1 to 10)
    val filter = Flow[Int].filter(_ % 2 == 0)
    val map = Flow[Int].map(_ * 2)
    val sink = Sink.foreach[Int](println)

    val runnableGraph = source.via(filter).via(map).to(sink)

    runnableGraph.run()

    The graph definition says WHAT to compute; the materializer decides HOW to run it.

  • map, filter, collect, take / takeWhile, drop / dropWhile, flatten, fold, scan, grouped / groupBy, recover

  • val future: Future[List[Int]] = ...

    val src: Source[List[Int], Unit] = Source(future)

    src.mapConcat(identity).map(_ * 2)

    def mapConcat[T](f: Out => Iterable[T])

    The Source[List[Int]] emits a whole List[Int]; mapConcat flattens each List into its Int elements.
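
A runnable version of the mapConcat slide might look like the following. It is a sketch assuming Akka 2.6.x, where `Source.future` takes the place of the `Source(future)` call shown above and the materialized type is `NotUsed` instead of `Unit`; the object name and the sample list are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapConcatSketch {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("map-concat")
    try {
      val future: Future[List[Int]] = Future.successful(List(1, 2, 3))

      // Emits the single List[Int] once the Future has completed.
      val src: Source[List[Int], NotUsed] = Source.future(future)

      // mapConcat(identity) turns the one List[Int] element into three Int elements.
      val doubled = src.mapConcat(identity).map(_ * 2).runWith(Sink.seq)
      Await.result(doubled, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```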

  • zipWithIndex...

    case class ZipWithIndex[T]() extends PushStage[T, (T, Int)] {
      var i = -1

      override def onPush(elem: T, ctx: Context[(T, Int)]): SyncDirective = {
        i += 1
        ctx.push((elem, i))
      }
    }

    Source(List("A", "B", "C"))
      .transform(() => ZipWithIndex())
      .runForeach(println) // (A,0) (B,1) (C,2)

  • Source#apply

    (?) Iterable, Iterator, Future, File

    SynchronousFileSource(new java.io.File("...")) gives a Source[ByteString]

    Akka 2.4: Java 7 AsynchronousFileChannel (NIO API)

  • The Iterable accepted by Source is collection.immutable.Iterable

    // Compile Error
    val src = Source(Seq(1, 2, 3))

    // Compile Success
    val src = Source(List(1, 2, 3))

    The default Seq is collection.Seq, which is only a collection.Iterable (scala/package.scala, Scala 2.12 and earlier):

    type Seq[+A] = scala.collection.Seq[A]
    val Seq = scala.collection.Seq
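
One way around the compile error is to name an immutable collection explicitly. The sketch below assumes Akka 2.6.x; the object name and the sample values are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

object ImmutableIterableSketch {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("immutable-iterable")
    try {
      // immutable.Seq is an immutable.Iterable, so Source accepts it.
      val xs: immutable.Seq[Int] = immutable.Seq(1, 2, 3)
      val fromImmutableSeq = Source(xs)

      // Vector and List are immutable as well.
      val fromVector = Source(Vector(4, 5, 6))

      Await.result(fromImmutableSeq.concat(fromVector).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```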

  • Source#apply

    Source(initialDelay = 1.second, interval = 100.millis, tick = "msg")

    Emits the String "msg" every 100 ms.

    Tcp().bind("127.0.0.1", 8888)

    Returns a Source of incoming TCP connections; each connection exchanges ByteString.

    Source(Props[MyActor])

    A Source backed by an Actor.

  • Materialized Value

    val src: Source[String, Cancellable] = Source(initialDelay = 0.second, interval = 100.millis, tick = "msg")

    val sink: Sink[String, Future[Int]] = Sink.fold[Int, String](0){ case (sum, _) => sum + 1 }

    src emits "msg" every 100 ms; sink counts the messages with fold.

    The materialized value of src is a Cancellable; that of sink is a Future[Int].

  • val rg1: RunnableGraph[Cancellable] = src.to(sink)

    src.to(sink) is shorthand for src.toMat(sink)(Keep.left).

    val rg2: RunnableGraph[Future[Int]] = src.toMat(sink)(Keep.right)

    val rg3: RunnableGraph[(Cancellable, Future[Int])] = src.toMat(sink)(Keep.both)

    val (cancellable, futureInt) = rg3.run()

  • val src: Source[String, Cancellable] = Source(initialDelay = 0.second, interval = 100.millis, tick = "msg")
    val sink: Sink[String, Future[Int]] = Sink.fold[Int, String](0){ case (sum, _) => sum + 1 }

    val rg: RunnableGraph[(Cancellable, Future[Int])] = src.toMat(sink)(Keep.both)

    val (cancellable, futureInt) = rg.run()

    futureInt.foreach(println)

    Thread.sleep(1000 * 5)
    cancellable.cancel()

  • Source#apply

    Publisher (Reactive Streams)

  • Scala Collection

  • Scala Collection

    (1 to 3)
      .map{ i => println(s"A: $i"); i }
      .map{ i => println(s"B: $i"); i }
      .foreach(i => println(s"C: $i"))

    Output: A: 1, A: 2, A: 3, B: 1, B: 2, B: 3, C: 1, C: 2, C: 3

  • Akka Stream

    Source(1 to 3)
      .map{ i => println(s"A: $i"); i }
      .map{ i => println(s"B: $i"); i }
      .runForeach(i => println(s"C: $i"))

    Output: A: 1, A: 2, B: 1, A: 3, B: 2, C: 1, B: 3, C: 2, C: 3

  • [Diagram: elements 1, 2, 3 passing through source, map:A, map:B, sink:C]

  • Backpressure

  • Backpressure?

  • Push model

    Pull model

    Dynamic push/pull model: the downstream signals demand for n elements; initial-buffer-size (4), max-buffer-size (16)

  • implicit val system = ActorSystem()

    implicit val materializer = ActorMaterializer()

  • // implicit val system = ActorSystem()

    // implicit val materializer = ActorMaterializer()

    akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 1

    akka.stream.materializer {
      initial-input-buffer-size = 1
      max-input-buffer-size = 1
    }
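
These settings normally live in application.conf. For experiments they can also be passed in code, as in this sketch; it assumes Akka 2.6.x with the Typesafe Config library that ships with it, and the object and system names are hypothetical.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object TinyBufferConfigSketch {
  // Same keys as on the slide: one dispatcher thread, buffers of a single element.
  val overrides = ConfigFactory.parseString(
    """
      |akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 1
      |akka.stream.materializer {
      |  initial-input-buffer-size = 1
      |  max-input-buffer-size = 1
      |}
      |""".stripMargin)

  // Starts a system with the overrides and reads back the effective buffer sizes.
  def bufferSizes(): (Int, Int) = {
    val system = ActorSystem("tiny-buffers", overrides.withFallback(ConfigFactory.load()))
    try {
      val conf = system.settings.config.getConfig("akka.stream.materializer")
      (conf.getInt("initial-input-buffer-size"), conf.getInt("max-input-buffer-size"))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(bufferSizes())
}
```

Streams materialized with this system should then use the reduced buffer sizes, which makes the element-by-element flow easier to observe.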

  • [Diagram: source, mapA, mapB, mapC, sink]

    mapC is heavy!

  • [Animation over six slides: elements 1, 2, 3, ... from source move through mapA, mapB and mapC to sink; the slow mapC holds back the stages upstream of it]

  • Options: 1. absorb the speed difference / 2. speed up mapC

  • 1-1. buffer

  • buffer

    [Diagram: a buffer of size 4 inserted in the pipeline]

    val buffer = Flow[Int].buffer(4, OverflowStrategy.backpressure)

    ... mapB.via(buffer).via(mapC) ...
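
Filled in, the elided pipeline could look like the sketch below. It assumes Akka 2.6.x; the bodies of mapB and mapC, the 20 ms delay and the object name are hypothetical stand-ins. Because newer Akka versions fuse stages into one actor by default, the sketch marks mapC with `.async` so that the buffer sits between two independently running stages.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BufferSketch {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("buffer")
    try {
      val mapB = Flow[Int].map(_ + 1)

      // Hypothetical heavy stage: 20 ms of blocking work per element.
      val mapC = Flow[Int].map { n => Thread.sleep(20); n * 10 }

      // Holds up to 4 elements; once full, it backpressures mapB and the source.
      val buffer = Flow[Int].buffer(4, OverflowStrategy.backpressure)

      val done = Source(1 to 10)
        .via(mapB)
        .via(buffer)
        .via(mapC.async) // async boundary so mapC runs separately from the upstream stages
        .runWith(Sink.seq)

      Await.result(done, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With OverflowStrategy.backpressure no element is lost; the buffer only lets the upstream stages run a few elements ahead of mapC.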

  • Flow[Int].buffer(4, OverflowStrategy.dropNew)

    Instead of applying backpressure (BP), drop elements when the buffer is full.

  • 1-2. conflate

  • def conflate[S](seed: Out => S)(aggregate: (S, Out) => S)

    While backpressure (BP) is applied, incoming elements are combined with aggregate.

    ... .conflate(List(_)){ (elems, elem) => elem :: elems } ...    // emits List[T]

    ... .conflate(identity){ (e, _) => e } ...    // emits T
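
The List-collecting variant can be tried with the sketch below. It assumes Akka 2.6.x, where the two-argument form shown on this slide is named `conflateWithSeed`; the slow consumer, the 5 ms delay and the object name are hypothetical. How the elements are grouped into batches depends on timing, so only the combined content is predictable.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConflateSketch {
  def run(): Seq[List[Int]] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("conflate")
    try {
      val batches = Source(1 to 100)
        // While the downstream is busy, pile incoming elements into a List.
        .conflateWithSeed(List(_))((elems, elem) => elem :: elems)
        .async // async boundary: the consumer below runs separately
        .map { batch => Thread.sleep(5); batch } // hypothetical slow consumer
        .runWith(Sink.seq)

      Await.result(batches, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().map(_.size))
}
```

No element is dropped in this variant: flattening the batches gives back all 100 inputs.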

  • 2-1. mapAsync

    mapC

  • val mapC = Flow[Int].mapAsync(4) { n => Future { } }

    [Diagram: mapC working on elements 1, 2, 3, 4 at the same time]
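
With the empty Future body filled in, the mapAsync version of mapC might look like this sketch. It assumes Akka 2.6.x; the 50 ms of work, the use of the global ExecutionContext and the object name are hypothetical choices for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapAsyncSketch {
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("map-async")
    try {
      // Up to 4 Futures are in flight at once; results are emitted in input order.
      val mapC = Flow[Int].mapAsync(4) { n =>
        Future { Thread.sleep(50); n * 10 } // hypothetical heavy work
      }

      Await.result(Source(1 to 8).via(mapC).runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If output order does not matter, `mapAsyncUnordered` emits each result as soon as its Future completes.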

  • 2-2. Fan-Out

    Run mapC in parallel

  • Balance: 1 input, N outputs (Fan-Out)

    Merge: N inputs, 1 output (Fan-In)

    [Diagram: balance -> mapC, mapC -> merge]

  • [Diagram: the balance -> mapC, mapC -> merge graph wrapped as a single Flow that replaces mapC]
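
The balance/merge wiring can be packaged as a reusable Flow along the lines of the sketch below. It assumes Akka 2.6.x and its GraphDSL; the helper name `parallel`, the worker count, the body of mapC and the 20 ms delay are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Merge, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object BalanceMergeSketch {
  // Wraps `workerCount` copies of `worker` between a Balance and a Merge.
  def parallel[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._
      val balance = b.add(Balance[In](workerCount)) // 1 input, N outputs
      val merge = b.add(Merge[Out](workerCount))    // N inputs, 1 output
      // .async gives every worker copy its own actor so the copies run concurrently.
      for (_ <- 1 to workerCount) balance ~> worker.async ~> merge
      FlowShape(balance.in, merge.out)
    })

  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, the materializer is derived from the implicit system.
    implicit val system: ActorSystem = ActorSystem("balance-merge")
    try {
      val mapC = Flow[Int].map { n => Thread.sleep(20); n * 10 } // hypothetical heavy stage
      val done = Source(1 to 8).via(parallel(mapC, 2)).runWith(Sink.seq)
      // Merge emits in arrival order, so sort before comparing.
      Await.result(done, 10.seconds).sorted
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Unlike mapAsync, this layout does not preserve element order, which is why the result is sorted before it is returned.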
