Imagine that you wrote a program for a problem that is very easy to solve. Each thread does its own piece of work, and the threads don’t have to work together except to combine their results at the end. The more cores it runs on, the faster it is. You try it out on a laptop first, and it works almost perfectly on all of the four cores. Because the laptop is so small, you run it on a big, fancy machine with a lot of cores and expect even better performance. Instead, it runs slower than the laptop, no matter how many cores you give it. Uh. In the last few days, I’ve had that happen to me.

In the last few weeks, I’ve been working on a Cassandra benchmarking tool called Latte. This tool is probably the most efficient Cassandra benchmarking tool you can get, both in terms of CPU use and memory use. Everything is very simple: a small piece of code makes data and runs a bunch of asynchronous CQL statements against Cassandra at the same time. In a loop, Latte runs that code over and over again and keeps track of how long each time. Finally, it does a statistical analysis and shows it in different ways.

Benchmarking looks like a good thing to do in groups. There aren’t any restrictions on how many threads can call the code under the benchmark. Blogged about how to do that in Rust.

However, at the time I wrote those earlier blog posts, Latte’s ability to define workloads was not very good. It had only two pre-set, hard-coded tasks: one for reading and one for writing. You could change some things, like the number of columns in a table, but not very much. In this case, there aren’t any extra indexes. None of the filtering clauses are made up by the author. The CQL text is out of your hands. Nothing. So, in general, Latte at the time was more of a proof-of-concept than a tool that could be used for real work. There’s no doubt you could fork it and write a new task in Rust, then compile everything from source. Why spend time learning about a niche benchmarking tool when you don’t even need it to do it?

Rune scripting

So last year, in order to be able to measure the performance of storage-attached indexes in Cassandra, I decided to integrate Latte with a scripting engine that would allow me to easily define workloads without recompiling the whole program. After playing a bit with embedding CQL statements in TOML config files (which turned out to be both messy and limited at the same time), and having some fun with embedding Lua (which is probably great in the C world, but didn’t play as nicely with Rust as I expected), I eventually ended up with a design similar to that of sysbench but with an embedded Rune interpreter instead of Lua.

The main selling points of Rune that convinced me were painless Rust integration and support for async code. Thanks to async support, the user can execute CQL statements directly in the workload scripts, leveraging the asynchronous nature of the Cassandra driver. Additionally, the Rune team is amazingly helpful and removed anything that blocked me in virtually no time.

Here is an example of a complete workload that measures performance of selecting rows by random keys:

const ROW_COUNT = latte::param!("rows", 100000);

const KEYSPACE = "latte";
const TABLE = "basic";

