314 lines
13 KiB
Python
314 lines
13 KiB
Python
# ==========================================
|
|
# database.py - Supabase 연동
|
|
# ==========================================
|
|
|
|
import os
|
|
from datetime import datetime
|
|
|
|
# 환경 변수에서 Supabase 설정 로드
|
|
SUPABASE_URL = os.getenv('SUPABASE_URL', '')
|
|
SUPABASE_KEY = os.getenv('SUPABASE_KEY', '')
|
|
|
|
print(f"DEBUG: SUPABASE_URL prefix: {SUPABASE_URL[:15] if SUPABASE_URL else 'None'}")
|
|
|
|
_supabase_client = None
|
|
|
|
|
|
def get_supabase_client():
|
|
"""Supabase 클라이언트 싱글턴 반환"""
|
|
global _supabase_client
|
|
|
|
if _supabase_client is None:
|
|
if not SUPABASE_URL or not SUPABASE_KEY:
|
|
print("⚠️ SUPABASE_URL 또는 SUPABASE_KEY가 설정되지 않았습니다.")
|
|
print(" .env 파일을 확인하거나 환경 변수를 설정하세요.")
|
|
return None
|
|
|
|
try:
|
|
from supabase import create_client
|
|
_supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY)
|
|
print("✅ Supabase 연결 성공")
|
|
except ImportError:
|
|
print("⚠️ supabase 패키지가 설치되지 않았습니다.")
|
|
print(" pip install supabase 실행하세요.")
|
|
return None
|
|
except Exception as e:
|
|
print(f"⚠️ Supabase 연결 실패: {e}")
|
|
return None
|
|
|
|
return _supabase_client
|
|
|
|
def save_to_supabase(data_list):
|
|
"""
|
|
수집된 발전 데이터를 Supabase solar_logs 테이블에 저장
|
|
|
|
Args:
|
|
data_list: [{'id': 'nrems-01', 'name': '...', 'kw': 10.5, 'today': 100.0, 'status': '...'}]
|
|
|
|
Returns:
|
|
bool: 저장 성공 여부
|
|
"""
|
|
if not data_list:
|
|
print("[DB] 저장할 데이터가 없습니다.")
|
|
return False
|
|
|
|
client = get_supabase_client()
|
|
if client is None:
|
|
print("[DB 저장 생략] Supabase 연결 없음")
|
|
return False
|
|
|
|
try:
|
|
# 저장할 레코드 생성
|
|
records = []
|
|
for item in data_list:
|
|
plant_id = item.get('id', '')
|
|
|
|
# id가 없는 경우 건너뛰기
|
|
if not plant_id:
|
|
print(f" ⚠️ '{item.get('name', 'Unknown')}' ID 없음, 건너뜀")
|
|
continue
|
|
|
|
# 한국 시간(KST) 타임스탬프 생성
|
|
from datetime import timezone, timedelta
|
|
kst = timezone(timedelta(hours=9))
|
|
kst_now = datetime.now(kst).isoformat()
|
|
|
|
record = {
|
|
'plant_id': plant_id,
|
|
'current_kw': float(item.get('kw', 0)),
|
|
'today_kwh': float(item.get('today', 0)),
|
|
'status': item.get('status', ''),
|
|
'created_at': kst_now # 한국 시간으로 저장
|
|
}
|
|
records.append(record)
|
|
|
|
if not records:
|
|
print("[DB] 저장할 유효한 레코드가 없습니다.")
|
|
return False
|
|
|
|
# Supabase에 일괄 삽입 (solar_logs)
|
|
result = client.table("solar_logs").insert(records).execute()
|
|
|
|
print(f"✅ [DB] Supabase 저장 완료: {len(records)}건 (solar_logs)")
|
|
|
|
# daily_stats 테이블 업데이트 (Upsert)
|
|
# 오늘 날짜(KST) 기준, 현재 수집된 today_kwh가 기존 값보다 크거나 같으면 업데이트
|
|
# 하지만 보통 today_kwh는 누적값이므로 간단하게 upsert 처리
|
|
daily_records = []
|
|
kst_date_str = datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d")
|
|
|
|
for item in data_list:
|
|
plant_id = item.get('id', '')
|
|
if not plant_id: continue
|
|
|
|
today_val = float(item.get('today', 0))
|
|
|
|
# 0인 경우는 저장하지 않거나(새벽), 기존 값을 덮어쓰지 않도록 주의해야 함
|
|
# 하지만 발전소 데이터 보정을 위해 0이어도 일단 기록하거나,
|
|
# 아니면 max 값을 유지하는 로직이 필요할 수 있음.
|
|
# 여기서는 Upsert로 덮어쓰되, DB 트리거가 없다면 마지막 값이 저장됨.
|
|
# 보통 크롤링은 누적값이므로 마지막 값이 그날의 최종값에 가까움.
|
|
|
|
daily_records.append({
|
|
"plant_id": plant_id,
|
|
"date": kst_date_str,
|
|
"total_generation": today_val,
|
|
"created_at": kst_now, # 생성/수정일
|
|
"updated_at": kst_now
|
|
})
|
|
|
|
if daily_records:
|
|
# upsert: plant_id, date가 unique constraint여야 함
|
|
try:
|
|
# ignore_duplicates=False -> 업데이트
|
|
# on_conflict="plant_id, date" (Supabase/PG 설정에 따라 다름, 보통 PK나 UK 기준)
|
|
stats_result = client.table("daily_stats").upsert(daily_records, on_conflict="plant_id, date").execute()
|
|
print(f"✅ [DB] daily_stats 업데이트 완료: {len(daily_records)}건")
|
|
except Exception as e:
|
|
print(f"⚠️ [DB] daily_stats 업데이트 실패: {e}")
|
|
|
|
for r in records:
|
|
print(f" → {r['plant_id']}: {r['current_kw']} kW / {r['today_kwh']} kWh")
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ [DB] Supabase 저장 실패: {e}")
|
|
return False
|
|
|
|
def save_to_console(data_list):
|
|
"""콘솔에 데이터 출력"""
|
|
if not data_list:
|
|
print("⚠️ 출력할 데이터가 없습니다.")
|
|
return
|
|
|
|
print("\n" + "=" * 75)
|
|
print("📊 [실시간 통합 현황판]")
|
|
print("=" * 75)
|
|
print(f"{'발전소명':<20} | {'현재출력(kW)':>12} | {'금일발전(kWh)':>12} | {'상태'}")
|
|
print("-" * 75)
|
|
|
|
total_kw = 0
|
|
total_today = 0
|
|
|
|
for d in data_list:
|
|
name = d.get('name', 'N/A')
|
|
kw = d.get('kw', 0)
|
|
today = d.get('today', 0)
|
|
status = d.get('status', '')
|
|
|
|
total_kw += kw
|
|
total_today += today
|
|
|
|
print(f"{name:<20} | {kw:>12.2f} | {today:>12.2f} | {status}")
|
|
|
|
print("-" * 75)
|
|
print(f"{'합계':<20} | {total_kw:>12.2f} | {total_today:>12.2f} |")
|
|
print("=" * 75)
|
|
|
|
def save_history(data_list, data_type='hourly'):
|
|
"""
|
|
과거 데이터 저장 (Hourly, Daily, Monthly)
|
|
|
|
Args:
|
|
data_list: 데이터 리스트
|
|
data_type: 'hourly', 'daily', 'monthly'
|
|
"""
|
|
if not data_list:
|
|
return False
|
|
|
|
client = get_supabase_client()
|
|
if client is None:
|
|
return False
|
|
|
|
try:
|
|
table_name = ""
|
|
records = []
|
|
|
|
if data_type == 'hourly':
|
|
table_name = "solar_logs"
|
|
for item in data_list:
|
|
# hourly 데이터는 timestamp 키를 가짐
|
|
ts = item.get('timestamp')
|
|
if ts:
|
|
ts_iso = ts.replace(' ', 'T')
|
|
# Check if future (simple string comparison works for ISO format if consistent, but datetime is safer)
|
|
# KST aware comparison
|
|
from datetime import timezone, timedelta
|
|
kst = timezone(timedelta(hours=9))
|
|
now_kst = datetime.now(kst)
|
|
|
|
try:
|
|
# ts example: 2026-01-27 14:00:00. Assume input is local time (KST)
|
|
# We convert it to aware datetime
|
|
dt_ts = datetime.fromisoformat(ts_iso)
|
|
if dt_ts.tzinfo is None:
|
|
dt_ts = dt_ts.replace(tzinfo=kst)
|
|
|
|
if dt_ts > now_kst:
|
|
continue # Skip future data
|
|
except ValueError:
|
|
pass # robust date parsing needed if format varies
|
|
|
|
# Ensure timezone is sent to Supabase to prevent UTC assumption
|
|
final_created_at = dt_ts.isoformat()
|
|
|
|
records.append({
|
|
'plant_id': item['plant_id'],
|
|
'created_at': final_created_at,
|
|
'current_kw': float(item.get('current_kw', 0) or item.get('generation_kwh', 0)),
|
|
'today_kwh': float(item.get('generation_kwh', 0)),
|
|
'status': 'History'
|
|
})
|
|
|
|
elif data_type == 'daily':
|
|
table_name = "daily_stats"
|
|
for item in data_list:
|
|
records.append({
|
|
'plant_id': item['plant_id'],
|
|
'date': item['date'],
|
|
'total_generation': float(item.get('generation_kwh', 0))
|
|
# 'updated_at': datetime.now().isoformat()
|
|
})
|
|
|
|
elif data_type == 'monthly':
|
|
table_name = "monthly_stats"
|
|
for item in data_list:
|
|
records.append({
|
|
'plant_id': item['plant_id'],
|
|
'month': item['month'], # YYYY-MM
|
|
'total_generation': float(item.get('generation_kwh', 0)),
|
|
'updated_at': datetime.now().isoformat()
|
|
})
|
|
|
|
if not records:
|
|
return False
|
|
|
|
# upsert 사용
|
|
if data_type == 'hourly':
|
|
# hourly는 시간값 중복 시 업데이트? solar_logs는 보통 log table이라 pk가 id일 수 있음.
|
|
# 하지만 과거 내역이므로 중복 방지가 필요. created_at 기준?
|
|
# solar_logs에 unique constraints가 plant_id, created_at에 있는지 불확실.
|
|
# 일단 insert로 시도
|
|
client.table(table_name).insert(records).execute()
|
|
elif data_type == 'daily':
|
|
client.table(table_name).upsert(records, on_conflict="plant_id, date").execute()
|
|
|
|
# [Auto Update] Daily 데이터 저장 시 Monthly 통계 자동 갱신
|
|
# 1. 업데이트된 월 목록 추출
|
|
updated_months = set()
|
|
for rec in records:
|
|
try:
|
|
# date: YYYY-MM-DD
|
|
month_key = rec['date'][:7]
|
|
updated_months.add((rec['plant_id'], month_key))
|
|
except:
|
|
pass
|
|
|
|
if updated_months:
|
|
monthly_upserts = []
|
|
for (pid, m_key) in updated_months:
|
|
# 2. 해당 월의 Daily 합계 조회 (DB Aggregation)
|
|
# start_date ~ end_date 범위 쿼리가 필요하지만,
|
|
# supabase-py에서는 .select('total_generation.sum()') 같은 게 잘 안됨.
|
|
# 그냥 해당 월 데이터를 가져와서 파이썬에서 합산 (데이터 최대 31개라 매우 가벼움)
|
|
|
|
start_d = f"{m_key}-01"
|
|
# end_d 로직 복잡하므로 그냥 문자열 필터로 (YYYY-MM-01 ~ YYYY-MM-31)
|
|
# like는 지원 안 할 수 있으므로 date >= start AND date <= end
|
|
# 다음달 1일 전까지
|
|
|
|
# 쿼리: select total_generation where plant_id=X and date like 'YYYY-MM%'
|
|
# but 'like' operator might differ.
|
|
# Simpler: gte "YYYY-MM-01", lte "YYYY-MM-31"
|
|
|
|
d_res = client.table("daily_stats").select("total_generation") \
|
|
.eq("plant_id", pid) \
|
|
.gte("date", f"{m_key}-01") \
|
|
.lte("date", f"{m_key}-31") \
|
|
.execute()
|
|
|
|
total_gen = sum(r['total_generation'] or 0 for r in d_res.data)
|
|
|
|
monthly_upserts.append({
|
|
"plant_id": pid,
|
|
"month": m_key,
|
|
"total_generation": round(total_gen, 2),
|
|
"updated_at": datetime.now().isoformat()
|
|
})
|
|
|
|
# 3. Monthly Upsert
|
|
if monthly_upserts:
|
|
client.table("monthly_stats").upsert(monthly_upserts, on_conflict="plant_id, month").execute()
|
|
print(f" 🔄 [Sync] {len(monthly_upserts)}개월치 Monthly Stats 자동 갱신 완료")
|
|
|
|
elif data_type == 'monthly':
|
|
client.table(table_name).upsert(records, on_conflict="plant_id, month").execute()
|
|
|
|
print(f"✅ [History] {data_type} 데이터 {len(records)}건 저장 완료")
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"❌ [History] 저장 실패 ({data_type}): {e}")
|
|
return False
|