Barrett reduction for polynomials

Let F denote some field (e.g. the real numbers, or integers modulo some prime), and let F[x] denote the ring of polynomials over that field, and let P, S, T denote elements of that polynomial ring. Polynomial division can then be defined; division of one polynomial by another yields a (unique) quotient and a (unique) remainder. Following the Python 3 notation where // denotes floor division, let S // P denote the quotient obtained from dividing S by P. The key property of Barrett reduction is then:

Provided that deg(S // T) ≤ 0, S // P equals (S * (T // P)) // T.

This lets us turn division by P into multiplication by T // P followed by division by T. Said division is trivial if T is chosen to be xn (with n ≥ deg(S) to make the precondition hold), and T // P can be precomputed if P is fixed.

Proving that this property is true turns out to be surprisingly elusive; all the good expositions on the internet are about Barrett reduction for integers rather than for polynomials. Accordingly, I'm providing a proof in this post. The proof proceeds in four parts:

  1. Defining / (true division rather than floor division) for (extended) polynomials, and defining term-slicing P[j:k] for (extended) polynomials.
  2. Showing that S // P equals (S * (T / P)) // T.
  3. Showing that (S*(T/P))//T equals (S*(T//P))//T plus (S*(T/P)[-∞:0])//T.
  4. Showing that (S*(T/P)[-∞:0])//T equals zero. This final part is the only part which requires the precondition of deg(S // T) ≤ 0.

These four parts combine in the obvious way to prove the overall reduction property.

Part 1: Definitions

P being an element of the polynomial ring F[x] just means that P equals sum(pi * xi), where i runs over some finite set of non-negative integers, and each pi is a non-zero element of F. Note that deg(P) denotes the maximum such i from this set (or -∞ if this set is empty).

Extended polynomials can be defined similarly; P equals sum(pi * xi), where i runs over some (possibly infinite) set of (possibly negative) integers, and each pi is a non-zero element of F. The requirement on pi being non-zero can also be dropped, provided that deg(P) is redefined as the maximum i such that pi is non-zero. As described, deg(P) could be +∞, but we'll restrict ourselves to deg(P) being finite (or being -∞ for the zero polynomial).

Every polynomial is trivially an extended polynomial. Going the other way is harder; an extended polynomial can only be trivially turned into a polynomial when it has a finite number of terms and has no xi terms with negative i.

It is also useful to define term-slicing; given P and integer j and integer k, P[j:k] denotes sum(pi * xi) restricted to just j ≤ i < k. As a shorthand, P[j] denotes P[j:j+1]. It is trivially true that P equals P[-∞:j] plus P[j:+∞].

Addition and multiplication of extended polynomials follows that of normal polynomials. Division is where things get interesting; the whole point of introducing extended polynomials is to allow the definition of true division (/) for extended polynomials, which is just like floor division (//) for normal polynomials, except that it produces a quotient with an infinite number of (negative i) terms (and no remainder), rather than terminating with a finite quotient and a remainder. For comparison, the division algorithms are:

def floor_division(S, P):
    Q = 0
    while S != 0 and deg(S) >= deg(P):
        T = S[deg(S)] / P[deg(P)]
        Q = Q + T
        S = S - T * P
    return Q, S
def true_division(S, P):
    Q = 0
    while S != 0:
        T = S[deg(S)] / P[deg(P)]
        Q = Q + T
        S = S - T * P
    return Q

Note that as written true_division often never terminates. For actual computation, it needs to be re-expressed as a generator which yields terms of Q in descending i order (and then only a finite number of terms used). Both algorithms involve division of monomonials, which is just division of the field elements and subtraction of the x powers.

Given the similarity in algorithms, it is clear that S // P equals (S / P)[0:+∞]. Various other properties hold; most notably (S / P) // T equals S // (P * T).

One benefit of true division over floor division is that true division is associative with multiplication; that is (S / P) * T equals S / (P * T). Accordingly, S / P * T can expressed without any parentheses. Additionally, it commutes with multiplication, so S / P * T equals S * T / P, and it commutes with itself in so far as S / P / T equals S / T / P.

Part 2: S // P equals (S * (T / P)) // T

This is just algebraic manipulation:

S // P == (S      / P    ) // 1
       == (S *  T / P / T) // 1
       == (S *  T / P    ) // T
       == (S * (T / P)   ) // T

Part 3: (S*(T/P))//T equals (S*(T//P))//T plus (S*(T/P)[-∞:0])//T

This involves term-slicing on T / P, splitting it into (T / P)[-∞:0] plus (T / P)[0:+∞], then noting that the latter is T // P:

(S*(T/P))//T == ( S*((T /P)[0:+∞] +    (T/P)[-∞:0]) )//T
             == ( S*((T//P)       +    (T/P)[-∞:0]) )//T
             ==  (S* (T//P))//T   + (S*(T/P)[-∞:0] )//T

Part 4: (S*(T/P)[-∞:0])//T equals zero

Start with some algebraic manipulation:

(S*(T/P)[-∞:0])//T == ( S    * (T/P)[-∞:0]  )//T
                   == ( S    * (T/P)[-∞:0]/T)//1
                   == ((S/T) * (T/P)[-∞:0]  )//1
                   == ((S/T) * (T/P)[-∞:0]  )[0:+∞]

As deg(S // T) ≤ 0 (by hypothesis), it is also the case that deg(S / T) ≤ 0. Clearly also deg((T/P)[-∞:0]) < 0 by the nature of term-slicing. Hence the deg of their product is also less than zero (as multiplication of polynomials sums their degrees). Hence the [0:+∞] term-slicing results in no terms — or in other words, results in the zero polynomial.

Faster CRC32-C on x86

Computing the CRC-32C of a 4KB buffer on a modern x86 chip can be done as a simple loop around the crc32 instruction:

uint32_t crc32_4k(uint32_t acc, char* buf) {
    for (char* end = buf + 4096; buf < end; buf += 8) {
        acc = _mm_crc32_u64(acc, *(uint64_t*)buf);
    }
    return acc;
}

Note that this is bit-reflected CRC32-C with polynomial 0x11EDC6F41, which is different to the zlib CRC (polynomial 0x104C11DB7). Furthermore, there is no bitwise-inverse here; some texts apply ~ to acc on input and again on output, as in:

uint32_t crc32_4k_inverse(uint32_t acc, char* buf) {
    return ~crc32_4k(~acc, buf);
}

The loop within crc32_4k is fine, but leaves a lot of performance on the table: the underlying crc32 instruction has latency 3 and throughput 1 on every Intel chip since its introduction in Nehalem. To fully hide the latency, three independent CRC streams need to be calculated in parallel. Some mathematical tricks allow us to chop the 4KB buffer into multiple chunks, compute the CRC of each chunks independently, and then merge the results together. For a good exposition of the tricks, see Option 12 and Option 13 of komrad36/CRC. As covered in that exposition, it can be convenient to chop into four chunks rather than three, with the fourth being exactly 8 bytes long. This leads to the following function:

uint32_t crc32_4k_three_way(uint32_t acc_a, char* buf) {
    // Four chunks:
    //  Chunk A: 1360 bytes from 0 through 1360
    //  Chunk B: 1360 bytes from 1360 through 2720
    //  Chunk C: 1368 bytes from 2720 through 4088
    //  Chunk D: 8 bytes from 4088 through 4096
    uint32_t acc_b = 0;
    uint32_t acc_c = 0;
    for (char* end = buf + 1360; buf < end; buf += 8) {
        acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)buf);
        acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 1360));
        acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 1360*2));
    }
    // Merge together A and B, leaving space for C+D
    // kA == magic((1360+1368+8)*8-33)
    // kB == magic((     1368+8)*8-33)
    __m128i kAkB = _mm_setr_epi32(/*kA*/ 0x8A074012, 0, /*kB*/ 0x93E106A4, 0);
    __m128i vec_a = _mm_clmulepi64_si128(_mm_cvtsi32_si128(acc_a), kAkB, 0x00);
    __m128i vec_b = _mm_clmulepi64_si128(_mm_cvtsi32_si128(acc_b), kAkB, 0x10);
    uint64_t ab = _mm_cvtsi128_si64(_mm_xor_si128(vec_a, vec_b));
    // Final 8 bytes of C
    acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 1360*2));
    // Merge together C, AB, and D
    return _mm_crc32_u64(acc_c, ab ^ *(uint64_t*)(buf + 1360*2 + 8));
}

