User Tools

Site Tools


study:data_analysis:cascalog

공부할 내용

  1. Cascalog : Cascading + Datalog on Hadoop in Clojure.
    • Cascading : 하둡의 맵리듀스 API를 자바 기반으로 사용 편의성를 위한 API를 제공하는 레이어(layer). 맵리듀스 잡(job)과 작업흐름(workflow) 관리를 지원한다.
    • Datalog : 순수 선언형 로직 언어. Prolog 문법과 같아. SQL보다 더 표현적이어서 데이타 질의 언어로 주로 사용.
    • Cascalog는 Datalog의 데이타 질의 언어를 Clojure의 매크로로 구현해서, Hadoop 처리를 위한 Cascading API를 구동하는 것으로 이해할 수 있다.
    • 트위터의 실시간 분산 처리 시스템인 Storm을 만든 Nathan Marz가 만들었다.
    • 이번 장에서는 주로 Cascalog에 대해 알아본다.
  2. Hadoop : 구글의 맵리듀스를 구현한 빅데이타 처리를 하는 분산 시스템.
    • Cascalog를 위한 설치 및 간단 사용 정도를 알아 본다.
  3. Pallet : 클라우드 상의 서버에 배포하고 관리하는 툴. DevOps for the JVM
    • Cascalog를 위한 간단 사용법 정도를 알아 본다.

Cascalog

프로젝트 구성

  • project.clj
(defproject data-analysis "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :repogitories [["conjars.org" "http://conjars.org/repo"]]             ;;; cascalog는 conjar.org 레포지토리에 있다.
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [cascalog "1.10.2"]
                 [org.slf4j/slf4j-api "1.7.2"]                          ;;; 로그 출력을 위해
                 [org.clojure/tools.nrepl "0.2.3"]]                     ;;; nREPL 서버 구동을 위해
  :profiles
  {:dev
    {:dependencies [[org.apache.hadoop/hadoop-core "1.1.2"]]}}          ;;; 개발시 사용할 하둡 라이브러리 
  :main data-analysis.companions)
  • companions.clj
(ns data-analysis.companions
  (:use cascalog.api
        cascalog.playground
        [clojure.tools.nrepl.server :only (start-server stop-server)])
  (:require [clojure.string :as string]
            (cascalog [workflow :as w]
                      [ops :as c]
                      [vars :as v])))
 
