Skip to content

Commit e49a490

Browse files
committed
[Issue #228]: improve page header map bufferization and share descriptors between threads
1 parent 09e87a5 commit e49a490

File tree

7 files changed

+116
-69
lines changed

7 files changed

+116
-69
lines changed

src/backup.c

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -505,6 +505,9 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
505505
instance_config.pgdata, external_dirs, true);
506506
write_backup(&current, true);
507507

508+
/* Init backup page header map */
509+
init_header_map(&current);
510+
508511
/* init thread args with own file lists */
509512
threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
510513
threads_args = (backup_files_arg *) palloc(sizeof(backup_files_arg)*num_threads);
@@ -595,18 +598,13 @@ do_backup_instance(PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool
595598
set_min_recovery_point(pg_control, database_path, current.stop_lsn);
596599
}
597600

598-
/* close block header map */
599-
if (current.hdr_map.fp)
601+
/* close and sync page header map */
602+
if (current.hdr_map.w_fp)
600603
{
601-
if (fclose(current.hdr_map.fp))
602-
elog(ERROR, "Cannot close file \"%s\"", current.hdr_map.path);
604+
cleanup_header_map(&(current.hdr_map));
603605

604606
if (fio_sync(current.hdr_map.path, FIO_BACKUP_HOST) != 0)
605607
elog(ERROR, "Cannot sync file \"%s\": %s", current.hdr_map.path, strerror(errno));
606-
607-
current.hdr_map.fp = NULL;
608-
pg_free(current.hdr_map.buf);
609-
current.hdr_map.buf = NULL;
610608
}
611609

612610
/* close ssh session in main thread */

src/catalog.c

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -474,13 +474,8 @@ catalog_get_backup_list(const char *instance_name, time_t requested_backup_id)
474474
backup->database_dir = pgut_malloc(MAXPGPATH);
475475
join_path_components(backup->database_dir, backup->root_dir, DATABASE_DIR);
476476

477-
/* block header map, TODO: move to separate function */
478-
backup->hdr_map.fp = NULL;
479-
backup->hdr_map.buf = NULL;
480-
backup->hdr_map.path = pgut_malloc(MAXPGPATH);
481-
join_path_components(backup->hdr_map.path, backup->root_dir, HEADER_MAP);
482-
backup->hdr_map.path_tmp = pgut_malloc(MAXPGPATH);
483-
join_path_components(backup->hdr_map.path_tmp, backup->root_dir, HEADER_MAP_TMP);
477+
/* Initialize page header map */
478+
init_header_map(backup);
484479

485480
/* TODO: save encoded backup id */
486481
backup->backup_id = backup->start_time;
@@ -856,12 +851,7 @@ pgBackupCreateDir(pgBackup *backup)
856851
join_path_components(backup->database_dir, backup->root_dir, DATABASE_DIR);
857852

858853
/* block header map */
859-
backup->hdr_map.fp = NULL;
860-
backup->hdr_map.buf = NULL;
861-
backup->hdr_map.path = pgut_malloc(MAXPGPATH);
862-
join_path_components(backup->hdr_map.path, backup->root_dir, HEADER_MAP);
863-
backup->hdr_map.path_tmp = pgut_malloc(MAXPGPATH);
864-
join_path_components(backup->hdr_map.path_tmp, backup->root_dir, HEADER_MAP_TMP);
854+
init_header_map(backup);
865855

866856
/* create directories for actual backup files */
867857
for (i = 0; i < parray_num(subdirs); i++)
@@ -2287,11 +2277,6 @@ pgBackupInit(pgBackup *backup)
22872277
backup->files = NULL;
22882278
backup->note = NULL;
22892279
backup->content_crc = 0;
2290-
2291-
backup->hdr_map.path = NULL;
2292-
backup->hdr_map.path_tmp = NULL;
2293-
backup->hdr_map.fp = NULL;
2294-
backup->hdr_map.mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
22952280
}
22962281

22972282
/* free pgBackup object */
@@ -2305,8 +2290,6 @@ pgBackupFree(void *backup)
23052290
pg_free(b->root_dir);
23062291
pg_free(b->database_dir);
23072292
pg_free(b->note);
2308-
pg_free(b->hdr_map.path);
2309-
pg_free(b->hdr_map.path_tmp);
23102293
pg_free(backup);
23112294
}
23122295

