Ignore:
Timestamp:
02/10/07 20:36:43 (18 years ago)
Author:
psmith
Message:

First stab at rpc multiplexing

Location:
branches/home/psmith/restructure/src
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified branches/home/psmith/restructure/src/io/nio-package.lisp

    r74 r81  
    4343             ;;ip-authorisation
    4444             check-ip load-ips
     45             
     46             ;;nodes
     47             node with-connected-nodes active-conn
    4548             ))
  • TabularUnified branches/home/psmith/restructure/src/io/nio-server.lisp

    r78 r81  
    3636;TODO thread safety
    3737(defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
    38   "List of sockets that have been connected and are awaiting addition to the event-notification system")
     38  "List of node objects that are to be connected to")
    3939
    4040;loop over hashtable
     
    151151
    152152                                        ;add outgoing sockets to event queue
    153 #+nio-debug2     (format-log t "nio-server:start-server - Processing client add ~A~%" +connected-sockets-queue+)
    154 
    155              (loop for new-fd = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null new-fd) do
     153#+nio-debug2     (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
     154             (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
     155#+nio-debug       (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
     156                  (push node *nodes-list*))
     157             (with-connect-ready-nodes (a-node)
     158#+nio-debug       (format-log t "nio-server:start-server - attempting connection to node ~A~%" a-node)
     159               (let ((new-fd (connect (host a-node) (port a-node) connection-type)))
     160                 (update-last-connect-attempt a-node)
     161                 (when new-fd
    156162#+nio-debug       (format-log t "nio-server:start-server - adding connection to nio thread ~A~%" new-fd)
    157                   (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
    158                   (add-async-fd event-queue new-fd :read-write))
     163                   (setf (gethash (async-fd-read-fd new-fd) client-hash) new-fd)
     164                   (add-async-fd event-queue new-fd :read-write))))
    159165             
    160166                                        ;loop over async-fd's processing where necessary
     
    165171
    166172
    167 (defun add-connection (host port connection-type
    168                        &key
    169                        (protocol :inet))
    170   (format-log t "nio-server:add-connection - Called with: ~A:~A:~A ~%" protocol host port)
     173(defun connect(host port connection-type
     174               &key
     175               (protocol :inet))
     176  (format-log t "nio-server:connect - Called with: ~A:~A:~A ~%" protocol host port)
    171177  (let ((sock nil))
    172178    (setq sock (ecase protocol
     
    176182    (if (connect-inet-socket sock host port)
    177183        (let ((sm (create-state-machine connection-type sock sock sock)))
    178           (nio-compat:add +connected-sockets-queue+ sm)
    179           (format-log t "nio-server:add-connection - Socket enqueued: ~A~%" +connected-sockets-queue+)
    180           (return-from add-connection sm))
     184;         (nio-compat:add +connected-sockets-queue+ sm)
     185;         (format-log t "nio-server:connect - Socket enqueued: ~A~%" +connected-sockets-queue+)
     186          (return-from connect sm))
    181187        (format t "Connect failed!!~A ~%" (get-errno)))))
    182    
     188
     189(defun add-connection(node)
     190  (nio-compat:add +connected-sockets-queue+ node))
  • TabularUnified branches/home/psmith/restructure/src/io/nio.asd

    r80 r81  
    1010                 (:file "async-fd" :depends-on ("fd-helper"))
    1111                 (:file "async-socket" :depends-on ("async-fd"))
    12                  (:file "nio-server" :depends-on ("async-socket"))
     12                 (:file "nodes" :depends-on ("nio-package"))
     13                 (:file "nio-server" :depends-on ("async-socket" "nodes"))
    1314                 (:file "ip-authorisation" :depends-on ("nio-package"))
    14                  (:file "nodes" :depends-on ("nio-package"))
    1515                 )
    1616
  • TabularUnified branches/home/psmith/restructure/src/io/nodes.lisp

    r80 r81  
    7676      (+ (last-connect-attempt node) (retry-delay node))))
    7777
     78(defun allowed-to-connect(node)
     79  (if (null (last-connect-attempt node))
     80      t
     81      (and (not (active-conn node)) (< (+ (last-connect-attempt node) (retry-delay node))  (get-universal-high-res)))))
     82
    7883(defun update-last-connect-attempt(node)
    7984  (setf (last-connect-attempt node) (get-universal-high-res)))
    8085
     86;;iterates over the nodes list looking for nodes that are ready to be connected to
     87;;i.e. the SM is null and the next-allowed-connect time has expired
     88(defmacro with-connect-ready-nodes ((node) &rest body)
     89  `(dolist (,node *nodes-list*)
     90     (when (allowed-to-connect ,node) ,@body)))
     91
     92
     93(defmacro with-connected-nodes ((node) &rest body)
     94  `(dolist (,node *nodes-list*)
     95     (when (active-conn ,node) ,@body)))
  • TabularUnified branches/home/psmith/restructure/src/nio-logger/nio-logger.lisp

    r79 r81  
    4242
    4343
     44(defun callback(result)
     45  (nio-utils:format-log t "Result of remote-log ~A~%" result))
     46
     47
    4448;;Tail the given log and write to remote logger
    4549;;e.g. (tail-log "/var/log/httpd/access_log" "192.168.1.1")
    4650(defun tail-log(filename ip-address)
    4751  (sleep 4)
    48   (let ((sm (nio:add-connection ip-address 16323 'nio-yarpc:yarpc-client-state-machine)))
    49     (nio-utils:format-log t "toplevel adding conn ~A to ~A~%" sm ip-address)
    50     (with-line-from-tailed-file (text filename 1)
    51       (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
    52         (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
    53         (nio-utils:format-log t "Result of remote-log ~A~%" (nio-yarpc:remote-execute sm rpc))))))
     52  (nio:add-connection (nio:node ip-address 16323))
     53  (with-line-from-tailed-file (text filename 1)
     54    (let ((rpc (format nil "(nio-logger:remote-log \"~A\")" (cl-base64:string-to-base64-string text))))
     55      (nio-utils:format-log t "Toplevel Submitting job~A~%" rpc)
     56      (nio:with-connected-nodes (node)
     57        (nio-yarpc:remote-execute (nio:active-conn node) rpc #'callback)))))
    5458
    5559;Runs a multithreaded system with an IO thread dealing with IO only and a 'job'  thread taking and executing jobs
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp

    r78 r81  
    3737              :accessor job-queue
    3838              :documentation "The queue used to hand off work from an external thread to the io thread")
    39    (result-queue :initform (nio-compat:concurrent-queue)
    40                  :accessor result-queue
    41                  :documentation "The queue used to return results from the io thread to an external thread")))
     39   (request-map :initform (make-hash-table)
     40                :reader request-map
     41                :documentation "A map from request-id (a unique id for this request) to remote-job")))
     42
     43(defclass remote-job()
     44  ((callback :accessor callback
     45             :documentation "A function accepting one argument to call with the result of the remote operation")
     46   (start-time :initform (get-universal-high-res)
     47              :reader start-time
     48              :documentation "The (floating point) start time")   
     49   (timeout :initarg :timeout
     50            :initform 1.5
     51            :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close.")))
     52
     53(defun remote-job(callback)
     54  (make-instance 'remote-job :callback callback))
     55
    4256
    4357(defun yarpc-client-state-machine ()
     
    5670(defconstant STATE-SENT-REQUEST 1)
    5771
     72(defparameter +request-id+ 0)
     73
    5874(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
    5975#+nio-debug2  (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
    60   (let ((packet (nio-compat:take (job-queue sm) :blocking-call nil)))
    61     (when packet
    62       (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" packet)
    63       (setf (state sm) STATE-SENT-REQUEST))
    64     packet))
     76  (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
     77    (when ttd
     78      (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
     79      (destructuring-bind (job call-string) ttd
     80        (setf (gethash (1+ +request-id+) (request-map sm)) job)
     81        (make-instance 'call-method-packet :call-string call-string :request-id +request-id+)))))
    6582
    6683(defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet))
    67   (assert (eql (state sm) STATE-SENT-REQUEST))
    6884  (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response)
    6985  (let* ((*package* (find-package :nio-yarpc))
    7086         (result  (read-from-string (response response))))
    71     (setf (state sm) STATE-INITIALISED)   
    7287    (nio-compat:add (result-queue sm) result)))
    73  
    74 ;Called from an external thread i.e. *not* the nio thread
    75 ;Blocks calling thread on the remote m/c's response
    76 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string)
    77   (assert (eql (state sm) STATE-INITIALISED))
    78   (nio-compat:add (job-queue sm) (make-instance 'call-method-packet :call-string call-string))
    79   (nio-compat:take (result-queue sm)))
     88
     89;Execute the call-string on the remote node and call callback with the result
     90(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
     91  (nio-compat:add (job-queue sm) '((remote-job callback) call-string)))
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-packet-factory.lisp

    r67 r81  
    4141(defconstant +PACKET-ID-SIZE+ 1)
    4242(defconstant +PACKET-LENGTH-SIZE+ 4)
     43;(defconstant +PACKET-REQUEST-ID+ 4)
    4344
    4445(defconstant +yarpc-packet-header-size+
     
    4748(defmethod get-packet ((pf yarpc-packet-factory) buf)
    4849  (flip buf)
    49   (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size
     50  (if (>= (remaining buf) +yarpc-packet-header-size+) ;; First byte denotes packet ID ;;bytes 2,3,4,5 denote packet size ;; 6,7,8,9 request-id
    5051      (let ((packet-id (bytebuffer-read-8 buf))
    5152            (packet-length (bytebuffer-read-32 buf)))
    5253        (if (<= (- packet-length +yarpc-packet-header-size+) (remaining buf)) ;is the whole packet available in the buffer?
    53             (let ((ret-packet (ecase packet-id
    54                                 (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)))))
    55                                 (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+))))))))
     54            (let* ((packet-request-id (bytebuffer-read-32 buf))
     55                   (ret-packet (ecase packet-id
     56                                (0 (progn (format-log t "yarpc-packet-factory:get-packet - got CALL-METHOD-PACKET-ID~%") (call-method-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id)))
     57                                (1 (progn (format-log t "yarpc-packet-factory:get-packet - got METHOD-RESPONSE-PACKET-ID~%") (method-response-packet (bytebuffer-read-string buf (- packet-length +yarpc-packet-header-size+)) :request-id packet-request-id))))))
    5658              (compact buf)
    5759              #+nio-debug  (format-log t "yarpc-packet-factory:get-packet - after compact ~%~A~%" buf)
     
    6567
    6668
    67 (defclass call-method-packet (packet)((call-string :initarg :call-string
     69(defclass yarpc-packet(packet)
     70  ((request-id :initarg :request-id
     71               :reader request-id)))
     72
     73(defclass call-method-packet (yarpc-packet)((call-string :initarg :call-string
    6874                                            :accessor call-string)))
    6975(defun call-method-packet (call-string)
     
    8086        (nio-buffer:bytebuffer-write-8 buf +CALL-METHOD-PACKET-ID+)
    8187        (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
     88        (nio-buffer:bytebuffer-write-32 buf (request-id packet))
    8289        (nio-buffer:bytebuffer-write-string buf (call-string packet))
    8390        (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
     
    93100     (length (sb-ext:string-to-octets (write-to-string (call-string packet))))))
    94101
    95 (defclass method-response-packet (packet)
     102(defclass method-response-packet (yarpc-packet)
    96103  ((response :initarg :response
    97104             :accessor response)))
     
    110117        (nio-buffer:bytebuffer-write-8 buf +METHOD-RESPONSE-PACKET-ID+)
    111118        (nio-buffer:bytebuffer-write-32 buf 0) ; come back and write length later
     119        (nio-buffer:bytebuffer-write-32 buf (request-id packet))
    112120        (nio-buffer:bytebuffer-write-string buf (write-to-string (response packet)))
    113121        (nio-buffer:bytebuffer-insert-32 buf (buffer-position buf) 1)
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp

    r74 r81  
    5959
    6060
    61 (defun run-job(&key (wait-on-job-pdw t))
     61(defun run-job(&key (blocking t))
    6262  (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
    63   (destructuring-bind (job result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call wait-on-job-pdw)
     63  (destructuring-bind (job request-id result-queue) (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)
    6464    (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
    65     (nio-compat:add result-queue (nio-yarpc:execute-call job))))
     65    (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))
    6666
    6767
    6868(defmethod process-outgoing-packet((sm yarpc-state-machine))
    6969  (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
    70   (let ((result (nio-compat:take (result-queue sm) :blocking-call nil)))
    71     (format-log t "yarpc-state-machine:process-outgoing-packet - got result ~A ~%" result)
     70  (destructuring-bind (request-id result) (nio-compat:take (result-queue sm) :blocking-call nil)
     71    (format-log t "yarpc-state-machine:process-outgoing-packet - got :request-id ~A result ~A ~%" request-id result)
    7272     (when result
    73         (method-response-packet result))))
     73        (method-response-packet result :request-id request-id))))
    7474
    7575;Process a call method packet by placing it in the job-queue
    7676(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
    77   (assert (eql (state sm) STATE-INITIALISED))
    7877  (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
    79   (nio-compat:add job-queue (list (call-string call) (result-queue sm)))
    80   (when +process-jobs-inline+ (run-job :wait-on-job-pdw nil)))
     78  (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
     79  (when +process-jobs-inline+ (run-job :blocking nil)))
    8180
    8281
Note: See TracChangeset for help on using the changeset viewer.