39 this->nodeId = nodeId;
41 this->typeId = typeId;
44 this->metaSize =
sizeof(
FileType) +
sizeof(
size_t);
45 this->filePath = path;
46 this->logger = logger;
47 this->numFlushedPages = 0;
48 this->pageSize = pageSize;
49 logger->writeLn(
"SequenceFile: path:");
50 logger->writeLn(filePath.c_str());
55 if (this->file !=
nullptr) {
61 if (this->file !=
nullptr) {
64 if ((this->file = fopen(this->filePath.c_str(),
"a+")) != 0) {
67 this->logger->writeLn(
"SequenceFile: file can not be opened");
74 if (fclose(this->file) == 0) {
84 if (this->file !=
nullptr) {
86 this->numFlushedPages = 0;
89 if (
remove(this->filePath.c_str()) == 0) {
90 cout <<
"Removed temp data " << this->filePath <<
".\n";
96 return appendPage(page);
100 if (this->file ==
nullptr) {
103 this->logger->writeLn(
"SequenceFile: appending data...");
104 int retSize = fwrite(page->getRawBytes(),
sizeof(char), page->getSize(), this->file);
105 PageID pageId = page->getPageID();
106 this->logger->writeLn(
"SequenceFile: PageID:");
107 this->logger->writeInt(pageId);
108 retSize += fwrite(&pageId,
sizeof(
PageID), 1, this->file);
110 this->numFlushedPages++;
111 this->lastFlushedId = pageId;
112 this->logger->writeLn(
"SequenceFile: appendPage: Size:");
113 this->logger->writeInt(retSize);
114 this->logger->writeLn(
"SequenceFile: appendPage: PageID:");
115 this->logger->writeInt((this->lastFlushedId));
120 if (this->file ==
nullptr) {
123 size_t retSize = fwrite(data,
sizeof(
char), length, this->file);
124 if (retSize != length) {
136 if (this->file ==
nullptr) {
139 char* buffer =
new char[this->metaSize];
145 *((
size_t*)cur) = this->pageSize;
146 int ret = this->writeData(buffer, this->metaSize);
156 if (this->file ==
nullptr) {
159 return fseek(this->file,
sizeof(
FileType), SEEK_SET);
164 pageSize = getPageSizeInMeta();
170 if (this->file ==
nullptr) {
173 if (this->seekPageSizeInMeta() == 0) {
175 this->logger->writeLn(
"SequenceFile: get page size from file meta:");
176 size_t sizeRead = fread((
size_t*)(&(pageSize)),
sizeof(
size_t), 1, this->file);
180 this->logger->writeInt(pageSize);
188 if (this->file ==
nullptr) {
191 return fseek(this->file, -
sizeof(
PageID), SEEK_END);
195 if (this->seekLastFlushedPageID() == 0) {
196 size_t size = ftell(this->file);
197 this->logger->writeLn(
"SequenceFile: file position after seek:");
198 this->logger->writeInt(size);
199 if (size <= this->metaSize) {
200 this->logger->writeLn(
"SequenceFile: no flushedPages.");
201 this->numFlushedPages = 0;
203 this->logger->writeLn(
"SequenceFile: set numFlushedPages:");
204 size_t sizeRead = fread((
PageID*)(&(this->lastFlushedId)),
sizeof(
PageID), 1, this->file);
206 std::cout <<
"SequenceFile: Read failed" << std::endl;
209 this->numFlushedPages = this->lastFlushedId + 1;
210 this->logger->writeInt(this->lastFlushedId);
212 this->logger->writeLn(
"SequenceFile: no flushedPages.");
213 this->numFlushedPages = 0;
215 return this->numFlushedPages;
219 return this->numFlushedPages;
223 return this->lastFlushedId;
227 return this->lastFlushedId;
231 if (this->file ==
nullptr) {
235 this->file, this->metaSize + (pageId) * (this->pageSize +
sizeof(
PageID)), SEEK_SET);
240 return loadPage(0, pageId, pageInCache, length);
245 unsigned int pageSeqInPartition,
248 if (this->file ==
nullptr) {
251 seekPage(pageSeqInPartition);
252 return fread(pageInCache,
sizeof(
char), length, this->file);
257 if (this->file ==
nullptr) {
260 return fseek(this->file, 0, SEEK_END);
shared_ptr< PDBPage > PDBPagePtr
int seekPage(PageID pageId)
unsigned int getAndSetNumFlushedPages() override
int seekLastFlushedPageID()
size_t loadPage(PageID pageId, char *pageInCache, size_t length)
PageID getLatestPageID() override
PageID getLastFlushedPageID() override
size_t getPageSize() override
int writeData(void *data, size_t length)
int updateMeta() override
std::shared_ptr< PDBLogger > PDBLoggerPtr
unsigned int getNumFlushedPages() override
int appendPage(FilePartitionID partitionId, PDBPagePtr page) override
size_t getPageSizeInMeta() override
unsigned int FilePartitionID
SequenceFile(NodeID nodeId, DatabaseID dbId, UserTypeID typeId, SetID setId, string path, pdb::PDBLoggerPtr logger, size_t pageSize)