Coverage for apps/comments_views/core/utils.py: 95%
138 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-19 19:20 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-05-19 19:20 +0000
1from json import JSONDecodeError
2from urllib.parse import urlencode
4import requests
5from requests import RequestException
6from requests import Response
8from django.conf import settings
9from django.contrib import messages
10from django.contrib.auth import get_user_model
11from django.http import HttpRequest
12from django.urls import reverse
13from django.utils.translation import gettext_lazy as _
15from comments_api.constants import API_MESSAGE_KEY
16from comments_api.utils import parse_date
18from .app_settings import app_settings
19from .rights import AbstractUserRights
# Default timeout (seconds) to be used when performing API requests.
# NOTE(review): `api_request_wrapper` in this file hard-codes `(4, 7)` instead of
# reading this constant - confirm whether other modules rely on it.
DEFAULT_TIMEOUT = 4
# Generic, translatable (lazy gettext) error text passed to `messages.error`
# by `json_from_response` and `make_api_request` when an API call fails.
DEFAULT_API_ERROR_MESSAGE = _("Error: Something went wrong. Please try again.")
def comments_server_base_url() -> str:
    """
    Returns the base URL of the comment server, read from
    `app_settings.API_BASE_URL`, without its trailing slash.

    Only one trailing "/" is removed, so that callers can safely append a path
    starting with "/". An empty value is returned unchanged.

    NOTE(review): the previous documentation stated that the setting falls back
    to https://comments.centre-mersenne.org; that default is defined by
    `app_settings`, not by this function - confirm there.
    """
    comment_base_url = app_settings.API_BASE_URL

    # `endswith` is safe on an empty string, whereas indexing with [-1] raises
    # IndexError.
    if comment_base_url.endswith("/"):
        comment_base_url = comment_base_url[:-1]
    return comment_base_url
def comments_server_url(
    query_params: dict[str, str] | None = None, url_prefix="comments", item_id=None
) -> str:
    """
    Returns a formatted URL to query the comment server (API).
    The default URL corresponds to the list of all available comments:
    `<base URL>/api/comments/`.

    Parameters:
        - `query_params`: A dictionary of query parameters to be URL-encoded and
                          appended to the URL. They are used to precisely filter
                          the comment dataset. `None` or an empty dictionary adds
                          no query string.
        - `url_prefix`: The API resource segment placed right after `/api/`.
        - `item_id`: When truthy, it is appended as an extra path segment to
                     target a single item. Falsy values (`None`, `0`, `""`) are
                     ignored.

    Returns:
        The URL as a string.
    """
    url = f"{comments_server_base_url()}/api/{url_prefix}/"
    if item_id:
        url += f"{item_id}/"
    if query_params:
        url += f"?{urlencode(query_params)}"
    return url
def comments_credentials() -> tuple[str, str]:
    """
    Returns the current server credentials (login, password) required to authenticate
    on the comment server, read from `app_settings.API_CREDENTIALS`.

    Raises:
        - `AttributeError` if no credentials are configured (falsy setting).
        - `AssertionError` if the credentials are not a tuple of length 2.
    """
    credentials = app_settings.API_CREDENTIALS
    if not credentials:
        raise AttributeError("The server is missing the comments server credentials")

    # Raised explicitly instead of using `assert`, so that the check is not
    # stripped when Python runs with -O. The exception type is kept for callers
    # that already expect AssertionError.
    if not (isinstance(credentials, tuple) and len(credentials) == 2):
        raise AssertionError(
            "The server has malformed comments server credentials. "
            "Please provide them as `(login, password)`"
        )

    return credentials
def add_api_error_message(initial_message: str, api_json_data: dict) -> str:
    """
    Appends the error description found in the API payload to a message.

    The description is read from `api_json_data[API_MESSAGE_KEY]`. It is only used
    when it is a non-empty string; otherwise `initial_message` is returned as is.
    When used, it is wrapped in double quotes and, if `initial_message` is not
    empty, separated from it by `<br><br>`.
    """
    api_detail = api_json_data.get(API_MESSAGE_KEY)
    if not api_detail or not isinstance(api_detail, str):
        return initial_message

    prefix = f"{initial_message}<br><br>" if initial_message else ""
    return prefix + f'"{api_detail}"'
