1 # This file is part of Librarian, licensed under GNU Affero GPLv3 or later.
 
   2 # Copyright © Fundacja Wolne Lektury. See NOTICE for more information.
 
   4 from xml.parsers.expat import ExpatError
 
   5 from datetime import date
 
   9 from librarian.util import roman_to_int
 
  11 from librarian import (ValidationError, NoDublinCore, ParseError, DCNS, RDFNS,
 
  14 import lxml.etree as etree
 
  15 from lxml.etree import XMLSyntaxError
 
  17 from librarian.meta.types.bool import BoolValue
 
  18 from librarian.meta.types.person import Person
 
  19 from librarian.meta.types.wluri import WLURI
 
  20 from librarian.meta.types import text
 
  24     def __init__(self, uri, attr_name, value_type=text.TextValue,
 
  25                  multiple=False, salias=None, **kwargs):
 
  28         self.value_type = value_type
 
  29         self.multiple = multiple
 
  32         self.required = (kwargs.get('required', True)
 
  33                          and 'default' not in kwargs)
 
  34         self.default = kwargs.get('default', [] if multiple else [None])
 
  36     def validate_value(self, val, strict=False):
 
  44                 raise ValidationError(
 
  45                     "Multiple values not allowed for field '%s'" % self.uri
 
  48                 raise ValidationError(
 
  49                     "Field %s has no value to assign. Check your defaults."
 
  54         except ValueError as e:
 
  55             raise ValidationError(
 
  56                 "Field '%s' - invald value: %s"
 
  60     def validate(self, fdict, fallbacks=None, strict=False, validate_required=True):
 
  63         if self.uri not in fdict:
 
  65                 # Accept single value for single fields and saliases.
 
  66                 if self.name in fallbacks:
 
  68                         f = fallbacks[self.name]
 
  70                         f = [fallbacks[self.name]]
 
  71                 elif self.salias and self.salias in fallbacks:
 
  72                     f = [fallbacks[self.salias]]
 
  75             elif validate_required:
 
  76                 raise ValidationError("Required field %s not found" % self.uri)
 
  82         return self.validate_value(f, strict=strict)
 
  84     def __eq__(self, other):
 
  85         if isinstance(other, Field) and other.name == self.name:
 
  91     def __new__(mcs, classname, bases, class_dict):
 
  92         fields = list(class_dict['FIELDS'])
 
  94         for base in bases[::-1]:
 
  95             if hasattr(base, 'FIELDS'):
 
  96                 for field in base.FIELDS[::-1]:
 
 100                         fields.insert(0, field)
 
 102         class_dict['FIELDS'] = tuple(fields)
 
 103         return super(DCInfo, mcs).__new__(mcs, classname, bases, class_dict)
 
 106 class WorkInfo(metaclass=DCInfo):
 
 108         Field(DCNS('creator'), 'authors', Person, salias='author',
 
 110         Field(DCNS('title'), 'title'),
 
 111         Field(DCNS('type'), 'type', required=False, multiple=True),
 
 113         Field(DCNS('contributor.editor'), 'editors',
 
 114               Person, salias='editor', multiple=True, required=False),
 
 115         Field(DCNS('contributor.technical_editor'), 'technical_editors',
 
 116               Person, salias='technical_editor', multiple=True,
 
 118         Field(DCNS('contributor.funding'), 'funders', salias='funder',
 
 119               multiple=True, required=False),
 
 120         Field(DCNS('contributor.thanks'), 'thanks', required=False),
 
 122         Field(DCNS('date'), 'created_at'),
 
 123         Field(DCNS('date.pd'), 'released_to_public_domain_at',
 
 125         Field(DCNS('publisher'), 'publisher', multiple=True),
 
 127         Field(DCNS('language'), 'language'),
 
 128         Field(DCNS('description'), 'description', required=False),
 
 130         Field(DCNS('source'), 'source_name', required=False),
 
 131         Field(DCNS('source.URL'), 'source_urls', salias='source_url',
 
 132               multiple=True, required=False),
 
 133         Field(DCNS('identifier.url'), 'url', WLURI),
 
 134         Field(DCNS('rights.license'), 'license', required=False),
 
 135         Field(DCNS('rights'), 'license_description'),
 
 137         Field(PLMETNS('digitisationSponsor'), 'sponsors', multiple=True,
 
 139         Field(WLNS('digitisationSponsorNote'), 'sponsor_note', required=False),
 
 140         Field(WLNS('contentWarning'), 'content_warnings', multiple=True,
 
 142         Field(WLNS('developmentStage'), 'stage', required=False),
 
 146     def get_field_by_uri(cls, uri):
 
 152     def from_bytes(cls, xml, *args, **kwargs):
 
 153         return cls.from_file(io.BytesIO(xml), *args, **kwargs)
 
 156     def from_file(cls, xmlfile, *args, **kwargs):
 
 159             iter = etree.iterparse(xmlfile, ['start', 'end'])
 
 160             for (event, element) in iter:
 
 161                 if element.tag == RDFNS('RDF') and event == 'start':
 
 166                 raise NoDublinCore("DublinCore section not found. \
 
 167                     Check if there are rdf:RDF and rdf:Description tags.")
 
 169             # continue 'till the end of RDF section
 
 170             for (event, element) in iter:
 
 171                 if element.tag == RDFNS('RDF') and event == 'end':
 
 174             # if there is no end, Expat should yell at us with an ExpatError
 
 176             # extract data from the element and make the info
 
 177             return cls.from_element(desc_tag, *args, **kwargs)
 
 178         except XMLSyntaxError as e:
 
 180         except ExpatError as e:
 
 184     def from_element(cls, rdf_tag, *args, **kwargs):
 
 185         # The tree is already parsed,
 
 186         # so we don't need to worry about Expat errors.
 
 188         desc = rdf_tag.find(".//" + RDFNS('Description'))
 
 192                 "There must be a '%s' element inside the RDF."
 
 193                 % RDFNS('Description')
 
 198         while p is not None and lang is None:
 
 199             lang = p.attrib.get(XMLNS('lang'))
 
 202         for e in desc.getchildren():
 
 205                 meta_id = e.attrib.get('id')
 
 206                 if meta_id and meta_id.endswith('-id'):
 
 209             field = cls.get_field_by_uri(tag)
 
 211                 # Ignore unknown fields.
 
 214             fv = field_dict.get(tag, [])
 
 215             if e.text is not None:
 
 216                 val = field.value_type.from_text(e.text)
 
 217                 val.lang = e.attrib.get(XMLNS('lang'), lang)
 
 223         return cls(desc.attrib, field_dict, *args, **kwargs)
 
 225     def __init__(self, rdf_attrs, dc_fields, fallbacks=None, strict=False, validate_required=True):
 
 227         rdf_attrs should be a dictionary-like object with any attributes
 
 228         of the RDF:Description.
 
 229         dc_fields - dictionary mapping DC fields (with namespace) to
 
 230         list of text values for the given field.
 
 233         self.about = rdf_attrs.get(RDFNS('about'))
 
 236         for field in self.FIELDS:
 
 237             value = field.validate(dc_fields, fallbacks=fallbacks,
 
 238                                    strict=strict, validate_required=validate_required)
 
 239             setattr(self, 'prop_' + field.name, value)
 
 240             self.fmap[field.name] = field
 
 242                 self.fmap[field.salias] = field
 
 244     def __getattribute__(self, name):
 
 246             field = object.__getattribute__(self, 'fmap')[name]
 
 247             value = object.__getattribute__(self, 'prop_'+field.name)
 
 248             if field.name == name:
 
 250             else:  # singular alias
 
 251                 if not field.multiple:
 
 252                     raise "OUCH!! for field %s" % name
 
 254                 return value[0] if value else None
 
 255         except (KeyError, AttributeError):
 
 256             return object.__getattribute__(self, name)
 
 258     def __setattr__(self, name, newvalue):
 
 260             field = object.__getattribute__(self, 'fmap')[name]
 
 261             if field.name == name:
 
 262                 object.__setattr__(self, 'prop_'+field.name, newvalue)
 
 263             else:  # singular alias
 
 264                 if not field.multiple:
 
 265                     raise "OUCH! while setting field %s" % name
 
 267                 object.__setattr__(self, 'prop_'+field.name, [newvalue])
 
 268         except (KeyError, AttributeError):
 
 269             return object.__setattr__(self, name, newvalue)
 
 271     def update(self, field_dict):
 
 273         Update using field_dict. Verify correctness, but don't check
 
 274         if all required fields are present.
 
 276         for field in self.FIELDS:
 
 277             if field.name in field_dict:
 
 278                 setattr(self, field.name, field_dict[field.name])
 
 280     def to_etree(self, parent=None):
 
 281         """XML representation of this object."""
 
 282         # etree._namespace_map[str(self.RDF)] = 'rdf'
 
 283         # etree._namespace_map[str(self.DC)] = 'dc'
 
 286             root = etree.Element(RDFNS('RDF'))
 
 288             root = parent.makeelement(RDFNS('RDF'))
 
 290         description = etree.SubElement(root, RDFNS('Description'))
 
 293             description.set(RDFNS('about'), self.about)
 
 295         for field in self.FIELDS:
 
 296             v = getattr(self, field.name, None)
 
 302                         e = etree.Element(field.uri)
 
 305                         description.append(e)
 
 307                     e = etree.Element(field.uri)
 
 309                     description.append(e)
 
 314         rdf = {'about': {'uri': RDFNS('about'), 'value': self.about}}
 
 317         for field in self.FIELDS:
 
 318             v = getattr(self, field.name, None)
 
 323                     v = [str(x) for x in v if x is not None]
 
 327                 dc[field.name] = {'uri': field.uri, 'value': v}
 
 332         result = {'about': self.about}
 
 333         for field in self.FIELDS:
 
 334             v = getattr(self, field.name, None)
 
 340                     v = [str(x) for x in v if x is not None]
 
 343                 result[field.name] = v
 
 346                 v = getattr(self, field.salias)
 
 348                     result[field.salias] = str(v)
 
 353 class BookInfo(WorkInfo):
 
 355         Field(DCNS('audience'), 'audiences', text.Audience, salias='audience', multiple=True,
 
 358         Field(DCNS('subject.period'), 'epochs', text.Epoch, salias='epoch', multiple=True,
 
 360         Field(DCNS('subject.type'), 'kinds', text.Kind, salias='kind', multiple=True,
 
 362         Field(DCNS('subject.genre'), 'genres', text.Genre, salias='genre', multiple=True,
 
 364         Field('category.legimi', 'legimi', text.LegimiCategory, required=False),
 
 365         Field('category.thema.main', 'thema_main', text.MainThemaCategory, required=False),
 
 366         Field('category.thema', 'thema', text.ThemaCategory, required=False, multiple=True),
 
 367         Field(DCNS('subject.location'), 'location', required=False),
 
 369         Field(DCNS('contributor.translator'), 'translators',
 
 370               Person,  salias='translator', multiple=True, required=False),
 
 371         Field(DCNS('relation.hasPart'), 'parts', WLURI,
 
 372               multiple=True, required=False),
 
 373         Field(DCNS('relation.isVariantOf'), 'variant_of', WLURI,
 
 376         Field(DCNS('relation.coverImage.url'), 'cover_url', required=False),
 
 377         Field(DCNS('relation.coverImage.attribution'), 'cover_by',
 
 379         Field(DCNS('relation.coverImage.source'), 'cover_source',
 
 382         Field(WLNS('coverBarColor'), 'cover_bar_color', required=False),
 
 383         Field(WLNS('coverBoxPosition'), 'cover_box_position', required=False),
 
 384         Field(WLNS('coverClass'), 'cover_class', default=['default']),
 
 385         Field(WLNS('coverLogoUrl'), 'cover_logo_urls', multiple=True,
 
 387         Field(WLNS('endnotes'), 'endnotes', BoolValue,
 
 390         Field('pdf-id',  'isbn_pdf',  required=False),
 
 391         Field('epub-id', 'isbn_epub', required=False),
 
 392         Field('mobi-id', 'isbn_mobi', required=False),
 
 393         Field('txt-id',  'isbn_txt',  required=False),
 
 394         Field('html-id', 'isbn_html', required=False),
 
 398 def parse(file_name, cls=BookInfo):
 
 399     return cls.from_file(file_name)