A small benchmark util for Scala continued

Extending the benchmark

Note: This is a follow-up post of a previous post. You might want to read the first to get a simpler view of the extensions added here.

One improvement I wanted to add to the previous benchmark util was monitors which generate statistics for the task being benchmarked. I chose traits for implementing them. That way I’ll be able to adapt the monitor using inheritance. A Monitor is simply defined as having a begin() and end() method, where end() returns a sequence of statistical data. The type of data returned is defined by the type parameter of Monitor.

In addition to the Monitor trait, the previous code gets extended with a newMonitor() argument for instantiating them:

// Outcome of one successful task run, carrying the monitor's collected stats.
case class TaskDone[Id,Stats](id: Id, run: Int, stats: Seq[Stats])
// Outcome of one failed task run; keeps the stats gathered up to the failure plus the error.
case class TaskFail[Id,Stats](id: Id, run: Int, stats: Seq[Stats], error: Throwable)
// Aggregated result of one benchmark: all warmup results followed by all measured runs.
case class BenchDone[Id,Stats](warmups: Seq[Task[Id,Stats]], runs: Seq[Task[Id,Stats]])
 
trait Monitor[Stats] {
  // Lifecycle phase: 0 = fresh, 1 = measuring, 2 = finished.
  private[this] var phase = 0

  /** Starts a measurement; must be the first call on a fresh monitor. */
  def begin(): Unit = {
    assert(phase == 0, "Improperly handled monitor.")
    phase = 1
  }

  /** Ends the measurement and yields the collected statistics (none by default;
    * mixins override this and prepend their own data). */
  def end(): Seq[Stats] = {
    assert(phase == 1, "Improperly handled monitor.")
    phase = 2
    Nil
  }
}
 
// Callback consuming a result value (used for all reporting hooks).
type Report[Result]  = Result => Unit
// Zero-argument factory, e.g. for creating a fresh monitor per run.
type Make[Value]     = () => Value
// One task outcome: Left on failure, Right on success.
type Task[Id,Stats]  = Either[TaskFail[Id,Stats],TaskDone[Id,Stats]]
// Result of one benchmark, and of a whole batch of benchmarks.
type Bench[Id,Stats] = BenchDone[Id,Stats]
type Batch[Id,Stats] = Seq[Bench[Id,Stats]]
 
def task[Id,Stats]
    (id: Id, fn: () => Unit)
    (run: Int)
    (newMonitor: Make[Monitor[Stats]])
    (report: Report[Task[Id,Stats]]): Task[Id,Stats] = {
  // Executes fn once under a freshly created monitor, reports the outcome
  // to the given reporter and returns it.
  val monitor = newMonitor()
  monitor.begin()
  val outcome: Task[Id,Stats] =
    try {
      fn()
      Right(TaskDone(id = id, run = run, stats = monitor.end()))
    } catch {
      // Deliberately broad catch: the harness must record any failure
      // (including OutOfMemoryError) as a TaskFail instead of aborting
      // the whole batch.
      case exp: Throwable =>
        Left(TaskFail(id = id, run = run, stats = monitor.end(), error = exp))
    }
  report(outcome)
  outcome
}
 
def benchmark[Id,Stats]
    (task: Int => Make[Monitor[Stats]] => Report[Task[Id,Stats]] => Task[Id,Stats])
    (runs: Int, warmups: Int)
    (newMonitor: Make[Monitor[Stats]])
    (warmupReport: Report[Task[Id,Stats]], runReport: Report[Task[Id,Stats]])
    (benchReport: Report[Bench[Id,Stats]]): Bench[Id,Stats] = {
  // Performs `warmups` unmeasured iterations followed by `runs` measured
  // ones, reporting each task and finally the whole benchmark.
  assert(runs > 0, "Number of runs must be greater than zero.")
  assert(warmups >= 0, "Number of warmups must be zero or greater.")

  // Runs the task `count` times (1-based run index) with the given reporter.
  def execute(count: Int, reporter: Report[Task[Id,Stats]]) =
    (1 to count) map (run => task(run)(newMonitor)(reporter))

  val warmupsResults = execute(warmups, warmupReport)
  val runsResults    = execute(runs, runReport)
  val result = BenchDone(warmups = warmupsResults, runs = runsResults)
  benchReport(result)
  result
}
 
