Skip to content

Commit 0783103

Browse files
committed
[Issue #228] return BackupPageHeader into data file to fix merge retry, remote 8 bytes alignment in datafiles, do not skip header when running data file validation
1 parent ddbc6fd commit 0783103

File tree

5 files changed

+143
-83
lines changed

5 files changed

+143
-83
lines changed

src/data.c

Lines changed: 111 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@
2525
#include "utils/thread.h"
2626

2727
/* Union to ease operations on relation pages */
28-
typedef union DataPage
28+
typedef struct DataPage
2929
{
30-
PageHeaderData page_data;
30+
BackupPageHeader bph;
3131
char data[BLCKSZ];
3232
} DataPage;
3333

3434
static BackupPageHeader2* get_data_file_headers(const char *fullpath, pgFile *file, uint32 backup_version);
3535
static void write_page_headers(BackupPageHeader2 *headers, pgFile *file, const char* to_fullpath);
36-
static bool get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
37-
BlockNumber *blknum, pg_crc32 *crc, bool use_crc32c);
36+
static bool get_compressed_page_meta(FILE *in, const char *fullpath, BackupPageHeader* bph,
37+
pg_crc32 *crc, bool use_crc32c);
3838

3939
#ifdef HAVE_LIBZ
4040
/* Implementation of zlib compression method */
@@ -481,17 +481,15 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
481481
CompressAlg calg, int clevel,
482482
const char *from_fullpath, const char *to_fullpath)
483483
{
484-
// BackupPageHeader header;
485-
int compressed_size;
484+
int compressed_size = 0;
486485
size_t write_buffer_size = 0;
487-
char write_buffer[BLCKSZ];
488-
char compressed_page[BLCKSZ*2]; /* compressed page may require more space than uncompressed */
486+
char write_buffer[BLCKSZ*2]; /* compressed page may require more space than uncompressed */
487+
BackupPageHeader* bph = (BackupPageHeader*)write_buffer;
489488
const char *errormsg = NULL;
490489

491-
// header.block = blknum;
492-
493490
/* Compress the page */
494-
compressed_size = do_compress(compressed_page, sizeof(compressed_page),
491+
compressed_size = do_compress(write_buffer + sizeof(BackupPageHeader),
492+
sizeof(write_buffer) - sizeof(BackupPageHeader),
495493
page, BLCKSZ, calg, clevel,
496494
&errormsg);
497495
/* Something went wrong and errormsg was assigned, throw a warning */
@@ -501,21 +499,16 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
501499

502500
file->compress_alg = calg; /* TODO: wtf? why here? */
503501

504-
/* The page was successfully compressed. */
505-
if (compressed_size > 0 && compressed_size < BLCKSZ)
506-
{
507-
// memcpy(write_buffer, &header, sizeof(header));
508-
memcpy(write_buffer, compressed_page, compressed_size);
509-
write_buffer_size = compressed_size;
510-
}
511-
/* Non-positive value means that compression failed. Write it as is. */
512-
else
502+
/* compression didn`t worked */
503+
if (compressed_size <= 0 || compressed_size >= BLCKSZ)
513504
{
505+
/* Do not compress page */
506+
memcpy(write_buffer + sizeof(BackupPageHeader), page, BLCKSZ);
514507
compressed_size = BLCKSZ;
515-
// memcpy(write_buffer, &header, sizeof(header));
516-
memcpy(write_buffer, page, BLCKSZ);
517-
write_buffer_size = compressed_size;
518508
}
509+
bph->block = blknum;
510+
bph->compressed_size = compressed_size;
511+
write_buffer_size = compressed_size + sizeof(BackupPageHeader);
519512

520513
/* Update CRC */
521514
COMP_FILE_CRC32(true, *crc, write_buffer, write_buffer_size);
@@ -747,7 +740,7 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
747740
size_t
748741
restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
749742
const char *to_fullpath, bool use_bitmap, PageState *checksum_map,
750-
XLogRecPtr shift_lsn, datapagemap_t *lsn_map)
743+
XLogRecPtr shift_lsn, datapagemap_t *lsn_map, bool is_merge)
751744
{
752745
size_t total_write_len = 0;
753746
char *in_buf = pgut_malloc(STDIO_BUFSIZE);
@@ -825,9 +818,10 @@ restore_data_file(parray *parent_chain, pgFile *dest_file, FILE *out,
825818
setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
826819

827820
/* get headers for this file */
828-
headers = get_data_file_headers(from_fullpath, tmp_file, parse_program_version(backup->program_version));
821+
if (!is_merge)
822+
headers = get_data_file_headers(from_fullpath, tmp_file, parse_program_version(backup->program_version));
829823

830-
if (!headers && tmp_file->n_headers > 0)
824+
if (!is_merge && !headers && tmp_file->n_headers > 0)
831825
elog(ERROR, "Failed to get headers for file \"%s\"", from_fullpath);
832826

833827
/*
@@ -895,6 +889,7 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
895889
for (;;)
896890
{
897891
off_t write_pos;
892+
size_t len;
898893
size_t read_len;
899894
DataPage page;
900895
int32 compressed_size = 0;
@@ -918,23 +913,44 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
918913
blknum = headers[n_hdr].block;
919914
page_lsn = headers[n_hdr].lsn;
920915
page_crc = headers[n_hdr].checksum;
921-
/* calculate payload size by comparing current and next page positions */
922-
compressed_size = headers[n_hdr+1].pos - headers[n_hdr].pos;
916+
/* calculate payload size by comparing current and next page positions,
917+
* page header is not included */
918+
compressed_size = headers[n_hdr+1].pos - headers[n_hdr].pos - sizeof(BackupPageHeader);
923919

924920
Assert(compressed_size > 0);
925921
Assert(compressed_size <= BLCKSZ);
926922

927-
read_len = compressed_size;
923+
read_len = compressed_size + sizeof(BackupPageHeader);
928924
}
929925
else
930926
{
931-
if (get_compressed_page_meta(in, from_fullpath, &compressed_size,
932-
&blknum, NULL, false))
927+
/* We get into this function either when restoring old backup
928+
* or when merging something. Aligh read_len only in restoring
929+
* or merging old backup.
930+
*/
931+
if (get_compressed_page_meta(in, from_fullpath, &(page).bph, NULL, false))
933932
{
934933
cur_pos_in += sizeof(BackupPageHeader);
935934

936935
/* backward compatibility kludge TODO: remove in 3.0 */
937-
read_len = MAXALIGN(compressed_size);
936+
blknum = page.bph.block;
937+
compressed_size = page.bph.compressed_size;
938+
939+
/* this will backfire when retrying merge of old backups,
940+
* just pray that this will never happen.
941+
*/
942+
if (backup_version >= 20400)
943+
read_len = compressed_size;
944+
else
945+
read_len = MAXALIGN(compressed_size);
946+
947+
// elog(INFO, "FILE: %s", from_fullpath);
948+
// elog(INFO, "blknum: %i", blknum);
949+
//
950+
// elog(INFO, "POS: %u", cur_pos_in);
951+
// elog(INFO, "SIZE: %i", compressed_size);
952+
// elog(INFO, "ASIZE: %i", read_len);
953+
938954
}
939955
else
940956
break;
@@ -1008,9 +1024,9 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
10081024
if (map && datapagemap_is_set(map, blknum))
10091025
{
10101026
/* Backward compatibility kludge TODO: remove in 3.0
1011-
* skip to the next page for backup withot header file
1027+
* go to the next page.
10121028
*/
1013-
if (!headers && fseek(in, MAXALIGN(compressed_size), SEEK_CUR) != 0)
1029+
if (!headers && fseek(in, read_len, SEEK_CUR) != 0)
10141030
elog(ERROR, "Cannot seek block %u of '%s': %s",
10151031
blknum, from_fullpath, strerror(errno));
10161032
continue;
@@ -1027,9 +1043,14 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
10271043
}
10281044

10291045
/* read a page from file */
1030-
if (fread(page.data, 1, read_len, in) != read_len)
1046+
if (headers)
1047+
len = fread(&page, 1, read_len, in);
1048+
else
1049+
len = fread(page.data, 1, read_len, in);
1050+
1051+
if (len != read_len)
10311052
elog(ERROR, "Cannot read block %u file \"%s\": %s",
1032-
blknum, from_fullpath, strerror(errno));
1053+
blknum, from_fullpath, strerror(errno));
10331054

10341055
cur_pos_in += read_len;
10351056

@@ -1562,7 +1583,7 @@ validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
15621583
bool use_crc32c = backup_version <= 20021 || backup_version >= 20025;
15631584
BackupPageHeader2 *headers = NULL;
15641585
int n_hdr = -1;
1565-
off_t cur_pos = 0;
1586+
off_t cur_pos_in = 0;
15661587

15671588
elog(VERBOSE, "Validate relation blocks for file \"%s\"", fullpath);
15681589

@@ -1586,6 +1607,7 @@ validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
15861607
while (true)
15871608
{
15881609
int rc = 0;
1610+
size_t len = 0;
15891611
DataPage compressed_page; /* used as read buffer */
15901612
int compressed_size = 0;
15911613
DataPage page;
@@ -1603,58 +1625,78 @@ validate_file_pages(pgFile *file, const char *fullpath, XLogRecPtr stop_lsn,
16031625
break;
16041626

16051627
blknum = headers[n_hdr].block;
1606-
/* calculate payload size by comparing current and next page positions */
1607-
compressed_size = headers[n_hdr+1].pos - headers[n_hdr].pos;
1628+
/* calculate payload size by comparing current and next page positions,
1629+
* page header is not included.
1630+
*/
1631+
compressed_size = headers[n_hdr+1].pos - headers[n_hdr].pos - sizeof(BackupPageHeader);
16081632

16091633
Assert(compressed_size > 0);
16101634
Assert(compressed_size <= BLCKSZ);
16111635

1612-
if (cur_pos != headers[n_hdr].pos)
1636+
read_len = sizeof(BackupPageHeader) + compressed_size;
1637+
1638+
if (cur_pos_in != headers[n_hdr].pos)
16131639
{
16141640
if (fio_fseek(in, headers[n_hdr].pos) < 0)
16151641
elog(ERROR, "Cannot seek block %u of \"%s\": %s",
16161642
blknum, fullpath, strerror(errno));
1643+
else
1644+
elog(INFO, "Seek to %u", headers[n_hdr].pos);
16171645

1618-
cur_pos = headers[n_hdr].pos;
1646+
cur_pos_in = headers[n_hdr].pos;
16191647
}
1620-
1621-
read_len = compressed_size;
16221648
}
16231649
/* old backups rely on header located directly in data file */
16241650
else
16251651
{
1626-
if (!get_compressed_page_meta(in, fullpath, &compressed_size,
1627-
&blknum, &crc, use_crc32c))
1628-
break;
1629-
1630-
/* Backward compatibility kludge, TODO: remove in 3.0
1631-
* for some reason we padded compressed pages in old versions
1632-
*/
1633-
read_len = MAXALIGN(compressed_size);
1652+
if (get_compressed_page_meta(in, fullpath, &(compressed_page).bph, &crc, use_crc32c))
1653+
{
1654+
/* Backward compatibility kludge, TODO: remove in 3.0
1655+
* for some reason we padded compressed pages in old versions
1656+
*/
1657+
blknum = compressed_page.bph.block;
1658+
compressed_size = compressed_page.bph.compressed_size;
1659+
read_len = MAXALIGN(compressed_size);
1660+
}
1661+
else
1662+
break;
16341663
}
16351664

16361665
/* backward compatibility kludge TODO: remove in 3.0 */
16371666
if (compressed_size == PageIsTruncated)
16381667
{
1639-
elog(LOG, "Block %u of \"%s\" is truncated",
1668+
elog(INFO, "Block %u of \"%s\" is truncated",
16401669
blknum, fullpath);
16411670
continue;
16421671
}
16431672

16441673
Assert(compressed_size <= BLCKSZ);
16451674
Assert(compressed_size > 0);
16461675

1647-
if (fread(compressed_page.data, 1, read_len, in) != read_len)
1676+
if (headers)
1677+
len = fread(&compressed_page, 1, read_len, in);
1678+
else
1679+
len = fread(compressed_page.data, 1, read_len, in);
1680+
1681+
// elog(INFO, "POS: %u", cur_pos_in);
1682+
//
1683+
// elog(INFO, "LEN: %i", len);
1684+
// elog(INFO, "READ_LEN: %i", read_len);
1685+
1686+
if (len != read_len)
16481687
{
16491688
elog(WARNING, "Cannot read block %u file \"%s\": %s",
16501689
blknum, fullpath, strerror(errno));
16511690
return false;
16521691
}
16531692

16541693
/* update current position */
1655-
cur_pos += read_len;
1694+
cur_pos_in += read_len;
16561695

1657-
COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len);
1696+
if (headers)
1697+
COMP_FILE_CRC32(use_crc32c, crc, &compressed_page, read_len);
1698+
else
1699+
COMP_FILE_CRC32(use_crc32c, crc, compressed_page.data, read_len);
16581700

16591701
if (compressed_size != BLCKSZ
16601702
|| page_may_be_compressed(compressed_page.data, file->compress_alg,
@@ -1878,40 +1920,39 @@ get_lsn_map(const char *fullpath, uint32 checksum_version,
18781920

18791921
/* */
18801922
bool
1881-
get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
1882-
BlockNumber *blknum, pg_crc32 *crc, bool use_crc32c)
1923+
get_compressed_page_meta(FILE *in, const char *fullpath, BackupPageHeader* bph,
1924+
pg_crc32 *crc, bool use_crc32c)
18831925
{
18841926

18851927
/* read BackupPageHeader */
1886-
BackupPageHeader header;
1887-
size_t read_len = fread(&header, 1, sizeof(header), in);
1928+
size_t read_len = fread(bph, 1, sizeof(BackupPageHeader), in);
18881929

18891930
if (ferror(in))
18901931
elog(ERROR, "Cannot read file \"%s\": %s",
18911932
fullpath, strerror(errno));
18921933

1893-
if (read_len != sizeof(header))
1934+
if (read_len != sizeof(BackupPageHeader))
18941935
{
18951936
if (read_len == 0 && feof(in))
18961937
return false; /* EOF found */
18971938
else if (read_len != 0 && feof(in))
18981939
elog(ERROR,
1899-
"Odd size page found at block %u of \"%s\"",
1900-
*blknum, fullpath);
1940+
"Odd size page found at offset %lu of \"%s\"",
1941+
ftell(in), fullpath);
19011942
else
1902-
elog(ERROR, "Cannot read header of block %u of \"%s\": %s",
1903-
*blknum, fullpath, strerror(errno));
1943+
elog(ERROR, "Cannot read header at offset %lu of \"%s\": %s",
1944+
ftell(in), fullpath, strerror(errno));
19041945
}
19051946

19061947
if (crc)
1907-
COMP_FILE_CRC32(use_crc32c, *crc, &header, read_len);
1948+
COMP_FILE_CRC32(use_crc32c, *crc, bph, read_len);
19081949

1909-
if (header.block == 0 && header.compressed_size == 0)
1950+
if (bph->block == 0 && bph->compressed_size == 0)
19101951
elog(ERROR, "Empty block in file \"%s\"", fullpath);
19111952

19121953

1913-
*blknum = header.block;
1914-
*compressed_size = header.compressed_size;
1954+
// *blknum = header.block;
1955+
// *compressed_size = header.compressed_size;
19151956

19161957
// elog(INFO, "blknum: %i", header.block);
19171958
// elog(INFO, "size: %i", header.compressed_size);
@@ -1920,7 +1961,7 @@ get_compressed_page_meta(FILE *in, const char *fullpath, int32 *compressed_size,
19201961
// elog(INFO, "BLKNUM: %i", *blknum);
19211962
// elog(INFO, "File: %s", fullpath);
19221963

1923-
Assert(*compressed_size != 0);
1964+
Assert(bph->compressed_size != 0);
19241965
return true;
19251966

19261967
}
@@ -2034,7 +2075,7 @@ send_pages(ConnectionArgs* conn_arg, const char *to_fullpath, const char *from_f
20342075
compressed_size = compress_and_backup_page(file, blknum, in, out, &(file->crc),
20352076
rc, curr_page, calg, clevel,
20362077
from_fullpath, to_fullpath);
2037-
cur_pos_out += compressed_size;
2078+
cur_pos_out += compressed_size + sizeof(BackupPageHeader);
20382079
}
20392080

20402081
n_blocks_read++;

0 commit comments

Comments
 (0)