calibre-web/cps/helper.py

1044 lines
46 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
# This file is part of the Calibre-Web (https://github.com/janeczku/calibre-web)
# Copyright (C) 2012-2019 cervinko, idalin, SiphonSquirrel, ouzklcn, akushsky,
# OzzieIsaacs, bodybybuddha, jkrehm, matthazinski, janeczku
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import io
import mimetypes
import re
import shutil
import socket
from datetime import datetime, timedelta
from tempfile import gettempdir
import requests
import unidecode
from flask import send_from_directory, make_response, redirect, abort, url_for
from flask_babel import gettext as _
from flask_babel import lazy_gettext as N_
2022-09-19 18:56:22 +02:00
from flask_babel import get_locale
2018-11-03 13:43:38 +01:00
from flask_login import current_user
from sqlalchemy.sql.expression import true, false, and_, or_, text, func
from sqlalchemy.exc import InvalidRequestError, OperationalError
from werkzeug.datastructures import Headers
from werkzeug.security import generate_password_hash
from markupsafe import escape
from urllib.parse import quote
try:
import advocate
from advocate.exceptions import UnacceptableAddressException
use_advocate = True
except ImportError:
use_advocate = False
advocate = requests
UnacceptableAddressException = MissingSchema = BaseException
from . import calibre_db, cli_param
from .tasks.convert import TaskConvert
from . import logger, config, db, ub, fs
from . import gdriveutils as gd
from .constants import STATIC_DIR as _STATIC_DIR, CACHE_TYPE_THUMBNAILS, THUMBNAIL_TYPE_COVER, THUMBNAIL_TYPE_SERIES
from .subproc_wrapper import process_wait
from .services.worker import WorkerThread
from .tasks.mail import TaskEmail
from .tasks.thumbnail import TaskClearCoverThumbnailCache, TaskGenerateCoverThumbnails
2022-09-19 18:56:22 +02:00
from .tasks.metadata_backup import TaskBackupMetadata
log = logger.create()
2020-12-09 14:18:39 +01:00
try:
from wand.image import Image
from wand.exceptions import MissingDelegateError, BlobError
2020-12-09 14:18:39 +01:00
use_IM = True
except (ImportError, RuntimeError) as e:
log.debug('Cannot import Image, generating covers from non jpg files will not work: %s', e)
use_IM = False
MissingDelegateError = BaseException
2020-12-09 14:18:39 +01:00
# Convert existing book entry to new format
def convert_book_format(book_id, calibre_path, old_book_format, new_book_format, user_id, ereader_mail=None):
    """Queue the conversion of an existing book format into another format.

    The book and its ``old_book_format`` data entry are looked up, the source
    file is checked for existence (on Google Drive when
    ``config.config_use_google_drive`` is set, otherwise on the local
    filesystem) and a ``TaskConvert`` is added to the worker queue.

    When ``ereader_mail`` is truthy the mail settings are used as the task
    settings, with pre-translated subject and body added; otherwise an empty
    settings dict is used.

    Returns ``None`` once the task is queued, or a translated error message
    when the source format or file cannot be found.
    """
    book = calibre_db.get_book(book_id)
    data = calibre_db.get_book_format(book.id, old_book_format)
    if not data:
        error_message = _(u"%(format)s format not found for book id: %(book)d", format=old_book_format, book=book_id)
        log.error("convert_book_format: %s", error_message)
        return error_message

    # file_path is the source file WITHOUT its extension
    file_path = os.path.join(calibre_path, book.path, data.name)
    source_ext = "." + old_book_format.lower()
    source_name = data.name + source_ext
    if config.config_use_google_drive:
        if not gd.getFileFromEbooksFolder(book.path, source_name):
            return _(u"%(format)s not found on Google Drive: %(fn)s",
                     format=old_book_format, fn=source_name)
    elif not os.path.exists(file_path + source_ext):
        return _(u"%(format)s not found: %(fn)s",
                 format=old_book_format, fn=source_name)

    # read settings and append converter task to queue
    if ereader_mail:
        settings = config.get_mail_settings()
        settings['subject'] = _('Send to E-Reader')  # pretranslate Subject for e-mail
        settings['body'] = _(u'This e-mail has been sent via Calibre-Web.')
    else:
        settings = {}

    # the title is escaped to prevent xss in the task list
    link = '<a href="{}">{}</a>'.format(url_for('web.show_book', book_id=book.id), escape(book.title))
    txt = u"{} -> {}: {}".format(old_book_format.upper(), new_book_format.upper(), link)
    settings['old_book_format'] = old_book_format
    settings['new_book_format'] = new_book_format
    WorkerThread.add(user_id, TaskConvert(file_path, book.id, txt, settings, ereader_mail, user_id))
    return None
# Texts are not lazily translated because they are supposed to be sent out as-is
def send_test_mail(ereader_mail, user_name):
    """Queue a test e-mail to ``ereader_mail``; the task is added for ``user_name``."""
    test_task = TaskEmail(_(u'Calibre-Web test e-mail'), None, None,
                          config.get_mail_settings(), ereader_mail, N_(u"Test e-mail"),
                          _(u'This e-mail has been sent via Calibre-Web.'))
    WorkerThread.add(user_name, test_task)
    return
# Send registration email or password reset email, depending on parameter resend (False means welcome email)
def send_registration_mail(e_mail, user_name, default_password, resend=False):
    """Queue the registration (or password reset) e-mail for a user.

    :param e_mail: recipient address
    :param user_name: name used in the greeting and the credentials section
    :param default_password: password included in the mail text
    :param resend: when false, the "account has been created" welcome line is
                   included; when true it is left out
    """
    lines = ["Hello %s!\r\n" % user_name]
    if not resend:
        lines.append("Your new account at Calibre-Web has been created. Thanks for joining us!\r\n")
    lines.extend([
        "Please log in to your account using the following information:\r\n",
        "User name: %s\r\n" % user_name,
        "Password: %s\r\n" % default_password,
        "Don't forget to change your password after first login.\r\n",
        "Sincerely\r\n\r\n",
        "Your Calibre-Web team",
    ])
    mail_task = TaskEmail(
        subject=_(u'Get Started with Calibre-Web'),
        filepath=None,
        attachment=None,
        settings=config.get_mail_settings(),
        recipient=e_mail,
        task_message=N_(u"Registration e-mail for user: %(name)s", name=user_name),
        text="".join(lines)
    )
    WorkerThread.add(None, mail_task)
    return
def check_send_to_ereader_with_converter(formats):
    """Return the "convert and send" options available for ``formats``.

    A conversion to Epub is offered for Mobi (``convert`` = 1) and for Azw3
    (``convert`` = 2), but only when no Epub is already among ``formats``.
    Each option is a dict with the keys ``format``, ``convert`` and ``text``.
    """
    options = []
    if 'EPUB' in formats:
        # an Epub already exists, nothing has to be converted
        return options
    for source_format, source_label, convert_mode in (('MOBI', 'Mobi', 1), ('AZW3', 'Azw3', 2)):
        if source_format in formats:
            options.append({'format': 'Epub',
                            'convert': convert_mode,
                            'text': _('Convert %(orig)s to %(format)s and send to E-Reader',
                                      orig=source_label,
                                      format='Epub')})
    return options
