Skip to content

Commit

Permalink
[HOPSWORKS-1578] skip projectId - -1 and deleted project operations (#49
Browse files Browse the repository at this point in the history
)
  • Loading branch information
o-alex authored Feb 6, 2020
1 parent 7a3bc62 commit 3c9bc6c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 14 deletions.
13 changes: 8 additions & 5 deletions include/FileProvenanceElasticDataReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,26 +33,29 @@ typedef boost::tuple<std::list<std::string>, FileProvenancePK, boost::optional<F

class FileProvenanceElasticDataReader : public NdbDataReader<FileProvenanceRow, SConn> {
public:
FileProvenanceElasticDataReader(SConn connection, const bool hopsworks);
FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int lru_cap);
virtual ~FileProvenanceElasticDataReader();
protected:

private:
FileProvenanceLogTable mFileLogTable;
FileProvenanceXAttrBufferTable mXAttrBuffer;
INodeTable inodesTable;

void processAddedandDeleted(Pq* data_batch, eBulk& bulk);
ProcessRowResult process_row(FileProvenanceRow row);
FPXAttrBufferRow readBufferedXAttr(FPXAttrBufferPK xattrBufferKey);
FileProvenanceConstants::ProvOpStoreType readProvType(FileProvenanceRow row);
boost::optional<FPXAttrBufferRow> getProvCore(FPXAttrVersionsK versionsKey);
};

class FileProvenanceElasticDataReaders : public NdbDataReaders<FileProvenanceRow, SConn>{
public:
FileProvenanceElasticDataReaders(SConn* connections, int num_readers,const bool hopsworks,
TimedRestBatcher* restEndpoint) :
FileProvenanceElasticDataReaders(SConn* hopsConns, int num_readers,const bool hopsworks,
TimedRestBatcher* restEndpoint, int lru_cap) :
NdbDataReaders(restEndpoint){
for(int i=0; i<num_readers; i++){
FileProvenanceElasticDataReader* dr = new FileProvenanceElasticDataReader(connections[i], hopsworks);
FileProvenanceElasticDataReader* dr
= new FileProvenanceElasticDataReader(hopsConns[i], hopsworks, lru_cap);
dr->start(i, this);
mDataReaders.push_back(dr);
}
Expand Down
17 changes: 12 additions & 5 deletions src/FileProvenanceElasticDataReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#include "FileProvenanceElasticDataReader.h"

FileProvenanceElasticDataReader::FileProvenanceElasticDataReader(SConn connection, const bool hopsworks)
: NdbDataReader(connection, hopsworks) {
FileProvenanceElasticDataReader::FileProvenanceElasticDataReader(SConn hopsConn, const bool hopsworks, int lru_cap)
: NdbDataReader(hopsConn, hopsworks), inodesTable(lru_cap) {
}

class ElasticHelper {
Expand Down Expand Up @@ -495,9 +495,16 @@ ProcessRowResult FileProvenanceElasticDataReader::process_row(FileProvenanceRow
row.mProjectId = projectIId.get();
} else {
std::stringstream cause;
cause << "no project id" << row.to_string();
LOG_ERROR(cause.str());
throw std::logic_error(cause.str());
cause << "no project id - skipping operation" << row.to_string();
LOG_WARN(cause.str());
return boost::make_tuple(bulkOps, row.getPK(), boost::none);
}
INodeRow inode = inodesTable.getByInodeId(mNdbConnection, row.mProjectId);
if(inode.mId != row.mProjectId) {
std::stringstream cause;
cause << "no project inode(deleted?) - skipping operation" << row.to_string();
LOG_DEBUG(cause.str());
return boost::make_tuple(bulkOps, row.getPK(), boost::none);
}

switch (fileOp) {
Expand Down
8 changes: 4 additions & 4 deletions src/Notifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ void Notifier::setup() {
elastic_file_provenance_tailer_connection, elastic_file_provenance_tailer_recovery_connection,
mPollMaxTimeToWait, mBarrier);

SConn* elastic_file_provenance_connections = new SConn[mFileProvenanceTU.mNumReaders];
SConn* file_prov_hops_connections = new SConn[mFileProvenanceTU.mNumReaders];
for (int i = 0; i < mFileProvenanceTU.mNumReaders; i++) {
elastic_file_provenance_connections[i] = create_ndb_connection(mDatabaseName);
file_prov_hops_connections[i] = create_ndb_connection(mDatabaseName);
}
mFileProvenanceElasticDataReaders = new FileProvenanceElasticDataReaders(elastic_file_provenance_connections,
mFileProvenanceTU.mNumReaders, mHopsworksEnabled, mFileProvenanceElastic);
mFileProvenanceElasticDataReaders = new FileProvenanceElasticDataReaders(file_prov_hops_connections,
mFileProvenanceTU.mNumReaders, mHopsworksEnabled, mFileProvenanceElastic, mLRUCap);
mFileProvenanceBatcher = new RCBatcher<FileProvenanceRow, SConn>(
mFileProvenanceTableTailer, mFileProvenanceElasticDataReaders,
mFileProvenanceTU.mWaitTime, mFileProvenanceTU.mBatchSize);
Expand Down

0 comments on commit 3c9bc6c

Please sign in to comment.