A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
OutputIterator.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 
19 #ifndef OUTPUT_ITER_H
20 #define OUTPUT_ITER_H
21 
22 #include "PDBDebug.h"
23 #include "Query.h"
24 #include "Handle.h"
25 #include "DoneWithResult.h"
26 #include "PDBCommunicator.h"
27 #include "PDBString.h"
28 #include "KeepGoing.h"
29 #include <string>
30 #include <memory>
31 #include <snappy.h>
33 
34 namespace pdb {
35 
36 template <class OutType>
38 
39 public:
40  bool operator!=(const OutputIterator& me) const {
41  if (connection != nullptr || me.connection != nullptr)
42  return true;
43  return false;
44  }
45 
47  return ((*data)[pos]);
48  }
49 
50  int getSize() {
51  return size;
52  }
53 
54  int getPos() {
55  return pos;
56  }
57 
58  void operator++() {
59  if (pos == size - 1) {
60 
61  // for allocations
62  const UseTemporaryAllocationBlock tempBlock{1024};
63 
64  // for errors
65  std::string errMsg;
66 
67  // free the last page
68  Handle<KeepGoing> temp;
69  if (page != nullptr) {
70 
71  // if we don't have this line, we'll still be pointing into the freed page
72  data = nullptr;
73  free(page);
74  page = nullptr;
75  temp = makeObject<KeepGoing>();
76  if (!connection->sendObject(temp, errMsg)) {
77  std::cout << "Problem sending request: " << errMsg << "\n";
78  connection = nullptr;
79  return;
80  }
81  // std :: cout << "sent keep going" << std :: endl;
82  }
83 
84  // get the next page
85  size_t objSize = connection->getSizeOfNextObject();
86  // std :: cout << "to receive " << objSize << " bytes" << std :: endl;
87  // if the file is done, then we're good
88  if (connection->getObjectTypeID() == DoneWithResult_TYPEID) {
89  connection = nullptr;
90  return;
91  }
92 #ifdef ENABLE_COMPRESSION
93  char* readToHere = new char[objSize];
94 #else
95  void* readToHere = malloc(objSize);
96 #endif
97  // we've got some more data
98 
99  if (!connection->receiveBytes(readToHere, errMsg)) {
100  std::cout << "Problem getting data: " << errMsg << "\n";
101  connection = nullptr;
102  return;
103  }
104  size_t uncompressedSize = 0;
105  snappy::GetUncompressedLength(readToHere, objSize, &uncompressedSize);
106  page = (Record<Vector<Handle<OutType>>>*)malloc(uncompressedSize);
107 #ifdef ENABLE_COMPRESSION
108  snappy::RawUncompress(readToHere, objSize, (char*)(page));
109 #else
110  memcpy(page, readToHere, objSize);
111 #endif
112 
113 #ifdef ENABLE_COMPRESSION
114  delete[] readToHere;
115 #else
116  free(readToHere);
117 #endif
118  // gets the vector that we are going to iterate over
119  data = page->getRootObject();
120  // std :: cout << "to obtain size of vector" << std :: endl;
121  size = data->size();
122  // std :: cout << "got a page with size="<< size << std :: endl;
123  // added by Jia to fix a segfault when size=0
124  if (size > 0) {
125  pos = 0;
126  } else {
127  connection = nullptr;
128  return;
129  }
130 
131  } else {
132  pos++;
133  }
134  }
135 
137  connection = connectionIn;
138  data = nullptr;
139  page = nullptr;
140 
141  // get the ball rolling!!
142  this->operator++();
143  }
144 
146  connection = nullptr;
147  data = nullptr;
148  page = nullptr;
149  }
150 
152 
153  // make sure we don't leave a page sitting around
154  data = nullptr;
155  if (page != nullptr)
156  free(page);
157 
158  // nothing to do if we don't have a connection
159  if (connection == nullptr)
160  return;
161 
162  // for allocations
163  const UseTemporaryAllocationBlock tempBlock{1024};
164 
165  // tell the server we are done
166  Handle<DoneWithResult> temp = makeObject<DoneWithResult>();
167  std::string errMsg;
168  if (!connection->sendObject(temp, errMsg)) {
169  std::cout << "Problem sending done message: " << errMsg << "\n";
170  connection = nullptr;
171  return;
172  }
173  PDB_COUT << "send done message" << std::endl;
174  }
175 
176 private:
177  int size = 0;
178  int pos = -1;
182 };
183 }
184 
185 #endif
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
Record< Vector< Handle< OutType > > > * page
#define PDB_COUT
Definition: PDBDebug.h:31
bool operator!=(const OutputIterator &me) const
OutputIterator(PDBCommunicatorPtr connectionIn)
Handle< Vector< Handle< OutType > > > data
PDBCommunicatorPtr connection
Handle< OutType > & operator*() const