def batch[Id,Stats]
    (benchs: Seq[Report[Bench[Id,Stats]] => Bench[Id,Stats]])
    (benchReport: Report[Bench[Id,Stats]])
    (batchReport: Report[Batch[Id,Stats]]): Unit = {
  // Run every benchmark in sequence with the shared per-bench reporter,
  // then hand all collected results to the batch-level reporter.
  // (Explicit `: Unit =` replaces the deprecated procedure syntax.)
  val result = benchs map (bench => bench(benchReport))
  batchReport(result)
}

The revised diagram sketch now visualizes how statistics generated by the monitor are fed back to reporters:

Example usage

In this revised example we need to define some statistical datatypes. These will be TaskTime and TaskMem which collect time and memory usage. They are case classes extended from the sealed trait TaskStats which will allow us to pattern match them.

type Id = String
 
// One snapshot of JVM heap figures (bytes), as reported by Runtime.
case class MemStats(free: Long, total: Long, max: Long) {
  // Heap currently in use at the time of the snapshot.
  def used(): Long = total - free
}
 
// Root of the statistic datatypes; sealed so pattern matches can be exhaustive.
sealed trait TaskStats

// Elapsed wall-clock time of one task run, in nanoseconds.
case class TaskTime(duration: Long) extends TaskStats
 
// Memory samples collected over one task run.
case class TaskMem(samples: Seq[MemStats]) extends TaskStats {
  /** Average heap usage over all samples; 0 when no samples were taken. */
  def avgUsed(): Long = {
    val totals = samples map (mem => mem.used)
    if (totals.nonEmpty) totals.sum / totals.length else 0
  }
  /** Smallest sampled heap usage; 0 when no samples were taken.
    * (The original called .min on a possibly empty collection and threw,
    * unlike the guarded avgUsed.) */
  def minUsed(): Long = {
    val totals = samples map (mem => mem.used)
    if (totals.nonEmpty) totals.min else 0
  }
  /** Largest sampled heap usage; 0 when no samples were taken. */
  def maxUsed(): Long = {
    val totals = samples map (mem => mem.used)
    if (totals.nonEmpty) totals.max else 0
  }
}

For utility we add some more convenience methods for text conversion and calculating statistics of memory usage.

/** Renders a nanosecond count as "XmYYsZZZms", "XsYYYms" or "Xms". */
def timeToText(value: Long): String = {
  // Zero-pad to n digits. The original padded with the String "0" into a
  // Seq[Any], which only produced correct text by accident; padding with
  // the Char '0' keeps everything a String.
  def pad(n: Int, v: Long): String = v.toString.reverse.padTo(n, '0').reverse
  val ms      = value / 1000000L        // whole milliseconds
  val msPart  = ms % 1000L              // ms digits shown after the seconds
  val sec     = (ms - msPart) / 1000L   // whole seconds
  val secPart = sec % 60L               // second digits shown after the minutes
  val min     = (sec - secPart) / 60L   // whole minutes
  if (min > 0) min + "m" + pad(2, secPart) + "s" + pad(3, msPart) + "ms"
  else if (secPart > 0) secPart + "s" + pad(3, msPart) + "ms"
  else msPart + "ms"
}
 
/** Renders a byte count as "XgbYYYmb" or "Xmb". */
def memToText(value: Long): String = {
  // Zero-pad to n digits (see timeToText for why '0' is a Char here).
  def pad(n: Int, v: Long): String = v.toString.reverse.padTo(n, '0').reverse
  val mb     = value / (1024L * 1024L)  // whole megabytes
  val mbPart = mb % 1024L               // mb digits shown after the gigabytes
  val gb     = (mb - mbPart) / 1024L    // whole gigabytes
  if (gb > 0) gb + "gb" + pad(3, mbPart) + "mb"
  else mbPart + "mb"
}
 
def taskId[Id](bench: Bench[Id,TaskStats]): Id = {
  // Extract the task id from the first completed run, falling back to the
  // first failed run. Uses collectFirst instead of the partial, deprecated
  // .right.get/.left.get projections. Still throws NoSuchElementException
  // when there are no runs at all, exactly like the original Option.get.
  bench.runs
    .collectFirst { case Right(done) => done.id }
    .orElse(bench.runs.collectFirst { case Left(fail) => fail.id })
    .get
}
 
def isDone[Id](bench: Bench[Id,TaskStats]): Boolean = {
  // A benchmark counts as done when no warmup and no measured run failed,
  // i.e. every outcome is a Right.
  val noWarmupErrors = bench.warmups forall (_.isRight)
  val noRunErrors    = bench.runs forall (_.isRight)
  noWarmupErrors && noRunErrors
}
 
