1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
4 from xml.parsers.expat import ExpatError
5 from datetime import date
9 from librarian.util import roman_to_int
11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
14 import lxml.etree as etree
15 from lxml.etree import XMLSyntaxError
17 from librarian.meta.types.bool import BoolValue
18 from librarian.meta.types.person import Person
19 from librarian.meta.types.wluri import WLURI
20 from librarian.meta.types import text
24 def __init__(self, uri, attr_name, value_type=text.TextValue,
25 multiple=False, salias=None, **kwargs):
28 self.value_type = value_type
29 self.multiple = multiple
32 self.required = (kwargs.get('required', True)
33 and 'default' not in kwargs)
34 self.default = kwargs.get('default', [] if multiple else [None])
36 def validate_value(self, val, strict=False):
44 raise ValidationError(
45 "Multiple values not allowed for field '%s'" % self.uri
48 raise ValidationError(
49 "Field %s has no value to assign. Check your defaults."
54 except ValueError as e:
55 raise ValidationError(
56 "Field '%s' - invald value: %s"
60 def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
63 if self.uri not in fdict:
65 # Accept single value for single fields and saliases.
66 if self.name in fallbacks:
68 f = fallbacks[self.name]
70 f = [fallbacks[self.name]]
71 elif self.salias and self.salias in fallbacks:
72 f = [fallbacks[self.salias]]
75 elif validate_required:
76 raise ValidationError("Required field %s not found" % self.uri)
82 return self.validate_value(f, strict=strict)
84 def __eq__(self, other):
85 if isinstance(other, Field) and other.name == self.name:
91 def __new__(mcs, classname, bases, class_dict):
92 fields = list(class_dict['FIELDS'])
94 for base in bases[::-1]:
95 if hasattr(base, 'FIELDS'):
96 for field in base.FIELDS[::-1]:
100 fields.insert(0, field)
102 class_dict['FIELDS'] = tuple(fields)
103 return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
106 class WorkInfo(metaclass=DCInfo):
108 Field(DCNS('creator'), 'authors', Person, salias='author',
109 multiple=True, required=False),
110 Field(DCNS('title'), 'title'),
111 Field(DCNS('type'), 'type', required=False, multiple=True),
113 Field(DCNS('contributor.editor'), 'editors',
114 Person, salias='editor', multiple=True, required=False),
115 Field(DCNS('contributor.technical_editor'), 'technical_editors',
116 Person, salias='technical_editor', multiple=True,
118 Field(DCNS('contributor.funding'), 'funders', salias='funder',
119 multiple=True, required=False),
120 Field(DCNS('contributor.thanks'), 'thanks', required=False),
122 Field(DCNS('date'), 'created_at'),
123 Field(DCNS('date.pd'), 'released_to_public_domain_at',
125 Field(DCNS('publisher'), 'publisher', multiple=True),
127 Field(DCNS('language'), 'language'),
128 Field(DCNS('description'), 'description', required=False),
130 Field(DCNS('source'), 'source_name', required=False),
131 Field(DCNS('source.URL'), 'source_urls', salias='source_url',
132 multiple=True, required=False),
133 Field(DCNS('identifier.url'), 'url', WLURI),
134 Field(DCNS('rights.license'), 'license', required=False),
135 Field(DCNS('rights'), 'license_description'),
137 Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
139 Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
140 Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
142 Field(WLNS('developmentStage'), 'stage', required=False),
143 Field(WLNS('original'), 'original', required=False),
147 def get_field_by_uri(cls, uri):
def from_bytes(cls, xml, *args, **kwargs):
    """Build an instance from an XML document supplied as bytes.

    The bytes are wrapped in an in-memory binary stream, which is then
    passed to ``cls.from_file`` together with any extra positional and
    keyword arguments. Whatever ``cls.from_file`` returns is returned.
    """
    stream = io.BytesIO(xml)
    return cls.from_file(stream, *args, **kwargs)
157 def from_file(cls, xmlfile, *args, **kwargs):
160 iter = etree.iterparse(xmlfile, ['start', 'end'])
161 for (event, element) in iter:
162 if element.tag == RDFNS('RDF') and event == 'start':
167 raise NoDublinCore("DublinCore section not found. \
168 Check if there are rdf:RDF and rdf:Description tags.")
170 # continue 'till the end of RDF section
171 for (event, element) in iter:
172 if element.tag == RDFNS('RDF') and event == 'end':
175 # if there is no end, Expat should yell at us with an ExpatError
177 # extract data from the element and make the info
178 return cls.from_element(desc_tag, *args, **kwargs)
179 except XMLSyntaxError as e:
181 except ExpatError as e:
185 def from_element(cls, rdf_tag, *args, **kwargs):
186 # The tree is already parsed,
187 # so we don't need to worry about Expat errors.
189 desc = rdf_tag.find(".//" + RDFNS('Description'))
193 "There must be a '%s' element inside the RDF."
194 % RDFNS('Description')
199 while p is not None and lang is None:
200 lang = p.attrib.get(XMLNS('lang'))
203 for e in desc.getchildren():
206 meta_id = e.attrib.get('id')
207 if meta_id and meta_id.endswith('-id'):
210 field = cls.get_field_by_uri(tag)
212 # Ignore unknown fields.
215 fv = field_dict.get(tag, [])
216 if e.text is not None:
217 val = field.value_type.from_text(e.text)
218 val.lang = e.attrib.get(XMLNS('lang'), lang)
224 return cls(desc.attrib, field_dict, *args, **kwargs)
226 def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
228 rdf_attrs should be a dictionary-like object with any attributes
229 of the RDF:Description.
230 dc_fields - dictionary mapping DC fields (with namespace) to
231 list of text values for the given field.
234 self.about = rdf_attrs.get(RDFNS('about'))
237 for field in self.FIELDS:
238 value = field.validate(dc_fields, fallbacks=fallbacks,
239 strict=strict, validate_required=validate_required)
240 setattr(self, 'prop_' + field.name, value)
241 self.fmap[field.name] = field
243 self.fmap[field.salias] = field
def __getattribute__(self, name):
    """Resolve declared metadata fields before ordinary attribute lookup.

    ``name`` is looked up in ``self.fmap``:

    * if it is a field's own name, the value stored under
      ``prop_<field.name>`` is returned unchanged;
    * if it is a field's singular alias, the first stored value is
      returned, or ``None`` when the stored value is empty;
    * otherwise (no ``fmap`` yet, or ``name`` not in it) the lookup
      falls back to ``object.__getattribute__``.

    Raises:
        TypeError: if ``name`` is an alias registered for a field that
            is not ``multiple``.
    """
    try:
        # Use object.__getattribute__ directly to avoid re-entering
        # this method while fetching our own bookkeeping attributes.
        field = object.__getattribute__(self, 'fmap')[name]
        value = object.__getattribute__(self, 'prop_' + field.name)
        if field.name == name:
            return value
        else:  # singular alias
            if not field.multiple:
                # Raising a bare string is itself a TypeError in
                # Python 3; raise a real exception of that same type so
                # the intended message is actually reported.
                raise TypeError("OUCH!! for field %s" % name)

            return value[0] if value else None
    except (KeyError, AttributeError):
        return object.__getattribute__(self, name)
def __setattr__(self, name, newvalue):
    """Store declared metadata fields under their ``prop_`` attribute.

    ``name`` is looked up in ``self.fmap``:

    * if it is a field's own name, ``newvalue`` is stored as-is under
      ``prop_<field.name>``;
    * if it is a field's singular alias, ``newvalue`` is wrapped in a
      one-element list and stored under ``prop_<field.name>``;
    * otherwise (no ``fmap`` yet, or ``name`` not in it) the value is
      set as an ordinary attribute via ``object.__setattr__``.

    Raises:
        TypeError: if ``name`` is an alias registered for a field that
            is not ``multiple``.
    """
    try:
        field = object.__getattribute__(self, 'fmap')[name]
        if field.name == name:
            object.__setattr__(self, 'prop_' + field.name, newvalue)
        else:  # singular alias
            if not field.multiple:
                # Raising a bare string is itself a TypeError in
                # Python 3; raise a real exception of that same type so
                # the intended message is actually reported.
                raise TypeError("OUCH! while setting field %s" % name)

            object.__setattr__(self, 'prop_' + field.name, [newvalue])
    except (KeyError, AttributeError):
        return object.__setattr__(self, name, newvalue)
def update(self, field_dict):
    """
    Copy values from field_dict onto the fields declared in FIELDS.

    Only keys equal to a declared field's name are used; every other
    key is ignored and fields missing from field_dict keep their
    current value. Each value is assigned with setattr, so it goes
    through this object's normal attribute assignment. No check is
    made that all required fields are present.
    """
    declared_names = (declared.name for declared in self.FIELDS)
    for attr in declared_names:
        if attr in field_dict:
            setattr(self, attr, field_dict[attr])
281 def to_etree(self, parent=None):
282 """XML representation of this object."""
283 # etree._namespace_map[str(self.RDF)] = 'rdf'
284 # etree._namespace_map[str(self.DC)] = 'dc'
287 root = etree.Element(RDFNS('RDF'))
289 root = parent.makeelement(RDFNS('RDF'))
291 description = etree.SubElement(root, RDFNS('Description'))
294 description.set(RDFNS('about'), self.about)
296 for field in self.FIELDS:
297 v = getattr(self, field.name, None)
303 e = etree.Element(field.uri)
306 description.append(e)
308 e = etree.Element(field.uri)
310 description.append(e)
315 rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
318 for field in self.FIELDS:
319 v = getattr(self, field.name, None)
324 v = [str(x) for x in v if x is not None]
328 dc[field.name] = {'uri': field.uri, 'value': v}
333 result = {'about': self.about}
334 for field in self.FIELDS:
335 v = getattr(self, field.name, None)
341 v = [str(x) for x in v if x is not None]
344 result[field.name] = v
347 v = getattr(self, field.salias)
349 result[field.salias] = str(v)
354 class BookInfo(WorkInfo):
356 Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
359 Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
361 Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
363 Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
365 Field('category.legimi', 'legimi', text.LegimiCategory, required=False),
366 Field('category.thema.main', 'thema_main', text.MainThemaCategory, required=False),
367 Field('category.thema', 'thema', text.ThemaCategory, required=False, multiple=True),
368 Field(DCNS('subject.location'), 'location', required=False),
370 Field(DCNS('contributor.translator'), 'translators',
371 Person, salias='translator', multiple=True, required=False),
372 Field(DCNS('relation.hasPart'), 'parts', WLURI,
373 multiple=True, required=False),
374 Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
377 Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
378 Field(DCNS('relation.coverImage.attribution'), 'cover_by',
380 Field(DCNS('relation.coverImage.source'), 'cover_source',
383 Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
384 Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
385 Field(WLNS('coverClass'), 'cover_class', default=['default']),
386 Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
388 Field(WLNS('endnotes'), 'endnotes', BoolValue,
391 Field('pdf-id', 'isbn_pdf', required=False),
392 Field('epub-id', 'isbn_epub', required=False),
393 Field('mobi-id', 'isbn_mobi', required=False),
394 Field('txt-id', 'isbn_txt', required=False),
395 Field('html-id', 'isbn_html', required=False),
def parse(file_name, cls=BookInfo):
    """Load metadata from file_name by delegating to cls.from_file.

    cls defaults to BookInfo; the result of cls.from_file is returned
    unchanged.
    """
    info = cls.from_file(file_name)
    return info