1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Nowoczesna Polska. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.date import DateValue
19 from librarian.meta.types.person import Person
20 from librarian.meta.types.wluri import WLURI
21 from librarian.meta.types import text
25 def __init__(self, uri, attr_name, value_type=text.TextValue,
26 multiple=False, salias=None, **kwargs):
29 self.value_type = value_type
30 self.multiple = multiple
33 self.required = (kwargs.get('required', True)
34 and 'default' not in kwargs)
35 self.default = kwargs.get('default', [] if multiple else [None])
37 def validate_value(self, val, strict=False):
45 raise ValidationError(
46 "Multiple values not allowed for field '%s'" % self.uri
49 raise ValidationError(
50 "Field %s has no value to assign. Check your defaults."
55 except ValueError as e:
56 raise ValidationError(
57 "Field '%s' - invald value: %s"
61 def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
64 if self.uri not in fdict:
66 # Accept single value for single fields and saliases.
67 if self.name in fallbacks:
69 f = fallbacks[self.name]
71 f = [fallbacks[self.name]]
72 elif self.salias and self.salias in fallbacks:
73 f = [fallbacks[self.salias]]
76 elif validate_required:
77 raise ValidationError("Required field %s not found" % self.uri)
83 return self.validate_value(f, strict=strict)
85 def __eq__(self, other):
86 if isinstance(other, Field) and other.name == self.name:
92 def __new__(mcs, classname, bases, class_dict):
93 fields = list(class_dict['FIELDS'])
95 for base in bases[::-1]:
96 if hasattr(base, 'FIELDS'):
97 for field in base.FIELDS[::-1]:
101 fields.insert(0, field)
103 class_dict['FIELDS'] = tuple(fields)
104 return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
107 class WorkInfo(six.with_metaclass(DCInfo, object)):
109 Field(DCNS('creator'), 'authors', Person, salias='author',
111 Field(DCNS('title'), 'title'),
112 Field(DCNS('type'), 'type', required=False, multiple=True),
114 Field(DCNS('contributor.editor'), 'editors',
115 Person, salias='editor', multiple=True, required=False),
116 Field(DCNS('contributor.technical_editor'), 'technical_editors',
117 Person, salias='technical_editor', multiple=True,
119 Field(DCNS('contributor.funding'), 'funders', salias='funder',
120 multiple=True, required=False),
121 Field(DCNS('contributor.thanks'), 'thanks', required=False),
123 Field(DCNS('date'), 'created_at'),
124 Field(DCNS('date.pd'), 'released_to_public_domain_at', DateValue,
126 Field(DCNS('publisher'), 'publisher', multiple=True),
128 Field(DCNS('language'), 'language'),
129 Field(DCNS('description'), 'description', required=False),
131 Field(DCNS('source'), 'source_name', required=False),
132 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
133 multiple=True, required=False),
134 Field(DCNS('identifier.url'), 'url', WLURI),
135 Field(DCNS('rights.license'), 'license', required=False),
136 Field(DCNS('rights'), 'license_description'),
138 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
140 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
141 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
143 Field(WLNS('developmentStage'), 'stage', required=False),
147 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Load from XML supplied as a byte string.

    The bytes are wrapped in an in-memory binary stream, which is passed to
    ``cls.from_file`` along with any extra positional and keyword arguments.
    Returns whatever ``cls.from_file`` returns.
    """
    stream = six.BytesIO(xml)
    return cls.from_file(stream, *args, **kwargs)
157 def from_file(cls, xmlfile, *args, **kwargs):
160 iter = etree.iterparse(xmlfile, ['start', 'end'])
161 for (event, element) in iter:
162 if element.tag == RDFNS('RDF') and event == 'start':
167 raise NoDublinCore("DublinCore section not found. \
168 Check if there are rdf:RDF and rdf:Description tags.")
170 # continue 'till the end of RDF section
171 for (event, element) in iter:
172 if element.tag == RDFNS('RDF') and event == 'end':
175 # if there is no end, Expat should yell at us with an ExpatError
177 # extract data from the element and make the info
178 return cls.from_element(desc_tag, *args, **kwargs)
179 except XMLSyntaxError as e:
181 except ExpatError as e:
185 def from_element(cls, rdf_tag, *args, **kwargs):
186 # The tree is already parsed,
187 # so we don't need to worry about Expat errors.
189 desc = rdf_tag.find(".//" + RDFNS('Description'))
193 "There must be a '%s' element inside the RDF."
194 % RDFNS('Description')
199 while p is not None and lang is None:
200 lang = p.attrib.get(XMLNS('lang'))
203 for e in desc.getchildren():
206 meta_id = e.attrib.get('id')
207 if meta_id and meta_id.endswith('-id'):
210 field = cls.get_field_by_uri(tag)
212 # Ignore unknown fields.
215 fv = field_dict.get(tag, [])
216 if e.text is not None:
217 val = field.value_type.from_text(e.text)
218 val.lang = e.attrib.get(XMLNS('lang'), lang)
224 return cls(desc.attrib, field_dict, *args, **kwargs)
226 def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
228 rdf_attrs should be a dictionary-like object with any attributes
229 of the RDF:Description.
230 dc_fields - dictionary mapping DC fields (with namespace) to
231 list of text values for the given field.
234 self.about = rdf_attrs.get(RDFNS('about'))
237 for field in self.FIELDS:
238 value = field.validate(dc_fields, fallbacks=fallbacks,
239 strict=strict, validate_required=validate_required)
240 setattr(self, 'prop_' + field.name, value)
241 self.fmap[field.name] = field
243 self.fmap[field.salias] = field
def __getattribute__(self, name):
    """Resolve field names and singular aliases before normal lookup.

    ``name`` is first looked up in ``self.fmap``:

    * if it is the field's own name, the value stored under
      ``'prop_' + field.name`` is returned as is;
    * otherwise it is a singular alias: the first element of the stored
      value is returned, or ``None`` when the stored value is empty.

    When ``name`` is not in ``fmap`` (or ``fmap`` / the ``prop_`` attribute
    does not exist on the instance), ordinary attribute lookup is used.

    Raises:
        TypeError: if ``name`` is an alias of a field that is not
            multiple-valued.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value
        # Singular alias: only meaningful for multiple-valued fields.
        if not field.multiple:
            # A plain string cannot be raised; raising one produced a
            # TypeError without this message. Keep the exception type,
            # but make the message reach the caller.
            raise TypeError("OUCH!! for field %s" % name)

        return value[0] if value else None
    except (KeyError, AttributeError):
        # Not a mapped field (or mapping not set up): regular attribute.
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Store field values under their ``prop_`` attribute.

    ``name`` is first looked up in ``self.fmap``:

    * if it is the field's own name, ``newvalue`` is stored under
      ``'prop_' + field.name`` unchanged;
    * otherwise it is a singular alias: ``newvalue`` is wrapped in a
      one-element list and stored under the same ``prop_`` attribute.

    When ``name`` is not in ``fmap`` (or ``fmap`` does not exist on the
    instance yet), the attribute is set in the ordinary way.

    Raises:
        TypeError: if ``name`` is an alias of a field that is not
            multiple-valued.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:  # singular alias
            if not field.multiple:
                # A plain string cannot be raised; raising one produced a
                # TypeError without this message. Keep the exception
                # type, but make the message reach the caller.
                raise TypeError("OUCH! while setting field %s" % name)

            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        # Not a mapped field (or mapping not set up): regular attribute.
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """
    Copy values from field_dict onto this object.

    Every entry of self.FIELDS whose name is a key of field_dict has
    the matching value assigned as an attribute under that name.
    Keys that match no field name are ignored, and fields absent from
    field_dict are left untouched, so missing required fields are not
    reported here.
    """
    present = (f.name for f in self.FIELDS if f.name in field_dict)
    for attr in present:
        setattr(self, attr, field_dict[attr])
281 def to_etree(self, parent=None):
282 """XML representation of this object."""
283 # etree._namespace_map[str(self.RDF)] = 'rdf'
284 # etree._namespace_map[str(self.DC)] = 'dc'
287 root = etree.Element(RDFNS('RDF'))
289 root = parent.makeelement(RDFNS('RDF'))
291 description = etree.SubElement(root, RDFNS('Description'))
294 description.set(RDFNS('about'), self.about)
296 for field in self.FIELDS:
297 v = getattr(self, field.name, None)
303 e = etree.Element(field.uri)
305 e.text = six.text_type(x)
306 description.append(e)
308 e = etree.Element(field.uri)
309 e.text = six.text_type(v)
310 description.append(e)
315 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
318 for field in self.FIELDS:
319 v = getattr(self, field.name, None)
324 v = [six.text_type(x) for x in v if x is not None]
328 dc[field.name] = {'uri': field.uri, 'value': v}
333 result = {'about': self.about}
334 for field in self.FIELDS:
335 v = getattr(self, field.name, None)
341 v = [six.text_type(x) for x in v if x is not None]
344 result[field.name] = v
347 v = getattr(self, field.salias)
349 result[field.salias] = six.text_type(v)
354 class BookInfo(WorkInfo):
356 Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
359 Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
361 Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
363 Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
365 Field(WLNS('category.legimi'), 'legimi', text.LegimiCategory, required=False),
367 Field(DCNS('subject.location'), 'location', required=False),
369 Field(DCNS('contributor.translator'), 'translators',
370 Person, salias='translator', multiple=True, required=False),
371 Field(DCNS('relation.hasPart'), 'parts', WLURI,
372 multiple=True, required=False),
373 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
376 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
377 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
379 Field(DCNS('relation.coverImage.source'), 'cover_source',
382 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
383 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
384 Field(WLNS('coverClass'), 'cover_class', default=['default']),
385 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
387 Field(WLNS('endnotes'), 'endnotes', BoolValue,
390 Field('pdf-id', 'isbn_pdf', required=False),
391 Field('epub-id', 'isbn_epub', required=False),
392 Field('mobi-id', 'isbn_mobi', required=False),
393 Field('txt-id', 'isbn_txt', required=False),
394 Field('html-id', 'isbn_html', required=False),
def parse(file_name, cls=BookInfo):
    """Load metadata from ``file_name`` via ``cls.from_file``.

    ``cls`` defaults to ``BookInfo``; the result of ``cls.from_file`` is
    returned unchanged.
    """
    loader = cls.from_file
    return loader(file_name)