diff --git a/.gitignore b/.gitignore
index a07fedc..1dd71fb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -99,3 +99,4 @@ functions/pumpkinpal-b60be-firebase-adminsdk-jtia5-63bbe231d8.json
 *.csv
 pumpkinpal-b60be-firebase-adminsdk-jtia5-e193545340.json
 Incentives.md
+/venv
diff --git a/functions/GPC_design_brief.md b/functions/GPC_design_brief.md
new file mode 100644
index 0000000..a094c77
--- /dev/null
+++ b/functions/GPC_design_brief.md
@@ -0,0 +1,134 @@
+# GPC Database Design Brief
+
+## Overview
+Database design for storing and processing Giant Pumpkin Commonwealth (GPC) competition results across all categories, with support for both historical data migration and future data submissions.
+
+## Categories
+- Pumpkins (P)
+- Squash (S)
+- Long Gourds (L)
+- Watermelon (W)
+- Tomato (T)
+- Field Pumpkins (F)
+- Bushel Gourds (B)
+- Marrows (M)
+
+## Data Flow
+
+### 1. Raw Data Layer
+- Schema: `raw_data`
+- Tables: `{category}_{year}` (e.g., `p_2024`, `s_2024`)
+- Purpose: Direct mirror of the bigpumpkins.com data structure
+- Source: Scraped from URLs following the pattern `bigpumpkins.com/WeighoffResultsGPC.aspx?c={category}&y={year}`
+
+### 2. Staging Layer
+- Schema: `staging`
+- Tables:
+  - `entries_staging`
+  - `growers_staging`
+  - `sites_staging`
+- Purpose: Cleaned and normalized data, ready for processing
+- Processing: Data validation, deduplication, and standardization
+
+### 3. Core Layer
+- Schema: `core`
+- Tables:
+  - `entries`: Single source of truth for all competition entries
+    - Full column set from staging
+    - Properly typed and validated
+    - Indexed for efficient querying
+    - Supports cross-category analysis
+    - Maintains complete entry history
+- Purpose: Clean, validated data ready for analytics
+- Features:
+  - Data integrity constraints
+  - Efficient indexing
+  - Full audit trail
+  - Cross-category querying support
+
+### 4. Site Submission Layer
+- Schema: `submissions`
+- Tables:
+  - `site_entries_pending`
+  - `site_entries_verified`
+- Purpose: Handle new data submissions from site leaders
+- Flow: Pending → Admin Verification → Core Layer
+
+### 5. Analytics Layer
+- Schema: `analytics`
+- Tables/Views:
+  - `annual_records`: Year-by-year records and achievements
+  - `state_rankings`: Performance by state/province
+  - `site_performance`: Site-level statistics and trends
+  - `grower_records`: Individual achievements and records
+  - `category_trends`: Category-specific analysis
+  - `competition_metrics`: Entry and participation analysis
+  - `weight_statistics`: Statistical distributions
+  - `growth_patterns`: Genetics and growing condition analysis
+- Purpose: Derived insights and statistics
+- Features:
+  - Built from core.entries
+  - Optimized for specific analyses
+  - Materialized views where needed
+  - Rebuildable from core data
+
+### 6. Public Layer
+- Schema: `public`
+- Views:
+  - TBD
+- Purpose: Public-facing views of data
+
+## Data Processing Stages
+
+1. **Historical Data ETL**
+   - Scrape → Raw Layer
+   - Raw → Staging → Core
+   - Core → Analytics
+
+2.
**New Data Submissions** + - Site Upload → Submissions Layer + - Admin Verification + - Verified Data → Core Layer + - Core → Analytics Update + +## Data Relationships + +### Primary Entities +- Growers (unique across all categories) +- Sites (unique across all categories) +- Entries (category-specific, linked to growers and sites) +- Categories (static reference table) + +### Key Relationships +- Each entry belongs to one category +- Each entry belongs to one grower +- Each entry belongs to one site +- Each site can host multiple categories +- Each grower can compete in multiple categories + +## Identity Management + +### Grower Identification +- Challenge: Grower names vary across entries (spelling, formatting, moves) +- Solution: Two-tier identification system + - System-generated unique ID (primary key) + - Processed name field (standardized format) + - Historical name variations tracked + - Location history maintained + +### Entry Uniqueness +- Challenge: Multiple entries could share weight/grower combination +- Solution: Composite natural key includes + - Category + - Year + - Weight + - Grower ID + - Site ID + - Entry date/time + +### Data Matching Strategy +- Raw data processed through name standardization +- Fuzzy matching algorithms for historical data +- Manual review interface for uncertain matches +- Grower merge capability for duplicate resolution + diff --git a/functions/etl_processor.py b/functions/etl_processor.py new file mode 100644 index 0000000..da894d2 --- /dev/null +++ b/functions/etl_processor.py @@ -0,0 +1,1206 @@ +import pandas as pd +from nameparser import HumanName +from fuzzywuzzy import process, fuzz +import logging +from typing import Dict, List, Any, Optional +from tqdm import tqdm +from supabase import create_client, Client +from dotenv import load_dotenv +import os +import re +import sys +import time +from datetime import datetime +import json + +# Setup logging +log_filename = f'etl_pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler(log_filename), + logging.StreamHandler() + ] +) +logger = logging.getLogger(__name__) + +def escape_sql_string(value): + """Escape single quotes in SQL strings.""" + return str(value).replace("'", "''") + +class GPCPipeline: + def __init__(self, supabase: Client): + """Initialize the ETL processor with Supabase client.""" + self.supabase = supabase + self.batch_size = 500 # Increased from 50 to 500 + self.max_retries = 3 + self.retry_delay = 2 # seconds + self._verify_database_access() + self._ensure_staging_tables() + + def _verify_database_access(self) -> None: + """Verify database access.""" + try: + # Use RPC to query with schema + query = """ + SELECT * FROM raw_data.p_2005 LIMIT 1; + """ + result = self.supabase.rpc('select_from_raw_data', {'table_name': 'p_2005'}).execute() + logger.info("Successfully verified database access") + + except Exception as e: + logger.error(f"Error verifying database access: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise Exception("Failed to verify database access. 
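The raw data layer described in the design brief above is populated by a scraper that is not included in this diff. Below is a minimal sketch of what that step might look like, assuming the bigpumpkins.com results page serves a plain HTML table; the `fetch_raw_results` helper and the URL scheme are illustrative, only the path and query-parameter pattern come from the brief.

```python
# Hypothetical sketch only; the scraper itself is not part of this PR.
from io import StringIO

import pandas as pd
import requests

BASE_URL = "http://bigpumpkins.com/WeighoffResultsGPC.aspx"  # pattern from the design brief

def fetch_raw_results(category: str, year: int) -> pd.DataFrame:
    """Fetch one category/year results page and return it as a DataFrame."""
    resp = requests.get(BASE_URL, params={"c": category, "y": year}, timeout=30)
    resp.raise_for_status()
    # read_html returns every <table> on the page; keep the largest one,
    # which is assumed to be the weigh-off results grid.
    tables = pd.read_html(StringIO(resp.text))
    return max(tables, key=len)

# Example: fetch_raw_results("p", 2024) would feed the raw_data.p_2024 table.
```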
Please check your credentials and database setup.") + + def _ensure_staging_tables(self) -> None: + """Ensure staging tables exist with correct schema.""" + try: + # Drop any existing views first + drop_views_sql = """ + DROP VIEW IF EXISTS staging.name_changes CASCADE; + DROP VIEW IF EXISTS staging.site_changes CASCADE; + DROP VIEW IF EXISTS staging.data_quality_view CASCADE; + """ + self.supabase.rpc('execute_sql', {'query': drop_views_sql}).execute() + logger.info("Dropped existing views") + + # Drop and recreate entries_staging table with CASCADE + drop_entries_sql = """ + DROP TABLE IF EXISTS staging.entries_staging CASCADE; + """ + self.supabase.rpc('execute_sql', {'query': drop_entries_sql}).execute() + + create_entries_sql = """ + CREATE TABLE staging.entries_staging ( + entry_id SERIAL PRIMARY KEY, + category CHAR(1), + year INTEGER, + place TEXT, + weight_lbs NUMERIC, + original_grower_name TEXT, + processed_grower_name TEXT, + city TEXT, + state_prov TEXT, + country TEXT, + gpc_site TEXT, + seed_mother TEXT, + pollinator_father TEXT, + ott NUMERIC, + est_weight NUMERIC, + entry_type TEXT, + data_quality_score INTEGER, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + """ + self.supabase.rpc('execute_sql', {'query': create_entries_sql}).execute() + logger.info("Recreated entries_staging table") + + # Create name_changes as a view instead of a table + create_name_changes_sql = """ + CREATE VIEW staging.name_changes AS + SELECT + ROW_NUMBER() OVER () as change_id, + original_grower_name as original_name, + processed_grower_name as processed_name, + CASE + WHEN original_grower_name = processed_grower_name THEN 1.0 + WHEN processed_grower_name = 'Unknown' THEN 0.0 + ELSE 0.8 + END as confidence_score, + CASE + WHEN original_grower_name = processed_grower_name THEN 'NO_CHANGE' + WHEN processed_grower_name = 'Unknown' THEN 'INVALID_NAME' + ELSE 'STANDARDIZED' + END as change_type, + MIN(created_at) as first_seen_at + FROM staging.entries_staging + WHERE original_grower_name IS NOT NULL + GROUP BY original_grower_name, processed_grower_name; + """ + self.supabase.rpc('execute_sql', {'query': create_name_changes_sql}).execute() + logger.info("Created name_changes view") + + # Create site_standardization as a view + create_site_std_sql = """ + CREATE VIEW staging.site_standardization AS + SELECT + ROW_NUMBER() OVER () as site_id, + gpc_site as original_site, + gpc_site as standardized_site, + city, + state_prov, + country, + 1.0 as confidence_score, + MIN(created_at) as first_seen_at + FROM staging.entries_staging + WHERE gpc_site IS NOT NULL + GROUP BY gpc_site, city, state_prov, country; + """ + self.supabase.rpc('execute_sql', {'query': create_site_std_sql}).execute() + logger.info("Created site_standardization view") + + # Create data_quality_issues as a view + create_quality_issues_sql = """ + CREATE VIEW staging.data_quality_issues AS + WITH quality_checks AS ( + SELECT + entry_id, + 'WEIGHT' as field_name, + weight_lbs::text as original_value, + weight_lbs::text as corrected_value, + CASE + WHEN weight_lbs <= 0 THEN 'INVALID_WEIGHT' + WHEN weight_lbs > 3000 THEN 'SUSPICIOUS_WEIGHT' + ELSE 'VALID_WEIGHT' + END as issue_type, + CASE + WHEN weight_lbs > 0 AND weight_lbs <= 3000 THEN 1.0 + ELSE 0.5 + END as confidence_score, + created_at + FROM staging.entries_staging + WHERE weight_lbs IS NOT NULL + + UNION ALL + + SELECT + entry_id, + 'GROWER_NAME' as field_name, + original_grower_name as original_value, + processed_grower_name as corrected_value, + CASE + WHEN 
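All DDL and DML in this pipeline run through `supabase.rpc('execute_sql', ...)`, and raw tables are read through `supabase.rpc('select_from_raw_data', ...)`, but neither Postgres function is defined in this diff. The sketch below is one assumed shape for those helpers, kept as SQL strings in the pipeline's own style; the bodies are illustrative and would need to be created once on the database (for example via the Supabase SQL editor) before the pipeline can run.

```python
# Assumed server-side helpers; these definitions are illustrative, not part of this PR.
EXECUTE_SQL_FN = """
CREATE OR REPLACE FUNCTION execute_sql(query text)
RETURNS void LANGUAGE plpgsql SECURITY DEFINER AS $$
BEGIN
    EXECUTE query;
END;
$$;
"""

SELECT_FROM_RAW_DATA_FN = """
CREATE OR REPLACE FUNCTION select_from_raw_data(table_name text)
RETURNS SETOF json LANGUAGE plpgsql SECURITY DEFINER AS $$
BEGIN
    -- Returns each row of raw_data.<table_name> as a JSON object,
    -- which pd.DataFrame(result.data) can consume directly.
    RETURN QUERY EXECUTE format('SELECT row_to_json(t) FROM raw_data.%I t', table_name);
END;
$$;
"""
```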
processed_grower_name = 'Unknown' THEN 'INVALID_NAME' + WHEN original_grower_name != processed_grower_name THEN 'STANDARDIZED_NAME' + ELSE 'VALID_NAME' + END as issue_type, + CASE + WHEN original_grower_name = processed_grower_name THEN 1.0 + WHEN processed_grower_name = 'Unknown' THEN 0.0 + ELSE 0.8 + END as confidence_score, + created_at + FROM staging.entries_staging + WHERE original_grower_name IS NOT NULL + ) + SELECT + ROW_NUMBER() OVER () as issue_id, + entry_id, + issue_type, + field_name, + original_value, + corrected_value, + confidence_score, + created_at + FROM quality_checks + WHERE issue_type NOT IN ('VALID_WEIGHT', 'VALID_NAME'); + """ + self.supabase.rpc('execute_sql', {'query': create_quality_issues_sql}).execute() + logger.info("Created data_quality_issues view") + + except Exception as e: + logger.error(f"Error ensuring staging tables exist: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + def _fetch_raw_data(self, year: int, category: str) -> Optional[pd.DataFrame]: + """Fetch data from raw_data schema with retries.""" + table_name = f"{category.lower()}_{year}" + + for attempt in range(self.max_retries): + try: + # Use RPC to query with schema + result = self.supabase.rpc('select_from_raw_data', { + 'table_name': table_name + }).execute() + + if result.data: + df = pd.DataFrame(result.data) + logger.info(f"Successfully fetched {len(df)} records from raw_data.{table_name}") + return df + else: + logger.warning(f"No data found in raw_data.{table_name}") + return None + + except Exception as e: + if attempt < self.max_retries - 1: + logger.warning(f"Retry {attempt + 1} for raw_data.{table_name}: {str(e)}") + time.sleep(self.retry_delay) + else: + logger.error(f"Failed to fetch data from raw_data.{table_name} after {self.max_retries} attempts") + logger.error(f"Error details: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + return None + + def _clean_numeric(self, value: Any) -> Optional[float]: + """Clean and convert numeric values.""" + if pd.isna(value): + return None + try: + # Remove commas and convert to float + cleaned = str(value).replace(',', '').strip() + return float(cleaned) if cleaned else None + except (ValueError, TypeError): + return None + + def _process_name(self, name: str) -> str: + """Process grower name with improved handling.""" + if pd.isna(name): + return "Unknown" + + name = str(name).strip() + # Remove special characters and extra spaces + name = re.sub(r'[&/\-]+', ' ', name) + name = re.sub(r'\s+', ' ', name) + name = re.sub(r'\d+', '', name) + + # Handle team names + if "team" in name.lower(): + name = re.sub(r'\bteam\b|\bthe\b', '', name, flags=re.I).strip() + return f"Team {name.title()}" + + # Parse individual names + try: + human_name = HumanName(name) + if human_name.last: + return f"{human_name.last.title()}, {human_name.first.title()}".strip() + except Exception: + pass + + return name.title().strip() + + def _batch_insert(self, table_name: str, records: List[Dict], schema: str = 'staging') -> None: + """Insert records in batches with improved error handling.""" + if not records: + logger.warning(f"No records to insert into {schema}.{table_name}") + return + + successful_inserts = 0 + failed_batches = [] + + # Construct the SQL with the values directly + for i in range(0, len(records), self.batch_size): + batch = records[i:i + self.batch_size] + + # Convert batch to SQL values + values_list = [] + for record in batch: + if table_name == 
'sites_staging': + values = ( + f"({record['year']}, " + f"'{record['gpc_site'].replace("'", "''")}', '{record['city'].replace("'", "''")}', " + f"'{record['state_prov'].replace("'", "''")}', '{record['country'].replace("'", "''")}')" + ) + values_list.append(values) + else: + values = ( + f"('{record['category'].replace("'", "''")}', {record['year']}, '{record['place'].replace("'", "''")}', " + f"{record['weight_lbs'] if record['weight_lbs'] is not None else 'NULL'}, " + f"'{record['processed_grower_name'].replace("'", "''")}', " + f"'{record['original_grower_name'].replace("'", "''")}', '{record['city'].replace("'", "''")}', " + f"'{record['state_prov'].replace("'", "''")}', '{record['country'].replace("'", "''")}', " + f"'{record['gpc_site'].replace("'", "''")}', " + f"'{record['seed_mother'].replace("'", "''")}', '{record['pollinator_father'].replace("'", "''")}', " + f"{record['ott'] if record['ott'] is not None else 'NULL'}, " + f"{record['est_weight'] if record['est_weight'] is not None else 'NULL'}, " + f"'{record['entry_type'].replace("'", "''")}')" + ) + values_list.append(values) + + # Construct appropriate SQL based on table + if table_name == 'sites_staging': + sql = f""" + INSERT INTO {schema}.{table_name} ( + year, gpc_site, city, state_prov, country + ) + VALUES {','.join(values_list)}; + """ + else: + sql = f""" + INSERT INTO {schema}.{table_name} ( + category, year, place, weight_lbs, processed_grower_name, + original_grower_name, city, state_prov, country, gpc_site, + seed_mother, pollinator_father, ott, est_weight, entry_type + ) + VALUES {','.join(values_list)}; + """ + + for attempt in range(self.max_retries): + try: + # Use execute_sql with 'query' parameter + self.supabase.rpc('execute_sql', {'query': sql}).execute() + successful_inserts += len(batch) + logger.info(f"Successfully inserted batch of {len(batch)} records into {schema}.{table_name}") + break + except Exception as e: + error_msg = str(e) + logger.warning(f"Insert attempt {attempt + 1} failed: {error_msg}") + + if attempt == self.max_retries - 1: + logger.error(f"Failed to insert batch after {self.max_retries} attempts") + failed_batches.append((i, batch)) + # Log the actual data that failed + logger.error(f"Failed SQL: {sql}") + logger.error(f"Failed batch sample: {json.dumps(batch[0], indent=2, default=str)}") + self._save_failed_records([(i, batch)], f"{schema}_{table_name}") + else: + time.sleep(self.retry_delay) + + if failed_batches: + total_failed = sum(len(batch) for _, batch in failed_batches) + logger.error(f"Failed to insert {total_failed} records into {schema}.{table_name}") + + logger.info(f"Completed inserting {successful_inserts} records into {schema}.{table_name}") + + def _save_failed_records(self, failed_batches: List[tuple], table_name: str) -> None: + """Save failed records to a file for later analysis.""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"failed_records_{table_name}_{timestamp}.json" + + failed_records = [] + for batch_index, batch in failed_batches: + for record in batch: + failed_records.append({ + 'batch_index': batch_index, + 'record': record + }) + + try: + with open(filename, 'w') as f: + json.dump(failed_records, f, indent=2, default=str) + logger.info(f"Saved failed records to {filename}") + except Exception as e: + logger.error(f"Failed to save failed records: {str(e)}") + + def process_year(self, year: int, category: str) -> None: + """Process a single year's data with improved error handling.""" + logger.info(f"Starting processing for 
year {year} category {category}") + + # Fetch raw data + df = self._fetch_raw_data(year, category) + if df is None or df.empty: + logger.warning(f"No data to process for year {year} category {category}") + return + + try: + # Process names + logger.info("Processing grower names...") + df['processed_grower_name'] = df['grower_name'].apply(self._process_name) + + # Process by state/province for better matching + states = df['state_prov'].unique() + for state in states: + state_mask = df['state_prov'] == state + state_names = df.loc[state_mask, 'processed_grower_name'].unique() + + # Perform fuzzy matching within state + for name1 in state_names: + matches = process.extract(name1, state_names, scorer=fuzz.token_sort_ratio) + similar_names = [match[0] for match in matches if match[1] > 85 and match[0] != name1] + + if similar_names: + # Use the alphabetically first name as the standard + standard_name = min([name1] + similar_names) + df.loc[df['processed_grower_name'].isin(similar_names), 'processed_grower_name'] = standard_name + + logger.info(f"Processing category: {category}") + + # Prepare entries for staging + entries = [] + sites = set() # Use set for unique sites + seen_entries = set() # Track unique entries for deduplication + + for _, row in df.iterrows(): + # Determine entry type based on place field + place = str(row.get('place', '')).strip() + entry_type = 'Unknown' + if place.upper() == 'DMG': + entry_type = 'Damaged' + elif place.upper() == 'EXH': + entry_type = 'Exhibition' + elif place.isnumeric() or place.replace('T-', '').isnumeric(): + entry_type = 'Official' + elif 'DNQ' in place.upper(): + entry_type = 'Disqualified' + + # Create entry record + entry = { + 'category': category, + 'year': year, + 'place': place or 'Unknown', + 'weight_lbs': self._clean_numeric(row.get('weight_lbs')), + 'processed_grower_name': row['processed_grower_name'], + 'original_grower_name': str(row.get('grower_name', '')).strip() or 'Unknown', + 'city': str(row.get('city', '')).strip() or 'Unknown', + 'state_prov': str(row.get('state_prov', '')).strip() or 'Unknown', + 'country': str(row.get('country', '')).strip() or 'Unknown', + 'gpc_site': str(row.get('gpc_site', '')).strip() or 'Unknown', + 'seed_mother': str(row.get('seed_mother', '')).strip() or 'Unknown', + 'pollinator_father': str(row.get('pollinator_father', '')).strip() or 'Unknown', + 'ott': self._clean_numeric(row.get('ott')), + 'est_weight': self._clean_numeric(row.get('est_weight')), + 'entry_type': entry_type + } + + # Create deduplication key + dedup_key = ( + entry['processed_grower_name'], + entry['gpc_site'], + entry['year'], + entry['weight_lbs'], + entry['category'] + ) + + # Only add if we haven't seen this exact entry before + if dedup_key not in seen_entries: + entries.append(entry) + seen_entries.add(dedup_key) + + # Process site information + site_key = ( + year, + str(row.get('gpc_site', '')).strip() or 'Unknown', + str(row.get('city', '')).strip() or 'Unknown', + str(row.get('state_prov', '')).strip() or 'Unknown', + str(row.get('country', '')).strip() or 'Unknown' + ) + + if site_key not in sites: + sites.add(site_key) + + # Convert sites set to list of dictionaries + sites_list = [ + { + 'year': site[0], + 'gpc_site': site[1], + 'city': site[2], + 'state_prov': site[3], + 'country': site[4] + } + for site in sites + ] + + # Insert entries into staging + logger.info("Inserting processed data into staging tables...") + if entries: + self._batch_insert('entries_staging', entries) + logger.info(f"Inserted {len(entries)} 
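The within-state fuzzy grouping above is easier to follow in isolation. This standalone sketch applies the same rule used by the pipeline, token_sort_ratio scores above 85 collapse spelling variants onto the alphabetically first name, to a few invented names.

```python
# Minimal illustration of the grouping rule used in process_year; names are invented.
from fuzzywuzzy import fuzz, process

names = ["Smith, John", "Smith, Jon", "Smyth, John", "Jones, Mary"]

canonical = {}
for name in names:
    matches = process.extract(name, names, scorer=fuzz.token_sort_ratio)
    similar = [m for m, score in matches if score > 85 and m != name]
    # Alphabetically first variant wins, mirroring the pipeline's choice.
    canonical[name] = min([name] + similar)

# canonical now maps near-duplicate spellings of the same grower to one
# standard spelling; a name with no close matches maps to itself.
```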
unique entries (removed {len(df) - len(entries)} duplicates)") + + # Insert sites into staging + logger.info("Inserting sites into staging tables...") + if sites_list: + self._batch_insert('sites_staging', sites_list) + + except Exception as e: + logger.error(f"Error processing year {year} category {category}: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + def process_data(self, df, category): + """Process the raw data and return entries and sites dataframes.""" + # Process entries + entries = df.copy() + entries['category'] = category + entries['entry_type'] = entries['Place'].apply(lambda x: 'dmg' if x == 'DMG' else ('exh' if x == 'EXH' else 'official')) + + # Process sites - removed category + sites = df[['GPC Site', 'City', 'State/Prov', 'Country']].copy() + sites = sites.rename(columns={'GPC Site': 'site_name'}) + sites = sites.drop_duplicates() + + return entries, sites + + def insert_entries_batch(self, entries_batch, year): + """Insert a batch of entries into the staging table.""" + values = [] + for _, row in entries_batch.iterrows(): + entry_values = f"({year}, " + entry_values += f"'{escape_sql_string(str(row['Place']))}', " + entry_values += f"{row['Weight']} as weight, " + entry_values += f"'{escape_sql_string(row['Processed Name'])}', " + entry_values += f"'{escape_sql_string(row['City'])}', " + entry_values += f"'{escape_sql_string(row['State/Prov'])}', " + entry_values += f"'{escape_sql_string(row['Country'])}', " + entry_values += f"'{escape_sql_string(row['GPC Site'])}', " + entry_values += f"'{escape_sql_string(str(row['Seed Mother']))}', " + entry_values += f"'{escape_sql_string(str(row['Pollinator/Father']))}', " + entry_values += f"{row['OTT'] if pd.notna(row['OTT']) else 'NULL'}, " + entry_values += f"{row['Est Weight'] if pd.notna(row['Est Weight']) else 'NULL'}, " + entry_values += f"'{escape_sql_string(row['entry_type'])}', " + entry_values += f"'{escape_sql_string(row['category'])}')" + values.append(entry_values) + + values_str = ', '.join(values) + + insert_query = f""" + INSERT INTO staging.entries_staging + (year, place, weight, grower_name, city, state_prov, country, gpc_site, + seed_mother, pollinator_father, ott, est_weight, entry_type, category) + VALUES {values_str}; + """ + + try: + self.supabase.rpc('execute_sql', {'query': insert_query}).execute() + logger.info(f"Successfully inserted batch of {len(entries_batch)} records into staging.entries_staging") + except Exception as e: + logger.error(f"Error inserting entries batch: {str(e)}") + raise + + def insert_sites_batch(self, sites_batch, year): + """Insert a batch of sites into the staging table.""" + values = [] + for _, row in sites_batch.iterrows(): + site_values = f"({year}, " + site_values += f"'{escape_sql_string(row['site_name'])}', " + site_values += f"'{escape_sql_string(row['City'])}', " + site_values += f"'{escape_sql_string(row['State/Prov'])}', " + site_values += f"'{escape_sql_string(row['Country'])}')" + values.append(site_values) + + values_str = ', '.join(values) + + insert_query = f""" + INSERT INTO staging.sites_staging + (year, site_name, city, state_prov, country) + VALUES {values_str}; + """ + + try: + self.supabase.rpc('execute_sql', {'query': insert_query}).execute() + logger.info(f"Successfully inserted batch of {len(sites_batch)} records into staging.sites_staging") + except Exception as e: + logger.error(f"Error inserting sites batch: {str(e)}") + raise + + def _ensure_core_tables(self) -> None: + """Ensure core tables exist 
with correct schema.""" + try: + # Create core schema if it doesn't exist + create_schema_sql = """ + CREATE SCHEMA IF NOT EXISTS core; + """ + self.supabase.rpc('execute_sql', {'query': create_schema_sql}).execute() + logger.info("Created core schema") + + # Drop and recreate entries table + drop_entries_sql = """ + DROP TABLE IF EXISTS core.entries CASCADE; + """ + self.supabase.rpc('execute_sql', {'query': drop_entries_sql}).execute() + logger.info("Dropped existing core.entries table") + + create_entries_sql = """ + CREATE TABLE core.entries ( + entry_id SERIAL PRIMARY KEY, + category CHAR(1), + year INTEGER, + place TEXT, + weight_lbs NUMERIC, + grower_name TEXT, + original_grower_name TEXT, + city TEXT, + state_prov TEXT, + country TEXT, + gpc_site TEXT, + seed_mother TEXT, + pollinator_father TEXT, + ott NUMERIC, + est_weight NUMERIC, + entry_type TEXT, + created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP + ); + """ + self.supabase.rpc('execute_sql', {'query': create_entries_sql}).execute() + logger.info("Created core.entries table") + + # Add indexes for common queries + indexes_sql = """ + CREATE INDEX IF NOT EXISTS entries_category_year_idx ON core.entries (category, year); + CREATE INDEX IF NOT EXISTS entries_grower_name_idx ON core.entries (grower_name); + CREATE INDEX IF NOT EXISTS entries_gpc_site_idx ON core.entries (gpc_site); + CREATE INDEX IF NOT EXISTS entries_weight_idx ON core.entries (weight_lbs DESC); + """ + self.supabase.rpc('execute_sql', {'query': indexes_sql}).execute() + logger.info("Created indexes on core.entries table") + + except Exception as e: + logger.error(f"Error ensuring core tables: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + def _track_name_change(self, original_name: str, processed_name: str, confidence: float, change_type: str) -> None: + """Track a grower name change in the name_changes table.""" + sql = """ + INSERT INTO staging.name_changes ( + original_name, processed_name, confidence_score, change_type + ) VALUES ( + $1, $2, $3, $4 + ); + """ + try: + self.supabase.rpc('execute_sql', { + 'query': sql, + 'params': [original_name, processed_name, confidence, change_type] + }).execute() + except Exception as e: + logger.warning(f"Failed to track name change: {str(e)}") + + def _track_quality_issue(self, entry_id: int, issue_type: str, field: str, + original: str, corrected: str, confidence: float) -> None: + """Track a data quality issue.""" + sql = """ + INSERT INTO staging.data_quality_issues ( + entry_id, issue_type, field_name, original_value, + corrected_value, confidence_score + ) VALUES ( + $1, $2, $3, $4, $5, $6 + ); + """ + try: + self.supabase.rpc('execute_sql', { + 'query': sql, + 'params': [entry_id, issue_type, field, original, corrected, confidence] + }).execute() + except Exception as e: + logger.warning(f"Failed to track quality issue: {str(e)}") + + def create_analytics_views(self) -> None: + """Create materialized views for analytics.""" + try: + # Create analytics schema if it doesn't exist + create_schema_sql = """ + CREATE SCHEMA IF NOT EXISTS analytics; + """ + self.supabase.rpc('execute_sql', {'query': create_schema_sql}).execute() + logger.info("Created analytics schema") + + # Create and populate category points reference table + try: + category_points_sql = """ + DROP TABLE IF EXISTS analytics.category_points; + CREATE TABLE analytics.category_points ( + category CHAR(1), + points_per_pound 
NUMERIC(10,6), + points_per_kg NUMERIC(10,6), + measurement_type TEXT, + PRIMARY KEY (category) + ); + + INSERT INTO analytics.category_points VALUES + ('P', 0.024, 0.052912, 'weight'), -- Atlantic Giant + ('S', 0.039, 0.085984, 'weight'), -- Squash + ('B', 0.140, 0.30865, 'weight'), -- Bushel Gourd + ('W', 0.160, 0.35266, 'weight'), -- Watermelon + ('L', 0.270, 0.10632, 'length'), -- Long Gourd (points per inch/cm) + ('F', 0.290, 0.63929, 'weight'), -- Field Pumpkin + ('M', 0.390, 0.86022, 'weight'), -- Marrow + ('T', 7.000, 15.432, 'weight'); -- Tomato + """ + self.supabase.rpc('execute_sql', {'query': category_points_sql}).execute() + logger.info("Created and populated category_points table") + except Exception as e: + logger.error(f"Error creating category_points table: {str(e)}") + raise + + # Create master gardener calculations view + try: + master_gardener_sql = """ + DROP MATERIALIZED VIEW IF EXISTS analytics.master_gardener_entries; + CREATE MATERIALIZED VIEW analytics.master_gardener_entries AS + WITH ranked_entries AS ( + SELECT + e.grower_name, + e.year, + e.category, + e.weight_lbs, + e.ott as length_inches, + cp.points_per_pound, + cp.measurement_type, + -- Calculate points based on measurement type + CASE + WHEN cp.measurement_type = 'weight' THEN + e.weight_lbs * cp.points_per_pound + WHEN cp.measurement_type = 'length' THEN + e.ott * cp.points_per_pound + ELSE 0 + END as points, + -- Rank within category for the year + ROW_NUMBER() OVER ( + PARTITION BY e.grower_name, e.year, e.category + ORDER BY e.weight_lbs DESC + ) as entry_rank + FROM core.entries e + JOIN analytics.category_points cp ON e.category = cp.category + WHERE e.entry_type NOT IN ('DMG', 'EXH') + ) + SELECT + grower_name, + year, + category, + weight_lbs, + length_inches, + points, + entry_rank + FROM ranked_entries + WHERE entry_rank = 1 -- Only keep best entry per category per grower per year + ORDER BY grower_name, year, points DESC; + + -- Create master gardener qualifiers view (only Qualified) + DROP MATERIALIZED VIEW IF EXISTS analytics.master_gardener_qualifiers; + CREATE MATERIALIZED VIEW analytics.master_gardener_qualifiers AS + WITH grower_yearly_totals AS ( + SELECT + grower_name, + year, + COUNT(DISTINCT category) as categories_entered, + SUM(points) as total_points, + array_agg(DISTINCT category ORDER BY category) as categories, + array_agg(points ORDER BY category) as category_points + FROM analytics.master_gardener_entries + GROUP BY grower_name, year + ) + SELECT + grower_name, + year, + categories_entered, + total_points, + categories, + category_points, + 'Qualified' as qualification_status + FROM grower_yearly_totals + WHERE categories_entered >= 3 -- Must enter at least 3 categories + AND total_points >= 110 -- Only show Qualified + ORDER BY year DESC, total_points DESC; + + -- Create state/province records view + DROP MATERIALIZED VIEW IF EXISTS analytics.state_records; + CREATE MATERIALIZED VIEW analytics.state_records AS + WITH ranked_entries AS ( + SELECT + category, + state_prov, + country, + weight_lbs, + grower_name, + year, + gpc_site, + ROW_NUMBER() OVER ( + PARTITION BY category, state_prov, country + ORDER BY weight_lbs DESC + ) as rank_in_state + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + AND state_prov != 'Unknown' + ) + SELECT + category, + state_prov, + country, + weight_lbs, + grower_name, + year, + gpc_site + FROM ranked_entries + WHERE rank_in_state = 1 + ORDER BY category, country, state_prov; + + -- Create country records view + DROP MATERIALIZED VIEW 
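The Master Gardener scoring embedded in the materialized views above can be sanity-checked outside SQL. The sketch below reuses the per-pound factors from `analytics.category_points` (per inch for long gourds) and the same qualification rule of at least three categories and 110 total points; the `master_gardener_points` helper and the sample entries are invented for illustration.

```python
# Points-per-pound factors from analytics.category_points ('L' is per inch of OTT length).
POINTS_PER_UNIT = {
    "P": 0.024, "S": 0.039, "B": 0.140, "W": 0.160,
    "L": 0.270, "F": 0.290, "M": 0.390, "T": 7.000,
}

def master_gardener_points(best_entries: dict) -> tuple:
    """best_entries maps category -> best weight in lbs (or OTT inches for 'L')."""
    total = sum(POINTS_PER_UNIT[cat] * measure for cat, measure in best_entries.items())
    qualified = len(best_entries) >= 3 and total >= 110
    return round(total, 2), qualified

# Invented example: a 2,000 lb pumpkin, 1,500 lb squash and 300 lb watermelon
# give 48 + 58.5 + 48 = 154.5 points across three categories, so Qualified.
print(master_gardener_points({"P": 2000, "S": 1500, "W": 300}))  # (154.5, True)
```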
IF EXISTS analytics.country_records; + CREATE MATERIALIZED VIEW analytics.country_records AS + WITH ranked_entries AS ( + SELECT + category, + country, + weight_lbs, + grower_name, + state_prov, + year, + gpc_site, + ROW_NUMBER() OVER ( + PARTITION BY category, country + ORDER BY weight_lbs DESC + ) as rank_in_country + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + AND country != 'Unknown' + ) + SELECT + category, + country, + weight_lbs, + grower_name, + state_prov, + year, + gpc_site + FROM ranked_entries + WHERE rank_in_country = 1 + ORDER BY category, country; + """ + self.supabase.rpc('execute_sql', {'query': master_gardener_sql}).execute() + logger.info("Created master gardener and regional records views") + except Exception as e: + logger.error(f"Error creating master gardener and regional records views: {str(e)}") + raise + + # Create heaviest by category view + try: + heaviest_sql = """ + DROP MATERIALIZED VIEW IF EXISTS analytics.heaviest_by_category; + CREATE MATERIALIZED VIEW analytics.heaviest_by_category AS + WITH ranked_entries AS ( + SELECT + category, + weight_lbs, + grower_name, + gpc_site, + state_prov, + country, + year, + ROW_NUMBER() OVER (PARTITION BY category ORDER BY weight_lbs DESC) as rank + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + ) + SELECT * + FROM ranked_entries + WHERE rank = 1 + ORDER BY category; + """ + self.supabase.rpc('execute_sql', {'query': heaviest_sql}).execute() + logger.info("Created heaviest_by_category materialized view") + except Exception as e: + logger.error(f"Error creating heaviest_by_category view: {str(e)}") + raise + + # Create annual top ten view + try: + top_ten_sql = """ + DROP MATERIALIZED VIEW IF EXISTS analytics.annual_top_ten; + CREATE MATERIALIZED VIEW analytics.annual_top_ten AS + WITH ranked_entries AS ( + SELECT + category, + year, + weight_lbs, + grower_name, + gpc_site, + state_prov, + country, + ROW_NUMBER() OVER ( + PARTITION BY category, year + ORDER BY weight_lbs DESC + ) as rank + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + ) + SELECT * + FROM ranked_entries + WHERE rank <= 10 + ORDER BY category, year DESC, rank; + """ + self.supabase.rpc('execute_sql', {'query': top_ten_sql}).execute() + logger.info("Created annual_top_ten materialized view") + except Exception as e: + logger.error(f"Error creating annual_top_ten view: {str(e)}") + raise + + # Create site records view + try: + site_records_sql = """ + DROP MATERIALIZED VIEW IF EXISTS analytics.site_records; + CREATE MATERIALIZED VIEW analytics.site_records AS + WITH ranked_entries AS ( + SELECT + gpc_site, + category, + weight_lbs, + grower_name, + year, + state_prov, + country, + ROW_NUMBER() OVER ( + PARTITION BY gpc_site, category + ORDER BY weight_lbs DESC + ) as rank + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + ) + SELECT * + FROM ranked_entries + WHERE rank = 1 + ORDER BY gpc_site, category; + """ + self.supabase.rpc('execute_sql', {'query': site_records_sql}).execute() + logger.info("Created site_records materialized view") + except Exception as e: + logger.error(f"Error creating site_records view: {str(e)}") + raise + + # Create grower achievements view + try: + grower_achievements_sql = """ + DROP MATERIALIZED VIEW IF EXISTS analytics.grower_achievements; + CREATE MATERIALIZED VIEW analytics.grower_achievements AS + WITH personal_bests AS ( + SELECT + grower_name, + category, + MAX(weight_lbs) as personal_best, + COUNT(*) as total_entries, + COUNT(DISTINCT year) as years_competed, + 
COUNT(DISTINCT gpc_site) as sites_competed + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + GROUP BY grower_name, category + ), + rankings AS ( + SELECT + grower_name, + category, + year, + weight_lbs, + ROW_NUMBER() OVER ( + PARTITION BY category, year + ORDER BY weight_lbs DESC + ) as year_rank + FROM core.entries + WHERE entry_type NOT IN ('DMG', 'EXH') + ) + SELECT + pb.grower_name, + pb.category, + pb.personal_best, + pb.total_entries, + pb.years_competed, + pb.sites_competed, + COUNT(CASE WHEN r.year_rank = 1 THEN 1 END) as first_place_finishes, + COUNT(CASE WHEN r.year_rank <= 3 THEN 1 END) as podium_finishes + FROM personal_bests pb + LEFT JOIN rankings r ON + pb.grower_name = r.grower_name + AND pb.category = r.category + GROUP BY + pb.grower_name, + pb.category, + pb.personal_best, + pb.total_entries, + pb.years_competed, + pb.sites_competed + ORDER BY pb.grower_name, pb.category; + """ + self.supabase.rpc('execute_sql', {'query': grower_achievements_sql}).execute() + logger.info("Created grower_achievements materialized view") + except Exception as e: + logger.error(f"Error creating grower_achievements view: {str(e)}") + raise + + # Create regional records views + try: + regional_records_sql = """ + -- Create regional records view + DROP MATERIALIZED VIEW IF EXISTS analytics.regional_records; + CREATE MATERIALIZED VIEW analytics.regional_records AS + WITH ranked_entries AS ( + SELECT + e.category, + e.weight_lbs, + e.grower_name, + e.gpc_site, + e.year, + e.state_prov, + e.country, + r.region_name, + RANK() OVER ( + PARTITION BY e.category, rm.region_id + ORDER BY e.weight_lbs DESC + ) as rank_in_region + FROM core.entries e + JOIN analytics.region_mappings rm ON + e.country = rm.country AND + (e.state_prov = rm.state_prov OR rm.state_prov IS NULL) + JOIN analytics.regions r ON rm.region_id = r.region_id + WHERE e.entry_type NOT IN ('DMG', 'EXH') + ) + SELECT + category, + region_name, + weight_lbs, + grower_name, + gpc_site, + year, + state_prov, + country + FROM ranked_entries + WHERE rank_in_region = 1 + ORDER BY category, region_name; + + -- Create regional records by year view + DROP MATERIALIZED VIEW IF EXISTS analytics.regional_records_by_year; + CREATE MATERIALIZED VIEW analytics.regional_records_by_year AS + WITH ranked_entries AS ( + SELECT + e.category, + e.weight_lbs, + e.grower_name, + e.gpc_site, + e.year, + e.state_prov, + e.country, + r.region_name, + RANK() OVER ( + PARTITION BY e.category, rm.region_id, e.year + ORDER BY e.weight_lbs DESC + ) as rank_in_region + FROM core.entries e + JOIN analytics.region_mappings rm ON + e.country = rm.country AND + (e.state_prov = rm.state_prov OR rm.state_prov IS NULL) + JOIN analytics.regions r ON rm.region_id = r.region_id + WHERE e.entry_type NOT IN ('DMG', 'EXH') + ) + SELECT + category, + region_name, + year, + weight_lbs, + grower_name, + gpc_site, + state_prov, + country + FROM ranked_entries + WHERE rank_in_region = 1 + ORDER BY category, region_name, year DESC; + """ + self.supabase.rpc('execute_sql', {'query': regional_records_sql}).execute() + logger.info("Created regional records views") + except Exception as e: + logger.error(f"Error creating regional records views: {str(e)}") + raise + + except Exception as e: + logger.error(f"Error creating analytics views: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + def process_staging_to_core(self) -> None: + """Process data from staging to core tables.""" + try: + # Insert from staging to core + insert_sql = 
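The regional views above join `analytics.regions` and `analytics.region_mappings`, neither of which is created anywhere in this diff. Purely as an assumption, one minimal shape that would satisfy the join columns (region_id, country, nullable state_prov) is sketched here in the pipeline's SQL-in-Python style.

```python
# Hypothetical supporting tables for the regional views; not defined in this PR.
REGION_TABLES_SQL = """
CREATE TABLE IF NOT EXISTS analytics.regions (
    region_id SERIAL PRIMARY KEY,
    region_name TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS analytics.region_mappings (
    region_id INTEGER REFERENCES analytics.regions (region_id),
    country TEXT NOT NULL,
    state_prov TEXT  -- NULL means the whole country belongs to the region
);
"""
# Would be applied the same way as the other DDL, e.g.:
# supabase.rpc('execute_sql', {'query': REGION_TABLES_SQL}).execute()
```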
""" + INSERT INTO core.entries ( + category, year, place, weight_lbs, grower_name, original_grower_name, + city, state_prov, country, gpc_site, seed_mother, pollinator_father, + ott, est_weight, entry_type + ) + SELECT + category, + year, + place, + weight_lbs, + processed_grower_name as grower_name, + original_grower_name, + city, + state_prov, + country, + gpc_site, + seed_mother, + pollinator_father, + ott, + est_weight, + entry_type + FROM staging.entries_staging; + """ + self.supabase.rpc('execute_sql', {'query': insert_sql}).execute() + logger.info("Successfully processed staging data to core.entries table") + + # Create analytics views + self.create_analytics_views() + logger.info("Successfully created analytics views") + + except Exception as e: + logger.error(f"Error processing staging to core: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + + def run_pipeline(self, start_year: int = 2005, end_year: int = 2023, categories: List[str] = ['P', 'F', 'L', 'T', 'S']) -> None: + """Run the complete ETL pipeline.""" + try: + # Initialize all tables + self._ensure_staging_tables() + self._ensure_core_tables() + + # Process each category and year + for category in categories: + for year in range(start_year, end_year + 1): + # Fetch and process raw data to staging + df = self._fetch_raw_data(year, category) + if df is not None: + self._process_data_to_staging(df, category, year) + + # Process staging to core + self.process_staging_to_core() + + logger.info("ETL pipeline completed successfully") + + except Exception as e: + logger.error(f"Error running ETL pipeline: {str(e)}") + if hasattr(e, 'message'): + logger.error(f"Error message: {e.message}") + raise + +def main(): + """Main execution function with improved error handling.""" + load_dotenv() + + # Validate environment variables + required_env_vars = ['SUPABASE_URL', 'SUPABASE_KEY'] + missing_vars = [var for var in required_env_vars if not os.getenv(var)] + if missing_vars: + logger.error(f"Missing required environment variables: {missing_vars}") + sys.exit(1) + + try: + supabase = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_KEY")) + pipeline = GPCPipeline(supabase) + + # Ensure core tables exist + pipeline._ensure_core_tables() + + # Define categories and years + categories = ["P", "S", "L", "W", "T", "F", "B", "M"] + years = range(2005, 2025) # Updated to include through 2024 + + # Process each category for each year + for year in years: + for category in categories: + try: + pipeline.process_year(year, category) + except Exception as e: + logger.error(f"Failed to process year {year} category {category}: {str(e)}") + continue + + # After all data is in staging, process to core + try: + logger.info("Processing staging data to core tables...") + pipeline.process_staging_to_core() + logger.info("Successfully processed staging data to core tables") + except Exception as e: + logger.error(f"Failed to process staging to core: {str(e)}") + raise + + except Exception as e: + logger.error(f"Critical error in ETL pipeline: {str(e)}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/functions/import-script.py b/functions/import-script.py index 3780b50..e1da818 100644 --- a/functions/import-script.py +++ b/functions/import-script.py @@ -4,6 +4,7 @@ from firebase_admin import credentials, firestore from decouple import config from datetime import datetime +from tqdm import tqdm # Set up Firestore cred = 
credentials.Certificate("pumpkinpal-b60be-firebase-adminsdk-jtia5-63bbe231d8.json") # Replace with your service account key file path @@ -24,7 +25,7 @@ def upload_to_firestore(collection, documents): # Split documents into batches of 500 batches = [documents[i:i + 500] for i in range(0, len(documents), 500)] - for batch_documents in batches: + for batch_documents in tqdm(batches, desc=f"Uploading {collection}"): batch = db.batch() for doc_id, data in batch_documents: doc_ref = db.collection(collection).document(doc_id) @@ -38,8 +39,8 @@ def upload_to_firestore(collection, documents): contest_documents = [] pumpkin_documents = [] -# Process each row in the dataframe (only the first # rows for testing) -for index, row in df.iterrows(): +# Process each row in the dataframe +for index, row in tqdm(df.iterrows(), total=len(df), desc="Processing records"): # Create or update grower document grower_data = { "id": row["Processed Name"], @@ -79,6 +80,7 @@ def upload_to_firestore(collection, documents): "city": row["City"], "state": row["State/Prov"], "country": row["Country"], + "entryType": row["entryType"], "timestamp": datetime.now() } pumpkin_documents.append((pumpkin_id, pumpkin_data)) @@ -101,4 +103,3 @@ def upload_to_firestore(collection, documents): print("Pumpkin data upload completed successfully.") except Exception as e: print(f"There were errors during the contest data upload: {e}") - diff --git a/functions/import-to-supabase.py b/functions/import-to-supabase.py new file mode 100644 index 0000000..c644875 --- /dev/null +++ b/functions/import-to-supabase.py @@ -0,0 +1,204 @@ +import os +import pandas as pd +import numpy as np +from supabase import create_client, Client +from dotenv import load_dotenv +from tqdm import tqdm + +def clean_string(value): + """Clean string values and handle None/NaN""" + if pd.isna(value): + return None + return str(value).strip() + +def clean_number(value): + """Convert string numbers with commas to float""" + if pd.isna(value): + return None + if isinstance(value, str): + # Remove commas and convert to float + return float(value.replace(',', '')) + return float(value) + +def clean_data(data_dict): + """Clean dictionary values and remove None values""" + return {k: v for k, v in data_dict.items() if v is not None} + +def handle_name(processed_name, first_name, last_name): + """Handle cases where name fields might be incomplete""" + if pd.isna(last_name) or not last_name.strip(): + # If no last name, use the processed name parts + parts = processed_name.split(',', 1) + if len(parts) > 1: + return parts[0].strip() # Use the part before the comma as last name + return processed_name.strip() # Use the whole name if no comma + return last_name.strip() + +# Load environment variables from the .env file +load_dotenv() + +# Get Supabase URL and Key from environment variables +SUPABASE_URL = os.getenv('SUPABASE_URL') +SUPABASE_KEY = os.getenv('SUPABASE_KEY') + +if not SUPABASE_URL or not SUPABASE_KEY: + raise ValueError("Please set SUPABASE_URL and SUPABASE_KEY in your .env file.") + +# Initialize Supabase client +supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) + +# Load preprocessed data +print("Loading data...") +df = pd.read_csv('preprocessed-bigpumpkins.csv') + +# Convert numeric fields +df['Weight (lbs)'] = df['Weight (lbs)'].apply(clean_number) +df['Est. Weight'] = df['Est. Weight'].apply(clean_number) +df['OTT'] = df['OTT'].apply(clean_number) +df['Pct. Chart'] = df['Pct. 
Chart'].apply(clean_number) + +# Rename columns to match database fields +df.rename(columns={ + 'Processed Name': 'processed_name', + 'First Name': 'first_name', + 'Last Name': 'last_name', + 'State/Prov': 'state_prov', + 'Weight (lbs)': 'weight', + 'Est. Weight': 'est_weight', + 'GPC Site': 'gpc_site', + 'Place': 'place', + 'Seed (Mother)': 'seed_mother', + 'Pollinator (Father)': 'seed_father', + 'OTT': 'ott', + 'Pct. Chart': 'pct_chart', + 'Year': 'year', + 'City': 'city', + 'Country': 'country', + 'entryType': 'entry_type' +}, inplace=True) + +# Clean 'gpc_site' by replacing slashes +df['gpc_site'] = df['gpc_site'].str.replace('/', '', regex=False) + +# Remove duplicates and prepare growers data +print("Preparing growers data...") +growers_df = df[['processed_name', 'first_name', 'last_name', 'city', 'state_prov', 'country']].drop_duplicates() + +# Insert growers into Supabase +print("Inserting growers into Supabase...") +grower_ids = {} +for index, row in tqdm(growers_df.iterrows(), total=growers_df.shape[0], desc='Growers'): + last_name = handle_name(row['processed_name'], row['first_name'], row['last_name']) + grower_data = clean_data({ + 'processed_name': clean_string(row['processed_name']), + 'first_name': clean_string(row['first_name']), + 'last_name': last_name, # Use processed last name + 'city': clean_string(row['city']), + 'state_prov': clean_string(row['state_prov']), + 'country': clean_string(row['country']) + }) + + try: + data = supabase.table('growers').insert(grower_data).execute() + grower_id = data.data[0]['id'] + grower_ids[row['processed_name']] = grower_id + except Exception as e: + print(f"Error inserting grower {row['processed_name']}: {str(e)}") + # Retry once on connection error + if 'ConnectionTerminated' in str(e): + try: + print("Retrying after connection error...") + data = supabase.table('growers').insert(grower_data).execute() + grower_id = data.data[0]['id'] + grower_ids[row['processed_name']] = grower_id + except Exception as retry_e: + print(f"Retry failed for grower {row['processed_name']}: {str(retry_e)}") + continue + +# Remove duplicates and prepare sites data +print("Preparing sites data...") +sites_df = df[['gpc_site', 'city', 'state_prov', 'country', 'year']].drop_duplicates() +sites_df.rename(columns={'gpc_site': 'name'}, inplace=True) + +# Insert sites into Supabase with retry logic +print("Inserting sites into Supabase...") +site_ids = {} +for index, row in tqdm(sites_df.iterrows(), total=sites_df.shape[0], desc='Sites'): + site_data = clean_data({ + 'name': clean_string(row['name']), + 'city': clean_string(row['city']), + 'state_prov': clean_string(row['state_prov']), + 'country': clean_string(row['country']), + 'year': int(row['year']) if pd.notna(row['year']) else None + }) + + max_retries = 3 + for attempt in range(max_retries): + try: + data = supabase.table('sites').insert(site_data).execute() + site_id = data.data[0]['id'] + site_ids[(row['name'], row['year'])] = site_id + break + except Exception as e: + if attempt == max_retries - 1: # Last attempt + print(f"Error inserting site {row['name']} ({row['year']}): {str(e)}") + elif 'ConnectionTerminated' in str(e) or 'Server disconnected' in str(e): + print(f"Connection error, retrying... 
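The grower and site loops in this script issue one insert request per row, which is slow across a full historical import. supabase-py also accepts a list payload on insert, so a batched variant is possible; the sketch below is an alternative, not what the script currently does, and it assumes the `growers_df`, `clean_data`, `clean_string` and `handle_name` defined above. The rows returned from each batch would still need to be mapped back to grower ids before the weigh-off records are built.

```python
# Sketch of a batched alternative to the per-row grower inserts.
def insert_growers_batched(supabase, growers_df, batch_size: int = 500) -> None:
    rows = [
        clean_data({
            'processed_name': clean_string(r['processed_name']),
            'first_name': clean_string(r['first_name']),
            'last_name': handle_name(r['processed_name'], r['first_name'], r['last_name']),
            'city': clean_string(r['city']),
            'state_prov': clean_string(r['state_prov']),
            'country': clean_string(r['country']),
        })
        for _, r in growers_df.iterrows()
    ]
    for i in range(0, len(rows), batch_size):
        # One request per 500 growers instead of one per grower.
        supabase.table('growers').insert(rows[i:i + batch_size]).execute()
```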
({attempt + 1}/{max_retries})") + import time + time.sleep(1) # Wait 1 second before retrying + else: + print(f"Error inserting site {row['name']} ({row['year']}): {str(e)}") + break + +# Prepare weigh-offs data +print("Preparing weigh-offs data...") +weigh_offs_rows = [] +for index, row in tqdm(df.iterrows(), total=df.shape[0], desc='Weigh-Offs'): + grower_id = grower_ids.get(row['processed_name']) + site_id = site_ids.get((row['gpc_site'], row['year'])) + + if not grower_id: + print(f"Missing grower ID for {row['processed_name']} at index {index}") + continue + if not site_id: + print(f"Missing site ID for {row['gpc_site']} ({row['year']}) at index {index}") + continue + + weigh_off_data = clean_data({ + 'weight': clean_number(row['weight']), + 'place': clean_string(row['place']), + 'entry_type': clean_string(row['entry_type']), + 'seed_mother': clean_string(row['seed_mother']), + 'seed_father': clean_string(row['seed_father']), + 'ott': clean_number(row['ott']), + 'est_weight': clean_number(row['est_weight']), + 'pct_chart': clean_number(row['pct_chart']), + 'year': int(row['year']) if pd.notna(row['year']) else None, + 'grower_id': grower_id, + 'site_id': site_id + }) + + weigh_offs_rows.append(weigh_off_data) + +# Batch insert weigh-offs with retry logic +print("Inserting weigh-offs into Supabase...") +batch_size = 500 +for i in tqdm(range(0, len(weigh_offs_rows), batch_size), desc='Batch Insertion'): + batch = weigh_offs_rows[i:i + batch_size] + max_retries = 3 + for attempt in range(max_retries): + try: + data = supabase.table('weigh_offs').insert(batch).execute() + break + except Exception as e: + if attempt == max_retries - 1: # Last attempt + print(f"Error inserting batch starting at index {i}: {str(e)}") + elif 'ConnectionTerminated' in str(e) or 'Server disconnected' in str(e): + print(f"Connection error, retrying... 
({attempt + 1}/{max_retries})") + import time + time.sleep(1) # Wait 1 second before retrying + else: + print(f"Error inserting batch starting at index {i}: {str(e)}") + break + +print("Data import completed successfully.") \ No newline at end of file diff --git a/functions/index.js b/functions/index.js index 8d2f009..1e0cd35 100644 --- a/functions/index.js +++ b/functions/index.js @@ -350,90 +350,121 @@ res.send(` async function calculateGlobalRankings() { const db = admin.firestore(); const pumpkinsCollection = db.collection('Stats_Pumpkins'); + const categories = ['overall', 'official', 'nonDmg', 'nonExh']; try { - const pumpkinsSnapshot = await pumpkinsCollection.get(); + const batchSize = 1000; + let lastDoc = null; + let allPumpkins = []; + + // Fetch all pumpkins + while (true) { + let query = pumpkinsCollection.orderBy('weight', 'desc').limit(batchSize); + if (lastDoc) { + query = query.startAfter(lastDoc); + } - if (pumpkinsSnapshot.empty) { - console.log('No matching pumpkins.'); - return; - } + const pumpkinsSnapshot = await query.get(); + if (pumpkinsSnapshot.empty) break; - const pumpkins = []; - const yearlyPumpkins = new Map(); // Use Map to store pumpkins grouped by year + const pumpkins = pumpkinsSnapshot.docs.map(doc => ({...doc.data(), id: doc.id})); + allPumpkins = allPumpkins.concat(pumpkins); + lastDoc = pumpkinsSnapshot.docs[pumpkinsSnapshot.docs.length - 1]; - pumpkinsSnapshot.forEach(doc => { - const pumpkin = doc.data(); - if (pumpkin.place !== 'DMG' && typeof pumpkin.weight === 'number') { - pumpkins.push(pumpkin); + console.log(`Fetched ${allPumpkins.length} pumpkins`); + } - // Group pumpkins by year - if (!yearlyPumpkins.has(pumpkin.year)) { - yearlyPumpkins.set(pumpkin.year, []); - } - yearlyPumpkins.get(pumpkin.year).push(pumpkin); + // Calculate rankings for each category + const rankings = {}; + categories.forEach(category => { + let filteredPumpkins = allPumpkins; + if (category === 'official') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG' && p.place.toUpperCase() !== 'EXH'); + } else if (category === 'nonDmg') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG'); + } else if (category === 'nonExh') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'EXH'); } - }); - - console.log(`Total pumpkins: ${pumpkinsSnapshot.size}`); - console.log(`Valid pumpkins: ${pumpkins.length}`); + + filteredPumpkins.sort((a, b) => b.weight - a.weight); + + filteredPumpkins.forEach((pumpkin, index) => { + if (!rankings[pumpkin.id]) { + rankings[pumpkin.id] = { + lifetimeGlobalRank: null, + yearGlobalRank: null, + lifetimeGlobalRanks: {}, + yearlyGlobalRanks: {} + }; + } + rankings[pumpkin.id].lifetimeGlobalRanks[category] = index + 1; + + // Set the original lifetimeGlobalRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].lifetimeGlobalRank = index + 1; + } + }); - // Sort pumpkins by weight in descending order - pumpkins.sort((a, b) => b.weight - a.weight); + // Calculate yearly rankings + const pumpkinsByYear = {}; + filteredPumpkins.forEach(pumpkin => { + if (!pumpkinsByYear[pumpkin.year]) { + pumpkinsByYear[pumpkin.year] = []; + } + pumpkinsByYear[pumpkin.year].push(pumpkin); + }); - // Sort yearly pumpkins outside the loop - yearlyPumpkins.forEach(yearPumpkins => { - yearPumpkins.sort((a, b) => b.weight - a.weight); + Object.keys(pumpkinsByYear).forEach(year => { + pumpkinsByYear[year].sort((a, b) => b.weight - a.weight); + 
pumpkinsByYear[year].forEach((pumpkin, index) => { + if (!rankings[pumpkin.id].yearlyGlobalRanks[pumpkin.year]) { + rankings[pumpkin.id].yearlyGlobalRanks[pumpkin.year] = {}; + } + rankings[pumpkin.id].yearlyGlobalRanks[pumpkin.year][category] = index + 1; + + // Set the original yearGlobalRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].yearGlobalRank = index + 1; + } + }); + }); }); - // Begin a Firestore batch - let batch = db.batch(); - - // Counter to keep track of how many operations are in the batch + // Update pumpkins with new rankings + const updateBatchSize = 500; let batchCounter = 0; + let batch = db.batch(); - // Assign rank and update each pumpkin in Firestore - for (let i = 0; i < pumpkins.length; i++) { - const pumpkin = pumpkins[i]; - pumpkin.lifetimeGlobalRank = i + 1; - - // Assign yearly rank - const yearlyRank = yearlyPumpkins.get(pumpkin.year).findIndex(p => p.id === pumpkin.id); - if (yearlyRank !== -1) { - pumpkin.yearGlobalRank = yearlyRank + 1; - } - - // Add update operation to the batch - if (typeof pumpkin.id === 'string' && pumpkin.id !== '') { - const docRef = db.collection('Stats_Pumpkins').doc(pumpkin.id); - batch.update(docRef, pumpkin); - batchCounter++; - } else { - console.error('Invalid pumpkin id:', pumpkin.id); - } + for (const pumpkinId in rankings) { + const docRef = pumpkinsCollection.doc(pumpkinId); + batch.update(docRef, rankings[pumpkinId]); + batchCounter++; - // If the batch has reached the maximum size (500), commit it and start a new one - if (batchCounter === 500) { + if (batchCounter === updateBatchSize) { await batch.commit(); - batch = db.batch(); + batch = db.batch(); // Create a new batch batchCounter = 0; } } - // Commit any remaining operations in the batch if (batchCounter > 0) { await batch.commit(); } + console.log('Global rankings calculation completed.'); } catch (err) { - console.error('Error getting pumpkins:', err); + console.error('Error calculating global rankings:', err); + throw err; } } // HTTP function to manually trigger the calculation of Worldwide Weigh-off Rankings -exports.calculateGlobalRankings = functions.https.onRequest(async (req, res) => { - await calculateGlobalRankings(); - res.send('Global rankings calculation completed.'); +exports.calculateGlobalRankings = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + await calculateGlobalRankings(); + res.send('Global rankings calculation completed.'); }); @@ -441,66 +472,111 @@ exports.calculateGlobalRankings = functions.https.onRequest(async (req, res) => async function calculateStateRankings() { const db = admin.firestore(); const pumpkinsCollection = db.collection('Stats_Pumpkins'); + const categories = ['overall', 'official', 'nonDmg', 'nonExh']; try { - const pumpkinsSnapshot = await pumpkinsCollection.get(); - - if (pumpkinsSnapshot.empty) { - console.log('No matching pumpkins.'); - return; - } - - const statePumpkins = {}; - const yearlyStatePumpkins = {}; - - for (const doc of pumpkinsSnapshot.docs) { - const pumpkin = doc.data(); - if (pumpkin.place === 'DMG') continue; + const batchSize = 1000; + let lastDoc = null; + let allPumpkins = []; + + // Fetch all pumpkins + while (true) { + let query = pumpkinsCollection.orderBy('weight', 'desc').limit(batchSize); + if (lastDoc) { + query = query.startAfter(lastDoc); + } - const state = pumpkin.state; + const pumpkinsSnapshot = await query.get(); + if (pumpkinsSnapshot.empty) break; - if (!statePumpkins[state]) 
statePumpkins[state] = []; - statePumpkins[state].push(pumpkin); + const pumpkins = pumpkinsSnapshot.docs.map(doc => ({...doc.data(), id: doc.id})); + allPumpkins = allPumpkins.concat(pumpkins); + lastDoc = pumpkinsSnapshot.docs[pumpkinsSnapshot.docs.length - 1]; - if (!yearlyStatePumpkins[state]) yearlyStatePumpkins[state] = {}; - if (!yearlyStatePumpkins[state][pumpkin.year]) yearlyStatePumpkins[state][pumpkin.year] = []; - yearlyStatePumpkins[state][pumpkin.year].push(pumpkin); + console.log(`Fetched ${allPumpkins.length} pumpkins`); } - // Sort state pumpkins outside the loop - for (const state in statePumpkins) { - statePumpkins[state].sort((a, b) => b.weight - a.weight); - for (const year in yearlyStatePumpkins[state]) { - yearlyStatePumpkins[state][year].sort((a, b) => b.weight - a.weight); + // Calculate rankings for each category and state + const rankings = {}; + categories.forEach(category => { + const stateRankings = {}; + let filteredPumpkins = allPumpkins; + if (category === 'official') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG' && p.place.toUpperCase() !== 'EXH'); + } else if (category === 'nonDmg') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG'); + } else if (category === 'nonExh') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'EXH'); } - } + + filteredPumpkins.forEach(pumpkin => { + if (!stateRankings[pumpkin.state]) { + stateRankings[pumpkin.state] = []; + } + stateRankings[pumpkin.state].push(pumpkin); + }); + + Object.keys(stateRankings).forEach(state => { + stateRankings[state].sort((a, b) => b.weight - a.weight); + stateRankings[state].forEach((pumpkin, index) => { + if (!rankings[pumpkin.id]) { + rankings[pumpkin.id] = { + lifetimeStateRank: null, + yearlyStateRank: null, + lifetimeStateRanks: {}, + yearlyStateRanks: {} + }; + } + rankings[pumpkin.id].lifetimeStateRanks[category] = index + 1; + + // Set the original lifetimeStateRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].lifetimeStateRank = index + 1; + } + }); + }); - let batch = db.batch(); - let batchCounter = 0; + // Calculate yearly rankings + Object.keys(stateRankings).forEach(state => { + const pumpkinsByYear = {}; + stateRankings[state].forEach(pumpkin => { + if (!pumpkinsByYear[pumpkin.year]) { + pumpkinsByYear[pumpkin.year] = []; + } + pumpkinsByYear[pumpkin.year].push(pumpkin); + }); - for (const state in statePumpkins) { - for (let i = 0; i < statePumpkins[state].length; i++) { - const pumpkin = statePumpkins[state][i]; - pumpkin.lifetimeStateRank = i + 1; + Object.keys(pumpkinsByYear).forEach(year => { + pumpkinsByYear[year].sort((a, b) => b.weight - a.weight); + pumpkinsByYear[year].forEach((pumpkin, index) => { + if (!rankings[pumpkin.id].yearlyStateRanks[pumpkin.year]) { + rankings[pumpkin.id].yearlyStateRanks[pumpkin.year] = {}; + } + rankings[pumpkin.id].yearlyStateRanks[pumpkin.year][category] = index + 1; + + // Set the original yearlyStateRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].yearlyStateRank = index + 1; + } + }); + }); + }); + }); - const yearlyRank = yearlyStatePumpkins[state][pumpkin.year].findIndex(p => p.id === pumpkin.id); - if (yearlyRank !== -1) { - pumpkin.yearlyStateRank = yearlyRank + 1; - } + // Update pumpkins with new rankings + const updateBatchSize = 500; + let batchCounter = 0; + let batch = db.batch(); - if (typeof pumpkin.id === 'string' && pumpkin.id !== '') { - const docRef = 
pumpkinsCollection.doc(pumpkin.id); - batch.update(docRef, { lifetimeStateRank: pumpkin.lifetimeStateRank, yearlyStateRank: pumpkin.yearlyStateRank }); - batchCounter++; - } else { - console.error('Invalid pumpkin id:', pumpkin.id); - } + for (const pumpkinId in rankings) { + const docRef = pumpkinsCollection.doc(pumpkinId); + batch.update(docRef, rankings[pumpkinId]); + batchCounter++; - if (batchCounter === 500) { - await batch.commit(); - batch = db.batch(); - batchCounter = 0; - } + if (batchCounter === updateBatchSize) { + await batch.commit(); + batch = db.batch(); // Create a new batch + batchCounter = 0; } } @@ -508,6 +584,7 @@ async function calculateStateRankings() { await batch.commit(); } + console.log('State rankings calculation completed.'); } catch (err) { console.error('Error calculating state rankings:', err); throw err; @@ -515,80 +592,140 @@ async function calculateStateRankings() { } // HTTP function to manually trigger the calculation of state rankings -exports.calculateStateRankings = functions.https.onRequest(async (req, res) => { - try { - await calculateStateRankings(); - res.send('State rankings calculation completed.'); - } catch (err) { - res.status(500).send('Error calculating state rankings: ' + err.toString()); - } +exports.calculateStateRankings = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateStateRankings(); + res.send('State rankings calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating state rankings: ' + err.toString()); + } }); // Country Ranking (Lifetime and Yearly) +exports.calculateCountryRankings = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateCountryRankings(); + res.send('Country rankings calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating country rankings: ' + err.toString()); + } +}); + async function calculateCountryRankings() { const db = admin.firestore(); const pumpkinsCollection = db.collection('Stats_Pumpkins'); + const categories = ['overall', 'official', 'nonDmg', 'nonExh']; try { - const pumpkinsSnapshot = await pumpkinsCollection.get(); - - if (pumpkinsSnapshot.empty) { - console.log('No matching pumpkins.'); - return; - } - - const countryPumpkins = {}; - const yearlyCountryPumpkins = {}; - - for (const doc of pumpkinsSnapshot.docs) { - const pumpkin = doc.data(); - if (pumpkin.place === 'DMG') continue; + const batchSize = 1000; + let lastDoc = null; + let allPumpkins = []; + + // Fetch all pumpkins + while (true) { + let query = pumpkinsCollection.orderBy('weight', 'desc').limit(batchSize); + if (lastDoc) { + query = query.startAfter(lastDoc); + } - const country = pumpkin.country; + const pumpkinsSnapshot = await query.get(); + if (pumpkinsSnapshot.empty) break; - if (!countryPumpkins[country]) countryPumpkins[country] = []; - countryPumpkins[country].push(pumpkin); + const pumpkins = pumpkinsSnapshot.docs.map(doc => ({...doc.data(), id: doc.id})); + allPumpkins = allPumpkins.concat(pumpkins); + lastDoc = pumpkinsSnapshot.docs[pumpkinsSnapshot.docs.length - 1]; - if (!yearlyCountryPumpkins[country]) yearlyCountryPumpkins[country] = {}; - if (!yearlyCountryPumpkins[country][pumpkin.year]) yearlyCountryPumpkins[country][pumpkin.year] = []; - yearlyCountryPumpkins[country][pumpkin.year].push(pumpkin); + console.log(`Fetched ${allPumpkins.length} pumpkins`); } - // Sort country pumpkins outside the loop - for 
(const country in countryPumpkins) { - countryPumpkins[country].sort((a, b) => b.weight - a.weight); - for (const year in yearlyCountryPumpkins[country]) { - yearlyCountryPumpkins[country][year].sort((a, b) => b.weight - a.weight); + // Calculate rankings for each category and country + const rankings = {}; + categories.forEach(category => { + const countryRankings = {}; + let filteredPumpkins = allPumpkins; + if (category === 'official') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG' && p.place.toUpperCase() !== 'EXH'); + } else if (category === 'nonDmg') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'DMG'); + } else if (category === 'nonExh') { + filteredPumpkins = allPumpkins.filter(p => p.place.toUpperCase() !== 'EXH'); } - } + + filteredPumpkins.forEach(pumpkin => { + if (!countryRankings[pumpkin.country]) { + countryRankings[pumpkin.country] = []; + } + countryRankings[pumpkin.country].push(pumpkin); + }); + + Object.keys(countryRankings).forEach(country => { + countryRankings[country].sort((a, b) => b.weight - a.weight); + countryRankings[country].forEach((pumpkin, index) => { + if (!rankings[pumpkin.id]) { + rankings[pumpkin.id] = { + lifetimeCountryRank: null, + yearlyCountryRank: null, + lifetimeCountryRanks: {}, + yearlyCountryRanks: {} + }; + } + rankings[pumpkin.id].lifetimeCountryRanks[category] = index + 1; + + // Set the original lifetimeCountryRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].lifetimeCountryRank = index + 1; + } + }); + }); - let batch = db.batch(); - let batchCounter = 0; + // Calculate yearly rankings + Object.keys(countryRankings).forEach(country => { + const pumpkinsByYear = {}; + countryRankings[country].forEach(pumpkin => { + if (!pumpkinsByYear[pumpkin.year]) { + pumpkinsByYear[pumpkin.year] = []; + } + pumpkinsByYear[pumpkin.year].push(pumpkin); + }); - for (const country in countryPumpkins) { - for (let i = 0; i < countryPumpkins[country].length; i++) { - const pumpkin = countryPumpkins[country][i]; - pumpkin.lifetimeCountryRank = i + 1; + Object.keys(pumpkinsByYear).forEach(year => { + pumpkinsByYear[year].sort((a, b) => b.weight - a.weight); + pumpkinsByYear[year].forEach((pumpkin, index) => { + if (!rankings[pumpkin.id].yearlyCountryRanks[pumpkin.year]) { + rankings[pumpkin.id].yearlyCountryRanks[pumpkin.year] = {}; + } + rankings[pumpkin.id].yearlyCountryRanks[pumpkin.year][category] = index + 1; + + // Set the original yearlyCountryRank (backward compatibility) + if (category === 'official') { + rankings[pumpkin.id].yearlyCountryRank = index + 1; + } + }); + }); + }); + }); - const yearlyRank = yearlyCountryPumpkins[country][pumpkin.year].findIndex(p => p.id === pumpkin.id); - if (yearlyRank !== -1) { - pumpkin.yearlyCountryRank = yearlyRank + 1; - } + // Update pumpkins with new rankings + const updateBatchSize = 500; + let batchCounter = 0; + let batch = db.batch(); - if (typeof pumpkin.id === 'string' && pumpkin.id !== '') { - const docRef = pumpkinsCollection.doc(pumpkin.id); - batch.update(docRef, { lifetimeCountryRank: pumpkin.lifetimeCountryRank, yearlyCountryRank: pumpkin.yearlyCountryRank }); - batchCounter++; - } else { - console.error('Invalid pumpkin id:', pumpkin.id); - } + for (const pumpkinId in rankings) { + const docRef = pumpkinsCollection.doc(pumpkinId); + batch.update(docRef, rankings[pumpkinId]); + batchCounter++; - if (batchCounter === 500) { - await batch.commit(); - batch = db.batch(); - batchCounter = 0; - } + if 
(batchCounter === updateBatchSize) { + await batch.commit(); + batch = db.batch(); // Create a new batch + batchCounter = 0; } } @@ -596,27 +733,19 @@ async function calculateCountryRankings() { await batch.commit(); } + console.log('Country rankings calculation completed.'); } catch (err) { console.error('Error calculating country rankings:', err); throw err; } } -// HTTP function to manually trigger the calculation of country rankings -exports.calculateCountryRankings = functions.https.onRequest(async (req, res) => { - try { - await calculateCountryRankings(); - res.send('Country rankings calculation completed.'); - } catch (err) { - res.status(500).send('Error calculating country rankings: ' + err.toString()); - } -}); - // Lifetime Best Rank async function calculateLifetimeBestRank() { const db = admin.firestore(); const growersCollection = db.collection('Stats_Growers'); const pumpkinsCollection = db.collection('Stats_Pumpkins'); + const categories = ['overall', 'official', 'nonDmg', 'nonExh']; try { const growersSnapshot = await growersCollection.get(); @@ -633,11 +762,27 @@ async function calculateLifetimeBestRank() { const pumpkinsSnapshot = await pumpkinsCollection.get(); pumpkinsSnapshot.forEach(doc => { const pumpkin = doc.data(); - if (pumpkin.place === 'DMG') return; - const growerId = pumpkin.grower; - if (!growerRankings[growerId]) growerRankings[growerId] = []; - growerRankings[growerId].push(pumpkin.yearGlobalRank); + + if (!growerRankings[growerId]) { + growerRankings[growerId] = { + overall: [], + official: [], + nonDmg: [], + nonExh: [] + }; + } + + growerRankings[growerId].overall.push(pumpkin.yearlyGlobalRanks?.[pumpkin.year]?.overall); + if (pumpkin.place.toUpperCase() !== 'DMG' && pumpkin.place.toUpperCase() !== 'EXH') { + growerRankings[growerId].official.push(pumpkin.yearlyGlobalRanks?.[pumpkin.year]?.official); + } + if (pumpkin.place.toUpperCase() !== 'DMG') { + growerRankings[growerId].nonDmg.push(pumpkin.yearlyGlobalRanks?.[pumpkin.year]?.nonDmg); + } + if (pumpkin.place.toUpperCase() !== 'EXH') { + growerRankings[growerId].nonExh.push(pumpkin.yearlyGlobalRanks?.[pumpkin.year]?.nonExh); + } }); let batch = db.batch(); @@ -645,16 +790,27 @@ async function calculateLifetimeBestRank() { for (const doc of growersSnapshot.docs) { const grower = doc.data(); - const rankings = growerRankings[grower.id] || []; + const rankings = growerRankings[grower.id] || { + overall: [], + official: [], + nonDmg: [], + nonExh: [] + }; + + const bestRanks = {}; + categories.forEach(category => { + const validRankings = rankings[category].filter(rank => rank !== undefined && rank !== null); + bestRanks[category] = validRankings.length > 0 ? 
Math.min(...validRankings) : null; + }); - if (rankings.length > 0) { - grower.bestRank = Math.min(...rankings); - } else { - grower.bestRank = null; // or some other value indicating no pumpkins - } + const updateData = { + bestRanks: bestRanks, + // Maintain backward compatibility + bestRank: bestRanks.official + }; const docRef = growersCollection.doc(grower.id); - batch.update(docRef, { bestRank: grower.bestRank }); + batch.update(docRef, updateData); batchCounter++; if (batchCounter === 500) { @@ -668,18 +824,26 @@ async function calculateLifetimeBestRank() { await batch.commit(); } + console.log('Lifetime Best Rank calculation completed.'); } catch (err) { console.error('Error calculating lifetime best rank:', err); + throw err; } } // HTTP function to manually trigger the calculation of Lifetime Best Rank -exports.calculateLifetimeBestRank = functions.https.onRequest(async (req, res) => { - await calculateLifetimeBestRank(); - res.send('Lifetime Best Rank calculation completed.'); +exports.calculateLifetimeBestRank = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateLifetimeBestRank(); + res.send('Lifetime Best Rank calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating lifetime best rank: ' + err.toString()); + } }); - // Contest Popularity Ranking (Lifetime and Yearly) async function calculateContestPopularityRanking() { const db = admin.firestore(); @@ -694,66 +858,88 @@ async function calculateContestPopularityRanking() { return; } - // Begin a Firestore batch - let batch = db.batch(); - let batchCounter = 0; - // Initialize contest popularity counts const contestPopularity = {}; contestsSnapshot.forEach(doc => { const contestId = doc.id; - contestPopularity[contestId] = { yearly: 0, lifetime: 0 }; + contestPopularity[contestId] = { + LifetimePopularity: 0, + YearPopularity: {} + }; }); - // Query all pumpkins - const pumpkinsSnapshot = await pumpkinsCollection.get(); + // Query all official pumpkins + const pumpkinsSnapshot = await pumpkinsCollection + .where('place', 'not-in', ['DMG', 'EXH']) + .get(); + pumpkinsSnapshot.forEach(doc => { const pumpkin = doc.data(); const contestId = pumpkin.contest; const contestName = pumpkin.contestName; + const year = pumpkin.year; if (contestPopularity[contestId]) { - contestPopularity[contestId].yearly += 1; + contestPopularity[contestId].LifetimePopularity++; + if (!contestPopularity[contestId].YearPopularity[year]) { + contestPopularity[contestId].YearPopularity[year] = 0; + } + contestPopularity[contestId].YearPopularity[year]++; } // Increment lifetime popularity for all contests with matching name for (const contest of contestsSnapshot.docs) { if (contest.data().name === contestName) { - contestPopularity[contest.id].lifetime += 1; + contestPopularity[contest.id].LifetimePopularity++; } } }); - // Update Firestore document + // Update Firestore documents + const updateBatchSize = 500; + let batchCounter = 0; + let batch = db.batch(); + for (const contestId in contestPopularity) { const docRef = contestsCollection.doc(contestId); - const yearPopularity = contestPopularity[contestId].yearly; - const lifetimePopularity = contestPopularity[contestId].lifetime; - batch.update(docRef, { LifetimePopularity: lifetimePopularity, YearPopularity: yearPopularity }); + const updateData = { + LifetimePopularity: contestPopularity[contestId].LifetimePopularity, + YearPopularity: 
Object.values(contestPopularity[contestId].YearPopularity).reduce((a, b) => a + b, 0), + YearlyPopularity: contestPopularity[contestId].YearPopularity + }; + + batch.update(docRef, updateData); batchCounter++; - // If the batch has reached the maximum size (500), commit it and start a new one - if (batchCounter === 500) { + if (batchCounter === updateBatchSize) { await batch.commit(); batch = db.batch(); batchCounter = 0; } } - // Commit any remaining operations in the batch if (batchCounter > 0) { await batch.commit(); } + console.log('Contest Popularity Ranking calculation completed.'); } catch (err) { console.error('Error calculating contest popularity ranking:', err); + throw err; } } // HTTP function to manually trigger the calculation of Contest Popularity Ranking -exports.calculateContestPopularityRanking = functions.https.onRequest(async (req, res) => { - await calculateContestPopularityRanking(); - res.send('Contest Popularity Ranking calculation completed.'); +exports.calculateContestPopularityRanking = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateContestPopularityRanking(); + res.send('Contest Popularity Ranking calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating contest popularity ranking: ' + err.toString()); + } }); @@ -771,58 +957,67 @@ async function calculateSiteRecords() { return; } - // Begin a Firestore batch - let batch = db.batch(); - let batchCounter = 0; - - // Query all pumpkins - const pumpkinsSnapshot = await pumpkinsCollection.get(); - // Create a map to store the record weight for each contest const contestRecords = {}; contestsSnapshot.forEach(doc => { contestRecords[doc.id] = 0; }); + // Query all official pumpkins (excluding DMG and EXH) + const pumpkinsSnapshot = await pumpkinsCollection + .where('place', 'not-in', ['DMG', 'EXH']) + .get(); + // Process pumpkins and update record weights pumpkinsSnapshot.forEach(doc => { const pumpkin = doc.data(); const contestId = pumpkin.contest; - if (pumpkin.place !== 'DMG' && pumpkin.weight > contestRecords[contestId]) { + if (pumpkin.weight > contestRecords[contestId]) { contestRecords[contestId] = pumpkin.weight; } }); // Update Firestore documents with record weights + const updateBatchSize = 500; + let batchCounter = 0; + let batch = db.batch(); + for (const contestId in contestRecords) { const docRef = contestsCollection.doc(contestId); const recordWeight = contestRecords[contestId]; batch.update(docRef, { recordWeight }); batchCounter++; - // If the batch has reached the maximum size (500), commit it and start a new one - if (batchCounter === 500) { + if (batchCounter === updateBatchSize) { await batch.commit(); batch = db.batch(); batchCounter = 0; } } - // Commit any remaining operations in the batch if (batchCounter > 0) { await batch.commit(); } + console.log('Site records calculation completed.'); } catch (err) { console.error('Error calculating site records:', err); + throw err; } } // HTTP function to manually trigger the calculation of site records -exports.calculateSiteRecords = functions.https.onRequest(async (req, res) => { - await calculateSiteRecords(); - res.send('Site records calculation completed.'); +exports.calculateSiteRecords = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateSiteRecords(); + res.send('Site records calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating site 
records: ' + err.toString()); + } }); @@ -845,12 +1040,10 @@ async function calculateGrowerMetrics() { const pumpkinsByGrower = {}; pumpkinsSnapshot.forEach(doc => { const pumpkin = doc.data(); - if (pumpkin.place !== 'DMG') { // Exclude disqualified pumpkins - if (!pumpkinsByGrower[pumpkin.grower]) { - pumpkinsByGrower[pumpkin.grower] = []; - } - pumpkinsByGrower[pumpkin.grower].push(pumpkin); + if (!pumpkinsByGrower[pumpkin.grower]) { + pumpkinsByGrower[pumpkin.grower] = []; } + pumpkinsByGrower[pumpkin.grower].push(pumpkin); }); let batch = db.batch(); @@ -860,15 +1053,16 @@ async function calculateGrowerMetrics() { const grower = doc.data(); const pumpkins = pumpkinsByGrower[grower.id] || []; - // LifetimeMaxWeight - if (pumpkins.length > 0) { - const maxWeight = Math.max(...pumpkins.map(pumpkin => pumpkin.weight)); + // LifetimeMaxWeight (excluding DMG and EXH) + const officialPumpkins = pumpkins.filter(p => p.place !== 'DMG' && p.place !== 'EXH'); + if (officialPumpkins.length > 0) { + const maxWeight = Math.max(...officialPumpkins.map(pumpkin => pumpkin.weight)); grower.LifetimeMaxWeight = maxWeight; } else { - grower.LifetimeMaxWeight = null; // or some other value indicating no pumpkins + grower.LifetimeMaxWeight = null; } - // NumberOfEntries + // NumberOfEntries (including all entries) grower.NumberOfEntries = pumpkins.length; const docRef = growersCollection.doc(grower.id); @@ -886,15 +1080,24 @@ async function calculateGrowerMetrics() { await batch.commit(); } + console.log('Grower metrics calculation completed.'); } catch (err) { console.error('Error calculating grower metrics:', err); + throw err; } } // HTTP function to manually trigger the calculation of grower metrics -exports.calculateGrowerMetrics = functions.https.onRequest(async (req, res) => { - await calculateGrowerMetrics(); - res.send('Grower metrics calculation completed.'); +exports.calculateGrowerMetrics = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateGrowerMetrics(); + res.send('Grower metrics calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating grower metrics: ' + err.toString()); + } }); @@ -917,12 +1120,10 @@ async function calculateGrowerRankings() { const pumpkinsByGrower = {}; pumpkinsSnapshot.forEach(doc => { const pumpkin = doc.data(); - if (pumpkin.place !== 'DMG') { // Exclude disqualified pumpkins - if (!pumpkinsByGrower[pumpkin.grower]) { - pumpkinsByGrower[pumpkin.grower] = []; - } - pumpkinsByGrower[pumpkin.grower].push(pumpkin); + if (!pumpkinsByGrower[pumpkin.grower]) { + pumpkinsByGrower[pumpkin.grower] = []; } + pumpkinsByGrower[pumpkin.grower].push(pumpkin); }); let batch = db.batch(); @@ -933,17 +1134,22 @@ async function calculateGrowerRankings() { const pumpkins = pumpkinsByGrower[grower.id] || []; if (pumpkins.length > 0) { - const minGlobalRank = Math.min(...pumpkins.map(p => p.lifetimeGlobalRank)); - const minCountryRank = Math.min(...pumpkins.map(p => p.lifetimeCountryRank)); - const minStateRank = Math.min(...pumpkins.map(p => p.lifetimeStateRank)); + // Filter out DMG and EXH pumpkins + const officialPumpkins = pumpkins.filter(p => p.place.toUpperCase() !== 'DMG' && p.place.toUpperCase() !== 'EXH'); - const globalRanking = `Global: #${minGlobalRank}`; - const countryRanking = `${pumpkins[0].country}: #${minCountryRank}`; - const stateRanking = `${pumpkins[0].state}: #${minStateRank}`; + if (officialPumpkins.length > 0) { + const minGlobalRank = 
Math.min(...officialPumpkins.map(p => p.lifetimeGlobalRank || Infinity)); + const minCountryRank = Math.min(...officialPumpkins.map(p => p.lifetimeCountryRank || Infinity)); + const minStateRank = Math.min(...officialPumpkins.map(p => p.lifetimeStateRank || Infinity)); - const docRef = growersCollection.doc(grower.id); - batch.update(docRef, { globalRanking, countryRanking, stateRanking }); - batchCounter++; + const globalRanking = `Global: #${minGlobalRank}`; + const countryRanking = `${officialPumpkins[0].country}: #${minCountryRank}`; + const stateRanking = `${officialPumpkins[0].state}: #${minStateRank}`; + + const docRef = growersCollection.doc(grower.id); + batch.update(docRef, { globalRanking, countryRanking, stateRanking }); + batchCounter++; + } } if (batchCounter === 500) { @@ -957,22 +1163,33 @@ async function calculateGrowerRankings() { await batch.commit(); } + console.log('Grower rankings calculation completed.'); } catch (err) { console.error('Error calculating grower rankings:', err); + throw err; } } // HTTP function to manually trigger the calculation of Grower Rankings -exports.calculateGrowerRankings = functions.https.onRequest(async (req, res) => { - await calculateGrowerRankings(); - res.send('Grower rankings calculation completed.'); +exports.calculateGrowerRankings = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateGrowerRankings(); + res.send('Grower rankings calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating grower rankings: ' + err.toString()); + } }); + // Calculate Site Stats async function calculateSiteStats() { const db = admin.firestore(); const contestsCollection = db.collection('Stats_Contests'); + const pumpkinsCollection = db.collection('Stats_Pumpkins'); const sitesCollection = db.collection('Stats_Sites'); try { @@ -985,6 +1202,22 @@ async function calculateSiteStats() { let siteStats = {}; + // Query all pumpkins to get entry counts + const pumpkinsSnapshot = await pumpkinsCollection.get(); + const entriesByContest = {}; + const officialEntriesByContest = {}; + pumpkinsSnapshot.forEach(doc => { + const pumpkin = doc.data(); + if (!entriesByContest[pumpkin.contest]) { + entriesByContest[pumpkin.contest] = 0; + officialEntriesByContest[pumpkin.contest] = 0; + } + entriesByContest[pumpkin.contest]++; + if (pumpkin.place !== 'DMG' && pumpkin.place !== 'EXH') { + officialEntriesByContest[pumpkin.contest]++; + } + }); + for (const doc of contestsSnapshot.docs) { const contestData = doc.data(); const siteName = contestData.name; @@ -994,15 +1227,25 @@ async function calculateSiteStats() { siteStats[siteName] = { 'Site Record': 0, 'Total Entries': 0, + 'Official Entries': 0, 'Popularity by Year': {}, 'Max Weight by Year': {} }; } - siteStats[siteName]['Site Record'] = Math.max(siteStats[siteName]['Site Record'], contestData.recordWeight); - siteStats[siteName]['Total Entries'] = Math.max(siteStats[siteName]['Total Entries'], contestData.LifetimePopularity); - siteStats[siteName]['Popularity by Year'][year] = contestData.YearPopularity; - siteStats[siteName]['Max Weight by Year'][year] = contestData.recordWeight; + // Update Site Record and Max Weight by Year only for official entries + if (officialEntriesByContest[doc.id]) { + siteStats[siteName]['Site Record'] = Math.max(siteStats[siteName]['Site Record'], contestData.recordWeight); + siteStats[siteName]['Max Weight by Year'][year] = Math.max(siteStats[siteName]['Max Weight by Year'][year] || 0, 
contestData.recordWeight); + } + + // Update Total Entries and Popularity by Year for all entries + const totalEntries = entriesByContest[doc.id] || 0; + siteStats[siteName]['Total Entries'] += totalEntries; + siteStats[siteName]['Popularity by Year'][year] = (siteStats[siteName]['Popularity by Year'][year] || 0) + totalEntries; + + // Update Official Entries count + siteStats[siteName]['Official Entries'] += officialEntriesByContest[doc.id] || 0; } let batch = db.batch(); @@ -1029,15 +1272,24 @@ async function calculateSiteStats() { await batch.commit(); } + console.log('Site stats calculation completed.'); } catch (err) { console.error('Error calculating site stats:', err); + throw err; } } // HTTP function to manually trigger the calculation of site stats -exports.calculateSiteStats = functions.https.onRequest(async (req, res) => { - await calculateSiteStats(); - res.send('Site stats calculation completed.'); +exports.calculateSiteStats = functions.runWith({ + timeoutSeconds: 300, + memory: '1GB' +}).https.onRequest(async (req, res) => { + try { + await calculateSiteStats(); + res.send('Site stats calculation completed.'); + } catch (err) { + res.status(500).send('Error calculating site stats: ' + err.toString()); + } }); diff --git a/functions/preprocessor-v2.py b/functions/preprocessor-v2.py new file mode 100644 index 0000000..de81ace --- /dev/null +++ b/functions/preprocessor-v2.py @@ -0,0 +1,184 @@ +# Import required libraries +import pandas as pd +from nameparser import HumanName +from fuzzywuzzy import process +from fuzzywuzzy import fuzz +from collections import Counter +import re +from tqdm import tqdm +import logging +from typing import Dict, List, Optional, Tuple +from pathlib import Path + +# Set up logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.FileHandler('functions/preprocessor.log'), + logging.StreamHandler() + ], + force=True +) +logger = logging.getLogger(__name__) + +# Force handlers to flush immediately +for handler in logger.handlers: + handler.flush() + +def load_data(file_path: str) -> pd.DataFrame: + """Safely load CSV data with error handling""" + try: + df = pd.read_csv(file_path) + required_columns = { + 'Grower Name', 'State/Prov', 'Place', + 'Weight (lbs)', 'Seed (Mother)', 'Pollinator (Father)', + 'OTT', 'Est. 
Weight', 'GPC Site', 'Year', + 'City', 'Country' + } + + if not required_columns.issubset(df.columns): + missing = required_columns - set(df.columns) + raise ValueError(f"Missing required columns: {missing}") + + logger.info(f"Successfully loaded {len(df)} records from {file_path}") + return df + except FileNotFoundError: + logger.error(f"File {file_path} not found") + raise + except pd.errors.EmptyDataError: + logger.error("File is empty") + raise + except Exception as e: + logger.error(f"Error loading file: {str(e)}") + raise + +def preprocess_name(name: str) -> str: + """Preprocesses a name by replacing special characters and cleaning up spaces.""" + name = str(name) # Ensure input is string + name = name.replace("&", "and").replace("/", " ").replace("-", " ").strip() + name = re.sub(r'\s+', ' ', name) # Replace multiple spaces with a single space + name = re.sub(r'\d', '', name) # Remove numbers + return name + +def parse_name(name: str) -> str: + """Parses a name into its components using the HumanName library.""" + human_name = HumanName(name) + if human_name.last == '': + return name.strip() + else: + return f'{human_name.last}, {human_name.first}'.strip() + +def handle_team_names(name: str) -> str: + """Handles team names to ensure that 'Team' is always at the beginning.""" + if "team" in name.lower(): + name = re.sub(r'\bteam\b', '', name, flags=re.I).strip() + name = re.sub(r'\bthe\b', '', name, flags=re.I).strip() + + if ',' in name: + name_parts = [part.strip() for part in name.split(',') if part.strip() != ''] + name = 'Team ' + ' '.join(name_parts) if name_parts else 'Team Unknown' + else: + name = f'Team {name}' + return name + return parse_name(name) + +def perform_fuzzy_matching(names: List[str], threshold: int = 80) -> Dict[str, List[str]]: + """Perform fuzzy matching with progress bar.""" + matches_dict = {} + total_comparisons = len(names) * (len(names) - 1) // 2 + + with tqdm(total=total_comparisons, desc="Performing fuzzy matching", disable=False) as pbar: + for i, name1 in enumerate(names): + remaining_names = names[i+1:] + scores = [fuzz.token_sort_ratio(name1, name2) for name2 in remaining_names] + matches = [name2 for name2, score in zip(remaining_names, scores) if score > threshold] + if matches: + matches_dict[name1] = matches + pbar.update(len(remaining_names)) + + return matches_dict + +def process_dataframe(df: pd.DataFrame) -> pd.DataFrame: + """Main processing function for the dataframe.""" + logger.info("Starting name preprocessing...") + + print("Starting preprocessing...", flush=True) + + # Create processed name column + with tqdm(total=3, desc="Processing names", disable=False) as pbar: + df['Processed Name'] = df['Grower Name'].apply(preprocess_name) + pbar.update(1) + + df['Processed Name'] = df['Processed Name'].apply(handle_team_names) + pbar.update(1) + + df['Processed Name'] = df['Processed Name'].str.title() + pbar.update(1) + + # Perform fuzzy matching by state/province + logger.info("Starting fuzzy matching process...") + fuzzy_matched_names = {} + state_provs = df['State/Prov'].unique() + + for state_prov in tqdm(state_provs, desc="Processing states/provinces"): + state_prov_df = df[df['State/Prov'] == state_prov] + processed_names = state_prov_df['Processed Name'].unique().tolist() + matches = perform_fuzzy_matching(processed_names, threshold=80) + fuzzy_matched_names.update(matches) + + # Cross-state matching with higher threshold + logger.info("Performing cross-state matching...") + all_processed_names = df['Processed 
Name'].unique().tolist() + all_matches = perform_fuzzy_matching(all_processed_names, threshold=90) + fuzzy_matched_names.update({k: v for k, v in all_matches.items() if k not in fuzzy_matched_names}) + + # Standardize names based on fuzzy matches + logger.info("Standardizing names...") + for most_common_name, matches in tqdm(fuzzy_matched_names.items(), desc="Standardizing names"): + df.loc[df['Processed Name'].isin(matches), 'Processed Name'] = most_common_name + + # Split names into components + logger.info("Splitting names into components...") + df[['Last Name', 'First Name']] = df['Processed Name'].apply( + lambda name: pd.Series([name, ""]) if "Team" in name else pd.Series(name.split(',', 1)) + ) + + # Clean up name columns + df['Last Name'] = df['Last Name'].str.strip().str.title() + df['First Name'] = df['First Name'].str.strip().str.title() + + # Add entry type + df['entryType'] = df['Place'].apply( + lambda x: 'dmg' if x == 'DMG' else ('exh' if x == 'EXH' else 'official') + ) + + return df + +def main(): + """Main execution function""" + try: + input_file = 'functions/bigpumpkins_2004_2024_2024-12-06.csv' + output_file = 'functions/preprocessed-bigpumpkins.csv' + + print(f"Starting preprocessing of {input_file}", flush=True) + logger.info(f"Starting preprocessing of {input_file}") + + # Load data + df = load_data(input_file) + + # Process the data + processed_df = process_dataframe(df) + + # Save results + processed_df.to_csv(output_file, index=False) + logger.info(f"Successfully saved preprocessed data to {output_file}") + print("Processing complete!", flush=True) + + except Exception as e: + logger.error(f"An error occurred during processing: {str(e)}") + print(f"Error: {str(e)}", flush=True) + raise + +if __name__ == "__main__": + main() diff --git a/functions/preprocessor.py b/functions/preprocessor.py index 8cefb3d..5ddf6a3 100644 --- a/functions/preprocessor.py +++ b/functions/preprocessor.py @@ -10,7 +10,7 @@ # Load the data print("Loading data...") -pumpkins_df = pd.read_csv('bigpumpkins_2004_2023_2023-10-11.csv') +pumpkins_df = pd.read_csv('functions/bigpumpkins_2004_2024_2024-12-06.csv') # Function to preprocess names print("Preprocessing names...") @@ -54,6 +54,15 @@ def handle_team_names(name): else: return parse_name(name) +# Function to determine entry type +def determine_entry_type(place): + if place == 'DMG': + return 'dmg' + elif place == 'EXH': + return 'exh' + else: + return 'official' + # Preprocess the names pumpkins_df['Processed Name'] = pumpkins_df['Grower Name'].apply(preprocess_name) @@ -140,6 +149,9 @@ def handle_team_names(name): pumpkins_df['Last Name'] = pumpkins_df['Last Name'].str.strip().str.title() # Convert to title case pumpkins_df['First Name'] = pumpkins_df['First Name'].str.strip().str.title() # Convert to title case +# Add 'entryType' column +pumpkins_df['entryType'] = pumpkins_df['Place'].apply(determine_entry_type) + # Save the modified dataframe to a CSV file print("Saving preprocessed data to CSV file...") pumpkins_df.to_csv('preprocessed-bigpumpkins.csv', index=False) diff --git a/functions/scrape_bigpumpkins.py b/functions/scrape_bigpumpkins.py index 08802b2..7a41a75 100644 --- a/functions/scrape_bigpumpkins.py +++ b/functions/scrape_bigpumpkins.py @@ -4,7 +4,7 @@ import datetime # List of years to scrape. Note: To include 2023, the range is set to 2024 because range() excludes the end value. 
-years = list(range(2005, 2024)) +years = list(range(2005, 2025)) # List to hold all the data all_data = [] @@ -47,4 +47,4 @@ all_data = pd.concat(all_data, ignore_index=True) # Save the dataframe to a CSV file with today's date -all_data.to_csv(f'bigpumpkins_2004_2023_{datetime.date.today()}.csv', index=False) \ No newline at end of file +all_data.to_csv(f'bigpumpkins_2004_2024_{datetime.date.today()}.csv', index=False) \ No newline at end of file diff --git a/functions/scrape_gpc_results.py b/functions/scrape_gpc_results.py new file mode 100644 index 0000000..bcb69e2 --- /dev/null +++ b/functions/scrape_gpc_results.py @@ -0,0 +1,273 @@ +import os +import logging +from datetime import datetime +import json +import asyncio +from typing import Dict, List, Any +from dotenv import load_dotenv +from supabase import create_client +from postgrest import AsyncPostgrestClient +from tqdm import tqdm +import requests +from bs4 import BeautifulSoup + +# Setup logging - only show WARNING and above for httpx +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.basicConfig(level=logging.INFO) + +async def test_supabase_connection(supabase) -> bool: + """Test Supabase connection and permissions.""" + try: + # Create schema if it doesn't exist + print("\nCreating raw_data schema if it doesn't exist...") + create_schema_query = "CREATE SCHEMA IF NOT EXISTS raw_data;" + supabase.rpc('execute_sql', {'query': create_schema_query}).execute() + print("Schema creation successful") + return True + + except Exception as e: + print(f"\nConnection test failed: {str(e)}") + print(f"Type of error: {type(e)}") + return False + +async def create_raw_table(supabase, category: str, year: int) -> bool: + """Create a properly structured raw data table for a specific category and year.""" + table_name = f"{category.lower()}_{year}" + + try: + # Drop the table if it exists (for testing) + drop_query = f"DROP TABLE IF EXISTS raw_data.{table_name};" + print(f"\nDropping table if exists: {table_name}") + supabase.rpc('execute_sql', {'query': drop_query}).execute() + + # Create new table + create_query = f""" + CREATE TABLE raw_data.{table_name} ( + id BIGSERIAL PRIMARY KEY, + place VARCHAR(10), + weight_lbs DECIMAL(10,2), + grower_name VARCHAR(255), + city VARCHAR(100), + state_prov VARCHAR(100), + country VARCHAR(100), + gpc_site VARCHAR(255), + seed_mother VARCHAR(255), + pollinator_father VARCHAR(255), + ott DECIMAL(10,1), + est_weight DECIMAL(10,2), + pct_chart DECIMAL(10,1), + created_at TIMESTAMPTZ DEFAULT NOW() + ); + """ + + print(f"Creating table: {table_name}") + supabase.rpc('execute_sql', {'query': create_query}).execute() + print(f"Table {table_name} created successfully") + return True + + except Exception as e: + logging.error(f"Error creating table {table_name}: {str(e)}") + return False + +async def insert_data(supabase, category: str, year: int, data: Dict[str, Any]) -> bool: + """Insert scraped data into structured table.""" + table_name = f"{category.lower()}_{year}" + + try: + print(f"\nInserting {len(data['data'])} records into {table_name}") + + # Build a single INSERT statement with multiple VALUES + values_list = [] + for row in data['data']: + # Clean up the weight value (remove commas) + weight = row['Weight (lbs)'].replace(',', '') + + values_list.append(f"""( + '{row['Place']}', + {weight}, + '{row['Grower Name'].replace("'", "''")}', + '{row['City'].replace("'", "''")}', + '{row['State/Prov'].replace("'", "''")}', + '{row['Country'].replace("'", "''")}', + '{row['GPC Site'].replace("'", 
"''")}', + '{row['Seed (Mother)'].replace("'", "''")}', + '{row['Pollinator (Father)'].replace("'", "''")}', + {row['OTT'] or 0}, + {row['Est. Weight'].replace(',', '') if row['Est. Weight'] else 0}, + {row['Pct. Chart'] or 0} + )""") + + # Create batches of 100 records each + batch_size = 100 + for i in range(0, len(values_list), batch_size): + batch = values_list[i:i + batch_size] + query = f""" + INSERT INTO raw_data.{table_name} ( + place, weight_lbs, grower_name, city, state_prov, + country, gpc_site, seed_mother, pollinator_father, + ott, est_weight, pct_chart + ) VALUES {','.join(batch)}; + """ + + supabase.rpc('execute_sql', {'query': query}).execute() + print(f"Processed {min(i + batch_size, len(values_list))}/{len(values_list)} records") + + print(f"Successfully inserted all {len(data['data'])} records into {table_name}") + return True + + except Exception as e: + logging.error(f"Error inserting data: {str(e)}") + print(f"Detailed error: {str(e)}") + return False + +async def scrape_data(category: str, year: int) -> Dict[str, Any]: + """Scrape data for a specific category and year.""" + try: + # URL of the page to scrape + url = f"http://www.bigpumpkins.com/WeighoffResultsGPC.aspx?c={category}&y={year}" + + # Send HTTP request + response = requests.get(url) + + # Parse HTML content + soup = BeautifulSoup(response.content, 'html.parser') + + # Find the table + table = soup.find('table') + if not table: + return None + + # Extract table headers + headers = [th.text for th in table.find_all('th')] + + # Extract table rows + rows = table.find_all('tr') + data = [] + for row in rows[1:]: # Skip header row + row_data = [td.text for td in row.find_all('td')] + if len(row_data) == len(headers): + data.append(dict(zip(headers, row_data))) + + if not data: + return None + + return { + "headers": headers, + "data": data, + "url": url, + "scraped_at": datetime.now().isoformat() + } + + except Exception as e: + logging.error(f"Error scraping {category} {year}: {str(e)}") + return None + +async def scrape_and_store(supabase, category: str, year: int) -> Dict[str, Any]: + """Scrape data for a category and year, and store it in Supabase.""" + result = { + "category": category, + "year": year, + "success": False, + "error": None, + "timestamp": datetime.now().isoformat() + } + + try: + # Create the table first + if not await create_raw_table(supabase, category, year): + result["error"] = "Failed to create table" + return result + + # Scrape the data + scraped_data = await scrape_data(category, year) + + if not scraped_data: + result["error"] = "No data found" + return result + + # Insert the scraped data + if await insert_data(supabase, category, year, scraped_data): + result["success"] = True + else: + result["error"] = "Failed to insert data" + + except Exception as e: + result["error"] = str(e) + + return result + +async def main(): + load_dotenv() + + supabase_url = os.getenv("SUPABASE_URL") + supabase_key = os.getenv("SUPABASE_KEY") + + if not supabase_url or not supabase_key: + logging.critical("Missing Supabase credentials") + return + + print(f"Using Supabase URL: {supabase_url}") + print(f"Using key starting with: {supabase_key[:6]}...") + + supabase = create_client(supabase_url, supabase_key) + + # Test connection first + print("\nTesting Supabase connection...") + if not await test_supabase_connection(supabase): + logging.critical("Failed to connect to Supabase or insufficient permissions") + return + + print("\nConnection and permissions verified. 
Starting scrape...") + + # Define categories and years + categories = ["P", "S", "L", "W", "T", "F", "B", "M"] + years = range(2005, 2025) + + # Create a list of all combinations + tasks = [(cat, year) for cat in categories for year in years] + + # Initialize results tracking + results = { + "start_time": datetime.now().isoformat(), + "successful_scrapes": [], + "failed_scrapes": [], + "empty_results": [] + } + + # Process all combinations with progress bar + with tqdm(total=len(tasks), desc="Scraping progress") as pbar: + for category_code, year in tasks: + result = await scrape_and_store(supabase, category_code, year) + + if result["success"]: + results["successful_scrapes"].append(result) + elif result["error"] == "No data found": + results["empty_results"].append(result) + else: + results["failed_scrapes"].append(result) + + pbar.update(1) + + # Add summary statistics + results["end_time"] = datetime.now().isoformat() + results["duration"] = str(datetime.fromisoformat(results["end_time"]) - + datetime.fromisoformat(results["start_time"])) + results["total_successful"] = len(results["successful_scrapes"]) + results["total_failed"] = len(results["failed_scrapes"]) + results["total_empty"] = len(results["empty_results"]) + + # Save detailed report + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_filename = f"scraping_report_{timestamp}.json" + with open(report_filename, 'w') as f: + json.dump(results, f, indent=2) + + # Print summary + print("\nScraping Summary:") + print(f"Successful scrapes: {results['total_successful']}") + print(f"Failed scrapes: {results['total_failed']}") + print(f"Empty results: {results['total_empty']}") + print(f"Detailed report saved to: {report_filename}") + +if __name__ == "__main__": + asyncio.run(main())