codon/stdlib/internal/file.codon

412 lines
10 KiB
Python

# Copyright (C) 2022-2023 Exaloop Inc. <https://exaloop.io>
from internal.gc import realloc, free
class File:
sz: int
buf: Ptr[byte]
fp: cobj
def __init__(self, fp: cobj):
self.fp = fp
self._reset()
def __init__(self, path: str, mode: str):
self.fp = _C.fopen(path.c_str(), mode.c_str())
if not self.fp:
raise IOError(f"file {path} could not be opened")
self._reset()
def _errcheck(self, msg: str):
err = int(_C.ferror(self.fp))
if err:
raise IOError(f"file I/O error: {msg}")
def __enter__(self):
pass
def __exit__(self):
self.close()
def __iter__(self) -> Generator[str]:
for a in self._iter():
yield a.__ptrcopy__()
def readlines(self) -> List[str]:
return [l for l in self]
def write(self, s: str):
self._ensure_open()
_C.fwrite(s.ptr, 1, len(s), self.fp)
self._errcheck("error in write")
def __file_write_gen__(self, g: Generator[T], T: type):
for s in g:
self.write(str(s))
def read(self, sz: int = -1) -> str:
self._ensure_open()
if sz < 0:
SEEK_SET = 0
SEEK_END = 2
cur = _C.ftell(self.fp)
_C.fseek(self.fp, 0, i32(SEEK_END))
sz = _C.ftell(self.fp) - cur
_C.fseek(self.fp, cur, i32(SEEK_SET))
buf = Ptr[byte](sz)
ret = _C.fread(buf, 1, sz, self.fp)
self._errcheck("error in read")
return str(buf, ret)
def tell(self) -> int:
self._ensure_open()
ret = _C.ftell(self.fp)
self._errcheck("error in tell")
return ret
def seek(self, offset: int, whence: int):
self._ensure_open()
_C.fseek(self.fp, offset, i32(whence))
self._errcheck("error in seek")
def flush(self):
self._ensure_open()
_C.fflush(self.fp)
def close(self):
if self.fp:
_C.fclose(self.fp)
self.fp = cobj()
if self.buf:
_C.free(self.buf)
self._reset()
def _ensure_open(self):
if not self.fp:
raise IOError("I/O operation on closed file")
def _reset(self):
self.buf = Ptr[byte]()
self.sz = 0
def _iter(self) -> Generator[str]:
self._ensure_open()
while True:
rd = _C.getline(
Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp
)
if rd != -1:
yield str(self.buf, rd)
else:
break
def _iter_trim_newline(self) -> Generator[str]:
self._ensure_open()
while True:
rd = _C.getline(
Ptr[Ptr[byte]](self.__raw__() + 8), Ptr[int](self.__raw__()), self.fp
)
if rd != -1:
if self.buf[rd - 1] == byte(10):
rd -= 1
yield str(self.buf, rd)
else:
break
def _gz_errcheck(stream: cobj):
errnum = i32(0)
msg = _C.gzerror(stream, __ptr__(errnum))
if msg and msg[0]:
raise IOError(f"zlib error: {str(msg, _C.strlen(msg))}")
class gzFile:
sz: int
buf: Ptr[byte]
fp: cobj
def __init__(self, fp: cobj):
self.fp = fp
self._reset()
def __init__(self, path: str, mode: str):
self.fp = _C.gzopen(path.c_str(), mode.c_str())
if not self.fp:
raise IOError(f"file {path} could not be opened")
self._reset()
def _getline(self) -> int:
if not self.buf:
self.sz = 128
self.buf = Ptr[byte](self.sz)
offset = 0
while True:
if not _C.gzgets(self.fp, self.buf + offset, i32(self.sz - offset)):
_gz_errcheck(self.fp)
if offset == 0:
return -1
break
offset += _C.strlen(self.buf + offset)
if self.buf[offset - 1] == byte(10): # '\n'
break
oldsz = self.sz
self.sz *= 2
self.buf = realloc(self.buf, self.sz, oldsz)
return offset
def __iter__(self) -> Generator[str]:
for a in self._iter():
yield a.__ptrcopy__()
def __enter__(self):
pass
def __exit__(self):
self.close()
def close(self):
if self.fp:
_C.gzclose(self.fp)
self.fp = cobj()
if self.buf:
free(self.buf)
self._reset()
def readlines(self) -> List[str]:
return [l for l in self]
def write(self, s: str):
self._ensure_open()
_C.gzwrite(self.fp, s.ptr, u32(len(s)))
_gz_errcheck(self.fp)
def __file_write_gen__(self, g: Generator[T], T: type):
for s in g:
self.write(str(s))
def read(self, sz: int = -1) -> str:
self._ensure_open()
if sz < 0:
buf = _strbuf()
for a in self._iter():
buf.append(a)
return buf.__str__()
buf = Ptr[byte](sz)
ret = _C.gzread(self.fp, buf, u32(sz))
_gz_errcheck(self.fp)
return str(buf, int(ret))
def tell(self) -> int:
self._ensure_open()
ret = _C.gztell(self.fp)
_gz_errcheck(self.fp)
return ret
def seek(self, offset: int, whence: int):
self._ensure_open()
_C.gzseek(self.fp, offset, i32(whence))
_gz_errcheck(self.fp)
def flush(self):
Z_FINISH = 4
self._ensure_open()
_C.gzflush(self.fp, i32(Z_FINISH))
_gz_errcheck(self.fp)
def _iter(self) -> Generator[str]:
self._ensure_open()
while True:
rd = self._getline()
if rd != -1:
yield str(self.buf, rd)
else:
break
def _iter_trim_newline(self) -> Generator[str]:
self._ensure_open()
while True:
rd = self._getline()
if rd != -1:
if self.buf[rd - 1] == byte(10):
rd -= 1
yield str(self.buf, rd)
else:
break
def _ensure_open(self):
if not self.fp:
raise IOError("I/O operation on closed file")
def _reset(self):
self.buf = cobj()
self.sz = 0
def _bz_errcheck(stream: cobj):
errnum = i32(0)
msg = _C.BZ2_bzerror(stream, __ptr__(errnum))
if errnum < i32(0):
raise IOError(f"bzip2 error: {str(msg, _C.strlen(msg))}")
class bzFile:
sz: int
buf: Ptr[byte]
fp: cobj
def __init__(self, fp: cobj):
self.fp = fp
self._reset()
def __init__(self, path: str, mode: str):
self.fp = _C.BZ2_bzopen(path.c_str(), mode.c_str())
if not self.fp:
raise IOError(f"file {path} could not be opened")
self._reset()
def _getline(self) -> int:
if not self.buf:
self.sz = 128
self.buf = Ptr[byte](self.sz)
offset = 0
while True:
m = self._gets(self.buf + offset, self.sz - offset)
if m == 0:
if offset == 0:
return -1
break
elif m < 0:
_bz_errcheck(self.fp)
if offset == 0:
return -1
break
offset += m
if self.buf[offset - 1] == byte(10): # '\n'
break
oldsz = self.sz
self.sz *= 2
self.buf = realloc(self.buf, self.sz, oldsz)
return offset
def __iter__(self) -> Generator[str]:
for a in self._iter():
yield a.__ptrcopy__()
def __enter__(self):
pass
def __exit__(self):
self.close()
def close(self):
if self.fp:
_C.BZ2_bzclose(self.fp)
self.fp = cobj()
if self.buf:
free(self.buf)
self._reset()
def readlines(self) -> List[str]:
return [l for l in self]
def write(self, s: str):
self._ensure_open()
_C.BZ2_bzwrite(self.fp, s.ptr, i32(len(s)))
_bz_errcheck(self.fp)
def __file_write_gen__(self, g: Generator[T], T: type):
for s in g:
self.write(str(s))
def read(self, sz: int = -1) -> str:
self._ensure_open()
if sz < 0:
buf = _strbuf()
for a in self._iter():
buf.append(a)
return buf.__str__()
buf = Ptr[byte](sz)
ret = _C.BZ2_bzread(self.fp, buf, i32(sz))
_bz_errcheck(self.fp)
return str(buf, int(ret))
def flush(self):
self._ensure_open()
_C.BZ2_bzflush(self.fp)
_bz_errcheck(self.fp)
def _iter(self) -> Generator[str]:
self._ensure_open()
while True:
rd = self._getline()
if rd != -1:
yield str(self.buf, rd)
else:
break
def _iter_trim_newline(self) -> Generator[str]:
self._ensure_open()
while True:
rd = self._getline()
if rd != -1:
if self.buf[rd - 1] == byte(10):
rd -= 1
yield str(self.buf, rd)
else:
break
def _ensure_open(self):
if not self.fp:
raise IOError("I/O operation on closed file")
def _reset(self):
self.buf = cobj()
self.sz = 0
def _gets(self, buf: cobj, sz: int):
if sz <= 0:
return 0
fp = self.fp
b = byte(0)
i = 0
while i < sz - 1:
status = _C.BZ2_bzread(fp, __ptr__(b), i32(1))
if status == i32(0):
break
elif status < i32(0):
return -1
buf[i] = b
i += 1
if b == byte(10):
break
return i
def open(path: str, mode: str = "r") -> File:
return File(path, mode)
def gzopen(path: str, mode: str = "r") -> gzFile:
return gzFile(path, mode)
def bzopen(path: str, mode: str = "r") -> gzFile:
return bzFile(path, mode)
def is_binary(path: str) -> bool:
# https://stackoverflow.com/questions/898669/how-can-i-detect-if-a-file-is-binary-non-text-in-python/7392391#7392391
# Can get both false positive and false negatives, but still is a
# clever approach that works for the large majority of files
textchars = {7, 8, 9, 10, 12, 13, 27} | set(iter(range(0x20, 0x100))) - {0x7F}
with open(path, "rb") as f:
header = f.read(1024)
return any(ord(c) not in textchars for c in header)