update auth
This commit is contained in:
		| @@ -7,6 +7,7 @@ from flask import Flask, request, jsonify | |||||||
| from werkzeug.utils import secure_filename | from werkzeug.utils import secure_filename | ||||||
| from dotenv import load_dotenv | from dotenv import load_dotenv | ||||||
| import requests | import requests | ||||||
|  | from urllib.parse import unquote | ||||||
|  |  | ||||||
| # Load environment variables | # Load environment variables | ||||||
| load_dotenv() | load_dotenv() | ||||||
| @@ -20,12 +21,14 @@ CONTENT_PATH = os.environ.get("CONTENT_PATH", "content") | |||||||
| MEDIA_DIR = os.environ.get("MEDIA_DIR", "static/images") | MEDIA_DIR = os.environ.get("MEDIA_DIR", "static/images") | ||||||
| BRANCH = os.environ.get("BRANCH", "main") | BRANCH = os.environ.get("BRANCH", "main") | ||||||
| TOKEN_ENDPOINT = os.environ.get("TOKEN_ENDPOINT", "https://tokens.indieauth.com/token") | TOKEN_ENDPOINT = os.environ.get("TOKEN_ENDPOINT", "https://tokens.indieauth.com/token") | ||||||
|  | DOMAIN = os.environ.get("DOMAIN", "https://thesatelliteoflove.com/") | ||||||
|  | REQUIRED_SCOPES = {"create", "update", "media"} | ||||||
|  |  | ||||||
| # Initialize Flask app | # Initialize Flask app | ||||||
| app = Flask(__name__) | app = Flask(__name__) | ||||||
|  |  | ||||||
| # Logging configuration | # Logging configuration | ||||||
| logging.basicConfig(level=logging.INFO) | logging.basicConfig(level=logging.DEBUG)  # Set to INFO for production | ||||||
| logger = logging.getLogger(__name__) | logger = logging.getLogger(__name__) | ||||||
|  |  | ||||||
| # Helper: Call Gitea API | # Helper: Call Gitea API | ||||||
| @@ -33,6 +36,7 @@ def gitea_api_request(method, endpoint, data=None): | |||||||
|     url = f"{GITEA_API_URL}{endpoint}" |     url = f"{GITEA_API_URL}{endpoint}" | ||||||
|     headers = {"Authorization": f"token {GITEA_TOKEN}"} |     headers = {"Authorization": f"token {GITEA_TOKEN}"} | ||||||
|     try: |     try: | ||||||
|  |         logger.debug(f"Calling Gitea API: {method} {url}, Data: {data}") | ||||||
|         response = requests.request(method, url, headers=headers, json=data) |         response = requests.request(method, url, headers=headers, json=data) | ||||||
|         response.raise_for_status() |         response.raise_for_status() | ||||||
|         return response.json() |         return response.json() | ||||||
| @@ -43,15 +47,41 @@ def gitea_api_request(method, endpoint, data=None): | |||||||
| # Helper: Validate IndieAuth token | # Helper: Validate IndieAuth token | ||||||
| def validate_token(token): | def validate_token(token): | ||||||
|     try: |     try: | ||||||
|         response = requests.post(TOKEN_ENDPOINT, data={"token": token}) |         global DOMAIN, REQUIRED_SCOPES  # Ensure global variables are accessible | ||||||
|         response.raise_for_status() |         headers = {"Authorization": f"Bearer {token}"} | ||||||
|         return response.json()  # Should contain "me" (authenticated user's URL) |         response = requests.get("https://tokens.indieauth.com/token", headers=headers) | ||||||
|     except requests.exceptions.RequestException as e: |         if response.status_code != 200: | ||||||
|  |             logger.error(f"Token validation failed with status {response.status_code}") | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         # Parse the x-www-form-urlencoded response | ||||||
|  |         token_data = dict(item.split("=") for item in response.text.split("&")) | ||||||
|  |         logger.debug(f"Raw token data from IndieAuth: {token_data}") | ||||||
|  |  | ||||||
|  |         # Decode URL-encoded values in the response | ||||||
|  |         for key, value in token_data.items(): | ||||||
|  |             token_data[key] = unquote(value) | ||||||
|  |         logger.debug(f"Decoded token data: {token_data}") | ||||||
|  |  | ||||||
|  |         # Validate 'me' field matches DOMAIN | ||||||
|  |         if token_data.get("me") != DOMAIN: | ||||||
|  |             raise ValueError(f"Token 'me' claim ({token_data.get('me')}) does not match the expected domain ({DOMAIN})") | ||||||
|  |  | ||||||
|  |         # Validate required scopes | ||||||
|  |         scopes = token_data.get("scope", "").split("+")  # Split by '+' instead of space | ||||||
|  |         if not REQUIRED_SCOPES.issubset(scopes): | ||||||
|  |             raise ValueError(f"Token does not include the required scopes: {REQUIRED_SCOPES}. Found scopes: {scopes}") | ||||||
|  |  | ||||||
|  |         return token_data | ||||||
|  |     except Exception as e: | ||||||
|         logger.error(f"Token validation failed: {e}") |         logger.error(f"Token validation failed: {e}") | ||||||
|         return None |         return None | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| # Upload content to Gitea | # Upload content to Gitea | ||||||
| def upload_to_gitea(filepath, content, commit_message): | def upload_to_gitea(filepath, content, commit_message): | ||||||
|  |     logger.debug(f"Uploading to Gitea: {filepath}, Commit: {commit_message}") | ||||||
|     encoded_content = base64.b64encode(content.encode() if isinstance(content, str) else content).decode() |     encoded_content = base64.b64encode(content.encode() if isinstance(content, str) else content).decode() | ||||||
|     endpoint = f"/repos/{REPO_OWNER}/{REPO_NAME}/contents/{filepath}" |     endpoint = f"/repos/{REPO_OWNER}/{REPO_NAME}/contents/{filepath}" | ||||||
|     data = { |     data = { | ||||||
| @@ -62,17 +92,22 @@ def upload_to_gitea(filepath, content, commit_message): | |||||||
|     return gitea_api_request("POST", endpoint, data) |     return gitea_api_request("POST", endpoint, data) | ||||||
|  |  | ||||||
| # Micropub endpoint | # Micropub endpoint | ||||||
|  | @app.route("/micropub/", methods=["POST", "GET"]) | ||||||
| @app.route("/micropub", methods=["POST", "GET"]) | @app.route("/micropub", methods=["POST", "GET"]) | ||||||
| def micropub(): | def micropub(): | ||||||
|  |     logger.debug(f"Incoming request: {request.method} {request.url}") | ||||||
|     token = request.headers.get("Authorization", "").replace("Bearer ", "") |     token = request.headers.get("Authorization", "").replace("Bearer ", "") | ||||||
|     if not token: |     if not token: | ||||||
|  |         logger.warning("Missing authorization token") | ||||||
|         return jsonify({"error": "Missing authorization token"}), 401 |         return jsonify({"error": "Missing authorization token"}), 401 | ||||||
|  |  | ||||||
|     user = validate_token(token) |     user = validate_token(token) | ||||||
|     if not user: |     if not user: | ||||||
|  |         logger.warning("Invalid token") | ||||||
|         return jsonify({"error": "Invalid token"}), 403 |         return jsonify({"error": "Invalid token"}), 403 | ||||||
|  |  | ||||||
|     if request.method == "GET": |     if request.method == "GET": | ||||||
|  |         logger.debug("Micropub discovery request") | ||||||
|         return jsonify({ |         return jsonify({ | ||||||
|             "media-endpoint": "/micropub/media", |             "media-endpoint": "/micropub/media", | ||||||
|             "configurations": {}, |             "configurations": {}, | ||||||
| @@ -80,9 +115,11 @@ def micropub(): | |||||||
|         }) |         }) | ||||||
|  |  | ||||||
|     data = request.form |     data = request.form | ||||||
|  |     logger.debug(f"Micropub POST request data: {data}") | ||||||
|     if data.get("h") == "entry": |     if data.get("h") == "entry": | ||||||
|         return create_post(data, user) |         return create_post(data, user) | ||||||
|  |  | ||||||
|  |     logger.warning("Unsupported Micropub request") | ||||||
|     return jsonify({"error": "Unsupported Micropub request"}), 400 |     return jsonify({"error": "Unsupported Micropub request"}), 400 | ||||||
|  |  | ||||||
| # Create a new post | # Create a new post | ||||||
| @@ -105,6 +142,7 @@ author: "{user.get('me')}" | |||||||
|  |  | ||||||
|     filepath = f"{CONTENT_PATH}/{slug}.md" |     filepath = f"{CONTENT_PATH}/{slug}.md" | ||||||
|     try: |     try: | ||||||
|  |         logger.debug(f"Creating post at {filepath} with content:\n{md_content}") | ||||||
|         response = upload_to_gitea(filepath, md_content, f"Create post: {title}") |         response = upload_to_gitea(filepath, md_content, f"Create post: {title}") | ||||||
|         logger.info(f"Post created: {response['content']['html_url']}") |         logger.info(f"Post created: {response['content']['html_url']}") | ||||||
|         return jsonify({"success": True, "location": response["content"]["html_url"]}), 201 |         return jsonify({"success": True, "location": response["content"]["html_url"]}), 201 | ||||||
| @@ -115,28 +153,35 @@ author: "{user.get('me')}" | |||||||
| # Media upload endpoint | # Media upload endpoint | ||||||
| @app.route("/micropub/media", methods=["POST"]) | @app.route("/micropub/media", methods=["POST"]) | ||||||
| def media_upload(): | def media_upload(): | ||||||
|  |     logger.debug(f"Incoming media upload request: {request.url}") | ||||||
|     token = request.headers.get("Authorization", "").replace("Bearer ", "") |     token = request.headers.get("Authorization", "").replace("Bearer ", "") | ||||||
|     if not token: |     if not token: | ||||||
|  |         logger.warning("Missing authorization token") | ||||||
|         return jsonify({"error": "Missing authorization token"}), 401 |         return jsonify({"error": "Missing authorization token"}), 401 | ||||||
|  |  | ||||||
|     user = validate_token(token) |     user = validate_token(token) | ||||||
|     if not user: |     if not user: | ||||||
|  |         logger.warning("Invalid token") | ||||||
|         return jsonify({"error": "Invalid token"}), 403 |         return jsonify({"error": "Invalid token"}), 403 | ||||||
|  |  | ||||||
|     if "file" not in request.files: |     if "file" not in request.files: | ||||||
|  |         logger.warning("No file provided") | ||||||
|         return jsonify({"error": "No file provided"}), 400 |         return jsonify({"error": "No file provided"}), 400 | ||||||
|  |  | ||||||
|     file = request.files["file"] |     file = request.files["file"] | ||||||
|     if file.filename == "": |     if file.filename == "": | ||||||
|  |         logger.warning("Empty filename") | ||||||
|         return jsonify({"error": "Empty filename"}), 400 |         return jsonify({"error": "Empty filename"}), 400 | ||||||
|  |  | ||||||
|     filename = secure_filename(file.filename) |     filename = secure_filename(file.filename) | ||||||
|     mimetype = mimetypes.guess_type(filename)[0] |     mimetype = mimetypes.guess_type(filename)[0] | ||||||
|  |  | ||||||
|     if mimetype not in ["image/png", "image/jpeg", "image/gif"]: |     if mimetype not in ["image/png", "image/jpeg", "image/gif"]: | ||||||
|  |         logger.warning(f"Invalid file type: {mimetype}") | ||||||
|         return jsonify({"error": "Invalid file type"}), 400 |         return jsonify({"error": "Invalid file type"}), 400 | ||||||
|  |  | ||||||
|     try: |     try: | ||||||
|  |         logger.debug(f"Uploading media file: {filename}") | ||||||
|         response = upload_to_gitea( |         response = upload_to_gitea( | ||||||
|             f"{MEDIA_DIR}/{filename}", file.read(), f"Upload media: {filename}" |             f"{MEDIA_DIR}/{filename}", file.read(), f"Upload media: {filename}" | ||||||
|         ) |         ) | ||||||
| @@ -147,6 +192,16 @@ def media_upload(): | |||||||
|         logger.error(f"Failed to upload media: {e}") |         logger.error(f"Failed to upload media: {e}") | ||||||
|         return jsonify({"error": "Failed to upload media"}), 500 |         return jsonify({"error": "Failed to upload media"}), 500 | ||||||
|  |  | ||||||
|  | # Health check endpoint | ||||||
|  | @app.route("/micropub/health", methods=["GET"]) | ||||||
|  | def health_check(): | ||||||
|  |     """ | ||||||
|  |     Simple health check endpoint to verify that the server is running. | ||||||
|  |     Returns a 200 OK response with a JSON message. | ||||||
|  |     """ | ||||||
|  |     logger.debug("Health check requested") | ||||||
|  |     return jsonify({"status": "ok", "message": "Micropub server is running"}), 200 | ||||||
|  |  | ||||||
| # Run the app | # Run the app | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     app.run(host="0.0.0.0", port=5000) |     app.run(host="0.0.0.0", port=5000) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user