mirror of
				https://github.com/jart/cosmopolitan.git
				synced 2025-10-26 11:10:58 +00:00 
			
		
		
		
	- Now integrated with `make tags` for Emacs IDE features - Delete some old deprecated broken full-text search engines - Rename .h → .inc files that don't meet our definition of header - Make sure every #include line is normal form so tools understand See #162
		
			
				
	
	
		
			2764 lines
		
	
	
	
		
			94 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
			
		
		
	
	
			2764 lines
		
	
	
	
		
			94 KiB
		
	
	
	
		
			C
		
	
	
	
	
	
| /*
 | |
| ** 2011-07-09
 | |
| **
 | |
| ** The author disclaims copyright to this source code.  In place of
 | |
| ** a legal notice, here is a blessing:
 | |
| **
 | |
| **    May you do good and not evil.
 | |
| **    May you find forgiveness for yourself and forgive others.
 | |
| **    May you share freely, never taking more than you give.
 | |
| **
 | |
| *************************************************************************
 | |
| ** This file contains code for the VdbeSorter object, used in concert with
 | |
| ** a VdbeCursor to sort large numbers of keys for CREATE INDEX statements
 | |
| ** or by SELECT statements with ORDER BY clauses that cannot be satisfied
 | |
| ** using indexes and without LIMIT clauses.
 | |
| **
 | |
| ** The VdbeSorter object implements a multi-threaded external merge sort
 | |
| ** algorithm that is efficient even if the number of elements being sorted
 | |
| ** exceeds the available memory.
 | |
| **
 | |
| ** Here is the (internal, non-API) interface between this module and the
 | |
| ** rest of the SQLite system:
 | |
| **
 | |
| **    sqlite3VdbeSorterInit()       Create a new VdbeSorter object.
 | |
| **
 | |
| **    sqlite3VdbeSorterWrite()      Add a single new row to the VdbeSorter
 | |
| **                                  object.  The row is a binary blob in the
 | |
| **                                  OP_MakeRecord format that contains both
 | |
| **                                  the ORDER BY key columns and result columns
 | |
| **                                  in the case of a SELECT w/ ORDER BY, or
 | |
| **                                  the complete record for an index entry
 | |
| **                                  in the case of a CREATE INDEX.
 | |
| **
 | |
| **    sqlite3VdbeSorterRewind()     Sort all content previously added.
 | |
| **                                  Position the read cursor on the
 | |
| **                                  first sorted element.
 | |
| **
 | |
| **    sqlite3VdbeSorterNext()       Advance the read cursor to the next sorted
 | |
| **                                  element.
 | |
| **
 | |
| **    sqlite3VdbeSorterRowkey()     Return the complete binary blob for the
 | |
| **                                  row currently under the read cursor.
 | |
| **
 | |
| **    sqlite3VdbeSorterCompare()    Compare the binary blob for the row
 | |
| **                                  currently under the read cursor against
 | |
| **                                  another binary blob X and report if
 | |
| **                                  X is strictly less than the read cursor.
 | |
| **                                  Used to enforce uniqueness in a
 | |
| **                                  CREATE UNIQUE INDEX statement.
 | |
| **
 | |
| **    sqlite3VdbeSorterClose()      Close the VdbeSorter object and reclaim
 | |
| **                                  all resources.
 | |
| **
 | |
| **    sqlite3VdbeSorterReset()      Refurbish the VdbeSorter for reuse.  This
 | |
| **                                  is like Close() followed by Init() only
 | |
| **                                  much faster.
 | |
| **
 | |
| ** The interfaces above must be called in a particular order.  Write() can
 | |
| ** only occur in between Init()/Reset() and Rewind().  Next(), Rowkey(), and
 | |
| ** Compare() can only occur in between Rewind() and Close()/Reset(). i.e.
 | |
| **
 | |
| **   Init()
 | |
| **   for each record: Write()
 | |
| **   Rewind()
 | |
| **     Rowkey()/Compare()
 | |
| **   Next()
 | |
| **   Close()
 | |
| **
 | |
| ** Algorithm:
 | |
| **
 | |
| ** Records passed to the sorter via calls to Write() are initially held
 | |
| ** unsorted in main memory. Assuming the amount of memory used never exceeds
 | |
| ** a threshold, when Rewind() is called the set of records is sorted using
 | |
| ** an in-memory merge sort. In this case, no temporary files are required
 | |
| ** and subsequent calls to Rowkey(), Next() and Compare() read records
 | |
| ** directly from main memory.
 | |
| **
 | |
| ** If the amount of space used to store records in main memory exceeds the
 | |
| ** threshold, then the set of records currently in memory are sorted and
 | |
| ** written to a temporary file in "Packed Memory Array" (PMA) format.
 | |
| ** A PMA created at this point is known as a "level-0 PMA". Higher levels
 | |
| ** of PMAs may be created by merging existing PMAs together - for example
 | |
| ** merging two or more level-0 PMAs together creates a level-1 PMA.
 | |
| **
 | |
| ** The threshold for the amount of main memory to use before flushing
 | |
| ** records to a PMA is roughly the same as the limit configured for the
 | |
| ** page-cache of the main database. Specifically, the threshold is set to
 | |
| ** the value returned by "PRAGMA main.page_size" multiplied by
 | |
| ** that returned by "PRAGMA main.cache_size", in bytes.
 | |
| **
 | |
| ** If the sorter is running in single-threaded mode, then all PMAs generated
 | |
| ** are appended to a single temporary file. Or, if the sorter is running in
 | |
| ** multi-threaded mode then up to (N+1) temporary files may be opened, where
 | |
| ** N is the configured number of worker threads. In this case, instead of
 | |
| ** sorting the records and writing the PMA to a temporary file itself, the
 | |
| ** calling thread usually launches a worker thread to do so. Except, if
 | |
| ** there are already N worker threads running, the main thread does the work
 | |
| ** itself.
 | |
| **
 | |
| ** The sorter is running in multi-threaded mode if (a) the library was built
 | |
| ** with pre-processor symbol SQLITE_MAX_WORKER_THREADS set to a value greater
 | |
| ** than zero, and (b) worker threads have been enabled at runtime by calling
 | |
| ** "PRAGMA threads=N" with some value of N greater than 0.
 | |
| **
 | |
| ** When Rewind() is called, any data remaining in memory is flushed to a
 | |
| ** final PMA. So at this point the data is stored in some number of sorted
 | |
| ** PMAs within temporary files on disk.
 | |
| **
 | |
| ** If there are fewer than SORTER_MAX_MERGE_COUNT PMAs in total and the
 | |
| ** sorter is running in single-threaded mode, then these PMAs are merged
 | |
| ** incrementally as keys are retrieved from the sorter by the VDBE.  The
 | |
| ** MergeEngine object, described in further detail below, performs this
 | |
| ** merge.
 | |
| **
 | |
| ** Or, if running in multi-threaded mode, then a background thread is
 | |
| ** launched to merge the existing PMAs. Once the background thread has
 | |
| ** merged T bytes of data into a single sorted PMA, the main thread
 | |
| ** begins reading keys from that PMA while the background thread proceeds
 | |
| ** with merging the next T bytes of data. And so on.
 | |
| **
 | |
| ** Parameter T is set to half the value of the memory threshold used
 | |
| ** by Write() above to determine when to create a new PMA.
 | |
| **
 | |
| ** If there are more than SORTER_MAX_MERGE_COUNT PMAs in total when
 | |
| ** Rewind() is called, then a hierarchy of incremental-merges is used.
 | |
| ** First, T bytes of data from the first SORTER_MAX_MERGE_COUNT PMAs on
 | |
| ** disk are merged together. Then T bytes of data from the second set, and
 | |
| ** so on, such that no operation ever merges more than SORTER_MAX_MERGE_COUNT
 | |
| ** PMAs at a time. This is done to improve locality.
 | |
| **
 | |
| ** If running in multi-threaded mode and there are more than
 | |
| ** SORTER_MAX_MERGE_COUNT PMAs on disk when Rewind() is called, then more
 | |
| ** than one background thread may be created. Specifically, there may be
 | |
| ** one background thread for each temporary file on disk, and one background
 | |
| ** thread to merge the output of each of the others to a single PMA for
 | |
| ** the main thread to read from.
 | |
| */
 | |
| #include "third_party/sqlite3/sqliteInt.inc"
 | |
| #include "third_party/sqlite3/vdbeInt.inc"
 | |
| /* clang-format off */
 | |
| 
 | |
/*
** Compile-time switch for sorter thread diagnostics. When
** SQLITE_DEBUG_SORTER_THREADS is defined, this module writes messages to
** stderr that can help explain how the sorter performs in multi-threaded
** mode. Change the "#if 0" below to "#if 1" to turn it on.
*/
#if 0
# define SQLITE_DEBUG_SORTER_THREADS 1
#endif

/*
** Fixed upper bound (512MiB) on the amount of record data held in memory
** before it is flushed to a level-0 PMA. The cap exists to keep various
** integer computations from overflowing.
*/
#define SQLITE_MAX_PMASZ    (1<<29)
 | |
| 
 | |
| /*
 | |
| ** Private objects used by the sorter
 | |
| */
 | |
typedef struct MergeEngine  MergeEngine;   /* Combines several PMAs into one */
typedef struct PmaReader    PmaReader;     /* Reads a single PMA incrementally */
typedef struct PmaWriter    PmaWriter;     /* Writes a single PMA incrementally */
typedef struct SorterRecord SorterRecord;  /* One record while it is being sorted */
typedef struct SortSubtask  SortSubtask;   /* One sub-task of the sort process */
typedef struct SorterFile   SorterFile;    /* Wrapper around a temp file handle */
typedef struct SorterList   SorterList;    /* Records held in memory as a list */
typedef struct IncrMerger   IncrMerger;    /* Reads and merges multiple PMAs */
 | |
| 
 | |
| /*
 | |
| ** A container for a temp file handle and the current amount of data 
 | |
| ** stored in the file.
 | |
| */
 | |
