User Tools

Site Tools


lecture:core.async:설명

설명

비동기적인 로직은 동기적으로 보이는 코드로 작성하도록 한다.

채널

  • 채널은 큐와 비슷하다.
  • 채널을 데이타가 통해 전달된다. (어디서 어디로? 생산자에서 소비자로)
  • 기본적으로 채널은 버퍼가 없다. (즉 버퍼 크기가 0)
  • 데이타의 생산자와 소비자는 채널을 통해 데이타를 전송함으로써 서로 만난다.
    • 이런 식으로 서로 분리된다. 독립성 획득.
  • 채널을 통한 … 새로운 프로그래밍 방식.
  • go 블럭은 비동기적 연산을 동기적으로 기술할 수 있도록 함으로써 콜백 지옥을 탈출할 수 있게 해준다.
  • go 블럭은 항상 채널을 리턴한다.

비동기 라이브러리

project.clj

[org.clojure/core.async “0.1.242.0-44b1e3-alpha”]

API 사용

(require '[clojure.core.async :as async :refer :all])

채널의 생성

버퍼없는 채널 생성

(chan)   ; Unbuffered channel.

송신자가 channel에 값을 쓰면, 수신자가 그 값을 읽을 때까지 대기. 수신자는 송신자가 값을 쓸 때까지 대기.

고정 크기 버퍼의 채널 생성

(chan 10) ; Channel with 10 slots. FIFO.

버퍼가 비어있으면 수신자 대기, 버퍼가 꽉차면 송신자 대기.

dropping 버퍼 채널 생성

버퍼가 꽉차면 최신값을 버린다.

(chan (dropping-buffer 10))

sliding 버퍼 채널 생성

버퍼가 꽉차면 가장 오래된 값을 버린다.

(chan (sliding-buffer 10))

채널 닫기

(let [c (chan)]
  (close! c))

채널이 닫히면…

  • 채널은 더 이상 입력을 받아들이지 않는다.
  • 채널에 남아있는 데이타를 take 연산으로 출력된다.
  • 채널에 데이타가 없으면 take 연산은 nil을 반환한다.
  • 따라서 nil은 채널을 통해 전달될 수 없다!

일반 스레드용 채널 연산

  • >!! : blocking put. 기다리다 (채널에) 넣기. 대기 입력.
  • <!! : blocking take. 기다리다 (채널에서) 빼오기. 대기 출력.
(let [c (chan 10)]
  (>!! c "hello")  ;; => 대기(blocking)
  (assert (= "hello" (<!! c)))
  (close! c))

버퍼없는 채널에 put 을 하면 주 스레드가 대기 상태가 된다.

thread나 future를 사용하여 스레드 풀 스레드에서 코드를 수행하여 채널에 데이타를 넣고, 그 채널을 리턴하면, 주 스레드에서 그 채널에서 데이타를 빼오는 방식이다.

(let [c (chan)]
  (thread (>!! c "hello"))
  (assert (= "hello" (<!! c)))
  (close! c))

go 블럭과 IOC 스레드

  • go 매크로는 바디를 특수 스레드 풀에서 실행한다.
    • go 블럭은 경량(lightweight) 스레드.
    • 경량 스레드는 일반 스레드와 1:1 매칭이 아니다.
    • clojure의 경우 시스템 core + 2의 스레드 풀 사용.
    • clojurescript의 경우 이벤트 루프 사용.
  • 채널 대기 연산들은 다른 스레드는 방해하지 않고, 자신의 실행은 중단하게 된다.
  • 이러한 메카니즘은 이벤트/콜백 시스템에는 외재적인 제어의 역전을 캡슐화한다.
  • go 블럭 안에서는 >! (put) 과 <! (take)를 사용한다.
(let [c (chan)]
  (go (>! c "hello"))
  (assert (= "hello" (<!! (go (<! c)))))
  (close! c))

일반 스레드를 사용하여 대기 상태에 들어가기 보다는 생산자를 위해서는 go 블럭을 사용한다. 소비자도 go 블럭을 사용하여 데이타를 빼오는데, 이때는 대기 상태가 될 수 있다.

alts

  • 채널이 큐보다 좋은 점은 동시에 여러 개의 채널을 기다릴 수 있다는 점이다. (마치 소켓의 select 처럼)
  • 이를 위한 연산은 alts!! (일반 스레드용), alts! (go 블럭용)이다.
(let [c1 (chan)
       c2 (chan)]
  (thread (while true
               (let [[v ch] (alts!! [c1 c2])]
                 (println "Read" v "from" ch))))
  (>!! c1 "hi")
  (>!! c2 "there"))

go 블럭에서는 alts!을 사용한다.

(let [c1 (chan)
      c2 (chan)]
  (go (while true
        (let [[v ch] (alts! [c1 c2])]
          (println "Read" v "from" ch))))
  (go (>! c1 "hi"))
  (go (>! c2 "there")))

go 블럭은 스레드에 묶여있지 않은 경량 프로세스이기 때문에, 얼마든지 생성해도 된다. 다음은 1000개의 go 블럭을 만들어 내는 코드이다.

(let [n 1000
      cs (repeatedly n chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (go (>! c "hi")))
  (dotimes [i n]
    (let [[v c] (alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))

timeout 은 지정한 시간만큼 기다렸다가 종료하는 채널을 만든다.

(let [t (timeout 100)
      begin (System/currentTimeMillis)]
  (<!! t)
  (println "Waited" (- (System/currentTimeMillis) begin)))

timeout을 alts! 와 결합하면 채널을 기다라는 시간 제한을 할 수 있다.

(let [c (chan)
      begin (System/currentTimeMillis)]
  (alts!! [c (timeout 100)])
  (println "Gave up after" (- (System/currentTimeMillis) begin)))
lecture/core.async/설명.txt · Last modified: 2019/02/04 14:26 (external edit)