def json_from_response(
    response: Response, request_for_message: HttpRequest | None = None
) -> tuple[bool, dict]:
    """
    Returns the JSON content of a HTTP response and whether an error occurred
    while doing so.
    We consider that there's an error if the response does not contain correctly
    formed JSON (except HTTP 204) or if its status code >= 300.

    Params:
        - `response`              The HTTP response to parse.
        - `request_for_message`   The current HTTP request handled by Django. Used
                                  to add a message (Django) when there's an error.
                                  Don't provide it if you don't want any messages.

    Returns:
        - `error`       Whether an error occurred.
        - `content`     The JSON content of the HTTP response. It is `{}` when
                        the body could not be decoded.
    """
    try:
        data = response.json()
    except ValueError:
        # Every JSON decoding error that `requests` may raise is a ValueError:
        # json.JSONDecodeError, the simplejson flavour used when simplejson is
        # installed, and the plain ValueError of older `requests` versions.
        # 204 No Content: an empty body is expected, not an error.
        if response.status_code == 204:
            return (False, {})
        if request_for_message is not None:
            messages.error(request_for_message, DEFAULT_API_ERROR_MESSAGE)
        return (True, {})

    error = response.status_code >= 300
    if error and request_for_message is not None:
        # The API error description can only be looked up in a JSON object;
        # any other payload (list, string, ...) gets the generic message.
        if isinstance(data, dict):
            error_message = add_api_error_message(DEFAULT_API_ERROR_MESSAGE, data)
        else:
            error_message = DEFAULT_API_ERROR_MESSAGE
        messages.error(request_for_message, error_message)
    return (error, data)
def format_comment_author_name(comment: dict) -> str:
    """
    Builds the name to display for a comment's author.

    - Plain comment: the author name, unchanged.
    - Comment flagged `article_author_comment` or `editorial_team_comment`:
      the author name followed by the lower-cased role between parentheses.
    - Same flags with `hide_author_name` set: only the capitalized role.
    """
    name = comment.get("author_name", "")
    anonymous = comment.get("hide_author_name")
    is_article_author = comment.get("article_author_comment")
    is_editorial_team = comment.get("editorial_team_comment")

    role = ""
    if is_article_author:
        if anonymous:
            return _("Auteur(s) de l'article").capitalize()
        role = _("Auteur de l'article")
    elif is_editorial_team:
        role = _("Équipe éditoriale")
        if anonymous:
            return role.capitalize()

    return f"{name} ({role.lower()})" if role else name
def format_comment(
    comment: dict, rights: AbstractUserRights | None = None, users: dict | None = None
) -> None:
    """
    Format a comment data for display. The `comment` dict is modified in place.
    Formatted data includes:
        - author display name (actual name or function), also for the parent
          comment when it is provided as a dict (`author_display_name`)
        - author provider link, ORCID only (`author_provider_link`)
        - parsed dates (`date_submitted`, `moderation_data.date_moderated`)
        - moderation data (`moderation_data.moderated_by`, `moderators_processed`),
          only if `rights` and `users` are provided and the rights allow it
        - comment URL (`base_url`)
        - whether the user can moderate the comment (`can_moderate`), only if
          `rights` is provided

    Params:
        - `comment`     The comment data (usually a dict constructed from
                        the JSON data).
        - `rights`      The user rights used to check access over moderation data.
        - `users`       The dictionary of users used to map moderator IDs.
                        Defaults to no users.

    Raises:
        - `KeyError` if the comment has a `base_url` but no `doi`.
    """
    if users is None:
        users = {}

    # Format author name and parent author name
    comment["author_display_name"] = format_comment_author_name(comment)

    parent = comment.get("parent")
    # The comment data may contain either the parent dictionary data or
    # just the parent ID
    if parent and isinstance(parent, dict):
        comment["parent"]["author_display_name"] = format_comment_author_name(parent)

    # Format author provider link (only ORCID right now)
    author_provider = comment.get("author_provider")
    author_provider_uid = comment.get("author_provider_uid")
    if author_provider and author_provider.lower() == "orcid" and author_provider_uid:
        comment["author_provider_link"] = f"{app_settings.ORCID_BASE_URL}{author_provider_uid}/"

    # Parse dates for display
    date_submitted = comment.get("date_submitted")
    if date_submitted:
        comment["date_submitted"] = parse_date(date_submitted)

    moderation_data = comment.get("moderation_data")
    if moderation_data:
        if moderation_data.get("date_moderated"):
            comment["moderation_data"]["date_moderated"] = parse_date(
                moderation_data.get("date_moderated")
            )

        # The moderator identity is only exposed to users allowed to manage
        # moderators and to superusers.
        moderator_id = moderation_data.get("moderator")
        if (
            moderator_id
            and moderator_id in users
            and rights
            and (
                rights.comment_can_manage_moderators(comment)
                or getattr(rights.user, "is_superuser", False)
            )
        ):
            comment["moderation_data"]["moderated_by"] = users[moderator_id]

    # Format comment link
    base_url = comment.get("base_url")
    if base_url:
        # Remove potential trailing slash
        if base_url[-1] == "/":
            base_url = base_url[:-1]
        article_route = reverse("article", kwargs={"aid": comment["doi"]})
        # If a site has an URL prefix, we need to discard it from the article_route,
        # it should already be present in base_url.
        url_prefix = getattr(settings, "SITE_URL_PREFIX", None)
        if url_prefix:
            url_prefix = f"/{url_prefix}"
            if article_route.startswith(url_prefix):
                article_route = article_route[len(url_prefix) :]
        comment["base_url"] = f"{base_url}{article_route}"

    if rights:
        comment["can_moderate"] = rights.comment_can_moderate(comment)

    # Get the moderators
    if rights and users:
        moderators = comment.get("moderators")
        if moderators and (
            rights.comment_can_manage_moderators(comment)
            or getattr(rights.user, "is_superuser", False)
        ):
            # Moderator IDs unknown to `users` are silently left out.
            comment["moderators_processed"] = {
                moderator_id: users[moderator_id]
                for moderator_id in moderators
                if moderator_id in users
            }
