On Thu, 20 Feb 2020 at 18:43, David Fetter <[email protected]> wrote:
> On Thu, Feb 20, 2020 at 02:36:02PM +0100, Tomas Vondra wrote:
> > I think the wc2 is showing that maybe instead of parallelizing the
> > parsing, we might instead try using a different tokenizer/parser and
> > make the implementation more efficient instead of just throwing more
> > CPUs on it.
>
> That was what I had in mind.
>
> > I don't know if our code is similar to what wc does; maybe parsing
> > csv is more complicated than what wc does.
>
> CSV parsing differs from wc in that there are more states in the state
> machine, but I don't see anything fundamentally different.
The trouble with a state machine based approach is that the state
transitions form a dependency chain, which means that at best the
processing rate will be 4-5 cycles per byte (L1 latency to fetch the
next state).
I whipped together a quick prototype that uses SIMD and bitmap
manipulations to do the equivalent of CopyReadLineText() in csv mode,
including quote and escape handling. It runs at 0.25-0.5 cycles per
byte.
Regards,
Ants Aasma
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <inttypes.h>
#include <immintrin.h>
#include <string.h>
#include <time.h>
/*
 * Branch-prediction hints for the compiler.
 *
 * The argument is normalized with !! so that every non-zero value (a pointer,
 * a masked flag word, ...) compares equal to the expected value 1; without it
 * likely(x) would tell the compiler to expect x == 1 exactly. Both macros
 * evaluate to 0 or 1.
 */
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
/*
 * Build a bitmap of the positions at which byte value c occurs in the 64
 * bytes at data[0] and data[1]: bit i of the result is set when byte i
 * equals c.
 *
 * data is dereferenced as __m256i, so callers are expected to pass a pointer
 * that is suitably aligned for that type.
 */
static inline uint64_t
find_chars(__m256i *data, char c)
{
    const __m256i needle = _mm256_set1_epi8(c);
    const __m256i eq_lo = _mm256_cmpeq_epi8(data[0], needle);
    const __m256i eq_hi = _mm256_cmpeq_epi8(data[1], needle);

    /* movemask returns int; go through uint32_t so no sign bits leak upward */
    const uint64_t bits_lo = (uint32_t) _mm256_movemask_epi8(eq_lo);
    const uint64_t bits_hi = (uint32_t) _mm256_movemask_epi8(eq_hi);

    return bits_lo | (bits_hi << 32);
}
/*
 * Given a bitmap of escape characters, return a bitmap of the unpaired ones.
 *
 * Escape characters are considered in runs of adjacent set bits. Within a run
 * they pair up starting from the lowest bit, so a run of odd length leaves its
 * last (highest) bit unpaired and that bit is reported. Runs of even length
 * contribute nothing.
 *
 * Runs are evaluated only within this 64-bit word: a run that started in the
 * previous word, or continues into the next one, is treated as if it were cut
 * at the word boundary.
 */
static inline uint64_t
find_unpaired_escapes(uint64_t escapes)
{
    // TODO: handle unpaired escape from end of last iteration
    uint64_t shifted = escapes;
    uint64_t even_offsets = escapes;
    /* last bit of each run: an escape whose next-higher neighbour is not one */
    const uint64_t run_ends = escapes & ~(escapes >> 1);

    /*
     * After k steps "shifted" holds the bits lying at offset >= k within
     * their run. XOR-ing every step in toggles the bit at offset j exactly
     * j + 1 times, which leaves set precisely the bits at even offsets.
     */
    while (shifted) {
        shifted = (shifted << 1) & escapes;
        even_offsets ^= shifted;
    }

    /*
     * A run has odd length iff its last bit sits at an even offset. Masking
     * with every run's own end (rather than only the ends of the longest
     * runs) keeps short odd runs when a longer run shares the same word.
     */
    return even_offsets & run_ends;
}
/*
 * Turn a bitmap of quote characters into a bitmap mask of quoted sections.
 *
 * The carry-less multiply by an all-ones operand leaves in bit i the XOR of
 * quote_bits[0..i], i.e. a running parity of the quotes seen so far in this
 * word. That parity is XOR-ed with *prev_inside_quote, so an all-ones value
 * carried in from the previous call flips every bit. On return
 * *prev_inside_quote holds bit 63 of the result broadcast to all 64 bits
 * (all-ones or zero), ready to be passed to the next call.
 */
static inline uint64_t
find_quote_mask(uint64_t quote_bits, uint64_t *prev_inside_quote)
{
    const __m128i quotes = _mm_set_epi64x(0ULL, quote_bits);
    const __m128i all_ones = _mm_set1_epi8(0xFF);
    const __m128i parity = _mm_clmulepi64_si128(quotes, all_ones, 0);
    const uint64_t mask = (uint64_t) _mm_cvtsi128_si64(parity) ^ *prev_inside_quote;

    /* 0 - 1 wraps to all-ones, 0 - 0 stays zero: broadcasts the top bit */
    *prev_inside_quote = (uint64_t) 0 - (mask >> 63);
    return mask;
}
/*
 * Parses len bytes from buf according to csv rules and writes start positions
 * of records to output. Returns number of rows found.
 *
 * buf     input bytes; full 64-byte blocks are read in place through
 *         __m256i pointers, so buf is expected to be 32-byte aligned
 * len     number of valid bytes in buf; 0 yields 0 rows and writes nothing
 * output  receives one byte offset per record, the first always being 0.
 *         There is no capacity argument: the caller must provide room for
 *         every record start in the input.
 *
 * A record starts at offset 0 and after every newline that lies outside a
 * quoted section, except that a newline which is the very last byte does not
 * open another (empty) record. A carriage return outside quotes is not
 * supported and terminates the process with exit(1).
 **/