def avgTime(bench: Bench[Id,TaskStats]): Long = {
  // Average per-run total of all TaskTime entries across every run,
  // failed or not; 0 when there are no runs.
  def timeOf(stats: Seq[TaskStats]): Long =
    stats.collect{ case TaskTime(value) => value }.sum
  val totals = bench.runs map {
    case Right(task) => timeOf(task.stats)
    case Left(task)  => timeOf(task.stats)
  }
  if (totals.isEmpty) 0 else totals.sum / totals.length
}
 
def minTime(bench: Bench[Id,TaskStats]): Long = {
  // Minimum TaskTime across all runs. The min of per-run minima equals the
  // global minimum, so all stats are flattened first. Returns 0 instead of
  // throwing when no TaskTime stats exist (the original called .min on a
  // possibly empty collection, unlike the guarded avgTime).
  val times = bench.runs flatMap {
    case Right(task) => task.stats
    case Left(task)  => task.stats
  } collect { case TaskTime(value) => value }
  if (times.isEmpty) 0 else times.min
}
 
def noTime(bench: Bench[Id,TaskStats]): Long = {
  // Count how many TaskTime entries were recorded over all runs.
  def countTimes(stats: Seq[TaskStats]): Int =
    stats.count { case TaskTime(_) => true; case _ => false }
  val counts = bench.runs map {
    case Right(task) => countTimes(task.stats)
    case Left(task)  => countTimes(task.stats)
  }
  counts.sum
}
 
def avgMem(bench: Bench[Id,TaskStats]): Long = {
  // Average of the per-run summed average memory usage; 0 for no runs.
  def memOf(stats: Seq[TaskStats]): Long =
    stats.collect{ case mem: TaskMem => mem.avgUsed }.sum
  val totals = bench.runs map {
    case Right(task) => memOf(task.stats)
    case Left(task)  => memOf(task.stats)
  }
  if (totals.isEmpty) 0 else totals.sum / totals.length
}
 
def maxMem(bench: Bench[Id,TaskStats]): Long = {
  // Largest per-run memory peak across all runs. Max of per-run maxima
  // equals the global maximum, so all stats are flattened first. Returns 0
  // instead of throwing when no TaskMem stats exist (the original called
  // .max on a possibly empty collection).
  val peaks = bench.runs flatMap {
    case Right(task) => task.stats
    case Left(task)  => task.stats
  } collect { case mem: TaskMem => mem.maxUsed }
  if (peaks.isEmpty) 0 else peaks.max
}
 
def noMem(bench: Bench[Id,TaskStats]): Long = {
  // Total number of memory samples recorded across all runs.
  def countSamples(stats: Seq[TaskStats]): Int =
    stats.collect{ case TaskMem(samples) => samples.length }.sum
  bench.runs.map(_.fold(fail => countSamples(fail.stats),
                        done => countSamples(done.stats))).sum
}

Next up, we need to refine the reporters to account for the new datatypes and output relevant statistics on memory usage.

/** Prints a one-line summary for a single task run (success or failure).
  * (Explicit `: Unit =` replaces the deprecated procedure syntax.) */
def taskReport(title: String)(result: Task[Id,TaskStats]): Unit = {
  // Sum of all TaskTime entries for this run; hoisted out of the two
  // branches that previously duplicated the computation.
  def timeOf(stats: Seq[TaskStats]): Long =
    stats.collect{ case TaskTime(value) => value }.sum
  result match {
    case Right(task) =>
      println(
        "%s %s %s completed in %s"
          .format(task.id.toString, title, task.run, timeToText(timeOf(task.stats)))
      )
    case Left(task) =>
      val error = task.error.getClass.getName
      println(
        "%s %s %s failed after %s from %s"
          .format(task.id.toString, title, task.run, timeToText(timeOf(task.stats)), error)
      )
  }
}
 
// Reporters for warmup and measured runs, partially applied from taskReport.
val warmupReport = taskReport("warmup") _

val runReport = taskReport("run") _
 
/** Prints a summary line once a whole benchmark (warmups + runs) is over.
  * (Explicit `: Unit =` replaces the deprecated procedure syntax.) */
def benchReport[Id](bench: Bench[Id,TaskStats]): Unit = {
  val id = taskId(bench)
  if (isDone(bench)) {
    // Total time spent across all successful measured runs.
    val totalTime = bench.runs.collect{
      case Right(task) =>
        task.stats.collect{ case TaskTime(value) => value }.sum
    }.sum
    println(id + " finished in " + timeToText(totalTime) + "\n")
  }
  else
    println(id + " failed\n")
}
 