The main loop of crc32_4k_three_way contains three times as much code as crc32_4k, and runs approximately three times faster. The magic happens in the merging step, with literal magic numbers. The magic numbers can be generated with the following function, which computes the CRC of a 1 bit followed by n zero bits:

uint32_t magic(uint32_t n) {
    uint32_t crc = ((uint32_t)1) << (31 - (n & 31));
    if (n & 32) crc = _mm_crc32_u32(crc, 0);
    for (n >>= 6; n; --n) crc = _mm_crc32_u64(crc, 0);
    return crc;
}

Read the aforementioned exposition for details, but the quick summary is that merging step for a given chunk requires a magic number corresponding to how many bits appear in subsequent chunks. Chunks C/D doesn't require any magic, as there are no subsequent chunks. Chunk B has 1368+8 bytes in subsequent chunks, hence magic((1368+8)*8-33), and chunk A has 1360+1368+8 bytes in subsequent chunks, hence magic((1360+1368+8)*8-33). The -33 is actually -32-1: -32 to counter the shift performed by the final _mm_crc32_u64, and -1 to counter the bit-reversed inputs/outputs to _mm_clmulepi64_si128.

There is also a completely different way of computing CRC-32C (or any other 32-bit CRC), based around the pclmulqdq instruction (which is what _mm_clmulepi64_si128 compiles to). The canonical exposition for this is Intel's whitepaper, and the following code is inspired by Chromium's implementation of said whitepaper:

uint32_t crc32_4k_pclmulqdq(uint32_t acc, char* buf) {
    // First block of 64 is easy.
    __m128i x1 = _mm_loadu_si128((__m128i*)buf);
    __m128i x2 = _mm_loadu_si128((__m128i*)(buf + 16));
    __m128i x3 = _mm_loadu_si128((__m128i*)(buf + 32));
    __m128i x4 = _mm_loadu_si128((__m128i*)(buf + 48));
    x1 = _mm_xor_si128(_mm_cvtsi32_si128(acc), x1);
    // Parallel fold remaining blocks of 64.
    // k1 == magic(4*128+32-1)
    // k2 == magic(4*128-32-1)
    __m128i k1k2 = _mm_setr_epi32(/*k1*/ 0x740EEF02, 0, /*k2*/ 0x9E4ADDF8, 0);
    char* end = buf + 4096 - 64;
    do {
        __m128i x5 = _mm_clmulepi64_si128(x1, k1k2, 0x00);
        x1 = _mm_clmulepi64_si128(x1, k1k2, 0x11);
        __m128i x6 = _mm_clmulepi64_si128(x2, k1k2, 0x00);
        x2 = _mm_clmulepi64_si128(x2, k1k2, 0x11);
        __m128i x7 = _mm_clmulepi64_si128(x3, k1k2, 0x00);
        x3 = _mm_clmulepi64_si128(x3, k1k2, 0x11);
        __m128i x8 = _mm_clmulepi64_si128(x4, k1k2, 0x00);
        x4 = _mm_clmulepi64_si128(x4, k1k2, 0x11);
        x5 = _mm_xor_si128(x5, _mm_loadu_si128((__m128i*)(buf + 64)));
        x1 = _mm_xor_si128(x1, x5);
        x6 = _mm_xor_si128(x6, _mm_loadu_si128((__m128i*)(buf + 80)));
        x2 = _mm_xor_si128(x2, x6);
        x7 = _mm_xor_si128(x7, _mm_loadu_si128((__m128i*)(buf + 96)));
        x3 = _mm_xor_si128(x3, x7);
        x8 = _mm_xor_si128(x8, _mm_loadu_si128((__m128i*)(buf + 112)));
        x4 = _mm_xor_si128(x4, x8);
        buf += 64;
    } while (buf < end);
    // Fold together the four parallel streams into one.
    // k3 == magic(128+32-1)
    // k4 == magic(128-32-1)
    __m128i k3k4 = _mm_setr_epi32(/*k3*/ 0xF20C0DFE, 0, /*k4*/ 0x493C7D27, 0);
    __m128i x5 = _mm_clmulepi64_si128(x1, k3k4, 0x00);
    x1 = _mm_clmulepi64_si128(x1, k3k4, 0x11);
    x5 = _mm_xor_si128(x5, x2);
    x1 = _mm_xor_si128(x1, x5);
    x5 = _mm_clmulepi64_si128(x3, k3k4, 0x00);
    x3 = _mm_clmulepi64_si128(x3, k3k4, 0x11);
    x5 = _mm_xor_si128(x5, x4);
    x3 = _mm_xor_si128(x3, x5);
    // k5 == magic(2*128+32-1)
    // k6 == magic(2*128-32-1)
    __m128i k5k6 = _mm_setr_epi32(/*k5*/ 0x3DA6D0CB, 0, /*k6*/ 0xBA4FC28E, 0);
    x5 = _mm_clmulepi64_si128(x1, k5k6, 0x00);
    x1 = _mm_clmulepi64_si128(x1, k5k6, 0x11);
    x5 = _mm_xor_si128(x5, x3);
    x1 = _mm_xor_si128(x1, x5);
    // Apply missing <<32 and fold down to 32-bits.
    acc = _mm_crc32_u64(0, _mm_extract_epi64(x1, 0));
    return _mm_crc32_u64(acc, _mm_extract_epi64(x1, 1));
}

