1 | #| |
---|
2 | Copyright (c) 2007 |
---|
3 | All rights reserved. |
---|
4 | |
---|
5 | Redistribution and use in source and binary forms, with or without |
---|
6 | modification, are permitted provided that the following conditions |
---|
7 | are met: |
---|
8 | 1. Redistributions of source code must retain the above copyright |
---|
9 | notice, this list of conditions and the following disclaimer. |
---|
10 | 2. Redistributions in binary form must reproduce the above copyright |
---|
11 | notice, this list of conditions and the following disclaimer in the |
---|
12 | documentation and/or other materials provided with the distribution. |
---|
13 | 3. The name of the author may not be used to endorse or promote products |
---|
14 | derived from this software without specific prior written permission. |
---|
15 | |
---|
16 | THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR |
---|
17 | IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES |
---|
18 | OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. |
---|
19 | IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, |
---|
20 | INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT |
---|
21 | NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
---|
22 | DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
---|
23 | THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
---|
24 | (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF |
---|
25 | THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
---|
26 | |# |
---|
27 | (in-package :nio-yarpc) |
---|
28 | |
---|
29 | (declaim (optimize (debug 3) (speed 3) (space 0))) |
---|
30 | |
---|
31 | ;; YetAnotherRPC Client state machine |
---|
32 | ;; |
---|
33 | ;; A client that accepts jobs to be run via a threadsafe queue and then submits them to the remote end for execution |
---|
34 | ;; |
---|
35 | (defclass yarpc-client-state-machine (state-machine) |
---|
36 | ((job-queue :initform (nio-utils:concurrent-queue) |
---|
37 | :accessor job-queue |
---|
38 | :documentation "The queue used to hand off work from an external thread to the io thread") |
---|
39 | (request-map :initform (make-hash-table) |
---|
40 | :reader request-map |
---|
41 | :documentation "A map from request-id (a unique id for this request) to remote-job"))) |
---|
42 | |
---|
43 | (defclass remote-job() |
---|
44 | ((callback :initarg :callback |
---|
45 | :accessor callback |
---|
46 | :documentation "A function accepting one argument to call with the result of the remote operation") |
---|
47 | (start-time :initform (get-universal-high-res) |
---|
48 | :reader start-time |
---|
49 | :documentation "The (floating point) start time") |
---|
50 | (timeout :initarg :timeout |
---|
51 | :reader timeout |
---|
52 | :documentation "The time in seconds before a timeout should occur, abviously we dont guarantee that this will be honored, it depends on other processing but should be close."))) |
---|
53 | |
---|
54 | (defparameter +rpc-timeout+ 60 |
---|
55 | "The number of seconds before a remote call is considered timedout") |
---|
56 | |
---|
57 | (defun remote-job(callback &key (timeout +rpc-timeout+)) |
---|
58 | (make-instance 'remote-job :callback callback :timeout timeout)) |
---|
59 | |
---|
60 | |
---|
61 | (defun yarpc-client-state-machine () |
---|
62 | (make-instance 'yarpc-client-state-machine)) |
---|
63 | |
---|
64 | (defparameter yarpc-pf (yarpc-packet-factory)) |
---|
65 | |
---|
66 | (defmethod get-packet-factory((sm yarpc-client-state-machine)) |
---|
67 | yarpc-pf) |
---|
68 | |
---|
69 | |
---|
70 | (defmethod print-object ((sm yarpc-client-state-machine) stream) |
---|
71 | (format stream "#<YARPC-CLIENT-STATE-MACHINE ~A >" (call-next-method sm nil))) |
---|
72 | |
---|
73 | (defmethod print-object ((job remote-job) stream) |
---|
74 | (format stream "#<REMOTE-JOB :start-time ~A :timeout ~A>" (start-time job) (timeout job))) |
---|
75 | |
---|
76 | |
---|
77 | (defconstant STATE-INITIALISED 0) |
---|
78 | (defconstant STATE-SENT-REQUEST 1) |
---|
79 | |
---|
80 | (defparameter +request-id+ 0) |
---|
81 | |
---|
82 | |
---|
83 | (defun check-timeouts(id job) |
---|
84 | ; (format-log t "Checking timeout on ~A~%" job) |
---|
85 | (when (> (get-universal-high-res) (+ (start-time job) (timeout job))) |
---|
86 | (format-log t "Timeout detected ~A ~A~%" id job) |
---|
87 | t)) |
---|
88 | |
---|
89 | (defun finish-job (request-id sm result) |
---|
90 | "Remove the job from the request map and call the callback with the result" |
---|
91 | (let ((remote-job (gethash request-id (request-map sm)))) |
---|
92 | (when remote-job |
---|
93 | (remhash request-id (request-map sm)) |
---|
94 | (funcall (callback remote-job) result)))) |
---|
95 | |
---|
96 | (defmethod process-timeout((sm yarpc-client-state-machine)) |
---|
97 | (let ((requests (request-map sm))) |
---|
98 | #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, searching for timeouts in ~A ~%" requests) |
---|
99 | (maphash #'(lambda (id job) |
---|
100 | (when (check-timeouts id job) (finish-job id sm nil))) |
---|
101 | requests))) |
---|
102 | |
---|
103 | |
---|
104 | (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) |
---|
105 | #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") |
---|
106 | (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil))) |
---|
107 | (when ttd |
---|
108 | #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) |
---|
109 | (destructuring-bind (job call-string) ttd |
---|
110 | (setf (gethash (incf +request-id+) (request-map sm)) job) |
---|
111 | (make-instance 'call-method-packet :call-string call-string :request-id +request-id+))))) |
---|
112 | |
---|
113 | (defmethod process-incoming-packet ((sm yarpc-client-state-machine) (response method-response-packet)) |
---|
114 | #+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet called :sm ~A :packet ~A~%" sm response) |
---|
115 | (let* ((*package* (find-package :nio-yarpc)) |
---|
116 | (result (read-from-string (response response))) |
---|
117 | (request-id (request-id response))) |
---|
118 | #+nio-debug (format-log t "yarpc-client-state-machine:process-incoming-packet :result ~A :request-id ~A~%" result request-id) |
---|
119 | ; (maphash #'(lambda (k v) (format t "~a -> ~a~%" k v)) (request-map sm)) |
---|
120 | (finish-job request-id sm result))) |
---|
121 | |
---|
122 | (defparameter *simulate-calls* nil) |
---|
123 | |
---|
124 | ;Execute the call-string on the remote node and call callback with the result |
---|
125 | (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) |
---|
126 | #+nio-debug (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback) |
---|
127 | (if *simulate-calls* |
---|
128 | (funcall callback (execute-call call-string)) |
---|
129 | (nio-utils:add (job-queue sm) (list (remote-job callback) call-string)))) |
---|
130 | |
---|
131 | |
---|
132 | (defun simulate-connection() |
---|
133 | (setf *simulate-calls* t) |
---|
134 | (let* ((node (nio:node "127.0.0.1" 9999))) |
---|
135 | (setf (nio:active-conn node) (nio::create-state-machine 'yarpc-client-state-machine 1 1 6)) |
---|
136 | (push node nio::*nodes-list*))) |
---|
137 | |
---|
138 | |
---|
139 | |
---|
140 | (defun test-timeout() |
---|
141 | (let* ((done nil) |
---|
142 | (job (remote-job #'(lambda(x) (format-log t "~A finished~%" x) (setf done t)) :timeout 30))) |
---|
143 | (format-log t "Job: ~A~%" job) |
---|
144 | (loop while (not done) do |
---|
145 | (check-timeouts 99 job) |
---|
146 | (format-log t ".~%") |
---|
147 | (sleep 1)) |
---|
148 | (format-log t "done test~%"))) |
---|