@@ -489,232 +489,3 @@ def _plot_wind_speed_comparison(self, df: pd.DataFrame, timestamp: str) -> None:
489
489
analyzer = WeatherDataAnalyzer ("./config/config.yaml" )
490
490
results = analyzer .analyze_data ()
491
491
print (f"Analysis complete with { len (results )} result categories" )
492
EOL

7. Create the load module:

```bash
cat > src/load.py << 'EOL'
495
- """
496
- Weather data loading module.
497
-
498
- Workflow:
499
- 1. Load processed data from CSV files in the processed data directory
500
- 2. Format the data for appropriate output destinations
501
- 3. Load data into different formats and destinations (JSON, CSV, SQLite)
502
- 4. Apply database optimizations like indexing for better query performance
503
- 5. Generate summary statistics for efficient reporting
504
- 6. Handle errors and ensure data integrity during the loading process
505
- 7. Return loading status and output file paths
506
- """
507
- import os
508
- import logging
509
- import pandas as pd
510
- import json
511
- from typing import Dict , List , Any
512
- import yaml
513
- from datetime import datetime
514
- import sqlite3
515
- from prometheus_client import Counter , Gauge , Histogram
516
-
517
# Module-level logger, named after this module for filterable output.
logger = logging.getLogger(__name__)

# Prometheus metrics shared by every load operation:
# counters for outcomes, a gauge for volume, a histogram for latency.
LOAD_SUCCESSES = Counter('weather_load_successes', 'Number of successful data loads')
LOAD_FAILURES = Counter('weather_load_failures', 'Number of failed data loads')
RECORDS_LOADED = Gauge('weather_records_loaded', 'Number of records loaded')
LOAD_TIME = Histogram('weather_load_processing_time', 'Time to load data in seconds')
524
-
525
class WeatherDataLoader:
    """Class for loading processed weather data into different formats.

    Picks the newest processed CSV, reads it into a DataFrame, and writes
    it out as JSON, CSV (optionally with per-city summary statistics), or
    a SQLite database. Module-level Prometheus metrics (LOAD_SUCCESSES,
    LOAD_FAILURES, RECORDS_LOADED, LOAD_TIME) are updated on every load.
    """

    def __init__(self, config_path: str):
        """
        Initialize the loader with configuration.

        Args:
            config_path: Path to the configuration file.
        """
        self.config = self._load_config(config_path)
        self.processed_data_path = self.config['data']['processed_data_path']
        self.output_data_path = self.config['data']['output_data_path']

        # Create output directory if it doesn't exist
        os.makedirs(self.output_data_path, exist_ok=True)

    def _load_config(self, config_path: str) -> Dict:
        """Load configuration from YAML file."""
        with open(config_path, 'r') as file:
            return yaml.safe_load(file)

    def _get_latest_processed_file(self) -> str:
        """
        Get the path to the most recent processed data file.

        Returns:
            Path to the most recently modified processed CSV, or an
            empty string when the directory holds no CSV files.
        """
        import glob

        files = glob.glob(os.path.join(self.processed_data_path, "*.csv"))
        if not files:
            logger.warning("No processed data files found")
            return ""

        return max(files, key=os.path.getmtime)

    def _read_latest_dataframe(self) -> "pd.DataFrame | None":
        """
        Read the newest processed file and record its row count in the
        RECORDS_LOADED gauge. Shared preamble for all three load_* methods.

        Returns:
            The DataFrame, or None when no processed file exists
            (LOAD_FAILURES is incremented in that case).
        """
        latest_file = self._get_latest_processed_file()
        if not latest_file:
            logger.warning("No processed data file found to load")
            LOAD_FAILURES.inc()
            return None

        df = pd.read_csv(latest_file)
        RECORDS_LOADED.set(len(df))
        return df

    @LOAD_TIME.time()
    def load_to_json(self) -> str:
        """
        Load processed data to a JSON file.

        Returns:
            Path to the output JSON file, or empty string if failed.
        """
        try:
            df = self._read_latest_dataframe()
            if df is None:
                return ""

            # Convert data to a list of records
            records = df.to_dict(orient='records')

            # Timestamped filename so repeated runs never overwrite output.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = os.path.join(self.output_data_path, f"weather_data_{timestamp}.json")

            # Write to JSON file
            with open(output_file, 'w') as f:
                json.dump(records, f, indent=2)

            logger.info(f"Successfully loaded data to JSON file: {output_file}")
            LOAD_SUCCESSES.inc()
            return output_file

        except Exception as e:
            logger.error(f"Error loading data to JSON: {str(e)}")
            LOAD_FAILURES.inc()
            return ""

    @LOAD_TIME.time()
    def load_to_sqlite(self, db_path: "str | None" = None) -> bool:
        """
        Load processed data to a SQLite database.

        Args:
            db_path: Path to the SQLite database file. If None, will create in output directory.

        Returns:
            True if successful, False otherwise.
        """
        try:
            df = self._read_latest_dataframe()
            if df is None:
                return False

            if db_path is None:
                db_path = os.path.join(self.output_data_path, "weather_data.db")

            # Convert timestamp to proper datetime if it exists
            if 'timestamp' in df.columns:
                df['timestamp'] = pd.to_datetime(df['timestamp'])

            conn = sqlite3.connect(db_path)
            try:
                # Write to database
                df.to_sql('weather_data', conn, if_exists='replace', index=False)

                # Create indices for faster queries.
                # NOTE(review): assumes the data has 'city' and 'date' columns;
                # a missing column raises here and is caught by the except
                # below — confirm against the processed schema.
                cursor = conn.cursor()
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_city ON weather_data (city)')
                cursor.execute('CREATE INDEX IF NOT EXISTS idx_date ON weather_data (date)')
                conn.commit()
            finally:
                # Always release the connection — the previous version leaked
                # it whenever to_sql or index creation raised.
                conn.close()

            logger.info(f"Successfully loaded data to SQLite database: {db_path}")
            LOAD_SUCCESSES.inc()
            return True

        except Exception as e:
            logger.error(f"Error loading data to SQLite: {str(e)}")
            LOAD_FAILURES.inc()
            return False

    @LOAD_TIME.time()
    def load_to_csv(self, include_summary: bool = True) -> str:
        """
        Load processed data to a formatted CSV file, optionally with summary statistics.

        Args:
            include_summary: Whether to include summary statistics in the output.

        Returns:
            Path to the output CSV file, or empty string if failed.
        """
        try:
            df = self._read_latest_dataframe()
            if df is None:
                return ""

            # Create output filename
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = os.path.join(self.output_data_path, f"weather_report_{timestamp}.csv")

            # Work on a copy so the summary aggregation cannot affect the
            # frame that is written out.
            output_df = df.copy()

            # If requested, add summary statistics
            if include_summary:
                # Per-city min/mean/max for the core measurements.
                city_summary = df.groupby('city').agg({
                    'temperature': ['mean', 'min', 'max'],
                    'humidity': ['mean', 'min', 'max'],
                    'wind_speed': ['mean', 'min', 'max']
                }).round(2)

                # Summary goes to its own file alongside the main report.
                summary_file = os.path.join(self.output_data_path, f"weather_summary_{timestamp}.csv")
                city_summary.to_csv(summary_file)
                logger.info(f"Saved summary statistics to {summary_file}")

            # Write to CSV file
            output_df.to_csv(output_file, index=False)

            logger.info(f"Successfully loaded data to CSV file: {output_file}")
            LOAD_SUCCESSES.inc()
            return output_file

        except Exception as e:
            logger.error(f"Error loading data to CSV: {str(e)}")
            LOAD_FAILURES.inc()
            return ""
704
-
705
if __name__ == "__main__":
    # Configure logging for the manual smoke test below.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Exercise each output format once against the default config.
    loader = WeatherDataLoader("./config/config.yaml")
    json_file = loader.load_to_json()
    sqlite_result = loader.load_to_sqlite()
    csv_file = loader.load_to_csv()

    for label, value in (
        ("JSON Output", json_file),
        ("SQLite Result", sqlite_result),
        ("CSV Output", csv_file),
    ):
        print(f"{label}: {value}")
0 commit comments