/** Prints the final statistics table and system properties for a batch.
  * (Explicit `: Unit =` replaces the deprecated procedure syntax.) */
def batchReport(batch: Batch[Id,TaskStats]): Unit = {
  // Describe the JVM/OS environment as sorted key:"value" pairs.
  def propsToText(): String = {
    import scala.util.{Properties => Props}
    val os = java.lang.management.ManagementFactory.getOperatingSystemMXBean
    val props = scala.collection.immutable.SortedMap(
      "os" -> (os.getName + " (" + os.getVersion + ")"),
      "arch" -> os.getArch,
      "cpus" -> os.getAvailableProcessors.toString,
      "java" -> (Props.javaVmName + " " + Props.javaVersion),
      "scala" -> Props.versionString.replace("version","").trim
    )
    props.map{ case (k,v) => k + ":" + "\"" + v + "\"" }.mkString(" ")
  }
  // One formatted table row per benchmark, sorted by average duration.
  def statsToText(status: String, benchs: Seq[Bench[Id,TaskStats]]): String = {
    benchs
      .map(bench => (
        avgTime(bench),noTime(bench),minTime(bench),
        avgMem(bench),noMem(bench),maxMem(bench),taskId(bench)
      ))
      .sortWith((a,b) => a._1 < b._1) // sort on durations
      .map(item =>
        "%2s %12s/%-6s %12s %10s/%-6s %10s %s"
         .format(
           status,timeToText(item._1),item._2,timeToText(item._3),
           memToText(item._4),item._5,memToText(item._6),item._7)
      )
      .mkString("\n")
  }
  val (doneBenchs,failedBenchs) = batch partition isDone
  println(
    "Batch of benchmarks finished with %s completed and %s failed.\n"
      .format(doneBenchs.length,failedBenchs.length)
  )
  println("Statistics for benchmarks:\n")
  println(
    "   %12s/%-6s %12s %10s/%-6s %10s %s"
      .format("avgtime","no","mintime","avgmem","no","maxmem","id")
  )
  println(statsToText("ok", doneBenchs))
  println(statsToText("na", failedBenchs))
  println("\nSystem properties:")
  // propsToText is declared with parens, so call it with parens.
  println(propsToText())
}

For implementing the Monitor trait I use mixin inheritance for this example. One for TimeUsage and one for MemUsage. These traits can be stacked together and accumulate the statistics generated. TimeUsage is just an abstraction of the previous post’s task() code. MemUsage on the other hand is a little more involved. It will stay active during benchmarking and sample memory usage at regular intervals.

For multithreaded sampling I chose actors since they are fairly simple in use. A MemUsage object simply starts the sampler actor by sending a message with NextSample. This will trigger the sampler to start regular recordings of memory usage. When the benchmark is done the MemUsage object will send a LastSample message and wait for the reply. The sampler will reply with the sequence of MemStats samples collected and terminate. Last, the samples are stored in a TaskMem object and accumulated with the other TaskStats results.

// Mixin that records wall-clock duration of the monitored section.
trait TimeUsage extends Monitor[TaskStats] {
  // Start of the current measurement, set by begin().
  var startTime: Long = 0L

  // Explicit `: Unit =` replaces the deprecated procedure syntax; the
  // timestamp is taken after super.begin() so stacked mixins run first.
  override def begin(): Unit = {
    super.begin()
    startTime = System.nanoTime
  }

  override def end(): Seq[TaskStats] = {
    // Prepend the elapsed time to whatever stats other mixins collected.
    List(TaskTime(duration=System.nanoTime - startTime)) ++ super.end()
  }
}
 
// Mixin that samples JVM heap usage on a background actor while the
// monitored section runs. NOTE(review): relies on the legacy scala.actors
// library, which was removed after Scala 2.10.
trait MemUsage extends Monitor[TaskStats] {
  import MemUsage.{NextSample,LastSample}

