Skip to content

Commit

Permalink
fix!: to_gbq loads uint8 columns to BigQuery INT64 instead of STR…
Browse files Browse the repository at this point in the history
…ING (#814)

* fix!: `to_gbq` loads `uint8` columns to BigQuery INT64 instead of STRING

fix!: `to_gbq` loads naive (no timezone) columns to BigQuery DATETIME instead of TIMESTAMP (#814)
fix!: `to_gbq` loads object column containing bool values to BOOLEAN instead of STRING (#814)
fix!: `to_gbq` loads object column containing dictionary values to STRUCT instead of STRING (#814)
deps: min pyarrow is now 4.0.0 to support compliant nested types (#814)
Release-As: 0.24.0
  • Loading branch information
tswast authored Sep 23, 2024
1 parent 8a4389b commit 107bb40
Show file tree
Hide file tree
Showing 18 changed files with 997 additions and 76 deletions.
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
UNIT_TEST_EXTRAS = [
"bqstorage",
"tqdm",
"geopandas",
]
UNIT_TEST_EXTRAS_BY_PYTHON = {
"3.9": [],
Expand Down
2 changes: 1 addition & 1 deletion owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
# Use a middle version of Python to test when no extras are installed.
"3.9": []
}
extras = ["tqdm"]
extras = ["tqdm", "geopandas"]
templated_files = common.py_library(
unit_test_python_versions=["3.8", "3.9", "3.10", "3.11", "3.12"],
system_test_python_versions=["3.8", "3.9", "3.10", "3.11", "3.12"],
Expand Down
3 changes: 3 additions & 0 deletions pandas_gbq/core/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Copyright (c) 2024 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
70 changes: 70 additions & 0 deletions pandas_gbq/core/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright (c) 2019 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import itertools

import pandas


def list_columns_and_indexes(dataframe, index=True):
    """Return all index and column names with dtypes.

    Returns:
        Sequence[Tuple[str, dtype]]:
            Returns a sorted list of indexes and column names with
            corresponding dtypes. If an index is missing a name or has the
            same name as a column, the index is omitted.
    """
    taken_names = frozenset(dataframe.columns)
    names_and_dtypes = []

    if index:
        idx = dataframe.index
        if isinstance(idx, pandas.MultiIndex):
            # One entry per named level; unnamed levels and levels that
            # collide with a column name are skipped.
            for level_name in idx.names:
                if level_name and level_name not in taken_names:
                    level_values = idx.get_level_values(level_name)
                    names_and_dtypes.append((level_name, level_values.dtype))
        elif idx.name and idx.name not in taken_names:
            names_and_dtypes.append((idx.name, idx.dtype))

    names_and_dtypes.extend(zip(dataframe.columns, dataframe.dtypes))
    return names_and_dtypes


def first_valid(series):
    """Return the first non-NA value in ``series``, or ``None`` if all are NA."""
    idx = series.first_valid_index()
    return None if idx is None else series.at[idx]


def first_array_valid(series):
    """Return the first "meaningful" element from the array series.

    Here, "meaningful" means the first non-None element in one of the arrays
    that can be used for type detection.

    Args:
        series (pandas.Series):
            Series whose elements are expected to be iterables (arrays) or NA.

    Returns:
        The first non-NA item found inside any of the arrays, or ``None`` if
        no such item exists.
    """
    first_valid_index = series.first_valid_index()
    if first_valid_index is None:
        return None

    valid_array = series.at[first_valid_index]
    valid_item = next((item for item in valid_array if not pandas.isna(item)), None)

    if valid_item is not None:
        return valid_item

    # Valid item is None because all items in the "valid" array are invalid.
    # Try to find a truly valid array manually.
    #
    # ``first_valid_index()`` returns an index *label*, while ``islice``
    # requires an integer position. Using the label directly crashes on
    # non-integer indexes (e.g. string labels) and mis-slices on integer
    # indexes that don't start at 0, so translate label -> position first.
    # NOTE(review): assumes the index labels are unique, so ``get_loc``
    # returns a single integer position — confirm against callers.
    first_valid_position = series.index.get_loc(first_valid_index)
    for array in itertools.islice(series, first_valid_position + 1, None):
        try:
            array_iter = iter(array)
        except TypeError:
            continue  # Not an array, apparently, e.g. None, thus skip.
        valid_item = next((item for item in array_iter if not pandas.isna(item)), None)
        if valid_item is not None:
            break

    return valid_item
12 changes: 10 additions & 2 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from pandas_gbq.features import FEATURES
import pandas_gbq.query
import pandas_gbq.schema
import pandas_gbq.schema.pandas_to_bigquery
import pandas_gbq.timestamp

try:
Expand Down Expand Up @@ -1219,9 +1220,16 @@ def _generate_bq_schema(df, default_type="STRING"):
be overridden: https://github.com/pydata/pandas-gbq/issues/218, this
method can be removed after there is time to migrate away from this
method."""
from pandas_gbq import schema
fields = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
df,
default_type=default_type,
)
fields_json = []

for field in fields:
fields_json.append(field.to_api_repr())

return schema.generate_bq_schema(df, default_type=default_type)
return {"fields": fields_json}


class _Table(GbqConnector):
Expand Down
10 changes: 5 additions & 5 deletions pandas_gbq/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

from pandas_gbq import exceptions
import pandas_gbq.schema
import pandas_gbq.schema.bigquery
import pandas_gbq.schema.pandas_to_bigquery


def encode_chunk(dataframe):
Expand Down Expand Up @@ -214,11 +216,9 @@ def load_csv_from_file(
This method is needed for writing with google-cloud-bigquery versions that
don't implement load_table_from_dataframe with the CSV serialization format.
"""
if schema is None:
schema = pandas_gbq.schema.generate_bq_schema(dataframe)

schema = pandas_gbq.schema.remove_policy_tags(schema)
bq_schema = pandas_gbq.schema.to_google_cloud_bigquery(schema)
bq_schema = pandas_gbq.schema.pandas_to_bigquery.dataframe_to_bigquery_fields(
dataframe, schema
)

def load_chunk(chunk, job_config):
try:
Expand Down
31 changes: 0 additions & 31 deletions pandas_gbq/schema.py → pandas_gbq/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,37 +92,6 @@ def schema_is_subset(schema_remote, schema_local):
return all(field in fields_remote for field in fields_local)


def generate_bq_schema(dataframe, default_type="STRING"):
    """Given a passed dataframe, generate the associated Google BigQuery schema.

    Arguments:
        dataframe (pandas.DataFrame):
            DataFrame whose columns (and their dtypes) are mapped to
            BigQuery field types.
        default_type : string
            The default big query type in case the type of the column
            does not exist in the schema.

    Returns:
        dict: A ``{"fields": [{"name": ..., "type": ...}, ...]}`` mapping
        with one entry per DataFrame column, in column order.
    """

    # If you update this mapping, also update the table at
    # `docs/source/writing.rst`.
    # Keys are numpy dtype "kind" codes (dtype.kind).
    type_mapping = {
        "i": "INTEGER",
        "b": "BOOLEAN",
        "f": "FLOAT",
        "O": "STRING",
        "S": "STRING",
        "U": "STRING",
        "M": "TIMESTAMP",
    }

    return {
        "fields": [
            {"name": column_name, "type": type_mapping.get(dtype.kind, default_type)}
            for column_name, dtype in dataframe.dtypes.items()
        ]
    }


def update_schema(schema_old, schema_new):
"""
Given an old BigQuery schema, update it with a new one.
Expand Down
44 changes: 44 additions & 0 deletions pandas_gbq/schema/bigquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2019 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import collections

import google.cloud.bigquery


def to_schema_fields(schema):
    """Coerce `schema` to a list of schema field instances.
    Args:
        schema(Sequence[Union[ \
            :class:`~google.cloud.bigquery.schema.SchemaField`, \
            Mapping[str, Any] \
        ]]):
            Table schema to convert. If some items are passed as mappings,
            their content must be compatible with
            :meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
    Returns:
        Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]
    Raises:
        Exception: If ``schema`` is not a sequence, or if any item in the
        sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
        instance or a compatible mapping representation of the field.
    """
    # First pass: reject any item that is neither a SchemaField nor a mapping,
    # before doing any conversion work.
    for item in schema:
        is_acceptable = isinstance(
            item, (google.cloud.bigquery.SchemaField, collections.abc.Mapping)
        )
        if not is_acceptable:
            raise ValueError(
                "Schema items must either be fields or compatible "
                "mapping representations."
            )

    # Second pass: pass SchemaField instances through unchanged; convert
    # mapping representations via from_api_repr.
    converted = []
    for item in schema:
        if isinstance(item, google.cloud.bigquery.SchemaField):
            converted.append(item)
        else:
            converted.append(google.cloud.bigquery.SchemaField.from_api_repr(item))
    return converted
Loading

0 comments on commit 107bb40

Please sign in to comment.