Skip to content

Commit

Permalink
[HOPSWORKS-1372] Upgrade the ELK stack to 7.2 OSS and use the OpenDis…
Browse files Browse the repository at this point in the history
…tro Security Plugin (#43)
  • Loading branch information
maismail authored and SirOibaf committed Dec 11, 2019
1 parent b3990cd commit d1146de
Show file tree
Hide file tree
Showing 20 changed files with 204 additions and 34 deletions.
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ set(Boost_USE_STATIC_LIBS ON)

find_package(Boost 1.70 REQUIRED COMPONENTS system date_time program_options thread)
find_package(RapidJSON 1.1.0)
find_package(OpenSSL REQUIRED)

include_directories(${Boost_INCLUDE_DIRS})
include_directories(${RAPIDJSON_INCLUDE_DIRS})
include_directories (${CMAKE_SOURCE_DIR}/include)
include_directories(${OPENSSL_INCLUDE_DIR})

if(NOT NDB_DIR)
set(NDB_DIR "/usr/local/mysql")
Expand All @@ -32,4 +34,4 @@ file(GLOB SOURCE ${CMAKE_SOURCE_DIR}/src/*.cpp)

add_executable(ePipe ${SOURCE})

target_link_libraries(ePipe ${Boost_LIBRARIES} ndbclient pthread)
target_link_libraries(ePipe ${Boost_LIBRARIES} ndbclient pthread OpenSSL::SSL)
5 changes: 5 additions & 0 deletions config.ini.template
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ app_provenance_index = appprovenance
elastic_batch = 5000
ewait_time = 5000

ssl_enabled = false
ca_path =
username =
password =

# Metrics Server
stats = true
metricsServer = 0.0.0.0:29191
2 changes: 1 addition & 1 deletion include/AppProvenanceElastic.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

class AppProvenanceElastic : public ElasticSearchWithMetrics {
public:
AppProvenanceElastic(std::string elastic_addr, std::string index,
AppProvenanceElastic(const HttpClientConfig elastic_client_config, std::string index,
int time_to_wait_before_inserting, int bulk_size,
const bool stats, SConn conn);

Expand Down
2 changes: 1 addition & 1 deletion include/ElasticSearchBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
class ElasticSearchBase : public TimedRestBatcher, public
MetricsProvider {
public:
ElasticSearchBase(std::string elastic_addr, int time_to_wait_before_inserting, int bulk_size);
ElasticSearchBase(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size);

virtual ~ElasticSearchBase();

Expand Down
2 changes: 1 addition & 1 deletion include/ElasticSearchWithMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ struct ElasticMovingCountersSet : public ElasticMovingCounters{

class ElasticSearchWithMetrics : public ElasticSearchBase {
public:
ElasticSearchWithMetrics(std::string elastic_addr, int time_to_wait_before_inserting, int bulk_size, const bool stats);
ElasticSearchWithMetrics(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size, const bool stats);

virtual ~ElasticSearchWithMetrics();

Expand Down
2 changes: 1 addition & 1 deletion include/FileProvenanceElastic.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

class FileProvenanceElastic : public ElasticSearchWithMetrics {
public:
FileProvenanceElastic(std::string elastic_addr,int time_to_wait_before_inserting, int bulk_size,
FileProvenanceElastic(const HttpClientConfig elastic_client_config,int time_to_wait_before_inserting, int bulk_size,
const bool stats, SConn conn);

virtual ~FileProvenanceElastic();
Expand Down
5 changes: 3 additions & 2 deletions include/Notifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class Notifier : public ClusterConnectionBase {
Notifier(const char* connection_string, const char* database_name,
const char* meta_database_name, const char* hive_meta_database_name,
const TableUnitConf mutations_tu, const TableUnitConf schemabased_tu,const TableUnitConf provenance_tu,
const int poll_maxTimeToWait, const std::string elastic_addr, const bool hopsworks, const std::string elastic_index,
const int poll_maxTimeToWait, const HttpClientConfig
elastic_client_config, const bool hopsworks, const std::string elastic_index,
const std::string elastic_app_provenance_index,
const int elastic_batch_size, const int elastic_issue_time, const int lru_cap, const bool recovery, const bool stats,
Barrier barrier, const bool hiveCleaner, const std::string
Expand All @@ -59,7 +60,7 @@ class Notifier : public ClusterConnectionBase {
const TableUnitConf mAppProvenanceTU;

const int mPollMaxTimeToWait;
const std::string mElasticAddr;
const HttpClientConfig mElasticClientConfig;
const bool mHopsworksEnabled;
const std::string mElasticIndex;
const std::string mElasticAppProvenanceIndex;
Expand Down
4 changes: 2 additions & 2 deletions include/ProjectsElasticSearch.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ struct MovingCountersSet : public MovingCounters{

class ProjectsElasticSearch : public ElasticSearchBase{
public:
ProjectsElasticSearch(std::string elastic_addr, std::string index,
int time_to_wait_before_inserting, int bulk_size,
ProjectsElasticSearch(const HttpClientConfig elastic_client_config,
std::string index, int time_to_wait_before_inserting, int bulk_size,
const bool stats, MConn conn);

void addDataset(Int64 inodeId, std::string json);
Expand Down
2 changes: 1 addition & 1 deletion include/Reindexer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Reindexer : public ClusterConnectionBase {
public:
Reindexer(const char* connection_string, const char* database_name,
const char* meta_database_name, const char* hive_meta_database_name,
const std::string elastic_addr, const std::string index, int
const HttpClientConfig elastic_client_config, const std::string index, int
elastic_batch_size, int elastic_issue_time, int lru_cap);
virtual ~Reindexer();

Expand Down
2 changes: 1 addition & 1 deletion include/TimedRestBatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ struct eBulk {

class TimedRestBatcher : public Batcher {
public:
TimedRestBatcher(std::string endpoint_addr, int time_to_wait_before_inserting, int bulk_size);
TimedRestBatcher(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size);

void addData(eBulk data);

Expand Down
144 changes: 143 additions & 1 deletion include/http/HttpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,64 @@
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <boost/archive/iterators/binary_from_base64.hpp>
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#include <boost/algorithm/string.hpp>

namespace beast = boost::beast;
namespace http = beast::http;
namespace net = boost::asio;
using tcp = net::ip::tcp;
namespace ssl = net::ssl;

// Outcome of a single HTTP request issued by HttpClient.
struct HttpResponse{
// false when the request threw an exception or the connection teardown reported an error
bool mSuccess;
// HTTP status code of the response; request_with_ssl leaves it at 0 when no response was read
unsigned int mCode;
// response body converted to a string
std::string mResponse;
};

/**
 * Connection settings consumed by HttpClient.
 *
 * The accessors are const-qualified because this configuration is passed
 * around and stored as `const HttpClientConfig`; non-const accessors could not
 * be called on those objects.
 */
struct HttpClientConfig{
  // Endpoint address; HttpClient splits it at the first ':'.
  std::string mAddr;
  // When true HttpClient routes requests through request_with_ssl.
  bool mSSLEnabled;
  // Path of the file HttpClient::load_ca_certificates reads the CA certificates from.
  std::string mCAPath;
  // Credentials for HTTP Basic authentication; both must be non-empty to be used.
  std::string mUserName;
  std::string mPassword;

  /// @return true when both a user name and a password are configured.
  bool isValidUserAndPass() const {
    return !mUserName.empty() && !mPassword.empty();
  }

  /// @return the value for the HTTP Authorization header: "Basic " followed by
  ///         base64("<user>:<password>").
  std::string getAuthorization() const {
    return "Basic " + encode64(mUserName + ":" + mPassword);
  }

private:
  /// Base64-encodes val, appending '=' padding up to a multiple of four characters.
  static std::string encode64(const std::string &val) {
    using namespace boost::archive::iterators;
    using It = base64_from_binary<transform_width<std::string::const_iterator, 6, 8>>;
    std::string encoded(It(std::begin(val)), It(std::end(val)));
    // The iterator adaptor emits no padding: 0, 1 or 2 '=' are needed
    // depending on the input length modulo 3.
    return encoded.append((3 - val.size() % 3) % 3, '=');
  }

  /// Base64-decodes val and strips trailing NUL characters from the result.
  static std::string decode64(const std::string &val) {
    using namespace boost::archive::iterators;
    using It = transform_width<binary_from_base64<std::string::const_iterator>, 8, 6>;
    return boost::algorithm::trim_right_copy_if(std::string(It(std::begin(val)), It(std::end(val))), [](char c) {
      return c == '\0';
    });
  }
};

class HttpClient{
public:
HttpClient(std::string addr){
HttpClient(const HttpClientConfig config){
std::string addr = config.mAddr;
try {
auto const i = addr.find(":");
auto const ip = addr.substr(0, i);
Expand All @@ -51,6 +94,7 @@ class HttpClient{
LOG_FATAL("error in http address format [" << addr << "]"
<< " only ips allowed : " << ex.code() << " " << ex.what());
}
mConfig = config;
}

HttpResponse get(std::string target){
Expand All @@ -66,13 +110,23 @@ class HttpClient{
}
private:
tcp::endpoint mEndpoint;
HttpClientConfig mConfig;

// Convenience overload for requests that carry no body: forwards to the
// three-argument overload with an empty payload.
HttpResponse request(http::verb verb, std::string target){
  return request(verb, target, std::string());
}

// Dispatches the request to the TLS or the plain implementation, depending on
// the mSSLEnabled flag of the client configuration.
HttpResponse request(http::verb verb, std::string target, std::string data){
  return mConfig.mSSLEnabled
      ? request_with_ssl(verb, target, data)
      : request_no_ssl(verb, target, data);
}

HttpResponse request_no_ssl(http::verb verb, std::string
target, std::string data){

std::string responseBody;
Expand All @@ -91,6 +145,10 @@ class HttpClient{
req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
req.set(http::field::content_type, "application/json");

if(mConfig.isValidUserAndPass()) {
req.set(http::field::authorization, mConfig.getAuthorization());
}

if(data.length() != 0){
req.body() = data;
req.set(http::field::content_length, data.length());
Expand Down Expand Up @@ -124,5 +182,89 @@ class HttpClient{
return {succeed, code, responseBody};
}

/**
 * Sends one HTTP/1.1 request over a newly established TLS 1.2 connection to
 * mEndpoint and reads the complete response.
 *
 * @param verb   HTTP method of the request.
 * @param target request target (path and query).
 * @param data   request body; when empty no body or Content-Length is set.
 * @return {success, status code, body}. success is false when any step throws
 *         or the TLS shutdown reports an unexpected error; the status code
 *         stays 0 if no response was read.
 */
HttpResponse request_with_ssl(http::verb verb, std::string
    target, std::string data){

  std::string responseBody;
  bool succeed = true;
  unsigned int code = 0;

  try
  {
    net::io_context ioc;

    // The SSL context is required, and holds certificates
    ssl::context ctx(ssl::context::tlsv12_client);
    load_ca_certificates(ctx);
    ctx.set_verify_mode(ssl::verify_peer);

    beast::ssl_stream<beast::tcp_stream> stream(ioc, ctx);

    // Textual form of the endpoint address, used for both SNI and the Host header.
    const std::string host = mEndpoint.address().to_string();

    // Set SNI Hostname (many hosts need this to handshake successfully)
    if(! SSL_set_tlsext_host_name(stream.native_handle(), host.c_str()))
    {
      beast::error_code ec{-1, net::error::get_ssl_category()};
      throw beast::system_error{ec};
    }

    beast::get_lowest_layer(stream).connect(mEndpoint);
    stream.handshake(ssl::stream_base::client);

    http::request<http::string_body> req{verb, target, 11};
    req.set(http::field::host, host);
    req.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);
    req.set(http::field::content_type, "application/json");

    if(mConfig.isValidUserAndPass()) {
      req.set(http::field::authorization, mConfig.getAuthorization());
    }

    if(data.length() != 0){
      req.body() = data;
      req.set(http::field::content_length, data.length());
      req.prepare_payload();
    }

    http::write(stream, req);

    beast::flat_buffer buffer;

    http::response<http::dynamic_body> response;

    http::read(stream, buffer, response);

    responseBody = beast::buffers_to_string(response.body().data());
    code = response.result_int();

    beast::error_code ec;
    stream.shutdown(ec);

    // A graceful TLS shutdown completes with eof, and peers that close the
    // socket without sending close_notify yield stream_truncated. The response
    // has already been read in full at this point, so neither of them (nor an
    // already disconnected socket) is a request failure.
    const bool benignShutdown = !ec
        || ec == net::error::eof
        || ec == ssl::error::stream_truncated
        || ec == beast::errc::not_connected;

    if(!benignShutdown){
      LOG_ERROR("Error in http connection with error code "<< ec);
      succeed = false;
    }
  }catch(std::exception const& e)
  {
    LOG_ERROR("Error in http connection : " << e.what());
    succeed = false;
  }

  return {succeed, code, responseBody};
}


/**
 * Reads the file at mConfig.mCAPath and registers its content as a trusted
 * certificate authority on the given SSL context.
 *
 * Failures are logged and not propagated: if the file cannot be opened the
 * context is left untouched, and a rejected certificate is reported together
 * with the error code and its message.
 *
 * @param ctx SSL context that receives the certificate authority.
 */
void load_ca_certificates(ssl::context& ctx){
  std::ifstream ifs(mConfig.mCAPath.c_str());
  if(!ifs.is_open()){
    // Report the unreadable path explicitly instead of handing an empty
    // buffer to the SSL context and logging only an opaque error code.
    LOG_ERROR("Error while loading the ca certificates : cannot open ["
        << mConfig.mCAPath << "]");
    return;
  }

  const std::string cacert((std::istreambuf_iterator<char>(ifs)),
      std::istreambuf_iterator<char>());

  boost::system::error_code ec;

  ctx.add_certificate_authority(
      boost::asio::buffer(cacert.data(), cacert.size()), ec);
  if(ec){
    LOG_ERROR("Error while loading the ca certificates : " << ec
        << " " << ec.message());
  }
}
};
#endif //EPIPE_HTTPCLIENT_H
5 changes: 3 additions & 2 deletions src/AppProvenanceElastic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

#include "AppProvenanceElastic.h"

AppProvenanceElastic::AppProvenanceElastic(std::string elastic_addr, std::string index,
AppProvenanceElastic::AppProvenanceElastic(const HttpClientConfig elastic_client_config, std::string index,
int time_to_wait_before_inserting, int bulk_size, const bool stats, SConn conn) :
ElasticSearchWithMetrics(elastic_addr, time_to_wait_before_inserting, bulk_size, stats),
ElasticSearchWithMetrics(elastic_client_config, time_to_wait_before_inserting,
bulk_size, stats),
mIndex(index), mConn(conn) {
mElasticBulkAddr = getElasticSearchBulkUrl(mIndex);
}
Expand Down
4 changes: 2 additions & 2 deletions src/ElasticSearchBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

#include "ElasticSearchBase.h"

ElasticSearchBase::ElasticSearchBase(std::string elastic_addr, int time_to_wait_before_inserting, int bulk_size)
: TimedRestBatcher(elastic_addr, time_to_wait_before_inserting, bulk_size), DEFAULT_TYPE("_doc") {
ElasticSearchBase::ElasticSearchBase(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size)
: TimedRestBatcher(elastic_client_config, time_to_wait_before_inserting, bulk_size), DEFAULT_TYPE("_doc") {
}

std::string ElasticSearchBase::getElasticSearchUrlonIndex(std::string index) {
Expand Down
5 changes: 3 additions & 2 deletions src/ElasticSearchWithMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
*/
#include "ElasticSearchWithMetrics.h"

ElasticSearchWithMetrics::ElasticSearchWithMetrics(std::string elastic_addr, int time_to_wait_before_inserting, int bulk_size, const bool stats)
: ElasticSearchBase(elastic_addr, time_to_wait_before_inserting, bulk_size), mStats(stats), mStartTime(Utils::getCurrentTime()) {
ElasticSearchWithMetrics::ElasticSearchWithMetrics(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting, int bulk_size, const bool stats)
: ElasticSearchBase(elastic_client_config, time_to_wait_before_inserting,
bulk_size), mStats(stats), mStartTime(Utils::getCurrentTime()) {
}

ElasticSearchWithMetrics::~ElasticSearchWithMetrics() {
Expand Down
5 changes: 3 additions & 2 deletions src/FileProvenanceElastic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
#include <FileProvenanceConstants.h>
#include "FileProvenanceElastic.h"

FileProvenanceElastic::FileProvenanceElastic(std::string elastic_addr, int time_to_wait_before_inserting,
FileProvenanceElastic::FileProvenanceElastic(const HttpClientConfig elastic_client_config, int time_to_wait_before_inserting,
int bulk_size, const bool stats, SConn conn) :
ElasticSearchWithMetrics(elastic_addr, time_to_wait_before_inserting, bulk_size, stats), mConn(conn) {}
ElasticSearchWithMetrics(elastic_client_config,
time_to_wait_before_inserting, bulk_size, stats), mConn(conn) {}

void FileProvenanceElastic::intProcessOneByOne(eBulk bulk) {
std::vector<eBulk>::iterator itB, endB;
Expand Down
Loading

0 comments on commit d1146de

Please sign in to comment.