Ignore:
Timestamp:
01/17/07 05:06:13 (18 years ago)
Author:
psmith
Message:

yarpc work, saving...

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp

    r37 r40  
    3333;; A server that processes remote procedure calls and returns results
    3434;;
    35 ;; Test with:
    36 ;; > telnet 127.0.0.1 16323
    37 ;; Trying 127.0.0.1...
    38 ;; Connected to 127.0.0.1.
    39 ;; Escape character is '^]'.
    40 ;; (test-rpc "who" 2 's)
    41 ;; response - who 2 'S
    42 ;;
    4335(defclass yarpc-state-machine (state-machine)
    44   ((outgoing-packet :initarg :outgoing-packet
    45                     :accessor outgoing-packet
    46                     :initform nil)))
     36  ((job-queue :initarg :job-queue
     37              :initform (error "Must supply a job queue to write work to.")
     38              :accessor job-queue
     39              :documentation "The queue used to hand off work from the NIO thread to an external thread for execution")
     40   (result-queue :initform (nio-compat:concurrent-queue)
     41                 :accessor result-queue
     42                 :documentation "The queue used to return results from an external thread to the nio thread")))
    4743
    48 (defun yarpc-state-machine ()
    49     (make-instance 'yarpc-state-machine))
     44(defun yarpc-state-machine (read-fd write-fd socket job-queue)
     45  (let ((sm (make-instance 'yarpc-state-machine :read-fd read-fd :write-fd write-fd :socket socket :job-queue job-queue)))
     46    (nio-buffer:clear (foreign-read-buffer sm))
     47    (nio-buffer:clear (foreign-write-buffer sm))
     48    (format t "yarpc-state-machine - Created ~S~%" sm)
     49    sm))
    5050
    5151(defparameter yarpc-pf (yarpc-packet-factory))
     
    5353(defmethod get-packet-factory((sm yarpc-state-machine))
    5454  yarpc-pf)
     55
     56(defmethod print-object ((sm yarpc-state-machine) stream)
     57  (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
     58
     59(defconstant STATE-INITIALISED 0)
     60(defconstant STATE-SEND-RESPONSE 1)
     61
     62(defparameter state STATE-INITIALISED)
     63
     64
     65(defmethod process-outgoing-packet((sm yarpc-state-machine))
     66  (format t "yarpc-state-machine: process-outgoing-packet called, polling the results-queue ~%")
     67  (let ((packet (nio-compat:take (result-queue sm) :blocking-call nil)))
     68    (format t "yarpc-state-machine: process-outgoing-packet got result ~A ~%" packet)
     69    packet))
     70
     71
     72;Process a call method packet, returns
     73(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
     74  (assert (eql state STATE-INITIALISED))
     75  (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
     76  (nio-compat:add (job-queue sm) (cons (call-string call) (result-queue sm)))) 
     77
     78
     79;Called from an external thread i.e. *not* the nio thread
     80;Blocks waiting for a job (call-string,result-queue) to process and return the result into the result queue
     81;(defmethod get-job ((sm yarpc-state-machine))
     82;  (values (nio-compat:take (job-queue sm)) (result-queue sm)))
     83
     84
    5585
    5686;;TODO move somewhere suitable
     
    72102  (format nil "response - ~A ~A ~A ~A~%" a b c (code-char #x2211)))
    73103
    74 ;;end move TODO
    75 
    76 
    77 ;;;Utils
    78 
    79 (defun print-hashtable (table &optional (stream t))
    80   (maphash #'(lambda (k v) (format stream "~a -> ~a~%" k v)) table))
    81 ;;;
    82 
    83 
    84 (defmethod print-object ((sm yarpc-state-machine) stream)
    85   (format stream "#<YARPC-STATE-MACHINE ~A >" (call-next-method sm nil)))
    86 
    87 (defconstant STATE-INITIALISED 0)
    88 (defconstant STATE-SEND-RESPONSE 1)
    89 
    90 (defparameter state STATE-INITIALISED)
    91 
    92104(define-condition authorization-error (error) ())
    93 
    94 
    95 (defmethod process-outgoing-packet((sm yarpc-state-machine))
    96   (format t "process-outgoing-packet called~%")
    97   (let ((packet (outgoing-packet sm)))
    98     (setf (outgoing-packet sm) nil)
    99     packet))
    100 
    101 ;TODO queue and thread stuf
    102 (defmethod queue-outgoing-packet((sm yarpc-state-machine) packet)
    103   (setf (outgoing-packet sm) packet))
    104 
    105 ;Process a call method packet, returns
    106 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
    107   (assert (eql state STATE-INITIALISED))
    108   (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm call)
    109   (handler-case
    110     (let ((result (execute-call (call-string call))))
    111       (when result
    112         (let ((response-packet (progn
    113                                   (setf state STATE-SEND-RESPONSE)
    114                                   (queue-outgoing-packet sm (method-response-packet result)))))
    115           t)))
    116     (reader-error (re) (format t "No such function ~A~%" (call-string call)))
    117     (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call)))))
    118 
    119 (defmethod process-incoming-packet ((sm yarpc-state-machine) (response method-response-packet))
    120   (assert (eql state STATE-INITIALISED))
    121   (format t "yarpc-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response))
    122  
    123105
    124106(defun execute-call (call-string)
     
    130112            (error 'authorization-error))))
    131113
     114;;end move TODO
    132115
    133 (defmethod remote-execute ((sm yarpc-state-machine) call-string)
    134   (queue-outgoing-packet sm (make-instance 'call-method-packet :call-string call-string)))
    135    
     116
     117
     118
     119
     120;  (handler-case
     121;    (let ((result (execute-call (call-string call))))
     122;      (when result
     123;       (let ((response-packet (progn
     124;                                  (setf state STATE-SEND-RESPONSE)
     125;                                  (queue-outgoing-packet sm (method-response-packet result)))))
     126;          t)))
     127;    (reader-error (re) (format t "No such function ~A~%" (call-string call)))
     128;    (authorization-error (ae) (format t "Function not declared with defremote ~A~%" (call-string call))))
Note: See TracChangeset for help on using the changeset viewer.