Refactor import script deduplication logic
Previously the script attempted to deduplicate admin2 records that were
reported twice (sometimes with slight variations in the county name).

However, there are also duplicate records at the province_state level;
for example, the report for 22nd March contains "District of Columbia"
twice:
https://github.com/CSSEGISandData/COVID-19/blob/master/csse_covid_19_data/csse_covid_19_daily_reports/03-22-2020.csv

This commit reuses the new matching code to decide what is a duplicate and what isn't.

It deduplicates cases like this, where the duplicated numbers are
sometimes identical and sometimes zeroed.
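
To illustrate the rule, here is a minimal sketch (the Record tuple and
the sample numbers are made up for illustration, not taken from any
report):

    from collections import namedtuple

    Record = namedtuple('Record', 'confirmed deaths recovered')

    def is_resolvable(kept, other):
        # A duplicate can be dropped if each of its measures is either
        # zeroed or equal to the record we keep.
        pairs = (
            (kept.confirmed, other.confirmed),
            (kept.deaths, other.deaths),
            (kept.recovered, other.recovered),
        )
        return all(lower == 0 or higher == lower for higher, lower in pairs)

    duplicates = [Record(116, 2, 0), Record(0, 0, 0), Record(116, 2, 0)]
    # Keep the record with the highest numbers.
    duplicates.sort(key=lambda r: (r.confirmed, r.deaths, r.recovered), reverse=True)
    kept = duplicates[0]
    assert all(is_resolvable(kept, other) for other in duplicates[1:])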

It also refactors the code so that we keep track of all these data quality issues as we go
and summarise them after each import.
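
In practice the new flow looks roughly like this (a sketch based on
main() in the diff below; it assumes a report has already been fetched
for a given date):

    matcher = Matcher()
    importer = Importer(matcher)

    result = importer.import_daily_report(report)

    # Summarise the data quality issues collected during the import
    for line in result.info():
        print(line)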
MatMoore committed Apr 18, 2020
1 parent 90c248d commit 9006e24
Showing 3 changed files with 184 additions and 77 deletions.
249 changes: 178 additions & 71 deletions covidapi/import_data_jh.py
@@ -7,7 +7,7 @@
from datetime import datetime, date, timedelta
from requests import Session
from requests.exceptions import HTTPError
from collections import defaultdict
from collections import defaultdict, Counter
from typing import Optional
import csv

@@ -24,6 +24,81 @@ def parse_datetime(date_str):
return datetime.strptime(date_str, r'%m/%d/%y %H:%M')


class ImportResult:
def __init__(self):
self._unmatched_locations = Counter()
self._resolved_duplicate_locations = Counter()
self._unresolved_duplicate_locations = Counter()
self._matched_records = defaultdict(list)
self._ignored_regions = Counter()
self._unexpected_decreases = []
self._errors = []

def record_error(self, message):
self._errors.append(message)

def record_matched_record(self, jh_id, record):
"""
Keep track of records that we do match to the lookup table.
This is used to detect duplicates.
"""
self._matched_records[jh_id].append(record)

def record_unmatched_location(self, region_names):
"""
Record a location that we couldn't match to the lookup table
"""
self._unmatched_locations[region_names] += 1

def record_ignored_location(self, region_names):
"""
Record a location that we're ignoring, for example city-level information, or subgroups
within a population that have returned from a cruise ship
"""
self._ignored_regions[region_names] += 1

def record_resolved_duplicate(self, jh_id):
"""
Record a duplicate record in the same report that we have been able to de-duplicate
"""
self._resolved_duplicate_locations[jh_id] += 1

def record_unexpected_decrease(self, record, confirmed, deaths, recovered):
"""
        Record a case where we receive two reports with the same timestamp but
        different numbers. This happens when revised estimates are reported on
        different days without the last_update timestamp being changed.
        When it does, the script keeps the higher estimates. This ensures we
        always store one report per timestamp, and the script can be rerun
        without changing the result.
"""
self._unexpected_decreases.append((record, confirmed, deaths, recovered))

def duplicate_records(self):
return [records for records in self._matched_records.values() if len(records) > 1]

def info(self):
info_list = [
f'Number of processed locations: {len(self._matched_records)}',
f'Number of duplicate locations: {len(self.duplicate_records())}',
            f'Number of ignored locations: {len(self._ignored_regions)}',
        ]

for location, location_count in self._resolved_duplicate_locations.items():
info_list.append(f'Resolved {location_count} duplicate records for {location}')

for location in self._unmatched_locations:
info_list.append(f'No match found for {location}')

for (record, confirmed, deaths, recovered) in self._unexpected_decreases:
info_list.append(f'Timestamp has been reused for {record!r} (conflicting report: confirmed={confirmed}, deaths={deaths}, recovered={recovered})')

return info_list

def errors(self):
return self._errors


class ReportFetcher:
"""
Fetch the raw data from Github
@@ -47,23 +47,6 @@ def fetch_report(self, report_date):
return records


def deduplicate(db_instance, to_deduplicate):
"""
Try to remove suspected duplicate reports from the session
"""
for duplicates in to_deduplicate.values():
duplicates.sort(key=lambda dr: dr.confirmed, reverse=True)
first = duplicates[0]
for other in duplicates[1:]:
print(f'Deduplicating:\n\t- {first!r}\n\t- {other!r}')

if other.confirmed == 0 and other.deaths == 0 and other.recovered == 0:
print('\t-> Ignoring duplicate with zero cases, deaths and recovered')
db_instance.expunge(other)
else:
raise Exception('Duplicate records have different numbers. Giving up.')


def get_daily_report_by_region_and_date(
db: SessionLocal, country_region: str,
province_state: Optional[str],
@@ -135,44 +193,95 @@ def clean_optional_field(original: Optional[str]) -> Optional[str]:
return original if original else None


def sanity_check(db_instance):
"""
Run some checks on the database before we commit
"""
if db_instance.execute('''
select 1
from jh_daily_reports
where fips is not null
group by fips, last_update
having count(*) > 1
''').fetchone():
db_instance.rollback()
raise Exception(f'Found records with the same FIPS and last_update')

if db_instance.execute('''
select 1
from jh_daily_reports
where fips is null
group by country_region, province_state, admin2, last_update
having count(*) > 1
''').fetchone():
db_instance.rollback()
raise Exception(f'Found records with the same country_region, province_state, admin2, last_update and no FIPS')


def import_daily_report(report, matcher):
db_instance = SessionLocal()
to_deduplicate = defaultdict(list)

for row in report:
class Importer:
def __init__(self, matcher):
self.db_instance = SessionLocal()
self.matcher = matcher

def _deduplicate(self, result):
"""
Try to remove suspected duplicate reports from the SQLAlchemy session
"""
for duplicates in result.duplicate_records():
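            # Sort descending so the record with the highest counts comes first and is kept.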
duplicates.sort(key=lambda dr: (dr.confirmed, dr.deaths, dr.recovered), reverse=True)
first = duplicates[0]

for other in duplicates[1:]:
pairs = (
(first.confirmed, other.confirmed),
(first.deaths, other.deaths),
(first.recovered, other.recovered)
)
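                # Resolvable only if each measure in the other record is either zeroed or equal to the kept record's.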
if all(lower == 0 or (higher == lower) for (higher, lower) in pairs):
result.record_resolved_duplicate(other.jh_id)
self.db_instance.expunge(other)
else:
result.record_error(f'Unable to deduplicate {first}: other record has confirmed={other.confirmed}, deaths={other.deaths}, recovered={other.recovered}')

def _sanity_check(self, result):
"""
Run some checks on the database before we commit
"""
row = self.db_instance.execute('''
select 1
from jh_daily_reports
where fips is not null
group by fips, last_update
having count(*) > 1
''').fetchone()
if row:
result.record_error(f'Found records with the same FIPS and last_update: {row}')

row = self.db_instance.execute('''
select 1
from jh_daily_reports
where jh_id is not null
group by jh_id, last_update
having count(*) > 1
''').fetchone()
if row:
result.record_error(f'Found records with the same jh_id and last_update: {row}')

row = self.db_instance.execute('''
select country_region, province_state, admin2, last_update
from jh_daily_reports
where fips is null and jh_id is null
group by country_region, province_state, admin2, last_update
having count(*) > 1
''').fetchone()
if row:
result.record_error(f'Found records with the same country_region, province_state, admin2, last_update and no FIPS: {row}')

def import_daily_report(self, report):
result = ImportResult()

for row in report:
self._import_row(row, result)

self._deduplicate(result)
self.db_instance.flush()
self._sanity_check(result)

errors = result.errors()
if errors:
self.db_instance.rollback()
for error in errors:
print(error)
raise Exception('Unrecoverable errors while importing data.')

self.db_instance.commit()

return result

def _import_row(self, row, result):
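        # Column names vary between versions of the daily reports ('Last Update' vs 'Last_Update').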
last_update_str = row.get('Last Update') or row['Last_Update']
last_update = parse_datetime(last_update_str)

# Region identifiers
country_region = row.get('Country_Region') or row['Country/Region']
province_state = row.get('Province_State') or row.get('Province/State')
admin2 = row.get('Admin2')
fips = row.get('fips')
fips = row.get('FIPS')

province_state = clean_optional_field(province_state)
fips = clean_optional_field(fips)
@@ -181,11 +290,17 @@ def import_daily_report(report, matcher):
region_names = RegionNames.parse_from_report(row)

try:
region_info = matcher.match_region(region_names)
region_info = self.matcher.match_region(region_names)

if region_info is None:
result.record_ignored_location(region_names)
jh_id = None
else:
jh_id = region_info.identified_region.uid
except KeyError:
print('No match')
print(row)
result.record_unmatched_location(region_names)
region_info = None
jh_id = None

# Measures
confirmed = int(row['Confirmed'])
@@ -194,7 +309,7 @@

try:
dr = get_daily_report_by_region_and_date(
db=db_instance,
db=self.db_instance,
province_state=province_state,
country_region=country_region,
admin2=admin2,
@@ -211,35 +326,24 @@
confirmed=confirmed,
deaths=deaths,
recovered=recovered,
jh_id=region_info.identified_region.uid if region_info else None
jh_id=jh_id
)
except Exception:
print(f'Invalid row: {row!r}')
raise
else:
if dr.confirmed > confirmed or dr.deaths > deaths or dr.recovered > recovered:
print(f'Warning: NOT lowering estimates for {dr!r}')
print(f'New data: confirmed={confirmed}, deaths={deaths}, recovered={recovered}')
continue
result.record_unexpected_decrease(dr, confirmed, deaths, recovered)
return
else:
dr.confirmed = confirmed
dr.deaths = deaths
dr.recovered = recovered

db_instance.add(dr)

# Track potentially duplicated admin2s
if dr.admin2 in DUPLICATE_ADMIN2.values():
to_deduplicate[(dr.country_region, dr.province_state, dr.admin2)].append(dr)

try:
deduplicate(db_instance, to_deduplicate)
except Exception:
print(f'Cannot deduplicate')
if jh_id is not None:
result.record_matched_record(jh_id, dr)

db_instance.flush()
sanity_check(db_instance)
db_instance.commit()
self.db_instance.add(dr)


def main(args):
@@ -257,13 +361,16 @@ def main(args):

report_fetcher = ReportFetcher()
matcher = Matcher()
importer = Importer(matcher)

while current <= today:
print(f'Importing data for {current}')

try:
report = report_fetcher.fetch_report(current)
import_daily_report(report, matcher)
result = importer.import_daily_report(report)
for info in result.info():
print(info)
except HTTPError:
if current == today:
print('Unable to fetch report. It may not be available yet.')
8 changes: 3 additions & 5 deletions covidapi/jh_cleaning/lookup_table.py
@@ -20,6 +20,8 @@ def __init__(self):
def match_region(self, region_match):
"""
        Attempt to look up a region using the names provided.
        Returns None if the region should be ignored.
"""
try:
return self.region_matches[region_match]
@@ -30,11 +32,7 @@ def match_region(self, region_match):
fuzzier = map_county_to_admin2(fuzzy) if fuzzy else None
fuzziest = map_boat_passengers(fuzzier) if fuzzier else None

try:
return self.region_matches[fuzziest] if fuzziest else None
except KeyError:
print(fuzziest)
raise
return self.region_matches[fuzziest] if fuzziest else None

def lookup_by_id(self, uid):
"""
4 changes: 3 additions & 1 deletion import_data.py
@@ -31,6 +31,8 @@
args = parser.parse_args()

try:
args.func(args)
func = args.func
except AttributeError:
parser.print_help()
else:
func(args)
