# ========================================== # crawler_manager.py - 크롤링 스케줄 최적화 미들웨어 # ========================================== # NAS 리소스 절약을 위해 SQLite 기반으로 각 사이트의 # 업데이트 패턴을 학습하고, 데이터가 실제로 변경된 시점에만 DB 저장 # # [설계 원칙] # - 크롤링(HTTP 요청) 자체는 항상 허용 (야간 제외) # → 원격 서버가 언제 업데이트할지 모르므로 주기적으로 확인해야 함 # - DB 저장은 데이터가 실제로 변경되었을 때만 실행 # → 중복 저장 방지 + NAS I/O 절약 # - 업데이트 패턴 학습은 부가 기능 (로깅용) import sqlite3 from datetime import datetime, timedelta from pathlib import Path class CrawlerManager: """ 크롤링 DB 저장을 최적화하는 매니저 클래스 - should_run: 야간(21시~05시) 여부만 체크 → False면 크롤링 자체를 스킵 - should_save: 데이터가 실제로 변경되었는지 확인 → False면 DB 저장 스킵 - analyze_and_optimize: 업데이트 패턴 학습 (로깅/모니터링 목적) """ def __init__(self, db_path: str = None): """ DB 연결 및 테이블 초기화 Args: db_path: SQLite DB 파일 경로. 기본값은 스크립트와 같은 디렉토리의 crawler_manager.db """ if db_path is None: db_path = Path(__file__).parent / "crawler_manager.db" self.db_path = str(db_path) self._init_db() def _init_db(self): """테이블이 없으면 생성""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.executescript(""" CREATE TABLE IF NOT EXISTS site_rules ( site_id TEXT PRIMARY KEY, status TEXT DEFAULT 'LEARNING', target_minute INTEGER DEFAULT -1, start_date TEXT, last_run TEXT ); CREATE TABLE IF NOT EXISTS site_data ( site_id TEXT PRIMARY KEY, kw REAL, today_kwh REAL, updated_at TEXT ); CREATE TABLE IF NOT EXISTS update_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, site_id TEXT, detected_minute INTEGER, detected_at TEXT ); """) conn.commit() def _get_connection(self) -> sqlite3.Connection: """SQLite 연결 반환 (타임아웃 설정 추가)""" return sqlite3.connect(self.db_path, timeout=10.0) def _cleanup_old_history(self): """오래된 히스토리 정리 (30일 이상 지난 데이터 삭제)""" try: with self._get_connection() as conn: cursor = conn.cursor() limit_date = (datetime.now() - timedelta(days=30)).isoformat() cursor.execute("DELETE FROM update_history WHERE detected_at < ?", (limit_date,)) conn.commit() except Exception as e: print(f"⚠️ [CrawlerManager] 히스토리 정리 실패: {e}") def register_site(self, site_id: str) -> bool: """ 새로운 사이트 등록 Args: site_id: 사이트 식별자 (예: 'nrems-01') Returns: bool: 새로 등록되었으면 True, 이미 존재하면 False """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT 1 FROM site_rules WHERE site_id = ?", (site_id,)) if cursor.fetchone(): return False today = datetime.now().strftime("%Y-%m-%d") cursor.execute(""" INSERT INTO site_rules (site_id, status, target_minute, start_date, last_run) VALUES (?, 'LEARNING', -1, ?, NULL) """, (site_id, today)) conn.commit() print(f" 📝 [CrawlerManager] '{site_id}' 신규 등록 (LEARNING 모드)") return True def should_run(self, site_id: str) -> bool: """ 현재 시점에 해당 사이트를 크롤링(HTTP 요청)해야 하는지 판단. [변경 사항] 이전: OPTIMIZED 상태면 특정 분(minute) 윈도우에서만 크롤링 허용 → 문제: 원격 서버 업데이트 시점을 놓쳐 시계열 데이터 누락 현재: 야간(21시~05시)에만 False 반환, 그 외에는 항상 크롤링 허용 → DB 저장 여부는 should_save()에서 별도 결정 Args: site_id: 사이트 식별자 Returns: bool: 크롤링 실행 여부 (야간이면 False) """ now = datetime.now() current_hour = now.hour current_minute = now.minute # 야간 모드: 21시 ~ 05시에는 크롤링 중지 (발전 없는 시간대) if current_hour >= 21 or current_hour < 5: return False # 히스토리 정리 (05시 정각에 1회) if current_minute == 0 and current_hour == 5: self._cleanup_old_history() # 사이트 등록 (미등록 사이트 자동 등록) self.register_site(site_id) # 항상 크롤링 허용 (데이터 변경 여부는 should_save에서 판단) return True def should_save(self, site_id: str, current_data: dict) -> bool: """ 수집한 데이터를 DB에 저장해야 하는지 판단. 원격 서버의 데이터가 이전 수집 시점과 달라졌을 때만 True 반환. 이를 통해 중복 저장을 방지하고 NAS I/O를 절약. [저장 조건] - today_kwh(금일 발전량)가 증가했을 때: 반드시 저장 (핵심 지표) - kw(현재 출력)가 변했을 때: 저장 (실시간 상태 반영) - 마지막 저장 후 1시간 이상 경과했을 때: 강제 저장 (heartbeat) → 데이터가 정체돼도 최소 1시간에 1번은 기록 보장 Args: site_id: 사이트 식별자 current_data: {'kw': float, 'today': float} Returns: bool: DB에 저장해야 하면 True """ new_kw = float(current_data.get('kw', 0)) new_today = float(current_data.get('today', 0)) now = datetime.now() with self._get_connection() as conn: cursor = conn.cursor() # 이전 데이터 조회 cursor.execute( "SELECT kw, today_kwh, updated_at FROM site_data WHERE site_id = ?", (site_id,) ) row = cursor.fetchone() should_save = False if not row: # 첫 수집 → 반드시 저장 should_save = True else: last_kw, last_today, last_updated_at = row # 1. 금일 발전량이 증가했으면 저장 if new_today - last_today > 0.001: should_save = True # 2. 현재 출력(kW)이 변했으면 저장 elif abs(new_kw - last_kw) > 0.001: should_save = True # 3. 1시간 이상 저장 없었으면 강제 heartbeat 저장 elif last_updated_at: try: last_dt = datetime.fromisoformat(last_updated_at) if now - last_dt >= timedelta(hours=1): should_save = True except (ValueError, TypeError): should_save = True if should_save: # 현재 상태를 캐시에 업데이트 cursor.execute(""" INSERT INTO site_data (site_id, kw, today_kwh, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT(site_id) DO UPDATE SET kw = excluded.kw, today_kwh = excluded.today_kwh, updated_at = excluded.updated_at """, (site_id, new_kw, new_today, now.isoformat())) conn.commit() return should_save def check_data_change(self, site_id: str, current_data: dict) -> bool: """ [하위 호환용] should_save의 별칭. 기존 main.py 코드와의 호환성을 위해 유지. 내부적으로 should_save를 호출하며, 패턴 분석도 함께 수행. """ return self.should_save(site_id, current_data) def analyze_and_optimize(self, site_id: str): """ 업데이트 패턴 분석 및 기록 (모니터링/로깅 목적). 데이터 변경이 감지되었을 때 호출하여 원격 서버의 업데이트 패턴을 학습. 이 정보는 현재 크롤링 스케줄 제어에는 사용하지 않으며, 향후 분석이나 시각화를 위한 참고 데이터로만 활용. """ now = datetime.now() current_minute = now.minute # 히스토리 기록 with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO update_history (site_id, detected_minute, detected_at) VALUES (?, ?, ?) """, (site_id, current_minute, now.isoformat())) # 최근 기록 조회 (최대 5개) cursor.execute(""" SELECT detected_minute FROM update_history WHERE site_id = ? ORDER BY id DESC LIMIT 5 """, (site_id,)) minutes = [r[0] for r in cursor.fetchall()] conn.commit() # 패턴 분석 (최소 3회 이상 데이터 필요) if len(minutes) < 3: return recent = minutes[:3] avg = sum(recent) / len(recent) # 최대 편차가 5분 이내면 패턴 안정 (참고 정보로만 기록) is_consistent = all(abs(m - avg) <= 5 for m in recent) if is_consistent: target = int(avg) # 스케줄 제어에는 사용하지 않지만, 상태 기록은 유지 (모니터링용) self._record_pattern(site_id, target) else: print(f" 📊 [CrawlerManager] '{site_id}' 패턴 분석 중... 최근: {recent}") def _record_pattern(self, site_id: str, detected_minute: int): """ 감지된 업데이트 패턴을 DB에 기록 (모니터링용). 크롤링 스케줄 제어에는 영향을 주지 않음. """ if not 0 <= detected_minute <= 59: return with self._get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT status, target_minute FROM site_rules WHERE site_id = ?", (site_id,)) row = cursor.fetchone() if row and row[0] == 'OPTIMIZED' and abs(row[1] - detected_minute) <= 2: return # 이미 동일한 패턴 기록됨 cursor.execute(""" UPDATE site_rules SET status = 'OPTIMIZED', target_minute = ? WHERE site_id = ? """, (detected_minute, site_id)) conn.commit() if cursor.rowcount > 0: print(f" 📌 [CrawlerManager] '{site_id}' 업데이트 패턴 감지: 매시 {detected_minute}분 경 (참고용)") def update_optimization(self, site_id: str, detected_minute: int) -> bool: """ [하위 호환용] 패턴 기록 메서드. 내부적으로 _record_pattern을 호출. """ self._record_pattern(site_id, detected_minute) return True def record_run(self, site_id: str): """ 크롤링 성공 시 마지막 실행 시간 기록 Args: site_id: 사이트 식별자 """ now_str = datetime.now().isoformat() with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE site_rules SET last_run = ? WHERE site_id = ? """, (now_str, site_id)) conn.commit() def get_site_info(self, site_id: str) -> dict: """ 사이트 정보 조회 (디버깅/모니터링용) Args: site_id: 사이트 식별자 Returns: dict: 사이트 정보 또는 None """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT site_id, status, target_minute, start_date, last_run FROM site_rules WHERE site_id = ? """, (site_id,)) row = cursor.fetchone() if row: return { "site_id": row[0], "status": row[1], "target_minute": row[2], "start_date": row[3], "last_run": row[4] } return None def get_all_sites(self) -> list: """ 모든 사이트 정보 조회 Returns: list: 모든 사이트 정보 리스트 """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT site_id, status, target_minute, start_date, last_run FROM site_rules ORDER BY site_id """) rows = cursor.fetchall() return [ { "site_id": row[0], "status": row[1], "target_minute": row[2], "start_date": row[3], "last_run": row[4] } for row in rows ] def reset_to_learning(self, site_id: str) -> bool: """ 사이트를 다시 LEARNING 상태로 리셋 Args: site_id: 사이트 식별자 Returns: bool: 리셋 성공 여부 """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE site_rules SET status = 'LEARNING', target_minute = -1 WHERE site_id = ? """, (site_id,)) conn.commit() return cursor.rowcount > 0 # ========================================== # main.py 연동 방식 (변경 없음 - 하위 호환 유지) # ========================================== # # main.py에서의 사용 흐름: # # 1. should_run(site_id) # → 야간이면 False (크롤링 자체 스킵) # → 그 외에는 항상 True (항상 HTTP 요청) # # 2. 크롤링(HTTP 요청) 실행 # # 3. record_run(item_id) ← 크롤링 성공 기록 # # 4. check_data_change(item_id, item) ← should_save와 동일 # → True: 데이터 변경됨 → DB 저장 진행 # → False: 변경 없음 → DB 저장 스킵 # # 5. analyze_and_optimize(item_id) ← 패턴 학습 (선택적) # # ========================================== # Cron 설정 (10분마다 실행 권장) # ========================================== # */10 * * * * cd /volume1/dev/SolorPower/crawler && \ # /volume1/dev/SolorPower/crawler/venv/bin/python main.py >> cron.log 2>&1 # ========================================== if __name__ == "__main__": manager = CrawlerManager() print("=== CrawlerManager 테스트 ===\n") test_sites = ["nrems-01", "nrems-02", "kremc-05"] for site_id in test_sites: manager.register_site(site_id) print("\n[등록된 사이트]") for site in manager.get_all_sites(): print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)") print("\n[should_run 테스트]") for site_id in test_sites: result = manager.should_run(site_id) print(f" {site_id}: {'✅ 실행' if result else '⏭️ 스킵 (야간)'}") print("\n[should_save 테스트]") test_data = {'kw': 15.5, 'today': 120.0} for site_id in test_sites: result = manager.should_save(site_id, test_data) print(f" {site_id}: {'✅ 저장' if result else '⏭️ 스킵 (변경 없음)'}") print("\n=== 테스트 완료 ===")