Skip to content

Commit d7eb52d

Browse files
author
Amit Kapila
committed
Execute invalidation messages for each XLOG_XACT_INVALIDATIONS message
during logical decoding. Prior to commit c55040c we have no way of knowing the invalidations before commit. So, while decoding we use to execute all the invalidations at each command end as we had no way of knowing which invalidations happened before that command. Due to this, transactions involving large amounts of DDLs use to take more time and also lead to high CPU usage. But now we know specific invalidations at each command end so we execute only required invalidations. It has been observed that decoding of a transaction containing truncation of a table with 1000 partitions would be finished in 1s whereas before this patch it used to take 4-5 minutes. Author: Dilip Kumar Reviewed-by: Amit Kapila and Keisuke Kuroda Discussion: https://postgr.es/m/CANDwggKYveEtXjXjqHA6RL3AKSHMsQyfRY6bK+NqhAWJyw8psQ@mail.gmail.com
1 parent 564a410 commit d7eb52d

File tree

2 files changed

+92
-23
lines changed

2 files changed

+92
-23
lines changed

src/backend/replication/logical/reorderbuffer.c

+82-21
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
235235
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
236236
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
237237
ReorderBufferIterTXNState *state);
238-
static void ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn);
238+
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
239239

240240
/*
241241
* ---------------------------------------
@@ -486,6 +486,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
486486
pfree(change->data.msg.message);
487487
change->data.msg.message = NULL;
488488
break;
489+
case REORDER_BUFFER_CHANGE_INVALIDATION:
490+
if (change->data.inval.invalidations)
491+
pfree(change->data.inval.invalidations);
492+
change->data.inval.invalidations = NULL;
493+
break;
489494
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
490495
if (change->data.snapshot)
491496
{
@@ -2194,6 +2199,13 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
21942199
ReorderBufferApplyMessage(rb, txn, change, streaming);
21952200
break;
21962201

2202+
case REORDER_BUFFER_CHANGE_INVALIDATION:
2203+
/* Execute the invalidation messages locally */
2204+
ReorderBufferExecuteInvalidations(
2205+
change->data.inval.ninvalidations,
2206+
change->data.inval.invalidations);
2207+
break;
2208+
21972209
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
21982210
/* get rid of the old */
21992211
TeardownHistoricSnapshot(false);
@@ -2244,13 +2256,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
22442256

22452257
TeardownHistoricSnapshot(false);
22462258
SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2247-
2248-
/*
2249-
* Every time the CommandId is incremented, we could
2250-
* see new catalog contents, so execute all
2251-
* invalidations.
2252-
*/
2253-
ReorderBufferExecuteInvalidations(rb, txn);
22542259
}
22552260

22562261
break;
@@ -2317,7 +2322,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
23172322
AbortCurrentTransaction();
23182323

23192324
/* make sure there's no cache pollution */
2320-
ReorderBufferExecuteInvalidations(rb, txn);
2325+
ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
23212326

23222327
if (using_subtxn)
23232328
RollbackAndReleaseCurrentSubTransaction();
@@ -2356,7 +2361,8 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
23562361
AbortCurrentTransaction();
23572362

23582363
/* make sure there's no cache pollution */
2359-
ReorderBufferExecuteInvalidations(rb, txn);
2364+
ReorderBufferExecuteInvalidations(txn->ninvalidations,
2365+
txn->invalidations);
23602366

23612367
if (using_subtxn)
23622368
RollbackAndReleaseCurrentSubTransaction();
@@ -2813,23 +2819,30 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
28132819
* Setup the invalidation of the toplevel transaction.
28142820
*
28152821
* This needs to be called for each XLOG_XACT_INVALIDATIONS message and
2816-
* accumulates all the invalidation messages in the toplevel transaction.
2817-
* This is required because in some cases where we skip processing the
2818-
* transaction (see ReorderBufferForget), we need to execute all the
2819-
* invalidations together.
2822+
* accumulates all the invalidation messages in the toplevel transaction as
2823+
* well as in the form of change in reorder buffer. We require to record it in
2824+
* form of the change so that we can execute only the required invalidations
2825+
* instead of executing all the invalidations on each CommandId increment. We
2826+
* also need to accumulate these in the toplevel transaction because in some
2827+
* cases we skip processing the transaction (see ReorderBufferForget), we need
2828+
* to execute all the invalidations together.
28202829
*/
28212830
void
28222831
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
28232832
XLogRecPtr lsn, Size nmsgs,
28242833
SharedInvalidationMessage *msgs)
28252834
{
28262835
ReorderBufferTXN *txn;
2836+
MemoryContext oldcontext;
2837+
ReorderBufferChange *change;
28272838

28282839
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
28292840

2841+
oldcontext = MemoryContextSwitchTo(rb->context);
2842+
28302843
/*
2831-
* We collect all the invalidations under the top transaction so that we
2832-
* can execute them all together.
2844+
* Collect all the invalidations under the top transaction so that we can
2845+
* execute them all together. See comment atop this function
28332846
*/
28342847
if (txn->toptxn)
28352848
txn = txn->toptxn;
@@ -2841,8 +2854,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
28412854
{
28422855
txn->ninvalidations = nmsgs;
28432856
txn->invalidations = (SharedInvalidationMessage *)
2844-
MemoryContextAlloc(rb->context,
2845-
sizeof(SharedInvalidationMessage) * nmsgs);
2857+
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
28462858
memcpy(txn->invalidations, msgs,
28472859
sizeof(SharedInvalidationMessage) * nmsgs);
28482860
}
@@ -2856,19 +2868,31 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
28562868
nmsgs * sizeof(SharedInvalidationMessage));
28572869
txn->ninvalidations += nmsgs;
28582870
}
2871+
2872+
change = ReorderBufferGetChange(rb);
2873+
change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
2874+
change->data.inval.ninvalidations = nmsgs;
2875+
change->data.inval.invalidations = (SharedInvalidationMessage *)
2876+
palloc(sizeof(SharedInvalidationMessage) * nmsgs);
2877+
memcpy(change->data.inval.invalidations, msgs,
2878+
sizeof(SharedInvalidationMessage) * nmsgs);
2879+
2880+
ReorderBufferQueueChange(rb, xid, lsn, change, false);
2881+
2882+
MemoryContextSwitchTo(oldcontext);
28592883
}
28602884

