ก้าวแรก HPC #08: Apache Spark กับงาน Big Compute

เกริ่นนำ

เราเดินทางตามสายธารของเวลา จากก้าวแรก มาถึงบทความนี้เป็นก้าวทีแปดแล้ว ผมวางพื้นทฤษฎี เครื่องไม้เครื่องมือ ตลอดจนโจทย์ทดสอบที่ใช้งานมาตามลำดับเป็นที่เรียบร้อย มาบทความนี้เราจะมาเรียนรู้การประมวลผลแบบขนานกัน มารีดกำลังของเครื่องคอมพิวเตอร์ของเราให้เต็มที่ เรื่องการประมวลผลแบบขนานนั้นมีรายละเอียดค่อนข้างมาก ในบทนี้ผมจึงขอแตะแค่ชั้นผิวของมันโดยเสพอาหารกล่องสำเร็จรูป เพียงอุ่นเล็กน้อยก็ทานได้เลย ยี่ห้อ Apache Spark

ภาคประวัติ

Search Engine ของ Google นั้นนับได้ว่าเป็น Search Engine ที่ดีที่สุดในโลก (ก็ว่าได้) Google Search สามารถค้นหาเอกสารที่มีอยู่มหาศาลใน Internet ได้อย่างรวดเร็ว Google ทำได้อย่างไร ทำไมถึงเก่งกาจขนาดนั้น ความลับนี้ถูกเปิดเผยในงานประชุมวิชาการว่าด้วยเรื่องการออกแบบและสร้างระบบปฏิบัติการ (OSDI : Operating Systems Design and Implementation) ปี 2004 ซึ่งก็เป็นบทความชื่อว่า MapReduce : Simplified Data Processing on Large Clusters บทความมีเพียง 13 หน้าเท่านั้นครับ

คนทั่วไปอ่านบทความดังกล่าวก็คงไม่ได้เรื่องอะไร แต่มีคนหนึ่งครับชื่อว่า Doug Cutting เป็นหนึ่งในทีมพัฒนาของ Apache Foundation ผู้ผลิต Apache ซึ่งเป็น webserver ที่มียอดผู้ใช้งานสูงสุดนั่นเอง Doug Cutting เอาแนวคิดในบทความดังกล่าวไปสร้างเป็น Software ที่ชื่อว่า Hadoop (ตั้งชื่อตามช้างสีเหลืองของเล่นของลูกชาย) ต่อมา Yahoo เห็นผลงานเข้าตาจึงว่าจ้างให้ Doug Cutting พัฒนา Hadoop ต่อเพื่อปรับปรุงประสิทธิภาพของ search engine ของตนให้เก่งขึ้น ผลลัพธ์ก็คือ search engine ของ Yahoo ในเวลานั้นทำงานโดยใช้ Hadoop ทำงานแบบ cluster ขนาด 10,000 cores  จากนั้น Yahoo ก็ปล่อย source code ของ Hadoop ให้เป็นของสาธารณะและกลับสู่อ้อมอกของ Apache อีกครั้ง

Hadoop มันก็ช้างดีๆ นั่นเอง ใหญ่ๆ ชอบ เล็กๆ ไม่ไหว เพราะการทำงานอิงกับ I/O เป็นหลักใช้หน่วยความจำค่อนข้างน้อย ดังนั้น Apache จึงอุดรอยรั่วนี้โดยการออก software อีกตัวที่เน้นการประมวลผลในหน่วยความจำ ตั้งชื่อว่า Spark และโปรแกรมนี้ดังขึ้นเรื่อยๆ จนกลายเป็นก็แรงขึ้นเรื่อยๆ จนกลายเป็นโปรเจค opensource ที่มีความคึกคักสูงสุด

ในงาน Google I/O ปีนี้ (2014) ประกาศเทคโนโลยีที่ชื่อว่า Dataflow เพื่อทดแทน MapReduce ผมยังไม่มีข้อมูลอะไรมากนักสำหรับเรื่องนี้ คงต้องไปถามเซียน Big Data ดูครับ เราก็ยังคงอยู่กับ MapReduce ต่อไป

วิเคราะห์ชื่อบทความของ Google

ชื่อบทความ MapReduce : Simplified Data Processing on Large Clusters ผมว่าน่าสนใจนะครับ เรามาลองวิเคราะห์กันหน่อยดีกว่า เริ่มจากข้างหลัง  “large clusters” คำว่า cluster หมายถึงการเชื่อมเอาเครื่องคอมพิวเตอร์หลายๆ เครื่องเข้าด้วยกันและทำงานเสมือนว่าเป็นเครื่องคอมพิวเตอร์เพียงเครื่องเดียว ก็ต่อเครื่องคอมพิวเตอร์หลายๆ ตัวเข้าด้วยกันก็ไม่ใช่เรื่องพิเศษแต่อย่างใด ก็ LAN ธรรมดานั่นเอง แต่ที่ขึดเส้นใต้เน้นเอาไว้นั่นเป็นประเด็นที่น่าสนใจครับ เครื่องพวกนี้ต้องทำงานเสมือนว่าเป็นเครื่องคอมพิวเตอร์เพียงเครื่องเดียว แน่นอนครับมันคือการประมวลผลแบบขนาน ซึ่งเป็นหัวใจหลักของเว็ปนี้อยู่แล้ว และในบทความก่อนๆ ผมก็เอา Banana Pi 3 ตัวมาลง Lubuntu กลายเป็นคอมพิวเตอร์ 3 เครื่องที่อิสระต่อกัน วันนี้หละครับ ผมจะทำให้มันเสมือนว่าเป็นคอมพิวเตอร์เพียงเครื่องเดียว ทำให้กลายเป็น cluster

