solorpower_server/app/routers/upload.py

519 lines
17 KiB
Python

"""
엑셀 파일 업로드 API
- 과거 발전 데이터(Excel)를 업로드하여 daily_stats 테이블에 저장
"""
import io
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, Depends
from supabase import Client
import pandas as pd
from app.core.database import get_db
router = APIRouter(
tags=["Upload"]
)
@router.post("/upload/history")
async def upload_history(
file: UploadFile = File(..., description="엑셀 파일 (.xlsx, .xls)"),
plant_id: str = Form(..., description="발전소 ID (예: nrems-01)"),
db: Client = Depends(get_db)
) -> dict:
"""
과거 발전 데이터 엑셀 업로드
엑셀 컬럼 형식:
- date: 날짜 (YYYY-MM-DD)
- generation: 발전량 (kWh)
Args:
file: 엑셀 파일
plant_id: 발전소 ID
Returns:
저장 결과 메시지
"""
# 1. 파일 확장자 검증
if not file.filename.endswith(('.xlsx', '.xls')):
raise HTTPException(
status_code=400,
detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다."
)
# 2. 발전소 정보 조회 (capacity 필요)
try:
plant_response = db.table("plants") \
.select("id, capacity") \
.eq("id", plant_id) \
.single() \
.execute()
if not plant_response.data:
raise HTTPException(
status_code=404,
detail=f"발전소 '{plant_id}'를 찾을 수 없습니다."
)
capacity = plant_response.data.get('capacity', 99.0)
if capacity <= 0:
capacity = 99.0 # 기본값
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"발전소 조회 실패: {str(e)}"
)
# 3. 엑셀 파일 읽기
try:
contents = await file.read()
df = pd.read_excel(io.BytesIO(contents))
# 컬럼 확인
required_columns = ['date', 'generation']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
raise HTTPException(
status_code=400,
detail=f"필수 컬럼이 없습니다: {missing_columns}. 엑셀에는 'date', 'generation' 컬럼이 필요합니다."
)
# 빈 데이터 체크
if df.empty:
raise HTTPException(
status_code=400,
detail="엑셀 파일에 데이터가 없습니다."
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"엑셀 파일 읽기 실패: {str(e)}"
)
# 4. 데이터 변환 및 저장 준비
records = []
errors = []
for idx, row in df.iterrows():
try:
# 날짜 파싱
date_val = row['date']
if pd.isna(date_val):
errors.append(f"{idx+2}: 날짜가 비어있습니다.")
continue
if isinstance(date_val, str):
date_str = date_val.strip()
else:
date_str = pd.to_datetime(date_val).strftime('%Y-%m-%d')
# 발전량
generation = float(row['generation']) if pd.notna(row['generation']) else 0.0
# generation_hours 계산
generation_hours = round(generation / capacity, 2) if capacity > 0 else 0.0
record = {
'plant_id': plant_id,
'date': date_str,
'total_generation': round(generation, 2),
'peak_kw': 0.0, # 과거 데이터에서는 알 수 없음
'generation_hours': generation_hours
}
records.append(record)
except Exception as e:
errors.append(f"{idx+2}: {str(e)}")
if not records:
raise HTTPException(
status_code=400,
detail=f"저장할 유효한 데이터가 없습니다. 오류: {errors[:5]}"
)
# 5. DB Upsert
try:
result = db.table("daily_stats").upsert(
records,
on_conflict="plant_id,date"
).execute()
response_msg = f"{len(records)}건의 데이터가 저장되었습니다."
if errors:
response_msg += f" (오류 {len(errors)}건 스킵)"
return {
"status": "success",
"message": response_msg,
"saved_count": len(records),
"error_count": len(errors),
"errors": errors[:10] if errors else [] # 최대 10개 오류만 반환
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"DB 저장 실패: {str(e)}"
)
@router.post("/plants/{plant_id}/upload")
async def upload_plant_data(
plant_id: str,
file: UploadFile = File(..., description="엑셀 파일 (.xlsx, .xls)"),
db: Client = Depends(get_db)
) -> dict:
"""
개별 발전소 엑셀 데이터 업로드
엑셀 필수 컬럼:
- date: 날짜 (YYYY-MM-DD)
- generation: 발전량 (kWh)
Args:
plant_id: 발전소 ID (URL 경로)
file: 엑셀 파일
Returns:
저장 결과 메시지
"""
# 1. 파일 확장자 검증
if not file.filename.endswith(('.xlsx', '.xls')):
raise HTTPException(
status_code=400,
detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다."
)
# 2. 발전소 정보 조회
try:
plant_response = db.table("plants") \
.select("id, capacity, name") \
.eq("id", plant_id) \
.single() \
.execute()
if not plant_response.data:
raise HTTPException(
status_code=404,
detail=f"발전소 '{plant_id}'를 찾을 수 없습니다."
)
capacity = plant_response.data.get('capacity', 99.0) or 99.0
plant_name = plant_response.data.get('name', plant_id)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"발전소 조회 실패: {str(e)}"
)
# 3. 엑셀 파일 읽기
try:
contents = await file.read()
df = pd.read_excel(io.BytesIO(contents))
# 컬럼명 소문자 변환
df.columns = [str(c).lower().strip() for c in df.columns]
# 컬럼 매핑 (별칭 지원)
# date: date, 일자, 날짜
# generation: generation, kwh, 발전량, 발전량(kwh)
# year/month/day: year, month, day, 년, 월, 일, 연도
col_map = {}
for col in df.columns:
if col in ['date', '일자', '날짜']:
col_map['date'] = col
elif col in ['generation', 'kwh', '발전량', '발전량(kwh)']:
col_map['generation'] = col
elif col in ['year', '', '년도', '연도']:
col_map['year'] = col
elif col in ['month', '']:
col_map['month'] = col
elif col in ['day', '']:
col_map['day'] = col
# 전략 1: date, generation 컬럼이 있는 경우
if 'date' in col_map and 'generation' in col_map:
df.rename(columns={col_map['date']: 'date', col_map['generation']: 'generation'}, inplace=True)
# 전략 2: year, month, day, generation(kwh) 컬럼이 있는 경우 (New Strategy)
elif all(k in col_map for k in ['year', 'month', 'day', 'generation']):
rename_dict = {
col_map['year']: 'year',
col_map['month']: 'month',
col_map['day']: 'day',
col_map['generation']: 'generation'
}
df.rename(columns=rename_dict, inplace=True)
# 병합된 셀 처리 (ffill)
df['year'] = df['year'].fillna(method='ffill')
df['month'] = df['month'].fillna(method='ffill')
# 날짜 생성 함수
def make_date(row):
try:
y = int(float(str(row['year']).replace('','').strip()))
m = int(float(str(row['month']).replace('','').strip()))
d = int(float(str(row['day']).replace('','').strip()))
return f"{y:04d}-{m:02d}-{d:02d}"
except:
return None
df['date'] = df.apply(make_date, axis=1)
else:
raise HTTPException(
status_code=400,
detail="필수 컬럼이 없습니다. (date, generation) 또는 (year, month, day, kwh) 형식이 필요합니다."
)
# date 컬럼 유효성 재확인
if 'date' not in df.columns:
raise HTTPException(
status_code=400,
detail="날짜 정보를 파싱할 수 없습니다."
)
if df.empty:
raise HTTPException(
status_code=400,
detail="엑셀 파일에 데이터가 없습니다."
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"엑셀 파일 읽기 실패: {str(e)}"
)
# 4. 데이터 변환
records = []
errors = []
for idx, row in df.iterrows():
try:
date_val = row['date']
if pd.isna(date_val):
continue
date_str = pd.to_datetime(date_val).strftime('%Y-%m-%d')
generation = float(row['generation']) if pd.notna(row['generation']) else 0.0
generation_hours = round(generation / capacity, 2) if capacity > 0 else 0.0
records.append({
'plant_id': plant_id,
'date': date_str,
'total_generation': round(generation, 2),
'peak_kw': 0.0,
'generation_hours': generation_hours
})
except Exception as e:
errors.append(f"{idx+2}: {str(e)}")
if not records:
raise HTTPException(
status_code=400,
detail=f"저장할 유효한 데이터가 없습니다."
)
# 5. DB Upsert
try:
db.table("daily_stats").upsert(
records,
on_conflict="plant_id,date"
).execute()
return {
"status": "success",
"plant_id": plant_id,
"plant_name": plant_name,
"message": f"{len(records)}건의 데이터가 저장되었습니다.",
"saved_count": len(records),
"error_count": len(errors)
}
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"DB 저장 실패: {str(e)}"
)
@router.post("/plants/{plant_id}/upload/monthly")
async def upload_plant_monthly_data(
plant_id: str,
file: UploadFile = File(..., description="월간 발전량 엑셀 파일 (year, month, kwh)"),
db: Client = Depends(get_db)
) -> dict:
"""
월간 발전 데이터 엑셀 업로드
엑셀 컬럼 형식:
- year: 연도 (2014년 등, 병합된 셀 지원)
- month: 월 (1월, 2월, 합계, 평균 등)
- kwh: 발전량 (1,234.56)
기능:
- '합계', '평균' 행은 자동으로 무시
- 'year' 컬럼이 비어있으면 이전 행의 값 사용 (ffill)
- 'month', 'kwh'의 특수문자(월, 콤마 등) 제거 및 숫자 변환
- monthly_stats 테이블에 저장
"""
# 1. 파일 확장자 검증
if not file.filename.endswith(('.xlsx', '.xls')):
raise HTTPException(
status_code=400,
detail="엑셀 파일(.xlsx, .xls)만 업로드 가능합니다."
)
# 2. 발전소 존재 확인
try:
plant_check = db.table("plants").select("id").eq("id", plant_id).single().execute()
if not plant_check.data:
raise HTTPException(status_code=404, detail="발전소를 찾을 수 없습니다.")
except Exception as e:
raise HTTPException(status_code=500, detail=f"발전소 확인 실패: {e}")
# 3. 엑셀 파싱 및 전처리
try:
contents = await file.read()
# engine='openpyxl' 명시 (xlsx)
try:
df = pd.read_excel(io.BytesIO(contents), engine='openpyxl')
except:
# xls fallback
df = pd.read_excel(io.BytesIO(contents))
# 컬럼명 정규화 (모두 소문자, 앞뒤 공백 제거)
df.columns = [str(col).lower().strip() for col in df.columns]
# 필수 컬럼 검사 (별칭 지원)
# year: year, 년도, 연도
# month: month, 월
# kwh: kwh, generation, 발전량, 발전량(kwh)
col_map = {}
for col in df.columns:
if col in ['year', '년도', '연도']:
col_map['year'] = col
elif col in ['month', '']:
col_map['month'] = col
elif col in ['kwh', 'generation', '발전량', '발전량(kwh)']:
col_map['kwh'] = col
# 매핑된 컬럼으로 이름 변경
rename_dict = {}
if 'year' in col_map: rename_dict[col_map['year']] = 'year'
if 'month' in col_map: rename_dict[col_map['month']] = 'month'
if 'kwh' in col_map: rename_dict[col_map['kwh']] = 'kwh'
df.rename(columns=rename_dict, inplace=True)
required = {'year', 'month', 'kwh'}
if not required.issubset(df.columns):
found_cols = list(df.columns)
raise HTTPException(
status_code=400,
detail=f"필수 컬럼이 누락되었습니다. (필요: year, month, kwh). 현재 인식된 컬럼: {found_cols}. 1행에 헤더가 있는지 확인해주세요."
)
# A열(year) 병합된 셀 처리 (Forward Fill)
df['year'] = df['year'].fillna(method='ffill')
records = []
errors = []
from datetime import datetime
for idx, row in df.iterrows():
try:
# 1. Year 파싱
y_raw = str(row['year']).replace('', '').strip()
# '2014.0' 같은 실수형 문자열 처리
if not y_raw or y_raw.lower() == 'nan':
continue
try:
year_val = int(float(y_raw))
except:
continue
# 2. Month 파싱
m_raw = str(row['month']).strip()
if m_raw in ['합계', '평균', 'nan', 'None', '', 'nan']:
continue
# '1월' -> 1, '01' -> 1
month_val_str = m_raw.replace('', '').strip()
# 숫자가 아닌 경우(합계 등) skip
if not month_val_str.replace('.', '').isdigit():
continue
month_val = int(float(month_val_str))
if not (1 <= month_val <= 12):
continue
# 3. Kwh 파싱
k_raw = str(row['kwh']).replace(',', '').strip()
if not k_raw or k_raw.lower() == 'nan':
kwh_val = 0.0
else:
try:
kwh_val = float(k_raw)
except:
kwh_val = 0.0
# 포맷: YYYY-MM
month_key = f"{year_val}-{month_val:02d}"
records.append({
"plant_id": plant_id,
"month": month_key,
"total_generation": kwh_val,
"updated_at": datetime.now().isoformat()
})
except Exception as e:
errors.append(f"Row {idx+2}: {e}")
if not records:
raise HTTPException(
status_code=400,
detail=f"유효한 데이터가 없습니다. 파싱 에러 예시: {errors[:3] if errors else '없음'}"
)
# 4. DB Upsert
# monthly_stats 테이블 생성 여부 확인이 필요하지만, 이미 되어있다고 가정
res = db.table("monthly_stats").upsert(
records,
on_conflict="plant_id, month"
).execute()
return {
"status": "success",
"message": f"{len(records)}건의 월간 데이터가 업로드되었습니다.",
"saved_count": len(records),
"errors": errors[:5]
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"처리 중 오류: {str(e)}")