Getting Started
To get started with the Disruptor, we will consider a very simple and contrived example: one that passes a single long value from a producer to a consumer, where the consumer simply prints out the value. First, we define the Event that will carry the data.
public class LongEvent
{
private long value;
public void set(long value)
{
this.value = value;
}
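@Override
public String toString()
{
return "LongEvent{value=" + value + "}"; // lets the handler print the carried value
}
}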
In order to allow the Disruptor to preallocate these events for us, we need an EventFactory that will perform the construction.
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent>
{
public static final LongEventFactory INSTANCE = new LongEventFactory();
public LongEvent newInstance()
{
return new LongEvent();
}
}
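To make the idea of preallocation more concrete, here is a minimal sketch in plain Java that does not use the Disruptor library at all. The names `SketchEvent` and `PreallocatedSlots` are hypothetical, and `java.util.function.Supplier` stands in for the role the EventFactory plays above. It only illustrates that every slot is constructed once, up front, and then reused; it is not a description of how the library is implemented internally.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for an event type; not part of the Disruptor API.
final class SketchEvent {
    long value;
}

// Toy illustration of preallocation: each slot is created once by the factory
// and is afterwards only mutated, never replaced.
final class PreallocatedSlots<T> {
    private final Object[] slots;
    private final int mask;

    PreallocatedSlots(Supplier<T> factory, int size) {
        // This sketch assumes a power-of-two size so a sequence maps to a slot with a bit mask.
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        this.slots = new Object[size];
        this.mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get(); // construct every event up front
        }
    }

    @SuppressWarnings("unchecked")
    T get(long sequence) {
        return (T) slots[(int) (sequence & mask)];
    }
}

final class PreallocationDemo {
    public static void main(String[] args) {
        PreallocatedSlots<SketchEvent> slots = new PreallocatedSlots<>(SketchEvent::new, 4);
        slots.get(0).value = 42L;
        // Sequence 4 wraps around to the same preallocated object as sequence 0.
        System.out.println(slots.get(4) == slots.get(0)); // prints "true"
    }
}
```

Because the objects are created once and reused, publishing later means writing into an existing event rather than allocating a new one.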
Once we have the event defined, we need to create a consumer that will handle these events. In our case, all we want to do is print the value out to the console.
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent>
{
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
System.out.println("Event: " + event);
}
}
We will need a source for these events. For the sake of the example, I am going to assume that the data is coming from some sort of I/O device (e.g. network or file) in the form of a ByteBuffer.
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducer
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer bb)
{
long sequence = ringBuffer.next(); // Grab the next sequence
try
{
LongEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor
// for the sequence
event.set(bb.getLong(0)); // Fill with data
}
finally
{
ringBuffer.publish(sequence);
}
}
}
What becomes immediately obvious is that event publication is more involved than using a simple queue. This is due to the desire for event preallocation. It requires (at the lowest level) a 2-phase approach to message publication, i.e. claim the slot in the ring buffer, then publish the available data. It is also necessary to wrap publication in a try/finally block. If we claim a slot in the Ring Buffer (by calling RingBuffer.next()), then we must publish this sequence. Failing to do so can result in corruption of the state of the Disruptor. Specifically, in the multi-producer case this will result in the consumers stalling and being unable to recover without a restart.
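The stall described above can be illustrated with a small toy model in plain Java. Everything here (`TwoPhaseRing`, `highestPublished`, `TwoPhaseDemo`) is a hypothetical name invented for this sketch, and the model is deliberately simplified: it is single-threaded, has no wrap-around, and uses no memory barriers. It only shows why a claimed but unpublished sequence holds back everything behind it.

```java
// Toy model of claim-then-publish; not the Disruptor's implementation.
final class TwoPhaseRing {
    private final long[] values;
    private final boolean[] published;
    private long nextToClaim = 0;

    TwoPhaseRing(int capacity) {
        this.values = new long[capacity];
        this.published = new boolean[capacity];
    }

    // Phase 1: claim a sequence. The slot is reserved but not yet visible.
    long next() {
        if (nextToClaim >= values.length) {
            throw new IllegalStateException("toy buffer is full (this sketch has no wrap-around)");
        }
        return nextToClaim++;
    }

    void set(long sequence, long value) {
        values[(int) sequence] = value;
    }

    long get(long sequence) {
        return values[(int) sequence];
    }

    // Phase 2: make the claimed sequence visible to consumers.
    void publish(long sequence) {
        published[(int) sequence] = true;
    }

    // A consumer may only read the contiguous run of published sequences starting at 'from'.
    // Returns from - 1 when 'from' itself has not been published yet.
    long highestPublished(long from) {
        long s = from;
        while (s < nextToClaim && published[(int) s]) {
            s++;
        }
        return s - 1;
    }
}

final class TwoPhaseDemo {
    // The try/finally guarantees that a claimed sequence is always published.
    static void publishValue(TwoPhaseRing ring, long value) {
        long sequence = ring.next();
        try {
            ring.set(sequence, value);
        } finally {
            ring.publish(sequence);
        }
    }

    public static void main(String[] args) {
        TwoPhaseRing ring = new TwoPhaseRing(8);
        long first = ring.next();      // sequence 0: claimed but not yet published
        publishValue(ring, 20L);       // sequence 1: claimed and published
        System.out.println(ring.highestPublished(0)); // prints -1: the consumer is stuck behind sequence 0
        ring.set(first, 10L);
        ring.publish(first);
        System.out.println(ring.highestPublished(0)); // prints 1: both sequences are now readable
    }
}
```

In this model, if sequence 0 were never published, the consumer would never get past it, no matter how many later sequences were published.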
With version 3.0 of the Disruptor, a richer Lambda-style API was added to help developers by encapsulating this complexity within the Ring Buffer, so post-3.0 the preferred approach for publishing messages is via the Event Publisher/Event Translator portion of the API. For example:
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.EventTranslatorOneArg;
import java.nio.ByteBuffer;
public class LongEventProducerWithTranslator
{
private final RingBuffer<LongEvent> ringBuffer;
public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer)
{
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR =
new EventTranslatorOneArg<LongEvent, ByteBuffer>()
{
public void translateTo(LongEvent event, long sequence, ByteBuffer bb)
{
event.set(bb.getLong(0));
}
};
public void onData(ByteBuffer bb)
{
ringBuffer.publishEvent(TRANSLATOR, bb);
}
}
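To show what "encapsulating this complexity" might look like, here is a toy sketch in plain Java of a translator-style publish method that hides the claim, try/finally, and publish steps from the caller. The types `ToyEvent`, `ToyTranslator`, and `ToyPublisher` are hypothetical and written only for this illustration; the sketch is single-threaded, never checks whether a slot has been consumed before reusing it, and should not be read as the library's actual implementation.

```java
import java.nio.ByteBuffer;

// Hypothetical event type for this sketch.
final class ToyEvent {
    long value;
}

// Hypothetical single-argument translator: copies data from 'arg' into a preallocated event.
@FunctionalInterface
interface ToyTranslator<A> {
    void translateTo(ToyEvent event, long sequence, A arg);
}

// Toy publisher that keeps the two-phase protocol in one place.
final class ToyPublisher {
    private final ToyEvent[] slots;
    private final int mask;
    private long nextSequence = 0;
    private long cursor = -1; // last published sequence

    ToyPublisher(int size) {
        if (size <= 0 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        this.slots = new ToyEvent[size];
        this.mask = size - 1;
        for (int i = 0; i < size; i++) {
            slots[i] = new ToyEvent(); // preallocate every event
        }
    }

    <A> void publishEvent(ToyTranslator<A> translator, A arg) {
        long sequence = nextSequence++;               // claim
        try {
            translator.translateTo(get(sequence), sequence, arg);
        } finally {
            cursor = sequence;                        // publish, even if the translator throws
        }
    }

    ToyEvent get(long sequence) {
        return slots[(int) (sequence & mask)];
    }

    long cursor() {
        return cursor;
    }
}

final class ToyPublisherDemo {
    public static void main(String[] args) {
        ToyPublisher publisher = new ToyPublisher(4);
        ByteBuffer bb = ByteBuffer.allocate(8);
        bb.putLong(0, 42L);
        // A lambda can serve as the translator because the interface has a single method.
        publisher.publishEvent((event, sequence, buffer) -> event.value = buffer.getLong(0), bb);
        System.out.println(publisher.get(0).value); // prints 42
    }
}
```

The caller never sees a sequence number, so there is no way to claim a slot and forget to publish it.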