Coverage for sites/ptf_tools/comments_moderation/views.py: 92%

267 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-19 19:20 +0000

1from typing import Any 

2 

3from django.contrib import messages 

4from django.contrib.auth.mixins import LoginRequiredMixin 

5from django.contrib.auth.models import User 

6from django.http import Http404 

7from django.http import HttpRequest 

8from django.http import HttpResponseRedirect 

9from django.urls import reverse 

10from django.views.generic import TemplateView 

11from django.views.generic import View 

12 

13from comments_api.constants import PARAM_ACTION_CREATE 

14from comments_api.constants import PARAM_ACTION_DELETE 

15from comments_api.constants import STATUS_SUBMITTED 

16from comments_views.core.views import CommentChangeStatusView as BaseCommentChangeStatusView 

17from comments_views.core.views import ( 

18 CommentDashboardDetailsView as BaseCommentDashboardDetailsView, 

19) 

20from comments_views.core.views import CommentDashboardListView as BaseCommentDashboardListView 

21from ptf_tools.models import Invitation 

22from ptf_tools.models import InvitationExtraData 

23from ptf_tools.models import InviteCollectionData 

24from ptf_tools.models import InviteCommentData 

25 

26from .forms import AddBaseModeratorForm 

27from .forms import InviteBaseModeratorForm 

28from .forms import InviteStaffModeratorForm 

29from .forms import RemoveBaseModeratorForm 

30from .forms import StaffModeratorForm 

31from .mixins import ForceLanguageMixin 

32from .mixins import ModeratorCommentRightsMixin 

33from .utils import email_moderator_assigned 

34from .utils import get_all_moderators 

35from .utils import get_comment_and_can_manage_moderators 

36from .utils import get_comment_pending_invitations 

37from .utils import get_pending_invitations 

38from .utils import is_comment_moderator 

39from .utils import update_moderation_right 

40from .utils import update_moderator_collections 

41 

42 

43# Maybe create a custom "LoginRequiredMixin" where we additionally check that the user 

44# has some moderator rights. Otherwise restrict access. 

class CommentDashboardListView(
    LoginRequiredMixin,
    ModeratorCommentRightsMixin,
    ForceLanguageMixin,
    BaseCommentDashboardListView,
):
    """
    Comment dashboard list view.

    Adds no behavior of its own: it only composes the base list view with the
    login, moderator-rights and forced-language mixins.
    """

52 

53 

class CommentDashboardDetailsView(
    LoginRequiredMixin,
    ModeratorCommentRightsMixin,
    ForceLanguageMixin,
    BaseCommentDashboardDetailsView,
):
    """Extends the base comment details view with moderator-management data."""

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """
        Return the base context, enriched with the moderator forms.

        The moderator entries are only added when the base context holds a
        "comment", the current user may manage that comment's moderators and
        the comment status is STATUS_SUBMITTED. Otherwise the base context is
        returned untouched.
        """
        context = super().get_context_data(**kwargs)
        if "comment" not in context:
            return context

        comment = context["comment"]
        rights = self.get_rights(self.request.user)

        # Nothing to add unless the user can manage a still-submitted comment.
        can_manage = rights.comment_can_manage_moderators(comment)
        if not (can_manage and comment["status"] == STATUS_SUBMITTED):
            return context

        all_moderators, _ = get_all_moderators(rights, self.request)

        # The same initial data feeds every modal form of the page.
        form_initial = {"comment_id": int(kwargs["pk"])}
        context.update(
            {
                "form_moderators": True,
                "all_moderators": all_moderators,
                "moderator_add_form": AddBaseModeratorForm(initial=form_initial),
                "moderator_add_form_url": reverse("comment_moderator_add"),
                "moderator_invite_form": InviteBaseModeratorForm(initial=form_initial),
                "moderator_invite_form_url": reverse("comment_moderator_invite_base"),
            }
        )

        # The removal form only makes sense when the comment has moderators.
        current_moderators = comment.get("moderators_processed")
        if current_moderators:
            removal_choices = tuple((key, key) for key in current_moderators.keys())
            context["moderator_remove_form"] = RemoveBaseModeratorForm(
                moderators=removal_choices, initial=form_initial
            )

        # Pending invites linked to the comment.
        context["invitations"] = get_comment_pending_invitations(comment["id"])

        return context