2021-03-14 13:28:52 +01:00
def check_send_to_ereader(entry):
    """
    Returns all available book formats for sending to E-Reader.

    Only formats whose ``uncompressed_size`` is below ``config.mail_size`` are
    considered. The result is a list of dicts (keys ``format``, ``convert``,
    ``text``); conversion options are appended when a converter path is
    configured. Returns ``None`` (after logging) when the entry has no data.
    """
    if not len(entry.data):
        log.error(u'Cannot find book entry %d', entry.id)
        return None

    formats = [ele.format for ele in entry.data if ele.uncompressed_size < config.mail_size]
    book_formats = []
    for format_key, format_label in (('EPUB', 'Epub'), ('MOBI', 'Mobi'), ('PDF', 'Pdf'), ('AZW', 'Azw')):
        if format_key in formats:
            book_formats.append({'format': format_label,
                                 'convert': 0,
                                 'text': _('Send %(format)s to E-Reader', format=format_label)})
    if config.config_converterpath:
        book_formats.extend(check_send_to_ereader_with_converter(formats))
    return book_formats
# Check whether a reader exists for any of the book's formats: returns an empty list if none does,
# otherwise a list of the supported formats
def check_read_formats(entry):
    """Return the lower-cased formats of ``entry.data`` that the in-browser readers support."""
    readable = {'TXT', 'PDF', 'EPUB', 'CBZ', 'CBT', 'CBR', 'DJVU'}
    if not len(entry.data):
        return []
    return [item.format.lower() for item in entry.data if item.format.upper() in readable]
# The meaning of `convert` follows check_send_to_ereader / check_send_to_ereader_with_converter:
# 0: the file in `book_format` exists and is sent directly to the E-Reader e-mail address,
# 1: the Mobi file is converted to `book_format` and then sent,
# 2: the Azw3 file is converted to `book_format` and then sent
def send_mail(book_id, book_format, convert, ereader_mail, calibrepath, user_id):
    """Send a book to an E-Reader address, converting it first if requested.

    :param book_id: id of the book to send
    :param book_format: format to send (target format when converting)
    :param convert: 0 = send as is, 1 = convert from Mobi, 2 = convert from Azw3
    :param ereader_mail: recipient address
    :param calibrepath: root folder of the calibre library
    :param user_id: user the queued task belongs to
    :return: ``None`` when a task was queued, otherwise an error message
    """
    book = calibre_db.get_book(book_id)

    # convert mode -> source format; must stay in sync with
    # check_send_to_ereader_with_converter (1: Mobi, 2: Azw3)
    convert_sources = {1: u'mobi', 2: u'azw3'}
    if convert in convert_sources:
        # returns None if success, otherwise errormessage
        return convert_book_format(book_id, calibrepath, convert_sources[convert], book_format.lower(),
                                   user_id, ereader_mail)

    for entry in iter(book.data):
        if entry.format.upper() == book_format.upper():
            converted_file_name = entry.name + '.' + book_format.lower()
            # the title is escaped to prevent xss in the task list
            link = '<a href="{}">{}</a>'.format(url_for('web.show_book', book_id=book_id), escape(book.title))
            email_text = N_(u"%(book)s send to E-Reader", book=link)
            WorkerThread.add(user_id, TaskEmail(_(u"Send to E-Reader"), book.path, converted_file_name,
                                                config.get_mail_settings(), ereader_mail,
                                                email_text, _(u'This e-mail has been sent via Calibre-Web.')))
            return
    return _(u"The requested file could not be read. Maybe wrong permissions?")
def get_valid_filename(value, replace_whitespace=True, chars=128):
    """
    Returns the given string converted to a string that can be used for a clean
    filename.

    A trailing dot becomes an underscore, "/" and ":" become underscores and
    NUL characters are stripped from both ends. With
    ``config.config_unicode_filename`` set the value is transliterated with
    unidecode. When ``replace_whitespace`` is true, runs of ``*+:\\"/<>?`` are
    replaced by "_" and runs of "|" by ",". The result is cut to at most
    ``chars`` bytes of its UTF-8 encoding (an incomplete trailing character is
    dropped) and stripped of surrounding whitespace.

    Raises ValueError when nothing is left.
    """
    if value[-1:] == u'.':
        value = value[:-1] + u'_'
    for forbidden in ("/", ":"):
        value = value.replace(forbidden, "_")
    value = value.strip('\0')
    if config.config_unicode_filename:
        value = unidecode.unidecode(value)
    if replace_whitespace:
        # *+:\"/<>? are replaced by _
        value = re.sub(r'[*+:\\\"/<>?]+', u'_', value, flags=re.U)
        # pipe has to be replaced with comma
        value = re.sub(r'[|]+', u',', value, flags=re.U)
    truncated = value.encode('utf-8')[:chars]
    value = truncated.decode('utf-8', errors='ignore').strip()
    if not value:
        raise ValueError("Filename cannot be empty")
    return value
def split_authors(values):
    """Split author strings into a flat list of single author names.

    Every value is split at "&" and ";". A part with exactly one comma is
    treated as "Last, First" and turned into "First Last"; a part with more
    than one comma is treated as a comma separated list of names; any other
    part is taken as is. All names are stripped of surrounding whitespace.
    """
    result = []
    for raw in values:
        for part in re.split('[&;]', raw):
            pieces = part.split(',')
            if len(pieces) == 2:
                result.append(pieces[1].strip() + ' ' + pieces[0].strip())
            elif len(pieces) > 2:
                result.extend(piece.strip() for piece in pieces)
            else:
                result.append(part.strip())
    return result
def get_sorted_author(value):
    """Return the sort form ("Last, First") of an author name.

    A value that already contains a comma is returned unchanged. Otherwise the
    name is split at spaces and the last word is moved to the front; a
    trailing suffix (JR, SR, I, II, III or IV, optionally followed by a dot,
    case-insensitive) stays at the end. A single word is returned as is.

    If anything goes wrong the error is logged and the input is returned
    (its first element if the input had already been split into a list).
    """
    value2 = None
    try:
        if ',' not in value:
            regexes = [r"^(JR|SR)\.?$", r"^I{1,3}\.?$", r"^IV\.?$"]
            combined = "(" + ")|(".join(regexes) + ")"
            value = value.split(" ")
            if re.match(combined, value[-1].upper()):
                if len(value) > 1:
                    value2 = value[-2] + ", " + " ".join(value[:-2]) + " " + value[-1]
                else:
                    value2 = value[0]
            elif len(value) == 1:
                value2 = value[0]
            else:
                value2 = value[-1] + ", " + " ".join(value[:-1])
        else:
            value2 = value
    except Exception as ex:
        log.error("Sorting author %s failed: %s", value, ex)
        # the original called isinstance(list, value2), i.e. with swapped
        # arguments, which raised a TypeError inside this handler
        if isinstance(value, list):
            value2 = value[0]
        else:
            value2 = value
    return value2