(def input-data
  [{:given-name "Susan", :surname "Forman", :doctors [1]}
   {:given-name "Barbara", :surname "Wright", :doctors [1]}
   {:given-name "Ian", :surname "Chesterton", :doctors [1]}
   {:given-name "Vicki", :surname nil, :doctors [1]}
   {:given-name "Steven", :surname "Taylor", :doctors [1]}
   {:given-name "Katarina", :surname nil, :doctors [1]}
   {:given-name "Sara", :surname "Kingdom", :doctors [1]}
   {:given-name "Dodo", :surname "Chaplet", :doctors [1]}
   {:given-name "Polly", :surname nil, :doctors [1 2]}
   {:given-name "Ben", :surname "Jackson", :doctors [1 2]}
   {:given-name "Jamie", :surname "McCrimmon", :doctors [2]}
   {:given-name "Victoria", :surname "Waterfield", :doctors [2]}
   {:given-name "Zoe", :surname "Heriot", :doctors [2]}
   {:given-name "Brigadier", :surname "Lethbridge-Stewart", :doctors [2]}
   {:given-name "Liz", :surname "Shaw", :doctors [3]}
   {:given-name "Jo", :surname "Grant", :doctors [3]}
   {:given-name "Sarah Jane", :surname "Smith", :doctors [3 4 10]}
   {:given-name "Harry", :surname "Sullivan", :doctors [4]}
   {:given-name "Leela", :surname nil, :doctors [4]}
   {:given-name "K-9 Mark I", :surname nil, :doctors [4]}
   {:given-name "K-9 Mark II", :surname nil, :doctors [4]}
   {:given-name "Romana", :surname nil, :doctors [4]}
   {:given-name "Adric", :surname nil, :doctors [4 5]}
   {:given-name "Nyssa", :surname nil, :doctors [4 5]}
   {:given-name "Tegan", :surname "Jovanka", :doctors [4 5]}
   {:given-name "Vislor", :surname "Turlough", :doctors [5]}
   {:given-name "Kamelion", :surname nil, :doctors [5]}
   {:given-name "Peri", :surname "Brown", :doctors [5 6]}
   {:given-name "Melanie", :surname "Bush", :doctors [6 7]}
   {:given-name "Ace", :surname nil, :doctors [7]}
   {:given-name "Grace", :surname "Holloway", :doctors [8]}
   {:given-name "Rose", :surname "Tyler", :doctors [9 10]}
   {:given-name "Adam", :surname "Mitchell", :doctors [9]}
   {:given-name "Jack", :surname "Harkness", :doctors [9 10]}
   {:given-name "Mickey", :surname "Smith", :doctors [10]}
   {:given-name "Donna", :surname "Noble", :doctors [10]}
   {:given-name "Martha", :surname "Jones", :doctors [10]}
   {:given-name "Astrid", :surname "Peth", :doctors [10]}
   {:given-name "Jackson", :surname "Lake", :doctors [10]}
   {:given-name "Rosita", :surname "Farisi", :doctors [10]}
   {:given-name "Christina", :surname "de Souza", :doctors [10]}
   {:given-name "Adelaide", :surname "Brooke", :doctors [10]}
   {:given-name "Wilfred", :surname "Mott", :doctors [10]}
   {:given-name "Amy", :surname "Pond", :doctors [11]}
   {:given-name "Rory", :surname "Williams", :doctors [11]}
   {:given-name "River", :surname "Song", :doctors [11]}
   {:given-name "Craig", :surname "Owens", :doctors [11]}
   {:given-name "Clara", :surname "Oswald", :doctors [11]}])
 
;;; 닥터후의 상대 배역
(def companion (map string/lower-case
                    (map :given-name input-data)))
 
(def full-name
  (map (fn [{:keys [given-name surname]}]
         [(string/lower-case given-name)
          (string/trim
            (string/join \space [given-name surname]))])
       input-data))
 
;;; 상대 배역과 닥터후의 번호
(def doctor
  (mapcat #(map (fn [d] [(string/lower-case (:given-name %)) d])
                (:doctors %))
          input-data))
 
;;; 닥터후를 연기한 배우
(def actor
  [[1 "William Hartnell" "1963–66"]
   [2 "Patrick Troughton" "1966–69"]
   [3 "Jon Pertwee" "1970–74"]
   [4 "Tom Baker" "1974–81"]
   [5 "Peter Davison" "1981–84"]
   [6 "Colin Baker" "1984–86"]
   [7 "Sylvester McCoy" "1987–89, 1996"]
   [8 "Paul McGann" "1996"]
   [9 "Christopher Eccleston" "2005"]
   [10 "David Tennant" "2005–10"]
   [11 "Matt Smith" "2010–present"]])
 
(defonce server (start-server :port 7888))     ;;; nREPL 서버를 포트 7888에서 구동한다.
 
(defn -main [& args]
  (println "started..."))

소스 다운로드

companions.clj 파일은 이 책 전반에서 사용하는 데이타를 정의하고 있는데 다음 URL에서 다운로드한 것을 약간 변형한 것이다.

http://www.ericrochester.com/clj-data-analysis/data/companions.clj

nREPL 서버 구동

불행하게도 Cascalog는 윈도우즈 환경에서는 실행되지 않는다.(Cynwin으로는 된다) 본 세미나에서는 원격 리죽스 서버에서 Cascalog를 테스트하기 위해 nREPL 서버를 구동한다.

Doctor Who

영국 BBC 방송의 SF TV 프로그램으로 시간 여행을 하는 the Doctor로 불리는 휴머노이드의 모험 이야기. 1963 첫 시리즈가 방송된 이래로 현재까지 총 11째 닥터가 방영되었다.

위 소스에서는 닥터후를 1~11까지 번호로 표기하고 있다.