97 

98 

class CommentChangeStatusView(
    LoginRequiredMixin, ModeratorCommentRightsMixin, BaseCommentChangeStatusView
):
    """
    Comment status change view.

    Adds no behavior of its own: it only composes the base view with the
    login and moderator-rights mixins.
    """

103 

104 

class CommentAdminModeratorView(
    LoginRequiredMixin, ModeratorCommentRightsMixin, ForceLanguageMixin, TemplateView
):
    """
    View for managing the comment moderators.

    GET is available to "admin" and "staff" moderators; POST is limited to
    "admin" moderators. Any other user gets a 404.
    """

    template_name = "blocks/comment_moderators.html"

    def get_context_data(self, **kwargs: Any) -> dict[str, Any]:
        """
        Moderator management view.

        Get all moderators related to the current user:
            -   all "staff" moderators with one collection in common
                with the current user
            -   all "base" moderators with one comment related to the current
                user's collections.

        Raises:
            Http404: the current user is neither an "admin" nor a "staff"
                moderator.
        """
        rights = self.get_rights(self.request.user)
        if not rights.is_admin_moderator() and not rights.is_staff_moderator():
            raise Http404

        context = super().get_context_data(**kwargs)

        context["moderation_rights"] = rights
        user_collections = rights.get_user_admin_collections()

        # Displayed collections fall back to the staff ones when the user
        # has no admin collection.
        context["user_collections"] = (
            user_collections if user_collections else rights.get_user_staff_collections()
        )

        all_moderators, moderation_rights = get_all_moderators(rights, self.request)

        # 1) "Staff" moderators: those sharing at least one collection with
        #    the current user's admin collections.
        all_moderators_dict = {}
        staff_moderators = []
        for mod in all_moderators:
            all_moderators_dict[mod.pk] = mod
            mod_collections = mod.comment_moderator.collections.all()  # type:ignore
            if any(col.pid in user_collections for col in mod_collections):
                staff_moderators.append(mod)

        context["all_moderators"] = all_moderators
        if rights.is_admin_moderator():
            context["staff_moderators"] = staff_moderators

        # 2) "Base" moderators: every other known moderator holding a
        #    moderation right, with their comments grouped by status.
        if moderation_rights:
            base_moderators = self._group_base_moderators(
                moderation_rights,
                all_moderators_dict,
                {mod.pk for mod in staff_moderators},
            )
            if base_moderators:
                context["base_moderators"] = base_moderators.values()

        # Populate modal forms
        user_collections_choice = tuple((col, col) for col in user_collections)
        context["moderator_add_form"] = StaffModeratorForm(user_collections_choice)
        context["moderator_add_form_url"] = reverse("comment_moderators")

        context["moderator_invite_form"] = InviteStaffModeratorForm(user_collections_choice)
        context["moderator_invite_form_url"] = reverse("comment_moderator_invite_staff")

        # Get all pending moderator invitations related to the current user
        context["invitations"] = get_pending_invitations(rights)

        return context

    @staticmethod
    def _group_base_moderators(
        moderation_rights, moderators_by_pk, staff_moderator_ids
    ) -> dict[int, dict[str, Any]]:
        """
        Group the comments of the non-staff moderators per moderator.

        Args:
            moderation_rights: iterable of dicts shaped like
                {"moderator_id": x, "comment_id": x, "comment__status": x}.
            moderators_by_pk: mapping of user pk to the moderator user.
            staff_moderator_ids: pks of the moderators already listed as
                "staff" moderators; they are skipped here.

        Returns:
            A dict keyed by moderator pk. Each value holds the moderator's
            first_name, last_name, email and pk, plus the "comments_pending"
            (status STATUS_SUBMITTED) and "comments_processed" (any other
            status) lists of comment ids.
        """
        base_moderators: dict[int, dict[str, Any]] = {}
        for mod_right in moderation_rights:
            try:
                moderator_id = int(mod_right["moderator_id"])
            except (TypeError, ValueError):
                # Skip rights whose moderator id is missing (None) or not
                # numeric instead of failing the whole page.
                continue

            # Only known moderators that are not staff moderators get an entry.
            if moderator_id not in moderators_by_pk or moderator_id in staff_moderator_ids:
                continue

            entry = base_moderators.get(moderator_id)
            if entry is None:
                moderator = moderators_by_pk[moderator_id]
                entry = base_moderators[moderator_id] = {
                    "first_name": moderator.first_name,
                    "last_name": moderator.last_name,
                    "email": moderator.email,
                    "pk": moderator.pk,
                    "comments_processed": [],
                    "comments_pending": [],
                }

            if mod_right["comment__status"] == STATUS_SUBMITTED:
                entry["comments_pending"].append(mod_right["comment_id"])
            else:
                entry["comments_processed"].append(mod_right["comment_id"])

        return base_moderators

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
        """
        Updates an existing moderator's collections.

        The updated collections are limited to the collections the current user
        has access to. Always redirects to the moderator management page; the
        outcome is reported through the messages framework.

        Raises:
            Http404: the current user is not an "admin" moderator.
        """
        rights = self.get_rights(request.user)
        if not rights.is_admin_moderator():
            raise Http404

        response = HttpResponseRedirect(reverse("comment_moderators"))
        user_collections = rights.get_user_admin_collections()
        user_collections_choice = tuple((col, col) for col in user_collections)
        form = StaffModeratorForm(user_collections_choice, request.POST)

        if not form.is_valid():
            messages.error(request, "Something went wrong. Please try again.")
            return response

        cleaned_data = form.cleaned_data
        moderator_id = cleaned_data["moderator_id"]
        try:
            moderator = User.objects.filter(comment_moderator__is_moderator=True).get(
                pk=moderator_id
            )
        except User.DoesNotExist:
            messages.error(request, "Error: The chosen moderator does not exist.")
            return response

        selected_collections = cleaned_data["collections"]
        update_moderator_collections(rights, moderator, selected_collections, user_collections)

        # An empty selection is reported as removing the moderator from the
        # current user's staff moderators.
        if selected_collections:
            message = (
                f"The collections {', '.join(selected_collections)} have been"
                " added to the moderator "
                f"{moderator.first_name} {moderator.last_name}."
            )
        else:
            message = (
                f"The moderator {moderator.first_name} {moderator.last_name}"
                " has been removed from your staff moderators."
            )

        messages.success(request, message)
        return response

