289 lines
11 KiB
Python
289 lines
11 KiB
Python
"""
|
|
통계 조회 API
|
|
- 일별/월별/연도별 발전량 통계
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, Depends, Query
|
|
from supabase import Client
|
|
from typing import List, Literal
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from app.core.database import get_db
|
|
|
|
router = APIRouter(
|
|
prefix="/plants",
|
|
tags=["Stats"]
|
|
)
|
|
|
|
|
|
@router.get("/{plant_id}/stats")
|
|
async def get_plant_stats(
|
|
plant_id: str,
|
|
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
|
|
year: int = Query(None, description="특정 연도 (월별/연도별 조회 시)"),
|
|
db: Client = Depends(get_db)
|
|
) -> dict:
|
|
"""
|
|
발전소 통계 조회 (Hybrid 방식)
|
|
1. daily_stats: 과거 데이터 조회
|
|
2. solar_logs: 오늘 실시간 데이터 조회
|
|
3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용
|
|
|
|
Args:
|
|
plant_id: 발전소 ID
|
|
period: 'day' (최근 30일), 'month' (최근 12개월 또는 특정 연도), 'year' (최근 연도들)
|
|
year: 특정 연도 (옵션, period='month' 시 해당 연도의 월별 데이터 반환)
|
|
|
|
Returns:
|
|
차트 라이브러리 친화적 포맷 [{"label": "...", "value": ...}, ...]
|
|
"""
|
|
try:
|
|
today = datetime.now().date()
|
|
today_str = today.isoformat()
|
|
|
|
# 1. 과거 데이터 조회 (daily_stats)
|
|
# 1. 과거 데이터 조회 (period에 따라 테이블 분기)
|
|
stats_data_raw = []
|
|
is_monthly_source = False
|
|
|
|
if period == "day":
|
|
# [일별 조회] daily_stats 테이블 사용
|
|
# 이번 달 1일부터 오늘까지
|
|
start_date = today.replace(day=1)
|
|
date_filter = start_date.isoformat()
|
|
|
|
stats_query = db.table("daily_stats") \
|
|
.select("date, total_generation") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("date", date_filter) \
|
|
.lte("date", today_str) \
|
|
.order("date", desc=False)
|
|
|
|
# 페이지네이션 등 없이 심플하게 (일별은 데이터 적음)
|
|
stats_data_raw = stats_query.execute().data
|
|
|
|
else:
|
|
# [월별/연도별 조회] monthly_stats 테이블 사용
|
|
is_monthly_source = True
|
|
|
|
if period == "month":
|
|
# year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해
|
|
target_year = year if year else today.year
|
|
start_month = f"{target_year}-01"
|
|
end_month = f"{target_year}-12" # 문자열 비교라 12월도 포함됨
|
|
else: # year
|
|
# year 파라미터가 있으면 해당 연도부터
|
|
if year:
|
|
start_year = year
|
|
else:
|
|
start_year = today.year - 9 # 10년치
|
|
start_month = f"{start_year}-01"
|
|
end_month = f"{today.year}-12"
|
|
|
|
stats_query = db.table("monthly_stats") \
|
|
.select("month, total_generation") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("month", start_month) \
|
|
.lte("month", end_month) \
|
|
.order("month", desc=False)
|
|
|
|
stats_data_raw = stats_query.execute().data
|
|
|
|
# 데이터 맵핑 {날짜키: 발전량}
|
|
# daily_stats: key='date' (YYYY-MM-DD)
|
|
# monthly_stats: key='month' (YYYY-MM)
|
|
data_map = {}
|
|
for row in stats_data_raw:
|
|
key = row.get("month") if is_monthly_source else row.get("date")
|
|
val = row.get("total_generation") or 0
|
|
if key:
|
|
data_map[key] = val
|
|
|
|
# 2. 오늘 실시간 데이터 조회 (solar_logs)
|
|
# period='day'일 때만 합산 (monthly/year는 monthly_stats가 이미 갱신되었다고 가정하거나, 필요시 로직 추가)
|
|
# 하지만 monthly_stats가 '어제까지'의 합계일 수 있으므로, '이번 달' 데이터에는 오늘 발전량을 더해주는 게 안전함.
|
|
# 그러나 로직 복잡성을 피하기 위해, 크롤러가 실시간으로 monthly_stats를 갱신한다고 가정하고 여기선 생략 가능.
|
|
# 기존 로직 유지: 'day'일 때는 무조건 덮어쓰기.
|
|
|
|
today_generation = 0.0
|
|
# 일별 조회 시 오늘 데이터 덮어쓰기
|
|
if period == "day":
|
|
logs_result = db.table("solar_logs") \
|
|
.select("today_kwh") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("created_at", f"{today_str}T00:00:00") \
|
|
.order("created_at", desc=True) \
|
|
.limit(1) \
|
|
.execute()
|
|
|
|
if logs_result.data:
|
|
today_generation = logs_result.data[0].get("today_kwh", 0.0)
|
|
if today_generation > 0:
|
|
data_map[today_str] = today_generation
|
|
|
|
# 월별/연도별 조회 시: '이번 달' 키에 오늘 발전량을 더해야 하는가?
|
|
# 마이그레이션 스크립트는 daily_stats의 합을 넣었으므로 오늘 데이터도 포함됨.
|
|
# 크롤러도 실시간으로 daily 넣으면서 monthly upsert 할 예정.
|
|
# 따라서 별도 합산 불필요.
|
|
|
|
# 4. 포맷팅 및 집계
|
|
data = []
|
|
|
|
if period == "day":
|
|
# 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움)
|
|
current = start_date
|
|
while current <= today:
|
|
d_str = current.isoformat()
|
|
data.append({
|
|
"label": d_str,
|
|
"value": round(data_map.get(d_str, 0), 2)
|
|
})
|
|
current += timedelta(days=1)
|
|
|
|
elif period == "month":
|
|
# 월별 집계
|
|
monthly = {}
|
|
# daily_stats + solar_logs(오늘) 데이터로 집계
|
|
for date_str, val in data_map.items():
|
|
month_key = date_str[:7]
|
|
monthly[month_key] = monthly.get(month_key, 0) + val
|
|
|
|
# 특정 연도의 1~12월 데이터 생성 (누락된 월은 0)
|
|
target_year = year if year else today.year
|
|
for month in range(1, 13):
|
|
month_key = f"{target_year}-{month:02d}"
|
|
# 미래 월은 제외
|
|
if datetime.strptime(month_key, "%Y-%m").date() > today.replace(day=1):
|
|
break
|
|
data.append({
|
|
"label": month_key,
|
|
"value": round(monthly.get(month_key, 0), 2)
|
|
})
|
|
|
|
elif period == "year":
|
|
# 연도별 집계
|
|
yearly = {}
|
|
for date_str, val in data_map.items():
|
|
year_key = date_str[:4]
|
|
yearly[year_key] = yearly.get(year_key, 0) + val
|
|
|
|
# 최근 10년 (또는 지정된 기간) 연도별 데이터 생성 (데이터 없으면 0)
|
|
data = []
|
|
if year:
|
|
target_start_year = year
|
|
else:
|
|
target_start_year = today.year - 9
|
|
current_year = today.year
|
|
|
|
for y in range(target_start_year, current_year + 1):
|
|
y_str = str(y)
|
|
data.append({
|
|
"label": y_str,
|
|
"value": round(yearly.get(y_str, 0), 2)
|
|
})
|
|
|
|
return {
|
|
"status": "success",
|
|
"plant_id": plant_id,
|
|
"period": period,
|
|
"data": data,
|
|
"count": len(data),
|
|
"today_realtime_kwh": today_generation # 디버깅용
|
|
}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"통계 조회 실패: {str(e)}"
|
|
)
|
|
|
|
|
|
@router.get("/{plant_id}/stats/today")
|
|
async def get_plant_hourly_stats(
|
|
plant_id: str,
|
|
db: Client = Depends(get_db)
|
|
) -> dict:
|
|
"""
|
|
오늘 시간별 발전 데이터 조회 (solar_logs 기반)
|
|
|
|
Args:
|
|
plant_id: 발전소 ID
|
|
|
|
Returns:
|
|
시간별 데이터 [{"hour": 0, "current_kw": ..., "today_kwh": ...}, ...]
|
|
"""
|
|
try:
|
|
# KST (UTC+9) 시간대 설정
|
|
kst_timezone = timezone(timedelta(hours=9))
|
|
today_kst = datetime.now(kst_timezone).date()
|
|
today_str = today_kst.isoformat()
|
|
|
|
# 오늘의 모든 solar_logs 조회 (UTC 기준으로는 전날 15:00부터일 수 있으므로 넉넉하게 조회)
|
|
# 하지만 간단하게 '오늘' 날짜 문자열 필터링이 가장 안전 (DB가 UTC라면 보정 필요)
|
|
# 여기서는 created_at이 timestamptz라고 가정하고 단순 문자열 비교보다는 날짜 필터를 사용
|
|
|
|
# KST 00:00은 UTC로 전날 15:00
|
|
start_dt_kst = datetime.combine(today_kst, datetime.min.time()).replace(tzinfo=kst_timezone)
|
|
start_dt_utc = start_dt_kst.astimezone(timezone.utc)
|
|
|
|
logs_result = db.table("solar_logs") \
|
|
.select("current_kw, today_kwh, created_at") \
|
|
.eq("plant_id", plant_id) \
|
|
.gte("created_at", start_dt_utc.isoformat()) \
|
|
.order("created_at", desc=False) \
|
|
.execute()
|
|
|
|
# 시간별로 그룹화 (각 시간의 마지막 데이터 사용)
|
|
hourly_data = {}
|
|
for log in logs_result.data:
|
|
created_at = log.get("created_at", "")
|
|
if created_at:
|
|
try:
|
|
# ISO 형식 파싱 (DB에서 UTC로 넘어온다고 가정)
|
|
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
|
|
|
|
# UTC -> KST 변환
|
|
dt_kst = dt.astimezone(kst_timezone)
|
|
|
|
# 오늘 날짜인지 확인
|
|
if dt_kst.date() != today_kst:
|
|
continue
|
|
|
|
hour = dt_kst.hour
|
|
|
|
# 해당 시간대의 마지막 데이터로 갱신 (order가 오름차순이므로 덮어쓰면 됨)
|
|
hourly_data[hour] = {
|
|
"current_kw": log.get("current_kw", 0) or 0,
|
|
"today_kwh": log.get("today_kwh", 0) or 0,
|
|
}
|
|
except ValueError:
|
|
continue
|
|
|
|
# 0시~23시 전체 배열 생성
|
|
result = []
|
|
for hour in range(24):
|
|
data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0})
|
|
result.append({
|
|
"hour": hour,
|
|
"label": f"{hour}시",
|
|
"current_kw": round(data["current_kw"], 2),
|
|
"today_kwh": round(data["today_kwh"], 2),
|
|
"has_data": hour in hourly_data
|
|
})
|
|
|
|
return {
|
|
"status": "success",
|
|
"plant_id": plant_id,
|
|
"date": today_str,
|
|
"data": result,
|
|
"count": len([d for d in result if d["has_data"]])
|
|
}
|
|
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"시간별 통계 조회 실패: {str(e)}"
|
|
)
|