On 2024/2/25 22:27, Yifan Zhao wrote:
Currently, the creation of EROFS compressed image creation is
single-threaded, which suffers from performance issues. This patch
attempts to address it by compressing the large file in parallel.
Specifically, each input file larger than 16MB is splited into segments,
and each worker thread compresses a segment as if it were a separate
file. Finally, the main thread merges all the compressed segments.
Multi-threaded compression is not compatible with -Ededupe,
-E(all-)fragments and -Eztailpacking for now.
Signed-off-by: Yifan Zhao <[email protected]>
Co-authored-by: Tong Xin <[email protected]>
---
include/erofs/compress.h | 1 +
lib/compress.c | 690 ++++++++++++++++++++++++++++++++-------
lib/compressor.c | 2 +
3 files changed, 575 insertions(+), 118 deletions(-)
diff --git a/include/erofs/compress.h b/include/erofs/compress.h
index 046640b..2699334 100644
--- a/include/erofs/compress.h
+++ b/include/erofs/compress.h
@@ -15,6 +15,7 @@ extern "C"
#include "internal.h"
#define EROFS_CONFIG_COMPR_MAX_SZ (4000 * 1024)
+#define EROFS_COMPR_QUEUE_SZ (EROFS_CONFIG_COMPR_MAX_SZ * 2)
void z_erofs_drop_inline_pcluster(struct erofs_inode *inode);
int erofs_write_compressed_file(struct erofs_inode *inode, int fd);
diff --git a/lib/compress.c b/lib/compress.c
index 9611102..f98feae 100644
--- a/lib/compress.c
+++ b/lib/compress.c
@@ -8,6 +8,9 @@
#ifndef _LARGEFILE64_SOURCE
#define _LARGEFILE64_SOURCE
#endif
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
@@ -20,6 +23,16 @@
#include "erofs/block_list.h"
#include "erofs/compress_hints.h"
#include "erofs/fragments.h"
+#ifdef EROFS_MT_ENABLED
+#include "erofs/workqueue.h"
+#endif
+#ifdef HAVE_LINUX_FALLOC_H
+#include <linux/falloc.h>
+#endif
+
+#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE)
+#define USE_PER_WORKER_TMPFILE 1
+#endif
/* compressing configuration specified by users */
struct erofs_compress_cfg {
@@ -33,29 +46,84 @@ struct z_erofs_extent_item {
struct z_erofs_inmem_extent e;
};
+struct z_erofs_file_compress_ctx {
struct z_erofs_compressed_inode_ctx would be better
+ struct erofs_inode *inode;
+ int fd;
+ unsigned int pclustersize;
+
+ u32 tof_chksum;
+ bool fix_dedupedfrag;
+ bool fragemitted;
+};
+
struct z_erofs_vle_compress_ctx {
I think we'd better to rename this as
struct z_erofs_compressed_segment_ctx
- u8 queue[EROFS_CONFIG_COMPR_MAX_SZ * 2];
+ struct z_erofs_file_compress_ctx *fctx;
+
+ u8 *queue;
struct list_head extents;
struct z_erofs_extent_item *pivot;
- struct erofs_inode *inode;
- struct erofs_compress_cfg *ccfg;
+ struct erofs_compress *chandle;
+ char *destbuf;
- u8 *metacur;
unsigned int head, tail;
erofs_off_t remaining;
- unsigned int pclustersize;
erofs_blk_t blkaddr; /* pointing to the next blkaddr */
+ erofs_blk_t compressed_blocks;
u16 clusterofs;
- u32 tof_chksum;
- bool fix_dedupedfrag;
- bool fragemitted;
+ int seg_num, seg_idx;
+ FILE *tmpfile;
+ off_t tmpfile_off;
+};
+
+struct z_erofs_write_index_ctx {
why we need this structure, I'd like to fold it in
struct z_erofs_compressed_inode_ctx.
+ struct erofs_inode *inode;
+ struct list_head *extents;
+ u16 clusterofs;
+ erofs_blk_t blkaddr, blkoff;
I don't like this approach, let's just fix
extents->blkaddr in a loop together.
+ u8 *metacur;
};
+#ifdef EROFS_MT_ENABLED
+struct erofs_compress_wq_private {
+ bool init;
+ u8 *queue;
+ char *destbuf;
+ struct erofs_compress_cfg *ccfg;
+ FILE* tmpfile;
+};
+
+struct erofs_compress_work {
+ /* Note: struct erofs_work must be the first member */
+ struct erofs_work work;
+ struct z_erofs_vle_compress_ctx ctx;
+
+ unsigned int alg_id;
+ char *alg_name;
+ unsigned int comp_level;
+ unsigned int dict_size;
+
+ int ret;
+
+ struct erofs_compress_work *next;
+};
+
+static struct {
+ struct erofs_workqueue wq;
+ struct erofs_compress_work *idle;
Does this need a mutex protection?
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ int nfini;
+} z_erofs_mt_ctrl;
+#endif
+
+static bool z_erofs_mt_enabled;
+static u8 *z_erofs_global_queue;
+
#define Z_EROFS_LEGACY_MAP_HEADER_SIZE Z_EROFS_FULL_INDEX_ALIGN(0)
-static void z_erofs_write_indexes_final(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes_final(struct z_erofs_write_index_ctx *ctx)
{
const unsigned int type = Z_EROFS_LCLUSTER_TYPE_PLAIN;
struct z_erofs_lcluster_index di;
@@ -71,7 +139,7 @@ static void z_erofs_write_indexes_final(struct
z_erofs_vle_compress_ctx *ctx)
ctx->metacur += sizeof(di);
}
-static void z_erofs_write_extent(struct z_erofs_vle_compress_ctx *ctx,
+static void z_erofs_write_extent(struct z_erofs_write_index_ctx *ctx,
struct z_erofs_inmem_extent *e)
{
struct erofs_inode *inode = ctx->inode;
@@ -99,10 +167,15 @@ static void z_erofs_write_extent(struct
z_erofs_vle_compress_ctx *ctx,
di.di_advise = cpu_to_le16(advise);
if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
- !e->compressedblks)
+ !e->compressedblks) {
di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
- else
+ } else if (z_erofs_mt_enabled) {
+ di.di_u.blkaddr =
+ cpu_to_le32(ctx->blkaddr + ctx->blkoff);
+ ctx->blkoff += e->compressedblks;
so we don't need this at all.
+ } else {
di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
+ }
memcpy(ctx->metacur, &di, sizeof(di));
ctx->metacur += sizeof(di);
@@ -144,10 +217,15 @@ static void z_erofs_write_extent(struct
z_erofs_vle_compress_ctx *ctx,
Z_EROFS_LCLUSTER_TYPE_HEAD1;
if (inode->datalayout == EROFS_INODE_COMPRESSED_FULL &&
- !e->compressedblks)
+ !e->compressedblks) {
di.di_u.blkaddr = cpu_to_le32(inode->fragmentoff >> 32);
- else
+ } else if (z_erofs_mt_enabled) {
+ di.di_u.blkaddr =
+ cpu_to_le32(ctx->blkaddr + ctx->blkoff);
+ ctx->blkoff += e->compressedblks;
same here.
+ } else {
di.di_u.blkaddr = cpu_to_le32(e->blkaddr);
+ }
if (e->partial) {
DBG_BUGON(e->raw);
@@ -170,12 +248,12 @@ static void z_erofs_write_extent(struct
z_erofs_vle_compress_ctx *ctx,
ctx->clusterofs = clusterofs + count;
}
-static void z_erofs_write_indexes(struct z_erofs_vle_compress_ctx *ctx)
+static void z_erofs_write_indexes(struct z_erofs_write_index_ctx *ctx)
{
struct z_erofs_extent_item *ei, *n;
ctx->clusterofs = 0;
- list_for_each_entry_safe(ei, n, &ctx->extents, list) {
+ list_for_each_entry_safe(ei, n, ctx->extents, list) {
z_erofs_write_extent(ctx, &ei->e);
list_del(&ei->list);
@@ -188,11 +266,12 @@ static bool z_erofs_need_refill(struct
z_erofs_vle_compress_ctx *ctx)
{
const bool final = !ctx->remaining;
unsigned int qh_aligned, qh_after;
+ struct erofs_inode *inode = ctx->fctx->inode;
if (final || ctx->head < EROFS_CONFIG_COMPR_MAX_SZ)
return false;
- qh_aligned = round_down(ctx->head, erofs_blksiz(ctx->inode->sbi));
+ qh_aligned = round_down(ctx->head, erofs_blksiz(inode->sbi));
qh_after = ctx->head - qh_aligned;
memmove(ctx->queue, ctx->queue + qh_aligned, ctx->tail - qh_aligned);
ctx->tail -= qh_aligned;
@@ -212,14 +291,13 @@ static void z_erofs_commit_extent(struct
z_erofs_vle_compress_ctx *ctx,
list_add_tail(&ei->list, &ctx->extents);
ctx->clusterofs = (ctx->clusterofs + ei->e.length) &
- (erofs_blksiz(ctx->inode->sbi) - 1);
-
+ (erofs_blksiz(ctx->fctx->inode->sbi) - 1);
}
static int z_erofs_compress_dedupe(struct z_erofs_vle_compress_ctx *ctx,
unsigned int *len)
{
- struct erofs_inode *inode = ctx->inode;
+ struct erofs_inode *inode = ctx->fctx->inode;
const unsigned int lclustermask = (1 << inode->z_logical_clusterbits) - 1;
struct erofs_sb_info *sbi = inode->sbi;
struct z_erofs_extent_item *ei = ctx->pivot;
@@ -318,13 +396,14 @@ out:
static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx,
unsigned int len, char *dst)
{
- struct erofs_sb_info *sbi = ctx->inode->sbi;
+ struct erofs_inode *inode = ctx->fctx->inode;
+ struct erofs_sb_info *sbi = inode->sbi;
unsigned int count = min(erofs_blksiz(sbi), len);
unsigned int interlaced_offset, rightpart;
int ret;
/* write interlaced uncompressed data if needed */
- if (ctx->inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
+ if (inode->z_advise & Z_EROFS_ADVISE_INTERLACED_PCLUSTER)
interlaced_offset = ctx->clusterofs;
else
interlaced_offset = 0;
@@ -335,11 +414,19 @@ static int write_uncompressed_extent(struct
z_erofs_vle_compress_ctx *ctx,
memcpy(dst + interlaced_offset, ctx->queue + ctx->head, rightpart);
memcpy(dst, ctx->queue + ctx->head + rightpart, count - rightpart);
- erofs_dbg("Writing %u uncompressed data to block %u",
- count, ctx->blkaddr);
- ret = blk_write(sbi, dst, ctx->blkaddr, 1);
- if (ret)
- return ret;
+ if (ctx->tmpfile) {
+ erofs_dbg("Writing %u uncompressed data to tmpfile", count);
+ ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile);
+ if (ret != 1)
+ return -EIO;
+ fflush(ctx->tmpfile);
+ } else {
+ erofs_dbg("Writing %u uncompressed data to block %u", count,
+ ctx->blkaddr);
+ ret = blk_write(sbi, dst, ctx->blkaddr, 1);
+ if (ret)
+ return ret;
+ }
return count;
}
@@ -384,8 +471,8 @@ static void tryrecompress_trailing(struct
z_erofs_vle_compress_ctx *ctx,
void *in, unsigned int *insize,
void *out, unsigned int *compressedsize)
{
- struct erofs_sb_info *sbi = ctx->inode->sbi;
- static char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
+ struct erofs_sb_info *sbi = ctx->fctx->inode->sbi;
+ char tmp[Z_EROFS_PCLUSTER_MAX_SIZE];
does tryrecompress_trailing() work? if it doesn't work,
let's leave the old code as-is.
unsigned int count;
int ret = *compressedsize;
@@ -409,7 +496,7 @@ static void tryrecompress_trailing(struct
z_erofs_vle_compress_ctx *ctx,
static bool z_erofs_fixup_deduped_fragment(struct z_erofs_vle_compress_ctx
*ctx,
unsigned int len)
{
- struct erofs_inode *inode = ctx->inode;
+ struct erofs_inode *inode = ctx->fctx->inode;
struct erofs_sb_info *sbi = inode->sbi;
const unsigned int newsize = ctx->remaining + len;
@@ -417,9 +504,10 @@ static bool z_erofs_fixup_deduped_fragment(struct
z_erofs_vle_compress_ctx *ctx,
/* try to fix again if it gets larger (should be rare) */
if (inode->fragment_size < newsize) {
- ctx->pclustersize = min_t(erofs_off_t,
z_erofs_get_max_pclustersize(inode),
- roundup(newsize - inode->fragment_size,
- erofs_blksiz(sbi)));
+ ctx->fctx->pclustersize =
+ min_t(erofs_off_t, z_erofs_get_max_pclustersize(inode),
+ roundup(newsize - inode->fragment_size,
+ erofs_blksiz(sbi)));
return false;
}
@@ -439,26 +527,31 @@ static bool z_erofs_fixup_deduped_fragment(struct
z_erofs_vle_compress_ctx *ctx,
static int __z_erofs_compress_one(struct z_erofs_vle_compress_ctx *ctx,
struct z_erofs_inmem_extent *e)
{
- static char dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
- struct erofs_inode *inode = ctx->inode;
+ static char
+ global_dstbuf[EROFS_CONFIG_COMPR_MAX_SZ + EROFS_MAX_BLOCK_SIZE];
+ char *dstbuf = ctx->destbuf ? ctx->destbuf : global_dstbuf;
char *dstbuf = ctx->destbuf ? : global_dstbuf;