def edit_book_read_status(book_id, read_status=None):
    """Toggle or set the read status of a book for the current user.

    Without a configured read column (``config.config_read_column``) the status
    is stored in the ``ReadBook`` table of the application database; a missing
    Kobo reading state is created alongside. With a configured read column the
    value is stored in that custom column of the calibre database.

    :param book_id: id of the book
    :param read_status: ``None`` toggles the current status, otherwise the
                        status is set according to the given value
    :return: empty string on success, otherwise an error message
    """
    if not config.config_read_column:
        book = ub.session.query(ub.ReadBook).filter(and_(ub.ReadBook.user_id == int(current_user.id),
                                                         ub.ReadBook.book_id == book_id)).first()
        if book:
            if read_status is None:
                if book.read_status == ub.ReadBook.STATUS_FINISHED:
                    book.read_status = ub.ReadBook.STATUS_UNREAD
                else:
                    book.read_status = ub.ReadBook.STATUS_FINISHED
            else:
                book.read_status = ub.ReadBook.STATUS_FINISHED if read_status else ub.ReadBook.STATUS_UNREAD
        else:
            read_book = ub.ReadBook(user_id=current_user.id, book_id=book_id)
            read_book.read_status = ub.ReadBook.STATUS_FINISHED
            book = read_book
        if not book.kobo_reading_state:
            kobo_reading_state = ub.KoboReadingState(user_id=current_user.id, book_id=book_id)
            kobo_reading_state.current_bookmark = ub.KoboBookmark()
            kobo_reading_state.statistics = ub.KoboStatistics()
            book.kobo_reading_state = kobo_reading_state
        ub.session.merge(book)
        ub.session_commit("Book {} readbit toggled".format(book_id))
    else:
        try:
            calibre_db.update_title_sort(config)
            book = calibre_db.get_filtered_book(book_id)
            book_read_status = getattr(book, 'custom_column_' + str(config.config_read_column))
            if len(book_read_status):
                if read_status is None:
                    book_read_status[0].value = not book_read_status[0].value
                else:
                    book_read_status[0].value = read_status is True
                calibre_db.session.commit()
            else:
                cc_class = db.cc_classes[config.config_read_column]
                new_cc = cc_class(value=read_status or 1, book=book_id)
                calibre_db.session.add(new_cc)
                calibre_db.session.commit()
        except (KeyError, AttributeError, IndexError):
            log.error(
                "Custom Column No.{} does not exist in calibre database".format(config.config_read_column))
            return "Custom Column No.{} does not exist in calibre database".format(config.config_read_column)
        except (OperationalError, InvalidRequestError) as ex:
            calibre_db.session.rollback()
            log.error(u"Read status could not set: {}".format(ex))
            # only DBAPI errors (e.g. OperationalError) carry ``orig``;
            # InvalidRequestError does not, so fall back to the exception itself
            return _("Read status could not set: {}".format(getattr(ex, "orig", ex)))
    return ""
2022-04-28 20:57:09 +02:00
# Deletes a book from the local filestorage, returns True if deleting is successful, otherwise false
def delete_book_file(book, calibrepath, book_format=None):
    """Delete a book, or a single format of it, from the local file storage.

    :param book: book object; ``book.path`` ("author/title") and ``book.id`` are used
    :param calibrepath: root folder of the calibre library
    :param book_format: when given, only files with this extension are removed
                        (matched case-insensitively); otherwise the whole book
                        folder is removed, and the author folder too if it is
                        left empty
    :return: tuple ``(success, message)``; ``message`` is ``None`` or a
             translated warning/error text
    """
    # check that path is 2 elements deep, check that target path has no sub folders
    if book.path.count('/') == 1:
        path = os.path.join(calibrepath, book.path)
        if book_format:
            # compare upper case against upper case, so that "epub" and "EPUB" both match
            suffix = "." + book_format.upper()
            try:
                for file in os.listdir(path):
                    if file.upper().endswith(suffix):
                        os.remove(os.path.join(path, file))
            except OSError as ex:
                log.error("Deleting book %s failed: %s", book.id, ex)
                return False, _("Deleting book %(id)s failed: %(message)s", id=book.id, message=ex)
            return True, None
        else:
            if os.path.isdir(path):
                try:
                    for root, folders, files in os.walk(path):
                        for f in files:
                            os.unlink(os.path.join(root, f))
                        if len(folders):
                            # files are gone, but the folder is kept because of its subfolders
                            log.warning("Deleting book {} failed, path {} has subfolders: {}".format(book.id,
                                        book.path, folders))
                            return True, _("Deleting bookfolder for book %(id)s failed, path has subfolders: %(path)s",
                                           id=book.id,
                                           path=book.path)
                    shutil.rmtree(path)
                except OSError as ex:
                    log.error("Deleting book %s failed: %s", book.id, ex)
                    return False, _("Deleting book %(id)s failed: %(message)s", id=book.id, message=ex)
                authorpath = os.path.join(calibrepath, os.path.split(book.path)[0])
                if not os.listdir(authorpath):
                    try:
                        shutil.rmtree(authorpath)
                    except OSError as ex:
                        # best effort only: an undeletable empty author folder is not an error
                        log.error("Deleting authorpath for book %s failed: %s", book.id, ex)
                return True, None

    log.error("Deleting book %s from database only, book path in database not valid: %s",
              book.id, book.path)
    return True, _("Deleting book %(id)s from database only, book path in database not valid: %(path)s",
                   id=book.id,
                   path=book.path)
2021-11-13 14:57:01 +01:00
def clean_author_database(renamed_author, calibre_path="", local_book=None, gdrive=None):
    """Rename the book files of renamed authors and update the database entries.

    For every name in ``renamed_author`` the affected books are taken from
    ``local_book`` (if given) or queried from the calibre database. For each
    book whose author folder matches one of the renamed authors (or for
    ``local_book``), ``book.path`` is pointed at the new author folder and every
    format file is renamed to "<title> - <author>.<ext>", either on the local
    filesystem or on Google Drive.

    :param renamed_author: list of author names
    :param calibre_path: root folder of the calibre library (local storage)
    :param local_book: restrict the operation to this book
    :param gdrive: truthy when the library is stored on Google Drive
    """
    valid_filename_authors = [get_valid_filename(r, chars=96) for r in renamed_author]
    for r in renamed_author:
        if local_book:
            all_books = [local_book]
        else:
            all_books = calibre_db.session.query(db.Books) \
                .filter(db.Books.authors.any(db.Authors.name == r)).all()
        for book in all_books:
            book_author_path = book.path.split('/')[0]
            if book_author_path in valid_filename_authors or local_book:
                new_author = calibre_db.session.query(db.Authors).filter(db.Authors.name == r).first()
                all_new_authordir = get_valid_filename(new_author.name, chars=96)
                all_titledir = book.path.split('/')[1]
                all_new_path = os.path.join(calibre_path, all_new_authordir, all_titledir)
                all_new_name = get_valid_filename(book.title, chars=42) + ' - ' \
                    + get_valid_filename(new_author.name, chars=42)
                # change location in database to new author/title path
                book.path = os.path.join(all_new_authordir, all_titledir).replace('\\', '/')
                for file_format in book.data:
                    extension = '.' + file_format.format.lower()
                    if not gdrive:
                        shutil.move(os.path.normcase(os.path.join(all_new_path, file_format.name + extension)),
                                    os.path.normcase(os.path.join(all_new_path, all_new_name + extension)))
                    else:
                        g_file = gd.getFileFromEbooksFolder(all_new_path, file_format.name + extension)
                        if g_file:
                            gd.moveGdriveFileRemote(g_file, all_new_name + extension)
                            gd.updateDatabaseOnEdit(g_file['id'], all_new_name + extension)
                        else:
                            # the original format string had a single placeholder
                            # and silently dropped the file name
                            log.error("File {}/{} not found on gdrive"
                                      .format(all_new_path, file_format.name + extension))
                    file_format.name = all_new_name
