Files

444 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
from bs4 import BeautifulSoup
import time
import sys
import json
import sqlite3
import argparse
import glob
import html
from datetime import datetime
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
import threading
import queue
import os
# On Windows, running the 'color' shell command enables ANSI escape-sequence
# processing in the console so the \r progress line and colors render correctly.
if os.name == 'nt': os.system('color')
# Smartphone Googlebot user-agent string: pages are fetched exactly as
# Google's mobile crawler would see them.
GOOGLEBOT_UA = "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"
class TelegramNotifier:
    """Collects audit findings and reports them to two Telegram chats.

    The 'info' chat receives the final summary; the 'errors' chat receives
    the detailed failure listing (falling back to the info chat when no
    dedicated errors chat is configured). When token/chat id are missing,
    the notifier is disabled and every send becomes a no-op.

    The add_* methods are called concurrently from crawler worker threads,
    hence the internal lock; each buffer is capped to keep messages short.
    """

    def __init__(self, token, chat_id_info, chat_id_errors):
        self.token = token
        self.chat_id_info = chat_id_info
        # Errors fall back to the info chat when no dedicated chat is configured.
        self.chat_id_errors = chat_id_errors or chat_id_info
        self.enabled = bool(token and chat_id_info)
        # Capped issue buffers, filled concurrently by crawler threads.
        self.critical_errors = []
        self.schema_errors = []
        self.translation_issues = []
        self.lock = threading.Lock()

    def send(self, message, target='info'):
        """Send an HTML-formatted message to the 'info' or 'errors' chat.

        Best-effort: any failure is printed and swallowed so that a broken
        notification channel can never abort the audit itself.
        """
        if not self.enabled:
            return
        cid = self.chat_id_info if target == 'info' else self.chat_id_errors
        url = f"https://api.telegram.org/bot{self.token}/sendMessage"
        try:
            r = requests.post(url, json={"chat_id": cid, "text": message, "parse_mode": "HTML"}, timeout=15)
            if r.status_code != 200:
                print(f"\n[!] Telegram Error ({target}): {r.text}")
        except Exception as e:
            print(f"\n[!] Connection Error (Telegram): {e}")

    def add_critical(self, url, msg):
        """Record a critical finding (HTTP 5xx/404, robots block, ...); capped at 30."""
        with self.lock:
            if len(self.critical_errors) < 30:
                self.critical_errors.append((url, msg))

    def add_schema(self, url, count):
        """Record a page missing `count` critical schema.org fields; capped at 15."""
        with self.lock:
            if len(self.schema_errors) < 15:
                self.schema_errors.append((url, count))

    def add_translation_issue(self, sku, lang1, lang2, field):
        """Record a product field that is identical in two languages; capped at 15."""
        with self.lock:
            if len(self.translation_issues) < 15:
                self.translation_issues.append(f"SKU {sku}: {field} identyczny w {lang1} i {lang2}")

    def get_prev_404_count(self, current_db):
        """Return the 404 count from the most recent *previous* scan DB, or None.

        Looks for crawler databases under scans/, newest first by mtime,
        skipping the database of the current run (compared by basename).
        Returns None when there is no previous scan or it cannot be read.
        """
        try:
            dbs = glob.glob("scans/crawler_v*.db")
            dbs.sort(key=os.path.getmtime, reverse=True)
        except OSError:
            # A DB file may vanish between glob() and getmtime().
            return None
        prev_db = next((d for d in dbs if os.path.basename(d) != os.path.basename(current_db)), None)
        if not prev_db:
            return None
        conn = None
        try:
            conn = sqlite3.connect(prev_db)
            return conn.execute("SELECT COUNT(*) FROM pages WHERE status = 404").fetchone()[0]
        except sqlite3.Error:
            # Previous DB may be corrupt or from an older schema.
            return None
        finally:
            if conn is not None:
                conn.close()

    def send_final_report(self, start_url, total, errors, db_file, search_results=-1):
        """Compose and send the final audit summary and, if needed, the error details.

        Reads aggregate counts (404s, schema errors, translation issues) from
        the freshly written scan DB, compares the 404 count against the
        previous scan for regression detection, and sends up to two messages:
        a summary to the info chat and a detailed listing (with a ready-made
        AI-agent prompt) to the errors chat.
        """
        if not self.enabled:
            print("\n[!] Powiadomienia Telegram są wyłączone (brak konfiguracji).")
            return
        # Aggregate error counts from the scan database; on failure the
        # report simply proceeds with zeroed counts.
        current_404 = 0
        schema_errs = 0
        transl_errs = 0
        conn = None
        try:
            conn = sqlite3.connect(db_file)
            current_404 = conn.execute("SELECT COUNT(*) FROM pages WHERE status = 404").fetchone()[0]
            schema_errs = conn.execute("SELECT COUNT(*) FROM pages WHERE schema_critical > 0").fetchone()[0]
            transl_errs = conn.execute("SELECT COUNT(*) FROM translation_audit").fetchone()[0]
        except sqlite3.Error:
            pass
        finally:
            if conn is not None:
                conn.close()
        # 404 regression vs. the previous scan, if one exists.
        prev_404 = self.get_prev_404_count(db_file)
        regression_str = ""
        if prev_404 is not None:
            diff = current_404 - prev_404
            if diff > 0: regression_str = f" (<b>+{diff} NOWE!</b> ⚠️)"
            elif diff < 0: regression_str = f" ({diff} naprawione)"
            else: regression_str = " (bez zmian)"
        # 1. INFO REPORT (summary)
        domain = html.escape(urlparse(start_url).netloc)
        # NOTE(review): all icon strings are empty on both branches — emoji were
        # likely lost when this file was copied; confirm against the original.
        total_icon = ""
        http_icon = "" if errors == 0 else ""
        err404_icon = "" if current_404 == 0 else ""
        schema_icon = "" if schema_errs == 0 else ""
        transl_icon = "" if transl_errs == 0 else ""
        search_icon = "" if search_results > 0 else ""
        schema_text = "poprawne" if schema_errs == 0 else f"{schema_errs} błędów"
        transl_text = "poprawne" if transl_errs == 0 else f"{transl_errs} błędów"
        search_text = f"{search_results}" if search_results >= 0 else "BŁĄD"
        info_msg = f"🏁 <b>AUDYT ZAKOŃCZONY: {domain}</b>\n\n"
        info_msg += f"{total_icon} Przeskanowano: {total}\n"
        info_msg += f"{http_icon} Błędy HTTP: {errors}\n"
        info_msg += f"{err404_icon} Błędy 404: {current_404}{regression_str}\n"
        info_msg += f"{schema_icon} Dane strukturalne: {schema_text}\n"
        info_msg += f"{transl_icon} Tłumaczenia: {transl_text}\n"
        info_msg += f"{search_icon} Wyszukiwarka: {search_text}\n\n"
        if self.critical_errors or self.schema_errors or self.translation_issues:
            info_msg += f"🚨 Wykryto błędy. Szczegóły na kanale ERRORS."
        else:
            info_msg += f"✅ Brak krytycznych błędów."
        self.send(info_msg, target='info')
        # 2. ERRORS REPORT (details + ready-to-paste AI agent prompt)
        if self.critical_errors or self.schema_errors or self.translation_issues:
            err_msg = f"🚨 <b>BŁĘDY: {domain}</b>\n\n"
            prompt_text = (
                "Twoim zadaniem jest weryfikacja poniższych błędów na sklepie i przygotowanie planu naprawy. "
                "Znasz strukturę plików, masz dostęp do bazy danych sklepu oraz wiesz jak działają wszystkie mechanizmy cache. "
                "WAŻNE: Podane błędy zostały wykryte przez crawler, który analizował wyrenderowany kod HTML stron sklepu. "
                "Crawler nie sprawdzał bazy danych może być tak, że w bazie dane są w pełni poprawne, "
                "a problem leży po stronie modułów (np. wstrzykujących dane strukturalne do źródła strony).\n"
                "Oto zestawienie błędów do przeanalizowania:\n\n"
            )
            if self.critical_errors:
                err_msg += f"❌ <b>KRYTYCZNE:</b>\n"
                prompt_text += "BŁĘDY KRYTYCZNE:\n"
                for url, err in self.critical_errors[:15]:
                    safe_url = html.escape(url)
                    err_msg += f"{err}: {safe_url}\n"
                    prompt_text += f"- {err}: {url}\n"
                err_msg += "\n"
                prompt_text += "\n"
            if self.schema_errors:
                err_msg += f"🛠 <b>SCHEMA.ORG:</b>\n"
                prompt_text += "BŁĘDY SCHEMA.ORG:\n"
                for url, count in self.schema_errors[:10]:
                    safe_url = html.escape(url)
                    err_msg += f"• Brak {count} pól: {safe_url}\n"
                    prompt_text += f"- Brak {count} pól: {url}\n"
                err_msg += "\n"
                prompt_text += "\n"
            if self.translation_issues:
                err_msg += f"🌐 <b>TŁUMACZENIA:</b>\n"
                prompt_text += "BŁĘDY TŁUMACZEŃ:\n"
                for issue in self.translation_issues[:10]:
                    err_msg += f"{html.escape(issue)}\n"
                    prompt_text += f"- {issue}\n"
                err_msg += "\n"
                prompt_text += "\n"
            err_msg += f"🤖 <b>Gotowy prompt dla Agenta AI:</b>\n"
            err_msg += f"<pre><code class=\"language-text\">{html.escape(prompt_text.strip())}</code></pre>"
            self.send(err_msg, target='errors')