src/data.c

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2127,10 +2127,10 @@ BackupPageHeader2*
21272127
get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version)
21282128
{
21292129
size_t read_len = 0;
2130-
FILE *in = NULL;
21312130
pg_crc32 hdr_crc;
21322131
BackupPageHeader2 *headers = NULL;
21332132
/* header decompression */
2133+
int z_len = 0;
21342134
char *zheaders = NULL;
21352135
const char *errormsg = NULL;
21362136

@@ -2140,16 +2140,36 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version)
21402140
if (file->n_headers <= 0)
21412141
return NULL;
21422142

2143-
in = fopen(hdr_map->path, PG_BINARY_R);
2143+
// in = fopen(hdr_map->path, PG_BINARY_R);
2144+
//
2145+
// if (!in)
2146+
// elog(ERROR, "Cannot open header file \"%s\": %s", hdr_map->path, strerror(errno));
21442147

2145-
if (!in)
2146-
elog(ERROR, "Cannot open header file \"%s\": %s", hdr_map->path, strerror(errno));
2148+
if (!hdr_map->r_fp)
2149+
{
2150+
pthread_lock(&(hdr_map->mutex));
21472151

2148-
/* disable buffering */
2149-
setvbuf(in, NULL, _IONBF, BUFSIZ);
2152+
/* another contender may have gotten here first, so double check */
2153+
if (!hdr_map->r_fp) /* this file will be closed in restore.c and merge.c */
2154+
{
2155+
elog(LOG, "Opening page header map \"%s\"", hdr_map->path);
2156+
2157+
hdr_map->r_fp = fopen(hdr_map->path, PG_BINARY_R);
2158+
if (hdr_map->r_fp == NULL)
2159+
elog(ERROR, "Cannot open header file \"%s\": %s",
2160+
hdr_map->path, strerror(errno));
21502161

2151-
if (fseek(in, file->hdr_off, SEEK_SET))
2152-
elog(ERROR, "Cannot seek to position %lu in header map \"%s\": %s",
2162+
/* enable buffering for header file */
2163+
hdr_map->r_buf = pgut_malloc(LARGE_CHUNK_SIZE);
2164+
setvbuf(hdr_map->r_fp, hdr_map->r_buf, _IOFBF, LARGE_CHUNK_SIZE);
2165+
}
2166+
2167+
/* End critical section */
2168+
pthread_mutex_unlock(&(hdr_map->mutex));
2169+
}
2170+
2171+
if (fseek(hdr_map->r_fp, file->hdr_off, SEEK_SET))
2172+
elog(ERROR, "Cannot seek to position %lu in page header map \"%s\": %s",
21532173
file->hdr_off, hdr_map->path, strerror(errno));
21542174

21552175
/*
@@ -2164,21 +2184,22 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version)
21642184
zheaders = pgut_malloc(file->hdr_size);
21652185
memset(zheaders, 0, file->hdr_size);
21662186

2167-
if (fread(zheaders, 1, file->hdr_size, in) != file->hdr_size)
2187+
if (fread(zheaders, 1, file->hdr_size, hdr_map->r_fp) != file->hdr_size)
21682188
elog(ERROR, "Cannot read header file at offset: %li len: %i \"%s\": %s",
21692189
file->hdr_off, file->hdr_size, hdr_map->path, strerror(errno));
21702190

21712191
// elog(INFO, "zsize: %i, size: %i", file->hdr_size, read_len);
21722192

2173-
if (do_decompress(headers, read_len, zheaders, file->hdr_size,
2174-
ZLIB_COMPRESS, &errormsg) != read_len)
2193+
z_len = do_decompress(headers, read_len, zheaders, file->hdr_size,
2194+
ZLIB_COMPRESS, &errormsg);
2195+
if (z_len <= 0)
21752196
{
21762197
if (errormsg)
21772198
elog(ERROR, "An error occured during metadata decompression for file \"%s\": %s",
21782199
file->rel_path, errormsg);
21792200
else
2180-
elog(ERROR, "An error occured during metadata decompression for file \"%s\"",
2181-
file->rel_path);
2201+
elog(ERROR, "An error occured during metadata decompression for file \"%s\": %i",
2202+
file->rel_path, z_len);
21822203
}
21832204

21842205
/* validate checksum */
@@ -2190,9 +2211,6 @@ get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version)
21902211
elog(ERROR, "Header map for file \"%s\" crc mismatch \"%s\" offset: %lu, len: %lu, current: %u, expected: %u",
21912212
file->rel_path, hdr_map->path, file->hdr_off, read_len, hdr_crc, file->hdr_crc);
21922213

