quickfix: set User-Agent for wikidata
[redakcja.git] / src / catalogue / wikidata.py
1 # This file is part of FNP-Redakcja, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
3 #
4 from datetime import date
5 from django.conf import settings
6 from django.db import models
7 from django.db.models.signals import m2m_changed
8 from django.utils.html import format_html
9 from django.utils.translation import gettext_lazy as _
10 from wikidata.client import Client
11 from wikidata.datavalue import DatavalueError
12 from modeltranslation.translator import translator
13 from modeltranslation.settings import AVAILABLE_LANGUAGES
14 from .wikimedia import Downloadable
15
16
17 class WikidataModel(models.Model):
18     wikidata = models.CharField(
19         max_length=255,
20         blank=True,
21         help_text=_('If you have a Wikidata ID, like "Q1337", enter it and save.'),
22     )
23
24     class Meta:
25         abstract = True
26
27     def get_wikidata_property(self, client, entity, wd, lang):
28         wdvalue = None
29
30         if callable(wd):
31             return wd(
32                 lambda next_arg:
33                 self.get_wikidata_property(
34                     client, entity, next_arg, lang
35                 )
36             )
37         elif wd == "description":
38             wdvalue = entity.description.get(lang, str(entity.description))
39         elif wd == "label":
40             wdvalue = entity.label.get(lang, str(entity.label))
41         elif wd[0] == 'P':
42             try:
43                 # TODO: lang?
44                 wdvalue = entity.get(client.get(wd))
45             except DatavalueError:
46                 pass
47         else:
48             try:
49                 # wiki links identified as 'plwiki' etc.
50                 wdvalue = entity.attributes['sitelinks'][wd]['url']
51             except KeyError:
52                 pass
53
54         return wdvalue
55         
56         
57     def wikidata_populate_field(self, client, entity, attname, wd, save, lang, force=False):
58         if not force:
59             model_field = self._meta.get_field(attname)
60             if isinstance(model_field, models.ManyToManyField):
61                 if getattr(self, attname).all().exists():
62                     return
63             else:
64                 if getattr(self, attname):
65                     return
66
67         wdvalue = self.get_wikidata_property(client, entity, wd, lang)
68             
69         self.set_field_from_wikidata(attname, wdvalue, save=save)
70
71     def wikidata_populate(self, save=True, force=False):
72         Wikidata = type(self).Wikidata
73         client = Client()
74         client.opener.addheaders = [(
75             'User-Agent', 'Wolne Lektury Redakcja / Python-wikidata'
76         )]
77         # Probably should getlist
78         entity = client.get(self.wikidata)
79         for attname in dir(Wikidata):
80             if attname.startswith("_"):
81                 continue
82             wd = getattr(Wikidata, attname)
83
84             self.wikidata_populate_attribute(client, entity, attname, wd, save=save, force=force)
85         if hasattr(Wikidata, '_supplement'):
86             for attname, wd in Wikidata._supplement(self):
87                 self.wikidata_populate_attribute(client, entity, attname, wd, save=save, force=force)
88
89     def wikidata_fields_for_attribute(self, attname):
90         field = getattr(type(self), attname)
91         if type(self) in translator._registry:
92             try:
93                 opts = translator.get_options_for_model(type(self))
94             except:
95                 pass
96             else:
97                 if attname in opts.fields:
98                     tfields = opts.fields[attname]
99                     for tf in tfields:
100                         yield tf.name, tf.language
101                     return
102
103         yield attname, settings.LANGUAGE_CODE
104
105     def wikidata_populate_attribute(self, client, entity, attname, wd, save, force=False):
106         for fieldname, lang in self.wikidata_fields_for_attribute(attname):
107             self.wikidata_populate_field(client, entity, fieldname, wd, save, lang, force=force)
108                 
109     def save(self, **kwargs):
110         am_new = self.pk is None
111
112         super().save()
113         if am_new and self.wikidata and hasattr(self, "Wikidata"):
114             self.wikidata_populate()
115
116         kwargs.update(force_insert=False, force_update=True)
117         super().save(**kwargs)
118
119     def set_field_from_wikidata(self, attname, wdvalue, save, language='pl'):
120         if not wdvalue:
121             return
122         # Find out what this model field is
123         model_field = self._meta.get_field(attname)
124         skip_set = False
125         if isinstance(model_field, models.ForeignKey):
126             rel_model = model_field.related_model
127             if issubclass(rel_model, WikidataModel):
128                 label = wdvalue.label.get(language, str(wdvalue.label))
129                 try:
130                     wdvalue = rel_model.objects.get(wikidata=wdvalue.id)
131                 except rel_model.DoesNotExist:
132                     wdvalue = rel_model(wikidata=wdvalue.id)
133                     if save:
134                         wdvalue.save()
135                 wdvalue._wikidata_label = label
136                 setattr(self, attname, wdvalue)
137         elif isinstance(model_field, models.ManyToManyField):
138             rel_model = model_field.related_model
139             if issubclass(rel_model, WikidataModel):
140                 label = wdvalue.label.get(language, str(wdvalue.label))
141                 try:
142                     wdvalue = rel_model.objects.get(wikidata=wdvalue.id)
143                 except rel_model.DoesNotExist:
144                     wdvalue = rel_model(wikidata=wdvalue.id)
145                     if save:
146                         wdvalue.save()
147                 wdvalue._wikidata_label = label
148                 getattr(self, attname).set([wdvalue])
149         else:
150             # How to get original title?
151             if isinstance(wdvalue, date):
152                 if isinstance(model_field, models.IntegerField):
153                     wdvalue = wdvalue.year
154
155             # If downloadable (and not save)?
156             elif isinstance(wdvalue, Downloadable):
157                 if save:
158                     wdvalue.apply_to_field(self, attname)
159                     skip_set = True
160
161             elif hasattr(wdvalue, 'label'):
162                 wdvalue = wdvalue.label.get(language, str(wdvalue.label))
163
164             if not skip_set:
165                 try:
166                     wdvalue = model_field.to_python(wdvalue)
167                 except:
168                     pass
169                 else:
170                     if isinstance(wdvalue, str):
171                         max_length = getattr(model_field, 'max_length', None)
172                         if max_length:
173                             wdvalue = wdvalue[:max_length]
174                     setattr(self, attname, wdvalue)
175
176     def wikidata_link(self):
177         if self.wikidata:
178             return format_html(
179                 '<a href="https://www.wikidata.org/wiki/{wd}" target="_blank">{wd}</a>',
180                 wd=self.wikidata,
181             )
182         else:
183             return ""
184
185     wikidata_link.admin_order_field = "wikidata"
186
187
188 class WikidataAdminMixin:
189     class Media:
190         css = {"screen": ("catalogue/wikidata_admin.css",)}
191         js = ("catalogue/wikidata_admin.js",)
192
193     def save_related(self, request, form, formsets, change):
194         super().save_related(request, form, formsets, change)
195         form.instance.save()