Programmers often see regular expressions and the respective matching engines as a complex, almost unattainable blackbox. PCRE spans hundreds of thousands of lines of code. It supports a large and evolving feature set. Lookarounds, backreferences, Unicode properties, and multiple matching modes are all present. On top of this, PCRE includes a JIT compiler that translates patterns into machine code for speed.

While all these features are important for a production-grade regex engine, they can obscure the fundamental concepts behind pattern matching. In this blog post, we will start our journey towards the exploration of the process of building a matcher from scratch, all by ourselves.

1. Exact match

In this section we focus on exact string matching. The goal is to find occurrences of a given pattern, verbatim within a text. The constrained form of this problem opens up many optimisation tricks and algorithmic techniques that we can leverage.

Much ink has been spilled on this topic and many algorithms have been proposed over the years. Nonetheless, we take a stab at deriving some of the folk wisdom techniques ourselves, bottom-up.

1.1. Exact match of a single character

The simplest case of exact matching is to find a single character within a text. Programmers are already used to the function strchr from the C standard library, which does exactly that:

#include <cstring>
#include <iostream>

int main() {
  const char * text = "Hello, world!";
  char target = 'o';

  const char * result = std::strchr(text, target);
  if (result) {
    std::cout << "Found at position: " << (result - text) << std::endl;
  } else {
    std::cout << "Not found" << std::endl;
  }
}

/* Output:
Found at position: 4
*/

The function strchr scans the input string text character by character, comparing each character to the target. If a match is found, it returns a pointer to the first occurrence; otherwise, it returns nullptr. A naive implementation of this idea could look like this:

const char * simple_strchr(const char * text, char target) {
  while (*text) {
    if (*text == target) {
      return text;
    }
    ++text;
  }
  return nullptr;
}

A quick skim at the generated assembly code (from clang 21.1 with -O3 optimizations) reveals that the compiler does not bother unrolling the loop or using SIMD instructions, preferring to translate the function almost verbatim to x86-64 assembly:

simple_strchr(char const*, char):
        movzx   ecx, byte ptr [rdi]
        test    cl, cl
        je      .LBB0_1
        mov     rax, rdi
.LBB0_4:
        cmp     cl, sil
        je      .LBB0_2
        movzx   ecx, byte ptr [rax + 1]
        inc     rax
        test    cl, cl
        jne     .LBB0_4
.LBB0_1:
        xor     eax, eax
.LBB0_2:
        ret

The use of C-style strings (i.e., with a null terminator) prevents us from knowing the length of the input string in advance. This limitation hinders our ability to apply certain optimizations, such as processing multiple characters simultaneously using SIMD instructions. One alternative is the memchr function, which takes an explicit length parameter:

int main() {
  const char * text = "Hello, world!";
  size_t length = std::strlen(text);
  char target = 'o';

  const char * result = static_cast<const char *>(std::memchr(text, target, length));
  if (result) {
    std::cout << "Found at position: " << (result - text) << std::endl;
  } else {
    std::cout << "Not found" << std::endl;
  }
}

In this case, the implementation can take advantage of the known length to optimize the search process. Once again we can implement a simple version of memchr ourselves:

#include <cstddef>

const char * simple_memchr(const char * text, char target, size_t length) {
  for (size_t i = 0; i < length; ++i) {
    if (text[i] == target) {
      return &text[i];
    }
  }
  return nullptr;
}

1.1.2. Search via SWAR.

Disappointingly, the compiler chose to not vectorise it either. Our first idea for optimisation is to process multiple characters in each iteration. We will load 8 bytes at a time, treating them as a single integer, and comparing them via the bit-xor (^) operation. If any byte matches the target character, the corresponding byte in the result of the xor will be zero. We can then check for the presence and location of the zero byte using some bit-twiddling tricks. Here is the implementation:

// __restrict__: a GNU extension that tells the compiler that
// the pointer is the only way to access the underlying data.
void * memchr2(const void * __restrict__ s, int c, size_t n) {
  const unsigned char * __restrict__ p = (const unsigned char * __restrict__) s;
  const unsigned char uc = (unsigned char)c;
  // Align to 8 bytes (optional, helps on some CPUs; also avoids unaligned loads)
  while (n && ((uintptr_t)p & 7u)) {
    if (*p == uc) return (void *)p;
    ++p;  --n;
  }
  // Broadcast target character to all bytes of a 64-bit word
  // E.g. for 'A' (0x41), pattern becomes 0x4141414141414141
  const uint64_t pattern = 0x0101010101010101ULL * (uint64_t)uc;
  while (n >= 8) {
    // Avoids aliasing + unaligned-load issues.
    // __builtin_memcpy is non-standard and optimized away by the compiler.
    uint64_t w;  __builtin_memcpy(&w, p, sizeof(w));
    uint64_t x = w ^ pattern;
    if (((x - 0x0101010101010101ULL) & ~x & 0x8080808080808080ULL) != 0) {
      // Locate first match within this 8-byte block.
      for (int i = 0; i < 8; ++i) {
        if (p[i] == uc) return (void *)(p + i);
      }
    }
    p += 8;  n -= 8;
  }
  // Tail
  while (n--) {
    if (*p == uc) return (void *)p;
    ++p;
  }
  return NULL;
}

Forfeiting some auxiliary explanations and assuming a sufficient level of C-fu in the reader, we can express this more compactly:

const void * memchr2(const void * __restrict__ s, int c, size_t n) {
  const unsigned char * __restrict__ p = (const unsigned char * __restrict__) s;
  const unsigned char uc = (unsigned char)c;
  for (; n && ((uintptr_t)p & 7u); --n, ++p)
    if (*p == uc) return (const void *)p;
  const uint64_t pattern = 0x0101010101010101ULL * (uint64_t)uc;
  for (; n >= 8; n -= 8, p += 8) {
    uint64_t w;  __builtin_memcpy(&w, p, sizeof(w));
    uint64_t x = w ^ pattern;
    if (((x - 0x0101010101010101ULL) & ~x & 0x8080808080808080ULL))
      for (int i = 0; i < 8; ++i)
        if (p[i] == uc) return (const void *)(p + i);
  }
  for (; n--; ++p) if (*p == uc) return (const void *)p;
  return NULL;
}

Using a quickly hacked together benchmark harness, we compare the three implementations of memchr for throughput on my Ryzen 7 PRO 7840U laptop:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=4093754278 ns  checksum=0x06d1490b613edffc
simple_memchr   time=13403055073 ns  checksum=0x06d1490b613edffc
memchr2         time=10869952023 ns  checksum=0x06d1490b613edffc

Estimated throughput (GiB/s):
  glibc_memchr : 259.694
  simple_memchr: 79.320         // 3.27x slower than glibc, 1.22x slower than memchr2
  memchr2      : 97.804         // 2.67x slower than glibc

Skewing the test so that the latency becomes more prominent (smaller buffers) yields:

buf_size=1024 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=401259363 ns  checksum=0x18c93a5a77e26bd1
simple_memchr   time=1493447878 ns  checksum=0x18c93a5a77e26bd1
memchr2         time=977337739 ns  checksum=0x18c93a5a77e26bd1

Estimated throughput (GiB/s):
  glibc_memchr : 36.016
  simple_memchr: 9.677        // 4x slower than glibc, 1.55x slower than memchr2
  memchr2      : 14.787       // 2.57x slower than glibc

Our SWAR (SIMD Within A Register) implementation of memchr is about 2.5-3x slower than the highly optimized glibc version, which is not bad for a first attempt.

1.1.3. Search via SSE4.2 PCMPESTRI.

There is still room for improvement, and we will reach for an indispensable trick: x86-64 SIMD intrinsics. We will specifically target the PCMPESTRI instruction from the SSE4.2 instruction set, which essentially does the bulk of our work for us.

To quote the Intel® 64 and IA-32 Architectures Software Developer’s Manual, Volume 2B:

PCMPESTRI — Packed Compare Explicit Length Strings, Return Index

Instruction Summary

Opcode / Instruction Op/En 64/32-bit Mode CPUID Flag Description
66 0F 3A 61 /r imm8PCMPESTRI xmm1, xmm2/m128, imm8 RMI V / V SSE4_2 Perform a packed comparison of string data with explicit lengths, generate an index, and store the result in ECX.
VEX.128.66.0F3A 61 /r ibVPCMPESTRI xmm1, xmm2/m128, imm8 RMI V / V AVX Perform a packed comparison of string data with explicit lengths, generate an index, and store the result in ECX.

Instruction Operand Encoding

Op/En Operand 1 Operand 2 Operand 3 Operand 4
RMI ModRM:reg (r) ModRM:r/m (r) imm8 N/A

Description

The instruction compares and processes data from two string fragments based on the encoded value in the imm8 control byte (see Section 4.1, “Imm8 Control Byte Operation for PCMPESTRI / PCMPESTRM / PCMPISTRI / PCMPISTRM”).
It generates an index that is written to the count register ECX.

Each string fragment is represented by two values:

  1. Data vector

    • An xmm register for the first operand.
    • An xmm register or m128 memory operand for the second operand.
    • Contains byte or word elements of the string.
  2. Length

    • For xmm1: length is taken from EAX/RAX.
    • For xmm2/m128: length is taken from EDX/RDX.
    • The length specifies how many bytes or words are valid in the corresponding vector.

Length Interpretation

  • The effective length is the absolute value of the length register.
  • The absolute-value computation saturates:
    • Bytes: maximum 16
    • Words: maximum 8
  • Saturation occurs when the length register value is:
    • Greater than 16 (or 8 for words), or
    • Less than −16 (or −8 for words)
  • Whether data elements are bytes or words is determined by imm8[3].

Comparison and Result

  • Comparison and aggregation behavior is controlled by fields in imm8 (see Section 4.1).
  • The instruction computes an intermediate result mask (IntRes2, see Section 4.1.4).
  • The index of the first or last set bit of IntRes2 is returned in ECX, depending on imm8[6]:
    • If imm8[6] = 0: first (least significant) set bit
    • If imm8[6] = 1: last (most significant) set bit
  • If no bits are set in IntRes2:
    • ECX = 16 (byte elements)
    • ECX = 8 (word elements)

We proceed with the implementation in the same vein as the previous SWAR memchr2 function:

#include <nmmintrin.h>

void * memchr_sse42(const void * s, int c, size_t n) {
  const unsigned char * p = (const unsigned char *) s;
  const unsigned char uc = (unsigned char) c;
  const __m128i needle = _mm_cvtsi32_si128((int) uc);
  for (; n && ((uintptr_t)p & 15u); --n, ++p) // Align to 16-byte boundary.
    if (*p == uc) return (void *)p;
  // Compare any byte in hay equals any byte in needle-set (size 1).
  const int imm8 = _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_LEAST_SIGNIFICANT;
  for (; n >= 16; n -= 16, p += 16) {
    const int idx = _mm_cmpestri(needle, 1, _mm_load_si128((const __m128i *) p), 16, imm8);
    if (idx != 16) return (void *)(p + (unsigned)idx);
  }
  for (; n; --n, ++p) if (*p == uc) return (void *)p;
  return nullptr;
}

The results are promising. Our implementation is now only about 2x slower than glibc’s highly optimized version:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=4129417820 ns  checksum=0xcdc64e6fe915f3ed
simple_memchr   time=13248285362 ns  checksum=0xcdc64e6fe915f3ed
memchr2         time=10671500036 ns  checksum=0xcdc64e6fe915f3ed
memchr_sse42    time=8801114604 ns  checksum=0xcdc64e6fe915f3ed

Estimated throughput (GiB/s):
  glibc_memchr : 257.451
  simple_memchr: 80.246
  memchr2      : 99.623
  memchr_sse42 : 120.794

The source of the remaining gap is a disappointing but frequently recurring story in performance engineering on x86-64 platforms. Looking up the instruction on uops.info reveals the following picture for my Zen4 CPU:

Instruction Lat TP Uops Ports
PCMPESTRI (XMM, M128, I8) [≤11; ≤29] 2.00 / 4.00 12 1*FP01 + 2*FP1 + 2*FP45
PCMPESTRI (XMM, XMM, I8) [≤11; 15] 2.00 / 3.00 8 1*FP01 + 2*FP1 + 2*FP45
PCMPESTRI64 (XMM, M128, I8) [≤11; ≤29] 2.00 / 4.00 12 1*FP01 + 2*FP1 + 2*FP45
PCMPESTRI64 (XMM, XMM, I8) [≤11; 15] 2.00 / 3.00 8 1*FP01 + 2*FP1 + 2*FP45

The instruction has a high latency (up to 29 cycles when reading from memory!) and consumes multiple micro-operations (uops) that occupy several execution ports. This means that even though we are processing 16 bytes at a time, the instruction’s throughput is limited by its latency and port usage. A natural conclusion would be that glibc clearly uses more advanced techniques that certainly don’t involve PCMPESTRI. But why? Wasn’t the point of CISC CPU architectures to have complex instructions that do more work per instruction?

This is neither the first nor last time we see that complex, purpose-built instructions do not necessarily lead to better performance. It’s quite difficult to wrap one’s head around such a paradox. Other examples include:

  • REP STOSB is not the fastest way to zero out memory (memset/bzero), despite being a single instruction that does exactly that.
  • Even more counterintuitively, seemingly unrolled REP STOSQ is typically even slower than REP STOSB for the same purpose. The reason is in fact simple: the microcode that makes REP STOSB relatively efficient is simply not present for this opcode.
  • REP MOVSB is not the fastest way to copy memory (memcpy), even though it is designed for that purpose.
  • REPNE SCASB is not the fastest way to search for a byte in memory (memchr), despite being a single instruction that performs that operation. And finally, here we are: another purpose-built instruction, PCMPESTRI, that does not deliver the expected performance boost.