  // Milliseconds between heap samples; supplied by the concrete monitor.
  val sampleRate: Long
  private val rt = java.lang.Runtime.getRuntime
  // Sampler actor: records one MemStats per NextSample and re-sends
  // NextSample to itself after sleeping, until LastSample arrives.
  private val sampler = new actors.Actor {
    var samples = List.empty[MemStats]  // newest sample first (prepend)
    var running = true
    def act() {
      while(running) {
        try { receive {
          case NextSample =>
            samples ::=
              MemStats(free=rt.freeMemory, total=rt.totalMemory, max=rt.maxMemory)
            java.lang.Thread.sleep(sampleRate)
            this ! NextSample
          case LastSample =>
            // Take a final sample, reply with everything, then let the
            // act() loop terminate.
            running = false
            samples ::=
              MemStats(free=rt.freeMemory, total=rt.totalMemory, max=rt.maxMemory)
            reply(samples)
        } }
        catch {
          case error: java.lang.OutOfMemoryError =>
            // skip this sample then wait for LastSample
            java.lang.Thread.sleep(sampleRate)
        }
      }
    }
  }

  override def begin() {
    // GC first so samples start from a comparable baseline; then kick off
    // the self-perpetuating sampling loop.
    rt.gc()
    sampler.start()
    sampler ! NextSample
    super.begin()
  }

  override def end(): Seq[TaskStats] = {
    val stats = super.end()
    // Synchronous request (!?) blocks until the actor replies with its
    // List[MemStats]; the cast is needed because !? returns Any.
    (sampler !? LastSample) match {
      case samples => List(TaskMem(samples.asInstanceOf[List[MemStats]])) ++ stats
    }
  }
}
 
object MemUsage {
  // Internal protocol messages for the sampler actor.
  private case object NextSample
  private case object LastSample
}

In the new benchmark code we now need to make some adjustments to accommodate the extensions made. First we need to add a function for generating new monitors. Here we use inheritance to select which statistics we are interested in collecting and set relevant configuration. Finally, we also need to update task() calls. Since it's a partially applied function where the Stats type parameter is specified by a later argument, we now need to specify type parameters ahead of time.

// Factory creating one monitor per task run, stacking memory and time
// collection via mixin inheritance.
val newMonitor = () => {
  new Monitor[TaskStats] with MemUsage with TimeUsage {
    val sampleRate = 50L
  }
}
// One warmup iteration and two measured runs per benchmark.
val (warmups,runs) = (1,2)
// Integer range each task sums over.
val (start,end) = (1,5000000)
// Binary addition used by the fold-based tasks.
val sum = (a: Int, b: Int) => a + b
 
 
// Sums the integers from start to end (inclusive) into the accumulator,
// compiled to a loop thanks to @tailrec.
@scala.annotation.tailrec
def loopWithTailrec(start: Int, end: Int, sum: Int): Int =
  if (start > end) sum
  else loopWithTailrec(start + 1, end, sum + start)
 
// Summation over a Range, via sum, foldLeft and foldRight.
val rangeTasks = List(
  task[Id,TaskStats](id="range+sum",
    fn=() => (start to end).sum) _,
  task[Id,TaskStats](id="range+foldLeft",
    fn=() => (start to end).foldLeft(0)(sum)) _,
  task[Id,TaskStats](id="range+foldRight",
    fn=() => (start to end).foldRight(0)(sum)) _
)
 
// Summation over a List, via sum, foldLeft and foldRight.
val listTasks = List(
  task[Id,TaskStats](id="list+sum",
    fn=() => List.range(start,end).sum) _,
  task[Id,TaskStats](id="list+foldLeft",
    fn=() => List.range(start,end).foldLeft(0)(sum)) _,
  task[Id,TaskStats](id="list+foldRight",
   fn=() => List.range(start,end).foldRight(0)(sum)) _
)
 
// Summation over a Vector, via sum, foldLeft and foldRight.
val vectorTasks = List(
  task[Id,TaskStats](id="vector+sum",
    fn=() => Vector.range(start,end).sum) _,
  task[Id,TaskStats](id="vector+foldLeft",
    fn=() => Vector.range(start,end).foldLeft(0)(sum)) _,
  task[Id,TaskStats](id="vector+foldRight",
    fn=() => Vector.range(start,end).foldRight(0)(sum)) _
)
 
// Summation over an Array, via sum, foldLeft and foldRight.
val arrayTasks = List(
  task[Id,TaskStats](id="array+sum",
    fn=() => Array.range(start,end).sum) _,
  task[Id,TaskStats](id="array+foldLeft",
    fn=() => Array.range(start,end).foldLeft(0)(sum)) _,
  task[Id,TaskStats](id="array+foldRight",
    fn=() => Array.range(start,end).foldRight(0)(sum)) _
)
 
