1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.person import Person
19 from librarian.meta.types.wluri import WLURI
20 from librarian.meta.types import text
24 def __init__(self, uri, attr_name, value_type=text.TextValue,
25 multiple=False, salias=None, **kwargs):
28 self.value_type = value_type
29 self.multiple = multiple
32 self.required = (kwargs.get('required', True)
33 and 'default' not in kwargs)
34 self.default = kwargs.get('default', [] if multiple else [None])
36 def validate_value(self, val, strict=False):
44 raise ValidationError(
45 "Multiple values not allowed for field '%s'" % self.uri
48 raise ValidationError(
49 "Field %s has no value to assign. Check your defaults."
54 except ValueError as e:
55 raise ValidationError(
56 "Field '%s' - invald value: %s"
60 def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
63 if self.uri not in fdict:
65 # Accept single value for single fields and saliases.
66 if self.name in fallbacks:
68 f = fallbacks[self.name]
70 f = [fallbacks[self.name]]
71 elif self.salias and self.salias in fallbacks:
72 f = [fallbacks[self.salias]]
75 elif validate_required:
76 raise ValidationError("Required field %s not found" % self.uri)
82 return self.validate_value(f, strict=strict)
84 def __eq__(self, other):
85 if isinstance(other, Field) and other.name == self.name:
92 Field(DCNS('creator'), 'authors', Person, salias='author',
94 Field(DCNS('title'), 'title'),
95 Field(DCNS('type'), 'type', required=False, multiple=True),
97 Field(DCNS('contributor.editor'), 'editors',
98 Person, salias='editor', multiple=True, required=False),
99 Field(DCNS('contributor.technical_editor'), 'technical_editors',
100 Person, salias='technical_editor', multiple=True,
102 Field(DCNS('contributor.funding'), 'funders', salias='funder',
103 multiple=True, required=False),
104 Field(DCNS('contributor.thanks'), 'thanks', required=False),
106 Field(DCNS('date'), 'created_at'),
107 Field(DCNS('date.pd'), 'released_to_public_domain_at',
109 Field(DCNS('publisher'), 'publisher', multiple=True),
111 Field(DCNS('language'), 'language'),
112 Field(DCNS('description'), 'description', required=False),
114 Field(DCNS('source'), 'source_name', required=False),
115 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
116 multiple=True, required=False),
117 Field(DCNS('identifier.url'), 'url', WLURI),
118 Field(DCNS('rights.license'), 'license', required=False),
119 Field(DCNS('rights'), 'license_description'),
121 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
123 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
124 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
126 Field(WLNS('developmentStage'), 'stage', required=False),
128 Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
131 Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
133 Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
135 Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
137 Field('category.legimi', 'legimi', text.LegimiCategory, required=False),
138 Field('category.thema.main', 'thema_main', text.MainThemaCategory, required=False),
139 Field('category.thema', 'thema', text.ThemaCategory, required=False, multiple=True),
140 Field(DCNS('subject.location'), 'location', required=False),
142 Field(DCNS('contributor.translator'), 'translators',
143 Person, salias='translator', multiple=True, required=False),
144 Field(DCNS('relation.hasPart'), 'parts', WLURI,
145 multiple=True, required=False),
146 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
149 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
150 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
152 Field(DCNS('relation.coverImage.source'), 'cover_source',
155 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
156 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
157 Field(WLNS('coverClass'), 'cover_class', default=['default']),
158 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
160 Field(WLNS('endnotes'), 'endnotes', BoolValue,
163 Field('pdf-id', 'isbn_pdf', required=False),
164 Field('epub-id', 'isbn_epub', required=False),
165 Field('mobi-id', 'isbn_mobi', required=False),
166 Field('txt-id', 'isbn_txt', required=False),
167 Field('html-id', 'isbn_html', required=False),
172 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Parse metadata from XML supplied as a bytes object.

    The bytes are wrapped in an in-memory binary stream and handed to
    ``cls.from_file``; any extra positional and keyword arguments are
    forwarded unchanged.  Returns whatever ``cls.from_file`` returns.
    """
    stream = io.BytesIO(xml)
    return cls.from_file(stream, *args, **kwargs)
182 def from_file(cls, xmlfile, *args, **kwargs):
185 iter = etree.iterparse(xmlfile, ['start', 'end'])
186 for (event, element) in iter:
187 if element.tag == RDFNS('RDF') and event == 'start':
192 raise NoDublinCore("DublinCore section not found. \
193 Check if there are rdf:RDF and rdf:Description tags.")
195 # continue 'till the end of RDF section
196 for (event, element) in iter:
197 if element.tag == RDFNS('RDF') and event == 'end':
200 # if there is no end, Expat should yell at us with an ExpatError
202 # extract data from the element and make the info
203 return cls.from_element(desc_tag, *args, **kwargs)
204 except XMLSyntaxError as e:
206 except ExpatError as e:
210 def from_element(cls, rdf_tag, *args, **kwargs):
211 # The tree is already parsed,
212 # so we don't need to worry about Expat errors.
214 desc = rdf_tag.find(".//" + RDFNS('Description'))
218 "There must be a '%s' element inside the RDF."
219 % RDFNS('Description')
224 while p is not None and lang is None:
225 lang = p.attrib.get(XMLNS('lang'))
228 for e in desc.getchildren():
231 meta_id = e.attrib.get('id')
232 if meta_id and meta_id.endswith('-id'):
235 field = cls.get_field_by_uri(tag)
237 # Ignore unknown fields.
240 fv = field_dict.get(tag, [])
241 if e.text is not None:
242 val = field.value_type.from_text(e.text)
243 val.lang = e.attrib.get(XMLNS('lang'), lang)
249 return cls(desc.attrib, field_dict, *args, **kwargs)
251 def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
253 rdf_attrs should be a dictionary-like object with any attributes
254 of the RDF:Description.
255 dc_fields - dictionary mapping DC fields (with namespace) to
256 list of text values for the given field.
259 self.about = rdf_attrs.get(RDFNS('about'))
262 for field in self.FIELDS:
263 value = field.validate(dc_fields, fallbacks=fallbacks,
264 strict=strict, validate_required=validate_required)
265 setattr(self, 'prop_' + field.name, value)
266 self.fmap[field.name] = field
268 self.fmap[field.salias] = field
def __getattribute__(self, name):
    """Resolve declared fields and their singular aliases.

    ``name`` is looked up in ``self.fmap``.  If it is the field's own
    name, the value stored under ``prop_<field.name>`` is returned as is.
    If it is an alias of the field, the first item of that stored value
    is returned, or ``None`` when the stored value is empty.

    Any name missing from ``fmap`` (or any lookup that raises
    ``AttributeError``) falls back to ordinary attribute access.

    Raises:
        TypeError: ``name`` is an alias of a field that is not multiple.
    """
    try:
        # Go through object.__getattribute__ directly so that reading
        # our own bookkeeping attributes does not re-enter this method.
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value
        # From here on ``name`` is the singular alias of the field.
        if not field.multiple:
            # Strings cannot be raised in Python 3; raise a real
            # exception carrying the intended message.
            raise TypeError("OUCH!! for field %s" % name)
        return value[0] if value else None
    except (KeyError, AttributeError):
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Store declared fields (and their singular aliases) under ``prop_*``.

    ``name`` is looked up in ``self.fmap``.  If it is the field's own
    name, ``newvalue`` is stored under ``prop_<field.name>`` unchanged.
    If it is an alias of the field, ``newvalue`` is wrapped in a
    one-element list before being stored there.

    Any name missing from ``fmap`` (or any lookup that raises
    ``AttributeError``, e.g. before ``fmap`` exists) is set as an
    ordinary attribute.

    Raises:
        TypeError: ``name`` is an alias of a field that is not multiple.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:  # singular alias
            if not field.multiple:
                # Strings cannot be raised in Python 3; raise a real
                # exception carrying the intended message.
                raise TypeError("OUCH! while setting field %s" % name)
            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """Assign every declared field that appears in ``field_dict``.

    Walks ``self.FIELDS`` in order and, for each field whose ``name`` is
    a key of ``field_dict``, sets the attribute of that name on this
    object to the mapped value.  Keys that match no declared field are
    ignored, and fields absent from ``field_dict`` are left untouched,
    so required fields are not checked for presence here.
    """
    matching_names = (
        spec.name for spec in self.FIELDS if spec.name in field_dict
    )
    for attr in matching_names:
        setattr(self, attr, field_dict[attr])
306 def to_etree(self, parent=None):
307 """XML representation of this object."""
308 # etree._namespace_map[str(self.RDF)] = 'rdf'
309 # etree._namespace_map[str(self.DC)] = 'dc'
312 root = etree.Element(RDFNS('RDF'))
314 root = parent.makeelement(RDFNS('RDF'))
316 description = etree.SubElement(root, RDFNS('Description'))
319 description.set(RDFNS('about'), self.about)
321 for field in self.FIELDS:
322 v = getattr(self, field.name, None)
328 e = etree.Element(field.uri)
331 description.append(e)
333 e = etree.Element(field.uri)
335 description.append(e)
340 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
343 for field in self.FIELDS:
344 v = getattr(self, field.name, None)
349 v = [str(x) for x in v if x is not None]
353 dc[field.name] = {'uri': field.uri, 'value': v}
358 result = {'about': self.about}
359 for field in self.FIELDS:
360 v = getattr(self, field.name, None)
366 v = [str(x) for x in v if x is not None]
369 result[field.name] = v
372 v = getattr(self, field.salias)
374 result[field.salias] = str(v)
def parse(file_name, cls=BookInfo):
    """Parse ``file_name`` with ``cls.from_file`` and return the result.

    ``cls`` defaults to ``BookInfo``; any class exposing a compatible
    ``from_file`` may be passed instead.
    """
    info = cls.from_file(file_name)
    return info