Tangentially, on the topic of REPNE SCASB, let’s probe the performance of this. The memchr implementation using this instruction is a simple wrapper over inline assembly:

NOINLINE void * memchr_repne_scas(const void * s, int c, size_t n) {
  if (n == 0) return NULL;
  const unsigned char * p = (const unsigned char *) s;
  void * ret;
  __asm__ volatile(
      "cld\n\t"
      "repne scasb\n\t"
      "jne 1f\n\t"              // if ZF==0 (no match), jump to notfound
      "lea -1(%%rdi), %0\n\t"   // match: ret = rdi-1
      "jmp 2f\n\t"
      "1:\n\t"
      "xor %0, %0\n\t"          // not found: ret = NULL
      "2:\n\t"
      : "=&r"(ret), "+D"(p), "+c"(n)
      : "a"((unsigned char)c)
      : "cc", "memory");
  return ret;
}

As expected, the performance is underwhelming, but but the biggest surprise is that it is even worse than our naive simple_memchr implementation:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=4364084308 ns  checksum=0x90e66e3000583c48
simple_memchr   time=14458207884 ns  checksum=0x90e66e3000583c48
memchr2         time=12134840048 ns  checksum=0x90e66e3000583c48
memchr_sse42    time=9476436712 ns  checksum=0x90e66e3000583c48
repne_scas      time=17571507777 ns  checksum=0x90e66e3000583c48

Estimated throughput (GiB/s):
  glibc_memchr : 243.608
  simple_memchr: 73.531
  memchr2      : 87.609
  memchr_sse42 : 112.186
  repne_scas   : 60.503

1.1.4. Search via SSE2 PCMPEQ + PMOVMSKB.

To go one step further, we can come back to the SSE4.2, PCMPESTRI-based implementation and simply use two instructions with lower latency: PCMPEQ (compare), followed by PMOVMSKB (move mask). This combination allows us to achieve similar functionality with reduced latency and better throughput. We will also forfeit the alignment loop: since we have tied to machine specific code, we are free to use unaligned loads. Here is the implementation:

void * memchr_sse2(const void * s, int c, size_t n) {
  const unsigned char * p = (const unsigned char *)s;
  const unsigned char uc = (unsigned char)c;
  const __m128i needle = _mm_set1_epi8((char) uc);
  for (; n && ((uintptr_t)p & 15u); --n, ++p)
    if (*p == uc) return (void *)p;
  for (; n >= 16; n -= 16, p += 16) {
    __m128i hay = _mm_load_si128((const __m128i *)p);
    __m128i eq  = _mm_cmpeq_epi8(hay, needle);
    unsigned mask = (unsigned)_mm_movemask_epi8(eq);
    if (mask)
      return (void *)(p + __builtin_ctz(mask));
  }
  for (; n--; ++p)
    if (*p == uc) return (void *)p;
  return NULL;
}

To justify this, we come back to the instruction table for Zen 4:

Instruction Lat TP Uops Ports
PCMPESTRI (XMM, M128, I8) [≤11; ≤29] 2.00 / 4.00 12 1*FP01 + 2*FP1 + 2*FP45
PCMPEQB (XMM, M128) [1;≤9] 0.25 / 0.50 1 1*FP0123
PMOVMSKB (R32, XMM) ≤6 0.50 / 1.00 1 1*FP45

It is clear that this combination of opcodes has better throughput guarantees, throughput and requires less Uops. The execution port usage of the CPU is also better, further suggesting better practical performance. Validation of this result is quite easy:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=4966514321 ns  checksum=0x42a2457f5bb57a25
simple_memchr   time=12864543169 ns  checksum=0x42a2457f5bb57a25
memchr2         time=10555408424 ns  checksum=0x42a2457f5bb57a25
memchr_sse42    time=8888208502 ns  checksum=0x42a2457f5bb57a25
repne_scas      time=16022187564 ns  checksum=0x42a2457f5bb57a25
memchr_sse2     time=6822718398 ns  checksum=0x42a2457f5bb57a25

Estimated throughput (GiB/s):
  glibc_memchr : 214.058
  simple_memchr: 82.640
  memchr2      : 100.718
  memchr_sse42 : 119.611
  repne_scas   : 66.353
  memchr_sse2  : 155.821

The soundness of this optimisation is further suported by the outputs of e.g. the LLVM machine code analyser; we can compare the movemask version and the the PCMPESTRI version.

The MCA dumps are for the same overall structure (scalar align/peel into a 16B loop, followed by scalar tail), but with different 16B core, as illustrated by the C code. In terms of assembly code, we see:

  • Variant 1 core: vpcmpestri xmm0, [rdi], imm8=0 (SSE4.2 string-compare instruction)
  • Variant 2 core: vpcmpeqb + vpmovmskb + tzcnt (SIMD compare + mask + bit-scan) f99 over ~100 iterations. Every indicator favours the second version:
Metric vpcmpestri vpcmpeqb + pmovmskb + tzcnt Analysis
Total cycles 3199 1511 Variant 2 is, on paper, about 2.1 times faster
Instructions 5000 4900 Similar instruction count
Total uOps 6400 5600 Variant 2 uses fewer uops
uOps/cycle 2.00 3.71 Variant 1 is throughput-starved.
IPC 1.56 3.24 Variant 2 keeps frontend/backend busy
Block RThroughput 16.0 14.0 Variant 2 has a better steady-state loop throughput

MCA also shows us some auxiliary information about stalls and port usage. In the core, we see vpcmpestri as annotated with “9 uops, lat 23, RThroughput 4.00”. It also loads (MayLoad). The reciprocal throughput of 4.00 is particularly damning: in the best case, we can only issue one such instruction every 4 cycles. This is a hard limit on the loop throughput, regardless of other factors.

1.1.5. Search via AVX2 (improved).

Naturally increasing the vector width to 256 bits (32 bytes) should yield further performance improvements. This is however not the case, or at least it will not be as straightforward as before.

void * memchr_avx2_simple(const void * s, int c, size_t n) {
  const unsigned char * p = (const unsigned char *)s;
  const unsigned char uc = (unsigned char)c;
  const __m256i needle = _mm256_set1_epi8((char) uc);
  for (; n && ((uintptr_t)p & 31u); --n, ++p)
    if (*p == uc) return (void *)p;
  for (; n >= 32; n -= 32, p += 32) {
    __m256i hay = _mm256_load_si256((const __m256i *)p);
    __m256i eq  = _mm256_cmpeq_epi8(hay, needle);
    unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
    if (mask)
      return (void *)(p + __builtin_ctz(mask));
  }
  for (; n--; ++p)
    if (*p == uc) return (void *)p;
  return NULL;
}

The benchmark results are as follows:

Estimated throughput (GiB/s):
  glibc_memchr : 250.816
  simple_memchr: 87.112
  memchr2      : 97.577
  memchr_sse2  : 155.821
  memchr_avx2_simple : 171.643

The AVX2 version uses the same techniques as the glibc memchr, but we are still quite far away from the 250GiB/s ballpark. The way forward is to capitalise on the broader vector width even more and modify the core loop to scan 128 bytes at a time. This can be achieved by unrolling the loop four times and combining the results using bit-wise OR, scanning for the precise location of the match only when the return is guaranteed:

void * memchr_avx2_simple(const void * s, int c, size_t n) {
  const unsigned char * p = (const unsigned char *)s;
  const unsigned char uc = (unsigned char)c;
  const __m256i needle = _mm256_set1_epi8((char) uc);
  for (; n >= 128; n -= 128, p += 128) {
    __m256i hay1 = _mm256_loadu_si256((const __m256i *)p);
    __m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 32));
    __m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 64));
    __m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 96));
    __m256i eq1  = _mm256_cmpeq_epi8(hay1, needle);
    __m256i eq2  = _mm256_cmpeq_epi8(hay2, needle);
    __m256i eq3  = _mm256_cmpeq_epi8(hay3, needle);
    __m256i eq4  = _mm256_cmpeq_epi8(hay4, needle);
    __m256i any  = _mm256_or_si256(_mm256_or_si256(eq1, eq2), _mm256_or_si256(eq3, eq4));
    unsigned mask = (unsigned)_mm256_movemask_epi8(any);
    if (mask) {
      unsigned mmask1 = (unsigned)_mm256_movemask_epi8(eq1);
      if (mmask1) return (void *)(p + __builtin_ctz(mmask1));
      unsigned mmask2 = (unsigned)_mm256_movemask_epi8(eq2);
      if (mmask2) return (void *)(p + 32 + __builtin_ctz(mmask2));
      unsigned mmask3 = (unsigned)_mm256_movemask_epi8(eq3);
      if (mmask3) return (void *)(p + 64 + __builtin_ctz(mmask3));
      unsigned mmask4 = (unsigned)_mm256_movemask_epi8(eq4);
      return (void *)(p + 96 + __builtin_ctz(mmask4));
    }
  }
  for (; n >= 32; n -= 32, p += 32) {
    __m256i hay = _mm256_loadu_si256((const __m256i *)p);
    __m256i eq  = _mm256_cmpeq_epi8(hay, needle);
    unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
    if (mask)
      return (void *)(p + __builtin_ctz(mask));
  }
  for (; n--; ++p)
    if (*p == uc) return (void *)p;
  return NULL;
}

This almost brings us to parity with glibc on large buffers:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=3884077381 ns  checksum=0xf3bb7462ab9aa0ae
simple_memchr   time=12991746698 ns  checksum=0xf3bb7462ab9aa0ae
memchr2         time=10150611634 ns  checksum=0xf3bb7462ab9aa0ae
memchr_sse2     time=6738995996 ns  checksum=0xf3bb7462ab9aa0ae
memchr_avx2_simple  time=4027457053 ns  checksum=0xf3bb7462ab9aa0ae

Estimated throughput (GiB/s):
  glibc_memchr : 273.713
  simple_memchr: 81.831
  memchr2      : 104.735
  memchr_sse2  : 157.757
  memchr_avx2_simple : 263.969

1.1.6. Search via SSE2 + BMI (improved).

The previous AVX2 version uses unaligned loads everywhere (_mm256_loadu_si256), and relies on modern CPUs handling that well enough. This is not a great assumption to make for SSE2 code - if your CPU is old enough to not support AVX2, it is likely to have worse performance for unaligned loads. We can therefore revisit the SSE2 version and improve it by adding proper alignment handling.

The new version will spend a lot of code up front to avoid bad alignment situations and to enable aligned loads (_mm_load_si128) in the hot loops. We will explicitly consider p & 63 (position within a 64-byte cache line), a crosscache path when the initial 16B load would straddle a cache-line boundary (more likely to be slower on some microarchitectures), adjusting p to a 16-byte boundary, then later to a 64-byte boundary (aligned) for the main loop.

The AVX2 code has clean loop tiers: 128B AVX2-unroll, then 32B AVX2, then scalar. Our new version that treats old processors slightly better will do early probing loads that may read past len (still within an aligned block on the same page, which does not trigger a fault) and then checks the found index against len before returning.

For example, if we load 16 bytes even if len < 16, assuming that we don’t cross a page boundary (which would cause a segmentation fault), then we can simply do:

idx = ctz(mask); if (idx >= len) return nullptr;

This is a typical speculative read within the same page / valid mapped memory assumption-style trick, often used in high-performance code.

The special cases have crept up and lead us to produce the following code:

static inline __m128i splat_u8_sse2(unsigned char c) {
  __m128i x = _mm_cvtsi32_si128((int)c);
  x = _mm_unpacklo_epi8(x, x);
  x = _mm_unpacklo_epi8(x, x);
  x = _mm_shuffle_epi32(x, 0);
  return x;
}