28612885
/*
28622886
* Apply all invalidations we know. Possibly we only need parts at this point
28632887
* in the changestream but we don't know which those are.
28642888
*/
28652889
static void
2866-
ReorderBufferExecuteInvalidations(ReorderBuffer *rb, ReorderBufferTXN *txn)
2890+
ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
28672891
{
28682892
int i;
28692893

2870-
for (i = 0; i < txn->ninvalidations; i++)
2871-
LocalExecuteInvalidationMessage(&txn->invalidations[i]);
2894+
for (i = 0; i < nmsgs; i++)
2895+
LocalExecuteInvalidationMessage(&msgs[i]);
28722896
}
28732897

28742898
/*
@@ -3301,6 +3325,24 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
33013325
change->data.msg.message_size);
33023326
data += change->data.msg.message_size;
33033327

3328+
break;
3329+
}
3330+
case REORDER_BUFFER_CHANGE_INVALIDATION:
3331+
{
3332+
char *data;
3333+
Size inval_size = sizeof(SharedInvalidationMessage) *
3334+
change->data.inval.ninvalidations;
3335+
3336+
sz += inval_size;
3337+
3338+
ReorderBufferSerializeReserve(rb, sz);
3339+
data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3340+
3341+
/* might have been reallocated above */
3342+
ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3343+
memcpy(data, change->data.inval.invalidations, inval_size);
3344+
data += inval_size;
3345+
33043346
break;
33053347
}
33063348
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -3578,6 +3620,12 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
35783620

35793621
break;
35803622
}
3623+
case REORDER_BUFFER_CHANGE_INVALIDATION:
3624+
{
3625+
sz += sizeof(SharedInvalidationMessage) *
3626+
change->data.inval.ninvalidations;
3627+
break;
3628+
}
35813629
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
35823630
{
35833631
Snapshot snap;
@@ -3844,6 +3892,19 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
38443892
change->data.msg.message_size);
38453893
data += change->data.msg.message_size;
38463894

3895+
break;
3896+
}
3897+
case REORDER_BUFFER_CHANGE_INVALIDATION:
3898+
{
3899+
Size inval_size = sizeof(SharedInvalidationMessage) *
3900+
change->data.inval.ninvalidations;
3901+
3902+
change->data.inval.invalidations =
3903+
MemoryContextAlloc(rb->context, inval_size);
3904+
3905+
/* read the message */
3906+
memcpy(change->data.inval.invalidations, data, inval_size);
3907+
38473908
break;
38483909
}
38493910
case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:

src/include/replication/reorderbuffer.h

+10-2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ enum ReorderBufferChangeType
5757
REORDER_BUFFER_CHANGE_UPDATE,
5858
REORDER_BUFFER_CHANGE_DELETE,
5959
REORDER_BUFFER_CHANGE_MESSAGE,
60+
REORDER_BUFFER_CHANGE_INVALIDATION,
6061
REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
6162
REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
6263
REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -149,6 +150,13 @@ typedef struct ReorderBufferChange
149150
CommandId cmax;
150151
CommandId combocid;
151152
} tuplecid;
153+
154+
/* Invalidation. */
155+
struct
156+
{
157+
uint32 ninvalidations; /* Number of messages */
158+
SharedInvalidationMessage *invalidations; /* invalidation message */
159+
} inval;
152160
} data;
153161

154162
/*
@@ -313,8 +321,8 @@ typedef struct ReorderBufferTXN
313321
uint64 nentries_mem;
314322

315323
/*
316-
* List of ReorderBufferChange structs, including new Snapshots and new
317-
* CommandIds
324+
* List of ReorderBufferChange structs, including new Snapshots, new
325+
* CommandIds and command invalidation messages.
318326
*/
319327
dlist_head changes;
320328

0 commit comments

Comments
 (0)