1
0
Fork 0
mirror of https://github.com/HenkKalkwater/harbour-sailfin.git synced 2025-09-04 01:42:44 +00:00

Convert Loader-related thread-code to async code

Closes #10
This commit is contained in:
Chris Josten 2021-08-22 02:33:04 +02:00
parent f028e38b7a
commit 4bbc86d31c
6 changed files with 205 additions and 334 deletions

View file

@ -108,9 +108,6 @@ signals:
*/
void itemsLoaded();
void reloadWanted();
public slots:
virtual void futureReady() = 0;
protected:
// Is this object being parsed by the QML engine
bool m_isBeingParsed = false;
@ -269,13 +266,16 @@ class LoaderModelLoader : public ModelLoader<T> {
public:
explicit LoaderModelLoader(Support::Loader<R, P> *loader, QObject *parent = nullptr)
: ModelLoader<T>(parent), m_loader(QScopedPointer<Support::Loader<R, P>>(loader)) {
QObject::connect(&m_futureWatcher, &QFutureWatcher<QList<T>>::finished, this, &BaseModelLoader::futureReady);
this->connect(m_loader.data(), &Support::Loader<R, P>::ready, this, &LoaderModelLoader<T, D, R, P>::loaderReady);
this->connect(m_loader.data(), &Support::Loader<R, P>::error, this, &LoaderModelLoader<T, D, R, P>::loaderError);
}
protected:
void loadMore(int offset, int limit, ViewModel::ModelStatus suggestedModelStatus) override {
// This method should only be callable on one thread.
// If futureWatcher's future is running, this method should not be called again.
if (m_futureWatcher.isRunning()) return;
if (m_loader->isRunning()) {
m_loader->cancel();
}
// Set an invalid result.
this->m_result = { QList<T*>(), -1 };
@ -286,7 +286,7 @@ protected:
return;
}
if (limit > 0) {
setRequestLimit<P>(this->m_parameters, limit);
setRequestLimit<P>(this->m_parameters, limit);
}
this->setStatus(suggestedModelStatus);
@ -294,33 +294,14 @@ protected:
// instead when Loader::setApiClient is called.
this->m_loader->setApiClient(this->m_apiClient);
this->m_loader->setParameters(this->m_parameters);
this->m_loader->prepareLoad();
QFuture<std::optional<R>> future = QtConcurrent::run(this->m_loader.data(), &Support::Loader<R, P>::load);
this->m_futureWatcher.setFuture(future);
this->m_loader->load();
}
QScopedPointer<Support::Loader<R, P>> m_loader;
QMutex m_mutex;
P m_parameters;
QFutureWatcher<std::optional<R>> m_futureWatcher;
void futureReady() override {
R result;
try {
std::optional<R> optResult = m_futureWatcher.result();
if (!optResult.has_value()) {
this->setStatus(ViewModel::ModelStatus::Error);
qWarning() << "ModelLoader returned with empty optional";
return;
}
result = optResult.value();
} catch (Support::LoadException &e) {
qWarning() << "Exception while loading: " << e.what();
this->setStatus(ViewModel::ModelStatus::Error);
return;
}
void loaderReady() {
R result = m_loader->result();
QList<D> records = extractRecords<D, R>(result);
int totalRecordCount = extractTotalRecordCount<R>(result);
qDebug() << "Total record count: " << totalRecordCount << ", records in request: " << records.size();
@ -341,6 +322,11 @@ protected:
this->emitItemsLoaded();
}
void loaderError(QString error) {
Q_UNUSED(error)
this->setStatus(ViewModel::ModelStatus::Error);
}
};
class BaseApiModel : public QAbstractListModel {
@ -374,27 +360,7 @@ signals:
};
/**
* @brief Abstract model for displaying a REST JSON collection. Role names will be based on the fields encountered in the
* first record.
*
* To create a new model, extend this class and create an QObject-parent constructor.
* Call the right super constructor with the right values, depending which path should be queried and
* how the result should be interpreted.
*
* Register the model in QML and create an instance. Don't forget to set the apiClient attribute or else
* the model you've created will be useless!
*
* Rolenames are based on the fields in the first object within the array of results, with the first letter
* lowercased, to accomodate for QML style guidelines. (This ain't C# here).
*
* If a call to /cats/new results in
* @code{.json}
* [
* {"Name": "meow", "Id": 432},
* {"Name": "miew", "Id": 323}
* ]
* @endcode
* The model will have roleNames for "name" and "id".
* @brief Abstract model for displaying collections.
*
* @tparam T The class of the result.
* @tparam R The class returned by the loader.
@ -567,24 +533,5 @@ private:
QMetaObject::Connection m_futureWatcherConnection;
};
/**
* @brief List of the public users on the server.
*/
/*class PublicUserModel : public ApiModel<QJsonValue> {
public:
explicit PublicUserModel (QObject *parent = nullptr);
};*/
//template<>
//void ApiModel<Item>::apiClientChanged();
void registerModels(const char *URI);
}
#endif //JELLYFIN_API_MODEL

