Changeset 81 for branches/home/psmith/restructure
- Timestamp:
- 02/10/07 20:36:43 (18 years ago)
- Location:
- branches/home/psmith/restructure/src
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified branches/home/psmith/restructure/src/io/nio-package.lisp ¶
r74 r81 43 43 ;;ip-authorisation 44 44 check-ip load-ips 45 46 ;;nodes 47 node with-connected-nodes active-conn 45 48 )) -
TabularUnified branches/home/psmith/restructure/src/io/nio-server.lisp ¶
r78 r81 36 36 ;TODO thread safety 37 37 (defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue) 38 "List of sockets that have been connected and are awaiting addition to the event-notification system")38 "List of node objects that are to be connected to") 39 39 40 40 ;loop over hashtable … … 151 151 152 152 ;add outgoing sockets to event queue 153 #+nio-debug2 (format-log t "nio-server:start-server - Processing client add ~A~%" +connected-sockets-queue+) 154 155 (loop for new-fd = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null new-fd) do 153 #+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+) 154 (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do 155 #+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node) 156 (push node *nodes-list*)) 157 (with-connect-ready-nodes (a-node) 158 #+nio-debug (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node) 159 (let ((new-fd (connect (host a-node) (port a-node) connection-type))) 160 (update-last-connect-attempt a-node) 161 (when new-fd 156 162 #+nio-debug (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd) 157 158 (add-async-fd event-queue new-fd :read-write))163 (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd) 164 (add-async-fd event-queue new-fd :read-write)))) 159 165 160 166 ;loop over async-fd's processing where necessary … … 165 171 166 172 167 (defun add-connection(host port connection-type168 169 170 (format-log t "nio-server: add-connection- Called with: ~A:~A:~A ~%" protocol host port)173 (defun connect(host port connection-type 174 &key 175 (protocol :inet)) 176 (format-log t "nio-server:connect - Called with: ~A:~A:~A ~%" protocol host port) 171 177 (let ((sock nil)) 172 178 (setq sock (ecase protocol … … 176 182 (if (connect-inet-socket sock host port) 177 183 (let ((sm (create-state-machine connection-type sock sock sock))) 178 (nio-compat:add +connected-sockets-queue+ sm)179 (format-log t "nio-server:add-connection- Socket enqueued: ~A~%" +connected-sockets-queue+)180 (return-from add-connectionsm))184 ; (nio-compat:add +connected-sockets-queue+ sm) 185 ; (format-log t "nio-server:connect - Socket enqueued: ~A~%" +connected-sockets-queue+) 186 (return-from connect sm)) 181 187 (format t "Connect failed!!~A ~%" (get-errno))))) 182 188 189 (defun add-connection(node) 190 (nio-compat:add +connected-sockets-queue+ node)) -
TabularUnified branches/home/psmith/restructure/src/io/nio.asd ¶
r80 r81 10 10 (:file "async-fd" :depends-on ("fd-helper")) 11 11 (:file "async-socket" :depends-on ("async-fd")) 12 (:file "nio-server" :depends-on ("async-socket")) 12 (:file "nodes" :depends-on ("nio-package")) 13 (:file "nio-server" :depends-on ("async-socket" "nodes")) 13 14 (:file "ip-authorisation" :depends-on ("nio-package")) 14 (:file "nodes" :depends-on ("nio-package"))15 15 ) 16 16 -
TabularUnified branches/home/psmith/restructure/src/io/nodes.lisp ¶
r80 r81 76 76 (+ (last-connect-attempt node) (retry-delay node)))) 77 77 78 (defun allowed-to-connect(node) 79 (if (null (last-connect-attempt node)) 80 t 81 (and (not (active-conn node)) (< (+ (last-connect-attempt node) (retry-delay node)) (get-universal-high-res))))) 82 78 83 (defun update-last-connect-attempt(node) 79 84 (setf (last-connect-attempt node) (get-universal-high-res))) 80 85 86 ;;iterates over the nodes list looking for nodes that are ready to be connected to 87 ;;i.e. the SM is null and the next-allowed-connect time has expired 88 (defmacro with-connect-ready-nodes ((node) &rest body) 89 `(dolist (,node *nodes-list*) 90 (when (allowed-to-connect ,node) ,@body))) 91 92 93 (defmacro with-connected-nodes ((node) &rest body) 94 `(dolist (,node *nodes-list*) 95 (when (active-conn ,node) ,@body))) -
TabularUnified branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp ¶
r79 r81 42 42 43 43 44 (defun callback(result) 45 (nio-utils:format-log t "Result of remote-log ~A~%" result)) 46 47 44 48 ;;Tail the given log and write to remote logger 45 49 ;;e.g. (tail-log "/var/log/httpd/access_log" "192.168.1.1") 46 50 (defun tail-log(filename ip-address) 47 51 (sleep 4) 48 ( let ((sm (nio:add-connection ip-address 16323 'nio-yarpc:yarpc-client-state-machine)))49 (nio-utils:format-log t "toplevel adding conn ~A to ~A~%" sm ip-address)50 ( with-line-from-tailed-file (text filename 1)51 ( let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))52 (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)53 (nio- utils:format-log t "Result of remote-log ~A~%" (nio-yarpc:remote-execute sm rpc))))))52 (nio:add-connection (nio:node ip-address 16323)) 53 (with-line-from-tailed-file (text filename 1) 54 (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text)))) 55 (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc) 56 (nio:with-connected-nodes (node) 57 (nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback))))) 54 58 55 59 ;Runs a multithreaded system with an IO thread dealing with IO only and a 'job' thread taking and executing jobs -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ¶
r78 r81 37 37 :accessor job-queue 38 38 :documentation "The queue used to hand off work from an external thread to the io thread") 39 (result-queue :initform (nio-compat:concurrent-queue) 40 :accessor result-queue 41 :documentation "The queue used to return results from the io thread to an external thread"))) 39 (request-map :initform (make-hash-table) 40 :reader request-map 41 :documentation "A map from request-id (a unique id for this request) to remote-job"))) 42 43 (defclass remote-job() 44 ((callback :accessor callback 45 :documentation "A function accepting one argument to call with the result of the remote operation") 46 (start-time :initform (get-universal-high-res) 47 :reader start-time 48 :documentation "The (floating point) start time") 49 (timeout :initarg :timeout 50 :initform 1.5 51 :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close."))) 52 53 (defun remote-job(callback) 54 (make-instance 'remote-job :callback callback)) 55 42 56 43 57 (defun yarpc-client-state-machine () … … 56 70 (defconstant STATE-SENT-REQUEST 1) 57 71 72 (defparameter +request-id+ 0) 73 58 74 (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) 59 75 #+nio-debug2 (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") 60 (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil))) 61 (when packet 62 (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" packet) 63 (setf (state sm) STATE-SENT-REQUEST)) 64 packet)) 76 (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil))) 77 (when ttd 78 (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) 79 (destructuring-bind (job call-string) ttd 80 (setf (gethash (1+ +request-id+) (request-map sm)) job) 81 (make-instance 'call-method-packet :call-string call-string :request-id +request-id+))))) 65 82 66 83 (defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet)) 67 (assert (eql (state sm) STATE-SENT-REQUEST))68 84 (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response) 69 85 (let* ((*package* (find-package :nio-yarpc)) 70 86 (result (read-from-string (response response)))) 71 (setf (state sm) STATE-INITIALISED)72 87 (nio-compat:add (result-queue sm) result))) 73 74 ;Called from an external thread i.e. *not* the nio thread 75 ;Blocks calling thread on the remote m/c's response 76 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string) 77 (assert (eql (state sm) STATE-INITIALISED)) 78 (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string)) 79 (nio-compat:take (result-queue sm))) 88 89 ;Execute the call-string on the remote node and call callback with the result 90 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) 91 (nio-compat:add (job-queue sm) '((remote-job callback) call-string))) -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp ¶
r67 r81 41 41 (defconstant +PACKET-ID-SIZE+ 1) 42 42 (defconstant +PACKET-LENGTH-SIZE+ 4) 43 ;(defconstant +PACKET-REQUEST-ID+ 4) 43 44 44 45 (defconstant +yarpc-packet-header-size+ … … 47 48 (defmethod get-packet ((pf yarpc-packet-factory) buf) 48 49 (flip buf) 49 (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size 50 (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size ;; 6,7,8,9 request-id 50 51 (let ((packet-id (bytebuffer-read-8 buf)) 51 52 (packet-length (bytebuffer-read-32 buf))) 52 53 (if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer? 53 (let ((ret-packet (ecase packet-id 54 (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+))))) 55 (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)))))))) 54 (let* ((packet-request-id (bytebuffer-read-32 buf)) 55 (ret-packet (ecase packet-id 56 (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))) 57 (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))))) 56 58 (compact buf) 57 59 #+nio-debug (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf) … … 65 67 66 68 67 (defclass call-method-packet (packet)((call-string :initarg :call-string 69 (defclass yarpc-packet(packet) 70 ((request-id :initarg :request-id 71 :reader request-id))) 72 73 (defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string 68 74 :accessor call-string))) 69 75 (defun call-method-packet (call-string) … … 80 86 (nio-buffer:bytebuffer-write-8 buf +CALL-METHOD-PACKET-ID+) 81 87 (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later 88 (nio-buffer:bytebuffer-write-32 buf (request-id packet)) 82 89 (nio-buffer:bytebuffer-write-string buf (call-string packet)) 83 90 (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1) … … 93 100 (length (sb-ext:string-to-octets (write-to-string (call-string packet)))))) 94 101 95 (defclass method-response-packet ( packet)102 (defclass method-response-packet (yarpc-packet) 96 103 ((response :initarg :response 97 104 :accessor response))) … … 110 117 (nio-buffer:bytebuffer-write-8 buf +METHOD-RESPONSE-PACKET-ID+) 111 118 (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later 119 (nio-buffer:bytebuffer-write-32 buf (request-id packet)) 112 120 (nio-buffer:bytebuffer-write-string buf (write-to-string (response packet))) 113 121 (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1) -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ¶
r74 r81 59 59 60 60 61 (defun run-job(&key ( wait-on-job-pdwt))61 (defun run-job(&key (blocking t)) 62 62 (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") 63 (destructuring-bind (job re sult-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call wait-on-job-pdw)63 (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking) 64 64 (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) 65 (nio-compat:add result-queue ( nio-yarpc:execute-call job))))65 (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job))))) 66 66 67 67 68 68 (defmethod process-outgoing-packet((sm yarpc-state-machine)) 69 69 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) 70 ( let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))71 (format-log t "yarpc-state-machine:process-outgoing-packet - got result ~A ~%"result)70 (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil) 71 (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result) 72 72 (when result 73 (method-response-packet result ))))73 (method-response-packet result :request-id request-id)))) 74 74 75 75 ;Process a call method packet by placing it in the job-queue 76 76 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) 77 (assert (eql (state sm) STATE-INITIALISED))78 77 (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call) 79 (nio-compat:add job-queue (list (call-string call) (re sult-queue sm)))80 (when +process-jobs-inline+ (run-job : wait-on-job-pdwnil)))78 (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm))) 79 (when +process-jobs-inline+ (run-job :blocking nil))) 81 80 82 81
Note: See TracChangeset
for help on using the changeset viewer.