Note that crc32_4k_three_way consists of lots of _mm_crc32_u64 calls followed by a few _mm_clmulepi64_si128 calls, whereas crc32_4k_pclmulqdq is the opposite: lots of _mm_clmulepi64_si128 calls followed by a few _mm_crc32_u64 calls. This is very convenient, as on all Intel chips since Broadwell, _mm_clmulepi64_si128 has throughput 1, and executes on a separate port to _mm_crc32_u64. This means that we should be able to issue one _mm_clmulepi64_si128 and one _mm_crc32_u64 per cycle, thus doubling our speed. The latency of _mm_clmulepi64_si128 has varied slightly over time; 5 cycles on Broadwell, 7 cycles on Skylake and Coffee Lake and Cannon Lake, then 6 cycles on Ice Lake. In the best case (5 cycles), the main loop of crc32_4k_pclmulqdq consumes 64 bytes every 7 cycles, and in the worst case (7 cycles), the main loop of crc32_4k_pclmulqdq consumes 64 bytes every 9 cycles. In contrast, the main loop of crc32_4k_three_way consumes 24 bytes every 3 cycles, which is 72 bytes every 9 cycles. By combining the two main loops, we should be able to consume 728 bytes every 9 cycles. Splitting 4096 into two pieces in a 72:64 ratio gives 2168.47:1927.53, which we'll round to 2176:1920. The _mm_clmulepi64_si128 main loop can work through the 1920, while the crc32_4k_three_way main loop works through the 2176 (as 728+728+720). The resultant code is an unholy fusion of crc32_4k_three_way and crc32_4k_pclmulqdq, which interleaves the code from both:

