1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.date import DateValue
19 from librarian.meta.types.person import Person
20 from librarian.meta.types.wluri import WLURI
21 from librarian.meta.types.text import TextValue
25 def __init__(self, uri, attr_name, value_type=TextValue,
26 multiple=False, salias=None, **kwargs):
29 self.value_type = value_type
30 self.multiple = multiple
33 self.required = (kwargs.get('required', True)
34 and 'default' not in kwargs)
35 self.default = kwargs.get('default', [] if multiple else [None])
37 def validate_value(self, val, strict=False):
45 raise ValidationError(
46 "Multiple values not allowed for field '%s'" % self.uri
49 raise ValidationError(
50 "Field %s has no value to assign. Check your defaults."
55 except ValueError as e:
56 raise ValidationError(
57 "Field '%s' - invald value: %s"
61 def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
64 if self.uri not in fdict:
66 # Accept single value for single fields and saliases.
67 if self.name in fallbacks:
69 f = fallbacks[self.name]
71 f = [fallbacks[self.name]]
72 elif self.salias and self.salias in fallbacks:
73 f = [fallbacks[self.salias]]
76 elif validate_required:
77 raise ValidationError("Required field %s not found" % self.uri)
83 return self.validate_value(f, strict=strict)
85 def __eq__(self, other):
86 if isinstance(other, Field) and other.name == self.name:
92 def __new__(mcs, classname, bases, class_dict):
93 fields = list(class_dict['FIELDS'])
95 for base in bases[::-1]:
96 if hasattr(base, 'FIELDS'):
97 for field in base.FIELDS[::-1]:
101 fields.insert(0, field)
103 class_dict['FIELDS'] = tuple(fields)
104 return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
107 class WorkInfo(six.with_metaclass(DCInfo, object)):
109 Field(DCNS('creator'), 'authors', Person, salias='author',
111 Field(DCNS('title'), 'title'),
112 Field(DCNS('type'), 'type', required=False, multiple=True),
114 Field(DCNS('contributor.editor'), 'editors',
115 Person, salias='editor', multiple=True, required=False),
116 Field(DCNS('contributor.technical_editor'), 'technical_editors',
117 Person, salias='technical_editor', multiple=True,
119 Field(DCNS('contributor.funding'), 'funders', salias='funder',
120 multiple=True, required=False),
121 Field(DCNS('contributor.thanks'), 'thanks', required=False),
123 Field(DCNS('date'), 'created_at'),
124 Field(DCNS('date.pd'), 'released_to_public_domain_at', DateValue,
126 Field(DCNS('publisher'), 'publisher', multiple=True),
128 Field(DCNS('language'), 'language'),
129 Field(DCNS('description'), 'description', required=False),
131 Field(DCNS('source'), 'source_name', required=False),
132 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
133 multiple=True, required=False),
134 Field(DCNS('identifier.url'), 'url', WLURI),
135 Field(DCNS('rights.license'), 'license', required=False),
136 Field(DCNS('rights'), 'license_description'),
138 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
140 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
141 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
143 Field(WLNS('developmentStage'), 'stage', required=False),
147 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Build an instance from an XML document held in memory as bytes.

    The bytes are wrapped in an in-memory binary stream, which is then
    handed to ``cls.from_file`` together with any additional positional
    and keyword arguments; whatever ``from_file`` returns is returned.
    """
    stream = six.BytesIO(xml)
    return cls.from_file(stream, *args, **kwargs)
157 def from_file(cls, xmlfile, *args, **kwargs):
160 iter = etree.iterparse(xmlfile, ['start', 'end'])
161 for (event, element) in iter:
162 if element.tag == RDFNS('RDF') and event == 'start':
167 raise NoDublinCore("DublinCore section not found. \
168 Check if there are rdf:RDF and rdf:Description tags.")
170 # continue 'till the end of RDF section
171 for (event, element) in iter:
172 if element.tag == RDFNS('RDF') and event == 'end':
175 # if there is no end, Expat should yell at us with an ExpatError
177 # extract data from the element and make the info
178 return cls.from_element(desc_tag, *args, **kwargs)
179 except XMLSyntaxError as e:
181 except ExpatError as e:
185 def from_element(cls, rdf_tag, *args, **kwargs):
186 # The tree is already parsed,
187 # so we don't need to worry about Expat errors.
189 desc = rdf_tag.find(".//" + RDFNS('Description'))
193 "There must be a '%s' element inside the RDF."
194 % RDFNS('Description')
199 while p is not None and lang is None:
200 lang = p.attrib.get(XMLNS('lang'))
203 for e in desc.getchildren():
206 meta_id = e.attrib.get('id')
207 if meta_id and meta_id.endswith('-id'):
210 field = cls.get_field_by_uri(tag)
212 # Ignore unknown fields.
215 fv = field_dict.get(tag, [])
216 if e.text is not None:
217 val = field.value_type.from_text(e.text)
218 val.lang = e.attrib.get(XMLNS('lang'), lang)
224 return cls(desc.attrib, field_dict, *args, **kwargs)
def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
    """Populate the instance from parsed RDF/Dublin Core data.

    rdf_attrs should be a dictionary-like object with any attributes
    of the RDF:Description.
    dc_fields - dictionary mapping DC fields (with namespace) to
    list of text values for the given field.

    ``fallbacks``, ``strict`` and ``validate_required`` are passed
    unchanged to ``Field.validate`` for every entry of ``self.FIELDS``.
    Each validated value is stored under ``'prop_' + field.name``; the
    field itself is registered in ``self.fmap`` under its name and, when
    it has one, under its singular alias.
    """
    self.about = rdf_attrs.get(RDFNS('about'))
    # NOTE(review): the initialisation of ``fmap`` is not visible in the
    # reviewed listing; an empty per-instance dict is assumed because the
    # loop below assigns into it -- confirm against the full file.
    self.fmap = {}

    for field in self.FIELDS:
        value = field.validate(dc_fields, fallbacks=fallbacks,
                               strict=strict, validate_required=validate_required)
        # (A leftover debugging ``print`` of every field/value pair used
        # to live here; a library constructor must not write to stdout.)
        setattr(self, 'prop_' + field.name, value)
        self.fmap[field.name] = field
        if field.salias:
            self.fmap[field.salias] = field
def __getattribute__(self, name):
    """Look up ``name``, giving registered fields and aliases priority.

    When ``name`` is a key of ``self.fmap`` the value stored under
    ``'prop_' + field.name`` is used: the whole value if ``name`` is the
    field's own name, or its first item (``None`` when the value is
    empty) if ``name`` is the field's singular alias.  Every other name,
    and any lookup that fails with ``KeyError`` or ``AttributeError``,
    falls back to the default ``object.__getattribute__``.

    Raises:
        TypeError: ``name`` is a singular alias of a field that is not
            declared ``multiple``.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value
        # From here on ``name`` is the field's singular alias.
        if not field.multiple:
            # Raising a bare string is not valid: it only produces the
            # generic "exceptions must derive from BaseException"
            # TypeError.  Raise a real TypeError so the exception type
            # stays the same but the message explains the problem.
            raise TypeError(
                "Singular alias '%s' is only valid for a multiple-valued "
                "field (field '%s' is not)" % (name, field.name)
            )
        return value[0] if value else None
    except (KeyError, AttributeError):
        # Not a registered field (or ``fmap`` / ``prop_*`` is not set
        # yet): behave like an ordinary attribute lookup.
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Assign ``newvalue``, routing registered fields to their storage.

    When ``name`` is a key of ``self.fmap`` the value is written to the
    attribute ``'prop_' + field.name``: as given if ``name`` is the
    field's own name, or wrapped in a one-element list if ``name`` is the
    field's singular alias.  Every other name, and any lookup that fails
    with ``KeyError`` or ``AttributeError`` (for instance before ``fmap``
    exists), is stored through the default ``object.__setattr__``.

    Raises:
        TypeError: ``name`` is a singular alias of a field that is not
            declared ``multiple``.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:
            # ``name`` is the field's singular alias.
            if not field.multiple:
                # Raising a bare string is not valid: it only produces
                # the generic "exceptions must derive from BaseException"
                # TypeError.  Raise a real TypeError so the exception
                # type stays the same but the message is meaningful.
                raise TypeError(
                    "Singular alias '%s' is only valid for a "
                    "multiple-valued field (field '%s' is not)"
                    % (name, field.name)
                )
            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        # Not a registered field: plain attribute assignment.
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """Copy values from ``field_dict`` onto the matching declared fields.

    For every entry of ``self.FIELDS`` whose ``name`` is a key of
    ``field_dict`` the corresponding value is assigned with ``setattr``.
    Keys that do not match a declared field name are ignored, and no
    check is made that all required fields are present.

    NOTE(review): the previous docstring said the values are verified;
    no validation is performed by this method itself.
    """
    matching = (f.name for f in self.FIELDS if f.name in field_dict)
    for attr_name in matching:
        setattr(self, attr_name, field_dict[attr_name])
282 def to_etree(self, parent=None):
283 """XML representation of this object."""
284 # etree._namespace_map[str(self.RDF)] = 'rdf'
285 # etree._namespace_map[str(self.DC)] = 'dc'
288 root = etree.Element(RDFNS('RDF'))
290 root = parent.makeelement(RDFNS('RDF'))
292 description = etree.SubElement(root, RDFNS('Description'))
295 description.set(RDFNS('about'), self.about)
297 for field in self.FIELDS:
298 v = getattr(self, field.name, None)
304 e = etree.Element(field.uri)
306 e.text = six.text_type(x)
307 description.append(e)
309 e = etree.Element(field.uri)
310 e.text = six.text_type(v)
311 description.append(e)
316 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
319 for field in self.FIELDS:
320 v = getattr(self, field.name, None)
325 v = [six.text_type(x) for x in v if x is not None]
329 dc[field.name] = {'uri': field.uri, 'value': v}
334 result = {'about': self.about}
335 for field in self.FIELDS:
336 v = getattr(self, field.name, None)
342 v = [six.text_type(x) for x in v if x is not None]
345 result[field.name] = v
348 v = getattr(self, field.salias)
350 result[field.salias] = six.text_type(v)
355 class BookInfo(WorkInfo):
357 Field(DCNS('audience'), 'audiences', salias='audience', multiple=True,
360 Field(DCNS('subject.period'), 'epochs', salias='epoch', multiple=True,
362 Field(DCNS('subject.type'), 'kinds', salias='kind', multiple=True,
364 Field(DCNS('subject.genre'), 'genres', salias='genre', multiple=True,
366 Field(WLNS('category.legimi'), 'legimi', required=False),
368 Field(DCNS('subject.location'), 'location', required=False),
370 Field(DCNS('contributor.translator'), 'translators',
371 Person, salias='translator', multiple=True, required=False),
372 Field(DCNS('relation.hasPart'), 'parts', WLURI,
373 multiple=True, required=False),
374 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
377 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
378 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
380 Field(DCNS('relation.coverImage.source'), 'cover_source',
383 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
384 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
385 Field(WLNS('coverClass'), 'cover_class', default=['default']),
386 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
388 Field(WLNS('endnotes'), 'endnotes', BoolValue,
391 Field('pdf-id', 'isbn_pdf', required=False),
392 Field('epub-id', 'isbn_epub', required=False),
393 Field('mobi-id', 'isbn_mobi', required=False),
394 Field('txt-id', 'isbn_txt', required=False),
395 Field('html-id', 'isbn_html', required=False),
def parse(file_name, cls=BookInfo):
    """Parse ``file_name`` into a metadata object.

    ``file_name`` is passed unchanged to ``cls.from_file`` and its result
    is returned; ``cls`` defaults to ``BookInfo``.
    """
    info = cls.from_file(file_name)
    return info