15
15
16
16
static int push_file_internal_uncompressed (const char * wal_file_name , const char * pg_xlog_dir ,
17
17
const char * archive_dir , bool overwrite , bool no_sync ,
18
- uint32 archive_timeout );
18
+ uint32 archive_timeout , xlogFileType type );
19
19
#ifdef HAVE_LIBZ
20
20
static int push_file_internal_gz (const char * wal_file_name , const char * pg_xlog_dir ,
21
- const char * archive_dir , bool overwrite , bool no_sync ,
22
- int compress_level , uint32 archive_timeout );
21
+ const char * archive_dir , bool overwrite , bool no_sync ,
22
+ int compress_level , uint32 archive_timeout , xlogFileType type );
23
23
#endif
24
24
static void * push_files (void * arg );
25
25
static void * get_files (void * arg );
26
+ static bool
27
+ get_wal_file_wrapper (const char * filename , const char * archive_root_dir ,
28
+ const char * to_fullpath , bool prefetch_mode );
26
29
static bool get_wal_file (const char * filename , const char * from_path , const char * to_path ,
27
30
bool prefetch_mode );
28
31
static int get_wal_file_internal (const char * from_path , const char * to_path , FILE * out ,
@@ -89,8 +92,9 @@ typedef struct
89
92
90
93
typedef struct WALSegno
91
94
{
92
- char name [MAXFNAMELEN ];
93
- volatile pg_atomic_flag lock ;
95
+ char name [MAXFNAMELEN ];
96
+ volatile pg_atomic_flag lock ;
97
+ xlogFileType type ;
94
98
} WALSegno ;
95
99
96
100
static int push_file (WALSegno * xlogfile , const char * archive_status_dir ,
@@ -102,6 +106,28 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
102
106
static parray * setup_push_filelist (const char * archive_status_dir ,
103
107
const char * first_file , int batch_size );
104
108
109
+ static xlogFileType
110
+ get_xlogFileType (const char * filename )
111
+ {
112
+
113
+ if IsXLogFileName (filename )
114
+ return SEGMENT ;
115
+
116
+ else if IsPartialXLogFileName (filename )
117
+ return PARTIAL_SEGMENT ;
118
+
119
+ else if IsBackupHistoryFileName (filename )
120
+ return BACKUP_HISTORY_FILE ;
121
+
122
+ else if IsTLHistoryFileName (filename )
123
+ return HISTORY_FILE ;
124
+
125
+ else if IsBackupHistoryFileName (filename )
126
+ return BACKUP_HISTORY_FILE ;
127
+
128
+ return UNKNOWN ;
129
+ }
130
+
105
131
/*
106
132
* At this point, we already done one roundtrip to archive server
107
133
* to get instance config.
@@ -185,6 +211,8 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
185
211
parray_num (batch_files ), batch_size ,
186
212
is_compress ? "zlib" : "none" );
187
213
214
+ /* TODO: create subdirectories here, not in internal functions */
215
+
188
216
num_threads = n_threads ;
189
217
190
218
/* Single-thread push
@@ -366,12 +394,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
366
394
if (!is_compress )
367
395
rc = push_file_internal_uncompressed (xlogfile -> name , pg_xlog_dir ,
368
396
archive_dir , overwrite , no_sync ,
369
- archive_timeout );
397
+ archive_timeout , xlogfile -> type );
370
398
#ifdef HAVE_LIBZ
371
399
else
372
400
rc = push_file_internal_gz (xlogfile -> name , pg_xlog_dir , archive_dir ,
373
401
overwrite , no_sync , compress_level ,
374
- archive_timeout );
402
+ archive_timeout , xlogfile -> type );
375
403
#endif
376
404
377
405
/* take '--no-ready-rename' flag into account */
@@ -408,13 +436,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
408
436
int
409
437
push_file_internal_uncompressed (const char * wal_file_name , const char * pg_xlog_dir ,
410
438
const char * archive_dir , bool overwrite , bool no_sync ,
411
- uint32 archive_timeout )
439
+ uint32 archive_timeout , xlogFileType type )
412
440
{
413
441
FILE * in = NULL ;
414
442
int out = -1 ;
415
443
char * buf = pgut_malloc (OUT_BUF_SIZE ); /* 1MB buffer */
416
444
char from_fullpath [MAXPGPATH ];
417
445
char to_fullpath [MAXPGPATH ];
446
+ char archive_subdir [MAXPGPATH ];
418
447
/* partial handling */
419
448
struct stat st ;
420
449
char to_fullpath_part [MAXPGPATH ];
@@ -427,8 +456,16 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
427
456
/* from path */
428
457
join_path_components (from_fullpath , pg_xlog_dir , wal_file_name );
429
458
canonicalize_path (from_fullpath );
459
+
460
+ /* calculate subdir in WAL archive */
461
+ get_archive_subdir (archive_subdir , archive_dir , wal_file_name , type );
462
+
463
+ /* create subdirectory */
464
+ if (fio_mkdir (archive_subdir , DIR_PERMISSION , FIO_BACKUP_HOST ) != 0 )
465
+ elog (ERROR , "Cannot create subdirectory in WAL archive: '%s'" , archive_subdir );
466
+
430
467
/* to path */
431
- join_path_components (to_fullpath , archive_dir , wal_file_name );
468
+ join_path_components (to_fullpath , archive_subdir , wal_file_name );
432
469
canonicalize_path (to_fullpath );
433
470
434
471
/* Open source file for read */
@@ -647,14 +684,15 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
647
684
int
648
685
push_file_internal_gz (const char * wal_file_name , const char * pg_xlog_dir ,
649
686
const char * archive_dir , bool overwrite , bool no_sync ,
650
- int compress_level , uint32 archive_timeout )
687
+ int compress_level , uint32 archive_timeout , xlogFileType type )
651
688
{
652
689
FILE * in = NULL ;
653
690
gzFile out = NULL ;
654
691
char * buf = pgut_malloc (OUT_BUF_SIZE );
655
692
char from_fullpath [MAXPGPATH ];
656
693
char to_fullpath [MAXPGPATH ];
657
694
char to_fullpath_gz [MAXPGPATH ];
695
+ char archive_subdir [MAXPGPATH ];
658
696
659
697
/* partial handling */
660
698
struct stat st ;
@@ -669,8 +707,16 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
669
707
/* from path */
670
708
join_path_components (from_fullpath , pg_xlog_dir , wal_file_name );
671
709
canonicalize_path (from_fullpath );
710
+
711
+ /* calculate subdir in WAL archive */
712
+ get_archive_subdir (archive_subdir , archive_dir , wal_file_name , type );
713
+
714
+ /* create subdirectory */
715
+ if (fio_mkdir (archive_subdir , DIR_PERMISSION , FIO_BACKUP_HOST ) != 0 )
716
+ elog (ERROR , "Cannot create subdirectory in WAL archive: '%s'" , archive_subdir );
717
+
672
718
/* to path */
673
- join_path_components (to_fullpath , archive_dir , wal_file_name );
719
+ join_path_components (to_fullpath , archive_subdir , wal_file_name );
674
720
canonicalize_path (to_fullpath );
675
721
676
722
/* destination file with .gz suffix */
@@ -940,15 +986,17 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
940
986
{
941
987
int i ;
942
988
WALSegno * xlogfile = NULL ;
943
- parray * status_files = NULL ;
944
- parray * batch_files = parray_new ();
989
+ parray * status_files = NULL ;
990
+ parray * batch_files = parray_new ();
945
991
946
992
/* guarantee that first filename is in batch list */
947
993
xlogfile = palloc (sizeof (WALSegno ));
948
994
pg_atomic_init_flag (& xlogfile -> lock );
949
995
snprintf (xlogfile -> name , MAXFNAMELEN , "%s" , first_file );
950
996
parray_append (batch_files , xlogfile );
951
997
998
+ xlogfile -> type = get_xlogFileType (xlogfile -> name );
999
+
952
1000
if (batch_size < 2 )
953
1001
return batch_files ;
954
1002
@@ -980,6 +1028,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
980
1028
pg_atomic_init_flag (& xlogfile -> lock );
981
1029
982
1030
snprintf (xlogfile -> name , MAXFNAMELEN , "%s" , filename );
1031
+
1032
+ xlogfile -> type = get_xlogFileType (xlogfile -> name );
983
1033
parray_append (batch_files , xlogfile );
984
1034
985
1035
if (parray_num (batch_files ) >= batch_size )
@@ -1048,7 +1098,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
1048
1098
1049
1099
/* full filepath to WAL file in archive directory.
1050
1100
* $BACKUP_PATH/wal/instance_name/000000010000000000000001 */
1051
- join_path_components (backup_wal_file_path , instanceState -> instance_wal_subdir_path , wal_file_name );
1101
+ // join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);
1052
1102
1053
1103
INSTR_TIME_SET_CURRENT (start_time );
1054
1104
if (num_threads > batch_size )
@@ -1177,7 +1227,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha
1177
1227
1178
1228
while (fail_count < 3 )
1179
1229
{
1180
- if (get_wal_file (wal_file_name , backup_wal_file_path , absolute_wal_file_path , false))
1230
+ if (get_wal_file_wrapper (wal_file_name , instanceState -> instance_wal_subdir_path , absolute_wal_file_path , false))
1181
1231
{
1182
1232
fail_count = 0 ;
1183
1233
elog (INFO , "pg_probackup archive-get copied WAL file %s" , wal_file_name );
@@ -1260,7 +1310,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir,
1260
1310
/* It is ok, maybe requested batch is greater than the number of available
1261
1311
* files in the archive
1262
1312
*/
1263
- if (!get_wal_file (xlogfile -> name , from_fullpath , to_fullpath , true))
1313
+ if (!get_wal_file_wrapper (xlogfile -> name , archive_dir , to_fullpath , true))
1264
1314
{
1265
1315
elog (LOG , "Thread [%d]: Failed to prefetch WAL segment %s" , 0 , xlogfile -> name );
1266
1316
break ;
@@ -1334,7 +1384,7 @@ get_files(void *arg)
1334
1384
join_path_components (from_fullpath , args -> archive_dir , xlogfile -> name );
1335
1385
join_path_components (to_fullpath , args -> prefetch_dir , xlogfile -> name );
1336
1386
1337
- if (!get_wal_file (xlogfile -> name , from_fullpath , to_fullpath , true))
1387
+ if (!get_wal_file_wrapper (xlogfile -> name , args -> archive_dir , to_fullpath , true))
1338
1388
{
1339
1389
/* It is ok, maybe requested batch is greater than the number of available
1340
1390
* files in the archive
@@ -1353,6 +1403,38 @@ get_files(void *arg)
1353
1403
return NULL ;
1354
1404
}
1355
1405
1406
+ /*
1407
+ * First we try to copy from WAL archive subdirectory:
1408
+ * Failing that, try WAL archive root directory
1409
+ */
1410
+ bool
1411
+ get_wal_file_wrapper (const char * filename , const char * archive_root_dir ,
1412
+ const char * to_fullpath , bool prefetch_mode )
1413
+ {
1414
+ bool success = false;
1415
+ char archive_subdir [MAXPGPATH ];
1416
+ char from_fullpath [MAXPGPATH ];
1417
+ xlogFileType type = get_xlogFileType (filename );
1418
+
1419
+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE )
1420
+ {
1421
+ /* first try subdir ... */
1422
+ get_archive_subdir (archive_subdir , archive_root_dir , filename , type );
1423
+ join_path_components (from_fullpath , archive_subdir , filename );
1424
+
1425
+ success = get_wal_file (filename , from_fullpath , to_fullpath , prefetch_mode );
1426
+ }
1427
+
1428
+ if (!success )
1429
+ {
1430
+ /* ... fallback to archive dir for backward compatibility purposes */
1431
+ join_path_components (from_fullpath , archive_root_dir , filename );
1432
+ success = get_wal_file (filename , from_fullpath , to_fullpath , prefetch_mode );
1433
+ }
1434
+
1435
+ return success ;
1436
+ }
1437
+
1356
1438
/*
1357
1439
* Copy WAL segment from archive catalog to pgdata with possible decompression.
1358
1440
* When running in prefetch mode, we should not error out.
@@ -1755,3 +1837,30 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32
1755
1837
1756
1838
return n_files ;
1757
1839
}
1840
+
1841
+ /* Calculate subdir path in WAL archive directory. Example:
1842
+ * 000000010000000200000013 -> 00000002
1843
+ */
1844
+ void
1845
+ get_archive_subdir (char * archive_subdir , const char * archive_dir , const char * wal_file_name , xlogFileType type )
1846
+ {
1847
+ if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE )
1848
+ {
1849
+ int rc = 0 ;
1850
+ char tli [MAXFNAMELEN ];
1851
+ char log [MAXFNAMELEN ];
1852
+ char suffix [MAXFNAMELEN ];
1853
+
1854
+ rc = sscanf (wal_file_name , "%08s%08s%s" ,
1855
+ (char * ) & tli , (char * ) & log , (char * ) & suffix );
1856
+
1857
+ if (rc == 3 )
1858
+ {
1859
+ join_path_components (archive_subdir , archive_dir , log );
1860
+ return ;
1861
+ }
1862
+ }
1863
+
1864
+ /* for all other files just use root directory of WAL archive */
1865
+ strcpy (archive_subdir , archive_dir );
1866
+ }
0 commit comments