source: trunk/src/async-fd.lisp

Last change on this file was 6, checked in by psmith, 18 years ago

Moved to standard directory structure

File size: 8.4 KB
Line 
1#|
2Copyright (c) 2006 Risto Laakso
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions
7are met:
81. Redistributions of source code must retain the above copyright
9   notice, this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11   notice, this list of conditions and the following disclaimer in the
12   documentation and/or other materials provided with the distribution.
133. The name of the author may not be used to endorse or promote products
14   derived from this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26|#
27(in-package :nio)
28
29(declaim (optimize (debug 3) (speed 3) (space 0)))
30
31;;; FFI
32
;; fcntl(2) command numbers: F_GETFL and F_SETFL.  These two values are the
;; same (3 and 4) in the Linux and the BSD / Mac OS X headers.
(defconstant +cmd-get-flags+ 3)
(defconstant +cmd-set-flags+ 4)

;; O_NONBLOCK file status flag.  Its numeric value is platform specific:
;; #x0004 in the BSD / Mac OS X headers, #o4000 in the generic Linux headers.
;; NOTE(review): a few Linux architectures (e.g. alpha, mips, sparc) use yet
;; another value -- confirm before relying on this constant there.
(defconstant +arg-nonblock+
  #+linux #o4000
  #-linux #x0004)
36
(defcfun ("close" %close) :int
  "Foreign binding for the C library function close.
Takes the integer file descriptor FD and returns the C int result."
  (fd :int))
39
(defcfun ("fcntl" %fcntl) :int
  "Foreign binding for the C library function fcntl.
FD is the file descriptor, CMD the command number and ARG a single integer
argument; the C int result is returned.
NOTE(review): the C prototype of fcntl is variadic, while this binding always
passes exactly one int argument -- confirm that this is acceptable on every
target ABI."
  (fd  :int)
  (cmd :int)
  (arg :int))
44
(defcfun ("read" %read) :long
  "Foreign binding for the C library function read.
Reads up to NBYTES bytes from file descriptor FD into the foreign memory at
BUFFER and returns the C result as a long."
  (fd     :int)
  (buffer :pointer)
  (nbytes :unsigned-long))
49
(defcfun ("write" %write) :long
  "Foreign binding for the C library function write.
Writes up to NBYTES bytes from the foreign memory at BUFFER to file
descriptor FD and returns the C result as a long."
  (fd     :int)
  (buffer :pointer)
  (nbytes :unsigned-long))
54
(defcfun ("memset" %memset) :pointer
  "Foreign binding for the C library function memset.
Fills LEN bytes of the foreign memory at BUFFER with BYTE and returns the
pointer handed back by memset.
LEN is declared :unsigned-long because the C parameter is a size_t; this
also matches the size parameters of the %read and %write bindings."
  (buffer :pointer)
  (byte   :int)
  (len    :unsigned-long))
59
60
61;;; CLASSES
62
(defclass async-fd ()
  (;; --- output side ---------------------------------------------------
   ;; Descriptor that queued packets are written to.
   (write-fd :initarg :write-fd)
   ;; List of PACKET instances still waiting to be written, oldest first.
   (write-queue :initform nil)

   ;; --- input side ----------------------------------------------------
   ;; Descriptor that READ-MORE reads from.
   (read-fd :initarg :read-fd)

   ;; Foreign staging buffer handed to %read, and its size in bytes.
   (foreign-read-buffer :initform (foreign-alloc :uint8 :count 4096))
   (foreign-read-buffer-size :initform 4096)

   ;; Lisp-side byte vector collecting everything read so far, and the
   ;; index at which the next byte will be stored.
   (lisp-read-buffer :initform (make-uint8-seq 1024))
   (lisp-read-buffer-write-ptr :initform 0)

   ;; --- state and hooks -----------------------------------------------
   ;; Set to T by CLOSE-ASYNC-FD when the close has to wait for the
   ;; write queue to drain.
   (close-pending :initform nil)

   ;; Optional predicate called with the read buffer and its fill index;
   ;; READ-MORE only invokes the read callback when it returns true.
   (accept-filter :initform nil)
   ;; Function called by READ-MORE with the async-fd and a copy of the
   ;; bytes collected so far.
   (read-callback :initform nil))
  (:documentation
   "A read/write file descriptor pair with a queue of pending output packets
and a growing buffer of received input."))
80
81
(defmethod print-object ((async-fd async-fd) stream)
  "Write a short description of ASYNC-FD to STREAM: its read and write
descriptors and the number of packets waiting in its write queue."
  (let ((reader  (slot-value async-fd 'read-fd))
        (writer  (slot-value async-fd 'write-fd))
        (pending (slot-value async-fd 'write-queue)))
    (format stream "#<ASYNC-FD r/w fd: ~D/~D, write queue length: ~D.>"
            reader writer (length pending))))
86
87
(defclass packet ()
  (;; Pointer to the foreign memory holding the bytes to send.
   (buffer
    :initform nil
    :initarg :buffer
    :documentation "Foreign array")
   ;; Total number of bytes in BUFFER.
   (size
    :initform 0
    :initarg :size)
   ;; Number of bytes of BUFFER that have already been written.
   (written
    :initform 0
    :initarg :written))
  (:documentation
   "One entry of an async-fd write queue: a foreign byte buffer together with
its size and the count of bytes already written."))
92
93
94;;; FUNCTIONS
95
96
(defun memzero (ptr size)
  "Fill SIZE bytes of the foreign memory at PTR with zero bytes.
Returns whatever %MEMSET returns."
  (let ((fill-byte 0))
    (%memset ptr fill-byte size)))
99
100
(defun set-fd-nonblocking (unix-fd)
  "Set UNIX-FD to non-blocking mode (O_NONBLOCK).

The current file status flags are fetched first and O_NONBLOCK is OR-ed into
them, so flags that are already set on the descriptor are preserved instead
of being overwritten.

Returns the result of the fcntl call that set the flags, or the negative
result of the fcntl call that fetched them if that one failed."
  (let ((current-flags (%fcntl unix-fd +cmd-get-flags+ 0)))
    (if (minusp current-flags)
        ;; Could not read the flags; report the failure to the caller
        ;; rather than writing a guessed flag word.
        current-flags
        (%fcntl unix-fd +cmd-set-flags+
                (logior current-flags +arg-nonblock+)))))
104
105
(defun close-fd (unix-fd)
  "Close the file descriptor UNIX-FD and return the result of %CLOSE."
  (let ((fd unix-fd))
    (%close fd)))
109
110
(defun make-uint8-seq (size)
  "Return a fresh vector of SIZE elements of type (unsigned-byte 8), all
initialised to zero."
  (make-array size
              :element-type '(unsigned-byte 8)
              :initial-element 0))
114
115
(defun map-to-foreign (seq start end)
  "Copy the elements of SEQ from index START (inclusive) to END (exclusive)
into a freshly allocated foreign :uint8 array and return a pointer to it.
The array is obtained with FOREIGN-ALLOC and is not freed here."
  (let* ((byte-count (- end start))
         (target (foreign-alloc :uint8 :count byte-count)))
    ;; DOTIMES yields TARGET once every byte has been copied.
    (dotimes (offset byte-count target)
      (setf (mem-aref target :uint8 offset)
            (aref seq (+ start offset))))))
125
126
(defun ensure-lisp-read-buffer-size (async-fd size)
  "Make sure the LISP-READ-BUFFER of ASYNC-FD holds at least SIZE elements.

When the buffer is too short it is replaced by a new one of 1.5 times SIZE
elements, and the first LISP-READ-BUFFER-WRITE-PTR elements are carried over.
Returns T if the buffer was replaced and NIL if it was already big enough."
  (with-slots (lisp-read-buffer lisp-read-buffer-write-ptr) async-fd

    (assert (and (not (null lisp-read-buffer)) (integerp lisp-read-buffer-write-ptr)))

    (when (< (length lisp-read-buffer) size)
      (let ((grown (make-uint8-seq (truncate (* 1.5 size)))))
        ;; Carry over the bytes received so far.
        (dotimes (index lisp-read-buffer-write-ptr)
          (setf (aref grown index) (aref lisp-read-buffer index)))
        (setf lisp-read-buffer grown)
        t))))
143
144
(define-condition read-error (error)
  ()
  (:documentation
   "Signalled by READ-MORE when the underlying read call returns a negative
value."))
146
(defun read-more (async-fd)
  "Read more data from ASYNC-FD.

Performs one %READ from the read descriptor into the foreign read buffer and
appends the received bytes to LISP-READ-BUFFER, growing it when necessary.

Afterwards the read callback, if one is installed, is called with ASYNC-FD
and a copy of all bytes collected so far.  When an accept filter is installed
the callback is only called if the filter, applied to the read buffer and its
fill index, returns true.

Returns NIL when %READ returned 0.  Otherwise returns three values: the Lisp
read buffer, its fill index and the number of bytes just read.
Signals READ-ERROR when %READ returns a negative value.
NOTE(review): on a non-blocking descriptor a negative result presumably also
covers the no-data-available case (EAGAIN) -- confirm that callers expect an
error there."
  (with-slots (read-fd
               foreign-read-buffer foreign-read-buffer-size
               lisp-read-buffer lisp-read-buffer-write-ptr
               accept-filter read-callback)
      async-fd

    (let ((new-bytes (%read read-fd foreign-read-buffer foreign-read-buffer-size)))

      (cond
        ((< new-bytes 0)
         (error 'read-error))

        ((= new-bytes 0)
         nil)

        (t
         ;; May replace the LISP-READ-BUFFER slot; the slot is re-read on
         ;; every reference below, so the new buffer is picked up.
         (ensure-lisp-read-buffer-size async-fd (+ lisp-read-buffer-write-ptr new-bytes))

         ;; Copy data from the foreign buffer behind the bytes already held.
         (loop for i from 0 below new-bytes do
           (setf (aref lisp-read-buffer (+ i lisp-read-buffer-write-ptr))
                 (mem-aref foreign-read-buffer :uint8 i)))
         (incf lisp-read-buffer-write-ptr new-bytes)

         ;; Notify the consumer.  READ-CALLBACK defaults to NIL, so it must
         ;; be checked before it is funcalled.
         (when (or (null accept-filter)
                   (funcall accept-filter lisp-read-buffer lisp-read-buffer-write-ptr))
           (when read-callback
             (funcall read-callback async-fd
                      (subseq lisp-read-buffer 0 lisp-read-buffer-write-ptr))))

         (values lisp-read-buffer lisp-read-buffer-write-ptr new-bytes))))))
180
181
(defun close-async-fd (async-fd)
  "Close the descriptors of ASYNC-FD once nothing is left in its write queue.

With an empty write queue the read descriptor is closed, the foreign read
buffer is freed and the write descriptor is closed too unless it is the same
descriptor as the read one.  With a non-empty write queue nothing is closed;
the CLOSE-PENDING slot is set to T instead."
  (with-slots (write-queue read-fd write-fd foreign-read-buffer) async-fd
    (if write-queue
        ;; Data still queued: only mark the async-fd as closing.
        (setf (slot-value async-fd 'close-pending) t)
        ;; Write queue is empty: close now.
        (progn
          (close-fd read-fd)
          (foreign-free foreign-read-buffer)
          (when (/= read-fd write-fd)
            (close-fd write-fd))))))
196
197
(defun write-more (async-fd)
  "Write data from ASYNC-FD's write-queue.

Packets are written in queue order with %WRITE.  Writing stops at the first
packet that could not be written completely, or as soon as %WRITE reports a
failure by returning a negative value.  Packets that have been written
completely are removed from the queue and their foreign buffers are freed.
If the queue ends up empty and a close is pending, ASYNC-FD is closed.

Always returns T."
  (with-slots (write-fd write-queue) async-fd

    ;; Push out as much of the queue as the descriptor accepts right now.
    (loop for packet in write-queue do
      (with-slots (buffer size written) packet

        (assert (and (pointerp buffer) (integerp size) (integerp written)))

        (let ((now-written (%write write-fd
                                   (inc-pointer buffer written)
                                   (- size written))))
          ;; A negative result signals failure.  It must not be added to
          ;; WRITTEN: that would move the offset backwards and make the next
          ;; attempt start before the buffer.  Keep the packet queued and
          ;; stop for now.
          ;; NOTE(review): presumably this is mostly the would-block case of
          ;; a non-blocking descriptor; other errors are not told apart.
          (when (minusp now-written)
            (return))
          (incf written now-written)
          ;; Partial write: later packets have to wait for this one.
          (unless (= written size)
            (return)))))

    ;; Drop fully written packets, releasing their foreign buffers and
    ;; keeping the remaining packets in their original order.
    (setf write-queue
          (loop for entry in write-queue
                if (= (slot-value entry 'size) (slot-value entry 'written))
                  do (foreign-free (slot-value entry 'buffer))
                else
                  collect entry))

    ;; If the queue is empty and a close is pending, close the fd.
    (when (and (null write-queue) (slot-value async-fd 'close-pending))
      (close-async-fd async-fd))

    t))
233
234
(defun async-write-seq (async-fd seq &optional (start 0) (end (length seq)))
  "Queue the elements of SEQ between START and END for writing to ASYNC-FD.

The range is copied into a foreign array, wrapped in a PACKET and appended
to the end of the write queue; WRITE-MORE is then called right away.
Returns the value of WRITE-MORE."

  (assert (and (numberp start) (not (null seq))))

  ;; Build the packet, then append it to the tail of the queue.
  (let* ((byte-count (- end start))
         (entry (make-instance 'packet
                               :buffer (map-to-foreign seq start end)
                               :size byte-count)))
    (with-slots (write-queue) async-fd
      (setf write-queue (append write-queue (list entry)))))

  ;; Start writing.
  (write-more async-fd))
247
248
249
250
251
(defun force-close-async-fd (async-fd)
  "Drop ASYNC-FD's write-queue and close it.

The foreign buffers of the packets that are dropped are freed first, so that
discarding unsent data does not leak foreign memory.  Returns the value of
CLOSE-ASYNC-FD."
  (with-slots (write-queue) async-fd
    (dolist (entry write-queue)
      (with-slots (buffer) entry
        ;; A packet's BUFFER slot defaults to NIL; only free real buffers,
        ;; and clear the slot so no freed pointer is left behind.
        (when buffer
          (foreign-free buffer)
          (setf buffer nil))))
    (setf write-queue nil))
  (close-async-fd async-fd))
256
257
(defun add-async-fd (event-queue async-fd mode)
  "Pass one descriptor of ASYNC-FD to ADD-FD together with EVENT-QUEUE and MODE.
MODE must be :READ (uses the read descriptor) or :WRITE (uses the write
descriptor); any other value signals an error."
  (let ((fd-slot (ecase mode
                   (:read 'read-fd)
                   (:write 'write-fd))))
    (add-fd event-queue (slot-value async-fd fd-slot) mode)))
262
263
(defun remove-async-fd (event-queue async-fd mode)
  "Pass one descriptor of ASYNC-FD to REMOVE-FD together with EVENT-QUEUE and
MODE.  MODE must be :READ (uses the read descriptor) or :WRITE (uses the
write descriptor); any other value signals an error."
  (let ((fd-slot (ecase mode
                   (:read 'read-fd)
                   (:write 'write-fd))))
    (remove-fd event-queue (slot-value async-fd fd-slot) mode)))
268
269
(defun async-fd-read-fd (async-fd)
  "Return the value of the READ-FD slot of ASYNC-FD."
  (with-slots (read-fd) async-fd
    read-fd))
272
(defun async-fd-write-fd (async-fd)
  "Return the value of the WRITE-FD slot of ASYNC-FD."
  (with-slots (write-fd) async-fd
    write-fd))
275
(defun set-accept-filter (async-fd accept-filter)
  "Store ACCEPT-FILTER in the ACCEPT-FILTER slot of ASYNC-FD and return it."
  ;; The slot is bound under another name so that it does not shadow the
  ;; ACCEPT-FILTER parameter.
  (with-slots ((filter-slot accept-filter)) async-fd
    (setf filter-slot accept-filter)))
278
(defun set-read-callback (async-fd read-callback)
  "Store READ-CALLBACK in the READ-CALLBACK slot of ASYNC-FD and return it."
  ;; The slot is bound under another name so that it does not shadow the
  ;; READ-CALLBACK parameter.
  (with-slots ((callback-slot read-callback)) async-fd
    (setf callback-slot read-callback)))
Note: See TracBrowser for help on using the repository browser.