จากคำท้ายมาดูที่คำข้างหน้าครับ MapReduce แยกออกเป็น 2 คำ Map กับ Reduce คุ้นหูไหมครับ เคยได้ยินที่ไหนไหมครับที่เอาคำทั้งสองคำนี้มาอยู่ด้วยกัน ใช่แล้วครับไม่ต้องไปหาที่ไหนไกล ในเว็ปของผมก็พูดถึง Functional Programming #5: Filter-Map-Reduce บังเอิญไหมครับ เรื่องนี้ไม่ใช่เรื่องบังเอิญอย่างแน่นอน Google เอาคำว่า MapReduce มาจาก Functional Programming นั่นเอง ซึ่งหมายความว่าแนวทางการเขียนโปรแกรมค้นหาของทั้ง MapReduce ของ Google, Hadoop และ Spark นั้น ล้วนแล้วแต่ใช้ Functional Programming และโดยเฉพาะอย่างยิ่งเน้นที่คำสั่ง Map และ Reduce นั่นเอง

 พื้นฐาน Apache Spark

 

spark overview

 

จากกระดานดำเราเห็นว่าการแบ่งส่วนการทำงานของ Spark แบ่งออกได้เป็น 3 ส่วน ส่วนขวาสุดเรียกว่า worker ซึ่ง worker คือเครื่องคอมพิวเตอร์หนึ่งเครื่องนั่นเอง ถ้าเครื่องคอมพิวเตอร์นั้นมีหลาย CPU หรือหลายคอร์ Spark ก็รีดกำลังมาใช้ได้ทั้งหมด โดยไม่ต้องแก้โปรแกรมแต่อย่างใด ของผมเป็น Banana Pi 3 เครื่อง ก็คือมี worker 3 ตัว แต่ละเครื่องมี 2 คอร์ ดังนั้นผมจึงมีตัวประมวลผล worker 6  ตัว worker นี้ โดยปกติแล้วทำหน้าที่เป็น server ตั้งขึ้นมาเหมือน SQL Server ถ้ามีงานเข้ามาก็ทำและส่งผลลัพธ์กลับ งานหมดก็นั่งรอ

ถัดมาตรงกลางเราเรียกว่า Master หรือถ้าว่ากันตามหน้าที่แล้วก็คือ cluster manager พูดภาษาง่ายๆ ก็คือเป็นผู้กระจายงานนั่นเอง และเป็นตัวประมวลผลในส่วนที่ต้องประมวลผลด้วยตัวประมวลผลเดี่ยว

สุดท้ายคือ client ก็เป็นตัวที่ส่งโปรแกรมไปยัง master หรือทำตัวเป็น REPL นั่นเอง  เครื่องคอมพิวเตอร์ทั้งสามแบบนี้ลงโปรแกรมชุดเดียวกันหมดครับ ไม่ได้แยกว่านี่คือโปรแกรม worker นี่คือ master หรือ client ซึ่งการทำแบบนี้ทำให้การติดตั้งโปรแกรมทำได้ง่ายมาก เพียงลงโปรแกรมชุดเดียวกันในแต่ละเครื่องโดยไม่สนใจว่าทำหน้าที่อะไร สลับหน้าที่กันได้ตลอดเวลา

สรุปการทำงาน เริ่มจากผู้ใช้มาที่เครื่อง client  มองง่ายๆ  client นั้นอาจจะเป็น REPL หรือเป็นโปรแกรมก็ได้ เลือกเขียนใช้ภาษาได้สามภาษาคือ Scala, Java และ Python ในบทความนี้ผมใช้ Python สมมติว่าผมเขียนโปรแกรมชื่อ hello.py ผมอยู่ที่เครื่อง client ผมเชื่อมต่อกับ master ได้โดยใช้คำสั่ง

เมื่อรันคำสั่งนี้ Spark จะส่ง hello.py ที่อยู่ที่ client ไปยัง master ที่ชื่อว่า sawtooth ซึ่ง port มาตรฐานก็คือ 7077  เมื่อ master ได้งานไปก็จะเป็นคนรันโปรแกรม ดังนั้นคนเหนื่อยคือ master และถ้าในโปรแกรมมีการใช้ชนิดตัวแปร RDD และใช้ชุดคำสั่งชุด Transformations master จะสะสมงานเอาไว้ใน RDD และเมื่อพบคำสั่งชุด Actions (รายละเอียดเกี่ยวกับชุดคำสั่ง Transformations และ Actions ดูได้จากข้างล่างครับ) จะเกิดการระเบิดกระจายงานไปสู่ worker แต่ละตัว ที่อยู่ในสังกัดพร้อมกับส่งโปรแกรม hello.py ให้ไปด้วย ดังนั้น master ในกรณีนี้คือทำตัวเป็น scheduler นั่นเอง เมื่อ workers ทำงานเสร็จก็จะส่งผลลัพธ์กลับมา master ก็จะทำงานต่อจนเสร็จ และส่งผลลัพธ์กลับมายัง client  ซึ่งความรู้สึกของผู้ใช้งาน client ก็จะรู้สึกคล้ายๆ ว่าโปรแกรมนี้ทำงานโดยการใช้ ipython ทำงานบนเครื่องตนเองไม่ได้ออกไปไหน

