From bddd445e7fb686e7292daac4f0cc14385401df50 Mon Sep 17 00:00:00 2001 From: gcca Date: Fri, 16 Apr 2021 13:17:46 -0500 Subject: [PATCH 01/14] [feature/table-from-postgresl-sqlite] update parser type names --- .../src/io/data_parser/sql/SQLiteParser.cpp | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/engine/src/io/data_parser/sql/SQLiteParser.cpp b/engine/src/io/data_parser/sql/SQLiteParser.cpp index dd4f995d4..77dfe9a71 100644 --- a/engine/src/io/data_parser/sql/SQLiteParser.cpp +++ b/engine/src/io/data_parser/sql/SQLiteParser.cpp @@ -61,23 +61,23 @@ cudf::type_id parse_sqlite_column_type(std::string t) { return std::tolower(c); }); if (sqlite_is_cudf_string(t)) return cudf::type_id::STRING; - if (t == "tinyint") { return cudf::type_id::INT8; } - if (t == "smallint") { return cudf::type_id::INT8; } - if (t == "mediumint") { return cudf::type_id::INT16; } - if (t == "int") { return cudf::type_id::INT32; } - if (t == "integer") { return cudf::type_id::INT32; } - if (t == "bigint") { return cudf::type_id::INT64; } - if (t == "unsigned big int") { return cudf::type_id::UINT64; } - if (t == "int2") { return cudf::type_id::INT16; } - if (t == "int8") { return cudf::type_id::INT64; } - if (t == "real") { return cudf::type_id::FLOAT32; } - if (t == "double") { return cudf::type_id::FLOAT64; } - if (t == "double precision") { return cudf::type_id::FLOAT64; } - if (t == "float") { return cudf::type_id::FLOAT32; } - if (t == "decimal") { return cudf::type_id::FLOAT64; } - if (t == "boolean") { return cudf::type_id::UINT8; } - if (t == "date") { return cudf::type_id::TIMESTAMP_MILLISECONDS; } - if (t == "datetime") { return cudf::type_id::TIMESTAMP_MILLISECONDS; } + if (!t.rfind("tinyint", 0)) { return cudf::type_id::INT8; } + if (!t.rfind("smallint", 0)) { return cudf::type_id::INT8; } + if (!t.rfind("mediumint", 0)) { return cudf::type_id::INT16; } + if (!t.rfind("int", 0)) { return cudf::type_id::INT32; } + if (!t.rfind("integer", 0)) { return cudf::type_id::INT32; } + if (!t.rfind("bigint", 0)) { return cudf::type_id::INT64; } + if (!t.rfind("unsigned big int", 0)) { return cudf::type_id::UINT64; } + if (!t.rfind("int2", 0)) { return cudf::type_id::INT16; } + if (!t.rfind("int8", 0)) { return cudf::type_id::INT64; } + if (!t.rfind("real", 0)) { return cudf::type_id::FLOAT32; } + if (!t.rfind("double", 0)) { return cudf::type_id::FLOAT64; } + if (!t.rfind("double precision", 0)) { return cudf::type_id::FLOAT64; } + if (!t.rfind("float", 0)) { return cudf::type_id::FLOAT32; } + if (!t.rfind("decimal", 0)) { return cudf::type_id::FLOAT64; } + if (!t.rfind("boolean", 0)) { return cudf::type_id::UINT8; } + if (!t.rfind("date", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } + if (!t.rfind("datetime", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } } sqlite_parser::sqlite_parser() : abstractsql_parser{DataType::SQLITE} {} From d9a3d9c33d0ac2cc8ee56270e0eaa0b568f83c5d Mon Sep 17 00:00:00 2001 From: gcca Date: Fri, 16 Apr 2021 13:18:15 -0500 Subject: [PATCH 02/14] [feature/table-from-postgresl-sqlite] update provider construction --- .../data_provider/sql/SQLiteDataProvider.cpp | 131 +++++++++--------- .../io/data_provider/sql/SQLiteDataProvider.h | 43 +++--- 2 files changed, 89 insertions(+), 85 deletions(-) diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index cbff345d6..13a315e8f 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ 
b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -12,6 +12,8 @@ - sql::SQLException (derived from std::runtime_error) */ +#include + #include "SQLiteDataProvider.h" #include "blazingdb/io/Util/StringUtil.h" @@ -28,14 +30,15 @@ struct sqlite_table_info { struct sqlite_columns_info { std::vector columns; std::vector types; - std::vector bytes; }; struct callb { - int sqlite_callback( - void * NotUsed, int argc, char ** argv, char ** azColName) { + int sqlite_callback(void * NotUsed, + int argc, + char ** argv, + char ** azColName) { int i; - for(i = 0; i < argc; i++) { + for (i = 0; i < argc; i++) { printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL"); } printf("\n"); @@ -43,32 +46,32 @@ struct callb { } }; -std::shared_ptr execute_sqlite_query( - sqlite3 * conn, const std::string & query) { +static inline std::shared_ptr +execute_sqlite_query(sqlite3 * db, const std::string & query) { sqlite3_stmt * stmt; const char * sql = query.c_str(); - int rc = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL); - if(rc != SQLITE_OK) { - printf("error: %s", sqlite3_errmsg(conn)); - // TODO percy error - } + + int errorCode = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); + if (errorCode != SQLITE_OK) { printf("error: %s", sqlite3_errmsg(db)); } + auto sqlite_deleter = [](sqlite3_stmt * pointer) { - std::cout << "sqlite smt deleted!!!!\n"; sqlite3_finalize(pointer); }; + std::shared_ptr ret(stmt, sqlite_deleter); + return ret; } -sqlite_table_info get_sqlite_table_info( - sqlite3 * db, const std::string & table) { +sqlite_table_info get_sqlite_table_info(sqlite3 * db, + const std::string & table) { sqlite_table_info ret; const std::string sql{"select count(*) from " + table}; int err = sqlite3_exec( db, sql.c_str(), [](void * data, int count, char ** rows, char **) -> int { - if(count == 1 && rows) { + if (count == 1 && rows) { sqlite_table_info & ret = *static_cast(data); ret.partitions.push_back("default"); // check for partitions api ret.rows = static_cast(atoi(rows[0])); @@ -78,7 +81,7 @@ sqlite_table_info get_sqlite_table_info( }, &ret, nullptr); - if(err != SQLITE_OK) { throw std::runtime_error("getting number of rows"); } + if (err != SQLITE_OK) { throw std::runtime_error("getting number of rows"); } return ret; } @@ -87,6 +90,7 @@ bool sqlite_is_string_col_type(const std::string & t) { std::vector mysql_string_types_hints = { "character", "varchar", + "char", "varying character", "nchar", "native character", @@ -96,15 +100,15 @@ bool sqlite_is_string_col_type(const std::string & t) { "string" // TODO percy ??? 
}; - for(auto hint : mysql_string_types_hints) { - if(StringUtil::beginsWith(t, hint)) return true; + for (auto hint : mysql_string_types_hints) { + if (StringUtil::beginsWith(t, hint)) return true; } return false; } -sqlite_columns_info get_sqlite_columns_info( - sqlite3 * conn, const std::string & table) { +sqlite_columns_info get_sqlite_columns_info(sqlite3 * conn, + const std::string & table) { // TODO percy error handling sqlite_columns_info ret; @@ -113,7 +117,7 @@ sqlite_columns_info get_sqlite_columns_info( sqlite3_stmt * stmt = A.get(); int rc = 0; - while((rc = sqlite3_step(stmt)) == SQLITE_ROW) { + while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { const unsigned char * name = sqlite3_column_text(stmt, 1); std::string col_name((char *) name); ret.columns.push_back(col_name); @@ -121,13 +125,14 @@ sqlite_columns_info get_sqlite_columns_info( const unsigned char * type = sqlite3_column_text(stmt, 2); std::string col_type((char *) type); - std::transform(col_type.cbegin(), + std::transform( + col_type.cbegin(), col_type.cend(), col_type.begin(), [](const std::string::value_type c) { return std::tolower(c); }); size_t max_bytes = 8; // TODO percy check max scalar bytes from sqlite - if(sqlite_is_string_col_type(col_type)) { + if (sqlite_is_string_col_type(col_type)) { // max_bytes = res->getUInt64("CHARACTER_MAXIMUM_LENGTH"); // TODO percy see how to get the max size for string/txt cols ... // see docs @@ -135,7 +140,7 @@ sqlite_columns_info get_sqlite_columns_info( } ret.types.push_back(col_type); } - if(rc != SQLITE_DONE) { + if (rc != SQLITE_DONE) { printf("error: %s", sqlite3_errmsg(conn)); // TODO percy error } @@ -143,38 +148,33 @@ sqlite_columns_info get_sqlite_columns_info( return ret; } -sqlite_data_provider::sqlite_data_provider( - const sql_info & sql, size_t total_number_of_nodes, size_t self_node_idx) +sqlite_data_provider::sqlite_data_provider(const sql_info & sql, + size_t total_number_of_nodes, + size_t self_node_idx) : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx), - sqlite_connection(nullptr), batch_position(0), current_row_count(0) { - sqlite3 * conn = nullptr; - int rc = sqlite3_open(sql.schema.c_str(), &conn); + db(nullptr), batch_position(0), current_row_count(0) { + int errorCode = sqlite3_open(sql.schema.c_str(), &db); - if(rc) { - fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(conn)); - // TODO percy error - } else { - fprintf(stdout, "Opened sqlite database successfully\n"); + if (errorCode != SQLITE_OK) { + throw std::runtime_error(std::string{"Can't open database: "} + + sqlite3_errmsg(db)); } - this->sqlite_connection = conn; - sqlite_table_info tbl_info = get_sqlite_table_info(conn, this->sql.table); - this->partitions = std::move(tbl_info.partitions); - this->row_count = tbl_info.rows; - sqlite_columns_info cols_info = - get_sqlite_columns_info(conn, this->sql.table); - this->column_names = cols_info.columns; - this->column_types = cols_info.types; - this->column_bytes = cols_info.bytes; -} + sqlite_table_info tbl_info = get_sqlite_table_info(db, sql.table); + partitions = std::move(tbl_info.partitions); + row_count = tbl_info.rows; -sqlite_data_provider::~sqlite_data_provider() { - sqlite3_close(this->sqlite_connection); + sqlite_columns_info cols_info = get_sqlite_columns_info(db, sql.table); + column_names = cols_info.columns; + column_types = cols_info.types; } +sqlite_data_provider::~sqlite_data_provider() { sqlite3_close(db); } + std::shared_ptr sqlite_data_provider::clone() { - return std::make_shared( - 
this->sql, this->total_number_of_nodes, this->self_node_idx); + return std::make_shared(this->sql, + this->total_number_of_nodes, + this->self_node_idx); } bool sqlite_data_provider::has_next() { @@ -184,38 +184,41 @@ bool sqlite_data_provider::has_next() { void sqlite_data_provider::reset() { this->batch_position = 0; } data_handle sqlite_data_provider::get_next(bool) { - std::string query; + data_handle ret; + + const std::int32_t limit = batch_position + sql.table_batch_size; + const std::int32_t offset = batch_position; + + std::ostringstream oss; + oss << "SELECT * FROM " << sql.table << " LIMIT " << limit << " OFFSET " + << offset; + std::string query = oss.str(); + + batch_position += sql.table_batch_size; - query = "SELECT * FROM " + this->sql.table + " LIMIT " + - std::to_string(this->batch_position + this->sql.table_batch_size) + - " OFFSET " + std::to_string(this->batch_position); - this->batch_position += this->sql.table_batch_size; + std::cout << "\033[32m>>> query: " << query << "\033[0m" << std::endl; + + std::shared_ptr stmt = execute_sqlite_query(db, query); - std::cout << "query: " << query << "\n"; - auto stmt = execute_sqlite_query(this->sqlite_connection, query); current_row_count += batch_position; - data_handle ret; + ret.sql_handle.table = this->sql.table; ret.sql_handle.column_names = this->column_names; ret.sql_handle.column_types = this->column_types; - ret.sql_handle.column_bytes = this->column_bytes; ret.sql_handle.row_count = row_count; ret.sql_handle.sqlite_statement = stmt; + // TODO percy add columns to uri.query ret.uri = Uri("sqlite", "", this->sql.schema + "/" + this->sql.table, "", ""); - // std::cout << "get_next TOTAL rows: " << this->row_count << "\n"; - // std::cout << "get_next current_row_count: " << this->current_row_count - // << "\n"; return ret; } size_t sqlite_data_provider::get_num_handles() { - if(this->partitions.empty()) { - size_t ret = this->row_count / this->sql.table_batch_size; + if (partitions.empty()) { + size_t ret = row_count / sql.table_batch_size; return ret == 0 ? 
1 : ret; } - - return this->partitions.size(); + return partitions.size(); } } /* namespace io */ diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.h b/engine/src/io/data_provider/sql/SQLiteDataProvider.h index b7a62ab73..6d1ed915a 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.h +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.h @@ -20,41 +20,42 @@ namespace io { */ class sqlite_data_provider : public abstractsql_data_provider { public: - sqlite_data_provider(const sql_info &sql, size_t total_number_of_nodes, - size_t self_node_idx); + sqlite_data_provider(const sql_info & sql, + std::size_t total_number_of_nodes, + std::size_t self_node_idx); virtual ~sqlite_data_provider(); - std::shared_ptr clone() override; + std::shared_ptr clone() override; /** - * tells us if this provider can generate more sql resultsets - */ - bool has_next() override; + * tells us if this provider can generate more sql resultsets + */ + bool has_next() override; /** - * Resets file read count to 0 for file based DataProvider - */ - void reset() override; + * Resets file read count to 0 for file based DataProvider + */ + void reset() override; /** - * gets us the next arrow::io::RandomAccessFile - * if open_file is false will not run te query and just returns a data_handle - * with columns info - */ - data_handle get_next(bool = true) override; + * gets us the next arrow::io::RandomAccessFile + * if open_file is false will not run te query and just returns a data_handle + * with columns info + */ + data_handle get_next(bool = true) override; /** - * Get the number of data_handles that will be provided. - */ - size_t get_num_handles() override; + * Get the number of data_handles that will be provided. + */ + size_t get_num_handles() override; private: - sqlite3* sqlite_connection; + sqlite3 * db; std::vector partitions; - size_t row_count; - size_t batch_position; - size_t current_row_count; + std::size_t row_count; + std::size_t batch_position; + std::size_t current_row_count; }; } /* namespace io */ From 5270ab2ca6c454db80e5a8beae6190fadb79114a Mon Sep 17 00:00:00 2001 From: gcca Date: Fri, 16 Apr 2021 13:40:31 -0500 Subject: [PATCH 03/14] [feature/table-from-postgresl-sqlite] update sqlite provider to check next iteration --- .../data_provider/sql/SQLiteDataProvider.cpp | 80 +++++++++---------- .../io/data_provider/sql/SQLiteDataProvider.h | 1 + 2 files changed, 39 insertions(+), 42 deletions(-) diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index 13a315e8f..d4a38d59b 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -33,10 +33,7 @@ struct sqlite_columns_info { }; struct callb { - int sqlite_callback(void * NotUsed, - int argc, - char ** argv, - char ** azColName) { + int sqlite_callback(void *, int argc, char ** argv, char ** azColName) { int i; for (i = 0; i < argc; i++) { printf("%s = %s\n", azColName[i], argv[i] ? 
argv[i] : "NULL"); @@ -49,22 +46,22 @@ struct callb { static inline std::shared_ptr execute_sqlite_query(sqlite3 * db, const std::string & query) { sqlite3_stmt * stmt; - const char * sql = query.c_str(); - int errorCode = sqlite3_prepare_v2(db, sql, -1, &stmt, NULL); - if (errorCode != SQLITE_OK) { printf("error: %s", sqlite3_errmsg(db)); } - - auto sqlite_deleter = [](sqlite3_stmt * pointer) { - sqlite3_finalize(pointer); - }; - - std::shared_ptr ret(stmt, sqlite_deleter); + int errorCode = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr); + if (errorCode != SQLITE_OK) { + std::ostringstream oss; + oss << "Executing SQLite query provider: " << std::endl + << "query: " << query << std::endl + << "error message: " << sqlite3_errmsg(db); + throw std::runtime_error{oss.str()}; + } - return ret; + auto sqlite_deleter = [](sqlite3_stmt * stmt) { sqlite3_finalize(stmt); }; + return std::shared_ptr{stmt, sqlite_deleter}; } -sqlite_table_info get_sqlite_table_info(sqlite3 * db, - const std::string & table) { +static inline sqlite_table_info +get_sqlite_table_info(sqlite3 * db, const std::string & table) { sqlite_table_info ret; const std::string sql{"select count(*) from " + table}; int err = sqlite3_exec( @@ -107,23 +104,21 @@ bool sqlite_is_string_col_type(const std::string & t) { return false; } -sqlite_columns_info get_sqlite_columns_info(sqlite3 * conn, - const std::string & table) { - // TODO percy error handling - +static inline sqlite_columns_info +get_sqlite_columns_info(sqlite3 * db, const std::string & table) { sqlite_columns_info ret; std::string query = "PRAGMA table_info(" + table + ")"; - auto A = execute_sqlite_query(conn, query); + auto A = execute_sqlite_query(db, query); sqlite3_stmt * stmt = A.get(); - int rc = 0; + int rc = SQLITE_ERROR; while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { const unsigned char * name = sqlite3_column_text(stmt, 1); - std::string col_name((char *) name); + std::string col_name(reinterpret_cast(name)); ret.columns.push_back(col_name); const unsigned char * type = sqlite3_column_text(stmt, 2); - std::string col_type((char *) type); + std::string col_type(reinterpret_cast(type)); std::transform( col_type.cbegin(), @@ -131,7 +126,7 @@ sqlite_columns_info get_sqlite_columns_info(sqlite3 * conn, col_type.begin(), [](const std::string::value_type c) { return std::tolower(c); }); - size_t max_bytes = 8; // TODO percy check max scalar bytes from sqlite + std::size_t max_bytes = 8; // TODO percy check max scalar bytes from sqlite if (sqlite_is_string_col_type(col_type)) { // max_bytes = res->getUInt64("CHARACTER_MAXIMUM_LENGTH"); // TODO percy see how to get the max size for string/txt cols ... 
@@ -140,9 +135,13 @@ sqlite_columns_info get_sqlite_columns_info(sqlite3 * conn, } ret.types.push_back(col_type); } + if (rc != SQLITE_DONE) { - printf("error: %s", sqlite3_errmsg(conn)); - // TODO percy error + std::ostringstream oss; + oss << "Getting SQLite columns info: " << std::endl + << "query: " << query << std::endl + << "error message: " << sqlite3_errmsg(db); + throw std::runtime_error{oss.str()}; } return ret; @@ -151,8 +150,9 @@ sqlite_columns_info get_sqlite_columns_info(sqlite3 * conn, sqlite_data_provider::sqlite_data_provider(const sql_info & sql, size_t total_number_of_nodes, size_t self_node_idx) - : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx), - db(nullptr), batch_position(0), current_row_count(0) { + : abstractsql_data_provider{sql, total_number_of_nodes, self_node_idx}, + db{nullptr}, current_stmt{nullptr}, batch_position{0}, current_row_count{ + 0} { int errorCode = sqlite3_open(sql.schema.c_str(), &db); if (errorCode != SQLITE_OK) { @@ -172,16 +172,14 @@ sqlite_data_provider::sqlite_data_provider(const sql_info & sql, sqlite_data_provider::~sqlite_data_provider() { sqlite3_close(db); } std::shared_ptr sqlite_data_provider::clone() { - return std::make_shared(this->sql, - this->total_number_of_nodes, - this->self_node_idx); + return std::make_shared(sql, + total_number_of_nodes, + self_node_idx); } -bool sqlite_data_provider::has_next() { - return this->current_row_count < row_count; -} +bool sqlite_data_provider::has_next() { return current_stmt != nullptr; } -void sqlite_data_provider::reset() { this->batch_position = 0; } +void sqlite_data_provider::reset() { batch_position = 0; } data_handle sqlite_data_provider::get_next(bool) { data_handle ret; @@ -196,20 +194,18 @@ data_handle sqlite_data_provider::get_next(bool) { batch_position += sql.table_batch_size; - std::cout << "\033[32m>>> query: " << query << "\033[0m" << std::endl; - std::shared_ptr stmt = execute_sqlite_query(db, query); current_row_count += batch_position; - ret.sql_handle.table = this->sql.table; - ret.sql_handle.column_names = this->column_names; - ret.sql_handle.column_types = this->column_types; + ret.sql_handle.table = sql.table; + ret.sql_handle.column_names = column_names; + ret.sql_handle.column_types = column_types; ret.sql_handle.row_count = row_count; ret.sql_handle.sqlite_statement = stmt; // TODO percy add columns to uri.query - ret.uri = Uri("sqlite", "", this->sql.schema + "/" + this->sql.table, "", ""); + ret.uri = Uri("sqlite", "", sql.schema + "/" + sql.table, "", ""); return ret; } diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.h b/engine/src/io/data_provider/sql/SQLiteDataProvider.h index 6d1ed915a..fccd304ec 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.h +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.h @@ -52,6 +52,7 @@ class sqlite_data_provider : public abstractsql_data_provider { private: sqlite3 * db; + sqlite3_stmt * current_stmt; std::vector partitions; std::size_t row_count; std::size_t batch_position; From 243a58661eb5858d04670288375a24f4585cb6d1 Mon Sep 17 00:00:00 2001 From: gcca Date: Fri, 16 Apr 2021 17:33:39 -0500 Subject: [PATCH 04/14] [feature/table-from-postgresl-sqlite] update sqlite provider for distribution --- .../data_provider/sql/SQLiteDataProvider.cpp | 122 +++++++++++------- .../io/data_provider/sql/SQLiteDataProvider.h | 2 - 2 files changed, 77 insertions(+), 47 deletions(-) diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp 
b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index d4a38d59b..2c4ce2a6e 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -78,30 +78,11 @@ get_sqlite_table_info(sqlite3 * db, const std::string & table) { }, &ret, nullptr); - if (err != SQLITE_OK) { throw std::runtime_error("getting number of rows"); } - return ret; -} - -// TODO percy avoid code duplication -bool sqlite_is_string_col_type(const std::string & t) { - std::vector mysql_string_types_hints = { - "character", - "varchar", - "char", - "varying character", - "nchar", - "native character", - "nvarchar", - "text", - "clob", - "string" // TODO percy ??? - }; - - for (auto hint : mysql_string_types_hints) { - if (StringUtil::beginsWith(t, hint)) return true; + if (err != SQLITE_OK) { + throw std::runtime_error{std::string{"getting number of rows"} + + sqlite3_errmsg(db)}; } - - return false; + return ret; } static inline sqlite_columns_info @@ -126,13 +107,6 @@ get_sqlite_columns_info(sqlite3 * db, const std::string & table) { col_type.begin(), [](const std::string::value_type c) { return std::tolower(c); }); - std::size_t max_bytes = 8; // TODO percy check max scalar bytes from sqlite - if (sqlite_is_string_col_type(col_type)) { - // max_bytes = res->getUInt64("CHARACTER_MAXIMUM_LENGTH"); - // TODO percy see how to get the max size for string/txt cols ... - // see docs - max_bytes = 256; - } ret.types.push_back(col_type); } @@ -151,8 +125,7 @@ sqlite_data_provider::sqlite_data_provider(const sql_info & sql, size_t total_number_of_nodes, size_t self_node_idx) : abstractsql_data_provider{sql, total_number_of_nodes, self_node_idx}, - db{nullptr}, current_stmt{nullptr}, batch_position{0}, current_row_count{ - 0} { + db{nullptr}, batch_position{0} { int errorCode = sqlite3_open(sql.schema.c_str(), &db); if (errorCode != SQLITE_OK) { @@ -177,31 +150,90 @@ std::shared_ptr sqlite_data_provider::clone() { self_node_idx); } -bool sqlite_data_provider::has_next() { return current_stmt != nullptr; } - -void sqlite_data_provider::reset() { batch_position = 0; } +static inline std::string +make_table_query_string(const std::string & table, + const std::size_t limit, + const std::size_t batch_size, + const std::size_t batch_position, + const std::size_t number_of_nodes, + const std::size_t node_idx) { + const std::size_t offset = + batch_size * (batch_position * number_of_nodes + node_idx); + std::ostringstream oss; + oss << "SELECT * FROM " << table << " LIMIT " << limit << " OFFSET " + << offset; + return oss.str(); +} -data_handle sqlite_data_provider::get_next(bool) { - data_handle ret; +bool sqlite_data_provider::has_next() { + const std::string query = make_table_query_string(sql.table, + 1, + sql.table_batch_size, + batch_position, + total_number_of_nodes, + self_node_idx); + bool it_has = false; + int errorCode = sqlite3_exec( + db, + query.c_str(), + [](void * data, int count, char ** rows, char **) -> int { + *static_cast(data) = count > 0 && rows; + return 0; + }, + &it_has, + nullptr); + if (errorCode != SQLITE_OK) { + throw std::runtime_error{std::string{"Has next SQLite batch: "} + + sqlite3_errmsg(db)}; + } + return it_has; +} - const std::int32_t limit = batch_position + sql.table_batch_size; - const std::int32_t offset = batch_position; +void sqlite_data_provider::reset() { batch_position = 0; } +static inline std::size_t get_size_for_statement(sqlite3_stmt * stmt) { std::ostringstream oss; - oss << "SELECT * FROM " << sql.table << " LIMIT 
" << limit << " OFFSET " - << offset; + oss << "select count(*) from (" << sqlite3_expanded_sql(stmt) << ')' + << std::endl; std::string query = oss.str(); - batch_position += sql.table_batch_size; + std::size_t nRows = 0; + const std::int32_t errorCode = sqlite3_exec( + sqlite3_db_handle(stmt), + query.c_str(), + [](void * data, int count, char ** rows, char **) -> int { + if (count == 1 && rows) { + *static_cast(data) = + static_cast(std::atoi(rows[0])); + return 0; + } + return 1; + }, + &nRows, + nullptr); + if (errorCode != SQLITE_OK) { + throw std::runtime_error{std::string{"Has next SQLite batch: "} + + sqlite3_errstr(errorCode)}; + } + return nRows; +} + +data_handle sqlite_data_provider::get_next(bool) { + data_handle ret; + const std::string query = make_table_query_string(sql.table, + sql.table_batch_size, + sql.table_batch_size, + batch_position, + total_number_of_nodes, + self_node_idx); + batch_position++; std::shared_ptr stmt = execute_sqlite_query(db, query); - current_row_count += batch_position; - ret.sql_handle.table = sql.table; ret.sql_handle.column_names = column_names; ret.sql_handle.column_types = column_types; - ret.sql_handle.row_count = row_count; + ret.sql_handle.row_count = get_size_for_statement(stmt.get()); ret.sql_handle.sqlite_statement = stmt; // TODO percy add columns to uri.query diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.h b/engine/src/io/data_provider/sql/SQLiteDataProvider.h index fccd304ec..e0ba7f6f1 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.h +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.h @@ -52,11 +52,9 @@ class sqlite_data_provider : public abstractsql_data_provider { private: sqlite3 * db; - sqlite3_stmt * current_stmt; std::vector partitions; std::size_t row_count; std::size_t batch_position; - std::size_t current_row_count; }; } /* namespace io */ From da42dce24c4f954bbb4c7b578ac50526b91a7737 Mon Sep 17 00:00:00 2001 From: gcca Date: Sat, 17 Apr 2021 13:46:31 -0500 Subject: [PATCH 05/14] [feature/table-from-postgresl-sqlite] update sqlite provider to load data --- .../data_provider/sql/SQLiteDataProvider.cpp | 37 +++++++++---------- tests/BlazingSQLTest/DataBase/sqliteSchema.py | 2 + .../EndToEndTests/tablesFromSQL.py | 4 +- 3 files changed, 22 insertions(+), 21 deletions(-) diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index 2c4ce2a6e..e1f6d9587 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -151,27 +151,27 @@ std::shared_ptr sqlite_data_provider::clone() { } static inline std::string -make_table_query_string(const std::string & table, - const std::size_t limit, +make_table_query_string(const std::size_t limit, const std::size_t batch_size, const std::size_t batch_position, const std::size_t number_of_nodes, - const std::size_t node_idx) { + const std::size_t node_idx, + const std::string & select_from) { const std::size_t offset = batch_size * (batch_position * number_of_nodes + node_idx); std::ostringstream oss; - oss << "SELECT * FROM " << table << " LIMIT " << limit << " OFFSET " - << offset; + oss << select_from << " LIMIT " << limit << " OFFSET " << offset; return oss.str(); } bool sqlite_data_provider::has_next() { - const std::string query = make_table_query_string(sql.table, - 1, - sql.table_batch_size, - batch_position, - total_number_of_nodes, - self_node_idx); + const std::string query = + make_table_query_string(1, 
+ sql.table_batch_size, + batch_position, + total_number_of_nodes, + self_node_idx, + "select * from " + sql.table); bool it_has = false; int errorCode = sqlite3_exec( db, @@ -220,12 +220,14 @@ static inline std::size_t get_size_for_statement(sqlite3_stmt * stmt) { data_handle sqlite_data_provider::get_next(bool) { data_handle ret; - const std::string query = make_table_query_string(sql.table, - sql.table_batch_size, + + const std::string select_from = build_select_from(); + const std::string query = make_table_query_string(sql.table_batch_size, sql.table_batch_size, batch_position, total_number_of_nodes, - self_node_idx); + self_node_idx, + select_from); batch_position++; std::shared_ptr stmt = execute_sqlite_query(db, query); @@ -242,11 +244,8 @@ data_handle sqlite_data_provider::get_next(bool) { } size_t sqlite_data_provider::get_num_handles() { - if (partitions.empty()) { - size_t ret = row_count / sql.table_batch_size; - return ret == 0 ? 1 : ret; - } - return partitions.size(); + std::size_t ret = row_count / sql.table_batch_size; + return ret == 0 ? 1 : ret; } } /* namespace io */ diff --git a/tests/BlazingSQLTest/DataBase/sqliteSchema.py b/tests/BlazingSQLTest/DataBase/sqliteSchema.py index 653a71552..ab59c1b7d 100644 --- a/tests/BlazingSQLTest/DataBase/sqliteSchema.py +++ b/tests/BlazingSQLTest/DataBase/sqliteSchema.py @@ -99,10 +99,12 @@ def sqlite_load_data_in_file(table: str, with open(psvpath) as psv: reader = csv.reader(psv, delimiter='|') row = next(reader) + row = [c if c.lower() != 'null' else None for c in row] nfields = ','.join('?' * len(row)) query = f'insert into {table} values ({nfields})' cursor.execute(query, row) for row in reader: + row = [c if c.lower() != 'null' else None for c in row] cursor.execute(query, row) connection.commit() diff --git a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py index 1967ecc3f..80ff5eb6b 100644 --- a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py +++ b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py @@ -9,8 +9,8 @@ queryType = "TablesFromSQL" data_types = [ #DataType.MYSQL, - DataType.POSTGRESQL, - #DataType.SQLITE, + #DataType.POSTGRESQL, + DataType.SQLITE, # TODO percy c.gonzales ] From 8239b91a993419d248e8858cb77f508ecee3aeb6 Mon Sep 17 00:00:00 2001 From: gcca Date: Mon, 19 Apr 2021 10:21:04 -0500 Subject: [PATCH 06/14] [feature/table-from-postgresl-sqlite] update postgresql provider for distribution --- .../io/data_parser/sql/PostgreSQLParser.cpp | 256 +++++++++++------- .../sql/PostgreSQLDataProvider.cpp | 63 +++-- .../sql/PostgreSQLDataProvider.h | 11 +- 3 files changed, 200 insertions(+), 130 deletions(-) diff --git a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp index 54d051b3a..5ef56dd98 100644 --- a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp +++ b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp @@ -21,21 +21,27 @@ namespace ral { namespace io { -static const std::array postgresql_string_type_hints = { - "character", "character varying", "bytea", "text", "anyarray", "name"}; +static const std::array postgresql_string_type_hints = + {"character", "character varying", "bytea", "text", "anyarray", "name"}; static inline bool postgresql_is_cudf_string(const std::string & hint) { - const auto * result = std::find_if(std::cbegin(postgresql_string_type_hints), - std::cend(postgresql_string_type_hints), - [&hint]( - const char * c_hint) { return std::strcmp(c_hint, hint.c_str()) == 0; }); + 
const auto * result = + std::find_if(std::cbegin(postgresql_string_type_hints), + std::cend(postgresql_string_type_hints), + [&hint](const char * c_hint) { + return std::strcmp(c_hint, hint.c_str()) == 0; + }); return result != std::cend(postgresql_string_type_hints); } +// To avoid theses functions we could use postgresql ecpg. But for now, there is +// not exist a official conda channel for ecpg. Maybe we could add to cmake +// script a function to check a postgresql installation folder + static inline void date_to_ymd(std::int32_t jd, - std::int32_t & year, - std::int32_t & month, - std::int32_t & day) { + std::int32_t & year, + std::int32_t & month, + std::int32_t & day) { std::uint32_t julian, quad, extra; std::int32_t y; julian = static_cast(jd); @@ -55,10 +61,10 @@ static inline void date_to_ymd(std::int32_t jd, } static inline void time_to_hms(std::int64_t jd, - std::int32_t & hour, - std::int32_t & min, - std::int32_t & sec, - std::int32_t & msec) { + std::int32_t & hour, + std::int32_t & min, + std::int32_t & sec, + std::int32_t & msec) { std::int64_t time; const std::int64_t USECS_PER_HOUR = 3600000000; const std::int64_t USECS_PER_MINUTE = 60000000; @@ -72,8 +78,8 @@ static inline void time_to_hms(std::int64_t jd, msec = time - (sec * USECS_PER_SEC); } -static inline int timestamp_to_tm( - std::int64_t dt, struct tm & tm, std::int32_t & msec) { +static inline int +timestamp_to_tm(std::int64_t dt, struct tm & tm, std::int32_t & msec) { std::int64_t dDate, POSTGRESQL_EPOCH_DATE = 2451545; std::int64_t time; std::uint64_t USECS_PER_DAY = 86400000000; @@ -88,8 +94,10 @@ static inline int timestamp_to_tm( } dDate += POSTGRESQL_EPOCH_DATE; - date_to_ymd( - static_cast(dDate), tm.tm_year, tm.tm_mon, tm.tm_mday); + date_to_ymd(static_cast(dDate), + tm.tm_year, + tm.tm_mon, + tm.tm_mday); time_to_hms(time, tm.tm_hour, tm.tm_min, tm.tm_sec, msec); } @@ -119,42 +127,48 @@ static inline std::string timestamp_to_string(std::int64_t tstamp) { return oss.str(); } -static inline cudf::type_id parse_postgresql_column_type( - const std::string & columnTypeName) { +static inline cudf::type_id +parse_postgresql_column_type(const std::string & columnTypeName) { if (postgresql_is_cudf_string(columnTypeName)) { return cudf::type_id::STRING; } - if (columnTypeName == "smallint") { return cudf::type_id::INT16; } - if (columnTypeName == "integer") { return cudf::type_id::INT32; } - if (columnTypeName == "bigint") { return cudf::type_id::INT64; } - if (columnTypeName == "decimal") { return cudf::type_id::DECIMAL64; } - if (columnTypeName == "numeric") { return cudf::type_id::DECIMAL64; } - if (columnTypeName == "real") { return cudf::type_id::FLOAT32; } - if (columnTypeName == "double precision") { return cudf::type_id::FLOAT64; } - if (columnTypeName == "smallserial") { return cudf::type_id::INT16; } - if (columnTypeName == "serial") { return cudf::type_id::INT32; } - if (columnTypeName == "bigserial") { return cudf::type_id::INT64; } - if (columnTypeName == "boolean") { return cudf::type_id::BOOL8; } - if (columnTypeName == "date") { return cudf::type_id::TIMESTAMP_DAYS; } - if (columnTypeName == "money") { return cudf::type_id::UINT64; } - if (columnTypeName == "timestamp without time zone") { + if (!columnTypeName.rfind("smallint", 0)) { return cudf::type_id::INT16; } + if (!columnTypeName.rfind("integer", 0)) { return cudf::type_id::INT32; } + if (!columnTypeName.rfind("bigint", 0)) { return cudf::type_id::INT64; } + if (!columnTypeName.rfind("decimal", 0)) { return cudf::type_id::DECIMAL64; } + if 
(!columnTypeName.rfind("numeric", 0)) { return cudf::type_id::DECIMAL64; } + if (!columnTypeName.rfind("real", 0)) { return cudf::type_id::FLOAT32; } + if (!columnTypeName.rfind("double precision", 0)) { + return cudf::type_id::FLOAT64; + } + if (!columnTypeName.rfind("smallserial", 0)) { return cudf::type_id::INT16; } + if (!columnTypeName.rfind("serial", 0)) { return cudf::type_id::INT32; } + if (!columnTypeName.rfind("bigserial", 0)) { return cudf::type_id::INT64; } + if (!columnTypeName.rfind("boolean", 0)) { return cudf::type_id::BOOL8; } + if (!columnTypeName.rfind("date", 0)) { + return cudf::type_id::TIMESTAMP_DAYS; + } + if (!columnTypeName.rfind("money", 0)) { return cudf::type_id::UINT64; } + if (!columnTypeName.rfind("timestamp without time zone", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } - if (columnTypeName == "timestamp with time zone") { + if (!columnTypeName.rfind("timestamp with time zone", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } - if (columnTypeName == "time without time zone") { + if (!columnTypeName.rfind("time without time zone", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "time with time zone") { + if (!columnTypeName.rfind("time with time zone", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "interval") { + if (!columnTypeName.rfind("interval", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "inet") { return cudf::type_id::UINT64; } - if (columnTypeName == "USER-DEFINED") { return cudf::type_id::STRUCT; } - if (columnTypeName == "ARRAY") { return cudf::type_id::LIST; } + if (!columnTypeName.rfind("inet", 0)) { return cudf::type_id::UINT64; } + if (!columnTypeName.rfind("USER-DEFINED", 0)) { + return cudf::type_id::STRUCT; + } + if (!columnTypeName.rfind("ARRAY", 0)) { return cudf::type_id::LIST; } throw std::runtime_error("PostgreSQL type hint not found: " + columnTypeName); } @@ -163,26 +177,33 @@ postgresql_parser::postgresql_parser() postgresql_parser::~postgresql_parser() = default; -void postgresql_parser::read_sql_loop(void * src, - const std::vector & cudf_types, - const std::vector & column_indices, - std::vector & host_cols, - std::vector> & null_masks) { +void postgresql_parser::read_sql_loop( + void * src, + const std::vector & cudf_types, + const std::vector & column_indices, + std::vector & host_cols, + std::vector> & null_masks) { PGresult * result = static_cast(src); const int ntuples = PQntuples(result); for (int rowCounter = 0; rowCounter < ntuples; rowCounter++) { - parse_sql( - src, column_indices, cudf_types, rowCounter, host_cols, null_masks); + parse_sql(src, + column_indices, + cudf_types, + rowCounter, + host_cols, + null_masks); } } -cudf::type_id postgresql_parser::get_cudf_type_id( - const std::string & sql_column_type) { +cudf::type_id +postgresql_parser::get_cudf_type_id(const std::string & sql_column_type) { return parse_postgresql_column_type(sql_column_type); } -std::uint8_t postgresql_parser::parse_cudf_int8( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_int8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); @@ -191,41 +212,53 @@ std::uint8_t postgresql_parser::parse_cudf_int8( return 1; } -std::uint8_t postgresql_parser::parse_cudf_int16( - void * src, std::size_t col, std::size_t row, 
std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int16(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::int16_t value = - ntohs(*reinterpret_cast(result)); + ntohs(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_int32( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::int32_t value = - ntohl(*reinterpret_cast(result)); + ntohl(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_int64( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::int64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + __builtin_bswap64(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint8( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); @@ -234,101 +267,126 @@ std::uint8_t postgresql_parser::parse_cudf_uint8( return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint16(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint16(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::uint16_t value = - ntohs(*reinterpret_cast(result)); + ntohs(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint32(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::uint32_t value = - ntohl(*reinterpret_cast(result)); + ntohl(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint64(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::uint64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + __builtin_bswap64(*reinterpret_cast(result)); v->at(row) = value; return 1; } -std::uint8_t 
postgresql_parser::parse_cudf_float32( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_float32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::int32_t casted = - ntohl(*reinterpret_cast(result)); + ntohl(*reinterpret_cast(result)); const float value = *reinterpret_cast(&casted); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_float64( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_float64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); const std::int64_t casted = - __builtin_bswap64(*reinterpret_cast(result)); + __builtin_bswap64(*reinterpret_cast(result)); const double value = *reinterpret_cast(&casted); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_bool8( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_bool8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { return parse_cudf_int8(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_timestamp_days( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t postgresql_parser::parse_cudf_timestamp_days(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { return parse_cudf_string(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_timestamp_seconds( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t +postgresql_parser::parse_cudf_timestamp_seconds(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { return parse_cudf_string(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_timestamp_milliseconds( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t +postgresql_parser::parse_cudf_timestamp_milliseconds(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { return parse_cudf_string(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_timestamp_microseconds( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t +postgresql_parser::parse_cudf_timestamp_microseconds(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { return parse_cudf_string(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_timestamp_nanoseconds( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t +postgresql_parser::parse_cudf_timestamp_nanoseconds(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { return parse_cudf_string(src, col, row, v); } -std::uint8_t postgresql_parser::parse_cudf_string( - void * src, std::size_t col, std::size_t row, cudf_string_col * v) { +std::uint8_t postgresql_parser::parse_cudf_string(void * src, + std::size_t col, + std::size_t row, + cudf_string_col * v) { PGresult * pgResult = static_cast(src); Oid oid = PQftype(pgResult, col); @@ -346,13 +404,13 @@ std::uint8_t postgresql_parser::parse_cudf_string( switch (oid) { case 1082: { // date const std::int32_t value = - ntohl(*reinterpret_cast(result)); + 
ntohl(*reinterpret_cast(result)); data = date_to_string(value); break; } case 1114: { // timestamp const std::int64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + __builtin_bswap64(*reinterpret_cast(result)); data = timestamp_to_string(value); break; } diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp index 2f446cb5a..5638a2124 100644 --- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp +++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp @@ -15,14 +15,14 @@ namespace io { namespace { -const std::string MakePostgreSQLConnectionString(const sql_info &sql) { +const std::string MakePostgreSQLConnectionString(const sql_info & sql) { std::ostringstream os; os << "host=" << sql.host << " port=" << sql.port << " dbname=" << sql.schema << " user=" << sql.user << " password=" << sql.password; return os.str(); } -const std::string MakeQueryForColumnsInfo(const sql_info &sql) { +const std::string MakeQueryForColumnsInfo(const sql_info & sql) { std::ostringstream os; os << "select column_name, data_type, character_maximum_length" " from information_schema.tables as tables" @@ -38,10 +38,11 @@ class TableInfo { std::vector column_names; std::vector column_types; std::vector column_bytes; + std::size_t row_count; }; -TableInfo ExecuteTableInfo(PGconn *connection, const sql_info &sql) { - PGresult *result = PQexec(connection, MakeQueryForColumnsInfo(sql).c_str()); +inline TableInfo ExecuteTableInfo(PGconn * connection, const sql_info & sql) { + PGresult * result = PQexec(connection, MakeQueryForColumnsInfo(sql).c_str()); if (PQresultStatus(result) != PGRES_TUPLES_OK) { PQclear(result); PQfinish(connection); @@ -49,15 +50,17 @@ TableInfo ExecuteTableInfo(PGconn *connection, const sql_info &sql) { } int resultNtuples = PQntuples(result); + int resultNfields = PQnfields(result); TableInfo tableInfo; - tableInfo.column_names.reserve(resultNtuples); - tableInfo.column_types.reserve(resultNtuples); + tableInfo.column_names.reserve(resultNfields); + tableInfo.column_types.reserve(resultNfields); + tableInfo.row_count = static_cast(resultNtuples); int columnNameFn = PQfnumber(result, "column_name"); int dataTypeFn = PQfnumber(result, "data_type"); int characterMaximumLengthFn = PQfnumber(result, "character_maximum_length"); - for (int i = 0; i < resultNtuples; i++) { + for (int i = 0; i < resultNfields; i++) { tableInfo.column_names.emplace_back( std::string{PQgetvalue(result, i, columnNameFn)}); tableInfo.column_types.emplace_back( @@ -68,7 +71,7 @@ TableInfo ExecuteTableInfo(PGconn *connection, const sql_info &sql) { // TODO(recy, cristhian): check the minimum size for types tableInfo.column_bytes.emplace_back(8); } else { - const char *characterMaximumLengthBytes = + const char * characterMaximumLengthBytes = PQgetvalue(result, i, characterMaximumLengthFn); // NOTE postgresql representation of number is in network order const std::uint32_t characterMaximumLength = @@ -84,35 +87,34 @@ TableInfo ExecuteTableInfo(PGconn *connection, const sql_info &sql) { } // namespace -postgresql_data_provider::postgresql_data_provider(const sql_info &sql, - size_t total_number_of_nodes, - size_t self_node_idx) - : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx), table_fetch_completed{false}, - batch_position{0}, estimated_table_row_count{0} { +postgresql_data_provider::postgresql_data_provider( + const sql_info & sql, + std::size_t total_number_of_nodes, + std::size_t self_node_idx) + : 
abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx), + table_fetch_completed{false}, batch_position{0}, + estimated_table_row_count{0} { connection = PQconnectdb(MakePostgreSQLConnectionString(sql).c_str()); if (PQstatus(connection) != CONNECTION_OK) { - std::cerr << "Connection to database failed: " << PQerrorMessage(connection) - << std::endl; throw std::runtime_error("Connection to database failed: " + std::string{PQerrorMessage(connection)}); } - std::cout << "PostgreSQL version: " << PQserverVersion(connection) - << std::endl; - TableInfo tableInfo = ExecuteTableInfo(connection, sql); - column_names = tableInfo.column_names; column_types = tableInfo.column_types; column_bytes = tableInfo.column_bytes; + estimated_table_row_count = tableInfo.row_count; } postgresql_data_provider::~postgresql_data_provider() { PQfinish(connection); } std::shared_ptr postgresql_data_provider::clone() { return std::static_pointer_cast( - std::make_shared(sql, this->total_number_of_nodes, this->self_node_idx)); + std::make_shared(sql, + this->total_number_of_nodes, + this->self_node_idx)); } bool postgresql_data_provider::has_next() { @@ -135,12 +137,21 @@ data_handle postgresql_data_provider::get_next(bool open_file) { const std::string select_from = build_select_from(); const std::string where = sql.table_filter.empty() ? "" : " where "; - const std::string query = select_from + where + sql.table_filter + - build_limit_offset(batch_position); - - batch_position += sql.table_batch_size; - PGresult *result = PQexecParams( - connection, query.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 1); + const std::size_t offset = + sql.table_batch_size * + (batch_position * total_number_of_nodes + self_node_idx); + const std::string query = + select_from + where + sql.table_filter + build_limit_offset(offset); + + batch_position++; + PGresult * result = PQexecParams(connection, + query.c_str(), + 0, + nullptr, + nullptr, + nullptr, + nullptr, + 1); if (PQresultStatus(result) != PGRES_TUPLES_OK) { PQclear(result); diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h index e3bc863c9..8eefed891 100644 --- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h +++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h @@ -1,6 +1,7 @@ /* * Copyright 2021 BlazingDB, Inc. 
- * Copyright 2021 Cristhian Alberto Gonzales Castillo + * Copyright 2021 Cristhian Alberto Gonzales Castillo + * */ #ifndef POSTGRESQLDATAPROVIDER_H_ @@ -15,9 +16,9 @@ namespace io { class postgresql_data_provider : public abstractsql_data_provider { public: - postgresql_data_provider(const sql_info &sql, - size_t total_number_of_nodes, - size_t self_node_idx); + postgresql_data_provider(const sql_info & sql, + std::size_t total_number_of_nodes, + std::size_t self_node_idx); virtual ~postgresql_data_provider(); @@ -32,7 +33,7 @@ class postgresql_data_provider : public abstractsql_data_provider { std::size_t get_num_handles() override; private: - PGconn *connection; + PGconn * connection; bool table_fetch_completed; std::size_t batch_position; std::size_t estimated_table_row_count; From 928bf537fe0d56e4f9e2b2a42da65b12eaf3a437 Mon Sep 17 00:00:00 2001 From: gcca Date: Mon, 19 Apr 2021 16:19:21 -0500 Subject: [PATCH 07/14] [feature/table-from-postgresl-sqlite] update postgresql parser for numeric types --- engine/src/cython/engine.cpp | 4 +- .../io/data_parser/sql/PostgreSQLParser.cpp | 181 +++++------------- .../sql/PostgreSQLDataProvider.cpp | 45 ++--- .../data_provider/sql/SQLiteDataProvider.cpp | 8 +- .../EndToEndTests/tablesFromSQL.py | 9 +- 5 files changed, 74 insertions(+), 173 deletions(-) diff --git a/engine/src/cython/engine.cpp b/engine/src/cython/engine.cpp index 3fc6be93e..47fc7f570 100644 --- a/engine/src/cython/engine.cpp +++ b/engine/src/cython/engine.cpp @@ -97,7 +97,7 @@ std::pair, std::vector> get_l #ifdef POSTGRESQL_SUPPORT parser = std::make_shared(); auto sql = ral::io::getSqlInfo(args_map); - provider = std::make_shared(sql, 0, 0); + provider = std::make_shared(sql, total_number_of_nodes, self_node_idx); #else throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration"); #endif @@ -200,7 +200,7 @@ std::shared_ptr runGenerateGraph(uint32_t masterIndex, { using blazingdb::manager::Context; using blazingdb::transport::Node; - + auto& communicationData = ral::communication::CommunicationData::getInstance(); std::vector contextNodes; diff --git a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp index 8b6000985..4cae3daf9 100644 --- a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp +++ b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp @@ -24,108 +24,13 @@ static const std::array postgresql_string_type_hints = {"character", "character varying", "bytea", "text", "anyarray", "name"}; static inline bool postgresql_is_cudf_string(const std::string & hint) { - const auto * result = - std::find_if(std::cbegin(postgresql_string_type_hints), - std::cend(postgresql_string_type_hints), - [&hint](const char * c_hint) { - return std::strcmp(c_hint, hint.c_str()) == 0; - }); + const auto * result = std::find_if( + std::cbegin(postgresql_string_type_hints), + std::cend(postgresql_string_type_hints), + [&hint](const char * c_hint) { return !hint.rfind(c_hint, 0); }); return result != std::cend(postgresql_string_type_hints); } -// To avoid theses functions we could use postgresql ecpg. But for now, there is -// not exist a official conda channel for ecpg. 
Maybe we could add to cmake -// script a function to check a postgresql installation folder - -static inline void date_to_ymd(std::int32_t jd, - std::int32_t & year, - std::int32_t & month, - std::int32_t & day) { - std::uint32_t julian, quad, extra; - std::int32_t y; - julian = static_cast(jd); - julian += 32044; - quad = julian / 146097; - extra = (julian - quad * 146097) * 4 + 3; - julian += 60 + quad * 3 + extra / 146097; - quad = julian / 1461; - julian -= quad * 1461; - y = julian * 4 / 1461; - julian = ((y != 0) ? (julian + 305) % 365 : (julian + 306) % 366) + 123; - y += quad * 4; - year = y - 4800; - quad = julian * 2141 / 65536; - day = julian - 7834 * quad / 256; - month = (quad + 10) % 12 + 1; -} - -static inline void time_to_hms(std::int64_t jd, - std::int32_t & hour, - std::int32_t & min, - std::int32_t & sec, - std::int32_t & msec) { - std::int64_t time; - const std::int64_t USECS_PER_HOUR = 3600000000; - const std::int64_t USECS_PER_MINUTE = 60000000; - const std::int64_t USECS_PER_SEC = 1000000; - time = jd; - hour = time / USECS_PER_HOUR; - time -= (hour) *USECS_PER_HOUR; - min = time / USECS_PER_MINUTE; - time -= (min) *USECS_PER_MINUTE; - sec = time / USECS_PER_SEC; - msec = time - (sec * USECS_PER_SEC); -} - -static inline int -timestamp_to_tm(std::int64_t dt, struct tm & tm, std::int32_t & msec) { - std::int64_t dDate, POSTGRESQL_EPOCH_DATE = 2451545; - std::int64_t time; - std::uint64_t USECS_PER_DAY = 86400000000; - time = dt; - - dDate = time / USECS_PER_DAY; - if (dDate != 0) { time -= dDate * USECS_PER_DAY; } - - if (time < 0) { - time += USECS_PER_DAY; - dDate -= 1; - } - dDate += POSTGRESQL_EPOCH_DATE; - - date_to_ymd(static_cast(dDate), - tm.tm_year, - tm.tm_mon, - tm.tm_mday); - time_to_hms(time, tm.tm_hour, tm.tm_min, tm.tm_sec, msec); -} - -static inline std::string date_to_string(std::int32_t jd) { - // For date conversion see - // https://doxygen.postgresql.org/backend_2utils_2adt_2datetime_8c.html#a889d375aaf2a25be071d818565142b9e - const std::int32_t POSTGRESQL_EPOCH_DATE = 2451545; - std::int32_t year, month, day; - date_to_ymd(POSTGRESQL_EPOCH_DATE + jd, year, month, day); - std::stringstream oss; - oss << year << '-' << std::setfill('0') << std::setw(2) << month << '-' - << std::setw(2) << day; - return oss.str(); -} - -static inline std::string timestamp_to_string(std::int64_t tstamp) { - // For timestamp conversion see - // https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c.html#a933dc09a38ddcf144a48b2aaf5790893 - struct tm tm; - std::int32_t msec; - timestamp_to_tm(tstamp, tm, msec); - std::stringstream oss; - oss << tm.tm_year << '-' << std::setfill('0') << std::setw(2) << tm.tm_mon - << '-' << std::setw(2) << tm.tm_mday << ' ' << std::setw(2) << tm.tm_hour - << ':' << std::setw(2) << tm.tm_min << ':' << std::setw(2) << tm.tm_sec; - if (msec != 0) { oss << '.' 
<< std::setw(6) << msec; } - return oss.str(); -} - static inline cudf::type_id parse_postgresql_column_type(const std::string & columnTypeName) { if (postgresql_is_cudf_string(columnTypeName)) { @@ -134,8 +39,8 @@ parse_postgresql_column_type(const std::string & columnTypeName) { if (!columnTypeName.rfind("smallint", 0)) { return cudf::type_id::INT16; } if (!columnTypeName.rfind("integer", 0)) { return cudf::type_id::INT32; } if (!columnTypeName.rfind("bigint", 0)) { return cudf::type_id::INT64; } - if (!columnTypeName.rfind("decimal", 0)) { return cudf::type_id::DECIMAL64; } - if (!columnTypeName.rfind("numeric", 0)) { return cudf::type_id::DECIMAL64; } + if (!columnTypeName.rfind("decimal", 0)) { return cudf::type_id::FLOAT64; } + if (!columnTypeName.rfind("numeric", 0)) { return cudf::type_id::FLOAT64; } if (!columnTypeName.rfind("real", 0)) { return cudf::type_id::FLOAT32; } if (!columnTypeName.rfind("double precision", 0)) { return cudf::type_id::FLOAT64; @@ -145,7 +50,7 @@ parse_postgresql_column_type(const std::string & columnTypeName) { if (!columnTypeName.rfind("bigserial", 0)) { return cudf::type_id::INT64; } if (!columnTypeName.rfind("boolean", 0)) { return cudf::type_id::BOOL8; } if (!columnTypeName.rfind("date", 0)) { - return cudf::type_id::TIMESTAMP_DAYS; + return cudf::type_id::TIMESTAMP_MILLISECONDS; } if (!columnTypeName.rfind("money", 0)) { return cudf::type_id::UINT64; } if (!columnTypeName.rfind("timestamp without time zone", 0)) { @@ -206,7 +111,9 @@ std::uint8_t postgresql_parser::parse_cudf_int8(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int8_t value = *reinterpret_cast(result); + char * end; + const std::int8_t value = + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } @@ -219,8 +126,9 @@ postgresql_parser::parse_cudf_int16(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int16_t value = - ntohs(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } @@ -233,8 +141,9 @@ postgresql_parser::parse_cudf_int32(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int32_t value = - ntohl(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } @@ -247,8 +156,9 @@ postgresql_parser::parse_cudf_int64(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + static_cast(std::strtoll(result, &end, 10)); v->at(row) = value; return 1; } @@ -261,7 +171,9 @@ postgresql_parser::parse_cudf_uint8(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::uint8_t value = *reinterpret_cast(result); + char * end; + const std::uint8_t value = + static_cast(std::strtoul(result, &end, 10)); v->at(row) = value; return 1; } @@ -274,8 +186,9 @@ postgresql_parser::parse_cudf_uint16(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const 
char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint16_t value = - ntohs(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } @@ -288,8 +201,9 @@ postgresql_parser::parse_cudf_uint32(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint32_t value = - ntohl(*reinterpret_cast(result)); + static_cast(std::strtoul(result, &end, 10)); v->at(row) = value; return 1; } @@ -302,8 +216,9 @@ postgresql_parser::parse_cudf_uint64(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + static_cast(std::strtoull(result, &end, 10)); v->at(row) = value; return 1; } @@ -315,9 +230,8 @@ std::uint8_t postgresql_parser::parse_cudf_float32(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int32_t casted = - ntohl(*reinterpret_cast(result)); - const float value = *reinterpret_cast(&casted); + char * end; + const float value = static_cast(std::strtof(result, &end)); v->at(row) = value; return 1; } @@ -329,9 +243,8 @@ std::uint8_t postgresql_parser::parse_cudf_float64(void * src, PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int64_t casted = - __builtin_bswap64(*reinterpret_cast(result)); - const double value = *reinterpret_cast(&casted); + char * end; + const double value = static_cast(std::strtod(result, &end)); v->at(row) = value; return 1; } @@ -340,7 +253,21 @@ std::uint8_t postgresql_parser::parse_cudf_bool8(void * src, std::size_t col, std::size_t row, std::vector * v) { - return parse_cudf_int8(src, col, row, v); + PGresult * pgResult = static_cast(src); + if (PQgetisnull(pgResult, row, col)) { return 0; } + const std::string result = std::string{PQgetvalue(pgResult, row, col)}; + std::int8_t value; + if (result == "t") { + value = 1; + } else { + if (result == "f") { + value = 0; + } else { + return 0; + } + } + v->at(row) = value; + return 1; } std::uint8_t postgresql_parser::parse_cudf_timestamp_days(void * src, @@ -396,25 +323,7 @@ std::uint8_t postgresql_parser::parse_cudf_string(void * src, return 0; } const char * result = PQgetvalue(pgResult, row, col); - std::string data; - - // TODO(cristhian): convert oid to data type using postgresql pgtype table - // https://www.postgresql.org/docs/13/catalog-pg-type.html - switch (oid) { - case 1082: { // date - const std::int32_t value = - ntohl(*reinterpret_cast(result)); - data = date_to_string(value); - break; - } - case 1114: { // timestamp - const std::int64_t value = - __builtin_bswap64(*reinterpret_cast(result)); - data = timestamp_to_string(value); - break; - } - default: data = result; - } + std::string data = result; v->chars.insert(v->chars.end(), data.cbegin(), data.cend()); v->offsets.push_back(v->offsets.back() + data.length()); diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp index a3eacc7c6..6e57ba30a 100644 --- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp +++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp 
@@ -36,7 +36,6 @@ class TableInfo { public: std::vector column_names; std::vector column_types; - std::vector column_bytes; std::size_t row_count; }; @@ -49,38 +48,42 @@ inline TableInfo ExecuteTableInfo(PGconn * connection, const sql_info & sql) { } int resultNtuples = PQntuples(result); - int resultNfields = PQnfields(result); TableInfo tableInfo; - tableInfo.column_names.reserve(resultNfields); - tableInfo.column_types.reserve(resultNfields); - tableInfo.row_count = static_cast(resultNtuples); + tableInfo.column_names.reserve(resultNtuples); + tableInfo.column_types.reserve(resultNtuples); int columnNameFn = PQfnumber(result, "column_name"); int dataTypeFn = PQfnumber(result, "data_type"); int characterMaximumLengthFn = PQfnumber(result, "character_maximum_length"); - for (int i = 0; i < resultNfields; i++) { + for (int i = 0; i < resultNtuples; i++) { tableInfo.column_names.emplace_back( std::string{PQgetvalue(result, i, columnNameFn)}); tableInfo.column_types.emplace_back( std::string{PQgetvalue(result, i, dataTypeFn)}); // NOTE character_maximum_length is used for char or byte string type - if (PQgetisnull(result, i, characterMaximumLengthFn)) { - // TODO(recy, cristhian): check the minimum size for types - tableInfo.column_bytes.emplace_back(8); - } else { + if (!PQgetisnull(result, i, characterMaximumLengthFn)) { const char * characterMaximumLengthBytes = PQgetvalue(result, i, characterMaximumLengthFn); // NOTE postgresql representation of number is in network order const std::uint32_t characterMaximumLength = ntohl(*reinterpret_cast( characterMaximumLengthBytes)); - tableInfo.column_bytes.emplace_back( - static_cast(characterMaximumLength)); } } - + PQclear(result); + const std::string query = "select count(*) from " + sql.table; + result = PQexec(connection, query.c_str()); + if (PQresultStatus(result) != PGRES_TUPLES_OK) { + PQclear(result); + PQfinish(connection); + throw std::runtime_error("Error access for columns info"); + } + const char * value = PQgetvalue(result, 0, 0); + char * end; + tableInfo.row_count = std::strtoll(value, &end, 10); + PQclear(result); return tableInfo; } @@ -103,7 +106,6 @@ postgresql_data_provider::postgresql_data_provider( TableInfo tableInfo = ExecuteTableInfo(connection, sql); column_names = tableInfo.column_names; column_types = tableInfo.column_types; - column_bytes = tableInfo.column_bytes; estimated_table_row_count = tableInfo.row_count; } @@ -132,17 +134,9 @@ data_handle postgresql_data_provider::get_next(bool open_file) { handle.sql_handle.column_names = column_names; handle.sql_handle.column_types = column_types; - if (!open_file) { return handle; } - - const std::string select_from = build_select_from(); - const std::string where = sql.table_filter.empty() ? 
"" : " where "; - const std::size_t offset = - sql.table_batch_size * - (batch_position * total_number_of_nodes + self_node_idx); - const std::string query = - select_from + where + sql.table_filter + build_limit_offset(offset); - + const std::string query = build_select_query(batch_position); batch_position++; + PGresult * result = PQexecParams(connection, query.c_str(), 0, @@ -150,8 +144,7 @@ data_handle postgresql_data_provider::get_next(bool open_file) { nullptr, nullptr, nullptr, - 1); - + 0); if (PQresultStatus(result) != PGRES_TUPLES_OK) { PQclear(result); PQfinish(connection); diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index b6a2f3683..0fd0e9d3a 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -220,13 +220,7 @@ static inline std::size_t get_size_for_statement(sqlite3_stmt * stmt) { data_handle sqlite_data_provider::get_next(bool) { data_handle ret; - const std::string select_from = build_select_from(); - const std::string query = make_table_query_string(sql.table_batch_size, - sql.table_batch_size, - batch_position, - total_number_of_nodes, - self_node_idx, - select_from); + const std::string query = build_select_query(batch_position); batch_position++; std::shared_ptr stmt = execute_sqlite_query(db, query); diff --git a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py index 1cb6c7452..706ea2540 100644 --- a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py +++ b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py @@ -57,8 +57,8 @@ def define_samples(sample_list: [Sample], table_mapper = None): data_types = [ #DataType.MYSQL, - #DataType.POSTGRESQL, - DataType.SQLITE, + DataType.POSTGRESQL, + #DataType.SQLITE, # TODO percy c.gonzales #DataType.POSTGRESQL, #DataType.SQLITE, @@ -205,6 +205,11 @@ def setup_test(data_type: DataType) -> createSchema.sql_connection: sqliteSchema.create_and_load_tpch_schema(sql) return sql + if data_type is DataType.POSTGRESQL: + from DataBase import postgreSQLSchema + postgreSQLSchema.create_and_load_tpch_schema(sql) + return sql + def executionTest(dask_client, drill, spark, dir_data_lc, bc, nRals, sql): extra_args = { From c51b670316f23087a7c56e312387b718f56c6544 Mon Sep 17 00:00:00 2001 From: gcca Date: Tue, 20 Apr 2021 07:18:44 -0500 Subject: [PATCH 08/14] [feature/table-from-postgresl-sqlite] update postgresql to manage char exception --- engine/src/cython/engine.cpp | 2 +- .../io/data_parser/sql/PostgreSQLParser.cpp | 14 +++++-- .../sql/PostgreSQLDataProvider.cpp | 39 +++++++++++++++---- .../sql/PostgreSQLDataProvider.h | 1 + 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/engine/src/cython/engine.cpp b/engine/src/cython/engine.cpp index 47fc7f570..6c6e6aeb4 100644 --- a/engine/src/cython/engine.cpp +++ b/engine/src/cython/engine.cpp @@ -98,10 +98,10 @@ std::pair, std::vector> get_l parser = std::make_shared(); auto sql = ral::io::getSqlInfo(args_map); provider = std::make_shared(sql, total_number_of_nodes, self_node_idx); + isSqlProvider = true; #else throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration"); #endif - isSqlProvider = true; } else if(fileType == ral::io::DataType::SQLITE) { #ifdef SQLITE_SUPPORT parser = std::make_shared(); diff --git a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp index 
4cae3daf9..751311088 100644
--- a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp
+++ b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp
@@ -315,9 +315,6 @@ std::uint8_t postgresql_parser::parse_cudf_string(void * src,
                                                   cudf_string_col * v) {
   PGresult * pgResult = static_cast<PGresult *>(src);
 
-  Oid oid = PQftype(pgResult, col);
-  if (oid == InvalidOid) { throw std::runtime_error("Bad postgresql type"); }
-
   if (PQgetisnull(pgResult, row, col)) {
     v->offsets.push_back(v->offsets.back());
     return 0;
@@ -325,6 +322,17 @@
   const char * result = PQgetvalue(pgResult, row, col);
   std::string data = result;
 
+  // trim spaces because postgresql stores chars with padding.
+  Oid oid = PQftype(pgResult, col);
+  if (oid == InvalidOid) { throw std::runtime_error("Bad postgresql type"); }
+  if (oid == static_cast<Oid>(1042)) {
+    data.erase(std::find_if(data.rbegin(),
+                            data.rend(),
+                            [](unsigned char c) { return !std::isspace(c); })
+                   .base(),
+               data.end());
+  }
+
   v->chars.insert(v->chars.end(), data.cbegin(), data.cend());
   v->offsets.push_back(v->offsets.back() + data.length());
   return 1;
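
PostgreSQL pads char(n) values (type bpchar, oid 1042) with trailing spaces,
which is why the hunk above right-trims each cell before appending it to the
cudf string column. The same reverse-iterator trim idiom as a standalone
sketch (the helper name and sample values are illustrative, not part of the
patch):

    #include <algorithm>
    #include <cctype>
    #include <string>

    // Right-trim in place: find the last non-space character from the right,
    // then erase everything after it; .base() turns the reverse iterator
    // into the forward iterator just past the kept character.
    static void rtrim(std::string & s) {
      s.erase(std::find_if(s.rbegin(),
                           s.rend(),
                           [](unsigned char c) { return !std::isspace(c); })
                  .base(),
              s.end());
    }

    // rtrim turns "abc   " into "abc"; an all-space value becomes empty.
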
diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
index 6e57ba30a..d03060013 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
@@ -89,6 +89,32 @@
 } // namespace
 
+static inline std::string FindKeyName(PGconn * connection,
+                                      const sql_info & sql) {
+  // This function exists because when we get batches from table we use LIMIT
+  // clause since postgresql returns unpredictable subsets of query's rows see
+  // https://www.postgresql.org/docs/13/queries-limit.html
+  std::ostringstream oss;
+  oss << "select column_name, ordinal_position"
+         " from information_schema.table_constraints tc"
+         " join information_schema.key_column_usage kcu"
+         " on tc.constraint_name = kcu.constraint_name"
+         " and tc.constraint_schema = kcu.constraint_schema"
+         " and tc.constraint_name = kcu.constraint_name"
+         " where tc.table_catalog = '"
+      << sql.schema << "' and tc.table_name = '" << sql.table
+      << "' and constraint_type = 'PRIMARY KEY'";
+  const std::string query = oss.str();
+  PGresult * result = PQexec(connection, query.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Error access for columns info");
+  }
+
+  return "";
+}
+
 postgresql_data_provider::postgresql_data_provider(
     const sql_info & sql,
     std::size_t total_number_of_nodes,
@@ -107,6 +133,7 @@
   column_names = tableInfo.column_names;
   column_types = tableInfo.column_types;
   estimated_table_row_count = tableInfo.row_count;
+  keyname = FindKeyName(connection, sql);
 }
 
 postgresql_data_provider::~postgresql_data_provider() { PQfinish(connection); }
@@ -134,22 +161,18 @@
   handle.sql_handle.column_names = column_names;
   handle.sql_handle.column_types = column_types;
 
+  if (open_file == false) { return handle; }
+
   const std::string query = build_select_query(batch_position);
   batch_position++;
 
-  PGresult * result = PQexecParams(connection,
-                                   query.c_str(),
-                                   0,
-                                   nullptr,
-                                   nullptr,
-                                   nullptr,
-                                   nullptr,
-                                   0);
+  PGresult * result = PQexec(connection, query.c_str());
   if (PQresultStatus(result) != PGRES_TUPLES_OK) {
     PQclear(result);
     PQfinish(connection);
     throw std::runtime_error("Error getting next batch from postgresql");
   }
+  PQflush(connection);
 
   int resultNtuples = PQntuples(result);
 
diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
index f6bde069e..277e2936c 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
@@ -41,6 +41,7 @@ class postgresql_data_provider : public abstractsql_data_provider {
   bool table_fetch_completed;
   std::size_t batch_position;
   std::size_t estimated_table_row_count;
+  std::string keyname;
 };
 
 } /* namespace io */

From 1a0ae2804a684e69d889deefa63d426f8c192fd2 Mon Sep 17 00:00:00 2001
From: gcca
Date: Wed, 21 Apr 2021 09:36:36 -0500
Subject: [PATCH 09/14] [feature/table-from-postgresl-sqlite] add order by
 argument to select query

---
 .../sql/AbstractSQLDataProvider.cpp | 27 +++++++++++++------
 .../sql/AbstractSQLDataProvider.h   |  3 ++-
 2 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
index 1dd12710a..fd9c3c7a0 100644
--- a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
@@ -135,7 +135,9 @@ bool abstractsql_data_provider::set_predicate_pushdown(const std::string &queryS
   return !this->where.empty();
 }
 
-std::string abstractsql_data_provider::build_select_query(size_t batch_index) const {
+std::string abstractsql_data_provider::build_select_query(
+    std::size_t batch_index,
+    const std::string & orderBy) const {
   std::string cols;
 
   if (this->column_indices.empty()) {
@@ -152,19 +154,28 @@ std::string abstractsql_data_provider::build_select_query(size_t batch_index) co
     }
   }
 
-  const size_t offset = this->sql.table_batch_size * (this->total_number_of_nodes * batch_index + this->self_node_idx);
-  std::string limit = " LIMIT " + std::to_string(this->sql.table_batch_size) + " OFFSET " + std::to_string(offset);
-  auto ret = "SELECT " + cols + "FROM " + this->sql.table;
+  std::ostringstream oss;
+
+  oss << "SELECT " << cols << " FROM " << this->sql.table;
 
   if (sql.table_filter.empty()) {
-    if (!this->where.empty()) { // then the filter is from the predicate pushdown{
-      ret += " where " + this->where;
+    if (!this->where
+             .empty()) {  // then the filter is from the predicate pushdown
+      oss << " where " << this->where;
     }
   } else {
-    ret += " where " + sql.table_filter;
+    oss << " where " << sql.table_filter;
   }
 
-  return ret + limit;
+  if (!orderBy.empty()) { oss << " order by " << orderBy; }
+
+  const size_t offset =
+      (this->sql.table_batch_size - 1) *
+      (this->total_number_of_nodes * batch_index + this->self_node_idx);
+
+  oss << " LIMIT " << (this->sql.table_batch_size - 1) << " OFFSET " << offset;
+
+  return oss.str();
 }
 
 } /* namespace io */
diff --git a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
index 524ab1d7b..d40a4bc4c 100644
--- a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
+++ b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
@@ -59,7 +59,8 @@ class abstractsql_data_provider : public data_provider {
 
 protected:
   // returns SELECT ... FROM ... WHERE ... LIMIT ... OFFSET
-  std::string build_select_query(size_t batch_index) const;
+  std::string build_select_query(std::size_t batch_index,
+                                 const std::string & orderBy = "") const;
 
 protected:
   sql_info sql;
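
The LIMIT/OFFSET arithmetic introduced above interleaves batches across nodes
round-robin: node k reads batches k, k + N, k + 2N, and so on. A worked
example with hypothetical numbers (shown with the plain batch size; the
batch_size - 1 variant this patch introduces is reverted in PATCH 12):

    #include <cstddef>
    #include <iostream>

    // offset = batch_size * (total_nodes * batch_index + node_idx)
    int main() {
      const std::size_t batch_size = 100, total_nodes = 3, node_idx = 1;
      for (std::size_t batch_index = 0; batch_index < 3; ++batch_index) {
        const std::size_t offset =
            batch_size * (total_nodes * batch_index + node_idx);
        std::cout << "LIMIT " << batch_size << " OFFSET " << offset << '\n';
      }
      // Node 1 of 3 prints OFFSET 100, 400, 700, i.e. rows 100-199,
      // 400-499 and 700-799: disjoint from the windows of nodes 0 and 2.
      return 0;
    }
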
From 2956ea2106462bb859b66036417a295a0aaa6769 Mon Sep 17 00:00:00 2001
From: gcca
Date: Wed, 21 Apr 2021 09:37:47 -0500
Subject: [PATCH 10/14] [feature/table-from-postgresl-sqlite] check has_next
 using count(*) to avoid overlapping

---
 .../sql/PostgreSQLDataProvider.cpp | 87 +++++++++++++++++--
 1 file changed, 82 insertions(+), 5 deletions(-)

diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
index d03060013..e2b9fe587 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
@@ -92,8 +92,9 @@ inline TableInfo ExecuteTableInfo(PGconn * connection, const sql_info & sql) {
 static inline std::string FindKeyName(PGconn * connection,
                                       const sql_info & sql) {
   // This function exists because when we get batches from table we use LIMIT
-  // clause since postgresql returns unpredictable subsets of query's rows see
-  // https://www.postgresql.org/docs/13/queries-limit.html
+  // clause and since postgresql returns unpredictable subsets of query's rows,
+  // we apply an order by over a column to keep a stable order in the result;
+  // see https://www.postgresql.org/docs/13/queries-limit.html
   std::ostringstream oss;
   oss << "select column_name, ordinal_position"
          " from information_schema.table_constraints tc"
@@ -112,7 +113,72 @@ static inline std::string FindKeyName(PGconn * connection,
     throw std::runtime_error("Error access for columns info");
   }
 
-  return "";
+  if (PQntuples(result)) {
+    int columnNameFn = PQfnumber(result, "column_name");
+    const std::string columnName{PQgetvalue(result, 0, columnNameFn)};
+    PQclear(result);
+    if (columnName.empty()) {
+      throw std::runtime_error("No column name into result for primary key");
+    } else {
+      return columnName;
+    }
+  } else {
+    // here the table doesn't have a primary key, so we choose a column by
+    // type; primitive types like int or float have priority over other types
+    PQclear(result);
+    std::ostringstream oss;
+    oss << "select column_name, oid, case"
+           " when typname like 'int_' then 1"
+           " when typname like 'float_' then 2"
+           " else 99 end as typorder"
+           " from information_schema.tables as tables"
+           " join information_schema.columns as columns"
+           " on tables.table_name = columns.table_name"
+           " join pg_type on udt_name = typname where tables.table_catalog = '"
+        << sql.schema << "' and tables.table_name = '" << sql.table
+        << "' order by typorder, typlen desc, oid";
+
+    const std::string query = oss.str();
+    PGresult * result = PQexec(connection, query.c_str());
+    if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+      PQclear(result);
+      PQfinish(connection);
+      throw std::runtime_error("Error access for columns info");
+    }
+
+    if (PQntuples(result)) {
+      int columnNameFn = PQfnumber(result, "column_name");
+      const std::string columnName{PQgetvalue(result, 0, columnNameFn)};
+      PQclear(result);
+      if (columnName.empty()) {
+        throw std::runtime_error("No column name into result for column type");
+      } else {
+        return columnName;
+      }
+    }
+    PQclear(result);
+  }
+  throw std::runtime_error("There is no key name candidate");
+}
+
+static inline bool IsThereNext(PGconn * connection, const std::string & query) {
+  std::ostringstream oss;
+  oss << "select count(*) from (" << query << ") as t";
+  const std::string count = oss.str();
+  PGresult * result = PQexec(connection, count.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Count query batch");
+  }
+
+  const char * data = PQgetvalue(result, 0, 0);
+  char * end;
+  const std::size_t value =
+      static_cast<std::size_t>(std::strtoll(data, &end, 10));
+  PQclear(result);
+
+  return value == 0;
+}
 
 postgresql_data_provider::postgresql_data_provider(
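
FindKeyName exists because LIMIT/OFFSET without ORDER BY is non-deterministic
in PostgreSQL: successive windows may overlap or skip rows. Ordering by the
discovered key makes each window stable. A sketch of the resulting query
shape (the helper, table and key names are hypothetical, not the provider's
actual API):

    #include <cstddef>
    #include <sstream>
    #include <string>

    // Keyed pagination: a unique ordering column turns LIMIT/OFFSET into
    // stable, non-overlapping windows over the table.
    std::string batch_query(const std::string & table,
                            const std::string & key,
                            std::size_t batch_size,
                            std::size_t offset) {
      std::ostringstream oss;
      oss << "SELECT * FROM " << table << " ORDER BY " << key << " LIMIT "
          << batch_size << " OFFSET " << offset;
      return oss.str();
    }

    // batch_query("orders", "o_orderkey", 100, 300) yields
    // "SELECT * FROM orders ORDER BY o_orderkey LIMIT 100 OFFSET 300"
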
@@ -163,7 +229,9 @@ data_handle postgresql_data_provider::get_next(bool open_file) {
 
   if (open_file == false) { return handle; }
 
-  const std::string query = build_select_query(batch_position);
+  std::ostringstream oss;
+  oss << build_select_query(batch_position, keyname);
+  const std::string query = oss.str();
   batch_position++;
 
   PGresult * result = PQexec(connection, query.c_str());
@@ -175,8 +243,17 @@
   PQflush(connection);
 
   int resultNtuples = PQntuples(result);
+  {
+    std::ostringstream oss;
+    oss << "\033[32mQUERY: " << query << std::endl
+        << "COUNT: " << resultNtuples << "\033[0m" << std::endl;
+    std::cout << oss.str();
+  }
 
-  if (!resultNtuples) { table_fetch_completed = true; }
+  if (!resultNtuples ||
+      IsThereNext(connection, build_select_query(batch_position, keyname))) {
+    table_fetch_completed = true;
+  }
 
   handle.sql_handle.postgresql_result.reset(result, PQclear);
   handle.sql_handle.row_count = PQntuples(result);
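
The exhaustion check above probes the next window before it is fetched: the
next batch's SELECT is wrapped in a count, and a count of zero marks the
batch just fetched as the last one. The probe in isolation (a minimal sketch
over any batch query string; the helper name is hypothetical):

    #include <string>

    // Wrap the next batch's query in a count(*) subquery; if it reports
    // zero rows, table_fetch_completed can be set without emitting an
    // empty batch downstream. Costs one extra round trip per batch.
    std::string count_probe(const std::string & next_batch_query) {
      return "select count(*) from (" + next_batch_query + ") as t";
    }
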
From 7267c9c5e8d2e0dcd7c652a31bc6c66fce0a965c Mon Sep 17 00:00:00 2001
From: gcca
Date: Wed, 21 Apr 2021 09:38:36 -0500
Subject: [PATCH 11/14] [feature/table-from-postgresl-sqlite] update sqlite
 provider to consider opened file

---
 .../data_provider/sql/SQLiteDataProvider.cpp | 45 +++++++------------
 1 file changed, 17 insertions(+), 28 deletions(-)

diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp
index 0fd0e9d3a..339bb07fa 100644
--- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp
@@ -55,8 +55,7 @@ execute_sqlite_query(sqlite3 * db, const std::string & query) {
     throw std::runtime_error{oss.str()};
   }
 
-  auto sqlite_deleter = [](sqlite3_stmt * stmt) { sqlite3_finalize(stmt); };
-  return std::shared_ptr<sqlite3_stmt>{stmt, sqlite_deleter};
+  return std::shared_ptr<sqlite3_stmt>{stmt, sqlite3_finalize};
 }
 
 static inline sqlite_table_info
@@ -88,8 +87,8 @@ static inline sqlite_columns_info
 get_sqlite_columns_info(sqlite3 * db, const std::string & table) {
   sqlite_columns_info ret;
   std::string query = "PRAGMA table_info(" + table + ")";
-  auto A = execute_sqlite_query(db, query);
-  sqlite3_stmt * stmt = A.get();
+  auto stmt_ptr = execute_sqlite_query(db, query);
+  sqlite3_stmt * stmt = stmt_ptr.get();
 
   int rc = SQLITE_ERROR;
   while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) {
@@ -149,28 +148,15 @@ std::shared_ptr sqlite_data_provider::clone() {
       self_node_idx);
 }
 
-static inline std::string
-make_table_query_string(const std::size_t limit,
-                        const std::size_t batch_size,
-                        const std::size_t batch_position,
-                        const std::size_t number_of_nodes,
-                        const std::size_t node_idx,
-                        const std::string & select_from) {
+bool sqlite_data_provider::has_next() {
+  // We need this implementation here because SQLite doesn't have a method to
+  // get the number of rows from a sqlite3_statement
   const std::size_t offset =
-      batch_size * (batch_position * number_of_nodes + node_idx);
+      sql.table_batch_size *
+      (batch_position * total_number_of_nodes + self_node_idx);
   std::ostringstream oss;
-  oss << select_from << " LIMIT " << limit << " OFFSET " << offset;
-  return oss.str();
-}
-
-bool sqlite_data_provider::has_next() {
-  const std::string query =
-      make_table_query_string(1,
-                              sql.table_batch_size,
-                              batch_position,
-                              total_number_of_nodes,
-                              self_node_idx,
-                              "select * from " + sql.table);
+  oss << "SELECT * FROM " << sql.table << " LIMIT 1 OFFSET " << offset;
+  const std::string query = oss.str();
   bool it_has = false;
   int errorCode = sqlite3_exec(
       db,
@@ -217,17 +203,20 @@ static inline std::size_t get_size_for_statement(sqlite3_stmt * stmt) {
   return nRows;
 }
 
-data_handle sqlite_data_provider::get_next(bool) {
+data_handle sqlite_data_provider::get_next(bool open_file) {
   data_handle ret;
 
+  ret.sql_handle.table = sql.table;
+  ret.sql_handle.column_names = column_names;
+  ret.sql_handle.column_types = column_types;
+
+  if (open_file == false) { return ret; }
+
   const std::string query = build_select_query(batch_position);
   batch_position++;
 
   std::shared_ptr<sqlite3_stmt> stmt = execute_sqlite_query(db, query);
 
-  ret.sql_handle.table = sql.table;
-  ret.sql_handle.column_names = column_names;
-  ret.sql_handle.column_types = column_types;
   ret.sql_handle.row_count = get_size_for_statement(stmt.get());
   ret.sql_handle.sqlite_statement = stmt;

From 6f1c5fc33ba5fcb3b0352040f8e31a24aa18794b Mon Sep 17 00:00:00 2001
From: gcca
Date: Wed, 21 Apr 2021 09:58:03 -0500
Subject: [PATCH 12/14] [feature/table-from-postgresl-sqlite] remove some
 attempts to fix postgresql provider

---
 engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp | 4 ++--
 engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp  | 4 ++--
 tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py         | 5 +----
 3 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
index fd9c3c7a0..78574ceee 100644
--- a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
@@ -170,10 +170,10 @@ std::string abstractsql_data_provider::build_select_query(
   if (!orderBy.empty()) { oss << " order by " << orderBy; }
 
   const size_t offset =
-      (this->sql.table_batch_size - 1) *
+      this->sql.table_batch_size *
       (this->total_number_of_nodes * batch_index + this->self_node_idx);
 
-  oss << " LIMIT " << (this->sql.table_batch_size - 1) << " OFFSET " << offset;
+  oss << " LIMIT " << this->sql.table_batch_size << " OFFSET " << offset;
 
   return oss.str();
 }
diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
index e2b9fe587..8ce27b06f 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
@@ -245,8 +245,8 @@ data_handle postgresql_data_provider::get_next(bool open_file) {
   int resultNtuples = PQntuples(result);
   {
     std::ostringstream oss;
-    oss << "\033[32mQUERY: " << query << std::endl
-        << "COUNT: " << resultNtuples << "\033[0m" << std::endl;
+    oss << "QUERY: " << query << std::endl
+        << "COUNT: " << resultNtuples << std::endl;
     std::cout << oss.str();
   }
 
diff --git a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
index 706ea2540..bf5a18148 100644
--- a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
+++ b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
@@ -57,11 +57,8 @@ def define_samples(sample_list: [Sample], table_mapper = None):
 
 data_types = [
#DataType.MYSQL, - DataType.POSTGRESQL, - #DataType.SQLITE, - # TODO percy c.gonzales #DataType.POSTGRESQL, - #DataType.SQLITE, + DataType.SQLITE ] tables = [ From 62d0cfbc246285cd639124b5d22da334b0720342 Mon Sep 17 00:00:00 2001 From: gcca Date: Tue, 4 May 2021 10:11:42 -0500 Subject: [PATCH 13/14] [feature/table-from-postgresl-sqlite] update changelog --- CHANGELOG.md | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b7010d92..258a14ea3 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## New Features - #1445 Support for CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP +- #1453 Create tables from SQLite data provider ## Improvements @@ -18,7 +19,7 @@ ## New Features - #1367 OverlapAccumulator Kernel - #1364 Implement the concurrent API (bc.sql with token, bc.status, bc.fetch) -- #1426 Window Functions without partitioning +- #1426 Window Functions without partitioning - #1349 Add e2e test for Hive Partitioned Data - #1396 Create tables from other RDBMS - #1427 Support for CONCAT alias operator @@ -26,7 +27,7 @@ - #1472 Implement predicate pushdown for data providers ## Improvements -- #1325 Refactored CacheMachine.h and CacheMachine.cpp +- #1325 Refactored CacheMachine.h and CacheMachine.cpp - #1322 Updated and enabled several E2E tests - #1333 Fixing build due to cudf update - #1344 Removed GPUCacheDataMetadata class @@ -35,11 +36,11 @@ - #1331 Added flag to enable null e2e testing - #1418 Adding support for docker image - #1434 Added documentation for C++ and Python in Sphinx -- #1419 Added concat cache machine timeout +- #1419 Added concat cache machine timeout - #1444 Updating GCP to >= version - #1349 Add e2e test for Hive Partitioned Data - #1447 Improve getting estimated output num rows -- #1473 Added Warning to Window Functions +- #1473 Added Warning to Window Functions ## Bug Fixes - #1335 Fixing uninitialized var in orc metadata and handling the parseMetadata exceptions properly @@ -49,18 +50,18 @@ - #1350 Fixed bug where there are no projects in a bindable table scan - #1359 Avoid cuda issues when free pinned memory - #1365 Fixed build after sublibs changes on cudf -- #1369 Updated java path for powerpc build +- #1369 Updated java path for powerpc build - #1371 Fixed e2e settings - #1372 Recompute `columns_to_hash` in DistributeAggregationKernel - #1375 Fix empty row_group_ids for parquet -- #1380 Fixed issue with int64 literal values +- #1380 Fixed issue with int64 literal values - #1379 Remove ProjectRemoveRule - #1389 Fix issue when CAST a literal - #1387 Skip getting orc metadata for decimal type - #1392 Fix substrings with nulls - #1398 Fix performance regression - #1401 Fix support for minus unary operation -- #1415 Fixed bug where num_batches was not getting set in BindableTableScan +- #1415 Fixed bug where num_batches was not getting set in BindableTableScan - #1413 Fix for null tests 13 and 23 of windowFunctionTest - #1416 Fix full join when both tables contains nulls - #1423 Fix temporary directory for hive partition test @@ -73,7 +74,7 @@ ## Deprecated Features -- #1394 Disabled support for outer joins with inequalities +- #1394 Disabled support for outer joins with inequalities # BlazingSQL 0.18.0 (Date TBS) @@ -86,7 +87,7 @@ - #1238 Implements MergeStramKernel executor model - #1259 Implements SortAndSamplernel executor model, also avoid setting up num of samples - #1271 Added Hive utility for partitioned data -- #1289 Multiple concurrent query support +- #1289 Multiple 
concurrent query support - #1285 Infer PROTOCOL when Dask client is passed - #1294 Add config options for logger - #1301 Added usage of pinned buffers for communication and fixes various UCX related bugs @@ -95,7 +96,7 @@ - #1303 Add support for INITCAP - #1313 getting and using ORC metadata - #1347 Fixing issue when reading orc metadata from DATE dtype -- #1338 Window Function support for LEAD and LAG statements +- #1338 Window Function support for LEAD and LAG statements - #1362 give useful message when file extension is not recognized - #1361 Supporting first_value and last_value for Window Function @@ -118,7 +119,7 @@ - #1308 Improve the engine loggers - #1314 Added unit tests to verify that OOM error handling works well - #1320 Revamping cache logger -- #1323 Made progress bar update continuously and stay after query is done +- #1323 Made progress bar update continuously and stay after query is done - #1336 Improvements for the cache API - #1483 Improve dependencies script @@ -132,7 +133,7 @@ - #1277 Support FileSystems (GS, S3) when extension of the files are not provided - #1300 Fixed issue when creating tables from a local dir relative path - #1312 Fix progress bar for jupyterlab -- #1318 Disabled require acknowledge +- #1318 Disabled require acknowledge # BlazingSQL 0.17.0 (December 10, 2020) From 1ab0d2a862d0f06150eec9534fd5d12a235e4ed1 Mon Sep 17 00:00:00 2001 From: gcca Date: Tue, 4 May 2021 10:12:53 -0500 Subject: [PATCH 14/14] [feature/table-from-postgresl-sqlite] update changelog --- CHANGELOG.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 83a9b2ec3..c7a038865 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,10 @@ ## New Features -- #1471 Unbounded partitioned windows +- #1471 Unbounded partitioned windows - #1445 Support for CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP - #1505 Support for right outer join - +- #1453 Create tables from SQLite data provider ## Improvements - #1464 Better Support for unsigned types in C++ side @@ -162,7 +162,7 @@ - #1201 Implement string TRIM - #1216 Add unit test for DAYOFWEEK - #1205 Implement string REVERSE -- #1220 Implement string LEFT and RIGHT +- #1220 Implement string LEFT and RIGHT - #1223 Add support for UNION statement - #1250 updated README.md and CHANGELOG and others preparing for 0.17 release @@ -221,7 +221,7 @@ - #1203 Changed code back so that parquet is not read a single rowgroup at a time - #1207 Calcite uses literal as int32 if not explicit CAST was provided - #1212 Fixed issue when building the thirdpart, cmake version set to 3.18.4 -- #1225 Fixed issue due to change in gather API +- #1225 Fixed issue due to change in gather API - #1254 Fixing support of nightly and stable on localhost - #1258 Fixing gtest version issue
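
A closing note on the decoding style this series settles on: PATCH 07 changes
the PostgreSQL parsers from binary decoding (ntohl and __builtin_bswap64 on
network-order bytes) to text parsing, matching the result format 0 it now
requests, under which PQgetvalue returns a NUL-terminated string for every
cell. A minimal sketch of that idiom, with an end-pointer check the patches
themselves do not perform (added here only as an illustration):

    #include <cstdint>
    #include <cstdlib>

    // With text-format results every cell is a decimal string, so integer
    // columns decode with strtol/strtoll instead of byte swapping.
    std::int32_t decode_int32_text(const char * cell) {
      char * end;
      const long value = std::strtol(cell, &end, 10);
      // end points one past the last digit consumed; *end != '\0' would
      // indicate a malformed cell (hypothetical check, not in the patch).
      return static_cast<std::int32_t>(value);
    }
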