# ========================================== # database.py - Supabase 연동 # ========================================== import os from datetime import datetime # 환경 변수에서 Supabase 설정 로드 SUPABASE_URL = os.getenv('SUPABASE_URL', '') SUPABASE_KEY = os.getenv('SUPABASE_KEY', '') print(f"DEBUG: SUPABASE_URL prefix: {SUPABASE_URL[:15] if SUPABASE_URL else 'None'}") _supabase_client = None def get_supabase_client(): """Supabase 클라이언트 싱글턴 반환""" global _supabase_client if _supabase_client is None: if not SUPABASE_URL or not SUPABASE_KEY: print("⚠️ SUPABASE_URL 또는 SUPABASE_KEY가 설정되지 않았습니다.") print(" .env 파일을 확인하거나 환경 변수를 설정하세요.") return None try: from supabase import create_client _supabase_client = create_client(SUPABASE_URL, SUPABASE_KEY) print("✅ Supabase 연결 성공") except ImportError: print("⚠️ supabase 패키지가 설치되지 않았습니다.") print(" pip install supabase 실행하세요.") return None except Exception as e: print(f"⚠️ Supabase 연결 실패: {e}") return None return _supabase_client def save_to_supabase(data_list): """ 수집된 발전 데이터를 Supabase solar_logs 테이블에 저장 Args: data_list: [{'id': 'nrems-01', 'name': '...', 'kw': 10.5, 'today': 100.0, 'status': '...'}] Returns: bool: 저장 성공 여부 """ if not data_list: print("[DB] 저장할 데이터가 없습니다.") return False client = get_supabase_client() if client is None: print("[DB 저장 생략] Supabase 연결 없음") return False try: # 저장할 레코드 생성 records = [] for item in data_list: plant_id = item.get('id', '') # id가 없는 경우 건너뛰기 if not plant_id: print(f" ⚠️ '{item.get('name', 'Unknown')}' ID 없음, 건너뜀") continue # 한국 시간(KST) 타임스탬프 생성 from datetime import timezone, timedelta kst = timezone(timedelta(hours=9)) kst_now = datetime.now(kst).isoformat() record = { 'plant_id': plant_id, 'current_kw': float(item.get('kw', 0)), 'today_kwh': float(item.get('today', 0)), 'status': item.get('status', ''), 'created_at': kst_now # 한국 시간으로 저장 } records.append(record) if not records: print("[DB] 저장할 유효한 레코드가 없습니다.") return False # Supabase에 일괄 삽입 (solar_logs) result = client.table("solar_logs").insert(records).execute() print(f"✅ [DB] Supabase 저장 완료: {len(records)}건 (solar_logs)") # daily_stats 테이블 업데이트 (Upsert) # 오늘 날짜(KST) 기준, 현재 수집된 today_kwh가 기존 값보다 크거나 같으면 업데이트 # 하지만 보통 today_kwh는 누적값이므로 간단하게 upsert 처리 daily_records = [] kst_date_str = datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d") for item in data_list: plant_id = item.get('id', '') if not plant_id: continue today_val = float(item.get('today', 0)) # 0인 경우는 저장하지 않거나(새벽), 기존 값을 덮어쓰지 않도록 주의해야 함 # 하지만 발전소 데이터 보정을 위해 0이어도 일단 기록하거나, # 아니면 max 값을 유지하는 로직이 필요할 수 있음. # 여기서는 Upsert로 덮어쓰되, DB 트리거가 없다면 마지막 값이 저장됨. # 보통 크롤링은 누적값이므로 마지막 값이 그날의 최종값에 가까움. daily_records.append({ "plant_id": plant_id, "date": kst_date_str, "total_generation": today_val, "created_at": kst_now, # 생성/수정일 "updated_at": kst_now }) if daily_records: # upsert: plant_id, date가 unique constraint여야 함 try: # ignore_duplicates=False -> 업데이트 # on_conflict="plant_id, date" (Supabase/PG 설정에 따라 다름, 보통 PK나 UK 기준) stats_result = client.table("daily_stats").upsert(daily_records, on_conflict="plant_id, date").execute() print(f"✅ [DB] daily_stats 업데이트 완료: {len(daily_records)}건") except Exception as e: print(f"⚠️ [DB] daily_stats 업데이트 실패: {e}") for r in records: print(f" → {r['plant_id']}: {r['current_kw']} kW / {r['today_kwh']} kWh") return True except Exception as e: print(f"❌ [DB] Supabase 저장 실패: {e}") return False def save_to_console(data_list): """콘솔에 데이터 출력""" if not data_list: print("⚠️ 출력할 데이터가 없습니다.") return print("\n" + "=" * 75) print("📊 [실시간 통합 현황판]") print("=" * 75) print(f"{'발전소명':<20} | {'현재출력(kW)':>12} | {'금일발전(kWh)':>12} | {'상태'}") print("-" * 75) total_kw = 0 total_today = 0 for d in data_list: name = d.get('name', 'N/A') kw = d.get('kw', 0) today = d.get('today', 0) status = d.get('status', '') total_kw += kw total_today += today print(f"{name:<20} | {kw:>12.2f} | {today:>12.2f} | {status}") print("-" * 75) print(f"{'합계':<20} | {total_kw:>12.2f} | {total_today:>12.2f} |") print("=" * 75) def save_history(data_list, data_type='hourly'): """ 과거 데이터 저장 (Hourly, Daily, Monthly) Args: data_list: 데이터 리스트 data_type: 'hourly', 'daily', 'monthly' """ if not data_list: return False client = get_supabase_client() if client is None: return False try: table_name = "" records = [] if data_type == 'hourly': table_name = "solar_logs" for item in data_list: # hourly 데이터는 timestamp 키를 가짐 ts = item.get('timestamp') if ts: ts_iso = ts.replace(' ', 'T') # Check if future (simple string comparison works for ISO format if consistent, but datetime is safer) # KST aware comparison from datetime import timezone, timedelta kst = timezone(timedelta(hours=9)) now_kst = datetime.now(kst) try: # ts example: 2026-01-27 14:00:00. Assume input is local time (KST) # We convert it to aware datetime dt_ts = datetime.fromisoformat(ts_iso) if dt_ts.tzinfo is None: dt_ts = dt_ts.replace(tzinfo=kst) if dt_ts > now_kst: continue # Skip future data except ValueError: pass # robust date parsing needed if format varies # Ensure timezone is sent to Supabase to prevent UTC assumption final_created_at = dt_ts.isoformat() records.append({ 'plant_id': item['plant_id'], 'created_at': final_created_at, 'current_kw': float(item.get('current_kw', 0) or item.get('generation_kwh', 0)), 'today_kwh': float(item.get('generation_kwh', 0)), 'status': 'History' }) elif data_type == 'daily': table_name = "daily_stats" for item in data_list: records.append({ 'plant_id': item['plant_id'], 'date': item['date'], 'total_generation': float(item.get('generation_kwh', 0)) # 'updated_at': datetime.now().isoformat() }) elif data_type == 'monthly': table_name = "monthly_stats" for item in data_list: records.append({ 'plant_id': item['plant_id'], 'month': item['month'], # YYYY-MM 'total_generation': float(item.get('generation_kwh', 0)), 'updated_at': datetime.now().isoformat() }) if not records: return False # upsert 사용 if data_type == 'hourly': # hourly는 시간값 중복 시 업데이트? solar_logs는 보통 log table이라 pk가 id일 수 있음. # 하지만 과거 내역이므로 중복 방지가 필요. created_at 기준? # solar_logs에 unique constraints가 plant_id, created_at에 있는지 불확실. # 일단 insert로 시도 client.table(table_name).insert(records).execute() elif data_type == 'daily': client.table(table_name).upsert(records, on_conflict="plant_id, date").execute() # [Auto Update] Daily 데이터 저장 시 Monthly 통계 자동 갱신 # 1. 업데이트된 월 목록 추출 updated_months = set() for rec in records: try: # date: YYYY-MM-DD month_key = rec['date'][:7] updated_months.add((rec['plant_id'], month_key)) except: pass if updated_months: monthly_upserts = [] for (pid, m_key) in updated_months: # 2. 해당 월의 Daily 합계 조회 (DB Aggregation) # start_date ~ end_date 범위 쿼리가 필요하지만, # supabase-py에서는 .select('total_generation.sum()') 같은 게 잘 안됨. # 그냥 해당 월 데이터를 가져와서 파이썬에서 합산 (데이터 최대 31개라 매우 가벼움) start_d = f"{m_key}-01" # end_d 로직 복잡하므로 그냥 문자열 필터로 (YYYY-MM-01 ~ YYYY-MM-31) # like는 지원 안 할 수 있으므로 date >= start AND date <= end # 다음달 1일 전까지 # 쿼리: select total_generation where plant_id=X and date like 'YYYY-MM%' # but 'like' operator might differ. # Simpler: gte "YYYY-MM-01", lte "YYYY-MM-31" d_res = client.table("daily_stats").select("total_generation") \ .eq("plant_id", pid) \ .gte("date", f"{m_key}-01") \ .lte("date", f"{m_key}-31") \ .execute() total_gen = sum(r['total_generation'] or 0 for r in d_res.data) monthly_upserts.append({ "plant_id": pid, "month": m_key, "total_generation": round(total_gen, 2), "updated_at": datetime.now().isoformat() }) # 3. Monthly Upsert if monthly_upserts: client.table("monthly_stats").upsert(monthly_upserts, on_conflict="plant_id, month").execute() print(f" 🔄 [Sync] {len(monthly_upserts)}개월치 Monthly Stats 자동 갱신 완료") elif data_type == 'monthly': client.table(table_name).upsert(records, on_conflict="plant_id, month").execute() print(f"✅ [History] {data_type} 데이터 {len(records)}건 저장 완료") return True except Exception as e: print(f"❌ [History] 저장 실패 ({data_type}): {e}") return False