parallel decompressor
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 2e7854e..a5c06d2 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -9,7 +9,7 @@
"type": "cppdbg",
"request": "launch",
"program": "${workspaceFolder}/bzip3",
- "args": ["-t", "${workspaceFolder}/corpus/linux.bz3"],
+ "args": ["-e", "-b", "16", "-j", "2", "${workspaceFolder}/corpus/linux.tar", "${workspaceFolder}/corpus/linux.bz3"],
"stopAtEntry": false,
"cwd": "${fileDirname}",
"environment": [],
diff --git a/Makefile b/Makefile
index 45df634..41effd6 100644
--- a/Makefile
+++ b/Makefile
@@ -1,6 +1,6 @@
CC=clang
-CFLAGS=-O2 -march=native -mtune=native -flto -Iinclude -g3
+CFLAGS=-O2 -march=native -mtune=native -Iinclude -g3 -fPIC
PREFIX?=/usr/local
.PHONY: all clean format install cloc
diff --git a/src/libbz3.c b/src/libbz3.c
index d2fcb27..e89f723 100644
--- a/src/libbz3.c
+++ b/src/libbz3.c
@@ -290,6 +290,8 @@ void bz3_encode_blocks(struct bz3_state * states[], uint8_t * buffers[], int32_t
}
for(int32_t i = 0; i < n; i++)
pthread_join(threads[i], NULL);
+ for(int32_t i = 0; i < n; i++)
+ sizes[i] = messages[i].size;
}
void bz3_decode_blocks(struct bz3_state * states[], uint8_t * buffers[], int32_t sizes[], int32_t orig_sizes[], int32_t n) {
diff --git a/src/main.c b/src/main.c
index e11ee7e..f8e5c26 100644
--- a/src/main.c
+++ b/src/main.c
@@ -35,7 +35,7 @@ int main(int argc, char * argv[]) {
int no_bz3_suffix = 0;
// command line arguments
- int force_stdstreams = 0;
+ int force_stdstreams = 0, workers = 0;
// the block size
u32 block_size = MiB(8);
@@ -53,6 +53,9 @@ int main(int argc, char * argv[]) {
i++;
} else if (argv[i][1] == 'c') {
force_stdstreams = 1;
+ } else if (argv[i][1] == 'j') {
+ workers = atoi(argv[i + 1]);
+ i++;
}
} else {
if(bz3_file != NULL && regular_file != NULL) {
@@ -82,6 +85,7 @@ int main(int argc, char * argv[]) {
fprintf(stderr, "Extra flags:\n");
fprintf(stderr, " -c: force reading/writing from standard streams\n");
fprintf(stderr, " -b N: set block size in MiB\n");
+ fprintf(stderr, " -j N: set the amount of parallel threads\n");
return 1;
}
@@ -176,82 +180,192 @@ int main(int argc, char * argv[]) {
}
}
- struct bz3_state * state = bz3_new(block_size);
-
- if (state == NULL) {
- fprintf(stderr, "Failed to create a block encoder state.\n");
+ if(workers > 16 || workers < 0) {
+ fprintf(stderr, "Number of workers must be between 2 and 16.\n");
return 1;
}
- u8 * buffer = malloc(block_size + block_size / 50 + 16);
+ if(workers <= 1) {
+ struct bz3_state * state = bz3_new(block_size);
- if (mode == 1) {
- s32 read_count;
- while (!feof(input_des)) {
- read_count = fread(buffer, 1, block_size, input_des);
+ if (state == NULL) {
+ fprintf(stderr, "Failed to create a block encoder state.\n");
+ return 1;
+ }
- s32 new_size = bz3_encode_block(state, buffer, read_count);
- if (new_size == -1) {
- fprintf(stderr, "Failed to encode a block: %s\n", bz3_strerror(state));
- return 1;
- }
+ u8 * buffer = malloc(block_size + block_size / 50 + 16);
- write_neutral_s32(byteswap_buf, new_size);
- fwrite(byteswap_buf, 4, 1, output_des);
- write_neutral_s32(byteswap_buf, read_count);
- fwrite(byteswap_buf, 4, 1, output_des);
- fwrite(buffer, new_size, 1, output_des);
+ if(!buffer) {
+ fprintf(stderr, "Failed to allocate memory.\n");
+ return 1;
}
- } else if (mode == -1) {
- s32 new_size, old_size;
- while (!feof(input_des)) {
- if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
- // Assume that the file has no more data.
- break;
+
+ if (mode == 1) {
+ s32 read_count;
+ while (!feof(input_des)) {
+ read_count = fread(buffer, 1, block_size, input_des);
+
+ s32 new_size = bz3_encode_block(state, buffer, read_count);
+ if (new_size == -1) {
+ fprintf(stderr, "Failed to encode a block: %s\n", bz3_strerror(state));
+ return 1;
+ }
+
+ write_neutral_s32(byteswap_buf, new_size);
+ fwrite(byteswap_buf, 4, 1, output_des);
+ write_neutral_s32(byteswap_buf, read_count);
+ fwrite(byteswap_buf, 4, 1, output_des);
+ fwrite(buffer, new_size, 1, output_des);
+ }
+ } else if (mode == -1) {
+ s32 new_size, old_size;
+ while (!feof(input_des)) {
+ if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
+ // Assume that the file has no more data.
+ break;
+ }
+ new_size = read_neutral_s32(byteswap_buf);
+ if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
+ fprintf(stderr, "I/O error.\n");
+ return 1;
+ }
+ old_size = read_neutral_s32(byteswap_buf);
+ fread(buffer, 1, new_size, input_des);
+ if (bz3_decode_block(state, buffer, new_size, old_size) == -1) {
+ fprintf(stderr, "Failed to decode a block: %s\n", bz3_strerror(state));
+ return 1;
+ }
+ fwrite(buffer, old_size, 1, output_des);
+ }
+ } else if (mode == 2) {
+ s32 new_size, old_size;
+ while (!feof(input_des)) {
+ if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
+ // Assume that the file has no more data.
+ break;
+ }
+ new_size = read_neutral_s32(byteswap_buf);
+ if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
+ fprintf(stderr, "I/O error.\n");
+ return 1;
+ }
+ old_size = read_neutral_s32(byteswap_buf);
+ fread(buffer, 1, new_size, input_des);
+ if (bz3_decode_block(state, buffer, new_size, old_size) == -1) {
+ fprintf(stderr, "Failed to decode a block: %s\n", bz3_strerror(state));
+ return 1;
+ }
}
- new_size = read_neutral_s32(byteswap_buf);
- if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
- fprintf(stderr, "I/O error.\n");
+ }
+
+ if (bz3_last_error(state) != BZ3_OK) {
+ fprintf(stderr, "Failed to read data: %s\n", bz3_strerror(state));
+ return 1;
+ }
+
+ free(buffer);
+
+ bz3_free(state);
+
+ fclose(input_des);
+ fclose(output_des);
+ } else {
+ struct bz3_state * states[workers];
+ u8 * buffers[workers];
+ s32 sizes[workers];
+ s32 old_sizes[workers];
+ for(s32 i = 0; i < workers; i++) {
+ states[i] = bz3_new(block_size);
+ if (states[i] == NULL) {
+ fprintf(stderr, "Failed to create a block encoder state.\n");
return 1;
}
- old_size = read_neutral_s32(byteswap_buf);
- fread(buffer, 1, new_size, input_des);
- if (bz3_decode_block(state, buffer, new_size, old_size) == -1) {
- fprintf(stderr, "Failed to decode a block: %s\n", bz3_strerror(state));
+ buffers[i] = malloc(block_size + block_size / 50 + 16);
+ if(!buffers[i]) {
+ fprintf(stderr, "Failed to allocate memory.\n");
return 1;
}
- fwrite(buffer, old_size, 1, output_des);
}
- } else if (mode == 2) {
- s32 new_size, old_size;
- while (!feof(input_des)) {
- if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
- // Assume that the file has no more data.
- break;
+
+ if (mode == 1) {
+ while(!feof(input_des)) {
+ s32 i = 0;
+ for(; i < workers; i++) {
+ size_t read_count = fread(buffers[i], 1, block_size, input_des);
+ sizes[i] = old_sizes[i] = read_count;
+ if(read_count < block_size)
+ break;
+ }
+ bz3_encode_blocks(states, buffers, sizes, i);
+ for(s32 j = 0; j < i; j++) {
+ if (bz3_last_error(states[j]) != BZ3_OK) {
+ fprintf(stderr, "Failed to encode data: %s\n", bz3_strerror(states[j]));
+ return 1;
+ }
+ }
+ for(s32 j = 0; j < i; j++) {
+ write_neutral_s32(byteswap_buf, sizes[j]);
+ fwrite(byteswap_buf, 4, 1, output_des);
+ write_neutral_s32(byteswap_buf, old_sizes[j]);
+ fwrite(byteswap_buf, 4, 1, output_des);
+ fwrite(buffers[j], sizes[j], 1, output_des);
+ }
}
- new_size = read_neutral_s32(byteswap_buf);
- if (fread(&byteswap_buf, 1, 4, input_des) != 4) {
- fprintf(stderr, "I/O error.\n");
- return 1;
+ end:;
+ } else if(mode == -1) {
+ while(!feof(input_des)) {
+ s32 i = 0;
+ for(; i < workers; i++) {
+ if(fread(&byteswap_buf, 1, 4, input_des) != 4)
+ break;
+ sizes[i] = read_neutral_s32(byteswap_buf);
+ if(fread(&byteswap_buf, 1, 4, input_des) != 4)
+ break;
+ old_sizes[i] = read_neutral_s32(byteswap_buf);
+ if(fread(buffers[i], 1, sizes[i], input_des) != sizes[i]) {
+ fprintf(stderr, "I/O error.\n");
+ return 1;
+ }
+ }
+ bz3_decode_blocks(states, buffers, sizes, old_sizes, i);
+ for(s32 j = 0; j < i; j++) {
+ if (bz3_last_error(states[j]) != BZ3_OK) {
+ fprintf(stderr, "Failed to decode data: %s\n", bz3_strerror(states[j]));
+ return 1;
+ }
+ }
+ for(s32 j = 0; j < i; j++) {
+ fwrite(buffers[j], old_sizes[j], 1, output_des);
+ }
}
- old_size = read_neutral_s32(byteswap_buf);
- fread(buffer, 1, new_size, input_des);
- if (bz3_decode_block(state, buffer, new_size, old_size) == -1) {
- fprintf(stderr, "Failed to decode a block: %s\n", bz3_strerror(state));
- return 1;
+ } else if(mode == 2) {
+ while(!feof(input_des)) {
+ s32 i = 0;
+ for(; i < workers; i++) {
+ if(fread(&byteswap_buf, 1, 4, input_des) != 4)
+ break;
+ sizes[i] = read_neutral_s32(byteswap_buf);
+ if(fread(&byteswap_buf, 1, 4, input_des) != 4)
+ break;
+ old_sizes[i] = read_neutral_s32(byteswap_buf);
+ if(fread(buffers[i], 1, sizes[i], input_des) != sizes[i]) {
+ fprintf(stderr, "I/O error.\n");
+ return 1;
+ }
+ }
+ bz3_decode_blocks(states, buffers, sizes, old_sizes, i);
+ for(s32 j = 0; j < i; j++) {
+ if (bz3_last_error(states[j]) != BZ3_OK) {
+ fprintf(stderr, "Failed to decode data: %s\n", bz3_strerror(states[j]));
+ return 1;
+ }
+ }
}
}
- }
- if (bz3_last_error(state) != BZ3_OK) {
- fprintf(stderr, "Failed to read data: %s\n", bz3_strerror(state));
- return 1;
+ for(s32 i = 0; i < workers; i++) {
+ free(buffers[i]);
+ bz3_free(states[i]);
+ }
}
-
- free(buffer);
-
- bz3_free(state);
-
- fclose(input_des);
- fclose(output_des);
}
