# ========================================== # crawler_manager.py - 크롤링 스케줄 최적화 미들웨어 # ========================================== # NAS 리소스 절약을 위해 SQLite 기반으로 각 사이트의 # 업데이트 패턴을 학습하고 최적 시점에만 크롤링 실행 import sqlite3 from datetime import datetime, timedelta from pathlib import Path class CrawlerManager: """ 크롤링 스케줄을 자동으로 최적화하는 매니저 클래스 - LEARNING 상태: 모든 크롤링 허용 (패턴 학습 중) - OPTIMIZED 상태: 학습된 업데이트 시점 전후에만 크롤링 허용 """ def __init__(self, db_path: str = None): """ DB 연결 및 테이블 초기화 Args: db_path: SQLite DB 파일 경로. 기본값은 스크립트와 같은 디렉토리의 crawler_manager.db """ if db_path is None: db_path = Path(__file__).parent / "crawler_manager.db" self.db_path = str(db_path) self._init_db() def _init_db(self): """테이블이 없으면 생성""" with sqlite3.connect(self.db_path) as conn: cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS site_rules ( site_id TEXT PRIMARY KEY, status TEXT DEFAULT 'LEARNING', target_minute INTEGER DEFAULT -1, start_date TEXT, last_run TEXT ) """) conn.commit() def _get_connection(self) -> sqlite3.Connection: """SQLite 연결 반환""" return sqlite3.connect(self.db_path) def register_site(self, site_id: str) -> bool: """ 새로운 사이트 등록 Args: site_id: 사이트 식별자 (예: 'nrems-01') Returns: bool: 새로 등록되었으면 True, 이미 존재하면 False """ with self._get_connection() as conn: cursor = conn.cursor() # 이미 존재하는지 확인 cursor.execute("SELECT 1 FROM site_rules WHERE site_id = ?", (site_id,)) if cursor.fetchone(): return False # 새로 등록 today = datetime.now().strftime("%Y-%m-%d") cursor.execute(""" INSERT INTO site_rules (site_id, status, target_minute, start_date, last_run) VALUES (?, 'LEARNING', -1, ?, NULL) """, (site_id, today)) conn.commit() print(f" 📝 [CrawlerManager] '{site_id}' 신규 등록 (LEARNING 모드)") return True def should_run(self, site_id: str) -> bool: """ 현재 시점에 해당 사이트를 크롤링해야 하는지 판단 Args: site_id: 사이트 식별자 Returns: bool: 크롤링 실행 여부 """ now = datetime.now() current_hour = now.hour current_minute = now.minute # 야간 모드: 21시 ~ 05시에는 크롤링 중지 if current_hour >= 21 or current_hour < 5: return False with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT status, target_minute, last_run FROM site_rules WHERE site_id = ? """, (site_id,)) row = cursor.fetchone() # 등록되지 않은 사이트면 일단 등록 후 True 반환 if not row: self.register_site(site_id) return True status, target_minute, last_run = row # LEARNING 상태: 항상 실행 허용 (패턴 학습 목적) if status == "LEARNING": return True # OPTIMIZED 상태: 최적화된 시간대에만 실행 if status == "OPTIMIZED" and target_minute >= 0: # target_minute 이후 10분 윈도우 내에서만 허용 # 예: target_minute=15 → 15~24분 사이에만 실행 window_start = target_minute window_end = (target_minute + 10) % 60 # 윈도우가 시간 경계를 넘는 경우 (예: 55~04분) if window_start <= window_end: in_window = window_start <= current_minute < window_end else: in_window = current_minute >= window_start or current_minute < window_end if not in_window: return False # 중복 실행 방지: 최근 1시간 내 실행 이력이 있으면 스킵 if last_run: try: last_run_dt = datetime.fromisoformat(last_run) if now - last_run_dt < timedelta(hours=1): return False except (ValueError, TypeError): pass return True # 기타 상태는 기본적으로 허용 return True def update_optimization(self, site_id: str, detected_minute: int) -> bool: """ 사이트의 업데이트 패턴이 감지되면 OPTIMIZED 상태로 전환 Args: site_id: 사이트 식별자 detected_minute: 업데이트가 감지된 분 (0~59) Returns: bool: 업데이트 성공 여부 """ if not 0 <= detected_minute <= 59: print(f" ⚠️ [CrawlerManager] 유효하지 않은 minute 값: {detected_minute}") return False with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE site_rules SET status = 'OPTIMIZED', target_minute = ? WHERE site_id = ? """, (detected_minute, site_id)) conn.commit() if cursor.rowcount > 0: print(f" ✅ [CrawlerManager] '{site_id}' → OPTIMIZED (매시 {detected_minute}분)") return True else: print(f" ⚠️ [CrawlerManager] '{site_id}' 사이트를 찾을 수 없음") return False def record_run(self, site_id: str): """ 크롤링 성공 시 마지막 실행 시간 기록 Args: site_id: 사이트 식별자 """ now_str = datetime.now().isoformat() with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE site_rules SET last_run = ? WHERE site_id = ? """, (now_str, site_id)) conn.commit() def get_site_info(self, site_id: str) -> dict: """ 사이트 정보 조회 (디버깅/모니터링용) Args: site_id: 사이트 식별자 Returns: dict: 사이트 정보 또는 None """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT site_id, status, target_minute, start_date, last_run FROM site_rules WHERE site_id = ? """, (site_id,)) row = cursor.fetchone() if row: return { "site_id": row[0], "status": row[1], "target_minute": row[2], "start_date": row[3], "last_run": row[4] } return None def get_all_sites(self) -> list: """ 모든 사이트 정보 조회 Returns: list: 모든 사이트 정보 리스트 """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT site_id, status, target_minute, start_date, last_run FROM site_rules ORDER BY site_id """) rows = cursor.fetchall() return [ { "site_id": row[0], "status": row[1], "target_minute": row[2], "start_date": row[3], "last_run": row[4] } for row in rows ] def reset_to_learning(self, site_id: str) -> bool: """ 사이트를 다시 LEARNING 상태로 리셋 Args: site_id: 사이트 식별자 Returns: bool: 리셋 성공 여부 """ with self._get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE site_rules SET status = 'LEARNING', target_minute = -1 WHERE site_id = ? """, (site_id,)) conn.commit() return cursor.rowcount > 0 # ========================================== # Example Usage (main.py에서의 활용 예시) # ========================================== # # from crawler_manager import CrawlerManager # from crawlers import get_crawler # from config import get_all_plants # # def main(): # # 매니저 초기화 # manager = CrawlerManager() # # # 모든 발전소 순회 # for plant in get_all_plants(): # site_id = plant.get('id', '') # # if not site_id: # continue # # # 1. 사이트 등록 (최초 1회) # manager.register_site(site_id) # # # 2. 실행 여부 확인 # if not manager.should_run(site_id): # print(f" ⏭️ {site_id} 스킵 (최적화 윈도우 외)") # continue # # # 3. 크롤링 실행 # try: # crawler_func = get_crawler(plant['type']) # data = crawler_func(plant) # # if data: # # 4. 실행 기록 # manager.record_run(site_id) # # # 5. (옵션) 패턴 분석 후 최적화 # # 예: 데이터가 항상 매시 10분에 갱신된다면 # # manager.update_optimization(site_id, 10) # # except Exception as e: # print(f" ❌ {site_id} 오류: {e}") # # if __name__ == "__main__": # main() # # ========================================== # Cron 예시 (5분마다 실행) # ========================================== # */5 * * * * cd /volume1/dev/SolorPower/crawler && \ # /volume1/dev/SolorPower/crawler/venv/bin/python main.py >> cron.log 2>&1 # # - LEARNING 사이트는 5분마다 크롤링 (패턴 학습) # - OPTIMIZED 사이트는 학습된 시점 직후 10분 윈도우에서만 크롤링 # - 야간(21시~05시)에는 모든 크롤링 중지 # ========================================== if __name__ == "__main__": # 테스트 코드 manager = CrawlerManager() print("=== CrawlerManager 테스트 ===\n") # 사이트 등록 test_sites = ["nrems-01", "nrems-02", "kremc-05"] for site_id in test_sites: manager.register_site(site_id) # 현재 상태 출력 print("\n[등록된 사이트]") for site in manager.get_all_sites(): print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)") # should_run 테스트 print("\n[should_run 테스트]") for site_id in test_sites: result = manager.should_run(site_id) print(f" {site_id}: {'✅ 실행' if result else '⏭️ 스킵'}") # 최적화 적용 print("\n[최적화 적용]") manager.update_optimization("nrems-01", 15) # 매시 15분에 업데이트 manager.update_optimization("kremc-05", 30) # 매시 30분에 업데이트 # 최적화 후 상태 print("\n[최적화 후 상태]") for site in manager.get_all_sites(): print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)") # 실행 기록 manager.record_run("nrems-01") print("\n=== 테스트 완료 ===")