Skip to content

Commit e000001

Browse files
author
Artur Zakirov
committedAug 18, 2016
Fix bug #5: bug, return duplicate tuples
1 parent 8821cae commit e000001

File tree

2 files changed

+97
-19
lines changed

2 files changed

+97
-19
lines changed
 

‎rum.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,6 +663,13 @@ typedef struct
663663
bool recheck;
664664
} RumOrderingItem;
665665

666+
typedef enum
667+
{
668+
RumFastScan,
669+
RumRegularScan,
670+
RumFullScan
671+
} RumScanType;
672+
666673
typedef struct RumScanOpaqueData
667674
{
668675
MemoryContext tempCtx;
@@ -684,7 +691,7 @@ typedef struct RumScanOpaqueData
684691
RumKey key;
685692
bool firstCall;
686693
bool isVoidRes; /* true if query is unsatisfiable */
687-
bool useFastScan;
694+
RumScanType scanType;
688695
TIDBitmap *tbm;
689696

690697
ScanDirection naturalOrder;

‎rumget.c

Lines changed: 89 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ static bool scanPage(RumState * rumstate, RumScanEntry entry, RumKey *item,
3838
Page page, bool equalOk);
3939
static void insertScanItem(RumScanOpaque so, bool recheck);
4040
static int scan_entry_cmp(const void *p1, const void *p2, void *arg);
41-
static void entryGetItem(RumState * rumstate, RumScanEntry entry);
41+
static void entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList);
4242

4343

4444
/*
@@ -735,7 +735,7 @@ startScan(IndexScanDesc scan)
735735
RumScanOpaque so = (RumScanOpaque) scan->opaque;
736736
RumState *rumstate = &so->rumstate;
737737
uint32 i;
738-
bool useFastScan = false;
738+
RumScanType scanType = RumRegularScan;
739739

740740
MemoryContextSwitchTo(so->keyCtx);
741741
for (i = 0; i < so->totalentries; i++)
@@ -783,30 +783,36 @@ startScan(IndexScanDesc scan)
783783
{
784784
RumScanKey key = so->keys[i];
785785

786-
if (so->rumstate.canPreConsistent[key->attnum - 1])
786+
/* Check first key is it used to full-index scan */
787+
if (i == 0 && key->nentries > 0 && key->scanEntry[i]->scanWithAddInfo)
787788
{
788-
useFastScan = true;
789+
scanType = RumFullScan;
790+
break;
791+
}
792+
else if (so->rumstate.canPreConsistent[key->attnum - 1])
793+
{
794+
scanType = RumFastScan;
789795
break;
790796
}
791797
}
792798

793-
if (useFastScan)
799+
if (scanType == RumFastScan)
794800
{
795801
for (i = 0; i < so->totalentries; i++)
796802
{
797803
RumScanEntry entry = so->entries[i];
798804

799805
if (entry->isPartialMatch || entry->forceUseBitmap)
800806
{
801-
useFastScan = false;
807+
scanType = RumRegularScan;
802808
break;
803809
}
804810
}
805811
}
806812

807813
ItemPointerSetInvalid(&so->key.iptr);
808814