จากรูปบนหน้ากระดานดำ ถ้าทำตามรูปเลย ก็คือเราจะต้องมีคอมพิวเตอร์ 5 เครื่อง เป็น worker 3 เครื่อง เป็น client และ master อย่างละเครื่อง แต่เราสามารถยุบรวมกันได้อย่างอิสระครับ จะยุบหน้าที่ไหนรวมกับหน้าที่ไหนก็ได้ จะยุบให้ทุกอย่างรวมอยู่ในเครื่องเดียวกันก็ได้ ซึ่งผมจะใช้การรวมในเครื่องเดียวกันในการทดสอบการติดตั้ง Spark จากนั้นผมจะขยาย โดยให้ node01 (banana pi) เป็ทั้ง client, master และ  worker ส่วน node02 และ node03 ทำหน้าที่เป็น worker แต่อย่างเดียว

อยากให้สังเกตดูครับ โปรแกรมของเรานั้นอยู่แค่ภายใน hello.py เท่านั้น อยู่บนเครื่อง client แต่การทำงานแบบ cluster มองเครื่องคอมพิวเตอร์หลายๆ ตัวกลายเป็นเครื่องเดียว โดยที่เราไม่ต้องไปเขียนโปรแกรมของเราอยู่บนเครื่อง master หรือ workers แต่อย่างใด เรามองเห็นเหมือนเป็นเครื่องของเราเพียงเครื่องเดียว ดังนั้นไม่ว่าเราจะเพิ่ม workers เป็นล้านตัวเข้าสู่ระบบ การสั่งงานก็ยังสั่งเพียงแค่คำสั่งนี้เท่านั้นครับ ได้มนตราของคำว่า “มองเสมือนเป็นเครื่องเดียว”

การติดตั้ง Spark

Spark นั้นเขียนด้วยภาษา Scala ซึ่งภาษา Scala อาศัย JVM ดังนั้นเราต้องลงทั้ง Java และ Scala  ถ้าใครใช้ *buntu ก็สามารถประยุกต์ขั้นตอนการลงได้จาก ก้าวแรก HPC #03: การสร้าง Cluster ส่วนตัวราคาถูกและดี  การลงบนเครื่อง Windows  ลองประยุกต์จากบทความดังกล่าวดูครับ น่าจะตรงไปตรงมาไม่ยุ่งยาก แต่การ Windows ก็ทดสอบการทำงานบนเครื่องเดียวได้ครับ แต่เมื่อกระจายข้ามเครื่องจะทำหน้าที่เป็น client ได้อย่างเดียว (หรือแค่ผมทำไม่เป็นเองครับ)

แหย่ขาเข้าสู่ Spark

เป็นที่น่าอึดอัดใจเป็นอย่างยิ่งครับที่ Spark นั้นรองรับเพียง Python2 ดังนั้นเราต้องปรับมาเป็น Python2 ก่อนเริ่มใช้ Spark ถ้าท่านใช้ anaconda และลงโปรแกรมตามที่ผมแนะนำในบทความ ก้าวแรก HPC #1: Python ท่านเพียงแค่ใช้คำสั่งข้างล่างนี้ใน shell

แต่ถ้าท่านใช้ Windows ก็ตัดคำว่า source ทิ้งไป สั่งแค่ activate python2 เท่านั้น

การใช้งาน Spark นั้น เราควรย้ายตำแหน่งปัจจุบันไปยัง home directory ของ Spark และแช่อยู่ตรงนั้น ซึ่งตลอดบทความนี้ผมจะแช่ตำแหน่งปัจจุบันไว้ที่นี่เท่านั้น คำสั่งที่เราเรียกใช้ spark นั้นสำหรับ client แล้วจะอยู่ใน bin ซึ่งง่ายๆ ครับ เราจะใช้เพียงแค่ 2 คำสั่งเท่านั้น นั่นคือ bin/pyspark เป็นการเข้าสู่ REPL และ bin/spark-submit เป็นการส่งแฟ้มไปยัง master ดังที่อธิบายไว้แล้วข้างต้น

ในบริบทของการใช้งาน ถามว่า REPL และ โปรแกรมต่างกันตรงไหน หลักๆ ไม่ต่างกัน เพียงแค่ใน REPL ก็มีตัวแปรตัวหนึ่งแถมมาให้ชื่อว่า sc  ซึ่ง sc จะเป็นหัวขั้วของคำสั่งของ Spark ทั้งหมด ดังนั้นถ้าเราเขียนเป็นโปรแกรม ทำงานไม่ผ่าน REPL เราก็สามารถสร้างตัวแปร sc นี้เองได้ง่ายๆ ดังนี้ครับ

โดยที่ test1 เป็นชื่อของ app ของเรา ตั้งชื่ออะไรก็ได้ ในเมื่อเรามี sc แล้ว ต่อจากนี้ REPL และโปรแกรมก็ไม่มีอะไรต่างกันแล้ว ผมก็จะไม่แยกอีกต่อไป ใครใคร่ใช้ REPL ก็เชิญ ใครอยากเขียนเป็นโปรแกรมก็ได้

การสร้าง RDD

โปรแกรมของท่าน ท่านเขียนเป็นแบบ Python ได้ 100% ทำงานได้ตามปกติ แต่ถ้าท่านอยากดึงความสามารถของ Spark มาใช้ ท่านต้องใช้ sc เพื่อสร้าง RDD ขึ้นมาครับ RDD ย่อมาจาก Resilient Distributed Dataset แปลเป็นภาษาไทยแบบรู้เรื่องว่า เป็นชุดของข้อมูลกระจายตัวแบบอึดๆ การกระจายตัวก็คงเข้าใจกันดีอยู่แล้วว่าเป็นการกระจายข้ามเครื่องไปสู่ workers ต่างๆ ส่วนคำว่าอึดในที่นี้หมายถึงเฮียบรูซ วิลลิส แกอึดตายยาก ในที่นี่คือ แม้ว่า worker บางตัวลาโลกไประหว่างทำงาน ระบบก็ยังอึดไม่ยอมตาย master สามารถโอนงานไปให้ worker ตัวอื่นทำต่อได้ และยังได้ผลลัพธ์ที่ถูกต้องเหมือนเดิม เรามาดูคำสั่งหัวขั้วที่ต่อมาจาก sc กัน หลักๆ แล้วมีสองคำสั่งคือ sc.parallelize() และ sc.textFile()  ผลลัพธ์ของทั้งสองคำสั่งนี้คือ RDD นั่นเอง

