247 lines
11 KiB
Python
247 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
|
|
# Copyright (C) 2020 monkey
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
import os
|
|
from lxml import objectify
|
|
from urllib.request import urlopen
|
|
from lxml import etree
|
|
from html import escape
|
|
|
|
from cps import config, db, fs, gdriveutils, logger, ub
|
|
from cps.services.worker import CalibreTask, STAT_CANCELLED, STAT_ENDED
|
|
from flask_babel import lazy_gettext as N_
|
|
|
|
OPF_NAMESPACE = "http://www.idpf.org/2007/opf"
|
|
PURL_NAMESPACE = "http://purl.org/dc/elements/1.1/"
|
|
|
|
OPF = "{%s}" % OPF_NAMESPACE
|
|
PURL = "{%s}" % PURL_NAMESPACE
|
|
|
|
etree.register_namespace("opf", OPF_NAMESPACE)
|
|
etree.register_namespace("dc", PURL_NAMESPACE)
|
|
|
|
OPF_NS = {None: OPF_NAMESPACE} # the default namespace (no prefix)
|
|
NSMAP = {'dc': PURL_NAMESPACE, 'opf': OPF_NAMESPACE}
|
|
|
|
|
|
class TaskBackupMetadata(CalibreTask):
|
|
|
|
def __init__(self, export_language="en",
|
|
translated_title="Cover",
|
|
set_dirty=False,
|
|
task_message=N_('Backing up Metadata')):
|
|
super(TaskBackupMetadata, self).__init__(task_message)
|
|
self.log = logger.create()
|
|
self.calibre_db = db.CalibreDB(expire_on_commit=False, init=True)
|
|
self.export_language = export_language
|
|
self.translated_title = translated_title
|
|
self.set_dirty = set_dirty
|
|
|
|
def run(self, worker_thread):
|
|
if self.set_dirty:
|
|
self.set_all_books_dirty()
|
|
else:
|
|
self.backup_metadata()
|
|
|
|
def set_all_books_dirty(self):
|
|
try:
|
|
books = self.calibre_db.session.query(db.Books).all()
|
|
for book in books:
|
|
self.calibre_db.set_metadata_dirty(book.id)
|
|
self.calibre_db.session.commit()
|
|
self._handleSuccess()
|
|
except Exception as ex:
|
|
self.log.debug('Error adding book for backup: ' + str(ex))
|
|
self._handleError('Error adding book for backup: ' + str(ex))
|
|
self.calibre_db.session.rollback()
|
|
self.calibre_db.session.close()
|
|
|
|
def backup_metadata(self):
|
|
try:
|
|
metadata_backup = self.calibre_db.session.query(db.Metadata_Dirtied).all()
|
|
custom_columns = self.calibre_db.session.query(db.CustomColumns).order_by(db.CustomColumns.label).all()
|
|
count = len(metadata_backup)
|
|
i = 0
|
|
for backup in metadata_backup:
|
|
book = self.calibre_db.session.query(db.Books).filter(db.Books.id == backup.book).one_or_none()
|
|
self.calibre_db.session.query(db.Metadata_Dirtied).filter(
|
|
db.Metadata_Dirtied.book == backup.book).delete()
|
|
self.calibre_db.session.commit()
|
|
if book:
|
|
self.open_metadata(book, custom_columns)
|
|
else:
|
|
self.log.error("Book {} not found in database".format(backup.book))
|
|
# self._handleError("Book {} not found in database".format(backup.book))
|
|
i += 1
|
|
self.progress = (1.0 / count) * i
|
|
self._handleSuccess()
|
|
self.calibre_db.session.close()
|
|
|
|
except Exception as ex:
|
|
b = "NaN" if not hasattr(book, 'id') else book.id
|
|
self.log.debug('Error creating metadata backup for book {}: '.format(b) + str(ex))
|
|
self._handleError('Error creating metadata backup: ' + str(ex))
|
|
self.calibre_db.session.rollback()
|
|
self.calibre_db.session.close()
|
|
|
|
def open_metadata(self, book, custom_columns):
|
|
if config.config_use_google_drive:
|
|
if not gdriveutils.is_gdrive_ready():
|
|
raise Exception('Google Drive is configured but not ready')
|
|
|
|
web_content_link = gdriveutils.get_metadata_backup_via_gdrive(book.path)
|
|
if not web_content_link:
|
|
raise Exception('Google Drive cover url not found')
|
|
|
|
stream = None
|
|
try:
|
|
stream = urlopen(web_content_link)
|
|
except Exception as ex:
|
|
# Bubble exception to calling function
|
|
self.log.debug('Error reading metadata.opf: ' + str(ex)) # ToDo Check whats going on
|
|
raise ex
|
|
finally:
|
|
if stream is not None:
|
|
stream.close()
|
|
else:
|
|
# ToDo: Handle book folder not found or not readable
|
|
book_metadata_filepath = os.path.join(config.config_calibre_dir, book.path, 'metadata.opf')
|
|
#if not os.path.isfile(book_metadata_filepath):
|
|
self.create_new_metadata_backup(book, custom_columns, book_metadata_filepath)
|
|
# else:
|
|
'''namespaces = {'dc': PURL_NAMESPACE, 'opf': OPF_NAMESPACE}
|
|
test = etree.parse(book_metadata_filepath)
|
|
root = test.getroot()
|
|
for i in root.iter():
|
|
self.log.info(i)
|
|
title = root.find("dc:metadata", namespaces)
|
|
pass
|
|
with open(book_metadata_filepath, "rb") as f:
|
|
xml = f.read()
|
|
|
|
root = objectify.fromstring(xml)
|
|
# root.metadata['{http://purl.org/dc/elements/1.1/}title']
|
|
# root.metadata[PURL + 'title']
|
|
# getattr(root.metadata, PURL +'title')
|
|
# test = objectify.parse()
|
|
pass
|
|
# backup not found has to be created
|
|
#raise Exception('Book cover file not found')'''
|
|
|
|
def create_new_metadata_backup(self, book, custom_columns, book_metadata_filepath):
|
|
# generate root package element
|
|
package = etree.Element(OPF + "package", nsmap=OPF_NS)
|
|
package.set("unique-identifier", "uuid_id")
|
|
package.set("version", "2.0")
|
|
|
|
# generate metadata element and all subelements of it
|
|
metadata = etree.SubElement(package, "metadata", nsmap=NSMAP)
|
|
identifier = etree.SubElement(metadata, PURL + "identifier", id="calibre_id", nsmap=NSMAP)
|
|
identifier.set(OPF + "scheme", "calibre")
|
|
identifier.text = str(book.id)
|
|
identifier2 = etree.SubElement(metadata, PURL + "identifier", id="uuid_id", nsmap=NSMAP)
|
|
identifier2.set(OPF + "scheme", "uuid")
|
|
identifier2.text = book.uuid
|
|
title = etree.SubElement(metadata, PURL + "title", nsmap=NSMAP)
|
|
title.text = book.title
|
|
for author in book.authors:
|
|
creator = etree.SubElement(metadata, PURL + "creator", nsmap=NSMAP)
|
|
creator.text = str(author.name)
|
|
creator.set(OPF + "file-as", book.author_sort) # ToDo Check
|
|
creator.set(OPF + "role", "aut")
|
|
contributor = etree.SubElement(metadata, PURL + "contributor", nsmap=NSMAP)
|
|
contributor.text = "calibre (5.7.2) [https://calibre-ebook.com]"
|
|
contributor.set(OPF + "file-as", "calibre") # ToDo Check
|
|
contributor.set(OPF + "role", "bkp")
|
|
|
|
date = etree.SubElement(metadata, PURL + "date", nsmap=NSMAP)
|
|
date.text = '{d.year:04}-{d.month:02}-{d.day:02}T{d.hour:02}:{d.minute:02}:{d.second:02}'.format(d=book.pubdate)
|
|
if book.comments:
|
|
for b in book.comments:
|
|
description = etree.SubElement(metadata, PURL + "description", nsmap=NSMAP)
|
|
description.text = b.text
|
|
if not book.languages:
|
|
language = etree.SubElement(metadata, PURL + "language", nsmap=NSMAP)
|
|
language.text = self.export_language
|
|
else:
|
|
for b in book.languages:
|
|
language = etree.SubElement(metadata, PURL + "language", nsmap=NSMAP)
|
|
language.text = str(b.lang_code)
|
|
for b in book.tags:
|
|
subject = etree.SubElement(metadata, PURL + "subject", nsmap=NSMAP)
|
|
subject.text = str(b.name)
|
|
etree.SubElement(metadata, "meta", name="calibre:author_link_map",
|
|
content="{" + ", ".join(['"' + str(a.name) + '": ""' for a in book.authors]) + "}",
|
|
nsmap=NSMAP)
|
|
for b in book.series:
|
|
etree.SubElement(metadata, "meta", name="calibre:series",
|
|
content=str(str(b.name)),
|
|
nsmap=NSMAP)
|
|
if book.series:
|
|
etree.SubElement(metadata, "meta", name="calibre:series_index",
|
|
content=str(book.series_index),
|
|
nsmap=NSMAP)
|
|
etree.SubElement(metadata, "meta", name="calibre:timestamp",
|
|
content='{d.year:04}-{d.month:02}-{d.day:02}T{d.hour:02}:{d.minute:02}:{d.second:02}'.format(
|
|
d=book.timestamp),
|
|
nsmap=NSMAP)
|
|
etree.SubElement(metadata, "meta", name="calibre:title_sort",
|
|
content=book.sort,
|
|
nsmap=NSMAP)
|
|
sequence = 0
|
|
for cc in custom_columns:
|
|
value = None
|
|
extra = None
|
|
cc_entry = getattr(book, "custom_column_" + str(cc.id))
|
|
if cc_entry.__len__():
|
|
value = cc_entry[0].get("value")
|
|
extra = cc_entry[0].get("extra")
|
|
etree.SubElement(metadata, "meta", name="calibre:user_metadata:#{}".format(cc.label),
|
|
content=cc.to_json(value, extra, sequence),
|
|
nsmap=NSMAP)
|
|
sequence += 1
|
|
|
|
# generate guide element and all sub elements of it
|
|
# Title is translated from default export language
|
|
guide = etree.SubElement(package, "guide")
|
|
etree.SubElement(guide, "reference", type="cover", title=self.translated_title, href="cover.jpg")
|
|
|
|
# prepare finalize everything and output
|
|
doc = etree.ElementTree(package)
|
|
# doc = etree.tostring(package, xml_declaration=True, encoding='utf-8', pretty_print=True) # .replace(b"&quot;", b""")
|
|
try:
|
|
with open(book_metadata_filepath, 'wb') as f:
|
|
# f.write(doc)
|
|
doc.write(f, xml_declaration=True, encoding='utf-8', pretty_print=True)
|
|
except Exception:
|
|
# ToDo: Folder not writeable error
|
|
pass
|
|
@property
|
|
def name(self):
|
|
return "Metadata backup"
|
|
|
|
# needed for logging
|
|
def __str__(self):
|
|
if self.set_dirty:
|
|
return "Queue all books for metadata backup"
|
|
else:
|
|
return "Perform metadata backup"
|
|
|
|
@property
|
|
def is_cancellable(self):
|
|
return True
|