import sqlite3 import json import glob import os from fastapi import FastAPI, HTTPException, Query from fastapi.responses import HTMLResponse, StreamingResponse from typing import List, Optional import io import csv app = FastAPI(title="Crawler SEO Dashboard API") def get_db_conn(db_name: str): # Sprawdź czy db_name zawiera już katalog, jeśli nie - dodaj scans/ if not db_name.startswith("scans/"): db_path = os.path.join("scans", db_name) else: db_path = db_name if not os.path.exists(db_path): raise HTTPException(status_code=404, detail=f"Baza danych nie istnieje: {db_path}") conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row return conn @app.get("/", response_class=HTMLResponse) def get_dashboard(): return """ Crawler SEO Dashboard
""" @app.get("/api/list-dbs") def list_dbs(): dbs = glob.glob("scans/*.db") # Zwracamy same nazwy plików dla ładniejszego widoku w select return sorted([os.path.basename(db) for db in dbs], reverse=True) @app.get("/api/stats") def get_stats(db: str): conn = get_db_conn(db) cursor = conn.cursor() stats = {"total_pages": 0, "errors": 0, "avg_time": 0, "schema_objects": 0, "img_issues": 0, "translation_errors": 0} try: stats["total_pages"] = cursor.execute("SELECT COUNT(*) FROM pages").fetchone()[0] stats["errors"] = cursor.execute("SELECT COUNT(*) FROM pages WHERE status != 200 AND status != 0").fetchone()[0] stats["avg_time"] = cursor.execute("SELECT AVG(total_time) FROM pages WHERE total_time > 0").fetchone()[0] or 0 stats["schema_objects"] = cursor.execute("SELECT COUNT(*) FROM structured_data").fetchone()[0] except: pass try: img_stats = cursor.execute("SELECT SUM(images_no_alt), SUM(images_no_webp) FROM pages").fetchone() stats["img_issues"] = (img_stats[0] or 0) + (img_stats[1] or 0) except: pass try: stats["translation_errors"] = cursor.execute("SELECT COUNT(*) FROM translation_audit").fetchone()[0] except: pass conn.close() return stats @app.get("/api/pages") def get_pages(db: str, status_type: Optional[str] = "all"): conn = get_db_conn(db) cursor = conn.cursor() try: query = "SELECT * FROM pages" try: cursor.execute("SELECT images_no_alt FROM pages LIMIT 1") has_img_cols = True except: has_img_cols = False if status_type == "error": query += " WHERE status != 200 AND status != 0" elif status_type == "noindex": query += " WHERE index_status LIKE 'Noindex%'" elif status_type == "slow": query += " WHERE total_time > 1.5" elif status_type == "images" and has_img_cols: query += " WHERE images_no_alt > 0 OR images_no_webp > 0" query += " ORDER BY id DESC LIMIT 1000" pages = cursor.execute(query).fetchall() return [dict(p) for p in pages] except: return [] finally: conn.close() @app.get("/api/translations") def get_translations(db: str): conn = get_db_conn(db) cursor = conn.cursor() try: try: cursor.execute("SELECT title, meta_desc FROM pages LIMIT 1") has_meta = True except: has_meta = False if has_meta: query = """ SELECT s.sku, p.lang, s.full_json, MIN(p.url) as url, MAX(p.title) as title, MAX(p.meta_desc) as meta_desc FROM structured_data s JOIN pages p ON s.page_id = p.id WHERE s.sku IS NOT NULL AND s.sku != 'None' AND s.sku != '' AND s.schema_type LIKE '%Product%' GROUP BY s.sku, p.lang """ else: query = """ SELECT s.sku, p.lang, s.full_json, MIN(p.url) as url, '' as title, '' as meta_desc FROM structured_data s JOIN pages p ON s.page_id = p.id WHERE s.sku IS NOT NULL AND s.sku != 'None' AND s.sku != '' AND s.schema_type LIKE '%Product%' GROUP BY s.sku, p.lang """ rows = cursor.execute(query).fetchall() sku_map = {} langs_set = set() for r in rows: sku = str(r['sku']).strip() lang = str(r['lang']).lower().strip() if '-' in lang: lang = lang.split('-')[0] langs_set.add(lang) try: data = json.loads(r['full_json']) except: continue obj = {} if isinstance(data, list): obj = next((item for item in data if 'Product' in str(item.get('@type', ''))), {}) else: obj = data if 'Product' in str(data.get('@type', '')) else {} name = obj.get('name', '').strip() desc = obj.get('description', '').strip() title = (r['title'] or '').strip() meta_desc = (r['meta_desc'] or '').strip() slug = '' if r['url']: parts = r['url'].rstrip('/').split('/') if parts: slug = parts[-1].split('?')[0].split('#')[0] if sku not in sku_map: sku_map[sku] = {'langs': {}, 'url': r['url']} sku_map[sku]['langs'][lang] = { 'nazwa': name, 'opis': desc, 'nazwa seo': title, 'opis seo': meta_desc, 'slug': slug } all_langs = sorted(list(langs_set)) if 'pl' in all_langs: all_langs.remove('pl') results = [] fields = ['nazwa', 'opis', 'nazwa seo', 'opis seo', 'slug'] for sku, info in sku_map.items(): if 'pl' not in info['langs']: continue pl_data = info['langs']['pl'] sku_has_errors = False sku_rows = [] for field in fields: pl_val = pl_data.get(field, '') if not pl_val: continue row = {'sku': sku, 'field': field, 'url': info['url']} for lang in all_langs: l_val = info['langs'].get(lang, {}).get(field, '') if not l_val or l_val == pl_val: row[lang] = 'X' sku_has_errors = True else: row[lang] = 'V' sku_rows.append(row) if sku_has_errors: results.extend(sku_rows) return {"langs": all_langs, "data": results} except Exception as e: print(f"Error in translations: {e}") return {"langs": [], "data": []} finally: conn.close() @app.get("/api/analysis/{page_id}") def get_analysis(db: str, page_id: int): conn = get_db_conn(db) cursor = conn.cursor() try: schemas = cursor.execute("SELECT schema_type, full_json FROM structured_data WHERE page_id = ?", (page_id,)).fetchall() try: images = cursor.execute("SELECT img_url, alt, is_modern, has_modern_source FROM images_audit WHERE page_id = ?", (page_id,)).fetchall() except: images = [] schema_list = [] for s in schemas: try: schema_list.append({"type": s["schema_type"], "data": json.loads(s["full_json"])}) except: schema_list.append({"type": s["schema_type"], "data": s["full_json"]}) return {"schemas": schema_list, "images": [dict(img) for img in images]} finally: conn.close() @app.get("/api/export-csv") def export_csv(db: str, status_type: Optional[str] = "all"): conn = get_db_conn(db) cursor = conn.cursor() try: cursor.execute("SELECT images_no_alt FROM pages LIMIT 1") has_img_cols = True except: has_img_cols = False query = "SELECT * FROM pages" if status_type == "error": query += " WHERE status != 200 AND status != 0" elif status_type == "noindex": query += " WHERE index_status LIKE 'Noindex%'" elif status_type == "slow": query += " WHERE total_time > 1.5" elif status_type == "images" and has_img_cols: query += " WHERE images_no_alt > 0 OR images_no_webp > 0" query += " ORDER BY id DESC" pages = cursor.execute(query).fetchall() conn.close() output = io.StringIO() writer = csv.writer(output, delimiter=';') if pages: keys = list(dict(pages[0]).keys()) writer.writerow([k.upper() for k in keys]) for p in pages: writer.writerow([dict(p).get(k, '') for k in keys]) else: writer.writerow(['BRAK DANYCH']) output.seek(0) filename = f"raport_seo_{status_type}_{db.replace('.db', '')}.csv" return StreamingResponse(io.BytesIO(output.getvalue().encode('utf-8-sig')), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename={filename}"}) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="127.0.0.1", port=8000)