Skip to content

Commit 412269c

Browse files
committed
[Issue #174] use "--archive-timeout" option to set timeout for partial file expire
1 parent 3107931 commit 412269c

File tree

2 files changed

+72
-35
lines changed

2 files changed

+72
-35
lines changed

src/archive.c

+72-34
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
static int push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
1616
const char *archive_dir, bool overwrite, bool no_sync,
17-
int thread_num);
17+
int thread_num, uint32 archive_timeout);
1818
#ifdef HAVE_LIBZ
1919
static int gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
2020
const char *archive_dir, bool overwrite, bool no_sync,
21-
int compress_level, int thread_num);
21+
int compress_level, int thread_num, uint32 archive_timeout);
2222
#endif
2323
static void *push_wal_segno(void *arg);
2424
static void get_wal_file(const char *from_path, const char *to_path);
@@ -33,14 +33,17 @@ static void copy_file_attributes(const char *from_path,
3333
typedef struct
3434
{
3535
TimeLineID tli;
36+
XLogSegNo first_segno;
3637
uint32 xlog_seg_size;
3738

3839
const char *pg_xlog_dir;
3940
const char *archive_dir;
41+
const char *archive_status_dir;
4042
bool overwrite;
4143
bool compress;
4244
bool no_sync;
4345
bool no_ready_rename;
46+
uint32 archive_timeout;
4447

4548
CompressAlg compress_alg;
4649
int compress_level;
@@ -73,8 +76,7 @@ typedef struct WALSegno
7376
* set archive_command to
7477
* 'pg_probackup archive-push -B /home/anastasia/backup --wal-file-name %f',
7578
* to move backups into arclog_path.
76-
* Where archlog_path is $BACKUP_PATH/wal/system_id.
77-
* Currently it just copies wal files to the new location.
79+
* Where arclog_path is $BACKUP_PATH/wal/instance_name.
7880
*/
7981
int
8082
do_archive_push(InstanceConfig *instance, char *wal_file_path,
@@ -84,6 +86,7 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
8486
uint64 i;
8587
char current_dir[MAXPGPATH];
8688
char pg_xlog_dir[MAXPGPATH];
89+
char archive_status_dir[MAXPGPATH];
8790
uint64 system_id;
8891
bool is_compress = false;
8992

@@ -125,6 +128,7 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
125128
elog(ERROR, "Cannot use pglz for WAL compression");
126129

127130
join_path_components(pg_xlog_dir, current_dir, XLOGDIR);
131+
join_path_components(archive_status_dir, pg_xlog_dir, "archive_status");
128132

129133
/* Create 'arclog_path' directory. Do nothing if it already exists. */
130134
//fio_mkdir(instance->arclog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
@@ -161,10 +165,12 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
161165
if (is_compress)
162166
gz_push_wal_file_internal(wal_file_name, pg_xlog_dir,
163167
instance->arclog_path, overwrite,
164-
no_sync, instance->compress_level, 1);
168+
no_sync, instance->compress_level, 1,
169+
instance->archive_timeout);
165170
else
166171
push_wal_file_internal(wal_file_name, pg_xlog_dir,
167-
instance->arclog_path, overwrite, no_sync, 1);
172+
instance->arclog_path, overwrite,
173+
no_sync, 1, instance->archive_timeout);
168174

169175
push_isok = true;
170176
total_pushed_files++;
@@ -175,8 +181,8 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
175181
if (IsXLogFileName(wal_file_name))
176182
GetXLogFromFileName(wal_file_name, &tli, &first_segno, instance->xlog_seg_size);
177183

184+
/* setup filelist and locks */
178185
files = parray_new();
179-
/* setup locks */
180186
for (i = first_segno; i < first_segno + batch_size; i++)
181187
{
182188
WALSegno *xlogfile = palloc(sizeof(WALSegno));
@@ -198,13 +204,16 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
198204
archive_push_arg *arg = &(threads_args[i]);
199205

200206
arg->tli = tli;
207+
arg->first_segno = first_segno;
201208
arg->xlog_seg_size = instance->xlog_seg_size;
202209
arg->archive_dir = instance->arclog_path;
203210
arg->pg_xlog_dir = pg_xlog_dir;
211+
arg->archive_status_dir = archive_status_dir;
204212
arg->overwrite = overwrite;
205213
arg->compress = is_compress;
206214
arg->no_sync = no_sync;
207215
arg->no_ready_rename = no_ready_rename;
216+
arg->archive_timeout = instance->archive_timeout;
208217

209218
arg->compress_alg = instance->compress_alg;
210219
arg->compress_level = instance->compress_level;
@@ -234,8 +243,9 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
234243
total_pushed_files += threads_args[i].n_pushed_files;
235244
}
236245

237-
/* Note, that we don`t do garbage collection here,
238-
* because pushing into archive is a very time-sensetive operation.
246+
/* Note, that we are leaking memory here,
247+
* because pushing into archive is a very
248+
* time-sensitive operation, so we skip freeing stuff.
239249
*/
240250

241251
push_done:
@@ -254,23 +264,20 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
254264
/* ------------- INTERNAL FUNCTIONS ---------- */
255265
/*
256266
* Copy WAL segment from pgdata to archive catalog with possible compression.
257-
*
267+
* TODO: make it possible to be greedy here, i.e. pushing everything
268+
* that is ready to be pushed.
258269
*/
259270
static void *
260271
push_wal_segno(void *arg)
261272
{
262273
int i;
263274
int rc;
264275
char wal_filename[MAXPGPATH];
265-
266-
char archive_status_dir[MAXPGPATH];
267276
char wal_file_dummy[MAXPGPATH];
268277
char wal_file_ready[MAXPGPATH];
269278
char wal_file_done[MAXPGPATH];
270279
archive_push_arg *args = (archive_push_arg *) arg;
271280

272-
join_path_components(archive_status_dir, args->pg_xlog_dir, "archive_status");
273-
274281
for (i = 0; i < parray_num(args->files); i++)
275282
{
276283
WALSegno *xlogfile = (WALSegno *) parray_get(args->files, i);
@@ -281,7 +288,7 @@ push_wal_segno(void *arg)
281288
/* At first we must construct WAL filename from segno, tli and xlog_seg_size */
282289
GetXLogFileName(wal_filename, args->tli, xlogfile->segno, args->xlog_seg_size);
283290

284-
join_path_components(wal_file_dummy, archive_status_dir, wal_filename);
291+
join_path_components(wal_file_dummy, args->archive_status_dir, wal_filename);
285292
snprintf(wal_file_ready, MAXPGPATH, "%s.%s", wal_file_dummy, "ready");
286293
snprintf(wal_file_done, MAXPGPATH, "%s.%s", wal_file_dummy, "done");
287294

@@ -298,17 +305,24 @@ push_wal_segno(void *arg)
298305
if (!args->compress)
299306
rc = push_wal_file_internal(wal_filename, args->pg_xlog_dir,
300307
args->archive_dir, args->overwrite,
301-
args->no_sync, args->thread_num);
308+
args->no_sync, args->thread_num,
309+
args->archive_timeout);
302310
#ifdef HAVE_LIBZ
303311
else
304312
rc = gz_push_wal_file_internal(wal_filename, args->pg_xlog_dir,
305313
args->archive_dir, args->overwrite, args->no_sync,
306-
args->compress_level, args->thread_num);
314+
args->compress_level, args->thread_num,
315+
args->archive_timeout);
307316
#endif
308317

309318
/* take '--no-ready-rename' flag into account */
310-
if (!args->no_ready_rename && rc == 0)
319+
if (!args->no_ready_rename && rc == 0 &&
320+
/* don't rename ready file for the first segment,
321+
* postgres will complain via WARNING if we do that */
322+
args->first_segno != xlogfile->segno)
311323
{
324+
canonicalize_path(wal_file_ready);
325+
canonicalize_path(wal_file_done);
312326
/* It is ok to rename status file in archive_status directory */
313327
elog(VERBOSE, "Thread [%d]: Rename \"%s\" to \"%s\"", args->thread_num,
314328
wal_file_ready, wal_file_done);
@@ -318,7 +332,6 @@ push_wal_segno(void *arg)
318332
}
319333

320334
args->n_pushed_files++;
321-
322335
}
323336

