"""File-like objects that read from or write to a string buffer.

This implements (nearly) all stdio methods.

f = StringIO()      # ready for writing
f = StringIO(buf)   # ready for reading
f.close()           # explicitly release resources held
flag = f.isatty()   # always false
pos = f.tell()      # get current position
f.seek(pos)         # set current position
f.seek(pos, mode)   # mode 0: absolute; 1: relative; 2: relative to EOF
buf = f.read()      # read until EOF
buf = f.read(n)     # read up to n bytes
buf = f.readline()  # read until end of line ('\n') or EOF
list = f.readlines()# list of f.readline() results until EOF
f.truncate([size])  # truncate file to at most size (default: current pos)
f.write(buf)        # write at current position
f.writelines(list)  # for line in list: f.write(line)
f.getvalue()        # return whole file's contents as a string

Notes:
- Using a real file is often faster (but less convenient).
- There's also a much faster implementation in C, called cStringIO, but
  it's not subclassable.
- fileno() is left unimplemented so that code which uses it triggers
  an exception early.
- Seeking far beyond EOF and then writing will insert real null
  bytes that occupy space in the buffer.
- There's a simple test set (see end of this file).
"""
import types

try:
    from errno import EINVAL
except ImportError:
    # errno does not provide EINVAL here; fall back to the literal value 22.
    EINVAL = 22