253 

254 

class AddBaseModeratorView(LoginRequiredMixin, ModeratorCommentRightsMixin, View):
    """
    Assigns an already existing moderator to a given comment.

    Only "admin" and "staff" moderators may use it, and only on
    non-processed comments.
    """

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
        """
        Handle the "add moderator" form.

        Redirects to the comment list on early failures, and to the comment
        details page once the moderation right update has been attempted.
        """
        rights = self.get_rights(request.user)
        if not (rights.is_admin_moderator() or rights.is_staff_moderator()):
            raise Http404

        list_redirect = HttpResponseRedirect(reverse("comment_list"))

        form = AddBaseModeratorForm(request.POST)
        if not form.is_valid():
            messages.error(request, "Error: Something went wrong. Please try again.")
            return list_redirect

        comment_id = form.cleaned_data["comment_id"]

        # Fetch the comment and check the user may manage its moderators.
        error, comment = get_comment_and_can_manage_moderators(request, rights, comment_id)
        if error:
            return list_redirect

        moderator_id = form.cleaned_data["moderator_id"]
        active_moderators = User.objects.filter(comment_moderator__is_moderator=True)
        try:
            moderator = active_moderators.get(pk=moderator_id)
        except User.DoesNotExist:
            messages.error(request, "Error: The chosen moderator does not exist.")
            return list_redirect

        error, _ = update_moderation_right(
            comment_id, moderator.pk, rights, PARAM_ACTION_CREATE, request
        )

        details_redirect = HttpResponseRedirect(
            reverse("comment_details", kwargs={"pk": comment_id})
        )
        # The moderator is only notified when the right was actually created.
        if not error:
            email_moderator_assigned(request, moderator, comment)

        return details_redirect

297 

298 