void * memchr_sse2_bsf(const void * __restrict__  s, int c, size_t n) {
  const unsigned char * __restrict__ p0 = (const unsigned char * __restrict__) s;
  const __m128i needle = splat_u8_sse2((unsigned char)c);
  if (n == 0) return nullptr;
  size_t len = n;
  const unsigned char *p = p0;
  uintptr_t addr = (uintptr_t)p;
  unsigned off64 = (unsigned)(addr & 63u); /* if >48 crosscache */
  if (off64 <= 48u) {
    /* unaligned first 16B */
    __m128i v = _mm_loadu_si128((const __m128i *)(const void *)p);
    __m128i m = _mm_cmpeq_epi8(needle, v);
    unsigned mask = (unsigned)_mm_movemask_epi8(m);
    if (mask) {
      unsigned idx = __builtin_ctz(mask);
      if ((size_t)idx >= len) return nullptr;
      return (void *)(p + idx);
    }
    /* unaligned_no_match_1 */
    if (len <= 16) return nullptr;
    len -= 16;
    const unsigned char *p16 = p + 16;
    unsigned a15 = (unsigned)((uintptr_t)p & 15u);
    p = (const unsigned char *)((uintptr_t)p16 & ~(uintptr_t)15u);
    len += a15;
  } else {
    /* crosscache */
    unsigned off16 = (unsigned)(addr & 15u);
    const unsigned char *pal = (const unsigned char *)(addr & ~(uintptr_t)15u);
    __m128i v = _mm_load_si128((const __m128i *)(const void *)pal);
    __m128i m = _mm_cmpeq_epi8(needle, v);
    unsigned mask = (unsigned)_mm_movemask_epi8(m);
    mask >>= off16;
    if (mask) {
      unsigned idx = __builtin_ctz(mask);
      if ((size_t)idx >= len) return nullptr;
      return (void *)(p0 + idx);
    }
    {
      size_t sum = off16 + len;
      if (sum < off16) sum = (size_t)-1;
      if (sum <= 16) return nullptr;
      len = sum - 16;
    }
    p = pal + 16;
  }
  /* early 2x unroll before 64-byte alignment loop */
  for (;;) {
    if (len <= 64) break;
    len -= 64;
    __m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
    __m128i m0 = _mm_cmpeq_epi8(needle, v0);
    unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
    if (k0) return (void *)(p + __builtin_ctz(k0));
    __m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
    __m128i m1 = _mm_cmpeq_epi8(needle, v1);
    unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
    if (k1) return (void *)(p + 16 + __builtin_ctz(k1));
    __m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
    __m128i m2 = _mm_cmpeq_epi8(needle, v2);
    unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
    if (k2) return (void *)(p + 32 + __builtin_ctz(k2));
    __m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
    __m128i m3 = _mm_cmpeq_epi8(needle, v3);
    unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
    if (k3) return (void *)(p + 48 + __builtin_ctz(k3));
    p += 64;
    /* if aligned jump forward, else do another 64 scan */
    if ((((uintptr_t)p) & 63u) == 0) break;
    if (len <= 64) break;
    len -= 64;
    __m128i w0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
    __m128i n0 = _mm_cmpeq_epi8(needle, w0);
    unsigned q0 = (unsigned)_mm_movemask_epi8(n0);
    if (q0) return (void *)(p + __builtin_ctz(q0));
    __m128i w1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
    __m128i n1 = _mm_cmpeq_epi8(needle, w1);
    unsigned q1 = (unsigned)_mm_movemask_epi8(n1);
    if (q1) return (void *)(p + 16 + __builtin_ctz(q1));
    __m128i w2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
    __m128i n2 = _mm_cmpeq_epi8(needle, w2);
    unsigned q2 = (unsigned)_mm_movemask_epi8(n2);
    if (q2) return (void *)(p + 32 + __builtin_ctz(q2));
    __m128i w3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
    __m128i n3 = _mm_cmpeq_epi8(needle, w3);
    unsigned q3 = (unsigned)_mm_movemask_epi8(n3);
    if (q3) return (void *)(p + 48 + __builtin_ctz(q3));
    p += 64;
    /* align to 64 and carry remainder into len */
    uintptr_t a = (uintptr_t)p;
    unsigned r = (unsigned)(a & 63u);
    p = (const unsigned char *)(a & ~(uintptr_t)63u);
    len += r;
    break;
  }
  /* assumes align of 64 */
  while (len > 64) {
    len -= 64;
    __m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
    __m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
    __m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
    __m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
    __m128i m0 = _mm_cmpeq_epi8(needle, v0);
    __m128i m1 = _mm_cmpeq_epi8(needle, v1);
    __m128i m2 = _mm_cmpeq_epi8(needle, v2);
    __m128i m3 = _mm_cmpeq_epi8(needle, v3);
    __m128i t0 = _mm_max_epu8(m0, m2);
    __m128i t1 = _mm_max_epu8(m1, m3);
    __m128i t2 = _mm_max_epu8(t0, t1);
    unsigned any = (unsigned)_mm_movemask_epi8(t2);
    if (any) {
      unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
      if (k0) return (void *)(p + __builtin_ctz(k0));
      unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
      if (k1) return (void *)(p + 16 + __builtin_ctz(k1));
      unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
      if (k2) return (void *)(p + 32 + __builtin_ctz(k2));
      unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
      return (void *)(p + 48 + __builtin_ctz(k3));
    }
    p += 64;
  }
  /* 0 <= len <= 64 */
  {
    if (len == 0) return nullptr;
    if (len > 32) {
      __m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
      __m128i m0 = _mm_cmpeq_epi8(needle, v0);
      unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
      if (k0) {
        unsigned idx = __builtin_ctz(k0);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      __m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
      __m128i m1 = _mm_cmpeq_epi8(needle, v1);
      unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
      if (k1) {
        unsigned idx = 16 + __builtin_ctz(k1);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      __m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
      __m128i m2 = _mm_cmpeq_epi8(needle, v2);
      unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
      if (k2) {
        unsigned idx = 32 + __builtin_ctz(k2);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      if (len <= 48) return nullptr;
      __m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
      __m128i m3 = _mm_cmpeq_epi8(needle, v3);
      unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
      if (k3) {
        unsigned idx = 48 + __builtin_ctz(k3);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      return nullptr;
    } else {
      __m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
      __m128i m0 = _mm_cmpeq_epi8(needle, v0);
      unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
      if (k0) {
        unsigned idx = __builtin_ctz(k0);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      if (len <= 16) return nullptr;
      __m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
      __m128i m1 = _mm_cmpeq_epi8(needle, v1);
      unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
      if (k1) {
        unsigned idx = 16 + __builtin_ctz(k1);
        if ((size_t)idx < len) return (void *)(p + idx);
        return nullptr;
      }
      return nullptr;
    }
  }
}

Benchmarking this version shows that we have attained significantly better throughput:

buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr    time=4525973864 ns  checksum=0x93785bd40474de84
simple_memchr   time=14300729997 ns  checksum=0x93785bd40474de84
memchr2         time=11173968344 ns  checksum=0x93785bd40474de84
memchr_sse2     time=7703221849 ns  checksum=0x93785bd40474de84
memchr_avx2_simple  time=4467790687 ns  checksum=0x93785bd40474de84
memchr_sse2_bsf  time=5780660724 ns  checksum=0x93785bd40474de84

Estimated throughput (GiB/s):
  glibc_memchr : 234.894
  simple_memchr: 74.341
  memchr2      : 95.143
  memchr_sse2  : 138.010
  memchr_avx2_simple : 237.953
  memchr_sse2_bsf : 183.911

1.1.7 Conclusion

Through a series of optimisations and careful use of SIMD instructions, we have developed a memchr implementation that approaches the performance of glibc’s highly optimized version. They also extend to other similar functions like strlen (equivalent to memchr searching for zero) or strchr (equivalent to memchr searching for either a specific character or zero) with analogous techniques.

A straightforward implementation of strlen given the techniques that we have just covered achieves performance that is on-par with glibc’s strlen:

size_t strlen_avx2_simple(const void * __restrict__ s) {
  const unsigned char * base = (const unsigned char *) s;
  const unsigned char * p    = base;
  const __m256i zero = _mm256_setzero_si256();
  size_t to_page_end = 4096u - ((uintptr_t)(p) & 4095u);

  for (; to_page_end >= 128u; to_page_end -= 128u, p += 128u) {
    __m256i v1 = _mm256_loadu_si256((const __m256i *)(p + 0));
    __m256i v2 = _mm256_loadu_si256((const __m256i *)(p + 32));
    __m256i v3 = _mm256_loadu_si256((const __m256i *)(p + 64));
    __m256i v4 = _mm256_loadu_si256((const __m256i *)(p + 96));
    __m256i e1 = _mm256_cmpeq_epi8(v1, zero);
    __m256i e2 = _mm256_cmpeq_epi8(v2, zero);
    __m256i e3 = _mm256_cmpeq_epi8(v3, zero);
    __m256i e4 = _mm256_cmpeq_epi8(v4, zero);
    __m256i any = _mm256_or_si256(_mm256_or_si256(e1, e2),
                                  _mm256_or_si256(e3, e4));
    if (_mm256_testz_si256(any, any) == 0) {
      unsigned m1 = (unsigned)_mm256_movemask_epi8(e1);
      if (m1) return (size_t)((p + (size_t)__builtin_ctz(m1)) - base);
      unsigned m2 = (unsigned)_mm256_movemask_epi8(e2);
      if (m2) return (size_t)((p + 32u + (size_t)__builtin_ctz(m2)) - base);
      unsigned m3 = (unsigned)_mm256_movemask_epi8(e3);
      if (m3) return (size_t)((p + 64u + (size_t)__builtin_ctz(m3)) - base);
      unsigned m4 = (unsigned)_mm256_movemask_epi8(e4);
      return (size_t)((p + 96u + (size_t)__builtin_ctz(m4)) - base);
    }
  }

  for (; to_page_end >= 32u; to_page_end -= 32u, p += 32u) {
    __m256i v  = _mm256_loadu_si256((const __m256i *)p);
    __m256i eq = _mm256_cmpeq_epi8(v, zero);
    unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
    if (mask) return (size_t)((p + (size_t)__builtin_ctz(mask)) - base);
  }

  for (; to_page_end; --to_page_end, ++p) {
    if (*p == 0) return (size_t)(p - base);
  }
  
  for (;;) {
    __m256i v1 = _mm256_loadu_si256((const __m256i *)(p + 0));
    __m256i v2 = _mm256_loadu_si256((const __m256i *)(p + 32));
    __m256i v3 = _mm256_loadu_si256((const __m256i *)(p + 64));
    __m256i v4 = _mm256_loadu_si256((const __m256i *)(p + 96));
    __m256i e1 = _mm256_cmpeq_epi8(v1, zero);
    __m256i e2 = _mm256_cmpeq_epi8(v2, zero);
    __m256i e3 = _mm256_cmpeq_epi8(v3, zero);
    __m256i e4 = _mm256_cmpeq_epi8(v4, zero);
    __m256i any = _mm256_or_si256(_mm256_or_si256(e1, e2),
                                  _mm256_or_si256(e3, e4));
    if (_mm256_testz_si256(any, any) == 0) {
      unsigned m1 = (unsigned)_mm256_movemask_epi8(e1);
      if (m1) return (size_t)((p + (size_t)__builtin_ctz(m1)) - base);
      unsigned m2 = (unsigned)_mm256_movemask_epi8(e2);
      if (m2) return (size_t)((p + 32u + (size_t)__builtin_ctz(m2)) - base);
      unsigned m3 = (unsigned)_mm256_movemask_epi8(e3);
      if (m3) return (size_t)((p + 64u + (size_t)__builtin_ctz(m3)) - base);
      unsigned m4 = (unsigned)_mm256_movemask_epi8(e4);
      return (size_t)((p + 96u + (size_t)__builtin_ctz(m4)) - base);
    }
    p += 128;
  }
}

As evidenced by a benchmark in similar vein to the previous ones:

buf_size=67108864 bytes, queries=100000, iters=500, seed=1
strlen_glibc    time=1873349773 ns  gbps=17.911
strlen_avx2_simple  time=2190930772 ns  gbps=15.315

And finally, an implementation of strchr:

char * strchr_avx2_pagesafe(const char * s, int c) {
  const unsigned char * p = (const unsigned char *)s;
  const unsigned char uc = (unsigned char)c;
  const __m256i needle = _mm256_set1_epi8((char)uc);
  const __m256i zero   = _mm256_setzero_si256();
  for (;;) {
    uintptr_t addr = (uintptr_t)p;
    size_t bytes_to_page_end = 4096u - (size_t)(addr & 4095u);
    while (bytes_to_page_end < 32) {
      unsigned char ch = *p;
      if (ch == uc) return (char *)p;
      if (ch == 0)  return NULL;
      ++p;
      ++bytes_to_page_end;
      if (bytes_to_page_end == PAGE_SIZE) break;
    }
    addr = (uintptr_t)p;
    bytes_to_page_end = PAGE_SIZE - (size_t)(addr & PAGE_MASK);
    while (bytes_to_page_end >= 128) {
      __m256i hay1 = _mm256_loadu_si256((const __m256i *)(p +  0));
      __m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 32));
      __m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 64));
      __m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 96));
      __m256i eq1 = _mm256_cmpeq_epi8(hay1, needle);
      __m256i eq2 = _mm256_cmpeq_epi8(hay2, needle);
      __m256i eq3 = _mm256_cmpeq_epi8(hay3, needle);
      __m256i eq4 = _mm256_cmpeq_epi8(hay4, needle);
      __m256i z1  = _mm256_cmpeq_epi8(hay1, zero);
      __m256i z2  = _mm256_cmpeq_epi8(hay2, zero);
      __m256i z3  = _mm256_cmpeq_epi8(hay3, zero);
      __m256i z4  = _mm256_cmpeq_epi8(hay4, zero);
      __m256i m1  = _mm256_or_si256(eq1, z1);
      __m256i m2  = _mm256_or_si256(eq2, z2);
      __m256i m3  = _mm256_or_si256(eq3, z3);
      __m256i m4  = _mm256_or_si256(eq4, z4);
      __m256i any = _mm256_or_si256(_mm256_or_si256(m1, m2), _mm256_or_si256(m3, m4));
      if (_mm256_testz_si256(any, any) == 0) {
        unsigned comb, eqm, bit;
        comb = (unsigned)_mm256_movemask_epi8(m1);
        if (comb) {
          unsigned idx = (unsigned)__builtin_ctz(comb);
          eqm = (unsigned)_mm256_movemask_epi8(eq1);
          bit = 1u << idx;
          return (eqm & bit) ? (char *)(p + idx) : NULL;
        }
        comb = (unsigned)_mm256_movemask_epi8(m2);
        if (comb) {
          unsigned idx = (unsigned)__builtin_ctz(comb);
          eqm = (unsigned)_mm256_movemask_epi8(eq2);
          bit = 1u << idx;
          return (eqm & bit) ? (char *)(p + 32 + idx) : NULL;
        }
        comb = (unsigned)_mm256_movemask_epi8(m3);
        if (comb) {
          unsigned idx = (unsigned)__builtin_ctz(comb);
          eqm = (unsigned)_mm256_movemask_epi8(eq3);
          bit = 1u << idx;
          return (eqm & bit) ? (char *)(p + 64 + idx) : NULL;
        }
        comb = (unsigned)_mm256_movemask_epi8(m4);
        {
          unsigned idx = (unsigned)__builtin_ctz(comb);
          eqm = (unsigned)_mm256_movemask_epi8(eq4);
          bit = 1u << idx;
          return (eqm & bit) ? (char *)(p + 96 + idx) : NULL;
        }
      }
      p += 128;
      bytes_to_page_end -= 128;
    }
    while (bytes_to_page_end >= 32) {
      __m256i hay = _mm256_loadu_si256((const __m256i *)p);
      __m256i eq  = _mm256_cmpeq_epi8(hay, needle);
      __m256i z   = _mm256_cmpeq_epi8(hay, zero);
      __m256i m   = _mm256_or_si256(eq, z);
      unsigned comb = (unsigned)_mm256_movemask_epi8(m);
      if (comb) {
        unsigned idx = (unsigned)__builtin_ctz(comb);
        unsigned eqm = (unsigned)_mm256_movemask_epi8(eq);
        return (eqm & (1u << idx)) ? (char *)(p + idx) : NULL;
      }
      p += 32;
      bytes_to_page_end -= 32;
    }
  }
}

