""" 엑셀 파일 업로드 API - 과거 발전 데이터(Excel)를 업로드하여 daily_stats 테이블에 저장 """ import io from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends from supabase import Client import pandas as pd from app.core.database import get_db router = APIRouter( prefix="/upload", tags=["Upload"] ) @router.post("/history") async def upload_history( file: UploadFile = File(..., description="엑셀 파일 (.xlsx, .xls)"), plant_id: str = Form(..., description="발전소 ID (예: nrems-01)"), db: Client = Depends(get_db) ) -> dict: """ 과거 발전 데이터 엑셀 업로드 엑셀 컬럼 형식: - date: 날짜 (YYYY-MM-DD) - generation: 발전량 (kWh) Args: file: 엑셀 파일 plant_id: 발전소 ID Returns: 저장 결과 메시지 """ # 1. 파일 확장자 검증 if not file.filename.endswith(('.xlsx', '.xls')): raise HTTPException( status_code=400, detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다." ) # 2. 발전소 정보 조회 (capacity 필요) try: plant_response = db.table("plants") \ .select("id, capacity") \ .eq("id", plant_id) \ .single() \ .execute() if not plant_response.data: raise HTTPException( status_code=404, detail=f"발전소 '{plant_id}'를 찾을 수 없습니다." ) capacity = plant_response.data.get('capacity', 99.0) if capacity <= 0: capacity = 99.0 # 기본값 except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"발전소 조회 실패: {str(e)}" ) # 3. 엑셀 파일 읽기 try: contents = await file.read() df = pd.read_excel(io.BytesIO(contents)) # 컬럼 확인 required_columns = ['date', 'generation'] missing_columns = [col for col in required_columns if col not in df.columns] if missing_columns: raise HTTPException( status_code=400, detail=f"필수 컬럼이 없습니다: {missing_columns}. 엑셀에는 'date', 'generation' 컬럼이 필요합니다." ) # 빈 데이터 체크 if df.empty: raise HTTPException( status_code=400, detail="엑셀 파일에 데이터가 없습니다." ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=400, detail=f"엑셀 파일 읽기 실패: {str(e)}" ) # 4. 데이터 변환 및 저장 준비 records = [] errors = [] for idx, row in df.iterrows(): try: # 날짜 파싱 date_val = row['date'] if pd.isna(date_val): errors.append(f"행 {idx+2}: 날짜가 비어있습니다.") continue if isinstance(date_val, str): date_str = date_val.strip() else: date_str = pd.to_datetime(date_val).strftime('%Y-%m-%d') # 발전량 generation = float(row['generation']) if pd.notna(row['generation']) else 0.0 # generation_hours 계산 generation_hours = round(generation / capacity, 2) if capacity > 0 else 0.0 record = { 'plant_id': plant_id, 'date': date_str, 'total_generation': round(generation, 2), 'peak_kw': 0.0, # 과거 데이터에서는 알 수 없음 'generation_hours': generation_hours } records.append(record) except Exception as e: errors.append(f"행 {idx+2}: {str(e)}") if not records: raise HTTPException( status_code=400, detail=f"저장할 유효한 데이터가 없습니다. 오류: {errors[:5]}" ) # 5. DB Upsert try: result = db.table("daily_stats").upsert( records, on_conflict="plant_id,date" ).execute() response_msg = f"총 {len(records)}건의 데이터가 저장되었습니다." if errors: response_msg += f" (오류 {len(errors)}건 스킵)" return { "status": "success", "message": response_msg, "saved_count": len(records), "error_count": len(errors), "errors": errors[:10] if errors else [] # 최대 10개 오류만 반환 } except Exception as e: raise HTTPException( status_code=500, detail=f"DB 저장 실패: {str(e)}" ) @router.post("/plants/{plant_id}/upload") async def upload_plant_data( plant_id: str, file: UploadFile = File(..., description="엑셀 파일 (.xlsx, .xls)"), db: Client = Depends(get_db) ) -> dict: """ 개별 발전소 엑셀 데이터 업로드 엑셀 필수 컬럼: - date: 날짜 (YYYY-MM-DD) - generation: 발전량 (kWh) Args: plant_id: 발전소 ID (URL 경로) file: 엑셀 파일 Returns: 저장 결과 메시지 """ # 1. 파일 확장자 검증 if not file.filename.endswith(('.xlsx', '.xls')): raise HTTPException( status_code=400, detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다." ) # 2. 발전소 정보 조회 try: plant_response = db.table("plants") \ .select("id, capacity, name") \ .eq("id", plant_id) \ .single() \ .execute() if not plant_response.data: raise HTTPException( status_code=404, detail=f"발전소 '{plant_id}'를 찾을 수 없습니다." ) capacity = plant_response.data.get('capacity', 99.0) or 99.0 plant_name = plant_response.data.get('name', plant_id) except HTTPException: raise except Exception as e: raise HTTPException( status_code=500, detail=f"발전소 조회 실패: {str(e)}" ) # 3. 엑셀 파일 읽기 try: contents = await file.read() df = pd.read_excel(io.BytesIO(contents)) # 컬럼명 소문자 변환 df.columns = [str(c).lower().strip() for c in df.columns] # 필수 컬럼 확인 if 'date' not in df.columns or 'generation' not in df.columns: raise HTTPException( status_code=400, detail="필수 컬럼 'date', 'generation'이 없습니다." ) if df.empty: raise HTTPException( status_code=400, detail="엑셀 파일에 데이터가 없습니다." ) except HTTPException: raise except Exception as e: raise HTTPException( status_code=400, detail=f"엑셀 파일 읽기 실패: {str(e)}" ) # 4. 데이터 변환 records = [] errors = [] for idx, row in df.iterrows(): try: date_val = row['date'] if pd.isna(date_val): continue date_str = pd.to_datetime(date_val).strftime('%Y-%m-%d') generation = float(row['generation']) if pd.notna(row['generation']) else 0.0 generation_hours = round(generation / capacity, 2) if capacity > 0 else 0.0 records.append({ 'plant_id': plant_id, 'date': date_str, 'total_generation': round(generation, 2), 'peak_kw': 0.0, 'generation_hours': generation_hours }) except Exception as e: errors.append(f"행 {idx+2}: {str(e)}") if not records: raise HTTPException( status_code=400, detail=f"저장할 유효한 데이터가 없습니다." ) # 5. DB Upsert try: db.table("daily_stats").upsert( records, on_conflict="plant_id,date" ).execute() return { "status": "success", "plant_id": plant_id, "plant_name": plant_name, "message": f"{len(records)}건의 데이터가 저장되었습니다.", "saved_count": len(records), "error_count": len(errors) } except Exception as e: raise HTTPException( status_code=500, detail=f"DB 저장 실패: {str(e)}" ) @router.post("/plants/{plant_id}/upload/monthly") async def upload_plant_monthly_data( plant_id: str, file: UploadFile = File(..., description="월간 발전량 엑셀 파일 (year, month, kwh)"), db: Client = Depends(get_db) ) -> dict: """ 월간 발전 데이터 엑셀 업로드 엑셀 컬럼 형식: - year: 연도 (2014년 등, 병합된 셀 지원) - month: 월 (1월, 2월, 합계, 평균 등) - kwh: 발전량 (1,234.56) 기능: - '합계', '평균' 행은 자동으로 무시 - 'year' 컬럼이 비어있으면 이전 행의 값 사용 (ffill) - 'month', 'kwh'의 특수문자(월, 콤마 등) 제거 및 숫자 변환 - monthly_stats 테이블에 저장 """ # 1. 파일 확장자 검증 if not file.filename.endswith(('.xlsx', '.xls')): raise HTTPException( status_code=400, detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다." ) # 2. 발전소 존재 확인 try: plant_check = db.table("plants").select("id").eq("id", plant_id).single().execute() if not plant_check.data: raise HTTPException(status_code=404, detail="발전소를 찾을 수 없습니다.") except Exception as e: raise HTTPException(status_code=500, detail=f"발전소 확인 실패: {e}") # 3. 엑셀 파싱 및 전처리 try: contents = await file.read() df = pd.read_excel(io.BytesIO(contents)) # 컬럼명 정규화 (소문자, 공백제거) df.columns = [str(col).lower().strip() for col in df.columns] # 필수 컬럼 체크 # 사용자가 제공한 엑셀은 header가 없을 수도 있고, 1행이 header일 수도 있음. # 일단 year, month, kwh가 포함되어 있다고 가정하거나, 첫 3열을 사용해야 할 수도 있음. # 이미지에서는 header가 명확함 (year, month, kwh). required = {'year', 'month', 'kwh'} if not required.issubset(df.columns): # 혹시 한글 헤더일 경우 매핑 시도 rename_map = {'년도': 'year', '월': 'month', '발전량': 'kwh', '발전량(kwh)': 'kwh'} df.rename(columns=rename_map, inplace=True) if not required.issubset(df.columns): raise HTTPException( status_code=400, detail=f"필수 컬럼(year, month, kwh)이 누락되었습니다. 현재 컬럼: {list(df.columns)}" ) # A열(year) 병합된 셀 처리 (Forward Fill) df['year'] = df['year'].fillna(method='ffill') records = [] errors = [] from datetime import datetime for idx, row in df.iterrows(): try: # 1. Year 파싱 y_raw = str(row['year']).replace('년', '').strip() if not y_raw or y_raw.lower() == 'nan': continue year_val = int(float(y_raw)) # 2014.0 -> 2014 # 2. Month 파싱 m_raw = str(row['month']).strip() if m_raw in ['합계', '평균', 'nan', 'None']: continue # '1월' -> 1 month_val_str = m_raw.replace('월', '').strip() if not month_val_str.isdigit(): continue # '합계' 등이 걸러지지 않은 경우 대비 month_val = int(month_val_str) if not (1 <= month_val <= 12): continue # 3. Kwh 파싱 k_raw = str(row['kwh']).replace(',', '').strip() if not k_raw or k_raw.lower() == 'nan': kwh_val = 0.0 else: kwh_val = float(k_raw) # 포맷: YYYY-MM month_key = f"{year_val}-{month_val:02d}" records.append({ "plant_id": plant_id, "month": month_key, "total_generation": kwh_val, "updated_at": datetime.now().isoformat() }) except Exception as e: errors.append(f"Row {idx}: {e}") if not records: raise HTTPException(status_code=400, detail="유효한 데이터가 없습니다.") # 4. DB Upsert # monthly_stats 테이블 생성 여부 확인이 필요하지만, 이미 되어있다고 가정 res = db.table("monthly_stats").upsert( records, on_conflict="plant_id, month" ).execute() return { "status": "success", "message": f"총 {len(records)}건의 월간 데이터가 업로드되었습니다.", "saved_count": len(records), "errors": errors[:5] } except HTTPException: raise except Exception as e: raise HTTPException(status_code=500, detail=f"처리 중 오류: {str(e)}")