sc.parallelize()

เป็นการแปลง list หรือ range ปกติของ Python ให้กลายเป็น RDD คำสั่งนี้ถือเป็นทางเข้าหลักสำหรับงาน Big Compute เลยทีเดียว พอคุ้นๆ ไหมครับ เราสามารถนำเอา list ของจำนวนเฉพาะ 587 ตัวในบทความที่แล้วมาแปลงเป็น RDD ได้โดยตัว  มาลองดูตัวอย่างกันครับ

 sc.textFile()

เป็นการแปลง text file ให้กลายเป็น RDD โดยที่ หนึ่งหน่วยใน RDD ก็คือ 1 บรรทัดนั่นเอง ตัวอย่างเช่น

คำสั่งนี้เป็นทางเข้าหลักของงาน Big Data ซึ่งไม่ใช่เนื้อหาหลักของเว็ปนี้ ผมเลยขอข้ามไปครับ

คำสั่งที่ใช้กับ RDD

เราได้ RDD มาแล้วจากคำสั่งข้างบน RDD นั้นไม่ได้เป็นเพียงแค่โครงสร้างข้อมูลเท่านั้น แต่ยังสามารถที่ผูกเอาคำสั่งปฏิบัติการต่างๆ ใส่ลงไปใน RDD ได้ด้วย คำสั่งเหล่านี้เขียนอยู่ในรูปของ Functional Programming นั่นเอง  เราสามารถสั่งคำสั่งต่อเนื่องเป็นหางว่าวได้ เพื่อให้ workers ทำงานตามลำดับในภายหลัง   เนื่องจาก RDD มีความสามารถ laziness เหมือนภาษา FP ชั้นดีทั่วไป กล่าวคือเมื่อเราสั่งคำสั่งไปเรื่อยๆ RDD แทนที่จะทำงานกลับขี้เกียจ เก็บคำสั่งหางว่าวเอาไป และเมื่อจวนตัวจริงๆ แล้วค่อยโยนออกไปให้ workers ซึ่ง เมื่อ worker ได้รับทั้งข้อมูลที่ master แจกจ่ายมาให้พร้อมกับชุดคำสั่งหางว่าวให้กระทำ ดังนั้น workers ก็ทำไป ทำงานเสร็จก็ส่งข้อมูลกลับมายัง master เพื่อรวบรวม

คำสั่งใน RDD แบบออกเป็นสองกลุ่มใหญ่ๆ กลุ่มแรกเรียกว่า Transformations กลุ่มที่สองเรียกว่า Actions  ซึ่ง Spark นั้นใช้ MapReduce ดังนั้น คำสั่งในชุด Transformations คือการ map นั่นเอง และคำสั่งในชุด Actions ก็คือการ reduce อย่างไม่ต้องสงสัย

Transformations

เป็นชุดคำสั่งที่กระทำกับสมาชิกแต่ละตัวใน RDD ชื่อก็บอกแล้วว่าแปรรูปข้อมูล ข้อที่น่าสังเกตคือ ถ้าเรามีข้อมูล 10 ตัว หลังจากทำคำสั่งใดคำสั่งหนึ่งในชุดนี้เสร็จ หน้าตาข้อมูลจะถูกแปรรูปเปลี่ยนไป แต่ผลลัพธ์ก็ยังคงมี 10 ตัวเช่นเดิม  ความพิเศษของคำสั่งชุดนี้ก็คือ มีคุณสมบัติในการทำ laziness สามารถสั่งหลายๆ คำสั่งเป็นหางว่าวได้ โดยที่ Spark เพียงแต่รับทราบแต่ยังไม่ปฏิบัติ ผมขอยกมาแค่ 2 คำสั่งที่ใช้บ่อยนั่นคือ map() และ filter() ลองดูตัวอย่างกันเลยดีกว่าครับ

ถ้าใครมีความรู้เรื่อง Functional Program (อาจจะอ่านจากบทความของผมก่อนหน้า) ท่านจะคุ้นเคยกับคำสั่งเหล่านี้เป็นอย่างดี จะต่างกว่าปกติเล็กน้อยตรงที่ ทุกคำสั่งต้องเรียกผ่าน rdd1 และทุกคำสั่งส่งค่ากลับออกมาเป็น RDD เช่นเดิม

คำสั่งยังมีอีกหลายคำสั่งนะครับ ส่วนมากเอาไว้สำหรับงาน Big Data ใครต้องการศึกษาเพิ่มเติม ไปที่ https://spark.apache.org/docs/latest/programming-guide.html

Actions

คำสั่งชุด action นี้เป็นชุดที่สั่งให้ Spark ทำงานทันทีไม่มีการขี้เกียจอีกต่อไป เมื่อสั่งคำสั่งชุด actions นี้เพียงคำสั่งเดียว rdd จะถูกแบ่งส่วนและกระจายไปยัง workers ต่างๆ และในแต่ละ worker ก็จะกระทำคำสั่ง transformations ที่สะสมเอาไว้ ผมนำเสนอในที่นี้เพียงสองคำสั่งที่ใช้บ่อยสำหรับงาน Big Compute ก็แล้วกัน นั่นคือ reduce() และ collect()

