Changeset 94 for branches/home/psmith
- Timestamp:
- 02/22/07 22:50:56 (18 years ago)
- Location:
- branches/home/psmith/restructure/src
- Files:
-
- 9 edited
- 1 moved
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified branches/home/psmith/restructure/src/compat/nio-compat-package.lisp ¶
r62 r94 31 31 ;; errno.lisp 32 32 get-errno +ERRNO_EAGAIN+ perror 33 34 ;;concurrent-queue 35 concurrent-queue add take 33 ;;threading 34 with-mutex make-mutex 36 35 )) -
TabularUnified branches/home/psmith/restructure/src/compat/nio-compat.asd ¶
r62 r94 7 7 :components ((:file "nio-compat-package") 8 8 (:file "errno" :depends-on ("nio-compat-package")) 9 (:file " concurrent-queue" :depends-on ("nio-compat-package"))9 (:file "threading" :depends-on ("nio-compat-package")) 10 10 ) 11 11 -
TabularUnified branches/home/psmith/restructure/src/io/nio-server.lisp ¶
r91 r94 35 35 36 36 ;TODO thread safety 37 (defparameter +connected-sockets-queue+ (nio- compat:concurrent-queue)37 (defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue) 38 38 "List of node objects that are to be connected to") 39 39 … … 156 156 ;add outgoing sockets to event queue 157 157 #+nio-debug2 (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+) 158 (loop for node = (nio- compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do158 (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do 159 159 #+nio-debug (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node) 160 160 (push node *nodes-list*)) … … 193 193 194 194 (defun add-connection(node) 195 (nio- compat:add +connected-sockets-queue+ node))195 (nio-utils:add +connected-sockets-queue+ node)) -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd ¶
r40 r94 11 11 ) 12 12 13 :depends-on (:nio :nio-sm :nio- compat))13 :depends-on (:nio :nio-sm :nio-utils)) -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp ¶
r84 r94 34 34 ;; 35 35 (defclass yarpc-client-state-machine (state-machine) 36 ((job-queue :initform (nio- compat:concurrent-queue)36 ((job-queue :initform (nio-utils:concurrent-queue) 37 37 :accessor job-queue 38 38 :documentation "The queue used to hand off work from an external thread to the io thread") … … 75 75 (defmethod process-outgoing-packet((sm yarpc-client-state-machine)) 76 76 #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%") 77 (let ((ttd (nio- compat:take (job-queue sm) :blocking-call nil)))77 (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil))) 78 78 (when ttd 79 79 #+nio-debug (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd) … … 95 95 (defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback) 96 96 #+nio-debug (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback) 97 (nio- compat:add (job-queue sm) (list (remote-job callback) call-string)))97 (nio-utils:add (job-queue sm) (list (remote-job callback) call-string))) -
TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp ¶
r93 r94 35 35 (defclass yarpc-state-machine (state-machine) 36 36 ( 37 (result-queue :initform (nio- compat:concurrent-queue)37 (result-queue :initform (nio-utils:concurrent-queue) 38 38 :accessor result-queue 39 39 :documentation "The queue used to return results from an external thread to the nio thread"))) 40 40 41 (defparameter job-queue (nio- compat:concurrent-queue)41 (defparameter job-queue (nio-utils:concurrent-queue) 42 42 "The queue used to hand off work from the NIO thread to an external thread for execution") 43 43 … … 61 61 (defun run-job(&key (blocking t)) 62 62 #+nio-debug (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%") 63 (let ((server-job (nio- compat:take nio-yarpc:job-queue :blocking-call blocking)))63 (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking))) 64 64 (when server-job 65 65 (destructuring-bind (job request-id result-queue) server-job 66 66 #+nio-debug (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job) 67 (nio- compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))67 (nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job))))))) 68 68 69 69 70 70 (defmethod process-outgoing-packet((sm yarpc-state-machine)) 71 71 #+nio-debug2 (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" ) 72 (let ((server-job (nio- compat:take (result-queue sm) :blocking-call nil)))72 (let ((server-job (nio-utils:take (result-queue sm) :blocking-call nil))) 73 73 (when server-job 74 74 (destructuring-bind (request-id result) server-job … … 79 79 (defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet)) 80 80 #+nio-debug (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call) 81 (nio- compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))81 (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm))) 82 82 (when +process-jobs-inline+ (run-job :blocking nil))) 83 83 -
TabularUnified branches/home/psmith/restructure/src/utils/concurrent-queue.lisp ¶
r93 r94 26 26 |# 27 27 28 (in-package :nio- compat)28 (in-package :nio-utils) 29 29 30 30 (declaim (optimize (debug 3) (speed 3) (space 0))) … … 33 33 ;Modified from sbcl manual example 34 34 35 35 36 (defclass concurrent-queue() 36 37 ((buffer-queue :initform (sb-thread:make-waitqueue) 37 38 :reader buffer-queue) 39 ; (buffer-queue-mutex :initform (sb-thread:make-mutex :name "buffer queue mutex") 40 ; :reader buffer-queue-mutex) 38 41 (buffer-lock :initform (sb-thread:make-mutex :name "buffer lock") 39 42 :reader buffer-lock) … … 44 47 (make-instance 'concurrent-queue)) 45 48 49 50 46 51 (defmacro pop-elt(a-buffer loc) 47 52 `(if ,a-buffer 48 53 (let ((head (car ,a-buffer))) 49 54 (setf ,a-buffer (cdr ,a-buffer)) 50 #+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread*head ,loc)55 #+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc) 51 56 head) 52 57 nil)) 53 58 59 54 60 ;Do an (optionally blocking) remove of the element at the head of this queue 55 61 (defmethod take ((queue concurrent-queue) &key (blocking-call t)) 62 #+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) 56 63 (sb-thread:with-mutex ((buffer-lock queue)) 64 #+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue)) 57 65 ;if its there, pop it 58 66 (let ((ret (pop-elt (buffer queue) "1sttry"))) … … 60 68 ret 61 69 (progn 70 #+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*) 62 71 (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue)) 72 #+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*) 63 73 (pop-elt (buffer queue) "2ndtry")))))) 64 74 65 75 ;Append the element to the tail of this queue 66 76 (defmethod add ((queue concurrent-queue) elt) 67 #+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)77 #+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt) 68 78 (sb-thread:with-mutex ((buffer-lock queue)) 69 79 (setf (buffer queue) (append (buffer queue) (list elt)) ) 70 (sb-thread:condition-notify (buffer-queue queue)))) 71 80 (sb-thread:condition-broadcast (buffer-queue queue)))) 72 81 73 82 74 83 (defun test-writer(queue) 75 (loop for i from 0 to 999 do 76 (sleep 0.1) 84 (loop for i from 0 to 100 do 85 ; (sleep (random 0.1)) 86 (format-threadsafe t "Adding ~A~%" i) 77 87 (add queue i))) 78 88 79 (defun test-reader(queue) 89 (defun test-reader(queue results) 90 (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*) 80 91 (loop 81 (format t "reader on ~A got elt ~A~%" 82 sb-thread:*current-thread* (take queue)))) 92 (let ((elt (take queue))) 93 (push elt results) 94 (format-threadsafe t "reader on ~A got elt ~A~%" 95 sb-thread:*current-thread* 96 results)))) 97 98 (defparameter *results1* (list 999999)) 99 (defparameter *results2* (list 888888)) 83 100 84 101 (defun test-queue() 85 102 (let ((queue (make-instance 'concurrent-queue))) 86 103 (sb-thread:make-thread #'(lambda()(test-writer queue))) 104 ; (sleep 10) 105 (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*))))) 106 ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*))))) 107 (sleep 5) ;;wait for it to probably complete 108 (format-threadsafe t "t1 got: ~A~%" *results1*) 109 (format-threadsafe t "t2 got: ~A~%" *results2*) 110 (sb-thread:destroy-thread t1) 111 ; (sb-thread:destroy-thread t2) 112 ) 113 (sb-thread:with-mutex ((buffer-lock queue)) 114 (assert (eql (length (buffer queue)) 0))))) 115 116 (defun test-queue2() 117 (let ((queue (make-instance 'concurrent-queue))) 118 (sb-thread:make-thread #'(lambda()(test-reader queue))) 119 (sb-thread:make-thread #'(lambda()(test-writer queue))) 120 (sb-thread:make-thread #'(lambda()(test-reader queue))) 121 (sb-thread:make-thread #'(lambda()(test-writer queue))) 87 122 (sleep 10) 88 (sb-thread:make-thread #'(lambda()(test-reader queue))) 89 (sb-thread:make-thread #'(lambda()(test-reader queue))))) 123 (format-threadsafe t "running asserts") 124 (sb-thread:with-mutex ((buffer-lock queue)) 125 (assert (eql (length (buffer queue)) 0))))) -
TabularUnified branches/home/psmith/restructure/src/utils/nio-utils-package.lisp ¶
r85 r94 31 31 ;;utils 32 32 format-log get-universal-high-res get-readable-time 33 34 ;;concurrent-queue 35 concurrent-queue add take 36 37 33 38 )) -
TabularUnified branches/home/psmith/restructure/src/utils/nio-utils.asd ¶
r49 r94 7 7 :components ((:file "nio-utils-package") 8 8 (:file "utils" :depends-on ("nio-utils-package")) 9 (:file "concurrent-queue" :depends-on ("utils")) 9 10 ) 10 11 11 :depends-on ( ))12 :depends-on (:nio-compat)) 12 13 -
TabularUnified branches/home/psmith/restructure/src/utils/utils.lisp ¶
r49 r94 60 60 ) 61 61 62 (defparameter *format-mutex* (nio-compat:make-mutex "format lock")) 63 62 64 ;Format the message to destination but prepend a high res time to the message, useful for logging 63 65 (defmacro format-log (destination control-string &rest format-arguments) 64 `(format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)) 66 `(nio-compat:with-mutex (*format-mutex*) 67 (format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)))
Note: See TracChangeset
for help on using the changeset viewer.