# Only the StringIO class is exported by "from StringIO import *".
__all__ = ["StringIO"]
class StringIO:
    """class StringIO([buffer])

    When a StringIO object is created, it can be initialized to an existing
    string by passing the string to the constructor. If no string is given,
    the StringIO will start empty.

    The StringIO object can accept either Unicode or 8-bit strings, but
    mixing the two may take some care. If both are used, 8-bit strings that
    cannot be interpreted as 7-bit ASCII (that use the 8th bit) will cause
    a UnicodeError to be raised when getvalue() is called.

    Every I/O method raises ValueError once close() has been called.
    """

    # Types that are stored without coercion.  types.StringTypes (str and
    # unicode) only exists on Python 2; fall back to plain str elsewhere.
    _string_types = getattr(types, 'StringTypes', (str,))

    def __init__(self, buf = ''):
        """Create the object, optionally pre-loaded with *buf*.

        A *buf* that is not a string is converted with str().
        """
        # Force self.buf to be a string or unicode
        if not isinstance(buf, self._string_types):
            buf = str(buf)
        self.buf = buf          # consolidated contents
        self.len = len(buf)     # logical length, including pending chunks
        self.buflist = []       # written chunks not yet joined into buf
        self.pos = 0            # current read/write position
        self.closed = 0
        self.softspace = 0

    def _complain_ifclosed(self):
        """Raise ValueError if close() has already been called."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _flush_buflist(self):
        """Join the pending chunks in self.buflist onto self.buf."""
        if self.buflist:
            self.buf += ''.join(self.buflist)
            self.buflist = []

    def __iter__(self):
        """Iterate over the remaining lines, stopping at the first ''."""
        return iter(self.readline, '')

    def close(self):
        """Free the memory buffer."""
        if not self.closed:
            self.closed = 1
            del self.buf, self.pos
            # Pending chunks hold memory as well; drop them too.
            self.buflist = []

    def isatty(self):
        """Return 0: a string buffer is never a terminal."""
        self._complain_ifclosed()
        return 0

    def seek(self, pos, mode = 0):
        """Set the current position.

        mode 1 makes *pos* relative to the current position and mode 2
        relative to the end; any other mode is treated as absolute.  A
        resulting negative position is clamped to 0.
        """
        self._complain_ifclosed()
        self._flush_buflist()
        if mode == 1:
            pos += self.pos
        elif mode == 2:
            pos += self.len
        self.pos = max(0, pos)

    def tell(self):
        """Return the current position."""
        self._complain_ifclosed()
        return self.pos

    def read(self, n = -1):
        """Read up to *n* characters; None or a negative *n* reads to EOF."""
        self._complain_ifclosed()
        self._flush_buflist()
        # Test for None explicitly rather than relying on None < 0.
        if n is None or n < 0:
            newpos = self.len
        else:
            newpos = min(self.pos+n, self.len)
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readline(self, length=None):
        """Read up to and including the next '\\n', or to EOF.

        A non-negative *length* caps the number of characters returned;
        None or a negative *length* means no cap.
        """
        self._complain_ifclosed()
        self._flush_buflist()
        i = self.buf.find('\n', self.pos)
        if i < 0:
            newpos = self.len
        else:
            newpos = i+1
        # Without the >= 0 guard a negative length would move the position
        # backwards instead of reading a line.
        if length is not None and length >= 0:
            if self.pos + length < newpos:
                newpos = self.pos + length
        r = self.buf[self.pos:newpos]
        self.pos = newpos
        return r

    def readlines(self, sizehint = 0):
        """Return a list of the remaining lines.

        With a positive *sizehint*, stop after the first line that brings
        the total number of characters read to at least *sizehint*.
        """
        total = 0
        lines = []
        line = self.readline()
        while line:
            lines.append(line)
            total += len(line)
            if 0 < sizehint <= total:
                break
            line = self.readline()
        return lines

    def truncate(self, size=None):
        """Cut the contents down to at most *size* characters.

        *size* defaults to the current position.  A negative *size* raises
        IOError(EINVAL).  The position is pulled back when it lies beyond
        the new end.
        """
        self._complain_ifclosed()
        if size is None:
            size = self.pos
        elif size < 0:
            raise IOError(EINVAL, "Negative size not allowed")
        elif size < self.pos:
            self.pos = size
        self.buf = self.getvalue()[:size]
        # Keep the logical length in step with the shortened buffer so that
        # later seek(0, 2), read() and write() calls see the real end.
        self.len = len(self.buf)

    def write(self, s):
        """Write *s* at the current position; an empty/false *s* is a no-op.

        A non-string *s* is converted with str().  Writing past the end
        first pads the gap with null bytes.
        """
        self._complain_ifclosed()
        if not s: return
        # Force s to be a string or unicode
        if not isinstance(s, self._string_types):
            s = str(s)
        if self.pos > self.len:
            self.buflist.append('\0'*(self.pos - self.len))
            self.len = self.pos
        newpos = self.pos + len(s)
        if self.pos < self.len:
            # Overwrite in the middle: rebuild the chunk list around s.
            self._flush_buflist()
            self.buflist = [self.buf[:self.pos], s, self.buf[newpos:]]
            self.buf = ''
            if newpos > self.len:
                self.len = newpos
        else:
            # Plain append at the end.
            self.buflist.append(s)
            self.len = newpos
        self.pos = newpos

    def writelines(self, list):
        """Write every item of *list* in order, as write() would."""
        # Checked up front so an empty sequence still fails on a closed object.
        self._complain_ifclosed()
        for line in list:
            self.write(line)

    def flush(self):
        """Do nothing except check that the object is still open."""
        self._complain_ifclosed()

    def getvalue(self):
        """
        Retrieve the entire contents of the "file" at any time before
        the StringIO object's close() method is called.

        The StringIO object can accept either Unicode or 8-bit strings,
        but mixing the two may take some care. If both are used, 8-bit
        strings that cannot be interpreted as 7-bit ASCII (that use the
        8th bit) will cause a UnicodeError to be raised when getvalue()
        is called.

        Raises ValueError when called after close().
        """
        self._complain_ifclosed()
        self._flush_buflist()
        return self.buf
# A little test suite

def test():
    """Round-trip the lines of a text file through a StringIO object.

    The file is sys.argv[1] when an argument is given and '/etc/passwd'
    otherwise.  Progress is printed to stdout and RuntimeError is raised
    as soon as a check fails.  The checks index lines[0], lines[1] and the
    last line read back, so a file that is too short fails with IndexError.
    """
    import sys
    if sys.argv[1:]:
        path = sys.argv[1]
    else:
        path = '/etc/passwd'
    # Read the file once and close the handle explicitly.
    infile = open(path, 'r')
    try:
        lines = infile.readlines()
    finally:
        infile.close()
    text = ''.join(lines)
    f = StringIO()
    for line in lines[:-2]:
        f.write(line)
    f.writelines(lines[-2:])
    if f.getvalue() != text:
        raise RuntimeError('write failed')
    length = f.tell()
    print('File length = %s' % (length,))
    # Overwrite the second line with itself; the contents must not change.
    f.seek(len(lines[0]))
    f.write(lines[1])
    f.seek(0)
    print('First line = %r' % (f.readline(),))
    line = f.readline()
    print('Second line = %r' % (line,))
    f.seek(-len(line), 1)
    line2 = f.read(len(line))
    if line != line2:
        raise RuntimeError('bad result after seek back')
    f.seek(len(line2), 1)
    rest = f.readlines()
    line = rest[-1]
    f.seek(f.tell() - len(line))
    line2 = f.read()
    if line != line2:
        raise RuntimeError('bad result after seek back from EOF')
    print('Read %d more lines' % len(rest))
    print('File length = %s' % (f.tell(),))
    if f.tell() != length:
        raise RuntimeError('bad length')
    f.close()

# Run the self-test when the module is executed as a script.
if __name__ == '__main__':
    test()