29 #include <sys/types.h>
42 string metaPartitionPath,
43 vector<string> dataPartitionPaths,
48 this->nodeId = nodeId;
50 this->typeId = typeId;
52 this->metaPartitionPath = metaPartitionPath;
53 this->dataPartitionPaths = dataPartitionPaths;
54 this->logger = logger;
55 this->pageSize = pageSize;
56 this->usingDirect =
false;
57 this->cleared =
false;
59 this->metaData = make_shared<PartitionedFileMetaData>();
60 this->metaData->setPageSize(pageSize);
61 this->metaData->setNumFlushedPages(0);
62 this->metaData->setVersion(0);
63 this->metaData->setLatestPageId((
unsigned int)(-1));
64 pthread_mutex_init(&this->fileMutex,
nullptr);
66 for (i = 0; i < dataPartitionPaths.size(); i++) {
67 curPartitionMetaData = make_shared<PartitionMetaData>(dataPartitionPaths.at(i), i);
68 this->metaData->addPartition(curPartitionMetaData);
72 this->metaFile =
nullptr;
73 for (i = 0; i < dataPartitionPaths.size(); i++) {
74 this->dataFiles.push_back(
nullptr);
75 this->dataHandles.push_back(-1);
91 string metaPartitionPath,
94 this->nodeId = nodeId;
96 this->typeId = typeId;
98 this->metaPartitionPath = metaPartitionPath;
99 this->logger = logger;
100 this->cleared =
false;
104 pthread_mutex_init(&this->fileMutex,
nullptr);
111 if (this->cleared ==
false) {
112 pthread_mutex_lock(&this->fileMutex);
114 pthread_mutex_unlock(&this->fileMutex);
116 pthread_mutex_destroy(&this->fileMutex);
124 if (this->metaFile !=
nullptr) {
128 ifstream file(this->metaPartitionPath.c_str());
130 curFile = fopen(this->metaPartitionPath.c_str(),
"w");
132 curFile = fopen(this->metaPartitionPath.c_str(),
"r+");
134 if (curFile !=
nullptr) {
135 this->metaFile = curFile;
137 cout <<
"meta can't be open:" << this->metaPartitionPath.c_str() <<
"\n";
147 if (usingDirect ==
true) {
148 return openDataDirect();
150 int numPartitions = this->dataPartitionPaths.size();
153 for (i = 0; i < numPartitions; i++) {
154 curFile = fopen(this->dataPartitionPaths.at(i).c_str(),
"a+");
155 if (curFile !=
nullptr) {
156 this->dataFiles.at(i) = curFile;
157 cout <<
"file opened:" << this->dataPartitionPaths.at(i).c_str() <<
"\n";
159 cout <<
"file can't be open:" << this->dataPartitionPaths.at(i).c_str() <<
"\n";
170 int numPartitions = this->dataPartitionPaths.size();
173 for (i = 0; i < numPartitions; i++) {
174 handle = open(this->dataPartitionPaths.at(i).c_str(),
175 O_RDWR | O_APPEND | O_CREAT
183 this->dataHandles.at(i) = handle;
184 cout <<
"file opened:" << this->dataPartitionPaths.at(i).c_str() <<
"\n";
186 cout <<
"file can't be open:" << this->dataPartitionPaths.at(i).c_str() <<
"\n";
197 return ((this->openMeta()) && (this->openData()));
205 if (usingDirect ==
true) {
206 return closeDirect();
209 int numPartitions = this->dataPartitionPaths.size();
210 fclose(this->metaFile);
211 this->metaFile =
nullptr;
212 for (i = 0; i < numPartitions; i++) {
213 fclose(this->dataFiles.at(i));
214 this->dataFiles.at(i) =
nullptr;
224 int numPartitions = this->dataPartitionPaths.size();
225 fclose(this->metaFile);
226 for (i = 0; i < numPartitions; i++) {
227 close(this->dataHandles.at(i));
238 pthread_mutex_lock(&this->fileMutex);
239 if (this->cleared ==
true) {
241 pthread_mutex_unlock(&this->fileMutex);
245 remove(this->metaPartitionPath.c_str());
246 logger->info(
"PartitionedFile: Deleting file:" + this->metaPartitionPath);
248 int numPartitions = this->dataPartitionPaths.size();
249 for (i = 0; i < numPartitions; i++) {
250 remove(this->dataPartitionPaths.at(i).c_str());
251 logger->info(
"PartitionedFile: Deleting file:" + this->dataPartitionPaths.at(i));
253 this->cleared =
true;
254 pthread_mutex_unlock(&this->fileMutex);
261 return this->metaData;
267 if (usingDirect ==
true) {
268 return appendPageDirect(partitionId, page);
270 FILE* curPartition =
nullptr;
271 if (((curPartition = this->dataFiles.at(partitionId)) ==
nullptr) || (page ==
nullptr)) {
275 PageID pageId = page->getPageID();
277 pthread_mutex_lock(&this->fileMutex);
278 if (this->cleared ==
true) {
279 pthread_mutex_unlock(&this->fileMutex);
282 if (this->writeData(curPartition, page->getRawBytes(), page->getRawSize()) < 0) {
283 pthread_mutex_unlock(&this->fileMutex);
287 this->metaData->incNumFlushedPages();
289 if ((pageId > this->metaData->getLatestPageId()) ||
290 (this->metaData->getLatestPageId() == (
unsigned int)(-1))) {
291 this->metaData->setLatestPageId(pageId);
295 int ret = (int)(this->metaData->getPartition(partitionId)->getNumPages());
296 this->metaData->addPageIndex(pageId, partitionId, ret);
297 this->metaData->getPartition(partitionId)->incNumPages();
298 pthread_mutex_unlock(&this->fileMutex);
307 if (((handle = this->dataHandles.at(partitionId)) < 0) || (page ==
nullptr)) {
311 PageID pageId = page->getPageID();
312 pthread_mutex_lock(&this->fileMutex);
313 if (this->cleared ==
true) {
314 pthread_mutex_unlock(&this->fileMutex);
317 if (this->writeDataDirect(handle, page->getRawBytes(), page->getRawSize()) < 0) {
318 pthread_mutex_unlock(&this->fileMutex);
321 this->metaData->incNumFlushedPages();
323 if ((pageId > this->metaData->getLatestPageId()) ||
324 (this->metaData->getLatestPageId() == (
unsigned int)(-1))) {
325 this->metaData->setLatestPageId(pageId);
327 int ret = (int)(this->metaData->getPartition(partitionId)->getNumPages());
328 this->metaData->addPageIndex(pageId, partitionId, ret);
329 this->metaData->getPartition(partitionId)->incNumPages();
330 pthread_mutex_unlock(&this->fileMutex);
359 pthread_mutex_lock(&this->fileMutex);
360 if (this->metaFile ==
nullptr) {
361 pthread_mutex_unlock(&this->fileMutex);
364 if (this->cleared ==
true) {
365 pthread_mutex_unlock(&this->fileMutex);
369 size_t metaSize =
sizeof(
FileType) +
sizeof(
unsigned short) +
sizeof(size_t) +
370 sizeof(
unsigned int) +
sizeof(
unsigned int) +
sizeof(
unsigned int);
371 unsigned int numPartitions = this->dataPartitionPaths.size();
373 for (i = 0; i < numPartitions; i++) {
374 metaSize +=
sizeof(
FilePartitionID) +
sizeof(
unsigned int) +
sizeof(size_t) +
375 this->dataPartitionPaths.at(i).length() + 1;
377 unsigned int numPages = this->metaData->getNumFlushedPages();
378 for (i = 0; i < numPages; i++) {
382 fseek(this->metaFile, 0, SEEK_SET);
383 fwrite((
size_t*)(&metaSize),
sizeof(
size_t), 1, this->metaFile);
384 fflush(this->metaFile);
386 char* buffer = (
char*)malloc(metaSize *
sizeof(
char));
393 *((
unsigned short*)cur) = this->metaData->getVersion();
394 cur = cur +
sizeof(
unsigned short);
396 *((
size_t*)cur) = this->metaData->getPageSize();
397 cur = cur +
sizeof(size_t);
399 *((
unsigned int*)cur) = this->metaData->getNumFlushedPages();
400 cur = cur +
sizeof(
unsigned int);
401 *((
unsigned int*)cur) = this->metaData->getLatestPageId();
402 cur = cur +
sizeof(
unsigned int);
404 *((
unsigned int*)cur) = numPartitions;
405 cur = cur +
sizeof(
unsigned int);
407 for (i = 0; i < numPartitions; i++) {
410 *((
unsigned int*)cur) = this->metaData->getPartition(i)->getNumPages();
411 cur = cur +
sizeof(
unsigned int);
412 *((
size_t*)cur) = this->dataPartitionPaths.at(i).length() + 1;
413 cur = cur +
sizeof(size_t);
415 this->dataPartitionPaths.at(i).c_str(),
416 this->dataPartitionPaths.at(i).length() + 1);
417 cur = cur + this->dataPartitionPaths.at(i).length() + 1;
420 for (
auto iter = this->getMetaData()->getPageIndexes()->begin();
421 iter != this->getMetaData()->getPageIndexes()->end();
423 PageID pageId = iter->first;
426 cur = cur +
sizeof(
PageID);
430 cur = cur +
sizeof(
unsigned int);
434 fseek(this->metaFile,
sizeof(
size_t), SEEK_SET);
435 int ret = this->writeData(this->metaFile, (
void*)buffer, metaSize);
436 fflush(this->metaFile);
438 pthread_mutex_unlock(&this->fileMutex);
452 if (this->seekNumFlushedPagesInMeta() < 0) {
455 unsigned int numFlushedPages = this->getMetaData()->getNumFlushedPages();
456 if (this->writeData(this->metaFile, &numFlushedPages,
sizeof(
unsigned int)) < 0) {
459 unsigned int numPartitions = this->dataPartitionPaths.size();
461 for (i = 0; i < numPartitions; i++) {
462 this->seekNumFlushedPagesInPartitionMeta(i);
463 numFlushedPages = this->getMetaData()->getPartition(i)->getNumPages();
464 if (this->writeData(this->metaFile, &(numFlushedPages),
sizeof(
unsigned int)) < 0) {
468 fflush(this->metaFile);
484 unsigned int pageSeqInPartition,
487 if (usingDirect ==
true) {
488 return loadPageDirect(partitionId, pageSeqInPartition, pageInCache, length);
490 FILE* curFile = this->dataFiles.at(partitionId);
491 if (curFile ==
nullptr) {
494 if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
495 seekPage(curFile, pageSeqInPartition);
496 return fread(pageInCache,
sizeof(
char), length, curFile);
506 unsigned int pageSeqInPartition,
509 int handle = this->dataHandles.at(partitionId);
514 if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
515 seekPageDirect(handle, pageSeqInPartition);
516 ret = read(handle, pageInCache, length);
528 PageID ret = this->getMetaData()->getPageId(partitionId, pageSeqInPartition);
538 unsigned int pageSeqInPartition,
541 FILE* curFile = this->dataFiles.at(partitionId);
542 if (curFile ==
nullptr) {
545 if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
546 return fread(pageInCache,
sizeof(
char), length, curFile);
559 unsigned int pageSeqInPartition,
562 FILE* curFile = this->dataFiles.at(partitionId);
563 if (curFile ==
nullptr) {
566 if (pageSeqInPartition < this->getMetaData()->getPartition(partitionId)->getNumPages()) {
568 size_t size = fread(&pageId,
sizeof(
char),
sizeof(
PageID), curFile);
570 std::cout <<
"PartitionedFile: Read failed" << std::endl;
585 if (this->metaFile ==
nullptr) {
588 if (this->seekNumFlushedPagesInMeta() == 0) {
589 this->logger->writeLn(
"PartitionedFile: get numFlushedPages from meta partition:");
590 unsigned int numFlushedPages;
592 fread((
unsigned int*)(&numFlushedPages),
sizeof(
unsigned int), 1, this->metaFile);
594 std::cout <<
"PartitionedFile: Read failed" << std::endl;
597 this->getMetaData()->setNumFlushedPages(numFlushedPages);
598 this->logger->writeInt(this->getMetaData()->getNumFlushedPages());
599 return this->getMetaData()->getNumFlushedPages();
609 return this->getMetaData()->getNumFlushedPages();
616 return this->getMetaData()->getLatestPageId();
623 return this->getMetaData()->getLatestPageId();
666 return this->dataPartitionPaths.size();
698 if (this->openMeta() ==
false) {
699 this->logger->error(
"Fatal Error: PartitionedFile: Error: can't open meta partition.");
703 fseek(this->metaFile, 0, SEEK_SET);
705 size_t sizeRead = fread((
size_t*)(&(size)),
sizeof(
size_t), 1, this->metaFile);
707 std::cout <<
"PartitionedFile: Read meta size failed" << std::endl;
711 fseek(this->metaFile,
sizeof(
size_t), SEEK_SET);
712 char* buf = (
char*)malloc(size *
sizeof(
char));
713 sizeRead = fread((
void*)buf,
sizeof(
char), size, this->metaFile);
714 if (sizeRead < size) {
715 cout <<
"Metadata corrupted, please remove storage folders and try again...\n";
717 "Fatal Error: Metadata corrupted, please remove storage folders and try again...");
722 this->metaData = make_shared<PartitionedFileMetaData>();
729 unsigned short version = (
unsigned short)(*(
unsigned short*)cur);
730 this->metaData->setVersion(version);
731 cur = cur +
sizeof(
unsigned short);
734 size_t pageSize = (size_t)(*(
size_t*)cur);
735 this->metaData->setPageSize(pageSize);
736 this->pageSize = pageSize;
737 std::cout <<
"Detected file with page size=" << pageSize << std::endl;
738 cur = cur +
sizeof(size_t);
741 unsigned int numFlushedPages = (
unsigned int)(*(
unsigned int*)cur);
742 this->metaData->setNumFlushedPages(numFlushedPages);
743 cur = cur +
sizeof(
unsigned int);
746 unsigned int latestPageId = (
unsigned int)(*(
unsigned int*)cur);
747 this->metaData->setLatestPageId(latestPageId);
748 cur = cur +
sizeof(
unsigned int);
752 unsigned int numPartitions = (
unsigned int)(*(
unsigned int*)cur);
753 cur = cur +
sizeof(
unsigned int);
756 unsigned int numFlushedPagesInPartition;
761 for (i = 0; i < numPartitions; i++) {
762 curPartitionMeta = make_shared<PartitionMetaData>();
765 curPartitionMeta->setPartitionId(partitionId);
769 numFlushedPagesInPartition = (
unsigned int)(*(
unsigned int*)cur);
770 curPartitionMeta->setNumPages(numFlushedPagesInPartition);
771 cur = cur +
sizeof(
unsigned int);
774 pathLen = (size_t)(*(
size_t*)cur);
775 cur = cur +
sizeof(size_t);
777 string partitionPath(cur);
778 this->dataPartitionPaths.push_back(partitionPath);
779 curPartitionMeta->setPath(partitionPath);
780 this->metaData->addPartition(curPartitionMeta);
785 unsigned int pageSeqInPartition;
787 for (i = 0; i < numFlushedPages; i++) {
789 cur = cur +
sizeof(
PageID);
792 pageSeqInPartition = (
unsigned int)(*(
unsigned int*)cur);
793 cur = cur +
sizeof(
unsigned int);
794 this->metaData->addPageIndex(pageId, partitionId, pageSeqInPartition);
805 pageSize = getPageSizeInMeta();
815 if (this->metaFile ==
nullptr) {
818 if (this->seekPageSizeInMeta() == 0) {
820 this->logger->writeLn(
"PartitionedFile: get page size from meta partition:");
821 size_t sizeRead = fread((
size_t*)(&(pageSize)),
sizeof(
size_t), 1, this->metaFile);
823 std::cout <<
"PartitionedFile: Read failed" << std::endl;
826 this->logger->writeInt(pageSize);
838 for (i = 0; i < dataPartitionPaths.size(); i++) {
839 this->dataFiles.push_back(
nullptr);
840 this->dataHandles.push_back(-1);
848 this->dataPartitionPaths = dataPartitionPaths;
857 if ((file ==
nullptr) || (data ==
nullptr)) {
858 cout <<
"PartitionedFile: Error: writeData with nullptr.\n";
861 size_t retSize = fwrite(data,
sizeof(
char), length, file);
863 if (retSize != length) {
874 if ((handle < 0) || (data ==
nullptr)) {
875 cout <<
"PartitionedFile: Error: invalid handle or data is nullptr.\n";
878 size_t retSize = write(handle, data, length);
879 if (retSize != length) {
880 cout <<
"written bytes:" << retSize <<
"\n";
892 if (partition ==
nullptr) {
895 return fseek(partition, (pageSeqInPartition) * (this->metaData->getPageSize()), SEEK_SET);
905 return lseek(handle, (pageSeqInPartition) * (this->metaData->getPageSize()), SEEK_SET);
913 if (this->metaFile ==
nullptr) {
917 this->metaFile,
sizeof(
size_t) +
sizeof(
FileType) +
sizeof(
unsigned short), SEEK_SET);
924 if (this->metaFile ==
nullptr) {
927 return fseek(this->metaFile,
928 sizeof(
size_t) +
sizeof(
FileType) +
sizeof(
unsigned short) +
sizeof(
size_t),
936 if (this->metaFile ==
nullptr) {
940 unsigned int metaSize =
sizeof(
FileType) +
sizeof(
unsigned short) +
sizeof(size_t) +
941 sizeof(
unsigned int) +
sizeof(
unsigned int);
942 for (i = 0; i < partitionId; i++) {
943 metaSize +=
sizeof(
FilePartitionID) +
sizeof(
unsigned int) +
sizeof(size_t) +
944 this->dataPartitionPaths.at(i).length() + 1;
947 return fseek(this->metaFile,
sizeof(
size_t) + metaSize, SEEK_SET);
shared_ptr< PDBPage > PDBPagePtr
unsigned int pageSeqInPartition
size_t loadPageDirect(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
int writeData(FILE *file, void *data, size_t length)
size_t getPageSize() override
PageID getLastFlushedPageID() override
size_t loadPageFromCurPos(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
int updateMeta() override
PageID loadPageId(FilePartitionID partitionId, unsigned int pageSeqInPartition)
int seekPage(FILE *file, unsigned int pageSeqInPartition)
NodeID getNodeId() override
unsigned int getNumPartitions()
UserTypeID getTypeId() override
shared_ptr< SharedMem > SharedMemPtr
PageID getLatestPageID() override
FilePartitionID partitionId
PageID loadPageIdFromCurPos(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length)
DatabaseID getDbId() override
int writeDataDirect(int handle, void *data, size_t length)
void setDataPartitionPaths(const vector< string > &dataPartitionPaths)
FileType getFileType() override
int seekPageDirect(int handle, unsigned int pageSeqInPartition)
PartitionedFileMetaDataPtr getMetaData()
void buildMetaDataFromMetaPartition(SharedMemPtr shm)
size_t loadPage(FilePartitionID partitionId, unsigned int pageSeqInPartition, char *pageInCache, size_t length) override
SetID getSetId() override
int seekNumFlushedPagesInMeta()
void initializeDataFiles()
int seekNumFlushedPagesInPartitionMeta(FilePartitionID partitionId)
std::shared_ptr< PDBLogger > PDBLoggerPtr
size_t getPageSizeInMeta() override
int appendPage(FilePartitionID partitionId, PDBPagePtr page) override
PartitionedFile(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, string metaPartitionPath, vector< string > dataPartitionPaths, pdb::PDBLoggerPtr logger, size_t pageSize)
unsigned int FilePartitionID
unsigned int getNumFlushedPages() override
int appendPageDirect(FilePartitionID partitionId, PDBPagePtr page)
unsigned int getAndSetNumFlushedPages() override