diff --git a/CHANGELOG.md b/CHANGELOG.md index c8a53c5b4..c7a038865 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,10 @@ ## New Features -- #1471 Unbounded partitioned windows +- #1471 Unbounded partitioned windows - #1445 Support for CURRENT_DATE, CURRENT_TIME and CURRENT_TIMESTAMP - #1505 Support for right outer join - +- #1453 Create tables from SQLite data provider ## Improvements - #1464 Better Support for unsigned types in C++ side @@ -24,7 +24,7 @@ ## New Features - #1367 OverlapAccumulator Kernel - #1364 Implement the concurrent API (bc.sql with token, bc.status, bc.fetch) -- #1426 Window Functions without partitioning +- #1426 Window Functions without partitioning - #1349 Add e2e test for Hive Partitioned Data - #1396 Create tables from other RDBMS - #1427 Support for CONCAT alias operator @@ -32,7 +32,7 @@ - #1472 Implement predicate pushdown for data providers ## Improvements -- #1325 Refactored CacheMachine.h and CacheMachine.cpp +- #1325 Refactored CacheMachine.h and CacheMachine.cpp - #1322 Updated and enabled several E2E tests - #1333 Fixing build due to cudf update - #1344 Removed GPUCacheDataMetadata class @@ -41,7 +41,7 @@ - #1331 Added flag to enable null e2e testing - #1418 Adding support for docker image - #1434 Added documentation for C++ and Python in Sphinx -- #1419 Added concat cache machine timeout +- #1419 Added concat cache machine timeout - #1444 Updating GCP to >= version - #1349 Add e2e test for Hive Partitioned Data - #1447 Improve getting estimated output num rows @@ -58,18 +58,18 @@ - #1350 Fixed bug where there are no projects in a bindable table scan - #1359 Avoid cuda issues when free pinned memory - #1365 Fixed build after sublibs changes on cudf -- #1369 Updated java path for powerpc build +- #1369 Updated java path for powerpc build - #1371 Fixed e2e settings - #1372 Recompute `columns_to_hash` in DistributeAggregationKernel - #1375 Fix empty row_group_ids for parquet -- #1380 Fixed issue with int64 literal values +- #1380 Fixed issue with int64 literal values - #1379 Remove ProjectRemoveRule - #1389 Fix issue when CAST a literal - #1387 Skip getting orc metadata for decimal type - #1392 Fix substrings with nulls - #1398 Fix performance regression - #1401 Fix support for minus unary operation -- #1415 Fixed bug where num_batches was not getting set in BindableTableScan +- #1415 Fixed bug where num_batches was not getting set in BindableTableScan - #1413 Fix for null tests 13 and 23 of windowFunctionTest - #1416 Fix full join when both tables contains nulls - #1423 Fix temporary directory for hive partition test @@ -82,7 +82,7 @@ - #1504 Fixing some conflicts in Dockerfile ## Deprecated Features -- #1394 Disabled support for outer joins with inequalities +- #1394 Disabled support for outer joins with inequalities # BlazingSQL 0.18.0 (February 24, 2021) @@ -95,7 +95,7 @@ - #1238 Implements MergeStramKernel executor model - #1259 Implements SortAndSamplernel executor model, also avoid setting up num of samples - #1271 Added Hive utility for partitioned data -- #1289 Multiple concurrent query support +- #1289 Multiple concurrent query support - #1285 Infer PROTOCOL when Dask client is passed - #1294 Add config options for logger - #1301 Added usage of pinned buffers for communication and fixes various UCX related bugs @@ -104,7 +104,7 @@ - #1303 Add support for INITCAP - #1313 getting and using ORC metadata - #1347 Fixing issue when reading orc metadata from DATE dtype -- #1338 Window Function support for LEAD and LAG statements +- #1338 Window 
Function support for LEAD and LAG statements - #1362 give useful message when file extension is not recognized - #1361 Supporting first_value and last_value for Window Function @@ -127,7 +127,7 @@ - #1308 Improve the engine loggers - #1314 Added unit tests to verify that OOM error handling works well - #1320 Revamping cache logger -- #1323 Made progress bar update continuously and stay after query is done +- #1323 Made progress bar update continuously and stay after query is done - #1336 Improvements for the cache API - #1483 Improve dependencies script @@ -141,7 +141,7 @@ - #1277 Support FileSystems (GS, S3) when extension of the files are not provided - #1300 Fixed issue when creating tables from a local dir relative path - #1312 Fix progress bar for jupyterlab -- #1318 Disabled require acknowledge +- #1318 Disabled require acknowledge # BlazingSQL 0.17.0 (December 10, 2020) @@ -162,7 +162,7 @@ - #1201 Implement string TRIM - #1216 Add unit test for DAYOFWEEK - #1205 Implement string REVERSE -- #1220 Implement string LEFT and RIGHT +- #1220 Implement string LEFT and RIGHT - #1223 Add support for UNION statement - #1250 updated README.md and CHANGELOG and others preparing for 0.17 release @@ -221,7 +221,7 @@ - #1203 Changed code back so that parquet is not read a single rowgroup at a time - #1207 Calcite uses literal as int32 if not explicit CAST was provided - #1212 Fixed issue when building the thirdpart, cmake version set to 3.18.4 -- #1225 Fixed issue due to change in gather API +- #1225 Fixed issue due to change in gather API - #1254 Fixing support of nightly and stable on localhost - #1258 Fixing gtest version issue diff --git a/engine/src/cython/engine.cpp b/engine/src/cython/engine.cpp index be76692cd..72ef9eac2 100644 --- a/engine/src/cython/engine.cpp +++ b/engine/src/cython/engine.cpp @@ -98,11 +98,11 @@ std::pair, std::vector> get_l #ifdef POSTGRESQL_SUPPORT parser = std::make_shared(); auto sql = ral::io::getSqlInfo(args_map); - provider = std::make_shared(sql, 0, 0); + provider = std::make_shared(sql, total_number_of_nodes, self_node_idx); + isSqlProvider = true; #else throw std::runtime_error("ERROR: This BlazingSQL version doesn't support PostgreSQL integration"); #endif - isSqlProvider = true; } else if(fileType == ral::io::DataType::SQLITE) { #ifdef SQLITE_SUPPORT parser = std::make_shared(); @@ -205,7 +205,7 @@ std::shared_ptr runGenerateGraph(uint32_t masterIndex, { using blazingdb::manager::Context; using blazingdb::transport::Node; - + auto& communicationData = ral::communication::CommunicationData::getInstance(); std::vector contextNodes; diff --git a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp index a933bd4b1..751311088 100644 --- a/engine/src/io/data_parser/sql/PostgreSQLParser.cpp +++ b/engine/src/io/data_parser/sql/PostgreSQLParser.cpp @@ -20,140 +20,59 @@ namespace ral { namespace io { -static const std::array postgresql_string_type_hints = { - "character", "character varying", "bytea", "text", "anyarray", "name"}; +static const std::array postgresql_string_type_hints = + {"character", "character varying", "bytea", "text", "anyarray", "name"}; static inline bool postgresql_is_cudf_string(const std::string & hint) { - const auto * result = std::find_if(std::cbegin(postgresql_string_type_hints), - std::cend(postgresql_string_type_hints), - [&hint]( - const char * c_hint) { return std::strcmp(c_hint, hint.c_str()) == 0; }); + const auto * result = std::find_if( + std::cbegin(postgresql_string_type_hints), 
+ std::cend(postgresql_string_type_hints), + [&hint](const char * c_hint) { return !hint.rfind(c_hint, 0); }); return result != std::cend(postgresql_string_type_hints); } -static inline void date_to_ymd(std::int32_t jd, - std::int32_t & year, - std::int32_t & month, - std::int32_t & day) { - std::uint32_t julian, quad, extra; - std::int32_t y; - julian = static_cast(jd); - julian += 32044; - quad = julian / 146097; - extra = (julian - quad * 146097) * 4 + 3; - julian += 60 + quad * 3 + extra / 146097; - quad = julian / 1461; - julian -= quad * 1461; - y = julian * 4 / 1461; - julian = ((y != 0) ? (julian + 305) % 365 : (julian + 306) % 366) + 123; - y += quad * 4; - year = y - 4800; - quad = julian * 2141 / 65536; - day = julian - 7834 * quad / 256; - month = (quad + 10) % 12 + 1; -} - -static inline void time_to_hms(std::int64_t jd, - std::int32_t & hour, - std::int32_t & min, - std::int32_t & sec, - std::int32_t & msec) { - std::int64_t time; - const std::int64_t USECS_PER_HOUR = 3600000000; - const std::int64_t USECS_PER_MINUTE = 60000000; - const std::int64_t USECS_PER_SEC = 1000000; - time = jd; - hour = time / USECS_PER_HOUR; - time -= (hour) *USECS_PER_HOUR; - min = time / USECS_PER_MINUTE; - time -= (min) *USECS_PER_MINUTE; - sec = time / USECS_PER_SEC; - msec = time - (sec * USECS_PER_SEC); -} - -static inline int timestamp_to_tm( - std::int64_t dt, struct tm & tm, std::int32_t & msec) { - std::int64_t dDate, POSTGRESQL_EPOCH_DATE = 2451545; - std::int64_t time; - std::uint64_t USECS_PER_DAY = 86400000000; - time = dt; - - dDate = time / USECS_PER_DAY; - if (dDate != 0) { time -= dDate * USECS_PER_DAY; } - - if (time < 0) { - time += USECS_PER_DAY; - dDate -= 1; - } - dDate += POSTGRESQL_EPOCH_DATE; - - date_to_ymd( - static_cast(dDate), tm.tm_year, tm.tm_mon, tm.tm_mday); - time_to_hms(time, tm.tm_hour, tm.tm_min, tm.tm_sec, msec); -} - -static inline std::string date_to_string(std::int32_t jd) { - // For date conversion see - // https://doxygen.postgresql.org/backend_2utils_2adt_2datetime_8c.html#a889d375aaf2a25be071d818565142b9e - const std::int32_t POSTGRESQL_EPOCH_DATE = 2451545; - std::int32_t year, month, day; - date_to_ymd(POSTGRESQL_EPOCH_DATE + jd, year, month, day); - std::stringstream oss; - oss << year << '-' << std::setfill('0') << std::setw(2) << month << '-' - << std::setw(2) << day; - return oss.str(); -} - -static inline std::string timestamp_to_string(std::int64_t tstamp) { - // For timestamp conversion see - // https://doxygen.postgresql.org/backend_2utils_2adt_2timestamp_8c.html#a933dc09a38ddcf144a48b2aaf5790893 - struct tm tm; - std::int32_t msec; - timestamp_to_tm(tstamp, tm, msec); - std::stringstream oss; - oss << tm.tm_year << '-' << std::setfill('0') << std::setw(2) << tm.tm_mon - << '-' << std::setw(2) << tm.tm_mday << ' ' << std::setw(2) << tm.tm_hour - << ':' << std::setw(2) << tm.tm_min << ':' << std::setw(2) << tm.tm_sec; - if (msec != 0) { oss << '.' 
<< std::setw(6) << msec; } - return oss.str(); -} - -static inline cudf::type_id parse_postgresql_column_type( - const std::string & columnTypeName) { +static inline cudf::type_id +parse_postgresql_column_type(const std::string & columnTypeName) { if (postgresql_is_cudf_string(columnTypeName)) { return cudf::type_id::STRING; } - if (columnTypeName == "smallint") { return cudf::type_id::INT16; } - if (columnTypeName == "integer") { return cudf::type_id::INT32; } - if (columnTypeName == "bigint") { return cudf::type_id::INT64; } - if (columnTypeName == "decimal") { return cudf::type_id::DECIMAL64; } - if (columnTypeName == "numeric") { return cudf::type_id::DECIMAL64; } - if (columnTypeName == "real") { return cudf::type_id::FLOAT32; } - if (columnTypeName == "double precision") { return cudf::type_id::FLOAT64; } - if (columnTypeName == "smallserial") { return cudf::type_id::INT16; } - if (columnTypeName == "serial") { return cudf::type_id::INT32; } - if (columnTypeName == "bigserial") { return cudf::type_id::INT64; } - if (columnTypeName == "boolean") { return cudf::type_id::BOOL8; } - if (columnTypeName == "date") { return cudf::type_id::TIMESTAMP_DAYS; } - if (columnTypeName == "money") { return cudf::type_id::UINT64; } - if (columnTypeName == "timestamp without time zone") { + if (!columnTypeName.rfind("smallint", 0)) { return cudf::type_id::INT16; } + if (!columnTypeName.rfind("integer", 0)) { return cudf::type_id::INT32; } + if (!columnTypeName.rfind("bigint", 0)) { return cudf::type_id::INT64; } + if (!columnTypeName.rfind("decimal", 0)) { return cudf::type_id::FLOAT64; } + if (!columnTypeName.rfind("numeric", 0)) { return cudf::type_id::FLOAT64; } + if (!columnTypeName.rfind("real", 0)) { return cudf::type_id::FLOAT32; } + if (!columnTypeName.rfind("double precision", 0)) { + return cudf::type_id::FLOAT64; + } + if (!columnTypeName.rfind("smallserial", 0)) { return cudf::type_id::INT16; } + if (!columnTypeName.rfind("serial", 0)) { return cudf::type_id::INT32; } + if (!columnTypeName.rfind("bigserial", 0)) { return cudf::type_id::INT64; } + if (!columnTypeName.rfind("boolean", 0)) { return cudf::type_id::BOOL8; } + if (!columnTypeName.rfind("date", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } - if (columnTypeName == "timestamp with time zone") { + if (!columnTypeName.rfind("money", 0)) { return cudf::type_id::UINT64; } + if (!columnTypeName.rfind("timestamp without time zone", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; } - if (columnTypeName == "time without time zone") { + if (!columnTypeName.rfind("timestamp with time zone", 0)) { + return cudf::type_id::TIMESTAMP_MILLISECONDS; + } + if (!columnTypeName.rfind("time without time zone", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "time with time zone") { + if (!columnTypeName.rfind("time with time zone", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "interval") { + if (!columnTypeName.rfind("interval", 0)) { return cudf::type_id::DURATION_MILLISECONDS; } - if (columnTypeName == "inet") { return cudf::type_id::UINT64; } - if (columnTypeName == "USER-DEFINED") { return cudf::type_id::STRUCT; } - if (columnTypeName == "ARRAY") { return cudf::type_id::LIST; } + if (!columnTypeName.rfind("inet", 0)) { return cudf::type_id::UINT64; } + if (!columnTypeName.rfind("USER-DEFINED", 0)) { + return cudf::type_id::STRUCT; + } + if (!columnTypeName.rfind("ARRAY", 0)) { return cudf::type_id::LIST; } throw std::runtime_error("PostgreSQL type hint not found: " + 
columnTypeName); } @@ -162,200 +81,256 @@ postgresql_parser::postgresql_parser() postgresql_parser::~postgresql_parser() = default; -void postgresql_parser::read_sql_loop(void * src, - const std::vector & cudf_types, - const std::vector & column_indices, - std::vector & host_cols, - std::vector> & null_masks) { +void postgresql_parser::read_sql_loop( + void * src, + const std::vector & cudf_types, + const std::vector & column_indices, + std::vector & host_cols, + std::vector> & null_masks) { PGresult * result = static_cast(src); const int ntuples = PQntuples(result); for (int rowCounter = 0; rowCounter < ntuples; rowCounter++) { - parse_sql( - src, column_indices, cudf_types, rowCounter, host_cols, null_masks); + parse_sql(src, + column_indices, + cudf_types, + rowCounter, + host_cols, + null_masks); } } -cudf::type_id postgresql_parser::get_cudf_type_id( - const std::string & sql_column_type) { +cudf::type_id +postgresql_parser::get_cudf_type_id(const std::string & sql_column_type) { return parse_postgresql_column_type(sql_column_type); } -std::uint8_t postgresql_parser::parse_cudf_int8( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_int8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int8_t value = *reinterpret_cast(result); + char * end; + const std::int8_t value = + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_int16( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int16(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int16_t value = - ntohs(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_int32( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int32_t value = - ntohl(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_int64( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_int64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::int64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + static_cast(std::strtoll(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint8( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, 
row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::uint8_t value = *reinterpret_cast(result); + char * end; + const std::uint8_t value = + static_cast(std::strtoul(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint16(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint16(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint16_t value = - ntohs(*reinterpret_cast(result)); + static_cast(std::strtol(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint32(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint32_t value = - ntohl(*reinterpret_cast(result)); + static_cast(std::strtoul(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_uint64(void * src, - std::size_t col, - std::size_t row, - std::vector * v) { +std::uint8_t +postgresql_parser::parse_cudf_uint64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); + char * end; const std::uint64_t value = - __builtin_bswap64(*reinterpret_cast(result)); + static_cast(std::strtoull(result, &end, 10)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_float32( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_float32(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int32_t casted = - ntohl(*reinterpret_cast(result)); - const float value = *reinterpret_cast(&casted); + char * end; + const float value = static_cast(std::strtof(result, &end)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_float64( - void * src, std::size_t col, std::size_t row, std::vector * v) { +std::uint8_t postgresql_parser::parse_cudf_float64(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { PGresult * pgResult = static_cast(src); if (PQgetisnull(pgResult, row, col)) { return 0; } const char * result = PQgetvalue(pgResult, row, col); - const std::int64_t casted = - __builtin_bswap64(*reinterpret_cast(result)); - const double value = *reinterpret_cast(&casted); + char * end; + const double value = static_cast(std::strtod(result, &end)); v->at(row) = value; return 1; } -std::uint8_t postgresql_parser::parse_cudf_bool8( - void * src, std::size_t col, std::size_t row, std::vector * v) { - return parse_cudf_int8(src, col, row, v); +std::uint8_t postgresql_parser::parse_cudf_bool8(void * src, + std::size_t col, + std::size_t row, + std::vector * v) { + PGresult * pgResult = static_cast(src); + if (PQgetisnull(pgResult, row, col)) { return 0; } + const std::string result = 
std::string{PQgetvalue(pgResult, row, col)};
+  std::int8_t value;
+  if (result == "t") {
+    value = 1;
+  } else if (result == "f") {
+    value = 0;
+  } else {
+    return 0;
+  }
+  v->at(row) = value;
+  return 1;
 }
 
-std::uint8_t postgresql_parser::parse_cudf_timestamp_days(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t postgresql_parser::parse_cudf_timestamp_days(void * src,
+                                                          std::size_t col,
+                                                          std::size_t row,
+                                                          cudf_string_col * v) {
   return parse_cudf_string(src, col, row, v);
 }
 
-std::uint8_t postgresql_parser::parse_cudf_timestamp_seconds(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t
+postgresql_parser::parse_cudf_timestamp_seconds(void * src,
+                                                std::size_t col,
+                                                std::size_t row,
+                                                cudf_string_col * v) {
   return parse_cudf_string(src, col, row, v);
 }
 
-std::uint8_t postgresql_parser::parse_cudf_timestamp_milliseconds(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t
+postgresql_parser::parse_cudf_timestamp_milliseconds(void * src,
+                                                     std::size_t col,
+                                                     std::size_t row,
+                                                     cudf_string_col * v) {
   return parse_cudf_string(src, col, row, v);
 }
 
-std::uint8_t postgresql_parser::parse_cudf_timestamp_microseconds(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t
+postgresql_parser::parse_cudf_timestamp_microseconds(void * src,
+                                                     std::size_t col,
+                                                     std::size_t row,
+                                                     cudf_string_col * v) {
   return parse_cudf_string(src, col, row, v);
 }
 
-std::uint8_t postgresql_parser::parse_cudf_timestamp_nanoseconds(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t
+postgresql_parser::parse_cudf_timestamp_nanoseconds(void * src,
+                                                    std::size_t col,
+                                                    std::size_t row,
+                                                    cudf_string_col * v) {
   return parse_cudf_string(src, col, row, v);
 }
 
-std::uint8_t postgresql_parser::parse_cudf_string(
-    void * src, std::size_t col, std::size_t row, cudf_string_col * v) {
+std::uint8_t postgresql_parser::parse_cudf_string(void * src,
+                                                  std::size_t col,
+                                                  std::size_t row,
+                                                  cudf_string_col * v) {
   PGresult * pgResult = static_cast<PGresult *>(src);
-  Oid oid = PQftype(pgResult, col);
-  if (oid == InvalidOid) { throw std::runtime_error("Bad postgresql type"); }
-
   if (PQgetisnull(pgResult, row, col)) {
     v->offsets.push_back(v->offsets.back());
     return 0;
   }
   const char * result = PQgetvalue(pgResult, row, col);
-  std::string data;
-
-  // TODO(cristhian): convert oid to data type using postgresql pgtype table
-  // https://www.postgresql.org/docs/13/catalog-pg-type.html
-  switch (oid) {
-  case 1082: {  // date
-    const std::int32_t value =
-        ntohl(*reinterpret_cast<const std::int32_t *>(result));
-    data = date_to_string(value);
-    break;
-  }
-  case 1114: {  // timestamp
-    const std::int64_t value =
-        __builtin_bswap64(*reinterpret_cast<const std::int64_t *>(result));
-    data = timestamp_to_string(value);
-    break;
-  }
-  default: data = result;
+  std::string data = result;
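The addition that follows right-trims `CHAR(n)` ("bpchar", OID 1042) values, which PostgreSQL returns blank-padded to the declared column width. As a standalone reference, a minimal sketch of the same right-trim using only the standard library (`rtrim_bpchar` is a hypothetical name, not part of this patch):

```cpp
#include <algorithm>
#include <cctype>
#include <string>

// Right-trim the blank padding PostgreSQL adds to CHAR(n) ("bpchar") values:
// erase from the first non-space (scanning right to left) to the end.
static std::string rtrim_bpchar(std::string data) {
  data.erase(std::find_if(data.rbegin(), data.rend(),
                          [](unsigned char c) { return !std::isspace(c); })
                 .base(),
             data.end());
  return data;  // rtrim_bpchar("abc   ") == "abc"
}
```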
+
+  // trim trailing spaces, because PostgreSQL stores CHAR(n) values with
+  // blank padding
+  Oid oid = PQftype(pgResult, col);
+  if (oid == InvalidOid) { throw std::runtime_error("Bad postgresql type"); }
+  if (oid == static_cast<Oid>(1042)) {  // 1042 = bpchar
+    data.erase(std::find_if(data.rbegin(),
+                            data.rend(),
+                            [](unsigned char c) { return !std::isspace(c); })
+                   .base(),
+               data.end());
+  }
 
   v->chars.insert(v->chars.end(), data.cbegin(), data.cend());
diff --git a/engine/src/io/data_parser/sql/SQLiteParser.cpp b/engine/src/io/data_parser/sql/SQLiteParser.cpp
index 2e2795fd4..505a2a552 100644
--- a/engine/src/io/data_parser/sql/SQLiteParser.cpp
+++ b/engine/src/io/data_parser/sql/SQLiteParser.cpp
@@ -61,23 +61,27 @@ cudf::type_id parse_sqlite_column_type(std::string t) {
       return std::tolower(c);
     });
   if (sqlite_is_cudf_string(t)) return cudf::type_id::STRING;
-  if (t == "tinyint") { return cudf::type_id::INT8; }
-  if (t == "smallint") { return cudf::type_id::INT8; }
-  if (t == "mediumint") { return cudf::type_id::INT16; }
-  if (t == "int") { return cudf::type_id::INT32; }
-  if (t == "integer") { return cudf::type_id::INT32; }
-  if (t == "bigint") { return cudf::type_id::INT64; }
-  if (t == "unsigned big int") { return cudf::type_id::UINT64; }
-  if (t == "int2") { return cudf::type_id::INT16; }
-  if (t == "int8") { return cudf::type_id::INT64; }
-  if (t == "real") { return cudf::type_id::FLOAT32; }
-  if (t == "double") { return cudf::type_id::FLOAT64; }
-  if (t == "double precision") { return cudf::type_id::FLOAT64; }
-  if (t == "float") { return cudf::type_id::FLOAT32; }
-  if (t == "decimal") { return cudf::type_id::FLOAT64; }
-  if (t == "boolean") { return cudf::type_id::UINT8; }
-  if (t == "date") { return cudf::type_id::TIMESTAMP_MILLISECONDS; }
-  if (t == "datetime") { return cudf::type_id::TIMESTAMP_MILLISECONDS; }
+  // NOTE: with prefix matching, specific names must be tested before their
+  // prefixes ("int2"/"int8"/"integer" before "int", "double precision"
+  // before "double", "datetime" before "date"), or the short prefix wins.
+  if (!t.rfind("tinyint", 0)) { return cudf::type_id::INT8; }
+  if (!t.rfind("smallint", 0)) { return cudf::type_id::INT8; }
+  if (!t.rfind("mediumint", 0)) { return cudf::type_id::INT16; }
+  if (!t.rfind("int2", 0)) { return cudf::type_id::INT16; }
+  if (!t.rfind("int8", 0)) { return cudf::type_id::INT64; }
+  if (!t.rfind("integer", 0)) { return cudf::type_id::INT32; }
+  if (!t.rfind("int", 0)) { return cudf::type_id::INT32; }
+  if (!t.rfind("bigint", 0)) { return cudf::type_id::INT64; }
+  if (!t.rfind("unsigned big int", 0)) { return cudf::type_id::UINT64; }
+  if (!t.rfind("real", 0)) { return cudf::type_id::FLOAT32; }
+  if (!t.rfind("double precision", 0)) { return cudf::type_id::FLOAT64; }
+  if (!t.rfind("double", 0)) { return cudf::type_id::FLOAT64; }
+  if (!t.rfind("float", 0)) { return cudf::type_id::FLOAT32; }
+  if (!t.rfind("decimal", 0)) { return cudf::type_id::FLOAT64; }
+  if (!t.rfind("boolean", 0)) { return cudf::type_id::UINT8; }
+  if (!t.rfind("datetime", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; }
+  if (!t.rfind("date", 0)) { return cudf::type_id::TIMESTAMP_MILLISECONDS; }
+  throw std::runtime_error("SQLite type hint not found: " + t);
 }
 
 sqlite_parser::sqlite_parser() : abstractsql_parser{DataType::SQLITE} {}
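Both parsers now match SQL type names by prefix with `std::string::rfind(name, 0)`: searching backwards from position 0 can only succeed at the very start of the string, so the call returns 0 exactly when the subject starts with `name`, and `!` turns that into a boolean. A self-contained demo of the idiom, including the ordering caveat fixed above (specific names such as `int2` must be tested before their prefix `int`):

```cpp
#include <cassert>
#include <string>

// rfind(prefix, 0) can only match at index 0: it returns 0 on a prefix hit
// and std::string::npos otherwise, so !s.rfind(prefix, 0) means "starts with".
static bool starts_with(const std::string & s, const std::string & prefix) {
  return s.rfind(prefix, 0) == 0;
}

int main() {
  assert(starts_with("timestamp without time zone", "timestamp"));
  assert(!starts_with("int2", "integer"));
  assert(starts_with("int2", "int"));  // why "int2" must be checked first
  return 0;
}
```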
diff --git a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
index 752733e0c..56d1bd549 100644
--- a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.cpp
@@ -50,7 +50,9 @@ bool abstractsql_data_provider::set_predicate_pushdown(const std::string &queryS
   return !this->where.empty();
 }
 
-std::string abstractsql_data_provider::build_select_query(size_t batch_index) const {
+std::string abstractsql_data_provider::build_select_query(
+    std::size_t batch_index,
+    const std::string & orderBy) const {
   std::string cols;
 
   if (this->column_indices.empty()) {
@@ -67,23 +69,30 @@ std::string abstractsql_data_provider::build_select_query(
     }
   }
 
-  const size_t offset = this->sql.table_batch_size * (this->total_number_of_nodes * batch_index + this->self_node_idx);
-  std::string limit = " LIMIT " + std::to_string(this->sql.table_batch_size) + " OFFSET " + std::to_string(offset);
-  auto ret = "SELECT " + cols + "FROM " + this->sql.table;
+  std::ostringstream oss;
 
-  if(!sql.table_filter.empty() && !this->where.empty()) {
-    ret += " where " + sql.table_filter + " AND " + this->where;
-  } else {
-    if (sql.table_filter.empty()) {
-      if (!this->where.empty()) { // then the filter is from the predicate pushdown{
-        ret += " where " + this->where;
-      }
-    } else {
-      ret += " where " + sql.table_filter;
-    }
-  }
+  oss << "SELECT " << cols << " FROM " << this->sql.table;
+
+  if (!sql.table_filter.empty() && !this->where.empty()) {
+    oss << " where " << sql.table_filter << " AND " << this->where;
+  } else if (!this->where.empty()) {  // the filter comes from predicate pushdown
+    oss << " where " << this->where;
+  } else if (!sql.table_filter.empty()) {
+    oss << " where " << sql.table_filter;
+  }
 
-  return ret + limit;
+  if (!orderBy.empty()) { oss << " order by " << orderBy; }
+
+  const size_t offset =
+      this->sql.table_batch_size *
+      (this->total_number_of_nodes * batch_index + this->self_node_idx);
+
+  oss << " LIMIT " << this->sql.table_batch_size << " OFFSET " << offset;
+
+  return oss.str();
 }
 
 } /* namespace io */
diff --git a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
index 524ab1d7b..d40a4bc4c 100644
--- a/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
+++ b/engine/src/io/data_provider/sql/AbstractSQLDataProvider.h
@@ -59,7 +59,8 @@ class abstractsql_data_provider : public data_provider {
 protected:
   // returns SELECT ... FROM ... WHERE ... LIMIT ... OFFSET
-  std::string build_select_query(size_t batch_index) const;
+  std::string build_select_query(std::size_t batch_index,
+                                 const std::string & orderBy = "") const;
 
 protected:
   sql_info sql;
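The trailing LIMIT/OFFSET pair is what shards a table across nodes: `offset = table_batch_size * (total_number_of_nodes * batch_index + self_node_idx)`, so node `k` of `N` reads batches `k`, `k+N`, `k+2N`, and so on. A standalone sketch with hypothetical numbers (batch size 100, 3 nodes) that prints the resulting interleaving:

```cpp
#include <cstddef>
#include <iostream>

int main() {
  const std::size_t table_batch_size = 100;      // hypothetical values
  const std::size_t total_number_of_nodes = 3;
  for (std::size_t self_node_idx = 0; self_node_idx < total_number_of_nodes;
       ++self_node_idx) {
    for (std::size_t batch_index = 0; batch_index < 2; ++batch_index) {
      const std::size_t offset =
          table_batch_size *
          (total_number_of_nodes * batch_index + self_node_idx);
      // node 0 -> offsets 0, 300; node 1 -> 100, 400; node 2 -> 200, 500
      std::cout << "node " << self_node_idx << " batch " << batch_index
                << " OFFSET " << offset << '\n';
    }
  }
  return 0;
}
```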
diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
index 5ac37edb9..8ce27b06f 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.cpp
@@ -14,14 +14,14 @@ namespace io {
 
 namespace {
 
-const std::string MakePostgreSQLConnectionString(const sql_info &sql) {
+const std::string MakePostgreSQLConnectionString(const sql_info & sql) {
   std::ostringstream os;
   os << "host=" << sql.host << " port=" << sql.port
      << " dbname=" << sql.schema << " user=" << sql.user
      << " password=" << sql.password;
   return os.str();
 }
 
-const std::string MakeQueryForColumnsInfo(const sql_info &sql) {
+const std::string MakeQueryForColumnsInfo(const sql_info & sql) {
   std::ostringstream os;
   os << "select column_name, data_type, character_maximum_length"
        " from information_schema.tables as tables"
@@ -36,11 +36,11 @@ class TableInfo {
 public:
   std::vector<std::string> column_names;
   std::vector<std::string> column_types;
-  std::vector<std::size_t> column_bytes;
+  std::size_t row_count;
 };
 
-TableInfo ExecuteTableInfo(PGconn *connection, const sql_info &sql) {
-  PGresult *result = PQexec(connection, MakeQueryForColumnsInfo(sql).c_str());
+inline TableInfo ExecuteTableInfo(PGconn * connection, const sql_info & sql) {
+  PGresult * result = PQexec(connection, MakeQueryForColumnsInfo(sql).c_str());
   if (PQresultStatus(result) != PGRES_TUPLES_OK) {
     PQclear(result);
     PQfinish(connection);
@@ -63,54 +63,152 @@ inline TableInfo ExecuteTableInfo(PGconn * connection, const sql_info & sql) {
         std::string{PQgetvalue(result, i, dataTypeFn)});
 
-    // NOTE character_maximum_length is used for char or byte string type
-    if (PQgetisnull(result, i, characterMaximumLengthFn)) {
-      // TODO(recy, cristhian): check the minimum size for types
-      tableInfo.column_bytes.emplace_back(8);
-    } else {
-      const char *characterMaximumLengthBytes =
-          PQgetvalue(result, i, characterMaximumLengthFn);
-      // NOTE postgresql representation of number is in network order
-      const std::uint32_t characterMaximumLength =
-          ntohl(*reinterpret_cast<const std::uint32_t *>(
-              characterMaximumLengthBytes));
-      tableInfo.column_bytes.emplace_back(
-          static_cast<std::size_t>(characterMaximumLength));
-    }
+    // NOTE character_maximum_length is no longer read: its only consumer was
+    // the removed column_bytes field
   }
+  PQclear(result);
+
+  const std::string query = "select count(*) from " + sql.table;
+  result = PQexec(connection, query.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Error getting the table row count");
+  }
+  const char * value = PQgetvalue(result, 0, 0);
+  char * end;
+  tableInfo.row_count = std::strtoll(value, &end, 10);
+  PQclear(result);
 
   return tableInfo;
 }
 
 }  // namespace
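A note on why the parser above dropped `ntohs`/`ntohl`/`__builtin_bswap64` in favor of `strtol`-family calls: `PQexec` returns results in text format, so every `PQgetvalue` yields a NUL-terminated decimal string; binary network-order decoding only applies when a query is issued with `PQexecParams(..., resultFormat = 1)`. A minimal hedged sketch of reading a `count(*)` the text-format way (`fetch_count` is a hypothetical helper, not part of this patch):

```cpp
#include <cstdlib>
#include <stdexcept>
#include <string>

#include <libpq-fe.h>

// Text-format results: numeric values arrive as decimal strings.
static long long fetch_count(PGconn * conn, const std::string & table) {
  const std::string query = "select count(*) from " + table;
  PGresult * result = PQexec(conn, query.c_str());  // text format by default
  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
    PQclear(result);
    throw std::runtime_error(PQerrorMessage(conn));
  }
  char * end = nullptr;
  const long long n = std::strtoll(PQgetvalue(result, 0, 0), &end, 10);
  PQclear(result);
  return n;
}
```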
-postgresql_data_provider::postgresql_data_provider(const sql_info &sql,
-    size_t total_number_of_nodes,
-    size_t self_node_idx)
-  : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx), table_fetch_completed{false},
-    batch_position{0}, estimated_table_row_count{0} {
+static inline std::string FindKeyName(PGconn * connection,
+                                      const sql_info & sql) {
+  // This function exists because batches are fetched with LIMIT/OFFSET and
+  // postgresql returns unpredictable subsets of a query's rows, so we add an
+  // ORDER BY on a key column to impose a stable order on the result,
+  // see https://www.postgresql.org/docs/13/queries-limit.html
+  std::ostringstream oss;
+  oss << "select column_name, ordinal_position"
+         " from information_schema.table_constraints tc"
+         " join information_schema.key_column_usage kcu"
+         " on tc.constraint_name = kcu.constraint_name"
+         " and tc.constraint_schema = kcu.constraint_schema"
+         " where tc.table_catalog = '"
+      << sql.schema << "' and tc.table_name = '" << sql.table
+      << "' and constraint_type = 'PRIMARY KEY'";
+  const std::string query = oss.str();
+  PGresult * result = PQexec(connection, query.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Error getting the primary key info");
+  }
+
+  if (PQntuples(result)) {
+    int columnNameFn = PQfnumber(result, "column_name");
+    const std::string columnName{PQgetvalue(result, 0, columnNameFn)};
+    PQclear(result);
+    if (columnName.empty()) {
+      throw std::runtime_error("No column name in result for primary key");
+    }
+    return columnName;
+  }
+
+  // here the table doesn't have a primary key, so we choose a column by type;
+  // primitive types like int or float have priority over other types
+  PQclear(result);
+  std::ostringstream ossFallback;
+  ossFallback << "select column_name, oid, case"
+                 " when typname like 'int_' then 1"
+                 " when typname like 'float_' then 2"
+                 " else 99 end as typorder"
+                 " from information_schema.tables as tables"
+                 " join information_schema.columns as columns"
+                 " on tables.table_name = columns.table_name"
+                 " join pg_type on udt_name = typname"
+                 " where tables.table_catalog = '"
+              << sql.schema << "' and tables.table_name = '" << sql.table
+              << "' order by typorder, typlen desc, oid";
+
+  const std::string fallbackQuery = ossFallback.str();
+  result = PQexec(connection, fallbackQuery.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Error getting the column type info");
+  }
+
+  if (PQntuples(result)) {
+    int columnNameFn = PQfnumber(result, "column_name");
+    const std::string columnName{PQgetvalue(result, 0, columnNameFn)};
+    PQclear(result);
+    if (columnName.empty()) {
+      throw std::runtime_error("No column name in result for column type");
+    }
+    return columnName;
+  }
+  PQclear(result);
+
+  throw std::runtime_error("There is no key name candidate");
+}
+
+// Returns true when the given page query would come back empty, i.e. the
+// batch after the one just fetched has no rows left for this node.
+static inline bool NextBatchIsEmpty(PGconn * connection,
+                                    const std::string & query) {
+  std::ostringstream oss;
+  oss << "select count(*) from (" << query << ") as t";
+  const std::string count = oss.str();
+  PGresult * result = PQexec(connection, count.c_str());
+  if (PQresultStatus(result) != PGRES_TUPLES_OK) {
+    PQclear(result);
+    PQfinish(connection);
+    throw std::runtime_error("Error counting the next batch");
+  }
+
+  const char * data = PQgetvalue(result, 0, 0);
+  char * end;
+  const std::size_t value =
+      static_cast<std::size_t>(std::strtoll(data, &end, 10));
+  PQclear(result);
+
+  return value == 0;
+}
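`NextBatchIsEmpty` probes ahead by wrapping the next page's query in a `count(*)` subquery; note that this costs one extra server round trip per batch, and the server still evaluates the page's LIMIT/OFFSET to produce the count. A tiny sketch of the wrapping itself (`make_probe` is a hypothetical name):

```cpp
#include <string>

// Wrap any page query in a count(*) subquery, as NextBatchIsEmpty does above.
static std::string make_probe(const std::string & page_query) {
  return "select count(*) from (" + page_query + ") as t";
}
// make_probe("SELECT a FROM t ORDER BY a LIMIT 100 OFFSET 300")
//   == "select count(*) from (SELECT a FROM t ORDER BY a LIMIT 100 OFFSET 300) as t"
```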
+postgresql_data_provider::postgresql_data_provider(
+    const sql_info & sql,
+    std::size_t total_number_of_nodes,
+    std::size_t self_node_idx)
+    : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx),
+      table_fetch_completed{false}, batch_position{0},
+      estimated_table_row_count{0} {
   connection = PQconnectdb(MakePostgreSQLConnectionString(sql).c_str());
 
   if (PQstatus(connection) != CONNECTION_OK) {
-    std::cerr << "Connection to database failed: " << PQerrorMessage(connection)
-              << std::endl;
+    throw std::runtime_error("Connection to database failed: " +
+                             std::string{PQerrorMessage(connection)});
   }
 
-  std::cout << "PostgreSQL version: " << PQserverVersion(connection)
-            << std::endl;
-
   TableInfo tableInfo = ExecuteTableInfo(connection, sql);
-
   column_names = tableInfo.column_names;
   column_types = tableInfo.column_types;
+  estimated_table_row_count = tableInfo.row_count;
+  keyname = FindKeyName(connection, sql);
 }
 
 postgresql_data_provider::~postgresql_data_provider() { PQfinish(connection); }
 
 std::shared_ptr<data_provider> postgresql_data_provider::clone() {
   return std::static_pointer_cast<data_provider>(
-      std::make_shared<postgresql_data_provider>(sql, this->total_number_of_nodes, this->self_node_idx));
+      std::make_shared<postgresql_data_provider>(sql,
+                                                 this->total_number_of_nodes,
+                                                 this->self_node_idx));
 }
 
 bool postgresql_data_provider::has_next() {
@@ -129,23 +227,33 @@ data_handle postgresql_data_provider::get_next(bool open_file) {
   handle.sql_handle.column_names = column_names;
   handle.sql_handle.column_types = column_types;
 
   if (!open_file) { return handle; }
 
-  const std::string query = this->build_select_query(this->batch_position);
-  batch_position += sql.table_batch_size;
-  PGresult *result = PQexecParams(
-      connection, query.c_str(), 0, nullptr, nullptr, nullptr, nullptr, 1);
+  const std::string query = build_select_query(batch_position, keyname);
+  batch_position++;
+  PGresult * result = PQexec(connection, query.c_str());
 
   if (PQresultStatus(result) != PGRES_TUPLES_OK) {
     PQclear(result);
     PQfinish(connection);
     throw std::runtime_error("Error getting next batch from postgresql");
   }
 
   int resultNtuples = PQntuples(result);
 
-  if (!resultNtuples) { table_fetch_completed = true; }
+  if (!resultNtuples ||
+      NextBatchIsEmpty(connection,
+                       build_select_query(batch_position, keyname))) {
+    table_fetch_completed = true;
+  }
 
   handle.sql_handle.postgresql_result.reset(result, PQclear);
   handle.sql_handle.row_count = PQntuples(result);
diff --git a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
index 8316cf868..277e2936c 100644
--- a/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
+++ b/engine/src/io/data_provider/sql/PostgreSQLDataProvider.h
@@ -1,6 +1,7 @@
 /*
- * Copyright 2021 Percy Camilo Triveño Aucahuasi
- * Copyright 2021 Cristhian Alberto Gonzales Castillo
+ * Copyright 2021 BlazingDB, Inc.
+ * Copyright 2021 Cristhian Alberto Gonzales Castillo + * */ #ifndef POSTGRESQLDATAPROVIDER_H_ @@ -15,9 +16,9 @@ namespace io { class postgresql_data_provider : public abstractsql_data_provider { public: - postgresql_data_provider(const sql_info &sql, - size_t total_number_of_nodes, - size_t self_node_idx); + postgresql_data_provider(const sql_info & sql, + std::size_t total_number_of_nodes, + std::size_t self_node_idx); virtual ~postgresql_data_provider(); @@ -36,10 +37,11 @@ class postgresql_data_provider : public abstractsql_data_provider { std::unique_ptr get_predicate_transformer() const override { return nullptr; } private: - PGconn *connection; + PGconn * connection; bool table_fetch_completed; std::size_t batch_position; std::size_t estimated_table_row_count; + std::string keyname; }; } /* namespace io */ diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp index 59737c669..339bb07fa 100644 --- a/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp +++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.cpp @@ -11,6 +11,8 @@ - sql::SQLException (derived from std::runtime_error) */ +#include + #include "SQLiteDataProvider.h" #include "blazingdb/io/Util/StringUtil.h" @@ -27,14 +29,12 @@ struct sqlite_table_info { struct sqlite_columns_info { std::vector columns; std::vector types; - std::vector bytes; }; struct callb { - int sqlite_callback( - void * NotUsed, int argc, char ** argv, char ** azColName) { + int sqlite_callback(void *, int argc, char ** argv, char ** azColName) { int i; - for(i = 0; i < argc; i++) { + for (i = 0; i < argc; i++) { printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL"); } printf("\n"); @@ -42,32 +42,31 @@ struct callb { } }; -std::shared_ptr execute_sqlite_query( - sqlite3 * conn, const std::string & query) { +static inline std::shared_ptr +execute_sqlite_query(sqlite3 * db, const std::string & query) { sqlite3_stmt * stmt; - const char * sql = query.c_str(); - int rc = sqlite3_prepare_v2(conn, sql, -1, &stmt, NULL); - if(rc != SQLITE_OK) { - printf("error: %s", sqlite3_errmsg(conn)); - // TODO percy error + + int errorCode = sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr); + if (errorCode != SQLITE_OK) { + std::ostringstream oss; + oss << "Executing SQLite query provider: " << std::endl + << "query: " << query << std::endl + << "error message: " << sqlite3_errmsg(db); + throw std::runtime_error{oss.str()}; } - auto sqlite_deleter = [](sqlite3_stmt * pointer) { - std::cout << "sqlite smt deleted!!!!\n"; - sqlite3_finalize(pointer); - }; - std::shared_ptr ret(stmt, sqlite_deleter); - return ret; + + return std::shared_ptr{stmt, sqlite3_finalize}; } -sqlite_table_info get_sqlite_table_info( - sqlite3 * db, const std::string & table) { +static inline sqlite_table_info +get_sqlite_table_info(sqlite3 * db, const std::string & table) { sqlite_table_info ret; const std::string sql{"select count(*) from " + table}; int err = sqlite3_exec( db, sql.c_str(), [](void * data, int count, char ** rows, char **) -> int { - if(count == 1 && rows) { + if (count == 1 && rows) { sqlite_table_info & ret = *static_cast(data); ret.partitions.push_back("default"); // check for partitions api ret.rows = static_cast(atoi(rows[0])); @@ -77,138 +76,158 @@ sqlite_table_info get_sqlite_table_info( }, &ret, nullptr); - if(err != SQLITE_OK) { throw std::runtime_error("getting number of rows"); } - return ret; -} - -// TODO percy avoid code duplication -bool sqlite_is_string_col_type(const 
std::string & t) { - std::vector mysql_string_types_hints = { - "character", - "varchar", - "varying character", - "nchar", - "native character", - "nvarchar", - "text", - "clob", - "string" // TODO percy ??? - }; - - for(auto hint : mysql_string_types_hints) { - if(StringUtil::beginsWith(t, hint)) return true; + if (err != SQLITE_OK) { + throw std::runtime_error{std::string{"getting number of rows"} + + sqlite3_errmsg(db)}; } - - return false; + return ret; } -sqlite_columns_info get_sqlite_columns_info( - sqlite3 * conn, const std::string & table) { - // TODO percy error handling - +static inline sqlite_columns_info +get_sqlite_columns_info(sqlite3 * db, const std::string & table) { sqlite_columns_info ret; std::string query = "PRAGMA table_info(" + table + ")"; - auto A = execute_sqlite_query(conn, query); - sqlite3_stmt * stmt = A.get(); + auto stmt_ptr = execute_sqlite_query(db, query); + sqlite3_stmt * stmt = stmt_ptr.get(); - int rc = 0; - while((rc = sqlite3_step(stmt)) == SQLITE_ROW) { + int rc = SQLITE_ERROR; + while ((rc = sqlite3_step(stmt)) == SQLITE_ROW) { const unsigned char * name = sqlite3_column_text(stmt, 1); - std::string col_name((char *) name); + std::string col_name(reinterpret_cast(name)); ret.columns.push_back(col_name); const unsigned char * type = sqlite3_column_text(stmt, 2); - std::string col_type((char *) type); + std::string col_type(reinterpret_cast(type)); - std::transform(col_type.cbegin(), + std::transform( + col_type.cbegin(), col_type.cend(), col_type.begin(), [](const std::string::value_type c) { return std::tolower(c); }); - size_t max_bytes = 8; // TODO percy check max scalar bytes from sqlite - if(sqlite_is_string_col_type(col_type)) { - // max_bytes = res->getUInt64("CHARACTER_MAXIMUM_LENGTH"); - // TODO percy see how to get the max size for string/txt cols ... 
-      // see docs
-      max_bytes = 256;
-    }
     ret.types.push_back(col_type);
   }
-  if(rc != SQLITE_DONE) {
-    printf("error: %s", sqlite3_errmsg(conn));
-    // TODO percy error
+
+  if (rc != SQLITE_DONE) {
+    std::ostringstream oss;
+    oss << "Getting SQLite columns info: " << std::endl
+        << "query: " << query << std::endl
+        << "error message: " << sqlite3_errmsg(db);
+    throw std::runtime_error{oss.str()};
   }
   return ret;
 }
 
-sqlite_data_provider::sqlite_data_provider(
-    const sql_info & sql, size_t total_number_of_nodes, size_t self_node_idx)
-  : abstractsql_data_provider(sql, total_number_of_nodes, self_node_idx),
-    sqlite_connection(nullptr), batch_position(0), current_row_count(0) {
-  sqlite3 * conn = nullptr;
-  int rc = sqlite3_open(sql.schema.c_str(), &conn);
-
-  if(rc) {
-    fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(conn));
-    // TODO percy error
-  } else {
-    fprintf(stdout, "Opened sqlite database successfully\n");
+sqlite_data_provider::sqlite_data_provider(const sql_info & sql,
+                                           size_t total_number_of_nodes,
+                                           size_t self_node_idx)
+    : abstractsql_data_provider{sql, total_number_of_nodes, self_node_idx},
+      db{nullptr}, batch_position{0} {
+  int errorCode = sqlite3_open(sql.schema.c_str(), &db);
+
+  if (errorCode != SQLITE_OK) {
+    throw std::runtime_error(std::string{"Can't open database: "} +
+                             sqlite3_errmsg(db));
   }
 
-  this->sqlite_connection = conn;
-  sqlite_table_info tbl_info = get_sqlite_table_info(conn, this->sql.table);
-  this->partitions = std::move(tbl_info.partitions);
-  this->row_count = tbl_info.rows;
-  sqlite_columns_info cols_info =
-      get_sqlite_columns_info(conn, this->sql.table);
-  this->column_names = cols_info.columns;
-  this->column_types = cols_info.types;
-}
+  sqlite_table_info tbl_info = get_sqlite_table_info(db, sql.table);
+  partitions = std::move(tbl_info.partitions);
+  row_count = tbl_info.rows;
 
-sqlite_data_provider::~sqlite_data_provider() {
-  sqlite3_close(this->sqlite_connection);
+  sqlite_columns_info cols_info = get_sqlite_columns_info(db, sql.table);
+  column_names = cols_info.columns;
+  column_types = cols_info.types;
 }
 
+sqlite_data_provider::~sqlite_data_provider() { sqlite3_close(db); }
+
 std::shared_ptr<data_provider> sqlite_data_provider::clone() {
-  return std::make_shared<sqlite_data_provider>(
-      this->sql, this->total_number_of_nodes, this->self_node_idx);
+  return std::make_shared<sqlite_data_provider>(sql,
+                                                total_number_of_nodes,
+                                                self_node_idx);
 }
 
 bool sqlite_data_provider::has_next() {
-  return this->current_row_count < row_count;
+  // We need this implementation because SQLite has no API that reports how
+  // many rows a prepared statement will yield, so we probe whether the next
+  // batch window contains at least one row.
+  const std::size_t offset =
+      sql.table_batch_size *
+      (batch_position * total_number_of_nodes + self_node_idx);
+  std::ostringstream oss;
+  oss << "SELECT * FROM " << sql.table << " LIMIT 1 OFFSET " << offset;
+  const std::string query = oss.str();
+  bool it_has = false;
+  int errorCode = sqlite3_exec(
+      db,
+      query.c_str(),
+      [](void * data, int count, char ** rows, char **) -> int {
+        *static_cast<bool *>(data) = count > 0 && rows;
+        return 0;
+      },
+      &it_has,
+      nullptr);
+  if (errorCode != SQLITE_OK) {
+    throw std::runtime_error{std::string{"Has next SQLite batch: "} +
+                             sqlite3_errmsg(db)};
+  }
+  return it_has;
 }
 
-void sqlite_data_provider::reset() { this->batch_position = 0; }
+void sqlite_data_provider::reset() { batch_position = 0; }
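`has_next` above and the row counter below share the same `sqlite3_exec` pattern: a capture-less lambda decays to the C callback pointer, and the typed result travels back through the `void *` user-data argument. A self-contained sketch of that pattern (`table_has_rows` is a hypothetical helper):

```cpp
#include <stdexcept>
#include <string>

#include <sqlite3.h>

// sqlite3_exec invokes the callback once per result row; the output value is
// threaded through the void * user-data pointer.
static bool table_has_rows(sqlite3 * db, const std::string & table) {
  const std::string query = "SELECT 1 FROM " + table + " LIMIT 1";
  bool found = false;
  const int rc = sqlite3_exec(
      db,
      query.c_str(),
      [](void * data, int count, char ** rows, char **) -> int {
        *static_cast<bool *>(data) = count > 0 && rows != nullptr;
        return 0;  // returning non-zero would abort the scan
      },
      &found,
      nullptr);
  if (rc != SQLITE_OK) { throw std::runtime_error(sqlite3_errmsg(db)); }
  return found;
}
```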
+static inline std::size_t get_size_for_statement(sqlite3_stmt * stmt) {
+  // sqlite3_expanded_sql returns a buffer the caller owns; release it with
+  // sqlite3_free once the count query has been built.
+  char * expanded = sqlite3_expanded_sql(stmt);
+  std::ostringstream oss;
+  oss << "select count(*) from (" << expanded << ')';
+  sqlite3_free(expanded);
+  const std::string query = oss.str();
+
+  std::size_t nRows = 0;
+  const std::int32_t errorCode = sqlite3_exec(
+      sqlite3_db_handle(stmt),
+      query.c_str(),
+      [](void * data, int count, char ** rows, char **) -> int {
+        if (count == 1 && rows) {
+          *static_cast<std::size_t *>(data) =
+              static_cast<std::size_t>(std::atoi(rows[0]));
+          return 0;
+        }
+        return 1;
+      },
+      &nRows,
+      nullptr);
+  if (errorCode != SQLITE_OK) {
+    throw std::runtime_error{std::string{"Counting rows for SQLite batch: "} +
+                             sqlite3_errstr(errorCode)};
+  }
+  return nRows;
+}
 
-data_handle sqlite_data_provider::get_next(bool) {
-  std::string query = this->build_select_query(this->batch_position);
-  this->batch_position += this->sql.table_batch_size;
-
-  std::cout << "query: " << query << "\n";
-  auto stmt = execute_sqlite_query(this->sqlite_connection, query);
-  current_row_count += batch_position;
-
+data_handle sqlite_data_provider::get_next(bool open_file) {
   data_handle ret;
-  ret.sql_handle.table = this->sql.table;
-  ret.sql_handle.column_names = this->column_names;
-  ret.sql_handle.column_types = this->column_types;
-  ret.sql_handle.row_count = row_count;
+
+  ret.sql_handle.table = sql.table;
+  ret.sql_handle.column_names = column_names;
+  ret.sql_handle.column_types = column_types;
+
+  if (!open_file) { return ret; }
+
+  const std::string query = build_select_query(batch_position);
+  batch_position++;
+
+  std::shared_ptr<sqlite3_stmt> stmt = execute_sqlite_query(db, query);
+
+  ret.sql_handle.row_count = get_size_for_statement(stmt.get());
   ret.sql_handle.sqlite_statement = stmt;
+
   // TODO percy add columns to uri.query
-  ret.uri = Uri("sqlite", "", this->sql.schema + "/" + this->sql.table, "", "");
-  // std::cout << "get_next TOTAL rows: " << this->row_count << "\n";
-  // std::cout << "get_next current_row_count: " << this->current_row_count
-  //           << "\n";
+  ret.uri = Uri("sqlite", "", sql.schema + "/" + sql.table, "", "");
   return ret;
 }
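`execute_sqlite_query` (earlier in this file) hands the statement to the `data_handle` as a `std::shared_ptr<sqlite3_stmt>` whose deleter is `sqlite3_finalize`, so the statement lives exactly as long as the last handle that references it. A minimal sketch of that RAII shape (`prepare` is a hypothetical name):

```cpp
#include <memory>
#include <stdexcept>
#include <string>

#include <sqlite3.h>

// The shared_ptr's custom deleter finalizes the statement when the last
// owner goes away, matching execute_sqlite_query above.
static std::shared_ptr<sqlite3_stmt> prepare(sqlite3 * db,
                                             const std::string & query) {
  sqlite3_stmt * stmt = nullptr;
  if (sqlite3_prepare_v2(db, query.c_str(), -1, &stmt, nullptr) != SQLITE_OK) {
    throw std::runtime_error(sqlite3_errmsg(db));
  }
  return std::shared_ptr<sqlite3_stmt>{stmt, sqlite3_finalize};
}
```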
 size_t sqlite_data_provider::get_num_handles() {
-  if(this->partitions.empty()) {
-    size_t ret = this->row_count / this->sql.table_batch_size;
-    return ret == 0 ? 1 : ret;
-  }
-
-  return this->partitions.size();
+  // ceil division: a final partial batch still needs its own handle
+  std::size_t ret =
+      (row_count + sql.table_batch_size - 1) / sql.table_batch_size;
+  return ret == 0 ? 1 : ret;
 }
 
 } /* namespace io */
diff --git a/engine/src/io/data_provider/sql/SQLiteDataProvider.h b/engine/src/io/data_provider/sql/SQLiteDataProvider.h
index 84c02aa09..b2568b9c6 100644
--- a/engine/src/io/data_provider/sql/SQLiteDataProvider.h
+++ b/engine/src/io/data_provider/sql/SQLiteDataProvider.h
@@ -20,45 +20,45 @@ namespace io {
  */
 class sqlite_data_provider : public abstractsql_data_provider {
 public:
-  sqlite_data_provider(const sql_info &sql, size_t total_number_of_nodes,
-      size_t self_node_idx);
+  sqlite_data_provider(const sql_info & sql,
+                       std::size_t total_number_of_nodes,
+                       std::size_t self_node_idx);
 
   virtual ~sqlite_data_provider();
 
-  std::shared_ptr<data_provider> clone() override;
+  std::shared_ptr<data_provider> clone() override;
 
   /**
-   * tells us if this provider can generate more sql resultsets
-   */
-  bool has_next() override;
+   * Tells us if this provider can generate more sql resultsets.
+   */
+  bool has_next() override;
 
   /**
-   * Resets file read count to 0 for file based DataProvider
-   */
-  void reset() override;
+   * Resets file read count to 0 for file based DataProvider.
+   */
+  void reset() override;
 
   /**
-   * gets us the next arrow::io::RandomAccessFile
-   * if open_file is false will not run te query and just returns a data_handle
-   * with columns info
-   */
-  data_handle get_next(bool = true) override;
+   * Gets us the next arrow::io::RandomAccessFile.
+   * If open_file is false it will not run the query and just returns a
+   * data_handle with columns info.
+   */
+  data_handle get_next(bool = true) override;
 
   /**
-   * Get the number of data_handles that will be provided.
-   */
-  size_t get_num_handles() override;
+   * Get the number of data_handles that will be provided.
+   */
+  size_t get_num_handles() override;
 
 protected:
   // TODO percy c.gonzales
   std::unique_ptr get_predicate_transformer() const override { return nullptr; }
 
 private:
-  sqlite3* sqlite_connection;
+  sqlite3 * db;
   std::vector<std::string> partitions;
-  size_t row_count;
-  size_t batch_position;
-  size_t current_row_count;
+  std::size_t row_count;
+  std::size_t batch_position;
 };
 
 } /* namespace io */
diff --git a/tests/BlazingSQLTest/DataBase/sqliteSchema.py b/tests/BlazingSQLTest/DataBase/sqliteSchema.py
index 653a71552..ab59c1b7d 100644
--- a/tests/BlazingSQLTest/DataBase/sqliteSchema.py
+++ b/tests/BlazingSQLTest/DataBase/sqliteSchema.py
@@ -99,10 +99,12 @@ def sqlite_load_data_in_file(table: str,
     with open(psvpath) as psv:
         reader = csv.reader(psv, delimiter='|')
         row = next(reader)
+        row = [c if c.lower() != 'null' else None for c in row]
         nfields = ','.join('?' * len(row))
         query = f'insert into {table} values ({nfields})'
         cursor.execute(query, row)
         for row in reader:
+            row = [c if c.lower() != 'null' else None for c in row]
             cursor.execute(query, row)
 
     connection.commit()
diff --git a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
index 4c33360d8..bf5a18148 100644
--- a/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
+++ b/tests/BlazingSQLTest/EndToEndTests/tablesFromSQL.py
@@ -56,10 +56,9 @@ def define_samples(sample_list: [Sample], table_mapper = None):
     ], tpchQueries.map_tables)
 
 data_types = [
-    DataType.MYSQL,
-    # TODO percy c.gonzales
-    #DataType.POSTGRESQL,
-    #DataType.SQLITE,
+    #DataType.MYSQL,
+    #DataType.POSTGRESQL,
+    DataType.SQLITE,
 ]
 
 tables = [
@@ -203,6 +202,11 @@ def setup_test(data_type: DataType) -> createSchema.sql_connection:
         sqliteSchema.create_and_load_tpch_schema(sql)
         return sql
 
+    if data_type is DataType.POSTGRESQL:
+        from DataBase import postgreSQLSchema
+        postgreSQLSchema.create_and_load_tpch_schema(sql)
+        return sql
+
 
 def executionTest(dask_client, drill, spark, dir_data_lc, bc, nRals, sql):
     extra_args = {