
Programmers often see regular expressions and their matching engines as a complex, almost impenetrable black box. PCRE spans hundreds of thousands of lines of code. It supports a large and evolving feature set: lookarounds, backreferences, Unicode properties, and multiple matching modes are all present. On top of this, PCRE includes a JIT compiler that translates patterns into machine code for speed.
While all these features are important for a production-grade regex engine, they can obscure the fundamental concepts behind pattern matching. In this blog post, we start our journey of building a matcher from scratch, all by ourselves.
1. Exact match ⎇
In this section we focus on exact string matching. The goal is to find occurrences of a given pattern, verbatim within a text. The constrained form of this problem opens up many optimisation tricks and algorithmic techniques that we can leverage.
Much ink has been spilled on this topic and many algorithms have been proposed over the years. Nonetheless, we take a stab at deriving some of the folk wisdom techniques ourselves, bottom-up.
1.1. Exact match of a single character ⎇
The simplest case of exact matching is to find a single character within a text. Programmers are already used to the function strchr from the C standard library, which does exactly that:
#include <cstring>
#include <iostream>
int main() {
const char * text = "Hello, world!";
char target = 'o';
const char * result = std::strchr(text, target);
if (result) {
std::cout << "Found at position: " << (result - text) << std::endl;
} else {
std::cout << "Not found" << std::endl;
}
}
/* Output:
Found at position: 4
*/
The function strchr scans the input string text character by character, comparing each character to the target. If a match is found, it returns a pointer to the first occurrence; otherwise, it returns nullptr. A naive implementation of this idea could look like this:
const char * simple_strchr(const char * text, char target) {
while (*text) {
if (*text == target) {
return text;
}
++text;
}
return nullptr;
}
A quick skim of the generated assembly code (from clang 21.1 with -O3 optimizations) reveals that the compiler does not bother unrolling the loop or using SIMD instructions, preferring to translate the function almost verbatim to x86-64 assembly:
simple_strchr(char const*, char):
movzx ecx, byte ptr [rdi]
test cl, cl
je .LBB0_1
mov rax, rdi
.LBB0_4:
cmp cl, sil
je .LBB0_2
movzx ecx, byte ptr [rax + 1]
inc rax
test cl, cl
jne .LBB0_4
.LBB0_1:
xor eax, eax
.LBB0_2:
ret
The use of C-style strings (i.e., with a null terminator) prevents us from knowing the length of the input string in advance. This limitation hinders our ability to apply certain optimizations, such as processing multiple characters simultaneously using SIMD instructions. One alternative is the memchr function, which takes an explicit length parameter:
int main() {
const char * text = "Hello, world!";
size_t length = std::strlen(text);
char target = 'o';
const char * result = static_cast<const char *>(std::memchr(text, target, length));
if (result) {
std::cout << "Found at position: " << (result - text) << std::endl;
} else {
std::cout << "Not found" << std::endl;
}
}
1.1.1. Naive search. ⎇
In this case, the implementation can take advantage of the known length to optimize the search process. Once again we can implement a simple version of memchr ourselves:
#include <cstddef>
const char * simple_memchr(const char * text, char target, size_t length) {
for (size_t i = 0; i < length; ++i) {
if (text[i] == target) {
return &text[i];
}
}
return nullptr;
}
1.1.2. Search via SWAR. ⎇
Disappointingly, the compiler chose not to vectorise this one either. Our first idea for an optimisation is to process multiple characters in each iteration. We will load 8 bytes at a time, treat them as a single integer, and XOR them against the target character broadcast to all eight byte positions. If any byte matches the target, the corresponding byte of the XOR result is zero. We can then check for the presence and location of a zero byte using some bit-twiddling tricks. Here is the implementation:
#include <stddef.h>
#include <stdint.h>

// __restrict__: a GNU extension that tells the compiler that
// the pointer is the only way to access the underlying data.
void * memchr2(const void * __restrict__ s, int c, size_t n) {
const unsigned char * __restrict__ p = (const unsigned char * __restrict__) s;
const unsigned char uc = (unsigned char)c;
// Align to 8 bytes (optional, helps on some CPUs; also avoids unaligned loads)
while (n && ((uintptr_t)p & 7u)) {
if (*p == uc) return (void *)p;
++p; --n;
}
// Broadcast target character to all bytes of a 64-bit word
// E.g. for 'A' (0x41), pattern becomes 0x4141414141414141
const uint64_t pattern = 0x0101010101010101ULL * (uint64_t)uc;
while (n >= 8) {
// Avoids aliasing + unaligned-load issues.
// __builtin_memcpy is non-standard and optimized away by the compiler.
uint64_t w; __builtin_memcpy(&w, p, sizeof(w));
uint64_t x = w ^ pattern;
if (((x - 0x0101010101010101ULL) & ~x & 0x8080808080808080ULL) != 0) {
// Locate first match within this 8-byte block.
for (int i = 0; i < 8; ++i) {
if (p[i] == uc) return (void *)(p + i);
}
}
p += 8; n -= 8;
}
// Tail
while (n--) {
if (*p == uc) return (void *)p;
++p;
}
return NULL;
}
Forfeiting some auxiliary explanations and assuming a sufficient level of C-fu in the reader, we can express this more compactly:
const void * memchr2(const void * __restrict__ s, int c, size_t n) {
const unsigned char * __restrict__ p = (const unsigned char * __restrict__) s;
const unsigned char uc = (unsigned char)c;
for (; n && ((uintptr_t)p & 7u); --n, ++p)
if (*p == uc) return (const void *)p;
const uint64_t pattern = 0x0101010101010101ULL * (uint64_t)uc;
for (; n >= 8; n -= 8, p += 8) {
uint64_t w; __builtin_memcpy(&w, p, sizeof(w));
uint64_t x = w ^ pattern;
if (((x - 0x0101010101010101ULL) & ~x & 0x8080808080808080ULL))
for (int i = 0; i < 8; ++i)
if (p[i] == uc) return (const void *)(p + i);
}
for (; n--; ++p) if (*p == uc) return (const void *)p;
return NULL;
}
Using a quickly hacked together benchmark harness, we compare the three implementations of memchr for throughput on my Ryzen 7 PRO 7840U laptop:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=4093754278 ns checksum=0x06d1490b613edffc
simple_memchr time=13403055073 ns checksum=0x06d1490b613edffc
memchr2 time=10869952023 ns checksum=0x06d1490b613edffc
Estimated throughput (GiB/s):
glibc_memchr : 259.694
simple_memchr: 79.320 // 3.27x slower than glibc, 1.22x slower than memchr2
memchr2 : 97.804 // 2.67x slower than glibc
Skewing the test so that the latency becomes more prominent (smaller buffers) yields:
buf_size=1024 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=401259363 ns checksum=0x18c93a5a77e26bd1
simple_memchr time=1493447878 ns checksum=0x18c93a5a77e26bd1
memchr2 time=977337739 ns checksum=0x18c93a5a77e26bd1
Estimated throughput (GiB/s):
glibc_memchr : 36.016
simple_memchr: 9.677 // 4x slower than glibc, 1.55x slower than memchr2
memchr2 : 14.787 // 2.57x slower than glibc
Our SWAR (SIMD Within A Register) implementation of memchr is about 2.5-3x slower than the highly optimized glibc version, which is not bad for a first attempt.
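To make the zero-byte test used in memchr2 above concrete, here is a small, self-contained check (a sketch; the helper name has_zero_byte is introduced here and is not part of the code above). XORing a chunk with the broadcast target zeroes exactly the matching bytes, and the classic expression sets the top bit of a byte position iff that byte is zero:

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>

// Nonzero iff some byte of x is zero: the test used in the hot loop above.
static inline uint64_t has_zero_byte(uint64_t x) {
    return (x - 0x0101010101010101ULL) & ~x & 0x8080808080808080ULL;
}

int main() {
    const char chunk[8] = {'w', 'o', 'r', 'l', 'd', '!', '!', '!'};
    uint64_t w;
    std::memcpy(&w, chunk, sizeof w);
    const uint64_t pat_o = 0x0101010101010101ULL * (uint64_t)'o'; // present at offset 1
    const uint64_t pat_z = 0x0101010101010101ULL * (uint64_t)'z'; // absent
    std::printf("%d %d\n", has_zero_byte(w ^ pat_o) != 0,  // prints 1
                           has_zero_byte(w ^ pat_z) != 0); // prints 0
}
```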
1.1.3. Search via SSE4.2 PCMPESTRI. ⎇
There is still room for improvement, and we will reach for an indispensable trick: x86-64 SIMD intrinsics. We will specifically target the PCMPESTRI instruction from the SSE4.2 instruction set, which essentially does the bulk of our work for us.
To quote the Intel® 64 and IA-32 Architectures Software Developer’s Manual, Volume 2B:
PCMPESTRI — Packed Compare Explicit Length Strings, Return Index
Instruction Summary ⎇
| Opcode / Instruction | Op/En | 64/32-bit Mode | CPUID Flag | Description |
|---|---|---|---|---|
| 66 0F 3A 61 /r imm8 PCMPESTRI xmm1, xmm2/m128, imm8 | RMI | V / V | SSE4_2 | Perform a packed comparison of string data with explicit lengths, generate an index, and store the result in ECX. |
| VEX.128.66.0F3A 61 /r ib VPCMPESTRI xmm1, xmm2/m128, imm8 | RMI | V / V | AVX | Perform a packed comparison of string data with explicit lengths, generate an index, and store the result in ECX. |
Instruction Operand Encoding ⎇
| Op/En | Operand 1 | Operand 2 | Operand 3 | Operand 4 |
|---|---|---|---|---|
| RMI | ModRM:reg (r) | ModRM:r/m (r) | imm8 | N/A |
Description ⎇
The instruction compares and processes data from two string fragments based on the encoded value in the imm8 control byte (see Section 4.1, “Imm8 Control Byte Operation for PCMPESTRI / PCMPESTRM / PCMPISTRI / PCMPISTRM”).
It generates an index that is written to the count register ECX.
Each string fragment is represented by two values:
- Data vector
  - An xmm register for the first operand.
  - An xmm register or m128 memory operand for the second operand.
  - Contains byte or word elements of the string.
- Length
  - For xmm1: length is taken from EAX/RAX.
  - For xmm2/m128: length is taken from EDX/RDX.
  - The length specifies how many bytes or words are valid in the corresponding vector.
Length Interpretation ⎇
- The effective length is the absolute value of the length register.
- The absolute-value computation saturates:
- Bytes: maximum 16
- Words: maximum 8
- Saturation occurs when the length register value is:
- Greater than 16 (or 8 for words), or
- Less than −16 (or −8 for words)
- Whether data elements are bytes or words is determined by imm8[3].
Comparison and Result ⎇
- Comparison and aggregation behavior is controlled by fields in imm8 (see Section 4.1).
- The instruction computes an intermediate result mask (IntRes2, see Section 4.1.4).
- The index of the first or last set bit of IntRes2 is returned in ECX, depending on imm8[6]:
- If imm8[6] = 0: first (least significant) set bit
- If imm8[6] = 1: last (most significant) set bit
- If no bits are set in IntRes2:
- ECX = 16 (byte elements)
- ECX = 8 (word elements)
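To make the return-value convention above concrete, here is a minimal sketch (ours, not part of the quoted manual) using the SSE4.2 intrinsic and the flag constants from <nmmintrin.h>; with unsigned-byte elements, "equal any" aggregation and least-significant indexing, the control byte happens to be 0:

```cpp
#include <nmmintrin.h> // SSE4.2; compile with -msse4.2
#include <cstdio>
#include <cstring>

int main() {
    const char hay[16] = "Hello, world!!!";        // exactly 16 bytes incl. the NUL
    const __m128i needle = _mm_cvtsi32_si128('o'); // needle set of size 1
    __m128i block;
    std::memcpy(&block, hay, sizeof block);
    const int imm8 = _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_LEAST_SIGNIFICANT; // == 0
    // 16 valid haystack bytes: the first 'o' is at index 4.
    std::printf("%d\n", _mm_cmpestri(needle, 1, block, 16, imm8)); // 4
    // Only the first 4 bytes ("Hell") are valid: no match, so 16 is returned.
    std::printf("%d\n", _mm_cmpestri(needle, 1, block, 4, imm8));  // 16
}
```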
We proceed with the implementation in the same vein as the previous SWAR memchr2 function:
#include <nmmintrin.h>
void * memchr_sse42(const void * s, int c, size_t n) {
const unsigned char * p = (const unsigned char *) s;
const unsigned char uc = (unsigned char) c;
const __m128i needle = _mm_cvtsi32_si128((int) uc);
for (; n && ((uintptr_t)p & 15u); --n, ++p) // Align to 16-byte boundary.
if (*p == uc) return (void *)p;
// Compare any byte in hay equals any byte in needle-set (size 1).
const int imm8 = _SIDD_UBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_LEAST_SIGNIFICANT;
for (; n >= 16; n -= 16, p += 16) {
const int idx = _mm_cmpestri(needle, 1, _mm_load_si128((const __m128i *) p), 16, imm8);
if (idx != 16) return (void *)(p + (unsigned)idx);
}
for (; n; --n, ++p) if (*p == uc) return (void *)p;
return nullptr;
}
The results are promising. Our implementation is now only about 2x slower than glibc's highly optimized version:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=4129417820 ns checksum=0xcdc64e6fe915f3ed
simple_memchr time=13248285362 ns checksum=0xcdc64e6fe915f3ed
memchr2 time=10671500036 ns checksum=0xcdc64e6fe915f3ed
memchr_sse42 time=8801114604 ns checksum=0xcdc64e6fe915f3ed
Estimated throughput (GiB/s):
glibc_memchr : 257.451
simple_memchr: 80.246
memchr2 : 99.623
memchr_sse42 : 120.794
The source of the remaining gap is a disappointing but frequently recurring story in performance engineering on x86-64 platforms. Looking up the instruction on uops.info reveals the following picture for my Zen4 CPU:
| Instruction | Lat | TP | Uops | Ports |
|---|---|---|---|---|
| PCMPESTRI (XMM, M128, I8) | [≤11; ≤29] | 2.00 / 4.00 | 12 | 1*FP01 + 2*FP1 + 2*FP45 |
| PCMPESTRI (XMM, XMM, I8) | [≤11; 15] | 2.00 / 3.00 | 8 | 1*FP01 + 2*FP1 + 2*FP45 |
| PCMPESTRI64 (XMM, M128, I8) | [≤11; ≤29] | 2.00 / 4.00 | 12 | 1*FP01 + 2*FP1 + 2*FP45 |
| PCMPESTRI64 (XMM, XMM, I8) | [≤11; 15] | 2.00 / 3.00 | 8 | 1*FP01 + 2*FP1 + 2*FP45 |
The instruction has a high latency (up to 29 cycles when reading from memory!) and consumes multiple micro-operations (uops) that occupy several execution ports. This means that even though we are processing 16 bytes at a time, the instruction's throughput is limited by its latency and port usage. A natural conclusion would be that glibc clearly uses more advanced techniques that certainly don't involve PCMPESTRI. But why? Wasn't the point of CISC CPU architectures to have complex instructions that do more work per instruction?
This is neither the first nor the last time we see that complex, purpose-built instructions do not necessarily lead to better performance. It's quite difficult to wrap one's head around such a paradox. Other examples include:
- REP STOSB is not the fastest way to zero out memory (memset/bzero), despite being a single instruction that does exactly that. (A minimal sketch of such a wrapper follows this list.)
- Even more counterintuitively, a seemingly unrolled REP STOSQ is typically even slower than REP STOSB for the same purpose. The reason is in fact simple: the microcode that makes REP STOSB relatively efficient is simply not present for this opcode.
- REP MOVSB is not the fastest way to copy memory (memcpy), even though it is designed for that purpose.
- REPNE SCASB is not the fastest way to search for a byte in memory (memchr), despite being a single instruction that performs that operation.
- And finally, here we are: another purpose-built instruction, PCMPESTRI, that does not deliver the expected performance boost.
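For completeness, here is the REP STOSB wrapper mentioned in the first item (our own illustration, written in the same style as the REPNE SCASB wrapper below; whether it beats a vectorised memset depends heavily on the CPU's microcode support and the buffer size):

```c
#include <stddef.h>

// Fill n bytes at dst with the byte c using REP STOSB:
// AL holds the fill byte, RCX the count, RDI the destination.
static void * memset_rep_stosb(void * dst, int c, size_t n) {
    void * d = dst;
    __asm__ volatile(
        "cld\n\t"
        "rep stosb"
        : "+D"(d), "+c"(n)
        : "a"((unsigned char)c)
        : "cc", "memory");
    return dst;
}
```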
Tangentially, on the topic of REPNE SCASB, let's probe its performance. A memchr implementation using this instruction is a simple wrapper over inline assembly:
NOINLINE void * memchr_repne_scas(const void * s, int c, size_t n) {
if (n == 0) return NULL;
const unsigned char * p = (const unsigned char *) s;
void * ret;
__asm__ volatile(
"cld\n\t"
"repne scasb\n\t"
"jne 1f\n\t" // if ZF==0 (no match), jump to notfound
"lea -1(%%rdi), %0\n\t" // match: ret = rdi-1
"jmp 2f\n\t"
"1:\n\t"
"xor %0, %0\n\t" // not found: ret = NULL
"2:\n\t"
: "=&r"(ret), "+D"(p), "+c"(n)
: "a"((unsigned char)c)
: "cc", "memory");
return ret;
}
As expected, the performance is underwhelming, but the biggest surprise is that it is even worse than our naive simple_memchr implementation:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=4364084308 ns checksum=0x90e66e3000583c48
simple_memchr time=14458207884 ns checksum=0x90e66e3000583c48
memchr2 time=12134840048 ns checksum=0x90e66e3000583c48
memchr_sse42 time=9476436712 ns checksum=0x90e66e3000583c48
repne_scas time=17571507777 ns checksum=0x90e66e3000583c48
Estimated throughput (GiB/s):
glibc_memchr : 243.608
simple_memchr: 73.531
memchr2 : 87.609
memchr_sse42 : 112.186
repne_scas : 60.503
1.1.4. Search via SSE2 PCMPEQ + PMOVMSKB. ⎇
To go one step further, we can come back to the SSE4.2, PCMPESTRI-based implementation and simply use two instructions with lower latency: PCMPEQ (compare), followed by PMOVMSKB (move mask). This combination achieves similar functionality with reduced latency and better throughput. We keep a short scalar loop to align the pointer, so the hot loop can use aligned 16-byte loads. Here is the implementation:
void * memchr_sse2(const void * s, int c, size_t n) {
const unsigned char * p = (const unsigned char *)s;
const unsigned char uc = (unsigned char)c;
const __m128i needle = _mm_set1_epi8((char) uc);
for (; n && ((uintptr_t)p & 15u); --n, ++p)
if (*p == uc) return (void *)p;
for (; n >= 16; n -= 16, p += 16) {
__m128i hay = _mm_load_si128((const __m128i *)p);
__m128i eq = _mm_cmpeq_epi8(hay, needle);
unsigned mask = (unsigned)_mm_movemask_epi8(eq);
if (mask)
return (void *)(p + __builtin_ctz(mask));
}
for (; n--; ++p)
if (*p == uc) return (void *)p;
return NULL;
}
To justify this, we come back to the instruction table for Zen 4:
| Instruction | Lat | TP | Uops | Ports |
|---|---|---|---|---|
| PCMPESTRI (XMM, M128, I8) | [≤11; ≤29] | 2.00 / 4.00 | 12 | 1*FP01 + 2*FP1 + 2*FP45 |
| PCMPEQB (XMM, M128) | [1;≤9] | 0.25 / 0.50 | 1 | 1*FP0123 |
| PMOVMSKB (R32, XMM) | ≤6 | 0.50 / 1.00 | 1 | 1*FP45 |
It is clear that this combination of opcodes has better latency and throughput and requires fewer uops. The execution-port usage is also more favourable, further suggesting better practical performance. Validation of this result is quite easy:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=4966514321 ns checksum=0x42a2457f5bb57a25
simple_memchr time=12864543169 ns checksum=0x42a2457f5bb57a25
memchr2 time=10555408424 ns checksum=0x42a2457f5bb57a25
memchr_sse42 time=8888208502 ns checksum=0x42a2457f5bb57a25
repne_scas time=16022187564 ns checksum=0x42a2457f5bb57a25
memchr_sse2 time=6822718398 ns checksum=0x42a2457f5bb57a25
Estimated throughput (GiB/s):
glibc_memchr : 214.058
simple_memchr: 82.640
memchr2 : 100.718
memchr_sse42 : 119.611
repne_scas : 66.353
memchr_sse2 : 155.821
The soundness of this optimisation is further supported by the outputs of e.g. the LLVM machine code analyser; we can compare the movemask version and the PCMPESTRI version.
The MCA dumps cover the same overall structure (a scalar align/peel loop into a 16B loop, followed by a scalar tail), but with a different 16B core, as illustrated by the C code. In terms of assembly code, we see:
- Variant 1 core: vpcmpestri xmm0, [rdi], imm8=0 (the SSE4.2 string-compare instruction)
- Variant 2 core: vpcmpeqb + vpmovmskb + tzcnt (SIMD compare + mask + bit-scan)
Both kernels were simulated over ~100 iterations. Every indicator favours the second version:
| Metric | vpcmpestri | vpcmpeqb + pmovmskb + tzcnt | Analysis |
|---|---|---|---|
| Total cycles | 3199 | 1511 | Variant 2 is, on paper, about 2.1 times faster |
| Instructions | 5000 | 4900 | Similar instruction count |
| Total uOps | 6400 | 5600 | Variant 2 uses fewer uops |
| uOps/cycle | 2.00 | 3.71 | Variant 1 is throughput-starved. |
| IPC | 1.56 | 3.24 | Variant 2 keeps frontend/backend busy |
| Block RThroughput | 16.0 | 14.0 | Variant 2 has a better steady-state loop throughput |
MCA also shows us some auxiliary information about stalls and port usage. In the core loop, vpcmpestri is annotated with "9 uops, lat 23, RThroughput 4.00", and it also performs a load (MayLoad). The reciprocal throughput of 4.00 is particularly damning: in the best case, we can only issue one such instruction every 4 cycles. This is a hard limit on the loop throughput, regardless of other factors.
1.1.5. Search via AVX2 (improved). ⎇
Naturally, increasing the vector width to 256 bits (32 bytes) should yield further performance improvements. This, however, is not quite the case, or at least it will not be as straightforward as before.
void * memchr_avx2_simple(const void * s, int c, size_t n) {
const unsigned char * p = (const unsigned char *)s;
const unsigned char uc = (unsigned char)c;
const __m256i needle = _mm256_set1_epi8((char) uc);
for (; n && ((uintptr_t)p & 31u); --n, ++p)
if (*p == uc) return (void *)p;
for (; n >= 32; n -= 32, p += 32) {
__m256i hay = _mm256_load_si256((const __m256i *)p);
__m256i eq = _mm256_cmpeq_epi8(hay, needle);
unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
if (mask)
return (void *)(p + __builtin_ctz(mask));
}
for (; n--; ++p)
if (*p == uc) return (void *)p;
return NULL;
}
The benchmark results are as follows:
Estimated throughput (GiB/s):
glibc_memchr : 250.816
simple_memchr: 87.112
memchr2 : 97.577
memchr_sse2 : 155.821
memchr_avx2_simple : 171.643
The AVX2 version uses the same techniques as the glibc memchr, but we are still quite far away from the 250 GiB/s ballpark. The way forward is to capitalise on the broader vector width even more and modify the core loop to scan 128 bytes at a time. This can be achieved by unrolling the loop four times and combining the results using bit-wise OR, scanning for the precise location of the match only when a match is guaranteed:
void * memchr_avx2_simple(const void * s, int c, size_t n) {
const unsigned char * p = (const unsigned char *)s;
const unsigned char uc = (unsigned char)c;
const __m256i needle = _mm256_set1_epi8((char) uc);
for (; n >= 128; n -= 128, p += 128) {
__m256i hay1 = _mm256_loadu_si256((const __m256i *)p);
__m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 32));
__m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 64));
__m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 96));
__m256i eq1 = _mm256_cmpeq_epi8(hay1, needle);
__m256i eq2 = _mm256_cmpeq_epi8(hay2, needle);
__m256i eq3 = _mm256_cmpeq_epi8(hay3, needle);
__m256i eq4 = _mm256_cmpeq_epi8(hay4, needle);
__m256i any = _mm256_or_si256(_mm256_or_si256(eq1, eq2), _mm256_or_si256(eq3, eq4));
unsigned mask = (unsigned)_mm256_movemask_epi8(any);
if (mask) {
unsigned mmask1 = (unsigned)_mm256_movemask_epi8(eq1);
if (mmask1) return (void *)(p + __builtin_ctz(mmask1));
unsigned mmask2 = (unsigned)_mm256_movemask_epi8(eq2);
if (mmask2) return (void *)(p + 32 + __builtin_ctz(mmask2));
unsigned mmask3 = (unsigned)_mm256_movemask_epi8(eq3);
if (mmask3) return (void *)(p + 64 + __builtin_ctz(mmask3));
unsigned mmask4 = (unsigned)_mm256_movemask_epi8(eq4);
return (void *)(p + 96 + __builtin_ctz(mmask4));
}
}
for (; n >= 32; n -= 32, p += 32) {
__m256i hay = _mm256_loadu_si256((const __m256i *)p);
__m256i eq = _mm256_cmpeq_epi8(hay, needle);
unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
if (mask)
return (void *)(p + __builtin_ctz(mask));
}
for (; n--; ++p)
if (*p == uc) return (void *)p;
return NULL;
}
This almost brings us to parity with glibc on large buffers:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=3884077381 ns checksum=0xf3bb7462ab9aa0ae
simple_memchr time=12991746698 ns checksum=0xf3bb7462ab9aa0ae
memchr2 time=10150611634 ns checksum=0xf3bb7462ab9aa0ae
memchr_sse2 time=6738995996 ns checksum=0xf3bb7462ab9aa0ae
memchr_avx2_simple time=4027457053 ns checksum=0xf3bb7462ab9aa0ae
Estimated throughput (GiB/s):
glibc_memchr : 273.713
simple_memchr: 81.831
memchr2 : 104.735
memchr_sse2 : 157.757
memchr_avx2_simple : 263.969
1.1.6. Search via SSE2 + BMI (improved). ⎇
The previous AVX2 version uses unaligned loads everywhere (_mm256_loadu_si256), and relies on modern CPUs handling that well enough. This is not a great assumption to make for SSE2 code - if your CPU is old enough to not support AVX2, it is likely to have worse performance for unaligned loads. We can therefore revisit the SSE2 version and improve it by adding proper alignment handling.
The new version spends a fair amount of code up front to avoid bad alignment situations and to enable aligned loads (_mm_load_si128) in the hot loops. We explicitly consider p & 63 (the position within a 64-byte cache line), take a separate "crosscache" path when the initial 16-byte load would straddle a cache-line boundary (more likely to be slow on some microarchitectures), adjust p to a 16-byte boundary, and later to a 64-byte boundary for the main loop.
The AVX2 code has clean loop tiers: a 128B AVX2 unroll, then a 32B AVX2 loop, then scalar. Our new version, which treats old processors slightly better, does early probing loads that may read past len (still within an aligned block on the same page, which does not trigger a fault) and then checks the found index against len before returning.
For example, if we load 16 bytes even if len < 16, assuming that we don't cross a page boundary (which would cause a segmentation fault), then we can simply do:
idx = ctz(mask); if (idx >= len) return nullptr;
This is a typical speculative-read trick: it relies on the assumption that the rest of the aligned block lies on the same, validly mapped page, and it is often used in high-performance code.
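For illustration, here is a tiny helper capturing the safety condition (a sketch; the name is ours and it is not used by the implementation below). Note that any 16-byte load from a 16-byte-aligned address satisfies it automatically, which is why the code below can probe aligned blocks freely:

```c
#include <stdint.h>
#include <stdbool.h>

// True when a 16-byte load starting at p stays within the 4 KiB page that
// contains p, so reading a few bytes past the logical end of the buffer
// cannot touch a different, possibly unmapped, page.
static inline bool load16_stays_in_page(const void * p) {
    return ((uintptr_t)p & 4095u) <= 4096u - 16u;
}
```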
The special cases have crept in and lead us to the following code:
static inline __m128i splat_u8_sse2(unsigned char c) {
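/* Broadcast c into every byte lane (equivalent to _mm_set1_epi8): duplicate the
   byte via two unpacks (1 -> 2 -> 4 copies), then replicate the low 32-bit
   dword across the whole register. */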
__m128i x = _mm_cvtsi32_si128((int)c);
x = _mm_unpacklo_epi8(x, x);
x = _mm_unpacklo_epi8(x, x);
x = _mm_shuffle_epi32(x, 0);
return x;
}
void * memchr_sse2_bsf(const void * __restrict__ s, int c, size_t n) {
const unsigned char * __restrict__ p0 = (const unsigned char * __restrict__) s;
const __m128i needle = splat_u8_sse2((unsigned char)c);
if (n == 0) return nullptr;
size_t len = n;
const unsigned char *p = p0;
uintptr_t addr = (uintptr_t)p;
unsigned off64 = (unsigned)(addr & 63u); /* if >48 crosscache */
if (off64 <= 48u) {
/* unaligned first 16B */
__m128i v = _mm_loadu_si128((const __m128i *)(const void *)p);
__m128i m = _mm_cmpeq_epi8(needle, v);
unsigned mask = (unsigned)_mm_movemask_epi8(m);
if (mask) {
unsigned idx = __builtin_ctz(mask);
if ((size_t)idx >= len) return nullptr;
return (void *)(p + idx);
}
/* unaligned_no_match_1 */
if (len <= 16) return nullptr;
len -= 16;
const unsigned char *p16 = p + 16;
unsigned a15 = (unsigned)((uintptr_t)p & 15u);
p = (const unsigned char *)((uintptr_t)p16 & ~(uintptr_t)15u);
len += a15;
} else {
/* crosscache */
unsigned off16 = (unsigned)(addr & 15u);
const unsigned char *pal = (const unsigned char *)(addr & ~(uintptr_t)15u);
__m128i v = _mm_load_si128((const __m128i *)(const void *)pal);
__m128i m = _mm_cmpeq_epi8(needle, v);
unsigned mask = (unsigned)_mm_movemask_epi8(m);
mask >>= off16;
if (mask) {
unsigned idx = __builtin_ctz(mask);
if ((size_t)idx >= len) return nullptr;
return (void *)(p0 + idx);
}
{
size_t sum = off16 + len;
if (sum < off16) sum = (size_t)-1;
if (sum <= 16) return nullptr;
len = sum - 16;
}
p = pal + 16;
}
/* early 2x unroll before 64-byte alignment loop */
for (;;) {
if (len <= 64) break;
len -= 64;
__m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
__m128i m0 = _mm_cmpeq_epi8(needle, v0);
unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
if (k0) return (void *)(p + __builtin_ctz(k0));
__m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
__m128i m1 = _mm_cmpeq_epi8(needle, v1);
unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
if (k1) return (void *)(p + 16 + __builtin_ctz(k1));
__m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
__m128i m2 = _mm_cmpeq_epi8(needle, v2);
unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
if (k2) return (void *)(p + 32 + __builtin_ctz(k2));
__m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
__m128i m3 = _mm_cmpeq_epi8(needle, v3);
unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
if (k3) return (void *)(p + 48 + __builtin_ctz(k3));
p += 64;
/* if aligned jump forward, else do another 64 scan */
if ((((uintptr_t)p) & 63u) == 0) break;
if (len <= 64) break;
len -= 64;
__m128i w0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
__m128i n0 = _mm_cmpeq_epi8(needle, w0);
unsigned q0 = (unsigned)_mm_movemask_epi8(n0);
if (q0) return (void *)(p + __builtin_ctz(q0));
__m128i w1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
__m128i n1 = _mm_cmpeq_epi8(needle, w1);
unsigned q1 = (unsigned)_mm_movemask_epi8(n1);
if (q1) return (void *)(p + 16 + __builtin_ctz(q1));
__m128i w2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
__m128i n2 = _mm_cmpeq_epi8(needle, w2);
unsigned q2 = (unsigned)_mm_movemask_epi8(n2);
if (q2) return (void *)(p + 32 + __builtin_ctz(q2));
__m128i w3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
__m128i n3 = _mm_cmpeq_epi8(needle, w3);
unsigned q3 = (unsigned)_mm_movemask_epi8(n3);
if (q3) return (void *)(p + 48 + __builtin_ctz(q3));
p += 64;
/* align to 64 and carry remainder into len */
uintptr_t a = (uintptr_t)p;
unsigned r = (unsigned)(a & 63u);
p = (const unsigned char *)(a & ~(uintptr_t)63u);
len += r;
break;
}
/* assumes align of 64 */
while (len > 64) {
len -= 64;
__m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
__m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
__m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
__m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
__m128i m0 = _mm_cmpeq_epi8(needle, v0);
__m128i m1 = _mm_cmpeq_epi8(needle, v1);
__m128i m2 = _mm_cmpeq_epi8(needle, v2);
__m128i m3 = _mm_cmpeq_epi8(needle, v3);
__m128i t0 = _mm_max_epu8(m0, m2);
__m128i t1 = _mm_max_epu8(m1, m3);
__m128i t2 = _mm_max_epu8(t0, t1);
unsigned any = (unsigned)_mm_movemask_epi8(t2);
if (any) {
unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
if (k0) return (void *)(p + __builtin_ctz(k0));
unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
if (k1) return (void *)(p + 16 + __builtin_ctz(k1));
unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
if (k2) return (void *)(p + 32 + __builtin_ctz(k2));
unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
return (void *)(p + 48 + __builtin_ctz(k3));
}
p += 64;
}
/* 0 <= len <= 64 */
{
if (len == 0) return nullptr;
if (len > 32) {
__m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
__m128i m0 = _mm_cmpeq_epi8(needle, v0);
unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
if (k0) {
unsigned idx = __builtin_ctz(k0);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
__m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
__m128i m1 = _mm_cmpeq_epi8(needle, v1);
unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
if (k1) {
unsigned idx = 16 + __builtin_ctz(k1);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
__m128i v2 = _mm_load_si128((const __m128i *)(const void *)(p + 32));
__m128i m2 = _mm_cmpeq_epi8(needle, v2);
unsigned k2 = (unsigned)_mm_movemask_epi8(m2);
if (k2) {
unsigned idx = 32 + __builtin_ctz(k2);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
if (len <= 48) return nullptr;
__m128i v3 = _mm_load_si128((const __m128i *)(const void *)(p + 48));
__m128i m3 = _mm_cmpeq_epi8(needle, v3);
unsigned k3 = (unsigned)_mm_movemask_epi8(m3);
if (k3) {
unsigned idx = 48 + __builtin_ctz(k3);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
return nullptr;
} else {
__m128i v0 = _mm_load_si128((const __m128i *)(const void *)(p + 0));
__m128i m0 = _mm_cmpeq_epi8(needle, v0);
unsigned k0 = (unsigned)_mm_movemask_epi8(m0);
if (k0) {
unsigned idx = __builtin_ctz(k0);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
if (len <= 16) return nullptr;
__m128i v1 = _mm_load_si128((const __m128i *)(const void *)(p + 16));
__m128i m1 = _mm_cmpeq_epi8(needle, v1);
unsigned k1 = (unsigned)_mm_movemask_epi8(m1);
if (k1) {
unsigned idx = 16 + __builtin_ctz(k1);
if ((size_t)idx < len) return (void *)(p + idx);
return nullptr;
}
return nullptr;
}
}
}
Benchmarking this version shows that we have attained significantly better throughput:
buf_size=67108864 bytes, queries=65536, iters=500, seed=1
glibc_memchr time=4525973864 ns checksum=0x93785bd40474de84
simple_memchr time=14300729997 ns checksum=0x93785bd40474de84
memchr2 time=11173968344 ns checksum=0x93785bd40474de84
memchr_sse2 time=7703221849 ns checksum=0x93785bd40474de84
memchr_avx2_simple time=4467790687 ns checksum=0x93785bd40474de84
memchr_sse2_bsf time=5780660724 ns checksum=0x93785bd40474de84
Estimated throughput (GiB/s):
glibc_memchr : 234.894
simple_memchr: 74.341
memchr2 : 95.143
memchr_sse2 : 138.010
memchr_avx2_simple : 237.953
memchr_sse2_bsf : 183.911
1.1.7. Conclusion ⎇
Through a series of optimisations and careful use of SIMD instructions, we have developed a memchr implementation that approaches the performance of glibc's highly optimized version. The same techniques also extend to similar functions like strlen (equivalent to memchr searching for zero) or strchr (equivalent to memchr searching for either a specific character or zero).
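As a purely conceptual illustration of that equivalence (a sketch of ours, not how glibc implements strlen), strlen can be phrased directly in terms of memchr; the effectively unbounded length leans on memchr stopping at the first match, which newer C standards spell out explicitly:

```cpp
#include <cstdint>
#include <cstring>

// Conceptual only: find the terminating NUL with an "unbounded" length.
// Real implementations scan directly in wide chunks, as shown below.
size_t strlen_via_memchr(const char * s) {
    const char * nul = static_cast<const char *>(std::memchr(s, 0, SIZE_MAX));
    return static_cast<size_t>(nul - s);
}
```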
A straightforward implementation of strlen using the techniques we have just covered achieves performance close to glibc's strlen:
size_t strlen_avx2_simple(const void * __restrict__ s) {
const unsigned char * base = (const unsigned char *) s;
const unsigned char * p = base;
const __m256i zero = _mm256_setzero_si256();
size_t to_page_end = 4096u - ((uintptr_t)(p) & 4095u);
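// Phase 1: scan only up to the end of the current 4 KiB page, so that the
// wide loads below cannot stray into a following, possibly unmapped, page.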
for (; to_page_end >= 128u; to_page_end -= 128u, p += 128u) {
__m256i v1 = _mm256_loadu_si256((const __m256i *)(p + 0));
__m256i v2 = _mm256_loadu_si256((const __m256i *)(p + 32));
__m256i v3 = _mm256_loadu_si256((const __m256i *)(p + 64));
__m256i v4 = _mm256_loadu_si256((const __m256i *)(p + 96));
__m256i e1 = _mm256_cmpeq_epi8(v1, zero);
__m256i e2 = _mm256_cmpeq_epi8(v2, zero);
__m256i e3 = _mm256_cmpeq_epi8(v3, zero);
__m256i e4 = _mm256_cmpeq_epi8(v4, zero);
__m256i any = _mm256_or_si256(_mm256_or_si256(e1, e2),
_mm256_or_si256(e3, e4));
if (_mm256_testz_si256(any, any) == 0) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(e1);
if (m1) return (size_t)((p + (size_t)__builtin_ctz(m1)) - base);
unsigned m2 = (unsigned)_mm256_movemask_epi8(e2);
if (m2) return (size_t)((p + 32u + (size_t)__builtin_ctz(m2)) - base);
unsigned m3 = (unsigned)_mm256_movemask_epi8(e3);
if (m3) return (size_t)((p + 64u + (size_t)__builtin_ctz(m3)) - base);
unsigned m4 = (unsigned)_mm256_movemask_epi8(e4);
return (size_t)((p + 96u + (size_t)__builtin_ctz(m4)) - base);
}
}
for (; to_page_end >= 32u; to_page_end -= 32u, p += 32u) {
__m256i v = _mm256_loadu_si256((const __m256i *)p);
__m256i eq = _mm256_cmpeq_epi8(v, zero);
unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
if (mask) return (size_t)((p + (size_t)__builtin_ctz(mask)) - base);
}
for (; to_page_end; --to_page_end, ++p) {
if (*p == 0) return (size_t)(p - base);
}
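// p is now page-aligned: each 128-byte block below lies within a single page,
// so we never read from a page that holds no bytes of the string.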
for (;;) {
__m256i v1 = _mm256_loadu_si256((const __m256i *)(p + 0));
__m256i v2 = _mm256_loadu_si256((const __m256i *)(p + 32));
__m256i v3 = _mm256_loadu_si256((const __m256i *)(p + 64));
__m256i v4 = _mm256_loadu_si256((const __m256i *)(p + 96));
__m256i e1 = _mm256_cmpeq_epi8(v1, zero);
__m256i e2 = _mm256_cmpeq_epi8(v2, zero);
__m256i e3 = _mm256_cmpeq_epi8(v3, zero);
__m256i e4 = _mm256_cmpeq_epi8(v4, zero);
__m256i any = _mm256_or_si256(_mm256_or_si256(e1, e2),
_mm256_or_si256(e3, e4));
if (_mm256_testz_si256(any, any) == 0) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(e1);
if (m1) return (size_t)((p + (size_t)__builtin_ctz(m1)) - base);
unsigned m2 = (unsigned)_mm256_movemask_epi8(e2);
if (m2) return (size_t)((p + 32u + (size_t)__builtin_ctz(m2)) - base);
unsigned m3 = (unsigned)_mm256_movemask_epi8(e3);
if (m3) return (size_t)((p + 64u + (size_t)__builtin_ctz(m3)) - base);
unsigned m4 = (unsigned)_mm256_movemask_epi8(e4);
return (size_t)((p + 96u + (size_t)__builtin_ctz(m4)) - base);
}
p += 128;
}
}
As evidenced by a benchmark in a similar vein to the previous ones:
buf_size=67108864 bytes, queries=100000, iters=500, seed=1
strlen_glibc time=1873349773 ns gbps=17.911
strlen_avx2_simple time=2190930772 ns gbps=15.315
And finally, an implementation of strchr:
char * strchr_avx2_pagesafe(const char * s, int c) {
const unsigned char * p = (const unsigned char *)s;
const unsigned char uc = (unsigned char)c;
const __m256i needle = _mm256_set1_epi8((char)uc);
const __m256i zero = _mm256_setzero_si256();
for (;;) {
uintptr_t addr = (uintptr_t)p;
size_t bytes_to_page_end = 4096u - (size_t)(addr & 4095u);
/* Fewer than 32 bytes remain on this page: scan them one by one so the
   vector loads below never cross into the next, possibly unmapped, page. */
while (bytes_to_page_end && bytes_to_page_end < 32) {
unsigned char ch = *p;
if (ch == uc) return (char *)p;
if (ch == 0) return NULL;
++p;
--bytes_to_page_end;
}
addr = (uintptr_t)p;
bytes_to_page_end = 4096u - (size_t)(addr & 4095u);
while (bytes_to_page_end >= 128) {
__m256i hay1 = _mm256_loadu_si256((const __m256i *)(p + 0));
__m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 32));
__m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 64));
__m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 96));
__m256i eq1 = _mm256_cmpeq_epi8(hay1, needle);
__m256i eq2 = _mm256_cmpeq_epi8(hay2, needle);
__m256i eq3 = _mm256_cmpeq_epi8(hay3, needle);
__m256i eq4 = _mm256_cmpeq_epi8(hay4, needle);
__m256i z1 = _mm256_cmpeq_epi8(hay1, zero);
__m256i z2 = _mm256_cmpeq_epi8(hay2, zero);
__m256i z3 = _mm256_cmpeq_epi8(hay3, zero);
__m256i z4 = _mm256_cmpeq_epi8(hay4, zero);
__m256i m1 = _mm256_or_si256(eq1, z1);
__m256i m2 = _mm256_or_si256(eq2, z2);
__m256i m3 = _mm256_or_si256(eq3, z3);
__m256i m4 = _mm256_or_si256(eq4, z4);
__m256i any = _mm256_or_si256(_mm256_or_si256(m1, m2), _mm256_or_si256(m3, m4));
if (_mm256_testz_si256(any, any) == 0) {
unsigned comb, eqm, bit;
comb = (unsigned)_mm256_movemask_epi8(m1);
if (comb) {
unsigned idx = (unsigned)__builtin_ctz(comb);
eqm = (unsigned)_mm256_movemask_epi8(eq1);
bit = 1u << idx;
return (eqm & bit) ? (char *)(p + idx) : NULL;
}
comb = (unsigned)_mm256_movemask_epi8(m2);
if (comb) {
unsigned idx = (unsigned)__builtin_ctz(comb);
eqm = (unsigned)_mm256_movemask_epi8(eq2);
bit = 1u << idx;
return (eqm & bit) ? (char *)(p + 32 + idx) : NULL;
}
comb = (unsigned)_mm256_movemask_epi8(m3);
if (comb) {
unsigned idx = (unsigned)__builtin_ctz(comb);
eqm = (unsigned)_mm256_movemask_epi8(eq3);
bit = 1u << idx;
return (eqm & bit) ? (char *)(p + 64 + idx) : NULL;
}
comb = (unsigned)_mm256_movemask_epi8(m4);
{
unsigned idx = (unsigned)__builtin_ctz(comb);
eqm = (unsigned)_mm256_movemask_epi8(eq4);
bit = 1u << idx;
return (eqm & bit) ? (char *)(p + 96 + idx) : NULL;
}
}
p += 128;
bytes_to_page_end -= 128;
}
while (bytes_to_page_end >= 32) {
__m256i hay = _mm256_loadu_si256((const __m256i *)p);
__m256i eq = _mm256_cmpeq_epi8(hay, needle);
__m256i z = _mm256_cmpeq_epi8(hay, zero);
__m256i m = _mm256_or_si256(eq, z);
unsigned comb = (unsigned)_mm256_movemask_epi8(m);
if (comb) {
unsigned idx = (unsigned)__builtin_ctz(comb);
unsigned eqm = (unsigned)_mm256_movemask_epi8(eq);
return (eqm & (1u << idx)) ? (char *)(p + idx) : NULL;
}
p += 32;
bytes_to_page_end -= 32;
}
}
}
1.2. Exact match of a pair of characters in overlapping regions. ⎇
For the sake of illustrating the techniques involved, we will consider the problem of searching for a pair of consecutive characters (a two-byte needle) at an arbitrary byte offset in a memory block - essentially a fixed-length special case of memmem.
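All the variants in this section share the same contract: given a buffer of n bytes and a 16-bit needle whose low byte is the first character and whose high byte is the second (i.e. the pair read as a little-endian 16-bit value), they return the byte offset of the first occurrence, or n if there is none. A minimal usage sketch (ours; runnable once linked against the implementation defined below):

```cpp
#include <cstdint>
#include <cstdio>
#include <cstring>

// Defined later in this section; declared here only to show the contract.
size_t memchr2_portable_simple(const void * ptr, size_t n, uint16_t needle);

int main() {
    const char * text = "abracadabra";
    const uint16_t needle = (uint16_t)('c' | ('a' << 8)); // the pair "ca"
    const size_t pos = memchr2_portable_simple(text, std::strlen(text), needle);
    std::printf("%zu\n", pos); // 4: "ca" starts at offset 4
}
```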
1.2.1. Baseline scalar/SWAR implementations. ⎇
We start with a couple of obvious baselines. A simple scalar version with 8-bit loads:
size_t memchr2_portable_simple(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
for (size_t i = 0; i + 1 < n; ++i)
if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
Simple scalar version with direct, unaligned 16-bit loads:
NOINLINE size_t memchr2_portable_elim2(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
for (size_t i = 0; i + 1 < n; i++)
if (*(const uint16_t *)(p + i) == needle) return i;
return n;
}
An attempt to save a load in each iteration:
size_t memchr2_portable_running(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
uint16_t window = *((const uint16_t *)p);
if (window == needle) return 0;
for (size_t i = 0; i + 2 < n; i++) {
window = (uint16_t)((window >> 8) | ((uint16_t)(p[i + 2]) << 8));
if (window == needle) return i + 1;
}
return n;
}
A SWAR-based version that processes 8 bytes at a time, pre-filtering for chunks that contain the two needle bytes adjacent to each other in either order:
NOINLINE size_t memchr2_portable_qword(const void * data, size_t len, uint16_t needle) {
if (len < 2) return len;
uint8_t b1 = (uint8_t)(needle & 0xFF);
uint8_t b2 = (uint8_t)(needle >> 8);
const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
const uint64_t pattern2 = 0x0101010101010101ULL * (uint64_t)b2;
const uint8_t * p = (const uint8_t *) data;
const uint8_t * end = (const uint8_t *) data + len - 1;
while (p + 7 < end) {
uint64_t chunk = *(uint64_t *)p;
uint64_t x1 = chunk ^ pattern1;
uint64_t x2 = chunk ^ pattern2;
uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
uint64_t hit2 = (x2 - 0x0101010101010101ULL) & ~x2 & 0x8080808080808080ULL;
uint64_t hc1 = (hit1 >> 8) & hit2;
uint64_t hc2 = (hit2 >> 8) & hit1;
if (hc1 | hc2) {
for (int i = 0; i < 7; i++) {
uint16_t w = *(const uint16_t *)(p + i);
if (w == needle) return p + i - (const uint8_t *) data;
}
}
p += 7;
}
for (; p < end; p++) {
uint16_t w = *(const uint16_t *)p;
if (w == needle) return p - (const uint8_t *) data;
}
return len;
}
Checking for both hc1 and hc2 helps to convince the compiler to vectorise the inner loop, leading to - paradoxically - better performance than checking for just one of them.
Also unexpectedly, the SWAR version beats all others in benchmarks by a wide margin, and the runner-up is the naive scalar version with 8-bit loads:
buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2 time=15880241758 ns checksum=0xc25f073958310200
scalar time=14393013166 ns checksum=0xc25f073958310200
portable_qword time=8435549272 ns checksum=0xc25f073958310200
portable_running time=28003921570 ns checksum=0xc25f073958310200
Estimated throughput (GiB/s):
portable_elim2 : 13.499
scalar : 14.894
portable_qword : 25.412
portable_running : 7.655
1.2.2. SSE2 and SSSE3 implementations. ⎇
The obvious next step is to vectorise the search. Without minding unaligned loads, we can do it the straightforward way:
NOINLINE size_t memchr2_sse2_bmi1(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
const __m128i vlo = _mm_set1_epi8((char)lo);
const __m128i vhi = _mm_set1_epi8((char)hi);
size_t i = 0;
for (; i + 17 <= n; i += 16) {
__m128i a = _mm_loadu_si128((const __m128i *)(p + i));
__m128i b = _mm_loadu_si128((const __m128i *)(p + i + 1));
__m128i eq0 = _mm_cmpeq_epi8(a, vlo);
__m128i eq1 = _mm_cmpeq_epi8(b, vhi);
__m128i both = _mm_and_si128(eq0, eq1);
unsigned mask = (unsigned)_mm_movemask_epi8(both);
if (mask)
return i + (size_t)_tzcnt_u32(mask); // BMI1
}
for (; i + 1 < n; ++i)
if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
The result? A 1.6x speedup over the SWAR version!
buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2 time=15880241758 ns checksum=0xc25f073958310200
scalar time=14393013166 ns checksum=0xc25f073958310200
portable_qword time=8435549272 ns checksum=0xc25f073958310200
portable_running time=28003921570 ns checksum=0xc25f073958310200
sse2_bmi1 time=5253894697 ns checksum=0xc25f073958310200
Estimated throughput (GiB/s):
portable_elim2 : 13.499
scalar : 14.894
portable_qword : 25.412
portable_running : 7.655
sse2_bmi1 : 40.802
Using SSSE3 we can sidestep the nasty unaligned loads by using PALIGNR, like so:
NOINLINE size_t memchr2_ssse3_bmi1(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
const __m128i vlo = _mm_set1_epi8((char)lo);
const __m128i vhi = _mm_set1_epi8((char)hi);
size_t i = 0;
if (n >= 32) {
__m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
for (; i + 32 <= n; i += 16) {
__m128i N = _mm_loadu_si128((const __m128i *)(p + i + 16));
__m128i B = _mm_alignr_epi8(N, A, 1); // SSSE3
__m128i eq0 = _mm_cmpeq_epi8(A, vlo);
__m128i eq1 = _mm_cmpeq_epi8(B, vhi);
__m128i both = _mm_and_si128(eq0, eq1);
unsigned mask = (unsigned)_mm_movemask_epi8(both);
if (mask) return i + (size_t)_tzcnt_u32(mask); // BMI1
A = N;
}
}
for (; i + 1 < n; ++i)
if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
Disappointingly, that doesn't seem to win us much:
buf_size=64000000 bytes, queries=65536, iters=100, seed=666
portable_elim2 time=15880241758 ns checksum=0xc25f073958310200
scalar time=14393013166 ns checksum=0xc25f073958310200
portable_qword time=8435549272 ns checksum=0xc25f073958310200
portable_running time=28003921570 ns checksum=0xc25f073958310200
sse2_bmi1 time=5253894697 ns checksum=0xc25f073958310200
ssse3_bmi1 time=5354999130 ns checksum=0xc25f073958310200
Estimated throughput (GiB/s):
portable_elim2 : 13.499
scalar : 14.894
portable_qword : 25.412
portable_running : 7.655
sse2_bmi1 : 40.802
ssse3_bmi1 : 40.031
Judging by the output of the profiling tool perf, most of the time is spent in the hot loop, suggesting that nothing funny is going on with the tail or tzcnt this time either. Zen2 analysis via llvm-mca is also somewhat insubstantial:
Iterations: 100
Instructions: 1400
Total Cycles: 364
Total uOps: 1400
Dispatch Width: 4
uOps Per Cycle: 3.85
IPC: 3.85
Block RThroughput: 3.5
Instruction Info:
[1] [2] [3] [4] [5] [6] Instructions:
1 1 0.25 vmovdqa xmm3, xmm2
1 8 0.33 * vmovdqu xmm2, xmmword ptr [rdi + r8 + 16]
1 1 0.50 vpalignr xmm4, xmm2, xmm3, 1
1 1 0.33 vpcmpeqb xmm3, xmm0, xmm3
1 1 0.33 vpcmpeqb xmm4, xmm4, xmm1
1 1 0.25 vpand xmm3, xmm3, xmm4
1 1 1.00 vpmovmskb esi, xmm3
1 1 0.25 test esi, esi
1 1 0.25 jne .over
1 1 0.25 lea rsi, [r8 + 16]
1 1 0.25 add r8, 48
1 1 0.25 cmp r8, rax
1 1 0.25 mov r8, rsi
1 1 0.25 jbe .star
It also appears that we don't have enough execution ports to unroll and interleave the loop further.
1.2.3. AVX2 implementation. ⎇
We start tame, by widening the SSE vectors to 256-bit AVX2 ones. Unfortunately, life is never as simple as one would have hoped: many AVX2 integer opcodes operate on the two 128-bit lanes of a 256-bit vector independently rather than on the full register. Reportedly this was dictated by microarchitectural considerations in Intel's implementation of AVX2 that are now long obsolete, but we have to deal with it nonetheless.
Our implementation will use permute2x128 and alignr (vperm2i128 and vpalignr) to construct the shifted vectors:
NOINLINE size_t memchr2_avx2_bmi1(const void * ptr, size_t n, uint16_t needle) {
const uint8_t *p = (const uint8_t *)ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
const __m256i vlo = _mm256_set1_epi8((char)lo);
const __m256i vhi = _mm256_set1_epi8((char)hi);
size_t i = 0;
if (n >= 64) {
__m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
for (; i + 64 <= n; i += 32) {
__m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
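// M = [high 128 bits of A : low 128 bits of N]; vpalignr then shifts each
// 128-bit lane of (M:A) right by one byte, so B ends up holding bytes
// p[i+1..i+32], i.e. A shifted left by one byte with N's first byte appended.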
__m256i M = _mm256_permute2x128_si256(A, N, 0x21);
__m256i B = _mm256_alignr_epi8(M, A, 1);
__m256i eq0 = _mm256_cmpeq_epi8(A, vlo);
__m256i eq1 = _mm256_cmpeq_epi8(B, vhi);
__m256i both = _mm256_and_si256(eq0, eq1);
unsigned mask = (unsigned)_mm256_movemask_epi8(both);
if (mask) return i + (size_t)_tzcnt_u32(mask);
A = N;
}
}
for (; i + 1 < n; ++i)
if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
Instinctively, we will also try to move a couple of the _mm256_movemask_epi8 calls off the hot path. To do this we apply the same optimisation idea as in the regular memchr implementation:
static inline __m256i both_from_AN(__m256i A, __m256i N, __m256i vlo, __m256i vhi) {
__m256i M = _mm256_permute2x128_si256(A, N, 0x21);
__m256i B = _mm256_alignr_epi8(M, A, 1);
__m256i eq0 = _mm256_cmpeq_epi8(A, vlo);
__m256i eq1 = _mm256_cmpeq_epi8(B, vhi);
return _mm256_and_si256(eq0, eq1);
}
NOINLINE size_t memchr2_avx2_bmi1_alt(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
const __m256i vlo = _mm256_set1_epi8((char)lo);
const __m256i vhi = _mm256_set1_epi8((char)hi);
size_t i = 0;
if (n >= 96) {
__m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
for (; i + 96 <= n; i += 64) {
__m256i N0 = _mm256_loadu_si256((const __m256i *)(p + i + 32));
__m256i N1 = _mm256_loadu_si256((const __m256i *)(p + i + 64));
__m256i both0 = both_from_AN(A, N0, vlo, vhi);
__m256i both1 = both_from_AN(N0, N1, vlo, vhi);
__m256i any = _mm256_or_si256(both0, both1);
if (!_mm256_testz_si256(any, any)) {
unsigned m0 = (unsigned)_mm256_movemask_epi8(both0);
if (m0) return i + (size_t)_tzcnt_u32(m0);
unsigned m1 = (unsigned)_mm256_movemask_epi8(both1);
return (i + 32) + (size_t)_tzcnt_u32(m1);
}
A = N1;
}
for (; i + 64 <= n; i += 32) {
__m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
__m256i both = both_from_AN(A, N, vlo, vhi);
if (!_mm256_testz_si256(both, both)) {
unsigned m = (unsigned)_mm256_movemask_epi8(both);
return i + (size_t)_tzcnt_u32(m);
}
A = N;
}
} else if (n >= 64) {
__m256i A = _mm256_loadu_si256((const __m256i *)(p + 0));
for (; i + 64 <= n; i += 32) {
__m256i N = _mm256_loadu_si256((const __m256i *)(p + i + 32));
__m256i both = both_from_AN(A, N, vlo, vhi);
if (!_mm256_testz_si256(both, both)) {
unsigned m = (unsigned)_mm256_movemask_epi8(both);
return i + (size_t)_tzcnt_u32(m);
}
A = N;
}
}
for (; i + 1 < n; ++i)
if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
Benchmarks show that the second AVX2 version performs about the same as the first one, except that it seems to stall on the vptest instruction on both Zen2 and Zen4 CPUs. A test on a Zen2 CPU (5900X) gives:
buf_size=64000000 bytes, queries=65536, iters=50, seed=16
portable_elim2 time=7122549226 ns checksum=0x9ea45cc56838615c
scalar time=3940411071 ns checksum=0x9ea45cc56838615c
portable_qword time=1868259066 ns checksum=0x9ea45cc56838615c
portable_running time=11036034203 ns checksum=0x9ea45cc56838615c
sse2_bmi1 time=774990423 ns checksum=0x9ea45cc56838615c
sse2_bmi1_fast time=837601040 ns checksum=0x9ea45cc56838615c
ssse3_bmi1 time=821152775 ns checksum=0x9ea45cc56838615c
avx2_bmi1 time=710584728 ns checksum=0x9ea45cc56838615c
avx2_bmi1_alt time=703469492 ns checksum=0x9ea45cc56838615c
Estimated throughput (GiB/s):
portable_elim2 : 14.727
scalar : 26.621
portable_qword : 56.147
portable_running : 9.505
sse2_bmi1 : 135.352
sse2_bmi1_fast : 125.234
ssse3_bmi1 : 127.743
avx2_bmi1 : 147.620
avx2_bmi1_alt : 149.113
The kernel of the improved AVX2 loop appears to be:
3.81 │ 40: vmovdqu ymm4,YMMWORD PTR [rdi+rsi*1+0x20]
2.90 │ vpcmpeqb ymm5,ymm0,ymm2
4.17 │ vperm2i128 ymm3,ymm2,ymm4,0x21
3.16 │ vpalignr ymm3,ymm3,ymm2,0x1
2.78 │ vmovdqu ymm2,YMMWORD PTR [rdi+rsi*1+0x40]
3.27 │ vpcmpeqb ymm3,ymm3,ymm1
1.95 │ vpand ymm3,ymm5,ymm3
2.49 │ vperm2i128 ymm5,ymm4,ymm2,0x21
1.91 │ vpalignr ymm5,ymm5,ymm4,0x1
2.32 │ vpcmpeqb ymm4,ymm0,ymm4
3.60 │ vpcmpeqb ymm5,ymm5,ymm1
3.15 │ vpand ymm4,ymm4,ymm5
5.93 │ vpor ymm5,ymm3,ymm4
15.79 │ vptest ymm5,ymm5
7.24 │ ↓ jne 154
7.20 │ lea r8,[rsi+0x40]
7.08 │ add rsi,0xa0
4.72 │ cmp rsi,rax
3.19 │ mov rsi,r8
3.13 │ ↑ jbe 40
Unrolling four times gives the following kernel:
0.79 │ 40: vmovdqu ymm4,YMMWORD PTR [rdi+rsi*1+0x20]
0.53 │ vmovdqu ymm5,YMMWORD PTR [rdi+rsi*1+0x40]
0.50 │ vmovdqu ymm6,YMMWORD PTR [rdi+rsi*1+0x60]
0.52 │ vpcmpeqb ymm7,ymm0,ymm2
1.73 │ vperm2i128 ymm3,ymm2,ymm4,0x21
1.86 │ vpalignr ymm3,ymm3,ymm2,0x1
2.43 │ vmovdqu ymm2,YMMWORD PTR [rdi+rsi*1+0x80]
2.29 │ vpcmpeqb ymm3,ymm3,ymm1
1.95 │ vpand ymm3,ymm7,ymm3
2.29 │ vperm2i128 ymm7,ymm4,ymm5,0x21
1.10 │ vpalignr ymm7,ymm7,ymm4,0x1
0.94 │ vpcmpeqb ymm4,ymm0,ymm4
0.84 │ vpcmpeqb ymm7,ymm7,ymm1
0.69 │ vpand ymm4,ymm4,ymm7
0.71 │ vperm2i128 ymm7,ymm5,ymm6,0x21
1.04 │ vpor ymm8,ymm3,ymm4
0.76 │ vpalignr ymm7,ymm7,ymm5,0x1
1.06 │ vpcmpeqb ymm5,ymm0,ymm5
0.72 │ vpcmpeqb ymm7,ymm7,ymm1
1.04 │ vpand ymm5,ymm5,ymm7
0.93 │ vperm2i128 ymm7,ymm6,ymm2,0x21
0.98 │ vpalignr ymm7,ymm7,ymm6,0x1
1.21 │ vpcmpeqb ymm6,ymm0,ymm6
1.51 │ vpcmpeqb ymm7,ymm7,ymm1
1.23 │ vpand ymm6,ymm6,ymm7
3.14 │ vpor ymm7,ymm5,ymm6
1.50 │ vpor ymm7,ymm8,ymm7
19.06 │ vptest ymm7,ymm7
9.21 │ ↓ jne 140
8.78 │ lea r8,[rsi+0x80]
8.29 │ add rsi,0x120
8.92 │ cmp rsi,rax
0.56 │ mov rsi,r8
0.62 │ ↑ jbe 40
This also stalls on vptest, albeit less frequently.
1.2.4. AVX512 (AVX-512F + AVX-512BW + AVX-512VBMI) implementation. ⎇
A straightforward translation using _mm512_permutex2var_epi8 yields:
NOINLINE size_t memchr2_avx512vbmi(const void * ptr, size_t n, uint16_t needle) {
const uint8_t * p = (const uint8_t *) ptr;
if (n < 2) return n;
const uint8_t lo = (uint8_t)needle;
const uint8_t hi = (uint8_t)(needle >> 8);
const __m512i vlo = _mm512_set1_epi8((char)lo);
const __m512i vhi = _mm512_set1_epi8((char)hi);
const __m512i idx = _mm512_setr_epi32(
0x04030201, 0x08070605, 0x0c0b0a09, 0x100f0e0d,
0x14131211, 0x18171615, 0x1c1b1a19, 0x201f1e1d,
0x24232221, 0x28272625, 0x2c2b2a29, 0x302f2e2d,
0x34333231, 0x38373635, 0x3c3b3a39, 0x403f3e3d
);
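// idx holds byte indices 1..64: _mm512_permutex2var_epi8 selects bytes from the
// 128-byte concatenation [A : N], so B below becomes A shifted left by one byte
// with the first byte of N appended (i.e. bytes p[i+1..i+64]).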
size_t i = 0;
if (n >= 128) {
__m512i A = _mm512_loadu_si512((const void *)(p + 0));
for (; i + 128 <= n; i += 64) {
__m512i N = _mm512_loadu_si512((const void *)(p + i + 64));
__m512i B = _mm512_permutex2var_epi8(A, idx, N);
__mmask64 k0 = _mm512_cmpeq_epi8_mask(A, vlo);
__mmask64 k1 = _mm512_cmpeq_epi8_mask(B, vhi);
__mmask64 k = k0 & k1;
if (k) return i + (size_t)_tzcnt_u64((uint64_t)k);
A = N; // roll
}
}
for (; i + 1 < n; ++i) if (p[i] == lo && p[i + 1] == hi) return i;
return n;
}
The perf profiling output tells us that we may have overdone it a little with the vector width, but the stall persists:
Percent │ mov rax,rsi
0.01 │ cmp rsi,0x2
│ ↓ jb 7b
0.01 │ movzx ecx,dx
0.02 │ shr ecx,0x8
│ cmp rax,0x80
│ ↓ jb 7f
0.01 │ vmovdqu64 zmm2,ZMMWORD PTR [rdi]
0.25 │ vpbroadcastb zmm0,edx
0.04 │ vpbroadcastb zmm1,ecx
0.01 │ xor r8d,r8d
│ nop
4.02 │30: vmovdqa64 zmm3,zmm2
4.02 │ vmovdqu64 zmm2,ZMMWORD PTR [rdi+r8*1+0x40]
4.52 │ vpcmpeqb k0,zmm0,zmm3
4.48 │ valignq zmm4,zmm2,zmm3,0x2
6.98 │ vpalignr zmm4,zmm4,zmm3,0x1
7.18 │ vpcmpeqb k1,zmm4,zmm1
20.62 │ ktestq k0,k1
9.31 │ ↓ jne b0
9.61 │ lea rsi,[r8+0x40]
6.78 │ add r8,0xc0
6.66 │ cmp r8,rax
0.73 │ mov r8,rsi
0.76 │ ↑ jbe 30
0.09 │ lea r8,[rsi+0x1]
0.05 │ cmp r8,rax
│ ↓ jb 8a
0.12 │7b: vzeroupper
0.01 │ ← ret
0.02 │7f: xor esi,esi
│ lea r8,[rsi+0x1]
0.01 │ cmp r8,rax
│ ↑ jae 7b
0.03 │8a: lea r8,[rax-0x1]
0.04 │ ↓ jmp 98
2.39 │90: inc rsi
1.90 │ cmp r8,rsi
│ ↑ je 7b
6.05 │98: cmp BYTE PTR [rdi+rsi*1],dl
│ ↑ jne 90
0.12 │ movzx r9d,BYTE PTR [rdi+rsi*1+0x1]
0.11 │ cmp cx,r9w
│ ↑ jne 90
│ mov rax,rsi
│ vzeroupper
│ ← ret
1.49 │b0: kandq k0,k0,k1
1.09 │ kmovq rax,k0
0.47 │ tzcnt rax,rax
│ add rax,r8
0.01 │ vzeroupper
0.01 │ ← ret
Perhaps interestingly, the AVX-512 implementation seems to not incur too much code bloat:
% nm --no-demangle -a -P --size-sort memchr2-benchmark | grep memchr2
_Z22memchr2_portable_elim2PKvmt T 1960 3a
_Z23memchr2_portable_simplePKvmt T 1920 3d
_Z24memchr2_portable_runningPKvmt T 19a0 53
_Z17memchr2_sse2_bmi1PKvmt T 11f0 9d
_Z18memchr2_ssse3_bmi1PKvmt T 1340 ad
_Z22memchr2_sse2_bmi1_fastPKvmt T 1290 ad
_Z17memchr2_avx2_bmi1PKvmt T 13f0 c4
_Z18memchr2_avx512vbmiPKvmt T 16c0 c6 <<--- AVX-512 version
_Z22memchr2_portable_qwordPKvmt T 1790 18d
_Z21memchr2_avx2_bmi1_altPKvmt T 14c0 1f5
We also observe a further, if modest, performance boost over AVX2:
buf_size=64000000 bytes, queries=65536, iters=100, seed=666
sse2_bmi1 time=6581546429 ns checksum=0xc25f073958310200
sse2_bmi1_fast time=6248299510 ns checksum=0xc25f073958310200
ssse3_bmi1 time=6445409513 ns checksum=0xc25f073958310200
avx2_bmi1 time=5446783838 ns checksum=0xc25f073958310200
avx2_bmi1_alt time=5882657242 ns checksum=0xc25f073958310200
avx512vbmi time=4824502035 ns checksum=0xc25f073958310200
Estimated throughput (GiB/s):
sse2_bmi1 : 32.571
sse2_bmi1_fast : 34.308
ssse3_bmi1 : 33.259
avx2_bmi1 : 39.357
avx2_bmi1_alt : 36.440
avx512vbmi : 44.433
With a target-feature set that discourages the compiler from splitting permutex2var into multiple instructions, the assembly output becomes:
4.45 │ 90: mov rdx,rcx
0.54 │ add rcx,0x40
0.51 │ vmovdqa64 zmm1,zmm0
2.34 │ vmovdqu64 zmm0,ZMMWORD PTR [r8+rcx*1]
5.69 │ vmovdqa64 zmm2,zmm1
7.80 │ vpermt2b zmm2,zmm3,zmm0
10.07 │ vpcmpeqb k1,zmm2,zmm4
9.24 │ vpcmpeqb k0{k1},zmm1,zmm5
24.38 │ kortestq k0,k0
8.04 │ ↓ jne 100
6.57 │ lea rdi,[rdx+0xc0]
5.39 │ cmp rax,rdi
0.01 │ ↑ jae 90
This, however, does not bring major performance improvements.
1.3. Exact match of a pair of characters in non-overlapping regions. ⎇
This is an easier but less broadly useful problem (it is essentially wmemchr for 16-bit elements). All of our solutions are derivatives of the previous memchr implementations, except that we operate directly on aligned two-byte elements. A small usage sketch of the shared interface follows; the implementations themselves are given in the next subsection.
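A minimal sketch of the contract (ours, for illustration; runnable once linked against the simple_wmemchr defined below): the haystack is an array of 16-bit elements and the result is a pointer to the first element equal to the target, or nullptr.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstdio>

// Declared here only to illustrate the interface; defined in the next subsection.
const uint16_t * simple_wmemchr(const uint16_t * __restrict__ text, uint16_t target, size_t length);

int main() {
    const uint16_t data[] = {10, 20, 30, 40, 30};
    const uint16_t * hit = simple_wmemchr(data, 30, 5);
    std::printf("%lld\n", hit ? (long long)(hit - data) : -1LL); // 2: first 30 is at index 2
}
```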
1.3.1. Baseline and SWAR implementations. ⎇
Three preliminary implementations (simple, SWAR, repne scasw) follow:
NOINLINE void * wmemchr_repne_scasw(const void *s, uint16_t c, size_t n) {
if (n == 0) return NULL;
const uint16_t *p = (const uint16_t *)s;
void *ret;
__asm__ volatile(
"cld\n\t"
"repne scasw\n\t"
"jne 1f\n\t" // if ZF==0 (no match), jump to notfound
"lea -2(%%rdi), %0\n\t" // match: ret = rdi-2
"jmp 2f\n\t"
"1:\n\t"
"xor %0, %0\n\t" // not found: ret = NULL
"2:\n\t"
: "=&r"(ret), "+D"(p), "+c"(n) : "a"(c) : "cc", "memory");
return ret;
}
NOINLINE const uint16_t * simple_wmemchr(const uint16_t * __restrict__ text, uint16_t target, size_t length) {
for (size_t i = 0; i < length; ++i)
if (text[i] == target)
return &text[i];
return nullptr;
}
NOINLINE const uint16_t * qword_wmemchr(const uint16_t * __restrict__ s, uint16_t target, size_t length) {
const uint16_t * p = s;
const uint64_t pattern = 0x0001000100010001ULL * (uint64_t)target;
while (length >= 4) {
uint64_t w; __builtin_memcpy(&w, p, sizeof(w));
uint64_t x = w ^ pattern;
if (((x - 0x0001000100010001ULL) & ~x & 0x8000800080008000ULL) != 0) {
if (p[0] == target) return p + 0;
if (p[1] == target) return p + 1;
if (p[2] == target) return p + 2;
if (p[3] == target) return p + 3;
}
p += 4; length -= 4;
}
while (length) {
if (*p == target) return p;
++p; --length;
}
return nullptr;
}
Benchmarks show:
buf_size=67108864 bytes, queries=65536, iters=50, seed=1
simple_wmemchr time=6638559210 ns checksum=0xcb426bda4f4d21ff
wmemchr_repne_scasw time=10031833115 ns checksum=0xcb426bda4f4d21ff
qword_wmemchr time=5605833323 ns checksum=0xcb426bda4f4d21ff
Estimated throughput (GiB/s):
simple_wmemchr: 16.014
wmemchr_repne_scasw: 10.598
qword_wmemchr: 18.965
Unsurprisingly, as we have remarked previously, repne scasw is not competitive here due to the missing microcoded fast path. The SWAR version is the fastest, but only marginally so. More significant performance improvements will be obtained using SIMD techniques, similar to those used in the previous section.
1.3.2. SIMD implementations. ⎇
The only issue with porting the previous memchr SIMD implementations to wmemchr is that _mm_movemask_epi8 still produces one bit per byte while our elements are now 16 bits wide, so every matching element sets two adjacent mask bits. We therefore halve the bit index returned by the count-trailing-zeros step to recover the element position.
NOINLINE void * wmemchr_sse2(const void * s, uint16_t c, size_t n) {
const uint16_t * p = (const uint16_t *) s;
const __m128i needle = _mm_set1_epi16((short)c);
for (; n >= 8; n -= 8, p += 8) {
__m128i hay = _mm_loadu_si128((const __m128i*)p);
__m128i eq = _mm_cmpeq_epi16(hay, needle);
unsigned mask = (unsigned)_mm_movemask_epi8(eq);
if (mask) {
unsigned byte_index = (unsigned)__builtin_ctz(mask);
return (void *) (p + (byte_index >> 1));
}
}
for (; n; --n, ++p) {
if (*p == c) return (void *)p;
}
return nullptr;
}
NOINLINE const uint16_t * wmemchr_avx2_simple(const uint16_t * __restrict__ s, uint16_t target, size_t length) {
const uint16_t * __restrict__ p = s;
const __m256i needle = _mm256_set1_epi16((short)target);
for (; length >= 64; length -= 64, p += 64) {
__m256i hay1 = _mm256_loadu_si256((const __m256i *)(p));
__m256i hay2 = _mm256_loadu_si256((const __m256i *)(p + 16));
__m256i hay3 = _mm256_loadu_si256((const __m256i *)(p + 32));
__m256i hay4 = _mm256_loadu_si256((const __m256i *)(p + 48));
__m256i eq1 = _mm256_cmpeq_epi16(hay1, needle);
__m256i eq2 = _mm256_cmpeq_epi16(hay2, needle);
__m256i eq3 = _mm256_cmpeq_epi16(hay3, needle);
__m256i eq4 = _mm256_cmpeq_epi16(hay4, needle);
__m256i any = _mm256_or_si256(_mm256_or_si256(eq1, eq2), _mm256_or_si256(eq3, eq4));
if (_mm256_testz_si256(any, any) == 0) {
unsigned mm1 = (unsigned)_mm256_movemask_epi8(eq1);
if (mm1) return p + ((unsigned)__builtin_ctz(mm1) >> 1);
unsigned mm2 = (unsigned)_mm256_movemask_epi8(eq2);
if (mm2) return (p + 16) + ((unsigned)__builtin_ctz(mm2) >> 1);
unsigned mm3 = (unsigned)_mm256_movemask_epi8(eq3);
if (mm3) return (p + 32) + ((unsigned)__builtin_ctz(mm3) >> 1);
unsigned mm4 = (unsigned)_mm256_movemask_epi8(eq4);
return (p + 48) + ((unsigned)__builtin_ctz(mm4) >> 1);
}
}
for (; length >= 16; length -= 16, p += 16) {
__m256i hay = _mm256_loadu_si256((const __m256i *)p);
__m256i eq = _mm256_cmpeq_epi16(hay, needle);
unsigned mask = (unsigned)_mm256_movemask_epi8(eq);
if (mask)
return p + ((unsigned)__builtin_ctz(mask) >> 1);
}
for (; length; --length, ++p)
if (*p == target) return p;
return nullptr;
}
The final benchmarks look like this:
buf_size=67108864 bytes, queries=65536, iters=50, seed=1
simple_wmemchr time=6638559210 ns checksum=0xcb426bda4f4d21ff
wmemchr_repne_scasw time=10031833115 ns checksum=0xcb426bda4f4d21ff
wmemchr_sse2 time=4114077958 ns checksum=0xcb426bda4f4d21ff
qword_wmemchr time=5605833323 ns checksum=0xcb426bda4f4d21ff
wmemchr_avx2_simple time=3277355737 ns checksum=0xcb426bda4f4d21ff
Estimated throughput (GiB/s):
simple_wmemchr: 16.014
wmemchr_repne_scasw: 10.598
wmemchr_sse2 : 25.841
qword_wmemchr: 18.965
wmemchr_avx2_simple : 32.438
We leave the problem here, as we believe it to be memory-bound beyond this point.
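One quick way to sanity-check the memory-bound claim is to measure how fast the machine can merely stream through a comparable buffer. The crude probe below is our own sketch (compile with -O3; it is not part of the benchmark harness above): it just sums 64 MiB repeatedly and reports the effective read bandwidth:
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <vector>
int main() {
    // Fill a 64 MiB buffer and sum it a few times, timing the whole thing.
    std::vector<uint64_t> buf(64ull * 1024 * 1024 / sizeof(uint64_t), 1);
    const int iters = 10;
    uint64_t sum = 0;
    auto t0 = std::chrono::steady_clock::now();
    for (int it = 0; it < iters; ++it)
        for (uint64_t v : buf) sum += v;
    auto t1 = std::chrono::steady_clock::now();
    double secs = std::chrono::duration<double>(t1 - t0).count();
    double gib = (double)iters * (double)buf.size() * sizeof(uint64_t) / (1024.0 * 1024.0 * 1024.0);
    std::printf("~%.1f GiB/s streamed (checksum %llu)\n", gib / secs, (unsigned long long)sum);
}
If the figure reported here is in the same ballpark as the searcher's throughput, further tuning of the inner loop cannot pay off much.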
1.4. Exact match of a triplet of consecutive characters. ⎇
Matching triplets is of particular interest to us and poses a challenge:
- There isn't a single instruction to load a triplet from memory, unlike before.
- Naive scaling of the 16-bit variant will increase the volume of code and provide strictly worse performance.
- The needle is starting to become large enough that the statistics of random data begin to work against us.
Because at this length the concrete underlying distribution of substrings in the input data matters, we will conduct two benchmarks: one with uniformly random data, and another with a prefix of enwik8, 100MB of Wikipedia in XML format.
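As a small aside, the memchr3_* routines below expect the three needle bytes packed into the low 24 bits of a uint32_t, first byte in the least significant position; a caller could build that argument with a helper along these lines (the helper name is ours, not part of the benchmark code):
#include <cstdint>
// Pack three consecutive needle bytes into the low 24 bits of a uint32_t.
static inline uint32_t pack_needle3(uint8_t b1, uint8_t b2, uint8_t b3) {
    return (uint32_t)b1 | ((uint32_t)b2 << 8) | ((uint32_t)b3 << 16);
}
// Example: searching for "the" uses pack_needle3('t', 'h', 'e').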
1.4.1. Baseline and SWAR implementations. ⎇
We once again compare the "rolling read" and baseline approaches:
NOINLINE size_t memchr3_portable_simple(const void * ptr, size_t n, uint32_t needle) {
const uint8_t * p = (const uint8_t *)ptr;
if (n < 3) return n;
const uint8_t b1 = (uint8_t)needle;
const uint8_t b2 = (uint8_t)(needle >> 8);
const uint8_t b3 = (uint8_t)(needle >> 16);
for (size_t i = 0; i + 2 < n; ++i)
if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return i;
return n;
}
NOINLINE size_t memchr3_rolling_cmp(const void * ptr, size_t n, uint32_t needle) {
const uint8_t * p = (const uint8_t *)ptr;
if (n < 3) return n;
uint32_t nw = (uint32_t)(
((needle & 0xFFu) << 24) |
(((needle >> 8) & 0xFFu) << 16) |
(((needle >> 16) & 0xFFu) << 8)
);
uint32_t hw = (uint32_t)((uint32_t)p[0] << 24 | (uint32_t)p[1] << 16 | (uint32_t)p[2] << 8);
if (hw == nw) return 0;
size_t i = 2;
while (++i < n) {
hw = (hw | (uint32_t)p[i]) << 8;
if (hw == nw) return i - 2;
}
return n;
}
Benchmarks on Zen 2 show that the rolling-read approach (musl-like) still isn't worth the effort here:
Testing on uniformly random data...
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple time=3838123745 ns checksum=0x64135f1aecd4b15f
memchr3_rolling time=6907529497 ns checksum=0x64135f1aecd4b15f
Estimated throughput (GiB/s):
memchr3_simple : 5.498
memchr3_rolling : 3.055
Testing on enwik8 with sampled needles...
memchr3_simple time=231706636 ns checksum=0x49c24b926896aabe
memchr3_rolling time=206304819 ns checksum=0x49c24b926896aabe
Estimated throughput (GiB/s):
memchr3_simple : 5.401
memchr3_rolling : 6.066
Moving on to the SWAR implementation, we have three choices that make different assumptions about the underlying distribution: test all three bytes (most general, most overhead on the hot path, no false positives), test the first and last bytes (moderate overhead, some false positives), or test only the first byte (least overhead, most false positives). We implement all three without excessive commentary:
NOINLINE size_t memchr3_portable_qword(const void * data, size_t len, uint32_t needle) {
if (len < 3) return len;
uint8_t b1 = (uint8_t)(needle);
uint8_t b2 = (uint8_t)(needle >> 8);
uint8_t b3 = (uint8_t)(needle >> 16);
const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
const uint64_t pattern2 = 0x0101010101010101ULL * (uint64_t)b2;
const uint64_t pattern3 = 0x0101010101010101ULL * (uint64_t)b3;
const uint8_t * p = (const uint8_t *) data;
const uint8_t * end = (const uint8_t *) data + len - 2;
while (p + 6 < end) {
uint64_t chunk = *(uint64_t *)p;
uint64_t x1 = chunk ^ pattern1;
uint64_t x2 = chunk ^ pattern2;
uint64_t x3 = chunk ^ pattern3;
uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
uint64_t hit2 = (x2 - 0x0101010101010101ULL) & ~x2 & 0x8080808080808080ULL;
uint64_t hit3 = (x3 - 0x0101010101010101ULL) & ~x3 & 0x8080808080808080ULL;
uint64_t hc2 = (hit3 >> 16) & (hit2 >> 8) & hit1;
if (hc2) {
for (int i = 0; i < 6; i++) {
if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
}
}
p += 6;
}
for (; p < end; p++) {
if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
}
return len;
}
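// Despite the name, this variant prefilters on the first and last needle bytes (b1 and b3).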
NOINLINE size_t memchr3_portable_middle_qword(const void * data, size_t len, uint32_t needle) {
if (len < 3) return len;
uint8_t b1 = (uint8_t)(needle);
uint8_t b2 = (uint8_t)(needle >> 8);
uint8_t b3 = (uint8_t)(needle >> 16);
const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
const uint64_t pattern3 = 0x0101010101010101ULL * (uint64_t)b3;
const uint8_t * p = (const uint8_t *) data;
const uint8_t * end = (const uint8_t *) data + len - 2;
while (p + 6 < end) {
uint64_t chunk = *(uint64_t *)p;
uint64_t x1 = chunk ^ pattern1;
uint64_t x3 = chunk ^ pattern3;
uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
uint64_t hit3 = (x3 - 0x0101010101010101ULL) & ~x3 & 0x8080808080808080ULL;
uint64_t hc2 = (hit3 >> 16) & hit1;
if (hc2) {
for (int i = 0; i < 6; i++) {
if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
}
}
p += 6;
}
for (; p < end; p++) {
if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
}
return len;
}
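// This variant prefilters on the first needle byte (b1) only.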
NOINLINE size_t memchr3_portable_fb_qword(const void * data, size_t len, uint32_t needle) {
if (len < 3) return len;
uint8_t b1 = (uint8_t)(needle);
uint8_t b2 = (uint8_t)(needle >> 8);
uint8_t b3 = (uint8_t)(needle >> 16);
const uint64_t pattern1 = 0x0101010101010101ULL * (uint64_t)b1;
const uint8_t * p = (const uint8_t *) data;
const uint8_t * end = (const uint8_t *) data + len - 2;
while (p + 6 < end) {
uint64_t chunk = *(uint64_t *)p;
uint64_t x1 = chunk ^ pattern1;
uint64_t hit1 = (x1 - 0x0101010101010101ULL) & ~x1 & 0x8080808080808080ULL;
if (hit1) {
for (int i = 0; i < 6; i++) {
if (p[i] == b1 && p[i + 1] == b2 && p[i + 2] == b3) return p + i - (const uint8_t *) data;
}
}
p += 6;
}
for (; p < end; p++) {
if (p[0] == b1 && p[1] == b2 && p[2] == b3) return p - (const uint8_t *) data;
}
return len;
}
The runtimes suggest that for real data, searching the first and last bytes is the best compromise:
Testing on uniformly random data...
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple time=3838123745 ns checksum=0x64135f1aecd4b15f
memchr3_rolling time=6907529497 ns checksum=0x64135f1aecd4b15f
memchr3_qword time=3501706757 ns checksum=0x64135f1aecd4b15f
memchr3_middle time=2558240974 ns checksum=0x64135f1aecd4b15f
memchr3_fb time=2119827609 ns checksum=0x64135f1aecd4b15f
Estimated throughput (GiB/s):
memchr3_simple : 5.498
memchr3_rolling : 3.055
memchr3_qword : 6.027
memchr3_middle : 8.249
memchr3_fb : 9.955
Testing on enwik8 with sampled needles...
memchr3_simple time=231706636 ns checksum=0x49c24b926896aabe
memchr3_rolling time=206304819 ns checksum=0x49c24b926896aabe
memchr3_qword time=126368474 ns checksum=0x49c24b926896aabe
memchr3_middle time=110629408 ns checksum=0x49c24b926896aabe
memchr3_fb time=194372178 ns checksum=0x49c24b926896aabe
Estimated throughput (GiB/s):
memchr3_simple : 5.401
memchr3_rolling : 6.066
memchr3_qword : 9.903
memchr3_middle : 11.312
memchr3_fb : 6.439
Conversely, because the random data is, well, random, checking only the first byte is the best option there, as it filters out most candidates with minimal overhead.
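To put rough numbers on this trade-off, we can estimate how often each prefilter falls through to the scalar verification loop per 8-byte chunk, assuming independent, uniformly distributed bytes (a back-of-the-envelope sketch of ours, not a measurement; the SWAR loops above expose 6 candidate start positions per chunk):
#include <cmath>
#include <cstdio>
int main() {
    // Probability that a chunk triggers the verification loop for each prefilter.
    double p_all3       = 1.0 - std::pow(1.0 - 1.0 / 16777216.0, 6); // b1, b2 and b3 line up at some start
    double p_first_last = 1.0 - std::pow(1.0 - 1.0 / 65536.0, 6);    // b1 and b3 line up at some start
    double p_first      = 1.0 - std::pow(255.0 / 256.0, 8);          // b1 appears anywhere in the 8 loaded bytes
    std::printf("all three bytes : %.3g\n", p_all3);       // ~3.6e-07
    std::printf("first and last  : %.3g\n", p_first_last); // ~9.2e-05
    std::printf("first byte only : %.3g\n", p_first);      // ~0.031
}
Even the weakest filter rejects about 97% of chunks on random bytes, which is why the cheapest check wins there; on text, common letters trip the single-byte filter far more often, so the two-byte filter earns its extra cost.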
1.4.2. SSE searcher. ⎇
We will naively extend the unaligned load idea from before:
NOINLINE size_t memchr3_sse2_bmi1(const void * ptr, size_t n, uint32_t needle) {
const uint8_t *p = (const uint8_t *)ptr;
if (n < 3) return n;
const uint8_t b0 = (uint8_t)(needle);
const uint8_t b1 = (uint8_t)(needle >> 8);
const uint8_t b2 = (uint8_t)(needle >> 16);
const __m128i v0 = _mm_set1_epi8((char)b0);
const __m128i v1 = _mm_set1_epi8((char)b1);
const __m128i v2 = _mm_set1_epi8((char)b2);
size_t i = 0;
for (; i + 18 <= n; i += 16) {
__m128i a = _mm_loadu_si128((const __m128i *)(p + i));
__m128i b = _mm_loadu_si128((const __m128i *)(p + i + 1));
__m128i c = _mm_loadu_si128((const __m128i *)(p + i + 2));
__m128i eq0 = _mm_cmpeq_epi8(a, v0);
__m128i eq1 = _mm_cmpeq_epi8(b, v1);
__m128i eq2 = _mm_cmpeq_epi8(c, v2);
__m128i m01 = _mm_and_si128(eq0, eq1);
__m128i both = _mm_and_si128(m01, eq2);
unsigned mask = (unsigned)_mm_movemask_epi8(both);
if (mask) return i + (size_t)_tzcnt_u32(mask);
}
for (; i + 2 < n; ++i) {
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
}
return n;
}
We see the following performance characteristics:
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
memchr3_simple time=3829049092 ns checksum=0x64135f1aecd4b15f
memchr3_rolling time=6954385344 ns checksum=0x64135f1aecd4b15f
memchr3_qword time=3522594643 ns checksum=0x64135f1aecd4b15f
memchr3_middle time=2594670220 ns checksum=0x64135f1aecd4b15f
memchr3_fb time=2152706551 ns checksum=0x64135f1aecd4b15f
memchr3_sse2_bmi1 time=612184228 ns checksum=0x64135f1aecd4b15f
Estimated throughput (GiB/s):
memchr3_simple : 5.511
memchr3_rolling : 3.035
memchr3_qword : 5.991
memchr3_middle : 8.133
memchr3_fb : 9.803
memchr3_sse2_bmi1: 34.472
Testing on enwik8 with sampled needles...
memchr3_simple time=232567051 ns checksum=0x49c24b926896aabe
memchr3_rolling time=208465357 ns checksum=0x49c24b926896aabe
memchr3_qword time=127860831 ns checksum=0x49c24b926896aabe
memchr3_middle time=112110693 ns checksum=0x49c24b926896aabe
memchr3_fb time=196291157 ns checksum=0x49c24b926896aabe
memchr3_sse2_bmi1 time=56776812 ns checksum=0x49c24b926896aabe
Estimated throughput (GiB/s):
memchr3_simple : 5.381
memchr3_rolling : 6.003
memchr3_qword : 9.788
memchr3_middle : 11.163
memchr3_fb : 6.376
memchr3_sse2_bmi1: 22.042
The SSSE3 versions (basic and unrolled), which avoid the redundant unaligned loads, still do not prove themselves worthwhile:
NOINLINE size_t memchr3_ssse3_bmi1(const void * ptr, size_t n, uint32_t needle) {
const uint8_t *p = (const uint8_t *)ptr;
if (n < 3) return n;
const uint8_t b0 = (uint8_t)needle;
const uint8_t b1 = (uint8_t)(needle >> 8);
const uint8_t b2 = (uint8_t)(needle >> 16);
const __m128i v0 = _mm_set1_epi8((char)b0);
const __m128i v1 = _mm_set1_epi8((char)b1);
const __m128i v2 = _mm_set1_epi8((char)b2);
size_t i = 0;
if (n >= 32) {
__m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
for (; i + 32 <= n; i += 16) {
__m128i N = _mm_loadu_si128((const __m128i *)(p + i + 16));
__m128i B1 = _mm_alignr_epi8(N, A, 1); // SSSE3
__m128i B2 = _mm_alignr_epi8(N, A, 2); // SSSE3
__m128i eq0 = _mm_cmpeq_epi8(A, v0);
__m128i eq1 = _mm_cmpeq_epi8(B1, v1);
__m128i eq2 = _mm_cmpeq_epi8(B2, v2);
__m128i m01 = _mm_and_si128(eq0, eq1);
__m128i both = _mm_and_si128(m01, eq2);
unsigned mask = (unsigned)_mm_movemask_epi8(both);
if (mask) return i + (size_t)_tzcnt_u32(mask); // BMI1
A = N;
}
}
for (; i + 2 < n; ++i) {
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
}
return n;
}
static inline unsigned mask3_ssse3(__m128i A, __m128i B, __m128i v0, __m128i v1, __m128i v2) {
__m128i B1 = _mm_alignr_epi8(B, A, 1);
__m128i B2 = _mm_alignr_epi8(B, A, 2);
__m128i m = _mm_and_si128(_mm_and_si128(_mm_cmpeq_epi8(A, v0), _mm_cmpeq_epi8(B1, v1)), _mm_cmpeq_epi8(B2, v2));
return (unsigned)_mm_movemask_epi8(m);
}
NOINLINE size_t memchr3_ssse3_fast(const void * ptr, size_t n, uint32_t needle) {
const uint8_t * p = (const uint8_t*) ptr;
if (n < 3) return n;
const uint8_t b0 = (uint8_t)needle;
const uint8_t b1 = (uint8_t)(needle >> 8);
const uint8_t b2 = (uint8_t)(needle >> 16);
const __m128i v0 = _mm_set1_epi8((char)b0);
const __m128i v1 = _mm_set1_epi8((char)b1);
const __m128i v2 = _mm_set1_epi8((char)b2);
size_t i = 0;
if (n >= 32) {
__m128i A = _mm_loadu_si128((const __m128i *)(p + 0));
i = 0;
for (; i + 80 <= n; i += 64) {
__m128i B = _mm_loadu_si128((const __m128i *)(p + i + 16));
__m128i C = _mm_loadu_si128((const __m128i *)(p + i + 32));
__m128i D = _mm_loadu_si128((const __m128i *)(p + i + 48));
__m128i E = _mm_loadu_si128((const __m128i *)(p + i + 64));
unsigned m0 = mask3_ssse3(A, B, v0, v1, v2);
unsigned m1 = mask3_ssse3(B, C, v0, v1, v2);
unsigned m2 = mask3_ssse3(C, D, v0, v1, v2);
unsigned m3 = mask3_ssse3(D, E, v0, v1, v2);
if (m0 | m1 | m2 | m3) {
if (m0) return i + 0 + (size_t)_tzcnt_u32(m0);
if (m1) return i + 16 + (size_t)_tzcnt_u32(m1);
if (m2) return i + 32 + (size_t)_tzcnt_u32(m2);
return i + 48 + (size_t)_tzcnt_u32(m3);
}
A = E;
}
for (; i + 32 <= n; i += 16) {
__m128i B = _mm_loadu_si128((const __m128i*)(p + i + 16));
unsigned m = mask3_ssse3(A, B, v0, v1, v2);
if (m) return i + (size_t)_tzcnt_u32(m);
A = B;
}
}
for (; i + 2 < n; ++i) {
if (p[i] == b0 && p[i+1] == b1 && p[i+2] == b2) return i;
}
return n;
}
Giving:
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
Testing on uniformly random data...
Estimated throughput (GiB/s):
memchr3_sse2_bmi1 : 34.752
memchr3_ssse3_bmi1 : 31.485
memchr3_ssse3_fast : 33.956
Testing on enwik8 with sampled needles...
Estimated throughput (GiB/s):
memchr3_sse2_bmi1 : 22.036
memchr3_ssse3_bmi1 : 21.842
memchr3_ssse3_fast : 19.704
However, we can once again re-use the idea of checking the first and last positions as a pre-filter, and together with some unrolling attain better performance:
NOINLINE size_t memchr3_ssse3_prefilter_bmi1(const void * ptr, size_t n, uint32_t needle) {
const uint8_t * p = (const uint8_t *)ptr;
if (n < 3) return n;
const uint8_t b0 = (uint8_t)(needle);
const uint8_t b1 = (uint8_t)(needle >> 8);
const uint8_t b2 = (uint8_t)(needle >> 16);
const __m128i v0 = _mm_set1_epi8((char)b0);
const __m128i v1 = _mm_set1_epi8((char)b1);
const __m128i v2 = _mm_set1_epi8((char)b2);
size_t i = 0;
// Scalar until 16B alignment
uintptr_t mis = (uintptr_t)p & 15u;
if (mis) {
size_t lim = 16u - (size_t)mis;
if (lim > n) lim = n;
for (; i < lim && i + 2 < n; ++i) {
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
}
if (i + 2 >= n) return n;
}
// Now p+i is 16B aligned.
const uint8_t * q = p + i;
size_t m = n - i;
// Need at least 32 bytes so that for starts in [0..15], bytes +1 and +2 exist in A/N window.
if (m >= 32) {
__m128i A = _mm_load_si128((const __m128i*)q);
size_t j = 0;
// 64B unroll: 4x16B per iteration, one aligned load per step.
for (; j + 80 <= m; j += 64) { // need j + 80 <= m: the furthest load below reads q[j + 64 .. j + 79]
__m128i N0 = _mm_load_si128((const __m128i*)(q + j + 16));
__m128i B10 = _mm_alignr_epi8(N0, A, 1);
__m128i B20 = _mm_alignr_epi8(N0, A, 2);
unsigned m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A, v0));
unsigned m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B20, v2));
unsigned cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B10, v1));
unsigned hit = cand & m1;
if (hit) return i + j + (size_t)_tzcnt_u32(hit);
}
__m128i A1 = N0;
__m128i N1 = _mm_load_si128((const __m128i*)(q + j + 32));
__m128i B11 = _mm_alignr_epi8(N1, A1, 1);
__m128i B21 = _mm_alignr_epi8(N1, A1, 2);
m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A1, v0));
m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B21, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B11, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 16 + (size_t)_tzcnt_u32(hit);
}
__m128i A2 = N1;
__m128i N2 = _mm_load_si128((const __m128i*)(q + j + 48));
__m128i B12 = _mm_alignr_epi8(N2, A2, 1);
__m128i B22 = _mm_alignr_epi8(N2, A2, 2);
m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A2, v0));
m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B22, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B12, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 32 + (size_t)_tzcnt_u32(hit);
}
__m128i A3 = N2;
__m128i N3 = _mm_load_si128((const __m128i*)(q + j + 64));
__m128i B13 = _mm_alignr_epi8(N3, A3, 1);
__m128i B23 = _mm_alignr_epi8(N3, A3, 2);
m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A3, v0));
m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B23, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B13, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 48 + (size_t)_tzcnt_u32(hit);
}
A = N3;
}
// Finish remaining aligned 16B steps while we still have a full A/N window (>= 32 bytes remaining).
for (; j + 32 <= m; j += 16) {
__m128i N = _mm_load_si128((const __m128i*)(q + j + 16));
__m128i B1 = _mm_alignr_epi8(N, A, 1);
__m128i B2 = _mm_alignr_epi8(N, A, 2);
unsigned m0 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(A, v0));
unsigned m2 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B2, v2));
unsigned cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm_movemask_epi8(_mm_cmpeq_epi8(B1, v1));
unsigned hit = cand & m1;
if (hit) return i + j + (size_t)_tzcnt_u32(hit);
}
A = N;
}
i += j;
}
for (; i + 2 < n; ++i) {
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
}
return n;
}
The performance has improved and the code shows no glaring red stall spots in the perf annotation, hence we are satisfied with the following outcomes:
buf_size=67108864 bytes, queries=65536, iters=10, seed=1
Testing on uniformly random data...
memchr3_simple time=3849644032 ns checksum=0x64135f1aecd4b15f
memchr3_rolling time=6982241710 ns checksum=0x64135f1aecd4b15f
memchr3_qword time=3545738908 ns checksum=0x64135f1aecd4b15f
memchr3_middle time=2592279127 ns checksum=0x64135f1aecd4b15f
memchr3_fb time=2163220216 ns checksum=0x64135f1aecd4b15f
memchr3_sse2_bmi1 time=607247793 ns checksum=0x64135f1aecd4b15f
memchr3_ssse3_bmi1 time=670266244 ns checksum=0x64135f1aecd4b15f
memchr3_ssse3_fast time=621495733 ns checksum=0x64135f1aecd4b15f
memchr3_ssse3_prefilt time=570113226 ns checksum=0x64135f1aecd4b15f
Estimated throughput (GiB/s):
memchr3_simple : 5.482
memchr3_rolling : 3.022
memchr3_qword : 5.952
memchr3_middle : 8.141
memchr3_fb : 9.756
memchr3_sse2_bmi1 : 34.752
memchr3_ssse3_bmi1 : 31.485
memchr3_ssse3_fast : 33.956
memchr3_ssse3_prefilt: 37.016
Testing on enwik8 with sampled needles...
memchr3_simple time=234533252 ns checksum=0x49c24b926896aabe
memchr3_rolling time=208617318 ns checksum=0x49c24b926896aabe
memchr3_qword time=128926290 ns checksum=0x49c24b926896aabe
memchr3_middle time=112717791 ns checksum=0x49c24b926896aabe
memchr3_fb time=197571431 ns checksum=0x49c24b926896aabe
memchr3_sse2_bmi1 time=56792236 ns checksum=0x49c24b926896aabe
memchr3_ssse3_bmi1 time=57297401 ns checksum=0x49c24b926896aabe
memchr3_ssse3_fast time=63515382 ns checksum=0x49c24b926896aabe
memchr3_ssse3_prefilt time=65642677 ns checksum=0x49c24b926896aabe
Estimated throughput (GiB/s):
memchr3_simple : 5.336
memchr3_rolling : 5.999
memchr3_qword : 9.707
memchr3_middle : 11.103
memchr3_fb : 6.334
memchr3_sse2_bmi1 : 22.036
memchr3_ssse3_bmi1 : 21.842
memchr3_ssse3_fast : 19.704
memchr3_ssse3_prefilt: 19.065
1.4.3. AVX2 searcher. ⎇
We simply widen the SSE2 approach to AVX2 and use the same workaround as before for the lane-wise behaviour of the 256-bit alignr:
NOINLINE size_t memchr3_avx2_prefilter_bmi1(const void * ptr, size_t n, uint32_t needle) {
const uint8_t * p = (const uint8_t*)ptr;
if (n < 3) return n;
const uint8_t b0 = (uint8_t)(needle);
const uint8_t b1 = (uint8_t)(needle >> 8);
const uint8_t b2 = (uint8_t)(needle >> 16);
const __m256i v0 = _mm256_set1_epi8((char)b0);
const __m256i v1 = _mm256_set1_epi8((char)b1);
const __m256i v2 = _mm256_set1_epi8((char)b2);
size_t i = 0;
uintptr_t mis = (uintptr_t)p & 31u;
if (mis) {
size_t lim = 32u - (size_t)mis;
if (lim > n) lim = n;
for (; i < lim && i + 2 < n; ++i) {
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
}
if (i + 2 >= n) return n;
}
const uint8_t* q = p + i;
size_t m = n - i;
if (m >= 64) {
__m256i A = _mm256_load_si256((const __m256i*)(const void*)q);
size_t j = 0;
// 128B unroll: 4x32B steps per iteration.
for (; j + 160 <= m; j += 128) { // need j + 160 <= m: the furthest load below reads q[j + 128 .. j + 159]
__m256i N0 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 32));
__m256i X0 = _mm256_permute2x128_si256(A, N0, 0x21); // [A_hi | N0_lo]
__m256i B10 = _mm256_alignr_epi8(X0, A, 1); // bytes at +1
__m256i B20 = _mm256_alignr_epi8(X0, A, 2); // bytes at +2
unsigned m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A, v0));
unsigned m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B20, v2));
unsigned cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B10, v1));
unsigned hit = cand & m1;
if (hit) return i + j + (size_t)_tzcnt_u32(hit);
}
__m256i A1 = N0;
__m256i N1 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 64));
__m256i X1 = _mm256_permute2x128_si256(A1, N1, 0x21);
__m256i B11 = _mm256_alignr_epi8(X1, A1, 1);
__m256i B21 = _mm256_alignr_epi8(X1, A1, 2);
m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A1, v0));
m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B21, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B11, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 32 + (size_t)_tzcnt_u32(hit);
}
__m256i A2 = N1;
__m256i N2 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 96));
__m256i X2 = _mm256_permute2x128_si256(A2, N2, 0x21);
__m256i B12 = _mm256_alignr_epi8(X2, A2, 1);
__m256i B22 = _mm256_alignr_epi8(X2, A2, 2);
m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A2, v0));
m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B22, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B12, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 64 + (size_t)_tzcnt_u32(hit);
}
__m256i A3 = N2;
__m256i N3 = _mm256_load_si256((const __m256i*)(const void*)(q + j + 128));
__m256i X3 = _mm256_permute2x128_si256(A3, N3, 0x21);
__m256i B13 = _mm256_alignr_epi8(X3, A3, 1);
__m256i B23 = _mm256_alignr_epi8(X3, A3, 2);
m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A3, v0));
m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B23, v2));
cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B13, v1));
unsigned hit = cand & m1;
if (hit) return i + j + 96 + (size_t)_tzcnt_u32(hit);
}
A = N3;
}
for (; j + 64 <= m; j += 32) {
__m256i N = _mm256_load_si256((const __m256i*)(const void*)(q + j + 32));
__m256i X = _mm256_permute2x128_si256(A, N, 0x21);
__m256i B1 = _mm256_alignr_epi8(X, A, 1);
__m256i B2 = _mm256_alignr_epi8(X, A, 2);
unsigned m0 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(A, v0));
unsigned m2 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B2, v2));
unsigned cand = m0 & m2;
if (cand) {
unsigned m1 = (unsigned)_mm256_movemask_epi8(_mm256_cmpeq_epi8(B1, v1));
unsigned hit = cand & m1;
if (hit) return i + j + (size_t)_tzcnt_u32(hit);
}
A = N;
}
i += j;
}
// Tail scalar (covers last < 64 bytes safely).
for (; i + 2 < n; ++i)
if (p[i] == b0 && p[i + 1] == b1 && p[i + 2] == b2) return i;
return n;
}
The results are still underwhelming:
Testing on uniformly random data...
Estimated throughput (GiB/s):
memchr3_sse2_bmi1 : 34.412
memchr3_ssse3_bmi1 : 31.588
memchr3_ssse3_fast : 33.978
memchr3_ssse3_prefilt: 36.525
memchr3_avx2_prefilt: 41.549
Testing on enwik8 with sampled needles...
Estimated throughput (GiB/s):
memchr3_sse2_bmi1 : 22.268
memchr3_ssse3_bmi1 : 22.075
memchr3_ssse3_fast : 19.961
memchr3_ssse3_prefilt: 19.191
memchr3_avx2_prefilt: 18.313
α. Fun House ⎇
While we're busy re-implementing a couple of libc functions, why not strfry? This function randomizes the order of characters in a string via a Fisher-Yates shuffle.
It has been reported that glibc's implementation is suboptimal and the distribution of characters is not uniform. Later on in the thread, we learn that Ulrich Drepper's mother was a hamster, while his father smells of elderberries.
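For reference, the textbook Fisher-Yates loop is only a few lines; the sketch below uses the standard <random> facilities as a stand-in for the generator we build next:
#include <cstddef>
#include <random>
#include <string>
#include <utility>
// Reference Fisher-Yates shuffle: walk from the back, swapping each position
// with a uniformly chosen position at or before it.
void fisher_yates_reference(std::string & s) {
    std::mt19937_64 rng{std::random_device{}()};
    for (size_t i = s.size(); i > 1; --i) {
        std::uniform_int_distribution<size_t> dist(0, i - 1); // j in [0..i-1]
        std::swap(s[i - 1], s[dist(rng)]);
    }
}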
We opt for an approach based on rdseed combined with AES-NI in counter mode to generate high-quality, uniformly distributed random numbers at good speed, followed by a bitmask-and-reject algorithm for sampling indices in the range [0..n) without modulo bias or the general slowness of Lemire's method.
static ALWAYS_INLINE uint64_t rdseed64(void) {
uint64_t x;
while (_rdseed64_step(&x) == 0) { }
return x;
}
#define aes128_keyassist(key, rcon) \
({ \
__m128i tmp = _mm_aeskeygenassist_si128((key), (rcon)); \
tmp = _mm_shuffle_epi32((tmp), _MM_SHUFFLE(3,3,3,3)); \
__m128i k = (key); \
k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
k = _mm_xor_si128((k), _mm_slli_si128((k), 4)); \
_mm_xor_si128((k), (tmp)); \
})
static ALWAYS_INLINE void aes128_expand_key(__m128i rk[11], __m128i key) {
rk[0] = key;
rk[1] = aes128_keyassist(rk[0], 0x01);
rk[2] = aes128_keyassist(rk[1], 0x02);
rk[3] = aes128_keyassist(rk[2], 0x04);
rk[4] = aes128_keyassist(rk[3], 0x08);
rk[5] = aes128_keyassist(rk[4], 0x10);
rk[6] = aes128_keyassist(rk[5], 0x20);
rk[7] = aes128_keyassist(rk[6], 0x40);
rk[8] = aes128_keyassist(rk[7], 0x80);
rk[9] = aes128_keyassist(rk[8], 0x1B);
rk[10] = aes128_keyassist(rk[9], 0x36);
}
static ALWAYS_INLINE __m128i aes128_encrypt_block(const __m128i rk[11], __m128i in) {
__m128i x = _mm_xor_si128(in, rk[0]);
x = _mm_aesenc_si128(x, rk[1]);
x = _mm_aesenc_si128(x, rk[2]);
x = _mm_aesenc_si128(x, rk[3]);
x = _mm_aesenc_si128(x, rk[4]);
x = _mm_aesenc_si128(x, rk[5]);
x = _mm_aesenc_si128(x, rk[6]);
x = _mm_aesenc_si128(x, rk[7]);
x = _mm_aesenc_si128(x, rk[8]);
x = _mm_aesenc_si128(x, rk[9]);
x = _mm_aesenclast_si128(x, rk[10]);
return x;
}
typedef struct {
__m128i rk[11];
uint64_t ctr_lo, ctr_hi; // 128-bit counter (little-endian increment)
uint32_t buf[4]; // 16 bytes = 4x u32
int idx; // next unread word
} aesctr_rng_t;
static ALWAYS_INLINE void aesctr_init_rdseed(aesctr_rng_t * g) {
uint64_t k0 = rdseed64(), k1 = rdseed64();
uint64_t c0 = rdseed64(), c1 = rdseed64();
__m128i key = _mm_set_epi64x((long long)k1, (long long)k0);
aes128_expand_key(g->rk, key);
g->ctr_lo = c0; g->ctr_hi = c1;
g->idx = 4; // force refill on first use
}
static ALWAYS_INLINE void aesctr_refill(aesctr_rng_t * g) {
__m128i ctr = _mm_set_epi64x((long long)g->ctr_hi, (long long)g->ctr_lo);
__m128i block = aes128_encrypt_block(g->rk, ctr);
g->ctr_lo++;
if (g->ctr_lo == 0) g->ctr_hi++;
_mm_storeu_si128((__m128i*)g->buf, block);
g->idx = 0;
}
static ALWAYS_INLINE uint32_t aesctr_u32(aesctr_rng_t * g) {
if (g->idx >= 4) aesctr_refill(g);
return g->buf[g->idx++];
}
static ALWAYS_INLINE uint32_t bounded_mask(aesctr_rng_t * g, uint32_t bound) {
if (bound <= 1) return 0;
uint32_t range = bound - 1;
uint32_t mask = ~0u;
mask >>= __builtin_clz(range | 1u); // mask = 2^b - 1, with 2^b > range
uint32_t x;
do { x = aesctr_u32(g) & mask; } while (x > range);
return x;
}
void strfry_bytes(uint8_t * a, size_t n) {
if (n <= 1) return;
if (n - 1 > UINT32_MAX) return;
aesctr_rng_t g;
aesctr_init_rdseed(&g);
for (size_t i = n - 1; i > 0; --i) {
uint32_t j = bounded_mask(&g, (uint32_t)(i + 1)); // j in [0..i]
uint8_t tmp = a[i]; a[i] = a[j]; a[j] = tmp;
}
}
char * strfry(char * s) {
size_t n = strlen(s);
strfry_bytes((uint8_t *) s, n);
return s;
}
Benchmarking against glibc is pointless (its strfry is neither uniform, nor fast, nor "secure"). We can, however, probe our own implementation for uniformity using Lehmer codes: we shuffle short strings of m distinct characters drawn from a 64-character alphabet, map each result to its permutation index, and count how often each index occurs.
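A minimal sketch (ours) of the index computation: for each character, count how many characters to its right compare smaller, and accumulate those counts in factorial base.
#include <cstddef>
#include <cstdint>
#include <string>
// Lehmer-code index of a string of distinct characters: 0 for the sorted
// order, m! - 1 for the reverse-sorted order.
uint64_t lehmer_index(const std::string & s) {
    uint64_t index = 0;
    for (size_t i = 0; i < s.size(); ++i) {
        size_t smaller = 0;
        for (size_t j = i + 1; j < s.size(); ++j)
            if (s[j] < s[i]) ++smaller;
        index = index * (s.size() - i) + smaller;
    }
    return index;
}
Tabulating these indices over a million shuffles per m, and running chi-squared tests on both the permutation counts and the per-position marginals, gives some concrete data: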
m=2 trials=1000000 perms=2
perm counts: expected=500000.000 min=499783 max=500217 max_rel_dev=0.0004
chi2(perms)=0.19 dof=1 z≈-0.57
chi2(pos 0)=0.19 dof=1 z≈-0.57
chi2(pos 1)=0.19 dof=1 z≈-0.57
chi2(pos sum)=0.38 (sum dof=2)
m=3 trials=1000000 perms=6
perm counts: expected=166666.667 min=166046 max=167113 max_rel_dev=0.0037
chi2(perms)=4.45 dof=5 z≈-0.17
chi2(pos 0)=0.88 dof=2 z≈-0.56
chi2(pos 1)=0.63 dof=2 z≈-0.69
chi2(pos 2)=0.17 dof=2 z≈-0.92
chi2(pos sum)=1.68 (sum dof=6)
m=4 trials=1000000 perms=24
perm counts: expected=41666.667 min=41326 max=41934 max_rel_dev=0.0082
chi2(perms)=18.25 dof=23 z≈-0.70
chi2(pos 0)=0.89 dof=3 z≈-0.86
chi2(pos 1)=6.52 dof=3 z≈1.44
chi2(pos 2)=0.77 dof=3 z≈-0.91
chi2(pos 3)=2.02 dof=3 z≈-0.40
chi2(pos sum)=10.19 (sum dof=12)
m=5 trials=1000000 perms=120
perm counts: expected=8333.333 min=8107 max=8524 max_rel_dev=0.0272
chi2(perms)=111.13 dof=119 z≈-0.51
chi2(pos 0)=3.37 dof=4 z≈-0.22
chi2(pos 1)=6.40 dof=4 z≈0.85
chi2(pos 2)=3.34 dof=4 z≈-0.23
chi2(pos 3)=8.38 dof=4 z≈1.55
chi2(pos 4)=3.47 dof=4 z≈-0.19
chi2(pos sum)=24.96 (sum dof=20)
m=6 trials=1000000 perms=720
perm counts: expected=1388.889 min=1269 max=1508 max_rel_dev=0.0863
chi2(perms)=745.06 dof=719 z≈0.69
chi2(pos 0)=2.38 dof=5 z≈-0.83
chi2(pos 1)=5.68 dof=5 z≈0.22
chi2(pos 2)=11.88 dof=5 z≈2.18
chi2(pos 3)=4.52 dof=5 z≈-0.15
chi2(pos 4)=5.23 dof=5 z≈0.07
chi2(pos 5)=6.85 dof=5 z≈0.58
chi2(pos sum)=36.54 (sum dof=30)
m=7 trials=1000000 perms=5040
perm counts: expected=198.413 min=139 max=250 max_rel_dev=0.2994
chi2(perms)=5022.50 dof=5039 z≈-0.16
chi2(pos 0)=5.69 dof=6 z≈-0.09
chi2(pos 1)=5.98 dof=6 z≈-0.01
chi2(pos 2)=6.84 dof=6 z≈0.24
chi2(pos 3)=9.99 dof=6 z≈1.15
chi2(pos 4)=4.28 dof=6 z≈-0.50
chi2(pos 5)=6.46 dof=6 z≈0.13
chi2(pos 6)=5.24 dof=6 z≈-0.22
chi2(pos sum)=44.48 (sum dof=42)
Our code could be adapted to shuffle objects of any size, but since this is a simple exercise in programming, we leave this to the reader. Other interesting functions that we may tackle in the future are strtok, strspn, strrchr, strpbrk or memcmp.
In the next post, we will look more closely at longer (>= 4 bytes), sparser (i.e., matching with gaps) and finer (non-overlapping 2-bit or 4-bit sequences, useful in bioinformatics) needles; part three will then move on to string algorithms based on suffix trees, suffix arrays, the BWT and the FM-index.