คำสั่ง reduce() ก็คงไม่ต้องอธิบายมากนะครับ มันคือ reduce() ของ Functional Programming โดยตรงเลย ถ้าเราเขียน

กับอีกคำสั่งคือ collect() คือคำสั่งเอาผลลัพธ์ปัจจุบันใน RDD รวบเป็น list เช่น

จำได้ใช่ไหมครับว่า rdd1 ในขั้นตอนสุดท้ายได้ [2, 8, 18, 32, 50]  สั่ง collect() ก็จะได้เป็น list ตามที่เห็นแบบ Python ปกติ คราวนี้เรามาลองดูเวลารันโปรแกรมจริงบ้าง ผมใช้ REPL ตัว pyspark พิมพ์คำสั่ง บรรทัดต่อบรรทัดเลยครับ

prompt ของ REPL คือ >>> ดังนั้นสิ่งที่ผมพิมพ์จะอยู่หลัง >> สังเกตนะครับ บรรทัด 10-13 พอกด enter ก็ตอบสนองทันที Spark รับรู้คำว่าเราต้องการทำอะไร แต่ก็ยัง lazy อยู่ครับเพราะเป็นคำสั่งชุด Transformations พอเจอคำสั่งบรรทัดที่ 14 เท่านั้น อย่างกับป่าช้าแตก อะไรไม่รู้เต็มจอไปหมดครับ ของจริงเป็นร้อยบรรทัดนะครับ ผมตัดเอามาแค่บางส่วนให้พอมองเห็นภาพ สุดท้ายแล้วก็ได้คำตอบเก็บไว้ที่ ans ที่ถูกต้องนั่นคือ 110 นั่นเอง

การเชื่อมคำสั่งเป็นลูกโซ่

มีข้อน่าสังเกตอยู่ข้อหนึ่งครับ คำสั่งชุด Transformations นั้น ทุกคำสั่งส่งค่ากลับมาเป็น RDD ทั้งสิ้น ผิดกับชุด Actions จะมีการส่งค่ากลับมาหลากหลาย แต่ไม่ใช่ RDD ดังนั้น แทนที่จะเขียนแยกกัน เราสามารถเขียนคำสั่ง Transformations ต่อกันเป็นขบวนรถไฟแล้วปิดท้ายขบวนด้วย Actions เสร็จสรรพในบรรทัดเดียวได้ดังนี้

ตัวอย่างโปรแกรม

เพื่อให้เข้าใจการทำงาน ผมเลยเอาโปรแกรมที่ใช้หาค่า Bernoulli Number โดยใช้อัลกอริทึมของ John Bernoulli เอง ซึ่งรายละเอียดสามารถอ่านเพิ่มเติมได้ที่ ก้าวแรก HPC #04 : ตำนาน Note G ผมถอดออกมาจาก sagemath ลองดูเลยครับ

 สิ่งที่น่าสนใจของโปรแกรมนี้คือ โปรแกรมนี้สร้าง RDD 2 ตัวครับ ตัวแรก b_rdd เป็นการหาค่า binomial เก็บเอาไว้  ส่วน RDD ตัวที่ 2 นั้นชื่อ p_rdd เริ่มจากการแกะค่าสัมประสิทธิของสูตรออกมา จากนั้นหารด้วยพจน์แรก แล้วก็ zip RDD 2 ตัวเข้าด้วยกันโดยการหาร เป็นอันเสร็จพิธีครับ

การรันโปรแกรมนั้นใช้ spark-submit ดังนี้

ก่อนรันโปรแกรมอย่าลืมนะครับ copy bern_orig.py ไปยัง home directory ของ Spark เสียก่อน  คำตอบที่ได้เป็น list นี้ [1, 1/2, 1/6, 0, -1/30, 0, 1/42, 0, -1/30, 0, 5/66, 0, -691/2730, 0, 17/510, 0, 43867/798, 0, -174611/330, 0]  เป็นคำตอบที่ถูกต้อง เทียบกับ Wikipedia ของ Bernoulli Number ได้ครับ

ความแคลงใจเมื่อเทียบกับ Scala

ผมกำลังเอา Spark ไปใช้งานกับโปรแกรม Bernoulli ในบทก่อนๆ ดูว่าเมื่อขยายสเกลเป็นหลายตัวประมวลผลและหลายเครื่องแล้วประสิทธิภาพเป็นอย่างไร ด้วยความสัตย์จริงผมเองก็ยังไม่เคยทดสอบมาก่อนแบบจับเวลามาก่อน สารภาพตามตรง วีธีการเขียนบทความของผม ผมใช้วิธีเขียนบทความบทต่อบท จริงๆ แล้วหัวข้อต่อหัวข้อด้วยซ้ำไป การเขียนโปรแกรมส่วนมากก็เขียนกันสดๆ เมื่อหัวข้อเหล่านั้นอ้างถึง ผมยังไม่รู้หรอกว่าวิธีไหนเขียนแล้วเร็ววิธีไหนช้า เรามารู้พร้อมๆ กันครับ ดังนั้น ณ เวลาที่เขียนอยู่ตรงนี้ ผมยังไม่ได้ทดสอบ Spark กับอัลกอริทึมของ Dr. Harvey เลย ว่ามันดีหรือไม่ดีอย่างไร ผมทำไปเขียนไปครับ