def rename_all_authors(first_author, renamed_author, calibre_path="", localbook=None, gdrive=False):
    """Rename the folders of all renamed authors and return the new author folder name.

    With ``first_author`` given, its valid file name is the result and the
    folder of every name in ``renamed_author`` is moved to the folder name
    derived from the author stored in the database (remotely on Google Drive
    when ``gdrive`` is set, otherwise below ``calibre_path``). Without
    ``first_author`` the folder name is derived from the first author of
    ``localbook`` and nothing is renamed.

    NOTE(review): when a local move fails, the translated error message is
    returned instead of a folder name.
    """
    # Create new_author_dir from parameter or from database
    # Create new title_dir from database and add id
    if not first_author:
        return get_valid_filename(localbook.authors[0].name, chars=96)

    new_authordir = get_valid_filename(first_author, chars=96)
    for author_name in renamed_author:
        db_author = calibre_db.session.query(db.Authors).filter(db.Authors.name == author_name).first()
        old_author_dir = get_valid_filename(author_name, chars=96)
        new_author_rename_dir = get_valid_filename(db_author.name, chars=96)
        if gdrive:
            g_file = gd.getFileFromEbooksFolder(None, old_author_dir)
            if g_file:
                gd.moveGdriveFolderRemote(g_file, new_author_rename_dir)
        elif os.path.isdir(os.path.join(calibre_path, old_author_dir)):
            try:
                old_author_path = os.path.join(calibre_path, old_author_dir)
                new_author_path = os.path.join(calibre_path, new_author_rename_dir)
                shutil.move(os.path.normcase(old_author_path), os.path.normcase(new_author_path))
            except OSError as ex:
                log.error("Rename author from: %s to %s: %s", old_author_path, new_author_path, ex)
                log.debug(ex, exc_info=True)
                return _("Rename author from: '%(src)s' to '%(dest)s' failed with error: %(error)s",
                         src=old_author_path, dest=new_author_path, error=str(ex))
    return new_authordir
2020-09-07 21:26:59 +02:00
# Moves files in file storage during author/title rename, or from temp dir to file storage
2022-02-05 09:06:14 +01:00
def update_dir_structure_file(book_id, calibre_path, first_author, original_filepath, db_filename, renamed_author):
    """Bring the local folder structure of a book in line with its author and title.

    The source is ``original_filepath`` when given (e.g. a freshly uploaded
    file), otherwise the current book folder. Author folders are renamed, the
    book is moved when its title folder, its author folder or the source
    changed, and finally the book files themselves are renamed.

    :return: an error message, or the result of ``rename_files_on_change``
    """
    # get book database entry from id, if original path overwrite source with original_filepath
    local_book = calibre_db.get_book(book_id)
    path = original_filepath if original_filepath else os.path.join(calibre_path, local_book.path)

    # Create (current) author_dir and title_dir from database
    path_parts = local_book.path.split('/')
    author_dir = path_parts[0]
    title_dir = path_parts[1]

    # Create new_author_dir from parameter or from database
    # Create new title_dir from database and add id
    new_author_dir = rename_all_authors(first_author, renamed_author, calibre_path, local_book)
    if (first_author
            and first_author.lower() in [r.lower() for r in renamed_author]
            and os.path.isdir(os.path.join(calibre_path, new_author_dir))):
        # the author folder has just been renamed, so the book now lives below the new folder
        path = os.path.join(calibre_path, new_author_dir, title_dir)

    new_title_dir = get_valid_filename(local_book.title, chars=96) + " (" + str(book_id) + ")"

    if title_dir != new_title_dir or author_dir != new_author_dir or original_filepath:
        error = move_files_on_change(calibre_path,
                                     new_author_dir,
                                     new_title_dir,
                                     local_book,
                                     db_filename,
                                     original_filepath,
                                     path)
        if error:
            return error

    # Rename all files from old names to new names
    return rename_files_on_change(first_author, renamed_author, local_book, original_filepath, path, calibre_path)
2022-02-05 09:06:14 +01:00
def upload_new_file_gdrive(book_id, first_author, renamed_author, title, title_dir, original_filepath, filename_ext):
    """Upload a new book file to Google Drive and update the book's path.

    The file is stored as "<title> - <author><ext>" in the folder
    "<author>/<title_dir> (<book_id>)"; afterwards the files of renamed
    authors are renamed.

    :return: the result of ``rename_files_on_change``
    """
    book = calibre_db.get_book(book_id)
    title_part = get_valid_filename(title, chars=42)
    author_part = get_valid_filename(first_author, chars=42)
    file_name = title_part + ' - ' + author_part + filename_ext
    rename_all_authors(first_author, renamed_author, gdrive=True)
    author_folder = get_valid_filename(first_author, chars=96)
    gdrive_path = os.path.join(author_folder, title_dir + " (" + str(book_id) + ")")
    book.path = gdrive_path.replace("\\", "/")
    remote_name = os.path.join(gdrive_path, file_name).replace("\\", "/")
    gd.uploadFileToEbooksFolder(remote_name, original_filepath)
    return rename_files_on_change(first_author, renamed_author, local_book=book, gdrive=True)
def update_dir_structure_gdrive(book_id, first_author, renamed_author):
    """Bring the Google Drive folder structure of a book in line with its author and title.

    Renames the title folder when the title changed and moves the book folder
    when the author changed (unless that author folder is itself listed in
    ``renamed_author``), then renames the book files.

    :return: an error message when a folder is not found on Google Drive,
             otherwise the result of ``rename_files_on_change``
    """
    book = calibre_db.get_book(book_id)

    current_parts = book.path.split('/')
    authordir = current_parts[0]
    titledir = current_parts[1]
    new_authordir = rename_all_authors(first_author, renamed_author, gdrive=True)
    new_titledir = get_valid_filename(book.title, chars=96) + u" (" + str(book_id) + u")"

    if titledir != new_titledir:
        title_folder = gd.getFileFromEbooksFolder(os.path.dirname(book.path), titledir)
        if not title_folder:
            return _(u'File %(file)s not found on Google Drive', file=book.path)  # file not found
        gd.moveGdriveFileRemote(title_folder, new_titledir)
        book.path = book.path.split('/')[0] + u'/' + new_titledir
        gd.updateDatabaseOnEdit(title_folder['id'], book.path)  # only child folder affected

    if authordir != new_authordir and authordir not in renamed_author:
        book_folder = gd.getFileFromEbooksFolder(os.path.dirname(book.path), new_titledir)
        if not book_folder:
            return _(u'File %(file)s not found on Google Drive', file=authordir)  # file not found
        gd.moveGdriveFolderRemote(book_folder, new_authordir)
        book.path = new_authordir + u'/' + book.path.split('/')[1]
        gd.updateDatabaseOnEdit(book_folder['id'], book.path)

    # change location in database to new author/title path
    book.path = os.path.join(new_authordir, new_titledir).replace('\\', '/')
    return rename_files_on_change(first_author, renamed_author, book, gdrive=True)
