1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
 
   2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
 
   4 from xml.parsers.expat import ExpatError
 
   5 from datetime import date
 
   9 from librarian.util import roman_to_int
 
  11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
 
  14 import lxml.etree as etree
 
  15 from lxml.etree import XMLSyntaxError
 
  17 from librarian.meta.types.bool import BoolValue
 
  18 from librarian.meta.types.person import Person
 
  19 from librarian.meta.types.wluri import WLURI
 
  20 from librarian.meta.types import text
 
  24     def __init__(self, uri, attr_name, value_type=text.TextValue,
 
  25                  multiple=False, salias=None, **kwargs):
 
  28         self.value_type = value_type
 
  29         self.multiple = multiple
 
  32         self.required = (kwargs.get('required', True)
 
  33                          and 'default' not in kwargs)
 
  34         self.default = kwargs.get('default', [] if multiple else [None])
 
  36     def validate_value(self, val, strict=False):
 
  44                 raise ValidationError(
 
  45                     "Multiple values not allowed for field '%s'" % self.uri
 
  48                 raise ValidationError(
 
  49                     "Field %s has no value to assign. Check your defaults."
 
  54         except ValueError as e:
 
  55             raise ValidationError(
 
  56                 "Field '%s' - invald value: %s"
 
  60     def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
 
  63         if self.uri not in fdict:
 
  65                 # Accept single value for single fields and saliases.
 
  66                 if self.name in fallbacks:
 
  68                         f = fallbacks[self.name]
 
  70                         f = [fallbacks[self.name]]
 
  71                 elif self.salias and self.salias in fallbacks:
 
  72                     f = [fallbacks[self.salias]]
 
  75             elif validate_required:
 
  76                 raise ValidationError("Required field %s not found" % self.uri)
 
  82         return self.validate_value(f, strict=strict)
 
  84     def __eq__(self, other):
 
  85         if isinstance(other, Field) and other.name == self.name:
 
  91     def __new__(mcs, classname, bases, class_dict):
 
  92         fields = list(class_dict['FIELDS'])
 
  94         for base in bases[::-1]:
 
  95             if hasattr(base, 'FIELDS'):
 
  96                 for field in base.FIELDS[::-1]:
 
 100                         fields.insert(0, field)
 
 102         class_dict['FIELDS'] = tuple(fields)
 
 103         return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
 
 106 class WorkInfo(metaclass=DCInfo):
 
 108         Field(DCNS('creator'), 'authors', Person, salias='author',
 
 109               multiple=True, required=False),
 
 110         Field(DCNS('title'), 'title'),
 
 111         Field(DCNS('type'), 'type', required=False, multiple=True),
 
 113         Field(DCNS('contributor.editor'), 'editors',
 
 114               Person, salias='editor', multiple=True, required=False),
 
 115         Field(DCNS('contributor.technical_editor'), 'technical_editors',
 
 116               Person, salias='technical_editor', multiple=True,
 
 118         Field(DCNS('contributor.funding'), 'funders', salias='funder',
 
 119               multiple=True, required=False),
 
 120         Field(DCNS('contributor.thanks'), 'thanks', required=False),
 
 122         Field(DCNS('date'), 'created_at'),
 
 123         Field(DCNS('date.pd'), 'released_to_public_domain_at',
 
 125         Field(DCNS('publisher'), 'publisher', multiple=True),
 
 127         Field(DCNS('language'), 'language'),
 
 128         Field(DCNS('description'), 'description', required=False),
 
 130         Field(DCNS('source'), 'source_name', required=False),
 
 131         Field(DCNS('source.URL'), 'source_urls', salias='source_url',
 
 132               multiple=True, required=False),
 
 133         Field(DCNS('identifier.url'), 'url', WLURI),
 
 134         Field(DCNS('rights.license'), 'license', required=False),
 
 135         Field(DCNS('rights'), 'license_description'),
 
 137         Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
 
 139         Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
 
 140         Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
 
 142         Field(WLNS('developmentStage'), 'stage', required=False),
 
 143         Field(WLNS('original'), 'original', required=False),
 
 147     def get_field_by_uri(cls, uri):
 
 153     def from_bytes(cls, xml, *args, **kwargs):
 
 154         return cls.from_file(io.BytesIO(xml), *args, **kwargs)
 
 157     def from_file(cls, xmlfile, *args, **kwargs):
 
 160             iter = etree.iterparse(xmlfile, ['start', 'end'])
 
 161             for (event, element) in iter:
 
 162                 if element.tag == RDFNS('RDF') and event == 'start':
 
 167                 raise NoDublinCore("DublinCore section not found. \
 
 168                     Check if there are rdf:RDF and rdf:Description tags.")
 
            # Continue until the end of the RDF section.
 
 171             for (event, element) in iter:
 
 172                 if element.tag == RDFNS('RDF') and event == 'end':
 
 175             # if there is no end, Expat should yell at us with an ExpatError
 
 177             # extract data from the element and make the info
 
 178             return cls.from_element(desc_tag, *args, **kwargs)
 
 179         except XMLSyntaxError as e:
 
 181         except ExpatError as e:
 
 185     def from_element(cls, rdf_tag, *args, **kwargs):
 
 186         # The tree is already parsed,
 
 187         # so we don't need to worry about Expat errors.
 
 189         desc = rdf_tag.find(".//" + RDFNS('Description'))
 
 193                 "There must be a '%s' element inside the RDF."
 
 194                 % RDFNS('Description')
 
 199         while p is not None and lang is None:
 
 200             lang = p.attrib.get(XMLNS('lang'))
 
 203         for e in desc.getchildren():
 
 206                 meta_id = e.attrib.get('id')
 
 207                 if meta_id and meta_id.endswith('-id'):
 
 210             field = cls.get_field_by_uri(tag)
 
 212                 # Ignore unknown fields.
 
 215             fv = field_dict.get(tag, [])
 
 216             if e.text is not None:
 
 217                 val = field.value_type.from_text(e.text)
 
 218                 val.lang = e.attrib.get(XMLNS('lang'), lang)
 
 224         return cls(desc.attrib, field_dict, *args, **kwargs)
 
 226     def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
 
 228         rdf_attrs should be a dictionary-like object with any attributes
 
 229         of the RDF:Description.
 
 230         dc_fields - dictionary mapping DC fields (with namespace) to
 
 231         list of text values for the given field.
 
 234         self.about = rdf_attrs.get(RDFNS('about'))
 
 237         for field in self.FIELDS:
 
 238             value = field.validate(dc_fields, fallbacks=fallbacks,
 
 239                                    strict=strict, validate_required=validate_required)
 
 240             setattr(self, 'prop_' + field.name, value)
 
 241             self.fmap[field.name] = field
 
 243                 self.fmap[field.salias] = field
 
 245     def __getattribute__(self, name):
 
 247             field = object.__getattribute__(self, 'fmap')[name]
 
 248             value = object.__getattribute__(self, 'prop_'+field.name)
 
 249             if field.name == name:
 
 251             else:  # singular alias
 
 252                 if not field.multiple:
 
 253                     raise "OUCH!! for field %s" % name
 
 255                 return value[0] if value else None
 
 256         except (KeyError, AttributeError):
 
 257             return object.__getattribute__(self, name)
 
 259     def __setattr__(self, name, newvalue):
 
 261             field = object.__getattribute__(self, 'fmap')[name]
 
 262             if field.name == name:
 
 263                 object.__setattr__(self, 'prop_'+field.name, newvalue)
 
 264             else:  # singular alias
 
 265                 if not field.multiple:
 
 266                     raise "OUCH! while setting field %s" % name
 
 268                 object.__setattr__(self, 'prop_'+field.name, [newvalue])
 
 269         except (KeyError, AttributeError):
 
 270             return object.__setattr__(self, name, newvalue)
 
 272     def update(self, field_dict):
 
 274         Update using field_dict. Verify correctness, but don't check
 
 275         if all required fields are present.
 
 277         for field in self.FIELDS:
 
 278             if field.name in field_dict:
 
 279                 setattr(self, field.name, field_dict[field.name])
 
 281     def to_etree(self, parent=None):
 
 282         """XML representation of this object."""
 
 283         # etree._namespace_map[str(self.RDF)] = 'rdf'
 
 284         # etree._namespace_map[str(self.DC)] = 'dc'
 
 287             root = etree.Element(RDFNS('RDF'))
 
 289             root = parent.makeelement(RDFNS('RDF'))
 
 291         description = etree.SubElement(root, RDFNS('Description'))
 
 294             description.set(RDFNS('about'), self.about)
 
 296         for field in self.FIELDS:
 
 297             v = getattr(self, field.name, None)
 
 303                         e = etree.Element(field.uri)
 
 306                         description.append(e)
 
 308                     e = etree.Element(field.uri)
 
 310                     description.append(e)
 
 315         rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
 
 318         for field in self.FIELDS:
 
 319             v = getattr(self, field.name, None)
 
 324                     v = [str(x) for x in v if x is not None]
 
 328                 dc[field.name] = {'uri': field.uri, 'value': v}
 
 333         result = {'about': self.about}
 
 334         for field in self.FIELDS:
 
 335             v = getattr(self, field.name, None)
 
 341                     v = [str(x) for x in v if x is not None]
 
 344                 result[field.name] = v
 
 347                 v = getattr(self, field.salias)
 
 349                     result[field.salias] = str(v)
 
 354 class BookInfo(WorkInfo):
 
 356         Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
 
 359         Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
 
 361         Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
 
 363         Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
 
 365         Field('category.legimi', 'legimi', text.LegimiCategory, required=False),
 
 366         Field('category.thema.main', 'thema_main', text.MainThemaCategory, required=False),
 
 367         Field('category.thema', 'thema', text.ThemaCategory, required=False, multiple=True),
 
 368         Field(DCNS('subject.location'), 'location', required=False),
 
 370         Field(DCNS('contributor.translator'), 'translators',
 
 371               Person,  salias='translator', multiple=True, required=False),
 
 372         Field(DCNS('relation.hasPart'), 'parts', WLURI,
 
 373               multiple=True, required=False),
 
 374         Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
 
 377         Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
 
 378         Field(DCNS('relation.coverImage.attribution'), 'cover_by',
 
 380         Field(DCNS('relation.coverImage.source'), 'cover_source',
 
 383         Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
 
 384         Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
 
 385         Field(WLNS('coverClass'), 'cover_class', default=['default']),
 
 386         Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
 
 388         Field(WLNS('endnotes'), 'endnotes', BoolValue,
 
 391         Field('pdf-id',  'isbn_pdf',  required=False),
 
 392         Field('epub-id', 'isbn_epub', required=False),
 
 393         Field('mobi-id', 'isbn_mobi', required=False),
 
 394         Field('txt-id',  'isbn_txt',  required=False),
 
 395         Field('html-id', 'isbn_html', required=False),
 
 399 def parse(file_name, cls=BookInfo):
 
 400     return cls.from_file(file_name)