1 # -*- coding: utf-8 -*-
3 Models and managers for generic tagging.
6 from django.contrib.contenttypes.fields import GenericForeignKey
7 from django.contrib.contenttypes.models import ContentType
8 from django.db import connection, models
9 from django.utils.translation import ugettext_lazy as _
10 from django.db.models.base import ModelBase
11 from django.core.exceptions import ObjectDoesNotExist
12 from django.dispatch import Signal
# Shorthand for the identifier-quoting function of the default database
# connection's backend.
14 qn = connection.ops.quote_name
# Signal sent by ``TagManager.update_tags``; the tagged object is the sender
# and ``affected_tags`` lists the tags that were added or removed.
17 tags_updated = Signal(providing_args=["affected_tags"])
def get_queryset_and_model(queryset_or_model):
    """
    Given a ``QuerySet`` or a ``Model``, returns a two-tuple of
    (queryset, model).

    If a ``Model`` is given, the ``QuerySet`` returned will be created
    using its default manager.
    """
    try:
        # Only objects exposing a ``model`` attribute are treated as
        # querysets.
        model = queryset_or_model.model
    except AttributeError:
        # No ``model`` attribute: treat the argument as a model class and
        # build the queryset from its default manager.
        return queryset_or_model._default_manager.all(), queryset_or_model
    return queryset_or_model, model
class TagManager(models.Manager):
    """
    Manager for tag models.

    Keeps the relations between tags and arbitrary objects, which are
    stored as rows of ``intermediary_table_model``, up to date and offers
    tag usage queries.
    """

    def __init__(self, intermediary_table_model):
        super(TagManager, self).__init__()
        self.intermediary_table_model = intermediary_table_model
        # No sender is given, so ``target_deleted`` runs before the deletion
        # of an instance of any model.
        models.signals.pre_delete.connect(self.target_deleted)

    def target_deleted(self, instance, **kwargs):
        """ clear tag relations before deleting an object """
        # Relations reference their target through an integer ``object_id``,
        # so an instance without an integer primary key cannot have any;
        # skip it instead of failing on the lookup.
        try:
            int(instance.pk)
        except (TypeError, ValueError):
            return

        self.update_tags(instance, [])

    def update_tags(self, obj, tags):
        """
        Update tags associated with an object.

        ``tags`` is normalised with the tag model's ``get_tag_list``.
        Relations to tags which are no longer listed are deleted, relations
        to newly listed tags are created, and ``tags_updated`` is sent with
        ``obj`` as sender and the added plus removed tags as
        ``affected_tags``.
        """
        content_type = ContentType.objects.get_for_model(obj)
        current_tags = list(self.filter(items__content_type__pk=content_type.pk,
                                        items__object_id=obj.pk))
        updated_tags = self.model.get_tag_list(tags)

        # Remove tags which no longer apply
        tags_for_removal = [tag for tag in current_tags
                            if tag not in updated_tags]
        if tags_for_removal:
            # Restrict the deletion to this object's relations; without the
            # ``object_id`` filter every object of the same content type
            # would lose these tags.
            self.intermediary_table_model._default_manager.filter(
                content_type__pk=content_type.pk,
                object_id=obj.pk,
                tag__in=tags_for_removal).delete()

        # Add new tags
        tags_to_add = [tag for tag in updated_tags
                       if tag not in current_tags]
        for tag in tags_to_add:
            self.intermediary_table_model._default_manager.create(tag=tag, content_object=obj)

        tags_updated.send(sender=obj, affected_tags=tags_to_add + tags_for_removal)

    def remove_tag(self, obj, tag):
        """
        Remove tag from an object.
        """
        content_type = ContentType.objects.get_for_model(obj)
        self.intermediary_table_model._default_manager.filter(
            content_type__pk=content_type.pk,
            object_id=obj.pk,
            tag=tag).delete()

    def get_for_object(self, obj):
        """
        Create a queryset matching all tags associated with the given
        object.
        """
        ctype = ContentType.objects.get_for_model(obj)
        return self.filter(items__content_type__pk=ctype.pk,
                           items__object_id=obj.pk)

    def usage_for_model(self, model, counts=False, filters=None):
        """
        Obtain a list of tags associated with instances of the given
        Model class.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating how many times it has been used against
        the Model class in question.

        To limit the tags (and counts, if specified) returned to those
        used by a subset of the Model's instances, pass a dictionary
        of field lookups to be applied to the given Model as the
        ``filters`` argument.
        """
        if filters is None:
            filters = {}

        # Apply the lookups through the public queryset API rather than
        # through the internal ``Query.add_filter``.
        queryset = model._default_manager.filter(**filters)
        return self.usage_for_queryset(queryset, counts)

    def usage_for_queryset(self, queryset, counts=False):
        """
        Obtain a list of tags associated with instances of a model
        contained in the given queryset.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating how many times it has been used against
        the Model class in question.
        """
        usage = self.model._default_manager.filter(
            items__content_type=ContentType.objects.get_for_model(queryset.model),
            items__object_id__in=queryset
        )
        if counts:
            usage = usage.annotate(count=models.Count('id'))
        else:
            # Without the aggregation a tag would be listed once per
            # matching relation.
            usage = usage.distinct()

        return usage

    def related_for_model(self, tags, model, counts=False):
        """
        Obtain a list of tags related to a given list of tags - that
        is, other tags used by items which have all the given tags.

        If ``counts`` is True, a ``count`` attribute will be added to
        each tag, indicating the number of items which have it in
        addition to the given list of tags.
        """
        objs = self.model.intermediary_table_model.objects.get_by_model(model, tags)
        qs = self.usage_for_queryset(objs, counts)
        # The given tags themselves are not "related" tags.
        qs = qs.exclude(pk__in=[tag.pk for tag in tags])
        return qs
