A platform for high-performance distributed tool and library development written in C++. It can be deployed in two different cluster modes: standalone or distributed. API for v0.5.0, released on June 13, 2018.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
PipelineStage.h
Go to the documentation of this file.
1 /*****************************************************************************
2  * *
3  * Copyright 2018 Rice University *
4  * *
5  * Licensed under the Apache License, Version 2.0 (the "License"); *
6  * you may not use this file except in compliance with the License. *
7  * You may obtain a copy of the License at *
8  * *
9  * http://www.apache.org/licenses/LICENSE-2.0 *
10  * *
11  * Unless required by applicable law or agreed to in writing, software *
12  * distributed under the License is distributed on an "AS IS" BASIS, *
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
14  * See the License for the specific language governing permissions and *
15  * limitations under the License. *
16  * *
17  *****************************************************************************/
18 #ifndef PIPELINE_STAGE_H
19 #define PIPELINE_STAGE_H
20 
21 
22 #include "PDBVector.h"
23 #include "SimpleRequest.h"
24 #include "SimpleRequestResult.h"
25 #include "SimpleSendDataRequest.h"
26 #include "DataTypes.h"
27 #include "PDBLogger.h"
28 #include "Configuration.h"
29 #include "SharedMem.h"
30 #include "TupleSetJobStage.h"
31 #include "HermesExecutionServer.h"
32 #include "PartitionedHashSet.h"
33 #include "SetSpecifier.h"
34 #include "DataPacket.h"
35 #include <vector>
36 #include <memory>
37 #include <unordered_map>
38 #include <algorithm>
39 #include <ctime>
40 #include <cstdlib>
41 
42 namespace pdb {
43 
44 
46 typedef std::shared_ptr<PipelineStage> PipelineStagePtr;
47 
48 
49 // this class encapsulates a pipeline stage, which is described by a TupleSetJobStage
50 // A TupleSetJobStage can have multiple kinds of sources and sinks
51 
52 // Supported sources:
53 // -- VectorTupleSetIterator
54 // -- MapTupleSetIterator
55 // -- JoinTupleSetIterator
56 
57 // Supported sinks:
58 // -- ShuffleSink for aggregation
59 // -- BroadcastSink for broadcast join
60 // -- HashPartitionSink for hash partitioned join
61 // -- HashPartitionVectorSink for partitioning vectors
62 // -- UserSetSink for data materialization
63 
65 
66 private:
67  // Job stage
69 
70  // batch size
71  size_t batchSize;
72 
73  // number of threads
75 
76  // node id
78 
79  // logger
81 
82  // configuration
84 
85  // shared memory
87 
88  // operator id
90 
91  // vector of nodeId for shuffling
92  std::vector<int> nodeIds;
93 
94 
95 public:
96  // destructor
98 
99  // constructor
104  NodeID nodeId,
105  size_t batchSize,
106  int numThreads);
107 
108  // store shuffle data
110  std::string databaseName,
111  std::string setName,
112  std::string address,
113  int port,
114  bool whetherToPersiste,
115  std::string& errMsg);
116  bool storeCompressedShuffleData(char* bytes,
117  size_t numBytes,
118  std::string databaseName,
119  std::string setName,
120  std::string address,
121  int port,
122  std::string& errMsg);
123  // send Shuffle data
124  bool sendData(PDBCommunicatorPtr conn,
125  void* bytes,
126  size_t size,
127  std::string databaseName,
128  std::string setName,
129  std::string& errMsg);
130 
131  // tuning the backend circular buffer size
132  size_t getBackendCircularBufferSize(bool& success, std::string& errMsg);
133 
134  // get iterators to scan a user set
135  std::vector<PageCircularBufferIteratorPtr> getUserSetIterators(HermesExecutionServer* server,
136  int numThreads,
137  bool& success,
138  std::string& errMsg);
139 
140  // get buffers to scan a user set in a shared manner, so that each iterator is linked to a buffer
141  // and will scan all pages
143  std::vector<PageCircularBufferPtr>& sourceBuffers,
144  int numPartitions,
145  int& counter,
146  PDBBuzzerPtr tempBuzzer,
147  bool& success,
148  std::string& errMsg);
149 
150  // create proxy
151  DataProxyPtr createProxy(int i, pthread_mutex_t connection_mutex, std::string& errMsg);
152 
153  // execute pipeline
154  void executePipelineWork(int i,
155  SetSpecifierPtr outputSet,
156  std::vector<PageCircularBufferIteratorPtr>& iterators,
157  PartitionedHashSetPtr hashSet,
158  DataProxyPtr proxy,
159  std::vector<PageCircularBufferPtr>& sinkBuffers,
160  HermesExecutionServer* server,
161  std::string& errMsg);
162 
163  // return the root job stage corresponding to the pipeline
165 
166  // return the number of threads that are required to run the pipeline network
167  int getNumThreads();
168 
169  // run the pipeline stage
170  void runPipeline(HermesExecutionServer* server,
171  std::vector<PageCircularBufferPtr> combinerBuffers,
172  SetSpecifierPtr outputSet);
173 
174  // run a pipeline without combining
175  void runPipeline(HermesExecutionServer* server);
176 
177  // run a pipeline with combiner queue
179 
180  // run a pipeline with shuffle buffers
182 
183  // run a pipeline with hash partitioning for JoinMaps
185 
186 
187 };
188 }
189 
190 
191 #endif
bool storeCompressedShuffleData(char *bytes, size_t numBytes, std::string databaseName, std::string setName, std::string address, int port, std::string &errMsg)
void runPipelineWithShuffleSink(HermesExecutionServer *server)
void runPipeline(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > combinerBuffers, SetSpecifierPtr outputSet)
shared_ptr< DataProxy > DataProxyPtr
Definition: DataProxy.h:30
unsigned int OperatorID
Definition: DataTypes.h:36
bool storeShuffleData(Handle< Vector< Handle< Object >>> data, std::string databaseName, std::string setName, std::string address, int port, bool whetherToPersiste, std::string &errMsg)
unsigned int NodeID
Definition: DataTypes.h:27
Handle< TupleSetJobStage > & getJobStage()
std::vector< PageCircularBufferIteratorPtr > getUserSetIterators(HermesExecutionServer *server, int numThreads, bool &success, std::string &errMsg)
Handle< TupleSetJobStage > jobStage
Definition: PipelineStage.h:68
bool sendData(PDBCommunicatorPtr conn, void *bytes, size_t size, std::string databaseName, std::string setName, std::string &errMsg)
shared_ptr< SharedMem > SharedMemPtr
Definition: SharedMem.h:32
std::shared_ptr< PartitionedHashSet > PartitionedHashSetPtr
void feedSharedBuffers(HermesExecutionServer *server, std::vector< PageCircularBufferPtr > &sourceBuffers, int numPartitions, int &counter, PDBBuzzerPtr tempBuzzer, bool &success, std::string &errMsg)
std::shared_ptr< PDBCommunicator > PDBCommunicatorPtr
void runPipelineWithBroadcastSink(HermesExecutionServer *server)
PDBLoggerPtr logger
Definition: PipelineStage.h:80
void executePipelineWork(int i, SetSpecifierPtr outputSet, std::vector< PageCircularBufferIteratorPtr > &iterators, PartitionedHashSetPtr hashSet, DataProxyPtr proxy, std::vector< PageCircularBufferPtr > &sinkBuffers, HermesExecutionServer *server, std::string &errMsg)
SharedMemPtr shm
Definition: PipelineStage.h:86
shared_ptr< PDBBuzzer > PDBBuzzerPtr
Definition: PDBBuzzer.h:32
shared_ptr< Configuration > ConfigurationPtr
Definition: Configuration.h:89
std::shared_ptr< PipelineStage > PipelineStagePtr
Definition: PipelineStage.h:45
std::shared_ptr< PDBLogger > PDBLoggerPtr
Definition: PDBLogger.h:40
size_t getBackendCircularBufferSize(bool &success, std::string &errMsg)
void runPipelineWithHashPartitionSink(HermesExecutionServer *server)
std::shared_ptr< SetSpecifier > SetSpecifierPtr
Definition: SetSpecifier.h:28
ConfigurationPtr conf
Definition: PipelineStage.h:83
std::vector< int > nodeIds
Definition: PipelineStage.h:92
DataProxyPtr createProxy(int i, pthread_mutex_t connection_mutex, std::string &errMsg)
PipelineStage(Handle< TupleSetJobStage > stage, SharedMemPtr shm, PDBLoggerPtr logger, ConfigurationPtr conf, NodeID nodeId, size_t batchSize, int numThreads)