1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.person import Person
19 from librarian.meta.types.wluri import WLURI
20 from librarian.meta.types import text
def __init__(self, uri, attr_name, value_type=text.TextValue,
             multiple=False, salias=None, **kwargs):
    """Describe a single metadata field.

    uri is the key under which the field is looked up in parsed field
    dictionaries; attr_name is stored as ``name``; salias is an optional
    second name for the field. value_type and multiple are stored as
    given.

    Keyword options read from kwargs:
      required - defaults to True; the field is never required when a
                 'default' option is supplied.
      default  - value used when nothing else is available; when absent
                 it is [] for multiple fields and [None] otherwise.
    """
    has_default = 'default' in kwargs

    # NOTE(review): the uri/name/salias assignments are restored from how
    # the other methods read them -- confirm against the full file.
    self.uri = uri
    self.name = attr_name
    self.value_type = value_type
    self.multiple = multiple
    self.salias = salias

    self.required = kwargs.get('required', True) and not has_default
    if has_default:
        self.default = kwargs['default']
    else:
        self.default = [] if multiple else [None]
36 def validate_value(self, val, strict=False):
44 raise ValidationError(
45 "Multiple values not allowed for field '%s'" % self.uri
48 raise ValidationError(
49 "Field %s has no value to assign. Check your defaults."
54 except ValueError as e:
55 raise ValidationError(
56 "Field '%s' - invald value: %s"
def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
    """Select the raw value for this field and hand it to validate_value.

    The value is taken from fdict[self.uri] when that key is present.
    Otherwise, for a field that is not required, it comes from fallbacks
    (first under the field name, then under the singular alias) or from
    self.default. A missing required field raises ValidationError when
    validate_required is true and yields None when it is false.

    NOTE(review): the branch structure around the visible statements is
    restored from context -- confirm against the full file.
    """
    if self.uri in fdict:
        return self.validate_value(fdict[self.uri], strict=strict)

    if self.required:
        if validate_required:
            raise ValidationError("Required field %s not found" % self.uri)
        return None

    # Accept single value for single fields and saliases.
    sources = {} if fallbacks is None else fallbacks
    if self.name in sources:
        picked = sources[self.name]
        # A multiple field's fallback is used as given; a single one
        # is wrapped in a one-element list.
        raw = picked if self.multiple else [picked]
    elif self.salias and self.salias in sources:
        raw = [sources[self.salias]]
    else:
        raw = self.default

    return self.validate_value(raw, strict=strict)
def __eq__(self, other):
    """Return True when other is a Field with the same name, else False."""
    same_kind = isinstance(other, Field)
    return bool(same_kind and other.name == self.name)
def __new__(mcs, classname, bases, class_dict):
    """Create the class with FIELDS extended by the FIELDS of its bases.

    The class's own FIELDS are kept in order. Fields found on the bases
    are inserted in front of them, and the combined sequence is stored
    back in class_dict as a tuple before the class is built.
    """
    merged = list(class_dict['FIELDS'])

    for parent in reversed(bases):
        if not hasattr(parent, 'FIELDS'):
            continue
        for inherited in parent.FIELDS[::-1]:
            # NOTE(review): the guard before the insert is restored as a
            # membership test -- confirm against the full file.
            if inherited not in merged:
                merged.insert(0, inherited)

    class_dict['FIELDS'] = tuple(merged)
    return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
106 class WorkInfo(six.with_metaclass(DCInfo, object)):
108 Field(DCNS('creator'), 'authors', Person, salias='author',
110 Field(DCNS('title'), 'title'),
111 Field(DCNS('type'), 'type', required=False, multiple=True),
113 Field(DCNS('contributor.editor'), 'editors',
114 Person, salias='editor', multiple=True, required=False),
115 Field(DCNS('contributor.technical_editor'), 'technical_editors',
116 Person, salias='technical_editor', multiple=True,
118 Field(DCNS('contributor.funding'), 'funders', salias='funder',
119 multiple=True, required=False),
120 Field(DCNS('contributor.thanks'), 'thanks', required=False),
122 Field(DCNS('date'), 'created_at'),
123 Field(DCNS('date.pd'), 'released_to_public_domain_at',
125 Field(DCNS('publisher'), 'publisher', multiple=True),
127 Field(DCNS('language'), 'language'),
128 Field(DCNS('description'), 'description', required=False),
130 Field(DCNS('source'), 'source_name', required=False),
131 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
132 multiple=True, required=False),
133 Field(DCNS('identifier.url'), 'url', WLURI),
134 Field(DCNS('rights.license'), 'license', required=False),
135 Field(DCNS('rights'), 'license_description'),
137 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
139 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
140 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
142 Field(WLNS('developmentStage'), 'stage', required=False),
146 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Build an instance from XML supplied as a bytes object.

    The bytes are wrapped in an in-memory binary stream, which is passed
    to from_file together with any extra positional and keyword
    arguments. Returns whatever from_file returns.
    """
    stream = six.BytesIO(xml)
    return cls.from_file(stream, *args, **kwargs)
156 def from_file(cls, xmlfile, *args, **kwargs):
159 iter = etree.iterparse(xmlfile, ['start', 'end'])
160 for (event, element) in iter:
161 if element.tag == RDFNS('RDF') and event == 'start':
166 raise NoDublinCore("DublinCore section not found. \
167 Check if there are rdf:RDF and rdf:Description tags.")
169 # continue 'till the end of RDF section
170 for (event, element) in iter:
171 if element.tag == RDFNS('RDF') and event == 'end':
174 # if there is no end, Expat should yell at us with an ExpatError
176 # extract data from the element and make the info
177 return cls.from_element(desc_tag, *args, **kwargs)
178 except XMLSyntaxError as e:
180 except ExpatError as e:
184 def from_element(cls, rdf_tag, *args, **kwargs):
185 # The tree is already parsed,
186 # so we don't need to worry about Expat errors.
188 desc = rdf_tag.find(".//" + RDFNS('Description'))
192 "There must be a '%s' element inside the RDF."
193 % RDFNS('Description')
198 while p is not None and lang is None:
199 lang = p.attrib.get(XMLNS('lang'))
202 for e in desc.getchildren():
205 meta_id = e.attrib.get('id')
206 if meta_id and meta_id.endswith('-id'):
209 field = cls.get_field_by_uri(tag)
211 # Ignore unknown fields.
214 fv = field_dict.get(tag, [])
215 if e.text is not None:
216 val = field.value_type.from_text(e.text)
217 val.lang = e.attrib.get(XMLNS('lang'), lang)
223 return cls(desc.attrib, field_dict, *args, **kwargs)
def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
    """
    Validate every declared field and store the results on the instance.

    rdf_attrs should be a dictionary-like object with any attributes
    of the RDF:Description.
    dc_fields - dictionary mapping DC fields (with namespace) to
    list of text values for the given field.
    fallbacks, strict and validate_required are passed unchanged to
    each field's validate().
    """
    about_key = RDFNS('about')
    self.about = rdf_attrs.get(about_key)

    # Maps both field names and singular aliases to their Field.
    # NOTE(review): this initialisation and the alias guard below are
    # restored from context -- confirm against the full file.
    self.fmap = {}

    options = dict(fallbacks=fallbacks, strict=strict,
                   validate_required=validate_required)
    for field in self.FIELDS:
        checked = field.validate(dc_fields, **options)
        # Values live under a 'prop_' prefix; attribute access by field
        # name is resolved through fmap.
        setattr(self, 'prop_' + field.name, checked)

        lookup_names = [field.name]
        if field.salias:
            lookup_names.append(field.salias)
        for key in lookup_names:
            self.fmap[key] = field
def __getattribute__(self, name):
    """Resolve field names and singular aliases before normal lookup.

    When name is registered in fmap under the field's own name, the
    stored 'prop_<field name>' value is returned. When name is the
    field's singular alias, the first element of that value is returned,
    or None if the value is empty. Any other name falls back to the
    default attribute lookup.

    Raises:
        TypeError: name is an alias registered for a field that is not
            multiple.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value

        # singular alias
        if not field.multiple:
            # Raising a plain string is itself a TypeError ("exceptions
            # must derive from BaseException"); raise that type
            # explicitly so the message names the real problem.
            raise TypeError(
                "singular alias '%s' used on a field that is not multiple"
                % name
            )

        return value[0] if value else None
    except (KeyError, AttributeError):
        # Not a mapped field (or fmap is not set up yet).
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Store field values under their 'prop_' attribute.

    Assigning through a field's own name stores newvalue unchanged.
    Assigning through its singular alias stores a one-element list
    holding newvalue. Any other name is set as a normal attribute.

    Raises:
        TypeError: name is an alias registered for a field that is not
            multiple.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:  # singular alias
            if not field.multiple:
                # Raising a plain string is itself a TypeError
                # ("exceptions must derive from BaseException"); raise
                # that type explicitly so the message names the real
                # problem.
                raise TypeError(
                    "singular alias '%s' set on a field that is not multiple"
                    % name
                )

            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        # Not a mapped field (or fmap is not set up yet).
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """
    Assign values from field_dict to the matching declared fields.

    Each entry of self.FIELDS whose name is a key of field_dict is set
    on the instance with that value. Fields absent from field_dict are
    left untouched, so missing required fields are not reported, and
    keys that match no field are ignored.
    """
    for attr in (field.name for field in self.FIELDS):
        if attr not in field_dict:
            continue
        setattr(self, attr, field_dict[attr])
280 def to_etree(self, parent=None):
281 """XML representation of this object."""
282 # etree._namespace_map[str(self.RDF)] = 'rdf'
283 # etree._namespace_map[str(self.DC)] = 'dc'
286 root = etree.Element(RDFNS('RDF'))
288 root = parent.makeelement(RDFNS('RDF'))
290 description = etree.SubElement(root, RDFNS('Description'))
293 description.set(RDFNS('about'), self.about)
295 for field in self.FIELDS:
296 v = getattr(self, field.name, None)
302 e = etree.Element(field.uri)
304 e.text = six.text_type(x)
305 description.append(e)
307 e = etree.Element(field.uri)
308 e.text = six.text_type(v)
309 description.append(e)
314 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
317 for field in self.FIELDS:
318 v = getattr(self, field.name, None)
323 v = [six.text_type(x) for x in v if x is not None]
327 dc[field.name] = {'uri': field.uri, 'value': v}
332 result = {'about': self.about}
333 for field in self.FIELDS:
334 v = getattr(self, field.name, None)
340 v = [six.text_type(x) for x in v if x is not None]
343 result[field.name] = v
346 v = getattr(self, field.salias)
348 result[field.salias] = six.text_type(v)
353 class BookInfo(WorkInfo):
355 Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
358 Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
360 Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
362 Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
364 Field(WLNS('category.legimi'), 'legimi', text.LegimiCategory, required=False),
365 Field(WLNS('category.thema'), 'thema', text.ThemaCategory, required=False, multiple=True),
366 Field(DCNS('subject.location'), 'location', required=False),
368 Field(DCNS('contributor.translator'), 'translators',
369 Person, salias='translator', multiple=True, required=False),
370 Field(DCNS('relation.hasPart'), 'parts', WLURI,
371 multiple=True, required=False),
372 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
375 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
376 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
378 Field(DCNS('relation.coverImage.source'), 'cover_source',
381 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
382 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
383 Field(WLNS('coverClass'), 'cover_class', default=['default']),
384 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
386 Field(WLNS('endnotes'), 'endnotes', BoolValue,
389 Field('pdf-id', 'isbn_pdf', required=False),
390 Field('epub-id', 'isbn_epub', required=False),
391 Field('mobi-id', 'isbn_mobi', required=False),
392 Field('txt-id', 'isbn_txt', required=False),
393 Field('html-id', 'isbn_html', required=False),
def parse(file_name, cls=BookInfo):
    """Load metadata from file_name using cls.from_file.

    cls defaults to BookInfo; any class that provides from_file may be
    passed instead. Returns whatever cls.from_file returns.
    """
    info = cls.from_file(file_name)
    return info