183 lines
7.0 KiB
Clojure
Executable File
183 lines
7.0 KiB
Clojure
Executable File
;; Copyright (c) Rich Hickey and contributors. All rights reserved.
|
|
;; The use and distribution terms for this software are covered by the
|
|
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
|
|
;; which can be found in the file epl-v10.html at the root of this distribution.
|
|
;; By using this software in any fashion, you are agreeing to be bound by
|
|
;; the terms of this license.
|
|
;; You must not remove this notice, or any other, from this software.
|
|
|
|
(ns cljs.core.async.impl.channels
|
|
(:require [cljs.core.async.impl.protocols :as impl]
|
|
[cljs.core.async.impl.dispatch :as dispatch]
|
|
[cljs.core.async.impl.buffers :as buffers]))
|
|
|
|
|
|
|
|
(defn box
  "Returns an object implementing IDeref whose dereferenced value is `val`.
  Channel operations in this namespace return such a box for an immediate
  result and return nil when the operation was enqueued instead."
  [val]
  (reify
    cljs.core/IDeref
    (-deref [_this]
      val)))
|
|
|
|
;; Record of one pending put: `handler` is the put's handler and `val` is the
;; value waiting to be delivered. Instances are stored in the channel's `puts`
;; queue and read back through the `.-handler` / `.-val` fields.
(deftype PutBox [handler val])
|
|
|
|
(defn put-active?
  "Returns the result of `impl/active?` for the handler stored in the given
  PutBox. Used as the keep-predicate when cleaning the pending-puts queue."
  [box]
  (-> box .-handler impl/active?))
|
|
|
|
;; Threshold for the channel's dirty-puts / dirty-takes counters. Each time a
;; put or take has to be enqueued the matching counter is incremented; once it
;; exceeds this value the counter is reset to 0 and the corresponding queue is
;; cleaned of handlers that are no longer active.
(def ^:const MAX_DIRTY 64)
|
|
|
|
(defprotocol MMC
  "Internal protocol implemented by ManyToManyChannel."
  (abort [this]
    "Releases the channel's pending putters and then closes the channel."))
