solorpower_crawler/main.py

159 lines
5.6 KiB
Python

# ==========================================
# main.py - 태양광 발전 통합 관제 시스템
# ==========================================
import re
from datetime import datetime
# 환경 변수 로드 (최상단에서 실행)
try:
from dotenv import load_dotenv
load_dotenv()
print("✅ 환경 변수 로드 완료")
except ImportError:
print("⚠️ python-dotenv가 설치되지 않았습니다. 환경 변수를 직접 설정하세요.")
from config import get_all_plants
from database import save_to_supabase, save_to_console
from crawlers import get_crawler
from crawler_manager import CrawlerManager
# 스마트 스케줄러 초기화
crawler_manager = CrawlerManager()
def extract_unit_number(name):
"""발전소 이름에서 호기 번호 추출 (정렬용)"""
match = re.search(r'(\d+)호기', name)
if match:
return int(match.group(1))
return 999
def integrated_monitoring(save_to_db=True, company_filter=None, force_run=False):
"""
통합 모니터링 실행
Args:
save_to_db: True면 Supabase에 저장
company_filter: 특정 업체만 필터링 (예: 'sunwind')
force_run: True면 스케줄러 무시하고 강제 실행
"""
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"\n🚀 [통합 관제 시스템] 데이터 수집 시작... ({now_str})")
print("-" * 75)
# 평탄화된 발전소 목록 가져오기
all_plants = get_all_plants()
# 업체 필터링 (옵션)
if company_filter:
all_plants = [p for p in all_plants if p['company_id'] == company_filter]
print(f"📌 필터 적용: {company_filter}")
total_results = []
skipped_count = 0
for plant in all_plants:
plant_type = plant['type']
plant_name = plant.get('display_name', plant.get('name', 'Unknown'))
company_id = plant.get('company_id', '')
company_name = plant.get('company_name', '')
# 크롤링 결과에서 생성되는 site_id 목록 (1,2호기 분리 처리 고려)
is_split = plant.get('options', {}).get('is_split', False)
if is_split:
site_ids = ['nrems-01', 'nrems-02']
else:
site_ids = [plant.get('id', '')]
# 스마트 스케줄러 확인 (force_run이 아닌 경우)
if not force_run:
# 모든 site_id에 대해 should_run 확인 (하나라도 실행해야 하면 실행)
should_run_any = False
for site_id in site_ids:
if site_id:
crawler_manager.register_site(site_id)
if crawler_manager.should_run(site_id):
should_run_any = True
break
if not should_run_any:
print(f" ⏭️ [{plant_type.upper()}] {plant_name} 스킵 (스케줄 외)")
skipped_count += 1
continue
print(f"📡 [{plant_type.upper()}] {company_name} - {plant_name} 수집 중...")
try:
crawler_func = get_crawler(plant_type)
if crawler_func:
data = crawler_func(plant)
if data:
# company_id, company_name 주입
for item in data:
item['company_id'] = company_id
item['company_name'] = company_name
# 크롤링 성공 시 실행 기록
item_id = item.get('id', '')
if item_id:
crawler_manager.record_run(item_id)
total_results.extend(data)
else:
print(f" ⚠️ 알 수 없는 크롤러 타입: {plant_type}")
except Exception as e:
print(f"{plant_name} 실패: {e}")
# 정렬 (호기 번호 순)
total_results.sort(key=lambda x: extract_unit_number(x['name']))
# 중복 제거 (company_id + id 조합)
seen_keys = set()
unique_results = []
for item in total_results:
unique_key = f"{item.get('company_id', '')}_{item.get('id', '')}"
if unique_key not in seen_keys:
seen_keys.add(unique_key)
unique_results.append(item)
total_results = unique_results
print("-" * 75)
if skipped_count > 0:
print(f"📊 스킵된 사이트: {skipped_count}개 (스케줄 외)")
if total_results:
# 콘솔 출력
save_to_console(total_results)
# DB 저장
if save_to_db:
save_to_supabase(total_results)
# 이상 감지 로직
current_hour = datetime.now().hour
if 10 <= current_hour <= 17:
issues = [d['name'] for d in total_results if d.get('kw', 0) == 0]
if issues:
print("\n🚨 [이상 감지 리포트]")
for name in issues:
print(f" ⚠️ 경고: '{name}' 발전량이 0입니다! 확인 필요.")
else:
print("\n ✅ 현재 모든 발전소가 정상 가동 중입니다.")
else:
print("❌ 수집된 데이터가 없습니다.")
return total_results
if __name__ == "__main__":
import sys
# 인자 처리: --force 옵션으로 스케줄러 무시
force_run = '--force' in sys.argv or '-f' in sys.argv
if force_run:
print("⚡ [강제 실행 모드] 스케줄러 무시하고 모든 사이트 크롤링")
integrated_monitoring(save_to_db=True, force_run=force_run)