import sqlite3 import requests from datetime import datetime from pathlib import Path from config import TELEGRAM_BOT_TOKEN class AlertManager: """ 발전소 이상 감지 및 텔레그램 알림 관리 - 상태(정상/이상)를 DB에 저장하여 중복 알림 방지 """ def __init__(self, db_path: str = None): """ DB 연결 및 테이블 초기화 """ if db_path is None: # crawler_manager와 같은 DB 파일 사용 db_path = Path(__file__).parent / "crawler_manager.db" self.db_path = str(db_path) self._init_db() def _init_db(self): """알림 히스토리 테이블 생성""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() # site_id: 발전소 ID # alert_status: 'NORMAL' (정상), 'ALERT' (이상 발생 및 알림 전송됨) # last_alert_time: 마지막 알림 전송 시간 cursor.execute(""" CREATE TABLE IF NOT EXISTS alert_history ( site_id TEXT PRIMARY KEY, alert_status TEXT DEFAULT 'NORMAL', last_alert_time TEXT ) """) conn.commit() def send_telegram_message(self, chat_id, message): """텔레그램 메시지 전송""" if not TELEGRAM_BOT_TOKEN: print(" ⚠️ 텔레그램 토큰이 설정되지 않았습니다.") return False if not chat_id: # Chat ID가 설정되지 않은 경우 조용히 리턴 (로그는 호출부에서 처리) return False url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage" try: payload = {"chat_id": chat_id, "text": message} response = requests.post(url, json=payload, timeout=5) if response.status_code == 200: print(f" 🔔 텔레그램 알림 전송 성공") return True else: print(f" ❌ 텔레그램 전송 실패 ({response.status_code}): {response.text}") return False except Exception as e: print(f" ❌ 텔레그램 전송 중 에러: {e}") return False def check_and_alert(self, plant_info: dict, current_kw: float): """ 발전량을 체크하고 필요 시 알림 전송 - 오전 10시 ~ 오후 5시에만 동작 - 상태 변경 시에만 알림 (중복 방지) """ # 1. 시간 체크 (오전 10시 ~ 오후 5시) now = datetime.now() if not (10 <= now.hour <= 17): return site_id = plant_info.get('id') plant_name = plant_info.get('display_name', plant_info.get('name')) chat_id = plant_info.get('telegram_chat_id') if not site_id: return # 2. 현재 DB 상태 확인 current_status = 'NORMAL' with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute("SELECT alert_status FROM alert_history WHERE site_id = ?", (site_id,)) row = cursor.fetchone() if row: current_status = row[0] else: # 초기값 생성 cursor.execute("INSERT INTO alert_history (site_id, alert_status) VALUES (?, ?)", (site_id, 'NORMAL')) conn.commit() # 3. 상태 전이 로직 new_status = current_status # [Case A] 발전량 0 (이상 감지) if current_kw == 0: if current_status == 'NORMAL': # NORMAL -> ALERT: 알림 전송 print(f" 🚨 [Alert] {plant_name} 발전량 0kW 감지! 알림 전송 시도...") if chat_id: message = ( f"🚨 [긴급] 발전소 이상 감지!\n\n" f"- 발전소: {plant_name}\n" f"- 상태: 발전량 0kW\n" f"- 시간: {now.strftime('%Y-%m-%d %H:%M:%S')}" ) if self.send_telegram_message(chat_id, message): new_status = 'ALERT' else: print(f" ⚠️ {plant_name}: Chat ID 오류로 알림 실패") # 전송 실패해도 상태를 ALERT로 할 것인가? # 실패했다면 다음에 다시 시도해야 하므로 NORMAL 유지 else: print(f" ⚠️ {plant_name}: 설정된 Chat ID가 없습니다. (config.py 확인)") else: # 이미 ALERT 상태: 중복 알림 생략 pass # [Case B] 발전량 > 0 (정상 복구) else: if current_status == 'ALERT': # ALERT -> NORMAL: 상태 리셋 print(f" ✅ [Alert] {plant_name} 정상 복구됨 ({current_kw}kW)") # 복구 알림은 옵션 (현재는 생략) new_status = 'NORMAL' # 4. 상태 변경 시 DB 업데이트 if new_status != current_status: with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" UPDATE alert_history SET alert_status = ?, last_alert_time = ? WHERE site_id = ? """, (new_status, now.isoformat(), site_id)) conn.commit()