1 # -*- coding: utf-8 -*-
3 Models and managers for generic tagging.
6 from django.contrib.contenttypes import generic
7 from django.contrib.contenttypes.models import ContentType
8 from django.db import connection, models
9 from django.utils.translation import ugettext_lazy as _
10 from django.db.models.base import ModelBase
11 from django.db.models.loading import get_model # D1.7: apps?
12 from django.core.exceptions import ObjectDoesNotExist
13 from django.dispatch import Signal
15 qn = connection.ops.quote_name
18 tags_updated = Signal(providing_args=["affected_tags"])
20 def get_queryset_and_model(queryset_or_model):
22 Given a ``QuerySet`` or a ``Model``, returns a two-tuple of
25 If a ``Model`` is given, the ``QuerySet`` returned will be created
26 using its default manager.
29 return queryset_or_model, queryset_or_model.model
30 except AttributeError:
31 return queryset_or_model._default_manager.all(), queryset_or_model
37 class TagManager(models.Manager):
38 def __init__(self, intermediary_table_model):
39 super(TagManager, self).__init__()
40 self.intermediary_table_model = intermediary_table_model
41 models.signals.pre_delete.connect(self.target_deleted)
43 def target_deleted(self, instance, **kwargs):
44 """ clear tag relations before deleting an object """
50 self.update_tags(instance, [])
52 def update_tags(self, obj, tags):
54 Update tags associated with an object.
56 content_type = ContentType.objects.get_for_model(obj)
57 current_tags = list(self.filter(items__content_type__pk=content_type.pk,
58 items__object_id=obj.pk))
59 updated_tags = self.model.get_tag_list(tags)
61 # Remove tags which no longer apply
62 tags_for_removal = [tag for tag in current_tags \
63 if tag not in updated_tags]
64 if len(tags_for_removal):
65 self.intermediary_table_model._default_manager.filter(content_type__pk=content_type.pk,
67 tag__in=tags_for_removal).delete()
69 tags_to_add = [tag for tag in updated_tags
70 if tag not in current_tags]
71 for tag in tags_to_add:
72 if tag not in current_tags:
73 self.intermediary_table_model._default_manager.create(tag=tag, content_object=obj)
75 tags_updated.send(sender=obj, affected_tags=tags_to_add + tags_for_removal)
77 def remove_tag(self, obj, tag):
79 Remove tag from an object.
81 content_type = ContentType.objects.get_for_model(obj)
82 self.intermediary_table_model._default_manager.filter(content_type__pk=content_type.pk,
83 object_id=obj.pk, tag=tag).delete()
85 def get_for_object(self, obj):
87 Create a queryset matching all tags associated with the given
90 ctype = ContentType.objects.get_for_model(obj)
91 return self.filter(items__content_type__pk=ctype.pk,
92 items__object_id=obj.pk)
94 def usage_for_model(self, model, counts=False, filters=None):
96 Obtain a list of tags associated with instances of the given
99 If ``counts`` is True, a ``count`` attribute will be added to
100 each tag, indicating how many times it has been used against
101 the Model class in question.
103 To limit the tags (and counts, if specified) returned to those
104 used by a subset of the Model's instances, pass a dictionary
105 of field lookups to be applied to the given Model as the
106 ``filters`` argument.
108 if filters is None: filters = {}
110 queryset = model._default_manager.filter()
111 for f in filters.items():
112 queryset.query.add_filter(f)
113 usage = self.usage_for_queryset(queryset, counts)
116 def usage_for_queryset(self, queryset, counts=False):
118 Obtain a list of tags associated with instances of a model
119 contained in the given queryset.
121 If ``counts`` is True, a ``count`` attribute will be added to
122 each tag, indicating how many times it has been used against
123 the Model class in question.
125 usage = self.model._default_manager.filter(
126 items__content_type=ContentType.objects.get_for_model(queryset.model),
127 items__object_id__in=queryset
130 usage = usage.annotate(count=models.Count('id'))
132 usage = usage.distinct()
135 def related_for_model(self, tags, model, counts=False):
137 Obtain a list of tags related to a given list of tags - that
138 is, other tags used by items which have all the given tags.
140 If ``counts`` is True, a ``count`` attribute will be added to
141 each tag, indicating the number of items which have it in
142 addition to the given list of tags.
144 objs = self.model.intermediary_table_model.objects.get_by_model(model, tags)
145 qs = self.usage_for_queryset(objs, counts)
146 qs = qs.exclude(pk__in=[tag.pk for tag in tags])
150 class TaggedItemManager(models.Manager):
151 def __init__(self, tag_model):
152 super(TaggedItemManager, self).__init__()
153 self.tag_model = tag_model
155 def get_by_model(self, queryset_or_model, tags):
157 Create a ``QuerySet`` containing instances of the specified
158 model associated with a given tag or list of tags.
160 queryset, model = get_queryset_and_model(queryset_or_model)
161 tags = self.tag_model.get_tag_list(tags)
162 tag_count = len(tags)
164 # No existing tags were given
167 # Optimisation for single tag - fall through to the simpler
169 return queryset.filter(tag_relations__tag=tags[0])
171 # TODO: presumes reverse generic relation
172 return queryset.filter(tag_relations__tag__in=tags
173 ).annotate(count=models.Count('pk')).filter(count=len(tags))
175 def get_union_by_model(self, queryset_or_model, tags):
177 Create a ``QuerySet`` containing instances of the specified
178 model associated with *any* of the given list of tags.
180 queryset, model = get_queryset_and_model(queryset_or_model)
181 tags = self.tag_model.get_tag_list(tags)
184 # TODO: presumes reverse generic relation
185 return queryset.filter(tag_relations__tag__in=tags)
187 def get_related(self, obj, queryset_or_model):
189 Retrieve a list of instances of the specified model which share
190 tags with the model instance ``obj``, ordered by the number of
191 shared tags in descending order.
193 queryset, model = get_queryset_and_model(queryset_or_model)
194 # TODO: presumes reverse generic relation.
195 # Do we know it's 'tags'?
196 return queryset.filter(tag_relations__tag__in=obj.tags).annotate(
197 count=models.Count('pk')).order_by('-count').exclude(obj=obj.pk)
203 def create_intermediary_table_model(model):
204 """Create an intermediary table model for the specific tag model"""
205 name = model.__name__ + 'Relation'
208 db_table = '%s_relation' % model._meta.db_table
209 unique_together = (('tag', 'content_type', 'object_id'),)
210 app_label = model._meta.app_label
212 def obj_unicode(self):
214 return u'%s [%s]' % (self.content_type.get_object_for_this_type(pk=self.object_id), self.tag)
215 except ObjectDoesNotExist:
216 return u'<deleted> [%s]' % self.tag
218 # Set up a dictionary to simulate declarations within a class
220 '__module__': model.__module__,
222 'tag': models.ForeignKey(model, verbose_name=_('tag'), related_name='items'),
223 'content_type': models.ForeignKey(ContentType, verbose_name=_('content type')),
224 'object_id': models.PositiveIntegerField(_('object id'), db_index=True),
225 'content_object': generic.GenericForeignKey('content_type', 'object_id'),
226 '__unicode__': obj_unicode,
229 return type(name, (models.Model,), attrs)
232 class TagMeta(ModelBase):
233 "Metaclass for tag models (models inheriting from TagBase)."
234 def __new__(cls, name, bases, attrs):
235 model = super(TagMeta, cls).__new__(cls, name, bases, attrs)
236 if not model._meta.abstract:
237 # Create an intermediary table and register custom managers for concrete models
238 model.intermediary_table_model = create_intermediary_table_model(model)
239 TagManager(model.intermediary_table_model).contribute_to_class(model, 'objects')
240 TaggedItemManager(model).contribute_to_class(model.intermediary_table_model, 'objects')
244 class TagBase(models.Model):
245 """Abstract class to be inherited by model classes."""
246 __metaclass__ = TagMeta
252 def get_tag_list(tag_list):
254 Utility function for accepting tag input in a flexible manner.
256 You should probably override this method in your subclass.
258 if isinstance(tag_list, TagBase):