View file

@ -26,9 +26,12 @@
#include <QException>
#include <QJsonDocument>
#include <QObject>
#include <QUrlQuery>
#include <QString>
#include <QtConcurrent/QtConcurrent>
#include "../apiclient.h"
#include "jsonconv.h"
@ -54,6 +57,23 @@ private:
static const int HTTP_TIMEOUT = 30000; // 30 seconds;
/**
* @brief Base class for loaders that defines available signals.
*/
class LoaderBase : public QObject {
Q_OBJECT
signals:
/**
* @brief Emitted when an error has occurred during loading and no result
* is available.
*/
void error(QString message = QString());
/**
* @brief Emitted when data was successfully loaded.
*/
void ready();
};
/**
* Interface describing a way to load items. Used to abstract away
* the difference between loading from a cache or loading over the network.
@ -71,24 +91,35 @@ static const int HTTP_TIMEOUT = 30000; // 30 seconds;
* be loaded.
*/
template <typename R, typename P>
class Loader {
class Loader : public LoaderBase {
public:
explicit Loader(ApiClient *apiClient)
: m_apiClient(apiClient) {}
/**
* @brief Called just before load() is called. In constrast to load,
* this runs on the same thread as the ApiClient object.
* @brief load Loads the given resource asynchronously.
*/
virtual void prepareLoad() {};
/**
* @brief load Loads the given resource. This usually run on a different thread.
* @return The resource if successfull.
*/
virtual std::optional<R> load() {
virtual void load() {
throw LoadException(QStringLiteral("Loader not set"));
}
/**
* @brief Retrieves the loaded resource. Only valid after the ready signal has been emitted.
* @return The loaded resource
*/
R result() const {
return m_result;
}
/**
* @returns whether this loader is already fetching a resource
*/
virtual void cancel() {}
bool isRunning() const {
return m_isRunning;
}
/**
* @brief Heuristic to determine if this resource can be loaded via this loaded.
*
@ -103,6 +134,13 @@ public:
protected:
Jellyfin::ApiClient *m_apiClient;
P m_parameters;
R m_result;
bool m_isRunning = false;
void stopWithError(QString message = QString()) {
m_isRunning = false;
emit this->error(message);
}
};
/**
@ -112,41 +150,24 @@ template <typename R, typename P>
class HttpLoader : public Loader<R, P> {
public:
explicit HttpLoader(Jellyfin::ApiClient *apiClient)
: Loader<R, P> (apiClient) {}
virtual void prepareLoad() override {
m_reply = this->m_apiClient->get(path(this->m_parameters), query(this->m_parameters));
m_requestFinishedConnection = QObject::connect(m_reply, &QNetworkReply::finished, [&]() { this->requestFinished(); });
: Loader<R, P> (apiClient) {
this->connect(&m_parsedWatcher, &QFutureWatcher<std::optional<R>>::finished, this, &HttpLoader<R, P>::onResponseParsed);
}
virtual std::optional<R> load() override {
Q_ASSERT_X(m_reply != nullptr, "HttpLoader::load", "prepareLoad() must be called before load()");
QMutexLocker locker(&m_mutex);
while (!m_reply->isFinished()) {
m_waitCondition.wait(&m_mutex);
virtual void load() override {
if (m_reply != nullptr) {
this->m_reply->deleteLater();
}
QByteArray array = m_reply->readAll();
if (m_reply->error() != QNetworkReply::NoError) {
this->m_isRunning = true;
m_reply = this->m_apiClient->get(path(this->m_parameters), query(this->m_parameters));
this->connect(m_reply, &QNetworkReply::finished, this, &HttpLoader<R, P>::onRequestFinished);
}
virtual void cancel() override {
if (m_reply == nullptr) return;
if (m_reply->isRunning()) {
m_reply->abort();
m_reply->deleteLater();
//: An HTTP has occurred. First argument is replaced by QNetworkReply->errorString()
throw LoadException(QObject::tr("HTTP error: %1").arg(m_reply->errorString()));
}
m_reply->deleteLater();
m_reply = nullptr;
QJsonParseError error;
QJsonDocument document = QJsonDocument::fromJson(array, &error);
if (error.error != QJsonParseError::NoError) {
qWarning() << array;
throw LoadException(error.errorString().toLocal8Bit().constData());
}
if (document.isNull() || document.isEmpty()) {
return std::nullopt;
} else if (document.isArray()) {
return std::optional<R>(fromJsonValue<R>(document.array()));
} else if (document.isObject()){
return std::optional<R>(fromJsonValue<R>(document.object()));
} else {
return std::nullopt;
}
}
@ -167,13 +188,49 @@ protected:
virtual QUrlQuery query(const P &parameters) const = 0;
private:
QNetworkReply *m_reply = nullptr;
QWaitCondition m_waitCondition;
QMutex m_mutex;
QMetaObject::Connection m_requestFinishedConnection;
QFutureWatcher<std::optional<R>> m_parsedWatcher;
void requestFinished() {
QObject::disconnect(m_requestFinishedConnection);
m_waitCondition.wakeAll();
void onRequestFinished() {
if (m_reply->error() != QNetworkReply::NoError) {
m_reply->deleteLater();
//: An HTTP has occurred. First argument is replaced by QNetworkReply->errorString()
this->stopWithError(QStringLiteral("HTTP error: %1").arg(m_reply->errorString()));
}
QByteArray array = m_reply->readAll();
m_reply->deleteLater();
m_reply = nullptr;
m_parsedWatcher.setFuture(QtConcurrent::run(this, &HttpLoader<R, P>::parseResponse, array));
}
std::optional<R> parseResponse(QByteArray response) {
QJsonParseError error;
QJsonDocument document = QJsonDocument::fromJson(response, &error);
if (error.error != QJsonParseError::NoError) {
qWarning() << response;
this->stopWithError(error.errorString().toLocal8Bit().constData());
}
if (document.isNull() || document.isEmpty()) {
this->stopWithError(QStringLiteral("Unexpected empty JSON response"));
return std::nullopt;
} else if (document.isArray()) {
return std::make_optional<R>(fromJsonValue<R>(document.array()));
} else if (document.isObject()){
return std::make_optional<R>(fromJsonValue<R>(document.object()));
} else {
this->stopWithError(QStringLiteral("Unexpected JSON response"));
return std::nullopt;
}
}
void onResponseParsed() {
if (m_parsedWatcher.result().has_value()) {
R result = m_parsedWatcher.result().value();
this->m_result = result;
this->m_isRunning = false;
emit this->ready();
} else {
this->m_isRunning = false;
}
}
};

View file

@ -153,24 +153,24 @@ public:
Loader(ApiClient *apiClient, Support::Loader<R, P> *loaderImpl, QObject *parent = nullptr)
: LoaderBase(apiClient, parent),
m_loader(loaderImpl),
m_futureWatcher(new QFutureWatcher<std::optional<R>>(this)) {
m_loader(loaderImpl) {
m_dataViewModel = new T(this);
connect(m_futureWatcher, &RFutureWatcher::finished, this, &Loader<T, R, P>::updateData);
connect(m_loader.data(), &Support::LoaderBase::ready, this, &Loader<T, R, P>::onLoaderReady);
connect(m_loader.data(), &Support::LoaderBase::error, this, &Loader<T, R, P>::onLoaderError);
}
T *dataViewModel() const { return m_dataViewModel; }
QObject *data() const override { return m_dataViewModel; }
void reload() override {
if (m_futureWatcher->isRunning()) return;
if (m_loader->isRunning()) {
m_loader->cancel();
};
setStatus(Loading);
this->m_loader->setApiClient(m_apiClient);
m_loader->setApiClient(m_apiClient);
m_loader->setParameters(m_parameters);
m_loader->prepareLoad();
QFuture<std::optional<R>> future = QtConcurrent::run(this, &Loader<T, R, P>::invokeLoader);
m_futureWatcher->setFuture(future);
m_loader->load();
}
protected:
T* m_dataViewModel;
@ -180,46 +180,28 @@ protected:
*/
QScopedPointer<Support::Loader<R, P>> m_loader = nullptr;
private:
QFutureWatcher<std::optional<R>> *m_futureWatcher;
/**
* @brief Callback for QtConcurrent::run()
* @param self Pointer to this class
* @param parameters Parameters to forward to the loader
* @return empty optional if an error occured, otherwise the result.
*/
std::optional<R> invokeLoader() {
QMutexLocker(&this->m_mutex);
try {
return this->m_loader->load();
} catch (Support::LoadException &e) {
qWarning() << "Exception while loading an item: " << e.what();
this->setErrorString(QString(e.what()));
return std::nullopt;
}
}
/**
* @brief Updates the data when finished.
*/
void updateData() {
std::optional<R> newDataOpt = m_futureWatcher->result();
if (newDataOpt.has_value()) {
R newData = newDataOpt.value();
if (m_dataViewModel->data()->sameAs(newData)) {
// Replace the data the model holds
m_dataViewModel->data()->replaceData(newData);
} else {
// Replace the model
using PointerType = typename decltype(m_dataViewModel->data())::Type;
m_dataViewModel = new T(this, QSharedPointer<PointerType>::create(newData, m_apiClient));
}
setStatus(Ready);
emitDataChanged();
void onLoaderReady() {
R newData = m_loader->result();
if (m_dataViewModel->data()->sameAs(newData)) {
// Replace the data the model holds
m_dataViewModel->data()->replaceData(newData);
} else {
setStatus(Error);
// Replace the model
using PointerType = typename decltype(m_dataViewModel->data())::Type;
m_dataViewModel = new T(this, QSharedPointer<PointerType>::create(newData, m_apiClient));
}
setStatus(Ready);
emitDataChanged();
}
void onLoaderError(QString message) {
setStatus(Error);
setErrorString(message);
}
QMutex m_mutex;
};
void registerRemoteTypes(const char *uri);

View file

@ -70,7 +70,7 @@ class PlaybackManager : public QObject, public QQmlParserStatus {
Q_OBJECT
Q_INTERFACES(QQmlParserStatus)
public:
using FetchCallback = std::function<void(QUrl &&, PlayMethod)>;
using ItemUrlLoader = Support::Loader<DTO::PlaybackInfoResponse, Jellyfin::Loader::GetPostedPlaybackInfoParams>;
explicit PlaybackManager(QObject *parent = nullptr);
@ -179,13 +179,12 @@ private slots:
*/
void updatePlaybackInfo();
/// Called when the fetcherThread has fetched the playback URL and playSession
void onItemExtraDataReceived(const QString &itemId, const QUrl &url, const QString &playSession,
/// Called when we have fetched the playback URL and playSession
void onItemUrlReceived(const QString &itemId, const QUrl &url, const QString &playSession,
// Fully specify class to please MOC
Jellyfin::DTO::PlayMethodClass::Value playMethod);
/// Called when the fetcherThread encountered an error
/// Called when we have encountered an error
void onItemErrorReceived(const QString &itemId, const QString &errorString);
void onDestroyed();
private:
/// Factor to multiply with when converting from milliseconds to ticks.
@ -228,9 +227,6 @@ private:
*/
bool m_autoOpen = false;
// Playback-related members
ItemUrlFetcherThread *m_urlFetcherThread;
QMediaPlayer::State m_oldState = QMediaPlayer::StoppedState;
PlayMethod m_playMethod = PlayMethod::Transcode;
QMediaPlayer::State m_playbackState = QMediaPlayer::StoppedState;
@ -252,6 +248,9 @@ private:
*/
void postPlaybackInfo(PlaybackInfoType type);
void requestItemUrl(QSharedPointer<Model::Item> item);
void handlePlaybackInfoResponse(QString itemId, QString mediaType, DTO::PlaybackInfoResponse &response);
// QQmlParserListener interface
void classBegin() override { m_qmlIsParsingComponent = true; }
@ -262,57 +261,6 @@ private:
const qint64 PRELOAD_DURATION = 15 * 1000;
};
/// Thread that fetches the Item's stream URL always in the given order they were requested
class ItemUrlFetcherThread : public QThread {
Q_OBJECT
public:
ItemUrlFetcherThread(PlaybackManager *manager);
/**
* @brief Adds an item to the queue of items that should be requested
* @param item The item to fetch the URL of
*/
void addItemToQueue(QSharedPointer<Model::Item> item);
signals:
/**
* @brief Emitted when the url of the item with the itemId has been retrieved.
* @param itemId The id of the item of which the URL has been retrieved
* @param itemUrl The retrieved url
* @param playSession The playsession set by the Jellyfin Server
*/
void itemUrlFetched(QString itemId, QUrl itemUrl, QString playSession, Jellyfin::DTO::PlayMethodClass::Value playMethod);
void itemUrlFetchError(QString itemId, QString errorString);
void prepareLoaderRequested(QPrivateSignal);
public slots:
/**
* @brief Ask the thread nicely to stop running.
*/
void cleanlyStop();
private slots:
void onPrepareLoader();
protected:
void run() override;
private:
PlaybackManager *m_parent;
Support::Loader<DTO::PlaybackInfoResponse, Jellyfin::Loader::GetPostedPlaybackInfoParams> *m_loader;
QMutex m_queueModifyMutex;
QQueue<QSharedPointer<Model::Item>> m_queue;
QMutex m_urlWaitConditionMutex;
/// WaitCondition on which this threads waits until an Item is put into the queue
QWaitCondition m_urlWaitCondition;
QMutex m_waitLoaderPreparedMutex;
/// WaitCondition on which this threads waits until the loader has been prepared.
QWaitCondition m_waitLoaderPrepared;
bool m_keepRunning = true;
bool m_loaderPrepared = false;
};
} // NS ViewModel
} // NS Jellyfin