int64_t
parseIntoLines(char *buf, size_t len, size_t *output)
{
    enum { BLOCK_SIZE = 64 };
    __m256i tail[2];                /* zero-padded copy of a short final block */
    uint64_t prev_inside_quote = 0; /* all-ones while inside a quoted section */
    uint64_t escape_carry = 0;      /* 1 if byte 0 of this block is escaped */
    uint64_t numfound = 0;
    size_t pos = 0;
    size_t remaining = len;

    if (len == 0)
        return 0;

    *output++ = 0;
    numfound++;

    while (remaining > 0) {
        const size_t chunk = remaining < BLOCK_SIZE ? remaining : BLOCK_SIZE;
        __m256i *input;

        if (chunk == BLOCK_SIZE) {
            input = (__m256i *) (buf + pos);
        } else {
            /*
             * Never read past len: parse a padded copy instead. The zero
             * padding matches none of the characters searched for below.
             */
            memset(tail, 0, sizeof tail);
            memcpy(tail, buf + pos, chunk);
            input = tail;
        }

        uint64_t quotes = find_chars(input, '"');
        /* a backslash that is itself escaped does not start an escape */
        uint64_t escapes = find_chars(input, '\\') & ~escape_carry;
        uint64_t unpaired_escapes = find_unpaired_escapes(escapes);
        /* each unpaired escape escapes the byte that follows it */
        uint64_t escaped = (unpaired_escapes << 1) | escape_carry;
        uint64_t unescaped_quotes = quotes & ~escaped;
        uint64_t newlines = find_chars(input, '\n');
        uint64_t quote_mask = find_quote_mask(unescaped_quotes, &prev_inside_quote);
        uint64_t tokenpositions = newlines & ~quote_mask;
        uint64_t carriages = find_chars(input, '\r') & ~quote_mask;

        /* an unpaired escape in the last byte escapes byte 0 of the next block */
        escape_carry = unpaired_escapes >> 63;

        if (unlikely(carriages != 0))
            exit(1);

        uint64_t offset = 0;
        while (tokenpositions != 0) {
            int numchars = __builtin_ctzll(tokenpositions);

            /* two shifts: numchars + 1 may be 64, which one shift cannot do */
            tokenpositions >>= numchars;
            tokenpositions >>= 1;
            offset += (uint64_t) numchars + 1;

            /* a newline as the final input byte does not start a record */
            if (offset < remaining) {
                *output++ = pos + offset;
                numfound++;
            }
        }

        pos += chunk;
        remaining -= chunk;
    }

    return (int64_t) numfound;
}
/*
 * Benchmark driver: loads a csv file into memory, runs parseIntoLines() over
 * it the requested number of times and prints the row count and throughput.
 *
 * Usage: simdcopy csvfile [iterations]
 *
 * At most buf_size bytes of the file are read. Returns 0 on success and 1 on
 * a usage, allocation or I/O error.
 */
int main(int argc, char *argv[])
{
    enum { READBLOCK = 1024 * 1024 };
    const size_t buf_size = (size_t) 1024 * 1024 * 1024;
    /*
     * NOTE: parseIntoLines() takes no output capacity, so input containing
     * more than max_lines records would overrun "lines".
     */
    const size_t max_lines = (size_t) 128 * 1024 * 1024;
    char *buf = NULL;
    size_t *lines = NULL;
    FILE *f = NULL;
    uint64_t iters = 1;
    uint64_t numfound = 0;
    size_t len = 0;
    struct timespec start;
    struct timespec end;
    double delta;
    int rc = 1;

    if (argc < 2)
    {
        printf("Usage: simdcopy csvfile [iterations]\n");
        return 1;
    }
    if (argc > 2)
    {
        char *rest;
        unsigned long long parsed = strtoull(argv[2], &rest, 10);

        /* strtoull silently negates "-N", so reject a sign explicitly */
        if (rest == argv[2] || *rest != '\0' || strchr(argv[2], '-') != NULL)
        {
            fprintf(stderr, "Invalid iteration count: %s\n", argv[2]);
            return 1;
        }
        iters = parsed;
    }

    /* 64-byte alignment: the parser reads the buffer through __m256i pointers */
    buf = aligned_alloc(64, buf_size);
    lines = malloc(max_lines * sizeof *lines);
    if (!buf || !lines)
    {
        fprintf(stderr, "Out of memory\n");
        goto done;
    }

    f = fopen(argv[1], "rb");
    if (!f)
    {
        perror(argv[1]);
        goto done;
    }

    /* read in READBLOCK pieces, never requesting more than the buffer holds */
    while (len < buf_size)
    {
        size_t want = buf_size - len;
        size_t got;

        if (want > READBLOCK)
            want = READBLOCK;
        got = fread(buf + len, 1, want, f);
        if (!got)
            break;
        len += got;
    }
    if (ferror(f))
    {
        perror(argv[1]);
        goto done;
    }
    fclose(f);
    f = NULL;

    printf("Parsing %zu bytes, %" PRIu64 " times\n", len, iters);

    clock_gettime(CLOCK_MONOTONIC, &start);
    for (uint64_t i = 0; i < iters; i++) {
        numfound = (uint64_t) parseIntoLines(buf, len, lines);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);

    delta = (end.tv_sec - start.tv_sec) + (1.e-9)*(end.tv_nsec - start.tv_nsec);
    printf("Found %" PRIu64 " rows in %" PRIu64 " bytes in %f milliseconds\n",
           numfound, (uint64_t) len * iters, delta*1000);
    printf(" Speed: %0.3f GB/s\n", len/delta/1e9*iters);
    rc = 0;

done:
    if (f)
        fclose(f);
    free(lines);
    free(buf);
    return rc;
}