Skip to content

Getting Started

Michael Barker edited this page May 2, 2014 · 34 revisions

Getting the Disruptor

The Disruptor jar file is available from Maven Central and can be integrated into your dependency manager of choice from there. You can also download the binary from our downloads page on the Wiki. The observant will note that these are just a link to the binaries on Maven Central.

Basic Event Produce and Consume

To get started with the Disruptor we are going to consider very simple and contrived example, one that will pass a single long value from a producer to a consumer, where the consumer will simply print out the value. Firstly we will define the Event that will carry the data.

public class LongEvent
{
    private long value;

    public void set(long value)
    {
        this.value = value;
    }
}

In order to allow the Disruptor to preallocate these events for us, we need to an EventFactory that will perform the construction

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent>
{
    public LongEvent newInstance()
    {
        return new LongEvent();
    }
}

Once we have the event defined we need to create a consumer that will handle these events. In our case all we want to do is print the value out the the console.

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent>
{
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println("Event: " + event);
    }
}

We will need a source for these events, for the sake of an example I am going to assume that the data is coming from some sort of I/O device, e.g. network or file in the form of a ByteBuffer.

import com.lmax.disruptor.RingBuffer;

public class LongEventProducer
{
    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }

    public void onData(ByteBuffer bb)
    {
        long sequence = ringBuffer.next();  // Grab the next sequence
        try
        {
            LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
                                                        // for the sequence
            event.set(bb.getLong(0));  // Fill with data
        }
        finally
        {
            ringBuffer.publish(sequence);
        }
    }
}

What becomes immediately obvious is that event publication becomes more invovled that using a simple queue. This is due to the desire for event preallocation. In requires (at the lowest level) a 2-phase approach to message publication, i.e. claim the slot in the ring buffer then publish the available data. It is also necessary to wrap publication in a try/finally block. If we claim a slot in the Ring Buffer (calling RingBuffer.next()) then we must publish this sequence. Failing to do can result in corruption of the state of the Disruptor. Specially, in the multi-producer case this will result in the consumers stalling and being unable to recover without a restart.

With version 3.0 of the Disruptor a richer Lambda-style API was added to help developers by encapsulating this complexity within the Ring Buffer, so post-3.0 the preferred approach for publishing messages is via the Event Publisher/Event Translator portion of the API. E.g.

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;

public class LongEventProducerWithTranslator
{
    private final RingBuffer<LongEvent> ringBuffer;
    
    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
    {
        this.ringBuffer = ringBuffer;
    }
    
    private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
        new EventTranslatorOneArg<LongEvent, ByteBuffer>()
        {
            public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
            {
                event.set(bb.getLong(0));
            }
        };

    public void onData(ByteBuffer bb)
    {
        ringBuffer.publishEvent(TRANSLATOR, bb);
    }
}

The other advantage of this approach is that the translator code can be pulled into a separate class and easily unit tested independently. The Disruptor provides a number of different interfaces (EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, etc.) that can be implemented to provide translators. The reason for is to allow for the translators to be represented as static classes or non-capturing lambda (when Java 8 rolls around) as the arguments to the translation method are passed through the call on the Ring Buffer through to the translator.

The final step is to wire the whole thing together. It is possible to wire all of the components manually, however it can be a little bit complicated so a DSL is provided to simplify construction. Some of the more complicated options are not available via the DSL, however it is suitable for most circumstances.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // The factory for the event
        LongEventFactory factory = new LongEventFactory();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith(new LongEventHandler());

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            producer.onData(bb);
            Thread.sleep(1000);
        }
    }
}

Using Java 8

One of the design influences of the Disruptor's API was that Java 8 was going to rely on the concept of Functional Interfaces to serve as type declarations for Java Lambdas. Most of the interface definitions in the Disruptor API fit the requirements of a Functional Interface so that in most cases a Lambda can be used instead of a custom class, which can reduce the boiler place required.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(() -> new LongEvent(), bufferSize, executor);

        // Connect the handler
        disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event));

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
            Thread.sleep(1000);
        }
    }
}

Note how a number of the classes (e.g. handler, translator) are no longer required. Also note how the lambda used for publishEvent() only refers to the parameters that are passed in. If we were to instead write that code as:

ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
    bb.putLong(0, l);
    ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0)));
    Thread.sleep(1000);
}