class TaggedItemManager(models.Manager):
    """
    Manager for the intermediary relation model of ``tag_model``.

    Looks up instances of other models by the tags related to them. All
    lookups go through a ``tag_relations`` relation on the target model.
    """

    def __init__(self, tag_model):
        super(TaggedItemManager, self).__init__()
        self.tag_model = tag_model

    def get_by_model(self, queryset_or_model, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with a given tag or list of tags.

        When several tags are given, only instances related to all of
        them are returned.
        """
        queryset, model = get_queryset_and_model(queryset_or_model)
        tags = self.tag_model.get_tag_list(tags)
        tag_count = len(tags)
        if tag_count == 0:
            # No existing tags were given
            return queryset.none()
        elif tag_count == 1:
            # Optimisation for single tag - fall through to the simpler
            # query below.
            return queryset.filter(tag_relations__tag=tags[0])

        # TODO: presumes reverse generic relation
        # An instance matches every tag when it has as many matching
        # relations as there are tags.
        return queryset.filter(tag_relations__tag__in=tags
            ).annotate(count=models.Count('pk')).filter(count=tag_count)

    def get_union_by_model(self, queryset_or_model, tags):
        """
        Create a ``QuerySet`` containing instances of the specified
        model associated with *any* of the given list of tags.
        """
        queryset, model = get_queryset_and_model(queryset_or_model)
        tags = self.tag_model.get_tag_list(tags)
        if not tags:
            return queryset.none()
        # TODO: presumes reverse generic relation
        return queryset.filter(tag_relations__tag__in=tags)

    def get_related(self, obj, queryset_or_model):
        """
        Retrieve a list of instances of the specified model which share
        tags with the model instance ``obj``, ordered by the number of
        shared tags in descending order.
        """
        queryset, model = get_queryset_and_model(queryset_or_model)
        # TODO: presumes reverse generic relation.
        # Do we know it's 'tags'?
        return queryset.filter(tag_relations__tag__in=obj.tags).annotate(
            count=models.Count('pk')).order_by('-count').exclude(pk=obj.pk)
def create_intermediary_table_model(model):
    """
    Create an intermediary table model for the specific tag model

    The returned model class is named ``<TagModel>Relation``, lives in the
    tag model's app and module, and relates one tag to one object of any
    content type. Each (tag, content type, object id) combination is
    unique.
    """
    name = model.__name__ + 'Relation'

    class Meta:
        db_table = '%s_relation' % model._meta.db_table
        unique_together = (('tag', 'content_type', 'object_id'),)
        app_label = model._meta.app_label

    def obj_unicode(self):
        try:
            return u'%s [%s]' % (self.content_type.get_object_for_this_type(pk=self.object_id), self.tag)
        except ObjectDoesNotExist:
            # The tagged object is gone but the relation row still exists.
            return u'<deleted> [%s]' % self.tag

    # Set up a dictionary to simulate declarations within a class
    attrs = {
        '__module__': model.__module__,
        'Meta': Meta,
        # ``on_delete`` is spelled out: CASCADE is the behaviour the implicit
        # default gave, and the argument is mandatory from Django 2.0 on.
        'tag': models.ForeignKey(model, verbose_name=_('tag'), related_name='items',
                                 on_delete=models.CASCADE),
        'content_type': models.ForeignKey(ContentType, verbose_name=_('content type'),
                                          on_delete=models.CASCADE),
        'object_id': models.PositiveIntegerField(_('object id'), db_index=True),
        'content_object': GenericForeignKey('content_type', 'object_id'),
        '__unicode__': obj_unicode,
    }

    return type(name, (models.Model,), attrs)
class TagMeta(ModelBase):
    """
    Metaclass for tag models (models inheriting from TagBase).

    Every concrete tag model gets its own intermediary relation model,
    stored as ``intermediary_table_model``, a ``TagManager`` as ``objects``,
    and a ``TaggedItemManager`` as ``objects`` on the relation model.
    Abstract models are left untouched.
    """
    def __new__(cls, name, bases, attrs):
        model = super(TagMeta, cls).__new__(cls, name, bases, attrs)
        if not model._meta.abstract:
            # Create an intermediary table and register custom managers for concrete models
            model.intermediary_table_model = create_intermediary_table_model(model)
            TagManager(model.intermediary_table_model).contribute_to_class(model, 'objects')
            TaggedItemManager(model).contribute_to_class(model.intermediary_table_model, 'objects')
        # The metaclass must hand back the class it built.
        return model
243 class TagBase(models.Model):
244 """Abstract class to be inherited by model classes."""
245 __metaclass__ = TagMeta
251 def get_tag_list(tag_list):
253 Utility function for accepting tag input in a flexible manner.
255 You should probably override this method in your subclass.
257 if isinstance(tag_list, TagBase):