Currently, multi-threaded mkfs.erofs creates tmpfiles for each segment to store the intermediate compression result, reaching the limit of open files when the number of segments is large.
This patch uses per-worker tmpfiles to avoid this problem if possible, i.e., the environment supports the fallocate() syscall and FALLOC_FL_PUNCH_HOLE flag. Signed-off-by: Yifan Zhao <[email protected]> --- lib/compress.c | 68 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 54 insertions(+), 14 deletions(-) diff --git a/lib/compress.c b/lib/compress.c index d5a5f16..3fae260 100644 --- a/lib/compress.c +++ b/lib/compress.c @@ -8,6 +8,9 @@ #ifndef _LARGEFILE64_SOURCE #define _LARGEFILE64_SOURCE #endif +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif #include <string.h> #include <stdlib.h> #include <unistd.h> @@ -23,6 +26,13 @@ #ifdef EROFS_MT_ENABLED #include "erofs/workqueue.h" #endif +#ifdef HAVE_LINUX_FALLOC_H +#include <linux/falloc.h> +#endif + +#if defined(HAVE_FALLOCATE) && defined(FALLOC_FL_PUNCH_HOLE) +#define USE_PER_WORKER_TMPFILE 1 +#endif /* compressing configuration specified by users */ struct erofs_compress_cfg { @@ -59,6 +69,7 @@ struct z_erofs_vle_compress_ctx { int seg_num, seg_idx; FILE *tmpfile; + off_t tmpfile_off; }; struct z_erofs_write_index_ctx { @@ -75,6 +86,7 @@ struct erofs_compress_wq_private { u8 *queue; char *destbuf; struct erofs_compress_cfg *ccfg; + FILE* tmpfile; }; struct erofs_compress_work { @@ -402,6 +414,7 @@ static int write_uncompressed_extent(struct z_erofs_vle_compress_ctx *ctx, ret = fwrite(dst, erofs_blksiz(sbi), 1, ctx->tmpfile); if (ret != 1) return -EIO; + fflush(ctx->tmpfile); } else { erofs_dbg("Writing %u uncompressed data to block %u", count, ctx->blkaddr); @@ -1073,6 +1086,7 @@ void z_erofs_init_ctx(struct z_erofs_vle_compress_ctx *ctx, ctx->tof_chksum = tof_chksum; ctx->fd = fd; ctx->tmpfile = NULL; + ctx->tmpfile_off = 0; init_list_head(&ctx->extents); } @@ -1169,7 +1183,7 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi, struct erofs_compress_cfg *lc; int ret; - if (!priv->init) { + if (unlikely(!priv->init)) { priv->init = true; priv->queue = malloc(EROFS_COMPR_QUEUE_SZ); @@ -1185,6 +1199,16 @@ int z_erofs_mt_private_init(struct erofs_sb_info *sbi, sizeof(struct erofs_compress_cfg)); if (!priv->ccfg) return -ENOMEM; + +#ifdef USE_PER_WORKER_TMPFILE +#ifndef HAVE_TMPFILE64 + priv->tmpfile = tmpfile(); +#else + priv->tmpfile = tmpfile64(); +#endif + if (!priv->tmpfile) + return -errno; +#endif } lc = &priv->ccfg[alg_id]; @@ -1214,6 +1238,9 @@ void z_erofs_mt_private_fini(void *private) free(priv->ccfg); free(priv->destbuf); free(priv->queue); +#ifdef USE_PER_WORKER_TMPFILE + fclose(priv->tmpfile); +#endif priv->init = false; } } @@ -1237,24 +1264,30 @@ void z_erofs_mt_work(struct erofs_work *work) ctx->queue = priv->queue; ctx->destbuf = priv->destbuf; ctx->chandle = &priv->ccfg[cwork->alg_id].handle; - +#ifdef USE_PER_WORKER_TMPFILE + ctx->tmpfile = priv->tmpfile; + ctx->tmpfile_off = ftell(ctx->tmpfile); + if (ctx->tmpfile_off == -1) { + ret = -errno; + goto out; + } +#else #ifdef HAVE_TMPFILE64 ctx->tmpfile = tmpfile64(); #else ctx->tmpfile = tmpfile(); #endif - if (!ctx->tmpfile) { ret = -errno; goto out; } + ctx->tmpfile_off = 0; +#endif ret = z_erofs_compress_file(ctx, offset, blkaddr); if (ret) goto out; - fflush(ctx->tmpfile); - if (ctx->seg_idx == ctx->seg_num - 1) ret = z_erofs_handle_fragments(ctx); @@ -1274,6 +1307,7 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, struct erofs_sb_info *sbi = cur->ctx.inode->sbi; struct z_erofs_write_index_ctx *ictx = cfile->ictx; char *memblock = NULL; + size_t size = 0; int ret = 0, lret; while (cur != NULL) { @@ -1290,28 +1324,32 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, goto out; } - memblock = realloc(memblock, - ctx->compressed_blocks * erofs_blksiz(sbi)); + size = ctx->compressed_blocks * erofs_blksiz(sbi); + memblock = realloc(memblock, size); if (!memblock) { if (!ret) ret = -ENOMEM; goto out; } - lret = fseek(ctx->tmpfile, 0, SEEK_SET); - if (lret) { + lret = pread(fileno(ctx->tmpfile), memblock, size, + ctx->tmpfile_off); + if (lret != size) { if (!ret) - ret = lret; + ret = -errno; goto out; } - lret = fread(memblock, erofs_blksiz(sbi), - ctx->compressed_blocks, ctx->tmpfile); - if (lret != ctx->compressed_blocks) { +#ifdef USE_PER_WORKER_TMPFILE + lret = fallocate(fileno(ctx->tmpfile), + FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, + ctx->tmpfile_off, size); + if (lret) { if (!ret) - ret = -EIO; + ret = -errno; goto out; } +#endif lret = blk_write(sbi, memblock, blkaddr + *compressed_blocks, ctx->compressed_blocks); @@ -1323,7 +1361,9 @@ int z_erofs_mt_merge(struct erofs_compress_file *cfile, erofs_blk_t blkaddr, *compressed_blocks += ctx->compressed_blocks; out: +#ifndef USE_PER_WORKER_TMPFILE fclose(ctx->tmpfile); +#endif tmp = cur->next; pthread_mutex_lock(&work_mutex); -- 2.43.2
