Monday, 21 December 2020

Multi-Topic Broadcast in Babl WebSocket Server

As of version 0.10.0, Babl applications gained the ability to broadcast messages to multiple topics simultaneously. This new functionality makes publishing of market data even more efficient.


Using multi-topic broadcast, an application can send a single message to multiple topics, and transform that message for each receiving topic. The message is broadcast to all session containers via IPC, and will be picked up by those containers hosting sessions belonging to the relevant topics.

This specialisation is designed for situations where the same basic information needs to be sent to a large number of sessions, but with a slightly different view depending on the session.

The obvious example of this use-case is the publication of market data, where a single update needs to be sent to several subscribed sessions. Some of these sessions might be subscribing to the full depth of market data, while others might only require the top-of-book updates. 
 
Without multi-topic broadcast, it would be necessary to pre-render each of these updates (full-depth, top-of-book) on the application thread, and to publish over IPC to the session containers. Instead, it is possible to configure a topic for each view-type, and publish a single message over IPC. The work of transforming the message is then off-loaded to a message-transformer, running on the session container thread.
 

Multi-Topic Broadcast API

 

To use the API, an application must create and maintain membership of topics:

private static final class MarketDataApplication
    implements Application, BroadcastSource
{
    private Broadcast broadcast;
    private int[] topicIds;

    @Override
    public void setBroadcast(final Broadcast broadcast)
    {
        // store the Broadcast implementation
        this.broadcast = broadcast;
        this.topicIds = new int[] {
            MARKET_DATA_FULL_DEPTH, MARKET_DATA_TOP_OF_BOOK};
        // create a topic for broadcast
        broadcast.createTopic(MARKET_DATA_FULL_DEPTH);
        broadcast.createTopic(MARKET_DATA_TOP_OF_BOOK);
    }

    @Override
    public int onSessionConnected(final Session session)
    {
        // add new sessions to the topic
        broadcast.addToTopic(MARKET_DATA_TOP_OF_BOOK, session.id());
        return SendResult.OK;
    }

    public void onMarketDataUpdate(final MarketDataUpdate update)
    {
        final DirectBuffer buffer = serialise(update);
        // send a message to all sessions registered on the topic
        broadcast.sendToTopics(topicIds, buffer, 0, buffer.capacity());
    }
}

Application messages broadcast to multiple topics will be handed to the registered message transformer before being written to sessions. A message transformer simply creates a topic-dependent view based on the topic ID:

final class MarketDataTransformer
    implements MessageTransformer
{
    private final ExpandableArrayBuffer dst =
        new ExpandableArrayBuffer();
    private final TransformResult transformResult =
        new TransformResult();

    @Override
    public TransformResult transform(
        final int topicId,
        final DirectBuffer input,
        final int offset,
        final int length)
    {
        if (topicId == MARKET_DATA_TOP_OF_BOOK)
        {
            transformResult.set(dst, 0, encodeTopOfBook(input, offset, length));
        }
        else
        {
            transformResult.set(dst, 0, encodeFullDepth(input, offset, length));
        }
        return transformResult;
    }
}
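The `encodeTopOfBook` helper is not shown above. The following is a minimal sketch of what such a helper might look like. It uses a plain `java.nio.ByteBuffer` rather than Agrona buffers so that it is self-contained, and the wire format (two level counts followed by price/quantity pairs, best level first) and the `TopOfBookEncoder` class name are assumptions made purely for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical wire format, assumed for illustration only:
//   int bidCount, int askCount, then bidCount bid levels followed by askCount ask levels,
//   each level being (long price, long quantity), best level first on each side.
final class TopOfBookEncoder
{
    static final int HEADER_LENGTH = 2 * Integer.BYTES;
    static final int LEVEL_LENGTH = 2 * Long.BYTES;

    // Writes a message containing only the best bid and best ask into dst,
    // returning the number of bytes written.
    static int encodeTopOfBook(
        final ByteBuffer input, final int offset, final int length, final ByteBuffer dst)
    {
        final int bidCount = input.getInt(offset);
        final int askCount = input.getInt(offset + Integer.BYTES);
        if (length < HEADER_LENGTH + (bidCount + askCount) * LEVEL_LENGTH)
        {
            throw new IllegalArgumentException("message is shorter than its header claims");
        }
        final int firstBid = offset + HEADER_LENGTH;
        final int firstAsk = firstBid + bidCount * LEVEL_LENGTH;

        dst.putInt(0, Math.min(bidCount, 1));
        dst.putInt(Integer.BYTES, Math.min(askCount, 1));
        int written = HEADER_LENGTH;
        if (bidCount > 0)
        {
            written = copyLevel(input, firstBid, dst, written);
        }
        if (askCount > 0)
        {
            written = copyLevel(input, firstAsk, dst, written);
        }
        return written;
    }

    // Copies one (price, quantity) level and returns the next write offset in dst.
    private static int copyLevel(
        final ByteBuffer src, final int srcOffset, final ByteBuffer dst, final int dstOffset)
    {
        dst.putLong(dstOffset, src.getLong(srcOffset));
        dst.putLong(dstOffset + Long.BYTES, src.getLong(srcOffset + Long.BYTES));
        return dstOffset + LEVEL_LENGTH;
    }
}
```

Because the destination buffer is supplied by the caller and reused, a helper of this shape does not allocate per message, which matches the reuse of `dst` in the transformer above.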

The message transformer must be registered before server start-up using the configuration API:

new BablConfig().sessionContainerConfig()
    .messageTransformerFactory(topicId -> new MarketDataTransformer());


More Info

Babl source code is available on GitHub, and releases are published to Maven Central and Docker Hub.

Full documentation is available at https://babl.ws.



Enjoyed this post? You might be interested in subscribing to the Four Steps Newsletter, a monthly newsletter covering techniques for writing faster, more performant software.









Tuesday, 13 October 2020

Babl High-Performance WebSocket Server

 

A lockdown project

A couple of years ago, after evaluating the available open-source solutions, I became interested in the idea of writing a low-latency websocket server for the JVM. The project I was working on required a high-throughput, low-latency server to provide connectivity to a clustered backend service. We started by writing some benchmarks to see whether the most popular servers would be fast enough. In the end, we found that an existing library provided the performance required, so the project moved on. I still had the feeling that a cut-down, minimal implementation of the websocket protocol could provide better overall and tail latencies than a generic solution.

The project in question was the Hydra platform, a product of Adaptive Financial Consulting[1], created to help accelerate development of their clients’ applications. The Hydra platform uses Aeron Cluster[2] for fault-tolerant, high-performance state replication, along with Artio[3] for FIX connectivity, and a Vert.x[4]-based websocket server for browser/API connectivity. Users of Hydra get a “batteries included” platform on which to deploy their business logic, with Hydra taking care of all the messaging, fault-tolerance, lifecycle, and state-management functions.

Vert.x was the right choice for the platform, as it provided good performance, and came with lots of other useful functionality such as long-poll fallback. However, I was still curious about whether it would be possible to create a better solution, performance-wise, just for the websocket connectivity component. During the UK’s lockdown period, I found time to begin development on a new project, of which Babl is the result.


High Performance Patterns

I have been working on low-latency systems of one sort or another for over 10 years. During that time the single-thread event-loop pattern has featured again and again. The consensus is that a busy-spinning thread is the best way to achieve low latencies. Conceptually, this makes sense, as we avoid the wake-up cost associated with some kind of signal to a waiting thread. In practice, depending on the deployment environment (such as a virtual cloud instance), busy-spinning can be detrimental to latency due to throttling effects. For this reason, it is usually reserved for bare-metal deployments, or cases where there are known to be no multi-tenanting issues.
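As a rough illustration of the single-thread event-loop pattern, the sketch below polls an inbound queue in a duty cycle and delegates the "nothing to do" case to a pluggable idle strategy. The `EventLoop` and `IdleStrategy` names are hypothetical and this is not Babl's implementation; a real system would also use a purpose-built single-producer/single-consumer queue rather than a general-purpose `Queue`. `Thread.onSpinWait()` requires Java 9 or later.

```java
import java.util.Queue;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

final class EventLoop
{
    // Decides what the thread does after a duty cycle that found no work.
    interface IdleStrategy
    {
        void idle(int workCount);
    }

    // Lowest wake-up latency, but occupies a whole core; suited to isolated CPUs.
    static final IdleStrategy BUSY_SPIN = workCount ->
    {
        if (workCount == 0)
        {
            Thread.onSpinWait();
        }
    };

    // Friendlier to shared or virtualised hosts, at the cost of wake-up latency.
    static final IdleStrategy BACK_OFF = workCount ->
    {
        if (workCount == 0)
        {
            LockSupport.parkNanos(1_000L);
        }
    };

    private static final int BATCH_LIMIT = 10;

    private final Queue<Runnable> inbound;
    private final IdleStrategy idleStrategy;

    EventLoop(final Queue<Runnable> inbound, final IdleStrategy idleStrategy)
    {
        this.inbound = inbound;
        this.idleStrategy = idleStrategy;
    }

    // One duty cycle: drain up to BATCH_LIMIT tasks and report how many were run.
    int doWork()
    {
        int workCount = 0;
        Runnable task;
        while (workCount < BATCH_LIMIT && (task = inbound.poll()) != null)
        {
            task.run();
            workCount++;
        }
        return workCount;
    }

    // Runs duty cycles on the calling thread until the supplier reports false.
    long run(final BooleanSupplier running)
    {
        long total = 0;
        while (running.getAsBoolean())
        {
            final int workCount = doWork();
            total += workCount;
            idleStrategy.idle(workCount);
        }
        return total;
    }
}
```

Keeping the idle behaviour behind an interface means the same loop can busy-spin on bare metal and back off on a cloud instance without changing the processing code.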

Another well-known approach to achieve high performance is to exploit memory locality by using cache-friendly data-structures. In addition, object-reuse in a managed-runtime environment, such as the JVM, can help an application avoid long pause times due to garbage collection.

Another commonly seen performance anti-pattern is excessive logging and monitoring. Babl exports metrics counters to shared memory, where they can be read from another process. Other metrics providers, such as JMX, will cause allocation within the JVM, contributing to latency spikes. The shared memory approach means that a ‘side-car’ process can be launched, which is responsible for polling metrics and transmitting them to a backend store such as a time-series database.
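The shared-memory idea can be sketched with a memory-mapped file: the server increments fixed-width counters in the mapping, and a separate process maps the same file to read them. The `MappedCounters` class below is a hypothetical illustration using only the JDK, not Babl's counter implementation. It uses plain reads and writes, whereas a production implementation would normally use ordered or volatile writes so that readers observe consistent values.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class MappedCounters
{
    private final MappedByteBuffer buffer;

    // Maps (creating if necessary) a file holding counterCount 64-bit counters.
    MappedCounters(final Path file, final int counterCount)
    {
        try (FileChannel channel = FileChannel.open(
            file, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))
        {
            // The mapping remains valid after the channel is closed.
            buffer = channel.map(
                FileChannel.MapMode.READ_WRITE, 0, (long)counterCount * Long.BYTES);
        }
        catch (final IOException ex)
        {
            throw new UncheckedIOException(ex);
        }
    }

    // Called by the single writer thread; no allocation, no locking.
    void increment(final int counterId)
    {
        final int offset = counterId * Long.BYTES;
        buffer.putLong(offset, buffer.getLong(offset) + 1);
    }

    // Called by the monitoring side, typically from another process.
    long get(final int counterId)
    {
        return buffer.getLong(counterId * Long.BYTES);
    }
}
```

A side-car process would construct its own `MappedCounters` over the same file and poll `get` on a timer, keeping all metrics-related allocation and I/O out of the latency-sensitive JVM.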

By applying these techniques in a configuration inspired by Real Logic’s Artio FIX engine, I aimed to create a minimally functional websocket server that could be used in low-latency applications, such as cryptocurrency exchanges. The design allows for socket-processing work to be scaled out to the number of available CPUs, fully utilising the available resources on today’s multi-core machines.

Architecture

The Babl architecture separates the concern of socket- and protocol-handling from the needs of the application developer. The intent is that the developer needs only implement business logic, with Babl taking care of all protocol-level operations such as framing, heartbeats and connections.

Socket processing and session lifecycle is managed in the session container, instances of which can be scaled horizontally. The session container sends messages to the application container via an efficient IPC, provided by Aeron. In the application container, the user’s business logic is executed on a single thread, with outbound messages routed back to the originating session container.

The image below shows how the various components are related.

 


Use cases

Babl is designed for modern applications where latency is a differentiator, rather than as a general web-server. For this reason, there is no long-poll fallback for use in browsers that are unable to establish web-socket connections; nor is there any facility for serving web artifacts over HTTP.

If HTTP resources need to be served, the recommended approach is to use a static resource server (e.g. nginx) to handle browser requests, and to proxy websocket connections through the same endpoint. Currently, this must be done through the same domain; CORS/pre-flight checks will be added in a future release. An example of this approach can be seen in the Babl GitHub repository.

Configuration

Babl can be launched using its APIs, or via the command-line using properties configuration. The various configuration options control everything from socket parameters and memory usage to performance characteristics.

To quickly get started, add the Babl libraries to your application, implement the Application interface, and launch the Babl server from the command-line or in a Docker container.

Performance

Babl has been designed from the beginning to offer the best available performance. Relevant components of the system have been benchmarked and profiled to make sure that they are optimised by the JVM.

JMH benchmarks demonstrate that Babl can process millions of websocket messages per-second, on a single thread:


Benchmark (ops/sec)                        Score         Error
FrameDecoder.decodeMultipleFrames   13446695.952 ±  776489.255
FrameDecoder.decodeSingleFrame      53432264.716 ±  854846.712
FrameEncoder.encodeMultipleFrames   12328537.512 ±  425162.902
FrameEncoder.encodeSingleFrame      39470675.807 ± 2432772.310
WebSocketSession.processSingleMsg   15821571.760 ±  173051.962



Due to Babl’s design and tight control over threading, it is possible to use thread-affinity to run the processing threads (socket-process and application logic) on isolated cores to further reduce latency outliers caused by system jitter. In a comparative benchmark between Babl and another leading open-source solution, Babl has much lower tail latencies due to its allocation-free design, and low-jitter configuration.

In this test, both server implementations use a queue to pass incoming messages to a single-threaded application. In the case of Babl, this is Aeron IPC; for the other server I used the trusted Disruptor with a busy-spin wait strategy. The application logic echoes responses back to the socket processing component via a similar queue, as shown below:





Each load generator thread runs on its own isolated CPU, handling an even number of sessions; each session publishes a 200-byte payload to the server at a fixed rate, and records the RTT.

Both server and client processes are run using OpenOnload to bypass the kernel networking stack.

The load generator runs a 100 second warm-up period at the specified throughput, before running a 100 second measurement period; this ensures that any JIT compilation has had time to complete.
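As a sketch of how recorded round-trip times might be turned into percentile figures, the code below drops the warm-up samples and applies the nearest-rank method to the remainder. The `LatencySummary` class is hypothetical and is not taken from the benchmarking harness. It discards warm-up by sample count rather than by elapsed time to keep the example short, and real harnesses often use a histogram library instead of sorting raw samples.

```java
import java.util.Arrays;

final class LatencySummary
{
    // Discards the first warmUpSampleCount samples and returns the rest in ascending order.
    static long[] measuredSamples(final long[] rttMicros, final int warmUpSampleCount)
    {
        final long[] measured =
            Arrays.copyOfRange(rttMicros, warmUpSampleCount, rttMicros.length);
        Arrays.sort(measured);
        return measured;
    }

    // Nearest-rank percentile: the smallest sample such that at least
    // `percentile` percent of all samples are less than or equal to it.
    static long percentile(final long[] sorted, final double percentile)
    {
        final int rank = (int)Math.ceil(percentile * sorted.length / 100.0);
        return sorted[Math.max(0, rank - 1)];
    }
}
```

Excluding the warm-up samples matters because early round trips include interpreter and JIT-compilation overhead that would otherwise inflate the upper percentiles.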

The work done on the server includes the following steps:


  1. Socket read

  2. Protocol decoding

  3. Thread hop on inbound queue

  4. Application logic (echo)

  5. Thread hop on outbound queue

  6. Protocol encoding

  7. Socket write


Benchmark Results

Response times are expressed in microseconds.

10,000 messages per second, 1,000 sessions

          Min     50%     90%     99%   99.99%        Max
Babl     29.3    56.3    81.7   144.8    213.9      252.2
Vert.x   33.3    70.0   106.8   148.0    422.4    2,013.2


100,000 messages per second, 1,000 sessions

          Min     50%     90%     99%   99.99%        Max
Babl     25.2    55.3    81.5   131.8    221.1      393.2
Vert.x   34.2    73.8    95.9   132.7    422.4   10,002.4



Getting Babl

Babl’s source code is available on GitHub, and releases are published to Maven Central and Docker Hub.

The benchmarking harness used to compare relative performance is also on GitHub at https://github.com/babl-ws/ws-harness.

Thanks

Special thanks to Adaptive for providing access to their performance testing lab for running the benchmarks.

Links

  1. https://weareadaptive.com

  2. https://github.com/real-logic/aeron

  3. https://github.com/real-logic/artio

  4. https://vertx.io/




Tuesday, 19 February 2019

Recall Design

This article discusses the design of Recall, an efficient, low-latency off-heap object store for the JVM.

Recall is designed for use by single-threaded applications, making it simple and performant.

In the multi-core era, it may seem odd that high-performance software is designed for use
by a single thread, but there is a good reason for this.

Developers of low-latency systems will be aware of the Disruptor pattern,
which showed that applying the Single Writer Principle could lead to extreme performance gains.

Recall is also allocation-free (in steady state), meaning that it will always use pieces of
memory that are more likely to be resident in CPU caches. Recall will never cause unwanted
garbage collection pauses, since it will not exhaust any memory region.

Allocations in Recall


While allocation-free in steady state, Recall will allocate new buffers as data is recorded
into a Store. For the most part, these should be direct ByteBuffers, which will not
greatly impact the JVM heap. Users can pre-empt these allocations by correctly sizing
a Store or SequenceMap before use, to ensure that necessary storage is available at
system start time.

Benefits of single-threaded design


Since the data structures in Recall are never accessed by multiple threads, there is no need
for locking, so there is no contention. Resource contention is one of the main things that
can limit throughput in multi-threaded programs, so side-stepping the problem altogether
leads to significant gains over solutions designed for multi-threaded applications.

Highly performant multi-threaded map implementations will use compare-and-swap (CAS)
operations to avoid the cost of standard locks, and remain lock-free. While this does
lead to typically better scalability than a map utilising locks, those CAS operations
are still relatively expensive compared to the normal arithmetic operations that
are used in single-threaded designs.
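The difference can be seen in a minimal sketch of the two styles of counter. The `Counters` class below is hypothetical and is not part of Recall: the first method is safe under concurrent writers and must retry its compare-and-swap on contention, while the second relies on there being exactly one writer and so is just a plain increment.

```java
import java.util.concurrent.atomic.AtomicLong;

final class Counters
{
    // Lock-free but not free: each attempt is an atomic read-modify-write on the
    // cache line, and the loop retries whenever another thread got there first.
    static long casIncrement(final AtomicLong counter)
    {
        long current;
        do
        {
            current = counter.get();
        }
        while (!counter.compareAndSet(current, current + 1));
        return current + 1;
    }

    // Only correct when a single thread ever writes; no atomics or fences required.
    static final class SingleWriterCounter
    {
        private long value;

        long increment()
        {
            return ++value;
        }

        long get()
        {
            return value;
        }
    }
}
```

Both produce the same result when called from one thread; the single-writer version simply avoids paying for a guarantee it does not need.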

 

Memory layout


Recall's Store object is an open-addressed hash map, storing a 64-bit long value against
an arbitrary number of bytes. The developer is responsible for providing functions to
serialise and deserialise objects to and from the Store's buffer. Each entry is of a fixed
length, eliminating the chance of compaction problems.

The mapping of long to byte sequence is performed by a simple open-addressed hash map (from the Agrona library).
Each entry in the map records the long identifier against a
position in the Store's buffer. This makes the "index" for the data structure incredibly
compact (~16b per entry), and has the nice side-effect of making inserts, updates and removals
occur in effectively constant time.

Due to the use of a separate index, Recall's Store does not suffer from compaction problems,
and is always 100% efficient in terms of storage space used.

 

Inserting a new record


Record insertion involves adding a new key to the "index", serialising the entry,
and increasing the write pointer by the record size.

Note that if insertion causes the map to exceed its load factor, then a re-hash
will occur, causing each entry to be copied into a new, larger buffer.
For this reason it is recommended to correctly size the Store when it is
constructed.

 

Deleting a record


Removing a record from the map involves copying the highest entry in the buffer
over the top of the entry to be removed, updating the "index", and decreasing
the write pointer by the record size.
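The insert and delete steps described above can be sketched with a small fixed-record store. The `MiniStore` class below is a hypothetical illustration, not Recall's implementation: it uses a boxing `HashMap` as a stand-in for a primitive open-addressed map (so, unlike Recall, it is not allocation-free), it does not grow when full, and it assumes each record carries its id in an 8-byte header so that the index can be re-pointed when a record is moved.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

final class MiniStore
{
    private final int recordLength;                           // 8-byte id header + payload
    private final ByteBuffer buffer;
    private final Map<Long, Integer> index = new HashMap<>(); // id -> record offset
    private int writeOffset;                                  // the "write pointer"

    MiniStore(final int payloadLength, final int maxRecords)
    {
        recordLength = Long.BYTES + payloadLength;
        buffer = ByteBuffer.allocateDirect(recordLength * maxRecords);
    }

    // Insert (or overwrite): add the key to the index, serialise, advance the write pointer.
    void store(final long id, final byte[] payload)
    {
        if (payload.length != recordLength - Long.BYTES)
        {
            throw new IllegalArgumentException("payload must match the fixed record size");
        }
        Integer offset = index.get(id);
        if (offset == null)
        {
            if (writeOffset + recordLength > buffer.capacity())
            {
                throw new IllegalStateException("store is full");
            }
            offset = writeOffset;
            index.put(id, offset);
            writeOffset += recordLength;
        }
        buffer.putLong(offset, id);
        for (int i = 0; i < payload.length; i++)
        {
            buffer.put(offset + Long.BYTES + i, payload[i]);
        }
    }

    // Copies the payload for id into dst; returns false if the id is unknown.
    boolean load(final long id, final byte[] dst)
    {
        final Integer offset = index.get(id);
        if (offset == null)
        {
            return false;
        }
        for (int i = 0; i < recordLength - Long.BYTES; i++)
        {
            dst[i] = buffer.get(offset + Long.BYTES + i);
        }
        return true;
    }

    // Delete: copy the highest record over the removed slot, fix the index,
    // and move the write pointer back by one record.
    boolean remove(final long id)
    {
        final Integer removed = index.remove(id);
        if (removed == null)
        {
            return false;
        }
        final int removedOffset = removed;
        final int lastOffset = writeOffset - recordLength;
        if (removedOffset != lastOffset)
        {
            for (int i = 0; i < recordLength; i++)
            {
                buffer.put(removedOffset + i, buffer.get(lastOffset + i));
            }
            index.put(buffer.getLong(removedOffset), removedOffset);
        }
        writeOffset = lastOffset;
        return true;
    }

    int size()
    {
        return index.size();
    }
}
```

Because a delete always fills the hole with the last record, the occupied region of the buffer stays contiguous, which is why no separate compaction step is needed.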

 

Example usage


Consider the following highly contrived example:

A trading exchange has the requirement to publish possible profit reports to holders
of open positions as market prices fluctuate. When a user creates an open position,
a distribution gateway will need to cache the position, and on every price update
received from the exchange, publish some information indicating the possible profit
associated with the position.

In order to meet the exchange's strict latency requirements, the gateway must be allocation-free,
and is written according to the single-writer principle.

 

Messages


Orders are represented by the Order class:


public final class Order {
  private long accountId;
  private long orderId;
  private int instrumentId;
  private double quantity;
  private double price;

  // getters and setters omitted
}


New Orders are received on the OrderEvents interface:


public interface OrderEvents {
  void onOrderPlaced(Order orderFlyweight);
}



Market data is received on the OrderBook interface:

public interface OrderBook {
  void onPriceUpdate(int instrumentId, double bid, double ask, long timestamp);
}


Profit updates are published on the ProfitEvents interface:


public interface ProfitEvents {
  void onProfitUpdate(long orderId, long accountId, double buyProfit, double sellProfit, long timestamp);
}


Implementation



public final class ProfitPublisher implements OrderBook, OrderEvents {
  private final SingleTypeStore<ByteBuffer, Order> store = createStore();
  private final IdAccessor<Order> idAccessor = new OrderIdAccessor();
  private final Encoder<Order> encoder = new OrderEncoder();
  private final Decoder<Order> decoder = new OrderDecoder();
  private final Int2ObjectMap<LongHashSet> instrumentToOrderSetMap = createMap();
  private final Order flyweight = new Order();
  private final ProfitEvents profitEvents; // initialised in constructor
 
  public void onOrderPlaced(Order orderFlyweight) {
    store.store(orderFlyweight, encoder, idAccessor);
    instrumentToOrderSetMap.get(orderFlyweight.instrumentId()).add(orderFlyweight.orderId());
  }

  public void onPriceUpdate(int instrumentId, double bid, double ask, long timestamp) {
    for (long orderId : instrumentToOrderSetMap.get(instrumentId)) {
      store.load(flyweight, decoder, orderId);
      double buyProfit = flyweight.quantity() * (flyweight.price() - ask);
      double sellProfit = flyweight.quantity() * (flyweight.price() - bid);
      profitEvents.onProfitUpdate(orderId, flyweight.accountId(), buyProfit, sellProfit, timestamp);
    }
  }
}


In this example, the incoming Orders are serialised to a Store. When a price update is received,
the system iterates over any Orders associated with the specified instrument, and publishes
a profit update for the Order.

Operation is allocation-free, meaning that the system will run without any garbage-collection pauses
that could cause unwanted latency spikes. There is no need for object-pooling, as domain objects are
serialised to the Store's buffer until required.