2193-
if (fclose(in))
2194-
elog(ERROR, "Cannot close header file \"%s\": %s", hdr_map->path, strerror(errno));
2195-
21962214
pg_free(zheaders);
21972215

21982216
return headers;
@@ -2217,18 +2235,18 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22172235
/* writing to header map must be serialized */
22182236
pthread_lock(&(hdr_map->mutex)); /* what if we crash while trying to obtain mutex? */
22192237

2220-
if (!hdr_map->fp)
2238+
if (!hdr_map->w_fp)
22212239
{
22222240
elog(LOG, "Creating page header map \"%s\"", map_path);
22232241

2224-
hdr_map->fp = fopen(map_path, PG_BINARY_W);
2225-
if (hdr_map->fp == NULL)
2242+
hdr_map->w_fp = fopen(map_path, PG_BINARY_W);
2243+
if (hdr_map->w_fp == NULL)
22262244
elog(ERROR, "Cannot open header file \"%s\": %s",
22272245
map_path, strerror(errno));
22282246

22292247
/* enable buffering for header file */
2230-
hdr_map->buf = pgut_malloc(STDIO_BUFSIZE);
2231-
setvbuf(hdr_map->fp, hdr_map->buf, _IOFBF, STDIO_BUFSIZE);
2248+
hdr_map->w_buf = pgut_malloc(LARGE_CHUNK_SIZE);
2249+
setvbuf(hdr_map->w_fp, hdr_map->w_buf, _IOFBF, LARGE_CHUNK_SIZE);
22322250

22332251
/* update file permission */
22342252
if (chmod(map_path, FILE_PERMISSION) == -1)
@@ -2238,7 +2256,7 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22382256
file->hdr_off = 0;
22392257
}
22402258
else
2241-
file->hdr_off = ftell(hdr_map->fp); /* TODO: replace by counter */
2259+
file->hdr_off = ftell(hdr_map->w_fp); /* TODO: replace by counter */
22422260

22432261
read_len = (file->n_headers+1) * sizeof(BackupPageHeader2);
22442262

@@ -2253,7 +2271,7 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22532271
z_len = do_compress(zheaders, read_len*2, headers,
22542272
read_len, ZLIB_COMPRESS, 1, &errormsg);
22552273

2256-
if (z_len < 0)
2274+
if (z_len <= 0)
22572275
{
22582276
if (errormsg)
22592277
elog(ERROR, "An error occured during compressing metadata for file \"%s\": %s",
@@ -2263,13 +2281,13 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22632281
file->rel_path, z_len);
22642282
}
22652283

2266-
if (fwrite(zheaders, 1, z_len, hdr_map->fp) != z_len)
2284+
if (fwrite(zheaders, 1, z_len, hdr_map->w_fp) != z_len)
22672285
elog(ERROR, "Cannot write to file \"%s\": %s", map_path, strerror(errno));
22682286

22692287
elog(VERBOSE, "Writing header map for file \"%s\" offset: %li, len: %i, crc: %u",
22702288
file->rel_path, file->hdr_off, z_len, file->hdr_crc);
22712289

2272-
// elog(INFO, "File: %s, Unzip: %i, zip: %i", file->rel_path, read_len, z_len);
2290+
elog(INFO, "File: %s, Unzip: %li, zip: %i", file->rel_path, read_len, z_len);
22732291

22742292
file->hdr_size = z_len;
22752293

@@ -2278,3 +2296,38 @@ write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map,
22782296