class RemoveBaseModeratorView(LoginRequiredMixin, ModeratorCommentRightsMixin, View):
    """
    View for removing 1+ existing moderator(s) from a specific comment.
    Limited to "admin" and "staff" moderators and to non-processed comments.
    Very similar to AddBaseModeratorView.
    """

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
        """
        Handle the "remove moderators" form.

        Redirects to the comment list on early failures, and to the comment
        details page once the moderation right update has been attempted.

        Raises:
            Http404: the current user is neither an "admin" nor a "staff"
                moderator.
        """
        rights = self.get_rights(request.user)
        if not rights.is_admin_moderator() and not rights.is_staff_moderator():
            raise Http404

        response = HttpResponseRedirect(reverse("comment_list"))

        # The comment id is read from the raw POST data because the form
        # choices can only be built once the comment has been fetched.
        try:
            comment_id = int(request.POST.get("comment_id", "notParsable"))
        except ValueError:
            messages.error(request, "Error: Something went wrong. Please try again.")
            return response

        # Get the comment data
        error, comment = get_comment_and_can_manage_moderators(request, rights, comment_id)
        if error:
            return response

        existing_moderators = comment.get("moderators_processed")
        if not existing_moderators:
            messages.error(request, "Error: Something went wrong. Please try again.")
            return response

        # Only the moderators currently attached to the comment are valid choices.
        moderators_choice = tuple((mod, mod) for mod in existing_moderators)
        form = RemoveBaseModeratorForm(request.POST, moderators=moderators_choice)

        if not form.is_valid():
            messages.error(request, "Error: Something went wrong. Please try again.")
            return response

        moderators_form = form.cleaned_data["moderators"]
        moderators = User.objects.filter(
            comment_moderator__is_moderator=True, pk__in=moderators_form
        ).values("first_name", "last_name")

        # Every selected id must match an existing moderator user.
        if len(moderators_form) != len(moderators):
            messages.error(request, "Error: One of the chosen moderators does not exist.")
            return response

        error, _ = update_moderation_right(
            comment_id, ",".join(moderators_form), rights, PARAM_ACTION_DELETE, request
        )

        if not error:
            # NOTE(review): user names are embedded next to literal <br> tags;
            # if the template renders messages unescaped, the names should be
            # escaped first - confirm against the template.
            moderators_str = "<br>".join(
                f"{mod['first_name']} {mod['last_name']}" for mod in moderators
            )
            messages.success(
                request,
                "The following moderators have been removed from the comment:"
                f"<br>{moderators_str}",
            )

        return HttpResponseRedirect(reverse("comment_details", kwargs={"pk": comment_id}))

362 

363 

class InviteStaffModeratorView(LoginRequiredMixin, ModeratorCommentRightsMixin, View):
    """
    View for inviting a new "staff" moderator.
    Different behaviors depending on the provided e-mail:
        -   already used by an user -> If the user is a moderator, adds the collection
            to his/her available collections. Else error message.
        -   already linked to an existing valid invitation -> update the invitation
            extra data.
        -   else -> create a fresh invitation + send e-mail

    Limited to "admin" moderators.
    """

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
        """
        Handle the "invite staff moderator" form.

        Always redirects to the moderator management page; the outcome is
        reported through the messages framework.

        Raises:
            Http404: the current user is not an "admin" moderator.
        """
        rights = self.get_rights(request.user)
        if not rights.is_admin_moderator():
            raise Http404

        response = HttpResponseRedirect(reverse("comment_moderators"))

        user_collections = rights.get_user_admin_collections()
        user_collections_choice = tuple((col, col) for col in user_collections)
        form = InviteStaffModeratorForm(user_collections_choice, request.POST)
        if not form.is_valid():
            messages.error(request, "Error: Something went wrong. Please try again.")
            return response

        cleaned_data = form.cleaned_data
        selected_collections = cleaned_data["collections"]
        email = cleaned_data["email"]

        # Check if there's already an user with the provided email.
        # Only the lookup itself is guarded, so an unrelated DoesNotExist
        # raised further down cannot be mistaken for "no such user".
        try:
            moderator = User.objects.get(email__iexact=email)
        except User.DoesNotExist:
            moderator = None

        if moderator is not None:
            if not is_comment_moderator(moderator):
                messages.error(
                    request,
                    "Error: A non-moderator user with the provided "
                    f"email {email} already exists. Please contact us "
                    "for more information.",
                )
                return response

            update_moderator_collections(
                rights, moderator, selected_collections, selected_collections
            )
            messages.success(
                request,
                f"The collections {', '.join(selected_collections)} "
                f"have been added to the moderator {email}",
            )
            return response

        # No user with that e-mail: go through the invitation workflow.
        inviter_name = f"{request.user.first_name} {request.user.last_name}"  # type:ignore
        cleaned_data["invite_message_txt"] = (
            f"You have been invited to join Trammel by {inviter_name} to moderate comments."
        )
        invite = Invitation.get_invite(email, request, cleaned_data)

        # Add selected collection(s) to the extra_data
        extra_data = InvitationExtraData(**invite.extra_data)
        extra_data.moderator.collections.append(
            InviteCollectionData(pid=selected_collections, user_id=request.user.pk)
        )
        invite.extra_data = extra_data.serialize()
        invite.save()
        messages.success(request, f"An invite has been sent to {email}")

        return response

