Skip to content

Commit

Permalink
Merge pull request #7 from logicalclocks/HOPSWORKS-779
Browse files Browse the repository at this point in the history
[HOPSWORKS] [ePipe] Fix a race condition while reading from ndb
  • Loading branch information
maismail authored Nov 5, 2018
2 parents e9dae3d + 8312264 commit 9c4db07
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
5 changes: 3 additions & 2 deletions include/ConcurrentPriorityQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ConcurrentPriorityQueue {
public:
ConcurrentPriorityQueue();
void push(Data data);
void top(Data& result);
void pop(Data& result);
void pop();
void wait_and_pop(Data &result);
bool empty();
Expand Down Expand Up @@ -68,9 +68,10 @@ void ConcurrentPriorityQueue<Data, DataCompartor>::wait_and_pop(Data& result) {
}

template<typename Data, typename DataCompartor>
void ConcurrentPriorityQueue<Data, DataCompartor>::top(Data& result) {
void ConcurrentPriorityQueue<Data, DataCompartor>::pop(Data& result) {
boost::mutex::scoped_lock lock(mLock);
result = mQueue.top();
mQueue.pop();
}

template<typename Data, typename DataCompartor>
Expand Down
4 changes: 2 additions & 2 deletions include/NdbDataReaders.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ template<typename Data, typename Conn, typename Keys>
void NdbDataReaders<Data, Conn, Keys>::processWaiting() {
while(!mWaitingOutQueue->empty()){
Bulk<Keys> out;
mWaitingOutQueue->top(out);
mWaitingOutQueue->pop(out);
if(out.mProcessingIndex == mLastSent + 1){
LOG_INFO("publish enriched events with index [" << out.mProcessingIndex << "] to Elastic");
mElasticSearch->addData(out);
mLastSent++;
mWaitingOutQueue->pop();
}else{
mWaitingOutQueue->push(out);
break;
}
}
Expand Down

0 comments on commit 9c4db07

Please sign in to comment.