| struct SorterFile {
 | |
|   sqlite3_file *pFd;              /* File handle */
 | |
|   i64 iEof;                       /* Bytes of data stored in pFd */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** An in-memory list of objects to be sorted.
 | |
| **
 | |
| ** If aMemory==0 then each object is allocated separately and the objects
 | |
| ** are connected using SorterRecord.u.pNext.  If aMemory!=0 then all objects
 | |
| ** are stored in the aMemory[] bulk memory, one right after the other, and
 | |
| ** are connected using SorterRecord.u.iNext.
 | |
| */
 | |
| struct SorterList {
 | |
|   SorterRecord *pList;            /* Linked list of records */
 | |
|   u8 *aMemory;                    /* If non-NULL, bulk memory to hold pList */
 | |
|   int szPMA;                      /* Size of pList as PMA in bytes */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** The MergeEngine object is used to combine two or more smaller PMAs into
 | |
| ** one big PMA using a merge operation.  Separate PMAs all need to be
 | |
| ** combined into one big PMA in order to be able to step through the sorted
 | |
| ** records in order.
 | |
| **
 | |
| ** The aReadr[] array contains a PmaReader object for each of the PMAs being
 | |
| ** merged.  An aReadr[] object either points to a valid key or else is at EOF.
 | |
| ** ("EOF" means "End Of File".  When aReadr[] is at EOF there is no more data.)
 | |
| ** For the purposes of the paragraphs below, we assume that the array is
 | |
| ** actually N elements in size, where N is the smallest power of 2 greater
 | |
| ** than or equal to the number of PMAs being merged. The extra aReadr[] elements
 | |
| ** are treated as if they are empty (always at EOF).
 | |
| **
 | |
| ** The aTree[] array is also N elements in size. The value of N is stored in
 | |
| ** the MergeEngine.nTree variable.
 | |
| **
 | |
| ** The final (N/2) elements of aTree[] contain the results of comparing
 | |
| ** pairs of PMA keys together. Element i contains the result of 
 | |
| ** comparing aReadr[2*i-N] and aReadr[2*i-N+1]. Whichever key is smaller, the
 | |
| ** aTree element is set to the index of it. 
 | |
| **
 | |
| ** For the purposes of this comparison, EOF is considered greater than any
 | |
| ** other key value. If the keys are equal (only possible with two EOF
 | |
| ** values), it doesn't matter which index is stored.
 | |
| **
 | |
| ** The (N/4) elements of aTree[] that precede the final (N/2) described 
 | |
| ** above contain the index of the smallest of each block of 4 PmaReaders.
 | |
| ** And so on. So that aTree[1] contains the index of the PmaReader that 
 | |
| ** currently points to the smallest key value. aTree[0] is unused.
 | |
| **
 | |
| ** Example:
 | |
| **
 | |
| **     aReadr[0] -> Banana
 | |
| **     aReadr[1] -> Feijoa
 | |
| **     aReadr[2] -> Elderberry
 | |
| **     aReadr[3] -> Currant
 | |
| **     aReadr[4] -> Grapefruit
 | |
| **     aReadr[5] -> Apple
 | |
| **     aReadr[6] -> Durian
 | |
| **     aReadr[7] -> EOF
 | |
| **
 | |
| **     aTree[] = { X, 5   0, 5    0, 3, 5, 6 }
 | |
| **
 | |
| ** The current element is "Apple" (the value of the key indicated by 
 | |
| ** PmaReader 5). When the Next() operation is invoked, PmaReader 5 will
 | |
| ** be advanced to the next key in its segment. Say the next key is
 | |
| ** "Eggplant":
 | |
| **
 | |
| **     aReadr[5] -> Eggplant
 | |
| **
 | |
| ** The contents of aTree[] are updated first by comparing the new PmaReader
 | |
| ** 5 key to the current key of PmaReader 4 (still "Grapefruit"). The PmaReader
 | |
| ** 5 value is still smaller, so aTree[6] is set to 5. And so on up the tree.
 | |
| ** The value of PmaReader 6 - "Durian" - is now smaller than that of PmaReader
 | |
| ** 5, so aTree[3] is set to 6. Key 0 is smaller than key 6 (Banana<Durian),
 | |
| ** so the value written into element 1 of the array is 0. As follows:
 | |
| **
 | |
| **     aTree[] = { X, 0   0, 6    0, 3, 5, 6 }
 | |
| **
 | |
| ** In other words, each time we advance to the next sorter element, log2(N)
 | |
| ** key comparison operations are required, where N is the number of segments
 | |
| ** being merged (rounded up to the next power of 2).
 | |
| */
 | |
| struct MergeEngine {
 | |
|   int nTree;                 /* Used size of aTree/aReadr (power of 2) */
 | |
|   SortSubtask *pTask;        /* Used by this thread only */
 | |
|   int *aTree;                /* Current state of incremental merge */
 | |
|   PmaReader *aReadr;         /* Array of PmaReaders to merge data from */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** This object represents a single thread of control in a sort operation.
 | |
| ** Exactly VdbeSorter.nTask instances of this object are allocated
 | |
| ** as part of each VdbeSorter object. Instances are never allocated any
 | |
| ** other way. VdbeSorter.nTask is set to the number of worker threads allowed
 | |
| ** (see SQLITE_CONFIG_WORKER_THREADS) plus one (the main thread).  Thus for
 | |
| ** single-threaded operation, there is exactly one instance of this object
 | |
| ** and for multi-threaded operation there are two or more instances.
 | |
| **
 | |
| ** Essentially, this structure contains all those fields of the VdbeSorter
 | |
| ** structure for which each thread requires a separate instance. For example,
 | |
| ** each thread requires its own UnpackedRecord object to unpack records into
 | |
| ** as part of comparison operations.
 | |
| **
 | |
| ** Before a background thread is launched, variable bDone is set to 0. Then, 
 | |
| ** right before it exits, the thread itself sets bDone to 1. This is used for 
 | |
| ** two purposes:
 | |
| **
 | |
| **   1. When flushing the contents of memory to a level-0 PMA on disk, to
 | |
| **      attempt to select a SortSubtask for which there is not already an
 | |
| **      active background thread (since doing so causes the main thread
 | |
| **      to block until it finishes).
 | |
| **
 | |
| **   2. If SQLITE_DEBUG_SORTER_THREADS is defined, to determine if a call
 | |
| **      to sqlite3ThreadJoin() is likely to block. Cases that are likely to
 | |
| **      block provoke debugging output.
 | |
| **
 | |
| ** In both cases, the effects of the main thread seeing (bDone==0) even
 | |
| ** after the thread has finished are not dire. So we don't worry about
 | |
| ** memory barriers and such here.
 | |
| */
 | |
| typedef int (*SorterCompare)(SortSubtask*,int*,const void*,int,const void*,int);
 | |
| struct SortSubtask {
 | |
|   SQLiteThread *pThread;          /* Background thread, if any */
 | |
|   int bDone;                      /* Set if thread is finished but not joined */
 | |
|   VdbeSorter *pSorter;            /* Sorter that owns this sub-task */
 | |
|   UnpackedRecord *pUnpacked;      /* Space to unpack a record */
 | |
|   SorterList list;                /* List for thread to write to a PMA */
 | |
|   int nPMA;                       /* Number of PMAs currently in file */
 | |
|   SorterCompare xCompare;         /* Compare function to use */
 | |
|   SorterFile file;                /* Temp file for level-0 PMAs */
 | |
|   SorterFile file2;               /* Space for other PMAs */
 | |
| };
 | |
| 
 | |
| 
 | |
| /*
 | |
| ** Main sorter structure. A single instance of this is allocated for each 
 | |
| ** sorter cursor created by the VDBE.
 | |
| **
 | |
| ** mxKeysize:
 | |
| **   As records are added to the sorter by calls to sqlite3VdbeSorterWrite(),
 | |
| **   this variable is updated so as to be set to the size on disk of the
 | |
| **   largest record in the sorter.
 | |
| */
 | |
| struct VdbeSorter {
 | |
|   int mnPmaSize;                  /* Minimum PMA size, in bytes */
 | |
|   int mxPmaSize;                  /* Maximum PMA size, in bytes.  0==no limit */
 | |
|   int mxKeysize;                  /* Largest serialized key seen so far */
 | |
|   int pgsz;                       /* Main database page size */
 | |
|   PmaReader *pReader;             /* Readr data from here after Rewind() */
 | |
|   MergeEngine *pMerger;           /* Or here, if bUseThreads==0 */
 | |
|   sqlite3 *db;                    /* Database connection */
 | |
|   KeyInfo *pKeyInfo;              /* How to compare records */
 | |
|   UnpackedRecord *pUnpacked;      /* Used by VdbeSorterCompare() */
 | |
|   SorterList list;                /* List of in-memory records */
 | |
|   int iMemory;                    /* Offset of free space in list.aMemory */
 | |
|   int nMemory;                    /* Size of list.aMemory allocation in bytes */
 | |
|   u8 bUsePMA;                     /* True if one or more PMAs created */
 | |
|   u8 bUseThreads;                 /* True to use background threads */
 | |
|   u8 iPrev;                       /* Previous thread used to flush PMA */
 | |
|   u8 nTask;                       /* Size of aTask[] array */
 | |
|   u8 typeMask;
 | |
|   SortSubtask aTask[1];           /* One or more subtasks */
 | |
| };
 | |
| 
 | |
/* Single-bit flags. NOTE(review): presumably combined in VdbeSorter.typeMask;
** confirm against the code that uses them. */
#define SORTER_TYPE_INTEGER 0x01
#define SORTER_TYPE_TEXT    0x02
 | |
| 
 | |
| /*
 | |
| ** An instance of the following object is used to read records out of a
 | |
| ** PMA, in sorted order.  The next key to be read is cached in nKey/aKey.
 | |
| ** aKey might point into aMap or into aBuffer.  If neither of those locations
 | |
| ** contain a contiguous representation of the key, then aAlloc is allocated
 | |
| ** and the key is copied into aAlloc and aKey is made to point to aAlloc.
 | |
| **
 | |
| ** pFd==0 at EOF.
 | |
| */
 | |
| struct PmaReader {
 | |
|   i64 iReadOff;               /* Current read offset */
 | |
|   i64 iEof;                   /* 1 byte past EOF for this PmaReader */
 | |
|   int nAlloc;                 /* Bytes of space at aAlloc */
 | |
|   int nKey;                   /* Number of bytes in key */
 | |
|   sqlite3_file *pFd;          /* File handle we are reading from */
 | |
|   u8 *aAlloc;                 /* Space for aKey if aBuffer and pMap wont work */
 | |
|   u8 *aKey;                   /* Pointer to current key */
 | |
|   u8 *aBuffer;                /* Current read buffer */
 | |
|   int nBuffer;                /* Size of read buffer in bytes */
 | |
|   u8 *aMap;                   /* Pointer to mapping of entire file */
 | |
|   IncrMerger *pIncr;          /* Incremental merger */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** Normally, a PmaReader object iterates through an existing PMA stored 
 | |
| ** within a temp file. However, if the PmaReader.pIncr variable points to
 | |
| ** an object of the following type, it may be used to iterate/merge through
 | |
| ** multiple PMAs simultaneously.
 | |
| **
 | |
| ** There are two types of IncrMerger object - single (bUseThread==0) and 
 | |
| ** multi-threaded (bUseThread==1). 
 | |
| **
 | |
| ** A multi-threaded IncrMerger object uses two temporary files - aFile[0] 
 | |
| ** and aFile[1]. Neither file is allowed to grow to more than mxSz bytes in 
 | |
| ** size. When the IncrMerger is initialized, it reads enough data from 
 | |
| ** pMerger to populate aFile[0]. It then sets variables within the 
 | |
| ** corresponding PmaReader object to read from that file and kicks off 
 | |
| ** a background thread to populate aFile[1] with the next mxSz bytes of 
 | |
| ** sorted record data from pMerger. 
 | |
| **
 | |
| ** When the PmaReader reaches the end of aFile[0], it blocks until the
 | |
| ** background thread has finished populating aFile[1]. It then exchanges
 | |
| ** the contents of the aFile[0] and aFile[1] variables within this structure,
 | |
| ** sets the PmaReader fields to read from the new aFile[0] and kicks off
 | |
| ** another background thread to populate the new aFile[1]. And so on, until
 | |
| ** the contents of pMerger are exhausted.
 | |
| **
 | |
| ** A single-threaded IncrMerger does not open any temporary files of its
 | |
| ** own. Instead, it has exclusive access to mxSz bytes of space beginning
 | |
| ** at offset iStartOff of file pTask->file2. And instead of using a 
 | |
| ** background thread to prepare data for the PmaReader, with a single
 | |
| ** threaded IncrMerger the allocated part of pTask->file2 is "refilled" with
 | |
| ** keys from pMerger by the calling thread whenever the PmaReader runs out
 | |
| ** of data.
 | |
| */
 | |
| struct IncrMerger {
 | |
|   SortSubtask *pTask;             /* Task that owns this merger */
 | |
|   MergeEngine *pMerger;           /* Merge engine thread reads data from */
 | |
|   i64 iStartOff;                  /* Offset to start writing file at */
 | |
|   int mxSz;                       /* Maximum bytes of data to store */
 | |
|   int bEof;                       /* Set to true when merge is finished */
 | |
|   int bUseThread;                 /* True to use a bg thread for this object */
 | |
|   SorterFile aFile[2];            /* aFile[0] for reading, [1] for writing */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** An instance of this object is used for writing a PMA.
 | |
| **
 | |
| ** The PMA is written one record at a time.  Each record is of an arbitrary
 | |
| ** size.  But I/O is more efficient if it occurs in page-sized blocks where
 | |
| ** each block is aligned on a page boundary.  This object caches writes to
 | |
| ** the PMA so that aligned, page-size blocks are written.
 | |
| */
 | |
| struct PmaWriter {
 | |
|   int eFWErr;                     /* Non-zero if in an error state */
 | |
|   u8 *aBuffer;                    /* Pointer to write buffer */
 | |
|   int nBuffer;                    /* Size of write buffer in bytes */
 | |
|   int iBufStart;                  /* First byte of buffer to write */
 | |
|   int iBufEnd;                    /* Last byte of buffer to write */
 | |
|   i64 iWriteOff;                  /* Offset of start of buffer in file */
 | |
|   sqlite3_file *pFd;              /* File handle to write to */
 | |
| };
 | |
| 
 | |
| /*
 | |
| ** This object is the header on a single record while that record is being
 | |
| ** held in memory and prior to being written out as part of a PMA.
 | |
| **
 | |
| ** How the linked list is connected depends on how memory is being managed
 | |
| ** by this module. If using a separate allocation for each in-memory record
 | |
| ** (VdbeSorter.list.aMemory==0), then the list is always connected using the
 | |
| ** SorterRecord.u.pNext pointers.
 | |
| **
 | |
| ** Or, if using the single large allocation method (VdbeSorter.list.aMemory!=0),
 | |
| ** then while records are being accumulated the list is linked using the
 | |
| ** SorterRecord.u.iNext offset. This is because the aMemory[] array may
 | |
| ** be sqlite3Realloc()ed while records are being accumulated. Once the VM
 | |
| ** has finished passing records to the sorter, or when the in-memory buffer
 | |
| ** is full, the list is sorted. As part of the sorting process, it is
 | |
| ** converted to use the SorterRecord.u.pNext pointers. See function
 | |
| ** vdbeSorterSort() for details.
 | |
| */
 | |
| struct SorterRecord {
 | |
|   int nVal;                       /* Size of the record in bytes */
 | |
|   union {
 | |
|     SorterRecord *pNext;          /* Pointer to next record in list */
 | |
|     int iNext;                    /* Offset within aMemory of next record */
 | |
|   } u;
 | |
|   /* The data for the record immediately follows this header */
 | |
| };
 | |
| 
 | |
/* Yield a pointer to the record data that belongs to SorterRecord object p,
** i.e. the first byte past the header. Behaves like this function:
**
**   void *SRVAL(SorterRecord *p) { return (void*)&p[1]; }
*/
#define SRVAL(p) ((void*)&((SorterRecord*)(p))[1])
 | |
| 
 | |
| 
 | |
/* Upper limit on how many PMAs one MergeEngine may merge at a time */
#define SORTER_MAX_MERGE_COUNT 16
 | |
| 
 | |
| static int vdbeIncrSwap(IncrMerger*);
 | |
| static void vdbeIncrFree(IncrMerger *);
 | |
| 
 | |
| /*
 | |
| ** Free all memory belonging to the PmaReader object passed as the
 | |
| ** argument. All structure fields are set to zero before returning.
 | |
| */
 | |
| static void vdbePmaReaderClear(PmaReader *pReadr){
 | |
|   sqlite3_free(pReadr->aAlloc);
 | |
|   sqlite3_free(pReadr->aBuffer);
 | |
|   if( pReadr->aMap ) sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
 | |
|   vdbeIncrFree(pReadr->pIncr);
 | |
|   memset(pReadr, 0, sizeof(PmaReader));
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Read the next nByte bytes of data from the PMA p.
 | |
| ** If successful, set *ppOut to point to a buffer containing the data
 | |
| ** and return SQLITE_OK. Otherwise, if an error occurs, return an SQLite
 | |
| ** error code.
 | |
| **
 | |
| ** The buffer returned in *ppOut is only valid until the
 | |
| ** next call to this function.
 | |
| */
 | |
| static int vdbePmaReadBlob(
 | |
|   PmaReader *p,                   /* PmaReader from which to take the blob */
 | |
|   int nByte,                      /* Bytes of data to read */
 | |
|   u8 **ppOut                      /* OUT: Pointer to buffer containing data */
 | |
| ){
 | |
|   int iBuf;                       /* Offset within buffer to read from */
 | |
|   int nAvail;                     /* Bytes of data available in buffer */
 | |
| 
 | |
|   if( p->aMap ){
 | |
|     *ppOut = &p->aMap[p->iReadOff];
 | |
|     p->iReadOff += nByte;
 | |
|     return SQLITE_OK;
 | |
|   }
 | |
| 
 | |
|   assert( p->aBuffer );
 | |
| 
 | |
|   /* If there is no more data to be read from the buffer, read the next 
 | |
|   ** p->nBuffer bytes of data from the file into it. Or, if there are less
 | |
|   ** than p->nBuffer bytes remaining in the PMA, read all remaining data.  */
 | |
|   iBuf = p->iReadOff % p->nBuffer;
 | |
|   if( iBuf==0 ){
 | |
|     int nRead;                    /* Bytes to read from disk */
 | |
|     int rc;                       /* sqlite3OsRead() return code */
 | |
| 
 | |
|     /* Determine how many bytes of data to read. */
 | |
|     if( (p->iEof - p->iReadOff) > (i64)p->nBuffer ){
 | |
|       nRead = p->nBuffer;
 | |
|     }else{
 | |
|       nRead = (int)(p->iEof - p->iReadOff);
 | |
|     }
 | |
|     assert( nRead>0 );
 | |
| 
 | |
|     /* Readr data from the file. Return early if an error occurs. */
 | |
|     rc = sqlite3OsRead(p->pFd, p->aBuffer, nRead, p->iReadOff);
 | |
|     assert( rc!=SQLITE_IOERR_SHORT_READ );
 | |
|     if( rc!=SQLITE_OK ) return rc;
 | |
|   }
 | |
|   nAvail = p->nBuffer - iBuf; 
 | |
| 
 | |
|   if( nByte<=nAvail ){
 | |
|     /* The requested data is available in the in-memory buffer. In this
 | |
|     ** case there is no need to make a copy of the data, just return a 
 | |
|     ** pointer into the buffer to the caller.  */
 | |
|     *ppOut = &p->aBuffer[iBuf];
 | |
|     p->iReadOff += nByte;
 | |
|   }else{
 | |
|     /* The requested data is not all available in the in-memory buffer.
 | |
|     ** In this case, allocate space at p->aAlloc[] to copy the requested
 | |
|     ** range into. Then return a copy of pointer p->aAlloc to the caller.  */
 | |
|     int nRem;                     /* Bytes remaining to copy */
 | |
| 
 | |
|     /* Extend the p->aAlloc[] allocation if required. */
 | |
|     if( p->nAlloc<nByte ){
 | |
|       u8 *aNew;
 | |
|       sqlite3_int64 nNew = MAX(128, 2*(sqlite3_int64)p->nAlloc);
 | |
|       while( nByte>nNew ) nNew = nNew*2;
 | |
|       aNew = sqlite3Realloc(p->aAlloc, nNew);
 | |
|       if( !aNew ) return SQLITE_NOMEM_BKPT;
 | |
|       p->nAlloc = nNew;
 | |
|       p->aAlloc = aNew;
 | |
|     }
 | |
| 
 | |
|     /* Copy as much data as is available in the buffer into the start of
 | |
|     ** p->aAlloc[].  */
 | |
|     memcpy(p->aAlloc, &p->aBuffer[iBuf], nAvail);
 | |
|     p->iReadOff += nAvail;
 | |
|     nRem = nByte - nAvail;
 | |
| 
 | |
|     /* The following loop copies up to p->nBuffer bytes per iteration into
 | |
|     ** the p->aAlloc[] buffer.  */
 | |
|     while( nRem>0 ){
 | |
|       int rc;                     /* vdbePmaReadBlob() return code */
 | |
|       int nCopy;                  /* Number of bytes to copy */
 | |
|       u8 *aNext;                  /* Pointer to buffer to copy data from */
 | |
| 
 | |
|       nCopy = nRem;
 | |
|       if( nRem>p->nBuffer ) nCopy = p->nBuffer;
 | |
|       rc = vdbePmaReadBlob(p, nCopy, &aNext);
 | |
|       if( rc!=SQLITE_OK ) return rc;
 | |
|       assert( aNext!=p->aAlloc );
 | |
|       memcpy(&p->aAlloc[nByte - nRem], aNext, nCopy);
 | |
|       nRem -= nCopy;
 | |
|     }
 | |
| 
 | |
|     *ppOut = p->aAlloc;
 | |
|   }
 | |
| 
 | |
|   return SQLITE_OK;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Read a varint from the stream of data accessed by p. Set *pnOut to
 | |
| ** the value read.
 | |
| */
 | |
| static int vdbePmaReadVarint(PmaReader *p, u64 *pnOut){
 | |
|   int iBuf;
 | |
| 
 | |
|   if( p->aMap ){
 | |
|     p->iReadOff += sqlite3GetVarint(&p->aMap[p->iReadOff], pnOut);
 | |
|   }else{
 | |
|     iBuf = p->iReadOff % p->nBuffer;
 | |
|     if( iBuf && (p->nBuffer-iBuf)>=9 ){
 | |
|       p->iReadOff += sqlite3GetVarint(&p->aBuffer[iBuf], pnOut);
 | |
|     }else{
 | |
|       u8 aVarint[16], *a;
 | |
|       int i = 0, rc;
 | |
|       do{
 | |
|         rc = vdbePmaReadBlob(p, 1, &a);
 | |
|         if( rc ) return rc;
 | |
|         aVarint[(i++)&0xf] = a[0];
 | |
|       }while( (a[0]&0x80)!=0 );
 | |
|       sqlite3GetVarint(aVarint, pnOut);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return SQLITE_OK;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Attempt to memory map file pFile. If successful, set *pp to point to the
 | |
| ** new mapping and return SQLITE_OK. If the mapping is not attempted 
 | |
| ** (because the file is too large or the VFS layer is configured not to use
 | |
| ** mmap), return SQLITE_OK and set *pp to NULL.
 | |
| **
 | |
| ** Or, if an error occurs, return an SQLite error code. The final value of
 | |
| ** *pp is undefined in this case.
 | |
| */
 | |
| static int vdbeSorterMapFile(SortSubtask *pTask, SorterFile *pFile, u8 **pp){
 | |
|   int rc = SQLITE_OK;
 | |
|   if( pFile->iEof<=(i64)(pTask->pSorter->db->nMaxSorterMmap) ){
 | |
|     sqlite3_file *pFd = pFile->pFd;
 | |
|     if( pFd->pMethods->iVersion>=3 ){
 | |
|       rc = sqlite3OsFetch(pFd, 0, (int)pFile->iEof, (void**)pp);
 | |
|       testcase( rc!=SQLITE_OK );
 | |
|     }
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Attach PmaReader pReadr to file pFile (if it is not already attached to
 | |
| ** that file) and seek it to offset iOff within the file.  Return SQLITE_OK 
 | |
| ** if successful, or an SQLite error code if an error occurs.
 | |
| */
 | |
| static int vdbePmaReaderSeek(
 | |
|   SortSubtask *pTask,             /* Task context */
 | |
|   PmaReader *pReadr,              /* Reader whose cursor is to be moved */
 | |
|   SorterFile *pFile,              /* Sorter file to read from */
 | |
|   i64 iOff                        /* Offset in pFile */
 | |
| ){
 | |
|   int rc = SQLITE_OK;
 | |
| 
 | |
|   assert( pReadr->pIncr==0 || pReadr->pIncr->bEof==0 );
 | |
| 
 | |
|   if( sqlite3FaultSim(201) ) return SQLITE_IOERR_READ;
 | |
|   if( pReadr->aMap ){
 | |
|     sqlite3OsUnfetch(pReadr->pFd, 0, pReadr->aMap);
 | |
|     pReadr->aMap = 0;
 | |
|   }
 | |
|   pReadr->iReadOff = iOff;
 | |
|   pReadr->iEof = pFile->iEof;
 | |
|   pReadr->pFd = pFile->pFd;
 | |
| 
 | |
|   rc = vdbeSorterMapFile(pTask, pFile, &pReadr->aMap);
 | |
|   if( rc==SQLITE_OK && pReadr->aMap==0 ){
 | |
|     int pgsz = pTask->pSorter->pgsz;
 | |
|     int iBuf = pReadr->iReadOff % pgsz;
 | |
|     if( pReadr->aBuffer==0 ){
 | |
|       pReadr->aBuffer = (u8*)sqlite3Malloc(pgsz);
 | |
|       if( pReadr->aBuffer==0 ) rc = SQLITE_NOMEM_BKPT;
 | |
|       pReadr->nBuffer = pgsz;
 | |
|     }
 | |
|     if( rc==SQLITE_OK && iBuf ){
 | |
|       int nRead = pgsz - iBuf;
 | |
|       if( (pReadr->iReadOff + nRead) > pReadr->iEof ){
 | |
|         nRead = (int)(pReadr->iEof - pReadr->iReadOff);
 | |
|       }
 | |
|       rc = sqlite3OsRead(
 | |
|           pReadr->pFd, &pReadr->aBuffer[iBuf], nRead, pReadr->iReadOff
 | |
|       );
 | |
|       testcase( rc!=SQLITE_OK );
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Advance PmaReader pReadr to the next key in its PMA. Return SQLITE_OK if
 | |
| ** no error occurs, or an SQLite error code if one does.
 | |
| */
 | |
| static int vdbePmaReaderNext(PmaReader *pReadr){
 | |
|   int rc = SQLITE_OK;             /* Return Code */
 | |
|   u64 nRec = 0;                   /* Size of record in bytes */
 | |
| 
 | |
| 
 | |
|   if( pReadr->iReadOff>=pReadr->iEof ){
 | |
|     IncrMerger *pIncr = pReadr->pIncr;
 | |
|     int bEof = 1;
 | |
|     if( pIncr ){
 | |
|       rc = vdbeIncrSwap(pIncr);
 | |
|       if( rc==SQLITE_OK && pIncr->bEof==0 ){
 | |
|         rc = vdbePmaReaderSeek(
 | |
|             pIncr->pTask, pReadr, &pIncr->aFile[0], pIncr->iStartOff
 | |
|         );
 | |
|         bEof = 0;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if( bEof ){
 | |
|       /* This is an EOF condition */
 | |
|       vdbePmaReaderClear(pReadr);
 | |
|       testcase( rc!=SQLITE_OK );
 | |
|       return rc;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if( rc==SQLITE_OK ){
 | |
|     rc = vdbePmaReadVarint(pReadr, &nRec);
 | |
|   }
 | |
|   if( rc==SQLITE_OK ){
 | |
|     pReadr->nKey = (int)nRec;
 | |
|     rc = vdbePmaReadBlob(pReadr, (int)nRec, &pReadr->aKey);
 | |
|     testcase( rc!=SQLITE_OK );
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Initialize PmaReader pReadr to scan through the PMA stored in file pFile
 | |
| ** starting at offset iStart and ending at offset iEof-1. This function 
 | |
| ** leaves the PmaReader pointing to the first key in the PMA (or EOF if the 
 | |
| ** PMA is empty).
 | |
| **
 | |
| ** If the pnByte parameter is NULL, then it is assumed that the file 
 | |
| ** contains a single PMA, and that that PMA omits the initial length varint.
 | |
| */
 | |
| static int vdbePmaReaderInit(
 | |
|   SortSubtask *pTask,             /* Task context */
 | |
|   SorterFile *pFile,              /* Sorter file to read from */
 | |
|   i64 iStart,                     /* Start offset in pFile */
 | |
|   PmaReader *pReadr,              /* PmaReader to populate */
 | |
|   i64 *pnByte                     /* IN/OUT: Increment this value by PMA size */
 | |
| ){
 | |
|   int rc;
 | |
| 
 | |
|   assert( pFile->iEof>iStart );
 | |
|   assert( pReadr->aAlloc==0 && pReadr->nAlloc==0 );
 | |
|   assert( pReadr->aBuffer==0 );
 | |
|   assert( pReadr->aMap==0 );
 | |
| 
 | |
|   rc = vdbePmaReaderSeek(pTask, pReadr, pFile, iStart);
 | |
|   if( rc==SQLITE_OK ){
 | |
|     u64 nByte = 0;                 /* Size of PMA in bytes */
 | |
|     rc = vdbePmaReadVarint(pReadr, &nByte);
 | |
|     pReadr->iEof = pReadr->iReadOff + nByte;
 | |
|     *pnByte += nByte;
 | |
|   }
 | |
| 
 | |
|   if( rc==SQLITE_OK ){
 | |
|     rc = vdbePmaReaderNext(pReadr);
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** A version of vdbeSorterCompare() that assumes that it has already been
 | |
| ** determined that the first field of key1 is equal to the first field of 
 | |
| ** key2.
 | |
| */
 | |
| static int vdbeSorterCompareTail(
 | |
|   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
 | |
|   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
 | |
|   const void *pKey1, int nKey1,   /* Left side of comparison */
 | |
|   const void *pKey2, int nKey2    /* Right side of comparison */
 | |
| ){
 | |
|   UnpackedRecord *r2 = pTask->pUnpacked;
 | |
|   if( *pbKey2Cached==0 ){
 | |
|     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
 | |
|     *pbKey2Cached = 1;
 | |
|   }
 | |
|   return sqlite3VdbeRecordCompareWithSkip(nKey1, pKey1, r2, 1);
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Compare key1 (buffer pKey1, size nKey1 bytes) with key2 (buffer pKey2, 
 | |
| ** size nKey2 bytes). Use (pTask->pKeyInfo) for the collation sequences
 | |
| ** used by the comparison. Return the result of the comparison.
 | |
| **
 | |
| ** If IN/OUT parameter *pbKey2Cached is true when this function is called,
 | |
| ** it is assumed that (pTask->pUnpacked) contains the unpacked version
 | |
| ** of key2. If it is false, (pTask->pUnpacked) is populated with the unpacked
 | |
| ** version of key2 and *pbKey2Cached set to true before returning.
 | |
| **
 | |
| ** If an OOM error is encountered, (pTask->pUnpacked->error_rc) is set
 | |
| ** to SQLITE_NOMEM.
 | |
| */
 | |
| static int vdbeSorterCompare(
 | |
|   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
 | |
|   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
 | |
|   const void *pKey1, int nKey1,   /* Left side of comparison */
 | |
|   const void *pKey2, int nKey2    /* Right side of comparison */
 | |
| ){
 | |
|   UnpackedRecord *r2 = pTask->pUnpacked;
 | |
|   if( !*pbKey2Cached ){
 | |
|     sqlite3VdbeRecordUnpack(pTask->pSorter->pKeyInfo, nKey2, pKey2, r2);
 | |
|     *pbKey2Cached = 1;
 | |
|   }
 | |
|   return sqlite3VdbeRecordCompare(nKey1, pKey1, r2);
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** A specially optimized version of vdbeSorterCompare() that assumes that
 | |
| ** the first field of each key is a TEXT value and that the collation
 | |
| ** sequence to compare them with is BINARY.
 | |
| */
 | |
| static int vdbeSorterCompareText(
 | |
|   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
 | |
|   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
 | |
|   const void *pKey1, int nKey1,   /* Left side of comparison */
 | |
|   const void *pKey2, int nKey2    /* Right side of comparison */
 | |
| ){
 | |
|   const u8 * const p1 = (const u8 * const)pKey1;
 | |
|   const u8 * const p2 = (const u8 * const)pKey2;
 | |
|   const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
 | |
|   const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
 | |
| 
 | |
|   int n1;
 | |
|   int n2;
 | |
|   int res;
 | |
| 
 | |
|   getVarint32NR(&p1[1], n1);
 | |
|   getVarint32NR(&p2[1], n2);
 | |
|   res = memcmp(v1, v2, (MIN(n1, n2) - 13)/2);
 | |
|   if( res==0 ){
 | |
|     res = n1 - n2;
 | |
|   }
 | |
| 
 | |
|   if( res==0 ){
 | |
|     if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
 | |
|       res = vdbeSorterCompareTail(
 | |
|           pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
 | |
|       );
 | |
|     }
 | |
|   }else{
 | |
|     assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
 | |
|     if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
 | |
|       res = res * -1;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** A specially optimized version of vdbeSorterCompare() that assumes that
 | |
| ** the first field of each key is an INTEGER value.
 | |
| */
 | |
| static int vdbeSorterCompareInt(
 | |
|   SortSubtask *pTask,             /* Subtask context (for pKeyInfo) */
 | |
|   int *pbKey2Cached,              /* True if pTask->pUnpacked is pKey2 */
 | |
|   const void *pKey1, int nKey1,   /* Left side of comparison */
 | |
|   const void *pKey2, int nKey2    /* Right side of comparison */
 | |
| ){
 | |
|   const u8 * const p1 = (const u8 * const)pKey1;
 | |
|   const u8 * const p2 = (const u8 * const)pKey2;
 | |
|   const int s1 = p1[1];                 /* Left hand serial type */
 | |
|   const int s2 = p2[1];                 /* Right hand serial type */
 | |
|   const u8 * const v1 = &p1[ p1[0] ];   /* Pointer to value 1 */
 | |
|   const u8 * const v2 = &p2[ p2[0] ];   /* Pointer to value 2 */
 | |
|   int res;                              /* Return value */
 | |
| 
 | |
|   assert( (s1>0 && s1<7) || s1==8 || s1==9 );
 | |
|   assert( (s2>0 && s2<7) || s2==8 || s2==9 );
 | |
| 
 | |
|   if( s1==s2 ){
 | |
|     /* The two values have the same sign. Compare using memcmp(). */
 | |
|     static const u8 aLen[] = {0, 1, 2, 3, 4, 6, 8, 0, 0, 0 };
 | |
|     const u8 n = aLen[s1];
 | |
|     int i;
 | |
|     res = 0;
 | |
|     for(i=0; i<n; i++){
 | |
|       if( (res = v1[i] - v2[i])!=0 ){
 | |
|         if( ((v1[0] ^ v2[0]) & 0x80)!=0 ){
 | |
|           res = v1[0] & 0x80 ? -1 : +1;
 | |
|         }
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   }else if( s1>7 && s2>7 ){
 | |
|     res = s1 - s2;
 | |
|   }else{
 | |
|     if( s2>7 ){
 | |
|       res = +1;
 | |
|     }else if( s1>7 ){
 | |
|       res = -1;
 | |
|     }else{
 | |
|       res = s1 - s2;
 | |
|     }
 | |
|     assert( res!=0 );
 | |
| 
 | |
|     if( res>0 ){
 | |
|       if( *v1 & 0x80 ) res = -1;
 | |
|     }else{
 | |
|       if( *v2 & 0x80 ) res = +1;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if( res==0 ){
 | |
|     if( pTask->pSorter->pKeyInfo->nKeyField>1 ){
 | |
|       res = vdbeSorterCompareTail(
 | |
|           pTask, pbKey2Cached, pKey1, nKey1, pKey2, nKey2
 | |
|       );
 | |
|     }
 | |
|   }else if( pTask->pSorter->pKeyInfo->aSortFlags[0] ){
 | |
|     assert( !(pTask->pSorter->pKeyInfo->aSortFlags[0]&KEYINFO_ORDER_BIGNULL) );
 | |
|     res = res * -1;
 | |
|   }
 | |
| 
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Initialize the temporary index cursor just opened as a sorter cursor.
 | |
| **
 | |
| ** Usually, the sorter module uses the value of (pCsr->pKeyInfo->nKeyField)
 | |
| ** to determine the number of fields that should be compared from the
 | |
| ** records being sorted. However, if the value passed as argument nField
 | |
| ** is non-zero and the sorter is able to guarantee a stable sort, nField
 | |
| ** is used instead. This is used when sorting records for a CREATE INDEX
 | |
| ** statement. In this case, keys are always delivered to the sorter in
 | |
| ** order of the primary key, which happens to be make up the final part 
 | |
| ** of the records being sorted. So if the sort is stable, there is never
 | |
| ** any reason to compare PK fields and they can be ignored for a small
 | |
| ** performance boost.
 | |
| **
 | |
| ** The sorter can guarantee a stable sort when running in single-threaded
 | |
| ** mode, but not in multi-threaded mode.
 | |
| **
 | |
| ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
 | |
| */
 | |
| int sqlite3VdbeSorterInit(
 | |
|   sqlite3 *db,                    /* Database connection (for malloc()) */
 | |
|   int nField,                     /* Number of key fields in each record */
 | |
|   VdbeCursor *pCsr                /* Cursor that holds the new sorter */
 | |
| ){
 | |
|   int pgsz;                       /* Page size of main database */
 | |
|   int i;                          /* Used to iterate through aTask[] */
 | |
|   VdbeSorter *pSorter;            /* The new sorter */
 | |
|   KeyInfo *pKeyInfo;              /* Copy of pCsr->pKeyInfo with db==0 */
 | |
|   int szKeyInfo;                  /* Size of pCsr->pKeyInfo in bytes */
 | |
|   int sz;                         /* Size of pSorter in bytes */
 | |
|   int rc = SQLITE_OK;
 | |
| #if SQLITE_MAX_WORKER_THREADS==0
 | |
| # define nWorker 0
 | |
| #else
 | |
|   int nWorker;
 | |
| #endif
 | |
| 
 | |
|   /* Initialize the upper limit on the number of worker threads */
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   if( sqlite3TempInMemory(db) || sqlite3GlobalConfig.bCoreMutex==0 ){
 | |
|     nWorker = 0;
 | |
|   }else{
 | |
|     nWorker = db->aLimit[SQLITE_LIMIT_WORKER_THREADS];
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   /* Do not allow the total number of threads (main thread + all workers)
 | |
|   ** to exceed the maximum merge count */
 | |
| #if SQLITE_MAX_WORKER_THREADS>=SORTER_MAX_MERGE_COUNT
 | |
|   if( nWorker>=SORTER_MAX_MERGE_COUNT ){
 | |
|     nWorker = SORTER_MAX_MERGE_COUNT-1;
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   assert( pCsr->pKeyInfo && pCsr->pBtx==0 );
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   szKeyInfo = sizeof(KeyInfo) + (pCsr->pKeyInfo->nKeyField-1)*sizeof(CollSeq*);
 | |
|   sz = sizeof(VdbeSorter) + nWorker * sizeof(SortSubtask);
 | |
| 
 | |
|   pSorter = (VdbeSorter*)sqlite3DbMallocZero(db, sz + szKeyInfo);
 | |
|   pCsr->uc.pSorter = pSorter;
 | |
|   if( pSorter==0 ){
 | |
|     rc = SQLITE_NOMEM_BKPT;
 | |
|   }else{
 | |
|     Btree *pBt = db->aDb[0].pBt;
 | |
|     pSorter->pKeyInfo = pKeyInfo = (KeyInfo*)((u8*)pSorter + sz);
 | |
|     memcpy(pKeyInfo, pCsr->pKeyInfo, szKeyInfo);
 | |
|     pKeyInfo->db = 0;
 | |
|     if( nField && nWorker==0 ){
 | |
|       pKeyInfo->nKeyField = nField;
 | |
|     }
 | |
|     sqlite3BtreeEnter(pBt);
 | |
|     pSorter->pgsz = pgsz = sqlite3BtreeGetPageSize(pBt);
 | |
|     sqlite3BtreeLeave(pBt);
 | |
|     pSorter->nTask = nWorker + 1;
 | |
|     pSorter->iPrev = (u8)(nWorker - 1);
 | |
|     pSorter->bUseThreads = (pSorter->nTask>1);
 | |
|     pSorter->db = db;
 | |
|     for(i=0; i<pSorter->nTask; i++){
 | |
|       SortSubtask *pTask = &pSorter->aTask[i];
 | |
|       pTask->pSorter = pSorter;
 | |
|     }
 | |
| 
 | |
|     if( !sqlite3TempInMemory(db) ){
 | |
|       i64 mxCache;                /* Cache size in bytes*/
 | |
|       u32 szPma = sqlite3GlobalConfig.szPma;
 | |
|       pSorter->mnPmaSize = szPma * pgsz;
 | |
| 
 | |
|       mxCache = db->aDb[0].pSchema->cache_size;
 | |
|       if( mxCache<0 ){
 | |
|         /* A negative cache-size value C indicates that the cache is abs(C)
 | |
|         ** KiB in size.  */
 | |
|         mxCache = mxCache * -1024;
 | |
|       }else{
 | |
|         mxCache = mxCache * pgsz;
 | |
|       }
 | |
|       mxCache = MIN(mxCache, SQLITE_MAX_PMASZ);
 | |
|       pSorter->mxPmaSize = MAX(pSorter->mnPmaSize, (int)mxCache);
 | |
| 
 | |
|       /* Avoid large memory allocations if the application has requested
 | |
|       ** SQLITE_CONFIG_SMALL_MALLOC. */
 | |
|       if( sqlite3GlobalConfig.bSmallMalloc==0 ){
 | |
|         assert( pSorter->iMemory==0 );
 | |
|         pSorter->nMemory = pgsz;
 | |
|         pSorter->list.aMemory = (u8*)sqlite3Malloc(pgsz);
 | |
|         if( !pSorter->list.aMemory ) rc = SQLITE_NOMEM_BKPT;
 | |
|       }
 | |
|     }
 | |
| 
 | |
|     if( pKeyInfo->nAllField<13 
 | |
|      && (pKeyInfo->aColl[0]==0 || pKeyInfo->aColl[0]==db->pDfltColl)
 | |
|      && (pKeyInfo->aSortFlags[0] & KEYINFO_ORDER_BIGNULL)==0
 | |
|     ){
 | |
|       pSorter->typeMask = SORTER_TYPE_INTEGER | SORTER_TYPE_TEXT;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| #undef nWorker   /* Defined at the top of this function */
 | |
| 
 | |
| /*
 | |
| ** Free the list of sorted records starting at pRecord.
 | |
| */
 | |
| static void vdbeSorterRecordFree(sqlite3 *db, SorterRecord *pRecord){
 | |
|   SorterRecord *p;
 | |
|   SorterRecord *pNext;
 | |
|   for(p=pRecord; p; p=pNext){
 | |
|     pNext = p->u.pNext;
 | |
|     sqlite3DbFree(db, p);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Free all resources owned by the object indicated by argument pTask. All 
 | |
| ** fields of *pTask are zeroed before returning.
 | |
| */
 | |
| static void vdbeSortSubtaskCleanup(sqlite3 *db, SortSubtask *pTask){
 | |
|   sqlite3DbFree(db, pTask->pUnpacked);
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   /* pTask->list.aMemory can only be non-zero if it was handed memory
 | |
|   ** from the main thread.  That only occurs SQLITE_MAX_WORKER_THREADS>0 */
 | |
|   if( pTask->list.aMemory ){
 | |
|     sqlite3_free(pTask->list.aMemory);
 | |
|   }else
 | |
| #endif
 | |
|   {
 | |
|     assert( pTask->list.aMemory==0 );
 | |
|     vdbeSorterRecordFree(0, pTask->list.pList);
 | |
|   }
 | |
|   if( pTask->file.pFd ){
 | |
|     sqlite3OsCloseFree(pTask->file.pFd);
 | |
|   }
 | |
|   if( pTask->file2.pFd ){
 | |
|     sqlite3OsCloseFree(pTask->file2.pFd);
 | |
|   }
 | |
|   memset(pTask, 0, sizeof(SortSubtask));
 | |
| }
 | |
| 
 | |
#ifdef SQLITE_DEBUG_SORTER_THREADS
/*
** Tracing helpers, compiled only when SQLITE_DEBUG_SORTER_THREADS is
** defined.  Each one writes a single line to stderr made of the current
** time reported by the VFS, a tag naming the caller and the event text.
** Without the macro all four names expand to nothing.
*/

/* Trace event zEvent for subtask pTask, tagged with the task's index. */
static void vdbeSorterWorkDebug(SortSubtask *pTask, const char *zEvent){
  i64 tmNow;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:%d %s\n", tmNow, iTask, zEvent);
}

/* Trace event zEvent with tag "X"; the time comes from the default VFS. */
static void vdbeSorterRewindDebug(const char *zEvent){
  i64 tmNow;
  sqlite3OsCurrentTimeInt64(sqlite3_vfs_find(0), &tmNow);
  fprintf(stderr, "%lld:X %s\n", tmNow, zEvent);
}

/* Trace event zEvent for subtask pTask with a "bg" prefix on its index. */
static void vdbeSorterPopulateDebug(
  SortSubtask *pTask,
  const char *zEvent
){
  i64 tmNow;
  int iTask = (pTask - pTask->pSorter->aTask);
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:bg%d %s\n", tmNow, iTask, zEvent);
}

/* Trace event zEvent with tag "main", but only when bBlocked is true. */
static void vdbeSorterBlockDebug(
  SortSubtask *pTask,
  int bBlocked,
  const char *zEvent
){
  i64 tmNow;
  if( !bBlocked ) return;
  sqlite3OsCurrentTimeInt64(pTask->pSorter->db->pVfs, &tmNow);
  fprintf(stderr, "%lld:main %s\n", tmNow, zEvent);
}
#else
# define vdbeSorterWorkDebug(x,y)
# define vdbeSorterRewindDebug(y)
# define vdbeSorterPopulateDebug(x,y)
# define vdbeSorterBlockDebug(x,y,z)
#endif
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
| /*
 | |
| ** Join thread pTask->thread.
 | |
| */
 | |
| static int vdbeSorterJoinThread(SortSubtask *pTask){
 | |
|   int rc = SQLITE_OK;
 | |
|   if( pTask->pThread ){
 | |
| #ifdef SQLITE_DEBUG_SORTER_THREADS
 | |
|     int bDone = pTask->bDone;
 | |
| #endif
 | |
|     void *pRet = SQLITE_INT_TO_PTR(SQLITE_ERROR);
 | |
|     vdbeSorterBlockDebug(pTask, !bDone, "enter");
 | |
|     (void)sqlite3ThreadJoin(pTask->pThread, &pRet);
 | |
|     vdbeSorterBlockDebug(pTask, !bDone, "exit");
 | |
|     rc = SQLITE_PTR_TO_INT(pRet);
 | |
|     assert( pTask->bDone==1 );
 | |
|     pTask->bDone = 0;
 | |
|     pTask->pThread = 0;
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Launch a background thread to run xTask(pIn).
 | |
| */
 | |
| static int vdbeSorterCreateThread(
 | |
|   SortSubtask *pTask,             /* Thread will use this task object */
 | |
|   void *(*xTask)(void*),          /* Routine to run in a separate thread */
 | |
|   void *pIn                       /* Argument passed into xTask() */
 | |
| ){
 | |
|   assert( pTask->pThread==0 && pTask->bDone==0 );
 | |
|   return sqlite3ThreadCreate(&pTask->pThread, xTask, pIn);
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Join all outstanding threads launched by SorterWrite() to create 
 | |
| ** level-0 PMAs.
 | |
| */
 | |
| static int vdbeSorterJoinAll(VdbeSorter *pSorter, int rcin){
 | |
|   int rc = rcin;
 | |
|   int i;
 | |
| 
 | |
|   /* This function is always called by the main user thread.
 | |
|   **
 | |
|   ** If this function is being called after SorterRewind() has been called, 
 | |
|   ** it is possible that thread pSorter->aTask[pSorter->nTask-1].pThread
 | |
|   ** is currently attempt to join one of the other threads. To avoid a race
 | |
|   ** condition where this thread also attempts to join the same object, join 
 | |
|   ** thread pSorter->aTask[pSorter->nTask-1].pThread first. */
 | |
|   for(i=pSorter->nTask-1; i>=0; i--){
 | |
|     SortSubtask *pTask = &pSorter->aTask[i];
 | |
|     int rc2 = vdbeSorterJoinThread(pTask);
 | |
|     if( rc==SQLITE_OK ) rc = rc2;
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| #else
 | |
| # define vdbeSorterJoinAll(x,rcin) (rcin)
 | |
| # define vdbeSorterJoinThread(pTask) SQLITE_OK
 | |
| #endif
 | |
| 
 | |
| /*
 | |
| ** Allocate a new MergeEngine object capable of handling up to
 | |
| ** nReader PmaReader inputs.
 | |
| **
 | |
| ** nReader is automatically rounded up to the next power of two.
 | |
| ** nReader may not exceed SORTER_MAX_MERGE_COUNT even after rounding up.
 | |
| */
 | |
| static MergeEngine *vdbeMergeEngineNew(int nReader){
 | |
|   int N = 2;                      /* Smallest power of two >= nReader */
 | |
|   int nByte;                      /* Total bytes of space to allocate */
 | |
|   MergeEngine *pNew;              /* Pointer to allocated object to return */
 | |
| 
 | |
|   assert( nReader<=SORTER_MAX_MERGE_COUNT );
 | |
| 
 | |
|   while( N<nReader ) N += N;
 | |
|   nByte = sizeof(MergeEngine) + N * (sizeof(int) + sizeof(PmaReader));
 | |
| 
 | |
|   pNew = sqlite3FaultSim(100) ? 0 : (MergeEngine*)sqlite3MallocZero(nByte);
 | |
|   if( pNew ){
 | |
|     pNew->nTree = N;
 | |
|     pNew->pTask = 0;
 | |
|     pNew->aReadr = (PmaReader*)&pNew[1];
 | |
|     pNew->aTree = (int*)&pNew->aReadr[N];
 | |
|   }
 | |
|   return pNew;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Free the MergeEngine object passed as the only argument.
 | |
| */
 | |
| static void vdbeMergeEngineFree(MergeEngine *pMerger){
 | |
|   int i;
 | |
|   if( pMerger ){
 | |
|     for(i=0; i<pMerger->nTree; i++){
 | |
|       vdbePmaReaderClear(&pMerger->aReadr[i]);
 | |
|     }
 | |
|   }
 | |
|   sqlite3_free(pMerger);
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Free all resources associated with the IncrMerger object indicated by
 | |
| ** the first argument.
 | |
| */
 | |
| static void vdbeIncrFree(IncrMerger *pIncr){
 | |
|   if( pIncr ){
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|     if( pIncr->bUseThread ){
 | |
|       vdbeSorterJoinThread(pIncr->pTask);
 | |
|       if( pIncr->aFile[0].pFd ) sqlite3OsCloseFree(pIncr->aFile[0].pFd);
 | |
|       if( pIncr->aFile[1].pFd ) sqlite3OsCloseFree(pIncr->aFile[1].pFd);
 | |
|     }
 | |
| #endif
 | |
|     vdbeMergeEngineFree(pIncr->pMerger);
 | |
|     sqlite3_free(pIncr);
 | |
|   }
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Reset a sorting cursor back to its original empty state.
 | |
| */
 | |
| void sqlite3VdbeSorterReset(sqlite3 *db, VdbeSorter *pSorter){
 | |
|   int i;
 | |
|   (void)vdbeSorterJoinAll(pSorter, SQLITE_OK);
 | |
|   assert( pSorter->bUseThreads || pSorter->pReader==0 );
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   if( pSorter->pReader ){
 | |
|     vdbePmaReaderClear(pSorter->pReader);
 | |
|     sqlite3DbFree(db, pSorter->pReader);
 | |
|     pSorter->pReader = 0;
 | |
|   }
 | |
| #endif
 | |
|   vdbeMergeEngineFree(pSorter->pMerger);
 | |
|   pSorter->pMerger = 0;
 | |
|   for(i=0; i<pSorter->nTask; i++){
 | |
|     SortSubtask *pTask = &pSorter->aTask[i];
 | |
|     vdbeSortSubtaskCleanup(db, pTask);
 | |
|     pTask->pSorter = pSorter;
 | |
|   }
 | |
|   if( pSorter->list.aMemory==0 ){
 | |
|     vdbeSorterRecordFree(0, pSorter->list.pList);
 | |
|   }
 | |
|   pSorter->list.pList = 0;
 | |
|   pSorter->list.szPMA = 0;
 | |
|   pSorter->bUsePMA = 0;
 | |
|   pSorter->iMemory = 0;
 | |
|   pSorter->mxKeysize = 0;
 | |
|   sqlite3DbFree(db, pSorter->pUnpacked);
 | |
|   pSorter->pUnpacked = 0;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Free any cursor components allocated by sqlite3VdbeSorterXXX routines.
 | |
| */
 | |
| void sqlite3VdbeSorterClose(sqlite3 *db, VdbeCursor *pCsr){
 | |
|   VdbeSorter *pSorter;
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   if( pSorter ){
 | |
|     sqlite3VdbeSorterReset(db, pSorter);
 | |
|     sqlite3_free(pSorter->list.aMemory);
 | |
|     sqlite3DbFree(db, pSorter);
 | |
|     pCsr->uc.pSorter = 0;
 | |
|   }
 | |
| }
 | |
| 
 | |
#if SQLITE_MAX_MMAP_SIZE>0
/*
** The first argument is a file-handle open on a temporary file. The file
** is guaranteed to be nByte bytes or smaller in size. This function
** attempts to extend the file to nByte bytes in size and to ensure that
** the VFS has memory mapped it.
**
** Nothing is done if nByte exceeds the connection's nMaxSorterMmap
** setting or if the file's method table is older than version 3.
**
** Whether or not the file does end up memory mapped of course depends on
** the specific VFS implementation.  All return codes are deliberately
** ignored: this is a best-effort hint.
*/
static void vdbeSorterExtendFile(sqlite3 *db, sqlite3_file *pFd, i64 nByte){
  if( nByte<=(i64)(db->nMaxSorterMmap) && pFd->pMethods->iVersion>=3 ){
    void *p = 0;
    int chunksize = 4*1024;
    sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_CHUNK_SIZE, &chunksize);
    sqlite3OsFileControlHint(pFd, SQLITE_FCNTL_SIZE_HINT, &nByte);
    sqlite3OsFetch(pFd, 0, (int)nByte, &p);
    /* Only hand back a reference that was actually obtained.  If the
    ** fetch left p as NULL there is nothing to release, and an unfetch
    ** call with a NULL pointer is not a no-op for every VFS (NOTE(review):
    ** the stock VFSes are understood to treat it as a request to discard
    ** the mapping - confirm against the xUnfetch documentation). */
    if( p ) sqlite3OsUnfetch(pFd, 0, p);
  }
}
#else
# define vdbeSorterExtendFile(x,y,z)
#endif
 | |
| 
 | |
| /*
 | |
| ** Allocate space for a file-handle and open a temporary file. If successful,
 | |
| ** set *ppFd to point to the malloc'd file-handle and return SQLITE_OK.
 | |
| ** Otherwise, set *ppFd to 0 and return an SQLite error code.
 | |
| */
 | |
| static int vdbeSorterOpenTempFile(
 | |
|   sqlite3 *db,                    /* Database handle doing sort */
 | |
|   i64 nExtend,                    /* Attempt to extend file to this size */
 | |
|   sqlite3_file **ppFd
 | |
| ){
 | |
|   int rc;
 | |
|   if( sqlite3FaultSim(202) ) return SQLITE_IOERR_ACCESS;
 | |
|   rc = sqlite3OsOpenMalloc(db->pVfs, 0, ppFd,
 | |
|       SQLITE_OPEN_TEMP_JOURNAL |
 | |
|       SQLITE_OPEN_READWRITE    | SQLITE_OPEN_CREATE |
 | |
|       SQLITE_OPEN_EXCLUSIVE    | SQLITE_OPEN_DELETEONCLOSE, &rc
 | |
|   );
 | |
|   if( rc==SQLITE_OK ){
 | |
|     i64 max = SQLITE_MAX_MMAP_SIZE;
 | |
|     sqlite3OsFileControlHint(*ppFd, SQLITE_FCNTL_MMAP_SIZE, (void*)&max);
 | |
|     if( nExtend>0 ){
 | |
|       vdbeSorterExtendFile(db, *ppFd, nExtend);
 | |
|     }
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** If it has not already been allocated, allocate the UnpackedRecord 
 | |
| ** structure at pTask->pUnpacked. Return SQLITE_OK if successful (or 
 | |
| ** if no allocation was required), or SQLITE_NOMEM otherwise.
 | |
| */
 | |
| static int vdbeSortAllocUnpacked(SortSubtask *pTask){
 | |
|   if( pTask->pUnpacked==0 ){
 | |
|     pTask->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pTask->pSorter->pKeyInfo);
 | |
|     if( pTask->pUnpacked==0 ) return SQLITE_NOMEM_BKPT;
 | |
|     pTask->pUnpacked->nField = pTask->pSorter->pKeyInfo->nKeyField;
 | |
|     pTask->pUnpacked->errCode = 0;
 | |
|   }
 | |
|   return SQLITE_OK;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
| ** Merge the two sorted lists p1 and p2 into a single list.
 | |
| */
 | |
| static SorterRecord *vdbeSorterMerge(
 | |
|   SortSubtask *pTask,             /* Calling thread context */
 | |
|   SorterRecord *p1,               /* First list to merge */
 | |
|   SorterRecord *p2                /* Second list to merge */
 | |
| ){
 | |
|   SorterRecord *pFinal = 0;
 | |
|   SorterRecord **pp = &pFinal;
 | |
|   int bCached = 0;
 | |
| 
 | |
|   assert( p1!=0 && p2!=0 );
 | |
|   for(;;){
 | |
|     int res;
 | |
|     res = pTask->xCompare(
 | |
|         pTask, &bCached, SRVAL(p1), p1->nVal, SRVAL(p2), p2->nVal
 | |
|     );
 | |
| 
 | |
|     if( res<=0 ){
 | |
|       *pp = p1;
 | |
|       pp = &p1->u.pNext;
 | |
|       p1 = p1->u.pNext;
 | |
|       if( p1==0 ){
 | |
|         *pp = p2;
 | |
|         break;
 | |
|       }
 | |
|     }else{
 | |
|       *pp = p2;
 | |
|       pp = &p2->u.pNext;
 | |
|       p2 = p2->u.pNext;
 | |
|       bCached = 0;
 | |
|       if( p2==0 ){
 | |
|         *pp = p1;
 | |
|         break;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
|   return pFinal;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Return the SorterCompare function to compare values collected by the
 | |
| ** sorter object passed as the only argument.
 | |
| */
 | |
| static SorterCompare vdbeSorterGetCompare(VdbeSorter *p){
 | |
|   if( p->typeMask==SORTER_TYPE_INTEGER ){
 | |
|     return vdbeSorterCompareInt;
 | |
|   }else if( p->typeMask==SORTER_TYPE_TEXT ){
 | |
|     return vdbeSorterCompareText; 
 | |
|   }
 | |
|   return vdbeSorterCompare;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Sort the linked list of records headed at pTask->pList. Return 
 | |
| ** SQLITE_OK if successful, or an SQLite error code (i.e. SQLITE_NOMEM) if 
 | |
| ** an error occurs.
 | |
| */
 | |
| static int vdbeSorterSort(SortSubtask *pTask, SorterList *pList){
 | |
|   int i;
 | |
|   SorterRecord *p;
 | |
|   int rc;
 | |
|   SorterRecord *aSlot[64];
 | |
| 
 | |
|   rc = vdbeSortAllocUnpacked(pTask);
 | |
|   if( rc!=SQLITE_OK ) return rc;
 | |
| 
 | |
|   p = pList->pList;
 | |
|   pTask->xCompare = vdbeSorterGetCompare(pTask->pSorter);
 | |
|   memset(aSlot, 0, sizeof(aSlot));
 | |
| 
 | |
|   while( p ){
 | |
|     SorterRecord *pNext;
 | |
|     if( pList->aMemory ){
 | |
|       if( (u8*)p==pList->aMemory ){
 | |
|         pNext = 0;
 | |
|       }else{
 | |
|         assert( p->u.iNext<sqlite3MallocSize(pList->aMemory) );
 | |
|         pNext = (SorterRecord*)&pList->aMemory[p->u.iNext];
 | |
|       }
 | |
|     }else{
 | |
|       pNext = p->u.pNext;
 | |
|     }
 | |
| 
 | |
|     p->u.pNext = 0;
 | |
|     for(i=0; aSlot[i]; i++){
 | |
|       p = vdbeSorterMerge(pTask, p, aSlot[i]);
 | |
|       aSlot[i] = 0;
 | |
|     }
 | |
|     aSlot[i] = p;
 | |
|     p = pNext;
 | |
|   }
 | |
| 
 | |
|   p = 0;
 | |
|   for(i=0; i<ArraySize(aSlot); i++){
 | |
|     if( aSlot[i]==0 ) continue;
 | |
|     p = p ? vdbeSorterMerge(pTask, p, aSlot[i]) : aSlot[i];
 | |
|   }
 | |
|   pList->pList = p;
 | |
| 
 | |
|   assert( pTask->pUnpacked->errCode==SQLITE_OK 
 | |
|        || pTask->pUnpacked->errCode==SQLITE_NOMEM 
 | |
|   );
 | |
|   return pTask->pUnpacked->errCode;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Initialize a PMA-writer object.
 | |
| */
 | |
| static void vdbePmaWriterInit(
 | |
|   sqlite3_file *pFd,              /* File handle to write to */
 | |
|   PmaWriter *p,                   /* Object to populate */
 | |
|   int nBuf,                       /* Buffer size */
 | |
|   i64 iStart                      /* Offset of pFd to begin writing at */
 | |
| ){
 | |
|   memset(p, 0, sizeof(PmaWriter));
 | |
|   p->aBuffer = (u8*)sqlite3Malloc(nBuf);
 | |
|   if( !p->aBuffer ){
 | |
|     p->eFWErr = SQLITE_NOMEM_BKPT;
 | |
|   }else{
 | |
|     p->iBufEnd = p->iBufStart = (iStart % nBuf);
 | |
|     p->iWriteOff = iStart - p->iBufStart;
 | |
|     p->nBuffer = nBuf;
 | |
|     p->pFd = pFd;
 | |
|   }
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Write nData bytes of data to the PMA. Return SQLITE_OK
 | |
| ** if successful, or an SQLite error code if an error occurs.
 | |
| */
 | |
| static void vdbePmaWriteBlob(PmaWriter *p, u8 *pData, int nData){
 | |
|   int nRem = nData;
 | |
|   while( nRem>0 && p->eFWErr==0 ){
 | |
|     int nCopy = nRem;
 | |
|     if( nCopy>(p->nBuffer - p->iBufEnd) ){
 | |
|       nCopy = p->nBuffer - p->iBufEnd;
 | |
|     }
 | |
| 
 | |
|     memcpy(&p->aBuffer[p->iBufEnd], &pData[nData-nRem], nCopy);
 | |
|     p->iBufEnd += nCopy;
 | |
|     if( p->iBufEnd==p->nBuffer ){
 | |
|       p->eFWErr = sqlite3OsWrite(p->pFd, 
 | |
|           &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 
 | |
|           p->iWriteOff + p->iBufStart
 | |
|       );
 | |
|       p->iBufStart = p->iBufEnd = 0;
 | |
|       p->iWriteOff += p->nBuffer;
 | |
|     }
 | |
|     assert( p->iBufEnd<p->nBuffer );
 | |
| 
 | |
|     nRem -= nCopy;
 | |
|   }
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Flush any buffered data to disk and clean up the PMA-writer object.
 | |
| ** The results of using the PMA-writer after this call are undefined.
 | |
| ** Return SQLITE_OK if flushing the buffered data succeeds or is not 
 | |
| ** required. Otherwise, return an SQLite error code.
 | |
| **
 | |
| ** Before returning, set *piEof to the offset immediately following the
 | |
| ** last byte written to the file.
 | |
| */
 | |
| static int vdbePmaWriterFinish(PmaWriter *p, i64 *piEof){
 | |
|   int rc;
 | |
|   if( p->eFWErr==0 && ALWAYS(p->aBuffer) && p->iBufEnd>p->iBufStart ){
 | |
|     p->eFWErr = sqlite3OsWrite(p->pFd, 
 | |
|         &p->aBuffer[p->iBufStart], p->iBufEnd - p->iBufStart, 
 | |
|         p->iWriteOff + p->iBufStart
 | |
|     );
 | |
|   }
 | |
|   *piEof = (p->iWriteOff + p->iBufEnd);
 | |
|   sqlite3_free(p->aBuffer);
 | |
|   rc = p->eFWErr;
 | |
|   memset(p, 0, sizeof(PmaWriter));
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Write value iVal encoded as a varint to the PMA. Return 
 | |
| ** SQLITE_OK if successful, or an SQLite error code if an error occurs.
 | |
| */
 | |
| static void vdbePmaWriteVarint(PmaWriter *p, u64 iVal){
 | |
|   int nByte; 
 | |
|   u8 aByte[10];
 | |
|   nByte = sqlite3PutVarint(aByte, iVal);
 | |
|   vdbePmaWriteBlob(p, aByte, nByte);
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Write the current contents of in-memory linked-list pList to a level-0
 | |
| ** PMA in the temp file belonging to sub-task pTask. Return SQLITE_OK if 
 | |
| ** successful, or an SQLite error code otherwise.
 | |
| **
 | |
| ** The format of a PMA is:
 | |
| **
 | |
| **     * A varint. This varint contains the total number of bytes of content
 | |
| **       in the PMA (not including the varint itself).
 | |
| **
 | |
| **     * One or more records packed end-to-end in order of ascending keys. 
 | |
| **       Each record consists of a varint followed by a blob of data (the 
 | |
| **       key). The varint is the number of bytes in the blob of data.
 | |
| */
 | |
| static int vdbeSorterListToPMA(SortSubtask *pTask, SorterList *pList){
 | |
|   sqlite3 *db = pTask->pSorter->db;
 | |
|   int rc = SQLITE_OK;             /* Return code */
 | |
|   PmaWriter writer;               /* Object used to write to the file */
 | |
| 
 | |
| #ifdef SQLITE_DEBUG
 | |
|   /* Set iSz to the expected size of file pTask->file after writing the PMA. 
 | |
|   ** This is used by an assert() statement at the end of this function.  */
 | |
|   i64 iSz = pList->szPMA + sqlite3VarintLen(pList->szPMA) + pTask->file.iEof;
 | |
| #endif
 | |
| 
 | |
|   vdbeSorterWorkDebug(pTask, "enter");
 | |
|   memset(&writer, 0, sizeof(PmaWriter));
 | |
|   assert( pList->szPMA>0 );
 | |
| 
 | |
|   /* If the first temporary PMA file has not been opened, open it now. */
 | |
|   if( pTask->file.pFd==0 ){
 | |
|     rc = vdbeSorterOpenTempFile(db, 0, &pTask->file.pFd);
 | |
|     assert( rc!=SQLITE_OK || pTask->file.pFd );
 | |
|     assert( pTask->file.iEof==0 );
 | |
|     assert( pTask->nPMA==0 );
 | |
|   }
 | |
| 
 | |
|   /* Try to get the file to memory map */
 | |
|   if( rc==SQLITE_OK ){
 | |
|     vdbeSorterExtendFile(db, pTask->file.pFd, pTask->file.iEof+pList->szPMA+9);
 | |
|   }
 | |
| 
 | |
|   /* Sort the list */
 | |
|   if( rc==SQLITE_OK ){
 | |
|     rc = vdbeSorterSort(pTask, pList);
 | |
|   }
 | |
| 
 | |
|   if( rc==SQLITE_OK ){
 | |
|     SorterRecord *p;
 | |
|     SorterRecord *pNext = 0;
 | |
| 
 | |
|     vdbePmaWriterInit(pTask->file.pFd, &writer, pTask->pSorter->pgsz,
 | |
|                       pTask->file.iEof);
 | |
|     pTask->nPMA++;
 | |
|     vdbePmaWriteVarint(&writer, pList->szPMA);
 | |
|     for(p=pList->pList; p; p=pNext){
 | |
|       pNext = p->u.pNext;
 | |
|       vdbePmaWriteVarint(&writer, p->nVal);
 | |
|       vdbePmaWriteBlob(&writer, SRVAL(p), p->nVal);
 | |
|       if( pList->aMemory==0 ) sqlite3_free(p);
 | |
|     }
 | |
|     pList->pList = p;
 | |
|     rc = vdbePmaWriterFinish(&writer, &pTask->file.iEof);
 | |
|   }
 | |
| 
 | |
|   vdbeSorterWorkDebug(pTask, "exit");
 | |
|   assert( rc!=SQLITE_OK || pList->pList==0 );
 | |
|   assert( rc!=SQLITE_OK || pTask->file.iEof==iSz );
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Advance the MergeEngine to its next entry.
 | |
| ** Set *pbEof to true there is no next entry because
 | |
| ** the MergeEngine has reached the end of all its inputs.
 | |
| **
 | |
| ** Return SQLITE_OK if successful or an error code if an error occurs.
 | |
| */
 | |
| static int vdbeMergeEngineStep(
 | |
|   MergeEngine *pMerger,      /* The merge engine to advance to the next row */
 | |
|   int *pbEof                 /* Set TRUE at EOF.  Set false for more content */
 | |
| ){
 | |
|   int rc;
 | |
|   int iPrev = pMerger->aTree[1];/* Index of PmaReader to advance */
 | |
|   SortSubtask *pTask = pMerger->pTask;
 | |
| 
 | |
|   /* Advance the current PmaReader */
 | |
|   rc = vdbePmaReaderNext(&pMerger->aReadr[iPrev]);
 | |
| 
 | |
|   /* Update contents of aTree[] */
 | |
|   if( rc==SQLITE_OK ){
 | |
|     int i;                      /* Index of aTree[] to recalculate */
 | |
|     PmaReader *pReadr1;         /* First PmaReader to compare */
 | |
|     PmaReader *pReadr2;         /* Second PmaReader to compare */
 | |
|     int bCached = 0;
 | |
| 
 | |
|     /* Find the first two PmaReaders to compare. The one that was just
 | |
|     ** advanced (iPrev) and the one next to it in the array.  */
 | |
|     pReadr1 = &pMerger->aReadr[(iPrev & 0xFFFE)];
 | |
|     pReadr2 = &pMerger->aReadr[(iPrev | 0x0001)];
 | |
| 
 | |
|     for(i=(pMerger->nTree+iPrev)/2; i>0; i=i/2){
 | |
|       /* Compare pReadr1 and pReadr2. Store the result in variable iRes. */
 | |
|       int iRes;
 | |
|       if( pReadr1->pFd==0 ){
 | |
|         iRes = +1;
 | |
|       }else if( pReadr2->pFd==0 ){
 | |
|         iRes = -1;
 | |
|       }else{
 | |
|         iRes = pTask->xCompare(pTask, &bCached,
 | |
|             pReadr1->aKey, pReadr1->nKey, pReadr2->aKey, pReadr2->nKey
 | |
|         );
 | |
|       }
 | |
| 
 | |
|       /* If pReadr1 contained the smaller value, set aTree[i] to its index.
 | |
|       ** Then set pReadr2 to the next PmaReader to compare to pReadr1. In this
 | |
|       ** case there is no cache of pReadr2 in pTask->pUnpacked, so set
 | |
|       ** pKey2 to point to the record belonging to pReadr2.
 | |
|       **
 | |
|       ** Alternatively, if pReadr2 contains the smaller of the two values,
 | |
|       ** set aTree[i] to its index and update pReadr1. If vdbeSorterCompare()
 | |
|       ** was actually called above, then pTask->pUnpacked now contains
 | |
|       ** a value equivalent to pReadr2. So set pKey2 to NULL to prevent
 | |
|       ** vdbeSorterCompare() from decoding pReadr2 again.
 | |
|       **
 | |
|       ** If the two values were equal, then the value from the oldest
 | |
|       ** PMA should be considered smaller. The VdbeSorter.aReadr[] array
 | |
|       ** is sorted from oldest to newest, so pReadr1 contains older values
 | |
|       ** than pReadr2 iff (pReadr1<pReadr2).  */
 | |
|       if( iRes<0 || (iRes==0 && pReadr1<pReadr2) ){
 | |
|         pMerger->aTree[i] = (int)(pReadr1 - pMerger->aReadr);
 | |
|         pReadr2 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
 | |
|         bCached = 0;
 | |
|       }else{
 | |
|         if( pReadr1->pFd ) bCached = 0;
 | |
|         pMerger->aTree[i] = (int)(pReadr2 - pMerger->aReadr);
 | |
|         pReadr1 = &pMerger->aReadr[ pMerger->aTree[i ^ 0x0001] ];
 | |
|       }
 | |
|     }
 | |
|     *pbEof = (pMerger->aReadr[pMerger->aTree[1]].pFd==0);
 | |
|   }
 | |
| 
 | |
|   return (rc==SQLITE_OK ? pTask->pUnpacked->errCode : rc);
 | |
| }
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
| /*
 | |
| ** The main routine for background threads that write level-0 PMAs.
 | |
| */
 | |
| static void *vdbeSorterFlushThread(void *pCtx){
 | |
|   SortSubtask *pTask = (SortSubtask*)pCtx;
 | |
|   int rc;                         /* Return code */
 | |
|   assert( pTask->bDone==0 );
 | |
|   rc = vdbeSorterListToPMA(pTask, &pTask->list);
 | |
|   pTask->bDone = 1;
 | |
|   return SQLITE_INT_TO_PTR(rc);
 | |
| }
 | |
| #endif /* SQLITE_MAX_WORKER_THREADS>0 */
 | |
| 
 | |
| /*
 | |
| ** Flush the current contents of VdbeSorter.list to a new PMA, possibly
 | |
| ** using a background thread.
 | |
| */
 | |
| static int vdbeSorterFlushPMA(VdbeSorter *pSorter){
 | |
| #if SQLITE_MAX_WORKER_THREADS==0
 | |
|   pSorter->bUsePMA = 1;
 | |
|   return vdbeSorterListToPMA(&pSorter->aTask[0], &pSorter->list);
 | |
| #else
 | |
|   int rc = SQLITE_OK;
 | |
|   int i;
 | |
|   SortSubtask *pTask = 0;    /* Thread context used to create new PMA */
 | |
|   int nWorker = (pSorter->nTask-1);
 | |
| 
 | |
|   /* Set the flag to indicate that at least one PMA has been written. 
 | |
|   ** Or will be, anyhow.  */
 | |
|   pSorter->bUsePMA = 1;
 | |
| 
 | |
|   /* Select a sub-task to sort and flush the current list of in-memory
 | |
|   ** records to disk. If the sorter is running in multi-threaded mode,
 | |
|   ** round-robin between the first (pSorter->nTask-1) tasks. Except, if
 | |
|   ** the background thread from a sub-tasks previous turn is still running,
 | |
|   ** skip it. If the first (pSorter->nTask-1) sub-tasks are all still busy,
 | |
|   ** fall back to using the final sub-task. The first (pSorter->nTask-1)
 | |
|   ** sub-tasks are prefered as they use background threads - the final 
 | |
|   ** sub-task uses the main thread. */
 | |
|   for(i=0; i<nWorker; i++){
 | |
|     int iTest = (pSorter->iPrev + i + 1) % nWorker;
 | |
|     pTask = &pSorter->aTask[iTest];
 | |
|     if( pTask->bDone ){
 | |
|       rc = vdbeSorterJoinThread(pTask);
 | |
|     }
 | |
|     if( rc!=SQLITE_OK || pTask->pThread==0 ) break;
 | |
|   }
 | |
| 
 | |
|   if( rc==SQLITE_OK ){
 | |
|     if( i==nWorker ){
 | |
|       /* Use the foreground thread for this operation */
 | |
|       rc = vdbeSorterListToPMA(&pSorter->aTask[nWorker], &pSorter->list);
 | |
|     }else{
 | |
|       /* Launch a background thread for this operation */
 | |
|       u8 *aMem;
 | |
|       void *pCtx;
 | |
| 
 | |
|       assert( pTask!=0 );
 | |
|       assert( pTask->pThread==0 && pTask->bDone==0 );
 | |
|       assert( pTask->list.pList==0 );
 | |
|       assert( pTask->list.aMemory==0 || pSorter->list.aMemory!=0 );
 | |
| 
 | |
|       aMem = pTask->list.aMemory;
 | |
|       pCtx = (void*)pTask;
 | |
|       pSorter->iPrev = (u8)(pTask - pSorter->aTask);
 | |
|       pTask->list = pSorter->list;
 | |
|       pSorter->list.pList = 0;
 | |
|       pSorter->list.szPMA = 0;
 | |
|       if( aMem ){
 | |
|         pSorter->list.aMemory = aMem;
 | |
|         pSorter->nMemory = sqlite3MallocSize(aMem);
 | |
|       }else if( pSorter->list.aMemory ){
 | |
|         pSorter->list.aMemory = sqlite3Malloc(pSorter->nMemory);
 | |
|         if( !pSorter->list.aMemory ) return SQLITE_NOMEM_BKPT;
 | |
|       }
 | |
| 
 | |
|       rc = vdbeSorterCreateThread(pTask, vdbeSorterFlushThread, pCtx);
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| #endif /* SQLITE_MAX_WORKER_THREADS!=0 */
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Add a record to the sorter.
 | |
| */
 | |
| int sqlite3VdbeSorterWrite(
 | |
|   const VdbeCursor *pCsr,         /* Sorter cursor */
 | |
|   Mem *pVal                       /* Memory cell containing record */
 | |
| ){
 | |
|   VdbeSorter *pSorter;
 | |
|   int rc = SQLITE_OK;             /* Return Code */
 | |
|   SorterRecord *pNew;             /* New list element */
 | |
|   int bFlush;                     /* True to flush contents of memory to PMA */
 | |
|   int nReq;                       /* Bytes of memory required */
 | |
|   int nPMA;                       /* Bytes of PMA space required */
 | |
|   int t;                          /* serial type of first record field */
 | |
| 
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   getVarint32NR((const u8*)&pVal->z[1], t);
 | |
|   if( t>0 && t<10 && t!=7 ){
 | |
|     pSorter->typeMask &= SORTER_TYPE_INTEGER;
 | |
|   }else if( t>10 && (t & 0x01) ){
 | |
|     pSorter->typeMask &= SORTER_TYPE_TEXT;
 | |
|   }else{
 | |
|     pSorter->typeMask = 0;
 | |
|   }
 | |
| 
 | |
|   assert( pSorter );
 | |
| 
 | |
|   /* Figure out whether or not the current contents of memory should be
 | |
|   ** flushed to a PMA before continuing. If so, do so.
 | |
|   **
 | |
|   ** If using the single large allocation mode (pSorter->aMemory!=0), then
 | |
|   ** flush the contents of memory to a new PMA if (a) at least one value is
 | |
|   ** already in memory and (b) the new value will not fit in memory.
 | |
|   ** 
 | |
|   ** Or, if using separate allocations for each record, flush the contents
 | |
|   ** of memory to a PMA if either of the following are true:
 | |
|   **
 | |
|   **   * The total memory allocated for the in-memory list is greater 
 | |
|   **     than (page-size * cache-size), or
 | |
|   **
 | |
|   **   * The total memory allocated for the in-memory list is greater 
 | |
|   **     than (page-size * 10) and sqlite3HeapNearlyFull() returns true.
 | |
|   */
 | |
|   nReq = pVal->n + sizeof(SorterRecord);
 | |
|   nPMA = pVal->n + sqlite3VarintLen(pVal->n);
 | |
|   if( pSorter->mxPmaSize ){
 | |
|     if( pSorter->list.aMemory ){
 | |
|       bFlush = pSorter->iMemory && (pSorter->iMemory+nReq) > pSorter->mxPmaSize;
 | |
|     }else{
 | |
|       bFlush = (
 | |
|           (pSorter->list.szPMA > pSorter->mxPmaSize)
 | |
|        || (pSorter->list.szPMA > pSorter->mnPmaSize && sqlite3HeapNearlyFull())
 | |
|       );
 | |
|     }
 | |
|     if( bFlush ){
 | |
|       rc = vdbeSorterFlushPMA(pSorter);
 | |
|       pSorter->list.szPMA = 0;
 | |
|       pSorter->iMemory = 0;
 | |
|       assert( rc!=SQLITE_OK || pSorter->list.pList==0 );
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   pSorter->list.szPMA += nPMA;
 | |
|   if( nPMA>pSorter->mxKeysize ){
 | |
|     pSorter->mxKeysize = nPMA;
 | |
|   }
 | |
| 
 | |
|   if( pSorter->list.aMemory ){
 | |
|     int nMin = pSorter->iMemory + nReq;
 | |
| 
 | |
|     if( nMin>pSorter->nMemory ){
 | |
|       u8 *aNew;
 | |
|       sqlite3_int64 nNew = 2 * (sqlite3_int64)pSorter->nMemory;
 | |
|       int iListOff = -1;
 | |
|       if( pSorter->list.pList ){
 | |
|         iListOff = (u8*)pSorter->list.pList - pSorter->list.aMemory;
 | |
|       }
 | |
|       while( nNew < nMin ) nNew = nNew*2;
 | |
|       if( nNew > pSorter->mxPmaSize ) nNew = pSorter->mxPmaSize;
 | |
|       if( nNew < nMin ) nNew = nMin;
 | |
|       aNew = sqlite3Realloc(pSorter->list.aMemory, nNew);
 | |
|       if( !aNew ) return SQLITE_NOMEM_BKPT;
 | |
|       if( iListOff>=0 ){
 | |
|         pSorter->list.pList = (SorterRecord*)&aNew[iListOff];
 | |
|       }
 | |
|       pSorter->list.aMemory = aNew;
 | |
|       pSorter->nMemory = nNew;
 | |
|     }
 | |
| 
 | |
|     pNew = (SorterRecord*)&pSorter->list.aMemory[pSorter->iMemory];
 | |
|     pSorter->iMemory += ROUND8(nReq);
 | |
|     if( pSorter->list.pList ){
 | |
|       pNew->u.iNext = (int)((u8*)(pSorter->list.pList) - pSorter->list.aMemory);
 | |
|     }
 | |
|   }else{
 | |
|     pNew = (SorterRecord *)sqlite3Malloc(nReq);
 | |
|     if( pNew==0 ){
 | |
|       return SQLITE_NOMEM_BKPT;
 | |
|     }
 | |
|     pNew->u.pNext = pSorter->list.pList;
 | |
|   }
 | |
| 
 | |
|   memcpy(SRVAL(pNew), pVal->z, pVal->n);
 | |
|   pNew->nVal = pVal->n;
 | |
|   pSorter->list.pList = pNew;
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Read keys from pIncr->pMerger and populate pIncr->aFile[1]. The format
 | |
| ** of the data stored in aFile[1] is the same as that used by regular PMAs,
 | |
| ** except that the number-of-bytes varint is omitted from the start.
 | |
| */
 | |
| static int vdbeIncrPopulate(IncrMerger *pIncr){
 | |
|   int rc = SQLITE_OK;
 | |
|   int rc2;
 | |
|   i64 iStart = pIncr->iStartOff;
 | |
|   SorterFile *pOut = &pIncr->aFile[1];
 | |
|   SortSubtask *pTask = pIncr->pTask;
 | |
|   MergeEngine *pMerger = pIncr->pMerger;
 | |
|   PmaWriter writer;
 | |
|   assert( pIncr->bEof==0 );
 | |
| 
 | |
|   vdbeSorterPopulateDebug(pTask, "enter");
 | |
| 
 | |
|   vdbePmaWriterInit(pOut->pFd, &writer, pTask->pSorter->pgsz, iStart);
 | |
|   while( rc==SQLITE_OK ){
 | |
|     int dummy;
 | |
|     PmaReader *pReader = &pMerger->aReadr[ pMerger->aTree[1] ];
 | |
|     int nKey = pReader->nKey;
 | |
|     i64 iEof = writer.iWriteOff + writer.iBufEnd;
 | |
| 
 | |
|     /* Check if the output file is full or if the input has been exhausted.
 | |
|     ** In either case exit the loop. */
 | |
|     if( pReader->pFd==0 ) break;
 | |
|     if( (iEof + nKey + sqlite3VarintLen(nKey))>(iStart + pIncr->mxSz) ) break;
 | |
| 
 | |
|     /* Write the next key to the output. */
 | |
|     vdbePmaWriteVarint(&writer, nKey);
 | |
|     vdbePmaWriteBlob(&writer, pReader->aKey, nKey);
 | |
|     assert( pIncr->pMerger->pTask==pTask );
 | |
|     rc = vdbeMergeEngineStep(pIncr->pMerger, &dummy);
 | |
|   }
 | |
| 
 | |
|   rc2 = vdbePmaWriterFinish(&writer, &pOut->iEof);
 | |
|   if( rc==SQLITE_OK ) rc = rc2;
 | |
|   vdbeSorterPopulateDebug(pTask, "exit");
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
| /*
 | |
| ** The main routine for background threads that populate aFile[1] of
 | |
| ** multi-threaded IncrMerger objects.
 | |
| */
 | |
| static void *vdbeIncrPopulateThread(void *pCtx){
 | |
|   IncrMerger *pIncr = (IncrMerger*)pCtx;
 | |
|   void *pRet = SQLITE_INT_TO_PTR( vdbeIncrPopulate(pIncr) );
 | |
|   pIncr->pTask->bDone = 1;
 | |
|   return pRet;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Launch a background thread to populate aFile[1] of pIncr.
 | |
| */
 | |
| static int vdbeIncrBgPopulate(IncrMerger *pIncr){
 | |
|   void *p = (void*)pIncr;
 | |
|   assert( pIncr->bUseThread );
 | |
|   return vdbeSorterCreateThread(pIncr->pTask, vdbeIncrPopulateThread, p);
 | |
| }
 | |
| #endif
 | |
| 
 | |
| /*
 | |
| ** This function is called when the PmaReader corresponding to pIncr has
 | |
| ** finished reading the contents of aFile[0]. Its purpose is to "refill"
 | |
| ** aFile[0] such that the PmaReader should start rereading it from the
 | |
| ** beginning.
 | |
| **
 | |
| ** For single-threaded objects, this is accomplished by literally reading 
 | |
| ** keys from pIncr->pMerger and repopulating aFile[0]. 
 | |
| **
 | |
| ** For multi-threaded objects, all that is required is to wait until the 
 | |
| ** background thread is finished (if it is not already) and then swap 
 | |
| ** aFile[0] and aFile[1] in place. If the contents of pMerger have not
 | |
| ** been exhausted, this function also launches a new background thread
 | |
| ** to populate the new aFile[1].
 | |
| **
 | |
| ** SQLITE_OK is returned on success, or an SQLite error code otherwise.
 | |
| */
 | |
| static int vdbeIncrSwap(IncrMerger *pIncr){
 | |
|   int rc = SQLITE_OK;
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   if( pIncr->bUseThread ){
 | |
|     rc = vdbeSorterJoinThread(pIncr->pTask);
 | |
| 
 | |
|     if( rc==SQLITE_OK ){
 | |
|       SorterFile f0 = pIncr->aFile[0];
 | |
|       pIncr->aFile[0] = pIncr->aFile[1];
 | |
|       pIncr->aFile[1] = f0;
 | |
|     }
 | |
| 
 | |
|     if( rc==SQLITE_OK ){
 | |
|       if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
 | |
|         pIncr->bEof = 1;
 | |
|       }else{
 | |
|         rc = vdbeIncrBgPopulate(pIncr);
 | |
|       }
 | |
|     }
 | |
|   }else
 | |
| #endif
 | |
|   {
 | |
|     rc = vdbeIncrPopulate(pIncr);
 | |
|     pIncr->aFile[0] = pIncr->aFile[1];
 | |
|     if( pIncr->aFile[0].iEof==pIncr->iStartOff ){
 | |
|       pIncr->bEof = 1;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Allocate and return a new IncrMerger object to read data from pMerger.
 | |
| **
 | |
| ** If an OOM condition is encountered, return NULL. In this case free the
 | |
| ** pMerger argument before returning.
 | |
| */
 | |
| static int vdbeIncrMergerNew(
 | |
|   SortSubtask *pTask,     /* The thread that will be using the new IncrMerger */
 | |
|   MergeEngine *pMerger,   /* The MergeEngine that the IncrMerger will control */
 | |
|   IncrMerger **ppOut      /* Write the new IncrMerger here */
 | |
| ){
 | |
|   int rc = SQLITE_OK;
 | |
|   IncrMerger *pIncr = *ppOut = (IncrMerger*)
 | |
|        (sqlite3FaultSim(100) ? 0 : sqlite3MallocZero(sizeof(*pIncr)));
 | |
|   if( pIncr ){
 | |
|     pIncr->pMerger = pMerger;
 | |
|     pIncr->pTask = pTask;
 | |
|     pIncr->mxSz = MAX(pTask->pSorter->mxKeysize+9,pTask->pSorter->mxPmaSize/2);
 | |
|     pTask->file2.iEof += pIncr->mxSz;
 | |
|   }else{
 | |
|     vdbeMergeEngineFree(pMerger);
 | |
|     rc = SQLITE_NOMEM_BKPT;
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
| /*
 | |
| ** Set the "use-threads" flag on object pIncr.
 | |
| */
 | |
| static void vdbeIncrMergerSetThreads(IncrMerger *pIncr){
 | |
|   pIncr->bUseThread = 1;
 | |
|   pIncr->pTask->file2.iEof -= pIncr->mxSz;
 | |
| }
 | |
| #endif /* SQLITE_MAX_WORKER_THREADS>0 */
 | |
| 
 | |
| 
 | |
| 
 | |
| /*
 | |
| ** Recompute pMerger->aTree[iOut] by comparing the next keys on the
 | |
| ** two PmaReaders that feed that entry.  Neither of the PmaReaders
 | |
| ** are advanced.  This routine merely does the comparison.
 | |
| */
 | |
| static void vdbeMergeEngineCompare(
 | |
|   MergeEngine *pMerger,  /* Merge engine containing PmaReaders to compare */
 | |
|   int iOut               /* Store the result in pMerger->aTree[iOut] */
 | |
| ){
 | |
|   int i1;
 | |
|   int i2;
 | |
|   int iRes;
 | |
|   PmaReader *p1;
 | |
|   PmaReader *p2;
 | |
| 
 | |
|   assert( iOut<pMerger->nTree && iOut>0 );
 | |
| 
 | |
|   if( iOut>=(pMerger->nTree/2) ){
 | |
|     i1 = (iOut - pMerger->nTree/2) * 2;
 | |
|     i2 = i1 + 1;
 | |
|   }else{
 | |
|     i1 = pMerger->aTree[iOut*2];
 | |
|     i2 = pMerger->aTree[iOut*2+1];
 | |
|   }
 | |
| 
 | |
|   p1 = &pMerger->aReadr[i1];
 | |
|   p2 = &pMerger->aReadr[i2];
 | |
| 
 | |
|   if( p1->pFd==0 ){
 | |
|     iRes = i2;
 | |
|   }else if( p2->pFd==0 ){
 | |
|     iRes = i1;
 | |
|   }else{
 | |
|     SortSubtask *pTask = pMerger->pTask;
 | |
|     int bCached = 0;
 | |
|     int res;
 | |
|     assert( pTask->pUnpacked!=0 );  /* from vdbeSortSubtaskMain() */
 | |
|     res = pTask->xCompare(
 | |
|         pTask, &bCached, p1->aKey, p1->nKey, p2->aKey, p2->nKey
 | |
|     );
 | |
|     if( res<=0 ){
 | |
|       iRes = i1;
 | |
|     }else{
 | |
|       iRes = i2;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   pMerger->aTree[iOut] = iRes;
 | |
| }
 | |
| 
 | |
/*
** Legal values for the eMode argument of vdbeMergeEngineInit() and
** vdbePmaReaderIncrMergeInit().
**
** Builds without worker threads (SQLITE_MAX_WORKER_THREADS==0) only ever
** use INCRINIT_NORMAL.  The remaining values come into play only when one
** or more separate worker threads exist.
*/
#define INCRINIT_NORMAL 0
#define INCRINIT_TASK   1
#define INCRINIT_ROOT   2
 | |
| 
 | |
| /* 
 | |
| ** Forward reference required as the vdbeIncrMergeInit() and
 | |
| ** vdbePmaReaderIncrInit() routines are called mutually recursively when
 | |
| ** building a merge tree.
 | |
| */
 | |
| static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode);
 | |
| 
 | |
| /*
 | |
| ** Initialize the MergeEngine object passed as the second argument. Once this
 | |
| ** function returns, the first key of merged data may be read from the 
 | |
| ** MergeEngine object in the usual fashion.
 | |
| **
 | |
| ** If argument eMode is INCRINIT_ROOT, then it is assumed that any IncrMerge
 | |
| ** objects attached to the PmaReader objects that the merger reads from have
 | |
| ** already been populated, but that they have not yet populated aFile[0] and
 | |
| ** set the PmaReader objects up to read from it. In this case all that is
 | |
| ** required is to call vdbePmaReaderNext() on each PmaReader to point it at
 | |
| ** its first key.
 | |
| **
 | |
| ** Otherwise, if eMode is any value other than INCRINIT_ROOT, then use 
 | |
| ** vdbePmaReaderIncrMergeInit() to initialize each PmaReader that feeds data 
 | |
| ** to pMerger.
 | |
| **
 | |
| ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
 | |
| */
 | |
| static int vdbeMergeEngineInit(
 | |
|   SortSubtask *pTask,             /* Thread that will run pMerger */
 | |
|   MergeEngine *pMerger,           /* MergeEngine to initialize */
 | |
|   int eMode                       /* One of the INCRINIT_XXX constants */
 | |
| ){
 | |
|   int rc = SQLITE_OK;             /* Return code */
 | |
|   int i;                          /* For looping over PmaReader objects */
 | |
|   int nTree;                      /* Number of subtrees to merge */
 | |
| 
 | |
|   /* Failure to allocate the merge would have been detected prior to
 | |
|   ** invoking this routine */
 | |
|   assert( pMerger!=0 );
 | |
| 
 | |
|   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
 | |
|   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
 | |
| 
 | |
|   /* Verify that the MergeEngine is assigned to a single thread */
 | |
|   assert( pMerger->pTask==0 );
 | |
|   pMerger->pTask = pTask;
 | |
| 
 | |
|   nTree = pMerger->nTree;
 | |
|   for(i=0; i<nTree; i++){
 | |
|     if( SQLITE_MAX_WORKER_THREADS>0 && eMode==INCRINIT_ROOT ){
 | |
|       /* PmaReaders should be normally initialized in order, as if they are
 | |
|       ** reading from the same temp file this makes for more linear file IO.
 | |
|       ** However, in the INCRINIT_ROOT case, if PmaReader aReadr[nTask-1] is
 | |
|       ** in use it will block the vdbePmaReaderNext() call while it uses
 | |
|       ** the main thread to fill its buffer. So calling PmaReaderNext()
 | |
|       ** on this PmaReader before any of the multi-threaded PmaReaders takes
 | |
|       ** better advantage of multi-processor hardware. */
 | |
|       rc = vdbePmaReaderNext(&pMerger->aReadr[nTree-i-1]);
 | |
|     }else{
 | |
|       rc = vdbePmaReaderIncrInit(&pMerger->aReadr[i], INCRINIT_NORMAL);
 | |
|     }
 | |
|     if( rc!=SQLITE_OK ) return rc;
 | |
|   }
 | |
| 
 | |
|   for(i=pMerger->nTree-1; i>0; i--){
 | |
|     vdbeMergeEngineCompare(pMerger, i);
 | |
|   }
 | |
|   return pTask->pUnpacked->errCode;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** The PmaReader passed as the first argument is guaranteed to be an
 | |
| ** incremental-reader (pReadr->pIncr!=0). This function serves to open
 | |
| ** and/or initialize the temp file related fields of the IncrMerge
 | |
| ** object at (pReadr->pIncr).
 | |
| **
 | |
| ** If argument eMode is set to INCRINIT_NORMAL, then all PmaReaders
 | |
| ** in the sub-tree headed by pReadr are also initialized. Data is then 
 | |
| ** loaded into the buffers belonging to pReadr and it is set to point to 
 | |
| ** the first key in its range.
 | |
| **
 | |
| ** If argument eMode is set to INCRINIT_TASK, then pReadr is guaranteed
 | |
| ** to be a multi-threaded PmaReader and this function is being called in a
 | |
| ** background thread. In this case all PmaReaders in the sub-tree are 
 | |
| ** initialized as for INCRINIT_NORMAL and the aFile[1] buffer belonging to
 | |
| ** pReadr is populated. However, pReadr itself is not set up to point
 | |
| ** to its first key. A call to vdbePmaReaderNext() is still required to do
 | |
| ** that. 
 | |
| **
 | |
| ** The reason this function does not call vdbePmaReaderNext() immediately 
 | |
| ** in the INCRINIT_TASK case is that vdbePmaReaderNext() assumes that it has
 | |
| ** to block on thread (pTask->thread) before accessing aFile[1]. But, since
 | |
| ** this entire function is being run by thread (pTask->thread), that will
 | |
| ** lead to the current background thread attempting to join itself.
 | |
| **
 | |
| ** Finally, if argument eMode is set to INCRINIT_ROOT, it may be assumed
 | |
| ** that pReadr->pIncr is a multi-threaded IncrMerge objects, and that all
 | |
| ** child-trees have already been initialized using IncrInit(INCRINIT_TASK).
 | |
| ** In this case vdbePmaReaderNext() is called on all child PmaReaders and
 | |
| ** the current PmaReader set to point to the first key in its range.
 | |
| **
 | |
| ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
 | |
| */
 | |
| static int vdbePmaReaderIncrMergeInit(PmaReader *pReadr, int eMode){
 | |
|   int rc = SQLITE_OK;
 | |
|   IncrMerger *pIncr = pReadr->pIncr;
 | |
|   SortSubtask *pTask = pIncr->pTask;
 | |
|   sqlite3 *db = pTask->pSorter->db;
 | |
| 
 | |
|   /* eMode is always INCRINIT_NORMAL in single-threaded mode */
 | |
|   assert( SQLITE_MAX_WORKER_THREADS>0 || eMode==INCRINIT_NORMAL );
 | |
| 
 | |
|   rc = vdbeMergeEngineInit(pTask, pIncr->pMerger, eMode);
 | |
| 
 | |
|   /* Set up the required files for pIncr. A multi-theaded IncrMerge object
 | |
|   ** requires two temp files to itself, whereas a single-threaded object
 | |
|   ** only requires a region of pTask->file2. */
 | |
|   if( rc==SQLITE_OK ){
 | |
|     int mxSz = pIncr->mxSz;
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|     if( pIncr->bUseThread ){
 | |
|       rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[0].pFd);
 | |
|       if( rc==SQLITE_OK ){
 | |
|         rc = vdbeSorterOpenTempFile(db, mxSz, &pIncr->aFile[1].pFd);
 | |
|       }
 | |
|     }else
 | |
| #endif
 | |
|     /*if( !pIncr->bUseThread )*/{
 | |
|       if( pTask->file2.pFd==0 ){
 | |
|         assert( pTask->file2.iEof>0 );
 | |
|         rc = vdbeSorterOpenTempFile(db, pTask->file2.iEof, &pTask->file2.pFd);
 | |
|         pTask->file2.iEof = 0;
 | |
|       }
 | |
|       if( rc==SQLITE_OK ){
 | |
|         pIncr->aFile[1].pFd = pTask->file2.pFd;
 | |
|         pIncr->iStartOff = pTask->file2.iEof;
 | |
|         pTask->file2.iEof += mxSz;
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   if( rc==SQLITE_OK && pIncr->bUseThread ){
 | |
|     /* Use the current thread to populate aFile[1], even though this
 | |
|     ** PmaReader is multi-threaded. If this is an INCRINIT_TASK object,
 | |
|     ** then this function is already running in background thread 
 | |
|     ** pIncr->pTask->thread. 
 | |
|     **
 | |
|     ** If this is the INCRINIT_ROOT object, then it is running in the 
 | |
|     ** main VDBE thread. But that is Ok, as that thread cannot return
 | |
|     ** control to the VDBE or proceed with anything useful until the 
 | |
|     ** first results are ready from this merger object anyway.
 | |
|     */
 | |
|     assert( eMode==INCRINIT_ROOT || eMode==INCRINIT_TASK );
 | |
|     rc = vdbeIncrPopulate(pIncr);
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   if( rc==SQLITE_OK && (SQLITE_MAX_WORKER_THREADS==0 || eMode!=INCRINIT_TASK) ){
 | |
|     rc = vdbePmaReaderNext(pReadr);
 | |
|   }
 | |
| 
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
| /*
 | |
| ** The main routine for vdbePmaReaderIncrMergeInit() operations run in 
 | |
| ** background threads.
 | |
| */
 | |
| static void *vdbePmaReaderBgIncrInit(void *pCtx){
 | |
|   PmaReader *pReader = (PmaReader*)pCtx;
 | |
|   void *pRet = SQLITE_INT_TO_PTR(
 | |
|                   vdbePmaReaderIncrMergeInit(pReader,INCRINIT_TASK)
 | |
|                );
 | |
|   pReader->pIncr->pTask->bDone = 1;
 | |
|   return pRet;
 | |
| }
 | |
| #endif
 | |
| 
 | |
| /*
 | |
| ** If the PmaReader passed as the first argument is not an incremental-reader
 | |
| ** (if pReadr->pIncr==0), then this function is a no-op. Otherwise, it invokes
 | |
| ** the vdbePmaReaderIncrMergeInit() function with the parameters passed to
 | |
| ** this routine to initialize the incremental merge.
 | |
| ** 
 | |
| ** If the IncrMerger object is multi-threaded (IncrMerger.bUseThread==1), 
 | |
| ** then a background thread is launched to call vdbePmaReaderIncrMergeInit().
 | |
| ** Or, if the IncrMerger is single threaded, the same function is called
 | |
| ** using the current thread.
 | |
| */
 | |
| static int vdbePmaReaderIncrInit(PmaReader *pReadr, int eMode){
 | |
|   IncrMerger *pIncr = pReadr->pIncr;   /* Incremental merger */
 | |
|   int rc = SQLITE_OK;                  /* Return code */
 | |
|   if( pIncr ){
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|     assert( pIncr->bUseThread==0 || eMode==INCRINIT_TASK );
 | |
|     if( pIncr->bUseThread ){
 | |
|       void *pCtx = (void*)pReadr;
 | |
|       rc = vdbeSorterCreateThread(pIncr->pTask, vdbePmaReaderBgIncrInit, pCtx);
 | |
|     }else
 | |
| #endif
 | |
|     {
 | |
|       rc = vdbePmaReaderIncrMergeInit(pReadr, eMode);
 | |
|     }
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Allocate a new MergeEngine object to merge the contents of nPMA level-0
 | |
| ** PMAs from pTask->file. If no error occurs, set *ppOut to point to
 | |
| ** the new object and return SQLITE_OK. Or, if an error does occur, set *ppOut
 | |
| ** to NULL and return an SQLite error code.
 | |
| **
 | |
| ** When this function is called, *piOffset is set to the offset of the
 | |
| ** first PMA to read from pTask->file. Assuming no error occurs, it is 
 | |
| ** set to the offset immediately following the last byte of the last
 | |
| ** PMA before returning. If an error does occur, then the final value of
 | |
| ** *piOffset is undefined.
 | |
| */
 | |
| static int vdbeMergeEngineLevel0(
 | |
|   SortSubtask *pTask,             /* Sorter task to read from */
 | |
|   int nPMA,                       /* Number of PMAs to read */
 | |
|   i64 *piOffset,                  /* IN/OUT: Readr offset in pTask->file */
 | |
|   MergeEngine **ppOut             /* OUT: New merge-engine */
 | |
| ){
 | |
|   MergeEngine *pNew;              /* Merge engine to return */
 | |
|   i64 iOff = *piOffset;
 | |
|   int i;
 | |
|   int rc = SQLITE_OK;
 | |
| 
 | |
|   *ppOut = pNew = vdbeMergeEngineNew(nPMA);
 | |
|   if( pNew==0 ) rc = SQLITE_NOMEM_BKPT;
 | |
| 
 | |
|   for(i=0; i<nPMA && rc==SQLITE_OK; i++){
 | |
|     i64 nDummy = 0;
 | |
|     PmaReader *pReadr = &pNew->aReadr[i];
 | |
|     rc = vdbePmaReaderInit(pTask, &pTask->file, iOff, pReadr, &nDummy);
 | |
|     iOff = pReadr->iEof;
 | |
|   }
 | |
| 
 | |
|   if( rc!=SQLITE_OK ){
 | |
|     vdbeMergeEngineFree(pNew);
 | |
|     *ppOut = 0;
 | |
|   }
 | |
|   *piOffset = iOff;
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Return the depth of a tree comprising nPMA PMAs, assuming a fanout of
 | |
| ** SORTER_MAX_MERGE_COUNT. The returned value does not include leaf nodes.
 | |
| **
 | |
| ** i.e.
 | |
| **
 | |
| **   nPMA<=16    -> TreeDepth() == 0
 | |
| **   nPMA<=256   -> TreeDepth() == 1
 | |
| **   nPMA<=65536 -> TreeDepth() == 2
 | |
| */
 | |
| static int vdbeSorterTreeDepth(int nPMA){
 | |
|   int nDepth = 0;
 | |
|   i64 nDiv = SORTER_MAX_MERGE_COUNT;
 | |
|   while( nDiv < (i64)nPMA ){
 | |
|     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
 | |
|     nDepth++;
 | |
|   }
 | |
|   return nDepth;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** pRoot is the root of an incremental merge-tree with depth nDepth (according
 | |
| ** to vdbeSorterTreeDepth()). pLeaf is the iSeq'th leaf to be added to the
 | |
| ** tree, counting from zero. This function adds pLeaf to the tree.
 | |
| **
 | |
| ** If successful, SQLITE_OK is returned. If an error occurs, an SQLite error
 | |
| ** code is returned and pLeaf is freed.
 | |
| */
 | |
| static int vdbeSorterAddToTree(
 | |
|   SortSubtask *pTask,             /* Task context */
 | |
|   int nDepth,                     /* Depth of tree according to TreeDepth() */
 | |
|   int iSeq,                       /* Sequence number of leaf within tree */
 | |
|   MergeEngine *pRoot,             /* Root of tree */
 | |
|   MergeEngine *pLeaf              /* Leaf to add to tree */
 | |
| ){
 | |
|   int rc = SQLITE_OK;
 | |
|   int nDiv = 1;
 | |
|   int i;
 | |
|   MergeEngine *p = pRoot;
 | |
|   IncrMerger *pIncr;
 | |
| 
 | |
|   rc = vdbeIncrMergerNew(pTask, pLeaf, &pIncr);
 | |
| 
 | |
|   for(i=1; i<nDepth; i++){
 | |
|     nDiv = nDiv * SORTER_MAX_MERGE_COUNT;
 | |
|   }
 | |
| 
 | |
|   for(i=1; i<nDepth && rc==SQLITE_OK; i++){
 | |
|     int iIter = (iSeq / nDiv) % SORTER_MAX_MERGE_COUNT;
 | |
|     PmaReader *pReadr = &p->aReadr[iIter];
 | |
| 
 | |
|     if( pReadr->pIncr==0 ){
 | |
|       MergeEngine *pNew = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
 | |
|       if( pNew==0 ){
 | |
|         rc = SQLITE_NOMEM_BKPT;
 | |
|       }else{
 | |
|         rc = vdbeIncrMergerNew(pTask, pNew, &pReadr->pIncr);
 | |
|       }
 | |
|     }
 | |
|     if( rc==SQLITE_OK ){
 | |
|       p = pReadr->pIncr->pMerger;
 | |
|       nDiv = nDiv / SORTER_MAX_MERGE_COUNT;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if( rc==SQLITE_OK ){
 | |
|     p->aReadr[iSeq % SORTER_MAX_MERGE_COUNT].pIncr = pIncr;
 | |
|   }else{
 | |
|     vdbeIncrFree(pIncr);
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** This function is called as part of a SorterRewind() operation on a sorter
 | |
| ** that has already written two or more level-0 PMAs to one or more temp
 | |
| ** files. It builds a tree of MergeEngine/IncrMerger/PmaReader objects that 
 | |
| ** can be used to incrementally merge all PMAs on disk.
 | |
| **
 | |
| ** If successful, SQLITE_OK is returned and *ppOut set to point to the
 | |
| ** MergeEngine object at the root of the tree before returning. Or, if an
 | |
| ** error occurs, an SQLite error code is returned and the final value 
 | |
| ** of *ppOut is undefined.
 | |
| */
 | |
| static int vdbeSorterMergeTreeBuild(
 | |
|   VdbeSorter *pSorter,       /* The VDBE cursor that implements the sort */
 | |
|   MergeEngine **ppOut        /* Write the MergeEngine here */
 | |
| ){
 | |
|   MergeEngine *pMain = 0;
 | |
|   int rc = SQLITE_OK;
 | |
|   int iTask;
 | |
| 
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|   /* If the sorter uses more than one task, then create the top-level 
 | |
|   ** MergeEngine here. This MergeEngine will read data from exactly 
 | |
|   ** one PmaReader per sub-task.  */
 | |
|   assert( pSorter->bUseThreads || pSorter->nTask==1 );
 | |
|   if( pSorter->nTask>1 ){
 | |
|     pMain = vdbeMergeEngineNew(pSorter->nTask);
 | |
|     if( pMain==0 ) rc = SQLITE_NOMEM_BKPT;
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
 | |
|     SortSubtask *pTask = &pSorter->aTask[iTask];
 | |
|     assert( pTask->nPMA>0 || SQLITE_MAX_WORKER_THREADS>0 );
 | |
|     if( SQLITE_MAX_WORKER_THREADS==0 || pTask->nPMA ){
 | |
|       MergeEngine *pRoot = 0;     /* Root node of tree for this task */
 | |
|       int nDepth = vdbeSorterTreeDepth(pTask->nPMA);
 | |
|       i64 iReadOff = 0;
 | |
| 
 | |
|       if( pTask->nPMA<=SORTER_MAX_MERGE_COUNT ){
 | |
|         rc = vdbeMergeEngineLevel0(pTask, pTask->nPMA, &iReadOff, &pRoot);
 | |
|       }else{
 | |
|         int i;
 | |
|         int iSeq = 0;
 | |
|         pRoot = vdbeMergeEngineNew(SORTER_MAX_MERGE_COUNT);
 | |
|         if( pRoot==0 ) rc = SQLITE_NOMEM_BKPT;
 | |
|         for(i=0; i<pTask->nPMA && rc==SQLITE_OK; i += SORTER_MAX_MERGE_COUNT){
 | |
|           MergeEngine *pMerger = 0; /* New level-0 PMA merger */
 | |
|           int nReader;              /* Number of level-0 PMAs to merge */
 | |
| 
 | |
|           nReader = MIN(pTask->nPMA - i, SORTER_MAX_MERGE_COUNT);
 | |
|           rc = vdbeMergeEngineLevel0(pTask, nReader, &iReadOff, &pMerger);
 | |
|           if( rc==SQLITE_OK ){
 | |
|             rc = vdbeSorterAddToTree(pTask, nDepth, iSeq++, pRoot, pMerger);
 | |
|           }
 | |
|         }
 | |
|       }
 | |
| 
 | |
|       if( rc==SQLITE_OK ){
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|         if( pMain!=0 ){
 | |
|           rc = vdbeIncrMergerNew(pTask, pRoot, &pMain->aReadr[iTask].pIncr);
 | |
|         }else
 | |
| #endif
 | |
|         {
 | |
|           assert( pMain==0 );
 | |
|           pMain = pRoot;
 | |
|         }
 | |
|       }else{
 | |
|         vdbeMergeEngineFree(pRoot);
 | |
|       }
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if( rc!=SQLITE_OK ){
 | |
|     vdbeMergeEngineFree(pMain);
 | |
|     pMain = 0;
 | |
|   }
 | |
|   *ppOut = pMain;
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** This function is called as part of an sqlite3VdbeSorterRewind() operation
 | |
| ** on a sorter that has written two or more PMAs to temporary files. It sets
 | |
| ** up either VdbeSorter.pMerger (for single threaded sorters) or pReader
 | |
| ** (for multi-threaded sorters) so that it can be used to iterate through
 | |
| ** all records stored in the sorter.
 | |
| **
 | |
| ** SQLITE_OK is returned if successful, or an SQLite error code otherwise.
 | |
| */
 | |
| static int vdbeSorterSetupMerge(VdbeSorter *pSorter){
 | |
|   int rc;                         /* Return code */
 | |
|   SortSubtask *pTask0 = &pSorter->aTask[0];
 | |
|   MergeEngine *pMain = 0;
 | |
| #if SQLITE_MAX_WORKER_THREADS
 | |
|   sqlite3 *db = pTask0->pSorter->db;
 | |
|   int i;
 | |
|   SorterCompare xCompare = vdbeSorterGetCompare(pSorter);
 | |
|   for(i=0; i<pSorter->nTask; i++){
 | |
|     pSorter->aTask[i].xCompare = xCompare;
 | |
|   }
 | |
| #endif
 | |
| 
 | |
|   rc = vdbeSorterMergeTreeBuild(pSorter, &pMain);
 | |
|   if( rc==SQLITE_OK ){
 | |
| #if SQLITE_MAX_WORKER_THREADS
 | |
|     assert( pSorter->bUseThreads==0 || pSorter->nTask>1 );
 | |
|     if( pSorter->bUseThreads ){
 | |
|       int iTask;
 | |
|       PmaReader *pReadr = 0;
 | |
|       SortSubtask *pLast = &pSorter->aTask[pSorter->nTask-1];
 | |
|       rc = vdbeSortAllocUnpacked(pLast);
 | |
|       if( rc==SQLITE_OK ){
 | |
|         pReadr = (PmaReader*)sqlite3DbMallocZero(db, sizeof(PmaReader));
 | |
|         pSorter->pReader = pReadr;
 | |
|         if( pReadr==0 ) rc = SQLITE_NOMEM_BKPT;
 | |
|       }
 | |
|       if( rc==SQLITE_OK ){
 | |
|         rc = vdbeIncrMergerNew(pLast, pMain, &pReadr->pIncr);
 | |
|         if( rc==SQLITE_OK ){
 | |
|           vdbeIncrMergerSetThreads(pReadr->pIncr);
 | |
|           for(iTask=0; iTask<(pSorter->nTask-1); iTask++){
 | |
|             IncrMerger *pIncr;
 | |
|             if( (pIncr = pMain->aReadr[iTask].pIncr) ){
 | |
|               vdbeIncrMergerSetThreads(pIncr);
 | |
|               assert( pIncr->pTask!=pLast );
 | |
|             }
 | |
|           }
 | |
|           for(iTask=0; rc==SQLITE_OK && iTask<pSorter->nTask; iTask++){
 | |
|             /* Check that:
 | |
|             **   
 | |
|             **   a) The incremental merge object is configured to use the
 | |
|             **      right task, and
 | |
|             **   b) If it is using task (nTask-1), it is configured to run
 | |
|             **      in single-threaded mode. This is important, as the
 | |
|             **      root merge (INCRINIT_ROOT) will be using the same task
 | |
|             **      object.
 | |
|             */
 | |
|             PmaReader *p = &pMain->aReadr[iTask];
 | |
|             assert( p->pIncr==0 || (
 | |
|                 (p->pIncr->pTask==&pSorter->aTask[iTask])             /* a */
 | |
|              && (iTask!=pSorter->nTask-1 || p->pIncr->bUseThread==0)  /* b */
 | |
|             ));
 | |
|             rc = vdbePmaReaderIncrInit(p, INCRINIT_TASK);
 | |
|           }
 | |
|         }
 | |
|         pMain = 0;
 | |
|       }
 | |
|       if( rc==SQLITE_OK ){
 | |
|         rc = vdbePmaReaderIncrMergeInit(pReadr, INCRINIT_ROOT);
 | |
|       }
 | |
|     }else
 | |
| #endif
 | |
|     {
 | |
|       rc = vdbeMergeEngineInit(pTask0, pMain, INCRINIT_NORMAL);
 | |
|       pSorter->pMerger = pMain;
 | |
|       pMain = 0;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   if( rc!=SQLITE_OK ){
 | |
|     vdbeMergeEngineFree(pMain);
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| 
 | |
| /*
 | |
| ** Once the sorter has been populated by calls to sqlite3VdbeSorterWrite,
 | |
| ** this function is called to prepare for iterating through the records
 | |
| ** in sorted order.
 | |
| */
 | |
| int sqlite3VdbeSorterRewind(const VdbeCursor *pCsr, int *pbEof){
 | |
|   VdbeSorter *pSorter;
 | |
|   int rc = SQLITE_OK;             /* Return code */
 | |
| 
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   assert( pSorter );
 | |
| 
 | |
|   /* If no data has been written to disk, then do not do so now. Instead,
 | |
|   ** sort the VdbeSorter.pRecord list. The vdbe layer will read data directly
 | |
|   ** from the in-memory list.  */
 | |
|   if( pSorter->bUsePMA==0 ){
 | |
|     if( pSorter->list.pList ){
 | |
|       *pbEof = 0;
 | |
|       rc = vdbeSorterSort(&pSorter->aTask[0], &pSorter->list);
 | |
|     }else{
 | |
|       *pbEof = 1;
 | |
|     }
 | |
|     return rc;
 | |
|   }
 | |
| 
 | |
|   /* Write the current in-memory list to a PMA. When the VdbeSorterWrite() 
 | |
|   ** function flushes the contents of memory to disk, it immediately always
 | |
|   ** creates a new list consisting of a single key immediately afterwards.
 | |
|   ** So the list is never empty at this point.  */
 | |
|   assert( pSorter->list.pList );
 | |
|   rc = vdbeSorterFlushPMA(pSorter);
 | |
| 
 | |
|   /* Join all threads */
 | |
|   rc = vdbeSorterJoinAll(pSorter, rc);
 | |
| 
 | |
|   vdbeSorterRewindDebug("rewind");
 | |
| 
 | |
|   /* Assuming no errors have occurred, set up a merger structure to 
 | |
|   ** incrementally read and merge all remaining PMAs.  */
 | |
|   assert( pSorter->pReader==0 );
 | |
|   if( rc==SQLITE_OK ){
 | |
|     rc = vdbeSorterSetupMerge(pSorter);
 | |
|     *pbEof = 0;
 | |
|   }
 | |
| 
 | |
|   vdbeSorterRewindDebug("rewinddone");
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Advance to the next element in the sorter.  Return value:
 | |
| **
 | |
| **    SQLITE_OK     success
 | |
| **    SQLITE_DONE   end of data
 | |
| **    otherwise     some kind of error.
 | |
| */
 | |
| int sqlite3VdbeSorterNext(sqlite3 *db, const VdbeCursor *pCsr){
 | |
|   VdbeSorter *pSorter;
 | |
|   int rc;                         /* Return code */
 | |
| 
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   assert( pSorter->bUsePMA || (pSorter->pReader==0 && pSorter->pMerger==0) );
 | |
|   if( pSorter->bUsePMA ){
 | |
|     assert( pSorter->pReader==0 || pSorter->pMerger==0 );
 | |
|     assert( pSorter->bUseThreads==0 || pSorter->pReader );
 | |
|     assert( pSorter->bUseThreads==1 || pSorter->pMerger );
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|     if( pSorter->bUseThreads ){
 | |
|       rc = vdbePmaReaderNext(pSorter->pReader);
 | |
|       if( rc==SQLITE_OK && pSorter->pReader->pFd==0 ) rc = SQLITE_DONE;
 | |
|     }else
 | |
| #endif
 | |
|     /*if( !pSorter->bUseThreads )*/ {
 | |
|       int res = 0;
 | |
|       assert( pSorter->pMerger!=0 );
 | |
|       assert( pSorter->pMerger->pTask==(&pSorter->aTask[0]) );
 | |
|       rc = vdbeMergeEngineStep(pSorter->pMerger, &res);
 | |
|       if( rc==SQLITE_OK && res ) rc = SQLITE_DONE;
 | |
|     }
 | |
|   }else{
 | |
|     SorterRecord *pFree = pSorter->list.pList;
 | |
|     pSorter->list.pList = pFree->u.pNext;
 | |
|     pFree->u.pNext = 0;
 | |
|     if( pSorter->list.aMemory==0 ) vdbeSorterRecordFree(db, pFree);
 | |
|     rc = pSorter->list.pList ? SQLITE_OK : SQLITE_DONE;
 | |
|   }
 | |
|   return rc;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Return a pointer to a buffer owned by the sorter that contains the 
 | |
| ** current key.
 | |
| */
 | |
| static void *vdbeSorterRowkey(
 | |
|   const VdbeSorter *pSorter,      /* Sorter object */
 | |
|   int *pnKey                      /* OUT: Size of current key in bytes */
 | |
| ){
 | |
|   void *pKey;
 | |
|   if( pSorter->bUsePMA ){
 | |
|     PmaReader *pReader;
 | |
| #if SQLITE_MAX_WORKER_THREADS>0
 | |
|     if( pSorter->bUseThreads ){
 | |
|       pReader = pSorter->pReader;
 | |
|     }else
 | |
| #endif
 | |
|     /*if( !pSorter->bUseThreads )*/{
 | |
|       pReader = &pSorter->pMerger->aReadr[pSorter->pMerger->aTree[1]];
 | |
|     }
 | |
|     *pnKey = pReader->nKey;
 | |
|     pKey = pReader->aKey;
 | |
|   }else{
 | |
|     *pnKey = pSorter->list.pList->nVal;
 | |
|     pKey = SRVAL(pSorter->list.pList);
 | |
|   }
 | |
|   return pKey;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Copy the current sorter key into the memory cell pOut.
 | |
| */
 | |
| int sqlite3VdbeSorterRowkey(const VdbeCursor *pCsr, Mem *pOut){
 | |
|   VdbeSorter *pSorter;
 | |
|   void *pKey; int nKey;           /* Sorter key to copy into pOut */
 | |
| 
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   pKey = vdbeSorterRowkey(pSorter, &nKey);
 | |
|   if( sqlite3VdbeMemClearAndResize(pOut, nKey) ){
 | |
|     return SQLITE_NOMEM_BKPT;
 | |
|   }
 | |
|   pOut->n = nKey;
 | |
|   MemSetTypeFlag(pOut, MEM_Blob);
 | |
|   memcpy(pOut->z, pKey, nKey);
 | |
| 
 | |
|   return SQLITE_OK;
 | |
| }
 | |
| 
 | |
| /*
 | |
| ** Compare the key in memory cell pVal with the key that the sorter cursor
 | |
| ** passed as the first argument currently points to. For the purposes of
 | |
| ** the comparison, ignore the rowid field at the end of each record.
 | |
| **
 | |
| ** If the sorter cursor key contains any NULL values, consider it to be
 | |
| ** less than pVal. Even if pVal also contains NULL values.
 | |
| **
 | |
| ** If an error occurs, return an SQLite error code (i.e. SQLITE_NOMEM).
 | |
| ** Otherwise, set *pRes to a negative, zero or positive value if the
 | |
| ** key in pVal is smaller than, equal to or larger than the current sorter
 | |
| ** key.
 | |
| **
 | |
| ** This routine forms the core of the OP_SorterCompare opcode, which in
 | |
| ** turn is used to verify uniqueness when constructing a UNIQUE INDEX.
 | |
| */
 | |
| int sqlite3VdbeSorterCompare(
 | |
|   const VdbeCursor *pCsr,         /* Sorter cursor */
 | |
|   Mem *pVal,                      /* Value to compare to current sorter key */
 | |
|   int nKeyCol,                    /* Compare this many columns */
 | |
|   int *pRes                       /* OUT: Result of comparison */
 | |
| ){
 | |
|   VdbeSorter *pSorter;
 | |
|   UnpackedRecord *r2;
 | |
|   KeyInfo *pKeyInfo;
 | |
|   int i;
 | |
|   void *pKey; int nKey;           /* Sorter key to compare pVal with */
 | |
| 
 | |
|   assert( pCsr->eCurType==CURTYPE_SORTER );
 | |
|   pSorter = pCsr->uc.pSorter;
 | |
|   r2 = pSorter->pUnpacked;
 | |
|   pKeyInfo = pCsr->pKeyInfo;
 | |
|   if( r2==0 ){
 | |
|     r2 = pSorter->pUnpacked = sqlite3VdbeAllocUnpackedRecord(pKeyInfo);
 | |
|     if( r2==0 ) return SQLITE_NOMEM_BKPT;
 | |
|     r2->nField = nKeyCol;
 | |
|   }
 | |
|   assert( r2->nField==nKeyCol );
 | |
| 
 | |
|   pKey = vdbeSorterRowkey(pSorter, &nKey);
 | |
|   sqlite3VdbeRecordUnpack(pKeyInfo, nKey, pKey, r2);
 | |
|   for(i=0; i<nKeyCol; i++){
 | |
|     if( r2->aMem[i].flags & MEM_Null ){
 | |
|       *pRes = -1;
 | |
|       return SQLITE_OK;
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   *pRes = sqlite3VdbeRecordCompare(pVal->n, pVal->z, r2);
 | |
|   return SQLITE_OK;
 | |
| }
 |