370 lines
12 KiB
Python
370 lines
12 KiB
Python
# ==========================================
|
|
# crawler_manager.py - 크롤링 스케줄 최적화 미들웨어
|
|
# ==========================================
|
|
# NAS 리소스 절약을 위해 SQLite 기반으로 각 사이트의
|
|
# 업데이트 패턴을 학습하고 최적 시점에만 크롤링 실행
|
|
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
|
|
class CrawlerManager:
|
|
"""
|
|
크롤링 스케줄을 자동으로 최적화하는 매니저 클래스
|
|
|
|
- LEARNING 상태: 모든 크롤링 허용 (패턴 학습 중)
|
|
- OPTIMIZED 상태: 학습된 업데이트 시점 전후에만 크롤링 허용
|
|
"""
|
|
|
|
def __init__(self, db_path: str = None):
|
|
"""
|
|
DB 연결 및 테이블 초기화
|
|
|
|
Args:
|
|
db_path: SQLite DB 파일 경로. 기본값은 스크립트와 같은 디렉토리의 crawler_manager.db
|
|
"""
|
|
if db_path is None:
|
|
db_path = Path(__file__).parent / "crawler_manager.db"
|
|
|
|
self.db_path = str(db_path)
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
"""테이블이 없으면 생성"""
|
|
with sqlite3.connect(self.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
CREATE TABLE IF NOT EXISTS site_rules (
|
|
site_id TEXT PRIMARY KEY,
|
|
status TEXT DEFAULT 'LEARNING',
|
|
target_minute INTEGER DEFAULT -1,
|
|
start_date TEXT,
|
|
last_run TEXT
|
|
)
|
|
""")
|
|
conn.commit()
|
|
|
|
def _get_connection(self) -> sqlite3.Connection:
|
|
"""SQLite 연결 반환"""
|
|
return sqlite3.connect(self.db_path)
|
|
|
|
def register_site(self, site_id: str) -> bool:
|
|
"""
|
|
새로운 사이트 등록
|
|
|
|
Args:
|
|
site_id: 사이트 식별자 (예: 'nrems-01')
|
|
|
|
Returns:
|
|
bool: 새로 등록되었으면 True, 이미 존재하면 False
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# 이미 존재하는지 확인
|
|
cursor.execute("SELECT 1 FROM site_rules WHERE site_id = ?", (site_id,))
|
|
if cursor.fetchone():
|
|
return False
|
|
|
|
# 새로 등록
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
cursor.execute("""
|
|
INSERT INTO site_rules (site_id, status, target_minute, start_date, last_run)
|
|
VALUES (?, 'LEARNING', -1, ?, NULL)
|
|
""", (site_id, today))
|
|
conn.commit()
|
|
|
|
print(f" 📝 [CrawlerManager] '{site_id}' 신규 등록 (LEARNING 모드)")
|
|
return True
|
|
|
|
def should_run(self, site_id: str) -> bool:
|
|
"""
|
|
현재 시점에 해당 사이트를 크롤링해야 하는지 판단
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
bool: 크롤링 실행 여부
|
|
"""
|
|
now = datetime.now()
|
|
current_hour = now.hour
|
|
current_minute = now.minute
|
|
|
|
# 야간 모드: 21시 ~ 05시에는 크롤링 중지
|
|
if current_hour >= 21 or current_hour < 5:
|
|
return False
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT status, target_minute, last_run
|
|
FROM site_rules
|
|
WHERE site_id = ?
|
|
""", (site_id,))
|
|
row = cursor.fetchone()
|
|
|
|
# 등록되지 않은 사이트면 일단 등록 후 True 반환
|
|
if not row:
|
|
self.register_site(site_id)
|
|
return True
|
|
|
|
status, target_minute, last_run = row
|
|
|
|
# LEARNING 상태: 항상 실행 허용 (패턴 학습 목적)
|
|
if status == "LEARNING":
|
|
return True
|
|
|
|
# OPTIMIZED 상태: 최적화된 시간대에만 실행
|
|
if status == "OPTIMIZED" and target_minute >= 0:
|
|
# target_minute 이후 10분 윈도우 내에서만 허용
|
|
# 예: target_minute=15 → 15~24분 사이에만 실행
|
|
window_start = target_minute
|
|
window_end = (target_minute + 10) % 60
|
|
|
|
# 윈도우가 시간 경계를 넘는 경우 (예: 55~04분)
|
|
if window_start <= window_end:
|
|
in_window = window_start <= current_minute < window_end
|
|
else:
|
|
in_window = current_minute >= window_start or current_minute < window_end
|
|
|
|
if not in_window:
|
|
return False
|
|
|
|
# 중복 실행 방지: 최근 1시간 내 실행 이력이 있으면 스킵
|
|
if last_run:
|
|
try:
|
|
last_run_dt = datetime.fromisoformat(last_run)
|
|
if now - last_run_dt < timedelta(hours=1):
|
|
return False
|
|
except (ValueError, TypeError):
|
|
pass
|
|
|
|
return True
|
|
|
|
# 기타 상태는 기본적으로 허용
|
|
return True
|
|
|
|
def update_optimization(self, site_id: str, detected_minute: int) -> bool:
|
|
"""
|
|
사이트의 업데이트 패턴이 감지되면 OPTIMIZED 상태로 전환
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
detected_minute: 업데이트가 감지된 분 (0~59)
|
|
|
|
Returns:
|
|
bool: 업데이트 성공 여부
|
|
"""
|
|
if not 0 <= detected_minute <= 59:
|
|
print(f" ⚠️ [CrawlerManager] 유효하지 않은 minute 값: {detected_minute}")
|
|
return False
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET status = 'OPTIMIZED', target_minute = ?
|
|
WHERE site_id = ?
|
|
""", (detected_minute, site_id))
|
|
conn.commit()
|
|
|
|
if cursor.rowcount > 0:
|
|
print(f" ✅ [CrawlerManager] '{site_id}' → OPTIMIZED (매시 {detected_minute}분)")
|
|
return True
|
|
else:
|
|
print(f" ⚠️ [CrawlerManager] '{site_id}' 사이트를 찾을 수 없음")
|
|
return False
|
|
|
|
def record_run(self, site_id: str):
|
|
"""
|
|
크롤링 성공 시 마지막 실행 시간 기록
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
"""
|
|
now_str = datetime.now().isoformat()
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET last_run = ?
|
|
WHERE site_id = ?
|
|
""", (now_str, site_id))
|
|
conn.commit()
|
|
|
|
def get_site_info(self, site_id: str) -> dict:
|
|
"""
|
|
사이트 정보 조회 (디버깅/모니터링용)
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
dict: 사이트 정보 또는 None
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT site_id, status, target_minute, start_date, last_run
|
|
FROM site_rules
|
|
WHERE site_id = ?
|
|
""", (site_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
return {
|
|
"site_id": row[0],
|
|
"status": row[1],
|
|
"target_minute": row[2],
|
|
"start_date": row[3],
|
|
"last_run": row[4]
|
|
}
|
|
return None
|
|
|
|
def get_all_sites(self) -> list:
|
|
"""
|
|
모든 사이트 정보 조회
|
|
|
|
Returns:
|
|
list: 모든 사이트 정보 리스트
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT site_id, status, target_minute, start_date, last_run
|
|
FROM site_rules
|
|
ORDER BY site_id
|
|
""")
|
|
rows = cursor.fetchall()
|
|
|
|
return [
|
|
{
|
|
"site_id": row[0],
|
|
"status": row[1],
|
|
"target_minute": row[2],
|
|
"start_date": row[3],
|
|
"last_run": row[4]
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
def reset_to_learning(self, site_id: str) -> bool:
|
|
"""
|
|
사이트를 다시 LEARNING 상태로 리셋
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
bool: 리셋 성공 여부
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET status = 'LEARNING', target_minute = -1
|
|
WHERE site_id = ?
|
|
""", (site_id,))
|
|
conn.commit()
|
|
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
# ==========================================
|
|
# Example Usage (main.py에서의 활용 예시)
|
|
# ==========================================
|
|
#
|
|
# from crawler_manager import CrawlerManager
|
|
# from crawlers import get_crawler
|
|
# from config import get_all_plants
|
|
#
|
|
# def main():
|
|
# # 매니저 초기화
|
|
# manager = CrawlerManager()
|
|
#
|
|
# # 모든 발전소 순회
|
|
# for plant in get_all_plants():
|
|
# site_id = plant.get('id', '')
|
|
#
|
|
# if not site_id:
|
|
# continue
|
|
#
|
|
# # 1. 사이트 등록 (최초 1회)
|
|
# manager.register_site(site_id)
|
|
#
|
|
# # 2. 실행 여부 확인
|
|
# if not manager.should_run(site_id):
|
|
# print(f" ⏭️ {site_id} 스킵 (최적화 윈도우 외)")
|
|
# continue
|
|
#
|
|
# # 3. 크롤링 실행
|
|
# try:
|
|
# crawler_func = get_crawler(plant['type'])
|
|
# data = crawler_func(plant)
|
|
#
|
|
# if data:
|
|
# # 4. 실행 기록
|
|
# manager.record_run(site_id)
|
|
#
|
|
# # 5. (옵션) 패턴 분석 후 최적화
|
|
# # 예: 데이터가 항상 매시 10분에 갱신된다면
|
|
# # manager.update_optimization(site_id, 10)
|
|
#
|
|
# except Exception as e:
|
|
# print(f" ❌ {site_id} 오류: {e}")
|
|
#
|
|
# if __name__ == "__main__":
|
|
# main()
|
|
#
|
|
# ==========================================
|
|
# Cron 예시 (5분마다 실행)
|
|
# ==========================================
|
|
# */5 * * * * cd /volume1/dev/SolorPower/crawler && \
|
|
# /volume1/dev/SolorPower/crawler/venv/bin/python main.py >> cron.log 2>&1
|
|
#
|
|
# - LEARNING 사이트는 5분마다 크롤링 (패턴 학습)
|
|
# - OPTIMIZED 사이트는 학습된 시점 직후 10분 윈도우에서만 크롤링
|
|
# - 야간(21시~05시)에는 모든 크롤링 중지
|
|
# ==========================================
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# 테스트 코드
|
|
manager = CrawlerManager()
|
|
|
|
print("=== CrawlerManager 테스트 ===\n")
|
|
|
|
# 사이트 등록
|
|
test_sites = ["nrems-01", "nrems-02", "kremc-05"]
|
|
for site_id in test_sites:
|
|
manager.register_site(site_id)
|
|
|
|
# 현재 상태 출력
|
|
print("\n[등록된 사이트]")
|
|
for site in manager.get_all_sites():
|
|
print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)")
|
|
|
|
# should_run 테스트
|
|
print("\n[should_run 테스트]")
|
|
for site_id in test_sites:
|
|
result = manager.should_run(site_id)
|
|
print(f" {site_id}: {'✅ 실행' if result else '⏭️ 스킵'}")
|
|
|
|
# 최적화 적용
|
|
print("\n[최적화 적용]")
|
|
manager.update_optimization("nrems-01", 15) # 매시 15분에 업데이트
|
|
manager.update_optimization("kremc-05", 30) # 매시 30분에 업데이트
|
|
|
|
# 최적화 후 상태
|
|
print("\n[최적화 후 상태]")
|
|
for site in manager.get_all_sites():
|
|
print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)")
|
|
|
|
# 실행 기록
|
|
manager.record_run("nrems-01")
|
|
|
|
print("\n=== 테스트 완료 ===")
|