-
Notifications
You must be signed in to change notification settings - Fork 1
/
stream.lisp
59 lines (54 loc) · 2.42 KB
/
stream.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
;;;; stream.lisp
(in-package #:cl-multiagent-system)
;; Default number of elements allocated for the receive buffer created by
;; MAKE-STREAM-RECEIVER when the caller does not pass BUFFER-SIZE.
(defparameter *default-buffer-size* 65536)
;; Default element type of that receive buffer, used when the caller does
;; not pass BUFFER-ELEMENT-TYPE.
(defparameter *default-buffer-element-type* '(unsigned-byte 8))
(defun make-stream-sender (serializer stream stream-lock)
  "Return a closure of two arguments, ID and MSG, that serializes a message
and writes it to STREAM.

SERIALIZER is called as (funcall SERIALIZER id msg scratch-buffer) and must
return two values: the buffer to write (which is also kept and handed back
to SERIALIZER on the next call) and a flag telling whether serialization
succeeded.  STREAM-LOCK is held only around the write itself.

The closure returns T when the message was serialized and written, and NIL
when STREAM is closed or SERIALIZER reported failure.  The stream is not
flushed here."
  (let ((scratch nil))
    (lambda (id msg)
      (if (not (open-stream-p stream))
          nil
          (multiple-value-bind (bytes ok)
              (funcall serializer id msg scratch)
            ;; Remember whatever buffer the serializer handed back, even on
            ;; failure, so it can be passed in again on the next call.
            (setf scratch bytes)
            (cond
              (ok
               (bt:with-lock-held (stream-lock)
                 (write-sequence scratch stream))
               t)
              (t nil)))))))
(defun make-stream-receiver
    (deserializer stream stream-lock
     &optional (buffer-size *default-buffer-size*)
       (buffer-element-type *default-buffer-element-type*))
  "Return two closures, as multiple values, that share one receive buffer.

The first closure (no arguments) reads from STREAM into the buffer while
holding STREAM-LOCK, but only if LISTEN reports that input is available.
It returns true when the buffer holds bytes that have not yet been offered
to DESERIALIZER, and NIL when nothing was available.

The second closure (no arguments) offers the unread region of the buffer to
DESERIALIZER, called as (funcall DESERIALIZER buffer start end).
DESERIALIZER must return three values: ID, MSG and the number of elements
it consumed.  A NIL or non-positive count means no complete message was
available.  On success the closure returns (values T ID MSG); otherwise it
returns NIL.

BUFFER-SIZE is the initial buffer length and BUFFER-ELEMENT-TYPE its element
type.  The buffer is compacted when it fills up with consumed data at the
front, and doubled when it is completely full of still-undecoded data."
  (let ((buffer (make-array buffer-size :element-type buffer-element-type))
        (buffer-lock (bt:make-lock))
        (buffer-start 0)
        (buffer-end 0)
        buffer-changed)
    (values
     ;; receive bytes from stream
     (lambda ()
       (bt:with-lock-held (stream-lock)
         (when (listen stream)
           (bt:with-lock-held (buffer-lock)
             (let ((old-end buffer-end))
               ;; NOTE(review): READ-SEQUENCE is specified to keep reading
               ;; until the target region is full or end of file is reached,
               ;; so this may block on a live stream even though LISTEN
               ;; returned true -- confirm this suits the streams in use.
               (setf buffer-end (read-sequence buffer stream
                                               :start buffer-end)
                     buffer-changed (or buffer-changed
                                        (/= buffer-end old-end))))))))
     ;; deserialize received bytes
     (lambda ()
       (bt:with-lock-held (buffer-lock)
         (when buffer-changed
           (setf buffer-changed nil)
           (multiple-value-bind (id msg deserialized-length)
               (funcall deserializer buffer buffer-start buffer-end)
             (let ((deserialized-p (and deserialized-length
                                        (plusp deserialized-length))))
               (when deserialized-p
                 (incf buffer-start deserialized-length))
               ;; Buffer maintenance runs whether or not a message was
               ;; decoded: a full buffer holding only a partial message must
               ;; still be compacted or grown, otherwise no further bytes
               ;; could ever be read into it.
               (cond
                 ;; Everything consumed: rewind to the front.
                 ((= buffer-start buffer-end)
                  (setf buffer-start 0
                        buffer-end 0))
                 ;; Full, but with consumed space at the front: shift the
                 ;; unread tail down.  REPLACE is well defined when source
                 ;; and target are the same sequence and overlap.
                 ((and (= buffer-end buffer-size) (plusp buffer-start))
                  (replace buffer buffer
                           :start1 0
                           :start2 buffer-start :end2 buffer-end)
                  (decf buffer-end buffer-start)
                  (setf buffer-start 0))
                 ;; Full of unread data starting at index 0: the pending
                 ;; message does not fit, so double the buffer.  The result
                 ;; of ADJUST-ARRAY must be kept because the array was not
                 ;; created adjustable and a new array may be returned.
                 ((= buffer-end buffer-size)
                  (setf buffer-size (* 2 buffer-size)
                        buffer (adjust-array buffer buffer-size))))
               (when deserialized-p
                 ;; Re-arm the flag when unread bytes remain, so several
                 ;; messages delivered by one read can be drained by
                 ;; repeated calls without waiting for new input.
                 (setf buffer-changed (< buffer-start buffer-end))
                 (values t id msg))))))))))