457 lines
16 KiB
Python
457 lines
16 KiB
Python
# ==========================================
|
|
# crawler_manager.py - 크롤링 스케줄 최적화 미들웨어
|
|
# ==========================================
|
|
# NAS 리소스 절약을 위해 SQLite 기반으로 각 사이트의
|
|
# 업데이트 패턴을 학습하고, 데이터가 실제로 변경된 시점에만 DB 저장
|
|
#
|
|
# [설계 원칙]
|
|
# - 크롤링(HTTP 요청) 자체는 항상 허용 (야간 제외)
|
|
# → 원격 서버가 언제 업데이트할지 모르므로 주기적으로 확인해야 함
|
|
# - DB 저장은 데이터가 실제로 변경되었을 때만 실행
|
|
# → 중복 저장 방지 + NAS I/O 절약
|
|
# - 업데이트 패턴 학습은 부가 기능 (로깅용)
|
|
|
|
import sqlite3
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
|
|
class CrawlerManager:
|
|
"""
|
|
크롤링 DB 저장을 최적화하는 매니저 클래스
|
|
|
|
- should_run: 야간(21시~05시) 여부만 체크 → False면 크롤링 자체를 스킵
|
|
- should_save: 데이터가 실제로 변경되었는지 확인 → False면 DB 저장 스킵
|
|
- analyze_and_optimize: 업데이트 패턴 학습 (로깅/모니터링 목적)
|
|
"""
|
|
|
|
def __init__(self, db_path: str = None):
|
|
"""
|
|
DB 연결 및 테이블 초기화
|
|
|
|
Args:
|
|
db_path: SQLite DB 파일 경로. 기본값은 스크립트와 같은 디렉토리의 crawler_manager.db
|
|
"""
|
|
if db_path is None:
|
|
db_path = Path(__file__).parent / "crawler_manager.db"
|
|
|
|
self.db_path = str(db_path)
|
|
self._init_db()
|
|
|
|
def _init_db(self):
|
|
"""테이블이 없으면 생성"""
|
|
with sqlite3.connect(self.db_path) as conn:
|
|
cursor = conn.cursor()
|
|
cursor.executescript("""
|
|
CREATE TABLE IF NOT EXISTS site_rules (
|
|
site_id TEXT PRIMARY KEY,
|
|
status TEXT DEFAULT 'LEARNING',
|
|
target_minute INTEGER DEFAULT -1,
|
|
start_date TEXT,
|
|
last_run TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS site_data (
|
|
site_id TEXT PRIMARY KEY,
|
|
kw REAL,
|
|
today_kwh REAL,
|
|
updated_at TEXT
|
|
);
|
|
|
|
CREATE TABLE IF NOT EXISTS update_history (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
site_id TEXT,
|
|
detected_minute INTEGER,
|
|
detected_at TEXT
|
|
);
|
|
""")
|
|
conn.commit()
|
|
|
|
def _get_connection(self) -> sqlite3.Connection:
|
|
"""SQLite 연결 반환 (타임아웃 설정 추가)"""
|
|
return sqlite3.connect(self.db_path, timeout=10.0)
|
|
|
|
def _cleanup_old_history(self):
|
|
"""오래된 히스토리 정리 (30일 이상 지난 데이터 삭제)"""
|
|
try:
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
limit_date = (datetime.now() - timedelta(days=30)).isoformat()
|
|
cursor.execute("DELETE FROM update_history WHERE detected_at < ?", (limit_date,))
|
|
conn.commit()
|
|
except Exception as e:
|
|
print(f"⚠️ [CrawlerManager] 히스토리 정리 실패: {e}")
|
|
|
|
def register_site(self, site_id: str) -> bool:
|
|
"""
|
|
새로운 사이트 등록
|
|
|
|
Args:
|
|
site_id: 사이트 식별자 (예: 'nrems-01')
|
|
|
|
Returns:
|
|
bool: 새로 등록되었으면 True, 이미 존재하면 False
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT 1 FROM site_rules WHERE site_id = ?", (site_id,))
|
|
if cursor.fetchone():
|
|
return False
|
|
|
|
today = datetime.now().strftime("%Y-%m-%d")
|
|
cursor.execute("""
|
|
INSERT INTO site_rules (site_id, status, target_minute, start_date, last_run)
|
|
VALUES (?, 'LEARNING', -1, ?, NULL)
|
|
""", (site_id, today))
|
|
conn.commit()
|
|
|
|
print(f" 📝 [CrawlerManager] '{site_id}' 신규 등록 (LEARNING 모드)")
|
|
return True
|
|
|
|
def should_run(self, site_id: str) -> bool:
|
|
"""
|
|
현재 시점에 해당 사이트를 크롤링(HTTP 요청)해야 하는지 판단.
|
|
|
|
[변경 사항]
|
|
이전: OPTIMIZED 상태면 특정 분(minute) 윈도우에서만 크롤링 허용
|
|
→ 문제: 원격 서버 업데이트 시점을 놓쳐 시계열 데이터 누락
|
|
현재: 야간(21시~05시)에만 False 반환, 그 외에는 항상 크롤링 허용
|
|
→ DB 저장 여부는 should_save()에서 별도 결정
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
bool: 크롤링 실행 여부 (야간이면 False)
|
|
"""
|
|
now = datetime.now()
|
|
current_hour = now.hour
|
|
current_minute = now.minute
|
|
|
|
# 야간 모드: 21시 ~ 05시에는 크롤링 중지 (발전 없는 시간대)
|
|
if current_hour >= 21 or current_hour < 5:
|
|
return False
|
|
|
|
# 히스토리 정리 (05시 정각에 1회)
|
|
if current_minute == 0 and current_hour == 5:
|
|
self._cleanup_old_history()
|
|
|
|
# 사이트 등록 (미등록 사이트 자동 등록)
|
|
self.register_site(site_id)
|
|
|
|
# 항상 크롤링 허용 (데이터 변경 여부는 should_save에서 판단)
|
|
return True
|
|
|
|
def should_save(self, site_id: str, current_data: dict) -> bool:
|
|
"""
|
|
수집한 데이터를 DB에 저장해야 하는지 판단.
|
|
|
|
원격 서버의 데이터가 이전 수집 시점과 달라졌을 때만 True 반환.
|
|
이를 통해 중복 저장을 방지하고 NAS I/O를 절약.
|
|
|
|
[저장 조건]
|
|
- today_kwh(금일 발전량)가 증가했을 때: 반드시 저장 (핵심 지표)
|
|
- kw(현재 출력)가 변했을 때: 저장 (실시간 상태 반영)
|
|
- 마지막 저장 후 1시간 이상 경과했을 때: 강제 저장 (heartbeat)
|
|
→ 데이터가 정체돼도 최소 1시간에 1번은 기록 보장
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
current_data: {'kw': float, 'today': float}
|
|
|
|
Returns:
|
|
bool: DB에 저장해야 하면 True
|
|
"""
|
|
new_kw = float(current_data.get('kw', 0))
|
|
new_today = float(current_data.get('today', 0))
|
|
now = datetime.now()
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
# 이전 데이터 조회
|
|
cursor.execute(
|
|
"SELECT kw, today_kwh, updated_at FROM site_data WHERE site_id = ?",
|
|
(site_id,)
|
|
)
|
|
row = cursor.fetchone()
|
|
|
|
should_save = False
|
|
|
|
if not row:
|
|
# 첫 수집 → 반드시 저장
|
|
should_save = True
|
|
else:
|
|
last_kw, last_today, last_updated_at = row
|
|
|
|
# 1. 금일 발전량이 증가했으면 저장
|
|
if new_today - last_today > 0.001:
|
|
should_save = True
|
|
|
|
# 2. 현재 출력(kW)이 변했으면 저장
|
|
elif abs(new_kw - last_kw) > 0.001:
|
|
should_save = True
|
|
|
|
# 3. 1시간 이상 저장 없었으면 강제 heartbeat 저장
|
|
elif last_updated_at:
|
|
try:
|
|
last_dt = datetime.fromisoformat(last_updated_at)
|
|
if now - last_dt >= timedelta(hours=1):
|
|
should_save = True
|
|
except (ValueError, TypeError):
|
|
should_save = True
|
|
|
|
if should_save:
|
|
# 현재 상태를 캐시에 업데이트
|
|
cursor.execute("""
|
|
INSERT INTO site_data (site_id, kw, today_kwh, updated_at)
|
|
VALUES (?, ?, ?, ?)
|
|
ON CONFLICT(site_id) DO UPDATE SET
|
|
kw = excluded.kw,
|
|
today_kwh = excluded.today_kwh,
|
|
updated_at = excluded.updated_at
|
|
""", (site_id, new_kw, new_today, now.isoformat()))
|
|
conn.commit()
|
|
|
|
return should_save
|
|
|
|
def check_data_change(self, site_id: str, current_data: dict) -> bool:
|
|
"""
|
|
[하위 호환용] should_save의 별칭.
|
|
기존 main.py 코드와의 호환성을 위해 유지.
|
|
내부적으로 should_save를 호출하며, 패턴 분석도 함께 수행.
|
|
"""
|
|
return self.should_save(site_id, current_data)
|
|
|
|
def analyze_and_optimize(self, site_id: str):
|
|
"""
|
|
업데이트 패턴 분석 및 기록 (모니터링/로깅 목적).
|
|
데이터 변경이 감지되었을 때 호출하여 원격 서버의 업데이트 패턴을 학습.
|
|
이 정보는 현재 크롤링 스케줄 제어에는 사용하지 않으며,
|
|
향후 분석이나 시각화를 위한 참고 데이터로만 활용.
|
|
"""
|
|
now = datetime.now()
|
|
current_minute = now.minute
|
|
|
|
# 히스토리 기록
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
INSERT INTO update_history (site_id, detected_minute, detected_at)
|
|
VALUES (?, ?, ?)
|
|
""", (site_id, current_minute, now.isoformat()))
|
|
|
|
# 최근 기록 조회 (최대 5개)
|
|
cursor.execute("""
|
|
SELECT detected_minute
|
|
FROM update_history
|
|
WHERE site_id = ?
|
|
ORDER BY id DESC
|
|
LIMIT 5
|
|
""", (site_id,))
|
|
|
|
minutes = [r[0] for r in cursor.fetchall()]
|
|
conn.commit()
|
|
|
|
# 패턴 분석 (최소 3회 이상 데이터 필요)
|
|
if len(minutes) < 3:
|
|
return
|
|
|
|
recent = minutes[:3]
|
|
avg = sum(recent) / len(recent)
|
|
|
|
# 최대 편차가 5분 이내면 패턴 안정 (참고 정보로만 기록)
|
|
is_consistent = all(abs(m - avg) <= 5 for m in recent)
|
|
|
|
if is_consistent:
|
|
target = int(avg)
|
|
# 스케줄 제어에는 사용하지 않지만, 상태 기록은 유지 (모니터링용)
|
|
self._record_pattern(site_id, target)
|
|
else:
|
|
print(f" 📊 [CrawlerManager] '{site_id}' 패턴 분석 중... 최근: {recent}")
|
|
|
|
def _record_pattern(self, site_id: str, detected_minute: int):
|
|
"""
|
|
감지된 업데이트 패턴을 DB에 기록 (모니터링용).
|
|
크롤링 스케줄 제어에는 영향을 주지 않음.
|
|
"""
|
|
if not 0 <= detected_minute <= 59:
|
|
return
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
|
|
cursor.execute("SELECT status, target_minute FROM site_rules WHERE site_id = ?", (site_id,))
|
|
row = cursor.fetchone()
|
|
if row and row[0] == 'OPTIMIZED' and abs(row[1] - detected_minute) <= 2:
|
|
return # 이미 동일한 패턴 기록됨
|
|
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET status = 'OPTIMIZED', target_minute = ?
|
|
WHERE site_id = ?
|
|
""", (detected_minute, site_id))
|
|
conn.commit()
|
|
|
|
if cursor.rowcount > 0:
|
|
print(f" 📌 [CrawlerManager] '{site_id}' 업데이트 패턴 감지: 매시 {detected_minute}분 경 (참고용)")
|
|
|
|
def update_optimization(self, site_id: str, detected_minute: int) -> bool:
|
|
"""
|
|
[하위 호환용] 패턴 기록 메서드.
|
|
내부적으로 _record_pattern을 호출.
|
|
"""
|
|
self._record_pattern(site_id, detected_minute)
|
|
return True
|
|
|
|
def record_run(self, site_id: str):
|
|
"""
|
|
크롤링 성공 시 마지막 실행 시간 기록
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
"""
|
|
now_str = datetime.now().isoformat()
|
|
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET last_run = ?
|
|
WHERE site_id = ?
|
|
""", (now_str, site_id))
|
|
conn.commit()
|
|
|
|
def get_site_info(self, site_id: str) -> dict:
|
|
"""
|
|
사이트 정보 조회 (디버깅/모니터링용)
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
dict: 사이트 정보 또는 None
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT site_id, status, target_minute, start_date, last_run
|
|
FROM site_rules
|
|
WHERE site_id = ?
|
|
""", (site_id,))
|
|
row = cursor.fetchone()
|
|
|
|
if row:
|
|
return {
|
|
"site_id": row[0],
|
|
"status": row[1],
|
|
"target_minute": row[2],
|
|
"start_date": row[3],
|
|
"last_run": row[4]
|
|
}
|
|
return None
|
|
|
|
def get_all_sites(self) -> list:
|
|
"""
|
|
모든 사이트 정보 조회
|
|
|
|
Returns:
|
|
list: 모든 사이트 정보 리스트
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
SELECT site_id, status, target_minute, start_date, last_run
|
|
FROM site_rules
|
|
ORDER BY site_id
|
|
""")
|
|
rows = cursor.fetchall()
|
|
|
|
return [
|
|
{
|
|
"site_id": row[0],
|
|
"status": row[1],
|
|
"target_minute": row[2],
|
|
"start_date": row[3],
|
|
"last_run": row[4]
|
|
}
|
|
for row in rows
|
|
]
|
|
|
|
def reset_to_learning(self, site_id: str) -> bool:
|
|
"""
|
|
사이트를 다시 LEARNING 상태로 리셋
|
|
|
|
Args:
|
|
site_id: 사이트 식별자
|
|
|
|
Returns:
|
|
bool: 리셋 성공 여부
|
|
"""
|
|
with self._get_connection() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute("""
|
|
UPDATE site_rules
|
|
SET status = 'LEARNING', target_minute = -1
|
|
WHERE site_id = ?
|
|
""", (site_id,))
|
|
conn.commit()
|
|
|
|
return cursor.rowcount > 0
|
|
|
|
|
|
# ==========================================
|
|
# main.py 연동 방식 (변경 없음 - 하위 호환 유지)
|
|
# ==========================================
|
|
#
|
|
# main.py에서의 사용 흐름:
|
|
#
|
|
# 1. should_run(site_id)
|
|
# → 야간이면 False (크롤링 자체 스킵)
|
|
# → 그 외에는 항상 True (항상 HTTP 요청)
|
|
#
|
|
# 2. 크롤링(HTTP 요청) 실행
|
|
#
|
|
# 3. record_run(item_id) ← 크롤링 성공 기록
|
|
#
|
|
# 4. check_data_change(item_id, item) ← should_save와 동일
|
|
# → True: 데이터 변경됨 → DB 저장 진행
|
|
# → False: 변경 없음 → DB 저장 스킵
|
|
#
|
|
# 5. analyze_and_optimize(item_id) ← 패턴 학습 (선택적)
|
|
#
|
|
# ==========================================
|
|
# Cron 설정 (10분마다 실행 권장)
|
|
# ==========================================
|
|
# */10 * * * * cd /volume1/dev/SolorPower/crawler && \
|
|
# /volume1/dev/SolorPower/crawler/venv/bin/python main.py >> cron.log 2>&1
|
|
# ==========================================
|
|
|
|
|
|
if __name__ == "__main__":
|
|
manager = CrawlerManager()
|
|
|
|
print("=== CrawlerManager 테스트 ===\n")
|
|
|
|
test_sites = ["nrems-01", "nrems-02", "kremc-05"]
|
|
for site_id in test_sites:
|
|
manager.register_site(site_id)
|
|
|
|
print("\n[등록된 사이트]")
|
|
for site in manager.get_all_sites():
|
|
print(f" {site['site_id']}: {site['status']} (target: {site['target_minute']}분)")
|
|
|
|
print("\n[should_run 테스트]")
|
|
for site_id in test_sites:
|
|
result = manager.should_run(site_id)
|
|
print(f" {site_id}: {'✅ 실행' if result else '⏭️ 스킵 (야간)'}")
|
|
|
|
print("\n[should_save 테스트]")
|
|
test_data = {'kw': 15.5, 'today': 120.0}
|
|
for site_id in test_sites:
|
|
result = manager.should_save(site_id, test_data)
|
|
print(f" {site_id}: {'✅ 저장' if result else '⏭️ 스킵 (변경 없음)'}")
|
|
|
|
print("\n=== 테스트 완료 ===")
|