Skip to content

Commit 3107931

Browse files
committed
[Issue #174] batch processing added
1 parent 5f33b24 commit 3107931

File tree

3 files changed

+112
-65
lines changed

3 files changed

+112
-65
lines changed

src/archive.c

+104-63
Original file line numberDiff line numberDiff line change
@@ -32,45 +32,56 @@ static void copy_file_attributes(const char *from_path,
3232

3333
typedef struct
3434
{
35-
TimeLineID tli;
36-
XLogSegNo segno;
37-
uint32 xlog_seg_size;
35+
TimeLineID tli;
36+
uint32 xlog_seg_size;
3837

3938
const char *pg_xlog_dir;
4039
const char *archive_dir;
41-
bool overwrite;
42-
bool compress;
43-
bool no_sync;
44-
bool no_ready_rename;
40+
bool overwrite;
41+
bool compress;
42+
bool no_sync;
43+
bool no_ready_rename;
4544

4645
CompressAlg compress_alg;
47-
int compress_level;
48-
int thread_num;
46+
int compress_level;
47+
int thread_num;
48+
49+
parray *files;
50+
51+
int n_pushed_files;
4952

5053
/*
5154
* Return value from the thread.
5255
* 0 means there is no error,
5356
* 1 - there is an error.
5457
* 2 - no error, but nothing to push
5558
*/
56-
int ret;
59+
int ret;
5760
} archive_push_arg;
5861

62+
typedef struct WALSegno
63+
{
64+
XLogSegNo segno;
65+
volatile pg_atomic_flag lock;
66+
} WALSegno;
67+
5968
/*
6069
* At this point, we already done one roundtrip to archive server
6170
* to get instance config.
6271
*
6372
* pg_probackup specific archive command for archive backups
64-
* set archive_command = 'pg_probackup archive-push -B /home/anastasia/backup
65-
* --wal-file-path %p --wal-file-name %f', to move backups into arclog_path.
73+
* set archive_command to
74+
* 'pg_probackup archive-push -B /home/anastasia/backup --wal-file-name %f',
75+
* to move backups into arclog_path.
6676
* Where archlog_path is $BACKUP_PATH/wal/system_id.
6777
* Currently it just copies wal files to the new location.
6878
*/
6979
int
7080
do_archive_push(InstanceConfig *instance, char *wal_file_path,
71-
char *wal_file_name, bool overwrite, bool no_sync, bool no_ready_rename)
81+
char *wal_file_name, int batch_size, bool overwrite,
82+
bool no_sync, bool no_ready_rename)
7283
{
73-
int i;
84+
uint64 i;
7485
char current_dir[MAXPGPATH];
7586
char pg_xlog_dir[MAXPGPATH];
7687
uint64 system_id;
@@ -86,9 +97,11 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
8697
bool push_isok = true;
8798

8899
/* reporting */
89-
int n_pushed_files = 0;
100+
int total_pushed_files = 0;
90101
pid_t my_pid;
91102

103+
parray *files = NULL;
104+
92105
my_pid = getpid();
93106

94107
if (wal_file_name == NULL)
@@ -123,21 +136,28 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
123136

124137
/* Single-thread push
125138
* There are two cases, when we don`t want to start multi-thread push:
126-
* - number of threads is equal to 1, multithreading isn`t cheap to start,
127-
* so creating, running and terminating one thread using generic
128-
* multithread approach can take almost as much time as copying itself.
139+
* - batch size is equal to 1; multithreading isn`t cheap to start,
140+
* so creating, running and terminating threads using generic
141+
* multithread approach to copy just one file can take almost as much
142+
* time as copying itself.
129143
* - file to push is not WAL file, but .history, .backup or .partial file.
130144
* we do not apply compression to such files.
131145
*/
132-
if (num_threads == 1 || !IsXLogFileName(wal_file_name))
146+
147+
if (num_threads > batch_size)
148+
num_threads = batch_size;
149+
150+
elog(INFO, "PID [%d]: pg_probackup push file %s into archive, "
151+
"threads: %i, batch size: %i, compression: %s",
152+
my_pid, wal_file_name, num_threads,
153+
batch_size, is_compress ? "zlib" : "none");
154+
155+
if (!IsXLogFileName(wal_file_name) || batch_size == 1)
133156
{
134157
/* do not apply compression to .backup, .history and .partial files */
135158
if (!IsXLogFileName(wal_file_name))
136159
is_compress = false;
137160

138-
elog(INFO, "PID [%d]: pg_probackup push file %s into archive, threads: 1, compression: %s",
139-
my_pid, wal_file_name, is_compress ? "zlib" : "none");
140-
141161
if (is_compress)
142162
gz_push_wal_file_internal(wal_file_name, pg_xlog_dir,
143163
instance->arclog_path, overwrite,
@@ -147,16 +167,25 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
147167
instance->arclog_path, overwrite, no_sync, 1);
148168

149169
push_isok = true;
150-
n_pushed_files++;
170+
total_pushed_files++;
151171
goto push_done;
152172
}
153173

154174
/* parse WAL filename */
155175
if (IsXLogFileName(wal_file_name))
156176
GetXLogFromFileName(wal_file_name, &tli, &first_segno, instance->xlog_seg_size);
157177

158-
elog(INFO, "PID [%d]: pg_probackup push file %s into archive, threads: %i, compression: %s",
159-
my_pid, wal_file_name, num_threads, is_compress ? "zlib" : "none");
178+
files = parray_new();
179+
/* setup locks */
180+
for (i = first_segno; i < first_segno + batch_size; i++)
181+
{
182+
WALSegno *xlogfile = palloc(sizeof(WALSegno));
183+
184+
xlogfile->segno = i;
185+
pg_atomic_init_flag(&xlogfile->lock);
186+
187+
parray_append(files, xlogfile);
188+
}
160189

161190
/* TODO: report actual executed command */
162191

@@ -169,7 +198,6 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
169198
archive_push_arg *arg = &(threads_args[i]);
170199

171200
arg->tli = tli;
172-
arg->segno = first_segno + i;
173201
arg->xlog_seg_size = instance->xlog_seg_size;
174202
arg->archive_dir = instance->arclog_path;
175203
arg->pg_xlog_dir = pg_xlog_dir;
@@ -181,6 +209,9 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
181209
arg->compress_alg = instance->compress_alg;
182210
arg->compress_level = instance->compress_level;
183211

212+
arg->files = files;
213+
arg->n_pushed_files = 0;
214+
184215
arg->thread_num = i+1;
185216
/* By default there are some error */
186217
arg->ret = 1;
@@ -199,8 +230,8 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
199230
pthread_join(threads[i], NULL);
200231
if (threads_args[i].ret == 1)
201232
push_isok = false;
202-
else if (threads_args[i].ret == 0)
203-
n_pushed_files++;
233+
234+
total_pushed_files += threads_args[i].n_pushed_files;
204235
}
205236

206237
/* Note, that we don`t do garbage collection here,
@@ -213,7 +244,7 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
213244
/* report number of files pushed into archive */
214245
elog(INFO, "PID [%d]: pg_probackup archive-push completed successfully, "
215246
"number of pushed files: %i",
216-
my_pid, n_pushed_files);
247+
my_pid, total_pushed_files);
217248
return 0;
218249
}
219250