|
|
|
|
;; Channel implementation supporting many concurrent putters and takers.
;;
;; Fields:
;;   takes       - queue of pending take handlers
;;   dirty-takes - number of takes enqueued since `takes` was last cleaned
;;   puts        - queue of pending PutBox entries (handler + value)
;;   dirty-puts  - number of puts enqueued since `puts` was last cleaned
;;   buf         - optional buffer; nil means the channel is unbuffered
;;   closed      - true once close! has run
;;   add!        - fn called as (add! buf val) to add a value and as
;;                 (add! buf) when the channel is finishing; a `reduced`
;;                 result from the two-arg call triggers `abort`
;;
;; put! / take! return a box (see `box`) when the operation completed
;; immediately and nil when it was enqueued (or, for take!, when the handler
;; was not active). Callbacks of the *other* party are always scheduled via
;; dispatch/run rather than invoked inline.
(deftype ManyToManyChannel [takes ^:mutable dirty-takes puts ^:mutable dirty-puts ^not-native buf ^:mutable closed add!]
  MMC
  (abort [this]
    ;; Release EVERY pending putter, not just the first active one: each
    ;; active putter's callback is scheduled with `true`, inactive ones are
    ;; simply dropped. The loop must keep going after an active putter,
    ;; otherwise the remaining putters would be discarded by the cleanup
    ;; below without ever being called back.
    (loop []
      (let [putter (.pop puts)]
        (when-not (nil? putter)
          (let [^not-native put-handler (.-handler putter)]
            (when ^boolean (impl/active? put-handler)
              (let [put-cb (impl/commit put-handler)]
                (dispatch/run #(put-cb true))))
            (recur)))))
    ;; Predicate always returns false, so nothing is kept in the queue.
    (.cleanup puts (constantly false))
    (impl/close! this))

  impl/WritePort
  (put! [this val ^not-native handler]
    (assert (not (nil? val)) "Can't put nil in on a channel")
    ;; bug in CLJS compiler boolean inference - David
    (let [^boolean closed closed]
      (if (or closed (not ^boolean (impl/active? handler)))
        ;; Rejected: box holds false when the channel is closed, true when
        ;; only the handler was inactive.
        (box (not closed))
        (if (and buf (not (impl/full? buf)))
          ;; Buffered path: the buffer has room, so the put completes now.
          (do
            (impl/commit handler)
            (let [done? (reduced? (add! buf val))]
              ;; A single add! may leave more than one value in the buffer
              ;; (add! can be wrapped by an expanding transducer in `chan`),
              ;; so keep handing values to pending takers until either the
              ;; takers or the buffered values run out. Inactive takers are
              ;; skipped without consuming a value.
              (loop []
                (when (and (pos? (.-length takes)) (pos? (count buf)))
                  (let [^not-native taker (.pop takes)]
                    (when ^boolean (impl/active? taker)
                      (let [take-cb (impl/commit taker)
                            taken (impl/remove! buf)]
                        (dispatch/run (fn [] (take-cb taken)))))
                    (recur))))
              ;; add! signalled early termination: release putters and close.
              (when done? (abort this))
              (box true)))
          ;; Unbuffered (or full buffer) path: look for the first active taker.
          (let [taker (loop []
                        (let [^not-native taker (.pop takes)]
                          (when taker
                            (if (impl/active? taker)
                              taker
                              (recur)))))]
            (if taker
              ;; Direct hand-off from this putter to the waiting taker.
              (let [take-cb (impl/commit taker)]
                (impl/commit handler)
                (dispatch/run (fn [] (take-cb val)))
                (box true))
              ;; Nobody to hand off to: enqueue the put and return nil.
              (do
                (if (> dirty-puts MAX_DIRTY)
                  (do (set! dirty-puts 0)
                      (.cleanup puts put-active?))
                  (set! dirty-puts (inc dirty-puts)))
                (assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
                        (str "No more than " impl/MAX-QUEUE-SIZE
                             " pending puts are allowed on a single channel."
                             " Consider using a windowed buffer."))
                (.unbounded-unshift puts (PutBox. handler val))
                nil)))))))

  impl/ReadPort
  (take! [this ^not-native handler]
    (if (not ^boolean (impl/active? handler))
      nil
      (if (and (not (nil? buf)) (pos? (count buf)))
        ;; Buffered path: a value is available right now.
        (let [_ (impl/commit handler)
              retval (box (impl/remove! buf))]
          ;; Removing a value freed buffer space; move pending puts into the
          ;; buffer until it is full or no putters remain.
          (loop []
            (when-not (impl/full? buf)
              (let [putter (.pop puts)]
                (when-not (nil? putter)
                  (let [^not-native put-handler (.-handler putter)
                        val (.-val putter)]
                    (when ^boolean (impl/active? put-handler)
                      (let [put-cb (impl/commit put-handler)]
                        ;; handler was already committed above; this repeat
                        ;; commit is retained from the original code.
                        (impl/commit handler)
                        (dispatch/run #(put-cb true))
                        (when (reduced? (add! buf val))
                          (abort this)))))
                  (recur)))))
          retval)
        ;; No buffered value: look for the first active pending putter.
        (let [putter (loop []
                       (let [putter (.pop puts)]
                         (when putter
                           (if ^boolean (impl/active? (.-handler putter))
                             putter
                             (recur)))))]
          (if putter
            ;; Direct hand-off from the waiting putter to this taker.
            (let [put-cb (impl/commit (.-handler putter))]
              (impl/commit handler)
              (dispatch/run #(put-cb true))
              (box (.-val putter)))
            (if closed
              ;; Closed with no putters: run the one-arg add! (which may
              ;; flush values into the buffer), then deliver a buffered value
              ;; if there is one, else a box holding nil.
              (do
                (when buf (add! buf))
                (if (and (impl/active? handler) (impl/commit handler))
                  (let [has-val (and buf (pos? (count buf)))]
                    (let [val (when has-val (impl/remove! buf))]
                      (box val)))
                  nil))
              ;; Open and nothing available: enqueue the take and return nil.
              (do
                (if (> dirty-takes MAX_DIRTY)
                  (do (set! dirty-takes 0)
                      (.cleanup takes impl/active?))
                  (set! dirty-takes (inc dirty-takes)))
                (assert (< (.-length takes) impl/MAX-QUEUE-SIZE)
                        (str "No more than " impl/MAX-QUEUE-SIZE
                             " pending takes are allowed on a single channel."))
                (.unbounded-unshift takes handler)
                nil)))))))

  impl/Channel
  (closed? [_] closed)
  (close! [this]
    ;; Idempotent: closing an already-closed channel does nothing.
    (if ^boolean closed
      nil
      (do (set! closed true)
          ;; Only run the one-arg add! here when no puts are still pending.
          (when (and buf (zero? (.-length puts)))
            (add! buf))
          ;; Wake every pending taker: each active one receives a buffered
          ;; value while any remain, otherwise nil.
          (loop []
            (let [^not-native taker (.pop takes)]
              (when-not (nil? taker)
                (when ^boolean (impl/active? taker)
                  (let [take-cb (impl/commit taker)
                        val (when (and buf (pos? (count buf))) (impl/remove! buf))]
                    (dispatch/run (fn [] (take-cb val)))))
                (recur))))
          nil))))
|
|
|
|
(defn- ex-handler
  "Default exception handler: logs `ex` to the JavaScript console and returns
  nil, so nothing is added to the buffer in its place."
  [ex]
  (js/console.log ex)
  nil)
|
|
|
|
(defn- handle
  "Routes the thrown value `t` to `exh`, or to `ex-handler` when `exh` is
  falsey. A non-nil result from the handler is added to `buf` via `impl/add!`
  and that call's result is returned; a nil result returns `buf` unchanged."
  [buf exh t]
  (let [on-error    (if exh exh ex-handler)
        replacement (on-error t)]
    (if (some? replacement)
      (impl/add! buf replacement)
      buf)))
|
|
|
|
(defn chan
  "Creates a ManyToManyChannel backed by `buf` (nil for no buffer).

  `xform`, when supplied, is applied to `impl/add!` to obtain the function
  used to add values to the buffer; otherwise `impl/add!` is used directly.
  Anything thrown by that function (in either arity) is caught and passed to
  `handle` together with `exh`, the optional exception handler."
  ([buf]
   (chan buf nil))
  ([buf xform]
   (chan buf xform nil))
  ([buf xform exh]
   (let [step         (if xform (xform impl/add!) impl/add!)
         guarded-add! (fn
                        ([b]
                         (try
                           (step b)
                           (catch :default e
                             (handle b exh e))))
                        ([b v]
                         (try
                           (step b v)
                           (catch :default e
                             (handle b exh e)))))]
     ;; Pending takes and puts each get their own 32-slot ring buffer; both
     ;; dirty counters start at 0 and the channel starts open.
     (ManyToManyChannel. (buffers/ring-buffer 32)
                         0
                         (buffers/ring-buffer 32)
                         0
                         buf
                         false
                         guarded-add!))))
|