1.2. Exact match of a pair of characters in overlapping regions.

For the sake of illustrating the techniques involved, we will consider the problem of searching for either of two characters in a memory block (like a variant of wmemchr or such).

1.2.1. Baseline scalar/SWAR implementations.

We have a couple of the obvious ones. Simple scalar version with 8-bit loads:

size_t memchr2_portable_simple(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  for (size_t i = 0; i + 1 < n; ++i)
    if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

Simple scalar version with direct, unaligned 16-bit loads:

NOINLINE size_t memchr2_portable_elim2(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  for (size_t i = 0; i + 1 < n; i++)
    if (*(const uint16_t *)(p + i) == needle) return i;
  return n;
}

An attempt to save a load in each iteration:

size_t memchr2_portable_running(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  uint16_t window = *((const uint16_t *)p);
  if (window == needle) return 0;
  for (size_t i = 0; i + 2 < n; i++) {
    window = (uint16_t)((window >> 8) | ((uint16_t)(p[i + 2]) << 8));
    if (window == needle) return i + 1;
  }
  return n;
}

A SWAR-based version that processes 8 bytes at a time, pre-filtering candidates that contain either of the two bytes in any order in succession:

NOINLINE size_t memchr2_portable_qword(const void * data, size_t len, uint16_t needle) {
  if (len < 2) return len;
  uint8_t b1 = (uint8_t)(needle & 0xFF);
  uint8_t b2 = (uint8_t)(needle >> 8);
  const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
  const uint64_t pattern2 = 0x0101010101010101ULL * (uint64_t)b2;
  const uint8_t * p = (const uint8_t *) data;
  const uint8_t * end = (const uint8_t *) data + len - 1;
  while (p + 7 < end) {
    uint64_t chunk = *(uint64_t *)p;
    uint64_t x1 = chunk ^ pattern1;
    uint64_t x2 = chunk ^ pattern2;
    uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
    uint64_t hit2 = (x2 - 0x0101010101010101ULL) & ~x2 & 0x8080808080808080ULL;
    uint64_t hc1 = (hit1 >> 8) & hit2;
    uint64_t hc2 = (hit2 >> 8) & hit1;
    if (hc1 | hc2) {
      for (int i = 0; i < 7; i++) {
        uint16_t w = *(const uint16_t *)(p + i);
        if (w == needle) return p + i - (const uint8_t *) data;
      }
    }
    p += 7;
  }
  for (; p < end; p++) {
    uint16_t w = *(const uint16_t *)p;
    if (w == needle) return p - (const uint8_t *) data;
  }
  return len;
}

Checking for both hc1 and hc2 helps to convince the compiler to vectorise the inner loop, leading to - paradoxically - better performance than checking for just one of them.

Also unexpectedly, the SWAR version beats all others in benchmarks by a wide margin, and the second runner-up is the naive, 8-bit direct load version:

buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2        time=15880241758 ns  checksum=0xc25f073958310200
scalar                time=14393013166 ns  checksum=0xc25f073958310200
portable_qword        time=8435549272 ns  checksum=0xc25f073958310200
portable_running      time=28003921570 ns  checksum=0xc25f073958310200

Estimated throughput (GiB/s):
  portable_elim2   : 13.499
  scalar           : 14.894
  portable_qword   : 25.412
  portable_running : 7.655

1.2.2. SSE2 and SSSE3 implementations.

The obvious next step is to try to vectorise the search. Without minding unaligned loads, we can do it the obvious way:

NOINLINE size_t memchr2_sse2_bmi1(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  const __m128i vlo = _mm_set1_epi8((char)lo);
  const __m128i vhi = _mm_set1_epi8((char)hi);
  size_t i = 0;
  for (; i + 17 <= n; i += 16) {
    __m128i a = _mm_loadu_si128((const __m128i *)(p + i));
    __m128i b = _mm_loadu_si128((const __m128i *)(p + i + 1));
    __m128i eq0 = _mm_cmpeq_epi8(a, vlo);
    __m128i eq1 = _mm_cmpeq_epi8(b, vhi);
    __m128i both = _mm_and_si128(eq0, eq1);
    unsigned mask = (unsigned)_mm_movemask_epi8(both);
    if (mask)
      return i + (size_t)_tzcnt_u32(mask);   // BMI1
  }
  for (; i + 1 < n; ++i)
    if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

The result? A twice speedup over the SWAR version!

buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2        time=15880241758 ns  checksum=0xc25f073958310200
scalar                time=14393013166 ns  checksum=0xc25f073958310200
portable_qword        time=8435549272 ns  checksum=0xc25f073958310200
portable_running      time=28003921570 ns  checksum=0xc25f073958310200
sse2_bmi1             time=5253894697 ns  checksum=0xc25f073958310200

Estimated throughput (GiB/s):
  portable_elim2   : 13.499
  scalar           : 14.894
  portable_qword   : 25.412
  portable_running : 7.655
  sse2_bmi1        : 40.802

Using SSSE3 we can sidestep the nasty unaligned loads by using PALIGNR, like so:

NOINLINE size_t memchr2_ssse3_bmi1(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  const __m128i vlo = _mm_set1_epi8((char)lo);
  const __m128i vhi = _mm_set1_epi8((char)hi);
  size_t i = 0;
  if (n >= 32) {
    __m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
    for (; i + 32 <= n; i += 16) {
      __m128i N = _mm_loadu_si128((const __m128i *)(p + i + 16));
      __m128i B = _mm_alignr_epi8(N, A, 1); // SSSE3
      __m128i eq0 = _mm_cmpeq_epi8(A, vlo);
      __m128i eq1 = _mm_cmpeq_epi8(B, vhi);
      __m128i both = _mm_and_si128(eq0, eq1);
      unsigned mask = (unsigned)_mm_movemask_epi8(both);
      if (mask) return i + (size_t)_tzcnt_u32(mask); // BMI1
      A = N;
    }
  }
  for (; i + 1 < n; ++i)
    if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

Disappointingly, that doesn’t seem to win us much:

buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2        time=15880241758 ns  checksum=0xc25f073958310200
scalar                time=14393013166 ns  checksum=0xc25f073958310200
portable_qword        time=8435549272 ns  checksum=0xc25f073958310200
portable_running      time=28003921570 ns  checksum=0xc25f073958310200
sse2_bmi1             time=5253894697 ns  checksum=0xc25f073958310200
ssse3_bmi1            time=5354999130 ns  checksum=0xc25f073958310200

Estimated throughput (GiB/s):
  portable_elim2   : 13.499
  scalar           : 14.894
  portable_qword   : 25.412
  portable_running : 7.655
  sse2_bmi1        : 40.802
  ssse3_bmi1       : 40.031

Judging by the output of the profiling tool perf, most of the time is spent in the hot loop, suggesting that nothing funny is going on with the tail or tzcnt again. Zen2 analysis via llvm-mca is also somewhat insubtantial:

Iterations:        100
Instructions:      1400
Total Cycles:      364
Total uOps:        1400

Dispatch Width:    4
uOps Per Cycle:    3.85
IPC:               3.85
Block RThroughput: 3.5

Instruction Info:
[1]: #uOps
[2]: Latency
[3]: RThroughput
[4]: MayLoad
[5]: MayStore
[6]: HasSideEffects (U)

[1]    [2]    [3]    [4]    [5]    [6]    Instructions:
 1      1     0.25                        vmovdqa       xmm3, xmm2
 1      8     0.33    *                   vmovdqu       xmm2, xmmword ptr [rdi + r8 + 16]
 1      1     0.50                        vpalignr      xmm4, xmm2, xmm3, 1
 1      1     0.33                        vpcmpeqb      xmm3, xmm0, xmm3
 1      1     0.33                        vpcmpeqb      xmm4, xmm4, xmm1
 1      1     0.25                        vpand xmm3, xmm3, xmm4
 1      1     1.00                        vpmovmskb     esi, xmm3
 1      1     0.25                        test  esi, esi
 1      1     0.25                        jne   .over
 1      1     0.25                        lea   rsi, [r8 + 16]
 1      1     0.25                        add   r8, 48
 1      1     0.25                        cmp   r8, rax
 1      1     0.25                        mov   r8, rsi
 1      1     0.25                        jbe   .star

It also appears that we don’t have enough execution ports to unroll and interleave the loop further.

1.2.3. AVX2 implementation.

We start tame, by widening the SSE vectors to AVX2 256-bit ones. Unfortunately, life is never as simple as one would have hoped. AVX2 likes to include opcodes that operate on 128-bit lanes of 256-bit vectors only. Reportedly this was dictated by microarchitectural considerations in Intel’s implementation of AVX2 that are now long obsolete, but we have to deal with it nonetheless.

Our implementation will use permute2x128 and valignr to construct the shifted vectors:

NOINLINE size_t memchr2_avx2_bmi1(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t *p = (const uint8_t *)ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  const __m256i vlo = _mm256_set1_epi8((char)lo);
  const __m256i vhi = _mm256_set1_epi8((char)hi);
  size_t i = 0;
  if (n >= 64) {
    __m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
    for (; i + 64 <= n; i += 32) {
      __m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
      __m256i M = _mm256_permute2x128_si256(A, N, 0x21);
      __m256i B = _mm256_alignr_epi8(M, A, 1);
      __m256i eq0  = _mm256_cmpeq_epi8(A, vlo);
      __m256i eq1  = _mm256_cmpeq_epi8(B, vhi);
      __m256i both = _mm256_and_si256(eq0, eq1);
      unsigned mask = (unsigned)_mm256_movemask_epi8(both);
      if (mask) return i + (size_t)_tzcnt_u32(mask);
      A = N;
    }
  }
  for (; i + 1 < n; ++i)
    if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

Instinctively, we will also try to shove a couple of _mm256_movemask_epi8 away from the hot path. To do this we will apply the same optimisation idea as in the regular memchr implementation:

static inline __m256i both_from_AN(__m256i A, __m256i N, __m256i vlo, __m256i vhi) {
  __m256i M = _mm256_permute2x128_si256(A, N, 0x21);
  __m256i B = _mm256_alignr_epi8(M, A, 1);
  __m256i eq0  = _mm256_cmpeq_epi8(A, vlo);
  __m256i eq1  = _mm256_cmpeq_epi8(B, vhi);
  return _mm256_and_si256(eq0, eq1);
}
NOINLINE size_t memchr2_avx2_bmi1_alt(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  const __m256i vlo = _mm256_set1_epi8((char)lo);
  const __m256i vhi = _mm256_set1_epi8((char)hi);
  size_t i = 0;
  if (n >= 96) {
    __m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
    for (; i + 96 <= n; i += 64) {
      __m256i N0 = _mm256_loadu_si256((const __m256i *)(p + i + 32));
      __m256i N1 = _mm256_loadu_si256((const __m256i *)(p + i + 64));
      __m256i both0 = both_from_AN(A,  N0, vlo, vhi);
      __m256i both1 = both_from_AN(N0, N1, vlo, vhi);
      __m256i any = _mm256_or_si256(both0, both1);
      if (!_mm256_testz_si256(any, any)) {
        unsigned m0 = (unsigned)_mm256_movemask_epi8(both0);
        if (m0) return i + (size_t)_tzcnt_u32(m0);
        unsigned m1 = (unsigned)_mm256_movemask_epi8(both1);
        return (i + 32) + (size_t)_tzcnt_u32(m1);
      }
      A = N1;
    }
    for (; i + 64 <= n; i += 32) {
      __m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
      __m256i both = both_from_AN(A, N, vlo, vhi);
      if (!_mm256_testz_si256(both, both)) {
        unsigned m = (unsigned)_mm256_movemask_epi8(both);
        return i + (size_t)_tzcnt_u32(m);
      }
      A = N;
    }
  } else if (n >= 64) {
    __m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
    for (; i + 64 <= n; i += 32) {
      __m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
      __m256i both = both_from_AN(A, N, vlo, vhi);
      if (!_mm256_testz_si256(both, both)) {
        unsigned m = (unsigned)_mm256_movemask_epi8(both);
        return i + (size_t)_tzcnt_u32(m);
      }
      A = N;
    }
  }
  for (; i + 1 < n; ++i)
      if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

Benchmarks show that the second AVX2 version is about the same in terms of performance as the first one, except it seems to stall on the vtest instruction on both Zen2 and Zen4 CPUs. A test on a Zen2 CPU (5900X) gives:

buf_size=64000000 bytes, queries=65536, iters=50, seed=16
portable_elim2        time=7122549226 ns  checksum=0x9ea45cc56838615c
scalar                time=3940411071 ns  checksum=0x9ea45cc56838615c
portable_qword        time=1868259066 ns  checksum=0x9ea45cc56838615c
portable_running      time=11036034203 ns  checksum=0x9ea45cc56838615c
sse2_bmi1             time=774990423 ns  checksum=0x9ea45cc56838615c
sse2_bmi1_fast        time=837601040 ns  checksum=0x9ea45cc56838615c
ssse3_bmi1            time=821152775 ns  checksum=0x9ea45cc56838615c
avx2_bmi1             time=710584728 ns  checksum=0x9ea45cc56838615c
avx2_bmi1_alt         time=703469492 ns  checksum=0x9ea45cc56838615c

Estimated throughput (GiB/s):
  portable_elim2   : 14.727
  scalar           : 26.621
  portable_qword   : 56.147
  portable_running : 9.505
  sse2_bmi1        : 135.352
  sse2_bmi1_fast   : 125.234
  ssse3_bmi1       : 127.743
  avx2_bmi1        : 147.620
  avx2_bmi1_alt    : 149.113

The kernel of the improved AVX2 loop appears to be:

   3.81 │ 40:   vmovdqu      ymm4,YMMWORD PTR [rdi+rsi*1+0x20]
   2.90 │       vpcmpeqb     ymm5,ymm0,ymm2
   4.17 │       vperm2i128   ymm3,ymm2,ymm4,0x21
   3.16 │       vpalignr     ymm3,ymm3,ymm2,0x1
   2.78 │       vmovdqu      ymm2,YMMWORD PTR [rdi+rsi*1+0x40]
   3.27 │       vpcmpeqb     ymm3,ymm3,ymm1
   1.95 │       vpand        ymm3,ymm5,ymm3
   2.49 │       vperm2i128   ymm5,ymm4,ymm2,0x21
   1.91 │       vpalignr     ymm5,ymm5,ymm4,0x1
   2.32 │       vpcmpeqb     ymm4,ymm0,ymm4
   3.60 │       vpcmpeqb     ymm5,ymm5,ymm1
   3.15 │       vpand        ymm4,ymm4,ymm5
   5.93 │       vpor         ymm5,ymm3,ymm4
  15.79 │       vptest       ymm5,ymm5
   7.24 │     ↓ jne          154
   7.20 │       lea          r8,[rsi+0x40]
   7.08 │       add          rsi,0xa0
   4.72 │       cmp          rsi,rax
   3.19 │       mov          rsi,r8
   3.13 │     ↑ jbe          40

Unrolling four times gives the following kernel:

   0.79 │ 40:   vmovdqu      ymm4,YMMWORD PTR [rdi+rsi*1+0x20]
   0.53 │       vmovdqu      ymm5,YMMWORD PTR [rdi+rsi*1+0x40]
   0.50 │       vmovdqu      ymm6,YMMWORD PTR [rdi+rsi*1+0x60]
   0.52 │       vpcmpeqb     ymm7,ymm0,ymm2
   1.73 │       vperm2i128   ymm3,ymm2,ymm4,0x21
   1.86 │       vpalignr     ymm3,ymm3,ymm2,0x1
   2.43 │       vmovdqu      ymm2,YMMWORD PTR [rdi+rsi*1+0x80]
   2.29 │       vpcmpeqb     ymm3,ymm3,ymm1
   1.95 │       vpand        ymm3,ymm7,ymm3
   2.29 │       vperm2i128   ymm7,ymm4,ymm5,0x21
   1.10 │       vpalignr     ymm7,ymm7,ymm4,0x1
   0.94 │       vpcmpeqb     ymm4,ymm0,ymm4
   0.84 │       vpcmpeqb     ymm7,ymm7,ymm1
   0.69 │       vpand        ymm4,ymm4,ymm7
   0.71 │       vperm2i128   ymm7,ymm5,ymm6,0x21
   1.04 │       vpor         ymm8,ymm3,ymm4
   0.76 │       vpalignr     ymm7,ymm7,ymm5,0x1
   1.06 │       vpcmpeqb     ymm5,ymm0,ymm5
   0.72 │       vpcmpeqb     ymm7,ymm7,ymm1
   1.04 │       vpand        ymm5,ymm5,ymm7
   0.93 │       vperm2i128   ymm7,ymm6,ymm2,0x21
   0.98 │       vpalignr     ymm7,ymm7,ymm6,0x1
   1.21 │       vpcmpeqb     ymm6,ymm0,ymm6
   1.51 │       vpcmpeqb     ymm7,ymm7,ymm1
   1.23 │       vpand        ymm6,ymm6,ymm7
   3.14 │       vpor         ymm7,ymm5,ymm6
   1.50 │       vpor         ymm7,ymm8,ymm7
  19.06 │       vptest       ymm7,ymm7
   9.21 │     ↓ jne          140
   8.78 │       lea          r8,[rsi+0x80]
   8.29 │       add          rsi,0x120
   8.92 │       cmp          rsi,rax
   0.56 │       mov          rsi,r8
   0.62 │     ↑ jbe          40

Which also stalls on vtest, albeit less frequently.

1.2.4. AVX512 (AVX-512F + AVX-512BW + AVX-512VBMI) implementation.

Straightforward translation using _mm512_permutex2var_epi8 yields us:

NOINLINE size_t memchr2_avx512vbmi(const void * ptr, size_t n, uint16_t needle) {
  const uint8_t * p = (const uint8_t *) ptr;
  if (n < 2) return n;
  const uint8_t lo = (uint8_t)needle;
  const uint8_t hi = (uint8_t)(needle >> 8);
  const __m512i vlo = _mm512_set1_epi8((char)lo);
  const __m512i vhi = _mm512_set1_epi8((char)hi);
  const __m512i idx = _mm512_setr_epi32(
      0x04030201, 0x08070605, 0x0c0b0a09, 0x100f0e0d,
      0x14131211, 0x18171615, 0x1c1b1a19, 0x201f1e1d,
      0x24232221, 0x28272625, 0x2c2b2a29, 0x302f2e2d,
      0x34333231, 0x38373635, 0x3c3b3a39, 0x403f3e3d
  );
  size_t i = 0;
  if (n >= 128) {
    __m512i A = _mm512_loadu_si512((const void *)(p + 0));
    for (; i + 128 <= n; i += 64) {
      __m512i N = _mm512_loadu_si512((const void *)(p + i + 64));
      __m512i B = _mm512_permutex2var_epi8(A, idx, N);
      __mmask64 k0 = _mm512_cmpeq_epi8_mask(A, vlo);
      __mmask64 k1 = _mm512_cmpeq_epi8_mask(B, vhi);
      __mmask64 k  = k0 & k1;
      if (k) return i + (size_t)_tzcnt_u64((uint64_t)k);
      A = N;  // roll
    }
  }
  for (; i + 1 < n; ++i) if (p[i] == lo && p[i + 1] == hi) return i;
  return n;
}

The perf profiling output tells us that we may have overdone a little with the vector width, but the stall persists:

Percent │      mov          rax,rsi
   0.01 │      cmp          rsi,0x2
        │    ↓ jb           7b
   0.01 │      movzx        ecx,dx
   0.02 │      shr          ecx,0x8
        │      cmp          rax,0x80
        │    ↓ jb           7f
   0.01 │      vmovdqu64    zmm2,ZMMWORD PTR [rdi]
   0.25 │      vpbroadcastb zmm0,edx
   0.04 │      vpbroadcastb zmm1,ecx
   0.01 │      xor          r8d,r8d
        │      nop
   4.02 │30:   vmovdqa64    zmm3,zmm2
   4.02 │      vmovdqu64    zmm2,ZMMWORD PTR [rdi+r8*1+0x40]
   4.52 │      vpcmpeqb     k0,zmm0,zmm3
   4.48 │      valignq      zmm4,zmm2,zmm3,0x2
   6.98 │      vpalignr     zmm4,zmm4,zmm3,0x1
   7.18 │      vpcmpeqb     k1,zmm4,zmm1
  20.62 │      ktestq       k0,k1
   9.31 │    ↓ jne          b0
   9.61 │      lea          rsi,[r8+0x40]
   6.78 │      add          r8,0xc0
   6.66 │      cmp          r8,rax
   0.73 │      mov          r8,rsi
   0.76 │    ↑ jbe          30
   0.09 │      lea          r8,[rsi+0x1]
   0.05 │      cmp          r8,rax
        │    ↓ jb           8a
   0.12 │7b:   vzeroupper
   0.01 │    ← ret
   0.02 │7f:   xor          esi,esi
        │      lea          r8,[rsi+0x1]
   0.01 │      cmp          r8,rax
        │    ↑ jae          7b
   0.03 │8a:   lea          r8,[rax-0x1]
   0.04 │    ↓ jmp          98
   2.39 │90:   inc          rsi
   1.90 │      cmp          r8,rsi
        │    ↑ je           7b
   6.05 │98:   cmp          BYTE PTR [rdi+rsi*1],dl
        │    ↑ jne          90
   0.12 │      movzx        r9d,BYTE PTR [rdi+rsi*1+0x1]
   0.11 │      cmp          cx,r9w
        │    ↑ jne          90
        │      mov          rax,rsi
        │      vzeroupper
        │    ← ret
   1.49 │b0:   kandq        k0,k0,k1
   1.09 │      kmovq        rax,k0
   0.47 │      tzcnt        rax,rax
        │      add          rax,r8
   0.01 │      vzeroupper
   0.01 │    ← ret

Perhaps interestingly, the AVX-512 implementation seems to not incur too much code bloat:

% nm --no-demangle -a -P --size-sort memchr2-benchmark | grep memchr2
_Z22memchr2_portable_elim2PKvmt T 1960 3a
_Z23memchr2_portable_simplePKvmt T 1920 3d
_Z24memchr2_portable_runningPKvmt T 19a0 53
_Z17memchr2_sse2_bmi1PKvmt T 11f0 9d
_Z18memchr2_ssse3_bmi1PKvmt T 1340 ad
_Z22memchr2_sse2_bmi1_fastPKvmt T 1290 ad
_Z17memchr2_avx2_bmi1PKvmt T 13f0 c4
_Z18memchr2_avx512vbmiPKvmt T 16c0 c6       <<--- AVX-512 version
_Z22memchr2_portable_qwordPKvmt T 1790 18d
_Z21memchr2_avx2_bmi1_altPKvmt T 14c0 1f5

We also observe a significant performance boost over AVX2:

buf_size=64000000 bytes, queries=65536, iters=100, seed=666
sse2_bmi1             time=6581546429 ns  checksum=0xc25f073958310200
sse2_bmi1_fast        time=6248299510 ns  checksum=0xc25f073958310200
ssse3_bmi1            time=6445409513 ns  checksum=0xc25f073958310200
avx2_bmi1             time=5446783838 ns  checksum=0xc25f073958310200
avx2_bmi1_alt         time=5882657242 ns  checksum=0xc25f073958310200
avx512vbmi            time=4824502035 ns  checksum=0xc25f073958310200

Estimated throughput (GiB/s):
  sse2_bmi1        : 32.571
  sse2_bmi1_fast   : 34.308
  ssse3_bmi1       : 33.259
  avx2_bmi1        : 39.357
  avx2_bmi1_alt    : 36.440
  avx512vbmi       : 44.433

With the target feature that discourages the compiler from splitting permutex2var into multiple instructions, we see the assembly output:

   4.45 │ 90:   mov          rdx,rcx
   0.54 │       add          rcx,0x40
   0.51 │       vmovdqa64    zmm1,zmm0
   2.34 │       vmovdqu64    zmm0,ZMMWORD PTR [r8+rcx*1]
   5.69 │       vmovdqa64    zmm2,zmm1
   7.80 │       vpermt2b     zmm2,zmm3,zmm0
  10.07 │       vpcmpeqb     k1,zmm2,zmm4
   9.24 │       vpcmpeqb     k0{k1},zmm1,zmm5
  24.38 │       kortestq     k0,k0
   8.04 │     ↓ jne          100
   6.57 │       lea          rdi,[rdx+0xc0]
   5.39 │       cmp          rax,rdi
   0.01 │     ↑ jae          90

Without major performance improvements.

1.3. Exact match of a pair of characters in non-overlapping regions.

This is an easier but less useful problem (applicable only to wmemchr or related problems). Essentially, all of our solutions are derivative of the previous memchr implementations, except we operate on two-byte chunks directly.

1.3.1. Baseline and SWAR implementations.

Three preliminary implementations (simple, SWAR, repne scasw) follow:

NOINLINE void * wmemchr_repne_scasw(const void *s, uint16_t c, size_t n) {
  if (n == 0) return NULL;
  const uint16_t *p = (const uint16_t *)s;
  void *ret;
  __asm__ volatile(
      "cld\n\t"
      "repne scasw\n\t"
      "jne 1f\n\t"              // if ZF==0 (no match), jump to notfound
      "lea -2(%%rdi), %0\n\t"   // match: ret = rdi-2
      "jmp 2f\n\t"
      "1:\n\t"
      "xor %0, %0\n\t"          // not found: ret = NULL
      "2:\n\t"
      : "=&r"(ret), "+D"(p), "+c"(n) : "a"(c) : "cc", "memory");
  return ret;
}

NOINLINE const uint16_t * simple_wmemchr(const uint16_t * __restrict__ text, uint16_t target, size_t length) {
  for (size_t i = 0; i < length; ++i)
    if (text[i] == target)
      return &text[i];
  return nullptr;
}

NOINLINE const uint16_t * qword_wmemchr(const uint16_t * __restrict__ s, uint16_t target, size_t length) {
  const uint16_t * p = s;
  const uint64_t pattern = 0x0001000100010001ULL * (uint64_t)target;
  while (length >= 4) {
    uint64_t w;  __builtin_memcpy(&w, p, sizeof(w));
    uint64_t x = w ^ pattern;
    if (((x - 0x0001000100010001ULL) & ~x & 0x8000800080008000ULL) != 0) {
      if (p[0] == target) return p + 0;
      if (p[1] == target) return p + 1;
      if (p[2] == target) return p + 2;
      if (p[3] == target) return p + 3;
    }
    p += 4;  length -= 4;
  }
  while (length) {
    if (*p == target) return p;
    ++p;  --length;
  }
  return nullptr;
}

Benchmarks show:

buf_size=67108864 bytes, queries=65536, iters=50, seed=1
simple_wmemchr  time=6638559210 ns  checksum=0xcb426bda4f4d21ff
wmemchr_repne_scasw  time=10031833115 ns  checksum=0xcb426bda4f4d21ff
qword_wmemchr   time=5605833323 ns  checksum=0xcb426bda4f4d21ff

Estimated throughput (GiB/s):
  simple_wmemchr: 16.014
  wmemchr_repne_scasw: 10.598
  qword_wmemchr: 18.965

Unsurprisingly, as we have remarked previously, repne scasw is not competitive here due to the missing microcode. The SWAR version is the fastest, but only marginally so. More significant performance improvements will be obtained using SIMD techniques, similar to those used in the previous section.

1.3.2. SIMD implementations.

The only issue with porting the previous memchr SIMD implementations to wmemchr is that we have to deal with 16-bit elements instead of 8-bit ones in the _mm_movemask_epi8 instruction. This means that we have to adjust the way we compute the positions from the comparison results.

NOINLINE void * wmemchr_sse2(const void * s, uint16_t c, size_t n) {
  const uint16_t * p = (const uint16_t *) s;
  const __m128i needle = _mm_set1_epi16((short)c);
  for (; n >= 8; n -= 8, p += 8) {
    __m128i hay = _mm_loadu_si128((const __m128i*)p);
    __m128i eq  = _mm_cmpeq_epi16(hay, needle);
    unsigned mask = (unsigned)_mm_movemask_epi8(eq);
    if (mask) {
      unsigned byte_index = (unsigned)__builtin_ctz(mask);
      return (void *) (p + (byte_index >> 1));
    }
  }
  for (; n; --n, ++p) {
    if (*p == c) return (void *)p;
  }
  return nullptr;
}

NOINLINE const uint16_t * wmemchr_avx2_simple(const uint16_t * __restrict__ s, uint16_t target, size_t length) {
  const uint16_t * __restrict__ p = s;
  const __m256i needle = _mm256_set1_epi16((short)target);
  for (; length >= 64; length -= 64, p += 64) {
    __m256i hay1 = _mm256_loadu_si256((const __m256i *)(p));
    __m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 16));
    __m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 32));
    __m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 48));
    __m256i eq1  = _mm256_cmpeq_epi16(hay1, needle);
    __m256i eq2  = _mm256_cmpeq_epi16(hay2, needle);
    __m256i eq3  = _mm256_cmpeq_epi16(hay3, needle);
    __m256i eq4  = _mm256_cmpeq_epi16(hay4, needle);
    __m256i any  = _mm256_or_si256(_mm256_or_si256(eq1, eq2), _mm256_or_si256(eq3, eq4));
    if (_mm256_testz_si256(any, any) == 0) {
      unsigned mm1 = (unsigned)_mm256_movemask_epi8(eq1);
      if (mm1) return p + ((unsigned)__builtin_ctz(mm1) >> 1);
      unsigned mm2 = (unsigned)_mm256_movemask_epi8(eq2);
      if (mm2) return (p + 16) + ((unsigned)__builtin_ctz(mm2) >> 1);
      unsigned mm3 = (unsigned)_mm256_movemask_epi8(eq3);
      if (mm3) return (p + 32) + ((unsigned)__builtin_ctz(mm3) >> 1);
      unsigned mm4 = (unsigned)_mm256_movemask_epi8(eq4);
      return (p + 48) + ((unsigned)__builtin_ctz(mm4) >> 1);
    }
  }
  for (; length >= 16; length -= 16, p += 16) {
    __m256i hay = _mm256_loadu_si256((const __m256i *)p);
    __m256i eq  = _mm256_cmpeq_epi16(hay, needle);
    unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
    if (mask)
      return p + ((unsigned)__builtin_ctz(mask) >> 1);
  }
  for (; length; --length, ++p)
    if (*p == target) return p;
  return nullptr;
}

