solorpower_crawler/alert_manager.py
2026-03-30 13:01:18 +09:00

145 lines
5.6 KiB
Python

import sqlite3
import requests
from datetime import datetime
from pathlib import Path
from config import TELEGRAM_BOT_TOKEN
class AlertManager:
"""
발전소 이상 감지 및 텔레그램 알림 관리
- 상태(정상/이상)를 DB에 저장하여 중복 알림 방지
"""
def __init__(self, db_path: str = None):
"""
DB 연결 및 테이블 초기화
"""
if db_path is None:
# crawler_manager와 같은 DB 파일 사용
db_path = Path(__file__).parent / "crawler_manager.db"
self.db_path = str(db_path)
self._init_db()
def _init_db(self):
"""알림 히스토리 테이블 생성"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
# site_id: 발전소 ID
# alert_status: 'NORMAL' (정상), 'ALERT' (이상 발생 및 알림 전송됨)
# last_alert_time: 마지막 알림 전송 시간
cursor.execute("""
CREATE TABLE IF NOT EXISTS alert_history (
site_id TEXT PRIMARY KEY,
alert_status TEXT DEFAULT 'NORMAL',
last_alert_time TEXT
)
""")
conn.commit()
def send_telegram_message(self, chat_id, message):
"""텔레그램 메시지 전송"""
if not TELEGRAM_BOT_TOKEN:
print(" ⚠️ 텔레그램 토큰이 설정되지 않았습니다.")
return False
if not chat_id:
# Chat ID가 설정되지 않은 경우 조용히 리턴 (로그는 호출부에서 처리)
return False
url = f"https://api.telegram.org/bot{TELEGRAM_BOT_TOKEN}/sendMessage"
try:
payload = {"chat_id": chat_id, "text": message}
response = requests.post(url, json=payload, timeout=5)
if response.status_code == 200:
print(f" 🔔 텔레그램 알림 전송 성공")
return True
else:
print(f" ❌ 텔레그램 전송 실패 ({response.status_code}): {response.text}")
return False
except Exception as e:
print(f" ❌ 텔레그램 전송 중 에러: {e}")
return False
def check_and_alert(self, plant_info: dict, current_kw: float):
"""
발전량을 체크하고 필요 시 알림 전송
- 오전 10시 ~ 오후 5시에만 동작
- 상태 변경 시에만 알림 (중복 방지)
"""
# 1. 시간 체크 (오전 10시 ~ 오후 5시)
now = datetime.now()
if not (10 <= now.hour <= 17):
return
site_id = plant_info.get('id')
plant_name = plant_info.get('display_name', plant_info.get('name'))
chat_id = plant_info.get('telegram_chat_id')
if not site_id:
return
# 2. 현재 DB 상태 확인
current_status = 'NORMAL'
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("SELECT alert_status FROM alert_history WHERE site_id = ?", (site_id,))
row = cursor.fetchone()
if row:
current_status = row[0]
else:
# 초기값 생성
cursor.execute("INSERT INTO alert_history (site_id, alert_status) VALUES (?, ?)", (site_id, 'NORMAL'))
conn.commit()
# 3. 상태 전이 로직
new_status = current_status
# [Case A] 발전량 0 (이상 감지)
if current_kw == 0:
if current_status == 'NORMAL':
# NORMAL -> ALERT: 알림 전송
print(f" 🚨 [Alert] {plant_name} 발전량 0kW 감지! 알림 전송 시도...")
if chat_id:
message = (
f"🚨 [긴급] 발전소 이상 감지!\n\n"
f"- 발전소: {plant_name}\n"
f"- 상태: 발전량 0kW\n"
f"- 시간: {now.strftime('%Y-%m-%d %H:%M:%S')}"
)
if self.send_telegram_message(chat_id, message):
new_status = 'ALERT'
else:
print(f" ⚠️ {plant_name}: Chat ID 오류로 알림 실패")
# 전송 실패해도 상태를 ALERT로 할 것인가?
# 실패했다면 다음에 다시 시도해야 하므로 NORMAL 유지
else:
print(f" ⚠️ {plant_name}: 설정된 Chat ID가 없습니다. (config.py 확인)")
else:
# 이미 ALERT 상태: 중복 알림 생략
pass
# [Case B] 발전량 > 0 (정상 복구)
else:
if current_status == 'ALERT':
# ALERT -> NORMAL: 상태 리셋
print(f" ✅ [Alert] {plant_name} 정상 복구됨 ({current_kw}kW)")
# 복구 알림은 옵션 (현재는 생략)
new_status = 'NORMAL'
# 4. 상태 변경 시 DB 업데이트
if new_status != current_status:
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE alert_history
SET alert_status = ?, last_alert_time = ?
WHERE site_id = ?
""", (new_status, now.isoformat(), site_id))
conn.commit()