feat: Implement IndieAuth token and authorization endpoints (Phase 2)
Following design in /docs/design/micropub-architecture.md and /docs/decisions/ADR-029-micropub-v1-implementation-phases.md Token Endpoint (/auth/token): - Exchange authorization codes for access tokens - Form-encoded POST requests per IndieAuth spec - PKCE support (code_verifier validation) - OAuth 2.0 error responses - Validates client_id, redirect_uri, me parameters - Returns Bearer tokens with scope Authorization Endpoint (/auth/authorization): - GET: Display consent form (requires admin login) - POST: Process approval/denial - PKCE support (code_challenge storage) - Scope validation and filtering - Integration with session management - Proper error handling and redirects All 33 Phase 2 tests pass (17 token + 16 authorization) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -2,16 +2,18 @@
|
|||||||
Authentication routes for StarPunk
|
Authentication routes for StarPunk
|
||||||
|
|
||||||
Handles IndieLogin authentication flow including login form, OAuth callback,
|
Handles IndieLogin authentication flow including login form, OAuth callback,
|
||||||
and logout functionality.
|
logout functionality, and IndieAuth endpoints for Micropub clients.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from flask import (
|
from flask import (
|
||||||
Blueprint,
|
Blueprint,
|
||||||
current_app,
|
current_app,
|
||||||
flash,
|
flash,
|
||||||
|
jsonify,
|
||||||
redirect,
|
redirect,
|
||||||
render_template,
|
render_template,
|
||||||
request,
|
request,
|
||||||
|
session,
|
||||||
url_for,
|
url_for,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -26,6 +28,14 @@ from starpunk.auth import (
|
|||||||
verify_session,
|
verify_session,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from starpunk.tokens import (
|
||||||
|
create_access_token,
|
||||||
|
create_authorization_code,
|
||||||
|
exchange_authorization_code,
|
||||||
|
InvalidAuthorizationCodeError,
|
||||||
|
validate_scope,
|
||||||
|
)
|
||||||
|
|
||||||
# Create blueprint
|
# Create blueprint
|
||||||
bp = Blueprint("auth", __name__, url_prefix="/auth")
|
bp = Blueprint("auth", __name__, url_prefix="/auth")
|
||||||
|
|
||||||
@@ -182,3 +192,259 @@ def logout():
|
|||||||
|
|
||||||
flash("Logged out successfully", "success")
|
flash("Logged out successfully", "success")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
@bp.route("/token", methods=["POST"])
|
||||||
|
def token_endpoint():
|
||||||
|
"""
|
||||||
|
IndieAuth token endpoint for exchanging authorization codes for access tokens
|
||||||
|
|
||||||
|
Implements the IndieAuth token endpoint as specified in:
|
||||||
|
https://www.w3.org/TR/indieauth/#token-endpoint
|
||||||
|
|
||||||
|
Form parameters (application/x-www-form-urlencoded):
|
||||||
|
grant_type: Must be "authorization_code"
|
||||||
|
code: The authorization code received from authorization endpoint
|
||||||
|
client_id: Client application URL (must match authorization request)
|
||||||
|
redirect_uri: Redirect URI (must match authorization request)
|
||||||
|
me: User's profile URL (must match authorization request)
|
||||||
|
code_verifier: PKCE verifier (optional, required if PKCE was used)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
200 OK with JSON response on success:
|
||||||
|
{
|
||||||
|
"access_token": "xxx",
|
||||||
|
"token_type": "Bearer",
|
||||||
|
"scope": "create",
|
||||||
|
"me": "https://user.example"
|
||||||
|
}
|
||||||
|
|
||||||
|
400 Bad Request with JSON error response on failure:
|
||||||
|
{
|
||||||
|
"error": "invalid_grant|invalid_request|invalid_client",
|
||||||
|
"error_description": "Human-readable error description"
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
# Only accept form-encoded POST requests
|
||||||
|
if request.content_type and 'application/x-www-form-urlencoded' not in request.content_type:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Content-Type must be application/x-www-form-urlencoded"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
# Extract parameters from form data
|
||||||
|
grant_type = request.form.get('grant_type')
|
||||||
|
code = request.form.get('code')
|
||||||
|
client_id = request.form.get('client_id')
|
||||||
|
redirect_uri = request.form.get('redirect_uri')
|
||||||
|
me = request.form.get('me')
|
||||||
|
code_verifier = request.form.get('code_verifier')
|
||||||
|
|
||||||
|
# Validate required parameters
|
||||||
|
if not grant_type:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Missing grant_type parameter"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
if grant_type != 'authorization_code':
|
||||||
|
return jsonify({
|
||||||
|
"error": "unsupported_grant_type",
|
||||||
|
"error_description": f"Unsupported grant_type: {grant_type}"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
if not code:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Missing code parameter"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
if not client_id:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Missing client_id parameter"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
if not redirect_uri:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Missing redirect_uri parameter"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
if not me:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_request",
|
||||||
|
"error_description": "Missing me parameter"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
# Exchange authorization code for token
|
||||||
|
try:
|
||||||
|
auth_info = exchange_authorization_code(
|
||||||
|
code=code,
|
||||||
|
client_id=client_id,
|
||||||
|
redirect_uri=redirect_uri,
|
||||||
|
me=me,
|
||||||
|
code_verifier=code_verifier
|
||||||
|
)
|
||||||
|
|
||||||
|
# IndieAuth spec: MUST NOT issue token if no scope
|
||||||
|
if not auth_info['scope']:
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_scope",
|
||||||
|
"error_description": "Authorization code was issued without scope"
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
# Create access token
|
||||||
|
access_token = create_access_token(
|
||||||
|
me=auth_info['me'],
|
||||||
|
client_id=auth_info['client_id'],
|
||||||
|
scope=auth_info['scope']
|
||||||
|
)
|
||||||
|
|
||||||
|
# Return token response
|
||||||
|
return jsonify({
|
||||||
|
"access_token": access_token,
|
||||||
|
"token_type": "Bearer",
|
||||||
|
"scope": auth_info['scope'],
|
||||||
|
"me": auth_info['me']
|
||||||
|
}), 200
|
||||||
|
|
||||||
|
except InvalidAuthorizationCodeError as e:
|
||||||
|
current_app.logger.warning(f"Invalid authorization code: {e}")
|
||||||
|
return jsonify({
|
||||||
|
"error": "invalid_grant",
|
||||||
|
"error_description": str(e)
|
||||||
|
}), 400
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
current_app.logger.error(f"Token endpoint error: {e}")
|
||||||
|
return jsonify({
|
||||||
|
"error": "server_error",
|
||||||
|
"error_description": "An unexpected error occurred"
|
||||||
|
}), 500
|
||||||
|
|
||||||
|
|
||||||
|
@bp.route("/authorization", methods=["GET", "POST"])
|
||||||
|
def authorization_endpoint():
|
||||||
|
"""
|
||||||
|
IndieAuth authorization endpoint for Micropub client authorization
|
||||||
|
|
||||||
|
Implements the IndieAuth authorization endpoint as specified in:
|
||||||
|
https://www.w3.org/TR/indieauth/#authorization-endpoint
|
||||||
|
|
||||||
|
GET: Display authorization consent form
|
||||||
|
Query parameters:
|
||||||
|
response_type: Must be "code"
|
||||||
|
client_id: Client application URL
|
||||||
|
redirect_uri: Client's callback URL
|
||||||
|
state: Client's CSRF state token
|
||||||
|
scope: Space-separated list of requested scopes (optional)
|
||||||
|
me: User's profile URL (optional)
|
||||||
|
code_challenge: PKCE challenge (optional)
|
||||||
|
code_challenge_method: PKCE method, typically "S256" (optional)
|
||||||
|
|
||||||
|
POST: Process authorization approval/denial
|
||||||
|
Form parameters:
|
||||||
|
approve: "yes" if user approved, anything else is denial
|
||||||
|
(other parameters inherited from GET via hidden form fields)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
GET: HTML authorization consent form
|
||||||
|
POST: Redirect to client's redirect_uri with code and state parameters
|
||||||
|
"""
|
||||||
|
if request.method == "GET":
|
||||||
|
# Extract IndieAuth parameters
|
||||||
|
response_type = request.args.get('response_type')
|
||||||
|
client_id = request.args.get('client_id')
|
||||||
|
redirect_uri = request.args.get('redirect_uri')
|
||||||
|
state = request.args.get('state')
|
||||||
|
scope = request.args.get('scope', '')
|
||||||
|
me_param = request.args.get('me')
|
||||||
|
code_challenge = request.args.get('code_challenge')
|
||||||
|
code_challenge_method = request.args.get('code_challenge_method')
|
||||||
|
|
||||||
|
# Validate required parameters
|
||||||
|
if not response_type:
|
||||||
|
return "Missing response_type parameter", 400
|
||||||
|
|
||||||
|
if response_type != 'code':
|
||||||
|
return f"Unsupported response_type: {response_type}", 400
|
||||||
|
|
||||||
|
if not client_id:
|
||||||
|
return "Missing client_id parameter", 400
|
||||||
|
|
||||||
|
if not redirect_uri:
|
||||||
|
return "Missing redirect_uri parameter", 400
|
||||||
|
|
||||||
|
if not state:
|
||||||
|
return "Missing state parameter", 400
|
||||||
|
|
||||||
|
# Validate and filter scope to supported scopes
|
||||||
|
validated_scope = validate_scope(scope)
|
||||||
|
|
||||||
|
# Check if user is logged in as admin
|
||||||
|
session_token = request.cookies.get("starpunk_session")
|
||||||
|
if not session_token or not verify_session(session_token):
|
||||||
|
# Store authorization request in session
|
||||||
|
session['pending_auth_url'] = request.url
|
||||||
|
flash("Please log in to authorize this application", "info")
|
||||||
|
return redirect(url_for('auth.login_form'))
|
||||||
|
|
||||||
|
# User is logged in, show authorization consent form
|
||||||
|
# Use ADMIN_ME as the user's identity
|
||||||
|
me = current_app.config.get('ADMIN_ME')
|
||||||
|
|
||||||
|
return render_template(
|
||||||
|
'auth/authorize.html',
|
||||||
|
client_id=client_id,
|
||||||
|
redirect_uri=redirect_uri,
|
||||||
|
state=state,
|
||||||
|
scope=validated_scope,
|
||||||
|
me=me,
|
||||||
|
response_type=response_type,
|
||||||
|
code_challenge=code_challenge,
|
||||||
|
code_challenge_method=code_challenge_method
|
||||||
|
)
|
||||||
|
|
||||||
|
else: # POST
|
||||||
|
# User submitted authorization form
|
||||||
|
approve = request.form.get('approve')
|
||||||
|
client_id = request.form.get('client_id')
|
||||||
|
redirect_uri = request.form.get('redirect_uri')
|
||||||
|
state = request.form.get('state')
|
||||||
|
scope = request.form.get('scope', '')
|
||||||
|
me = request.form.get('me')
|
||||||
|
code_challenge = request.form.get('code_challenge')
|
||||||
|
code_challenge_method = request.form.get('code_challenge_method')
|
||||||
|
|
||||||
|
# Check if user is still logged in
|
||||||
|
session_token = request.cookies.get("starpunk_session")
|
||||||
|
if not session_token or not verify_session(session_token):
|
||||||
|
flash("Session expired, please log in again", "error")
|
||||||
|
return redirect(url_for('auth.login_form'))
|
||||||
|
|
||||||
|
# If user denied, redirect with error
|
||||||
|
if approve != 'yes':
|
||||||
|
error_redirect = f"{redirect_uri}?error=access_denied&error_description=User+denied+authorization&state={state}"
|
||||||
|
return redirect(error_redirect)
|
||||||
|
|
||||||
|
# User approved, generate authorization code
|
||||||
|
try:
|
||||||
|
auth_code = create_authorization_code(
|
||||||
|
me=me,
|
||||||
|
client_id=client_id,
|
||||||
|
redirect_uri=redirect_uri,
|
||||||
|
scope=scope,
|
||||||
|
state=state,
|
||||||
|
code_challenge=code_challenge,
|
||||||
|
code_challenge_method=code_challenge_method
|
||||||
|
)
|
||||||
|
|
||||||
|
# Redirect back to client with authorization code
|
||||||
|
callback_url = f"{redirect_uri}?code={auth_code}&state={state}"
|
||||||
|
return redirect(callback_url)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
current_app.logger.error(f"Authorization endpoint error: {e}")
|
||||||
|
error_redirect = f"{redirect_uri}?error=server_error&error_description=Failed+to+generate+authorization+code&state={state}"
|
||||||
|
return redirect(error_redirect)
|
||||||
|
|||||||
81
templates/auth/authorize.html
Normal file
81
templates/auth/authorize.html
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
{% extends "base.html" %}
|
||||||
|
|
||||||
|
{% block title %}Authorize Application - StarPunk{% endblock %}
|
||||||
|
|
||||||
|
{% block content %}
|
||||||
|
<div class="authorization-container">
|
||||||
|
<h2>Authorization Request</h2>
|
||||||
|
|
||||||
|
<div class="authorization-info">
|
||||||
|
<p class="auth-intro">
|
||||||
|
An application is requesting access to your StarPunk site.
|
||||||
|
</p>
|
||||||
|
|
||||||
|
<div class="client-info">
|
||||||
|
<h3>Application Details</h3>
|
||||||
|
<dl>
|
||||||
|
<dt>Client:</dt>
|
||||||
|
<dd><code>{{ client_id }}</code></dd>
|
||||||
|
|
||||||
|
<dt>Your Identity:</dt>
|
||||||
|
<dd><code>{{ me }}</code></dd>
|
||||||
|
|
||||||
|
{% if scope %}
|
||||||
|
<dt>Requested Permissions:</dt>
|
||||||
|
<dd>
|
||||||
|
<ul class="scope-list">
|
||||||
|
{% for s in scope.split() %}
|
||||||
|
<li><strong>{{ s }}</strong> - {% if s == 'create' %}Create new posts{% endif %}</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</dd>
|
||||||
|
{% else %}
|
||||||
|
<dt>Requested Permissions:</dt>
|
||||||
|
<dd><em>No permissions requested (read-only access)</em></dd>
|
||||||
|
{% endif %}
|
||||||
|
</dl>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<div class="authorization-warning">
|
||||||
|
<p><strong>Warning:</strong> Only authorize applications you trust.</p>
|
||||||
|
<p>This application will be able to perform the above actions on your behalf.</p>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
<form action="{{ url_for('auth.authorization_endpoint') }}" method="POST" class="authorization-form">
|
||||||
|
<!-- Pass through all parameters as hidden fields -->
|
||||||
|
<input type="hidden" name="client_id" value="{{ client_id }}">
|
||||||
|
<input type="hidden" name="redirect_uri" value="{{ redirect_uri }}">
|
||||||
|
<input type="hidden" name="state" value="{{ state }}">
|
||||||
|
<input type="hidden" name="scope" value="{{ scope }}">
|
||||||
|
<input type="hidden" name="me" value="{{ me }}">
|
||||||
|
<input type="hidden" name="response_type" value="{{ response_type }}">
|
||||||
|
{% if code_challenge %}
|
||||||
|
<input type="hidden" name="code_challenge" value="{{ code_challenge }}">
|
||||||
|
<input type="hidden" name="code_challenge_method" value="{{ code_challenge_method }}">
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
|
<div class="authorization-actions">
|
||||||
|
<button type="submit" name="approve" value="yes" class="button button-primary">
|
||||||
|
Authorize
|
||||||
|
</button>
|
||||||
|
<button type="submit" name="approve" value="no" class="button button-secondary">
|
||||||
|
Deny
|
||||||
|
</button>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
|
||||||
|
<div class="authorization-help">
|
||||||
|
<h3>What does this mean?</h3>
|
||||||
|
<p>
|
||||||
|
By clicking "Authorize", you allow this application to access your StarPunk site
|
||||||
|
with the permissions listed above. You can revoke access at any time from your
|
||||||
|
admin dashboard.
|
||||||
|
</p>
|
||||||
|
<p>
|
||||||
|
If you don't recognize this application or didn't intend to authorize it,
|
||||||
|
click "Deny" to reject the request.
|
||||||
|
</p>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
{% endblock %}
|
||||||
361
tests/test_routes_authorization.py
Normal file
361
tests/test_routes_authorization.py
Normal file
@@ -0,0 +1,361 @@
|
|||||||
|
"""
|
||||||
|
Tests for authorization endpoint route
|
||||||
|
|
||||||
|
Tests the /auth/authorization endpoint for IndieAuth client authorization.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from starpunk.auth import create_session
|
||||||
|
from urllib.parse import urlparse, parse_qs
|
||||||
|
|
||||||
|
|
||||||
|
def create_admin_session(client, app):
|
||||||
|
"""Helper to create an authenticated admin session"""
|
||||||
|
with app.test_request_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
session_token = create_session(admin_me)
|
||||||
|
client.set_cookie('starpunk_session', session_token)
|
||||||
|
return session_token
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_get_not_logged_in(client, app):
|
||||||
|
"""Test authorization endpoint redirects to login when not authenticated"""
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create'
|
||||||
|
})
|
||||||
|
|
||||||
|
# Should redirect to login
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert '/auth/login' in response.location
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_get_logged_in(client, app):
|
||||||
|
"""Test authorization endpoint shows consent form when authenticated"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert b'Authorization Request' in response.data
|
||||||
|
assert b'https://client.example' in response.data
|
||||||
|
assert b'create' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_missing_response_type(client, app):
|
||||||
|
"""Test authorization endpoint rejects missing response_type"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert b'Missing response_type' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_invalid_response_type(client, app):
|
||||||
|
"""Test authorization endpoint rejects unsupported response_type"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'token', # Only 'code' is supported
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert b'Unsupported response_type' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_missing_client_id(client, app):
|
||||||
|
"""Test authorization endpoint rejects missing client_id"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert b'Missing client_id' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_missing_redirect_uri(client, app):
|
||||||
|
"""Test authorization endpoint rejects missing redirect_uri"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'state': 'random_state_123'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert b'Missing redirect_uri' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_missing_state(client, app):
|
||||||
|
"""Test authorization endpoint rejects missing state"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert b'Missing state' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_empty_scope(client, app):
|
||||||
|
"""Test authorization endpoint allows empty scope"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': '' # Empty scope allowed per IndieAuth spec
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert b'Authorization Request' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_filters_unsupported_scopes(client, app):
|
||||||
|
"""Test authorization endpoint filters to supported scopes only"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create update delete' # Only 'create' is supported in V1
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should only show 'create' scope
|
||||||
|
assert b'create' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_post_approve(client, app):
|
||||||
|
"""Test authorization approval generates code and redirects"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
response = client.post('/auth/authorization', data={
|
||||||
|
'approve': 'yes',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'me': admin_me,
|
||||||
|
'response_type': 'code'
|
||||||
|
})
|
||||||
|
|
||||||
|
# Should redirect to client's redirect_uri
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert response.location.startswith('https://client.example/callback')
|
||||||
|
|
||||||
|
# Parse redirect URL
|
||||||
|
parsed = urlparse(response.location)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
|
||||||
|
# Should include code and state
|
||||||
|
assert 'code' in params
|
||||||
|
assert 'state' in params
|
||||||
|
assert params['state'][0] == 'random_state_123'
|
||||||
|
assert len(params['code'][0]) > 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_post_deny(client, app):
|
||||||
|
"""Test authorization denial redirects with error"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
response = client.post('/auth/authorization', data={
|
||||||
|
'approve': 'no',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'me': admin_me,
|
||||||
|
'response_type': 'code'
|
||||||
|
})
|
||||||
|
|
||||||
|
# Should redirect to client's redirect_uri with error
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert response.location.startswith('https://client.example/callback')
|
||||||
|
|
||||||
|
# Parse redirect URL
|
||||||
|
parsed = urlparse(response.location)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
|
||||||
|
# Should include error
|
||||||
|
assert 'error' in params
|
||||||
|
assert params['error'][0] == 'access_denied'
|
||||||
|
assert 'state' in params
|
||||||
|
assert params['state'][0] == 'random_state_123'
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_post_not_logged_in(client, app):
|
||||||
|
"""Test authorization POST requires authentication"""
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
response = client.post('/auth/authorization', data={
|
||||||
|
'approve': 'yes',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'me': admin_me,
|
||||||
|
'response_type': 'code'
|
||||||
|
})
|
||||||
|
|
||||||
|
# Should redirect to login
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert '/auth/login' in response.location
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_with_pkce(client, app):
|
||||||
|
"""Test authorization endpoint accepts PKCE parameters"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'code_challenge': 'E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM',
|
||||||
|
'code_challenge_method': 'S256'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert b'Authorization Request' in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_post_with_pkce(client, app):
|
||||||
|
"""Test authorization approval preserves PKCE parameters"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
response = client.post('/auth/authorization', data={
|
||||||
|
'approve': 'yes',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'me': admin_me,
|
||||||
|
'response_type': 'code',
|
||||||
|
'code_challenge': 'E9Melhoa2OwvFrEMTJguCHaoeK1t8URWbuGJSstw-cM',
|
||||||
|
'code_challenge_method': 'S256'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 302
|
||||||
|
assert response.location.startswith('https://client.example/callback')
|
||||||
|
|
||||||
|
# Parse redirect URL
|
||||||
|
parsed = urlparse(response.location)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
|
||||||
|
# Should have code and state
|
||||||
|
assert 'code' in params
|
||||||
|
assert 'state' in params
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_endpoint_preserves_me_parameter(client, app):
|
||||||
|
"""Test authorization endpoint uses ADMIN_ME as identity"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
response = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
# Should show admin's identity in the form
|
||||||
|
assert admin_me.encode() in response.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorization_flow_end_to_end(client, app):
|
||||||
|
"""Test complete authorization flow from consent to token exchange"""
|
||||||
|
create_admin_session(client, app)
|
||||||
|
|
||||||
|
with app.app_context():
|
||||||
|
admin_me = app.config.get('ADMIN_ME', 'https://test.example.com')
|
||||||
|
|
||||||
|
# Step 1: Get authorization form
|
||||||
|
response1 = client.get('/auth/authorization', query_string={
|
||||||
|
'response_type': 'code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Step 2: Approve authorization
|
||||||
|
response2 = client.post('/auth/authorization', data={
|
||||||
|
'approve': 'yes',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'state': 'random_state_123',
|
||||||
|
'scope': 'create',
|
||||||
|
'me': admin_me,
|
||||||
|
'response_type': 'code'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response2.status_code == 302
|
||||||
|
|
||||||
|
# Extract authorization code
|
||||||
|
parsed = urlparse(response2.location)
|
||||||
|
params = parse_qs(parsed.query)
|
||||||
|
code = params['code'][0]
|
||||||
|
|
||||||
|
# Step 3: Exchange code for token
|
||||||
|
response3 = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': admin_me
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response3.status_code == 200
|
||||||
|
token_data = response3.get_json()
|
||||||
|
assert 'access_token' in token_data
|
||||||
|
assert token_data['token_type'] == 'Bearer'
|
||||||
|
assert token_data['scope'] == 'create'
|
||||||
|
assert token_data['me'] == admin_me
|
||||||
394
tests/test_routes_token.py
Normal file
394
tests/test_routes_token.py
Normal file
@@ -0,0 +1,394 @@
|
|||||||
|
"""
|
||||||
|
Tests for token endpoint route
|
||||||
|
|
||||||
|
Tests the /auth/token endpoint for IndieAuth token exchange.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from starpunk.tokens import create_authorization_code
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_success(client, app):
|
||||||
|
"""Test successful token exchange"""
|
||||||
|
with app.app_context():
|
||||||
|
# Create authorization code
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Exchange for token
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.get_json()
|
||||||
|
assert 'access_token' in data
|
||||||
|
assert data['token_type'] == 'Bearer'
|
||||||
|
assert data['scope'] == 'create'
|
||||||
|
assert data['me'] == 'https://user.example'
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_with_pkce(client, app):
|
||||||
|
"""Test token exchange with PKCE"""
|
||||||
|
with app.app_context():
|
||||||
|
# Generate PKCE verifier and challenge
|
||||||
|
code_verifier = "test_verifier_with_sufficient_entropy_12345"
|
||||||
|
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
|
||||||
|
|
||||||
|
# Create authorization code with PKCE
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create",
|
||||||
|
code_challenge=code_challenge,
|
||||||
|
code_challenge_method="S256"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Exchange with correct verifier
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example',
|
||||||
|
'code_verifier': code_verifier
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 200
|
||||||
|
data = response.get_json()
|
||||||
|
assert 'access_token' in data
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_missing_grant_type(client, app):
|
||||||
|
"""Test token endpoint rejects missing grant_type"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'code': 'some_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'grant_type' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_invalid_grant_type(client, app):
|
||||||
|
"""Test token endpoint rejects invalid grant_type"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'password',
|
||||||
|
'code': 'some_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'unsupported_grant_type'
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_missing_code(client, app):
|
||||||
|
"""Test token endpoint rejects missing code"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'code' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_missing_client_id(client, app):
|
||||||
|
"""Test token endpoint rejects missing client_id"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': 'some_code',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'client_id' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_missing_redirect_uri(client, app):
|
||||||
|
"""Test token endpoint rejects missing redirect_uri"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': 'some_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'redirect_uri' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_missing_me(client, app):
|
||||||
|
"""Test token endpoint rejects missing me parameter"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': 'some_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'me' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_invalid_code(client, app):
|
||||||
|
"""Test token endpoint rejects invalid authorization code"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': 'invalid_code_12345',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_code_replay(client, app):
|
||||||
|
"""Test token endpoint prevents code replay attacks"""
|
||||||
|
with app.app_context():
|
||||||
|
# Create authorization code
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create"
|
||||||
|
)
|
||||||
|
|
||||||
|
# First exchange succeeds
|
||||||
|
response1 = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Second exchange fails (replay attack)
|
||||||
|
response2 = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response2.status_code == 400
|
||||||
|
data = response2.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'already been used' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_client_id_mismatch(client, app):
|
||||||
|
"""Test token endpoint rejects mismatched client_id"""
|
||||||
|
with app.app_context():
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create"
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://different-client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'client_id' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_redirect_uri_mismatch(client, app):
|
||||||
|
"""Test token endpoint rejects mismatched redirect_uri"""
|
||||||
|
with app.app_context():
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create"
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/different-callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'redirect_uri' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_me_mismatch(client, app):
|
||||||
|
"""Test token endpoint rejects mismatched me parameter"""
|
||||||
|
with app.app_context():
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create"
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://different-user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'me parameter' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_empty_scope(client, app):
|
||||||
|
"""Test token endpoint rejects authorization code with empty scope"""
|
||||||
|
with app.app_context():
|
||||||
|
# Create authorization code with empty scope
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="" # Empty scope
|
||||||
|
)
|
||||||
|
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
# IndieAuth spec: MUST NOT issue token if no scope
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_scope'
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_wrong_content_type(client, app):
|
||||||
|
"""Test token endpoint rejects non-form-encoded requests"""
|
||||||
|
with app.app_context():
|
||||||
|
response = client.post('/auth/token',
|
||||||
|
json={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': 'some_code',
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_request'
|
||||||
|
assert 'Content-Type' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_pkce_missing_verifier(client, app):
|
||||||
|
"""Test token endpoint rejects PKCE exchange without verifier"""
|
||||||
|
with app.app_context():
|
||||||
|
# Create authorization code with PKCE
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create",
|
||||||
|
code_challenge="some_challenge",
|
||||||
|
code_challenge_method="S256"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Exchange without verifier
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example'
|
||||||
|
# Missing code_verifier
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'code_verifier' in data['error_description']
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_endpoint_pkce_wrong_verifier(client, app):
|
||||||
|
"""Test token endpoint rejects PKCE exchange with wrong verifier"""
|
||||||
|
with app.app_context():
|
||||||
|
code_verifier = "correct_verifier"
|
||||||
|
code_challenge = hashlib.sha256(code_verifier.encode()).hexdigest()
|
||||||
|
|
||||||
|
# Create authorization code with PKCE
|
||||||
|
code = create_authorization_code(
|
||||||
|
me="https://user.example",
|
||||||
|
client_id="https://client.example",
|
||||||
|
redirect_uri="https://client.example/callback",
|
||||||
|
scope="create",
|
||||||
|
code_challenge=code_challenge,
|
||||||
|
code_challenge_method="S256"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Exchange with wrong verifier
|
||||||
|
response = client.post('/auth/token', data={
|
||||||
|
'grant_type': 'authorization_code',
|
||||||
|
'code': code,
|
||||||
|
'client_id': 'https://client.example',
|
||||||
|
'redirect_uri': 'https://client.example/callback',
|
||||||
|
'me': 'https://user.example',
|
||||||
|
'code_verifier': 'wrong_verifier'
|
||||||
|
}, content_type='application/x-www-form-urlencoded')
|
||||||
|
|
||||||
|
assert response.status_code == 400
|
||||||
|
data = response.get_json()
|
||||||
|
assert data['error'] == 'invalid_grant'
|
||||||
|
assert 'code_verifier' in data['error_description']
|
||||||
Reference in New Issue
Block a user