สิ่งหนึ่งที่ผมเกรงก็คือ ถ้าผลการทำงานออกมาปรากฏว่า Spark ทำงานช้า (แต่อาจจะเร็วก็ได้) บางท่านอาจบอกว่า ปัญหาไม่ใช่อยู่ที่ Spark แต่หากเป็นที่ตัวภาษาก็ได้ Python มันช้าเอง ถ้าเปลี่ยนเป็น Java หรือ Scala ก็น่าจะเร็วขึ้น ผมเองก็คงไม่เพิ่ม Java หรือ Scala เข้ามาในเว็ปนี้ เพราะมันจะบวมใหญ่ เขียนไปเขียนมาไปได้ไม่ถึงไหน เอาอย่างนี้ครับ ผมลองเขียนอัลกอริทึมของ Dr. Harvey เอาไว้โดยใช้ Scala ลองทำงานเทียบกับ Python ดู วัดได้แค่ไหนแค่นั้นครับ จะได้มีฐานเวลาในการวัด ส่วน Java ก็คงขอข้ามไปครับ เพราะ Java และ Scala นั้นน่ามีประสิทธิภาพพอๆ กัน  แล้วต่อไปเราจะไม่ไปยุ่งกับภาษาสองภาษานี้อีกตลอดบทความชุดนี้ครับ

ทำงานบน Banana Pi ได้ผลลัพธ์ดังนี้ครับ

b1000_scala

ผลลัพธ์ออกมาทำเอาผมอึ้งเหมือนกันเพราPython ใช้เวลาแค่ 1:45 นาที ซึ่งเร็วกว่า Scala เป็นเท่าตัว ผมยังไม่มีเวลาไปตรวจสอบจริงจัง อาจจะเป็นเพราะผมเขียนเวอร์ชัน Scala ไม่ดีก็ได้ แต่ถ้าให้เดาจริงๆ แล้ว ผมว่าตัวตัดสินอยู่ที่ library ในการจัดการเลขจำนวนเต็มขนาดใหญ่ ทาง JVM น่าจะทำได้ยังไม่ค่อยดี แค่คาดเดานะครับ ไม่ใช่ข้อสรุป 

Java เองก็เคยทำ benchmark ออกครับว่าการคำนวณทางคณิตศาสตร์ทำได้เร็วกว่า C++ เสียอีก ซึ่งเท่าที่ผมดูก็น่าจะเป็นการประมวลผลตัวเลขทศนิยม floating point ซึ่งในบทความชุดนี้เราแทบไม่ได้ใช้เลยครับ เอาไว้ในบทความชุดต่อไป ผมจะทดสอบให้เห็นชัดๆ กันไปเลยครับ

Spark กับการทดสอบครั้งแรก

เราจะปรับโปรแกรมให้รองรับกับ Spark ซึ่งปรับเฉพาะในส่วนของฟังก์ชัน distribute() เท่านั้น (ใช้แนวทางการแตก distribute() แบบ C++ ในบทก่อนหน้า) นอกนั้นเขียนเหมือนเดิมทุกประการดังนี้

หลักการทำงานก็ง่ายๆ ครับ เอา primeList ที่ส่งเป็นพารามิเตอร์เข้ามา นำเข้าสู่ Spark โดยการใช้ sc.paralleize(primeList)  ได้ออกมาเป็น RDD จากนั้นผมใช้ฟังก์ชัน map() เพื่อแปลง p ให้กลายเป็น tuple (computeBkModP(p, k), p) ซึ่งเป็นรูปแบบเดิมนำไปใช้งานต่อได้ จากนั้น ก็เรียก collect() เพื่อรวบออกมาเป็น list ที่แก้ก็มีเท่านี้ครับ จากนั้นผมก็เพิ่ม ส่วนของ main เข้าไปเพื่อบังคับให้มันหาค่าเฉพาะ B(1000) จะได้กลายเป็นโปรแกรมที่สมบูรณ์รันได้ ดังนี้ครับ

จากนั้นผมนำเอาไปทดสอบกับ Banana Pi จะได้อ้างอิงเวลาที่เป็นมาตรฐานเดียวกัน เนื่องจาก Spark เป็นอาหารกล่องสำเร็จรูป ดังนั้นในเครื่องมีตัวประมวลผลกี่ตัว มันดึงมาใช้หมดโดยอัตโนมัติ ถ้าคุณเขียน

แต่ถ้าท่านต้องการให้ทำงานตามจำนวนตัวประมวลผลที่ท่านต้องการ ท่านสามารถระบุได้โดย

จากตัวอย่างข้างบนนี้ จะมีกี่ตัวประมวลก็ช่าง ใช้แค่ตัวเดียว Banana Pi นั้น SoC A20 มี 2 คอร์ ดังนั้น ถ้าเราไม่ระบุ  –master  ก็ย่อมหมายถึง –master=local[2] นั่นเอง ผมทดลองบน Banana Pi ได้ผลลัพธ์ดังแฟ้ม HPCThai/introHPC/08/spark_single_machine.txt ขอให้กดไปดูครับ

คำตอบนั้นถูกต้องแน่นอน แต่เรื่องเวลาที่ใช้น่าสนใจครับ ในบทความ ก้าวแรก HPC #06: อัลกอริทึมประสิทธิภาพสูงและการปรับปรุงประสิทธิภาพ ผมจับเวลาอัลกอริทึมนี้ ทำงาน B(1000) ได้ 1:45 นาที (105 วินาที) ซึ่งผมใช้เป็นฐานเวลา ทำงานคอร์เดียว ส่วน Spark นั้นได้ผลลัพธ์ 2:04  นาที (124 วินาที) สำหรับการทำงานคอร์เดียวและ 1:44 นาที (104 วินาที) เมื่อทำงาน 2 คอร์

