""" 통계 조회 API - 일별/월별/연도별 발전량 통계 """ from fastapi import APIRouter, HTTPException, Depends, Query from supabase import Client from typing import List, Literal from datetime import datetime, timedelta, timezone from app.core.database import get_db router = APIRouter( prefix="/plants", tags=["Stats"] ) @router.get("/{plant_id}/stats") async def get_plant_stats( plant_id: str, period: Literal["day", "month", "year"] = Query("day", description="통계 기간"), year: int = Query(None, description="특정 연도 (월별/연도별 조회 시)"), db: Client = Depends(get_db) ) -> dict: """ 발전소 통계 조회 (Hybrid 방식) 1. daily_stats: 과거 데이터 조회 2. solar_logs: 오늘 실시간 데이터 조회 3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용 Args: plant_id: 발전소 ID period: 'day' (최근 30일), 'month' (최근 12개월 또는 특정 연도), 'year' (최근 연도들) year: 특정 연도 (옵션, period='month' 시 해당 연도의 월별 데이터 반환) Returns: 차트 라이브러리 친화적 포맷 [{"label": "...", "value": ...}, ...] """ try: today = datetime.now().date() today_str = today.isoformat() # 1. 과거 데이터 조회 (daily_stats) # 1. 과거 데이터 조회 (period에 따라 테이블 분기) stats_data_raw = [] is_monthly_source = False if period == "day": # [일별 조회] daily_stats 테이블 사용 # 이번 달 1일부터 오늘까지 start_date = today.replace(day=1) date_filter = start_date.isoformat() stats_query = db.table("daily_stats") \ .select("date, total_generation") \ .eq("plant_id", plant_id) \ .gte("date", date_filter) \ .lte("date", today_str) \ .order("date", desc=False) # 페이지네이션 등 없이 심플하게 (일별은 데이터 적음) stats_data_raw = stats_query.execute().data else: # [월별/연도별 조회] monthly_stats 테이블 사용 is_monthly_source = True if period == "month": # year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해 target_year = year if year else today.year start_month = f"{target_year}-01" end_month = f"{target_year}-12" # 문자열 비교라 12월도 포함됨 else: # year # year 파라미터가 있으면 해당 연도부터 if year: start_year = year else: start_year = today.year - 9 # 10년치 start_month = f"{start_year}-01" end_month = f"{today.year}-12" stats_query = db.table("monthly_stats") \ .select("month, total_generation") \ .eq("plant_id", plant_id) \ .gte("month", start_month) \ .lte("month", end_month) \ .order("month", desc=False) stats_data_raw = stats_query.execute().data # 데이터 맵핑 {날짜키: 발전량} # daily_stats: key='date' (YYYY-MM-DD) # monthly_stats: key='month' (YYYY-MM) data_map = {} for row in stats_data_raw: key = row.get("month") if is_monthly_source else row.get("date") val = row.get("total_generation") or 0 if key: data_map[key] = val # 2. 오늘 실시간 데이터 조회 (solar_logs) # period='day'일 때만 합산 (monthly/year는 monthly_stats가 이미 갱신되었다고 가정하거나, 필요시 로직 추가) # 하지만 monthly_stats가 '어제까지'의 합계일 수 있으므로, '이번 달' 데이터에는 오늘 발전량을 더해주는 게 안전함. # 그러나 로직 복잡성을 피하기 위해, 크롤러가 실시간으로 monthly_stats를 갱신한다고 가정하고 여기선 생략 가능. # 기존 로직 유지: 'day'일 때는 무조건 덮어쓰기. today_generation = 0.0 # 일별 조회 시 오늘 데이터 덮어쓰기 if period == "day": logs_result = db.table("solar_logs") \ .select("today_kwh") \ .eq("plant_id", plant_id) \ .gte("created_at", f"{today_str}T00:00:00") \ .order("created_at", desc=True) \ .limit(1) \ .execute() if logs_result.data: today_generation = logs_result.data[0].get("today_kwh", 0.0) if today_generation > 0: data_map[today_str] = today_generation # 월별/연도별 조회 시: '이번 달' 키에 오늘 발전량을 더해야 하는가? # 마이그레이션 스크립트는 daily_stats의 합을 넣었으므로 오늘 데이터도 포함됨. # 크롤러도 실시간으로 daily 넣으면서 monthly upsert 할 예정. # 따라서 별도 합산 불필요. # 4. 포맷팅 및 집계 data = [] if period == "day": # 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움) current = start_date while current <= today: d_str = current.isoformat() data.append({ "label": d_str, "value": round(data_map.get(d_str, 0), 2) }) current += timedelta(days=1) elif period == "month": # 월별 집계 monthly = {} # daily_stats + solar_logs(오늘) 데이터로 집계 for date_str, val in data_map.items(): month_key = date_str[:7] monthly[month_key] = monthly.get(month_key, 0) + val # 특정 연도의 1~12월 데이터 생성 (누락된 월은 0) target_year = year if year else today.year for month in range(1, 13): month_key = f"{target_year}-{month:02d}" # 미래 월은 제외 if datetime.strptime(month_key, "%Y-%m").date() > today.replace(day=1): break data.append({ "label": month_key, "value": round(monthly.get(month_key, 0), 2) }) elif period == "year": # 연도별 집계 yearly = {} for date_str, val in data_map.items(): year_key = date_str[:4] yearly[year_key] = yearly.get(year_key, 0) + val # 최근 10년 (또는 지정된 기간) 연도별 데이터 생성 (데이터 없으면 0) data = [] if year: target_start_year = year else: target_start_year = today.year - 9 current_year = today.year for y in range(target_start_year, current_year + 1): y_str = str(y) data.append({ "label": y_str, "value": round(yearly.get(y_str, 0), 2) }) return { "status": "success", "plant_id": plant_id, "period": period, "data": data, "count": len(data), "today_realtime_kwh": today_generation # 디버깅용 } except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"통계 조회 실패: {str(e)}" ) @router.get("/{plant_id}/stats/today") async def get_plant_hourly_stats( plant_id: str, db: Client = Depends(get_db) ) -> dict: """ 오늘 시간별 발전 데이터 조회 (solar_logs 기반) Args: plant_id: 발전소 ID Returns: 시간별 데이터 [{"hour": 0, "current_kw": ..., "today_kwh": ...}, ...] """ try: # KST (UTC+9) 시간대 설정 kst_timezone = timezone(timedelta(hours=9)) today_kst = datetime.now(kst_timezone).date() today_str = today_kst.isoformat() # 오늘의 모든 solar_logs 조회 (UTC 기준으로는 전날 15:00부터일 수 있으므로 넉넉하게 조회) # 하지만 간단하게 '오늘' 날짜 문자열 필터링이 가장 안전 (DB가 UTC라면 보정 필요) # 여기서는 created_at이 timestamptz라고 가정하고 단순 문자열 비교보다는 날짜 필터를 사용 # KST 00:00은 UTC로 전날 15:00 start_dt_kst = datetime.combine(today_kst, datetime.min.time()).replace(tzinfo=kst_timezone) start_dt_utc = start_dt_kst.astimezone(timezone.utc) logs_result = db.table("solar_logs") \ .select("current_kw, today_kwh, created_at") \ .eq("plant_id", plant_id) \ .gte("created_at", start_dt_utc.isoformat()) \ .order("created_at", desc=False) \ .execute() # 시간별로 그룹화 (각 시간의 마지막 데이터 사용) hourly_data = {} for log in logs_result.data: created_at = log.get("created_at", "") if created_at: try: # ISO 형식 파싱 (DB에서 UTC로 넘어온다고 가정) dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) # UTC -> KST 변환 dt_kst = dt.astimezone(kst_timezone) # 오늘 날짜인지 확인 if dt_kst.date() != today_kst: continue hour = dt_kst.hour # 해당 시간대의 마지막 데이터로 갱신 (order가 오름차순이므로 덮어쓰면 됨) hourly_data[hour] = { "current_kw": log.get("current_kw", 0) or 0, "today_kwh": log.get("today_kwh", 0) or 0, } except ValueError: continue # 0시~23시 전체 배열 생성 result = [] for hour in range(24): data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0}) result.append({ "hour": hour, "label": f"{hour}시", "current_kw": round(data["current_kw"], 2), "today_kwh": round(data["today_kwh"], 2), "has_data": hour in hourly_data }) return { "status": "success", "plant_id": plant_id, "date": today_str, "data": result, "count": len([d for d in result if d["has_data"]]) } except Exception as e: raise HTTPException( status_code=500, detail=f"시간별 통계 조회 실패: {str(e)}" )