201 lines
6.4 KiB
Python
201 lines
6.4 KiB
Python
# ==========================================
|
||
# daily_summary.py - 일일 발전 통계 집계
|
||
# ==========================================
|
||
# solar_logs 데이터를 집계하여 daily_stats 테이블에 저장
|
||
|
||
from datetime import datetime, timedelta
|
||
|
||
try:
|
||
from dotenv import load_dotenv
|
||
load_dotenv()
|
||
except ImportError:
|
||
pass
|
||
|
||
import pandas as pd
|
||
from database import get_supabase_client
|
||
|
||
|
||
def get_plant_capacities(client) -> dict:
|
||
"""plants 테이블에서 용량 정보 조회"""
|
||
try:
|
||
result = client.table("plants").select("id, capacity").execute()
|
||
return {row['id']: row.get('capacity', 99.0) for row in result.data}
|
||
except Exception as e:
|
||
print(f" ⚠️ 용량 조회 실패: {e}")
|
||
return {}
|
||
|
||
|
||
def calculate_daily_stats(date_str: str = None):
|
||
"""
|
||
특정 날짜의 발전 통계 집계
|
||
|
||
Args:
|
||
date_str: 집계 대상 날짜 (YYYY-MM-DD). 미지정 시 오늘.
|
||
"""
|
||
if date_str is None:
|
||
date_str = datetime.now().strftime('%Y-%m-%d')
|
||
|
||
print(f"\n📊 [일일 통계 집계] {date_str}")
|
||
print("-" * 60)
|
||
|
||
client = get_supabase_client()
|
||
if not client:
|
||
print("❌ Supabase 연결 실패")
|
||
return False
|
||
|
||
# 1. 용량 정보 조회
|
||
capacities = get_plant_capacities(client)
|
||
|
||
# 2. 해당일 로그 조회
|
||
start_dt = f"{date_str}T00:00:00"
|
||
end_dt = f"{date_str}T23:59:59"
|
||
|
||
try:
|
||
result = client.table("solar_logs") \
|
||
.select("plant_id, current_kw, today_kwh, created_at") \
|
||
.gte("created_at", start_dt) \
|
||
.lte("created_at", end_dt) \
|
||
.order("created_at", desc=False) \
|
||
.execute()
|
||
|
||
if not result.data:
|
||
print(" ⚠️ 해당 날짜의 로그가 없습니다.")
|
||
return False
|
||
|
||
df = pd.DataFrame(result.data)
|
||
|
||
except Exception as e:
|
||
print(f" ❌ 로그 조회 실패: {e}")
|
||
return False
|
||
|
||
# 3. 발전소별 통계 계산
|
||
stats_list = []
|
||
|
||
for plant_id, group in df.groupby('plant_id'):
|
||
# 마지막 로그의 today_kwh
|
||
total_generation = group['today_kwh'].iloc[-1] if len(group) > 0 else 0
|
||
|
||
# 최대 출력
|
||
peak_kw = group['current_kw'].max() if len(group) > 0 else 0
|
||
|
||
# 이용률 시간 = 발전량 / 용량
|
||
capacity = capacities.get(plant_id, 99.0)
|
||
generation_hours = round(total_generation / capacity, 2) if capacity > 0 else 0
|
||
|
||
stats = {
|
||
'plant_id': plant_id,
|
||
'date': date_str,
|
||
'total_generation': round(total_generation, 2),
|
||
'peak_kw': round(peak_kw, 2),
|
||
'generation_hours': generation_hours
|
||
}
|
||
stats_list.append(stats)
|
||
|
||
# 출력
|
||
print(f" {plant_id}: {total_generation:.1f}kWh ({generation_hours:.1f}시간, 최대 {peak_kw:.1f}kW)")
|
||
|
||
# 4. daily_stats 테이블에 Upsert
|
||
if stats_list:
|
||
try:
|
||
result = client.table("daily_stats").upsert(
|
||
stats_list,
|
||
on_conflict="plant_id,date"
|
||
).execute()
|
||
|
||
print("-" * 60)
|
||
print(f"✅ {len(stats_list)}개 발전소 통계 저장 완료")
|
||
|
||
except Exception as e:
|
||
print(f" ❌ 저장 실패: {e}")
|
||
return False
|
||
|
||
return True
|
||
|
||
|
||
def calculate_monthly_stats(target_month: str):
|
||
"""
|
||
특정 월의 발전 통계 집계 (일간 데이터 합산)
|
||
|
||
Args:
|
||
target_month: YYYY-MM
|
||
"""
|
||
print(f"\n📅 [월간 통계 집계] {target_month}")
|
||
print("-" * 60)
|
||
|
||
client = get_supabase_client()
|
||
if not client:
|
||
return False
|
||
|
||
try:
|
||
# 1. 모든 발전소 ID 조회
|
||
plants_res = client.table("plants").select("id").execute()
|
||
plant_ids = [p['id'] for p in plants_res.data]
|
||
|
||
updated_count = 0
|
||
|
||
for pid in plant_ids:
|
||
# 2. 해당 월의 Daily 합계 조회
|
||
d_res = client.table("daily_stats").select("total_generation") \
|
||
.eq("plant_id", pid) \
|
||
.gte("date", f"{target_month}-01") \
|
||
.lte("date", f"{target_month}-31") \
|
||
.execute()
|
||
|
||
if not d_res.data:
|
||
continue
|
||
|
||
total_gen = sum(r.get('total_generation', 0) or 0 for r in d_res.data)
|
||
|
||
# 3. Monthly Upsert
|
||
client.table("monthly_stats").upsert({
|
||
"plant_id": pid,
|
||
"month": target_month,
|
||
"total_generation": round(total_gen, 2),
|
||
"updated_at": datetime.now().isoformat()
|
||
}, on_conflict="plant_id, month").execute()
|
||
|
||
print(f" {pid}: {total_gen:.1f}kWh (Month Total)")
|
||
updated_count += 1
|
||
|
||
print("-" * 60)
|
||
print(f"✅ {updated_count}개 발전소 월간 통계 갱신 완료")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f" ❌ 월간 집계 실패: {e}")
|
||
return False
|
||
|
||
|
||
if __name__ == "__main__":
|
||
import sys
|
||
from datetime import timedelta
|
||
|
||
# 인자로 날짜 지정 가능: python daily_summary.py 2026-01-22
|
||
if len(sys.argv) > 1:
|
||
target_date = sys.argv[1]
|
||
else:
|
||
# 인자 없으면 '어제' 날짜를 기본값으로 사용
|
||
# (새벽에 실행하여 전날 데이터를 마감하는 시나리오)
|
||
yesterday = datetime.now() - timedelta(days=1)
|
||
target_date = yesterday.strftime('%Y-%m-%d')
|
||
print(f"ℹ️ 날짜 미지정 -> 어제({target_date}) 기준으로 집계합니다.")
|
||
|
||
# 1. 일간 통계 집계
|
||
success = calculate_daily_stats(target_date)
|
||
|
||
# 2. 월말 체크 및 월간 집계 트리거
|
||
# target_date가 해당 월의 마지막 날이면 월간 집계 실행
|
||
if success:
|
||
try:
|
||
current_dt = datetime.strptime(target_date, '%Y-%m-%d')
|
||
import calendar
|
||
last_day = calendar.monthrange(current_dt.year, current_dt.month)[1]
|
||
|
||
if current_dt.day == last_day:
|
||
target_month = current_dt.strftime('%Y-%m')
|
||
print(f"\n🔔 월말({target_date}) 감지 -> {target_month} 월간 집계 실행")
|
||
calculate_monthly_stats(target_month)
|
||
except Exception as e:
|
||
print(f"⚠️ 월간 집계 트리거 오류: {e}")
|
||
|