ตัวเลขดังกล่าวน่าสนใจมากครับ มันบอกอะไรเราได้บ้าง อย่างแรกก็คือ Spark มีค่าโสหุ้ย (overhead) พอสมควรประมาณ 18% เมื่อเทียบเวลากันที่คอร์เดียวไม่ใช้ Spark อย่างที่สองก็คือ สุภาษิตของเราได้รับการพิสูจน์แล้วว่า “สองหัวดีกว่าหัวเดียว” แม้ว่าจะดีกว่ากันแค่ 1 วินาทีก็ตาม  อย่างสุดท้ายอย่างที่สาม สำคัญที่สุดครับ ก็คือ ก็เพิ่มสองคอร์ไม่ได้ทำให้งานเร็วขึ้นเป็นสองเท่า เราใช้กฏของอู๋มาพิสูจน์กันครับ ถ้าเร็วขึ้นเป็นสองเท่าจริง จะได้ 1 มิฉะนั้นจะลดหลั่นกันไปครับ แทนค่าตามกฏของอู๋จะได้

\frac{N(T_{1}-T_{N})}{T_{1}(N-1)}=\frac{2(124-104)}{124(2-1)}= 0.32258

ตัวเลข 0.32258 นี่ต่ำมากครับ การเพิ่มมาอีกหนึ่งคอร์นี้แทบไม่มีประโยชน์ ยิ่งถ้าเราวิเคราะห์กันจริงๆ แล้ว เราต้องเทียบเวลาที่ใช้ Spark 2 คอร์กับเวลาที่ใช้หนึ่งคอร์แบบปกติที่ไม่ได้ใช้ Spark ถ้าเอามาเทียบกับจะได้

\frac{N(T_{1}-T_{N})}{T_{1}(N-1)}=\frac{2(105-104)}{105(2-1)}= 0.01923

อันนี้ยิ่งต่ำติดดินเข้าไปใหญ่ ก็รู้ๆ กันนะครับว่าไม่ต้องเข้าสูตรใดๆ ให้วุ่นวาย ตัวเลขดิบมันฟ้องอยู่แล้ว เพิ่มหนึ่งคอร์ เร็วขึ้นแค่ 1 วินาที แบบนี้ไม่ต้องเพิ่มจะดีกว่า ประหยัดพลังงานได้ตั้งเยอะ แต่อย่าเพิ่งด่วนสรุปนะครับ ผมมี banana pi อยู่ 3 บอร์ด นับเป็นคอร์ได้ 6 คอร์ เดี๋ยวเรามาลองดูกันครับ สยายปีก Spark ให้เต็มที่ดูว่าจะไปได้ไกลแค่ไหน

 Spark สยายปีก

จุดเด่นของ Spark ก็คือเราสามารถเพิ่ม hardware ได้เรื่อยๆ จะเป็นการเพิ่มจำนวนตัวประมวลผลในเครื่องเดียวกันหรือเพิ่มเครื่องคอมพิวเตอร์ใหม่เข้าไปในระบบก็ได้ เพราะการทำงานของ Spark นั้นใช้ network เป็นหลักอยู่แล้ว การเพิ่มเครื่องคอมพิวเตอร์เข้าไปในระบบก็เพียงแก้ config เล็กน้อยเท่านั้น ตัวโปรแกรมนั้นไม่ต้องแก้แม้แต่บรรทัดเดียวเลย ใช้ของเดิมได้เลย

master และ worker นั่นเชื่อมด้วย ssh แบบไม่มีรหัสผ่าน รายละเอียดการติดตั้งอยู่ใน ก้าวแรก HPC #03: การสร้าง Cluster ส่วนตัวราคาถูกและดี ผมตั้งใจให้ node01 เป็นทั้ง client และ master เพื่อความเรียบง่าย ผมเลยอยู่ที่  Spark home directory ของเครื่อง node01 แล้วพิมพ์

เมื่อเราทำเช่นนี้ได้ เราทดสอบการใช้งานด้วย

เราต้องสามารถ login เข้าเครื่องทั้ง 3 เครื่องได้โดยไม่ต้องใส่ทั้ง username และ password ถ้าทำสำเร็จก็จะใช้งานได้ เพราะ Spark ก็ใช้วิธีนี้ในการเข้าถึงตัว worker เช่นกัน ขั้นต่อไปเราเพียงแค่บอก spark ว่า workers คือเครื่องใดบ้าง ง่ายนิดเดียวครับ แก้แฟ้ม conf/slaves ระบุชื่อ worker หนึ่งเครื่องต่อหนึ่งบรรทัด ดังตัวอย่างนี้ครับ

เชื่อไหมครับ การ conf ทำง่ายๆ แค่นี้ครับ จากนั้นสิ่งที่เราทำก็คือการ start server นั่นเอง ซึ่งคำสั่งจัดการกับ server (สำหรับ master และ workers) จะอยู่ใน directory sbin ครับ ถ้าเราใช้คำสั่ง

แค่นี้ครับ script มันจะไล่ start service ของ master (ซึ่งก็คือเครื่อง node01) และไล่ start service ของ worker (node01, node02, node03) เมื่อทุกอย่างเสร็จเรียบร้อยแล้ว ทางตัว master จะเชื่อมพอร์ทให้เราสองพอร์ทครับคือ 8080 เป็น web server ให้เราเข้าไปดูสถานะของระบบโดยรวม และ 7077 สำหรับเชื่อมต่อกับ client ลองเข้าเว็ป http://node01:8080 จะเห็นประมาณนี้ครับ

spark_web_start