uint32_t crc32_4k_fusion(uint32_t acc_a, char* buf) {
    // Four chunks:
    //  Chunk A: 728 bytes from 0 through 728
    //  Chunk B: 728 bytes from 728 through 1456
    //  Chunk C: 720 bytes from 1456 through 2176
    //  Chunk D: 1920 bytes from 2176 through 4096
    // First block of 64 from D is easy.
    char* buf2 = buf + 2176;
    __m128i x1 = _mm_loadu_si128((__m128i*)buf2);
    __m128i x2 = _mm_loadu_si128((__m128i*)(buf2 + 16));
    __m128i x3 = _mm_loadu_si128((__m128i*)(buf2 + 32));
    __m128i x4 = _mm_loadu_si128((__m128i*)(buf2 + 48));
    uint32_t acc_b = 0;
    uint32_t acc_c = 0;
    // Parallel fold remaining blocks of 64 from D, and 24 from each of A/B/C.
    // k1 == magic(4*128+32-1)
    // k2 == magic(4*128-32-1)
    __m128i k1k2 = _mm_setr_epi32(/*k1*/ 0x740EEF02, 0, /*k2*/ 0x9E4ADDF8, 0);
    char* end = buf + 4096 - 64;
    do {
        acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)buf);
        __m128i x5 = _mm_clmulepi64_si128(x1, k1k2, 0x00);
        acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728));
        x1 = _mm_clmulepi64_si128(x1, k1k2, 0x11);
        acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2));
        __m128i x6 = _mm_clmulepi64_si128(x2, k1k2, 0x00);
        acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)(buf + 8));
        x2 = _mm_clmulepi64_si128(x2, k1k2, 0x11);
        acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728 + 8));
        __m128i x7 = _mm_clmulepi64_si128(x3, k1k2, 0x00);
        acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2 + 8));
        x3 = _mm_clmulepi64_si128(x3, k1k2, 0x11);
        acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)(buf + 16));
        __m128i x8 = _mm_clmulepi64_si128(x4, k1k2, 0x00);
        acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728 + 16));
        x4 = _mm_clmulepi64_si128(x4, k1k2, 0x11);
        acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2 + 16));
        x5 = _mm_xor_si128(x5, _mm_loadu_si128((__m128i*)(buf2 + 64)));
        x1 = _mm_xor_si128(x1, x5);
        x6 = _mm_xor_si128(x6, _mm_loadu_si128((__m128i*)(buf2 + 80)));
        x2 = _mm_xor_si128(x2, x6);
        x7 = _mm_xor_si128(x7, _mm_loadu_si128((__m128i*)(buf2 + 96)));
        x3 = _mm_xor_si128(x3, x7);
        x8 = _mm_xor_si128(x8, _mm_loadu_si128((__m128i*)(buf2 + 112)));
        x4 = _mm_xor_si128(x4, x8);
        buf2 += 64;
        buf += 24;
    } while (buf2 < end);
    // Next 24 bytes from A/B/C, and 8 more from A/B, then merge A/B/C.
    // Meanwhile, fold together D's four parallel streams.
    // k3 == magic(128+32-1)
    // k4 == magic(128-32-1)
    __m128i k3k4 = _mm_setr_epi32(/*k3*/ 0xF20C0DFE, 0, /*k4*/ 0x493C7D27, 0);
    acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)buf);
    __m128i x5 = _mm_clmulepi64_si128(x1, k3k4, 0x00);
    acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728));
    x1 = _mm_clmulepi64_si128(x1, k3k4, 0x11);
    acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2));
    __m128i x6 = _mm_clmulepi64_si128(x3, k3k4, 0x00);
    acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)(buf + 8));
    x3 = _mm_clmulepi64_si128(x3, k3k4, 0x11);
    acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728 + 8));
    acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2 + 8));
    acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)(buf + 16));
    acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728 + 16));
    x5 = _mm_xor_si128(x5, x2);
    acc_c = _mm_crc32_u64(acc_c, *(uint64_t*)(buf + 728*2 + 16));
    x1 = _mm_xor_si128(x1, x5);
    acc_a = _mm_crc32_u64(acc_a, *(uint64_t*)(buf + 24));
    // k5 == magic(2*128+32-1)
    // k6 == magic(2*128-32-1)
    __m128i k5k6 = _mm_setr_epi32(/*k5*/ 0x3DA6D0CB, 0, /*k6*/ 0xBA4FC28E, 0);
    x6 = _mm_xor_si128(x6, x4);
    x3 = _mm_xor_si128(x3, x6);
    x5 = _mm_clmulepi64_si128(x1, k5k6, 0x00);
    acc_b = _mm_crc32_u64(acc_b, *(uint64_t*)(buf + 728 + 24));
    x1 = _mm_clmulepi64_si128(x1, k5k6, 0x11);
    // kC == magic((        1920)*8-33)
    __m128i kCk0 = _mm_setr_epi32(/*kC*/ 0xF48642E9, 0, 0, 0);
    __m128i vec_c = _mm_clmulepi64_si128(_mm_cvtsi32_si128(acc_c), kCk0, 0x00);
    // kB == magic((    720+1920)*8-33)
    // kA == magic((728+720+1920)*8-33)
    __m128i kAkB = _mm_setr_epi32(/*kA*/ 0x155AD968, 0, /*kB*/ 0x2E7D11A7, 0);
    __m128i vec_a = _mm_clmulepi64_si128(_mm_cvtsi32_si128(acc_a), kAkB, 0x00);
    __m128i vec_b = _mm_clmulepi64_si128(_mm_cvtsi32_si128(acc_b), kAkB, 0x10);
    x5 = _mm_xor_si128(x5, x3);
    x1 = _mm_xor_si128(x1, x5);
    uint64_t abc = _mm_cvtsi128_si64(_mm_xor_si128(_mm_xor_si128(vec_c, vec_a), vec_b));
    // Apply missing <<32 and fold down to 32-bits.
    uint32_t crc = _mm_crc32_u64(0, _mm_extract_epi64(x1, 0));
    crc = _mm_crc32_u64(crc, abc ^ _mm_extract_epi64(x1, 1));
    return crc;
}

To give an idea of relative performance, I ran some very crude benchmarks of every function on a range of servers. The resultant gigabytes per second (GB/s) measurements are given below. As the servers have different clock speeds (both advertised and burst), I've assumed crc32_4k runs at 21.33 bits per cycle (b/c) on each, and used this to translate the other GB/s measurements to b/c.

                    Broadwell @ 3.00GHz  Skylake @ 3.20GHz    Ice Lake @ 2.40GHz
          crc32_4k  10.19 GB/s 21.3 b/c  12.64 GB/s 21.3 b/c  11.59 GB/s 21.3 b/c
crc32_4k_pclmulqdq  23.32 GB/s 48.8 b/c  28.12 GB/s 47.5 b/c  27.45 GB/s 50.5 b/c
crc32_4k_three_way  26.99 GB/s 56.5 b/c  33.26 GB/s 56.1 b/c  29.18 GB/s 53.7 b/c
   crc32_4k_fusion  44.44 GB/s 93.0 b/c  55.65 GB/s 93.9 b/c  50.36 GB/s 92.7 b/c

PS. If running on ARM rather than Intel, see dougallj's Faster CRC32 on the Apple M1.

Update: See https://github.com/corsix/fast-crc32/.

Seeing the raft for the trees

The Raft distributed consensus protocol is normally seen as having an array of log entries. There is however a generalisation of Raft which replaces the array of log entries with a tree of nodes, and then (standard) Raft can be seen as a very simple tree that contains no branches. The resultant protocol can be called Tree Raft, and is very similar in spirit to Chained Raft. This post describes Tree Raft and contrasts it to standard Raft.

In standard Raft, the log is an array, indexed by uint64_t index, with each array element being a log entry:

struct LogEntry {
  uint64_t term;
  string contents;
};

In Tree Raft, the log is a tree. The tree can be stored in a map/dictionary whose key type is NodeRef and value type is Node:

struct NodeRef {
  uint64_t index;
  uint64_t term;
};
struct Node {
  string contents;
  NodeRef parent;
};

There are two restrictions on the parent field: if Node n has key (i, t) then n.parent.index == i - 1 and n.parent.term <= t. This first restriction allows an optimisation: parent.index does not need to be stored; only parent.term needs to be stored.