// Summation over a (lazy) Stream, via sum, foldLeft and foldRight.
val streamTasks = List(
  task[Id,TaskStats](id="stream+sum",
    fn=() => Stream.range(start,end).sum) _,
  task[Id,TaskStats](id="stream+foldLeft",
    fn=() => Stream.range(start,end).foldLeft(0)(sum)) _,
  task[Id,TaskStats](id="stream+foldRight",
    fn=() => Stream.range(start,end).foldRight(0)(sum)) _
)
 
// Summation through an iterator view of List and Stream.
val itrTasks = List(
  task[Id,TaskStats](id="list+itr+foldRight",
    fn=() => List.range(start,end).iterator.foldRight(0)(sum)) _,
  task[Id,TaskStats](id="stream+itr+sum",
    fn=() => Stream.range(start,end).iterator.sum) _,
  task[Id,TaskStats](id="stream+itr+foldRight",
    fn=() => Stream.range(start,end).iterator.foldRight(0)(sum)) _
)
 
// Imperative summation variants: for-loop, while-loop and tail recursion.
val loopTasks = List(
  task[Id,TaskStats](id="loop+for",
    fn=() => {
      var sum = 0
      for (i <- start to end) {
        sum = sum + i
      }
      sum
    }) _,
  task[Id,TaskStats](id="loop+while",
    fn=() => {
      var sum = 0
      var i = start
      while (i <= end) {
        sum = sum + i
        i = i + 1
      }
      sum
    }) _,
  task[Id,TaskStats](id="loop+tailrec",
    fn=() => {
      loopWithTailrec(start, end, 0)
    }) _
)
 
// Apply the benchmark configuration (runs, warmups, monitor factory and
// reporters) to every task, then execute the whole batch.
val benchs = (
  rangeTasks ++ listTasks ++ vectorTasks ++ arrayTasks ++
    streamTasks ++ itrTasks ++ loopTasks
).map(
  task => benchmark(task)(runs=runs,warmups=warmups)(newMonitor)(warmupReport,runReport) _
)
batch[Id,TaskStats](benchs)(benchReport)(batchReport)

Let's run the updated benchmark and see how memory usage plays into the equation. Now keep in mind that the memory-related statistics aren't necessarily accurate. Generated statistics accumulate memory. The JVM reserves memory for all kinds of optimizations so it might not reflect the actual memory usage of the data-structures. Runtime optimizations performed by the JVM might behave differently when code is used in an implementation. Different JVMs and hardware setups will also affect the result. At least you can get some idea of the time and memory usage involved and maybe gain some insights into the tradeoffs involved.

Batch of benchmarks finished with 16 completed and 5 failed.
 
Statistics for benchmarks:
 
        avgtime/no          mintime     avgmem/no         maxmem id
ok          5ms/2               5ms        3mb/4             3mb loop+while
ok          9ms/2               8ms        3mb/4             3mb loop+tailrec
ok         39ms/2              38ms        7mb/4            12mb loop+for
ok        152ms/2             150ms        3mb/8             7mb range+foldLeft
ok        181ms/2             178ms        4mb/9             8mb range+sum
ok        251ms/2             250ms       23mb/11           27mb array+foldRight
ok        254ms/2             253ms       21mb/10           25mb array+foldLeft
ok        294ms/2             291ms       34mb/12           58mb array+sum
ok      1s101ms/2           1s086ms       13mb/39           36mb stream+foldLeft
ok      1s366ms/2           1s361ms       78mb/45          139mb vector+foldLeft
ok      1s419ms/2           1s416ms       80mb/43          155mb vector+sum
ok      1s547ms/2           1s513ms      123mb/23          201mb range+foldRight
ok      2s002ms/2           1s989ms      112mb/36          199mb list+foldLeft
ok      2s062ms/2           2s061ms      116mb/38          212mb list+sum
ok      2s350ms/2           2s278ms      117mb/45          224mb vector+foldRight
ok      7s166ms/2           6s726ms      181mb/88          247mb list+itr+foldRight
na          2ms/2               2ms        1mb/4             1mb stream+foldRight
na      1s505ms/2           1s247ms       91mb/34          185mb list+foldRight
na     19s834ms/2          19s563ms      193mb/88          247mb stream+sum
na     26s811ms/2          26s255ms      193mb/101         247mb stream+itr+foldRight
na     26s869ms/2          26s692ms      186mb/114         247mb stream+itr+sum
 
System properties:
arch:"x86" cpus:"2" java:"Java HotSpot(TM) Client VM 1.6.0_27" os:"Windows 7 (6.1)" scala:"2.9.1.final"

Leave a Reply

Your email address will not be published. Required fields are marked *