def crawler(start_url, db_file, max_threads, tg_notifier):
    """Multi-threaded SEO audit crawler.

    Crawls every same-domain page reachable from `start_url` using
    `max_threads` fetch workers, writing per-page audit data (HTTP status,
    timings, indexability, schema.org objects, image audit) to the SQLite
    database `db_file` through a single dedicated writer thread. After the
    crawl it runs a cross-language translation audit, a storefront search
    smoke test, and sends the final report through `tg_notifier`.
    """
    parsed_start = urlparse(start_url)
    base_url = f"{parsed_start.scheme}://{parsed_start.netloc}"
    base_domain = parsed_start.netloc
    # Main-thread connection; check_same_thread=False because this handle is
    # created here and reused by the post-crawl audit section below.
    conn = sqlite3.connect(db_file, check_same_thread=False)
    cursor = conn.cursor()
    cursor.execute('''CREATE TABLE IF NOT EXISTS pages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        url TEXT UNIQUE, source_url TEXT, status INTEGER,
        total_time REAL, ttfb REAL, google_access TEXT, index_status TEXT,
        schema_critical INTEGER DEFAULT 0, schema_warnings INTEGER DEFAULT 0,
        images_no_alt INTEGER DEFAULT 0, images_no_webp INTEGER DEFAULT 0,
        title TEXT, meta_desc TEXT, canonical TEXT,
        lang TEXT, timestamp DATETIME)''')
    cursor.execute('''CREATE TABLE IF NOT EXISTS structured_data (id INTEGER PRIMARY KEY AUTOINCREMENT, page_id INTEGER, schema_type TEXT, full_json TEXT, sku TEXT, FOREIGN KEY(page_id) REFERENCES pages(id))''')
    cursor.execute('''CREATE TABLE IF NOT EXISTS translation_audit (id INTEGER PRIMARY KEY AUTOINCREMENT, sku TEXT, field TEXT, lang1 TEXT, lang2 TEXT, content TEXT)''')
    cursor.execute('''CREATE TABLE IF NOT EXISTS images_audit (id INTEGER PRIMARY KEY AUTOINCREMENT, page_id INTEGER, img_url TEXT, alt TEXT, is_modern INTEGER, has_modern_source INTEGER, FOREIGN KEY(page_id) REFERENCES pages(id))''')
    conn.commit()
    db_queue = queue.Queue()

    def db_worker():
        # Dedicated writer thread: all inserts are serialized through db_queue
        # so SQLite only ever sees a single writing connection.
        db_conn = sqlite3.connect(db_file)
        db_cursor = db_conn.cursor()
        while True:
            item = db_queue.get()
            if item is None: break  # shutdown sentinel
            try:
                p = item['page']
                db_cursor.execute('''INSERT OR REPLACE INTO pages (url, source_url, status, total_time, ttfb, google_access, index_status, schema_critical, schema_warnings, images_no_alt, images_no_webp, title, meta_desc, canonical, lang, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (p['url'], p['source'], p['status'], p['time'], p['ttfb'], p['access'], p['idx'], p['s_crit'], p['s_warn'], p.get('images_no_alt',0), p.get('images_no_webp',0), p.get('title',''), p.get('meta_desc',''), p.get('canonical',''), p['lang'], p['ts']))
                page_id = db_cursor.lastrowid
                for s in item['schemas']: db_cursor.execute('INSERT INTO structured_data (page_id, schema_type, full_json, sku) VALUES (?, ?, ?, ?)', (page_id, s['type'], s['json'], s.get('sku')))
                if 'images' in item:
                    for img in item['images']: db_cursor.execute('INSERT INTO images_audit (page_id, img_url, alt, is_modern, has_modern_source) VALUES (?, ?, ?, ?, ?)', (page_id, img['img_url'], img['alt'], img['is_modern'], img['has_modern_source']))
                db_conn.commit()
            except: pass  # NOTE(review): bare except silently drops a failed row
            finally: db_queue.task_done()
        db_conn.close()
    db_thread = threading.Thread(target=db_worker)
    db_thread.start()
    # Best-effort robots.txt fetch; failures are ignored.
    rp = RobotFileParser()
    try: rp.set_url(urljoin(base_url, "robots.txt")); rp.read()
    except: pass
    # Shared crawl state, guarded by visited_lock / stats_lock respectively.
    visited, crawled_count, error_count = {start_url}, 0, 0
    total_response_time = 0.0
    visited_lock, stats_lock = threading.Lock(), threading.Lock()
    url_queue = queue.Queue()
    url_queue.put((start_url, "Start"))
    stop_event = threading.Event()
    # Crawl with Googlebot's smartphone UA so pages respond as they would to Google.
    session = requests.Session()
    session.headers.update({'User-Agent': GOOGLEBOT_UA})

    def analyze_schema(soup):
        # Parse every JSON-LD block on the page; a Product object missing
        # name, image, or offers.price counts as one critical error.
        # Returns (list of schema dicts, critical count, warning count).
        scripts = soup.find_all('script', type='application/ld+json')
        results, crit, warn = [], 0, 0
        def get_val(obj, path):
            # Dotted-path lookup into nested dicts, e.g. 'offers.price'.
            curr = obj
            for p in path.split('.'):
                if isinstance(curr, dict) and p in curr: curr = curr[p]
                else: return None
            return curr
        for script in scripts:
            try:
                data = json.loads(script.string)
                objs = data if isinstance(data, list) else [data]
                for obj in objs:
                    if not isinstance(obj, dict): continue
                    sku = get_val(obj, 'sku') or get_val(obj, 'mpn')
                    if 'Product' in str(obj.get('@type', '')):
                        if not get_val(obj, 'name') or not get_val(obj, 'image') or not get_val(obj, 'offers.price'): crit += 1
                    results.append({'type': str(obj.get('@type', 'Unknown')), 'json': json.dumps(obj, ensure_ascii=False), 'sku': str(sku) if sku else None})
            except: pass  # malformed JSON-LD blocks are skipped
        return results, crit, warn

    def analyze_images(soup, url):
        # Audit <img> tags for missing alt text and missing modern formats
        # (webp/avif/svg — directly, via <picture>/<source>, or via srcset).
        # Returns (per-image records, count without alt, count without webp).
        images_data = []
        no_alt, no_webp = 0, 0
        for img in soup.find_all('img'):
            src = img.get('src') or img.get('data-src') or ''
            if not src or src.startswith('data:image'): continue  # skip inline data URIs
            alt = img.get('alt', '').strip() if img.get('alt') is not None else ''
            alt_text = alt if alt else '[BRAK]'
            is_modern = src.lower().endswith(('.webp', '.avif', '.svg'))
            parent = img.find_parent('picture')
            has_modern_source = False
            if parent:
                # A sibling <source> may provide a modern format alternative.
                for source in parent.find_all('source'):
                    srcs = source.get('srcset', '')
                    typ = source.get('type', '')
                    if 'webp' in srcs.lower() or 'avif' in srcs.lower() or 'webp' in typ or 'avif' in typ:
                        has_modern_source = True
                        break
            if not has_modern_source:
                srcset = img.get('srcset', '')
                if 'webp' in srcset.lower() or 'avif' in srcset.lower():
                    has_modern_source = True
            images_data.append({
                'img_url': urljoin(url, src), 'alt': alt_text,
                'is_modern': int(is_modern), 'has_modern_source': int(has_modern_source)
            })
            if alt_text == '[BRAK]': no_alt += 1
            if not is_modern and not has_modern_source: no_webp += 1
        return images_data, no_alt, no_webp

    def process_url(url, source):
        # Fetch one URL, run all audits, enqueue the DB row, and push any
        # newly discovered same-domain links onto the crawl queue.
        nonlocal crawled_count, error_count, total_response_time
        if not rp.can_fetch("Googlebot", url):
            # Robots-blocked pages are recorded but never fetched.
            tg_notifier.add_critical(url, "ROBOTS.TXT BLOCK")
            db_queue.put({'page': {'url': url, 'source': source, 'status': 0, 'time': 0, 'ttfb': 0, 'access': 'Blocked', 'idx': '-', 's_crit': 0, 's_warn': 0, 'images_no_alt': 0, 'images_no_webp': 0, 'title': '', 'meta_desc': '', 'canonical': '', 'lang': '?', 'ts': datetime.now().isoformat()}, 'schemas': [], 'images': []})
            return
        try:
            start_t = time.time()
            # stream=True defers the body download, so ttfb measures only
            # time-to-first-byte; accessing resp.text then pulls the body.
            resp = session.get(url, timeout=10, stream=True)
            ttfb = round(time.time() - start_t, 4)
            soup = BeautifulSoup(resp.text, 'lxml')
            total_t = round(time.time() - start_t, 4)
            lang = soup.find('html').get('lang', 'unknown') if soup.find('html') else 'unknown'
            schemas, s_crit, s_warn = analyze_schema(soup)
            # Indexability: X-Robots-Tag header or meta robots/googlebot noindex.
            idx = "Indexable"
            if 'noindex' in resp.headers.get('X-Robots-Tag', '').lower(): idx = "Noindex"
            elif soup.find('meta', attrs={'name': ['robots', 'googlebot'], 'content': lambda x: x and 'noindex' in x.lower()}): idx = "Noindex"
            title_tag = soup.find('title')
            title = title_tag.text.strip() if title_tag else ''
            meta_desc_tag = soup.find('meta', attrs={'name': 'description'})
            meta_desc = meta_desc_tag.get('content', '').strip() if meta_desc_tag else ''
            canonical_tag = soup.find('link', rel='canonical')
            canonical = canonical_tag.get('href', '').strip() if canonical_tag else ''
            images_data, no_alt, no_webp = analyze_images(soup, url)
            if resp.status_code >= 500: tg_notifier.add_critical(url, f"ERR {resp.status_code}")
            if resp.status_code == 404: tg_notifier.add_critical(url, "404")
            if s_crit > 0: tg_notifier.add_schema(url, s_crit)
            db_queue.put({'page': {'url': url, 'source': source, 'status': resp.status_code, 'time': total_t, 'ttfb': ttfb, 'access': 'Allowed', 'idx': idx, 's_crit': s_crit, 's_warn': s_warn, 'images_no_alt': no_alt, 'images_no_webp': no_webp, 'title': title, 'meta_desc': meta_desc, 'canonical': canonical, 'lang': lang, 'ts': datetime.now().isoformat()}, 'schemas': schemas, 'images': images_data})
            with stats_lock:
                crawled_count += 1
                total_response_time += total_t
                if resp.status_code != 200: error_count += 1
            # Link discovery: same-domain only; query/fragment stripped so the
            # visited set deduplicates URL variants.
            if resp.status_code == 200 and 'text/html' in resp.headers.get('Content-Type', ''):
                for link in soup.find_all('a', href=True):
                    full = urljoin(url, link['href'])
                    parsed = urlparse(full)
                    if parsed.netloc == base_domain:
                        clean = parsed._replace(query='', fragment='').geturl()
                        with visited_lock:
                            if clean not in visited: visited.add(clean); url_queue.put((clean, url))
        except:
            # NOTE(review): any fetch/parse failure is counted but not logged
            with stats_lock: error_count += 1

    def worker():
        # Fetch-worker loop; the get() timeout lets it re-check stop_event.
        while not stop_event.is_set():
            try: u, s = url_queue.get(timeout=0.5); process_url(u, s); url_queue.task_done()
            except queue.Empty: continue
    threads = [threading.Thread(target=worker, daemon=True) for _ in range(max_threads)]
    for t in threads: t.start()
    try:
        # Progress display until the queue drains or the user interrupts.
        while not stop_event.is_set() and url_queue.unfinished_tasks > 0:
            with stats_lock:
                cc = crawled_count
                err = error_count
                avg = round(total_response_time / cc, 3) if cc > 0 else 0
                q_size = url_queue.unfinished_tasks
            print(f"\r[AUDYT] Skanowanie: {cc} | Błędy: {err} | Kolejka: {q_size} | Średni czas: {avg}s ", end="")
            time.sleep(0.5)
    except KeyboardInterrupt:
        print("\n[!] Przerwano (Ctrl+C). Trwa bezpieczne zamykanie...")
        stop_event.set()
        # Drain the queue so the unfinished-task accounting cannot deadlock.
        while not url_queue.empty():
            try: url_queue.get_nowait(); url_queue.task_done()
            except queue.Empty: break
    if not stop_event.is_set():
        url_queue.join()
    else:
        # Wait briefly so the worker threads have time to see stop_event.
        time.sleep(1)
    print("\n[*] Zapisywanie bazy danych, proszę czekać...")
    db_queue.put(None)  # writer-thread shutdown sentinel
    db_thread.join()
    # MULTILINGUAL AUDIT: flag SKUs whose product name is identical in
    # more than one language (i.e. likely untranslated).
    cursor.execute('SELECT sku, lang, full_json FROM structured_data JOIN pages ON structured_data.page_id = pages.id WHERE sku IS NOT NULL AND sku != "None"')
    sku_map = {}
    for sku, lang, fjson in cursor.fetchall():
        if sku not in sku_map: sku_map[sku] = {}
        data = json.loads(fjson)
        sku_map[sku][lang] = {'name': data.get('name', ''), 'description': data.get('description', '')}
    for sku, langs in sku_map.items():
        lang_list = list(langs.keys())
        if len(lang_list) > 1:
            # Compare every language pair for this SKU.
            for i in range(len(lang_list)):
                for j in range(i + 1, len(lang_list)):
                    l1, l2 = lang_list[i], lang_list[j]
                    if langs[l1]['name'] == langs[l2]['name']:
                        cursor.execute('INSERT INTO translation_audit (sku, field, lang1, lang2, content) VALUES (?, ?, ?, ?, ?)', (sku, 'name', l1, l2, langs[l1]['name']))
                        tg_notifier.add_translation_issue(sku, l1, l2, 'name')
    conn.commit(); conn.close()
    # SEARCH TEST: the storefront search must return results for a known phrase.
    search_count = -1
    try:
        print("\n[*] Przeprowadzanie testu wyszukiwarki (szukana fraza: karuzela)...")
        search_url = "https://fluo.dog/szukaj?controller=search&s=karuzela"
        resp_search = session.get(search_url, timeout=15)
        if resp_search.status_code == 200:
            soup_search = BeautifulSoup(resp_search.text, 'lxml')
            products = soup_search.find_all('article', class_='product-miniature')
            search_count = len(products)
            if search_count == 0:
                tg_notifier.add_critical(search_url, "TEST WYSZUKIWARKI: Brak wyników (0) dla 'karuzela'!")
                print("[!] Test wyszukiwarki NIEPOWODZENIE: 0 wyników.")
            else:
                print(f"[*] Test wyszukiwarki OK: znaleziono {search_count} produktów.")
        else:
            tg_notifier.add_critical(search_url, f"TEST WYSZUKIWARKI: Błąd HTTP {resp_search.status_code}")
            print(f"[!] Test wyszukiwarki BŁĄD HTTP: {resp_search.status_code}")
    except Exception as e:
        tg_notifier.add_critical("https://fluo.dog/szukaj?controller=search&s=karuzela", f"TEST WYSZUKIWARKI: Błąd połączenia")
        print(f"[!] Test wyszukiwarki BŁĄD: {e}")
    tg_notifier.send_final_report(start_url, crawled_count, error_count, db_file, search_results=search_count)
if __name__ == "__main__":
    def load_config():
        """Read config.json (forced UTF-8) and return it with whitespace-stripped keys.

        Returns an empty dict (after printing the error) on any failure.
        """
        try:
            # Force utf-8 so Windows default encodings cannot break the read.
            with open("config.json", "r", encoding="utf-8") as cfg_file:
                parsed = json.load(cfg_file)
            # Defensively strip stray whitespace from every key.
            return {key.strip(): value for key, value in parsed.items()}
        except Exception as e:
            print(f"[!] Błąd wczytywania config.json: {e}")
            return {}

    arg_parser = argparse.ArgumentParser(description="Crawler SEO - Podgląd błędów i audyt.")
    arg_parser.add_argument("--url", default="https://fluo.dog", help="Startowy URL")
    arg_parser.add_argument("--threads", type=int, default=10, help="Liczba wątków")
    cli = arg_parser.parse_args()

    # Ensure the output directory for scan databases exists.
    if not os.path.exists("scans"):
        os.makedirs("scans")

    config = load_config()
    # Debug output: show exactly which keys were loaded from the config.
    print(f"[*] Wczytane klucze z config: {', '.join(config.keys())}")
    token = config.get("telegram_token")
    id_info = config.get("telegram_chat_id_info")
    id_err = config.get("telegram_chat_id_errors")
    print(f"[*] Konfiguracja: INFO_ID={id_info}, ERRORS_ID={id_err}")

    notifier = TelegramNotifier(token, id_info, id_err)
    if not notifier.enabled:
        print("[!] Telegram powiadomienia: WYŁĄCZONE (sprawdź czy klucze w config.json są poprawne)")
    else:
        print("[*] Telegram powiadomienia: WŁĄCZONE")
        notifier.send(f"🚀 <b>Rozpoczynam audyt SEO</b> dla: {html.escape(cli.url)}", target='info')

    # Timestamped database file so each run is kept separately.
    db_name = f"scans/crawler_v18_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
    crawler(cli.url, db_name, cli.threads, notifier)