solorpower_server/app/routers/stats.py

469 lines
18 KiB
Python

"""
통계 조회 API
- 일별/월별/연도별 발전량 통계
"""
from fastapi import APIRouter, HTTPException, Depends, Query
from supabase import Client
from typing import List, Literal, Optional
from datetime import datetime, timedelta, timezone
import calendar
import re
from app.core.database import get_db
router = APIRouter(
prefix="/plants",
tags=["Stats"]
)
@router.get("/stats/comparison")
async def get_all_plants_comparison(
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
date: Optional[str] = Query(None, description="날짜 (YYYY-MM-DD)"),
year: Optional[int] = Query(None, description="연도"),
month: Optional[int] = Query(None, description=""),
db: Client = Depends(get_db)
) -> dict:
"""
전체 발전소 발전량 비교 통계 조회
"""
try:
# 1. 모든 발전소 기본 정보 조회 (이름, 용량)
plants_res = db.table("plants").select("id, name, capacity").execute()
plants = {p['id']: p for p in plants_res.data}
# 결과 초기화
result_data = []
# 날짜 파라미터 처리
# KST 시간대 고려
kst_timezone = timezone(timedelta(hours=9))
now_kst = datetime.now(kst_timezone)
today = now_kst.date()
target_date = None
if date:
try:
target_date = datetime.strptime(date, "%Y-%m-%d").date()
except ValueError:
target_date = today
else:
target_date = today
target_year = year if year else target_date.year
target_month = month if month else target_date.month
# 데이터 조회 로직
stats_map = {} # plant_id -> generation
if period == "day":
date_str = target_date.isoformat()
# (A) 오늘 날짜인 경우: solar_logs 최신값 (실시간)
# 서버 시간대와 클라이언트 요청 날짜 일치 여부 확인
if target_date == today:
# 오늘 00:00:00 (KST) 이후 데이터 조회
start_dt = f"{date_str}T00:00:00"
# solar_logs에서 오늘 생성된 데이터 조회
logs_res = db.table("solar_logs") \
.select("plant_id, today_kwh") \
.gte("created_at", start_dt) \
.order("created_at", desc=True) \
.execute()
# plant_id별로 첫 번째(최신) 값만 취함
seen_plants = set()
for log in logs_res.data:
pid = log['plant_id']
if pid not in seen_plants:
val = log.get('today_kwh', 0)
if val is None: val = 0
stats_map[pid] = val
seen_plants.add(pid)
else:
# 과거: daily_stats 조회
daily_res = db.table("daily_stats") \
.select("plant_id, total_generation") \
.eq("date", date_str) \
.execute()
for row in daily_res.data:
val = row.get('total_generation', 0)
if val is None: val = 0
stats_map[row['plant_id']] = val
elif period == "month":
# 월간: monthly_stats 조회 (해당 월)
month_str = f"{target_year}-{target_month:02d}"
# 1. monthly_stats 조회
monthly_res = db.table("monthly_stats") \
.select("plant_id, total_generation") \
.eq("month", month_str) \
.execute()
for row in monthly_res.data:
val = row.get('total_generation', 0)
if val is None: val = 0
stats_map[row['plant_id']] = val
# 2. 이번 달이거나 데이터가 부족하면 daily_stats 합산 시도
is_current_month = (target_year == today.year and target_month == today.month)
has_missing_data = not stats_map or all(v == 0 for v in stats_map.values())
if is_current_month or has_missing_data:
last_day = calendar.monthrange(target_year, target_month)[1]
start_date = f"{target_year}-{target_month:02d}-01"
end_date = f"{target_year}-{target_month:02d}-{last_day}"
daily_res = db.table("daily_stats") \
.select("plant_id, total_generation") \
.gte("date", start_date) \
.lte("date", end_date) \
.execute()
temp_map = {}
for row in daily_res.data:
pid = row['plant_id']
val = row.get('total_generation', 0)
if val is None: val = 0
temp_map[pid] = temp_map.get(pid, 0) + val
# 합산된 값이 있으면 업데이트
for pid, val in temp_map.items():
if val > stats_map.get(pid, 0):
stats_map[pid] = val
elif period == "year":
# 연간: monthly_stats 조회 (해당 연도 전체 합산)
start_month = f"{target_year}-01"
end_month = f"{target_year}-12"
monthly_res = db.table("monthly_stats") \
.select("plant_id, total_generation") \
.gte("month", start_month) \
.lte("month", end_month) \
.execute()
# 합산
for row in monthly_res.data:
pid = row['plant_id']
val = row.get('total_generation', 0)
if val is None: val = 0
stats_map[pid] = stats_map.get(pid, 0) + val
# 결과 조합
for pid, p_info in plants.items():
gen = stats_map.get(pid, 0) or 0
cap = p_info.get('capacity', 0) or 0
# 발전시간 계산
gen_hours = 0
if cap > 0:
if period == "day":
gen_hours = gen / cap
elif period == "month":
# 해당 월의 일수 (일평균 발전시간)
days_in_month = calendar.monthrange(target_year, target_month)[1]
gen_hours = (gen / cap) / days_in_month
elif period == "year":
# 연간 일평균 발전시간 (/365)
gen_hours = (gen / cap) / 365
result_data.append({
"plant_id": pid,
"plant_name": p_info['name'],
"capacity": cap,
"generation": round(gen, 2),
"generation_hours": round(gen_hours, 2)
})
# 발전소 이름 순 정렬 (호기 추출)
def sort_key(item):
match = re.search(r'(\d+)호기', item['plant_name'])
if match:
return int(match.group(1))
return 999
result_data.sort(key=sort_key)
return {
"status": "success",
"period": period,
"target_date": target_date.isoformat(),
"data": result_data,
"count": len(result_data)
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"전체 통계 조회 실패: {str(e)}"
)
@router.get("/{plant_id}/stats")
async def get_plant_stats(
plant_id: str,
period: Literal["day", "month", "year"] = Query("day", description="통계 기간"),
year: int = Query(None, description="특정 연도"),
month: int = Query(None, description="특정 월 (period='day' 시 필수)"),
db: Client = Depends(get_db)
) -> dict:
"""
발전소 통계 조회 (Hybrid 방식)
"""
try:
today = datetime.now().date()
today_str = today.isoformat()
# 1. 과거 데이터 조회 (period에 따라 테이블 분기)
stats_data_raw = []
is_monthly_source = False
if period == "day":
# [일별 조회] daily_stats 테이블 사용
# 특정 연/월의 1일 ~ 말일 조회
target_year = year if year else today.year
target_month = month if month else today.month
# 해당 월의 시작일과 마지막일 계산
import calendar
last_day = calendar.monthrange(target_year, target_month)[1]
start_date_str = f"{target_year}-{target_month:02d}-01"
end_date_str = f"{target_year}-{target_month:02d}-{last_day}"
stats_query = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", start_date_str) \
.lte("date", end_date_str) \
.order("date", desc=False)
stats_data_raw = stats_query.execute().data
else:
# [월별/연도별 조회] monthly_stats 테이블 사용
is_monthly_source = True
if period == "month":
# year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해
target_year = year if year else today.year
start_month = f"{target_year}-01"
end_month = f"{target_year}-12"
else: # year
# year 파라미터가 있으면 해당 연도 포함 최근 5년치 조회
if year:
start_year = year - 4
else:
start_year = today.year - 9 # 10년치
start_month = f"{start_year}-01"
end_month = f"{today.year}-12"
stats_query = db.table("monthly_stats") \
.select("month, total_generation") \
.eq("plant_id", plant_id) \
.gte("month", start_month) \
.lte("month", end_month) \
.order("month", desc=False)
stats_data_raw = stats_query.execute().data
# 데이터 맵핑 {날짜키: 발전량}
data_map = {}
for row in stats_data_raw:
key = row.get("month") if is_monthly_source else row.get("date")
val = row.get("total_generation") or 0
if key:
data_map[key] = val
# 2. 오늘 실시간 데이터 조회 (solar_logs) -> period='day'이고 '오늘'이 포함된 달이면 현재값 업데이트
# (생략 가능하지만, 오늘 날짜가 조회 범위에 포함되면 최신값 반영)
if period == "day":
# 조회 중인 달이 이번 달인지 확인
if (not year or year == today.year) and (not month or month == today.month):
logs_result = db.table("solar_logs") \
.select("today_kwh") \
.eq("plant_id", plant_id) \
.gte("created_at", f"{today_str}T00:00:00") \
.order("created_at", desc=True) \
.limit(1) \
.execute()
if logs_result.data:
today_generation = logs_result.data[0].get("today_kwh", 0.0)
if today_generation > 0:
data_map[today_str] = today_generation
# 4. 포맷팅 및 집계
data = []
if period == "day":
# 해당 월의 모든 날짜 생성
target_year = year if year else today.year
target_month = month if month else today.month
last_day = calendar.monthrange(target_year, target_month)[1]
# 미래 날짜는 제외해야 하나? UI에서 처리하거나 null로?
# 일단 0으로 채움.
for d in range(1, last_day + 1):
d_str = f"{target_year}-{target_month:02d}-{d:02d}"
# 미래 날짜 처리: 오늘 이후라면 데이터가 없을 것임 (0).
# 하지만 '오늘'까지는 표시.
data.append({
"label": d_str,
"value": round(data_map.get(d_str, 0), 2)
})
elif period == "month":
# 월별 집계
monthly = {}
for date_str, val in data_map.items():
# date_str이 'YYYY-MM-DD' 형식이면 YYYY-MM 추출
# 근데 monthly_stats 사용 시 이미 YYYY-MM
if is_monthly_source:
month_key = date_str
else:
month_key = date_str[:7]
monthly[month_key] = monthly.get(month_key, 0) + val
# 특정 연도의 1~12월 데이터 생성
target_year = year if year else today.year
for m in range(1, 13):
month_key = f"{target_year}-{m:02d}"
data.append({
"label": month_key,
"value": round(monthly.get(month_key, 0), 2)
})
elif period == "year":
# 연도별 집계
yearly = {}
for date_str, val in data_map.items():
year_key = date_str[:4]
yearly[year_key] = yearly.get(year_key, 0) + val
# year 파라미터가 있으면 해당 연도 기준 최근 5년 표시 (예: 2026 선택 -> 2022~2026)
if year:
end_year = year
start_year = year - 4
else:
end_year = today.year
start_year = today.year - 9
for y in range(start_year, end_year + 1):
y_str = str(y)
data.append({
"label": y_str,
"value": round(yearly.get(y_str, 0), 2)
})
return {
"status": "success",
"plant_id": plant_id,
"period": period,
"data": data,
"count": len(data)
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"통계 조회 실패: {str(e)}"
)
@router.get("/{plant_id}/stats/today")
async def get_plant_hourly_stats(
plant_id: str,
date: str = Query(None, description="조회 날짜 (YYYY-MM-DD)"),
db: Client = Depends(get_db)
) -> dict:
"""
특정 날짜의 시간별 발전 데이터 조회 (solar_logs 기반)
Defaults to today if date is not provided.
"""
try:
# KST (UTC+9) 시간대 설정
kst_timezone = timezone(timedelta(hours=9))
if date:
try:
target_date = datetime.strptime(date, "%Y-%m-%d").date()
except ValueError:
raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD")
else:
target_date = datetime.now(kst_timezone).date()
target_date_str = target_date.isoformat()
# 조회 범위: 해당 일 00:00:00 ~ 23:59:59 (KST)
from_dt = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=kst_timezone)
to_dt = datetime.combine(target_date, datetime.max.time()).replace(tzinfo=kst_timezone)
# 전조치: UTC로 변환하여 쿼리 (Supabase DB가 UTC라고 가정)
# 하지만 solar_logs는 created_at이 timestamptz로 저장되어 있을 것임.
# 안전하게는 UTC 시간으로 필터링
from_utc = from_dt.astimezone(timezone.utc)
to_utc = to_dt.astimezone(timezone.utc)
logs_result = db.table("solar_logs") \
.select("current_kw, today_kwh, created_at") \
.eq("plant_id", plant_id) \
.gte("created_at", from_utc.isoformat()) \
.lte("created_at", to_utc.isoformat()) \
.order("created_at", desc=False) \
.execute()
# 시간별로 그룹화
hourly_data = {}
for log in logs_result.data:
created_at = log.get("created_at", "")
if created_at:
try:
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
dt_kst = dt.astimezone(kst_timezone)
if dt_kst.date() != target_date:
continue
hour = dt_kst.hour
hourly_data[hour] = {
"current_kw": log.get("current_kw", 0) or 0,
"today_kwh": log.get("today_kwh", 0) or 0,
}
except ValueError:
continue
result = []
for hour in range(24):
data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0})
result.append({
"hour": hour,
"label": f"{hour}",
"current_kw": round(data["current_kw"], 2),
"today_kwh": round(data["today_kwh"], 2),
"has_data": hour in hourly_data
})
return {
"status": "success",
"plant_id": plant_id,
"date": target_date_str,
"data": result,
"count": len([d for d in result if d["has_data"]])
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"시간별 통계 조회 실패: {str(e)}"
)