Changeset 94 for branches/home/psmith


Ignore:
Timestamp:
02/22/07 22:50:56 (18 years ago)
Author:
psmith
Message:

moved threadsafe queue and added more tests.

Location:
branches/home/psmith/restructure/src
Files:
9 edited
1 moved

Legend:

Unmodified
Added
Removed
  • TabularUnified branches/home/psmith/restructure/src/compat/nio-compat-package.lisp

    r62 r94  
    3131             ;; errno.lisp
    3232             get-errno +ERRNO_EAGAIN+ perror
    33              
    34              ;;concurrent-queue
    35              concurrent-queue add take
     33             ;;threading
     34             with-mutex make-mutex
    3635             ))
  • TabularUnified branches/home/psmith/restructure/src/compat/nio-compat.asd

    r62 r94  
    77    :components ((:file "nio-compat-package")
    88                 (:file "errno" :depends-on ("nio-compat-package"))
    9                  (:file "concurrent-queue" :depends-on ("nio-compat-package"))
     9                 (:file "threading" :depends-on ("nio-compat-package"))
    1010                 )
    1111
  • TabularUnified branches/home/psmith/restructure/src/io/nio-server.lisp

    r91 r94  
    3535
    3636;TODO thread safety
    37 (defparameter +connected-sockets-queue+ (nio-compat:concurrent-queue)
     37(defparameter +connected-sockets-queue+ (nio-utils:concurrent-queue)
    3838  "List of node objects that are to be connected to")
    3939
     
    156156;add outgoing sockets to event queue
    157157#+nio-debug2     (format-log t "nio-server:start-server - Processing new connections queue ~A~%" +connected-sockets-queue+)
    158              (loop for node = (nio-compat:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
     158             (loop for node = (nio-utils:take +connected-sockets-queue+ :blocking-call nil) until (null node) do
    159159#+nio-debug       (format-log t "nio-server:start-server - adding node to nodes-list ~A~%" node)
    160160                  (push node *nodes-list*))
     
    193193
    194194(defun add-connection(node)
    195   (nio-compat:add +connected-sockets-queue+ node))
     195  (nio-utils:add +connected-sockets-queue+ node))
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/nio-yarpc.asd

    r40 r94  
    1111                 )
    1212
    13     :depends-on (:nio :nio-sm :nio-compat))
     13    :depends-on (:nio :nio-sm :nio-utils))
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-client-state-machine.lisp

    r84 r94  
    3434;;
    3535(defclass yarpc-client-state-machine (state-machine)
    36   ((job-queue :initform (nio-compat:concurrent-queue)
     36  ((job-queue :initform (nio-utils:concurrent-queue)
    3737              :accessor job-queue
    3838              :documentation "The queue used to hand off work from an external thread to the io thread")
     
    7575(defmethod process-outgoing-packet((sm yarpc-client-state-machine))
    7676#+nio-debug  (format-log t "yarpc-client-state-machine:process-outgoing-packet called, polling the job-queue ~%")
    77   (let ((ttd (nio-compat:take (job-queue sm) :blocking-call nil)))
     77  (let ((ttd (nio-utils:take (job-queue sm) :blocking-call nil)))
    7878    (when ttd
    7979#+nio-debug      (format-log t "yarpc-client-state-machine:process-outgoing-packet got job ~A ~%" ttd)
     
    9595(defmethod remote-execute ((sm yarpc-client-state-machine) call-string callback)
    9696#+nio-debug  (format-log t "yarpc-client-state-machine:remote-execute called :sm ~A :call-string ~A :callback ~A~%" sm call-string callback)
    97   (nio-compat:add (job-queue sm) (list (remote-job callback) call-string)))
     97  (nio-utils:add (job-queue sm) (list (remote-job callback) call-string)))
  • TabularUnified branches/home/psmith/restructure/src/protocol/yarpc/yarpc-state-machine.lisp

    r93 r94  
    3535(defclass yarpc-state-machine (state-machine)
    3636  (
    37    (result-queue :initform (nio-compat:concurrent-queue)
     37   (result-queue :initform (nio-utils:concurrent-queue)
    3838                 :accessor result-queue
    3939                 :documentation "The queue used to return results from an external thread to the nio thread")))
    4040
    41 (defparameter job-queue (nio-compat:concurrent-queue)
     41(defparameter job-queue (nio-utils:concurrent-queue)
    4242  "The queue used to hand off work from the NIO thread to an external thread for execution")
    4343
     
    6161(defun run-job(&key (blocking t))
    6262#+nio-debug  (format-log t "yarpc-state-machine:run-job - Server toplevel waiting for job~%")
    63   (let ((server-job (nio-compat:take nio-yarpc:job-queue :blocking-call blocking)))
     63  (let ((server-job (nio-utils:take nio-yarpc:job-queue :blocking-call blocking)))
    6464    (when server-job
    6565      (destructuring-bind (job request-id result-queue) server-job
    6666#+nio-debug     (format-log t "yarpc-state-machine:run-job - Server received job ~A~%" job)
    67         (nio-compat:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
     67        (nio-utils:add result-queue (list request-id (nio-yarpc:execute-call job)))))))
    6868
    6969
    7070(defmethod process-outgoing-packet((sm yarpc-state-machine))
    7171#+nio-debug2  (format-log t "yarpc-state-machine:process-outgoing-packet - called, polling the results-queue ~%" )
    72   (let ((server-job (nio-compat:take (result-queue sm) :blocking-call nil)))
     72  (let ((server-job (nio-utils:take (result-queue sm) :blocking-call nil)))
    7373    (when server-job
    7474      (destructuring-bind (request-id result) server-job
     
    7979(defmethod process-incoming-packet ((sm yarpc-state-machine) (call call-method-packet))
    8080#+nio-debug  (format-log t "yarpc-state-machine:process-incoming-packet - called :sm ~A :packet ~A~%" sm call)
    81   (nio-compat:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
     81  (nio-utils:add job-queue (list (call-string call) (request-id call) (result-queue sm)))
    8282  (when +process-jobs-inline+ (run-job :blocking nil)))
    8383
  • TabularUnified branches/home/psmith/restructure/src/utils/concurrent-queue.lisp

    r93 r94  
    2626|#
    2727
    28 (in-package :nio-compat)
     28(in-package :nio-utils)
    2929
    3030(declaim (optimize (debug 3) (speed 3) (space 0)))
     
    3333;Modified from sbcl manual example
    3434
     35
    3536(defclass concurrent-queue()
    3637  ((buffer-queue :initform (sb-thread:make-waitqueue)
    3738                 :reader buffer-queue)
     39;   (buffer-queue-mutex :initform (sb-thread:make-mutex :name "buffer queue mutex")
     40;                      :reader buffer-queue-mutex)
    3841   (buffer-lock :initform (sb-thread:make-mutex :name "buffer lock")
    3942                :reader buffer-lock)
     
    4447  (make-instance 'concurrent-queue))
    4548
     49
     50
    4651(defmacro pop-elt(a-buffer loc)
    4752  `(if ,a-buffer
    4853       (let ((head (car ,a-buffer)))
    4954         (setf ,a-buffer (cdr ,a-buffer))
    50 #+nio-debug (format t "concurent-queue:take - (~A) read ~A at ~A~%" sb-thread:*current-thread* head ,loc)
     55#+nio-debug (format-threadsafe t "concurent-queue:take - (~A,~A) read ~A at ~A~%" sb-thread:*current-thread* (length (buffer queue)) head ,loc)
    5156         head)
    5257       nil))
    5358
     59
    5460;Do an (optionally blocking) remove of the element at the head of this queue
    5561(defmethod take ((queue concurrent-queue) &key (blocking-call t))
     62#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) attempting to obtain mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
    5663  (sb-thread:with-mutex ((buffer-lock queue))
     64#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) aquired mutex mutex ~A~%" sb-thread:*current-thread* (buffer-lock queue))
    5765    ;if its there, pop it
    5866    (let ((ret (pop-elt (buffer queue) "1sttry")))
     
    6068          ret
    6169          (progn
     70#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) about to wait on queue~%" sb-thread:*current-thread*)
    6271            (sb-thread:condition-wait (buffer-queue queue) (buffer-lock queue))
     72#+nio-debug (format-threadsafe t "concurent-queue:take - (~A) notified on queue~%" sb-thread:*current-thread*)
    6373            (pop-elt (buffer queue) "2ndtry"))))))
    6474
    6575;Append the element to the tail of this queue
    6676(defmethod add ((queue concurrent-queue) elt)
    67 #+nio-debug (format t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
     77#+nio-debug (format-threadsafe t "concurent-queue:add - (~A) adding ~A~%" sb-thread:*current-thread* elt)
    6878  (sb-thread:with-mutex ((buffer-lock queue))
    6979    (setf (buffer queue) (append (buffer queue) (list elt)) )
    70     (sb-thread:condition-notify (buffer-queue queue))))
    71 
     80    (sb-thread:condition-broadcast (buffer-queue queue))))
    7281
    7382
    7483(defun test-writer(queue)
    75   (loop for i from 0 to 999 do
    76        (sleep 0.1)
     84  (loop for i from 0 to 100 do
     85;       (sleep (random 0.1))
     86       (format-threadsafe t "Adding ~A~%" i)
    7787       (add queue i)))
    7888       
    79 (defun test-reader(queue)
     89(defun test-reader(queue results)
     90  (format-threadsafe t "Started reader ~A~%" sb-thread:*current-thread*)
    8091  (loop
    81        (format t "reader on ~A got elt ~A~%"
    82                sb-thread:*current-thread* (take queue))))
     92     (let ((elt (take queue)))
     93       (push elt results)
     94       (format-threadsafe t "reader on ~A got elt ~A~%"
     95               sb-thread:*current-thread*
     96               results))))
     97
     98(defparameter *results1* (list 999999))
     99(defparameter *results2* (list 888888))
    83100
    84101(defun test-queue()
    85102  (let ((queue (make-instance 'concurrent-queue)))
    86103    (sb-thread:make-thread #'(lambda()(test-writer queue)))
     104;    (sleep 10)
     105    (let ((t1 (sb-thread:make-thread #'(lambda()(test-reader queue *results1*)))))
     106          ;(t2 (sb-thread:make-thread #'(lambda()(test-reader queue *results2*)))))
     107      (sleep 5) ;;wait for it to probably complete
     108      (format-threadsafe t "t1 got: ~A~%" *results1*)
     109      (format-threadsafe t "t2 got: ~A~%" *results2*)
     110      (sb-thread:destroy-thread t1)
     111;      (sb-thread:destroy-thread t2)
     112)
     113    (sb-thread:with-mutex ((buffer-lock queue))
     114      (assert (eql (length (buffer queue)) 0)))))
     115
     116(defun test-queue2()
     117  (let ((queue (make-instance 'concurrent-queue)))
     118    (sb-thread:make-thread #'(lambda()(test-reader queue)))
     119    (sb-thread:make-thread #'(lambda()(test-writer queue)))
     120    (sb-thread:make-thread #'(lambda()(test-reader queue)))
     121    (sb-thread:make-thread #'(lambda()(test-writer queue)))
    87122    (sleep 10)
    88     (sb-thread:make-thread #'(lambda()(test-reader queue)))
    89     (sb-thread:make-thread #'(lambda()(test-reader queue)))))
     123    (format-threadsafe t "running asserts")
     124    (sb-thread:with-mutex ((buffer-lock queue))
     125      (assert (eql (length (buffer queue)) 0)))))
  • TabularUnified branches/home/psmith/restructure/src/utils/nio-utils-package.lisp

    r85 r94  
    3131             ;;utils
    3232             format-log get-universal-high-res get-readable-time
     33             
     34             ;;concurrent-queue
     35             concurrent-queue add take
     36
     37             
    3338             ))
  • TabularUnified branches/home/psmith/restructure/src/utils/nio-utils.asd

    r49 r94  
    77    :components ((:file "nio-utils-package")
    88                 (:file "utils" :depends-on ("nio-utils-package"))
     9                 (:file "concurrent-queue" :depends-on ("utils"))
    910                 )
    1011
    11     :depends-on ())
     12    :depends-on (:nio-compat))
    1213
  • TabularUnified branches/home/psmith/restructure/src/utils/utils.lisp

    r49 r94  
    6060  )
    6161
     62(defparameter *format-mutex* (nio-compat:make-mutex "format lock"))
     63
    6264;Format the message to destination but prepend a high res time to the message, useful for logging
    6365(defmacro format-log (destination control-string &rest format-arguments)
    64   `(format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments))
     66  `(nio-compat:with-mutex (*format-mutex*)
     67     (format ,destination (concatenate 'string "~A - " ,control-string) (get-readable-high-res-time) ,@format-arguments)))
Note: See TracChangeset for help on using the changeset viewer.