435 

436 

class InviteBaseModeratorView(LoginRequiredMixin, ModeratorCommentRightsMixin, View):
    """
    View for inviting a new "base" moderator.
    Logic is similar to InviteStaffModeratorView.

    Limited to "admin" and "staff" moderators.
    """

    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponseRedirect:
        """
        Handle the "invite base moderator" form.

        Redirects to the URL found in the Referer header, or to the comment
        list when that header is absent. The outcome is reported through the
        messages framework.

        Raises:
            Http404: the current user is neither an "admin" nor a "staff"
                moderator.
        """
        rights = self.get_rights(request.user)
        if not rights.is_admin_moderator() and not rights.is_staff_moderator():
            raise Http404

        # NOTE(review): the Referer header is supplied by the client; consider
        # validating it (e.g. django.utils.http.url_has_allowed_host_and_scheme)
        # before redirecting to it.
        redirect_url = request.headers.get("referer") or reverse("comment_list")
        response = HttpResponseRedirect(redirect_url)

        form = InviteBaseModeratorForm(request.POST)
        if not form.is_valid():
            messages.error(request, "Error: Something went wrong. Please try again.")
            return response

        cleaned_data = form.cleaned_data
        comment_id = cleaned_data["comment_id"]

        # Get the comment data
        error, comment = get_comment_and_can_manage_moderators(request, rights, comment_id)
        if error:
            return response

        # The e-mail is removed from the data later handed to the invitation.
        email = cleaned_data.pop("email")

        # Check if there's already an user with the provided email.
        # Only the lookup itself is guarded, so an unrelated DoesNotExist
        # raised further down cannot be mistaken for "no such user".
        try:
            moderator = User.objects.get(email__iexact=email)
        except User.DoesNotExist:
            moderator = None

        if moderator is not None:
            if not is_comment_moderator(moderator):
                messages.error(
                    request,
                    "Error: A non-moderator user with the provided "
                    f"email {email} already exists. Please contact us "
                    "for more information.",
                )
                return response

            error, _ = update_moderation_right(
                comment_id, moderator.pk, rights, PARAM_ACTION_CREATE, request
            )
            # The moderator is only notified when the right was actually created.
            if not error:
                email_moderator_assigned(request, moderator, comment)
            return response

        # No user with that e-mail: go through the invitation workflow.
        inviter_name = f"{request.user.first_name} {request.user.last_name}"  # type:ignore
        cleaned_data["invite_message_txt"] = (
            f"You have been invited to join Trammel by {inviter_name} to moderate comments."
        )
        invite = Invitation.get_invite(email, request, cleaned_data)

        # Add comment to the invitation extra_data
        extra_data = InvitationExtraData(**invite.extra_data)
        extra_data.moderator.comments.append(
            InviteCommentData(
                id=comment_id,
                user_id=request.user.pk,
                pid=comment["site_name"].upper(),
                doi=comment["doi"],
            )
        )
        invite.extra_data = extra_data.serialize()
        invite.save()
        messages.success(request, f"An invite has been sent to {email}")

        return response