import requests from bs4 import BeautifulSoup import time import sys import json import sqlite3 import argparse import glob import html from datetime import datetime from urllib.parse import urljoin, urlparse from urllib.robotparser import RobotFileParser import threading import queue import os if os.name == 'nt': os.system('color') GOOGLEBOT_UA = "Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" class TelegramNotifier: def __init__(self, token, chat_id_info, chat_id_errors): self.token = token self.chat_id_info = chat_id_info self.chat_id_errors = chat_id_errors or chat_id_info self.enabled = True if token and chat_id_info else False self.critical_errors = [] self.schema_errors = [] self.translation_issues = [] self.lock = threading.Lock() def send(self, message, target='info'): if not self.enabled: return cid = self.chat_id_info if target == 'info' else self.chat_id_errors url = f"https://api.telegram.org/bot{self.token}/sendMessage" try: r = requests.post(url, json={"chat_id": cid, "text": message, "parse_mode": "HTML"}, timeout=15) if r.status_code != 200: print(f"\n[!] Telegram Error ({target}): {r.text}") except Exception as e: print(f"\n[!] Connection Error (Telegram): {e}") def add_critical(self, url, msg): with self.lock: if len(self.critical_errors) < 30: self.critical_errors.append((url, msg)) def add_schema(self, url, count): with self.lock: if len(self.schema_errors) < 15: self.schema_errors.append((url, count)) def add_translation_issue(self, sku, lang1, lang2, field): with self.lock: if len(self.translation_issues) < 15: self.translation_issues.append(f"SKU {sku}: {field} identyczny w {lang1} i {lang2}") def get_prev_404_count(self, current_db): # Szukamy baz danych w podkatalogu scans, sortujemy po czasie modyfikacji dbs = glob.glob("scans/crawler_v*.db") dbs.sort(key=os.path.getmtime, reverse=True) prev_db = None for d in dbs: if os.path.basename(d) != os.path.basename(current_db): prev_db = d break if not prev_db: return None try: conn = sqlite3.connect(prev_db) count = conn.execute("SELECT COUNT(*) FROM pages WHERE status = 404").fetchone()[0] conn.close() return count except: return None def send_final_report(self, start_url, total, errors, db_file, search_results=-1): if not self.enabled: print("\n[!] Powiadomienia Telegram są wyłączone (brak konfiguracji).") return # Analiza 404 i innych błędów current_404 = 0 schema_errs = 0 transl_errs = 0 try: conn = sqlite3.connect(db_file) current_404 = conn.execute("SELECT COUNT(*) FROM pages WHERE status = 404").fetchone()[0] schema_errs = conn.execute("SELECT COUNT(*) FROM pages WHERE schema_critical > 0").fetchone()[0] transl_errs = conn.execute("SELECT COUNT(*) FROM translation_audit").fetchone()[0] conn.close() except: pass prev_404 = self.get_prev_404_count(db_file) regression_str = "" if prev_404 is not None: diff = current_404 - prev_404 if diff > 0: regression_str = f" (+{diff} NOWE! ⚠️)" elif diff < 0: regression_str = f" ({diff} naprawione)" else: regression_str = " (bez zmian)" # 1. RAPORT INFO domain = html.escape(urlparse(start_url).netloc) total_icon = "✅" http_icon = "✅" if errors == 0 else "❌" err404_icon = "✅" if current_404 == 0 else "❌" schema_icon = "✅" if schema_errs == 0 else "❌" transl_icon = "✅" if transl_errs == 0 else "❌" search_icon = "✅" if search_results > 0 else "❌" schema_text = "poprawne" if schema_errs == 0 else f"{schema_errs} błędów" transl_text = "poprawne" if transl_errs == 0 else f"{transl_errs} błędów" search_text = f"{search_results}" if search_results >= 0 else "BŁĄD" info_msg = f"🏁 AUDYT ZAKOŃCZONY: {domain}\n\n" info_msg += f"{total_icon} Przeskanowano: {total}\n" info_msg += f"{http_icon} Błędy HTTP: {errors}\n" info_msg += f"{err404_icon} Błędy 404: {current_404}{regression_str}\n" info_msg += f"{schema_icon} Dane strukturalne: {schema_text}\n" info_msg += f"{transl_icon} Tłumaczenia: {transl_text}\n" info_msg += f"{search_icon} Wyszukiwarka: {search_text}\n\n" if self.critical_errors or self.schema_errors or self.translation_issues: info_msg += f"🚨 Wykryto błędy. Szczegóły na kanale ERRORS." else: info_msg += f"✅ Brak krytycznych błędów." self.send(info_msg, target='info') # 2. RAPORT ERRORS if self.critical_errors or self.schema_errors or self.translation_issues: err_msg = f"🚨 BŁĘDY: {domain}\n\n" prompt_text = ( "Twoim zadaniem jest weryfikacja poniższych błędów na sklepie i przygotowanie planu naprawy. " "Znasz strukturę plików, masz dostęp do bazy danych sklepu oraz wiesz jak działają wszystkie mechanizmy cache. " "WAŻNE: Podane błędy zostały wykryte przez crawler, który analizował wyrenderowany kod HTML stron sklepu. " "Crawler nie sprawdzał bazy danych – może być tak, że w bazie dane są w pełni poprawne, " "a problem leży po stronie modułów (np. wstrzykujących dane strukturalne do źródła strony).\n" "Oto zestawienie błędów do przeanalizowania:\n\n" ) if self.critical_errors: err_msg += f"❌ KRYTYCZNE:\n" prompt_text += "BŁĘDY KRYTYCZNE:\n" for url, err in self.critical_errors[:15]: safe_url = html.escape(url) err_msg += f"• {err}: {safe_url}\n" prompt_text += f"- {err}: {url}\n" err_msg += "\n" prompt_text += "\n" if self.schema_errors: err_msg += f"🛠 SCHEMA.ORG:\n" prompt_text += "BŁĘDY SCHEMA.ORG:\n" for url, count in self.schema_errors[:10]: safe_url = html.escape(url) err_msg += f"• Brak {count} pól: {safe_url}\n" prompt_text += f"- Brak {count} pól: {url}\n" err_msg += "\n" prompt_text += "\n" if self.translation_issues: err_msg += f"🌐 TŁUMACZENIA:\n" prompt_text += "BŁĘDY TŁUMACZEŃ:\n" for issue in self.translation_issues[:10]: err_msg += f"• {html.escape(issue)}\n" prompt_text += f"- {issue}\n" err_msg += "\n" prompt_text += "\n" err_msg += f"🤖 Gotowy prompt dla Agenta AI:\n" err_msg += f"
{html.escape(prompt_text.strip())}
" self.send(err_msg, target='errors') def crawler(start_url, db_file, max_threads, tg_notifier): parsed_start = urlparse(start_url) base_url = f"{parsed_start.scheme}://{parsed_start.netloc}" base_domain = parsed_start.netloc conn = sqlite3.connect(db_file, check_same_thread=False) cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS pages ( id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT UNIQUE, source_url TEXT, status INTEGER, total_time REAL, ttfb REAL, google_access TEXT, index_status TEXT, schema_critical INTEGER DEFAULT 0, schema_warnings INTEGER DEFAULT 0, images_no_alt INTEGER DEFAULT 0, images_no_webp INTEGER DEFAULT 0, title TEXT, meta_desc TEXT, canonical TEXT, lang TEXT, timestamp DATETIME)''') cursor.execute('''CREATE TABLE IF NOT EXISTS structured_data (id INTEGER PRIMARY KEY AUTOINCREMENT, page_id INTEGER, schema_type TEXT, full_json TEXT, sku TEXT, FOREIGN KEY(page_id) REFERENCES pages(id))''') cursor.execute('''CREATE TABLE IF NOT EXISTS translation_audit (id INTEGER PRIMARY KEY AUTOINCREMENT, sku TEXT, field TEXT, lang1 TEXT, lang2 TEXT, content TEXT)''') cursor.execute('''CREATE TABLE IF NOT EXISTS images_audit (id INTEGER PRIMARY KEY AUTOINCREMENT, page_id INTEGER, img_url TEXT, alt TEXT, is_modern INTEGER, has_modern_source INTEGER, FOREIGN KEY(page_id) REFERENCES pages(id))''') conn.commit() db_queue = queue.Queue() def db_worker(): db_conn = sqlite3.connect(db_file) db_cursor = db_conn.cursor() while True: item = db_queue.get() if item is None: break try: p = item['page'] db_cursor.execute('''INSERT OR REPLACE INTO pages (url, source_url, status, total_time, ttfb, google_access, index_status, schema_critical, schema_warnings, images_no_alt, images_no_webp, title, meta_desc, canonical, lang, timestamp) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', (p['url'], p['source'], p['status'], p['time'], p['ttfb'], p['access'], p['idx'], p['s_crit'], p['s_warn'], p.get('images_no_alt',0), p.get('images_no_webp',0), p.get('title',''), p.get('meta_desc',''), p.get('canonical',''), p['lang'], p['ts'])) page_id = db_cursor.lastrowid for s in item['schemas']: db_cursor.execute('INSERT INTO structured_data (page_id, schema_type, full_json, sku) VALUES (?, ?, ?, ?)', (page_id, s['type'], s['json'], s.get('sku'))) if 'images' in item: for img in item['images']: db_cursor.execute('INSERT INTO images_audit (page_id, img_url, alt, is_modern, has_modern_source) VALUES (?, ?, ?, ?, ?)', (page_id, img['img_url'], img['alt'], img['is_modern'], img['has_modern_source'])) db_conn.commit() except: pass finally: db_queue.task_done() db_conn.close() db_thread = threading.Thread(target=db_worker) db_thread.start() rp = RobotFileParser() try: rp.set_url(urljoin(base_url, "robots.txt")); rp.read() except: pass visited, crawled_count, error_count = {start_url}, 0, 0 total_response_time = 0.0 visited_lock, stats_lock = threading.Lock(), threading.Lock() url_queue = queue.Queue() url_queue.put((start_url, "Start")) stop_event = threading.Event() session = requests.Session() session.headers.update({'User-Agent': GOOGLEBOT_UA}) def analyze_schema(soup): scripts = soup.find_all('script', type='application/ld+json') results, crit, warn = [], 0, 0 def get_val(obj, path): curr = obj for p in path.split('.'): if isinstance(curr, dict) and p in curr: curr = curr[p] else: return None return curr for script in scripts: try: data = json.loads(script.string) objs = data if isinstance(data, list) else [data] for obj in objs: if not isinstance(obj, dict): continue sku = get_val(obj, 'sku') or get_val(obj, 'mpn') if 'Product' in str(obj.get('@type', '')): if not get_val(obj, 'name') or not get_val(obj, 'image') or not get_val(obj, 'offers.price'): crit += 1 results.append({'type': str(obj.get('@type', 'Unknown')), 'json': json.dumps(obj, ensure_ascii=False), 'sku': str(sku) if sku else None}) except: pass return results, crit, warn def analyze_images(soup, url): images_data = [] no_alt, no_webp = 0, 0 for img in soup.find_all('img'): src = img.get('src') or img.get('data-src') or '' if not src or src.startswith('data:image'): continue alt = img.get('alt', '').strip() if img.get('alt') is not None else '' alt_text = alt if alt else '[BRAK]' is_modern = src.lower().endswith(('.webp', '.avif', '.svg')) parent = img.find_parent('picture') has_modern_source = False if parent: for source in parent.find_all('source'): srcs = source.get('srcset', '') typ = source.get('type', '') if 'webp' in srcs.lower() or 'avif' in srcs.lower() or 'webp' in typ or 'avif' in typ: has_modern_source = True break if not has_modern_source: srcset = img.get('srcset', '') if 'webp' in srcset.lower() or 'avif' in srcset.lower(): has_modern_source = True images_data.append({ 'img_url': urljoin(url, src), 'alt': alt_text, 'is_modern': int(is_modern), 'has_modern_source': int(has_modern_source) }) if alt_text == '[BRAK]': no_alt += 1 if not is_modern and not has_modern_source: no_webp += 1 return images_data, no_alt, no_webp def process_url(url, source): nonlocal crawled_count, error_count, total_response_time if not rp.can_fetch("Googlebot", url): tg_notifier.add_critical(url, "ROBOTS.TXT BLOCK") db_queue.put({'page': {'url': url, 'source': source, 'status': 0, 'time': 0, 'ttfb': 0, 'access': 'Blocked', 'idx': '-', 's_crit': 0, 's_warn': 0, 'images_no_alt': 0, 'images_no_webp': 0, 'title': '', 'meta_desc': '', 'canonical': '', 'lang': '?', 'ts': datetime.now().isoformat()}, 'schemas': [], 'images': []}) return try: start_t = time.time() resp = session.get(url, timeout=10, stream=True) ttfb = round(time.time() - start_t, 4) soup = BeautifulSoup(resp.text, 'lxml') total_t = round(time.time() - start_t, 4) lang = soup.find('html').get('lang', 'unknown') if soup.find('html') else 'unknown' schemas, s_crit, s_warn = analyze_schema(soup) idx = "Indexable" if 'noindex' in resp.headers.get('X-Robots-Tag', '').lower(): idx = "Noindex" elif soup.find('meta', attrs={'name': ['robots', 'googlebot'], 'content': lambda x: x and 'noindex' in x.lower()}): idx = "Noindex" title_tag = soup.find('title') title = title_tag.text.strip() if title_tag else '' meta_desc_tag = soup.find('meta', attrs={'name': 'description'}) meta_desc = meta_desc_tag.get('content', '').strip() if meta_desc_tag else '' canonical_tag = soup.find('link', rel='canonical') canonical = canonical_tag.get('href', '').strip() if canonical_tag else '' images_data, no_alt, no_webp = analyze_images(soup, url) if resp.status_code >= 500: tg_notifier.add_critical(url, f"ERR {resp.status_code}") if resp.status_code == 404: tg_notifier.add_critical(url, "404") if s_crit > 0: tg_notifier.add_schema(url, s_crit) db_queue.put({'page': {'url': url, 'source': source, 'status': resp.status_code, 'time': total_t, 'ttfb': ttfb, 'access': 'Allowed', 'idx': idx, 's_crit': s_crit, 's_warn': s_warn, 'images_no_alt': no_alt, 'images_no_webp': no_webp, 'title': title, 'meta_desc': meta_desc, 'canonical': canonical, 'lang': lang, 'ts': datetime.now().isoformat()}, 'schemas': schemas, 'images': images_data}) with stats_lock: crawled_count += 1 total_response_time += total_t if resp.status_code != 200: error_count += 1 if resp.status_code == 200 and 'text/html' in resp.headers.get('Content-Type', ''): for link in soup.find_all('a', href=True): full = urljoin(url, link['href']) parsed = urlparse(full) if parsed.netloc == base_domain: clean = parsed._replace(query='', fragment='').geturl() with visited_lock: if clean not in visited: visited.add(clean); url_queue.put((clean, url)) except: with stats_lock: error_count += 1 def worker(): while not stop_event.is_set(): try: u, s = url_queue.get(timeout=0.5); process_url(u, s); url_queue.task_done() except queue.Empty: continue threads = [threading.Thread(target=worker, daemon=True) for _ in range(max_threads)] for t in threads: t.start() try: while not stop_event.is_set() and url_queue.unfinished_tasks > 0: with stats_lock: cc = crawled_count err = error_count avg = round(total_response_time / cc, 3) if cc > 0 else 0 q_size = url_queue.unfinished_tasks print(f"\r[AUDYT] Skanowanie: {cc} | Błędy: {err} | Kolejka: {q_size} | Średni czas: {avg}s ", end="") time.sleep(0.5) except KeyboardInterrupt: print("\n[!] Przerwano (Ctrl+C). Trwa bezpieczne zamykanie...") stop_event.set() while not url_queue.empty(): try: url_queue.get_nowait(); url_queue.task_done() except queue.Empty: break if not stop_event.is_set(): url_queue.join() else: # Czekamy krótką chwilę, by wątki url_queue zdążyły zobaczyć stop_event time.sleep(1) print("\n[*] Zapisywanie bazy danych, proszę czekać...") db_queue.put(None) db_thread.join() # AUDYT WIELOJĘZYCZNOŚCI cursor.execute('SELECT sku, lang, full_json FROM structured_data JOIN pages ON structured_data.page_id = pages.id WHERE sku IS NOT NULL AND sku != "None"') sku_map = {} for sku, lang, fjson in cursor.fetchall(): if sku not in sku_map: sku_map[sku] = {} data = json.loads(fjson) sku_map[sku][lang] = {'name': data.get('name', ''), 'description': data.get('description', '')} for sku, langs in sku_map.items(): lang_list = list(langs.keys()) if len(lang_list) > 1: for i in range(len(lang_list)): for j in range(i + 1, len(lang_list)): l1, l2 = lang_list[i], lang_list[j] if langs[l1]['name'] == langs[l2]['name']: cursor.execute('INSERT INTO translation_audit (sku, field, lang1, lang2, content) VALUES (?, ?, ?, ?, ?)', (sku, 'name', l1, l2, langs[l1]['name'])) tg_notifier.add_translation_issue(sku, l1, l2, 'name') conn.commit(); conn.close() # TEST WYSZUKIWARKI search_count = -1 try: print("\n[*] Przeprowadzanie testu wyszukiwarki (szukana fraza: karuzela)...") search_url = "https://fluo.dog/szukaj?controller=search&s=karuzela" resp_search = session.get(search_url, timeout=15) if resp_search.status_code == 200: soup_search = BeautifulSoup(resp_search.text, 'lxml') products = soup_search.find_all('article', class_='product-miniature') search_count = len(products) if search_count == 0: tg_notifier.add_critical(search_url, "TEST WYSZUKIWARKI: Brak wyników (0) dla 'karuzela'!") print("[!] Test wyszukiwarki NIEPOWODZENIE: 0 wyników.") else: print(f"[*] Test wyszukiwarki OK: znaleziono {search_count} produktów.") else: tg_notifier.add_critical(search_url, f"TEST WYSZUKIWARKI: Błąd HTTP {resp_search.status_code}") print(f"[!] Test wyszukiwarki BŁĄD HTTP: {resp_search.status_code}") except Exception as e: tg_notifier.add_critical("https://fluo.dog/szukaj?controller=search&s=karuzela", f"TEST WYSZUKIWARKI: Błąd połączenia") print(f"[!] Test wyszukiwarki BŁĄD: {e}") tg_notifier.send_final_report(start_url, crawled_count, error_count, db_file, search_results=search_count) if __name__ == "__main__": def load_config(): try: # Wymuszamy utf-8, żeby uniknąć problemów z kodowaniem Windows with open("config.json", "r", encoding="utf-8") as f: raw_config = json.load(f) # Czyścimy klucze ze spacji na wszelki wypadek return {k.strip(): v for k, v in raw_config.items()} except Exception as e: print(f"[!] Błąd wczytywania config.json: {e}") return {} parser = argparse.ArgumentParser(description="Crawler SEO - Podgląd błędów i audyt.") parser.add_argument("--url", default="https://fluo.dog", help="Startowy URL") parser.add_argument("--threads", type=int, default=10, help="Liczba wątków") args = parser.parse_args() # Upewnij się, że katalog scans istnieje if not os.path.exists("scans"): os.makedirs("scans") config = load_config() # Debug: wypiszmy jakie klucze faktycznie widzi Python available_keys = ", ".join(config.keys()) print(f"[*] Wczytane klucze z config: {available_keys}") token = config.get("telegram_token") id_info = config.get("telegram_chat_id_info") id_err = config.get("telegram_chat_id_errors") print(f"[*] Konfiguracja: INFO_ID={id_info}, ERRORS_ID={id_err}") notifier = TelegramNotifier(token, id_info, id_err) if notifier.enabled: print("[*] Telegram powiadomienia: WŁĄCZONE") notifier.send(f"🚀 Rozpoczynam audyt SEO dla: {html.escape(args.url)}", target='info') else: print("[!] Telegram powiadomienia: WYŁĄCZONE (sprawdź czy klucze w config.json są poprawne)") db_name = f"scans/crawler_v18_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db" crawler(args.url, db_name, args.threads, notifier)