The final benchmarks look like this:

buf_size=67108864 bytes, queries=65536, iters=50, seed=1
simple_wmemchr  time=6638559210 ns  checksum=0xcb426bda4f4d21ff
wmemchr_repne_scasw  time=10031833115 ns  checksum=0xcb426bda4f4d21ff
wmemchr_sse2    time=4114077958 ns  checksum=0xcb426bda4f4d21ff
qword_wmemchr   time=5605833323 ns  checksum=0xcb426bda4f4d21ff
wmemchr_avx2_simple  time=3277355737 ns  checksum=0xcb426bda4f4d21ff

Estimated throughput (GiB/s):
  simple_wmemchr: 16.014
  wmemchr_repne_scasw: 10.598
  wmemchr_sse2 : 25.841
  qword_wmemchr: 18.965
  wmemchr_avx2_simple : 32.438

We leave the problem be here, as we believe the problem to be memory-beyond this point.

1.4. Exact match of a triplet of consecutive characters.

Matching triplets is of particular interest to us and poses a challenge:

  • There isn’t a single instruction to load a triplet from memory, unlike before.
  • Naive scaling of the 16-bit variant will increase the volume of code and provide strictly worse performance.
  • The needle is staring to become large enough to stack the statistics within random data against us.

Because at this length the concrete underlying distribution of substrings in the input data matters, we will conduct two benchmarks: one with uniformly random data, and another with a prefix of enwik8, 100MB of Wikipedia in XML format.