2022-02-05 09:06:14 +01:00
def move_files_on_change(calibre_path, new_authordir, new_titledir, localbook, db_filename, original_filepath, path):
    """Move a book to its new "author/title" folder and update ``localbook.path``.

    With ``original_filepath`` set, that single file is moved into the (possibly
    newly created) target folder as ``db_filename``. Otherwise the folder
    ``path`` is moved as a whole, or, if the target already exists, its files
    are moved into the existing target folder.

    :return: ``False`` on success, otherwise a translated error message
    """
    new_path = os.path.join(calibre_path, new_authordir, new_titledir)
    new_name = get_valid_filename(localbook.title, chars=96) + ' - ' + new_authordir
    try:
        if original_filepath:
            if not os.path.isdir(new_path):
                os.makedirs(new_path)
            target_file = os.path.join(new_path, db_filename)
            shutil.move(os.path.normcase(original_filepath), os.path.normcase(target_file))
            log.debug("Moving title: %s to %s/%s", original_filepath, new_path, new_name)
        elif not os.path.exists(new_path):
            # target does not exist yet: move original path to new path
            log.debug("Moving title: %s to %s", path, new_path)
            shutil.move(os.path.normcase(path), os.path.normcase(new_path))
        else:
            # target exists already: move only the files to the new location (merge)
            log.info("Moving title: %s into existing: %s", path, new_path)
            # Take all files and subfolder from old path (strange command)
            for dir_name, __, file_list in os.walk(path):
                # the part of dir_name below path is appended to the target folder
                target_dir = new_path + dir_name[len(path):]
                for file in file_list:
                    shutil.move(os.path.normcase(os.path.join(dir_name, file)),
                                os.path.normcase(os.path.join(target_dir, file)))
        # change location in database to new author/title path
        localbook.path = os.path.join(new_authordir, new_titledir).replace('\\', '/')
    except OSError as ex:
        log.error_or_exception("Rename title from {} to {} failed with error: {}".format(path, new_path, ex))
        return _("Rename title from: '%(src)s' to '%(dest)s' failed with error: %(error)s",
                 src=path, dest=new_path, error=str(ex))
    return False
def rename_files_on_change(first_author,
                           renamed_author,
                           local_book,
                           original_filepath="",
                           path="",
                           calibre_path="",
                           gdrive=False):
    """Rename the book files after an author change.

    The files of all books of the renamed authors are renamed first; if
    ``first_author`` is not among them, the files of ``local_book`` are renamed
    for it as well. For local storage without renamed authors and without an
    upload source, the parent folder of ``path`` is removed when it is empty.

    :return: ``False`` on success, otherwise a translated error message
    """
    # Rename all files from old names to new names
    try:
        clean_author_database(renamed_author, calibre_path, gdrive=gdrive)
        if first_author and first_author not in renamed_author:
            clean_author_database([first_author], calibre_path, local_book, gdrive)
        if not (gdrive or renamed_author or original_filepath):
            parent_dir = os.path.dirname(path)
            if not os.listdir(parent_dir):
                shutil.rmtree(parent_dir)
    except OSError as ex:  # FileNotFoundError is a subclass of OSError
        log.error_or_exception("Error in rename file in path {}".format(ex))
        return _("Error in rename file in path: {}".format(str(ex)))
    return False
2022-02-05 09:06:14 +01:00
def delete_book_gdrive(book, book_format):
    """Move a book folder, or a single format file of it, to the Google Drive trash.

    :param book: book object; ``book.path`` and ``book.data`` are used
    :param book_format: format to delete, compared against the upper-cased
                        format of each data entry; falsy deletes the whole
                        book folder
    :return: tuple ``(success, error message or None)``
    """
    if book_format:
        name = ''
        for entry in book.data:
            if entry.format.upper() == book_format:
                name = entry.name + '.' + book_format
        g_file = gd.getFileFromEbooksFolder(book.path, name)
    else:
        g_file = gd.getFileFromEbooksFolder(os.path.dirname(book.path), book.path.split('/')[1])

    if not g_file:
        # file not found
        return False, _(u'Book path %(path)s not found on Google Drive', path=book.path)
    gd.deleteDatabaseEntry(g_file['id'])
    g_file.Trash()
    return True, None
def reset_password(user_id):
    """Set a new random password for a user and mail it to them.

    :return: tuple ``(status, user name)``:
             ``(1, name)`` on success,
             ``(2, None)`` when no mail server is configured,
             ``(0, None)`` when the user does not exist or an error occurred
    """
    user = ub.session.query(ub.User).filter(ub.User.id == user_id).first()
    if not user:
        return 0, None
    if not config.get_mail_server_configured():
        return 2, None
    try:
        new_password = generate_random_password()
        user.password = generate_password_hash(new_password)
        ub.session.commit()
        send_registration_mail(user.email, user.name, new_password, True)
        return 1, user.name
    except Exception:
        ub.session.rollback()
        return 0, None
def generate_random_password():
    """Return a random 8 character password.

    Characters are drawn uniformly with ``secrets.choice``. The previous
    implementation mapped ``os.urandom`` bytes onto the alphabet with a modulo,
    which favours the first characters of the alphabet (256 is not a multiple
    of its length), and its alphabet listed "0" twice.
    """
    import secrets  # local import: only needed here

    alphabet = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%&*()?"
    passlen = 8
    return "".join(secrets.choice(alphabet) for _idx in range(passlen))
def uniq(inpt):
    """Return the items of ``inpt`` without duplicates, keeping first-seen order.

    Whitespace is normalized first: runs of whitespace collapse into a single
    space and leading/trailing whitespace is removed, so "a  b" and "a b" count
    as the same item.
    """
    # dict keys keep insertion order, which gives an order-preserving
    # de-duplication in linear time instead of a list scan per item
    return list(dict.fromkeys(" ".join(inp.split()) for inp in inpt))
def check_email(email):
    """Validate an e-mail address and make sure no account uses it yet.

    The comparison with existing accounts is case-insensitive.

    :return: the stripped e-mail address
    :raises Exception: when the format is invalid or the address is already in use
    """
    email = valid_email(email)
    same_address = func.lower(ub.User.email) == email.lower()
    if ub.session.query(ub.User).filter(same_address).first():
        log.error(u"Found an existing account for this e-mail address")
        raise Exception(_(u"Found an existing account for this e-mail address"))
    return email
def check_username(username):
    """Make sure a user name is not taken yet (case-insensitive comparison).

    :return: the stripped user name
    :raises Exception: when an account with this name already exists
    """
    username = username.strip()
    # .first() instead of .scalar(): scalar() raises MultipleResultsFound when
    # several existing names differ only by case, which would hide the real
    # reason from the caller; this also matches check_email
    if ub.session.query(ub.User).filter(func.lower(ub.User.name) == username.lower()).first():
        log.error(u"This username is already taken")
        raise Exception(_(u"This username is already taken"))
    return username
def valid_email(email):
    """Strip an e-mail address and check its format.

    An empty address (i.e. the e-mail is being deleted) is accepted.

    :return: the stripped e-mail address
    :raises Exception: when a non-empty address has an invalid format
    """
    email = email.strip()
    # an empty e-mail means it is being deleted, nothing to validate
    if not email:
        return email
    # Regex according to https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input/email#validation
    pattern = r"^[\w.!#$%&'*+\\/=?^_`{|}~-]+@[\w](?:[\w-]{0,61}[\w])?(?:\.[\w](?:[\w-]{0,61}[\w])?)*$"
    if re.search(pattern, email) is None:
        log.error(u"Invalid e-mail address format")
        raise Exception(_(u"Invalid e-mail address format"))
    return email