22792297
pg_free(zheaders);
22802298
}
2299+
2300+
/*
 * Reset the page header map embedded in a pgBackup to its "nothing open"
 * state and compute the paths of the header map files.
 *
 * backup: backup whose hdr_map is initialized. backup->root_dir is read
 *         here, so it must already be set by the caller.
 *
 * Both descriptors and both stdio buffers are set to NULL, path and
 * path_tmp are built from root_dir plus HEADER_MAP / HEADER_MAP_TMP,
 * and the mutex is assigned the static initializer value.
 *
 * NOTE(review): r_offset and w_offset are not touched here; they are only
 * zeroed in cleanup_header_map() — confirm callers do not rely on them
 * before the first cleanup.
 */
void
2301+
init_header_map(pgBackup *backup)
2302+
{
2303+
backup->hdr_map.r_fp = NULL;
2304+
backup->hdr_map.w_fp = NULL;
2305+
backup->hdr_map.r_buf = NULL;
2306+
backup->hdr_map.w_buf = NULL;
2307+
join_path_components(backup->hdr_map.path, backup->root_dir, HEADER_MAP);
2308+
join_path_components(backup->hdr_map.path_tmp, backup->root_dir, HEADER_MAP_TMP);
2309+
backup->hdr_map.mutex = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
2310+
}
2311+
2312+
/*
 * Close whichever header map descriptors are open and release their
 * stdio buffers, leaving the map in the same "nothing open" state for
 * the descriptor, buffer and offset fields.
 *
 * hdr_map: header map to clean up. The path fields and the mutex are
 *          left untouched.
 *
 * A failing fclose() is reported through elog(ERROR, ...).
 *
 * NOTE(review): callers in this commit invoke this on every backup of a
 * chain, including maps that may never have been opened, so pg_free() is
 * presumably NULL-tolerant — confirm.
 */
void
2313+
cleanup_header_map(HeaderMap *hdr_map)
2314+
{
2315+
2316+
/* cleanup read descriptor: fclose() is only attempted when r_fp is open */
2317+
if (hdr_map->r_fp && fclose(hdr_map->r_fp))
2318+
elog(ERROR, "Cannot close file \"%s\"", hdr_map->path);
2319+
2320+
hdr_map->r_fp = NULL;
2321+
pg_free(hdr_map->r_buf);
2322+
hdr_map->r_buf = NULL;
2323+
hdr_map->r_offset = 0;
2324+
2325+
/*
 * cleanup write descriptor
 *
 * NOTE(review): the message below always prints hdr_map->path, but the
 * merge code syncs and renames path_tmp after this call, so the write
 * descriptor may have been opened on path_tmp — verify which path
 * should be reported.
 */
2326+
if (hdr_map->w_fp && fclose(hdr_map->w_fp))
2327+
elog(ERROR, "Cannot close file \"%s\"", hdr_map->path);
2328+
2329+
hdr_map->w_fp = NULL;
2330+
pg_free(hdr_map->w_buf);
2331+
hdr_map->w_buf = NULL;
2332+
hdr_map->w_offset = 0;
2333+
}

src/merge.c

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -706,24 +706,26 @@ merge_chain(parray *parent_chain, pgBackup *full_backup, pgBackup *dest_backup)
706706
pretty_time);
707707

708708
/* If temp header map descriptor is open, then close it and make rename */
709-
if (full_backup->hdr_map.fp)
709+
if (full_backup->hdr_map.w_fp)
710710
{
711-
if (fclose(full_backup->hdr_map.fp))
712-
elog(ERROR, "Cannot close file \"%s\"", full_backup->hdr_map.path);
711+
cleanup_header_map(&(full_backup->hdr_map));
713712

714713
/* sync new header map to disk */
715714
if (fio_sync(full_backup->hdr_map.path_tmp, FIO_BACKUP_HOST) != 0)
716715
elog(ERROR, "Cannot sync temp header map \"%s\": %s",
717716
full_backup->hdr_map.path_tmp, strerror(errno));
718717

719718
/* Replace old header map with new one */
720-
if (rename(full_backup->hdr_map.path_tmp, full_backup->hdr_map.path) == -1)
719+
if (rename(full_backup->hdr_map.path_tmp, full_backup->hdr_map.path))
721720
elog(ERROR, "Could not rename file \"%s\" to \"%s\": %s",
722721
full_backup->hdr_map.path_tmp, full_backup->hdr_map.path, strerror(errno));
722+
}
723723

