Composable Concurrency with ZIO STM

Posted on April 17, 2020 by contrun

Introduction

The concurrency problem

Concurrency is hard. Why not find some smart people to solve this problem once and for all?

An example

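// Naive version. Account and its balance cell are illustrative, not a real API.
def transfer(sender: Account, receiver: Account, much: Int): Unit = {
   while (sender.balance < much) {} // busy-wait until the sender can afford it
   // Not atomic: other threads can interleave between the check and the two updates.
   receiver.balance.update(_ + much)
   sender.balance.update(_ - much)
}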

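Back to the good old days.

def transfer(sender: Account, receiver: Account, much: Int): Unit = {
   while (sender.balance < much) {} // the check still happens outside any lock
   sender.lock()
   sender.balance.update(_ - much)
   sender.unlock()
   // Between the two critical sections the money is in neither account.
   receiver.lock()
   receiver.balance.update(_ + much)
   receiver.unlock()
}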

beginTransaction;
  accountB += 100;
  accountA -= 100;
commitTransaction;

Software transactional memory

while(!commitTransaction()) {
  sudo commitTransaction()
}

A new kind of magic

import zio._
import zio.stm._

def transfer(sender: TRef[Int], receiver: TRef[Int], much: Int): UIO[Unit] =
  STM.atomically {
    for {
      balance <- sender.get
      _ <- STM.check(balance >= much) // retry (block) until the balance is sufficient
      _ <- receiver.update(_ + much)
      _ <- sender.update(_ - much)
    } yield ()
  }

// For comparison, the naive version from the first example:
def transfer(sender: Account, receiver: Account, much: Int): Unit = {
   while (sender.balance < much) {}
   receiver.balance.update(_ + much)
   sender.balance.update(_ - much)
}

The devil is in the details

values = getValues()
while (!commit(values)) {
  rollback()
  maybeBackoff()
  values = getValues()
}
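
To make the loop above concrete, here is a minimal sketch in plain Scala. It is not how ZIO implements STM. It assumes a hypothetical `Versioned` cell whose version is bumped on every committed write, and a single global `commitLock` that guards only the short validate-and-write step. All names (`OptimisticSketch`, `Cell`, `newCell`, `commit`) are invented for illustration, and unlike `STM.check` this `transfer` returns `false` instead of waiting when the balance is too low.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec

object OptimisticSketch {
  // Hypothetical cell content: a value plus a version bumped on every commit.
  final case class Versioned(value: Int, version: Long)
  type Cell = AtomicReference[Versioned]

  def newCell(initial: Int): Cell = new AtomicReference(Versioned(initial, 0L))

  // One global lock, held only while validating and writing.
  private val commitLock = new ReentrantLock()

  // Writes both cells only if neither changed since it was read.
  private def commit(
      a: Cell, readA: Versioned, newA: Int,
      b: Cell, readB: Versioned, newB: Int
  ): Boolean = {
    commitLock.lock()
    try {
      val valid = a.get.version == readA.version && b.get.version == readB.version
      if (valid) {
        a.set(Versioned(newA, readA.version + 1))
        b.set(Versioned(newB, readB.version + 1))
      }
      valid
    } finally commitLock.unlock()
  }

  // Returns false when the sender cannot afford the transfer.
  @tailrec
  def transfer(sender: Cell, receiver: Cell, much: Int): Boolean = {
    require(sender ne receiver, "sender and receiver must be distinct cells")
    val s = sender.get   // getValues()
    val r = receiver.get
    if (s.value < much) false
    else if (commit(sender, s, s.value - much, receiver, r, r.value + much)) true
    else {
      // Nothing to roll back: a failed commit never wrote to shared state.
      Thread.`yield`()   // maybeBackoff()
      transfer(sender, receiver, much)
    }
  }
}
```

Because a failed attempt leaves no trace in shared state, "rollback" in this sketch amounts to throwing the stale snapshot away and reading again.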

Introducing ZIO/STM

Introducing ZIO

val clockLayer: ZLayer[Any, Nothing, Clock] = ???
val zio: ZIO[Clock with Random, Nothing, Unit] = ???
// provideSomeLayer supplies Clock and leaves Random as the only remaining requirement.
val zio2 = zio.provideSomeLayer[Random](clockLayer)

A match made in heaven

STM.atomically(queue.take).flatMap(x => putStrLn(x)).forever.timeout(5.seconds)

// The lock-based version, for comparison:
def transfer(sender: Account, receiver: Account, much: Int): Unit = {
   while (sender.balance < much) {}
   sender.lock()
   sender.balance.update(_ - much)
   sender.unlock()
   receiver.lock()
   receiver.balance.update(_ + much)
   receiver.unlock()
}

// The STM version:
def transfer(sender: TRef[Int], receiver: TRef[Int], much: Int): UIO[Unit] =
  STM.atomically {
    for {
      balance <- sender.get
      _ <- STM.check(balance >= much) // retry (block) until the balance is sufficient
      _ <- receiver.update(_ + much)
      _ <- sender.update(_ - much)
    } yield ()
  }

Yet another certainly harmful monad tutorial

for {
  balance <- sender.get // USTM[Int]
  _ <- STM.check(balance >= much) // USTM[Unit]
} yield ()

Dining philosophers

val leftFork = forks(n)
val rightFork = forks((n + 1) % forks.length)
for {
  _ <- leftFork.acquire
  _ <- rightFork.acquire
  _ <- queue.offer(s"Philosopher $n haz forks")
  _ <- rightFork.release
  _ <- leftFork.release
} yield ()

Some implementation details

STM vs IO

STM.atomically(
  leftFork.withPermit(rightFork.withPermit(queue.offer(s"Philosopher $n haz forks")))
)

Compare and Set

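// Assumes `value` is a java.util.concurrent.atomic.AtomicReference[A].
def getAndSet(a: A): A = {
  var current = value.get
  // Retry until no other thread changed `value` between the read and the swap.
  while (!value.compareAndSet(current, a)) current = value.get
  current
}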
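
The same retry loop generalizes from "set" to "update with a function". The sketch below uses only `java.util.concurrent.atomic.AtomicReference` from the JDK; `CasSketch` and `modify` are hypothetical names chosen for illustration and are not part of ZIO.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

object CasSketch {
  // Applies f atomically: recompute and retry whenever another thread
  // replaced the value between our read and our compareAndSet.
  @tailrec
  def modify[A](ref: AtomicReference[A])(f: A => A): A = {
    val current = ref.get
    val next = f(current)
    if (ref.compareAndSet(current, next)) next
    else modify(ref)(f)
  }
}
```

Note that `f` may run more than once under contention, so it should be free of side effects. The same constraint applies to the body of an STM transaction.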

Rollback

Validate and commit

val leftFork = forks(n)
val rightFork = forks((n + 1) % forks.length)
for {
  _ <- leftFork.acquire
  _ <- rightFork.acquire
  _ <- queue.offer(s"Philosopher $n haz forks")
  _ <- rightFork.release
  _ <- leftFork.release
} yield ()

Coarse-grained concurrency control

Fine-grained concurrency control

Thanks

This was mostly a small job of carrying over other people's work. Thank you, everyone.

References