import sqlite3
import json
import glob
import os
import io
import csv
from typing import Optional

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse, StreamingResponse

app = FastAPI(title="Crawler SEO Dashboard API")
def get_db_conn(db_name: str):
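    """Open a SQLite connection to a scan database; resolves bare names against scans/ and raises 404 if the file is missing."""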
    # Check whether db_name already includes the scans/ directory; if not, prepend it
if not db_name.startswith("scans/"):
db_path = os.path.join("scans", db_name)
else:
db_path = db_name
if not os.path.exists(db_path):
        raise HTTPException(status_code=404, detail=f"Database does not exist: {db_path}")
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
@app.get("/", response_class=HTMLResponse)
def get_dashboard():
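    """Serve the dashboard HTML page."""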
return """
Crawler SEO Dashboard
"""
@app.get("/api/list-dbs")
def list_dbs():
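    """List available scan databases (*.db files in scans/), sorted descending so date-stamped names appear newest first."""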
dbs = glob.glob("scans/*.db")
    # Return only the file names so the dashboard's select dropdown stays readable
return sorted([os.path.basename(db) for db in dbs], reverse=True)
@app.get("/api/stats")
def get_stats(db: str):
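    """Return aggregate scan statistics: page count, HTTP errors, average load time, schema objects, image issues and translation errors."""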
conn = get_db_conn(db)
cursor = conn.cursor()
stats = {"total_pages": 0, "errors": 0, "avg_time": 0, "schema_objects": 0, "img_issues": 0, "translation_errors": 0}
try:
stats["total_pages"] = cursor.execute("SELECT COUNT(*) FROM pages").fetchone()[0]
stats["errors"] = cursor.execute("SELECT COUNT(*) FROM pages WHERE status != 200 AND status != 0").fetchone()[0]
stats["avg_time"] = cursor.execute("SELECT AVG(total_time) FROM pages WHERE total_time > 0").fetchone()[0] or 0
stats["schema_objects"] = cursor.execute("SELECT COUNT(*) FROM structured_data").fetchone()[0]
    except sqlite3.Error: pass
try:
img_stats = cursor.execute("SELECT SUM(images_no_alt), SUM(images_no_webp) FROM pages").fetchone()
stats["img_issues"] = (img_stats[0] or 0) + (img_stats[1] or 0)
    except sqlite3.Error: pass
try:
stats["translation_errors"] = cursor.execute("SELECT COUNT(*) FROM translation_audit").fetchone()[0]
    except sqlite3.Error: pass
conn.close()
return stats
@app.get("/api/pages")
def get_pages(db: str, status_type: Optional[str] = "all"):
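    """Return up to 1000 crawled pages, optionally filtered by status_type: error, noindex, slow or images."""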
conn = get_db_conn(db)
cursor = conn.cursor()
try:
query = "SELECT * FROM pages"
try:
cursor.execute("SELECT images_no_alt FROM pages LIMIT 1")
has_img_cols = True
        except sqlite3.Error: has_img_cols = False
if status_type == "error": query += " WHERE status != 200 AND status != 0"
elif status_type == "noindex": query += " WHERE index_status LIKE 'Noindex%'"
elif status_type == "slow": query += " WHERE total_time > 1.5"
elif status_type == "images" and has_img_cols: query += " WHERE images_no_alt > 0 OR images_no_webp > 0"
query += " ORDER BY id DESC LIMIT 1000"
pages = cursor.execute(query).fetchall()
return [dict(p) for p in pages]
    except sqlite3.Error: return []
finally: conn.close()
@app.get("/api/translations")
def get_translations(db: str):
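    """Build a per-SKU translation audit from Product structured data: compare each language's
    name, description, SEO title, SEO description and slug against the Polish (pl) baseline
    and return only SKUs where at least one value is missing or left untranslated."""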
conn = get_db_conn(db)
cursor = conn.cursor()
try:
try:
cursor.execute("SELECT title, meta_desc FROM pages LIMIT 1")
has_meta = True
        except sqlite3.Error: has_meta = False
if has_meta:
query = """
SELECT s.sku, p.lang, s.full_json, MIN(p.url) as url, MAX(p.title) as title, MAX(p.meta_desc) as meta_desc
FROM structured_data s
JOIN pages p ON s.page_id = p.id
WHERE s.sku IS NOT NULL AND s.sku != 'None' AND s.sku != '' AND s.schema_type LIKE '%Product%'
GROUP BY s.sku, p.lang
"""
else:
query = """
SELECT s.sku, p.lang, s.full_json, MIN(p.url) as url, '' as title, '' as meta_desc
FROM structured_data s
JOIN pages p ON s.page_id = p.id
WHERE s.sku IS NOT NULL AND s.sku != 'None' AND s.sku != '' AND s.schema_type LIKE '%Product%'
GROUP BY s.sku, p.lang
"""
rows = cursor.execute(query).fetchall()
sku_map = {}
langs_set = set()
for r in rows:
sku = str(r['sku']).strip()
lang = str(r['lang']).lower().strip()
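            # Normalize regional codes such as 'pl-PL' to the base language code ('pl')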
if '-' in lang: lang = lang.split('-')[0]
langs_set.add(lang)
try: data = json.loads(r['full_json'])
            except (ValueError, TypeError): continue
obj = {}
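            # JSON-LD may be a single object or a list of objects; keep only the Product entry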
if isinstance(data, list): obj = next((item for item in data if 'Product' in str(item.get('@type', ''))), {})
else: obj = data if 'Product' in str(data.get('@type', '')) else {}
            name = (obj.get('name') or '').strip()
            desc = (obj.get('description') or '').strip()
title = (r['title'] or '').strip()
meta_desc = (r['meta_desc'] or '').strip()
slug = ''
if r['url']:
parts = r['url'].rstrip('/').split('/')
if parts: slug = parts[-1].split('?')[0].split('#')[0]
if sku not in sku_map: sku_map[sku] = {'langs': {}, 'url': r['url']}
sku_map[sku]['langs'][lang] = {
'nazwa': name, 'opis': desc,
'nazwa seo': title, 'opis seo': meta_desc, 'slug': slug
}
all_langs = sorted(list(langs_set))
if 'pl' in all_langs: all_langs.remove('pl')
results = []
fields = ['nazwa', 'opis', 'nazwa seo', 'opis seo', 'slug']
for sku, info in sku_map.items():
if 'pl' not in info['langs']: continue
pl_data = info['langs']['pl']
sku_has_errors = False
sku_rows = []
for field in fields:
pl_val = pl_data.get(field, '')
if not pl_val: continue
row = {'sku': sku, 'field': field, 'url': info['url']}
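                # 'X' = value missing or identical to the Polish source (likely untranslated), 'V' = translated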
for lang in all_langs:
l_val = info['langs'].get(lang, {}).get(field, '')
if not l_val or l_val == pl_val:
row[lang] = 'X'
sku_has_errors = True
else:
row[lang] = 'V'
sku_rows.append(row)
if sku_has_errors:
results.extend(sku_rows)
return {"langs": all_langs, "data": results}
except Exception as e:
print(f"Error in translations: {e}")
return {"langs": [], "data": []}
finally: conn.close()
@app.get("/api/analysis/{page_id}")
def get_analysis(db: str, page_id: int):
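    """Return structured data objects and the image audit for a single crawled page."""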
conn = get_db_conn(db)
cursor = conn.cursor()
try:
schemas = cursor.execute("SELECT schema_type, full_json FROM structured_data WHERE page_id = ?", (page_id,)).fetchall()
try: images = cursor.execute("SELECT img_url, alt, is_modern, has_modern_source FROM images_audit WHERE page_id = ?", (page_id,)).fetchall()
        except sqlite3.Error: images = []
schema_list = []
for s in schemas:
try: schema_list.append({"type": s["schema_type"], "data": json.loads(s["full_json"])})
            except (ValueError, TypeError): schema_list.append({"type": s["schema_type"], "data": s["full_json"]})
return {"schemas": schema_list, "images": [dict(img) for img in images]}
finally:
conn.close()
@app.get("/api/export-csv")
def export_csv(db: str, status_type: Optional[str] = "all"):
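    """Export the filtered pages table as a semicolon-delimited CSV download."""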
conn = get_db_conn(db)
cursor = conn.cursor()
try:
cursor.execute("SELECT images_no_alt FROM pages LIMIT 1")
has_img_cols = True
    except sqlite3.Error: has_img_cols = False
query = "SELECT * FROM pages"
if status_type == "error": query += " WHERE status != 200 AND status != 0"
elif status_type == "noindex": query += " WHERE index_status LIKE 'Noindex%'"
elif status_type == "slow": query += " WHERE total_time > 1.5"
elif status_type == "images" and has_img_cols: query += " WHERE images_no_alt > 0 OR images_no_webp > 0"
query += " ORDER BY id DESC"
pages = cursor.execute(query).fetchall()
conn.close()
output = io.StringIO()
writer = csv.writer(output, delimiter=';')
if pages:
keys = list(dict(pages[0]).keys())
writer.writerow([k.upper() for k in keys])
for p in pages:
writer.writerow([dict(p).get(k, '') for k in keys])
else:
        writer.writerow(['NO DATA'])
output.seek(0)
filename = f"raport_seo_{status_type}_{db.replace('.db', '')}.csv"
return StreamingResponse(io.BytesIO(output.getvalue().encode('utf-8-sig')), media_type="text/csv", headers={"Content-Disposition": f"attachment; filename={filename}"})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)