This would create a capturing lambda, meaning that it would need to instantiate an object to hold the ByteBuffer bb variable as it passes the lambda through to the publishEvent() call. This will create additional (unnecessary) garbage, so the call that passes the argument through to the lambda should be preferred if low GC pressure is a requirement.

Give that method references can be used instead of anonymous lamdbas it is possible to rewrite the example in this fashion.

import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class LongEventMain
{
    public static LongEvent createLongEvent()
    {
        return new LongEvent();
    }

    public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch)
    {
        System.out.println(event);
    }

    public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
    {
        event.set(buffer.getLong(0));
    }

    public static void main(String[] args) throws Exception
    {
        // Executor that will be used to construct new threads for consumers
        Executor executor = Executors.newCachedThreadPool();

        // Specify the size of the ring buffer, must be power of 2.
        int bufferSize = 1024;

        // Construct the Disruptor
        Disruptor<LongEvent> disruptor = new Disruptor<>(LongEventMain::createLongEvent,
                                                         bufferSize,
                                                         executor);

        // Connect the handler
        disruptor.handleEventsWith(LongEventMain::handleEvent);

        // Start the Disruptor, starts all threads running
        disruptor.start();

        // Get the ring buffer from the Disruptor to be used for publishing.
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();

        LongEventProducer producer = new LongEventProducer(ringBuffer);

        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long l = 0; true; l++)
        {
            bb.putLong(0, l);
            ringBuffer.publishEvent(LongEventMain::translate, bb);
            Thread.sleep(1000);
        }
    }
}

Basic Tuning Options

Using the above approach will work functionally in the widest set of deployment scenarios. However, if you able to make certain assumptions about the hardware and software environment that the Disruptor will run in then you can take advantage of a number of tuning options to improve performance. There are 2 main options for tuning, single vs. multiple producers and alternative wait strategies.

Single vs. Multiple Producers

One of the best ways to improve performance in concurrect systems is to ahere to the Single Writer Princple, this applies to the Disruptor. If you are in the situation where there will only ever be a single thread producing events into the Disruptor, then you can take advantage of this to gain additional performance.

public class LongEventMain
{
    public static void main(String[] args) throws Exception
    {
        //.....
        // Construct the Disruptor with a SingleProducerSequencer
        Disruptor<LongEvent> disruptor = new Disruptor(factory, 
                                                       bufferSize,
                                                       ProducerType.SINGLE, // Single producer
                                                       new BlockingWaitStrategy(),
                                                       executor);

        //.....
    }
}

To give an indication of how much of a performance advantage can be achieved through this technique we can change the producer type in the OneToOne performance test. Tests run on i7 Sandy Bridge MacBook Air.

Multiple Producer

Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec

Single Producer

Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec

Alternative Wait Strategies

The default wait strategy used by the Disruptor is the BlockingWaitStrategy. Internally the BlockingWaitStrategy uses a typical lock and condition variable to handle thread wake-up. The BlockingWaitStrategy is the slowest of the available wait strategies, but is the most conservative with the respect to CPU usage and will give the most consistent behaviour across the widest variety of deployment options. However, again knowledge of the deployed system can allow for additional performance.

Like the BlockingWaitStrategy the SleepingWaitStrategy it attempts to be conservative with CPU usage, by using a simple busy wait loop, but uses a call to LockSupport.parkNanos(1) in the middle of the loop. On a typical Linux system this will pause the thread for around 60µs. However it has the benefit that the producing thread does not need to take any action other increment the appropriate counter and does not require the cost of signalling a condition variable. However, the mean latency of moving the event between the producer and consumer threads will be higher. It works best in situations where low latency is not required, but a low impact on the producing thread is desired. A common use case is for asynchronous logging.

The YieldingWaitStrategy is one of 2 Wait Strategies that can be use in low latency systems, where there is the option to burn CPU cycles with the goal of improving latency. The YieldingWaitStrategy will busy spin waiting for the sequence to increment to the appropriate value. Inside the body of the loop Thread.yield() will be called allowing other queued threads to run. This is the recommended wait strategy when need very high performance and the number of Event Handler threads is less than the total number of logical cores, e.g. you have hyper-threading enabled.

The BusySpinWaitStrategy is the highest performing Wait Strategy, but puts the highest constraints on the deployment environment. This wait strategy should only be used if the number of Event Handler threads is smaller than the number of physical cores on the box. E.g. hyper-threading should be disabled.

Clone this wiki locally