Base FileStream on eventcore.

This commit is contained in:
Sönke Ludwig 2016-10-06 22:06:51 +02:00
parent 1337715b2d
commit cf75d968d5
2 changed files with 62 additions and 102 deletions

View file

@ -9,14 +9,17 @@ module vibe.core.file;
//public import vibe.core.stream; //public import vibe.core.stream;
//public import vibe.inet.url; //public import vibe.inet.url;
import eventcore.core : eventDriver;
import eventcore.driver;
import vibe.core.log;
import vibe.core.path; import vibe.core.path;
import vibe.internal.async : asyncAwait;
import core.stdc.stdio; import core.stdc.stdio;
import core.sys.posix.unistd; import core.sys.posix.unistd;
import core.sys.posix.fcntl; import core.sys.posix.fcntl;
import core.sys.posix.sys.stat; import core.sys.posix.sys.stat;
import std.conv : octal; import std.conv : octal;
import vibe.core.log;
import std.datetime; import std.datetime;
import std.exception; import std.exception;
import std.file; import std.file;
@ -34,8 +37,7 @@ version(Posix){
*/ */
FileStream openFile(Path path, FileMode mode = FileMode.read) FileStream openFile(Path path, FileMode mode = FileMode.read)
{ {
assert(false); return FileStream(eventDriver.files.open(path.toNativeString(), cast(FileOpenMode)mode), path, mode);
//return eventDriver.openFile(path, mode);
} }
/// ditto /// ditto
FileStream openFile(string path, FileMode mode = FileMode.read) FileStream openFile(string path, FileMode mode = FileMode.read)
@ -362,116 +364,56 @@ struct FileInfo {
*/ */
enum FileMode { enum FileMode {
/// The file is opened read-only. /// The file is opened read-only.
read, read = FileOpenMode.read,
/// The file is opened for read-write random access. /// The file is opened for read-write random access.
readWrite, readWrite = FileOpenMode.readWrite,
/// The file is truncated if it exists or created otherwise and then opened for read-write access. /// The file is truncated if it exists or created otherwise and then opened for read-write access.
createTrunc, createTrunc = FileOpenMode.createTrunc,
/// The file is opened for appending data to it and created if it does not exist. /// The file is opened for appending data to it and created if it does not exist.
append append = FileOpenMode.append
} }
/** /**
Accesses the contents of a file as a stream. Accesses the contents of a file as a stream.
*/ */
struct FileStream { struct FileStream {
import std.algorithm.comparison : min;
import vibe.core.core : yield;
import core.stdc.errno;
version (Windows) {} else
{
enum O_BINARY = 0;
}
private { private {
int m_fileDescriptor; FileFD m_fd;
Path m_path; Path m_path;
ulong m_size; ulong m_size;
ulong m_ptr = 0;
FileMode m_mode; FileMode m_mode;
bool m_ownFD = true; ulong m_ptr;
} }
this(Path path, FileMode mode) this(FileFD fd, Path path, FileMode mode)
{ {
auto pathstr = path.toNativeString(); m_fd = fd;
final switch(mode){
case FileMode.read:
m_fileDescriptor = open(pathstr.toStringz(), O_RDONLY|O_BINARY);
break;
case FileMode.readWrite:
m_fileDescriptor = open(pathstr.toStringz(), O_RDWR|O_BINARY);
break;
case FileMode.createTrunc:
m_fileDescriptor = open(pathstr.toStringz(), O_RDWR|O_CREAT|O_TRUNC|O_BINARY, octal!644);
break;
case FileMode.append:
m_fileDescriptor = open(pathstr.toStringz(), O_WRONLY|O_CREAT|O_APPEND|O_BINARY, octal!644);
break;
}
if( m_fileDescriptor < 0 )
//throw new Exception(format("Failed to open '%s' with %s: %d", pathstr, cast(int)mode, errno));
throw new Exception("Failed to open file '"~pathstr~"'.");
this(m_fileDescriptor, path, mode);
}
this(int fd, Path path, FileMode mode)
{
assert(fd >= 0);
m_fileDescriptor = fd;
m_path = path; m_path = path;
m_mode = mode; m_mode = mode;
m_size = eventDriver.files.getSize(fd);
version(linux){
// stat_t seems to be defined wrong on linux/64
m_size = lseek(m_fileDescriptor, 0, SEEK_END);
} else {
stat_t st;
fstat(m_fileDescriptor, &st);
m_size = st.st_size;
// (at least) on windows, the created file is write protected
version(Windows){
if( mode == FileMode.createTrunc )
chmod(path.toNativeString().toStringz(), S_IREAD|S_IWRITE);
}
}
lseek(m_fileDescriptor, 0, SEEK_SET);
logDebug("opened file %s with %d bytes as %d", path.toNativeString(), m_size, m_fileDescriptor);
} }
~this() this(this) { eventDriver.files.addRef(m_fd); }
{ ~this() { eventDriver.files.releaseRef(m_fd); }
close();
}
@property int fd() { return m_fileDescriptor; } @property int fd() { return m_fd; }
/// The path of the file. /// The path of the file.
@property Path path() const { return m_path; } @property Path path() const { return m_path; }
/// Determines if the file stream is still open /// Determines if the file stream is still open
@property bool isOpen() const { return m_fileDescriptor >= 0; } @property bool isOpen() const { return m_fd != FileFD.init; }
@property ulong size() const { return m_size; } @property ulong size() const { return m_size; }
@property bool readable() const { return m_mode != FileMode.append; } @property bool readable() const { return m_mode != FileMode.append; }
@property bool writable() const { return m_mode != FileMode.read; } @property bool writable() const { return m_mode != FileMode.read; }
void takeOwnershipOfFD() void takeOwnershipOfFD()
{ {
enforce(m_ownFD); assert(false, "TODO!");
m_ownFD = false;
} }
void seek(ulong offset) void seek(ulong offset)
{ {
version (Win32) {
enforce(offset <= off_t.max, "Cannot seek above 4GB on Windows x32.");
auto pos = lseek(m_fileDescriptor, cast(off_t)offset, SEEK_SET);
} else auto pos = lseek(m_fileDescriptor, offset, SEEK_SET);
enforce(pos == offset, "Failed to seek in file.");
m_ptr = offset; m_ptr = offset;
} }
@ -480,9 +422,10 @@ struct FileStream {
/// Closes the file handle. /// Closes the file handle.
void close() void close()
{ {
if( m_fileDescriptor != -1 && m_ownFD ){ if (m_fd != FileFD.init) {
.close(m_fileDescriptor); eventDriver.files.close(m_fd);
m_fileDescriptor = -1; eventDriver.files.releaseRef(m_fd);
m_fd = FileFD.init;
} }
} }
@ -496,31 +439,24 @@ struct FileStream {
} }
void read(ubyte[] dst) void read(ubyte[] dst)
{ {
assert(this.readable); auto res = asyncAwait!(FileIOCallback,
while (dst.length > 0) { cb => eventDriver.files.read(m_fd, m_ptr, dst, cb),
enforce(dst.length <= leastSize); cb => eventDriver.files.cancelRead(m_fd)
auto sz = min(dst.length, 4096); );
enforce(.read(m_fileDescriptor, dst.ptr, cast(int)sz) == sz, "Failed to read data from disk."); enforce(res[1] == IOStatus.ok, "Failed to read data from disk.");
dst = dst[sz .. $];
m_ptr += sz;
yield();
}
} }
void write(in ubyte[] bytes_) void write(in ubyte[] bytes)
{ {
const(ubyte)[] bytes = bytes_; auto res = asyncAwait!(FileIOCallback,
assert(this.writable); cb => eventDriver.files.write(m_fd, m_ptr, bytes, cb),
while (bytes.length > 0) { cb => eventDriver.files.cancelWrite(m_fd)
auto sz = min(bytes.length, 4096); );
auto ret = .write(m_fileDescriptor, bytes.ptr, cast(int)sz); m_ptr += res[2];
import std.format : format; logDebug("Written %s", res[2]);
enforce(ret == sz, format("Failed to write data to disk. %s %s %s %s", sz, errno, ret, m_fileDescriptor)); if (m_ptr > m_size) m_size = m_ptr;
bytes = bytes[sz .. $]; enforce(res[1] == IOStatus.ok, "Failed to read data from disk.");
m_ptr += sz;
yield();
}
} }
void write(InputStream)(InputStream stream, ulong nbytes = 0) void write(InputStream)(InputStream stream, ulong nbytes = 0)

24
tests/vibe.core.file.d Normal file
View file

@ -0,0 +1,24 @@
/++ dub.sdl:
name "test"
dependency "vibe-core" path=".."
+/
module test;
import vibe.core.file;
void main()
{
auto f = openFile("test.dat", FileMode.createTrunc);
assert(f.size == 0);
f.write([1, 2, 3, 4, 5]);
assert(f.size == 5);
f.seek(0);
f.write([1, 2, 3, 4, 5]);
assert(f.size == 5);
f.write([6, 7, 8, 9, 10]);
assert(f.size == 10);
ubyte[5] dst;
f.seek(2);
f.read(dst);
assert(dst[] == [3, 4, 5, 6, 7]);
}