# ========================================== # daily_summary.py - 일일 발전 통계 집계 # ========================================== # solar_logs 데이터를 집계하여 daily_stats 테이블에 저장 from datetime import datetime, timedelta try: from dotenv import load_dotenv load_dotenv() except ImportError: pass import pandas as pd from database import get_supabase_client def get_plant_capacities(client) -> dict: """plants 테이블에서 용량 정보 조회""" try: result = client.table("plants").select("id, capacity").execute() return {row['id']: row.get('capacity', 99.0) for row in result.data} except Exception as e: print(f" ⚠️ 용량 조회 실패: {e}") return {} def calculate_daily_stats(date_str: str = None): """ 특정 날짜의 발전 통계 집계 Args: date_str: 집계 대상 날짜 (YYYY-MM-DD). 미지정 시 오늘. """ if date_str is None: date_str = datetime.now().strftime('%Y-%m-%d') print(f"\n📊 [일일 통계 집계] {date_str}") print("-" * 60) client = get_supabase_client() if not client: print("❌ Supabase 연결 실패") return False # 1. 용량 정보 조회 capacities = get_plant_capacities(client) # 2. 해당일 로그 조회 start_dt = f"{date_str}T00:00:00" end_dt = f"{date_str}T23:59:59" try: result = client.table("solar_logs") \ .select("plant_id, current_kw, today_kwh, created_at") \ .gte("created_at", start_dt) \ .lte("created_at", end_dt) \ .order("created_at", desc=False) \ .execute() if not result.data: print(" ⚠️ 해당 날짜의 로그가 없습니다.") return False df = pd.DataFrame(result.data) except Exception as e: print(f" ❌ 로그 조회 실패: {e}") return False # 3. 발전소별 통계 계산 stats_list = [] for plant_id, group in df.groupby('plant_id'): # 마지막 로그의 today_kwh total_generation = group['today_kwh'].iloc[-1] if len(group) > 0 else 0 # 최대 출력 peak_kw = group['current_kw'].max() if len(group) > 0 else 0 # 이용률 시간 = 발전량 / 용량 capacity = capacities.get(plant_id, 99.0) generation_hours = round(total_generation / capacity, 2) if capacity > 0 else 0 stats = { 'plant_id': plant_id, 'date': date_str, 'total_generation': round(total_generation, 2), 'peak_kw': round(peak_kw, 2), 'generation_hours': generation_hours } stats_list.append(stats) # 출력 print(f" {plant_id}: {total_generation:.1f}kWh ({generation_hours:.1f}시간, 최대 {peak_kw:.1f}kW)") # 4. daily_stats 테이블에 Upsert if stats_list: try: result = client.table("daily_stats").upsert( stats_list, on_conflict="plant_id,date" ).execute() print("-" * 60) print(f"✅ {len(stats_list)}개 발전소 통계 저장 완료") except Exception as e: print(f" ❌ 저장 실패: {e}") return False return True def calculate_monthly_stats(target_month: str): """ 특정 월의 발전 통계 집계 (일간 데이터 합산) Args: target_month: YYYY-MM """ print(f"\n📅 [월간 통계 집계] {target_month}") print("-" * 60) client = get_supabase_client() if not client: return False try: # 1. 모든 발전소 ID 조회 plants_res = client.table("plants").select("id").execute() plant_ids = [p['id'] for p in plants_res.data] updated_count = 0 for pid in plant_ids: # 2. 해당 월의 Daily 합계 조회 d_res = client.table("daily_stats").select("total_generation") \ .eq("plant_id", pid) \ .gte("date", f"{target_month}-01") \ .lte("date", f"{target_month}-31") \ .execute() if not d_res.data: continue total_gen = sum(r.get('total_generation', 0) or 0 for r in d_res.data) # 3. Monthly Upsert client.table("monthly_stats").upsert({ "plant_id": pid, "month": target_month, "total_generation": round(total_gen, 2), "updated_at": datetime.now().isoformat() }, on_conflict="plant_id, month").execute() print(f" {pid}: {total_gen:.1f}kWh (Month Total)") updated_count += 1 print("-" * 60) print(f"✅ {updated_count}개 발전소 월간 통계 갱신 완료") return True except Exception as e: print(f" ❌ 월간 집계 실패: {e}") return False if __name__ == "__main__": import sys from datetime import timedelta # 인자로 날짜 지정 가능: python daily_summary.py 2026-01-22 if len(sys.argv) > 1: target_date = sys.argv[1] else: # 인자 없으면 '어제' 날짜를 기본값으로 사용 # (새벽에 실행하여 전날 데이터를 마감하는 시나리오) yesterday = datetime.now() - timedelta(days=1) target_date = yesterday.strftime('%Y-%m-%d') print(f"ℹ️ 날짜 미지정 -> 어제({target_date}) 기준으로 집계합니다.") # 1. 일간 통계 집계 success = calculate_daily_stats(target_date) # 2. 월말 체크 및 월간 집계 트리거 # target_date가 해당 월의 마지막 날이면 월간 집계 실행 if success: try: current_dt = datetime.strptime(target_date, '%Y-%m-%d') import calendar last_day = calendar.monthrange(current_dt.year, current_dt.month)[1] if current_dt.day == last_day: target_month = current_dt.strftime('%Y-%m') print(f"\n🔔 월말({target_date}) 감지 -> {target_month} 월간 집계 실행") calculate_monthly_stats(target_month) except Exception as e: print(f"⚠️ 월간 집계 트리거 오류: {e}")