There are two obvious orderings that can be defined on NodeRef: by ascending index then ascending term (lexicographic), and by ascending term then ascending index (transposed lexicographic). Both orderings turn out to be useful in different places. Note that a node comes after its parent in both orderings.

As an optimisation, a Node and (some number of) its transitive parents can be represented as an array of LogEntry plus the parent term of the oldest Node. This optimised representation is the only representation in standard Raft, and it appears twice: the per-server log takes this form (with the parent term of the oldest Node being lastTerm in the InstallSnapshot RPC), and the AppendEntries RPC takes this form (with the parent term of the oldest Node being prevLogTerm). A Tree Raft implementation could use this optimised form for committed nodes, and only use the more expensive map/dictionary representation for uncommitted nodes.

Note that a NodeRef identifies a single Node, but because every node identifies its parent, a NodeRef implicitly identifies an entire chain of nodes. This is similar to a commit hash in git: a hash identifies a single commit, but a commit references its parent commit(s), and thus a single hash identifies an entire graph of commits leading to that point. Standard Raft describes this as the Log Matching property: if two logs contain an entry with the same index and term, then the logs are identical in all entries up through the given index.

A Tree Raft server maintains two cursors into the tree: NodeRef commit and NodeRef head. The NodeRef commit cursor replaces the uint64_t commitIndex from standard Raft. The NodeRef head cursor replaces "last log entry" everywhere that it comes up: the RequestVote RPC contains the candidate's NodeRef head, and other servers are willing to vote for candidates whose NodeRef head is greater than or equal to theirs (using transposed lexicographic order). The leader is able to create a new node by using the key (head.index + 1, currentTerm) and then setting its head to this new node. For every follower, there will be a path in the tree from the follower's NodeRef head to the leader's NodeRef head, and one objective of a follower is to move its head cursor along this path toward the leader's head cursor (note that the path might involve going up the tree before going back down). Similarly, for every follower, there will be a path in the tree from the follower's NodeRef commit to the leader's NodeRef commit, and another objective of a follower is to move its commit cursor along this path toward the leader's commit cursor (crucially, this path is always downwards and never backtracks). The leader is responsible for moving its commit cursor toward its head cursor when it is safe to do so, and the safety constraint here is very similar to standard Raft: a strict majority of servers must have their head.term equal to the leader's currentTerm, and within this majority, the minimum head.index gives the safe commit point. On every server, the commit cursor will either be equal to the head cursor, or be an ancestor of the head cursor. This permits an optimisation: only the index of the commit cursor needs to be stored, as its term can be reconstructed by finding the ancestor of head with the given index. Standard Raft contains this optimisation, as it only stores uint64_t commitIndex. It can be seen that standard Raft is just Tree Raft where a server stores the chain of nodes between the root and its head (a very simple non-branching tree) as its log, and does not store any other nodes.

Tree Raft replaces the AppendEntries RPC with an AddNodes RPC. The prevLogIndex, prevLogTerm, entries[], and leaderCommit arguments are replaced by the leader's NodeRef head, pair<NodeRef, Node> nodes[], and the leader's NodeRef commit. Note that these arguments can be optimised somewhat (only including commit.index and not commit.term, using the array of LogEntry representation for nodes, not including head if the last nodes entry is the head, not including the term of nodes when equal to the leader's currentTerm, and so forth). After processing the RPC, followers respond with their currentTerm and their (possibly moved) NodeRef head. The receiver implementation is:

  1. If term < currentTerm, finish processing.
  2. Add all the nodes to your map.
  3. If there is a path from your NodeRef head to the leader's NodeRef head, set your head to the leader's.
  4. If there is a path from your NodeRef commit to the leader's NodeRef commit, and the leader's commit either equals your head or is an ancestor of your head, set your commit to the leader's (this can be deferred until after the response has been sent, and the movement can be done one step at a time).

Note that if a follower misses an AddNodes RPC (due to packet loss, or leader failure, or late startup), then the nodes added in step 2 might not have a path back to the root; that is, there'll be a node in the map whose parent is not in the map. In this case, steps 3 and 4 will not be able to find a path to the new nodes. In a departure from standard Raft, in Tree Raft, followers are responsible for identifying these missing parent nodes and issuing Replay RPCs (against a randomly chosen server) in an attempt to obtain them. Once obtained, steps 3 and 4 can be retried. This has a number of nice properties: leaders no longer need to track nextIndex for each follower, the replay workload is shared between followers rather than being yet another task that the leader is responsible for, and the AddNodes RPC can be a UDP multicast to all followers. Furthermore, as nodes are not removed from the map as part of processing an AddNodes RPC, there are slightly nicer forward progress guarantees during leader changeover. As yet another nice property, if implementing the PreVote extension (which is necessary for liveness), then PreVote rejects can carry the (server's latest view of) the leader's currentTerm and NodeRef head and NodeRef commit, and then the receiver of the reject can treat this like an AddNodes and then replay the missing nodes (note that currentTerm is monotonic, and within a term, the leader's head and commit are also monotonic, hence stale views of this triplet can be arbitrated). Consequently, followers can keep up with the leader even when direct communication between the leader and the follower is not possible, avoiding the need for a long recovery process once direct communication is restored.

As nodes are not removed from the map as part of the AddNodes RPC, it is worth considering when nodes do get removed. A server cannot remove its head node, nor any ancestor of its head node (except when moving these nodes into a snapshot), but it is safe to remove any other node at any time. Standard Raft takes a very aggressive policy: when head moves backward (as part of moving it toward the common parent of the follower's head and the leader's head, i.e. moving it along the path in the tree from follower's head to leader's head), immediately remove everything that can be removed. A very lax policy is also possible: never remove nodes. A good middle ground is to remove during commitment: when committing a node, remove any other nodes which have the same index but a different term, and when removing a node, also remove its children (recursively). This fits very nicely with using the optimised array representation for committed nodes. If taking this middle ground, then it becomes attractive to store the uncommitted nodes in a sorted map, with nodes in lexicographic order, as then nodes with the same index can be found with a range scan, and the children of a node can also be found with a range scan (find all nodes with index + 1 using range scan, then filter based on their parent.term).

