From: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> Changes since v2: - use outq
- The block is considered as decompressed after LZMA_STREAM_END (not if one of the buffers reaches its limit). - If the in-buffer would produce more than allocated for the out-buffer then LZMA_DATA_ERROR is returned. - If `timeout' is set, the expiry time is computed on function entry and not before starting to wait for the thread until it makes progress. - lzma_outq_enable_partial_output() is used unconditionally. Changes since v1: - Use the `timeout' parameter / mythread_cond_timedwait() in the main thread. - A little bit of doxygen documentation. - Added enum `lzma_memlimit_opt' to lzma_stream_decoder_mt() as an init parameter. The idea is to specify how to obey the memory limit so the user can keep using one API and not worry about failing due to the memory limit. Let's assume the archive has a 9MiB dictionary, 24MiB block of uncompressed data. The archive contains two compressed blocks of 10 MiB each. Using two threads, the memory requirement is roughly (9 + 24 + 10) * 2 = 86 MiB. On a system with 64 MiB of memory with additional 128MiB of swap it likely leads to the use of (say 30 MiB) swap memory during decompression which will slow down the whole operation. The synchronous API would do just fine with only 9 MiB of memory. So to not complicate things, invoking lzma_stream_decoder_mt() with a memory limit of 32 MiB, three scenarios are possible: - LZMA_MEMLIMIT_THREAD One thread requires 43MiB of memory and would exceed the memory limit. However, continue with one thread instead of the possible two. - LZMA_MEMLIMIT_NO_THREAD One thread requires 43MiB of memory and would exceed the memory limit. Fall back to the synchronous API without buffered input / output memory. - LZMA_MEMLIMIT_COMPLETE In this scenario it would behave like LZMA_MEMLIMIT_NO_THREAD. However, with a dictionary size > 32MiB it would abort. - Add get_free_mem() to get free memory on Linux. It reads the value of `MemAvailable' from `/proc/meminfo'. 
The difference compared to `lzma_physmem()' is that it returns the amout of memory that the system thinks is usable without using swap memory. A big machine may have 128GiB of memory but may also have a few applications running consuming 88GiB of it. The `free' command may report 38 GiB in page cache and 2 GiB in free. The `MemAvailable' will report something between 37 - 39 GiB area. - New threads are only created if memory limit does not exceed filter_size + decomp_block * 2 which accounts the output and worst-case input (uncompressed data). This is to avoid exceeding the memory limit while extending the input buffer. Changes since RFC: - Options are considered - Memory limits are considered. The limit may get exceeded if we get close to it and then the existing threads enlarge their in-buffer. - Blocks with no size information in the header can be decompressed. This happens synchronous. Signed-off-by: Sebastian Andrzej Siewior <sebast...@breakpoint.cc> --- src/liblzma/api/lzma/container.h | 79 +- src/liblzma/common/Makefile.inc | 6 + src/liblzma/common/outqueue.h | 9 + src/liblzma/common/stream_decoder_mt.c | 1157 ++++++++++++++++++++++++ src/liblzma/liblzma.map | 1 + src/xz/coder.c | 110 ++- 6 files changed, 1357 insertions(+), 5 deletions(-) create mode 100644 src/liblzma/common/stream_decoder_mt.c diff --git a/src/liblzma/api/lzma/container.h b/src/liblzma/api/lzma/container.h index 4ad7ce6a8ec0d..505aa3c16ffc4 100644 --- a/src/liblzma/api/lzma/container.h +++ b/src/liblzma/api/lzma/container.h @@ -173,7 +173,7 @@ typedef struct { uint32_t reserved_int2; uint32_t reserved_int3; uint32_t reserved_int4; - uint64_t reserved_int5; + uint64_t memlimit; uint64_t reserved_int6; uint64_t reserved_int7; uint64_t reserved_int8; @@ -587,6 +587,83 @@ extern LZMA_API(lzma_ret) lzma_stream_decoder( lzma_nothrow lzma_attr_warn_unused_result; +/** + * \brief The `memory_limit' argument for lzma_stream_decoder_mt() + * + * The required memory limit for decompression is 
determined later once the XZ + block header is parsed and the compression parameters are known. + * Each thread needs to allocate the memory for the filters (size of the + * dictionary) and a buffer for the compressed and decompressed data. + * This option specifies how to proceed if it is not possible to continue even with + * one thread. + */ +typedef enum { + + + LZMA_MEMLIMIT_COMPLETE = 0, + /**< + * \brief Abort decompression. + * + * Decompression will fail if the specified memory limit would be + * exceeded even with one thread. + */ + + + LZMA_MEMLIMIT_THREAD = 1, + /**< + * \brief Continue with one thread. + * + * Should the required memory exceed the memory limit then continue with + * one thread. The input and output is buffered and decompression can + * happen asynchronously. + */ + + + LZMA_MEMLIMIT_NO_THREAD = 2 + /**< + * \brief Continue without a thread. + * + * Should the required memory exceed the memory limit then continue + * without a thread. The input and output is not buffered and + * decompression is performed in synchronous fashion, similar to + * lzma_stream_decoder(). + */ + +} lzma_memlimit_opt; + + +/** + * \brief Initialize multithreaded .xz Stream decoder + * + * \param strm Pointer to properly prepared lzma_stream + * \param options Pointer to multithreaded compression options + * \param mem_limit Specify how to behave if the memory is about to + * be exceeded. See lzma_memlimit_opt for details. + * + * This provides the functionality of lzma_stream_decoder() in a single function + * for multithreaded use. + * + * The decoder can decode multiple blocks in parallel. This requires that the + * block header contains size information which is added by the multi threaded + * encoder, see lzma_stream_encoder_mt(). + * + * A stream with one block will only utilize one thread. 
A stream with multiple + * blocks but without the header size information will be processed + * synchronously (without using the thread) similar to lzma_stream_decoder(). + * Concatenated streams are processed synchronously. Once the first stream is + * fully decoded then the following stream will be processed. + * + * \return - LZMA_OK: Initialization was successful. + * - LZMA_MEM_ERROR: Cannot allocate memory. + * - LZMA_MEMLIMIT_ERROR: Memory usage limit was reached. + * - LZMA_OPTIONS_ERROR: Unsupported flags. + * - LZMA_PROG_ERROR + */ +extern LZMA_API(lzma_ret) + lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options, + lzma_memlimit_opt mem_limit); + + /** * \brief Decode .xz Streams and .lzma files with autodetection * diff --git a/src/liblzma/common/Makefile.inc b/src/liblzma/common/Makefile.inc index 8205eb7f426d8..e01a6e4c1f59a 100644 --- a/src/liblzma/common/Makefile.inc +++ b/src/liblzma/common/Makefile.inc @@ -80,4 +80,10 @@ liblzma_la_SOURCES += \ common/stream_decoder.h \ common/stream_flags_decoder.c \ common/vli_decoder.c + +if COND_THREADS +liblzma_la_SOURCES += \ + common/stream_decoder_mt.c +endif + endif diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h index 355e0ced2cfc3..de630ee855c6b 100644 --- a/src/liblzma/common/outqueue.h +++ b/src/liblzma/common/outqueue.h @@ -203,6 +203,15 @@ lzma_outq_has_buf(const lzma_outq *outq) return outq->bufs_in_use < outq->bufs_limit; } +/// \brief Test if there is at least one preallocated buffer free +/// +/// This returns true then a new buffer will be pre-allocated. 
+/// +static inline bool +lzma_outq_has_buf_prealloc(const lzma_outq *outq) +{ + return outq->bufs_in_use < outq->bufs_allocated; +} /// \brief Test if the queue is completely empty static inline bool diff --git a/src/liblzma/common/stream_decoder_mt.c b/src/liblzma/common/stream_decoder_mt.c new file mode 100644 index 0000000000000..56c9b4f17ace6 --- /dev/null +++ b/src/liblzma/common/stream_decoder_mt.c @@ -0,0 +1,1157 @@ +/////////////////////////////////////////////////////////////////////////////// +// +/// \file stream_decoder_mt.c +/// \brief Multithreaded .xz Stream decoder +// +// Author: Sebastian Andrzej Siewior +// +// This file has been put into the public domain. +// You can do whatever you want with this file. +// +/////////////////////////////////////////////////////////////////////////////// + +#include "common.h" +#include "block_decoder.h" +#include "stream_decoder.h" +#include "index.h" +#include "outqueue.h" + +#include <stdio.h> + +typedef enum { + /// Waiting for work. + THR_IDLE, + + /// Decoding is in progress. + THR_RUN, + + /// The main thread wants the thread to stop whatever it was doing + /// but not exit. + THR_STOP, + + /// The main thread wants the thread to exit. + THR_EXIT, + +} worker_state; + +struct worker_thread { + uint8_t *in; + /// Size of ->in + size_t in_size; + /// Size of current block + size_t in_block_size; + /// Bytes written to ->in (coordinator) + size_t in_filled; + /// Bytes consumed of ->in (worker) + size_t in_pos; + + worker_state state; + + /// Pointer to the main structure is needed when putting this + /// thread back to the stack of free threads. + struct lzma_stream_coder *coder; + /// The allocator is set by the main thread. 
+ const lzma_allocator *allocator; + + lzma_outbuf *outbuf; + bool partial_update; + size_t secret_progress; + + lzma_next_coder block_decoder; + lzma_block block_options; + struct worker_thread *next; + + /// Filter size is used for memusage accounting + size_t filter_size; + + mythread_mutex mutex; + mythread_cond cond; + mythread thread_id; +}; + +struct lzma_stream_coder { + size_t exp_filter_size; + size_t exp_block_size; + + lzma_index_hash *index_hash; + + mythread_mutex mutex; + mythread_cond cond; + + /// Array of allocated thread-specific structures + struct worker_thread *threads; + + /// Number of structures in "threads" above. This is also the + /// number of threads that will be created at maximum. + uint32_t threads_max; + + /// Number of thread structures that have been initialized, and + /// thus the number of worker threads actually created so far. + uint32_t threads_initialized; + + /// Stack of free threads. When a thread finishes, it puts itself + /// back into this stack. This starts as empty because threads + /// are created only when actually needed. 
+ struct worker_thread *threads_free; + /// Current thread decompressed is read from + + /// Current thread compressed data is written to + struct worker_thread *thr_write; + lzma_ret thread_error; + + lzma_outq outq; + + /// Memory usage limit + uint64_t memlimit; + /// Amount of memory actually needed (only an estimate) + uint64_t memusage; + + enum { + SEQ_STREAM_HEADER, + SEQ_BLOCK_HEADER, + SEQ_BLOCK, + SEQ_INDEX, + SEQ_STREAM_FOOTER, + SEQ_STREAM_PADDING, + } sequence; + + /// True if block sizes are missing and threads are not used + bool direct_decomp; + + bool tell_no_check; + bool tell_unsupported_check; + bool tell_any_check; + bool ignore_check; + bool concatenated; + bool first_stream; + + lzma_stream_flags stream_flags; + lzma_memlimit_opt memory_limit; + uint32_t timeout; + + size_t pos; + uint8_t buffer[LZMA_BLOCK_HEADER_SIZE_MAX]; +}; + +static void thr_do_partial_update(void *thr_ptr) +{ + struct worker_thread *thr = thr_ptr; + + mythread_mutex_lock(&thr->mutex); + thr->partial_update = true; + mythread_mutex_unlock(&thr->mutex); +} + +static void worker_set_error(struct worker_thread *thr, lzma_ret err_code) +{ + mythread_mutex_lock(&thr->mutex); + if (thr->state == THR_RUN) + thr->state = THR_IDLE; + mythread_mutex_unlock(&thr->mutex); + + mythread_mutex_lock(&thr->coder->mutex); + if (thr->coder->thread_error == LZMA_OK) + thr->coder->thread_error = err_code; + + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); +} + +/// Use smaller chunks so cancellation attempts don't block for long +#define CHUNK_SIZE 16384 +static MYTHREAD_RET_TYPE worker_decoder(void *thr_ptr) +{ + struct worker_thread *thr = thr_ptr; + size_t in_filled; + size_t out_pos; + unsigned char tmp_buf; + size_t tmp_buf_pos = 0; + lzma_ret ret; + +next_loop_lock: + + mythread_mutex_lock(&thr->mutex); +next_loop_unlocked: + + if (thr->state == THR_IDLE) { + 
mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } else if (thr->state == THR_EXIT) { + mythread_mutex_unlock(&thr->mutex); + + lzma_free(thr->in, thr->allocator); + + lzma_next_end(&thr->block_decoder, thr->allocator); + + mythread_mutex_destroy(&thr->mutex); + mythread_cond_destroy(&thr->cond); + return MYTHREAD_RET_VALUE; + + } else if (thr->state == THR_STOP) { + thr->state = THR_IDLE; + mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } else if (thr->state != THR_RUN) { + thr->state = THR_IDLE; + mythread_mutex_unlock(&thr->mutex); + + worker_set_error(thr, LZMA_PROG_ERROR); + + goto next_loop_lock; + } + + in_filled = thr->in_filled; + + if (in_filled == thr->in_pos) { + mythread_cond_wait(&thr->cond, &thr->mutex); + goto next_loop_unlocked; + } + + mythread_mutex_unlock(&thr->mutex); + + if ((in_filled - thr->in_pos) > CHUNK_SIZE) + in_filled = thr->in_pos + CHUNK_SIZE; + + out_pos = thr->secret_progress; + + // Check if it attempts to write more than written in the header. + if (unlikely(out_pos == thr->outbuf->allocated)) { + + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + thr->in, &thr->in_pos, in_filled, + &tmp_buf, &tmp_buf_pos, 1, + LZMA_RUN); + } else { + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + thr->in, &thr->in_pos, in_filled, + thr->outbuf->buf, &out_pos, thr->outbuf->allocated, + LZMA_RUN); + } + if (ret == LZMA_OK) { + bool partial_update; + + mythread_mutex_lock(&thr->mutex); + partial_update = thr->partial_update; + mythread_mutex_unlock(&thr->mutex); + + if (partial_update && (out_pos != thr->outbuf->pos)) { + + mythread_mutex_lock(&thr->coder->mutex); + thr->outbuf->pos = out_pos; + + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); + } + + thr->secret_progress = out_pos; + + // If the input buffer has been fully consumed and we made no + // progress then something is wrong. 
+ if (thr->in_pos == thr->in_block_size || tmp_buf_pos != 0) { + tmp_buf_pos = 0; + worker_set_error(thr, LZMA_DATA_ERROR); + } + + goto next_loop_lock; + } else if (ret == LZMA_STREAM_END) { + + mythread_mutex_lock(&thr->mutex); + if (thr->state == THR_RUN) + thr->state = THR_IDLE; + mythread_mutex_unlock(&thr->mutex); + + mythread_mutex_lock(&thr->coder->mutex); + + thr->outbuf->pos = out_pos; + thr->outbuf->unpadded_size = lzma_block_unpadded_size(&thr->block_options); + thr->outbuf->uncompressed_size = thr->block_options.uncompressed_size; + thr->outbuf->finished = true; + thr->outbuf->pos = out_pos; + thr->outbuf = NULL; + + thr->next = thr->coder->threads_free; + thr->coder->threads_free = thr; + + mythread_cond_signal(&thr->coder->cond); + mythread_mutex_unlock(&thr->coder->mutex); + goto next_loop_lock; + } else { + worker_set_error(thr, ret); + + goto next_loop_lock; + } + return MYTHREAD_RET_VALUE; +} + +static void threads_end(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + uint32_t i; + + for (i = 0; i < coder->threads_initialized; ++i) { + mythread_mutex_lock(&coder->threads[i].mutex); + coder->threads[i].state = THR_EXIT; + mythread_cond_signal(&coder->threads[i].cond); + mythread_mutex_unlock(&coder->threads[i].mutex); + } + + for (i = 0; i < coder->threads_initialized; ++i) + mythread_join(coder->threads[i].thread_id); + + coder->threads_initialized = 0; + lzma_free(coder->threads, allocator); + return; +} + +static void threads_stop(struct lzma_stream_coder *coder) +{ + uint32_t i; + + for (i = 0; i < coder->threads_initialized; ++i) { + mythread_mutex_lock(&coder->threads[i].mutex); + coder->threads[i].state = THR_STOP; + mythread_cond_signal(&coder->threads[i].cond); + mythread_mutex_unlock(&coder->threads[i].mutex); + } +} + +static void stream_decoder_mt_end(void *coder_ptr, + const lzma_allocator *allocator) +{ + struct lzma_stream_coder *coder = coder_ptr; + + threads_end(coder, allocator); + 
lzma_outq_end(&coder->outq, allocator); + lzma_index_hash_end(coder->index_hash, allocator); + lzma_free(coder, allocator); +} + +static lzma_ret stream_decode_in(struct lzma_stream_coder *coder, + const uint8_t *restrict in, + size_t *restrict in_pos, + size_t in_size) +{ + struct worker_thread *thr = coder->thr_write; + size_t old_filled; + size_t cur_in_infilled; + + mythread_mutex_lock(&thr->mutex); + + old_filled = thr->in_filled; + mythread_mutex_unlock(&thr->mutex); + cur_in_infilled = old_filled; + + lzma_bufcpy(in, in_pos, in_size, + thr->in, &cur_in_infilled, thr->in_block_size); + + mythread_mutex_lock(&thr->mutex); + thr->in_filled = cur_in_infilled; + + if (old_filled == thr->in_pos) + mythread_cond_signal(&thr->cond); + + mythread_mutex_unlock(&thr->mutex); + + // Complete block filled or just in-buffer consumed + if (thr->in_filled == thr->in_block_size) { + + coder->sequence = SEQ_BLOCK_HEADER; + coder->thr_write = NULL; + } + return LZMA_OK; +} + +/// Initialize a new worker_thread structure and create a new thread. 
+static lzma_ret initialize_new_thread(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + struct worker_thread *thr = &coder->threads[coder->threads_initialized]; + + memset(thr, 0, sizeof(struct worker_thread)); + + if (mythread_mutex_init(&thr->mutex)) + goto error_mutex; + + if (mythread_cond_init(&thr->cond)) + goto error_cond; + + thr->state = THR_IDLE; + thr->allocator = allocator; + thr->coder = coder; + thr->block_decoder = LZMA_NEXT_CODER_INIT; + + if (mythread_create(&thr->thread_id, worker_decoder, thr)) + goto error_thread; + + ++coder->threads_initialized; + coder->thr_write = thr; + + return LZMA_OK; + +error_thread: + mythread_cond_destroy(&thr->cond); + +error_cond: + mythread_mutex_destroy(&thr->mutex); + +error_mutex: + return LZMA_MEM_ERROR; +} + + +static lzma_ret get_thread(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + if (!lzma_outq_has_buf(&coder->outq)) + return LZMA_OK; + + if (!lzma_outq_has_buf_prealloc(&coder->outq) && + coder->threads_initialized && + coder->exp_filter_size && coder->exp_block_size) { + size_t exp; + + // It is assumed that the archive consists of multiple + // blocks sharing the same filter and block settings. + // The in-block is extended over time to fit the current + // block. For accouting the worst case is assumed, that + // is compressed size = uncompressed size. + exp = coder->exp_filter_size; + exp += coder->exp_block_size; + exp += coder->exp_block_size; + + exp += coder->memusage; + exp += coder->outq.memusage; + + if (exp > coder->memlimit) + return LZMA_OK; + } + + // If there is a free structure on the stack, use it. + mythread_mutex_lock(&coder->mutex); + if (coder->threads_free != NULL) { + coder->thr_write = coder->threads_free; + coder->threads_free = coder->threads_free->next; + } + mythread_mutex_unlock(&coder->mutex); + + if (coder->thr_write == NULL) { + // If there are no uninitialized structures left, return. 
+ if (coder->threads_initialized == coder->threads_max) + return LZMA_OK; + + // Initialize a new thread. + return_if_error(initialize_new_thread(coder, allocator)); + } + + mythread_mutex_lock(&coder->thr_write->mutex); + coder->thr_write->next = NULL; + + coder->thr_write->in_block_size = 0; + coder->thr_write->in_filled = 0; + coder->thr_write->in_pos = 0; + coder->thr_write->partial_update = false; + coder->thr_write->secret_progress = 0; + + memset(&coder->thr_write->block_options, 0, sizeof(lzma_block)); + coder->thr_write->state = THR_RUN; + mythread_mutex_unlock(&coder->thr_write->mutex); + + return LZMA_OK; +} + +static lzma_ret alloc_out_buffer(struct lzma_stream_coder *coder, + const lzma_allocator *allocator) +{ + struct worker_thread *thr; + size_t uncomp_size; + lzma_ret ret; + + thr = coder->thr_write; + + uncomp_size = thr->block_options.uncompressed_size; + + ret = lzma_outq_prealloc_buf(&coder->outq, allocator, + uncomp_size); + if (ret != LZMA_OK) + return ret; + + if (coder->exp_block_size < uncomp_size) + coder->exp_block_size = uncomp_size; + + thr->outbuf = lzma_outq_get_buf(&coder->outq, thr); + return LZMA_OK; +} + +static lzma_ret try_copy_decoded(struct lzma_stream_coder *coder, + const lzma_allocator *allocator, + uint8_t *restrict out, + size_t *restrict out_pos, + size_t out_size) +{ + lzma_ret ret; + + do { + lzma_vli unpadded_size; + lzma_vli uncompressed_size; + + mythread_mutex_lock(&coder->mutex); + + ret = coder->thread_error; + if (ret != LZMA_OK) { + mythread_mutex_unlock(&coder->mutex); + return ret; + } + + if (!lzma_outq_is_readable(&coder->outq)) { + mythread_mutex_unlock(&coder->mutex); + return LZMA_OK; + } + + ret = lzma_outq_read(&coder->outq, allocator, + out, out_pos, out_size, + &unpadded_size, + &uncompressed_size); + mythread_mutex_unlock(&coder->mutex); + + // block fully consumed + if (ret == LZMA_STREAM_END) { + + ret = lzma_index_hash_append(coder->index_hash, + unpadded_size, + uncompressed_size); + if (ret != 
LZMA_OK) + return ret; + lzma_outq_enable_partial_output(&coder->outq, + thr_do_partial_update); + } + + if (*out_pos == out_size) + return LZMA_OK; + + } while (1); +} + +static size_t comp_blk_size(struct lzma_stream_coder *coder, size_t size) +{ + return vli_ceil4(size) + lzma_check_size(coder->stream_flags.check); +} + +static lzma_ret +stream_decoder_reset(struct lzma_stream_coder *coder, const lzma_allocator *allocator) +{ + // Initialize the Index hash used to verify the Index. + coder->index_hash = lzma_index_hash_init(coder->index_hash, allocator); + if (coder->index_hash == NULL) + return LZMA_MEM_ERROR; + + // Reset the rest of the variables. + coder->sequence = SEQ_STREAM_HEADER; + coder->pos = 0; + + return LZMA_OK; +} + +static void cleanup_filters(struct worker_thread *thr, + const lzma_allocator *allocator) +{ + lzma_filter *filters; + size_t i; + + filters = thr->block_options.filters; + + for (i = 0; i < LZMA_FILTERS_MAX; ++i) + lzma_free(filters[i].options, allocator); + thr->block_options.filters = NULL; +} + +static lzma_ret wait_cond_progress(struct lzma_stream_coder *coder, + mythread_condtime *wait_abs) +{ + int ret; + + if (coder->timeout == 0) { + mythread_cond_wait(&coder->cond, &coder->mutex); + return LZMA_OK; + } + + ret = mythread_cond_timedwait(&coder->cond, &coder->mutex, wait_abs); + if (ret == 0) + return LZMA_OK; + + // ret == ETIMEDOUT + return LZMA_TIMED_OUT; +} + +static lzma_ret +stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator, + const uint8_t *restrict in, size_t *restrict in_pos, + size_t in_size, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size, lzma_action action) +{ + struct lzma_stream_coder *coder = coder_ptr; + struct worker_thread *thr; + size_t start_out_pos = *out_pos; + mythread_condtime wait_abs; + lzma_ret ret; + + if (coder->timeout) + mythread_condtime_set(&wait_abs, &coder->cond, coder->timeout); + + while (true) + switch (coder->sequence) { + case SEQ_STREAM_HEADER: { + 
// Copy the Stream Header to the internal buffer. + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + LZMA_STREAM_HEADER_SIZE); + + // Return if we didn't get the whole Stream Header yet. + if (coder->pos < LZMA_STREAM_HEADER_SIZE) + return LZMA_OK; + + coder->pos = 0; + + // Decode the Stream Header. + ret = lzma_stream_header_decode(&coder->stream_flags, + coder->buffer); + if (ret != LZMA_OK) + return ret == LZMA_FORMAT_ERROR && !coder->first_stream + ? LZMA_DATA_ERROR : ret; + + // If we are decoding concatenated Streams, and the later + // Streams have invalid Header Magic Bytes, we give + // LZMA_DATA_ERROR instead of LZMA_FORMAT_ERROR. + coder->first_stream = false; + + // Even if we return LZMA_*_CHECK below, we want + // to continue from Block Header decoding. + coder->sequence = SEQ_BLOCK_HEADER; + + // Detect if there's no integrity check or if it is + // unsupported if those were requested by the application. + if (coder->tell_no_check && coder->stream_flags.check + == LZMA_CHECK_NONE) + return LZMA_NO_CHECK; + + if (coder->tell_unsupported_check + && !lzma_check_is_supported(coder->stream_flags.check)) + return LZMA_UNSUPPORTED_CHECK; + + if (coder->tell_any_check) + return LZMA_GET_CHECK; + break; + } + + case SEQ_BLOCK_HEADER: { + if (*in_pos >= in_size) + return LZMA_OK; + + thr = coder->thr_write; + if (!thr) { +seq_blk_hdr_again: + ret = try_copy_decoded(coder, allocator, out, out_pos, + out_size); + if (ret != LZMA_OK) + return ret; + + ret = get_thread(coder, allocator); + if (ret != LZMA_OK) + return ret; + + if (!coder->thr_write) { + + // No out buffer but making progress? 
+ if (start_out_pos != *out_pos) + return LZMA_OK; + + mythread_mutex_lock(&coder->mutex); + if (!lzma_outq_is_readable(&coder->outq)) + ret = wait_cond_progress(coder, &wait_abs); + + mythread_mutex_unlock(&coder->mutex); + + if (ret != LZMA_OK) + return ret; + goto seq_blk_hdr_again; + } + thr = coder->thr_write; + } + + if (coder->pos == 0) { + // Detect if it's Index. + if (in[*in_pos] == 0x00) { + coder->sequence = SEQ_INDEX; + break; + } + + // Calculate the size of the Block Header. Note that + // Block Header decoder wants to see this byte too + // so don't advance *in_pos. + thr->block_options.header_size = + lzma_block_header_size_decode(in[*in_pos]); + } + + // Copy the Block Header to the internal buffer. + + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + thr->block_options.header_size); + + // Return if we didn't get the whole Block Header yet. + if (coder->pos < thr->block_options.header_size) + return LZMA_OK; + + coder->pos = 0; + + // Version 1 is needed to support the .ignore_check option. + thr->block_options.version = 1; + + // Set up a buffer to hold the filter chain. Block Header + // decoder will initialize all members of this array so + // we don't need to do it here. + lzma_filter filters[LZMA_FILTERS_MAX + 1]; + thr->block_options.filters = filters; + + // Copy the type of the Check so that Block Header and Block + // decoders see it. + thr->block_options.check = coder->stream_flags.check; + + // Decode the Block Header. + ret = lzma_block_header_decode(&thr->block_options, allocator, + coder->buffer); + if (ret != LZMA_OK) + return ret; + + // If LZMA_IGNORE_CHECK was used, this flag needs to be set. + // It has to be set after lzma_block_header_decode() because + // it always resets this to false. + thr->block_options.ignore_check = coder->ignore_check; + + // Check the memory usage limit. + const uint64_t memusage = lzma_raw_decoder_memusage(filters); + + if (memusage == UINT64_MAX) { + // One or more unknown Filter IDs. 
+ + cleanup_filters(thr, allocator); + return LZMA_OPTIONS_ERROR; + } + + // The memusage for each thread should be the same. Check the + // memory requirements only for the first block/thread. The + // expected memory consumption vs limits will be checked by + // get_thread() before creating a new thread. + if (coder->threads_initialized == 1 && !thr->filter_size) { + if (coder->memusage + memusage > coder->memlimit) { + + switch (coder->memory_limit) { + case LZMA_MEMLIMIT_COMPLETE: + cleanup_filters(thr, allocator); + return LZMA_MEMLIMIT_ERROR; + + case LZMA_MEMLIMIT_THREAD: + coder->threads_max = 1; + break; + + case LZMA_MEMLIMIT_NO_THREAD: + coder->threads_max = 1; + coder->direct_decomp = true; + break; + } + } + } + + if (coder->exp_filter_size < memusage) + coder->exp_filter_size = memusage; + + if (thr->filter_size != memusage) { + coder->memusage -= thr->filter_size; + coder->memusage += memusage; + thr->filter_size = memusage; + } + // Memory usage is OK. + // Initialize the Block decoder. + ret = lzma_block_decoder_init(&thr->block_decoder, allocator, + &thr->block_options); + + // Free the allocated filter options since they are needed + // only to initialize the Block decoder. + cleanup_filters(thr, allocator); + + // Check if memory usage calculation and Block encoder + // initialization succeeded. + if (ret != LZMA_OK) + return ret; + + if (coder->direct_decomp) { + ; + } else if (thr->block_options.compressed_size == LZMA_VLI_UNKNOWN || + thr->block_options.uncompressed_size == LZMA_VLI_UNKNOWN) { + + // Happens if the previous block header had sizes + // encoded but one of the following block header does + // not. 
+ if (coder->threads_initialized != 1) + return LZMA_PROG_ERROR; + + coder->direct_decomp = true; + } else { + thr->in_block_size = comp_blk_size(coder, thr->block_options.compressed_size); + + if (coder->threads_initialized == 1) { + uint64_t blk_mem; + + blk_mem = thr->in_block_size; + blk_mem += thr->block_options.uncompressed_size; + + if (coder->memusage + blk_mem > coder->memlimit) { + switch (coder->memory_limit) { + case LZMA_MEMLIMIT_THREAD: + coder->threads_max = 1; + break; + + case LZMA_MEMLIMIT_COMPLETE: + case LZMA_MEMLIMIT_NO_THREAD: + coder->threads_max = 1; + coder->direct_decomp = true; + break; + } + } + } + } + + if (!coder->direct_decomp) { + ret = alloc_out_buffer(coder, allocator); + if (ret != LZMA_OK) + return ret; + + if (thr->in_size < thr->in_block_size) { + size_t mem_size; + + lzma_free(thr->in, allocator); + + mem_size = my_max(thr->in_block_size, + coder->exp_block_size); + + thr->in = lzma_alloc(mem_size, allocator); + if (!thr->in) + return LZMA_MEM_ERROR; + + coder->memusage -= thr->in_size; + coder->memusage += coder->exp_block_size; + thr->in_size = mem_size; + } + lzma_outq_enable_partial_output(&coder->outq, + thr_do_partial_update); + } + + coder->sequence = SEQ_BLOCK; + break; + + case SEQ_BLOCK: + thr = coder->thr_write; + + if (coder->direct_decomp) { + ret = thr->block_decoder.code(thr->block_decoder.coder, + thr->allocator, + in, in_pos, in_size, + out, out_pos, out_size, + action); + if (ret != LZMA_STREAM_END) + return ret; + + // Block decoded successfully. Add the new size pair to + // the Index hash. 
+ ret = lzma_index_hash_append(coder->index_hash, + lzma_block_unpadded_size(&thr->block_options), + thr->block_options.uncompressed_size); + if (ret != LZMA_OK) + return ret; + + coder->sequence = SEQ_BLOCK_HEADER; + break; + } + + ret = try_copy_decoded(coder, allocator, out, out_pos, out_size); + if (ret != LZMA_OK) + return ret; + + ret = stream_decode_in(coder, in, in_pos, in_size); + if (ret != LZMA_OK) { + threads_stop(coder); + return ret; + } + + if ((*in_pos >= in_size) || (*out_pos >= out_size)) + return LZMA_OK; + + break; + } + + case SEQ_INDEX: { + // If we don't have any input, don't call + // lzma_index_hash_decode() since it would return + // LZMA_BUF_ERROR, which we must not do here. + if (*in_pos >= in_size) + return LZMA_OK; + + // first flush all worker threads, so the accounting of decoded + // blocks matches index's expectation. + while (!lzma_outq_is_empty(&coder->outq)) { + ret = try_copy_decoded(coder, allocator, out, out_pos, + out_size); + if (ret != LZMA_OK) + return ret; + + if (*out_pos >= out_size) + return LZMA_OK; + + if (lzma_outq_is_empty(&coder->outq)) + break; + + mythread_mutex_lock(&coder->mutex); + if (!lzma_outq_is_readable(&coder->outq)) + ret = wait_cond_progress(coder, &wait_abs); + mythread_mutex_unlock(&coder->mutex); + + if (ret != LZMA_OK) + return ret; + } + + // Decode the Index and compare it to the hash calculated + // from the sizes of the Blocks (if any). + ret = lzma_index_hash_decode(coder->index_hash, in, in_pos, + in_size); + if (ret != LZMA_STREAM_END) + return ret; + coder->sequence = SEQ_STREAM_FOOTER; + break; + } + + case SEQ_STREAM_FOOTER: { + + lzma_bufcpy(in, in_pos, in_size, coder->buffer, &coder->pos, + LZMA_STREAM_HEADER_SIZE); + + // Return if we didn't get the whole Stream Footer yet. + if (coder->pos < LZMA_STREAM_HEADER_SIZE) + return LZMA_OK; + + coder->pos = 0; + // Decode the Stream Footer. 
The decoder gives + // LZMA_FORMAT_ERROR if the magic bytes don't match, + // so convert that return code to LZMA_DATA_ERROR. + lzma_stream_flags footer_flags; + ret = lzma_stream_footer_decode(&footer_flags, coder->buffer); + if (ret != LZMA_OK) + return ret == LZMA_FORMAT_ERROR + ? LZMA_DATA_ERROR : ret; + + // Check that Index Size stored in the Stream Footer matches + // the real size of the Index field. + if (lzma_index_hash_size(coder->index_hash) + != footer_flags.backward_size) + return LZMA_DATA_ERROR; + + // Compare that the Stream Flags fields are identical in + // both Stream Header and Stream Footer. + ret = lzma_stream_flags_compare(&coder->stream_flags, &footer_flags); + if (ret != LZMA_OK) + return ret; + + if (!coder->concatenated) + return LZMA_STREAM_END; + coder->sequence = SEQ_STREAM_PADDING; + break; + } + + case SEQ_STREAM_PADDING: { + + // Skip over possible Stream Padding. + while (true) { + if (*in_pos >= in_size) { + // Unless LZMA_FINISH was used, we cannot + // know if there's more input coming later. + if (action != LZMA_FINISH) { + return LZMA_OK; + } + + // Stream Padding must be a multiple of + // four bytes. + return coder->pos == 0 + ? LZMA_STREAM_END + : LZMA_DATA_ERROR; + } + + // If the byte is not zero, it probably indicates + // beginning of a new Stream (or the file is corrupt). + if (in[*in_pos] != 0x00) + break; + + ++*in_pos; + coder->pos = (coder->pos + 1) & 3; + } + + // Stream Padding must be a multiple of four bytes (empty + // Stream Padding is OK). + if (coder->pos != 0) { + ++*in_pos; + return LZMA_DATA_ERROR; + } + + // Prepare to decode the next Stream. 
+ return_if_error(stream_decoder_reset(coder, allocator)); + break; + } + + default: + return LZMA_PROG_ERROR; + } + return LZMA_PROG_ERROR; +} + +static lzma_check stream_decoder_mt_get_check(const void *coder_ptr) +{ + const struct lzma_stream_coder *coder = coder_ptr; + return coder->stream_flags.check; +} + +static lzma_ret stream_decoder_mt_memconfig(void *coder_ptr, uint64_t *memusage, + uint64_t *old_memlimit, + uint64_t new_memlimit) +{ + struct lzma_stream_coder *coder = coder_ptr; + + *memusage = coder->memusage + coder->outq.memusage; + *old_memlimit = coder->memlimit; + + if (new_memlimit != 0) { + if (new_memlimit < coder->memusage + coder->outq.memusage) + return LZMA_MEMLIMIT_ERROR; + + coder->memlimit = new_memlimit; + } + + return LZMA_OK; +} + +static lzma_ret +stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, + const lzma_mt *options, lzma_memlimit_opt memory_limit) +{ + struct lzma_stream_coder *coder; + + if (options->threads == 0 || options->threads > LZMA_THREADS_MAX) + return LZMA_OPTIONS_ERROR; + if (options->flags & ~LZMA_SUPPORTED_FLAGS) + return LZMA_OPTIONS_ERROR; + + switch (memory_limit) { + case LZMA_MEMLIMIT_COMPLETE: + case LZMA_MEMLIMIT_THREAD: + case LZMA_MEMLIMIT_NO_THREAD: + break; + default: + return LZMA_OPTIONS_ERROR; + } + + lzma_next_coder_init(&stream_decoder_mt_init, next, allocator); + + coder = next->coder; + if (!coder) { + coder = lzma_alloc(sizeof(struct lzma_stream_coder), allocator); + if (coder == NULL) + return LZMA_MEM_ERROR; + + memset(coder, 0xff, sizeof(struct lzma_stream_coder)); + memzero(&coder->outq, sizeof(coder->outq)); + + if (mythread_mutex_init(&coder->mutex)) + goto err_out; + + if (mythread_cond_init(&coder->cond)) { + mythread_mutex_destroy(&coder->mutex); + goto err_out; + } + + next->coder = coder; + + next->code = stream_decode_mt; + next->end = stream_decoder_mt_end; + next->get_check = stream_decoder_mt_get_check; + next->memconfig = &stream_decoder_mt_memconfig; 
+
+	next->get_progress = NULL;
+
+	coder->index_hash = NULL;
+	coder->threads_max = 0;
+	coder->threads_initialized = 0;
+	}
+
+	coder->timeout = options->timeout;
+	coder->sequence = SEQ_STREAM_HEADER;
+	coder->thread_error = LZMA_OK;
+
+	coder->memlimit = my_max(1, options->memlimit);
+	coder->memusage = LZMA_MEMUSAGE_BASE;
+
+	coder->tell_no_check = options->flags & LZMA_TELL_NO_CHECK;
+	coder->tell_unsupported_check = options->flags & LZMA_TELL_UNSUPPORTED_CHECK;
+	coder->tell_any_check = options->flags & LZMA_TELL_ANY_CHECK;
+	coder->ignore_check = options->flags & LZMA_IGNORE_CHECK;
+	coder->concatenated = options->flags & LZMA_CONCATENATED;
+	coder->first_stream = true;
+	coder->direct_decomp = false;
+	coder->memory_limit = memory_limit;
+	coder->exp_filter_size = 0;
+	coder->exp_block_size = 0;
+	coder->pos = 0;
+
+	memset(&coder->stream_flags, 0, sizeof(lzma_stream_flags));
+
+	// By allocating threads from scratch we can start memory-usage
+	// accounting from scratch, too. Changes in filter and block sizes
+	// may affect the number of threads. We don't keep a possibly
+	// larger-than-needed input buffer around (if the block size
+	// decreased) and use only one thread in case this stream has no
+	// block sizes (since `direct_decomp' expects no threads, to keep
+	// it simple).
+	if (coder->threads_max) {
+		coder->threads_max = 0;
+		threads_end(coder, allocator);
+	}
+
+	coder->threads_free = NULL;
+	coder->thr_write = NULL;
+
+	coder->threads = lzma_alloc(options->threads * sizeof(struct worker_thread),
+			allocator);
+	// Don't `goto err_out' here: next->coder already points to this
+	// coder, so it must stay valid; it will be cleaned up through
+	// stream_decoder_mt_end() when the caller ends the stream.
+	if (coder->threads == NULL)
+		return LZMA_MEM_ERROR;
+
+	coder->threads_max = options->threads;
+
+	return_if_error(lzma_outq_init(&coder->outq, allocator,
+			options->threads));
+
+	return stream_decoder_reset(coder, allocator);
+
+err_out:
+	// Reached only when the mutex/cond initialization of a freshly
+	// allocated coder fails. coder->threads has not been allocated at
+	// this point (the struct was filled with 0xff), so freeing it here
+	// would hand a garbage pointer to lzma_free(); free only the coder
+	// itself.
+	lzma_free(coder, allocator);
+	return LZMA_MEM_ERROR;
+}
+
+extern LZMA_API(lzma_ret)
+lzma_stream_decoder_mt(lzma_stream *strm, const lzma_mt *options,
+		lzma_memlimit_opt mem_limit)
+{
+	lzma_next_strm_init(stream_decoder_mt_init, strm, options, mem_limit);
+
+	strm->internal->supported_actions[LZMA_RUN] = true;
+	strm->internal->supported_actions[LZMA_FINISH] = true;
+
+	return LZMA_OK;
+}
diff --git a/src/liblzma/liblzma.map b/src/liblzma/liblzma.map
index 251ef0225c322..664bdb291c834 100644
--- a/src/liblzma/liblzma.map
+++ b/src/liblzma/liblzma.map
@@ -109,6 +109,7 @@ XZ_5.3.1alpha {
 	lzma_erofs_decoder;
 	lzma_erofs_encoder;
 	lzma_file_info_decoder;
+	lzma_stream_decoder_mt;
 
 local: *;
diff --git a/src/xz/coder.c b/src/xz/coder.c
index 85f954393d8bf..f8d1f6c35ea8e 100644
--- a/src/xz/coder.c
+++ b/src/xz/coder.c
@@ -51,7 +51,7 @@ static lzma_check check;
 /// This becomes false if the --check=CHECK option is used.
 static bool check_default = true;
 
-#if defined(HAVE_ENCODERS) && defined(MYTHREAD_ENABLED)
+#if (defined(HAVE_ENCODERS) || defined(HAVE_DECODERS)) && defined(MYTHREAD_ENABLED)
 static lzma_mt mt_options = {
 	.flags = 0,
 	.timeout = 300,
@@ -59,6 +59,94 @@ static lzma_mt mt_options = {
 };
 #endif
 
+#if defined(HAVE_DECODERS) && defined(MYTHREAD_ENABLED)
+
+# ifdef __linux__
+#include <sys/stat.h>
+#include <fcntl.h>
+
+/*
+ * An estimate of how much memory is available. 
Swap will not be used, the page
+ * cache may be purged, not everything that might be reclaimed will be
+ * counted, and the kernel's low-memory watermarks are taken into
+ * account.
+ */
+static char str_MemAvailable[] = "MemAvailable";
+
+static int32_t get_free_mem(uint64_t *val)
+{
+	char buf[4096];
+	char *str;
+	ssize_t bytes;
+	int fd;
+
+	*val = 0;
+
+	fd = open("/proc/meminfo", O_RDONLY);
+	if (fd < 0)
+		return -1;
+
+	/* Leave room for the '\0' terminator appended below. */
+	bytes = read(fd, buf, sizeof(buf) - 1);
+	close(fd);
+
+	if (bytes <= 0)
+		return -1;
+
+	buf[bytes] = '\0';
+
+	str = buf;
+	while (1) {
+		char *end;
+
+		end = strchr(str, ':');
+		if (!end)
+			break;
+		if ((end - str) == sizeof(str_MemAvailable) - 1) {
+			if (!strncmp(str, str_MemAvailable,
+					sizeof(str_MemAvailable) - 1)) {
+				uint64_t num;
+
+				str = end + 1;
+				num = strtoull(str, &end, 10);
+				if (!num)
+					return -1;
+				if (num == ULLONG_MAX)
+					return -1;
+				/* it should end with ' kB\n' */
+				if (*end != ' ')
+					return -1;
+#if SIZE_MAX == UINT32_MAX
+				/*
+				 * On 32bit architectures limit to 2.5GiB even
+				 * if more is possible due to the limited
+				 * address space. 
+ */ + if (num > 2621440) + num = 2621440; +#endif + + num *= 1024; + *val = num; + return 0; + } + } + + end = strchr(end + 1, '\n'); + if (!end) + break; + str = end + 1; + } + return -1; +} + +# else + +static int32_t get_free_mem(uint64_t *val) +{ + return -1; +} + +# endif +#endif extern void coder_set_check(lzma_check new_check) @@ -520,9 +608,23 @@ coder_init(file_pair *pair) break; case FORMAT_XZ: - ret = lzma_stream_decoder(&strm, - hardware_memlimit_get( - MODE_DECOMPRESS), flags); + if (hardware_threads_get() > 1) { + mt_options.threads = hardware_threads_get(); + mt_options.flags = flags; + mt_options.memlimit = hardware_memlimit_get(MODE_DECOMPRESS); + if (mt_options.memlimit == UINT64_MAX) { + uint64_t free_mem; + + if (get_free_mem(&free_mem) == 0) + mt_options.memlimit = free_mem; + } + ret = lzma_stream_decoder_mt(&strm, &mt_options, + LZMA_MEMLIMIT_THREAD); + } else { + ret = lzma_stream_decoder(&strm, + hardware_memlimit_get( + MODE_DECOMPRESS), flags); + } break; case FORMAT_LZMA: -- 2.30.0