# ################################# External interface #################################
2020-04-19 19:08:58 +02:00
2021-11-11 15:46:32 +01:00
def update_dir_structure(book_id,
                         calibre_path,
                         first_author=None,  # change author of book to this author
                         original_filepath=None,
                         db_filename=None,
                         renamed_author=None):
    """Update the folder structure of a book after a metadata change.

    Dispatches to the Google Drive or the local file implementation, depending
    on ``config.config_use_google_drive``. ``renamed_author`` defaults to an
    empty list.

    :return: the result of the selected implementation
    """
    renamed_author = renamed_author or []
    if config.config_use_google_drive:
        return update_dir_structure_gdrive(book_id, first_author, renamed_author)
    return update_dir_structure_file(book_id,
                                     calibre_path,
                                     first_author,
                                     original_filepath,
                                     db_filename,
                                     renamed_author)
def delete_book(book, calibrepath, book_format):
    """Delete a book, or a single format of it, from the configured storage.

    When the whole book is deleted (no ``book_format``), its cover thumbnail
    cache is cleared and its dirty-metadata entry is removed first.

    :return: tuple ``(success, message)`` of the storage specific delete function
    """
    whole_book = not book_format
    if whole_book:
        clear_cover_thumbnail_cache(book.id)  # here it breaks
        calibre_db.delete_dirty_metadata(book.id)
    if config.config_use_google_drive:
        return delete_book_gdrive(book, book_format)
    return delete_book_file(book, calibrepath, book_format)
2019-12-17 18:00:35 +01:00
def get_cover_on_failure(use_generic_cover):
    """Answer a cover request for which no cover could be delivered.

    With ``use_generic_cover`` set, the generic cover from the static folder is
    sent (403 when it may not be read); otherwise the request is aborted with 404.
    """
    if use_generic_cover:
        try:
            generic_cover = send_from_directory(_STATIC_DIR, "generic_cover.jpg")
            return generic_cover
        except PermissionError:
            log.error("No permission to access generic_cover.jpg file.")
            abort(403)
    abort(404)
2019-12-17 18:00:35 +01:00
2020-04-19 19:08:58 +02:00
def get_book_cover(book_id, resolution=None):
    """Return the cover response for a book id (archived books included).

    Falls back to the generic cover when the book has no cover.
    """
    filtered_book = calibre_db.get_filtered_book(book_id, allow_show_archived=True)
    return get_book_cover_internal(filtered_book, use_generic_cover_on_failure=True, resolution=resolution)
2019-12-17 18:00:35 +01:00
2020-04-19 19:08:58 +02:00
# Called only by kobo sync -> cover not found should be answered with 404 and not with default cover
def get_book_cover_with_uuid(book_uuid, resolution=None):
    """Return the cover response for a book uuid; a missing cover is answered without the generic cover."""
    book_by_uuid = calibre_db.get_book_by_uuid(book_uuid)
    return get_book_cover_internal(book_by_uuid, use_generic_cover_on_failure=False, resolution=resolution)
2020-04-19 19:08:58 +02:00
def get_book_cover_internal(book, use_generic_cover_on_failure, resolution=None):
    """Build the response for a book cover request.

    Sources are tried in this order: a cached thumbnail in the requested
    ``resolution``, then either Google Drive (answered with a redirect) or
    ``cover.jpg`` in the book folder of the calibre library. Every failure is
    answered by ``get_cover_on_failure``.
    """
    if not (book and book.has_cover):
        return get_cover_on_failure(use_generic_cover_on_failure)

    # Send the book cover thumbnail if it exists in cache
    if resolution:
        thumbnail = get_book_cover_thumbnail(book, resolution)
        if thumbnail:
            cache = fs.FileSystem()
            if cache.get_cache_file_exists(thumbnail.filename, CACHE_TYPE_THUMBNAILS):
                return send_from_directory(cache.get_cache_file_dir(thumbnail.filename, CACHE_TYPE_THUMBNAILS),
                                           thumbnail.filename)

    # Send the book cover from Google Drive if configured
    if config.config_use_google_drive:
        try:
            if not gd.is_gdrive_ready():
                return get_cover_on_failure(use_generic_cover_on_failure)
            cover_url = gd.get_cover_via_gdrive(book.path)
            if cover_url:
                return redirect(cover_url)
            log.error('{}/cover.jpg not found on Google Drive'.format(book.path))
            return get_cover_on_failure(use_generic_cover_on_failure)
        except Exception as ex:
            log.error_or_exception(ex)
            return get_cover_on_failure(use_generic_cover_on_failure)

    # Send the book cover from the Calibre directory
    cover_file_path = os.path.join(config.config_calibre_dir, book.path)
    if os.path.isfile(os.path.join(cover_file_path, "cover.jpg")):
        return send_from_directory(cover_file_path, "cover.jpg")
    return get_cover_on_failure(use_generic_cover_on_failure)
def get_book_cover_thumbnail(book, resolution):
    """Return the first unexpired cover thumbnail entry of ``book`` in ``resolution``.

    Returns ``None`` when there is no book, the book has no cover, or no
    matching thumbnail entry exists.
    """
    if not (book and book.has_cover):
        return None
    not_expired = or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())
    query = ub.session.query(ub.Thumbnail)
    query = query.filter(ub.Thumbnail.type == THUMBNAIL_TYPE_COVER)
    query = query.filter(ub.Thumbnail.entity_id == book.id)
    query = query.filter(ub.Thumbnail.resolution == resolution)
    query = query.filter(not_expired)
    return query.first()
def get_series_thumbnail_on_failure(series_id, resolution):
    """Answer a series cover request with the cover of the first book of the series that has a cover."""
    books_with_cover = calibre_db.session.query(db.Books)
    books_with_cover = books_with_cover.join(db.books_series_link).join(db.Series)
    books_with_cover = books_with_cover.filter(db.Series.id == series_id)
    books_with_cover = books_with_cover.filter(db.Books.has_cover == 1)
    book = books_with_cover.first()
    return get_book_cover_internal(book, use_generic_cover_on_failure=True, resolution=resolution)
def get_series_cover_thumbnail(series_id, resolution=None):
    """Return the cover response for a series; delegates to ``get_series_cover_internal``."""
    response = get_series_cover_internal(series_id, resolution)
    return response
def get_series_cover_internal(series_id, resolution=None):
    """Build the response for a series cover request.

    A cached series thumbnail in the requested ``resolution`` is sent when it
    exists; otherwise ``get_series_thumbnail_on_failure`` provides the answer.
    """
    # Send the series thumbnail if it exists in cache
    thumbnail = get_series_thumbnail(series_id, resolution) if resolution else None
    if thumbnail:
        cache = fs.FileSystem()
        if cache.get_cache_file_exists(thumbnail.filename, CACHE_TYPE_THUMBNAILS):
            cache_dir = cache.get_cache_file_dir(thumbnail.filename, CACHE_TYPE_THUMBNAILS)
            return send_from_directory(cache_dir, thumbnail.filename)
    return get_series_thumbnail_on_failure(series_id, resolution)
def get_series_thumbnail(series_id, resolution):
    """Return the first unexpired thumbnail entry of a series in ``resolution``, or ``None``."""
    not_expired = or_(ub.Thumbnail.expiration.is_(None), ub.Thumbnail.expiration > datetime.utcnow())
    query = ub.session.query(ub.Thumbnail)
    query = query.filter(ub.Thumbnail.type == THUMBNAIL_TYPE_SERIES)
    query = query.filter(ub.Thumbnail.entity_id == series_id)
    query = query.filter(ub.Thumbnail.resolution == resolution)
    query = query.filter(not_expired)
    return query.first()
