solorpower_crawler/daily_summary.py

201 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# ==========================================
# daily_summary.py - 일일 발전 통계 집계
# ==========================================
# solar_logs 데이터를 집계하여 daily_stats 테이블에 저장
from datetime import datetime, timedelta
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
import pandas as pd
from database import get_supabase_client
def get_plant_capacities(client) -> dict:
"""plants 테이블에서 용량 정보 조회"""
try:
result = client.table("plants").select("id, capacity").execute()
return {row['id']: row.get('capacity', 99.0) for row in result.data}
except Exception as e:
print(f" ⚠️ 용량 조회 실패: {e}")
return {}
def calculate_daily_stats(date_str: str = None):
"""
특정 날짜의 발전 통계 집계
Args:
date_str: 집계 대상 날짜 (YYYY-MM-DD). 미지정 시 오늘.
"""
if date_str is None:
date_str = datetime.now().strftime('%Y-%m-%d')
print(f"\n📊 [일일 통계 집계] {date_str}")
print("-" * 60)
client = get_supabase_client()
if not client:
print("❌ Supabase 연결 실패")
return False
# 1. 용량 정보 조회
capacities = get_plant_capacities(client)
# 2. 해당일 로그 조회
start_dt = f"{date_str}T00:00:00"
end_dt = f"{date_str}T23:59:59"
try:
result = client.table("solar_logs") \
.select("plant_id, current_kw, today_kwh, created_at") \
.gte("created_at", start_dt) \
.lte("created_at", end_dt) \
.order("created_at", desc=False) \
.execute()
if not result.data:
print(" ⚠️ 해당 날짜의 로그가 없습니다.")
return False
df = pd.DataFrame(result.data)
except Exception as e:
print(f" ❌ 로그 조회 실패: {e}")
return False
# 3. 발전소별 통계 계산
stats_list = []
for plant_id, group in df.groupby('plant_id'):
# 마지막 로그의 today_kwh
total_generation = group['today_kwh'].iloc[-1] if len(group) > 0 else 0
# 최대 출력
peak_kw = group['current_kw'].max() if len(group) > 0 else 0
# 이용률 시간 = 발전량 / 용량
capacity = capacities.get(plant_id, 99.0)
generation_hours = round(total_generation / capacity, 2) if capacity > 0 else 0
stats = {
'plant_id': plant_id,
'date': date_str,
'total_generation': round(total_generation, 2),
'peak_kw': round(peak_kw, 2),
'generation_hours': generation_hours
}
stats_list.append(stats)
# 출력
print(f" {plant_id}: {total_generation:.1f}kWh ({generation_hours:.1f}시간, 최대 {peak_kw:.1f}kW)")
# 4. daily_stats 테이블에 Upsert
if stats_list:
try:
result = client.table("daily_stats").upsert(
stats_list,
on_conflict="plant_id,date"
).execute()
print("-" * 60)
print(f"{len(stats_list)}개 발전소 통계 저장 완료")
except Exception as e:
print(f" ❌ 저장 실패: {e}")
return False
return True
def calculate_monthly_stats(target_month: str):
"""
특정 월의 발전 통계 집계 (일간 데이터 합산)
Args:
target_month: YYYY-MM
"""
print(f"\n📅 [월간 통계 집계] {target_month}")
print("-" * 60)
client = get_supabase_client()
if not client:
return False
try:
# 1. 모든 발전소 ID 조회
plants_res = client.table("plants").select("id").execute()
plant_ids = [p['id'] for p in plants_res.data]
updated_count = 0
for pid in plant_ids:
# 2. 해당 월의 Daily 합계 조회
d_res = client.table("daily_stats").select("total_generation") \
.eq("plant_id", pid) \
.gte("date", f"{target_month}-01") \
.lte("date", f"{target_month}-31") \
.execute()
if not d_res.data:
continue
total_gen = sum(r.get('total_generation', 0) or 0 for r in d_res.data)
# 3. Monthly Upsert
client.table("monthly_stats").upsert({
"plant_id": pid,
"month": target_month,
"total_generation": round(total_gen, 2),
"updated_at": datetime.now().isoformat()
}, on_conflict="plant_id, month").execute()
print(f" {pid}: {total_gen:.1f}kWh (Month Total)")
updated_count += 1
print("-" * 60)
print(f"{updated_count}개 발전소 월간 통계 갱신 완료")
return True
except Exception as e:
print(f" ❌ 월간 집계 실패: {e}")
return False
if __name__ == "__main__":
import sys
from datetime import timedelta
# 인자로 날짜 지정 가능: python daily_summary.py 2026-01-22
if len(sys.argv) > 1:
target_date = sys.argv[1]
else:
# 인자 없으면 '어제' 날짜를 기본값으로 사용
# (새벽에 실행하여 전날 데이터를 마감하는 시나리오)
yesterday = datetime.now() - timedelta(days=1)
target_date = yesterday.strftime('%Y-%m-%d')
print(f" 날짜 미지정 -> 어제({target_date}) 기준으로 집계합니다.")
# 1. 일간 통계 집계
success = calculate_daily_stats(target_date)
# 2. 월말 체크 및 월간 집계 트리거
# target_date가 해당 월의 마지막 날이면 월간 집계 실행
if success:
try:
current_dt = datetime.strptime(target_date, '%Y-%m-%d')
import calendar
last_day = calendar.monthrange(current_dt.year, current_dt.month)[1]
if current_dt.day == last_day:
target_month = current_dt.strftime('%Y-%m')
print(f"\n🔔 월말({target_date}) 감지 -> {target_month} 월간 집계 실행")
calculate_monthly_stats(target_month)
except Exception as e:
print(f"⚠️ 월간 집계 트리거 오류: {e}")