724-
full_backup->hdr_map.fp = NULL;
725-
pg_free(full_backup->hdr_map.buf);
726-
full_backup->hdr_map.buf = NULL;
724+
/* Close page header maps */
725+
for (i = parray_num(parent_chain) - 1; i >= 0; i--)
726+
{
727+
pgBackup *backup = (pgBackup *) parray_get(parent_chain, i);
728+
cleanup_header_map(&(backup->hdr_map));
727729
}
728730

729731
/*
@@ -868,11 +870,8 @@ merge_chain(parray *parent_chain, pgBackup *full_backup, pgBackup *dest_backup)
868870
full_backup->root_dir = pgut_strdup(destination_path);
869871
}
870872

871-
/* Reinit some path variables */
873+
/* Reinit path to database_dir */
872874
join_path_components(full_backup->database_dir, full_backup->root_dir, DATABASE_DIR);
873-
join_path_components(full_backup->hdr_map.path, full_backup->root_dir, HEADER_MAP);
874-
join_path_components(full_backup->hdr_map.path_tmp, full_backup->root_dir, HEADER_MAP_TMP);
875-
full_backup->hdr_map.fp = NULL;
876875

877876
/* If we crash here, it will produce full backup in MERGED
878877
* status, located in directory with wrong backup id.

src/pg_probackup.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ extern const char *PROGRAM_EMAIL;
9292

9393
#define ERRMSG_MAX_LEN 2048
9494
#define CHUNK_SIZE (128 * 1024)
95+
#define LARGE_CHUNK_SIZE (4 * 1024 * 1024)
9596
#define OUT_BUF_SIZE (512 * 1024)
9697

9798
/* retry attempts */
@@ -364,11 +365,14 @@ typedef struct PGNodeInfo
364365
/* structure used for access to block header map */
365366
typedef struct HeaderMap
366367
{
367-
char *path;
368-
char *path_tmp; /* used only in merge */
369-
char *buf; /* buffer */
370-
FILE *fp;
371-
off_t offset;
368+
char path[MAXPGPATH];
369+
char path_tmp[MAXPGPATH]; /* used only in merge */
370+
char *r_buf; /* stdio buffer installed on r_fp via setvbuf() */
371+
char *w_buf; /* stdio buffer installed on w_fp via setvbuf() */
372+
FILE *r_fp; /* descriptor used for reading */
373+
FILE *w_fp; /* descriptor used for writing */
374+
off_t r_offset; /* current position in r_fp */
375+
off_t w_offset; /* current position in w_fp */
372376
pthread_mutex_t mutex; /* taken while writing the map and while lazily opening r_fp */
373377

374378
} HeaderMap;
@@ -1021,6 +1025,8 @@ extern bool validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr s
10211025

10221026
extern BackupPageHeader2* get_data_file_headers(HeaderMap *hdr_map, pgFile *file, uint32 backup_version);
10231027
extern void write_page_headers(BackupPageHeader2 *headers, pgFile *file, HeaderMap *hdr_map, bool is_merge);
1028+
extern void init_header_map(pgBackup *backup);
1029+
extern void cleanup_header_map(HeaderMap *hdr_map);
10241030
/* parsexlog.c */
10251031
extern bool extractPageMap(const char *archivedir, uint32 wal_seg_size,
10261032
XLogRecPtr startpoint, TimeLineID start_tli,

src/restore.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,13 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain,
925925
elog(ERROR, "Backup files restoring failed. Transfered bytes: %s, time elapsed: %s",
926926
pretty_total_bytes, pretty_time);
927927

928+
/* Close page header maps */
929+
for (i = parray_num(parent_chain) - 1; i >= 0; i--)
930+
{
931+
pgBackup *backup = (pgBackup *) parray_get(parent_chain, i);
932+
cleanup_header_map(&(backup->hdr_map));
933+
}
934+
928935
if (no_sync)
929936
elog(WARNING, "Restored files are not synced to disk");
930937
else

src/validate.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ pgBackupValidate(pgBackup *backup, pgRestoreParams *params)
185185
/* cleanup */
186186
parray_walk(files, pgFileFree);
187187
parray_free(files);
188+
cleanup_header_map(&(backup->hdr_map));
188189

189190
/* Update backup status */
190191
if (corrupted)

0 commit comments

Comments
 (0)