เห็นชัดเจนครับคือมี Worker ในสังกัดอยู่ 3 ตัว ซึ่งทั้ง 3 ตัว อยู่ในสถานะ ALIVE มีชีวิตอยู่ รอคอยงานที่จะทำ แต่ละตัวมี 2 คอร์แต่ว่างยังไม่มีการใช้งาน และจองหน่วยความจำเอาไว้ 512MB (ฺbanana pi มีหน่วยความจำทั้งหมด 1GB) เรามาลองรันโปรแกรมที่ node01 โดยพิมพ์

ระหว่างรันโปรแกรม ถ้าเราแอบดูที่พอร์ท 8080 จะเห็น

spark_web_run

เห็นได้ชัดครับว่าคอร์ทั้ง 6 คอร์ถึงดึงมาใช้งานอย่างเต็มที่ ชื่องาน bern_spark เป็นชื่อที่เราตั้งใน appname เวลาเขียนโปรแกรมนั่นเอง ผลลัพธ์ที่ได้ไม่ค่อยดีครับ อาจเป็นเพราะมันต้องเสียเวลามาประมวลผล web serverให้ผมแอบดู ผมเลยตัดสินใจหยุดการทำงานของ server โดย

แล้วเริ่มกระบวนการใหม่ นั่งนิ่งๆ ไม่ทำอะไร รอจนได้คำตอบ ผลลัพธ์ก็ได้เท่าเดิมคือ

b1000_spark_6_cores

ผลลัพธ์ที่ได้คือ 2:16 นาทีหรือ (136 วินาที) แย่กว่าทำงานคอร์เดียวที่ใช้เวลาเพียง 124 วินาทีเสียอีก!!! เกิดอะไรขึ้น ค่อนข้างน่าสนใจครับ เรื่องนี้ผมคิดว่าประเด็นสำคัญมันอยู่ที่โสหุ้ย (overhead) นั่นเอง เมื่อเราเปิดการทำงานของ Spark แบบเต็มรูปนั้น มีการใช้ network ซึ่งตัว network เองย่อมต้องช้ากว่าความเร็วของหน่วยความจำแรมอย่างแน่นอน ทำให้การทำงานช้าลง แต่ท่านอาจจะแย้งว่า แล้วการกระจายคอร์ ถึง 6 คอร์ ช่วยกันทำแล้วทำไมถึงสู้ตัวเดียวไม่ได้ ดูไม่สมเหตุสมผลเลย ผมก็เลยอยากให้พิจารณากราฟจากบทก่อนหน้า ตัวนี้ครับ

tgraph

ปัญหาคือเนื้องานมันมีความยากง่ายใช้เวลาไม่เท่ากันครับ ถ้า Spark แบ่งงานเรียงไปเครื่องละประมาณ 3 บล็อกตามแกน X จะทำให้เครื่อง node03 ทำงานหนักอยู่เครื่องเดียวครับ ประกอบกับผมไม่รู้ว่า ระหว่าง master และ worker มีอะไรคุยกันเพิ่มเติมทำให้ให้เสียเวลาเพิ่มอีก ผมว่านะ เพิ่มมา 100 คอร์ก็อาจจะไม่เร็วขึ้นครับ แต่ทุกอย่างอยู่ที่ผมมโนขึ้นเองนะครับ ไม่ใช่คำตอบแต่อย่างใด

ถ้าเป็นตามที่ผมมโน โสหุ้ยของ Spark มันเยอะมาก แต่ตายตัวเดาๆ เอาว่า โสหุ้ยนั้นกินเวลาเป็นนาทีครับ การหา B(1000) นี้มันกินเวลาน้อยอยู่แล้ว โสหุ้ยมันกินหมด เหมือนกันเอารถเมล์มาวิ่งแต่มีผู้โดยสารแค่สองคน มันไม่คุ้มครับ ดังนั้นงานต้องใหญ่กว่านี้ครับ เช่นทำงานกันเป็นชั่วโมง ถึงคุ้มค่าโสหุ้ย ขอฝากเป็นการบ้านไปลองทดสอบดูครับ

ก่อนจาก

จากข้อมูลข้างต้นทั้งหมด ผมเอามาประมวลแล้วมโนได้เป็นข้อสรุปว่า Spark เหมาะกับงานที่แตกออกมาแล้วแต่ละส่วนใช้เวลาพอๆ กัน และงานนั้นต้องใหญ่กินเวลานาน อย่างงานพวก Big Data ที่แบ่งส่วนของข้อมูลแล้วไปค้นหา แต่กลับงานแบบ Big Compute ที่แต่ละส่วนใช้เวลาไม่เท่ากัน Spark อาจไม่เหมาะนัก แค่โสหุ้ยของ Spark ก็ทำผมแหยงพอสมควรครับ แต่ Spark เองก็มีข้อดีคือ มีการใช้ Functional Programming ในการจัดการข้อมูลในระดับบน ซึ่งถ้าเรามีฐานข้อมูล SQL หรือ NOSQL ขนาดใหญ่ ก็สามารถใช้ Library ชุดนี้ในการค้นหาและจัดการได้โดยง่าย ล่าสุด Spark ขึ้นแท่นเป็นแชมป์ในการเรียงข้อมูลขนาด 100TB ใช้เวลาแค่ 23 นาที 206 เครื่อง  ก็ต้องถือว่าประสิทธิภาพนั้นยอดเยี่ยม เพียงแค่เราควรเอาค้อนมาตอกตะปู ไม่ใช่เอาไปขันน๊อต ก็เท่านั้น

[Total: 10    Average: 3.4/5]

You may also like...

Leave a Reply

Your email address will not be published. Required fields are marked *