def save_cover_from_url(url, book_path):
    """Download a cover image from ``url`` and save it for the book at ``book_path``.

    The request is made with plain ``requests`` when ``cli_param.allow_localhost``
    is set, otherwise with ``advocate`` if that module could be imported.
    Redirects are not followed.

    :param url: address of the cover image
    :param book_path: book path handed on to ``save_cover``
    :return: the ``(success, message)`` tuple from ``save_cover`` after a
             successful download, otherwise ``(False, <translated error text>)``
    """
    try:
        if cli_param.allow_localhost:
            fetch = requests.get
        elif use_advocate:
            fetch = advocate.get
        else:
            log.error("python module advocate is not installed but is needed")
            return False, _("Python module 'advocate' is not installed but is needed for cover uploads")
        response = fetch(url, timeout=(10, 200), allow_redirects=False)  # ToDo: Error Handling
        response.raise_for_status()
        return save_cover(response, book_path)
    except (socket.gaierror,
            requests.exceptions.HTTPError,
            requests.exceptions.InvalidURL,
            requests.exceptions.ConnectionError,
            requests.exceptions.Timeout) as ex:
        # "Invalid host" can be the result of a redirect response
        log.error(u'Cover Download Error %s', ex)
        return False, _("Error Downloading Cover")
    except MissingDelegateError as ex:
        log.info(u'File Format Error %s', ex)
        return False, _("Cover Format Error")
    except UnacceptableAddressException:
        log.error("Localhost or local network was accessed for cover upload")
        return False, _("You are not allowed to access localhost or the local network for cover uploads")
2020-05-29 06:59:40 +02:00
def save_cover_from_filestorage(filepath, saved_filename, img):
    """Write the cover ``img`` to ``saved_filename`` inside ``filepath``.

    ``img`` is handled in one of three ways:

    * a ``requests.Response``: its ``content`` is written to the file,
    * an object with a ``metadata`` attribute: saved via ``save(filename=...)``
      and then closed,
    * anything else: saved via ``save(<path>)``.

    :param filepath: target directory; created if it does not exist
    :param saved_filename: file name of the cover inside ``filepath``
    :param img: image source, see above
    :return: ``(True, None)`` on success, ``(False, <translated error text>)``
             if the directory cannot be created or the file cannot be stored
    """
    # make sure the target directory exists before anything is written
    if not os.path.exists(filepath):
        try:
            os.makedirs(filepath)
        except OSError:
            log.error(u"Failed to create path for cover")
            return False, _(u"Failed to create path for cover")
    try:
        if isinstance(img, requests.Response):
            # upload of jpg file without wand
            with open(os.path.join(filepath, saved_filename), 'wb') as cover_file:
                cover_file.write(img.content)
        elif hasattr(img, "metadata"):
            # upload of jpg/png... via url
            img.save(filename=os.path.join(filepath, saved_filename))
            img.close()
        else:
            # upload of jpg/png... from hdd
            img.save(os.path.join(filepath, saved_filename))
    except (IOError, OSError):
        log.error(u"Cover-file is not a valid image file, or could not be stored")
        return False, _(u"Cover-file is not a valid image file, or could not be stored")
    return True, None
def save_cover(img, book_path):
    """Validate a cover image and save it to Google Drive or the local library.

    The media type is taken from the ``content-type`` header of ``img``.
    Parameters (``; charset=...``) are stripped and the value is compared
    case-insensitively, so headers such as ``image/jpeg; charset=binary`` or
    ``Image/JPEG`` are accepted as the image type they describe.

    With ``use_IM`` set, jpeg/png/webp/bmp input is accepted and converted to
    jpeg; otherwise only jpeg input is accepted and stored as it is.

    :param img: object exposing ``headers`` and either ``stream`` or ``content``
    :param book_path: path of the book relative to the library root
    :return: ``(True, None)`` on success, ``(False, <translated error text>)`` otherwise
    """
    # A missing header becomes '' and is rejected below like any unsupported type
    raw_content_type = img.headers.get('content-type') or ''
    content_type = raw_content_type.split(';', 1)[0].strip().lower()

    if use_IM:
        if content_type not in ('image/jpeg', 'image/jpg', 'image/png', 'image/webp', 'image/bmp'):
            log.error("Only jpg/jpeg/png/webp/bmp files are supported as coverfile")
            return False, _("Only jpg/jpeg/png/webp/bmp files are supported as coverfile")
        # convert to jpg because calibre only supports jpg
        try:
            if hasattr(img, 'stream'):
                imgc = Image(blob=img.stream)
            else:
                imgc = Image(blob=io.BytesIO(img.content))
            imgc.format = 'jpeg'
            imgc.transform_colorspace("rgb")
            img = imgc
        except (BlobError, MissingDelegateError):
            log.error("Invalid cover file content")
            return False, _("Invalid cover file content")
    else:
        if content_type not in ['image/jpeg', 'image/jpg']:
            log.error("Only jpg/jpeg files are supported as coverfile")
            return False, _("Only jpg/jpeg files are supported as coverfile")

    if config.config_use_google_drive:
        tmp_dir = os.path.join(gettempdir(), 'calibre_web')
        # exist_ok avoids failing when another request creates the directory concurrently
        os.makedirs(tmp_dir, exist_ok=True)
        ret, message = save_cover_from_filestorage(tmp_dir, "uploaded_cover.jpg", img)
        if ret is True:
            gd.uploadFileToEbooksFolder(os.path.join(book_path, 'cover.jpg').replace("\\", "/"),
                                        os.path.join(tmp_dir, "uploaded_cover.jpg"))
            log.info("Cover is saved on Google Drive")
            return True, None
        else:
            return False, message
    else:
        return save_cover_from_filestorage(os.path.join(config.config_calibre_dir, book_path), "cover.jpg", img)
def do_download_file(book, book_format, client, data, headers):
    """Build the download response for one format of a book.

    With Google Drive enabled the file is looked up in the ebooks folder and
    streamed through ``gd.do_gdrive_download``; a missing file aborts with 404.
    Otherwise the file is sent from the local library directory, with the
    entries of ``headers`` copied onto the response.

    :param book: book object; ``book.path`` is the book directory
    :param book_format: file extension without the leading dot
    :param client: client identifier; ``"kobo"`` gets kepub files renamed to ``.kepub.epub``
    :param data: format record; ``data.name`` is the file name without extension
    :param headers: headers to send with the file
    :return: the response object for the download
    """
    file_name = data.name + "." + book_format

    if config.config_use_google_drive:
        # startTime = time.time()
        df = gd.getFileFromEbooksFolder(book.path, file_name)
        # log.debug('%s', time.time() - startTime)
        if df:
            return gd.do_gdrive_download(df, headers)
        abort(404)
    else:
        book_dir = os.path.join(config.config_calibre_dir, book.path)
        full_path = os.path.join(book_dir, file_name)
        if not os.path.isfile(full_path):
            # ToDo: improve error handling
            log.error('File not found: %s', full_path)

        if client == "kobo" and book_format == "kepub":
            headers["Content-Disposition"] = headers["Content-Disposition"].replace(".kepub", ".kepub.epub")

        response = make_response(send_from_directory(book_dir, file_name))
        # ToDo Check headers parameter
        for element in headers:
            response.headers[element[0]] = element[1]
        log.info('Downloading file: {}'.format(full_path))
        return response