pub async fn schema(ctx) {
    ctx.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
                    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
    ctx.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(ctx) {
    ctx.execute(`TRUNCATE TABLE ${KEYSPACE}.${TABLE}`).await?;
}

pub async fn prepare(ctx) {
    ctx.load_cycle_count = ROW_COUNT;
    ctx.prepare("insert", `INSERT INTO ${KEYSPACE}.${TABLE}(id) VALUES (:id)`).await?;
    ctx.prepare("select", `SELECT * FROM ${KEYSPACE}.${TABLE} WHERE id = :id`).await?;
}

pub async fn load(ctx, i) {
    ctx.execute_prepared("insert", [i]).await?;
}

pub async fn run(ctx, i) {
    ctx.execute_prepared("select", [latte::hash(i) % ROW_COUNT]).await?;
}

You can find more info on how to write those scripts in the README.

Benchmarking the benchmarking program

Although the scripts are not JIT-compiled to native code yet, they are acceptably fast, and thanks to the limited amount of code they typically contain, they don’t show up at the top of the profile. I’ve empirically found that the overhead of Rust-Rune FFI was lower than that of Rust-Lua provided by mlua, probably due to the safety checks employed by mlua.

Initially, to assess the performance of the benchmarking loop, I created an empty script:

pub async fn run(ctx, i) {
}

Even though there is no function body there, the benchmarking program needs to do some work to actually run it:

  • schedule N parallel asynchronous invocations using buffer_unordered
  • setup a fresh local state (e.g. stack) for the Rune VM
  • invoke the Rune function, passing the parameters from the Rust side
  • measure the time it took to complete each returned future
  • collect logs, update HDR histograms and compute other statistics
  • and run all of that on M threads using Tokio threaded scheduler

Because there are 4 cores, the throughput increases linearly up to 4 threads. Then it increases slightly more up to 8 threads, thanks to hyper-threading that squeezes a bit more performance out of each core. Obviously there is no performance improvement beyond 8 threads, because all CPU resources are saturated at this point.

I was also satisfied with the absolute numbers I got. A few million of empty calls per second on a laptop sounds like the benchmarking loop is lightweight enough to not cause significant overhead in real measurements. A local Cassandra server launched on the same laptop can only do about 200k requests per second when fully loaded and that only if those requests are stupidly simple and all the data fits in memory.

By the way, after adding some real code for data generation in the body, but with no calls to the database, as expected everything got proportionally slower, but not more than 2x slower, so it was still in a “millions ops per second” range.

That was easy. I could have stopped here and announce victory. However, I was curious how fast it could go if tried on a bigger machine with more cores.

Running an empty loop on 24 cores

A server with two Intel Xeon CPU E5-2650L v3 processors, each with 12 cores running at 1.8 GHz should be obviously a lot faster than an old 4-core laptop, shouldn’t it? Well, maybe with 1 thread it would be slower because of lower CPU frequency (3 GHz vs 1.8 GHz), but it should make up for that by having many more cores.

Let the numbers speak for themselves:

umair-akbar-image 8 - How a Single Line of Code Made a 24-core Server Slower Than a Laptop
how a single line of code made a 24-core server slower than a laptop

You’ll agree there is something wrong here. Two threads are better than one… and that’s basically it. I couldn’t squeeze more throughput than about 2 million calls per second, which was about 4x worse than the throughput I got on the laptop. Either the server was a lemon or my program had a serious scalability issue.

Investigation

When you hit a performance problem, the most common way of investigating it is to run the code under profiler. In Rust, it is very easy to generate flamegraphs with cargo flamegraph. Let’s compare the flamegraphs collected when running the benchmark with 1 thread vs 12 threads:

umair-akbar-flamegraph t1 - How a Single Line of Code Made a 24-core Server Slower Than a Laptop
umair-akbar-flamegraph t12 - How a Single Line of Code Made a 24-core Server Slower Than a Laptop

I was expecting to find a single thing that was a bottleneck, e.g. a contended mutex or something similar, but to my surprise, there was nothing obvious there. There wasn’t even a single bottleneck! Rune’s VM::run code seemed to take about 1/3 of the time, but the rest was simply taken by polling futures and quite likely the culprit got inlined and disappeared from the profile.

Anyway, because of VM::run and the path rune::shared::assert_send::AssertSend leading also to Rune, I decided to disable the code responsible for calling the Rune function, and I reran the experiment with just a loop running an empty future, albeit with timing and statistics code still enabled:

// Executes a single iteration of a workload.
// This should be idempotent –
// the generated action should be a function of the iteration number.
// Returns the end time of the query.
pub async fn run(&self, iteration: i64) -> Result<Instant, LatteError> {
    let start_time = Instant::now();
    let session = SessionRef::new(&self.session);
    // let result = self
    //     .program
    //     .async_call(self.function, (session, iteration))
    //     .await
    //     .map(|_| ()); // erase Value, because Value is !Send
    let end_time = Instant::now();
    let mut state = self.state.try_lock().unwrap();
    state.fn_stats.operation_completed(end_time - start_time);
    // ... 
    Ok(end_time)   
}

That scaled fine to over 100M calls per second on 48 threads! So the problem must be somewhere below the Program::async_call function:

// Compiled workload program
pub struct Program {
    sources: Sources,
    context: Arc<RuntimeContext>, 
    unit: Arc<Unit>,
}

// Executes given async function with args.
// If execution fails, emits diagnostic messages, e.g. stacktrace to standard error stream.
// Also signals an error if the function execution succeeds, but the function returns
// an error value.    
pub async fn async_call(
    &self,
    fun: FnRef,
    args: impl Args + Send,
) -> Result<Value, LatteError> {
    let handle_err = |e: VmError| {
        let mut out = StandardStream::stderr(ColorChoice::Auto);
        let _ = e.emit(&mut out, &self.sources);
        LatteError::ScriptExecError(fun.name, e)
    };
    let execution = self.vm().send_execute(fun.hash, args).map_err(handle_err)?;
    let result = execution.async_complete().await.map_err(handle_err)?;
    self.convert_error(fun.name, result)
}

// Initializes a fresh virtual machine needed to execute this program.
// This is extremely lightweight.
fn vm(&self) -> Vm {
    Vm::new(self.context.clone(), self.unit.clone())
}

The async_call function does a few things:

  • it prepares a fresh Rune VM – this is supposed to be a very lightweight operation that basically prepares a fresh stack; the VMs are not shared between calls nor threads so they can run totally independently
  • it invokes a function by passing its identifier and parameters
  • finally it receives the result and converts some errors; we can safely assume that in an empty benchmark this is a no-op

My next idea was to just remove the send_execute and async_complete calls and leave just the VM preparation. So basically I wanted to benchmark that line:

Vm::new(self.context.clone(), self.unit.clone())

The code looks fairly innocent. No locks, no mutexes, no syscalls, no shared mutable data here. There are some read-only structures context and unit shared behind an Arc, but read-only sharing shouldn’t be a problem.

VM::new is also trivial:

impl Vm {

    // Construct a new virtual machine.
    pub const fn new(context: Arc<RuntimeContext>, unit: Arc<Unit>) -> Self {
        Self::with_stack(context, unit, Stack::new())
    }

    // Construct a new virtual machine with a custom stack.
    pub const fn with_stack(context: Arc<RuntimeContext>, unit: Arc<Unit>, stack: Stack) -> Self {
        Self {
            context,
            unit,
            ip: 0,
            stack,
            call_frames: vec::Vec::new(),
        }
    }

However, not matter how innocent the code looks, I like to double check my assumptions. I ran that with different numbers of threads and, although it was now faster than before, it didn’t scale at all again – it hit a throughput ceiling of about 4 million calls per second!

The problem

Although at first it doesn’t look like there is any sharing of mutable data in the code above, actually there is something slightly hidden that’s shared and mutated: the Arc reference counters themselves. Those counters are shared between all the invocations, performed from many threads, and they are the source of the congestion here.

Some may argue that atomically increasing or decreasing a shared atomic counter shouldn’t be a problem because those are “lockless” operations. They even translate to single assembly instructions (e.g. lock xadd)! If something is a single assembly instruction, it is not slow, isn’t it? That reasoning is unfortunately flawed.

The root of the problem is not really the computation, but the cost of maintaining the shared state.

The amount of time required to read or write data is mostly influenced by how far the CPU core needs to reach out for the data. Here are the typical latencies for the Intel Haswell Xeon CPUs according to this site:

  • L1 cache: 4 cycles
  • L2 cache: 12 cycles
  • L3 cache: 43 cycles
  • RAM: 62 cycles + 100 ns

L1 and L2 caches are typically local to a core (L2 may be shared by two cores). L3 cache is shared by all cores of a CPU. There is also a direct interconnect between L3 caches of different processors on the main board for managing L3 cache coherency, so L3 is logically shared between all processors.

As long as you don’t update the cache line and only read it from multiple threads, the line will be loaded by multiple cores and marked as shared. It is likely that frequent accesses to such data would be served from L1 cache, which is very fast. Therefore sharing read-only data is perfectly fine and scales well. Even using atomics for only reading will be plenty fast in that case.

However, once we introduce updates to the shared cache line, things start to complicate. The x86-amd64 architecture has coherent data caches. This means basically that what you write on one core, you can read back on another one. It is not possible to store a cache line with conflicting data in multiple cores. Once a thread decides to update a shared cache line, that line gets invalidated on all the other cores, so subsequent loads on those cores would have to fetch the data from at least L3. That is obviously a lot slower, and even slower if there are more processors than one on the main board.

The fact that our reference counters are atomic is an additional problem that makes things even more complex for the processor. Although using atomic instructions is often referred to as “lockless programming”, this is slightly misleading – in fact, atomic operations require some locking to happen at the hardware level. This locking is very fine-grained and cheap as long as there is no congestion, but as usual with locking, you may expect very poor performance if many things try to fight for the same lock at the same time. And it is of course much worse if those “things” are whole CPUs and not just single cores that are close to each other.

The fix

The obvious fix is to avoid sharing the reference counters. Latte has a very simple, hierarchical lifecycle structure, so all those Arc updates looked like an overkill to me and they could probably be replaced with simpler references and Rust lifetimes. However, this is easier said than done. Unfortunately Rune requires the references to the Unit and RuntimeContext to be passed wrapped in Arc for managing the lifetime (in probably more complex scenarios) and it also uses some Arc-wrapped values internally as part of those structures. Rewriting Rune just for my tiny use case was out of the question.

Therefore the Arc had to stay. Instead of using a single Arc value we can use one Arc per thread. That requires also separating the Unit and RuntimeContext values, so each thread would get their own. As a side effect, this guarantees there is no sharing at all, so even if Rune clones an Arc stored internally as a part of those values, that problem would be also fixed. The downside of this solution is higher memory use. Fortunately . Latte workload scripts are usually tiny, so higher memory use is likely not a big problem.

To be able to use separate Unit and RuntimeContext I submitted a patch to Rune to make them Clone. Then, on the Latte side, the whole fix was actually introducing a new function for “deep” cloning the Program struct and then making sure each thread gets its own copy:

    // Makes a deep copy of context and unit.
    // Calling this method instead of `clone` ensures that Rune runtime structures
    // are separate and can be moved to different CPU cores efficiently without accidental
    // sharing of Arc references.
    fn unshare(&self) -> Program {
        Program {
            sources: self.sources.clone(),
            context: Arc::new(self.context.as_ref().clone()),   // clones the value under Arc and wraps it in a new counter
            unit: Arc::new(self.unit.as_ref().clone()),         // clones the value under Arc and wraps it in a new counter
        }
    }

BTW: The sources field is not used during the execution, except for emitting diagnostics, so it could be left shared.

Note that the original line where I originally found the slowdown did not need any changes!

Vm::new(self.context.clone(), self.unit.clone())

This is because self.context and self.unit are not shared between threads anymore. Atomic updates to non-shared counters are fortunately very fast.

Final results

umair-akbar-image 7 1024x867 - How a Single Line of Code Made a 24-core Server Slower Than a Laptop
how a single line of code made a 24-core server slower than a laptop

Takeaways

  • The cost of a shared Arc might be absurdly high on some hardware configurations if it is updated frequently on many threads.
  • Don’t assume that a single assembly instruction cannot become a performance problem.
  • Don’t assume that if something scales fine on a single-CPU computer, it would still scale on a multi-CPU computer.