package sbt
trait ConcurrentRestrictions[A]
{
type G
def empty: G
def add(g: G, a: A): G
def remove(g: G, a: A): G
def valid(g: G): Boolean
}
import java.util.{LinkedList,Queue}
import java.util.concurrent.{Executor, Executors, ExecutorCompletionService}
import annotation.tailrec
object ConcurrentRestrictions
{
def unrestricted[A]: ConcurrentRestrictions[A] =
new ConcurrentRestrictions[A]
{
type G = Unit
def empty = ()
def add(g: G, a: A) = ()
def remove(g: G, a: A) = ()
def valid(g: G) = true
}
def limitTotal[A](i: Int): ConcurrentRestrictions[A] =
{
assert(i >= 1, "Maximum must be at least 1 (was " + i + ")")
new ConcurrentRestrictions[A]
{
type G = Int
def empty = 0
def add(g: Int, a: A) = g + 1
def remove(g: Int, a: A) = g - 1
def valid(g: Int) = g <= i
}
}
final case class Tag(name: String)
val tagsKey = AttributeKey[TagMap]("tags", "Attributes restricting concurrent execution of tasks.")
val Untagged = Tag("untagged")
val All = Tag("all")
type TagMap = Map[Tag, Int]
def tagged[A](get: A => TagMap, validF: TagMap => Boolean): ConcurrentRestrictions[A] =
new ConcurrentRestrictions[A]
{
type G = TagMap
def empty = Map.empty
def add(g: TagMap, a: A) = merge(g, a, get)(_ + _)
def remove(g: TagMap, a: A) = merge(g, a, get)(_ - _)
def valid(g: TagMap) = validF(g)
}
private[this] def merge[A](m: TagMap, a: A, get: A => TagMap)(f: (Int,Int) => Int): TagMap =
{
val base = merge(m, get(a))(f)
val un = if(base.isEmpty) update(base, Untagged, 1)(f) else base
update(un, All, 1)(f)
}
private[this] def update[A,B](m: Map[A,B], a: A, b: B)(f: (B,B) => B): Map[A,B] =
{
val newb =
(m get a) match {
case Some(bv) => f(bv,b)
case None => b
}
m.updated(a,newb)
}
private[this] def merge[A,B](m: Map[A,B], n: Map[A,B])(f: (B,B) => B): Map[A,B] =
(m /: n) { case (acc, (a,b)) => update(acc, a, b)(f) }
def completionService[A,R](tags: ConcurrentRestrictions[A], warn: String => Unit): (CompletionService[A,R], () => Unit) =
{
val pool = Executors.newCachedThreadPool()
(completionService[A,R](pool, tags, warn), () => pool.shutdownNow() )
}
def completionService[A,R](backing: Executor, tags: ConcurrentRestrictions[A], warn: String => Unit): CompletionService[A,R] =
{
final class Enqueue(val node: A, val work: () => R)
new CompletionService[A,R]
{
private[this] val jservice = new ExecutorCompletionService[R](backing)
private[this] var tagState = tags.empty
private[this] var running = 0
private[this] val pending = new LinkedList[Enqueue]
def submit(node: A, work: () => R): Unit = synchronized
{
val newState = tags.add(tagState, node)
if(tags valid newState)
{
tagState = newState
submitValid( node, work )
}
else
{
if(running == 0) errorAddingToIdle()
pending.add( new Enqueue(node, work) )
}
}
private[this] def submitValid(node: A, work: () => R) =
{
running += 1
val wrappedWork = () => try work() finally cleanup(node)
CompletionService.submit(wrappedWork, jservice)
}
private[this] def cleanup(node: A): Unit = synchronized
{
running -= 1
tagState = tags.remove(tagState, node)
if(!tags.valid(tagState)) warn("Invalid restriction: removing a completed node from a valid system must result in a valid system.")
submitValid(new LinkedList)
}
private[this] def errorAddingToIdle() = warn("Invalid restriction: adding a node to an idle system must be allowed.")
@tailrec private[this] def submitValid(tried: Queue[Enqueue]): Unit =
if(pending.isEmpty)
{
if(!tried.isEmpty)
{
if(running == 0) errorAddingToIdle()
pending.addAll(tried)
}
}
else
{
val next = pending.remove()
val newState = tags.add(tagState, next.node)
if(tags.valid(newState))
{
tagState = newState
submitValid(next.node, next.work)
}
else
tried.add(next)
submitValid(tried)
}
def take(): R = jservice.take().get()
}
}
}