809-
if (useFastScan)
815+
if (scanType == RumFastScan)
810816
{
811817
/*
812818
* We are going to use fast scan. Do some preliminaries. Start scan of
@@ -819,13 +825,13 @@ startScan(IndexScanDesc scan)
819825
for (i = 0; i < so->totalentries; i++)
820826
{
821827
if (!so->sortedEntries[i]->isFinished)
822-
entryGetItem(&so->rumstate, so->sortedEntries[i]);
828+
entryGetItem(&so->rumstate, so->sortedEntries[i], NULL);
823829
}
824830
qsort_arg(so->sortedEntries, so->totalentries, sizeof(RumScanEntry),
825831
scan_entry_cmp, rumstate);
826832
}
827833

828-
so->useFastScan = useFastScan;
834+
so->scanType = scanType;
829835
}
830836

831837
/*
@@ -952,7 +958,7 @@ entryGetNextItem(RumState * rumstate, RumScanEntry entry)
952958
}
953959
}
954960

955-
static void
961+
static bool
956962
entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
957963
{
958964
Page page;
@@ -998,7 +1004,7 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
9981004
ItemPointerSetInvalid(&entry->curRumKey.iptr);
9991005
entry->isFinished = TRUE;
10001006
LockBuffer(entry->stack->buffer, RUM_UNLOCK);
1001-
return;
1007+
return false;
10021008
}
10031009

10041010
page = BufferGetPage(entry->stack->buffer);
@@ -1014,7 +1020,7 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
10141020
ItemPointerSetInvalid(&entry->curRumKey.iptr);
10151021
entry->isFinished = TRUE;
10161022
LockBuffer(entry->stack->buffer, RUM_UNLOCK);
1017-
return;
1023+
return false;
10181024
}
10191025

10201026
/*
@@ -1102,6 +1108,8 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
11021108

11031109
if (needUnlock)
11041110
LockBuffer(entry->stack->buffer, RUM_UNLOCK);
1111+
1112+
return true;
11051113
}
11061114

11071115
#define rum_rand() (((double) random()) / ((double) MAX_RANDOM_VALUE))
@@ -1121,10 +1129,13 @@ entryGetNextItemList(RumState * rumstate, RumScanEntry entry)
11211129
* current implementation this is guaranteed by the behavior of tidbitmaps.
11221130
*/
11231131
static void
1124-
entryGetItem(RumState * rumstate, RumScanEntry entry)
1132+
entryGetItem(RumState * rumstate, RumScanEntry entry, bool *nextEntryList)
11251133
{
11261134
Assert(!entry->isFinished);
11271135

1136+
if (nextEntryList)
1137+
*nextEntryList = false;
1138+
11281139
if (entry->matchBitmap)
11291140
{
11301141
Assert(ScanDirectionIsForward(entry->scanDirection));
@@ -1186,7 +1197,8 @@ entryGetItem(RumState * rumstate, RumScanEntry entry)
11861197
else if (entry->stack)
11871198
{
11881199
entry->offset++;
1189-
entryGetNextItemList(rumstate, entry);
1200+
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
1201+
*nextEntryList = true;
11901202
}
11911203
else
11921204
{
@@ -1205,7 +1217,8 @@ entryGetItem(RumState * rumstate, RumScanEntry entry)
12051217
if (entry->stack && entry->isFinished)
12061218
{
12071219
entry->isFinished = FALSE;
1208-
entryGetNextItemList(rumstate, entry);
1220+
if (entryGetNextItemList(rumstate, entry) && nextEntryList)
1221+
*nextEntryList = true;
12091222
}
12101223
}
12111224
}
@@ -1490,7 +1503,7 @@ scanGetItemRegular(IndexScanDesc scan, RumKey *advancePast,
14901503
compareCurRumKeyScanDirection(rumstate, entry,
14911504
&myAdvancePast) <= 0))
14921505
{
1493-
entryGetItem(rumstate, entry);
1506+
entryGetItem(rumstate, entry, NULL);
14941507

14951508
if (!ItemPointerIsValid(&myAdvancePast.iptr))
14961509
break;
@@ -1915,7 +1928,7 @@ entryShift(int i, RumScanOpaque so, bool find)
19151928
entryFindItem(rumstate, so->sortedEntries[minIndex],
19161929
&so->sortedEntries[i - 1]->curRumKey);
19171930
else if (!so->sortedEntries[minIndex]->isFinished)
1918-
entryGetItem(rumstate, so->sortedEntries[minIndex]);
1931+
entryGetItem(rumstate, so->sortedEntries[minIndex], NULL);
19191932

19201933
/* Restore order of so->sortedEntries */
19211934
while (minIndex > 0 &&
@@ -2053,6 +2066,62 @@ scanGetItemFast(IndexScanDesc scan, RumKey *advancePast,
20532066
return false;
20542067
}
20552068

2069+
/*
2070+
* Get next item pointer using full-index scan.
2071+
*
2072+
* First key is used to full scan, other keys are only used for ranking.
2073+
*/
2074+
static bool
2075+
scanGetItemFull(IndexScanDesc scan, RumKey *advancePast,
2076+
RumKey *item, bool *recheck)
2077+
{
2078+
RumScanOpaque so = (RumScanOpaque) scan->opaque;
2079+
RumScanEntry entry;
2080+
bool nextEntryList;
2081+
uint32 i;
2082+
2083+
Assert(so->totalentries > 0);
2084+
Assert(so->entries[0]->scanWithAddInfo);
2085+
2086+
/*
2087+
* This is first entry of the first key, which is used for full-index
2088+
* scan.
2089+
*/
2090+
entry = so->entries[0];
2091+
2092+
entryGetItem(&so->rumstate, entry, &nextEntryList);
2093+
if (entry->isFinished == TRUE)
2094+
return false;
2095+
2096+
/* Move related order by entries */
2097+
if (nextEntryList)
2098+
for (i = 1; i < so->totalentries; i++)
2099+
{
2100+
RumScanEntry orderEntry = so->entries[i];
2101+
if (orderEntry->nlist > 0)
2102+
{
2103+
orderEntry->isFinished = FALSE;
2104+
orderEntry->offset = InvalidOffsetNumber;
2105+
RumItemSetMin(&orderEntry->curRumKey);
2106+
}
2107+
}
2108+
2109+
for (i = 1; i < so->totalentries; i++)
2110+
{
2111+
RumScanEntry orderEntry = so->entries[i];
2112+
2113+
while (orderEntry->isFinished == FALSE &&
2114+
(!ItemPointerIsValid(&orderEntry->curRumKey.iptr) ||
2115+
compareCurRumKeyScanDirection(&so->rumstate, orderEntry,
2116+
&entry->curRumKey) < 0))
2117+
entryGetItem(&so->rumstate, orderEntry, NULL);
2118+
}
2119+
2120+
*item = entry->curRumKey;
2121+
*recheck = false;
2122+
return true;
2123+
}
2124+
20562125
/*
20572126
* Get next item whether using regular or fast scan.
20582127
*/
@@ -2062,8 +2131,10 @@ scanGetItem(IndexScanDesc scan, RumKey *advancePast,
20622131
{
20632132
RumScanOpaque so = (RumScanOpaque) scan->opaque;
20642133

2065-
if (so->useFastScan)
2134+
if (so->scanType == RumFastScan)
20662135
return scanGetItemFast(scan, advancePast, item, recheck);
2136+
else if (so->scanType == RumFullScan)
2137+
return scanGetItemFull(scan, advancePast, item, recheck);
20672138
else
20682139
return scanGetItemRegular(scan, advancePast, item, recheck);
20692140
}

0 commit comments

Comments
 (0)
Please sign in to comment.