Saturday, 1 September 2012

Disruptor utility

At LMAX, we use the Disruptor in lots of places within our codebase. Usages vary, but one pattern that has been repeated is the use of a Disruptor to serialise multi-threaded access to a non-thread-safe resource (following the single-writer principle).

The approach we've used has always seemed a little clunky, though it has been sufficient for our needs. The general pattern is as follows.

Given an interface:

public interface EventListener
{
    void onEventTypeA(final String arg1, final int arg2);
    void onEventTypeB(final long arg1, final long arg2);
}
we write a proxy implementation that puts messages into a ring-buffer:

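import com.lmax.disruptor.RingBuffer;

public final class EventListenerProxy implements EventListener
{
    // ring-buffer that this proxy publishes into
    private final RingBuffer<ProxyEvent> ringBuffer;

    public EventListenerProxy(final RingBuffer<ProxyEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onEventTypeA(final String arg1, final int arg2)
    {
        // claim slot from ring-buffer
        final long sequence = ringBuffer.next();
        final ProxyEvent proxyEvent = ringBuffer.get(sequence);
        proxyEvent.setMethod(ProxyMethod.EVENT_TYPE_A);
        proxyEvent.setArgumentsForA(arg1, arg2);
        ringBuffer.publish(sequence);
    }

    public void onEventTypeB(final long arg1, final long arg2)
    {
        // claim slot from ring-buffer
        final long sequence = ringBuffer.next();
        final ProxyEvent proxyEvent = ringBuffer.get(sequence);
        proxyEvent.setMethod(ProxyMethod.EVENT_TYPE_B);
        proxyEvent.setArgumentsForB(arg1, arg2);
        ringBuffer.publish(sequence);
    }
}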
The EventHandler then dispatches each event to the correct method on the actual implementation object:

import com.lmax.disruptor.EventHandler;

public final class InvokerEventHandler implements EventHandler<ProxyEvent>
{
    // the non-thread-safe object; only the Disruptor's consumer thread calls it
    private final EventListener implementation;

    public InvokerEventHandler(final EventListener implementation)
    {
        this.implementation = implementation;
    }

    @Override
    public void onEvent(final ProxyEvent event, final long sequence, final boolean endOfBatch) throws Exception
    {
        // the method tag written by the proxy selects which call to replay
        switch (event.getMethod())
        {
            case EVENT_TYPE_A:
                implementation.onEventTypeA(event.getStringArg(), event.getIntArg());
                break;
            case EVENT_TYPE_B:
                implementation.onEventTypeB(event.getLongArg1(), event.getLongArg2());
                break;
        }
    }
}
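The ProxyEvent and ProxyMethod types used by the proxy and the handler are not shown in this post. A minimal sketch of what they might look like follows; the method names are taken from the calls above, but the field layout is an assumption rather than the actual LMAX code.

```java
// Hypothetical sketch: these types are referenced in the post but never listed.
enum ProxyMethod
{
    EVENT_TYPE_A,
    EVENT_TYPE_B
}

// Mutable holder that lives in a ring-buffer slot and is overwritten on each use.
final class ProxyEvent
{
    private ProxyMethod method;
    private String stringArg;
    private int intArg;
    private long longArg1;
    private long longArg2;

    void setMethod(final ProxyMethod method)
    {
        this.method = method;
    }

    // arguments of onEventTypeA(String, int)
    void setArgumentsForA(final String arg1, final int arg2)
    {
        this.stringArg = arg1;
        this.intArg = arg2;
    }

    // arguments of onEventTypeB(long, long)
    void setArgumentsForB(final long arg1, final long arg2)
    {
        this.longArg1 = arg1;
        this.longArg2 = arg2;
    }

    ProxyMethod getMethod() { return method; }
    String getStringArg() { return stringArg; }
    int getIntArg() { return intArg; }
    long getLongArg1() { return longArg1; }
    long getLongArg2() { return longArg2; }
}
```

Every new method on the interface needs another enum constant, another setter, and usually more fields, on top of the extra proxy method and switch case.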
This approach works well enough, but can be quite verbose if the interface in question has many methods or long parameter lists. We've stuck with it so far because our interfaces have been small enough that neither has been a problem.

While working on a side-project, I wanted to use the Disruptor to serialise access to an object that had a slightly more complex interface. After starting down the road described above, I realised that the problem could be solved for the general case with a little byte-code generation.

disruptor-proxy is a small library designed to handle all the boiler-plate involved in the described approach. Using the example above, it is trivial to create a proxy object backed by a ring-buffer:

final EventListener implementation = // create actual implementation
final Disruptor<ProxyMethodInvocation> disruptor = // create disruptor
final RingBufferProxyGeneratorFactory generatorFactory =
    new RingBufferProxyGeneratorFactory();
final RingBufferProxyGenerator proxyGenerator =
    generatorFactory.create(GeneratorType.BYTECODE_GENERATION);
final EventListener proxy = proxyGenerator.createRingBufferProxy(
    implementation, EventListener.class, disruptor);
disruptor.start();
// these calls are published to the ring-buffer and replayed on the implementation
proxy.onEventTypeA("foo", 42);
proxy.onEventTypeB(Long.MIN_VALUE, Long.MAX_VALUE);
Two types of generator are supported: BYTECODE_GENERATION and JDK_REFLECTION, which should be self-explanatory. Using the byte-code generation method will add a dependency on the Javassist library.
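To give a feel for the reflection-based idea, here is a rough sketch built on java.lang.reflect.Proxy. It is not the disruptor-proxy implementation: a BlockingQueue stands in for the ring-buffer, and QueueingProxySketch, Invocation, createProxy and drain are made-up names for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;

// The interface from the post, repeated so the sketch is self-contained.
interface EventListener
{
    void onEventTypeA(String arg1, int arg2);
    void onEventTypeB(long arg1, long arg2);
}

// Hypothetical names throughout; a BlockingQueue stands in for the ring-buffer.
final class QueueingProxySketch
{
    // One captured call: the method that was invoked and its arguments.
    static final class Invocation
    {
        final Method method;
        final Object[] args;

        Invocation(final Method method, final Object[] args)
        {
            this.method = method;
            this.args = args;
        }
    }

    // Producer side: every call on the returned proxy is recorded, not executed.
    static <T> T createProxy(final Class<T> iface, final BlockingQueue<Invocation> queue)
    {
        final InvocationHandler handler = (proxy, method, args) ->
        {
            queue.put(new Invocation(method, args));
            return null; // adequate for void methods only
        };
        return iface.cast(Proxy.newProxyInstance(
            iface.getClassLoader(), new Class<?>[] {iface}, handler));
    }

    // Consumer side: replay the queued calls, in order, on the real object.
    // Calling this from one thread only gives the implementation a single writer.
    static void drain(final Object implementation, final BlockingQueue<Invocation> queue)
        throws ReflectiveOperationException
    {
        Invocation invocation;
        while ((invocation = queue.poll()) != null)
        {
            invocation.method.invoke(implementation, invocation.args);
        }
    }
}
```

Any number of threads can call methods on the proxy, while the implementation only ever sees calls from whichever thread runs drain. The sketch allocates an Invocation and an argument array on every call, which a ring-buffer of pre-allocated events is designed to avoid.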

The performance difference between the two methods was not as great as I had expected it to be. Reflection doesn't seem to be anywhere near as expensive as it used to be:


Tests performed on Intel(R) Core(TM) i7-2670QM CPU @ 2.20GHz
[JDK_REFLECTION] - Ops per second: 14461091.00, avg latency 69ns
[BYTECODE_GENERATION] - Ops per second: 18313904.00, avg latency 54ns
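
The two latency figures look like the reciprocal of the throughput figures, truncated to whole nanoseconds. That is an inference from the numbers rather than something the test output states, and LatencyCheck is a made-up name for this quick check:

```java
final class LatencyCheck
{
    // Average time per operation implied by a throughput figure,
    // truncated to whole nanoseconds.
    static long nanosPerOp(final double opsPerSecond)
    {
        return (long) (1_000_000_000.0 / opsPerSecond);
    }

    public static void main(final String[] args)
    {
        System.out.println(nanosPerOp(14461091.00)); // JDK_REFLECTION, prints 69
        System.out.println(nanosPerOp(18313904.00)); // BYTECODE_GENERATION, prints 54
    }
}
```

On these figures, byte-code generation delivers roughly a quarter more throughput than reflection.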