@@ -228,56 +259,66 @@ do_archive_push(InstanceConfig *instance, char *wal_file_path,
228259
static void *
229260
push_wal_segno(void *arg)
230261
{
262+
int i;
231263
int rc;
232-
char wal_filename[MAXPGPATH];
233-
archive_push_arg *args = (archive_push_arg *) arg;
234-
235-
char archive_status_dir[MAXPGPATH];
236-
char wal_file_dummy[MAXPGPATH];
237-
char wal_file_ready[MAXPGPATH];
238-
char wal_file_done[MAXPGPATH];
264+
char wal_filename[MAXPGPATH];
239265

240-
/* At first we must construct WAL filename from segno, tli and xlog_seg_size */
241-
GetXLogFileName(wal_filename, args->tli, args->segno, args->xlog_seg_size);
266+
char archive_status_dir[MAXPGPATH];
267+
char wal_file_dummy[MAXPGPATH];
268+
char wal_file_ready[MAXPGPATH];
269+
char wal_file_done[MAXPGPATH];
270+
archive_push_arg *args = (archive_push_arg *) arg;
242271

243272
join_path_components(archive_status_dir, args->pg_xlog_dir, "archive_status");
244-
join_path_components(wal_file_dummy, archive_status_dir, wal_filename);
245-
snprintf(wal_file_ready, MAXPGPATH, "%s.%s", wal_file_dummy, "ready");
246-
snprintf(wal_file_done, MAXPGPATH, "%s.%s", wal_file_dummy, "done");
247273

248-
/* For additional threads we must check the existence of .ready file */
249-
if (args->thread_num != 1)
274+
for (i = 0; i < parray_num(args->files); i++)
250275
{
276+
WALSegno *xlogfile = (WALSegno *) parray_get(args->files, i);
277+
278+
if (!pg_atomic_test_set_flag(&xlogfile->lock))
279+
continue;
280+
281+
/* At first we must construct WAL filename from segno, tli and xlog_seg_size */
282+
GetXLogFileName(wal_filename, args->tli, xlogfile->segno, args->xlog_seg_size);
283+
284+
join_path_components(wal_file_dummy, archive_status_dir, wal_filename);
285+
snprintf(wal_file_ready, MAXPGPATH, "%s.%s", wal_file_dummy, "ready");
286+
snprintf(wal_file_done, MAXPGPATH, "%s.%s", wal_file_dummy, "done");
287+
288+
/* Check the existence of .ready file */
251289
if (!fileExists(wal_file_ready, FIO_DB_HOST))
252290
{
253291
/* no ready file, nothing to do here */
254-
args->ret = 2;
255-
return NULL;
292+
continue;
256293
}
257-
}
258-
elog(LOG, "Thread [%d]: pushing file \"%s\"", args->thread_num, wal_filename);
259294

260-
/* If compression is not required, then just copy it as is */
261-
if (!args->compress)
262-
rc = push_wal_file_internal(wal_filename, args->pg_xlog_dir,
263-
args->archive_dir, args->overwrite,
264-
args->no_sync, args->thread_num);
295+
elog(LOG, "Thread [%d]: pushing file \"%s\"", args->thread_num, wal_filename);
296+
297+
/* If compression is not required, then just copy it as is */
298+
if (!args->compress)
299+
rc = push_wal_file_internal(wal_filename, args->pg_xlog_dir,
300+
args->archive_dir, args->overwrite,
301+
args->no_sync, args->thread_num);
265302
#ifdef HAVE_LIBZ
266-
else
267-
rc = gz_push_wal_file_internal(wal_filename, args->pg_xlog_dir,
303+
else
304+
rc = gz_push_wal_file_internal(wal_filename, args->pg_xlog_dir,
268305
args->archive_dir, args->overwrite, args->no_sync,
269306
args->compress_level, args->thread_num);
270307
#endif
271308

272-
/* take '--no-ready-rename' flag into account */
273-
if (!args->no_ready_rename && rc == 0 && args->thread_num != 1)
274-
{
275-
/* It is ok to rename status file in archive_status directory */
276-
elog(VERBOSE, "Thread [%d]: Rename \"%s\" to \"%s\"", args->thread_num,
277-
wal_file_ready, wal_file_done);
278-
if (fio_rename(wal_file_ready, wal_file_done, FIO_DB_HOST) < 0)
279-
elog(ERROR, "Thread [%d]: Cannot rename file \"%s\" to \"%s\": %s",
280-
args->thread_num, wal_file_ready, wal_file_done, strerror(errno));
309+
/* take '--no-ready-rename' flag into account */
310+
if (!args->no_ready_rename && rc == 0)
311+
{
312+
/* It is ok to rename status file in archive_status directory */
313+
elog(VERBOSE, "Thread [%d]: Rename \"%s\" to \"%s\"", args->thread_num,
314+
wal_file_ready, wal_file_done);
315+
if (fio_rename(wal_file_ready, wal_file_done, FIO_DB_HOST) < 0)
316+
elog(ERROR, "Thread [%d]: Cannot rename file \"%s\" to \"%s\": %s",
317+
args->thread_num, wal_file_ready, wal_file_done, strerror(errno));
318+
}
319+
320+
args->n_pushed_files++;
321+
281322
}
282323

283324
args->ret = 0;

src/pg_probackup.c

+6-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ bool compress_shortcut = false;
123123
char *instance_name;
124124

125125
/* archive push options */
126+
int batch_size = 1;
126127
static char *wal_file_path;
127128
static char *wal_file_name;
128129
static bool file_overwrite = false;
@@ -214,6 +215,7 @@ static ConfigOption cmd_options[] =
214215
{ 's', 151, "wal-file-name", &wal_file_name, SOURCE_CMD_STRICT },
215216
{ 'b', 152, "overwrite", &file_overwrite, SOURCE_CMD_STRICT },
216217
{ 'b', 153, "no-ready-rename", &no_ready_rename, SOURCE_CMD_STRICT },
218+
{ 'i', 162, "batch-size", &batch_size, SOURCE_CMD_STRICT },
217219
/* show options */
218220
{ 'f', 160, "format", opt_show_format, SOURCE_CMD_STRICT },
219221
{ 'b', 161, "archive", &show_archive, SOURCE_CMD_STRICT },
@@ -739,14 +741,17 @@ main(int argc, char *argv[])
739741
if (num_threads < 1)
740742
num_threads = 1;
741743

744+
if (batch_size < 1)
745+
batch_size = 1;
746+
742747
compress_init();
743748

744749
/* do actual operation */
745750
switch (backup_subcmd)
746751
{
747752
case ARCHIVE_PUSH_CMD:
748753
return do_archive_push(&instance_config, wal_file_path, wal_file_name,
749-
file_overwrite, no_sync, no_ready_rename);
754+
batch_size, file_overwrite, no_sync, no_ready_rename);
750755
case ARCHIVE_GET_CMD:
751756
return do_archive_get(&instance_config,
752757
wal_file_path, wal_file_name);

src/pg_probackup.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,8 @@ extern int do_add_instance(InstanceConfig *instance);
693693

694694
/* in archive.c */
695695
extern int do_archive_push(InstanceConfig *instance, char *wal_file_path,
696-
char *wal_file_name, bool overwrite, bool no_sync, bool no_ready_rename);
696+
char *wal_file_name, int batch_size, bool overwrite,
697+
bool no_sync, bool no_ready_rename);
697698
extern int do_archive_get(InstanceConfig *instance, char *wal_file_path,
698699
char *wal_file_name);
699700

0 commit comments

Comments
 (0)