Add a thread pool based async file implementation.
This commit is contained in:
parent
a2691ff0af
commit
2a8c52f347
|
@ -31,6 +31,6 @@ DNS | no | no | no | no
|
||||||
Timers | yes | yes | no | no
|
Timers | yes | yes | no | no
|
||||||
Events | yes | yes | no | no
|
Events | yes | yes | no | no
|
||||||
Signals | no | no | no | no
|
Signals | no | no | no | no
|
||||||
Files | no | no | no | no
|
Files | yes | yes | no | no
|
||||||
UI Integration | no | no | no | no
|
UI Integration | no | no | no | no
|
||||||
File watcher | no | no | no | no
|
File watcher | no | no | no | no
|
||||||
|
|
|
@ -104,8 +104,12 @@ interface EventDriverFiles {
|
||||||
@safe: /*@nogc:*/ nothrow:
|
@safe: /*@nogc:*/ nothrow:
|
||||||
FileFD open(string path, FileOpenMode mode);
|
FileFD open(string path, FileOpenMode mode);
|
||||||
FileFD createTemp();
|
FileFD createTemp();
|
||||||
void write(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_write_finish);
|
void close(FileFD file);
|
||||||
void read(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_read_finish);
|
|
||||||
|
ulong getSize(FileFD file);
|
||||||
|
|
||||||
|
void write(FileFD file, ulong offset, const(ubyte)[] buffer, FileIOCallback on_write_finish);
|
||||||
|
void read(FileFD file, ulong offset, ubyte[] buffer, FileIOCallback on_read_finish);
|
||||||
void cancelWrite(FileFD file);
|
void cancelWrite(FileFD file);
|
||||||
void cancelRead(FileFD file);
|
void cancelRead(FileFD file);
|
||||||
|
|
||||||
|
@ -191,6 +195,7 @@ interface EventDriverWatchers {
|
||||||
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
|
alias ConnectCallback = void delegate(StreamSocketFD, ConnectStatus);
|
||||||
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD);
|
alias AcceptCallback = void delegate(StreamListenSocketFD, StreamSocketFD);
|
||||||
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
|
alias IOCallback = void delegate(StreamSocketFD, IOStatus, size_t);
|
||||||
|
alias FileIOCallback = void delegate(FileFD, IOStatus, size_t);
|
||||||
alias EventCallback = void delegate(EventID);
|
alias EventCallback = void delegate(EventID);
|
||||||
alias SignalCallback = void delegate(int);
|
alias SignalCallback = void delegate(int);
|
||||||
alias TimerCallback = void delegate(TimerID);
|
alias TimerCallback = void delegate(TimerID);
|
||||||
|
|
|
@ -31,7 +31,6 @@ final class EpollEventDriver : PosixEventDriver {
|
||||||
|
|
||||||
nothrow @safe {
|
nothrow @safe {
|
||||||
override @property EpollEventDriver core() { return this; }
|
override @property EpollEventDriver core() { return this; }
|
||||||
override @property EpollEventDriver files() { return this; }
|
|
||||||
override @property EpollEventDriver sockets() { return this; }
|
override @property EpollEventDriver sockets() { return this; }
|
||||||
override @property EpollEventDriver timers() { return this; }
|
override @property EpollEventDriver timers() { return this; }
|
||||||
override @property EpollEventDriver events() { return this; }
|
override @property EpollEventDriver events() { return this; }
|
||||||
|
@ -67,6 +66,7 @@ final class EpollEventDriver : PosixEventDriver {
|
||||||
override void dispose()
|
override void dispose()
|
||||||
{
|
{
|
||||||
import core.sys.posix.unistd : close;
|
import core.sys.posix.unistd : close;
|
||||||
|
super.dispose();
|
||||||
close(m_epoll);
|
close(m_epoll);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ module eventcore.drivers.posix;
|
||||||
|
|
||||||
public import eventcore.driver;
|
public import eventcore.driver;
|
||||||
import eventcore.drivers.timer;
|
import eventcore.drivers.timer;
|
||||||
|
import eventcore.drivers.threadedfile;
|
||||||
import eventcore.internal.consumablequeue : ConsumableQueue;
|
import eventcore.internal.consumablequeue : ConsumableQueue;
|
||||||
import eventcore.internal.utils;
|
import eventcore.internal.utils;
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ private long currStdTime()
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class PosixEventDriver : EventDriver,
|
abstract class PosixEventDriver : EventDriver,
|
||||||
EventDriverCore, EventDriverFiles, EventDriverSockets, EventDriverTimers,
|
EventDriverCore, EventDriverSockets, EventDriverTimers,
|
||||||
EventDriverEvents, EventDriverSignals, EventDriverWatchers
|
EventDriverEvents, EventDriverSignals, EventDriverWatchers
|
||||||
{
|
{
|
||||||
@safe: /*@nogc:*/ nothrow:
|
@safe: /*@nogc:*/ nothrow:
|
||||||
|
@ -47,6 +48,7 @@ abstract class PosixEventDriver : EventDriver,
|
||||||
size_t m_waiterCount = 0;
|
size_t m_waiterCount = 0;
|
||||||
bool m_exit = false;
|
bool m_exit = false;
|
||||||
FD m_wakeupEvent;
|
FD m_wakeupEvent;
|
||||||
|
ThreadedFileEventDriver!PosixEventDriver m_files;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected this()
|
protected this()
|
||||||
|
@ -55,11 +57,12 @@ abstract class PosixEventDriver : EventDriver,
|
||||||
initFD(m_wakeupEvent);
|
initFD(m_wakeupEvent);
|
||||||
registerFD(m_wakeupEvent, EventMask.read);
|
registerFD(m_wakeupEvent, EventMask.read);
|
||||||
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
|
//startNotify!(EventType.read)(m_wakeupEvent, null); // should already be caught by registerFD
|
||||||
|
m_files = new ThreadedFileEventDriver!PosixEventDriver(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
// force overriding these in the (final) sub classes to avoid virtual calls
|
// force overriding these in the (final) sub classes to avoid virtual calls
|
||||||
abstract override @property PosixEventDriver core();
|
abstract override @property PosixEventDriver core();
|
||||||
abstract override @property PosixEventDriver files();
|
final override @property ThreadedFileEventDriver!PosixEventDriver files() { return m_files; }
|
||||||
abstract override @property PosixEventDriver sockets();
|
abstract override @property PosixEventDriver sockets();
|
||||||
abstract override @property PosixEventDriver timers();
|
abstract override @property PosixEventDriver timers();
|
||||||
abstract override @property PosixEventDriver events();
|
abstract override @property PosixEventDriver events();
|
||||||
|
@ -124,7 +127,11 @@ abstract class PosixEventDriver : EventDriver,
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract bool doProcessEvents(Duration dur);
|
protected abstract bool doProcessEvents(Duration dur);
|
||||||
abstract void dispose();
|
|
||||||
|
abstract void dispose()
|
||||||
|
{
|
||||||
|
m_files.dispose();
|
||||||
|
}
|
||||||
|
|
||||||
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
|
final override StreamSocketFD connectStream(scope Address address, ConnectCallback on_connect)
|
||||||
{
|
{
|
||||||
|
@ -512,47 +519,6 @@ abstract class PosixEventDriver : EventDriver,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final override FileFD open(string path, FileOpenMode mode)
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override FileFD createTemp()
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void write(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_write_finish)
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void read(FileFD file, ulong offset, ubyte[] buffer, IOCallback on_read_finish)
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void cancelWrite(FileFD file)
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void cancelRead(FileFD file)
|
|
||||||
{
|
|
||||||
assert(false, "TODO!");
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void addRef(FileFD descriptor)
|
|
||||||
{
|
|
||||||
assert(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
final override void releaseRef(FileFD descriptor)
|
|
||||||
{
|
|
||||||
assert(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
final override EventID create()
|
final override EventID create()
|
||||||
{
|
{
|
||||||
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
|
auto id = cast(EventID)eventfd(0, EFD_NONBLOCK);
|
||||||
|
|
|
@ -25,7 +25,6 @@ version (Windows) {
|
||||||
|
|
||||||
final class SelectEventDriver : PosixEventDriver {
|
final class SelectEventDriver : PosixEventDriver {
|
||||||
override @property SelectEventDriver core() { return this; }
|
override @property SelectEventDriver core() { return this; }
|
||||||
override @property SelectEventDriver files() { return this; }
|
|
||||||
override @property SelectEventDriver sockets() { return this; }
|
override @property SelectEventDriver sockets() { return this; }
|
||||||
override @property SelectEventDriver timers() { return this; }
|
override @property SelectEventDriver timers() { return this; }
|
||||||
override @property SelectEventDriver events() { return this; }
|
override @property SelectEventDriver events() { return this; }
|
||||||
|
@ -51,7 +50,7 @@ final class SelectEventDriver : PosixEventDriver {
|
||||||
enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); });
|
enumerateFDs!(EventType.write)((fd) @trusted { FD_SET(fd, &writefds); });
|
||||||
enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); });
|
enumerateFDs!(EventType.status)((fd) @trusted { FD_SET(fd, &statusfds); });
|
||||||
|
|
||||||
//print("Wait for event...");
|
//print("Wait for event... %s", timeout);
|
||||||
//writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3);
|
//writefln("%.3f: select in", Clock.currAppTick.usecs * 1e-3);
|
||||||
auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } ();
|
auto ret = () @trusted { return select(this.maxFD+1, &readfds, &writefds, &statusfds, timeout == Duration.max ? null : &ts); } ();
|
||||||
//writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3);
|
//writefln("%.3f: select out", Clock.currAppTick.usecs * 1e-3);
|
||||||
|
@ -75,7 +74,7 @@ final class SelectEventDriver : PosixEventDriver {
|
||||||
|
|
||||||
override void dispose()
|
override void dispose()
|
||||||
{
|
{
|
||||||
|
super.dispose();
|
||||||
}
|
}
|
||||||
|
|
||||||
override void registerFD(FD fd, EventMask mask)
|
override void registerFD(FD fd, EventMask mask)
|
||||||
|
|
329
source/eventcore/drivers/threadedfile.d
Normal file
329
source/eventcore/drivers/threadedfile.d
Normal file
|
@ -0,0 +1,329 @@
|
||||||
|
module eventcore.drivers.threadedfile;
|
||||||
|
|
||||||
|
import eventcore.driver;
|
||||||
|
import eventcore.internal.utils;
|
||||||
|
import core.atomic;
|
||||||
|
import core.stdc.errno;
|
||||||
|
import std.algorithm.comparison : among, min;
|
||||||
|
|
||||||
|
version(Posix){
|
||||||
|
import core.sys.posix.fcntl;
|
||||||
|
import core.sys.posix.sys.stat;
|
||||||
|
import core.sys.posix.unistd;
|
||||||
|
}
|
||||||
|
version(Windows){
|
||||||
|
static if (__VERSION__ >= 2070)
|
||||||
|
import core.sys.windows.stat;
|
||||||
|
else
|
||||||
|
import std.c.windows.stat;
|
||||||
|
|
||||||
|
private {
|
||||||
|
// TODO: use CreateFile/HANDLE instead of the Posix API on Windows
|
||||||
|
|
||||||
|
extern(C) {
|
||||||
|
alias off_t = sizediff_t;
|
||||||
|
int open(in char* name, int mode, ...);
|
||||||
|
int chmod(in char* name, int mode);
|
||||||
|
int close(int fd);
|
||||||
|
int read(int fd, void *buffer, uint count);
|
||||||
|
int write(int fd, in void *buffer, uint count);
|
||||||
|
off_t lseek(int fd, off_t offset, int whence);
|
||||||
|
}
|
||||||
|
|
||||||
|
enum O_RDONLY = 0;
|
||||||
|
enum O_WRONLY = 1;
|
||||||
|
enum O_RDWR = 2;
|
||||||
|
enum O_APPEND = 8;
|
||||||
|
enum O_CREAT = 0x0100;
|
||||||
|
enum O_TRUNC = 0x0200;
|
||||||
|
enum O_BINARY = 0x8000;
|
||||||
|
|
||||||
|
enum _S_IREAD = 0x0100; /* read permission, owner */
|
||||||
|
enum _S_IWRITE = 0x0080; /* write permission, owner */
|
||||||
|
alias stat_t = struct_stat;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
enum O_BINARY = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private {
|
||||||
|
enum SEEK_SET = 0;
|
||||||
|
enum SEEK_CUR = 1;
|
||||||
|
enum SEEK_END = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
final class ThreadedFileEventDriver(Events : EventDriverEvents) : EventDriverFiles
|
||||||
|
{
|
||||||
|
import std.parallelism;
|
||||||
|
|
||||||
|
private {
|
||||||
|
enum ThreadedFileStatus {
|
||||||
|
idle, // -> processing
|
||||||
|
processing, // -> cancelling, finished
|
||||||
|
cancelling, // -> cancelled
|
||||||
|
cancelled, // -> idle
|
||||||
|
finished // -> idle
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct IOInfo {
|
||||||
|
FileIOCallback callback;
|
||||||
|
shared ThreadedFileStatus status;
|
||||||
|
shared size_t bytesWritten;
|
||||||
|
IOStatus ioStatus;
|
||||||
|
|
||||||
|
void flush(FileFD fd)
|
||||||
|
@safe nothrow {
|
||||||
|
if (() @trusted { return cas(&this.status, ThreadedFileStatus.finished, ThreadedFileStatus.idle); } ()) {
|
||||||
|
auto cb = this.callback;
|
||||||
|
this.callback = null;
|
||||||
|
log("fire callback");
|
||||||
|
cb(fd, IOStatus.ok, safeAtomicLoad(this.bytesWritten));
|
||||||
|
} else if (() @trusted { return cas(&this.status, ThreadedFileStatus.cancelled, ThreadedFileStatus.idle); } ()) {
|
||||||
|
this.callback = null;
|
||||||
|
log("ignore callback due to cancellation");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static struct FileInfo {
|
||||||
|
IOInfo read;
|
||||||
|
IOInfo write;
|
||||||
|
int refCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
TaskPool m_fileThreadPool;
|
||||||
|
ChoppedVector!FileInfo m_files;
|
||||||
|
SmallIntegerSet!FileFD m_activeReads;
|
||||||
|
SmallIntegerSet!FileFD m_activeWrites;
|
||||||
|
EventID m_readyEvent;
|
||||||
|
Events m_events;
|
||||||
|
}
|
||||||
|
|
||||||
|
@safe: nothrow:
|
||||||
|
|
||||||
|
this(Events events)
|
||||||
|
{
|
||||||
|
m_events = events;
|
||||||
|
m_readyEvent = events.create();
|
||||||
|
m_events.wait(m_readyEvent, &onReady);
|
||||||
|
}
|
||||||
|
|
||||||
|
void dispose()
|
||||||
|
{
|
||||||
|
if (m_fileThreadPool) {
|
||||||
|
log("finishing thread pool");
|
||||||
|
try m_fileThreadPool.finish();
|
||||||
|
catch (Exception e) {
|
||||||
|
//logError("Failed to shut down file I/O thread pool.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
log("finishing file events");
|
||||||
|
m_events.cancelWait(m_readyEvent, &onReady);
|
||||||
|
onReady(m_readyEvent);
|
||||||
|
m_events.releaseRef(m_readyEvent);
|
||||||
|
log("finished file events");
|
||||||
|
}
|
||||||
|
|
||||||
|
final override FileFD open(string path, FileOpenMode mode)
|
||||||
|
{
|
||||||
|
import std.string : toStringz;
|
||||||
|
|
||||||
|
import std.conv : octal;
|
||||||
|
int flags;
|
||||||
|
int amode;
|
||||||
|
final switch (mode) {
|
||||||
|
case FileOpenMode.read: flags = O_RDONLY|O_BINARY; break;
|
||||||
|
case FileOpenMode.readWrite: flags = O_RDWR|O_BINARY; break;
|
||||||
|
case FileOpenMode.createTrunc: flags = O_RDWR|O_CREAT|O_TRUNC|O_BINARY; amode = octal!644; break;
|
||||||
|
case FileOpenMode.append: flags = O_WRONLY|O_CREAT|O_APPEND|O_BINARY; amode = octal!644; break;
|
||||||
|
}
|
||||||
|
auto fd = () @trusted { return .open(path.toStringz(), flags, amode); } ();
|
||||||
|
if (fd < 0) return FileFD.init;
|
||||||
|
return FileFD(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
final override FileFD createTemp()
|
||||||
|
{
|
||||||
|
assert(false, "TODO!");
|
||||||
|
}
|
||||||
|
|
||||||
|
void close(FileFD file)
|
||||||
|
{
|
||||||
|
.close(file);
|
||||||
|
}
|
||||||
|
|
||||||
|
ulong getSize(FileFD file)
|
||||||
|
{
|
||||||
|
version (linux) {
|
||||||
|
// stat_t seems to be defined wrong on linux/64
|
||||||
|
return .lseek(file, 0, SEEK_END);
|
||||||
|
} else {
|
||||||
|
stat_t st;
|
||||||
|
fstat(file, &st);
|
||||||
|
return st.st_size;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void write(FileFD file, ulong offset, const(ubyte)[] buffer, FileIOCallback on_write_finish)
|
||||||
|
{
|
||||||
|
//assert(this.writable);
|
||||||
|
auto f = &m_files[file].write;
|
||||||
|
assert(f.callback is null, "Concurrent file writes are not allowed.");
|
||||||
|
f.callback = on_write_finish;
|
||||||
|
m_activeWrites.insert(file);
|
||||||
|
log("start task");
|
||||||
|
try {
|
||||||
|
if (m_fileThreadPool is null) {
|
||||||
|
m_fileThreadPool = new TaskPool(4);
|
||||||
|
m_fileThreadPool.isDaemon = true;
|
||||||
|
}
|
||||||
|
m_fileThreadPool.put(task!(taskFun!("write", const(ubyte)))(this, file, offset, buffer));
|
||||||
|
} catch (Exception e) {
|
||||||
|
on_write_finish(file, IOStatus.error, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void cancelWrite(FileFD file)
|
||||||
|
{
|
||||||
|
auto f = &m_files[file].write;
|
||||||
|
m_activeWrites.remove(file);
|
||||||
|
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
||||||
|
assert(res, "Cancelling write when no write is in progress.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void read(FileFD file, ulong offset, ubyte[] buffer, FileIOCallback on_read_finish)
|
||||||
|
{
|
||||||
|
auto f = &m_files[file].read;
|
||||||
|
assert(f.callback is null, "Concurrent file reads are not allowed.");
|
||||||
|
f.callback = on_read_finish;
|
||||||
|
m_activeReads.insert(file);
|
||||||
|
try {
|
||||||
|
if (m_fileThreadPool is null) {
|
||||||
|
m_fileThreadPool = new TaskPool(4);
|
||||||
|
m_fileThreadPool.isDaemon = true;
|
||||||
|
}
|
||||||
|
m_fileThreadPool.put(task!(taskFun!("read", ubyte))(this, file, offset, buffer));
|
||||||
|
} catch (Exception e) {
|
||||||
|
on_read_finish(file, IOStatus.error, 0);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void cancelRead(FileFD file)
|
||||||
|
{
|
||||||
|
auto f = &m_files[file].read;
|
||||||
|
m_activeReads.remove(file);
|
||||||
|
auto res = () @trusted { return cas(&f.status, ThreadedFileStatus.processing, ThreadedFileStatus.cancelling); } ();
|
||||||
|
assert(res, "Cancelling read when no write is in progress.");
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void addRef(FileFD descriptor)
|
||||||
|
{
|
||||||
|
auto f = &m_files[descriptor];
|
||||||
|
f.refCount++;
|
||||||
|
}
|
||||||
|
|
||||||
|
final override void releaseRef(FileFD descriptor)
|
||||||
|
{
|
||||||
|
auto f = &m_files[descriptor];
|
||||||
|
if (!--f.refCount) {
|
||||||
|
.close(descriptor);
|
||||||
|
m_files[descriptor] = FileInfo.init;
|
||||||
|
assert(!m_activeReads.contains(descriptor));
|
||||||
|
assert(!m_activeWrites.contains(descriptor));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// private
|
||||||
|
static void taskFun(string op, UB)(ThreadedFileEventDriver fd, FileFD file, ulong offset, UB[] buffer)
|
||||||
|
{
|
||||||
|
log("ready event");
|
||||||
|
IOInfo* f = mixin("&fd.m_files[file]."~op);
|
||||||
|
log("wait for cancel");
|
||||||
|
|
||||||
|
// wait for previous cancel requests to finish
|
||||||
|
while (safeAtomicLoad(f.status) == ThreadedFileStatus.cancelling)
|
||||||
|
safeYield();
|
||||||
|
|
||||||
|
log("wait for callback");
|
||||||
|
// wait for previous callbacks to be fired
|
||||||
|
while (safeAtomicLoad(f.status).among(ThreadedFileStatus.finished, ThreadedFileStatus.cancelled))
|
||||||
|
safeYield();
|
||||||
|
|
||||||
|
assert(safeAtomicLoad(f.status) == ThreadedFileStatus.idle);
|
||||||
|
|
||||||
|
log("start processing");
|
||||||
|
auto res = safeCAS(f.status, ThreadedFileStatus.idle, ThreadedFileStatus.processing);
|
||||||
|
assert(res, "Concurrent file "~op~"s are disallowed.");
|
||||||
|
|
||||||
|
auto bytes = buffer;
|
||||||
|
.lseek(file, offset, SEEK_SET);
|
||||||
|
|
||||||
|
scope (exit) {
|
||||||
|
log("trigger event");
|
||||||
|
safeAtomicStore(f.bytesWritten, buffer.length - bytes.length);
|
||||||
|
() @trusted { return cast(shared)fd.m_events; } ().trigger(fd.m_readyEvent);
|
||||||
|
}
|
||||||
|
|
||||||
|
while (bytes.length > 0) {
|
||||||
|
auto sz = min(bytes.length, 4096);
|
||||||
|
auto ret = () @trusted { return mixin("."~op)(file, bytes.ptr, cast(int)sz); } ();
|
||||||
|
if (ret != sz) {
|
||||||
|
f.ioStatus = IOStatus.error;
|
||||||
|
log("error");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
bytes = bytes[sz .. $];
|
||||||
|
log("check for cancel");
|
||||||
|
if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) return;
|
||||||
|
}
|
||||||
|
|
||||||
|
f.ioStatus = IOStatus.ok;
|
||||||
|
|
||||||
|
log("wait for status set");
|
||||||
|
while (true) {
|
||||||
|
if (safeCAS(f.status, ThreadedFileStatus.processing, ThreadedFileStatus.finished)) break;
|
||||||
|
if (safeCAS(f.status, ThreadedFileStatus.cancelling, ThreadedFileStatus.cancelled)) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void onReady(EventID)
|
||||||
|
{
|
||||||
|
log("ready event");
|
||||||
|
foreach (f; m_activeReads) {
|
||||||
|
m_activeReads.remove(f);
|
||||||
|
m_files[f].read.flush(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (f; m_activeWrites) {
|
||||||
|
m_activeWrites.remove(f);
|
||||||
|
m_files[f].write.flush(f);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_events.wait(m_readyEvent, &onReady);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private auto safeAtomicLoad(T)(ref shared(T) v) @trusted { return atomicLoad(v); }
|
||||||
|
private auto safeAtomicStore(T)(ref shared(T) v, T a) @trusted { return atomicStore(v, a); }
|
||||||
|
private auto safeCAS(T, U, V)(ref shared(T) v, U a, V b) @trusted { return cas(&v, a, b); }
|
||||||
|
private void safeYield() @trusted nothrow {
|
||||||
|
import core.thread : Thread;
|
||||||
|
import core.time : seconds;
|
||||||
|
Thread.sleep(0.seconds);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void log(ARGS...)(string fmt, ARGS args)
|
||||||
|
@trusted nothrow {
|
||||||
|
debug (EventCoreLogFiles) {
|
||||||
|
scope (failure) assert(false);
|
||||||
|
import core.thread : Thread;
|
||||||
|
import std.stdio : writef, writefln;
|
||||||
|
writef("[%s] ", Thread.getThis().name);
|
||||||
|
writefln(fmt, args);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue