solorpower_crawler/crawler_manager.py

370 lines
12 KiB
Python

# ==========================================
# crawler_manager.py - 크롤링 스케줄 최적화 미들웨어
# ==========================================
# NAS 리소스 절약을 위해 SQLite 기반으로 각 사이트의
# 업데이트 패턴을 학습하고 최적 시점에만 크롤링 실행
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
class CrawlerManager:
"""
크롤링 스케줄을 자동으로 최적화하는 매니저 클래스
- LEARNING 상태: 모든 크롤링 허용 (패턴 학습 중)
- OPTIMIZED 상태: 학습된 업데이트 시점 전후에만 크롤링 허용
"""
def __init__(self, db_path: str = None):
"""
DB 연결 및 테이블 초기화
Args:
db_path: SQLite DB 파일 경로. 기본값은 스크립트와 같은 디렉토리의 crawler_manager.db
"""
if db_path is None:
db_path = Path(__file__).parent / "crawler_manager.db"
self.db_path = str(db_path)
self._init_db()
def _init_db(self):
"""테이블이 없으면 생성"""
with sqlite3.connect(self.db_path) as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS site_rules (
site_id TEXT PRIMARY KEY,
status TEXT DEFAULT 'LEARNING',
target_minute INTEGER DEFAULT -1,
start_date TEXT,
last_run TEXT
)
""")
conn.commit()
def _get_connection(self) -> sqlite3.Connection:
"""SQLite 연결 반환"""
return sqlite3.connect(self.db_path)
def register_site(self, site_id: str) -> bool:
"""
새로운 사이트 등록
Args:
site_id: 사이트 식별자 (예: 'nrems-01')
Returns:
bool: 새로 등록되었으면 True, 이미 존재하면 False
"""
with self._get_connection() as conn:
cursor = conn.cursor()
# 이미 존재하는지 확인
cursor.execute("SELECT 1 FROM site_rules WHERE site_id = ?", (site_id,))
if cursor.fetchone():
return False
# 새로 등록
today = datetime.now().strftime("%Y-%m-%d")
cursor.execute("""
INSERT INTO site_rules (site_id, status, target_minute, start_date, last_run)
VALUES (?, 'LEARNING', -1, ?, NULL)
""", (site_id, today))
conn.commit()
print(f" 📝 [CrawlerManager] '{site_id}' 신규 등록 (LEARNING 모드)")
return True
def should_run(self, site_id: str) -> bool:
"""
현재 시점에 해당 사이트를 크롤링해야 하는지 판단
Args:
site_id: 사이트 식별자
Returns:
bool: 크롤링 실행 여부
"""
now = datetime.now()
current_hour = now.hour
current_minute = now.minute
# 야간 모드: 21시 ~ 05시에는 크롤링 중지
if current_hour >= 21 or current_hour < 5:
return False
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT status, target_minute, last_run
FROM site_rules
WHERE site_id = ?
""", (site_id,))
row = cursor.fetchone()
# 등록되지 않은 사이트면 일단 등록 후 True 반환
if not row:
self.register_site(site_id)
return True
status, target_minute, last_run = row
# LEARNING 상태: 항상 실행 허용 (패턴 학습 목적)
if status == "LEARNING":
return True
# OPTIMIZED 상태: 최적화된 시간대에만 실행
if status == "OPTIMIZED" and target_minute >= 0:
# target_minute 이후 10분 윈도우 내에서만 허용
# 예: target_minute=15 → 15~24분 사이에만 실행
window_start = target_minute
window_end = (target_minute + 10) % 60
# 윈도우가 시간 경계를 넘는 경우 (예: 55~04분)
if window_start <= window_end:
in_window = window_start <= current_minute < window_end
else:
in_window = current_minute >= window_start or current_minute < window_end
if not in_window:
return False
# 중복 실행 방지: 최근 1시간 내 실행 이력이 있으면 스킵
if last_run:
try:
last_run_dt = datetime.fromisoformat(last_run)
if now - last_run_dt < timedelta(hours=1):
return False
except (ValueError, TypeError):
pass
return True
# 기타 상태는 기본적으로 허용
return True
def update_optimization(self, site_id: str, detected_minute: int) -> bool:
"""
사이트의 업데이트 패턴이 감지되면 OPTIMIZED 상태로 전환
Args:
site_id: 사이트 식별자
detected_minute: 업데이트가 감지된 분 (0~59)
Returns:
bool: 업데이트 성공 여부
"""
if not 0 <= detected_minute <= 59:
print(f" ⚠️ [CrawlerManager] 유효하지 않은 minute 값: {detected_minute}")
return False
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE site_rules
SET status = 'OPTIMIZED', target_minute = ?
WHERE site_id = ?
""", (detected_minute, site_id))
conn.commit()
if cursor.rowcount > 0:
print(f" ✅ [CrawlerManager] '{site_id}' → OPTIMIZED (매시 {detected_minute}분)")
return True
else:
print(f" ⚠️ [CrawlerManager] '{site_id}' 사이트를 찾을 수 없음")
return False
def record_run(self, site_id: str):
"""
크롤링 성공 시 마지막 실행 시간 기록
Args:
site_id: 사이트 식별자
"""
now_str = datetime.now().isoformat()
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE site_rules
SET last_run = ?
WHERE site_id = ?
""", (now_str, site_id))
conn.commit()
def get_site_info(self, site_id: str) -> dict:
"""
사이트 정보 조회 (디버깅/모니터링용)
Args:
site_id: 사이트 식별자
Returns:
dict: 사이트 정보 또는 None
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT site_id, status, target_minute, start_date, last_run
FROM site_rules
WHERE site_id = ?
""", (site_id,))
row = cursor.fetchone()
if row:
return {
"site_id": row[0],
"status": row[1],
"target_minute": row[2],
"start_date": row[3],
"last_run": row[4]
}
return None
def get_all_sites(self) -> list:
"""
모든 사이트 정보 조회
Returns:
list: 모든 사이트 정보 리스트
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT site_id, status, target_minute, start_date, last_run
FROM site_rules
ORDER BY site_id
""")
rows = cursor.fetchall()
return [
{
"site_id": row[0],
"status": row[1],
"target_minute": row[2],
"start_date": row[3],
"last_run": row[4]
}
for row in rows
]
def reset_to_learning(self, site_id: str) -> bool:
"""
사이트를 다시 LEARNING 상태로 리셋
Args:
site_id: 사이트 식별자
Returns:
bool: 리셋 성공 여부
"""
with self._get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE site_rules
SET status = 'LEARNING', target_minute = -1
WHERE site_id = ?
""", (site_id,))
conn.commit()
return cursor.rowcount > 0
# ==========================================
# Example Usage (main.py에서의 활용 예시)
# ==========================================
#
# from crawler_manager import CrawlerManager
# from crawlers import get_crawler
# from config import get_all_plants
#
# def main():
# # 매니저 초기화
# manager = CrawlerManager()
#
# # 모든 발전소 순회
# for plant in get_all_plants():
# site_id = plant.get('id', '')
#
# if not site_id:
# continue
#
# # 1. 사이트 등록 (최초 1회)
# manager.register_site(site_id)
#
# # 2. 실행 여부 확인
# if not manager.should_run(site_id):
# print(f" ⏭️ {site_id} 스킵 (최적화 윈도우 외)")
# continue
#
# # 3. 크롤링 실행
# try:
# crawler_func = get_crawler(plant['type'])
# data = crawler_func(plant)
#
# if data:
# # 4. 실행 기록
# manager.record_run(site_id)
#
# # 5. (옵션) 패턴 분석 후 최적화
# # 예: 데이터가 항상 매시 10분에 갱신된다면
# # manager.update_optimization(site_id, 10)
#
# except Exception as e:
# print(f" ❌ {site_id} 오류: {e}")
#
# if __name__ == "__main__":
# main()
#
# ==========================================
# Cron 예시 (5분마다 실행)
# ==========================================
# */5 * * * * cd /volume1/dev/SolorPower/crawler && \
# /volume1/dev/SolorPower/crawler/venv/bin/python main.py >> cron.log 2>&1
#
# - LEARNING 사이트는 5분마다 크롤링 (패턴 학습)
# - OPTIMIZED 사이트는 학습된 시점 직후 10분 윈도우에서만 크롤링
# - 야간(21시~05시)에는 모든 크롤링 중지
# ==========================================
if __name__ == "__main__":
# 테스트 코드
manager = CrawlerManager()
print("=== CrawlerManager 테스트 ===\n")
# 사이트 등록
test_sites = ["nrems-01", "nrems-02", "kremc-05"]
for site_id in test_sites:
manager.register_site(site_id)
# 현재 상태 출력
print("\n[등록된 사이트]")
for site in manager.get_all_sites():
print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)")
# should_run 테스트
print("\n[should_run 테스트]")
for site_id in test_sites:
result = manager.should_run(site_id)
print(f" {site_id}: {'✅ 실행' if result else '⏭️ 스킵'}")
# 최적화 적용
print("\n[최적화 적용]")
manager.update_optimization("nrems-01", 15) # 매시 15분에 업데이트
manager.update_optimization("kremc-05", 30) # 매시 30분에 업데이트
# 최적화 후 상태
print("\n[최적화 후 상태]")
for site in manager.get_all_sites():
print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)")
# 실행 기록
manager.record_run("nrems-01")
print("\n=== 테스트 완료 ===")