1
+ from __future__ import annotations
2
+
1
3
from django .apps import apps
2
4
from django .conf import settings
3
5
from django .contrib import admin
4
- from django .core .exceptions import FieldError , PermissionDenied
5
- from django .db .models import OuterRef , Q , Subquery
6
- from django .http import Http404 , JsonResponse
6
+ from django .core .exceptions import PermissionDenied
7
+ from django .db .models import F , Model , Prefetch , Q , QuerySet
8
+ from django .http import Http404 , HttpRequest , JsonResponse
7
9
from django .urls import path
8
10
from django .utils .translation import gettext as _
9
11
from django .views .generic .list import BaseListView
17
19
from .helpers import get_manager
18
20
19
21
22
+ UNICODE_SPACE = "\u3000 " # This is a full-width space character (U+3000)
23
+
24
+
20
25
_version = int (__version__ .split ("." )[0 ])
21
26
if _version >= 4 :
22
27
from cms .admin .utils import GrouperModelAdmin
@@ -37,7 +42,7 @@ class AdminUrlsView(BaseListView):
37
42
paginate_by = getattr (settings , "DJANGOCMS_LINK_PAGINATE_BY" , 50 )
38
43
admin_site = None
39
44
40
- def get (self , request , * args , ** kwargs ):
45
+ def get (self , request : HttpRequest , * args , ** kwargs ) -> JsonResponse :
41
46
"""
42
47
Return a JsonResponse with search results (query parameter "q") usable by
43
48
Django admin's autocomplete view. Each item is returned as defined in
@@ -90,7 +95,7 @@ def get(self, request, *args, **kwargs):
90
95
}
91
96
)
92
97
93
- def get_page (self ):
98
+ def get_page (self ) -> int :
94
99
page_kwarg = self .page_kwarg
95
100
page = self .kwargs .get (page_kwarg ) or self .request .GET .get (page_kwarg ) or 1
96
101
try :
@@ -101,7 +106,7 @@ def get_page(self):
101
106
)
102
107
return page_number
103
108
104
- def get_paginated_multi_qs (self , qs_list ) :
109
+ def get_paginated_multi_qs (self , qs_list : list [ QuerySet ]) -> list [ Model ] | QuerySet :
105
110
"""
106
111
Paginate multiple querysets and return a result list.
107
112
"""
@@ -110,19 +115,34 @@ def get_paginated_multi_qs(self, qs_list):
110
115
return qs_list [0 ]
111
116
# Slize all querysets, evaluate and join them into a list
112
117
max_items = self .get_page () * self .paginate_by
113
- return sum ((list (qs [:max_items ]) for qs in qs_list ), start = [])
114
-
115
- def get_reference (self , request ):
118
+ objects = []
119
+ for qs in qs_list :
120
+ for item in qs :
121
+ if self .has_perm (self .request , item ):
122
+ objects .append (item )
123
+
124
+ if len (objects ) >= max_items :
125
+ # No need to touch the rest of the querysets
126
+ # as we have enough items already
127
+ break
128
+ return objects
129
+
130
+ def get_reference (self , request : HttpRequest ) -> JsonResponse :
116
131
try :
117
132
model_str , pk = request .GET .get ("g" ).split (":" )
118
133
app , model = model_str .split ("." )
119
134
model = apps .get_model (app , model )
120
135
model_admin = self .admin_site ._registry .get (model )
136
+ language = get_language_from_request (request )
137
+
121
138
if model_str == "cms.page" and _version >= 4 or model_admin is None :
122
139
obj = get_manager (model ).get (pk = pk )
123
140
if model_str == "cms.page" :
124
- language = get_language_from_request (request )
125
- obj .__link_text__ = obj .get_admin_content (language ).title
141
+ obj .__link_text__ = obj .get_admin_content (language , fallback = True ).title
142
+ return JsonResponse (self .serialize_result (obj ))
143
+ elif model_str == "cms.page" :
144
+ obj = get_manager (model ).get (pk = pk )
145
+ obj .__link_text__ = obj .get_title (language , fallback = True )
126
146
return JsonResponse (self .serialize_result (obj ))
127
147
128
148
if hasattr (model_admin , "get_link_queryset" ):
@@ -151,43 +171,54 @@ def get_optgroups(self, context):
151
171
results .append (model )
152
172
return results
153
173
154
- def serialize_result (self , obj ) :
174
+ def serialize_result (self , obj : Model ) -> dict :
155
175
"""
156
176
Convert the provided model object to a dictionary that is added to the
157
177
results list.
158
178
"""
179
+ if isinstance (obj , Page ) and hasattr (obj , "prefetched_content" ) and hasattr (obj , "get_admin_content" ):
180
+ obj .admin_content_cache = {trans .language : trans for trans in obj .prefetched_content }
181
+ obj .__link_text__ = obj .get_admin_content (self .language ).title
182
+
183
+ indentation = UNICODE_SPACE * (max (getattr (obj , "__depth__" , 1 ), 1 ) - 1 )
159
184
return {
160
185
"id" : f"{ obj ._meta .app_label } .{ obj ._meta .model_name } :{ obj .pk } " ,
161
- "text" : getattr (obj , "__link_text__" , str (obj )) or str (obj ),
186
+ "text" : indentation + ( getattr (obj , "__link_text__" , str (obj )) or str (obj ) ),
162
187
"url" : obj .get_absolute_url (),
163
188
"verbose_name" : str (obj ._meta .verbose_name ).capitalize (),
164
189
}
165
190
166
- def get_queryset (self ):
191
+ def get_queryset (self ) -> QuerySet :
167
192
"""Return queryset based on ModelAdmin.get_search_results()."""
168
193
languages = get_language_list ()
169
194
try :
170
- # django CMS 5.0+
171
- qs = (
195
+ # django CMS 4.1/ 5.0+
196
+ content_qs = (
172
197
PageContent .admin_manager .filter (language__in = languages )
173
198
.filter (
174
199
Q (title__icontains = self .term ) | Q (menu_title__icontains = self .term )
175
200
)
176
201
.current_content ()
177
202
)
178
203
qs = (
179
- Page .objects .filter (pk__in = qs .values_list ("page_id" , flat = True ))
180
- .order_by ("path" )
181
- .annotate (
182
- __link_text__ = Subquery (
183
- qs .filter (page_id = OuterRef ("pk" )).values ("title" )[:1 ]
184
- )
204
+ Page .objects .filter (pk__in = content_qs .values_list ("page_id" , flat = True ))
205
+ .order_by ("path" if _version >= 5 else "node__path" )
206
+ .prefetch_related (
207
+ Prefetch (
208
+ "pagecontent_set" ,
209
+ to_attr = "prefetched_content" ,
210
+ queryset = PageContent .admin_manager .current_content (),
211
+ ),
185
212
)
186
213
)
214
+ if not self .term :
215
+ qs = qs .annotate (
216
+ __depth__ = F ("depth" if _version >= 5 else "node__depth" )
217
+ )
187
218
if self .site :
188
- qs = qs .filter (site_id = self .site )
189
- except (AttributeError , FieldError ):
190
- # django CMS 3.11 - 4.1
219
+ qs = qs .filter (site_id = self .site ) if _version >= 5 else qs . filter ( node__site_id = self . site )
220
+ except (AttributeError ,):
221
+ # django CMS 3.11
191
222
qs = (
192
223
get_manager (PageContent , current_content = True )
193
224
.filter (language__in = languages )
@@ -198,20 +229,27 @@ def get_queryset(self):
198
229
qs = (
199
230
Page .objects .filter (pk__in = qs .values_list ("page_id" , flat = True ))
200
231
.order_by ("node__path" )
201
- .annotate (
202
- __link_text__ = Subquery (
203
- qs .filter (page_id = OuterRef ("pk" )).values ("title" )[:1 ]
204
- )
232
+ .prefetch_related (
233
+ Prefetch (
234
+ "title_set" ,
235
+ to_attr = "prefetched_content" ,
236
+ queryset = get_manager (PageContent , current_content = True ).all (),
237
+ ),
205
238
)
206
239
)
207
240
if "publisher_draft" in Page ._meta .fields_map :
208
241
# django CMS 3.11
209
242
qs = qs .filter (publisher_is_draft = True )
243
+ if not self .term :
244
+ qs = qs .annotate (
245
+ __depth__ = F ("node__depth" )
246
+ )
247
+
210
248
if self .site :
211
249
qs = qs .filter (node__site_id = self .site )
212
250
return qs
213
251
214
- def add_admin_querysets (self , qs ) :
252
+ def add_admin_querysets (self , qs : list [ QuerySet ]) -> None :
215
253
for model_admin in REGISTERED_ADMIN :
216
254
try :
217
255
# hack: GrouperModelAdmin expects a language to be temporarily set
@@ -229,20 +267,17 @@ def add_admin_querysets(self, qs):
229
267
)
230
268
elif hasattr (model_admin .model , "sites" ) and self .site :
231
269
new_qs = new_qs .filter (sites__id = self .site )
232
- new_qs , search_use_distinct = model_admin .get_search_results (
233
- self .request , new_qs , self .term
234
- )
270
+ new_qs , search_use_distinct = model_admin .get_search_results (self .request , new_qs , self .term )
235
271
if search_use_distinct : # pragma: no cover
236
272
new_qs = new_qs .distinct ()
237
-
238
273
qs .append (new_qs )
239
274
except Exception : # pragma: no cover
240
275
# Still report back remaining urls even if one model fails
241
276
pass
242
277
243
278
return qs
244
279
245
- def process_request (self , request ) :
280
+ def process_request (self , request : HttpRequest ) -> tuple [ str , str , int | None ] :
246
281
"""
247
282
Validate request integrity, extract and return request parameters.
248
283
"""
@@ -257,7 +292,7 @@ def process_request(self, request):
257
292
language = get_language_from_request (request )
258
293
return term , language , site
259
294
260
- def has_perm (self , request , obj = None ):
295
+ def has_perm (self , request : HttpRequest , obj = None ) -> bool :
261
296
"""Check if user has permission to access the related model."""
262
297
if obj is None :
263
298
return True
@@ -279,11 +314,11 @@ def __init__(self, *args, **kwargs):
279
314
super ().__init__ (* args , ** kwargs )
280
315
self .global_link_url_name = f"{ self .opts .app_label } _{ self .opts .model_name } _urls"
281
316
282
- def has_module_permission (self , request ) : # pragma: no cover
317
+ def has_module_permission (self , request : HttpRequest ) -> bool : # pragma: no cover
283
318
# Remove from admin
284
319
return False
285
320
286
- def get_urls (self ):
321
+ def get_urls (self ) -> list :
287
322
# Only url endpoint public, do not call super().get_urls()
288
323
return [
289
324
path (
@@ -293,7 +328,7 @@ def get_urls(self):
293
328
),
294
329
]
295
330
296
- def url_view (self , request ) :
331
+ def url_view (self , request : HttpRequest ) -> JsonResponse :
297
332
return AdminUrlsView .as_view (admin_site = self .admin_site )(request )
298
333
299
334
0 commit comments