Compare commits

..

No commits in common. "faaf75d156b4edbc0c5902581e5721d7e77a045d" and "0f47b8abd61527665d44e8f4ccbfc63933d27ffa" have entirely different histories.

3 changed files with 6 additions and 77 deletions

2
.gitignore vendored
View File

@ -86,7 +86,7 @@ ipython_config.py
# pyenv # pyenv
# For a library or package, you might want to ignore these files since the code is # For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in: # intended to run in multiple environments; otherwise, check them in:
.python-version # .python-version
# pipenv # pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. # According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.

View File

@ -7,7 +7,6 @@ from flask import Flask, request, jsonify
from werkzeug.utils import secure_filename from werkzeug.utils import secure_filename
from dotenv import load_dotenv from dotenv import load_dotenv
import requests import requests
from urllib.parse import unquote
# Load environment variables # Load environment variables
load_dotenv() load_dotenv()
@ -21,14 +20,12 @@ CONTENT_PATH = os.environ.get("CONTENT_PATH", "content")
MEDIA_DIR = os.environ.get("MEDIA_DIR", "static/images") MEDIA_DIR = os.environ.get("MEDIA_DIR", "static/images")
BRANCH = os.environ.get("BRANCH", "main") BRANCH = os.environ.get("BRANCH", "main")
TOKEN_ENDPOINT = os.environ.get("TOKEN_ENDPOINT", "https://tokens.indieauth.com/token") TOKEN_ENDPOINT = os.environ.get("TOKEN_ENDPOINT", "https://tokens.indieauth.com/token")
DOMAIN = os.environ.get("DOMAIN", "https://thesatelliteoflove.com/")
REQUIRED_SCOPES = {"create", "update", "media"}
# Initialize Flask app # Initialize Flask app
app = Flask(__name__) app = Flask(__name__)
# Logging configuration # Logging configuration
logging.basicConfig(level=logging.DEBUG) # Set to INFO for production logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Helper: Call Gitea API # Helper: Call Gitea API
@ -36,7 +33,6 @@ def gitea_api_request(method, endpoint, data=None):
url = f"{GITEA_API_URL}{endpoint}" url = f"{GITEA_API_URL}{endpoint}"
headers = {"Authorization": f"token {GITEA_TOKEN}"} headers = {"Authorization": f"token {GITEA_TOKEN}"}
try: try:
logger.debug(f"Calling Gitea API: {method} {url}, Data: {data}")
response = requests.request(method, url, headers=headers, json=data) response = requests.request(method, url, headers=headers, json=data)
response.raise_for_status() response.raise_for_status()
return response.json() return response.json()
@ -47,41 +43,15 @@ def gitea_api_request(method, endpoint, data=None):
# Helper: Validate IndieAuth token # Helper: Validate IndieAuth token
def validate_token(token): def validate_token(token):
try: try:
global DOMAIN, REQUIRED_SCOPES # Ensure global variables are accessible response = requests.post(TOKEN_ENDPOINT, data={"token": token})
headers = {"Authorization": f"Bearer {token}"} response.raise_for_status()
response = requests.get("https://tokens.indieauth.com/token", headers=headers) return response.json() # Should contain "me" (authenticated user's URL)
if response.status_code != 200: except requests.exceptions.RequestException as e:
logger.error(f"Token validation failed with status {response.status_code}")
return None
# Parse the x-www-form-urlencoded response
token_data = dict(item.split("=") for item in response.text.split("&"))
logger.debug(f"Raw token data from IndieAuth: {token_data}")
# Decode URL-encoded values in the response
for key, value in token_data.items():
token_data[key] = unquote(value)
logger.debug(f"Decoded token data: {token_data}")
# Validate 'me' field matches DOMAIN
if token_data.get("me") != DOMAIN:
raise ValueError(f"Token 'me' claim ({token_data.get('me')}) does not match the expected domain ({DOMAIN})")
# Validate required scopes
scopes = token_data.get("scope", "").split("+") # Split by '+' instead of space
if not REQUIRED_SCOPES.issubset(scopes):
raise ValueError(f"Token does not include the required scopes: {REQUIRED_SCOPES}. Found scopes: {scopes}")
return token_data
except Exception as e:
logger.error(f"Token validation failed: {e}") logger.error(f"Token validation failed: {e}")
return None return None
# Upload content to Gitea # Upload content to Gitea
def upload_to_gitea(filepath, content, commit_message): def upload_to_gitea(filepath, content, commit_message):
logger.debug(f"Uploading to Gitea: {filepath}, Commit: {commit_message}")
encoded_content = base64.b64encode(content.encode() if isinstance(content, str) else content).decode() encoded_content = base64.b64encode(content.encode() if isinstance(content, str) else content).decode()
endpoint = f"/repos/{REPO_OWNER}/{REPO_NAME}/contents/{filepath}" endpoint = f"/repos/{REPO_OWNER}/{REPO_NAME}/contents/{filepath}"
data = { data = {
@ -92,22 +62,17 @@ def upload_to_gitea(filepath, content, commit_message):
return gitea_api_request("POST", endpoint, data) return gitea_api_request("POST", endpoint, data)
# Micropub endpoint # Micropub endpoint
@app.route("/micropub/", methods=["POST", "GET"])
@app.route("/micropub", methods=["POST", "GET"]) @app.route("/micropub", methods=["POST", "GET"])
def micropub(): def micropub():
logger.debug(f"Incoming request: {request.method} {request.url}")
token = request.headers.get("Authorization", "").replace("Bearer ", "") token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token: if not token:
logger.warning("Missing authorization token")
return jsonify({"error": "Missing authorization token"}), 401 return jsonify({"error": "Missing authorization token"}), 401
user = validate_token(token) user = validate_token(token)
if not user: if not user:
logger.warning("Invalid token")
return jsonify({"error": "Invalid token"}), 403 return jsonify({"error": "Invalid token"}), 403
if request.method == "GET": if request.method == "GET":
logger.debug("Micropub discovery request")
return jsonify({ return jsonify({
"media-endpoint": "/micropub/media", "media-endpoint": "/micropub/media",
"configurations": {}, "configurations": {},
@ -115,11 +80,9 @@ def micropub():
}) })
data = request.form data = request.form
logger.debug(f"Micropub POST request data: {data}")
if data.get("h") == "entry": if data.get("h") == "entry":
return create_post(data, user) return create_post(data, user)
logger.warning("Unsupported Micropub request")
return jsonify({"error": "Unsupported Micropub request"}), 400 return jsonify({"error": "Unsupported Micropub request"}), 400
# Create a new post # Create a new post
@ -142,7 +105,6 @@ author: "{user.get('me')}"
filepath = f"{CONTENT_PATH}/{slug}.md" filepath = f"{CONTENT_PATH}/{slug}.md"
try: try:
logger.debug(f"Creating post at {filepath} with content:\n{md_content}")
response = upload_to_gitea(filepath, md_content, f"Create post: {title}") response = upload_to_gitea(filepath, md_content, f"Create post: {title}")
logger.info(f"Post created: {response['content']['html_url']}") logger.info(f"Post created: {response['content']['html_url']}")
return jsonify({"success": True, "location": response["content"]["html_url"]}), 201 return jsonify({"success": True, "location": response["content"]["html_url"]}), 201
@ -153,35 +115,28 @@ author: "{user.get('me')}"
# Media upload endpoint # Media upload endpoint
@app.route("/micropub/media", methods=["POST"]) @app.route("/micropub/media", methods=["POST"])
def media_upload(): def media_upload():
logger.debug(f"Incoming media upload request: {request.url}")
token = request.headers.get("Authorization", "").replace("Bearer ", "") token = request.headers.get("Authorization", "").replace("Bearer ", "")
if not token: if not token:
logger.warning("Missing authorization token")
return jsonify({"error": "Missing authorization token"}), 401 return jsonify({"error": "Missing authorization token"}), 401
user = validate_token(token) user = validate_token(token)
if not user: if not user:
logger.warning("Invalid token")
return jsonify({"error": "Invalid token"}), 403 return jsonify({"error": "Invalid token"}), 403
if "file" not in request.files: if "file" not in request.files:
logger.warning("No file provided")
return jsonify({"error": "No file provided"}), 400 return jsonify({"error": "No file provided"}), 400
file = request.files["file"] file = request.files["file"]
if file.filename == "": if file.filename == "":
logger.warning("Empty filename")
return jsonify({"error": "Empty filename"}), 400 return jsonify({"error": "Empty filename"}), 400
filename = secure_filename(file.filename) filename = secure_filename(file.filename)
mimetype = mimetypes.guess_type(filename)[0] mimetype = mimetypes.guess_type(filename)[0]
if mimetype not in ["image/png", "image/jpeg", "image/gif"]: if mimetype not in ["image/png", "image/jpeg", "image/gif"]:
logger.warning(f"Invalid file type: {mimetype}")
return jsonify({"error": "Invalid file type"}), 400 return jsonify({"error": "Invalid file type"}), 400
try: try:
logger.debug(f"Uploading media file: {filename}")
response = upload_to_gitea( response = upload_to_gitea(
f"{MEDIA_DIR}/{filename}", file.read(), f"Upload media: {filename}" f"{MEDIA_DIR}/{filename}", file.read(), f"Upload media: {filename}"
) )
@ -192,16 +147,6 @@ def media_upload():
logger.error(f"Failed to upload media: {e}") logger.error(f"Failed to upload media: {e}")
return jsonify({"error": "Failed to upload media"}), 500 return jsonify({"error": "Failed to upload media"}), 500
# Health check endpoint
@app.route("/micropub/health", methods=["GET"])
def health_check():
"""
Simple health check endpoint to verify that the server is running.
Returns a 200 OK response with a JSON message.
"""
logger.debug("Health check requested")
return jsonify({"status": "ok", "message": "Micropub server is running"}), 200
# Run the app # Run the app
if __name__ == "__main__": if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000) app.run(host="0.0.0.0", port=5000)

View File

@ -1,16 +0,0 @@
import jwt
import sys
if len(sys.argv) != 2:
print("Usage: python decode_jwt.py <JWT_TOKEN>")
sys.exit(1)
token = sys.argv[1]
try:
# Decode the token without verifying the signature
decoded = jwt.decode(token, options={"verify_signature": False})
print("Decoded JWT:")
print(decoded)
except jwt.DecodeError as e:
print(f"Failed to decode JWT: {e}")