
Groovy concurrency in action: asynchronous resource pools with Groovy++

05.11.2010

Message-passing concurrency is a very convenient tool for building highly performant applications. Roughly speaking, the idea is that instead of blocking threads while waiting for resources, we send messages to reactive objects when there is work to do. This solves a lot of problems, including minimizing potential deadlocks, and significantly improves performance for heavily multi-threaded applications.
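The idea can be sketched in a few lines of plain Java (the Counter class and its method names are illustrative, not from any library): a reactive object owns its state and uses a single-threaded executor as its mailbox, so callers hand it messages instead of blocking on a lock.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

// A reactive object: all state is touched only by the mailbox thread,
// so no locking is needed and callers never block.
class Counter {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int value = 0; // owned by the mailbox thread

    // Work arrives as a message and is processed in submission order.
    void increment(Runnable whenDone) {
        mailbox.execute(() -> { value++; whenDone.run(); });
    }

    // Reads are messages too; the answer comes back via a callback.
    void get(IntConsumer reply) {
        mailbox.execute(() -> reply.accept(value));
    }

    void shutdown() { mailbox.shutdown(); }
}
```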

Unfortunately, in addition to thread locking we also have the problem of blocking operations on a shared resource pool. Let me give you a specific example. Several days ago I was playing with Cassandra. Here is the simplest code for accessing Cassandra:

    def transport = new TSocket("localhost", 9160)
    def client = new Cassandra.Client(new TBinaryProtocol(transport))
    transport.open()
    // do whatever you need with Cassandra
    transport.close()

There are two points to notice here:

  • we obviously don't want to open/close a connection for each request to the Cassandra cluster, so we need a shared (meaning synchronized) pool of connections
  • even when a connection is available for our use, we will be blocked until the operation completes; in many situations (a UI thread, for example) that is not acceptable
Our first wish was to rewrite the Cassandra client API in continuation style based on non-blocking socket IO, but that seems to be no easy task (it is based on auto-generated Thrift code).

Here is our general solution for this kind of problem. It is relatively short (74 LOC), and we describe the key ideas below.

@Typed abstract class ResourcePool<R> {
    Executor executor
    boolean runFair

    private volatile Pair<FQueue<Action<R>>, FList<R>> state = [FQueue.emptyQueue, null]

    abstract FList<R> initResources()

    abstract static class Action<R> extends Function1<R,?> {
        Runnable whenDone
    }

    final void execute(Action action, Runnable whenDone = null) {
        action.whenDone = whenDone
        if (state.second == null) {
            initPool()
        }
        for (;;) {
            def s = state
            if (s.second.empty) {
                // no resource available, so put the action into the queue
                if (state.compareAndSet(s, [s.first.addLast(action), FList.emptyList]))
                    break
            }
            else {
                // queue is guaranteed to be empty
                if (state.compareAndSet(s, [FQueue.emptyQueue, s.second.tail])) {
                    // schedule the action
                    executor.execute {
                        scheduledAction(action, s.second.head)
                    }
                    break
                }
            }
        }
    }

    private final def scheduledAction(Action<R> action, R resource) {
        action(resource)

        for (;;) {
            def s = state
            if (s.first.empty) {
                // no more actions => we return the resource to the pool
                if (state.compareAndSet(s, [FQueue.emptyQueue, s.second + resource])) {
                    return action.whenDone?.run()
                }
            }
            else {
                def removed = s.first.removeFirst()
                if (state.compareAndSet(s, [removed.second, s.second])) {
                    if (runFair || action.whenDone) {
                        // schedule the next action
                        executor.execute {
                            scheduledAction(removed.first, resource)
                        }
                        return action.whenDone?.run()
                    }
                    else {
                        // tail recursion
                        return scheduledAction(removed.first, resource)
                    }
                }
            }
        }
    }

    private synchronized void initPool() {
        if (state.second == null) {
            state.second = initResources()
        }
    }
}

Let me explain what's going on in the code above:

  • The generic parameter R is the type of resource we manage
  • The abstract method initResources is responsible for creating and initializing the pooled resources
  • ResourcePool must be backed by an Executor to manage concurrency
  • The boolean property runFair, roughly speaking, controls the strategy for sharing the Executor's threads with other tasks. I will explain it in detail below

Our example with Cassandra can look something like this:

ResourcePool<Cassandra.Client> cassandraPool = {
    FList<Cassandra.Client> cp = FList.emptyList
    for (i in 0..<3) {
        def transport = new TSocket("localhost", 9160)
        def client = new Cassandra.Client(new TBinaryProtocol(transport))
        transport.open()
        cp = cp + client
    }
    cp
}
cassandraPool.executor = pool

The only interface method of ResourcePool is execute, which accepts one or two closures. The first closure defines the operation to execute when a shared resource becomes available. The second closure is optional and defines a continuation to be scheduled after the main operation completes. For example, we can use our pool like this:

cassandraPool.execute { client ->
    def start = System.currentTimeMillis()
    for (i in integers) {
        client.insert(keyspace, i, colPathName, "Chris Goffinet$i".toString().getBytes(UTF8), start, ConsistencyLevel.ONE)
        client.insert(keyspace, i, colPathAge, "$j".toString().getBytes(UTF8), start, ConsistencyLevel.ONE)
    }
    def elapsed = System.currentTimeMillis() - start
    println "Thread$j: ${integers.size()} inserts in $elapsed ms ${(1.0d*elapsed)/integers.size()}"
} {
    cdl.countDown()
}

The implementation itself is pretty straightforward. We use our old friends, functional lists and functional queues, and keep the state as a pair of the queue of actions to execute and the list of available resources.

There is a very important invariant that we always maintain in our algorithm: if there is any action in the queue, then there is no available resource; and if there is an available resource, then the action queue is empty.

Keeping this invariant in mind, it is relatively easy to understand how the pair of methods execute/scheduledAction works:

  • If a message arrives when no resource is available, we just place it into the queue.
  • When a resource becomes available (by finishing its previous processing), it takes care of pulling the next element off the queue and scheduling its execution (or executing it directly).
  • If a message arrives when a resource is available, the resource is removed from the available list and execution is scheduled.
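The lock-free state transition that preserves this invariant can be sketched in plain Java (TinyPool, State, and submit are hypothetical names; plain immutable lists stand in for Groovy++'s FQueue/FList, and only the submission step is shown, not the full dispatch logic):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Invariant: at most one of (pending, free) is non-empty at any moment.
class TinyPool<R> {
    record State<T>(List<Consumer<T>> pending, List<T> free) {}

    final AtomicReference<State<R>> state;

    TinyPool(List<R> resources) {
        state = new AtomicReference<>(new State<>(List.of(), List.copyOf(resources)));
    }

    // Returns a resource to run the action on right away, or null if queued.
    R submit(Consumer<R> action) {
        for (;;) {
            State<R> s = state.get();
            if (s.free().isEmpty()) {
                // no resource free -> append the action; free stays empty
                List<Consumer<R>> q = new ArrayList<>(s.pending());
                q.add(action);
                if (state.compareAndSet(s, new State<>(List.copyOf(q), List.of())))
                    return null;
            } else {
                // a resource is free -> by the invariant pending is empty; take the head
                List<R> f = s.free();
                if (state.compareAndSet(s, new State<>(List.of(), f.subList(1, f.size()))))
                    return f.get(0);
            }
        }
    }
}
```

On CAS failure the loop simply retries against the fresh snapshot, exactly as the Groovy++ code does.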
runFair is a special property that says whether we need to schedule a new execution after processing the previous action, or can reuse the same thread. Reuse is usually fine from the application standpoint and faster from the performance point of view. When it is not, it usually means you might want to consider using two different executors for different parts of your logic.
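The difference between the two strategies boils down to something like this (Dispatch and dispatchNext are illustrative names, not the article's code):

```java
import java.util.concurrent.Executor;

class Dispatch {
    // runFair == true: hand the next action back to the executor, so other
    // submitted tasks get a chance to interleave between actions;
    // runFair == false: run it on the current thread, saving a scheduling
    // round-trip at the cost of fairness toward unrelated tasks.
    static void dispatchNext(Runnable next, Executor executor, boolean runFair) {
        if (runFair) executor.execute(next);  // requeue: fair thread sharing
        else next.run();                      // reuse thread: faster, less fair
    }
}
```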

I hope this was interesting and that you will find this tool instrumental in your development. Thank you for reading, and until next time.

Published at DZone with permission of its author, Alex Tkachman.

Comments

Yiguang Hu replied on Tue, 2010/05/11 - 9:21am

Nice. Do you have some statistics comparing the performance of Groovy++ with Groovy and Java threads? One obstacle in evangelizing Groovy/Groovy++ is the concern about performance. Good statistics help a lot.

Thanks

Alex Tkachman replied on Tue, 2010/05/11 - 10:18am in response to: Yiguang Hu

Groovy++ is as fast as Java
