A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
SequenceFile.cc
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 
20 #include "SequenceFile.h"
21 #include <stdio.h>
22 #include <string.h>
23 #include <string>
24 #include <iostream>
25 
26 using namespace std;
27 
28 
29 // If isNew == true, the path is the root path and we need to create a new sequence file;
30 // otherwise, we just open the existing sequence file.
31 
33  DatabaseID dbId,
34  UserTypeID typeId,
35  SetID setId,
36  string path,
37  pdb::PDBLoggerPtr logger,
38  size_t pageSize) {
39  this->nodeId = nodeId;
40  this->dbId = dbId;
41  this->typeId = typeId;
42  this->setId = setId;
43  this->file = nullptr;
44  this->metaSize = sizeof(FileType) + sizeof(size_t);
45  this->filePath = path;
46  this->logger = logger;
47  this->numFlushedPages = 0;
48  this->pageSize = pageSize;
49  logger->writeLn("SequenceFile: path:");
50  logger->writeLn(filePath.c_str());
51  this->openAll();
52 }
53 
55  if (this->file != nullptr) {
56  this->closeAll();
57  }
58 }
59 
61  if (this->file != nullptr) {
62  return true;
63  }
64  if ((this->file = fopen(this->filePath.c_str(), "a+")) != 0) {
65  return true;
66  } else {
67  this->logger->writeLn("SequenceFile: file can not be opened");
68  perror(nullptr);
69  return false;
70  }
71 }
72 
74  if (fclose(this->file) == 0) {
75  this->file = nullptr;
76  return true;
77  } else {
78  return false;
79  }
80 }
81 
83  this->closeAll();
84  if (this->file != nullptr) {
85  this->file = nullptr;
86  this->numFlushedPages = 0;
87  }
88  // delete the file
89  if (remove(this->filePath.c_str()) == 0) {
90  cout << "Removed temp data " << this->filePath << ".\n";
91  this->filePath = "";
92  }
93 }
94 
96  return appendPage(page);
97 }
98 
100  if (this->file == nullptr) {
101  return -1;
102  }
103  this->logger->writeLn("SequenceFile: appending data...");
104  int retSize = fwrite(page->getRawBytes(), sizeof(char), page->getSize(), this->file);
105  PageID pageId = page->getPageID();
106  this->logger->writeLn("SequenceFile: PageID:");
107  this->logger->writeInt(pageId);
108  retSize += fwrite(&pageId, sizeof(PageID), 1, this->file);
109  fflush(this->file);
110  this->numFlushedPages++;
111  this->lastFlushedId = pageId;
112  this->logger->writeLn("SequenceFile: appendPage: Size:");
113  this->logger->writeInt(retSize);
114  this->logger->writeLn("SequenceFile: appendPage: PageID:");
115  this->logger->writeInt((this->lastFlushedId));
116  return 0;
117 }
118 
119 int SequenceFile::writeData(void* data, size_t length) {
120  if (this->file == nullptr) {
121  return -1;
122  }
123  size_t retSize = fwrite(data, sizeof(char), length, this->file);
124  if (retSize != length) {
125  return -1;
126  } else {
127  return 0;
128  }
129 }
130 
131 
132 // Metadata is always stored at the beginning of the file and has the following layout:
133 // FileType: sizeof(enum)
134 // PageSize: sizeof(size_t)
136  if (this->file == nullptr) {
137  return -1;
138  }
139  char* buffer = new char[this->metaSize];
140  char* cur = buffer;
141  // initialize FileType
143  cur = cur + sizeof(FileType);
144  // initialize PageSize;
145  *((size_t*)cur) = this->pageSize;
146  int ret = this->writeData(buffer, this->metaSize);
147  delete[] buffer;
148  return ret;
149 }
150 
152  return 0;
153 }
154 
156  if (this->file == nullptr) {
157  return -1;
158  }
159  return fseek(this->file, sizeof(FileType), SEEK_SET);
160 }
161 
163  if (pageSize == 0) {
164  pageSize = getPageSizeInMeta();
165  }
166  return pageSize;
167 }
168 
170  if (this->file == nullptr) {
171  return -1;
172  }
173  if (this->seekPageSizeInMeta() == 0) {
174  size_t pageSize;
175  this->logger->writeLn("SequenceFile: get page size from file meta:");
176  size_t sizeRead = fread((size_t*)(&(pageSize)), sizeof(size_t), 1, this->file);
177  if (sizeRead == 0) {
178  return 0;
179  }
180  this->logger->writeInt(pageSize);
181  return pageSize;
182  } else {
183  return 0;
184  }
185 }
186 
188  if (this->file == nullptr) {
189  return -1;
190  }
191  return fseek(this->file, -sizeof(PageID), SEEK_END);
192 }
193 
195  if (this->seekLastFlushedPageID() == 0) {
196  size_t size = ftell(this->file);
197  this->logger->writeLn("SequenceFile: file position after seek:");
198  this->logger->writeInt(size);
199  if (size <= this->metaSize) {
200  this->logger->writeLn("SequenceFile: no flushedPages.");
201  this->numFlushedPages = 0;
202  }
203  this->logger->writeLn("SequenceFile: set numFlushedPages:");
204  size_t sizeRead = fread((PageID*)(&(this->lastFlushedId)), sizeof(PageID), 1, this->file);
205  if (sizeRead == 0) {
206  std::cout << "SequenceFile: Read failed" << std::endl;
207  return 0;
208  }
209  this->numFlushedPages = this->lastFlushedId + 1;
210  this->logger->writeInt(this->lastFlushedId);
211  } else {
212  this->logger->writeLn("SequenceFile: no flushedPages.");
213  this->numFlushedPages = 0;
214  }
215  return this->numFlushedPages;
216 }
217 
219  return this->numFlushedPages;
220 }
221 
223  return this->lastFlushedId;
224 }
225 
227  return this->lastFlushedId;
228 }
229 
231  if (this->file == nullptr) {
232  return -1;
233  }
234  return fseek(
235  this->file, this->metaSize + (pageId) * (this->pageSize + sizeof(PageID)), SEEK_SET);
236 }
237 
238 // Load data of given length on the page specified to cache
239 size_t SequenceFile::loadPage(PageID pageId, char* pageInCache, size_t length) {
240  return loadPage(0, pageId, pageInCache, length);
241 }
242 
243 // Load `length` bytes of the specified page into the given cache buffer
245  unsigned int pageSeqInPartition,
246  char* pageInCache,
247  size_t length) {
248  if (this->file == nullptr) {
249  return 0;
250  }
251  seekPage(pageSeqInPartition);
252  return fread(pageInCache, sizeof(char), length, this->file);
253 }
254 
255 
257  if (this->file == nullptr) {
258  return -1;
259  }
260  return fseek(this->file, 0, SEEK_END);
261 }
void clear() override
Definition: SequenceFile.cc:82
unsigned int SetID
Definition: DataTypes.h:31
shared_ptr< PDBPage > PDBPagePtr
Definition: PDBPage.h:32
FileType
Definition: DataTypes.h:72
int seekPage(PageID pageId)
unsigned int NodeID
Definition: DataTypes.h:27
unsigned int getAndSetNumFlushedPages() override
int seekLastFlushedPageID()
unsigned int DatabaseID
Definition: DataTypes.h:29
unsigned int PageID
Definition: DataTypes.h:26
size_t loadPage(PageID pageId, char *pageInCache, size_t length)
PageID getLatestPageID() override
PageID getLastFlushedPageID() override
bool openAll() override
Definition: SequenceFile.cc:60
size_t getPageSize() override
int seekPageSizeInMeta()
int writeData(void *data, size_t length)
bool closeAll() override
Definition: SequenceFile.cc:73
int updateMeta() override
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
unsigned int getNumFlushedPages() override
int appendPage(FilePartitionID partitionId, PDBPagePtr page) override
Definition: SequenceFile.cc:95
int writeMeta() override
size_t getPageSizeInMeta() override
unsigned int FilePartitionID
Definition: DataTypes.h:32
SequenceFile(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, string path, pdb::PDBLoggerPtr logger, size_t pageSize)
Definition: SequenceFile.cc:32
unsigned int UserTypeID
Definition: DataTypes.h:25