From 665ff4914e35b6f0295a4b550803153216c86634 Mon Sep 17 00:00:00 2001 From: haneulai Date: Wed, 28 Jan 2026 10:16:48 +0900 Subject: [PATCH] feat: enhance stats API for date navigation and support new excel upload format --- app/routers/stats.py | 169 ++++++++++++++++++++---------------------- app/routers/upload.py | 60 ++++++++++++++- 2 files changed, 136 insertions(+), 93 deletions(-) diff --git a/app/routers/stats.py b/app/routers/stats.py index 86dda24..c7f679d 100644 --- a/app/routers/stats.py +++ b/app/routers/stats.py @@ -20,46 +20,41 @@ router = APIRouter( async def get_plant_stats( plant_id: str, period: Literal["day", "month", "year"] = Query("day", description="통계 기간"), - year: int = Query(None, description="특정 연도 (월별/연도별 조회 시)"), + year: int = Query(None, description="특정 연도"), + month: int = Query(None, description="특정 월 (period='day' 시 필수)"), db: Client = Depends(get_db) ) -> dict: """ 발전소 통계 조회 (Hybrid 방식) - 1. daily_stats: 과거 데이터 조회 - 2. solar_logs: 오늘 실시간 데이터 조회 - 3. 병합: 오늘 날짜 데이터는 실시간 데이터 우선 사용 - - Args: - plant_id: 발전소 ID - period: 'day' (최근 30일), 'month' (최근 12개월 또는 특정 연도), 'year' (최근 연도들) - year: 특정 연도 (옵션, period='month' 시 해당 연도의 월별 데이터 반환) - - Returns: - 차트 라이브러리 친화적 포맷 [{"label": "...", "value": ...}, ...] """ try: today = datetime.now().date() today_str = today.isoformat() - # 1. 과거 데이터 조회 (daily_stats) # 1. 과거 데이터 조회 (period에 따라 테이블 분기) stats_data_raw = [] is_monthly_source = False if period == "day": # [일별 조회] daily_stats 테이블 사용 - # 이번 달 1일부터 오늘까지 - start_date = today.replace(day=1) - date_filter = start_date.isoformat() + # 특정 연/월의 1일 ~ 말일 조회 + target_year = year if year else today.year + target_month = month if month else today.month + + # 해당 월의 시작일과 마지막일 계산 + import calendar + last_day = calendar.monthrange(target_year, target_month)[1] + + start_date_str = f"{target_year}-{target_month:02d}-01" + end_date_str = f"{target_year}-{target_month:02d}-{last_day}" stats_query = db.table("daily_stats") \ .select("date, total_generation") \ .eq("plant_id", plant_id) \ - .gte("date", date_filter) \ - .lte("date", today_str) \ + .gte("date", start_date_str) \ + .lte("date", end_date_str) \ .order("date", desc=False) - # 페이지네이션 등 없이 심플하게 (일별은 데이터 적음) stats_data_raw = stats_query.execute().data else: @@ -70,7 +65,7 @@ async def get_plant_stats( # year 파라미터가 있으면 해당 연도 1월~12월, 없으면 올해 target_year = year if year else today.year start_month = f"{target_year}-01" - end_month = f"{target_year}-12" # 문자열 비교라 12월도 포함됨 + end_month = f"{target_year}-12" else: # year # year 파라미터가 있으면 해당 연도부터 if year: @@ -90,8 +85,6 @@ async def get_plant_stats( stats_data_raw = stats_query.execute().data # 데이터 맵핑 {날짜키: 발전량} - # daily_stats: key='date' (YYYY-MM-DD) - # monthly_stats: key='month' (YYYY-MM) data_map = {} for row in stats_data_raw: key = row.get("month") if is_monthly_source else row.get("date") @@ -99,62 +92,61 @@ async def get_plant_stats( if key: data_map[key] = val - # 2. 오늘 실시간 데이터 조회 (solar_logs) - # period='day'일 때만 합산 (monthly/year는 monthly_stats가 이미 갱신되었다고 가정하거나, 필요시 로직 추가) - # 하지만 monthly_stats가 '어제까지'의 합계일 수 있으므로, '이번 달' 데이터에는 오늘 발전량을 더해주는 게 안전함. - # 그러나 로직 복잡성을 피하기 위해, 크롤러가 실시간으로 monthly_stats를 갱신한다고 가정하고 여기선 생략 가능. - # 기존 로직 유지: 'day'일 때는 무조건 덮어쓰기. - - today_generation = 0.0 - # 일별 조회 시 오늘 데이터 덮어쓰기 + # 2. 오늘 실시간 데이터 조회 (solar_logs) -> period='day'이고 '오늘'이 포함된 달이면 현재값 업데이트 + # (생략 가능하지만, 오늘 날짜가 조회 범위에 포함되면 최신값 반영) if period == "day": - logs_result = db.table("solar_logs") \ - .select("today_kwh") \ - .eq("plant_id", plant_id) \ - .gte("created_at", f"{today_str}T00:00:00") \ - .order("created_at", desc=True) \ - .limit(1) \ - .execute() - - if logs_result.data: - today_generation = logs_result.data[0].get("today_kwh", 0.0) - if today_generation > 0: - data_map[today_str] = today_generation - - # 월별/연도별 조회 시: '이번 달' 키에 오늘 발전량을 더해야 하는가? - # 마이그레이션 스크립트는 daily_stats의 합을 넣었으므로 오늘 데이터도 포함됨. - # 크롤러도 실시간으로 daily 넣으면서 monthly upsert 할 예정. - # 따라서 별도 합산 불필요. + # 조회 중인 달이 이번 달인지 확인 + if (not year or year == today.year) and (not month or month == today.month): + logs_result = db.table("solar_logs") \ + .select("today_kwh") \ + .eq("plant_id", plant_id) \ + .gte("created_at", f"{today_str}T00:00:00") \ + .order("created_at", desc=True) \ + .limit(1) \ + .execute() + + if logs_result.data: + today_generation = logs_result.data[0].get("today_kwh", 0.0) + if today_generation > 0: + data_map[today_str] = today_generation # 4. 포맷팅 및 집계 data = [] if period == "day": - # 최근 30일 일별 데이터 생성 (누락된 날짜는 0으로 채움) - current = start_date - while current <= today: - d_str = current.isoformat() + # 해당 월의 모든 날짜 생성 + target_year = year if year else today.year + target_month = month if month else today.month + last_day = calendar.monthrange(target_year, target_month)[1] + + # 미래 날짜는 제외해야 하나? UI에서 처리하거나 null로? + # 일단 0으로 채움. + + for d in range(1, last_day + 1): + d_str = f"{target_year}-{target_month:02d}-{d:02d}" + # 미래 날짜 처리: 오늘 이후라면 데이터가 없을 것임 (0). + # 하지만 '오늘'까지는 표시. data.append({ "label": d_str, "value": round(data_map.get(d_str, 0), 2) }) - current += timedelta(days=1) elif period == "month": # 월별 집계 monthly = {} - # daily_stats + solar_logs(오늘) 데이터로 집계 for date_str, val in data_map.items(): - month_key = date_str[:7] + # date_str이 'YYYY-MM-DD' 형식이면 YYYY-MM 추출 + # 근데 monthly_stats 사용 시 이미 YYYY-MM + if is_monthly_source: + month_key = date_str + else: + month_key = date_str[:7] monthly[month_key] = monthly.get(month_key, 0) + val - # 특정 연도의 1~12월 데이터 생성 (누락된 월은 0) + # 특정 연도의 1~12월 데이터 생성 target_year = year if year else today.year - for month in range(1, 13): - month_key = f"{target_year}-{month:02d}" - # 미래 월은 제외 - if datetime.strptime(month_key, "%Y-%m").date() > today.replace(day=1): - break + for m in range(1, 13): + month_key = f"{target_year}-{m:02d}" data.append({ "label": month_key, "value": round(monthly.get(month_key, 0), 2) @@ -167,8 +159,6 @@ async def get_plant_stats( year_key = date_str[:4] yearly[year_key] = yearly.get(year_key, 0) + val - # 최근 10년 (또는 지정된 기간) 연도별 데이터 생성 (데이터 없으면 0) - data = [] if year: target_start_year = year else: @@ -187,8 +177,7 @@ async def get_plant_stats( "plant_id": plant_id, "period": period, "data": data, - "count": len(data), - "today_realtime_kwh": today_generation # 디버깅용 + "count": len(data) } except HTTPException: @@ -203,57 +192,58 @@ async def get_plant_stats( @router.get("/{plant_id}/stats/today") async def get_plant_hourly_stats( plant_id: str, + date: str = Query(None, description="조회 날짜 (YYYY-MM-DD)"), db: Client = Depends(get_db) ) -> dict: """ - 오늘 시간별 발전 데이터 조회 (solar_logs 기반) - - Args: - plant_id: 발전소 ID - - Returns: - 시간별 데이터 [{"hour": 0, "current_kw": ..., "today_kwh": ...}, ...] + 특정 날짜의 시간별 발전 데이터 조회 (solar_logs 기반) + Defaults to today if date is not provided. """ try: # KST (UTC+9) 시간대 설정 kst_timezone = timezone(timedelta(hours=9)) - today_kst = datetime.now(kst_timezone).date() - today_str = today_kst.isoformat() - # 오늘의 모든 solar_logs 조회 (UTC 기준으로는 전날 15:00부터일 수 있으므로 넉넉하게 조회) - # 하지만 간단하게 '오늘' 날짜 문자열 필터링이 가장 안전 (DB가 UTC라면 보정 필요) - # 여기서는 created_at이 timestamptz라고 가정하고 단순 문자열 비교보다는 날짜 필터를 사용 + if date: + try: + target_date = datetime.strptime(date, "%Y-%m-%d").date() + except ValueError: + raise HTTPException(status_code=400, detail="Invalid date format. Use YYYY-MM-DD") + else: + target_date = datetime.now(kst_timezone).date() + + target_date_str = target_date.isoformat() - # KST 00:00은 UTC로 전날 15:00 - start_dt_kst = datetime.combine(today_kst, datetime.min.time()).replace(tzinfo=kst_timezone) - start_dt_utc = start_dt_kst.astimezone(timezone.utc) + # 조회 범위: 해당 일 00:00:00 ~ 23:59:59 (KST) + from_dt = datetime.combine(target_date, datetime.min.time()).replace(tzinfo=kst_timezone) + to_dt = datetime.combine(target_date, datetime.max.time()).replace(tzinfo=kst_timezone) + + # 전조치: UTC로 변환하여 쿼리 (Supabase DB가 UTC라고 가정) + # 하지만 solar_logs는 created_at이 timestamptz로 저장되어 있을 것임. + # 안전하게는 UTC 시간으로 필터링 + from_utc = from_dt.astimezone(timezone.utc) + to_utc = to_dt.astimezone(timezone.utc) logs_result = db.table("solar_logs") \ .select("current_kw, today_kwh, created_at") \ .eq("plant_id", plant_id) \ - .gte("created_at", start_dt_utc.isoformat()) \ + .gte("created_at", from_utc.isoformat()) \ + .lte("created_at", to_utc.isoformat()) \ .order("created_at", desc=False) \ .execute() - # 시간별로 그룹화 (각 시간의 마지막 데이터 사용) + # 시간별로 그룹화 hourly_data = {} for log in logs_result.data: created_at = log.get("created_at", "") if created_at: try: - # ISO 형식 파싱 (DB에서 UTC로 넘어온다고 가정) dt = datetime.fromisoformat(created_at.replace('Z', '+00:00')) - - # UTC -> KST 변환 dt_kst = dt.astimezone(kst_timezone) - # 오늘 날짜인지 확인 - if dt_kst.date() != today_kst: + if dt_kst.date() != target_date: continue hour = dt_kst.hour - - # 해당 시간대의 마지막 데이터로 갱신 (order가 오름차순이므로 덮어쓰면 됨) hourly_data[hour] = { "current_kw": log.get("current_kw", 0) or 0, "today_kwh": log.get("today_kwh", 0) or 0, @@ -261,7 +251,6 @@ async def get_plant_hourly_stats( except ValueError: continue - # 0시~23시 전체 배열 생성 result = [] for hour in range(24): data = hourly_data.get(hour, {"current_kw": 0, "today_kwh": 0}) @@ -276,7 +265,7 @@ async def get_plant_hourly_stats( return { "status": "success", "plant_id": plant_id, - "date": today_str, + "date": target_date_str, "data": result, "count": len([d for d in result if d["has_data"]]) } diff --git a/app/routers/upload.py b/app/routers/upload.py index 8aa7f58..ea569c2 100644 --- a/app/routers/upload.py +++ b/app/routers/upload.py @@ -225,13 +225,67 @@ async def upload_plant_data( # 컬럼명 소문자 변환 df.columns = [str(c).lower().strip() for c in df.columns] - # 필수 컬럼 확인 - if 'date' not in df.columns or 'generation' not in df.columns: + # 컬럼 매핑 (별칭 지원) + # date: date, 일자, 날짜 + # generation: generation, kwh, 발전량, 발전량(kwh) + # year/month/day: year, month, day, 년, 월, 일, 연도 + + col_map = {} + for col in df.columns: + if col in ['date', '일자', '날짜']: + col_map['date'] = col + elif col in ['generation', 'kwh', '발전량', '발전량(kwh)']: + col_map['generation'] = col + elif col in ['year', '년', '년도', '연도']: + col_map['year'] = col + elif col in ['month', '월']: + col_map['month'] = col + elif col in ['day', '일']: + col_map['day'] = col + + # 전략 1: date, generation 컬럼이 있는 경우 + if 'date' in col_map and 'generation' in col_map: + df.rename(columns={col_map['date']: 'date', col_map['generation']: 'generation'}, inplace=True) + + # 전략 2: year, month, day, generation(kwh) 컬럼이 있는 경우 (New Strategy) + elif all(k in col_map for k in ['year', 'month', 'day', 'generation']): + rename_dict = { + col_map['year']: 'year', + col_map['month']: 'month', + col_map['day']: 'day', + col_map['generation']: 'generation' + } + df.rename(columns=rename_dict, inplace=True) + + # 병합된 셀 처리 (ffill) + df['year'] = df['year'].fillna(method='ffill') + df['month'] = df['month'].fillna(method='ffill') + + # 날짜 생성 함수 + def make_date(row): + try: + y = int(float(str(row['year']).replace('년','').strip())) + m = int(float(str(row['month']).replace('월','').strip())) + d = int(float(str(row['day']).replace('일','').strip())) + return f"{y:04d}-{m:02d}-{d:02d}" + except: + return None + + df['date'] = df.apply(make_date, axis=1) + + else: raise HTTPException( status_code=400, - detail="필수 컬럼 'date', 'generation'이 없습니다." + detail="필수 컬럼이 없습니다. (date, generation) 또는 (year, month, day, kwh) 형식이 필요합니다." ) + # date 컬럼 유효성 재확인 + if 'date' not in df.columns: + raise HTTPException( + status_code=400, + detail="날짜 정보를 파싱할 수 없습니다." + ) + if df.empty: raise HTTPException( status_code=400,