X-Git-Url: https://git.mdrn.pl/redakcja.git/blobdiff_plain/2caa6415139fe58938a62dca695639ba0cc86dda..282654ea252af7e2d30740b40bccfc9be61dd3a8:/src/catalogue/wikidata.py
diff --git a/src/catalogue/wikidata.py b/src/catalogue/wikidata.py
index c6885705..90f1a5cb 100644
--- a/src/catalogue/wikidata.py
+++ b/src/catalogue/wikidata.py
@@ -1,95 +1,183 @@
+# This file is part of FNP-Redakcja, licensed under GNU Affero GPLv3 or later.
+# Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
+#
from datetime import date
+from django.conf import settings
from django.db import models
from django.db.models.signals import m2m_changed
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
-from django.dispatch import receiver
from wikidata.client import Client
from wikidata.datavalue import DatavalueError
+from modeltranslation.translator import translator
+from modeltranslation.settings import AVAILABLE_LANGUAGES
+from .wikimedia import Downloadable
-class WikidataMixin(models.Model):
+class WikidataModel(models.Model):
wikidata = models.CharField(
max_length=255,
- null=True,
blank=True,
- unique=True,
help_text=_('If you have a Wikidata ID, like "Q1337", enter it and save.'),
)
class Meta:
abstract = True
+ def get_wikidata_property(self, client, entity, wd, lang):
+ wdvalue = None
+
+ if callable(wd):
+ return wd(
+ lambda next_arg:
+ self.get_wikidata_property(
+ client, entity, next_arg, lang
+ )
+ )
+ elif wd == "description":
+ wdvalue = entity.description.get(lang, str(entity.description))
+ elif wd == "label":
+ wdvalue = entity.label.get(lang, str(entity.label))
+ elif wd[0] == 'P':
+ try:
+ # TODO: lang?
+ wdvalue = entity.get(client.get(wd))
+ except DatavalueError:
+ pass
+ else:
+ try:
+ # wiki links identified as 'plwiki' etc.
+ wdvalue = entity.attributes['sitelinks'][wd]['url']
+ except KeyError:
+ pass
+
+ return wdvalue
+
+
+ def wikidata_populate_field(self, client, entity, attname, wd, save, lang, force=False):
+ if not force:
+ model_field = self._meta.get_field(attname)
+ if isinstance(model_field, models.ManyToManyField):
+ if getattr(self, attname).all().exists():
+ return
+ else:
+ if getattr(self, attname):
+ return
+
+ wdvalue = self.get_wikidata_property(client, entity, wd, lang)
+
+ self.set_field_from_wikidata(attname, wdvalue, save=save)
+
+ def wikidata_populate(self, save=True, force=False):
+ Wikidata = type(self).Wikidata
+ client = Client()
+ # Probably should getlist
+ entity = client.get(self.wikidata)
+ for attname in dir(Wikidata):
+ if attname.startswith("_"):
+ continue
+ wd = getattr(Wikidata, attname)
+
+ self.wikidata_populate_attribute(client, entity, attname, wd, save=save, force=force)
+ if hasattr(Wikidata, '_supplement'):
+ for attname, wd in Wikidata._supplement(self):
+ self.wikidata_populate_attribute(client, entity, attname, wd, save=save, force=force)
+
+ def wikidata_fields_for_attribute(self, attname):
+ field = getattr(type(self), attname)
+ if type(self) in translator._registry:
+ try:
+ opts = translator.get_options_for_model(type(self))
+ except:
+ pass
+ else:
+ if attname in opts.fields:
+ tfields = opts.fields[attname]
+ for tf in tfields:
+ yield tf.name, tf.language
+ return
+
+ yield attname, settings.LANGUAGE_CODE
+
+ def wikidata_populate_attribute(self, client, entity, attname, wd, save, force=False):
+ for fieldname, lang in self.wikidata_fields_for_attribute(attname):
+ self.wikidata_populate_field(client, entity, fieldname, wd, save, lang, force=force)
+
def save(self, **kwargs):
+ am_new = self.pk is None
+
super().save()
- if self.wikidata and hasattr(self, "Wikidata"):
- Wikidata = type(self).Wikidata
- client = Client()
- # Probably should getlist
- entity = client.get(self.wikidata)
- for attname in dir(Wikidata):
- if attname.startswith("_"):
- continue
- wd = getattr(Wikidata, attname)
-
- model_field = self._meta.get_field(attname)
- if isinstance(model_field, models.ManyToManyField):
- if getattr(self, attname).all().exists():
- continue
- else:
- if getattr(self, attname):
- continue
-
- wdvalue = None
- if wd == "description":
- wdvalue = entity.description.get("pl", str(entity.description))
- elif wd == "label":
- wdvalue = entity.label.get("pl", str(entity.label))
- else:
- try:
- wdvalue = entity.get(client.get(wd))
- except DatavalueError:
- pass
-
- self.set_field_from_wikidata(attname, wdvalue)
+ if am_new and self.wikidata and hasattr(self, "Wikidata"):
+ self.wikidata_populate()
kwargs.update(force_insert=False, force_update=True)
super().save(**kwargs)
- def set_field_from_wikidata(self, attname, wdvalue):
+ def set_field_from_wikidata(self, attname, wdvalue, save, language='pl'):
if not wdvalue:
return
# Find out what this model field is
model_field = self._meta.get_field(attname)
+ skip_set = False
if isinstance(model_field, models.ForeignKey):
rel_model = model_field.related_model
- if issubclass(rel_model, WikidataMixin):
- # welp, we can try and find by WD identifier.
- wdvalue, created = rel_model.objects.get_or_create(wikidata=wdvalue.id)
+ if issubclass(rel_model, WikidataModel):
+ label = wdvalue.label.get(language, str(wdvalue.label))
+ try:
+ wdvalue = rel_model.objects.get(wikidata=wdvalue.id)
+ except rel_model.DoesNotExist:
+ wdvalue = rel_model(wikidata=wdvalue.id)
+ if save:
+ wdvalue.save()
+ wdvalue._wikidata_label = label
setattr(self, attname, wdvalue)
elif isinstance(model_field, models.ManyToManyField):
rel_model = model_field.related_model
- if issubclass(rel_model, WikidataMixin):
- wdvalue, created = rel_model.objects.get_or_create(wikidata=wdvalue.id)
+ if issubclass(rel_model, WikidataModel):
+ label = wdvalue.label.get(language, str(wdvalue.label))
+ try:
+ wdvalue = rel_model.objects.get(wikidata=wdvalue.id)
+ except rel_model.DoesNotExist:
+ wdvalue = rel_model(wikidata=wdvalue.id)
+ if save:
+ wdvalue.save()
+ wdvalue._wikidata_label = label
getattr(self, attname).set([wdvalue])
else:
# How to get original title?
if isinstance(wdvalue, date):
if isinstance(model_field, models.IntegerField):
wdvalue = wdvalue.year
- elif not isinstance(wdvalue, str):
- wdvalue = wdvalue.label.get("pl", str(wdvalue.label))
- setattr(self, attname, wdvalue)
+
+ # If downloadable (and not save)?
+ elif isinstance(wdvalue, Downloadable):
+ if save:
+ wdvalue.apply_to_field(self, attname)
+ skip_set = True
+
+ elif hasattr(wdvalue, 'label'):
+ wdvalue = wdvalue.label.get(language, str(wdvalue.label))
+
+ if not skip_set:
+ setattr(self, attname, wdvalue)
+
+ def wikidata_link(self):
+ if self.wikidata:
+ return format_html(
+ '{wd}',
+ wd=self.wikidata,
+ )
+ else:
+ return ""
+
+ wikidata_link.admin_order_field = "wikidata"
class WikidataAdminMixin:
+ class Media:
+ css = {"screen": ("catalogue/wikidata_admin.css",)}
+ js = ("catalogue/wikidata_admin.js",)
+
def save_related(self, request, form, formsets, change):
super().save_related(request, form, formsets, change)
form.instance.save()
-
- def wikidata_link(self, obj):
- if obj.wikidata:
- return format_html('{wd}', wd=obj.wikidata)
- else:
- return ''
- wikidata_link.admin_order_field = 'wikidata'