233 lines
8.2 KiB
Python
233 lines
8.2 KiB
Python
"""
|
|
통계 조회 API
|
|
- 일별/월별/연도별 발전량 통계
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query
|
|
from supabase import Client
|
|
from typing import List, Literal
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from app.core.database import get_db
|
|
|
|
router = APIRouter(
|
|
prefix="/plants",
|
|
tags=["Stats"]
|
|
)
|
|
|
|
|
|
@router.get("/{plant_id}/stats")
|
|
async def get_plant_stats(
|
|
plant_id: str,
|
|
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
|
|
db: Client = Depends(get_db)
|
|
) -> dict:
|
|
"""
|
|
발전소 통계 조회 (Hybrid 방식)
|
|
1. daily_stats: 과거 데이터 조회
|
|
2. solar_logs: 오늘 실시간 데이터 조회
|
|
3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용
|
|
|
|
Args:
|
|
plant_id: 발전소 ID
|
|
period: 'day' (최근 30일), 'month' (최근 12개월), 'year' (최근 5년)
|
|
|
|
Returns:
|
|
차트 라이브러리 친화적 포맷 [{"label": "...", "value": ...}, ...]
|
|
"""
|
|
try:
|
|
today = datetime.now().date()
|
|
today_str = today.isoformat()
|
|
|
|
# 1. 과거 데이터 조회 (daily_stats)
|
|
# 1. 과거 데이터 조회 (daily_stats)
|
|
if period == "day":
|
|
# 이번 달 1일부터 오늘까지
|
|
start_date = today.replace(day=1)
|
|
date_filter = start_date.isoformat()
|
|
elif period == "month":
|
|
start_date = today.replace(day=1) - timedelta(days=365)
|
|
date_filter = start_date.isoformat()
|
|
else: # year
|
|
start_date = datetime(today.year - 5, 1, 1).date()
|
|
date_filter = start_date.strftime("%Y-%m-%d")
|
|
|
|
stats_query = db.table("daily_stats") \
|
|
.select("date, total_generation") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("date", date_filter) \
|
|
.lte("date", today_str) \
|
|
.order("date", desc=False)
|
|
|
|
stats_result = stats_query.execute()
|
|
|
|
# 데이터 맵핑 {날짜: 발전량}
|
|
data_map = {row["date"]: row["total_generation"] or 0 for row in stats_result.data}
|
|
|
|
# 2. 오늘 실시간 데이터 조회 (solar_logs)
|
|
# 오늘의 가장 마지막 기록 1건만 조회 (성능 최적화)
|
|
logs_result = db.table("solar_logs") \
|
|
.select("today_kwh, created_at") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("created_at", f"{today_str}T00:00:00") \
|
|
.order("created_at", desc=True) \
|
|
.limit(1) \
|
|
.execute()
|
|
|
|
today_generation = 0.0
|
|
if logs_result.data:
|
|
today_generation = logs_result.data[0].get("today_kwh", 0.0)
|
|
|
|
# 3. 데이터 병합 (오늘 데이터 갱신/추가)
|
|
# solar_logs 값이 있으면 무조건 daily_stats 값보다 우선 (실시간성)
|
|
if today_generation > 0:
|
|
data_map[today_str] = today_generation
|
|
|
|
# 4. 포맷팅 및 집계
|
|
data = []
|
|
|
|
if period == "day":
|
|
# 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움)
|
|
current = start_date
|
|
while current <= today:
|
|
d_str = current.isoformat()
|
|
data.append({
|
|
"label": d_str,
|
|
"value": round(data_map.get(d_str, 0), 2)
|
|
})
|
|
current += timedelta(days=1)
|
|
|
|
elif period == "month":
|
|
# 월별 집계
|
|
monthly = {}
|
|
# daily_stats + solar_logs(오늘) 데이터로 집계
|
|
for date_str, val in data_map.items():
|
|
month_key = date_str[:7]
|
|
monthly[month_key] = monthly.get(month_key, 0) + val
|
|
|
|
sorted_keys = sorted(monthly.keys())
|
|
data = [
|
|
{"label": k, "value": round(monthly[k], 2)}
|
|
for k in sorted_keys
|
|
if k >= start_date.strftime("%Y-%m")
|
|
]
|
|
|
|
elif period == "year":
|
|
# 연도별 집계
|
|
yearly = {}
|
|
for date_str, val in data_map.items():
|
|
year_key = date_str[:4]
|
|
yearly[year_key] = yearly.get(year_key, 0) + val
|
|
|
|
sorted_keys = sorted(yearly.keys())
|
|
data = [
|
|
{"label": k, "value": round(yearly[k], 2)}
|
|
for k in sorted_keys
|
|
if k >= str(start_date.year)
|
|
]
|
|
|
|
return {
|
|
"status": "success",
|
|
"plant_id": plant_id,
|
|
"period": period,
|
|
"data": data,
|
|
"count": len(data),
|
|
"today_realtime_kwh": today_generation # 디버깅용
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"통계 조회 실패: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/{plant_id}/stats/today")
|
|
async def get_plant_hourly_stats(
|
|
plant_id: str,
|
|
db: Client = Depends(get_db)
|
|
) -> dict:
|
|
"""
|
|
오늘 시간별 발전 데이터 조회 (solar_logs 기반)
|
|
|
|
Args:
|
|
plant_id: 발전소 ID
|
|
|
|
Returns:
|
|
시간별 데이터 [{"hour": 0, "current_kw": ..., "today_kwh": ...}, ...]
|
|
"""
|
|
try:
|
|
# KST (UTC+9) 시간대 설정
|
|
kst_timezone = timezone(timedelta(hours=9))
|
|
today_kst = datetime.now(kst_timezone).date()
|
|
today_str = today_kst.isoformat()
|
|
|
|
# 오늘의 모든 solar_logs 조회 (UTC 기준으로는 전날 15:00부터일 수 있으므로 넉넉하게 조회)
|
|
# 하지만 간단하게 '오늘' 날짜 문자열 필터링이 가장 안전 (DB가 UTC라면 보정 필요)
|
|
# 여기서는 created_at이 timestamptz라고 가정하고 단순 문자열 비교보다는 날짜 필터를 사용
|
|
|
|
# KST 00:00은 UTC로 전날 15:00
|
|
start_dt_kst = datetime.combine(today_kst, datetime.min.time()).replace(tzinfo=kst_timezone)
|
|
start_dt_utc = start_dt_kst.astimezone(timezone.utc)
|
|
|
|
logs_result = db.table("solar_logs") \
|
|
.select("current_kw, today_kwh, created_at") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("created_at", start_dt_utc.isoformat()) \
|
|
.order("created_at", desc=False) \
|
|
.execute()
|
|
|
|
# 시간별로 그룹화 (각 시간의 마지막 데이터 사용)
|
|
hourly_data = {}
|
|
for log in logs_result.data:
|
|
created_at = log.get("created_at", "")
|
|
if created_at:
|
|
try:
|
|
# ISO 형식 파싱 (DB에서 UTC로 넘어온다고 가정)
|
|
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
|
|
|
|
# UTC -> KST 변환
|
|
dt_kst = dt.astimezone(kst_timezone)
|
|
|
|
# 오늘 날짜인지 확인
|
|
if dt_kst.date() != today_kst:
|
|
continue
|
|
|
|
hour = dt_kst.hour
|
|
|
|
# 해당 시간대의 마지막 데이터로 갱신 (order가 오름차순이므로 덮어쓰면 됨)
|
|
hourly_data[hour] = {
|
|
"current_kw": log.get("current_kw", 0) or 0,
|
|
"today_kwh": log.get("today_kwh", 0) or 0,
|
|
}
|
|
except ValueError:
|
|
continue
|
|
|
|
# 0시~23시 전체 배열 생성
|
|
result = []
|
|
for hour in range(24):
|
|
data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0})
|
|
result.append({
|
|
"hour": hour,
|
|
"label": f"{hour}시",
|
|
"current_kw": round(data["current_kw"], 2),
|
|
"today_kwh": round(data["today_kwh"], 2),
|
|
"has_data": hour in hourly_data
|
|
})
|
|
|
|
return {
|
|
"status": "success",
|
|
"plant_id": plant_id,
|
|
"date": today_str,
|
|
"data": result,
|
|
"count": len([d for d in result if d["has_data"]])
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"시간별 통계 조회 실패: {str(e)}"
|
|
)
|