1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
12 XMLNS, WLURI, WLNS, PLMETNS)
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.date import DateValue
19 from librarian.meta.types.person import Person
20 from librarian.meta.types.text import TextValue
24 def __init__(self, uri, attr_name, validator=TextValue, strict=None,
25 multiple=False, salias=None, **kwargs):
28 self.validator = validator
30 self.multiple = multiple
33 self.required = (kwargs.get('required', True)
34 and 'default' not in kwargs)
35 self.default = kwargs.get('default', [] if multiple else [None])
37 def validate_value(self, val, strict=False):
38 if strict and self.strict is not None:
39 validator = self.strict
41 validator = self.validator
52 if hasattr(v, 'lang'):
53 setattr(nv, 'lang', v.lang)
57 raise ValidationError(
58 "Multiple values not allowed for field '%s'" % self.uri
61 raise ValidationError(
62 "Field %s has no value to assign. Check your defaults."
66 if validator is None or val[0] is None:
68 #nv = validator(val[0])
70 if hasattr(val[0], 'lang') and not hasattr(validator, 'no_lang'):
71 setattr(nv, 'lang', val[0].lang)
73 except ValueError as e:
74 raise ValidationError(
75 "Field '%s' - invald value: %s"
79 def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
82 if self.uri not in fdict:
84 # Accept single value for single fields and saliases.
85 if self.name in fallbacks:
87 f = fallbacks[self.name]
89 f = [fallbacks[self.name]]
90 elif self.salias and self.salias in fallbacks:
91 f = [fallbacks[self.salias]]
94 elif validate_required:
95 raise ValidationError("Required field %s not found" % self.uri)
101 return self.validate_value(f, strict=strict)
103 def __eq__(self, other):
104 if isinstance(other, Field) and other.name == self.name:
110 def __new__(mcs, classname, bases, class_dict):
111 fields = list(class_dict['FIELDS'])
113 for base in bases[::-1]:
114 if hasattr(base, 'FIELDS'):
115 for field in base.FIELDS[::-1]:
119 fields.insert(0, field)
121 class_dict['FIELDS'] = tuple(fields)
122 return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
125 class WorkInfo(six.with_metaclass(DCInfo, object)):
127 Field(DCNS('creator'), 'authors', Person, salias='author',
129 Field(DCNS('title'), 'title'),
130 Field(DCNS('type'), 'type', required=False, multiple=True),
132 Field(DCNS('contributor.editor'), 'editors',
133 Person, salias='editor', multiple=True, required=False),
134 Field(DCNS('contributor.technical_editor'), 'technical_editors',
135 Person, salias='technical_editor', multiple=True,
137 Field(DCNS('contributor.funding'), 'funders', salias='funder',
138 multiple=True, required=False),
139 Field(DCNS('contributor.thanks'), 'thanks', required=False),
141 Field(DCNS('date'), 'created_at'),
142 Field(DCNS('date.pd'), 'released_to_public_domain_at', DateValue,
144 Field(DCNS('publisher'), 'publisher', multiple=True),
146 Field(DCNS('language'), 'language'),
147 Field(DCNS('description'), 'description', required=False),
149 Field(DCNS('source'), 'source_name', required=False),
150 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
151 multiple=True, required=False),
152 Field(DCNS('identifier.url'), 'url', WLURI),
153 Field(DCNS('rights.license'), 'license', required=False),
154 Field(DCNS('rights'), 'license_description'),
156 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
158 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
159 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
161 Field(WLNS('developmentStage'), 'stage', required=False),
165 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Build an instance from XML supplied as a bytes object.

    The bytes are wrapped in an in-memory binary stream (``six.BytesIO``)
    and handed to ``cls.from_file``; any extra positional and keyword
    arguments are forwarded unchanged.

    Returns whatever ``cls.from_file`` returns.
    """
    # NOTE(review): takes ``cls`` first, so presumably decorated as a
    # classmethod -- the decorator is outside the visible lines.
    loader = cls.from_file
    return loader(six.BytesIO(xml), *args, **kwargs)
175 def from_file(cls, xmlfile, *args, **kwargs):
178 iter = etree.iterparse(xmlfile, ['start', 'end'])
179 for (event, element) in iter:
180 if element.tag == RDFNS('RDF') and event == 'start':
185 raise NoDublinCore("DublinCore section not found. \
186 Check if there are rdf:RDF and rdf:Description tags.")
188 # continue 'till the end of RDF section
189 for (event, element) in iter:
190 if element.tag == RDFNS('RDF') and event == 'end':
193 # if there is no end, Expat should yell at us with an ExpatError
195 # extract data from the element and make the info
196 return cls.from_element(desc_tag, *args, **kwargs)
197 except XMLSyntaxError as e:
199 except ExpatError as e:
203 def from_element(cls, rdf_tag, *args, **kwargs):
204 # The tree is already parsed,
205 # so we don't need to worry about Expat errors.
207 desc = rdf_tag.find(".//" + RDFNS('Description'))
211 "There must be a '%s' element inside the RDF."
212 % RDFNS('Description')
217 while p is not None and lang is None:
218 lang = p.attrib.get(XMLNS('lang'))
221 for e in desc.getchildren():
222 field = cls.get_field_by_uri(e.tag)
224 # Ignore unknown fields.
225 ### TODO: does it do <meta> for isbn?
228 fv = field_dict.get(e.tag, [])
229 if e.text is not None:
230 val = field.validator(e.text)
231 val.lang = e.attrib.get(XMLNS('lang'), lang)
234 meta_id = e.attrib.get('id')
235 if meta_id and meta_id.endswith('-id'):
236 field_dict[meta_id] = [val.replace('ISBN-', 'ISBN ')]
240 field_dict[e.tag] = fv
242 return cls(desc.attrib, field_dict, *args, **kwargs)
244 def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
246 rdf_attrs should be a dictionary-like object with any attributes
247 of the RDF:Description.
248 dc_fields - dictionary mapping DC fields (with namespace) to
249 list of text values for the given field.
252 self.about = rdf_attrs.get(RDFNS('about'))
255 for field in self.FIELDS:
256 value = field.validate(dc_fields, fallbacks=fallbacks,
257 strict=strict, validate_required=validate_required)
258 setattr(self, 'prop_' + field.name, value)
259 self.fmap[field.name] = field
261 self.fmap[field.salias] = field
def __getattribute__(self, name):
    """Resolve metadata fields (and their singular aliases) as attributes.

    ``self.fmap`` maps both a field's name and its singular alias to the
    field object; the field's value is stored under ``'prop_' + field.name``.

    * ``name`` is a field name: return the stored value as is.
    * ``name`` is a singular alias: return the first element of the stored
      value, or ``None`` when the stored value is empty.
    * anything else (including the time before ``fmap`` exists): fall back
      to ordinary attribute lookup.

    Raises:
        TypeError: if ``name`` is a singular alias of a field that is not
            ``multiple`` (an alias only makes sense for list-valued fields).
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value

        # Reached through the singular alias.
        if not field.multiple:
            # Raising a bare string is itself a TypeError in Python; raise
            # a real TypeError so the exception type callers see stays the
            # same, but the message is now the intended one.
            raise TypeError("OUCH!! for field %s" % name)

        return value[0] if value else None
    except (KeyError, AttributeError):
        # Not a metadata field (or fmap / prop_* not set yet).
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Assign metadata fields (and their singular aliases) as attributes.

    ``self.fmap`` maps both a field's name and its singular alias to the
    field object; values are stored under ``'prop_' + field.name``.

    * ``name`` is a field name: store ``newvalue`` as is.
    * ``name`` is a singular alias: store ``[newvalue]``, i.e. the single
      value replaces the whole list.
    * anything else (including the time before ``fmap`` exists): ordinary
      attribute assignment.

    Raises:
        TypeError: if ``name`` is a singular alias of a field that is not
            ``multiple``.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:
            # Reached through the singular alias.
            if not field.multiple:
                # Raising a bare string is itself a TypeError in Python;
                # raise a real TypeError so the exception type stays the
                # same while the intended message actually reaches the
                # caller.
                raise TypeError("OUCH! while setting field %s" % name)

            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        # Not a metadata field (or fmap not set yet).
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """Copy values from ``field_dict`` onto the matching declared fields.

    Every field in ``self.FIELDS`` whose ``name`` is a key of
    ``field_dict`` is assigned the corresponding value with ``setattr``.
    Keys that are not field names are ignored, and fields missing from
    ``field_dict`` keep their current value, so required fields are not
    checked for presence. This method performs no validation of its own.
    """
    present = (f.name for f in self.FIELDS if f.name in field_dict)
    for attr in present:
        setattr(self, attr, field_dict[attr])
299 def to_etree(self, parent=None):
300 """XML representation of this object."""
301 # etree._namespace_map[str(self.RDF)] = 'rdf'
302 # etree._namespace_map[str(self.DC)] = 'dc'
305 root = etree.Element(RDFNS('RDF'))
307 root = parent.makeelement(RDFNS('RDF'))
309 description = etree.SubElement(root, RDFNS('Description'))
312 description.set(RDFNS('about'), self.about)
314 for field in self.FIELDS:
315 v = getattr(self, field.name, None)
321 e = etree.Element(field.uri)
323 e.text = six.text_type(x)
324 description.append(e)
326 e = etree.Element(field.uri)
327 e.text = six.text_type(v)
328 description.append(e)
333 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
336 for field in self.FIELDS:
337 v = getattr(self, field.name, None)
342 v = [six.text_type(x) for x in v if x is not None]
346 dc[field.name] = {'uri': field.uri, 'value': v}
351 result = {'about': self.about}
352 for field in self.FIELDS:
353 v = getattr(self, field.name, None)
359 v = [six.text_type(x) for x in v if x is not None]
362 result[field.name] = v
365 v = getattr(self, field.salias)
367 result[field.salias] = six.text_type(v)
372 class BookInfo(WorkInfo):
374 Field(DCNS('audience'), 'audiences', salias='audience', multiple=True,
377 Field(DCNS('subject.period'), 'epochs', salias='epoch', multiple=True,
379 Field(DCNS('subject.type'), 'kinds', salias='kind', multiple=True,
381 Field(DCNS('subject.genre'), 'genres', salias='genre', multiple=True,
383 Field(WLNS('category.legimi'), 'legimi', required=False),
385 Field(DCNS('subject.location'), 'location', required=False),
387 Field(DCNS('contributor.translator'), 'translators',
388 Person, salias='translator', multiple=True, required=False),
389 Field(DCNS('relation.hasPart'), 'parts', WLURI,
390 multiple=True, required=False),
391 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
394 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
395 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
397 Field(DCNS('relation.coverImage.source'), 'cover_source',
400 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
401 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
402 Field(WLNS('coverClass'), 'cover_class', default=['default']),
403 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
405 Field(WLNS('endnotes'), 'endnotes', BoolValue,
408 Field('pdf-id', 'isbn_pdf', required=False),
409 Field('epub-id', 'isbn_epub', required=False),
410 Field('mobi-id', 'isbn_mobi', required=False),
411 Field('txt-id', 'isbn_txt', required=False),
412 Field('html-id', 'isbn_html', required=False),
def parse(file_name, cls=BookInfo):
    """Parse metadata from ``file_name`` using ``cls``.

    ``file_name`` is passed straight to ``cls.from_file``; ``cls``
    defaults to ``BookInfo``.

    Returns whatever ``cls.from_file`` returns.
    """
    info = cls.from_file(file_name)
    return info