def get_user_dict() -> dict[int, dict[str, str]]:
    """
    Builds a lookup table of every user of the active user model, keyed by
    primary key.
    Each value is a dict holding the user's `pk`, `first_name`, `last_name`
    and `email`.
    """
    user_model = get_user_model()
    user_rows = user_model.objects.all().values("pk", "first_name", "last_name", "email")

    users_by_pk = {}
    for row in user_rows:
        users_by_pk[row["pk"]] = row
    return users_by_pk
def api_request_wrapper(http_method: str, url: str, **kwargs) -> Response | None:
    """
    Light wrapper around `get`, `post`, and `put` methods of the `requests` library:
        - Catch any `RequestException`.
        - Add a default timeout to the request if no `timeout` was provided.

    Params:
        - `http_method`     The HTTP method to use for the request, case
                            insensitive. Only get, post and put are supported.
        - `url`             The URL to request.
        - `kwargs`          Passed as is to the `requests` function.

    Returns:
        - `response`        The `Response` object if no `RequestException` was raised,
                            else `None`.

    Raises:
        - `ValueError` if `http_method` is not one of the supported methods.
    """
    http_method = http_method.upper()
    # Other HTTP methods are not used. They can be added here once needed.
    if http_method not in ("GET", "POST", "PUT"):
        raise ValueError(f"Wrong argument for 'http_method': '{http_method}'")

    # (connect timeout, read timeout)
    kwargs.setdefault("timeout", (4, 7))

    # Resolved at call time: requests.get, requests.post or requests.put.
    send = getattr(requests, http_method.lower())
    try:
        response = send(url, **kwargs)
    except RequestException:
        response = None

    return response
def make_api_request(
    http_method: str, url: str, request_for_message: HttpRequest | None = None, **kwargs
) -> tuple[bool, dict]:
    """
    Sends an HTTP request through `api_request_wrapper` and decodes the answer
    with `json_from_response`.
    No response at all (the wrapper returned `None`) is reported as an error with
    an empty content.
    Any additional kwargs are passed directly to `api_request_wrapper`.

    Params:
        - `http_method`           The HTTP method to use for the request, as
                                  accepted by `api_request_wrapper`.
        - `url`                   The URL to request.
        - `request_for_message`   The current HTTP request handled by Django. Used
                                  to add a message (Django) when there's an error.
                                  Don't provide it if you don't want any messages.

    Returns:
        - `error`       Whether an error occurred.
        - `content`     The JSON content of the HTTP response.
    """
    response = api_request_wrapper(http_method, url, **kwargs)
    if response is not None:
        return json_from_response(response, request_for_message)

    # The request itself failed: there is nothing to parse.
    if request_for_message is not None:
        messages.error(request_for_message, DEFAULT_API_ERROR_MESSAGE)
    return True, {}
def get_comment(
    query_params: dict[str, str],
    id: int,
    request_for_message: HttpRequest | None = None,
) -> tuple[bool, dict]:
    """
    Fetches a single comment from the comment server with an authenticated GET
    request. Convenience wrapper around `make_api_request`.

    Params:
        - `query_params`          Query parameters for the API URL.
        - `id`                    The comment's ID.
        - `request_for_message`   The current HTTP request handled by Django. Used
                                  to add a message (Django) when there's an error.
                                  Don't provide it if you don't want any messages.

    Returns:
        - `error`       Whether an error occurred.
        - `content`     The JSON content of the HTTP response.
    """
    comment_url = comments_server_url(query_params, item_id=id)
    credentials = comments_credentials()
    return make_api_request("GET", comment_url, request_for_message, auth=credentials)