A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PartitionedFile.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #include "PartitionedFile.h"
20 #include <stdio.h>
21 #include <vector>
22 #include <string>
23 #include <cstring>
24 #include <fcntl.h>
25 #include <iostream>
26 #include <fstream>
27 #include <stdlib.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <unistd.h>
31 #include <sys/stat.h>
32 #include <chrono>
33 #include <ctime>
34 using namespace std;
39  DatabaseID dbId,
40  UserTypeID typeId,
41  SetID setId,
42  string metaPartitionPath,
43  vector<string> dataPartitionPaths,
44  pdb::PDBLoggerPtr logger,
45  size_t pageSize) {
46  unsigned int i = 0;
47 
48  this->nodeId = nodeId;
49  this->dbId = dbId;
50  this->typeId = typeId;
51  this->setId = setId;
52  this->metaPartitionPath = metaPartitionPath;
53  this->dataPartitionPaths = dataPartitionPaths;
54  this->logger = logger;
55  this->pageSize = pageSize;
56  this->usingDirect = false;
57  this->cleared = false;
58  // Initialize meta data;
59  this->metaData = make_shared<PartitionedFileMetaData>();
60  this->metaData->setPageSize(pageSize);
61  this->metaData->setNumFlushedPages(0);
62  this->metaData->setVersion(0);
63  this->metaData->setLatestPageId((unsigned int)(-1));
64  pthread_mutex_init(&this->fileMutex, nullptr);
65  PartitionMetaDataPtr curPartitionMetaData;
66  for (i = 0; i < dataPartitionPaths.size(); i++) {
67  curPartitionMetaData = make_shared<PartitionMetaData>(dataPartitionPaths.at(i), i);
68  this->metaData->addPartition(curPartitionMetaData);
69  }
70 
71  // Initialize FILE instances;
72  this->metaFile = nullptr;
73  for (i = 0; i < dataPartitionPaths.size(); i++) {
74  this->dataFiles.push_back(nullptr);
75  this->dataHandles.push_back(-1);
76  }
77  this->openAll();
78  this->writeMeta();
79 
80  // Initialize the mutex;
81 }
82 
88  DatabaseID dbId,
89  UserTypeID typeId,
90  SetID setId,
91  string metaPartitionPath,
92  pdb::PDBLoggerPtr logger) {
93 
94  this->nodeId = nodeId;
95  this->dbId = dbId;
96  this->typeId = typeId;
97  this->setId = setId;
98  this->metaPartitionPath = metaPartitionPath;
99  this->logger = logger;
100  this->cleared = false;
101 
102 
103  // Initialize the mutex;
104  pthread_mutex_init(&this->fileMutex, nullptr);
105 }
106 
111  if (this->cleared == false) {
112  pthread_mutex_lock(&this->fileMutex);
113  this->closeAll();
114  pthread_mutex_unlock(&this->fileMutex);
115  }
116  pthread_mutex_destroy(&this->fileMutex);
117 }
118 
123  // check whether meta partition exists, if not, create it.
124  if (this->metaFile != nullptr) {
125  return false;
126  }
127  FILE* curFile;
128  ifstream file(this->metaPartitionPath.c_str());
129  if (!file) {
130  curFile = fopen(this->metaPartitionPath.c_str(), "w");
131  } else {
132  curFile = fopen(this->metaPartitionPath.c_str(), "r+");
133  }
134  if (curFile != nullptr) {
135  this->metaFile = curFile;
136  } else {
137  cout << "meta can't be open:" << this->metaPartitionPath.c_str() << "\n";
138  return false;
139  }
140  return true;
141 }
142 
147  if (usingDirect == true) {
148  return openDataDirect();
149  }
150  int numPartitions = this->dataPartitionPaths.size();
151  int i;
152  FILE* curFile;
153  for (i = 0; i < numPartitions; i++) {
154  curFile = fopen(this->dataPartitionPaths.at(i).c_str(), "a+");
155  if (curFile != nullptr) {
156  this->dataFiles.at(i) = curFile;
157  cout << "file opened:" << this->dataPartitionPaths.at(i).c_str() << "\n";
158  } else {
159  cout << "file can't be open:" << this->dataPartitionPaths.at(i).c_str() << "\n";
160  return false;
161  }
162  }
163  return true;
164 }
165 
170  int numPartitions = this->dataPartitionPaths.size();
171  int i;
172  int handle;
173  for (i = 0; i < numPartitions; i++) {
174  handle = open(this->dataPartitionPaths.at(i).c_str(),
175  O_RDWR | O_APPEND | O_CREAT
176 #ifndef __APPLE__
177  |
178  O_DIRECT
179 #endif
180  ,
181  S_IRWXU | S_IRWXO);
182  if (handle >= 0) {
183  this->dataHandles.at(i) = handle;
184  cout << "file opened:" << this->dataPartitionPaths.at(i).c_str() << "\n";
185  } else {
186  cout << "file can't be open:" << this->dataPartitionPaths.at(i).c_str() << "\n";
187  return false;
188  }
189  }
190  return true;
191 }
192 
197  return ((this->openMeta()) && (this->openData()));
198 }
199 
200 
205  if (usingDirect == true) {
206  return closeDirect();
207  }
208  int i;
209  int numPartitions = this->dataPartitionPaths.size();
210  fclose(this->metaFile);
211  this->metaFile = nullptr;
212  for (i = 0; i < numPartitions; i++) {
213  fclose(this->dataFiles.at(i));
214  this->dataFiles.at(i) = nullptr;
215  }
216  return true;
217 }
218 
223  int i;
224  int numPartitions = this->dataPartitionPaths.size();
225  fclose(this->metaFile);
226  for (i = 0; i < numPartitions; i++) {
227  close(this->dataHandles.at(i));
228  }
229  return true;
230 }
231 
232 
237 
238  pthread_mutex_lock(&this->fileMutex);
239  if (this->cleared == true) {
240 
241  pthread_mutex_unlock(&this->fileMutex);
242  return;
243  }
244  this->closeAll();
245  remove(this->metaPartitionPath.c_str());
246  logger->info("PartitionedFile: Deleting file:" + this->metaPartitionPath);
247  int i;
248  int numPartitions = this->dataPartitionPaths.size();
249  for (i = 0; i < numPartitions; i++) {
250  remove(this->dataPartitionPaths.at(i).c_str());
251  logger->info("PartitionedFile: Deleting file:" + this->dataPartitionPaths.at(i));
252  }
253  this->cleared = true;
254  pthread_mutex_unlock(&this->fileMutex);
255 }
256 
261  return this->metaData;
262 }
267  if (usingDirect == true) {
268  return appendPageDirect(partitionId, page);
269  }
270  FILE* curPartition = nullptr;
271  if (((curPartition = this->dataFiles.at(partitionId)) == nullptr) || (page == nullptr)) {
272  return -1;
273  }
274 
275  PageID pageId = page->getPageID();
276 
277  pthread_mutex_lock(&this->fileMutex);
278  if (this->cleared == true) {
279  pthread_mutex_unlock(&this->fileMutex);
280  return -1;
281  }
282  if (this->writeData(curPartition, page->getRawBytes(), page->getRawSize()) < 0) {
283  pthread_mutex_unlock(&this->fileMutex);
284  return -1;
285  }
286  // update metadata;
287  this->metaData->incNumFlushedPages();
288 
289  if ((pageId > this->metaData->getLatestPageId()) ||
290  (this->metaData->getLatestPageId() == (unsigned int)(-1))) {
291  this->metaData->setLatestPageId(pageId);
292  }
293 
294  // update partition metadata
295  int ret = (int)(this->metaData->getPartition(partitionId)->getNumPages());
296  this->metaData->addPageIndex(pageId, partitionId, ret);
297  this->metaData->getPartition(partitionId)->incNumPages();
298  pthread_mutex_unlock(&this->fileMutex);
299  return ret;
300 }
301 
306  int handle = -1;
307  if (((handle = this->dataHandles.at(partitionId)) < 0) || (page == nullptr)) {
308  return -1;
309  }
310 
311  PageID pageId = page->getPageID();
312  pthread_mutex_lock(&this->fileMutex);
313  if (this->cleared == true) {
314  pthread_mutex_unlock(&this->fileMutex);
315  return -1;
316  }
317  if (this->writeDataDirect(handle, page->getRawBytes(), page->getRawSize()) < 0) {
318  pthread_mutex_unlock(&this->fileMutex);
319  return -1;
320  }
321  this->metaData->incNumFlushedPages();
322 
323  if ((pageId > this->metaData->getLatestPageId()) ||
324  (this->metaData->getLatestPageId() == (unsigned int)(-1))) {
325  this->metaData->setLatestPageId(pageId);
326  }
327  int ret = (int)(this->metaData->getPartition(partitionId)->getNumPages());
328  this->metaData->addPageIndex(pageId, partitionId, ret);
329  this->metaData->getPartition(partitionId)->incNumPages();
330  pthread_mutex_unlock(&this->fileMutex);
331  return ret;
332 }
333 
334 
359  pthread_mutex_lock(&this->fileMutex);
360  if (this->metaFile == nullptr) {
361  pthread_mutex_unlock(&this->fileMutex);
362  return -1;
363  }
364  if (this->cleared == true) {
365  pthread_mutex_unlock(&this->fileMutex);
366  return -1;
367  }
368  // compute meta size
369  size_t metaSize = sizeof(FileType) + sizeof(unsigned short) + sizeof(size_t) +
370  sizeof(unsigned int) + sizeof(unsigned int) + sizeof(unsigned int);
371  unsigned int numPartitions = this->dataPartitionPaths.size();
372  unsigned int i = 0;
373  for (i = 0; i < numPartitions; i++) {
374  metaSize += sizeof(FilePartitionID) + sizeof(unsigned int) + sizeof(size_t) +
375  this->dataPartitionPaths.at(i).length() + 1;
376  }
377  unsigned int numPages = this->metaData->getNumFlushedPages();
378  for (i = 0; i < numPages; i++) {
379  metaSize += sizeof(PageID) + sizeof(FilePartitionID) + sizeof(unsigned int);
380  }
381  // write meta size to meta partition
382  fseek(this->metaFile, 0, SEEK_SET);
383  fwrite((size_t*)(&metaSize), sizeof(size_t), 1, this->metaFile);
384  fflush(this->metaFile);
385  // allocate buffer for meta data
386  char* buffer = (char*)malloc(metaSize * sizeof(char));
387  char* cur = buffer;
388 
389  // intialize FileType
391  cur = cur + sizeof(FileType);
392  // initialize Version
393  *((unsigned short*)cur) = this->metaData->getVersion();
394  cur = cur + sizeof(unsigned short);
395  // initialize PageSize
396  *((size_t*)cur) = this->metaData->getPageSize();
397  cur = cur + sizeof(size_t);
398  // initialize TotalPageNumber
399  *((unsigned int*)cur) = this->metaData->getNumFlushedPages();
400  cur = cur + sizeof(unsigned int);
401  *((unsigned int*)cur) = this->metaData->getLatestPageId();
402  cur = cur + sizeof(unsigned int);
403  // initialize Partitions
404  *((unsigned int*)cur) = numPartitions;
405  cur = cur + sizeof(unsigned int);
406 
407  for (i = 0; i < numPartitions; i++) {
408  *((FilePartitionID*)cur) = i;
409  cur = cur + sizeof(FilePartitionID);
410  *((unsigned int*)cur) = this->metaData->getPartition(i)->getNumPages();
411  cur = cur + sizeof(unsigned int);
412  *((size_t*)cur) = this->dataPartitionPaths.at(i).length() + 1;
413  cur = cur + sizeof(size_t);
414  memcpy(cur,
415  this->dataPartitionPaths.at(i).c_str(),
416  this->dataPartitionPaths.at(i).length() + 1);
417  cur = cur + this->dataPartitionPaths.at(i).length() + 1;
418  }
419 
420  for (auto iter = this->getMetaData()->getPageIndexes()->begin();
421  iter != this->getMetaData()->getPageIndexes()->end();
422  iter++) {
423  PageID pageId = iter->first;
424  PageIndex pageIndex = iter->second;
425  *((PageID*)cur) = pageId;
426  cur = cur + sizeof(PageID);
427  *((FilePartitionID*)cur) = pageIndex.partitionId;
428  cur = cur + sizeof(FilePartitionID);
429  *((unsigned int*)cur) = pageIndex.pageSeqInPartition;
430  cur = cur + sizeof(unsigned int);
431  }
432 
433  // write meta data
434  fseek(this->metaFile, sizeof(size_t), SEEK_SET);
435  int ret = this->writeData(this->metaFile, (void*)buffer, metaSize);
436  fflush(this->metaFile);
437  free(buffer);
438  pthread_mutex_unlock(&this->fileMutex);
439  return ret;
440 }
441 
452  if (this->seekNumFlushedPagesInMeta() < 0) {
453  return -1;
454  }
455  unsigned int numFlushedPages = this->getMetaData()->getNumFlushedPages();
456  if (this->writeData(this->metaFile, &numFlushedPages, sizeof(unsigned int)) < 0) {
457  return -1;
458  }
459  unsigned int numPartitions = this->dataPartitionPaths.size();
460  unsigned int i;
461  for (i = 0; i < numPartitions; i++) {
462  this->seekNumFlushedPagesInPartitionMeta(i);
463  numFlushedPages = this->getMetaData()->getPartition(i)->getNumPages();
464  if (this->writeData(this->metaFile, &(numFlushedPages), sizeof(unsigned int)) < 0) {
465  return -1;
466  }
467  }
468  fflush(this->metaFile);
469  return 0;
470 }
471 
484  unsigned int pageSeqInPartition,
485  char* pageInCache,
486  size_t length) {
487  if (usingDirect == true) {
488  return loadPageDirect(partitionId, pageSeqInPartition, pageInCache, length);
489  }
490  FILE* curFile = this->dataFiles.at(partitionId);
491  if (curFile == nullptr) {
492  return 0;
493  }
494  if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
495  seekPage(curFile, pageSeqInPartition);
496  return fread(pageInCache, sizeof(char), length, curFile);
497  } else {
498  return (size_t)-1;
499  }
500 }
501 
506  unsigned int pageSeqInPartition,
507  char* pageInCache,
508  size_t length) {
509  int handle = this->dataHandles.at(partitionId);
510  size_t ret;
511  if (handle < 0) {
512  return (size_t)(-1);
513  }
514  if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
515  seekPageDirect(handle, pageSeqInPartition);
516  ret = read(handle, pageInCache, length);
517  } else {
518  return (size_t)(-1);
519  }
520  return ret;
521 }
522 
527 PageID PartitionedFile::loadPageId(FilePartitionID partitionId, unsigned int pageSeqInPartition) {
528  PageID ret = this->getMetaData()->getPageId(partitionId, pageSeqInPartition);
529  return ret;
530 }
531 
538  unsigned int pageSeqInPartition,
539  char* pageInCache,
540  size_t length) {
541  FILE* curFile = this->dataFiles.at(partitionId);
542  if (curFile == nullptr) {
543  return 0;
544  }
545  if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
546  return fread(pageInCache, sizeof(char), length, curFile);
547  } else {
548  return (size_t)(-1);
549  }
550 }
551 
552 
559  unsigned int pageSeqInPartition,
560  char* pageInCache,
561  size_t length) {
562  FILE* curFile = this->dataFiles.at(partitionId);
563  if (curFile == nullptr) {
564  return 0;
565  }
566  if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
567  PageID pageId;
568  size_t size = fread(&pageId, sizeof(char), sizeof(PageID), curFile);
569  if (size == 0) {
570  std::cout << "PartitionedFile: Read failed" << std::endl;
571  return (PageID)(-1);
572  }
573  return pageId;
574  } else {
575  return (PageID)(-1);
576  }
577 }
578 
585  if (this->metaFile == nullptr) {
586  return -1;
587  }
588  if (this->seekNumFlushedPagesInMeta() == 0) {
589  this->logger->writeLn("PartitionedFile: get numFlushedPages from meta partition:");
590  unsigned int numFlushedPages;
591  size_t size =
592  fread((unsigned int*)(&numFlushedPages), sizeof(unsigned int), 1, this->metaFile);
593  if (size == 0) {
594  std::cout << "PartitionedFile: Read failed" << std::endl;
595  return 0;
596  }
597  this->getMetaData()->setNumFlushedPages(numFlushedPages);
598  this->logger->writeInt(this->getMetaData()->getNumFlushedPages());
599  return this->getMetaData()->getNumFlushedPages();
600  } else {
601  return 0;
602  }
603 }
604 
609  return this->getMetaData()->getNumFlushedPages();
610 }
611 
616  return this->getMetaData()->getLatestPageId();
617 }
618 
623  return this->getMetaData()->getLatestPageId();
624 }
625 
630  return this->nodeId;
631 }
632 
637  return this->dbId;
638 }
639 
640 
645  return this->typeId;
646 }
647 
652  return this->setId;
653 }
654 
660 }
661 
666  return this->dataPartitionPaths.size();
667 }
668 
673  // parse the meta file
697  // Open meta partition for reading
698  if (this->openMeta() == false) {
699  this->logger->error("Fatal Error: PartitionedFile: Error: can't open meta partition.");
700  exit(-1);
701  }
702  // get meta partition size;
703  fseek(this->metaFile, 0, SEEK_SET);
704  size_t size;
705  size_t sizeRead = fread((size_t*)(&(size)), sizeof(size_t), 1, this->metaFile);
706  if (sizeRead == 0) {
707  std::cout << "PartitionedFile: Read meta size failed" << std::endl;
708  exit(-1);
709  }
710  // load meta partition to memory
711  fseek(this->metaFile, sizeof(size_t), SEEK_SET);
712  char* buf = (char*)malloc(size * sizeof(char));
713  sizeRead = fread((void*)buf, sizeof(char), size, this->metaFile);
714  if (sizeRead < size) {
715  cout << "Metadata corrupted, please remove storage folders and try again...\n";
716  this->logger->error(
717  "Fatal Error: Metadata corrupted, please remove storage folders and try again...");
718  exit(-1);
719  }
720 
721  // create a meta data instance
722  this->metaData = make_shared<PartitionedFileMetaData>();
723 
724  // parse file type
725  char* cur = buf;
726  cur = cur + sizeof(FileType);
727 
728  // parse and set version;
729  unsigned short version = (unsigned short)(*(unsigned short*)cur);
730  this->metaData->setVersion(version);
731  cur = cur + sizeof(unsigned short);
732 
733  // parse and set pageSize;
734  size_t pageSize = (size_t)(*(size_t*)cur);
735  this->metaData->setPageSize(pageSize);
736  this->pageSize = pageSize;
737  std::cout << "Detected file with page size=" << pageSize << std::endl;
738  cur = cur + sizeof(size_t);
739 
740  // parse and set numFlushed pages;
741  unsigned int numFlushedPages = (unsigned int)(*(unsigned int*)cur);
742  this->metaData->setNumFlushedPages(numFlushedPages);
743  cur = cur + sizeof(unsigned int);
744 
745  // parse and set latestPageId;
746  unsigned int latestPageId = (unsigned int)(*(unsigned int*)cur);
747  this->metaData->setLatestPageId(latestPageId);
748  cur = cur + sizeof(unsigned int);
749 
750 
751  // parse numPartitions;
752  unsigned int numPartitions = (unsigned int)(*(unsigned int*)cur);
753  cur = cur + sizeof(unsigned int);
754  PartitionMetaDataPtr curPartitionMeta;
755  FilePartitionID partitionId;
756  unsigned int numFlushedPagesInPartition;
757  size_t pathLen;
758 
759  // parse and set partition meta data
760  unsigned int i;
761  for (i = 0; i < numPartitions; i++) {
762  curPartitionMeta = make_shared<PartitionMetaData>();
763  // parse and set partitionId
764  partitionId = (FilePartitionID)(*(FilePartitionID*)cur);
765  curPartitionMeta->setPartitionId(partitionId);
766  cur = cur + sizeof(FilePartitionID);
767 
768  // parse and set numFlushedPages
769  numFlushedPagesInPartition = (unsigned int)(*(unsigned int*)cur);
770  curPartitionMeta->setNumPages(numFlushedPagesInPartition);
771  cur = cur + sizeof(unsigned int);
772 
773  // parse len
774  pathLen = (size_t)(*(size_t*)cur);
775  cur = cur + sizeof(size_t);
776  // parse string
777  string partitionPath(cur);
778  this->dataPartitionPaths.push_back(partitionPath);
779  curPartitionMeta->setPath(partitionPath);
780  this->metaData->addPartition(curPartitionMeta);
781  cur = cur + pathLen;
782  }
783 
784  PageID pageId;
785  unsigned int pageSeqInPartition;
786  // parse and set page index data
787  for (i = 0; i < numFlushedPages; i++) {
788  pageId = (PageID)(*(PageID*)cur);
789  cur = cur + sizeof(PageID);
790  partitionId = (FilePartitionID)(*(FilePartitionID*)cur);
791  cur = cur + sizeof(FilePartitionID);
792  pageSeqInPartition = (unsigned int)(*(unsigned int*)cur);
793  cur = cur + sizeof(unsigned int);
794  this->metaData->addPageIndex(pageId, partitionId, pageSeqInPartition);
795  }
796 
797  free(buf);
798 }
799 
804  if (pageSize == 0) {
805  pageSize = getPageSizeInMeta();
806  }
807  return pageSize;
808 }
809 
810 
815  if (this->metaFile == nullptr) {
816  return (size_t)(-1);
817  }
818  if (this->seekPageSizeInMeta() == 0) {
819  size_t pageSize;
820  this->logger->writeLn("PartitionedFile: get page size from meta partition:");
821  size_t sizeRead = fread((size_t*)(&(pageSize)), sizeof(size_t), 1, this->metaFile);
822  if (sizeRead == 0) {
823  std::cout << "PartitionedFile: Read failed" << std::endl;
824  return 0;
825  }
826  this->logger->writeInt(pageSize);
827  return pageSize;
828  } else {
829  return 0;
830  }
831 }
832 
837  unsigned int i;
838  for (i = 0; i < dataPartitionPaths.size(); i++) {
839  this->dataFiles.push_back(nullptr);
840  this->dataHandles.push_back(-1);
841  }
842 }
843 
847 void PartitionedFile::setDataPartitionPaths(const vector<string>& dataPartitionPaths) {
848  this->dataPartitionPaths = dataPartitionPaths;
849 }
850 
851 
855 int PartitionedFile::writeData(FILE* file, void* data, size_t length) {
856 
857  if ((file == nullptr) || (data == nullptr)) {
858  cout << "PartitionedFile: Error: writeData with nullptr.\n";
859  return -1;
860  }
861  size_t retSize = fwrite(data, sizeof(char), length, file);
862  fflush(file);
863  if (retSize != length) {
864  return -1;
865  } else {
866  return 0;
867  }
868 }
869 
873 int PartitionedFile::writeDataDirect(int handle, void* data, size_t length) {
874  if ((handle < 0) || (data == nullptr)) {
875  cout << "PartitionedFile: Error: invalid handle or data is nullptr.\n";
876  return -1;
877  }
878  size_t retSize = write(handle, data, length);
879  if (retSize != length) {
880  cout << "written bytes:" << retSize << "\n";
881  return -1;
882  } else {
883  return 0;
884  }
885 }
886 
887 
891 int PartitionedFile::seekPage(FILE* partition, unsigned int pageSeqInPartition) {
892  if (partition == nullptr) {
893  return -1;
894  }
895  return fseek(partition, (pageSeqInPartition) * (this->metaData->getPageSize()), SEEK_SET);
896 }
897 
901 int PartitionedFile::seekPageDirect(int handle, unsigned int pageSeqInPartition) {
902  if (handle < 0) {
903  return -1;
904  }
905  return lseek(handle, (pageSeqInPartition) * (this->metaData->getPageSize()), SEEK_SET);
906 }
907 
908 
913  if (this->metaFile == nullptr) {
914  return -1;
915  }
916  return fseek(
917  this->metaFile, sizeof(size_t) + sizeof(FileType) + sizeof(unsigned short), SEEK_SET);
918 }
919 
924  if (this->metaFile == nullptr) {
925  return -1;
926  }
927  return fseek(this->metaFile,
928  sizeof(size_t) + sizeof(FileType) + sizeof(unsigned short) + sizeof(size_t),
929  SEEK_SET);
930 }
931 
936  if (this->metaFile == nullptr) {
937  return -1;
938  }
939  unsigned int i;
940  unsigned int metaSize = sizeof(FileType) + sizeof(unsigned short) + sizeof(size_t) +
941  sizeof(unsigned int) + sizeof(unsigned int);
942  for (i = 0; i < partitionId; i++) {
943  metaSize += sizeof(FilePartitionID) + sizeof(unsigned int) + sizeof(size_t) +
944  this->dataPartitionPaths.at(i).length() + 1;
945  }
946  metaSize += sizeof(FilePartitionID);
947  return fseek(this->metaFile, sizeof(size_t) + metaSize, SEEK_SET);
948 }
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
unsigned int pageSeqInPartition
Definition: DataTypes.h:99
size_t loadPageDirect(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
int writeData(FILE *file, void *data, size_t length)
size_t getPageSize() override
PageID getLastFlushedPageID() override
FileType
Definition: DataTypes.h:72
size_t loadPageFromCurPos(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
void clear() override
bool closeAll() override
unsigned int NodeID
Definition: DataTypes.h:27
int updateMeta() override
PageID loadPageId(FilePartitionID partitionId, unsigned int pageSeqInPartition)
int seekPage(FILE *file, unsigned int pageSeqInPartition)
NodeID getNodeId() override
unsigned int getNumPartitions()
UserTypeID getTypeId() override
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
unsigned int DatabaseID
Definition: DataTypes.h:29
PageID getLatestPageID() override
unsigned int PageID
Definition: DataTypes.h:26
FilePartitionID partitionId
Definition: DataTypes.h:98
PageID loadPageIdFromCurPos(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
DatabaseID getDbId() override
int writeDataDirect(int handle, void *data, size_t length)
void setDataPartitionPaths(const vector< string > &dataPartitionPaths)
FileType getFileType() override
int seekPageDirect(int handle, unsigned int pageSeqInPartition)
PartitionedFileMetaDataPtr getMetaData()
void buildMetaDataFromMetaPartition(SharedMemPtr shm)
size_t loadPage(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length) override
SetID getSetId() override
int seekNumFlushedPagesInMeta()
shared_ptr< PartitionMetaData > PartitionMetaDataPtr
int seekNumFlushedPagesInPartitionMeta(FilePartitionID partitionId)
shared_ptr< PartitionedFileMetaData > PartitionedFileMetaDataPtr
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
int writeMeta() override
bool openAll() override
size_t getPageSizeInMeta() override
int appendPage(FilePartitionID partitionId, PDBPagePtr page) override
PartitionedFile(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, string metaPartitionPath, vector< string > dataPartitionPaths, pdb::PDBLoggerPtr logger, size_t pageSize)
unsigned int FilePartitionID
Definition: DataTypes.h:32
unsigned int getNumFlushedPages() override
unsigned int UserTypeID
Definition: DataTypes.h:25
int appendPageDirect(FilePartitionID partitionId, PDBPagePtr page)
unsigned int getAndSetNumFlushedPages() override