324337
args->ret = 0;
@@ -336,7 +349,7 @@ push_wal_segno(void *arg)
336349
int
337350
push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
338351
const char *archive_dir, bool overwrite, bool no_sync,
339-
int thread_num)
352+
int thread_num, uint32 archive_timeout)
340353
{
341354
FILE *in = NULL;
342355
int out = -1;
@@ -353,8 +366,10 @@ push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
353366

354367
/* from path */
355368
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
369+
canonicalize_path(from_fullpath);
356370
/* to path */
357371
join_path_components(to_fullpath, archive_dir, wal_file_name);
372+
canonicalize_path(to_fullpath);
358373

359374
/* Open source file for read */
360375
in = fio_fopen(from_fullpath, PG_BINARY_R, FIO_DB_HOST);
@@ -377,11 +392,20 @@ push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
377392
else
378393
goto part_opened;
379394

380-
/* Partial file already exists, it could have happened due to failed archive-push,
381-
* in this case partial file can be discarded, or due to concurrent archiving.
395+
/*
396+
* Partial file already exists, it could have happened due to:
397+
* 1. failed archive-push
398+
* 2. concurrent archiving
399+
*
400+
* For ARCHIVE_TIMEOUT period we will try to create partial file
401+
* and look for the size of already existing partial file, to
402+
* determine if it is changing or not.
403+
* If after ARCHIVE_TIMEOUT we still failed to create partial
404+
* file, we will make a decision about discarding
405+
* already existing partial file.
382406
*/
383-
/* TODO: use --archive-timeout */
384-
while (partial_try_count < PARTIAL_WAL_TIMER)
407+
408+
while (partial_try_count < archive_timeout)
385409
{
386410
if (fio_stat(to_fullpath_part, &st, false, FIO_BACKUP_HOST) < 0)
387411
{
@@ -407,8 +431,9 @@ push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
407431
/* first round */
408432
if (!partial_try_count)
409433
{
410-
elog(VERBOSE, "Thread [%d]: Temp WAL file already exists, waiting on it: \"%s\"",
411-
thread_num, to_fullpath_part);
434+
elog(VERBOSE, "Thread [%d]: Temp WAL file already exists, "
435+
"waiting on it %s seconds: \"%s\"",
436+
thread_num, archive_timeout, to_fullpath_part);
412437
partial_file_size = st.st_size;
413438
}
414439

@@ -433,7 +458,7 @@ push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
433458
{
434459
if (!partial_is_stale)
435460
elog(ERROR, "Thread [%d]: Failed to open temp WAL file \"%s\" in %i seconds",
436-
thread_num, to_fullpath_part, PARTIAL_WAL_TIMER);
461+
thread_num, to_fullpath_part, archive_timeout);
437462

438463
/* Partial segment is considered stale, so reuse it */
439464
elog(LOG, "Thread [%d]: Reusing stale temp WAL file \"%s\"",
@@ -562,7 +587,7 @@ push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
562587
int
563588
gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
564589
const char *archive_dir, bool overwrite, bool no_sync,
565-
int compress_level, int thread_num)
590+
int compress_level, int thread_num, uint32 archive_timeout)
566591
{
567592
FILE *in = NULL;
568593
gzFile out = NULL;
@@ -582,8 +607,10 @@ gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
582607

583608
/* from path */
584609
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
610+
canonicalize_path(from_fullpath);
585611
/* to path */
586612
join_path_components(to_fullpath, archive_dir, wal_file_name);
613+
canonicalize_path(to_fullpath);
587614

588615
/* destination file with .gz suffix */
589616
snprintf(to_fullpath_gz, sizeof(to_fullpath_gz), "%s.gz", to_fullpath);
@@ -608,10 +635,20 @@ gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
608635
else
609636
goto part_opened;
610637

611-
/* Partial file already exists, it could have happened due to failed archive-push,
612-
* in this case partial file can be discarded, or due to concurrent archiving.
638+
/*
639+
* Partial file already exists, it could have happened due to:
640+
* 1. failed archive-push
641+
* 2. concurrent archiving
642+
*
643+
* For ARCHIVE_TIMEOUT period we will try to create partial file
644+
* and look for the size of already existing partial file, to
645+
* determine if it is changing or not.
646+
* If after ARCHIVE_TIMEOUT we still failed to create partial
647+
* file, we will make a decision about discarding
648+
* already existing partial file.
613649
*/
614-
while (partial_try_count < PARTIAL_WAL_TIMER)
650+
651+
while (partial_try_count < archive_timeout)
615652
{
616653
if (fio_stat(to_fullpath_gz_part, &st, false, FIO_BACKUP_HOST) < 0)
617654
{
@@ -637,8 +674,9 @@ gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
637674
/* first round */
638675
if (!partial_try_count)
639676
{
640-
elog(VERBOSE, "Thread [%d]: Temp WAL file already exists, waiting on it: \"%s\"",
641-
thread_num, to_fullpath_gz_part);
677+
elog(VERBOSE, "Thread [%d]: Temp WAL file already exists, "
678+
"waiting on it %s seconds: \"%s\"",
679+
thread_num, archive_timeout, to_fullpath_gz_part);
642680
partial_file_size = st.st_size;
643681
}
644682

@@ -663,7 +701,7 @@ gz_push_wal_file_internal(const char *wal_file_name, const char *pg_xlog_dir,
663701
{
664702
if (!partial_is_stale)
665703
elog(ERROR, "Thread [%d]: Failed to open temp WAL file \"%s\" in %i seconds",
666-
thread_num, to_fullpath_gz_part, PARTIAL_WAL_TIMER);
704+
thread_num, to_fullpath_gz_part, archive_timeout);
667705

668706
/* Partial segment is considered stale, so reuse it */
669707
elog(LOG, "Thread [%d]: Reusing stale temp WAL file \"%s\"",

src/pg_probackup.h

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ extern const char *PROGRAM_EMAIL;
6767
#define DATABASE_MAP "database_map"
6868

6969
/* Timeout defaults */
70-
#define PARTIAL_WAL_TIMER 60
7170
#define ARCHIVE_TIMEOUT_DEFAULT 300
7271
#define REPLICA_TIMEOUT_DEFAULT 300
7372

0 commit comments

Comments
 (0)