source: branches/home/psmith/restructure/src/io/nio-server.lisp

Last change on this file was 109, checked in by psmith, 18 years ago

Remove +process-jobs-inline+ as can't work like this. Added timeout mechanism

File size: 7.8 KB
Line 
1#|
2Copyright (c) 2006 Risto Laakso
3All rights reserved.
4
5Redistribution and use in source and binary forms, with or without
6modification, are permitted provided that the following conditions
7are met:
81. Redistributions of source code must retain the above copyright
9   notice, this list of conditions and the following disclaimer.
102. Redistributions in binary form must reproduce the above copyright
11   notice, this list of conditions and the following disclaimer in the
12   documentation and/or other materials provided with the distribution.
133. The name of the author may not be used to endorse or promote products
14   derived from this software without specific prior written permission.
15
16THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26|#
27(in-package :nio)
28
29(declaim (optimize (debug 3) (speed 3) (space 0)))
30
31(defun trivial-accept (client)
32  (declare (ignore client))
33;;  (format t "Accepting connection from ~S:~D [~A].~%" host port proto)
34  t)
35
36;TODO thread safety
37(defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue)
38  "List of node objects that are to be connected to")
39
40;loop over hashtable
41(defun process-async-fds (client-hash)
42  (let ((removals nil))
43    (maphash #'(lambda (k async-fd) 
44#+nio-debug2    (format-log t "Dealing with ~a => ~a~%" k async-fd)
45               
46;process reads
47               (handler-case
48                   (progn
49                     (process-timeout async-fd)
50                     (when (read-ready async-fd) (read-more async-fd))
51                     (when (> (buffer-position (foreign-read-buffer async-fd)) 0)
52                       (process-read async-fd))
53;process-writes
54                     (loop 
55                        (when (and (write-ready async-fd) (> (buffer-position (foreign-write-buffer async-fd)) 0))
56                          (write-more async-fd))
57                        (when (write-ready async-fd) 
58                            (assert (eql (buffer-position (foreign-write-buffer async-fd)) 0))
59                            (process-write async-fd))
60                        (unless (and (write-ready async-fd) (> (buffer-position (foreign-write-buffer async-fd)) 0)) (return)))
61
62;process normal close
63                     (when (close-pending async-fd)
64                       (write-more async-fd)
65                       (push async-fd removals)))
66                 (read-error (re) (push async-fd removals))
67                 (write-error (we) (push async-fd removals))
68                 (timeout (to) (push async-fd removals))))
69           client-hash)
70    (dolist (async-fd removals)
71      (format-log t "nio-server:process-async-fds processing remove for ~a~%" async-fd)
72      (close-sm async-fd)
73      (setf (active-conn (socket async-fd)) nil)
74      (remhash (async-fd-read-fd async-fd) client-hash))))
75                             
76
77(defun start-server (connection-type
78                     &key 
79                     (protocol :inet)
80                     (port 0) ;//if set then listen
81                     (host "127.0.0.1")
82                     (accept-connection #'trivial-accept))
83  (let (sock
84        (event-queue (make-event-queue))
85        (client-hash (make-hash-table :test 'eql))
86        )
87
88    (when (not (eql port 0))
89      (format t "Binding to ~A:~A~%" host port)
90      (setq sock (ecase protocol
91                   (:inet (make-inet-socket)) 
92                   (:inet6 (make-inet6-socket))))
93      (unless (ecase protocol 
94                (:inet (bind-inet-socket sock port host))
95                (:inet6 (bind-inet6-socket sock port host)))
96        (error "Can't bind socket!"))
97      (set-fd-nonblocking sock)
98      (format t "~&Starting server on ~S port ~S.. (socket fd is ~D)~%" host port sock)     
99      (start-listen sock)
100      (add-fd event-queue sock :read :trigger :level))
101           
102    (format t "waiting for events..~%") (force-output)
103
104    (catch 'poll-error-exit
105      (handler-bind ((poll-error #'(lambda (cond) 
106                                     (declare (ignore cond))
107                                     (format t "Poll-error (errno ~A), exiting..~%" (get-errno))
108                                     (throw 'poll-error-exit nil))))
109       
110        (loop 
111           (let ((unix-epoll-events (poll-events event-queue)))
112             (loop for (fd . event) in unix-epoll-events do         
113                  (cond
114                    ;; new connection
115                    ((and sock (= fd sock))
116                     (progn
117#+nio-debug                    (format t "start-server - incomming conn")
118                       (let ((async-fd (socket-accept fd connection-type)))
119#+nio-debug                    (format t "start-server - New conn: ~A~%" async-fd)
120                       (cond
121                         ((null async-fd)
122                          (format t "Accept failed.~%"))
123
124                         ;; accept connection ?
125                         ((funcall accept-connection async-fd)
126                          (sleep 0.1)
127                          (let ((nb-ret (set-fd-nonblocking (async-fd-read-fd async-fd))))
128                            (format t "set bb ret: ~A :flags ~A~%" nb-ret (get-fd-flags (async-fd-read-fd async-fd)))
129                            (when (< nb-ret 0) 
130                              (format t "Error setting socket non-blocking: ")
131                              (perror)))
132                          (setf (gethash (async-fd-read-fd async-fd) client-hash) async-fd)
133                          (add-async-fd event-queue async-fd :read-write))
134
135                         ;; no accept, close
136                         (t
137                          (format-log t "start-server - accept-connection closed~%")
138                          (close-async-fd async-fd))))))
139
140
141                    ;; socket i/o available
142                    (t
143                     (let ((async-fd (gethash fd client-hash)))
144#+nio-debug                    (format-log t "nio-server::start-server - IO event ~A on ~A~%" event async-fd)
145                       (unless (null async-fd)
146                         (catch 'error-exit
147                           (handler-bind ((read-error #'(lambda (x) 
148                                                          (declare (ignore x))
149                                                          (format t "read-error, dropping ~A.~%" async-fd)
150                                                          (setf (gethash (async-fd-read-fd async-fd) client-hash) nil)
151                                                          (remove-async-fd event-queue async-fd :read)
152                                                          (remove-async-fd event-queue async-fd :write)
153                                                          (force-close-async-fd async-fd)
154                                                          (throw 'error-exit nil))))
155
156                             (if (error-event-p event) 
157                                 (close-sm async-fd)
158                                 (progn
159                                   (when (read-event-p event) (setf (read-ready async-fd) t))
160                                   (when (write-event-p event) (setf (write-ready async-fd) t)))))))))))
161
162;add outgoing sockets to event queue
163#+nio-debug2     (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
164             (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
165#+nio-debug       (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
166                  (push node *nodes-list*))
167
168             (with-connect-ready-nodes (a-node)
169#+nio-debug       (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node)
170               (let ((new-fd (connect a-node connection-type)))
171#+nio-debug       (format-log t "nio-server:start-server - connect returned async-fd ~A~%" new-fd)
172                 (update-last-connect-attempt a-node)
173                 (when new-fd
174#+nio-debug       (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd)
175                   (set-fd-nonblocking (async-fd-read-fd new-fd))
176                   (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
177                   (setf (active-conn a-node) new-fd)
178                   (add-async-fd event-queue new-fd :read-write))))
179             
180                                        ;loop over async-fd's processing where necessary
181             (process-async-fds client-hash)
182                  ))))
183    (ignore-errors 
184      (close-fd sock))))
185
186
187(defun connect(node connection-type
188               &key 
189               (protocol :inet))
190  (format-log t "nio-server:connect - Called with: ~A ~A~%" protocol node)
191  (let ((sock nil))
192    (setq sock (ecase protocol
193                 (:inet (make-inet-socket)) 
194                 (:inet6 (make-inet6-socket))))
195   
196    (if (connect-inet-socket sock node)
197        (let ((sm (create-state-machine connection-type sock sock node)))
198          (return-from connect sm))
199        (progn
200          (format t "Connect failed!!~A ~%" (get-errno))
201          (close-fd sock)
202          nil))))
203
204(defun add-connection(node)
205  (nio-utils:add +connected-sockets-queue+ node))
Note: See TracBrowser for help on using the repository browser.