solorpower_server/app/routers/stats.py

275 lines
10 KiB
Python

"""
통계 조회 API
- 일별/월별/연도별 발전량 통계
"""
from fastapi import APIRouter, HTTPException, Depends, Query
from supabase import Client
from typing import List, Literal
from datetime import datetime, timedelta, timezone
from app.core.database import get_db
router = APIRouter(
prefix="/plants",
tags=["Stats"]
)
@router.get("/{plant_id}/stats")
async def get_plant_stats(
plant_id: str,
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
year: int = Query(None, description="특정 연도 (월별/연도별 조회 시)"),
db: Client = Depends(get_db)
) -> dict:
"""
발전소 통계 조회 (Hybrid 방식)
1. daily_stats: 과거 데이터 조회
2. solar_logs: 오늘 실시간 데이터 조회
3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용
Args:
plant_id: 발전소 ID
period: 'day' (최근 30일), 'month' (최근 12개월 또는 특정 연도), 'year' (최근 연도들)
year: 특정 연도 (옵션, period='month' 시 해당 연도의 월별 데이터 반환)
Returns:
차트 라이브러리 친화적 포맷 [{"label": "...", "value": ...}, ...]
"""
try:
today = datetime.now().date()
today_str = today.isoformat()
# 1. 과거 데이터 조회 (daily_stats)
# 1. 과거 데이터 조회 (daily_stats)
if period == "day":
# 이번 달 1일부터 오늘까지
start_date = today.replace(day=1)
date_filter = start_date.isoformat()
elif period == "month":
# year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해
target_year = year if year else today.year
start_date = datetime(target_year, 1, 1).date()
# 종료일: 해당 연도 12월 31일 또는 오늘 중 작은 값
end_date = min(datetime(target_year, 12, 31).date(), today)
date_filter = start_date.isoformat()
else: # year
# year 파라미터가 있으면 해당 연도부터, 없으면 최근 10년
if year:
start_year = year
else:
start_year = today.year - 9 # 10년치 데이터 (올해 포함)
start_date = datetime(start_year, 1, 1).date()
end_date = today
date_filter = start_date.strftime("%Y-%m-%d")
stats_query = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", date_filter)
# period별 종료일 필터 추가
if period in ["month", "year"]:
stats_query = stats_query.lte("date", end_date.isoformat() if period == "month" else today_str)
else:
stats_query = stats_query.lte("date", today_str)
stats_query = stats_query.order("date", desc=False)
# Supabase API Limit(1000) 우회를 위한 페이지네이션
all_stats_data = []
start = 0
batch_size = 1000
while True:
# range는 inclusive index (Start, End)
result = stats_query.range(start, start + batch_size - 1).execute()
batch = result.data
all_stats_data.extend(batch)
if len(batch) < batch_size:
break
start += batch_size
# 데이터 맵핑 {날짜: 발전량}
data_map = {row["date"]: row["total_generation"] or 0 for row in all_stats_data}
# 2. 오늘 실시간 데이터 조회 (solar_logs) - 오늘이 조회 범위에 포함될 때만
today_generation = 0.0
if period == "day" or (period == "month" and (not year or year == today.year)) or period == "year":
# 오늘의 가장 마지막 기록 1건만 조회 (성능 최적화)
logs_result = db.table("solar_logs") \
.select("today_kwh, created_at") \
.eq("plant_id", plant_id) \
.gte("created_at", f"{today_str}T00:00:00") \
.order("created_at", desc=True) \
.limit(1) \
.execute()
if logs_result.data:
today_generation = logs_result.data[0].get("today_kwh", 0.0)
# 3. 데이터 병합 (오늘 데이터 갱신/추가)
# solar_logs 값이 있으면 무조건 daily_stats 값보다 우선 (실시간성)
if today_generation > 0:
data_map[today_str] = today_generation
# 4. 포맷팅 및 집계
data = []
if period == "day":
# 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움)
current = start_date
while current <= today:
d_str = current.isoformat()
data.append({
"label": d_str,
"value": round(data_map.get(d_str, 0), 2)
})
current += timedelta(days=1)
elif period == "month":
# 월별 집계
monthly = {}
# daily_stats + solar_logs(오늘) 데이터로 집계
for date_str, val in data_map.items():
month_key = date_str[:7]
monthly[month_key] = monthly.get(month_key, 0) + val
# 특정 연도의 1~12월 데이터 생성 (누락된 월은 0)
target_year = year if year else today.year
for month in range(1, 13):
month_key = f"{target_year}-{month:02d}"
# 미래 월은 제외
if datetime.strptime(month_key, "%Y-%m").date() > today.replace(day=1):
break
data.append({
"label": month_key,
"value": round(monthly.get(month_key, 0), 2)
})
elif period == "year":
# 연도별 집계
yearly = {}
for date_str, val in data_map.items():
year_key = date_str[:4]
yearly[year_key] = yearly.get(year_key, 0) + val
# 최근 10년 (또는 지정된 기간) 연도별 데이터 생성 (데이터 없으면 0)
data = []
target_start_year = start_date.year
current_year = today.year
for y in range(target_start_year, current_year + 1):
y_str = str(y)
data.append({
"label": y_str,
"value": round(yearly.get(y_str, 0), 2)
})
return {
"status": "success",
"plant_id": plant_id,
"period": period,
"data": data,
"count": len(data),
"today_realtime_kwh": today_generation # 디버깅용
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"통계 조회 실패: {str(e)}"
)
@router.get("/{plant_id}/stats/today")
async def get_plant_hourly_stats(
plant_id: str,
db: Client = Depends(get_db)
) -> dict:
"""
오늘 시간별 발전 데이터 조회 (solar_logs 기반)
Args:
plant_id: 발전소 ID
Returns:
시간별 데이터 [{"hour": 0, "current_kw": ..., "today_kwh": ...}, ...]
"""
try:
# KST (UTC+9) 시간대 설정
kst_timezone = timezone(timedelta(hours=9))
today_kst = datetime.now(kst_timezone).date()
today_str = today_kst.isoformat()
# 오늘의 모든 solar_logs 조회 (UTC 기준으로는 전날 15:00부터일 수 있으므로 넉넉하게 조회)
# 하지만 간단하게 '오늘' 날짜 문자열 필터링이 가장 안전 (DB가 UTC라면 보정 필요)
# 여기서는 created_at이 timestamptz라고 가정하고 단순 문자열 비교보다는 날짜 필터를 사용
# KST 00:00은 UTC로 전날 15:00
start_dt_kst = datetime.combine(today_kst, datetime.min.time()).replace(tzinfo=kst_timezone)
start_dt_utc = start_dt_kst.astimezone(timezone.utc)
logs_result = db.table("solar_logs") \
.select("current_kw, today_kwh, created_at") \
.eq("plant_id", plant_id) \
.gte("created_at", start_dt_utc.isoformat()) \
.order("created_at", desc=False) \
.execute()
# 시간별로 그룹화 (각 시간의 마지막 데이터 사용)
hourly_data = {}
for log in logs_result.data:
created_at = log.get("created_at", "")
if created_at:
try:
# ISO 형식 파싱 (DB에서 UTC로 넘어온다고 가정)
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
# UTC -> KST 변환
dt_kst = dt.astimezone(kst_timezone)
# 오늘 날짜인지 확인
if dt_kst.date() != today_kst:
continue
hour = dt_kst.hour
# 해당 시간대의 마지막 데이터로 갱신 (order가 오름차순이므로 덮어쓰면 됨)
hourly_data[hour] = {
"current_kw": log.get("current_kw", 0) or 0,
"today_kwh": log.get("today_kwh", 0) or 0,
}
except ValueError:
continue
# 0시~23시 전체 배열 생성
result = []
for hour in range(24):
data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0})
result.append({
"hour": hour,
"label": f"{hour}",
"current_kw": round(data["current_kw"], 2),
"today_kwh": round(data["today_kwh"], 2),
"has_data": hour in hourly_data
})
return {
"status": "success",
"plant_id": plant_id,
"date": today_str,
"data": result,
"count": len([d for d in result if d["has_data"]])
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"시간별 통계 조회 실패: {str(e)}"
)