feat: Update stats API to include real-time crawled data

This commit is contained in:
haneulai 2026-01-23 16:45:37 +09:00
parent f4e7e8b03e
commit a7a83e8792

View File

@ -23,7 +23,10 @@ async def get_plant_stats(
db: Client = Depends(get_db) db: Client = Depends(get_db)
) -> dict: ) -> dict:
""" """
발전소 통계 조회 발전소 통계 조회 (Hybrid 방식)
1. daily_stats: 과거 데이터 조회
2. solar_logs: 오늘 실시간 데이터 조회
3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용
Args: Args:
plant_id: 발전소 ID plant_id: 발전소 ID
@ -34,80 +37,100 @@ async def get_plant_stats(
""" """
try: try:
today = datetime.now().date() today = datetime.now().date()
today_str = today.isoformat()
# 1. 과거 데이터 조회 (daily_stats)
if period == "day":
start_date = today - timedelta(days=30)
date_filter = start_date.isoformat()
elif period == "month":
start_date = today.replace(day=1) - timedelta(days=365)
date_filter = start_date.isoformat()
else: # year
start_date = datetime(today.year - 5, 1, 1).date()
date_filter = start_date.strftime("%Y-%m-%d")
stats_query = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", date_filter) \
.lte("date", today_str) \
.order("date", desc=False)
stats_result = stats_query.execute()
# 데이터 맵핑 {날짜: 발전량}
data_map = {row["date"]: row["total_generation"] or 0 for row in stats_result.data}
# 2. 오늘 실시간 데이터 조회 (solar_logs)
# 오늘의 가장 마지막 기록 1건만 조회 (성능 최적화)
logs_result = db.table("solar_logs") \
.select("today_kwh, created_at") \
.eq("plant_id", plant_id) \
.gte("created_at", f"{today_str}T00:00:00") \
.order("created_at", desc=True) \
.limit(1) \
.execute()
today_generation = 0.0
if logs_result.data:
today_generation = logs_result.data[0].get("today_kwh", 0.0)
# 3. 데이터 병합 (오늘 데이터 갱신/추가)
# solar_logs 값이 있으면 무조건 daily_stats 값보다 우선 (실시간성)
if today_generation > 0:
data_map[today_str] = today_generation
# 4. 포맷팅 및 집계
data = []
if period == "day": if period == "day":
# 최근 30일 # 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움)
start_date = today - timedelta(days=30) current = start_date
while current <= today:
result = db.table("daily_stats") \ d_str = current.isoformat()
.select("date, total_generation") \ data.append({
.eq("plant_id", plant_id) \ "label": d_str,
.gte("date", start_date.isoformat()) \ "value": round(data_map.get(d_str, 0), 2)
.lte("date", today.isoformat()) \ })
.order("date", desc=False) \ current += timedelta(days=1)
.execute()
data = [
{"label": row["date"], "value": row["total_generation"] or 0}
for row in result.data
]
elif period == "month": elif period == "month":
# 최근 12개월 - 월별 합계
start_date = today.replace(day=1) - timedelta(days=365)
result = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", start_date.isoformat()) \
.lte("date", today.isoformat()) \
.execute()
# 월별 집계 # 월별 집계
monthly = {} monthly = {}
for row in result.data: # daily_stats + solar_logs(오늘) 데이터로 집계
date_str = row["date"] for date_str, val in data_map.items():
month_key = date_str[:7] # YYYY-MM month_key = date_str[:7]
generation = row["total_generation"] or 0 monthly[month_key] = monthly.get(month_key, 0) + val
monthly[month_key] = monthly.get(month_key, 0) + generation
sorted_keys = sorted(monthly.keys())
data = [ data = [
{"label": month, "value": round(value, 2)} {"label": k, "value": round(monthly[k], 2)}
for month, value in sorted(monthly.items()) for k in sorted_keys
][-12:] # 최근 12개월만 if k >= start_date.strftime("%Y-%m")
]
elif period == "year": elif period == "year":
# 최근 5년 - 연도별 합계
start_year = today.year - 5
result = db.table("daily_stats") \
.select("date, total_generation") \
.eq("plant_id", plant_id) \
.gte("date", f"{start_year}-01-01") \
.lte("date", today.isoformat()) \
.execute()
# 연도별 집계 # 연도별 집계
yearly = {} yearly = {}
for row in result.data: for date_str, val in data_map.items():
date_str = row["date"] year_key = date_str[:4]
year_key = date_str[:4] # YYYY yearly[year_key] = yearly.get(year_key, 0) + val
generation = row["total_generation"] or 0
yearly[year_key] = yearly.get(year_key, 0) + generation
sorted_keys = sorted(yearly.keys())
data = [ data = [
{"label": year, "value": round(value, 2)} {"label": k, "value": round(yearly[k], 2)}
for year, value in sorted(yearly.items()) for k in sorted_keys
if k >= str(start_date.year)
] ]
else:
raise HTTPException(status_code=400, detail="Invalid period parameter")
return { return {
"status": "success", "status": "success",
"plant_id": plant_id, "plant_id": plant_id,
"period": period, "period": period,
"data": data, "data": data,
"count": len(data) "count": len(data),
"today_realtime_kwh": today_generation # 디버깅용
} }
except HTTPException: except HTTPException: