Skip to content

Commit 746a1a5

Browse files
committed
fix local delta backup
1 parent 2598c99 commit 746a1a5

File tree

3 files changed

+45
-13
lines changed

3 files changed

+45
-13
lines changed

src/catchup.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,7 @@ catchup_thread_runner(void *arg)
853853

854854
if (file->write_size == BYTES_INVALID)
855855
{
856-
elog(VERBOSE, "Skipping the unchanged file: \"%s\"", from_fullpath);
856+
elog(VERBOSE, "Skipping the unchanged file: \"%s\", read %li bytes", from_fullpath, file->read_size);
857857
continue;
858858
}
859859

src/data.c

+33-8
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
268268
* PageIsOk(0) if page was successfully retrieved
269269
* PageIsTruncated(-1) if the page was truncated
270270
* SkipCurrentPage(-2) if we need to skip this page,
271-
* only used for DELTA backup
271+
* only used for DELTA and PTRACK backup
272272
* PageIsCorrupted(-3) if the page checksum mismatch
273273
* or header corruption,
274274
* only used for checkdb
@@ -403,7 +403,12 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
403403
page_st->lsn > 0 &&
404404
page_st->lsn < prev_backup_start_lsn)
405405
{
406-
elog(VERBOSE, "Skipping blknum %u in file: \"%s\"", blknum, from_fullpath);
406+
elog(VERBOSE, "Skipping blknum %u in file: \"%s\", file->exists_in_prev: %s, page_st->lsn: %X/%X, prev_backup_start_lsn: %X/%X",
407+
blknum, from_fullpath,
408+
file->exists_in_prev ? "true" : "false",
409+
(uint32) (page_st->lsn >> 32), (uint32) page_st->lsn,
410+
(uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn
411+
);
407412
return SkipCurrentPage;
408413
}
409414

@@ -2255,7 +2260,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
22552260
/* copy local file (взята из send_pages, но используется простое копирование странички, без добавления заголовков и компрессии) */
22562261
int
22572262
copy_pages(const char *to_fullpath, const char *from_fullpath,
2258-
pgFile *file, XLogRecPtr prev_backup_start_lsn,
2263+
pgFile *file, XLogRecPtr sync_lsn,
22592264
uint32 checksum_version, bool use_pagemap,
22602265
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema)
22612266
{
@@ -2303,13 +2308,26 @@ copy_pages(const char *to_fullpath, const char *from_fullpath,
23032308
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
23042309
}
23052310

2306-
/* ошибки бы тут обработать! */
2307-
out = open_local_file_rw(to_fullpath, &out_buf, STDIO_BUFSIZE);
2311+
out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
2312+
if (out == NULL)
2313+
elog(ERROR, "Cannot open destination file \"%s\": %s",
2314+
to_fullpath, strerror(errno));
2315+
2316+
/* update file permission */
2317+
if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
2318+
elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
2319+
strerror(errno));
2320+
2321+
if (!fio_is_remote_file(out))
2322+
{
2323+
out_buf = pgut_malloc(STDIO_BUFSIZE);
2324+
setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
2325+
}
23082326

23092327
while (blknum < file->n_blocks)
23102328
{
23112329
PageState page_st;
2312-
int rc = prepare_page(file, prev_backup_start_lsn,
2330+
int rc = prepare_page(file, sync_lsn,
23132331
blknum, in, backup_mode, curr_page,
23142332
true, checksum_version,
23152333
ptrack_version_num, ptrack_schema,
@@ -2318,7 +2336,14 @@ copy_pages(const char *to_fullpath, const char *from_fullpath,
23182336
break;
23192337

23202338
else if (rc == PageIsOk)
2339+
{
2340+
if (fio_fseek(out, blknum * BLCKSZ) < 0)
2341+
{
2342+
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
2343+
blknum, to_fullpath, strerror(errno));
2344+
}
23212345
copy_page(file, blknum, in, out, curr_page, to_fullpath);
2346+
}
23222347

23232348
n_blocks_read++;
23242349

@@ -2339,8 +2364,8 @@ copy_pages(const char *to_fullpath, const char *from_fullpath,
23392364
to_fullpath, strerror(errno));
23402365

23412366
/* close local output file */
2342-
if (out && fclose(out))
2343-
elog(ERROR, "Cannot close the backup file \"%s\": %s",
2367+
if (out && fio_fclose(out))
2368+
elog(ERROR, "Cannot close the destination file \"%s\": %s",
23442369
to_fullpath, strerror(errno));
23452370

23462371
pg_free(iter);

tests/catchup.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def test_remote_delta_catchup(self):
217217
base_dir = os.path.join(module_name, fname, 'src'),
218218
set_replication = True,
219219
ptrack_enable = True,
220+
pg_options = { 'wal_log_hints': 'on' }
220221
)
221222
source_pg.slow_start()
222223
source_pg.safe_psql("postgres", "CREATE TABLE ultimate_question(answer int)")
@@ -227,7 +228,8 @@ def test_remote_delta_catchup(self):
227228
backup_mode = 'FULL',
228229
source_pgdata = source_pg.data_dir,
229230
destination_node = dest_pg,
230-
options = ['-d', 'postgres', '-p', str(source_pg.port), '--stream'])
231+
options = ['-d', 'postgres', '-p', str(source_pg.port), '--stream']
232+
)
231233
self.set_replica(source_pg, dest_pg)
232234
dest_options = {}
233235
dest_options['port'] = str(dest_pg.port)
@@ -236,7 +238,7 @@ def test_remote_delta_catchup(self):
236238
dest_pg.stop()
237239

238240
# make changes on master
239-
source_pg.pgbench_init(scale=10)
241+
source_pg.pgbench_init(scale = 10)
240242
pgbench = source_pg.pgbench(options=['-T', '10', '--no-vacuum'])
241243
pgbench.wait()
242244
source_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)")
@@ -247,13 +249,18 @@ def test_remote_delta_catchup(self):
247249
backup_mode = 'DELTA',
248250
source_pgdata = source_pg.data_dir,
249251
destination_node = dest_pg,
250-
options = ['-d', 'postgres', '-p', str(source_pg.port), '--stream'])
252+
options = ['-d', 'postgres', '-p', str(source_pg.port), '--stream']
253+
)
254+
255+
source_pgdata = self.pgdata_content(source_pg.data_dir)
256+
dest_pgdata = self.pgdata_content(dest_pg.data_dir)
257+
self.compare_pgdata(source_pgdata, dest_pgdata)
251258

252259
# stop replication
253260
source_pg.stop()
254261

255262
# check latest changes
256-
self.set_replica(source_pg, dest_pg)
263+
self.set_replica(master = source_pg, replica = dest_pg)
257264
dest_pg.slow_start(replica = True)
258265
self.assertEqual(
259266
result,

0 commit comments

Comments
 (0)