1.4.1. Baseline and SWAR implementations.

We once again compare the “rolling read” and baseline approaches:

NOINLINE size_t memchr3_portable_simple(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t * p = (const uint8_t *)ptr;
  if (n < 3) return n;
  const uint8_t b1 = (uint8_t)needle;
  const uint8_t b2 = (uint8_t)(needle >> 8);
  const uint8_t b3 = (uint8_t)(needle >> 16);
  for (size_t i = 0; i + 2 < n; ++i)
    if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return i;
  return n;
}

NOINLINE size_t memchr3_rolling_cmp(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t * p = (const uint8_t *)ptr;
  if (n < 3) return n;
  uint32_t nw = (uint32_t)(
      ((needle & 0xFFu) << 24) |
      (((needle >> 8) & 0xFFu) << 16) |
      (((needle >> 16) & 0xFFu) << 8)
  );

  uint32_t hw = (uint32_t)((uint32_t)p[0] << 24 | (uint32_t)p[1] << 16 | (uint32_t)p[2] << 8);
  if (hw == nw) return 0;
  size_t i = 2;
  while (++i < n) {
    hw = (hw | (uint32_t)p[i]) << 8;
    if (hw == nw) return i - 2;
  }
  return n;
}

Benchmarks show that the rolling read approach (musl-like) still isn’t worth the effort here; Zen2 benchmarks show:

Testing on uniformly random data...
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple          time=3838123745 ns  checksum=0x64135f1aecd4b15f
memchr3_rolling         time=6907529497 ns  checksum=0x64135f1aecd4b15f

Estimated throughput (GiB/s):
  memchr3_simple   : 5.498
  memchr3_rolling  : 3.055

Testing on enwik8 with sampled needles...
memchr3_simple          time=231706636 ns  checksum=0x49c24b926896aabe
memchr3_rolling         time=206304819 ns  checksum=0x49c24b926896aabe

Estimated throughput (GiB/s):
  memchr3_simple   : 5.401
  memchr3_rolling  : 6.066

Moving on to the SWAR implementation, we have three choices that make assumptions about the underlying distribution: test all bytes (most general, also most overhead on the hot path, no false positives), test only the first and last bytes (moderate overhead, some false positives), and test only the middle byte (least overhead, most false positives). We implement all three without excessive commentary:

NOINLINE size_t memchr3_portable_qword(const void * data, size_t len, uint32_t needle) {
  if (len < 3) return len;
  uint8_t b1 = (uint8_t)(needle);
  uint8_t b2 = (uint8_t)(needle >> 8);
  uint8_t b3 = (uint8_t)(needle >> 16);
  const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
  const uint64_t pattern2 = 0x0101010101010101ULL * (uint64_t)b2;
  const uint64_t pattern3 = 0x0101010101010101ULL * (uint64_t)b3;
  const uint8_t * p = (const uint8_t *) data;
  const uint8_t * end = (const uint8_t *) data + len - 2;
  while (p + 6 < end) {
    uint64_t chunk = *(uint64_t *)p;
    uint64_t x1 = chunk ^ pattern1;
    uint64_t x2 = chunk ^ pattern2;
    uint64_t x3 = chunk ^ pattern3;
    uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
    uint64_t hit2 = (x2 - 0x0101010101010101ULL) & ~x2 & 0x8080808080808080ULL;
    uint64_t hit3 = (x3 - 0x0101010101010101ULL) & ~x3 & 0x8080808080808080ULL;
    uint64_t hc2 = (hit3 >> 16) & (hit2 >> 8) & hit1;
    if (hc2) {
      for (int i = 0; i < 6; i++) {
        if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
      }
    }
    p += 6;
  }
  for (; p < end; p++) {
    if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
  }
  return len;
}

NOINLINE size_t memchr3_portable_middle_qword(const void * data, size_t len, uint32_t needle) {
  if (len < 3) return len;
  uint8_t b1 = (uint8_t)(needle);
  uint8_t b2 = (uint8_t)(needle >> 8);
  uint8_t b3 = (uint8_t)(needle >> 16);
  const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
  const uint64_t pattern3 = 0x0101010101010101ULL * (uint64_t)b3;
  const uint8_t * p = (const uint8_t *) data;
  const uint8_t * end = (const uint8_t *) data + len - 2;
  while (p + 6 < end) {
    uint64_t chunk = *(uint64_t *)p;
    uint64_t x1 = chunk ^ pattern1;
    uint64_t x3 = chunk ^ pattern3;
    uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
    uint64_t hit3 = (x3 - 0x0101010101010101ULL) & ~x3 & 0x8080808080808080ULL;
    uint64_t hc2 = (hit3 >> 16) & hit1;
    if (hc2) {
      for (int i = 0; i < 6; i++) {
        if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
      }
    }
    p += 6;
  }
  for (; p < end; p++) {
    if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
  }
  return len;
}

NOINLINE size_t memchr3_portable_fb_qword(const void * data, size_t len, uint32_t needle) {
  if (len < 3) return len;
  uint8_t b1 = (uint8_t)(needle);
  uint8_t b2 = (uint8_t)(needle >> 8);
  uint8_t b3 = (uint8_t)(needle >> 16);
  const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
  const uint8_t * p = (const uint8_t *) data;
  const uint8_t * end = (const uint8_t *) data + len - 2;
  while (p + 6 < end) {
    uint64_t chunk = *(uint64_t *)p;
    uint64_t x1 = chunk ^ pattern1;
    uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
    if (hit1) {
      for (int i = 0; i < 6; i++) {
        if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
      }
    }
    p += 6;
  }
  for (; p < end; p++) {
    if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
  }
  return len;
}

The runtimes suggest that for real data, searching the first and last bytes is the best compromise:

Testing on uniformly random data...
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple          time=3838123745 ns  checksum=0x64135f1aecd4b15f
memchr3_rolling         time=6907529497 ns  checksum=0x64135f1aecd4b15f
memchr3_qword           time=3501706757 ns  checksum=0x64135f1aecd4b15f
memchr3_middle          time=2558240974 ns  checksum=0x64135f1aecd4b15f
memchr3_fb              time=2119827609 ns  checksum=0x64135f1aecd4b15f

Estimated throughput (GiB/s):
  memchr3_simple   : 5.498
  memchr3_rolling  : 3.055
  memchr3_qword    : 6.027
  memchr3_middle   : 8.249
  memchr3_fb       : 9.955

Testing on enwik8 with sampled needles...
memchr3_simple          time=231706636 ns  checksum=0x49c24b926896aabe
memchr3_rolling         time=206304819 ns  checksum=0x49c24b926896aabe
memchr3_qword           time=126368474 ns  checksum=0x49c24b926896aabe
memchr3_middle          time=110629408 ns  checksum=0x49c24b926896aabe
memchr3_fb              time=194372178 ns  checksum=0x49c24b926896aabe

Estimated throughput (GiB/s):
  memchr3_simple   : 5.401
  memchr3_rolling  : 6.066
  memchr3_qword    : 9.903
  memchr3_middle   : 11.312
  memchr3_fb       : 6.439

Conversely, because the random data is, well, random, checking only the first byte is the best option there as it filters out most candidates with minimal overhead.

1.4.2. SSE searcher.

We will naively extend the unaligned load idea from before:

NOINLINE size_t memchr3_sse2_bmi1(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t *p = (const uint8_t *)ptr;
  if (n < 3) return n;
  const uint8_t b0 = (uint8_t)(needle);
  const uint8_t b1 = (uint8_t)(needle >> 8);
  const uint8_t b2 = (uint8_t)(needle >> 16);
  const __m128i v0 = _mm_set1_epi8((char)b0);
  const __m128i v1 = _mm_set1_epi8((char)b1);
  const __m128i v2 = _mm_set1_epi8((char)b2);
  size_t i = 0;
  for (; i + 18 <= n; i += 16) {
    __m128i a = _mm_loadu_si128((const __m128i *)(p + i));
    __m128i b = _mm_loadu_si128((const __m128i *)(p + i + 1));
    __m128i c = _mm_loadu_si128((const __m128i *)(p + i + 2));
    __m128i eq0 = _mm_cmpeq_epi8(a, v0);
    __m128i eq1 = _mm_cmpeq_epi8(b, v1);
    __m128i eq2 = _mm_cmpeq_epi8(c, v2);
    __m128i m01  = _mm_and_si128(eq0, eq1);
    __m128i both = _mm_and_si128(m01, eq2);
    unsigned mask = (unsigned)_mm_movemask_epi8(both);
    if (mask) return i + (size_t)_tzcnt_u32(mask);
  }
  for (; i + 2 < n; ++i) {
    if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
  }
  return n;
}