질의 연산자와 프레디키트(predicate) 연산자

Cascalog는 Datalog에서 사용하는 다음의 질의 연산자와 프레디키트(predicate) 연산자를 사용한다.

  • 질의 연산자
     <-, ?-, ?<-, ??-, ??<- 
  • 프레디키트(predicate) 연산자
     :>, :<, :>>, :<< 

<- 연산자 : 질의 정의 연산자

<- 연산자는 질의 정의 연산자(query creation operator)이다. 출력 변수(output variable) 벡터와 일련의 프레디키트(predicate)를 받아들인다.

다음과 같이 데이타 집합이 있을 때,

(def people [["ben" 35]
             ["jerry" 41]])
;;=> #'user/people           

40살 미만인 사람에 대한 질의는 다음과 같이 정의된다.

(<- [?name ?age]             ;;; 출력 변수 벡터
    (people :> ?name ?age)   ;;; 프레디키트(predicate) 1
    (< ?age 40))             ;;; 프레디키트(predicate) 2
;;=>{:type :generator, :id "34cfa7c4-9e91-4246-b35e-5899fe4f0a1d", :join-set-var nil, :ground? true, :sourcemap {"43de32f4-627f-4460-97af-58b3e4fedf25" #<MemorySourceTap MemorySourceTap["MemorySourceScheme[[UNKNOWN]->[ALL]]"]["/fdfd814c-0ba1-48c0-8f6a-63705d37dccd"]>}, :pipe #<Each Each(aa5b84af-2891-4daa-9dc1-20e5a8f22834)[Identity[decl:ARGS]]>, :outfields ["?name" "?age"], :trapmap {}}

<- 연산자의 첫 인수 [?name ?age]는 출력 변수 벡터이다. 이 벡터안에서 있는 출력 변수는 이후 뒤에 나오는 프레디키트(predicate)들에서 반드시 한 번은 나타나야 한다. 출력 변수는 반드시 '?'으로 시작해야 하는데 이 질의가 묻고자 하는 것을 정의한다. 출력 변수 벡터뒤에 붙는 모든 프레디키트(predicate)들은 이 출력 변수가 어떻게 발생하고 또 어떻게 제한되는가에 대한 정의이다.

(people :> ?name ?age) 프레디키트(predicate)는 출력 변수 ?name과 ?age가 people이라는 데이타 집합의 튜플과 매칭이 된다는 것을 정의한다. 데이타 집합은 항상 벡터로 된 튜플의 벡터이어야 한다. 결국 people의 [“ben” 35], [“jerry” 41] 튜플이 차례로 ?name과 ?age 출력 변수에 묶인다(binding).

:>는 프레디키트(predicate) 연산자로 오른쪽의 변수를 왼쪽의 입력으로 취급한다. 반면 :<는 오른쪽의 변수를 왼쪽의 출력으로 취급한다.

그러나 문맥상 모호하지 않으면 :> 프레디키트(predicate) 연산자는 생략할 수 있다.

(people ?name ?age)

(< ?age 40) 프레디키트(predicate)는 나이가 40 미만인 것만을 걸러낸다.

질의는 그 자체로는 아무것도 하지 못한다. 질의는 그 자체로 다른 질의에 발생기로서 역할을 하거나, 혹은 출력탭(output tap)에 묶여(binding) 질의 수행이 이루어진다.

?- 연산자 : 질의 수행 연산자

?- 는 질의 수행 연산자이다. <outpt tap, query>의 일련의 쌍을 받아, 병행적으로 query를 수행하고 그 결과를 output tab에 출력한다.

(<outpt tap, query>는 Cascading의 'source-pipe-sink' 패러다임의 <sink, source> 쌍에 매칭된다. Cascading에서 pipe는 source나 sink와는 독립적으로 생성되는 데이타 처리 객체인데, pipe에 source와 sink가 붙으면 'flow'라고 불린다. 여러 flow들이 묶여서 'cascade'가 된다.)

  (?- (stdout)                  ;;; output tab, or sink
      (<- [?name ?age]          ;;; query, or source
          (people ?name ?age)
          (< ?age 40)))
;;=> ((["ben" 35]))

(stdout)은 Cascading tap으로 출력 스트림으로 결과를 뿌린다.

tap은 Cascading 용어로 pipe의 양끝에 붙는 source와 sink를 말한다.

파일에 쓰는 tab등 다양한 tap이 있으며, 사용자 tap을 정의할 수도 있다.

?<- 연산자 : 질의 정의 수행 연산자

?<- 연산자는 <-와 ?-의 결합이다.

  (?<- (stdout)
       [?name ?age]
       (people ?name ?age)
       (< ?age 40))
;;=> (["ben" 35])

??- 연산자

??- 연산자는 질의들을 받아 수행한 후 그 각각의 결과를 시퀀스로 리턴한다.

(def results
     (??- (<- [?name ?age]
              (people ?name ?age)
              (< ?age 40))))
 
  user=> results
  ((["ben" 35]))
  (def multi-results
    (??- (<- [?name ?age]
             (people ?name ?age)
             (< ?age 40))
         (<- [?name ?age]
             (people ?name ?age)
             (< ?age 50))))
 
  user=> multi-results
  ((["ben" 35]) (["ben" 35] ["jerry" 41]))

??<- 연산자

??<- 연산자는 질의 수행 결과를 클로져 시퀀스로 리턴한다.

  (def results-??<-
    (??<- [?name ?age]
          (people ?name ?age)
          (< ?age 50)))
 
  user=> results-??<-
  (["ben" 35] ["jerry" 41])

프레디키트(predicate)

질의는 하나 이상의 프레디키트(predicate)로 구성된다. 프레디키트(predicate)는 질의에서 관심을 갖는 출력 변수를 정의하고 제약한다. 프레디키트(predicate)는 3가지 카테고리가 있다.

  • 발생기(Generators) : 질의에서 기본이 되는 데이타 집합. 클로져의 벡터나 CVS, TSV 파일등.
  • 연산(Operations) : 외부에서 정의된 입력 변수를 받아 필터링하거나 새로운 변수에 바인딩하는 함수.
  • 집계기(Aggregators) : Count, sum, min, max 등.

프레디키트(predicate)는 이름, 입력 변수 리스트, 그리고 출력 변수 리스트를 갖는다.

:> 는 프레디키트(predicate) 연산자인데, 입력 변수와 출력 변수를 나누는 역할을 한다. 만일 생략되면 변수는 프레디키트(predicate)가 연산일 경우에는 입력 변수로, 발생기나 집계기일 경우에는 출력 변수가 된다.

(?<- (stdout)                ;;; Cascading sink tap. 질의를 수행한 결과를 표준 출력으로 보낸다.
     [?person ?a2]           ;;; 출력 변수 정의. 이 질의에서 관심을 갖는 출력 변수를 정의한다.
     (age ?person ?age)      ;;; 프레디키트(predicate) 1. 
     (< ?age 30)             ;;; 프레디키트(predicate) 2.
     (* 2 ?age :> ?a2))      ;;; 프레디키트(predicate) 3.
;;=> (["alice" 56] ["david" 50] ["emily" 50] ["gary" 56] ["kumar" 54])

위 질의는 3개의 프레디키트(predicate)로 구성된다.

  1. (age ?person ?age) : “age”는 이 프레디키트(predicate)의 이름이다. age는 클로져 벡터인데 발생기가 된다. “age” 프레디키트(predicate)는 ?person과 ?age 변수를 방사한다.
  2. (< ?age 30) : “<“는 이 프레디키트(predicate)의 이름이다. < 는 클로져 함수인데, 연산이 된다. :>이 생략되었기 때문에 변수들은 입력 변수가 되어, 필터링 작용을 하게 된다.
  3. (* 2 ?age :> ?a2)) : “*”는 이 프레디키트(predicate)의 이름이다. < 는 클로져 함수인데, 연산이 된다. :>이 있어서 2 와 ?age는 입력으로 ?a2 는 출력 변수가 된다.

변수는 항상 '?'나 '!'로 시작되는 클로져 심볼이어야 한다. 만일 출력 변수의 값을 무시하고 싶을 때는 심볼 '_' 를 사용할 수 있다.

상수 치환(constant substitution)

변수위치에서 상수가 사용될 수 있는데, 이를 상수 치환이라고 한다.

  (* 4 ?v2 :> 100)

위의 경우 '4 * v2 = 100'을 만족하는 v2 만을 필터링하는 연산 프레디키트(predicate)가 된다.

발생기(Generator)

Cascalog 발생기는 질의에서 하나의 리스트가 된다.이 리스트는 튜플의 리스트이다. 클로져에서 튜플은 벡터로 표현되고 발생기는 벡터의 벡터로 표현된다. 질의에서 발생기 다음에 오는 것이 발생기 var인데, 이것은 발생기의 튜플의 필드와 같은 수의 출력 변수들이다.

  (<- [?a ?b]
      (generator :> ?a ?b))

:» 프레디키트(predicate) 연산자를 사용하면 다음과 같이 할 수 있다.

  (def output-variables ["?a" "?b"])
 
  (<- output-variables 
      (generator :>> output-variables))

:> 가 없으면 출력 변수는 연산자에 대한 입력으로 그리고 발생기에 대해서는 출력으로 취급된다.

  (<- [?a ?b] (generator-seq ?a ?b))

<br>

Cascalog는 3가지 발생기 유형이 있다.

  • 클로져 시퀀스 : 클로져 시퀀스는 가장 단순한 발생기이다. 테스트 용도로 사용하기에 좋다.
  (def generator-seq [["a" 1]
                      ["b" 2]])
 
  (?<- (stdout) [?a ?b] (generator-seq :> ?a ?b))
;;=> (["a" 1] ["b" 2])
  • ← 연산자로 정의된 기존 질의 : 기존의 질의는 그 자체로 다른 질의에 대해 발생기가 될 수 있다.
  (let [subquery (<- [?a ?b] (generator-seq ?a ?b))]
    (?<- (stdout) [?also-a ?also-b]
        (subquery ?also-a ?also-b)))
;;=> (["a" 1] ["b" 2])
  • Cascading Tap : Cascading Tap 발생기는 다양한 입력 소스를 튜플 형태의 데이타로 처리해 준다. 예를 들어 hfs-textline 함수는 파일을 읽어 각 라인을 1-tuple로 처리한다.
  (let [text-tap (hfs-textline "/some/textfile.txt")]
    (?<- (stdout) [?textline]
        (text-tap ?textline)))

집계기(Aggregator)

먼저 다음 코드를 보자.

(?<- (stdout) [?count] 
              (age _ ?a) 
              (< ?a 30)
              (c/count ?count))
;;=> ([5])              

위 질의는 30 미만의 사람들의 수를 세는 것이다.

c/count는 Cascalog의 ops 이름공간에 선언되어 있는 연산 함수이다. 이 함수는 레코드의 개수를 센다.

사실 집계기는 MapReduce의 Reduce 과정을 수행하는 것이다.

(?<- (stdout) [?person ?count] 
              (follows ?person _)
              (c/count ?count))
;;=> (["alice" 3] ["bob" 3] ["david" 2] ["emily" 4] ["george" 1] ["harold" 1] ["luanne" 2])              

위 코드는 follows 데이타 집합에서 같은 사람이 나타나는 빈도를 출력한다.

사용자 정의 연산

사용자가 정의한 연산을 질의에서 사용할 수 있다.

다음은 글 속에서 단어들의 출현 빈도를 세는 질의이다.

(defmapcatop split [sentence]
  (seq (.split sentence "\s+")))
 
(?<- (stdout) [?word ?count] 
              (sentence ?s)
              (split ?s :> ?word) 
              (c/count ?count))       

이 질의를 위해 split이라는 연산을 정의하였다. 연산을 정의하기 위해 매크로 defmapcatop 사용되었는데 다음과 같다.

  • defmapcatop : 입력을 받고 0개 이상의 튜플을 출력한다.
  • deffilterop : 필터링 연산을 정의할 때 사용하는 것으로 불린값을 출력한다.
  • defmapop : 단일 튜플을 출력한다.
  • defaggregateop : 집계기를 정의한다.

하지만 보통의 클로져 함수도 연산으로 사용될 수 있다.

(defn lowercase [w] (.toLowerCase w))
 
(?<- (stdout) [?word ?count] 
              (sentence ?s) 
              (split ?s :> ?word1)
              (lowercase ?word1 :> ?word) 
              (c/count ?count))

Join

내부 조인

아래 코드는 두 데이타 소스에서 내부 조인을 한다.

(?<- (stdout) [?person ?age ?gender]
              (age ?person ?age) 
              (gender ?person ?gender))
;;=> (["alice" 28 "f"] ["bob" 33 "m"] ["chris" 40 "m"] ["david" 25 "m"] ["emily" 25 "f"] ["gary" 28 "m"] ["george" 31 "m"] ["luanne" 36 "f"])              

age와 gender는 RDBMS에서의 테이블과 같은데, 컬럼에 해당하는 ?person으로 age의 ?age와 gender의 ?gender를 조인하고 있다.

아래 코드는 외부 조인을 한다.

(?<- (stdout) [?person !!age !!gender]
              (age ?person !!age) 
              (gender ?person !!gender))
;;=> (["alice" 28 "f"] ["bob" 33 "m"] ["chris" 40 "m"] ["david" 25 "m"] ["emily" 25 "f"] ["gary" 28 "m"] ["george" 31 "m"] ["harold" nil "m"] ["kumar" 27 nil] ["luanne" 36 "f"])

외부 조인을 위해서 변수는 !!으로 시작한다. 이러한 변수를 “ungrounding variables”라고 하고, ungrouding variables를 갖는 프레디키트(predicate)를 “unground predicate”이라고, 없는 것을 “ground predicate”이라고 한다. 2개의 unground predicate을 조인하는 것을 완전 외부 조인이라하고 반면에 ground predicate을 unground predicate에 조인하는 것을 left join(왼쪽 조인?)이라고 한다.

다음은 left join(왼쪽 조인?) 예제 코드이다.

(?<- (stdout) [?person1 !!person2]
              (person ?person1) 
              (follows ?person1 !!person2))
;;=> (["alice" "david"] ["alice" "bob"] ["alice" "emily"] ["bob" "david"] ["bob" "george"] ["bob" "luanne"] ["chris" nil] ["david" "alice"] ["david" "luanne"] ["emily" "alice"] ["emily" "bob"] ["emily" "george"] ["emily" "gary"] ["gary" nil] ["george" "gary"] ["harold" "bob"] ["kumar" nil] ["luanne" "harold"] ["luanne" "gary"])              

위 코드는 모든 사람에 대한 follow 관계를 출력하는데, 만일 follow 관계가 없으면 nil이 된다.

아래 코드는 follow 관계를 갖지 않는 사람들만 구한다.

(?<- (stdout) [?person]
              (person ?person) 
              (follows ?person !!p2) 
              (nil? !!p2))
::=> (["chris"] ["gary"] ["kumar"])

이제 여기서 각 사람에 대해서 follow관계의 수를 세어보자. 보통의 “count” 집계기(aggregator)는 여기서는 문제가 되는데 왜냐하면 그것은 모든 것을 다 세기 때문이다. null이든 non-null이든! 여기서는 follow 관계가 없으면 0, 있으면 1로 세어야 한다. Cascalog는 이것을 위해 !count 집계기를 마련하고 있다.

(?<- (stdout) [?person ?count]
              (person ?person) 
              (follows ?person !!p2) 
              (c/!count !!p2 :> ?count))
;;=> (["alice" 3] ["bob" 3] ["chris" 0] ["david" 2] ["emily" 4] ["gary" 0] ["george" 1] ["harold" 1] ["kumar" 0] ["luanne" 2])

결합자(Conbiners)와 병행 집계기(Parallel Aggregators)

정렬(Sorting)

중복 제거

Hadoop

Pallet

study/data_analysis/cascalog.txt · Last modified: 2019/02/04 14:26 (external edit)