To efficiently answer "is there a path between node X and node Y" queries, it can be useful for each server to store an additional NodeRef furthest_ancestor field on every Node. Initially, furthest_ancestor is set to parent, but when it is discovered that n.furthest_ancestor exists in the map, n.furthest_ancestor is replaced by lookup(n.furthest_ancestor).furthest_ancestor (giving a union-find-like behaviour). If two nodes have the same furthest_ancestor then a path exists between them. As an optimisation, furthest_ancestor does not need to be stored for committed nodes, as it'll always refer to the root for them. Furthermore, furthest_ancestor precisely identifies the node which needs to be the target of a Replay RPC.

Incrementally snapshotable data structures

A fixed-length array (of length N) supports two O(1) operations (list A):

  1. Retrieve the element at index i (where 0 ≤ i < N).
  2. Set the element at index i (where 0 ≤ i < N) to some new value.

An incrementally snapshotable fixed-length array (of length N) supports two more O(1) operations (list A, continued):

  1. Take a snapshot of all N elements.
  2. Drain some of the snapshot to a sink.

Given the O(1) requirement, it shouldn't be surprising that operation A4 only drains some of the snapshot. It needs to be called O(N) times to fully drain the snapshot to the sink, but this is fine, as the entire snapshot of N elements is taken atomically as operation A3, and other operations can be performed between draining steps.

The only non-obvious part is how to implement operation A3 in O(1) time, as a naïve implementation would take O(N) time. To make A3 possible in O(1) time, some of the work needs to be pushed elsewhere (to A2 and A4), and a number of requirements need to be imposed (list B):

  1. At most one snapshot can exist at any time.
  2. Once a snapshot has been taken, it must be fully drained before the next snapshot is taken.
  3. The underlying data structure must be incrementally iterable (this is trivial for arrays).
  4. Given the iterator cursor from B3, and an element being mutated by operation A2, it needs to be possible to determine on which side of the cursor the element falls (also trivial for arrays).
  5. The sink must be willing to receive the snapshot contents in any order.
  6. The sink must be willing to receive snapshot contents at any time.
  7. An additional one bit of mutable storage must be available for each element in the data structure.

Some of these requirements (B1, B2, B5, and B6) can be relaxed slightly (at a cost elsewhere), but we'll go with the full set of requirements for now, as practical use-cases exist which are compatible with all of these requirements. Given all these requirements, the iteration cursor from B3 splits the underlying data structure into two pieces, which I'll call visited and pending. The additional one bit per element from B7 I'll call already-visited, and impose the invariant that already-visited is false for elements in the visited part of the data structure (whereas it can be either true or false for elements in the pending part). The list A operations are then implemented as (list C):

  1. Retrieve the element at index i (where 0 ≤ i < N): exactly the same as a fixed-length array without snapshots.
  2. Set the element at index i (where 0 ≤ i < N) to some new value: if element i is in the pending part of the data structure, and already-visited is false, then before setting the element, send the previous value of the element to the sink and set already-visited to true. After this, proceed exactly the same as a fixed-length array without snapshots.
  3. Take a snapshot of all N elements: Set the B3 iteration cursor to one end of the data structure, such that the entire structure is pending.
  4. Drain some of the snapshot to a sink: Advance the B3 iteration cursor by one element. If that element was already-visited, then set already-visited to false. Otherwise, send the element to the sink.

There is a choice in C3 as to whether to set the cursor to the very start of the structure, or to the very of end the structure. This then impacts C4: the iteration is either forwards (if starting from the start) or backwards (if starting from the end). Forwards iteration is more natural, but for data structures whose mutations often happen at the end, backwards iteration can reduce the number of already-visited elements, which can improve efficiency. Requirement B2 (once a snapshot has been taken, it must be fully drained before the next snapshot is taken) ensures that the cursor can be set to either of these points without having any simultaneously visited and already-visited elements, as C4 ensures that already-visited is false everywhere after the previous draining is complete.

C++ happens to give the name operator[] to both operation A1 (retrieve) and A2 (set), albeit with different const specifiers. This point of confusion aside, the implementation of a fixed-length array without snapshots is trivial:

template <typename T, size_t N>
struct fixed_length_array {
  T _elems[N];

  const T& operator[](size_t i) const { // A1
    assert(("index should be within bounds", i < N));
    return _elems[i];
  }
  T& operator[](size_t i) { // A2
    assert(("index should be within bounds", i < N));
    return _elems[i];
  }
};

It can be extended to an incrementally snapshotable fixed-length array like so:

template <typename T, size_t N, typename Sink>
struct fixed_length_array_with_snapshot : fixed_length_array<T, N> {
  bool _already_visited[N] = {};
  Sink _sink = nullptr;
  size_t _cursor = 0;

  const T& operator[](size_t i) const { // A1
    return fixed_length_array<T, N>::operator[](i);
  }
  T& operator[](size_t i) { // A2
    if (i < _cursor && !_already_visited[i]) {
      _already_visited[i] = true;
      (*_sink)[i] = this->_elems[i];
    }
    return fixed_length_array<T, N>::operator[](i); 
  }

  void take_snapshot(Sink sink) { // A3
    assert(("requirement B2", _sink == nullptr));
    assert(("sink should not be null", sink != nullptr));
    _sink = sink;
    _cursor = N;
  }
  bool drain_snapshot() { // A4
    assert(("snapshot not yet taken", _sink != nullptr));
    if (_cursor == 0) {
      _sink = nullptr;
      return true; // finished draining
    } else if (_already_visited[--_cursor]) {
      _already_visited[_cursor] = false;
      return false;
    } else {
      (*_sink)[_cursor] = this->_elems[_cursor];
      return false;
    }
  }
};

An example usage is:

fixed_length_array<int, 3> sink;
fixed_length_array_with_snapshot<int, 3, decltype(&sink)> arr;
arr[0] = 10;
arr[1] = 20;
arr[2] = 30;
arr.take_snapshot(&sink);
arr[1] = 25;
do {} while (arr.drain_snapshot() == false);
std::cout << arr[0] << " " << arr[1] << " " << arr[2] << "\n";
std::cout << sink[0] << " " << sink[1] << " " << sink[2] << "\n";

Requirement B6 (the sink must be willing to receive snapshot contents at any time) can be relaxed by introducing a buffer, with operation A2 pushing into this buffer when the sink is not willing to receive contents, and operation A4 popping from the buffer (if non-empty) rather than advancing the cursor. This gives a slightly different implementation:

template <typename T, size_t N>
struct fixed_length_array_with_snapshot : fixed_length_array<T, N> {
  bool _already_visited[N] = {};
  std::vector<std::pair<size_t, T>> _buffer;
  size_t _cursor = 0;

  const T& operator[](size_t i) const { // A1
    return fixed_length_array<T, N>::operator[](i);
  }
  T& operator[](size_t i) { // A2
    if (i < _cursor && !_already_visited[i]) {
      _already_visited[i] = true;
      _buffer.emplace_back(i, this->_elems[i]);
    }
    return fixed_length_array<T, N>::operator[](i); 
  }

  void take_snapshot() { // A3
    _cursor = N;
  }
  template <typename Sink>
  bool drain_snapshot(Sink sink) { // A4
    if (!_buffer.empty()) {
      (*sink)[_buffer.back().first] = _buffer.back().second;
      _buffer.pop_back();
      return false;
    } else if (_cursor == 0) {
      return true; // finished draining
    } else if (_already_visited[--_cursor]) {
      _already_visited[_cursor] = false;
      return false;
    } else {
      (*sink)[_cursor] = this->_elems[_cursor];
      return false;
    }
  }
};

And slightly different example usage:

fixed_length_array<int, 3> sink;
fixed_length_array_with_snapshot<int, 3> arr;
arr[0] = 10;
arr[1] = 20;
arr[2] = 30;
arr.take_snapshot();
arr[1] = 25;
do {} while (!arr.drain_snapshot(&sink));
std::cout << arr[0] << " " << arr[1] << " " << arr[2] << "\n";
std::cout << sink[0] << " " << sink[1] << " " << sink[2] << "\n";

From here, requirement B5 (the sink must be willing to receive the snapshot contents in any order) can be relaxed by changing the data structure of the buffer: rather than a first-in-last-out stack, use a tree (as in binary tree or btree) data structure, and have A4 perform dual iteration (using the same cursor) of the underlying data structure and the buffer.

To explore a different area of the implementation space, we can reinstate requirements B5 and B6, and instead relax requirement B2 (once a snapshot has been taken, it must be fully drained before the next snapshot is taken). For this, one option is to replace the one bit of mutable storage per element with a sequence number per element. The implementation looks like:

template <typename T, size_t N, typename Sink>
struct fixed_length_array_with_snapshot : fixed_length_array<T, N> {
  uint64_t _sequence[N] = {};
  Sink _sink = nullptr;
  uint64_t _snap_sequence = 0;
  size_t _cursor = 0;

  const T& operator[](size_t i) const { // A1
    return fixed_length_array<T, N>::operator[](i);
  }
  T& operator[](size_t i) { // A2
    if (i < _cursor && _sequence[i] < _snap_sequence) {
      _sequence[i] = _snap_sequence;
      (*_sink)[i] = this->_elems[i];
    }
    return fixed_length_array<T, N>::operator[](i); 
  }

  void take_snapshot(Sink sink) { // A3
    _sink = sink;
    _snap_sequence += 1;
    _cursor = N;
  }
  bool drain_snapshot() { // A4
    if (_cursor == 0) {
      return true; // finished draining
    } else if (_sequence[--_cursor] < _snap_sequence) {
      (*_sink)[_cursor] = this->_elems[_cursor];
    }
    return false;
  }
};

One advantage of this implementation is that drain_snapshot no longer needs to mutate anything except the cursor, which can be useful when mutation is expensive. It also provides a path to relaxing requirement B1 (at most one snapshot can exist at any time) slightly: to support a handful of snapshots at a time, each one needs its own _cursor, its own _snap_sequence, and its own _sink, and mutating operator[] needs to check against every snapshot and possibly send the element to every sink.

To explore yet another different area, we can reinstate requirements B1 and B2, and instead think about different data structures. A fixed-length array of T is arguably the easiest data structure to consider, but the exact same methodology can be used to add incremental snapshots to any kind of array, or any kind of tree (including binary trees and btrees), or any kind of hash table: a cursor needs to be maintained, mutating operations (including element insertion and removal) need to check against that cursor, and some extra state (either one bit or a snapshot sequence number) needs to be maintained per element. If the underlying data structure already needs to maintain some state per element, then the state for snapshots might fit in very easily. For an example of this, consider a fixed-length array of Maybe[T]. That is, an array where every element is either Nothing or Something(T). A simple implementation without snapshots might look like:

template <typename T, size_t N>
struct fixed_length_maybe {
  T _elems[N];
  bool _has[N] = {};

  bool has(size_t i) const {
    return i < N && _has[i];
  }
  void emplace(size_t i, const T& value) {
    assert(("index should be within bounds", i < N));
    assert(("element already exists", !_has[i]));
    _has[i] = true;
    _elems[i] = value;
  }
  void remove(size_t i) {
    assert(has(i));
    _has[i] = false;
  }
  const T& get(size_t i) const {
    assert(has(i));
    return _elems[i];
  }
  void set(size_t i, const T& value) {
    assert(has(i));
    _elems[i] = value;
  }
};

Note that the above implementation assumes that T is default-constructible. It should be clear how to remove this assumption (change the element type of the _elems array from T to std::aligned_storage_t<sizeof(T), alignof(T)>, use placement-new in emplace, invoke destructor in remove, write copy/move constructors/operators and destructor as appropriate), but the resultant boilerplate isn't interesting to the topic of this blog post.

