# app.py import os import uuid import sqlite3 from datetime import datetime, timedelta from io import BytesIO from flask import ( Flask, render_template, request, redirect, url_for, flash, session, g, abort, send_file, jsonify, Response ) from werkzeug.utils import secure_filename from werkzeug.middleware.proxy_fix import ProxyFix from werkzeug.security import generate_password_hash, check_password_hash from PIL import Image from pygments import highlight from pygments.lexers import get_lexer_by_name from pygments.formatters import HtmlFormatter from apscheduler.schedulers.background import BackgroundScheduler from cryptography.fernet import Fernet from flask_limiter import Limiter from flask_limiter.util import get_remote_address from dotenv import load_dotenv # Load environment variables from a .env file load_dotenv() app = Flask(__name__) app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_prefix=1) app.config['SECRET_KEY'] = os.getenv('SSP_SECRET_KEY') app.config['ADMIN_PASSWORD_HASH'] = os.getenv('SSP_ADMIN_PASSWORD_HASH') app.config['ADMIN_URL'] = os.getenv('SSP_ADMIN_URL') enc_key = os.getenv('SSP_ENCRYPTION_KEY') if not enc_key: raise ValueError("FATAL: SSP_ENCRYPTION_KEY is not set in the environment.") app.config['ENCRYPTION_KEY'] = enc_key.encode('utf-8') app.config['UPLOAD_FOLDER'] = os.getenv('SSP_UPLOAD_FOLDER', 'uploads') app.config['DATABASE_PATH'] = os.getenv('SSP_DATABASE_PATH', 'database.db') app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 # 10MB app.config['FLASK_DEBUG'] = os.getenv('SSP_FLASK_DEBUG', 'False').lower() in ('true', '1', 't') ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp', 'ico', 'tiff'} # --- Rate Limiting (I2P-aware) --- def i2p_key_func(): # Prioritize the I2P destination header for rate limiting b32 = request.headers.get('X-I2P-DestB32') if b32: return b32 # Fallback to X-Forwarded-For if behind a standard proxy fwd = request.headers.get('X-Forwarded-For') if fwd: return fwd.split(',')[0].strip() # Final fallback to the direct remote address return get_remote_address() limiter = Limiter( app=app, key_func=i2p_key_func, default_limits=["200 per day", "50 per hour"], storage_uri="memory://" ) fernet = Fernet(app.config['ENCRYPTION_KEY']) # --- Expiry Map & Languages --- EXPIRY_MAP = { "15m": timedelta(minutes=15), "1h": timedelta(hours=1), "2h": timedelta(hours=2), "4h": timedelta(hours=4), "8h": timedelta(hours=8), "12h": timedelta(hours=12), "24h": timedelta(hours=24), "48h": timedelta(hours=48) } POPULAR_LANGUAGES = [ 'text', 'bash', 'c', 'cpp', 'csharp', 'css', 'go', 'html', 'java', 'javascript', 'json', 'kotlin', 'lua', 'markdown', 'php', 'python', 'ruby', 'rust', 'sql', 'swift', 'typescript', 'xml', 'yaml' ] # --- Database Helpers --- def get_db(): if 'db' not in g: g.db = sqlite3.connect(app.config['DATABASE_PATH']) g.db.row_factory = sqlite3.Row return g.db def close_db(e=None): db = g.pop('db', None) if db: db.close() app.teardown_appcontext(close_db) def init_db(): with app.app_context(): db = get_db() c = db.cursor() c.execute(''' CREATE TABLE IF NOT EXISTS pastes ( id TEXT PRIMARY KEY, content BLOB NOT NULL, language TEXT NOT NULL, expiry_date DATETIME NOT NULL, password_hash TEXT, view_count INTEGER DEFAULT 0, max_views INTEGER ) ''') c.execute(''' CREATE TABLE IF NOT EXISTS images ( id TEXT PRIMARY KEY, upload_date DATETIME NOT NULL, expiry_date DATETIME NOT NULL, password_hash TEXT, view_count INTEGER DEFAULT 0, max_views INTEGER ) ''') c.execute('CREATE TABLE IF NOT EXISTS stats (stat_key TEXT PRIMARY KEY, stat_value INTEGER NOT NULL)') for stat in ['total_images', 'total_pastes', 'total_api_uploads']: c.execute("INSERT OR IGNORE INTO stats(stat_key,stat_value) VALUES(?,0)", (stat,)) db.commit() def update_stat(key, inc=1): db = get_db() db.execute("UPDATE stats SET stat_value = stat_value + ? WHERE stat_key = ?", (inc, key)) db.commit() # --- Cleanup Scheduler --- scheduler = BackgroundScheduler(daemon=True) def cleanup_expired_content(): with app.app_context(): now = datetime.now() conn = sqlite3.connect(app.config['DATABASE_PATH']) cur = conn.cursor() cur.execute("DELETE FROM pastes WHERE expiry_date < ?", (now,)) cur.execute("SELECT id FROM images WHERE expiry_date < ?", (now,)) for (img_id,) in cur.fetchall(): path = os.path.join(app.config['UPLOAD_FOLDER'], img_id) try: os.remove(path) except OSError as e: app.logger.error(f"Error removing expired image file {path}: {e}") cur.execute("DELETE FROM images WHERE id = ?", (img_id,)) conn.commit() conn.close() # --- Utility Functions --- def allowed_file(fn): return '.' in fn and fn.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS def get_time_left(expiry_str): try: expiry = datetime.fromisoformat(expiry_str) rem = expiry - datetime.now() if rem.total_seconds() <= 0: return "Expired" days = rem.days hrs = rem.seconds // 3600 mins = (rem.seconds % 3600) // 60 if days > 0: return f"~{days} days {hrs} hours" if hrs > 0: return f"~{hrs} hours {mins} minutes" return f"~{mins} minutes" except (ValueError, TypeError): return "N/A" def process_and_encrypt_image(stream, orig_fn, keep_exif=False): try: img = Image.open(stream) if img.mode in ('RGBA', 'P'): img = img.convert('RGB') buf = BytesIO() exif = img.info.get('exif') if keep_exif and 'exif' in img.info else None save_params = {'quality': 80} if exif: save_params['exif'] = exif img.save(buf, 'WEBP', **save_params) buf.seek(0) encrypted = fernet.encrypt(buf.read()) new_fn = f"{uuid.uuid4().hex}.webp" path = os.path.join(app.config['UPLOAD_FOLDER'], new_fn) with open(path, 'wb') as f: f.write(encrypted) return new_fn except Exception as e: app.logger.error(f"Image processing failed ({orig_fn}): {e}") return None @app.context_processor def inject_announcement(): try: with open('announcement.txt', 'r') as f: msg = f.read().strip() if msg: return dict(announcement_enabled=True, announcement_message=msg) except FileNotFoundError: pass return dict(announcement_enabled=False, announcement_message='') # --- Error Handlers --- @app.errorhandler(404) def not_found(e): flash('Content not found or has expired.', 'error') return redirect(url_for('index')) @app.errorhandler(410) def gone(e): flash('Content has expired due to exceeding its view limit.', 'error') return redirect(url_for('index')) @app.errorhandler(413) def too_large(e): if request.path.startswith('/api/'): return jsonify(error="File is too large (max 10MB)."), 413 flash('File is too large (max 10MB).', 'error') return redirect(url_for('index')) @app.errorhandler(429) def rate_limited(e): if request.path.startswith('/api/'): return jsonify(error=f"Rate limit exceeded: {e.description}"), 429 flash('Too many requests. Please wait a while.', 'error') return redirect(url_for('index')) # --- Health Check --- @app.route('/healthz') def healthz(): try: conn = sqlite3.connect(app.config['DATABASE_PATH']) conn.execute("SELECT 1").fetchone() conn.close() db_status = "ok" except Exception as e: app.logger.error(f"Health check DB error: {e}") db_status = "error" sched_status = "running" if scheduler.running and scheduler.state == 1 else "stopped" return jsonify(database=db_status, scheduler=sched_status) # --- Web UI Routes --- @app.route('/') def index(): db = get_db() rows = db.execute("SELECT stat_key, stat_value FROM stats").fetchall() stats = {r['stat_key']: r['stat_value'] for r in rows} # We want 'text' to be at the top of the list in the index page dropdown index_languages = [lang for lang in POPULAR_LANGUAGES if lang != 'text'] return render_template( 'index.html', languages=index_languages, stats=stats, allowed_extensions=list(ALLOWED_EXTENSIONS) ) @app.route('/donate') def donate_page(): return render_template('donate.html') if not app.config.get('ADMIN_URL'): raise ValueError("Configuration Error: SSP_ADMIN_URL is not set.") @app.route(app.config['ADMIN_URL'], methods=['GET', 'POST']) def admin_dashboard(): if request.method == 'POST': pw = request.form.get('password', '') if app.config['ADMIN_PASSWORD_HASH'] and check_password_hash(app.config['ADMIN_PASSWORD_HASH'], pw): session['admin_logged_in'] = True return redirect(url_for('admin_dashboard')) flash('Incorrect password.', 'error') if not session.get('admin_logged_in'): return render_template('admin.html', auth_success=False) db = get_db() now = datetime.now() imgs = db.execute("SELECT id, expiry_date, view_count, max_views FROM images ORDER BY expiry_date ASC").fetchall() past = db.execute("SELECT id, language, expiry_date, view_count, max_views FROM pastes ORDER BY expiry_date ASC").fetchall() images = [(i['id'], i['expiry_date'], get_time_left(i['expiry_date']), i['view_count'], i['max_views']) for i in imgs] pastes = [(p['id'], p['language'], p['expiry_date'], get_time_left(p['expiry_date']), p['view_count'], p['max_views']) for p in past] return render_template('admin.html', auth_success=True, images=images, pastes=pastes) @app.route('/upload/image', methods=['POST']) @limiter.limit("10 per hour") def upload_image(): if 'file' not in request.files or request.files['file'].filename == '': flash('No file selected.', 'error') return redirect(url_for('index', _anchor='image')) file = request.files['file'] if file and allowed_file(file.filename): keep_exif = bool(request.form.get('keep_exif')) new_fn = process_and_encrypt_image(file.stream, file.filename, keep_exif) if not new_fn: flash('Error processing image.', 'error') return redirect(url_for('index', _anchor='image')) now = datetime.now() expiry = now + EXPIRY_MAP.get(request.form.get('expiry', '1h'), timedelta(hours=1)) pw = request.form.get('password') or None pw_hash = generate_password_hash(pw, method='pbkdf2:sha256') if pw else None mv = request.form.get('max_views') mv = int(mv) if mv and mv.isdigit() else None db = get_db() db.execute( 'INSERT INTO images (id, upload_date, expiry_date, password_hash, max_views, view_count) VALUES (?, ?, ?, ?, ?, ?)', (new_fn, now, expiry, pw_hash, mv, -1) ) db.commit() update_stat('total_images') flash('Image uploaded successfully! This is your shareable link.', 'success') return redirect(url_for('view_image', filename=new_fn)) flash('Invalid file type.', 'error') return redirect(url_for('index', _anchor='image')) @app.route('/upload/paste', methods=['POST']) @limiter.limit("20 per hour") def upload_paste(): content = request.form.get('content', '').strip() if not content: flash('Paste content cannot be empty.', 'error') return redirect(url_for('index', _anchor='paste')) now = datetime.now() expiry = now + EXPIRY_MAP.get(request.form.get('expiry', '1h'), timedelta(hours=1)) pw = request.form.get('password') or None pw_hash = generate_password_hash(pw, method='pbkdf2:sha256') if pw else None mv = request.form.get('max_views') mv = int(mv) if mv and mv.isdigit() else None paste_id = uuid.uuid4().hex encrypted = fernet.encrypt(content.encode('utf-8')) db = get_db() db.execute( 'INSERT INTO pastes (id, content, language, expiry_date, password_hash, max_views, view_count) VALUES (?, ?, ?, ?, ?, ?, ?)', (paste_id, encrypted, request.form.get('language', 'text'), expiry, pw_hash, mv, -1) ) db.commit() update_stat('total_pastes') flash('Paste created successfully! This is your shareable link.', 'success') return redirect(url_for('view_paste', paste_id=paste_id)) @app.route('/image/', methods=['GET', 'POST']) def view_image(filename): db = get_db() row = db.execute("SELECT * FROM images WHERE id = ?", (filename,)).fetchone() if not row or datetime.now() > datetime.fromisoformat(row['expiry_date']): if row: # If row exists but is expired, delete it. db.execute("DELETE FROM images WHERE id = ?", (filename,)) db.commit() abort(404) pw_hash = row['password_hash'] if pw_hash and not session.get(f'unlocked_image_{filename}'): if request.method == 'POST': if check_password_hash(pw_hash, request.form.get('password', '')): session[f'unlocked_image_{filename}'] = True return redirect(url_for('view_image', filename=filename)) flash('Incorrect password.', 'error') return render_template('view_image.html', password_required=True, filename=filename) return render_template('view_image.html', password_required=False, filename=filename, time_left=get_time_left(row['expiry_date']) ) @app.route('/paste/', methods=['GET', 'POST']) def view_paste(paste_id): db = get_db() row = db.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() if not row or datetime.now() > datetime.fromisoformat(row['expiry_date']): if row: db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() abort(404) pw_hash = row['password_hash'] if pw_hash and not session.get(f'unlocked_paste_{paste_id}'): if request.method == 'POST': if check_password_hash(pw_hash, request.form.get('password', '')): session[f'unlocked_paste_{paste_id}'] = True return redirect(url_for('view_paste', paste_id=paste_id)) flash('Incorrect password.', 'error') return render_template('view_paste.html', password_required=True, paste_id=paste_id) if row['max_views'] is not None and row['view_count'] >= row['max_views']: db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() abort(410) # Only increment view count on the initial, non-overridden view if 'lang' not in request.args: db.execute("UPDATE pastes SET view_count = view_count + 1 WHERE id = ?", (paste_id,)) db.commit() content = fernet.decrypt(row['content']).decode('utf-8') # Get the language, allowing for a user override via URL parameter default_language = row['language'] selected_language = request.args.get('lang', default_language) try: lexer = get_lexer_by_name(selected_language) except: lexer = get_lexer_by_name('text') fmt = HtmlFormatter(style='monokai', cssclass='syntax', linenos='table') highlighted = highlight(content, lexer, fmt) return render_template('view_paste.html', password_required=False, paste_id=paste_id, highlighted_content=highlighted, time_left=get_time_left(row['expiry_date']), languages=POPULAR_LANGUAGES, selected_language=selected_language ) @app.route('/paste//raw') def paste_raw(paste_id): db = get_db() row = db.execute("SELECT * FROM pastes WHERE id = ?", (paste_id,)).fetchone() if not row or datetime.now() > datetime.fromisoformat(row['expiry_date']): abort(404) if row['password_hash'] and not session.get(f'unlocked_paste_{paste_id}'): abort(403) if row['max_views'] is not None and row['view_count'] >= row['max_views']: db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() abort(410) db.execute("UPDATE pastes SET view_count = view_count + 1 WHERE id = ?", (paste_id,)) db.commit() text = fernet.decrypt(row['content']).decode('utf-8') return Response(text, mimetype='text/plain') @app.route('/uploads/') def get_upload(filename): safe_fn = secure_filename(filename) path = os.path.join(app.config['UPLOAD_FOLDER'], safe_fn) db = get_db() row = db.execute("SELECT * FROM images WHERE id = ?", (safe_fn,)).fetchone() if not row or not os.path.exists(path) or datetime.now() > datetime.fromisoformat(row['expiry_date']): if row: db.execute("DELETE FROM images WHERE id = ?", (safe_fn,)) db.commit() if os.path.exists(path): os.remove(path) abort(404) if row['password_hash'] and not session.get(f'unlocked_image_{safe_fn}'): abort(403) if row['max_views'] is not None and row['view_count'] >= row['max_views']: db.execute("DELETE FROM images WHERE id = ?", (safe_fn,)) db.commit() os.remove(path) abort(410) db.execute("UPDATE images SET view_count = view_count + 1 WHERE id = ?", (safe_fn,)) db.commit() try: with open(path, 'rb') as f: encrypted = f.read() data = fernet.decrypt(encrypted) return send_file(BytesIO(data), mimetype='image/webp') except Exception as e: app.logger.error(f"Error serving image {safe_fn}: {e}") abort(500) @app.route('/admin/delete/image/', methods=['POST']) def delete_image(filename): if not session.get('admin_logged_in'): abort(401) safe = secure_filename(filename) path = os.path.join(app.config['UPLOAD_FOLDER'], safe) try: if os.path.exists(path): os.remove(path) db = get_db() db.execute("DELETE FROM images WHERE id = ?", (safe,)) db.commit() flash(f'Image "{safe}" has been deleted.', 'success') except Exception as e: flash(f'Error deleting image file: {e}', 'error') return redirect(url_for('admin_dashboard')) @app.route('/admin/delete/paste/', methods=['POST']) def delete_paste(paste_id): if not session.get('admin_logged_in'): abort(401) try: db = get_db() db.execute("DELETE FROM pastes WHERE id = ?", (paste_id,)) db.commit() flash(f'Paste "{paste_id}" has been deleted.', 'success') except Exception as e: flash(f'Error deleting paste: {e}', 'error') return redirect(url_for('admin_dashboard')) # --- API Routes --- @app.route('/api/upload/image', methods=['POST']) @limiter.limit("50 per hour") def api_upload_image(): if 'file' not in request.files or request.files['file'].filename == '': return jsonify(error="No file selected"), 400 file = request.files['file'] if file and allowed_file(file.filename): new_fn = process_and_encrypt_image(file.stream, file.filename, bool(request.form.get('keep_exif'))) if not new_fn: return jsonify(error="Failed to process image"), 500 now = datetime.now() expiry = now + EXPIRY_MAP.get(request.form.get('expiry', '1h'), timedelta(hours=1)) pw = request.form.get('password') pw_hash = generate_password_hash(pw, method='pbkdf2:sha256') if pw else None mv = request.form.get('max_views') mv = int(mv) if mv and mv.isdigit() else None db = get_db() db.execute( 'INSERT INTO images (id, upload_date, expiry_date, password_hash, max_views, view_count) VALUES (?, ?, ?, ?, ?, ?)', (new_fn, now, expiry, pw_hash, mv, -1) ) db.commit() update_stat('total_api_uploads') return jsonify(success=True, url=url_for('get_upload', filename=new_fn, _external=True)), 200 return jsonify(error="Invalid file type"), 400 @app.route('/api/upload/paste', methods=['POST']) @limiter.limit("100 per hour") def api_upload_paste(): if not request.is_json: return jsonify(error="Request must be JSON"), 400 data = request.get_json() content = data.get('content', '').strip() if not content: return jsonify(error="Paste content is missing"), 400 now = datetime.now() expiry = now + EXPIRY_MAP.get(data.get('expiry', '1h'), timedelta(hours=1)) pw = data.get('password') pw_hash = generate_password_hash(pw, method='pbkdf2:sha256') if pw else None mv = data.get('max_views') mv = int(mv) if mv and str(mv).isdigit() else None paste_id = uuid.uuid4().hex encrypted = fernet.encrypt(content.encode('utf-8')) db = get_db() db.execute( 'INSERT INTO pastes (id, content, language, expiry_date, password_hash, max_views, view_count) VALUES (?, ?, ?, ?, ?, ?, ?)', (paste_id, encrypted, data.get('language', 'text'), expiry, pw_hash, mv, -1) ) db.commit() update_stat('total_api_uploads') return jsonify(success=True, url=url_for('view_paste', paste_id=paste_id, _external=True)), 200 if __name__ == '__main__': required_vars = ['SSP_SECRET_KEY', 'SSP_ADMIN_PASSWORD_HASH', 'SSP_ADMIN_URL', 'SSP_ENCRYPTION_KEY'] missing_vars = [var for var in required_vars if not os.getenv(var)] if missing_vars: print(f"FATAL ERROR: Required environment variables are not set: {', '.join(missing_vars)}") exit(1) os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) with app.app_context(): init_db() scheduler.add_job(cleanup_expired_content, 'interval', minutes=15) scheduler.start() print(f"Starting Flask app with debug mode: {app.config['FLASK_DEBUG']}") # Run the app. Debug mode is controlled by the SSP_FLASK_DEBUG environment variable. # For production, it's recommended to use a proper WSGI server like Gunicorn or uWSGI. app.run(debug=app.config['FLASK_DEBUG'], use_reloader=False)