solorpower_server/app/routers/stats.py

280 lines
10 KiB
Python

"""
통계 조회 API
- 일별/월별/연도별 발전량 통계
"""
from fastapi import APIRouter, HTTPException, Depends, Query
from supabase import Client
from typing import List, Literal
from datetime import datetime, timedelta, timezone
from app.core.database import get_db
router = APIRouter(
prefix="/plants",
tags=["Stats"]
)
@router.get("/{plant_id}/stats")
async def get_plant_stats(
plant_id: str,
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
year: int = Query(None, description="특정 연도"),
month: int = Query(None, description="특정 월 (period='day' 시 필수)"),
db: Client = Depends(get_db)
) -> dict:
"""
발전소 통계 조회 (Hybrid 방식)
"""
try:
today = datetime.now().date()
today_str = today.isoformat()
# 1. 과거 데이터 조회 (period에 따라 테이블 분기)
stats_data_raw = []
is_monthly_source = False
if period == "day":
# [일별 조회] daily_stats 테이블 사용
# 특정 연/월의 1일 ~ 말일 조회
target_year = year if year else today.year
target_month = month if month else today.month
# 해당 월의 시작일과 마지막일 계산
import calendar
last_day = calendar.monthrange(target_year, target_month)[1]
start_date_str = f"{target_year}-{target_month:02d}-01"
end_date_str = f"{target_year}-{target_month:02d}-{last_day}"
stats_query = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", start_date_str) \
.lte("date", end_date_str) \
.order("date", desc=False)
stats_data_raw = stats_query.execute().data
else:
# [월별/연도별 조회] monthly_stats 테이블 사용
is_monthly_source = True
if period == "month":
# year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해
target_year = year if year else today.year
start_month = f"{target_year}-01"
end_month = f"{target_year}-12"
else: # year
# year 파라미터가 있으면 해당 연도 포함 최근 5년치 조회
if year:
start_year = year - 4
else:
start_year = today.year - 9 # 10년치
start_month = f"{start_year}-01"
end_month = f"{today.year}-12"
stats_query = db.table("monthly_stats") \
.select("month, total_generation") \
.eq("plant_id", plant_id) \
.gte("month", start_month) \
.lte("month", end_month) \
.order("month", desc=False)
stats_data_raw = stats_query.execute().data
# 데이터 맵핑 {날짜키: 발전량}
data_map = {}
for row in stats_data_raw:
key = row.get("month") if is_monthly_source else row.get("date")
val = row.get("total_generation") or 0
if key:
data_map[key] = val
# 2. 오늘 실시간 데이터 조회 (solar_logs) -> period='day'이고 '오늘'이 포함된 달이면 현재값 업데이트
# (생략 가능하지만, 오늘 날짜가 조회 범위에 포함되면 최신값 반영)
if period == "day":
# 조회 중인 달이 이번 달인지 확인
if (not year or year == today.year) and (not month or month == today.month):
logs_result = db.table("solar_logs") \
.select("today_kwh") \
.eq("plant_id", plant_id) \
.gte("created_at", f"{today_str}T00:00:00") \
.order("created_at", desc=True) \
.limit(1) \
.execute()
if logs_result.data:
today_generation = logs_result.data[0].get("today_kwh", 0.0)
if today_generation > 0:
data_map[today_str] = today_generation
# 4. 포맷팅 및 집계
data = []
if period == "day":
# 해당 월의 모든 날짜 생성
target_year = year if year else today.year
target_month = month if month else today.month
last_day = calendar.monthrange(target_year, target_month)[1]
# 미래 날짜는 제외해야 하나? UI에서 처리하거나 null로?
# 일단 0으로 채움.
for d in range(1, last_day + 1):
d_str = f"{target_year}-{target_month:02d}-{d:02d}"
# 미래 날짜 처리: 오늘 이후라면 데이터가 없을 것임 (0).
# 하지만 '오늘'까지는 표시.
data.append({
"label": d_str,
"value": round(data_map.get(d_str, 0), 2)
})
elif period == "month":
# 월별 집계
monthly = {}
for date_str, val in data_map.items():
# date_str이 'YYYY-MM-DD' 형식이면 YYYY-MM 추출
# 근데 monthly_stats 사용 시 이미 YYYY-MM
if is_monthly_source:
month_key = date_str
else:
month_key = date_str[:7]
monthly[month_key] = monthly.get(month_key, 0) + val
# 특정 연도의 1~12월 데이터 생성
target_year = year if year else today.year
for m in range(1, 13):
month_key = f"{target_year}-{m:02d}"
data.append({
"label": month_key,
"value": round(monthly.get(month_key, 0), 2)
})
elif period == "year":
# 연도별 집계
yearly = {}
for date_str, val in data_map.items():
year_key = date_str[:4]
yearly[year_key] = yearly.get(year_key, 0) + val
# year 파라미터가 있으면 해당 연도 기준 최근 5년 표시 (예: 2026 선택 -> 2022~2026)
if year:
end_year = year
start_year = year - 4
else:
end_year = today.year
start_year = today.year - 9
for y in range(start_year, end_year + 1):
y_str = str(y)
data.append({
"label": y_str,
"value": round(yearly.get(y_str, 0), 2)
})
return {
"status": "success",
"plant_id": plant_id,
"period": period,
"data": data,
"count": len(data)
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"통계 조회 실패: {str(e)}"
)
@router.get("/{plant_id}/stats/today")
async def get_plant_hourly_stats(
plant_id: str,
date: str = Query(None, description="조회 날짜 (YYYY-MM-DD)"),
db: Client = Depends(get_db)
) -> dict:
"""
특정 날짜의 시간별 발전 데이터 조회 (solar_logs 기반)
Defaults to today if date is not provided.
"""
try:
# KST (UTC+9) 시간대 설정
kst_timezone = timezone(timedelta(hours=9))
if date:
try:
target_date = datetime.strptime(date, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD")
else:
target_date = datetime.now(kst_timezone).date()
target_date_str = target_date.isoformat()
# 조회 범위: 해당 일 00:00:00 ~ 23:59:59 (KST)
from_dt = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=kst_timezone)
to_dt = datetime.combine(target_date, datetime.max.time()).replace(tzinfo=kst_timezone)
# 전조치: UTC로 변환하여 쿼리 (Supabase DB가 UTC라고 가정)
# 하지만 solar_logs는 created_at이 timestamptz로 저장되어 있을 것임.
# 안전하게는 UTC 시간으로 필터링
from_utc = from_dt.astimezone(timezone.utc)
to_utc = to_dt.astimezone(timezone.utc)
logs_result = db.table("solar_logs") \
.select("current_kw, today_kwh, created_at") \
.eq("plant_id", plant_id) \
.gte("created_at", from_utc.isoformat()) \
.lte("created_at", to_utc.isoformat()) \
.order("created_at", desc=False) \
.execute()
# 시간별로 그룹화
hourly_data = {}
for log in logs_result.data:
created_at = log.get("created_at", "")
if created_at:
try:
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
dt_kst = dt.astimezone(kst_timezone)
if dt_kst.date() != target_date:
continue
hour = dt_kst.hour
hourly_data[hour] = {
"current_kw": log.get("current_kw", 0) or 0,
"today_kwh": log.get("today_kwh", 0) or 0,
}
except ValueError:
continue
result = []
for hour in range(24):
data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0})
result.append({
"hour": hour,
"label": f"{hour}",
"current_kw": round(data["current_kw"], 2),
"today_kwh": round(data["today_kwh"], 2),
"has_data": hour in hourly_data
})
return {
"status": "success",
"plant_id": plant_id,
"date": target_date_str,
"data": result,
"count": len([d for d in result if d["has_data"]])
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"시간별 통계 조회 실패: {str(e)}"
)