The implementation can then be extended with support for incremental snapshots:

template <typename T, size_t N, typename Sink>
struct fixed_length_maybe_with_snapshot : fixed_length_maybe<T, N> {
  bool _already_visited[N] = {};
  Sink _sink = nullptr;
  size_t _cursor = 0;

  void emplace(size_t i, const T& value) {
    _already_visited[i] = (i < _cursor);
    return fixed_length_maybe<T, N>::emplace(i, value);
  }
  void remove(size_t i) {
    if (i < _cursor && !_already_visited[i]) {
      _sink->emplace(i, this->_elems[i]);
    }
    _already_visited[i] = false;
    return fixed_length_maybe<T, N>::remove(i);
  }
  void set(size_t i, const T& value) {
    if (i < _cursor && !_already_visited[i]) {
      _already_visited[i] = true;
      _sink->emplace(i, this->_elems[i]);
    }
    return fixed_length_maybe<T, N>::set(i, value);
  }

  void take_snapshot(Sink sink) {
    assert(("requirement B2", _sink == nullptr));
    assert(("sink should not be null", sink != nullptr));
    _sink = sink;
    _cursor = N;
  }
  bool drain_snapshot() {
    assert(("snapshot not yet taken", _sink != nullptr));
    if (_cursor == 0) {
      _sink = nullptr;
      return true; // finished draining
    } else if (_already_visited[--_cursor]) {
      _already_visited[_cursor] = false;
    } else if (this->_has[_cursor]) {
      _sink->emplace(_cursor, this->_elems[_cursor]);
    }
    return false;
  }
};

A better implementation would note that the _has array and the _already_visited array can be combined into a single array, with every element being in one of three possible states:

  1. Nothing
  2. Something(T)
  3. Something(T) already-visited

A fixed-length array of Maybe[T] might feel slightly contrived, but an open addressing hash table from K to V is basically just a fixed-length array of Maybe[Pair[K,V]]. Depending on how deletion is implemented, a fourth element state (Tombstone) might be added. It is also common for the Something states to contain a fingerprint of the element hash (in the otherwise spare bits). The only non-obvious part is how rehashing (to grow the table) interacts with the snapshot state, but this turns out to be easy: at the end of rehashing, set the snapshot cursor such that the entire new table is pending, and when moving elements from the old table to the new table, set them as already-visited if they were already-visited to start with or they were in the visited part of the old table.

Zstd streaming compression using ZSTD_compressBlock

Zstandard is, at its core, a block compressor. This core is usually packaged up and presented as a higher-level API (either streaming compression with ZSTD_compressStream2, or one-shot compression with ZSTD_compress), but the core remains available in the form of ZSTD_compressBlock:

ZSTDLIB_STATIC_API
size_t ZSTD_compressBlock(ZSTD_CCtx* cctx,
                          void* dst, size_t dstCapacity,
                          const void* src, size_t srcSize);

The ZSTDLIB_STATIC_API specifier means that this API isn't considered stable, and is only available when statically linking against the zstd library. It also means that #define ZSTD_STATIC_LINKING_ONLY needs to appear before #include <zstd.h>. The cctx argument contains a bunch of state which is carried over between successive blocks. The result of compression will be written into the memory range [dst, dst + dstCapacity). The dstCapacity argument exists mostly as a safety check; ZSTD_compressBlock will return an error if this value is not large enough. The block to be compressed is read from [src, src + srcSize), and the caller needs to keep this memory range unmodified for as long as it remains within the compression window. srcSize must not exceed 1<<17. The return value falls into one of a few ranges:

The compression window is formed by the concatenation of two spans of bytes, both initially empty. Diverging from the internal terminology, I'll call the two spans the old generation and the current generation. Within ZSTD_compressBlock, if the current generation ends at src, then the current generation is expanded to the right by srcSize. Otherwise, the old generation is discarded, the current generation becomes the old generation, and [src, src + srcSize) becomes the current generation. If the old generation overlaps with the current generation, then the old generation is trimmed (from the left) to eliminate the overlap. The caller is responsible for calculating the maximum window size resulting from all this.

To generate a compressed stream which is compatible with the higher-level API, two layers of framing need to be applied. Each block needs a three-byte header, and then multiple blocks are concatencated to form frames, with frames having an additional header and optional footer (and in turn, multiple frames can be concatenated).

The three-byte block header consists of three fields, which starting from the least significant bit are:

  1. is_last_block flag (1 bit)
  2. block_type field (2 bits)
  3. block_size field (21 bits, though value not allowed to exceed 1<<17)

If the is_last_block flag is not set, then another block is expected to follow in the stream. If it is set, then the current frame footer (if any) is expected to follow, and then either the end of the stream or the start of the next frame.

The block_type field has three valid values, though only two of them arise when using ZSTD_compressBlock directly. A block_type of 0 is used for blocks which could not be compressed (ZSTD_compressBlock returning 0). In this case, the "compressed data" exactly equals the decompressed data, and block_size gives the length of this data. A block_type of 2 is used for compressed blocks. In this case, block_size gives the length of the compressed data (the return value of ZSTD_compressBlock).

A frame header consists of:

  1. Magic number (4 bytes 28 B5 2F FD)
  2. Compression parameters, notably including the window size (1-6 bytes in general, typically 2 bytes)
  3. Optional uncompressed size (if present, 1-8 bytes)

See RFC8878 for details on the frame header fields. One of the compression parameters controls whether a frame footer is present. When a footer is present, it consists of a single field:

  1. XXH64 checksum of uncompressed data (4 bytes)

Special skippable frames can also be present. The reference implementation of the decompressor simply ignores such frames, but other implementations are allowed to do other things with them. They consist of:

  1. Magic number (4 bytes, first byte in range 50 - 5F, then 2A 4D 18)
  2. length field (4 bytes)
  3. length bytes of data
page: 2 3 4 5 6