##################################
def check_unrar(unrar_location):
    """Check that the configured unrar binary exists and can be executed.

    :param unrar_location: path to the unrar binary; a falsy value skips the check
    :return: ``None`` when there is nothing to check or the check ran without
             error, otherwise a translated error message
    """
    if not unrar_location:
        return None

    if not os.path.exists(unrar_location):
        return _('Unrar binary file not found')

    try:
        match = process_wait([unrar_location], pattern='UNRAR (.*) freeware')
        if match:
            log.debug("unrar version %s", match.group(1))
    except (OSError, UnicodeDecodeError) as err:
        log.error_or_exception(err)
        return _('Error executing UnRar')
    return None
def json_serial(obj):
    """JSON serializer for objects not serializable by default json code.

    :param obj: value to serialize
    :return: ISO-8601 string for a ``datetime``; for a ``timedelta`` a dict
             tagged with ``'__type__': 'timedelta'`` holding its days, seconds
             and microseconds
    :raises TypeError: for any other type
    """
    if not isinstance(obj, (datetime, timedelta)):
        raise TypeError("Type %s not serializable" % type(obj))

    if isinstance(obj, datetime):
        return obj.isoformat()

    serialized = {'__type__': 'timedelta'}
    for field in ('days', 'seconds', 'microseconds'):
        serialized[field] = getattr(obj, field)
    return serialized
def tags_filters():
    """Build the tag filter expression for the current user.

    A list equal to ``['']`` is treated as "no restriction": the allowed
    side then becomes ``true()`` and the denied side ``false()``.

    :return: SQL expression requiring an allowed tag name and no denied tag name
    """
    denied_tags = current_user.list_denied_tags()
    allowed_tags = current_user.list_allowed_tags()

    if denied_tags == ['']:
        denied_filter = false()
    else:
        denied_filter = db.Tags.name.in_(denied_tags)

    if allowed_tags == ['']:
        allowed_filter = true()
    else:
        allowed_filter = db.Tags.name.in_(allowed_tags)

    return and_(allowed_filter, ~denied_filter)
2020-04-19 19:08:58 +02:00
# checks if domain is in database (including wildcards)
# example SELECT * FROM @TABLE WHERE 'abcdefg' LIKE Name;
# from https://code.luasoftware.com/tutorials/flask/execute-raw-sql-in-flask-sqlalchemy/
# in all calls the email address is checked for validity
def check_valid_domain(domain_text):
    """Check ``domain_text`` against the allow and deny entries of the registration table.

    :param domain_text: text matched with SQL ``LIKE`` against the stored domain patterns
    :return: ``True`` only if at least one entry with ``allow = 1`` matches and
             no entry with ``allow = 0`` matches
    """
    allow_sql = "SELECT * FROM registration WHERE (:domain LIKE domain and allow = 1);"
    allowed = ub.session.query(ub.Registration).from_statement(text(allow_sql)).params(domain=domain_text).all()
    if not allowed:
        return False

    deny_sql = "SELECT * FROM registration WHERE (:domain LIKE domain and allow = 0);"
    denied = ub.session.query(ub.Registration).from_statement(text(deny_sql)).params(domain=domain_text).all()
    return not denied
def get_download_link(book_id, book_format, client):
    """Resolve a book format and return its download response.

    Aborts with 404 when the book is not found or does not have the requested
    format. For authenticated users the download is recorded through
    ``ub.update_download``. The download file name is built from the book
    title and, if present, the first author.

    :param book_id: id of the book
    :param book_format: requested format; only the part before the first ``.`` is used
    :param client: client identifier passed on to ``do_download_file``
    :return: the response from ``do_download_file``
    """
    book_format = book_format.split(".")[0]
    book = calibre_db.get_filtered_book(book_id, allow_show_archived=True)
    if not book:
        log.error("Book id {} not found for downloading".format(book_id))
        abort(404)

    format_data = calibre_db.get_book_format(book.id, book_format.upper())
    if not format_data:
        abort(404)

    # collect downloaded books only for registered user and not for anonymous user
    if current_user.is_authenticated:
        ub.update_download(book_id, int(current_user.id))

    file_name = book.title
    if len(book.authors) > 0:
        file_name = file_name + ' - ' + book.authors[0].name
    file_name = get_valid_filename(file_name, replace_whitespace=False)

    headers = Headers()
    headers["Content-Type"] = mimetypes.types_map.get('.' + book_format, "application/octet-stream")
    # the same percent-encoded name is used for both the plain and the UTF-8 filename parameter
    headers["Content-Disposition"] = "attachment; filename=%s.%s; filename*=UTF-8''%s.%s" % (
        quote(file_name.encode('utf-8')), book_format, quote(file_name.encode('utf-8')), book_format)
    return do_download_file(book, book_format, client, format_data, headers)
def clear_cover_thumbnail_cache(book_id):
    """Queue a hidden task that clears the cover thumbnail cache for ``book_id``.

    Does nothing unless ``config.schedule_generate_book_covers`` is set.
    """
    if not config.schedule_generate_book_covers:
        return
    clear_task = TaskClearCoverThumbnailCache(book_id)
    WorkerThread.add(None, clear_task, hidden=True)
def replace_cover_thumbnail_cache(book_id):
    """Queue hidden tasks that clear and then regenerate the cover thumbnails of ``book_id``.

    Does nothing unless ``config.schedule_generate_book_covers`` is set.
    """
    if not config.schedule_generate_book_covers:
        return
    # clearing is queued before generating
    for task_class in (TaskClearCoverThumbnailCache, TaskGenerateCoverThumbnails):
        WorkerThread.add(None, task_class(book_id), hidden=True)
def delete_thumbnail_cache():
    """Queue a ``TaskClearCoverThumbnailCache`` task created with ``-1`` as its argument."""
    clear_task = TaskClearCoverThumbnailCache(-1)
    WorkerThread.add(None, clear_task)
def add_book_to_thumbnail_cache(book_id):
    """Queue a hidden task that generates the cover thumbnails for ``book_id``.

    Does nothing unless ``config.schedule_generate_book_covers`` is set.
    """
    if not config.schedule_generate_book_covers:
        return
    generate_task = TaskGenerateCoverThumbnails(book_id)
    WorkerThread.add(None, generate_task, hidden=True)
def update_thumbnail_cache():
    """Queue a ``TaskGenerateCoverThumbnails`` task created without a book id.

    Does nothing unless ``config.schedule_generate_book_covers`` is set.
    """
    if not config.schedule_generate_book_covers:
        return
    generate_task = TaskGenerateCoverThumbnails()
    WorkerThread.add(None, generate_task)
2022-09-19 18:56:22 +02:00
def set_all_metadata_dirty():
    """Queue a visible ``TaskBackupMetadata`` task with ``set_dirty=True``.

    The task is created with the current locale as export language and the
    translated title "Cover".
    """
    backup_task = TaskBackupMetadata(
        export_language=get_locale(),
        translated_title=_("Cover"),
        set_dirty=True,
        task_message=N_("Queue all books for metadata backup"),
    )
    WorkerThread.add(None, backup_task, hidden=False)