We see the following performance characteristics:

buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple          time=3829049092 ns  checksum=0x64135f1aecd4b15f
memchr3_rolling         time=6954385344 ns  checksum=0x64135f1aecd4b15f
memchr3_qword           time=3522594643 ns  checksum=0x64135f1aecd4b15f
memchr3_middle          time=2594670220 ns  checksum=0x64135f1aecd4b15f
memchr3_fb              time=2152706551 ns  checksum=0x64135f1aecd4b15f
memchr3_sse2_bmi1       time=612184228 ns  checksum=0x64135f1aecd4b15f

Estimated throughput (GiB/s):
  memchr3_simple   : 5.511
  memchr3_rolling  : 3.035
  memchr3_qword    : 5.991
  memchr3_middle   : 8.133
  memchr3_fb       : 9.803
  memchr3_sse2_bmi1: 34.472

Testing on enwik8 with sampled needles...
memchr3_simple          time=232567051 ns  checksum=0x49c24b926896aabe
memchr3_rolling         time=208465357 ns  checksum=0x49c24b926896aabe
memchr3_qword           time=127860831 ns  checksum=0x49c24b926896aabe
memchr3_middle          time=112110693 ns  checksum=0x49c24b926896aabe
memchr3_fb              time=196291157 ns  checksum=0x49c24b926896aabe
memchr3_sse2_bmi1       time=56776812 ns  checksum=0x49c24b926896aabe

Estimated throughput (GiB/s):
  memchr3_simple   : 5.381
  memchr3_rolling  : 6.003
  memchr3_qword    : 9.788
  memchr3_middle   : 11.163
  memchr3_fb       : 6.376
  memchr3_sse2_bmi1: 22.042

The SSSE3 versions (basic, unrolled) which avoid excessive loads still do not prove themselves worthwhile:

NOINLINE size_t memchr3_ssse3_bmi1(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t *p = (const uint8_t *)ptr;
  if (n < 3) return n;
  const uint8_t b0 = (uint8_t)needle;
  const uint8_t b1 = (uint8_t)(needle >> 8);
  const uint8_t b2 = (uint8_t)(needle >> 16);
  const __m128i v0 = _mm_set1_epi8((char)b0);
  const __m128i v1 = _mm_set1_epi8((char)b1);
  const __m128i v2 = _mm_set1_epi8((char)b2);
  size_t i = 0;
  if (n >= 32) {
    __m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
    for (; i + 32 <= n; i += 16) {
      __m128i N  = _mm_loadu_si128((const __m128i *)(p + i + 16));
      __m128i B1 = _mm_alignr_epi8(N, A, 1); // SSSE3
      __m128i B2 = _mm_alignr_epi8(N, A, 2); // SSSE3
      __m128i eq0  = _mm_cmpeq_epi8(A,  v0);
      __m128i eq1  = _mm_cmpeq_epi8(B1, v1);
      __m128i eq2  = _mm_cmpeq_epi8(B2, v2);
      __m128i m01  = _mm_and_si128(eq0, eq1);
      __m128i both = _mm_and_si128(m01, eq2);
      unsigned mask = (unsigned)_mm_movemask_epi8(both);
      if (mask) return i + (size_t)_tzcnt_u32(mask); // BMI1
      A = N;
    }
  }
  for (; i + 2 < n; ++i) {
    if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
  }
  return n;
}

static inline unsigned mask3_ssse3(__m128i A, __m128i B, __m128i v0, __m128i v1, __m128i v2) {
  __m128i B1 = _mm_alignr_epi8(B, A, 1);
  __m128i B2 = _mm_alignr_epi8(B, A, 2);
  __m128i m  = _mm_and_si128(_mm_and_si128(_mm_cmpeq_epi8(A,  v0), _mm_cmpeq_epi8(B1, v1)), _mm_cmpeq_epi8(B2, v2));
  return (unsigned)_mm_movemask_epi8(m);
}

NOINLINE size_t memchr3_ssse3_fast(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t * p = (const uint8_t*) ptr;
  if (n < 3) return n;
  const uint8_t b0 = (uint8_t)needle;
  const uint8_t b1 = (uint8_t)(needle >> 8);
  const uint8_t b2 = (uint8_t)(needle >> 16);
  const __m128i v0 = _mm_set1_epi8((char)b0);
  const __m128i v1 = _mm_set1_epi8((char)b1);
  const __m128i v2 = _mm_set1_epi8((char)b2);
  size_t i = 0;
  if (n >= 32) {
    __m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
    i = 0;
    for (; i + 80 <= n; i += 64) {
      __m128i B = _mm_loadu_si128((const __m128i *)(p + i + 16));
      __m128i C = _mm_loadu_si128((const __m128i *)(p + i + 32));
      __m128i D = _mm_loadu_si128((const __m128i *)(p + i + 48));
      __m128i E = _mm_loadu_si128((const __m128i *)(p + i + 64));
      unsigned m0 = mask3_ssse3(A, B, v0, v1, v2);
      unsigned m1 = mask3_ssse3(B, C, v0, v1, v2);
      unsigned m2 = mask3_ssse3(C, D, v0, v1, v2);
      unsigned m3 = mask3_ssse3(D, E, v0, v1, v2);
      if (m0 | m1 | m2 | m3) {
          if (m0) return i + 0  + (size_t)_tzcnt_u32(m0);
          if (m1) return i + 16 + (size_t)_tzcnt_u32(m1);
          if (m2) return i + 32 + (size_t)_tzcnt_u32(m2);
          return         i + 48 + (size_t)_tzcnt_u32(m3);
      }
      A = E;
    }
    for (; i + 32 <= n; i += 16) {
      __m128i B = _mm_loadu_si128((const __m128i*)(p + i + 16));
      unsigned m = mask3_ssse3(A, B, v0, v1, v2);
      if (m) return i + (size_t)_tzcnt_u32(m);
      A = B;
    }
  }
  for (; i + 2 < n; ++i) {
    if (p[i] == b0 && p[i+1] == b1 && p[i+2] == b2) return i;
  }
  return n;
}

Giving:

buf_size=67108864 bytes, queries=65536, iters=10, seed=1

Testing on uniformly random data...
Estimated throughput (GiB/s):
  memchr3_sse2_bmi1   : 34.752
  memchr3_ssse3_bmi1  : 31.485
  memchr3_ssse3_fast  : 33.956

Testing on enwik8 with sampled needles...
Estimated throughput (GiB/s):
  memchr3_sse2_bmi1   : 22.036
  memchr3_ssse3_bmi1  : 21.842
  memchr3_ssse3_fast  : 19.704

However, we can once again re-use the idea of checking the first and last positions as a pre-filter, and together with some unrolling attain better performance:

NOINLINE size_t memchr3_ssse3_prefilter_bmi1(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t * p = (const uint8_t *)ptr;
  if (n < 3) return n;
  const uint8_t b0 = (uint8_t)(needle);
  const uint8_t b1 = (uint8_t)(needle >> 8);
  const uint8_t b2 = (uint8_t)(needle >> 16);
  const __m128i v0 = _mm_set1_epi8((char)b0);
  const __m128i v1 = _mm_set1_epi8((char)b1);
  const __m128i v2 = _mm_set1_epi8((char)b2);
  size_t i = 0;
  // Scalar until 16B alignment
  uintptr_t mis = (uintptr_t)p & 15u;
  if (mis) {
    size_t lim = 16u - (size_t)mis;
    if (lim > n) lim = n;
    for (; i < lim && i + 2 < n; ++i) {
      if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
    }
    if (i + 2 >= n) return n;
  }
  // Now p+i is 16B aligned.
  const uint8_t * q = p + i;
  size_t m = n - i;
  // Need at least 32 bytes so that for starts in [0..15], bytes +1 and +2 exist in A/N window.
  if (m >= 32) {
    __m128i A = _mm_load_si128((const __m128i*)q);
    size_t j = 0;
    // 64B unroll: 4x16B per iteration, one aligned load per step.
    for (; j + 64 <= m; j += 64) {
      __m128i N0  = _mm_load_si128((const __m128i*)(q + j + 16));
      __m128i B10 = _mm_alignr_epi8(N0, A, 1);
      __m128i B20 = _mm_alignr_epi8(N0, A, 2);
      unsigned m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A,  v0));
      unsigned m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B20, v2));
      unsigned cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B10, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + (size_t)_tzcnt_u32(hit);
      }
      __m128i A1  = N0;
      __m128i N1  = _mm_load_si128((const __m128i*)(q + j + 32));
      __m128i B11 = _mm_alignr_epi8(N1, A1, 1);
      __m128i B21 = _mm_alignr_epi8(N1, A1, 2);
      m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A1,  v0));
      m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B21, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B11, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 16 + (size_t)_tzcnt_u32(hit);
      }
      __m128i A2  = N1;
      __m128i N2  = _mm_load_si128((const __m128i*)(q + j + 48));
      __m128i B12 = _mm_alignr_epi8(N2, A2, 1);
      __m128i B22 = _mm_alignr_epi8(N2, A2, 2);
      m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A2,  v0));
      m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B22, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B12, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 32 + (size_t)_tzcnt_u32(hit);
      }
      __m128i A3  = N2;
      __m128i N3  = _mm_load_si128((const __m128i*)(q + j + 64));
      __m128i B13 = _mm_alignr_epi8(N3, A3, 1);
      __m128i B23 = _mm_alignr_epi8(N3, A3, 2);
      m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A3,  v0));
      m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B23, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B13, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 48 + (size_t)_tzcnt_u32(hit);
      }
      A = N3;
    }
    // Finish remaining aligned 16B steps while we still have a full A/N window (>= 32 bytes remaining).
    for (; j + 32 <= m; j += 16) {
      __m128i N  = _mm_load_si128((const __m128i*)(q + j + 16));
      __m128i B1 = _mm_alignr_epi8(N, A, 1);
      __m128i B2 = _mm_alignr_epi8(N, A, 2);
      unsigned m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A,  v0));
      unsigned m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B2, v2));
      unsigned cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B1, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + (size_t)_tzcnt_u32(hit);
      }
      A = N;
    }
    i += j;
  }
  for (; i + 2 < n; ++i) {
    if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
  }
  return n;
}

The performance has improved and the code has no glaring red stall spots as evidenced by perf, hence we are satisfied with the following outcomes:

buf_size=67108864 bytes, queries=65536, iters=10, seed=1

Testing on uniformly random data...
memchr3_simple          time=3849644032 ns  checksum=0x64135f1aecd4b15f
memchr3_rolling         time=6982241710 ns  checksum=0x64135f1aecd4b15f
memchr3_qword           time=3545738908 ns  checksum=0x64135f1aecd4b15f
memchr3_middle          time=2592279127 ns  checksum=0x64135f1aecd4b15f
memchr3_fb              time=2163220216 ns  checksum=0x64135f1aecd4b15f
memchr3_sse2_bmi1       time=607247793 ns  checksum=0x64135f1aecd4b15f
memchr3_ssse3_bmi1      time=670266244 ns  checksum=0x64135f1aecd4b15f
memchr3_ssse3_fast      time=621495733 ns  checksum=0x64135f1aecd4b15f
memchr3_ssse3_prefilt   time=570113226 ns  checksum=0x64135f1aecd4b15f

Estimated throughput (GiB/s):
  memchr3_simple      : 5.482
  memchr3_rolling     : 3.022
  memchr3_qword       : 5.952
  memchr3_middle      : 8.141
  memchr3_fb          : 9.756
  memchr3_sse2_bmi1   : 34.752
  memchr3_ssse3_bmi1  : 31.485
  memchr3_ssse3_fast  : 33.956
  memchr3_ssse3_prefilt: 37.016

Testing on enwik8 with sampled needles...

memchr3_simple          time=234533252 ns  checksum=0x49c24b926896aabe
memchr3_rolling         time=208617318 ns  checksum=0x49c24b926896aabe
memchr3_qword           time=128926290 ns  checksum=0x49c24b926896aabe
memchr3_middle          time=112717791 ns  checksum=0x49c24b926896aabe
memchr3_fb              time=197571431 ns  checksum=0x49c24b926896aabe
memchr3_sse2_bmi1       time=56792236 ns  checksum=0x49c24b926896aabe
memchr3_ssse3_bmi1      time=57297401 ns  checksum=0x49c24b926896aabe
memchr3_ssse3_fast      time=63515382 ns  checksum=0x49c24b926896aabe
memchr3_ssse3_prefilt   time=65642677 ns  checksum=0x49c24b926896aabe

Estimated throughput (GiB/s):
  memchr3_simple      : 5.336
  memchr3_rolling     : 5.999
  memchr3_qword       : 9.707
  memchr3_middle      : 11.103
  memchr3_fb          : 6.334
  memchr3_sse2_bmi1   : 22.036
  memchr3_ssse3_bmi1  : 21.842
  memchr3_ssse3_fast  : 19.704
  memchr3_ssse3_prefilt: 19.065

1.4.3. AVX2 searcher.

We simply widen the SSE2 approach to AVX2 and use the same solution as before to the problem of lane-wise operations:

NOINLINE size_t memchr3_avx2_prefilter_bmi1(const void * ptr, size_t n, uint32_t needle) {
  const uint8_t * p = (const uint8_t*)ptr;
  if (n < 3) return n;
  const uint8_t b0 = (uint8_t)(needle);
  const uint8_t b1 = (uint8_t)(needle >> 8);
  const uint8_t b2 = (uint8_t)(needle >> 16);
  const __m256i v0 = _mm256_set1_epi8((char)b0);
  const __m256i v1 = _mm256_set1_epi8((char)b1);
  const __m256i v2 = _mm256_set1_epi8((char)b2);
  size_t i = 0;
  uintptr_t mis = (uintptr_t)p & 31u;
  if (mis) {
    size_t lim = 32u - (size_t)mis;
    if (lim > n) lim = n;
    for (; i < lim && i + 2 < n; ++i) {
      if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
    }
    if (i + 2 >= n) return n;
  }
  const uint8_t* q = p + i;
  size_t m = n - i;
  if (m >= 64) {
    __m256i A = _mm256_load_si256((const __m256i*)(const void*)q);
    size_t j = 0;
    // 128B unroll: 4x32B steps per iteration.
    for (; j + 128 <= m; j += 128) {
      __m256i N0 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 32));
      __m256i X0 = _mm256_permute2x128_si256(A, N0, 0x21); // [A_hi | N0_lo]
      __m256i B10 = _mm256_alignr_epi8(X0, A, 1);      // bytes at +1
      __m256i B20 = _mm256_alignr_epi8(X0, A, 2);      // bytes at +2
      unsigned m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A,   v0));
      unsigned m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B20, v2));
      unsigned cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B10, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + (size_t)_tzcnt_u32(hit);
      }
      __m256i A1 = N0;
      __m256i N1 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 64));
      __m256i X1 = _mm256_permute2x128_si256(A1, N1, 0x21);
      __m256i B11 = _mm256_alignr_epi8(X1, A1, 1);
      __m256i B21 = _mm256_alignr_epi8(X1, A1, 2);
      m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A1,  v0));
      m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B21, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B11, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 32 + (size_t)_tzcnt_u32(hit);
      }
      __m256i A2 = N1;
      __m256i N2 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 96));
      __m256i X2 = _mm256_permute2x128_si256(A2, N2, 0x21);
      __m256i B12 = _mm256_alignr_epi8(X2, A2, 1);
      __m256i B22 = _mm256_alignr_epi8(X2, A2, 2);
      m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A2,  v0));
      m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B22, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B12, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 64 + (size_t)_tzcnt_u32(hit);
      }
      __m256i A3 = N2;
      __m256i N3 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 128));
      __m256i X3 = _mm256_permute2x128_si256(A3, N3, 0x21);
      __m256i B13 = _mm256_alignr_epi8(X3, A3, 1);
      __m256i B23 = _mm256_alignr_epi8(X3, A3, 2);
      m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A3,  v0));
      m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B23, v2));
      cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B13, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + 96 + (size_t)_tzcnt_u32(hit);
      }
      A = N3;
    }
    for (; j + 64 <= m; j += 32) {
      __m256i N  = _mm256_load_si256((const __m256i*)(const void*)(q + j + 32));
      __m256i X  = _mm256_permute2x128_si256(A, N, 0x21);
      __m256i B1 = _mm256_alignr_epi8(X, A, 1);
      __m256i B2 = _mm256_alignr_epi8(X, A, 2);
      unsigned m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A,  v0));
      unsigned m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B2, v2));
      unsigned cand = m0 & m2;
      if (cand) {
        unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B1, v1));
        unsigned hit = cand & m1;
        if (hit) return i + j + (size_t)_tzcnt_u32(hit);
      }
      A = N;
    }
    i += j;
  }
  // Tail scalar (covers last < 64 bytes safely).
  for (; i + 2 < n; ++i)
    if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
  return n;
}

The results are still underwhelming:

Testing on uniformly random data...
Estimated throughput (GiB/s):
  memchr3_sse2_bmi1   : 34.412
  memchr3_ssse3_bmi1  : 31.588
  memchr3_ssse3_fast  : 33.978
  memchr3_ssse3_prefilt: 36.525
  memchr3_avx2_prefilt: 41.549

Testing on enwik8 with sampled needles...
Estimated throughput (GiB/s):
  memchr3_sse2_bmi1   : 22.268
  memchr3_ssse3_bmi1  : 22.075
  memchr3_ssse3_fast  : 19.961
  memchr3_ssse3_prefilt: 19.191
  memchr3_avx2_prefilt: 18.313

α. Fun House

While we’re at re-implementing a couple of libc methods, why not strfry? This function randomizes the order of characters in a string via a Fisher-Yates shuffle.

It has been reported that glibc’s implementation is suboptimal and the distribution of characters is not uniform. Later on in the thread, we learn that Ulrich Drepper’s mother was a hamster, while his father smells of elderberries.

We opt in for an approach based on rdseed mixed together with clever use of AES-NI to generate high-quality uniformly random numbers at a good speed, followed by a binary algorithm for sampling indices in range [0..n) without modulo bias or the general slowness of Lemire’s method.

static ALWAYS_INLINE uint64_t rdseed64(void) {
  uint64_t x;
  while (_rdseed64_step(&x) == 0) { }
  return x;
}

#define aes128_keyassist(key, rcon) \
({ \
  __m128i tmp = _mm_aeskeygenassist_si128((key), (rcon)); \
  tmp = _mm_shuffle_epi32((tmp), _MM_SHUFFLE(3,3,3,3)); \
  __m128i k = (key); \
  k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
  k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
  k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
  _mm_xor_si128((k), (tmp)); \
})

static ALWAYS_INLINE void aes128_expand_key(__m128i rk[11], __m128i key) {
  rk[0]  = key;
  rk[1]  = aes128_keyassist(rk[0], 0x01);
  rk[2]  = aes128_keyassist(rk[1], 0x02);
  rk[3]  = aes128_keyassist(rk[2], 0x04);
  rk[4]  = aes128_keyassist(rk[3], 0x08);
  rk[5]  = aes128_keyassist(rk[4], 0x10);
  rk[6]  = aes128_keyassist(rk[5], 0x20);
  rk[7]  = aes128_keyassist(rk[6], 0x40);
  rk[8]  = aes128_keyassist(rk[7], 0x80);
  rk[9]  = aes128_keyassist(rk[8], 0x1B);
  rk[10] = aes128_keyassist(rk[9], 0x36);
}

static ALWAYS_INLINE __m128i aes128_encrypt_block(const __m128i rk[11], __m128i in) {
  __m128i x = _mm_xor_si128(in, rk[0]);
  x = _mm_aesenc_si128(x, rk[1]);
  x = _mm_aesenc_si128(x, rk[2]);
  x = _mm_aesenc_si128(x, rk[3]);
  x = _mm_aesenc_si128(x, rk[4]);
  x = _mm_aesenc_si128(x, rk[5]);
  x = _mm_aesenc_si128(x, rk[6]);
  x = _mm_aesenc_si128(x, rk[7]);
  x = _mm_aesenc_si128(x, rk[8]);
  x = _mm_aesenc_si128(x, rk[9]);
  x = _mm_aesenclast_si128(x, rk[10]);
  return x;
}

typedef struct {
  __m128i rk[11];
  uint64_t ctr_lo, ctr_hi; // 128-bit counter (little-endian increment)
  uint32_t buf[4];         // 16 bytes = 4x u32
  int idx;                 // next unread word
} aesctr_rng_t;

static ALWAYS_INLINE void aesctr_init_rdseed(aesctr_rng_t * g) {
  uint64_t k0 = rdseed64(), k1 = rdseed64();
  uint64_t c0 = rdseed64(), c1 = rdseed64();
  __m128i key = _mm_set_epi64x((long long)k1, (long long)k0);
  aes128_expand_key(g->rk, key);
  g->ctr_lo = c0;  g->ctr_hi = c1;
  g->idx = 4; // force refill on first use
}

static ALWAYS_INLINE void aesctr_refill(aesctr_rng_t * g) {
  __m128i ctr = _mm_set_epi64x((long long)g->ctr_hi, (long long)g->ctr_lo);
  __m128i block = aes128_encrypt_block(g->rk, ctr);
  g->ctr_lo++;
  if (g->ctr_lo == 0) g->ctr_hi++;
  _mm_storeu_si128((__m128i*)g->buf, block);
  g->idx = 0;
}

static ALWAYS_INLINE uint32_t aesctr_u32(aesctr_rng_t * g) {
  if (g->idx >= 4) aesctr_refill(g);
  return g->buf[g->idx++];
}

static ALWAYS_INLINE uint32_t bounded_mask(aesctr_rng_t * g, uint32_t bound) {
  if (bound <= 1) return 0;
  uint32_t range = bound - 1;
  uint32_t mask = ~0u;
  mask >>= __builtin_clz(range | 1u); // mask = 2^b - 1, with 2^b > range
  uint32_t x;
  do { x = aesctr_u32(g) & mask; } while (x > range);
  return x;
}

void strfry_bytes(uint8_t * a, size_t n) {
  if (n <= 1) return;
  if (n - 1 > UINT32_MAX) return;
  aesctr_rng_t g;
  aesctr_init_rdseed(&g);
  for (size_t i = n - 1; i > 0; --i) {
    uint32_t j = bounded_mask(&g, (uint32_t)(i + 1)); // j in [0..i]
    uint8_t tmp = a[i];  a[i] = a[j];  a[j] = tmp;
  }
}

char * strfry(char * s) {
  size_t n = strlen(s);
  strfry_bytes((uint8_t *) s, n);
  return s;
}

Benchmarking against glibc is pointless (it’s not uniform, not fast and not “secure”). We can probe for uniformity using Lehmer codes; a base-64 alphabet that counts permutations by index shows us some concrete data:

m=2  trials=1000000  perms=2
  perm counts: expected=500000.000  min=499783  max=500217  max_rel_dev=0.0004
  chi2(perms)=0.19  dof=1  z≈-0.57
  chi2(pos 0)=0.19  dof=1  z≈-0.57
  chi2(pos 1)=0.19  dof=1  z≈-0.57
  chi2(pos sum)=0.38  (sum dof=2)

m=3  trials=1000000  perms=6
  perm counts: expected=166666.667  min=166046  max=167113  max_rel_dev=0.0037
  chi2(perms)=4.45  dof=5  z≈-0.17
  chi2(pos 0)=0.88  dof=2  z≈-0.56
  chi2(pos 1)=0.63  dof=2  z≈-0.69
  chi2(pos 2)=0.17  dof=2  z≈-0.92
  chi2(pos sum)=1.68  (sum dof=6)

m=4  trials=1000000  perms=24
  perm counts: expected=41666.667  min=41326  max=41934  max_rel_dev=0.0082
  chi2(perms)=18.25  dof=23  z≈-0.70
  chi2(pos 0)=0.89  dof=3  z≈-0.86
  chi2(pos 1)=6.52  dof=3  z≈1.44
  chi2(pos 2)=0.77  dof=3  z≈-0.91
  chi2(pos 3)=2.02  dof=3  z≈-0.40
  chi2(pos sum)=10.19  (sum dof=12)

m=5  trials=1000000  perms=120
  perm counts: expected=8333.333  min=8107  max=8524  max_rel_dev=0.0272
  chi2(perms)=111.13  dof=119  z≈-0.51
  chi2(pos 0)=3.37  dof=4  z≈-0.22
  chi2(pos 1)=6.40  dof=4  z≈0.85
  chi2(pos 2)=3.34  dof=4  z≈-0.23
  chi2(pos 3)=8.38  dof=4  z≈1.55
  chi2(pos 4)=3.47  dof=4  z≈-0.19
  chi2(pos sum)=24.96  (sum dof=20)

m=6  trials=1000000  perms=720
  perm counts: expected=1388.889  min=1269  max=1508  max_rel_dev=0.0863
  chi2(perms)=745.06  dof=719  z≈0.69
  chi2(pos 0)=2.38  dof=5  z≈-0.83
  chi2(pos 1)=5.68  dof=5  z≈0.22
  chi2(pos 2)=11.88  dof=5  z≈2.18
  chi2(pos 3)=4.52  dof=5  z≈-0.15
  chi2(pos 4)=5.23  dof=5  z≈0.07
  chi2(pos 5)=6.85  dof=5  z≈0.58
  chi2(pos sum)=36.54  (sum dof=30)

m=7  trials=1000000  perms=5040
  perm counts: expected=198.413  min=139  max=250  max_rel_dev=0.2994
  chi2(perms)=5022.50  dof=5039  z≈-0.16
  chi2(pos 0)=5.69  dof=6  z≈-0.09
  chi2(pos 1)=5.98  dof=6  z≈-0.01
  chi2(pos 2)=6.84  dof=6  z≈0.24
  chi2(pos 3)=9.99  dof=6  z≈1.15
  chi2(pos 4)=4.28  dof=6  z≈-0.50
  chi2(pos 5)=6.46  dof=6  z≈0.13
  chi2(pos 6)=5.24  dof=6  z≈-0.22
  chi2(pos sum)=44.48  (sum dof=42)

Our code could be adapted to shuffle objects of any size, but since this is a simple exercise in programming, we leave this to the reader. Other interesting functions that we may tackle in the future are strtok, strspn, strrchr, strpbrk or memcmp.

In the next post, we will be looking more closely into longer (>= 4 bytes), sparser (as in, matching with gaps) and finer (non-overlapping 2-bit or 4-bit sequences, useful in Bioinformatics) needles before moving on to string algorithms based on suffix trees, suffix arrays and the BWT and the FM-index in part three.