Apache Spark – Deo II

U prethodnom članku opisan je proces instalacije i konfiguracije Apache Spark-a. U ovom članku upoznaćemo se sa načinom korišćenja Spark Shell preko primera. Takođe, biće opisani i moduli Spark SQL, Spark Streaming, Machine Learning i GraphX.

Spark Shell

Spark donosi interaktivni shell. Komandom u terminalu:

$ spark-shell

otvaramo ovaj moćan alat. Koristeći transformacije i akcije opisane u prethodnom članku možemo kreirati primer brojanja reči u fajlu. Prvo ćemo učitati fajl.

scala> val inputfile = sc.textFile(“ulaz.txt“)

ulaz.txt :

Pera Peric Mika Mikic Pera Aleksandar Pera Mika Sladoled Instagram Pera

Nakon toga, napisaćemo sledeću komandu:

scala> val brR = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Na ulazni fajl primenjujemo transformaciju flatMap, i svaki učitani red podelićemo na reči pomoću fukncije split, čiji je argument razmak (space-u). Zatim ćemo za svaku reč mapirati sa vrednošću jedan. Format je ( = ). Nakon toga ćemo redukcijom po ključu dobiti rezultat jer će se za jedan ključ naći kolekcija ). Sada možemo primeniti neku akcija. Iskoristićemo saveAsTextFile.

scala> brR.saveAsTlsextFile("izlaz")

Nakon toga, potrebno je otvoriti novi terminal (kako postojeći ne bismo zatvorili), i otići do lokacije na kojoj smo pokrenuli Spark. U folderu „izlaz“, dobijamo:

cd izlaz/ ls part-00000  part-00001  _SUCCESS

Nakon toga možemo štampati rezultat. Ukucati komandu:

cat part-00000

Dobijamo sledeći rezultat:

(Sladoled,1) (Aleksandar,1) (Mika,2) (Instagram,1) (Pera,4)

Isto i za komandu:

cat part-00001

dobijamo sledeći rezultat:

(Peric,1) (Mikic,1)

Spark podržava Javu7 i bolje verzije. Ukoliko se koristi Java 8, Spark podržava lambda izraze. Mogu se koristiti klase iz org.apache.spark.api.java.function paketa. Kada kreiramo maven projekat potrebno je u pom.xml fajlu dodati sledeću zavisnost:

 org.apache.spark spark-core_2.11 2.0.0 

U klasi je potrebno imporotvati sledeće klase:

import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.api.java.JavaRDD import org.apache.spark.SparkConf

Sada možemo napisati primer brojača reči u fajlu, u Java programskom jeziku:

public class App { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("seminarski"); JavaSparkContext sc = new JavaSparkContext(conf); String fajl = "src/main/resources/zgrada.c"; long broj = sc.textFile(fajl).filter(op -> op.contains("include")).count(); System.out.println("Broj pojavljivanja reci include je :" + broj); } }

Nakon uspešnog bildovanja komandom

mvn clean install

, potrebno je pokrenuti projekat. Program pokrećemo sledećom komandom:

/usr/local/spark/bin/spark-submit --class "org.amladenovic.App" target/spark-1.0-SNAPSHOT.jar

Nakon izvršavanja programa dobijamo sledeći izlaz:

....... ....... 16/08/25 22:58:51 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 542 ms on localhost (2/2) 16/08/25 22:58:51 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 16/08/25 22:58:51 INFO DAGScheduler: ResultStage 0 (count at App.java:33) finished in 0.571 s 16/08/25 22:58:51 INFO DAGScheduler: Job 0 finished: count at App.java:33, took 0.760153 s Broj pojavljivanja reci include je :4 16/08/25 22:58:51 INFO SparkContext: Invoking stop() from shutdown hook 16/08/25 22:58:51 INFO SparkUI: Stopped Spark web UI at http://192.168.0.12:4041 16/08/25 22:58:51 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 16/08/25 22:58:51 INFO MemoryStore: MemoryStore cleared 16/08/25 22:58:51 INFO BlockManager: BlockManager stopped

Sada ćemo pokazati primer računanja broja π Monte Carlo metodom. Monte Carlo metoda koristi uzorkovanje kako bi rešila neki problem. Koristićemo kvadrat određenih dimenzija i krug upisan u njega.

Odnos kvadrata i kruga

Ako uzmemo da je poluprečnik kruga r, stranica kvadrata je 2r, dakle površina kvadrata je 4r2, dok je površina kruga upisana u taj kvadrat r2π. Kako ne znamo obe površine, moraćemo da ih aprkisimiramo tj. nasumičnim biranjem tačaka koje pripadaju kvadratu i koje pripadaju krugu. Nakon n iteracija, odnosi ukupnog broja tačaka i tačaka koje su se našle u krugu biće jednake probližnom odnosu površina pa time možemo izračunati π.

val count = sc.parallelize(1 to NUM_SAMPLES).map{i => val x = Math.random() val y = Math.random() if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / NUM_SAMPLES)

U svakoj iteraciji generišemo dve nasumične vrednosti koje su x i y koordinate. Izračunamo da li pripadaju krugu ili ne. Kvadrat je u donjem levom uglu koordinatnog sistema (0, 0) do (1,1). $$\frac{ukupanBrTačaka}{brojTačakaUKrugu} \approx \frac{PovršinaKvadrata}{PovršinaKruga} = \frac{4r^2}{r^2 \pi} = \frac{4}{\pi}$$ Pa se π može izraziti: $$ \pi = 4 * \frac{brojTačakaUKrugu}{ukupanBrTačaka}$$

Spark SQL

Spark SQL je modul koji nam omogućava da pokrećemo SQL upite nad podacima u Spark-u. Možemo pokrenuti upite nad podacima dobijenim iz raličitih formata na primer JSON-a, baze, fajla itd. Spark SQ radi sa Dataframe tipom podatka.

DataSet je kolekcija podataka. DataSet je uveden u Spark sa verzijom 1.6. Može da se konsturiše od objekata određenog programskog jezika, a zatim manipulisati transformacijama((map, flatMap, filter..)

DataSet je podskup Dataframe. Spark Dataframe je zapravo DataSet koji je organizovan kao matrica ili tabela i sadrži metapodatke o tipovima podataka koji se nalaze u kolonama. Podržava lazy evaluation. Upravo zahvaljujući Dataframe-u omogućeno je pisanje SQL upita. Dataframe može biti pretvoren u RDDs pozivanjem .rdd metode.

Spark SQL poseduje SQLContext koji sadrži sve fukncionalnosti vezane za taj modul. SQLContext se kreira od SparkContext.

val sql = new org.apache.spark.sql.SQLContext(sc)

Tu je takođe i HiveContext koji pruža funbkcionalosti u radu sa Hive tabelama i manipulacijom upita nad njima.

Kako bi se olakšalo korisnicima, sa pojavom verzije 2.0, uvodi se SparkSession. Uloga SparkSession je da zameni SQLContext i HiveContext.

Učitaćemo fajl sa studentima.

val frame = sql.read.json("student.json")

Fajl izgleda ovako:

{"id" : "0001", "name" : "aca", "indeks" : "74", "upisan":"2011"} {"id" : "0002", "name" : "pera", "indeks" : "74", "upisan":"2011"} {"id" : "0003", "name" : "mika", "indeks" : "74", "upisan":"2011"} {"id" : "0004", "name" : "laza","indeks" : "74", "upisan":"2011"}

Sada možemo manipulisati podacima. Vrednost varijable frame se dobija komandom

scala> frame.show()

Rezultat je:

+----+------+----+------+ |  id|indeks|name|upisan| +----+------+----+------+ |0001|    74| aca|  2011| |0002|    74|pera|  2011| |0003|    74|mika|  2011| |0004|    74|laza|  2011|

Sledećom komandom dobijamo shemu učitanih podataka:

scala> frame.printSchema

Rezultat je:

root |-- id: string (nullable = true) |-- indeks: string (nullable = true) |-- name: string (nullable = true) |-- upisan: string (nullable = true)

Možemo filtrirati promenljivu po određenim kriterijumima:

!==   <=>     asc          endsWith     hashCode    mod         startsWith %     =!=     between      eqNullSafe   isNaN       multiply    substr &&    ===     bitwiseAND   equalTo      isNotNull   name        toString *     >       bitwiseOR    equals       isNull      notEqual    unary_! +     >=      bitwiseXOR   explain      isin        or          unary_- -     alias   cast         geq          leq         otherwise   when /     and     contains     getField     like        over        || <     apply   desc         getItem      lt          plus <=    as      divide       gt           minus       rlike

Recimo, možemo prikazati svakog ko se zove Aca.

scala> frame.filter(col("name").like("aca")).show

Izlaz iz gore navedene komande je sledeći:

+----+------+----+------+ |  id|indeks|name|upisan| +----+------+----+------+ |0001|    74| aca|  2011| +----+------+----+------+

Ovaj frejm možemo sačuvati sledećom komandom:

scala> frame.createTempView("studenti")

Takođe možemo primeniti SQL upit nad ovim podacima.

sql.sql("SELECT * FROM studenti").show

Rezultat je sledeći:

+----+------+----+------+ |  id|indeks|name|upisan| +----+------+----+------+ |0001|    74| aca|  2011| |0002|    74|pera|  2011| |0003|    74|mika|  2011| |0004|    74|laza|  2011| +----+------+----+------+

Spark Streaming

Spark Streaming je modul u Spark-u. Ukoliko nam je potreban alat za obradu podataka koji nastaju u realnom vremenu (obrada senzora, saobraćaja na serveru, pretraga na internetu, tvitova na tviteru i slično), ovo je alat za nas. Podaci sa Twitter, Kafke i drugih sličnih izvora mogu biti obrađeni koristeći algoritme izražene preko funkcija map, reduce, join i drugih. Mogu se primeniti algoritmi iz mašinskog učenja i algoritmi iz obrade grafova na ove podatke.

Iteracija u Spark Streaming-u

Spark Streaming prihvata ulazne podatke, deli na seriju ulaznih podataka kao RDD na predefinisani interval (recimo n sekundi). Potom se obrađuju od strane Spark–a generišući seriju obrađenih podataka. Nakon ovoga možemo sačuvati podatke za buduću analizu, generisati izveštaj i slično. Kompanija Uber koristi Spark Streaming da real-time obradi terabajte podataka sa telefona korisnika. Spark Streaming se koristi u Supply chain analizi. Spark Streaming koriste sve kompanije koji žele da personalizuju i interaguju sa korisnicicima.

DStream

DStream (Discretized Stream) je osnovna apstrakcija u Spark Streaming-u i predstavlja continuous strim podataka. Dstream predstavlja sekvencu RDD-a. Može biti kreiran od razlicitih izvora podataka, kao i primenom operacija na DStream.

Istorija RDD–a tokom vremena

Svaka primerna operacije na DStream čuva prethodnu verziju pre te operacije. DStreams podržava sledeće transformacije i akcije:

  • map
  • flatMap
  • filter
  • count
  • reduce
  • countByValue
  • join

Tu su i Window Operations koji nam dozvoljavaju da primenimo neku transformaciju na prozor podataka.

Window opcija

Tu su i Join Operations sa kojima je moguće spojiti dva strima. Postoje i izlazne operacije koje nam dozvljavaju da podatke sačuvamo na fajl sistemu ili bazi. Neke od značajnijih su saveAsTextFiles, foreachRDD i print..

Machine Learning

Mašinsko učenje je još jedan modul Spark-a. Mašinsko učenje nam služi da na osnovu trenutnih podataka možemo doneti odluku tj. „predvideti budućnost“.

Mašinsko učenje se može podeliti na sledeće modele:

  • Supervised learning
  • Unsupervised learning
  • Semi-supervised Learning
  • Reinforcement learning

Supervised learning – se koristi da predvidi izlaz trenirajućeg program na skupu podataka koji smo obeležili (labeled data). Koristeći ovaj model, želimo da se predvide novi label podaci. Jedna od tehnika ovog modela je klasifikacija.

Klasifikacija: Gmail koristi mašinsko učenje kako bi klasifikovao da li je neki email spam ili ne. Koristeći podatke o pošiljaocu, primaocu, naslovu i tekstu poruke radi klasifikaciju koristeći prethodne informacije. Ovaj model se takođe koristi u otkrivanju prevara u finansijskoj industriji brzo i precizno. Finansijske organizacije imaju vrlo malo milisekundi da odluče da li je neka transkacija legitimna ili prevara. PayPal koristi mašinsko učenje kako bi sprečio transkacije koje su potencijalno prevara.

Unsupervised learning – se koristi da se pronađu skriveni obrazci i povezanosti nad sirovim podacima. Nije potreban trening za ovaj model, pa se tehnika bazira na neoznačenim podacima. Jedna od tehnika ovog modela je Clustering.

Clustering: Google News koristi ovu tehniku kako bi razvrstao vesti u različite kategroje na osnovu sadržaja, naslova itd., analizirajući sličnosti imeđu grupe objekata. Spada u unsupervised algoritme jer ne možemo da odredimo izlaz unapred.

Semi-supervised Learning – koristi supervised i unsupervised learning modele. Uključuje malu grupu obeleženih podataka i veliku grupu neobeleženih podataka. Koristi se za kategorizaciju slika, prepoznavanje glasa i slično.

Reinforcement learning – model koji se koristi da se otkrije koja je najbonja akcija u trenutnom stanju tj. koje je najbonje rešenje za određeni problem u trenutnom stanju. Koristi se kod AI aplikacija.

Jedna od tehnika koja se dosta koristi je Collaborative Filtering. Amazon koristi ovu tehniku kako bi preporučio korisnicima na osnvovu istorije pretrage koji proizvod bi trebalo da mu se svidi. Radi po principu, da ako Pera voli filmove A,B i C, Mika voli filmove B i C, i pojavi se novi podatak da Laza voli film B, film C je moguća preporuka za Lazu. Ovaj modul dolazi u 2 paketa: spark.mllib i spark.ml.

spark.mllib paket sadrži originalni API baziran na Resilient Distrited Datasets-u. Uključuje neke od tehnika mašinskog učenja: correlation, classification, collaborative filtering, clustering i dimensionality reduction.

spark.ml paket API mašinskog učenja baziran na DataFrames-ovima koji su suština Spark SQL biblioteke.. Ovaj paket uključuje neke od tehnika mašinskog učenja, kao što su classification, clustering itd.

GraphX

GraphX je modul u Spark-u koji se koristi za grafove i njihovo izračunavanje. Graf je matematička struktura koja se sastoji od skupa grana i skupa čvorova. Graf može biti usmeren. Grane između 2 čvora ne moraju biti sa obe strane, dok kod regularnog grafa 2 čvora su povezana iz oba smera.

Property Graph proširuje RDD, i uvodi novi tip podatka : usmereni graf sa vrednostima na čvoru i grani. Podržava više grana iz jednog čvora. Svaka grana ima početnu i krajnju tačku. Neke od operacija koji koristi ovaj modul su subgraph, join 2 grafa itd. Ovaj modul optimizuje reprezentaciju čvorova i ivica (grana) kada su oni primitivni tipovi, čuvajući ih u nekim specijalnim nizovima. Moguće je čuvati različite tipove i to se postiže nasleđivanjem VertexProperty klase. Kao i RDD-ovi, property graf je nepromenljiv i tolerantan na greške. Kada se promeni vrednost nekog grafa, krerira se novi graf sa novim promenama. Moguće je, ukoliko mašina prestane sa radom, da se rekonstruiše graf. Property graf se sastoji od para kolekcija RDD-ova:

class Graph[VD, ED] { val vertices: VertexRDD[VD] val edges: EdgeRDD[ED] }

Postoji puno ugrađenih funkcija, neke od njih su: * numVertices * inDegrees * outDegrees * degrees * vertices * edges

Najbolji način je da se neke funkcije demonstriraju na primeru. Imamo nekoliko gradova i njihove udaljenosti.

polazni gradodredišteudaljenost
Grad1Grad2100
Grad2Grad3150
Grad3Grad1120

Prvo je potrebno da pokrenemo Spark.

spark-shell

Zatim imporotvati sledeću klasu:

import org.apache.spark.graphx._

Zatim je potrebno da definišemo čvorove. Čvor ima ID i atribut.

ID GradaNaziv
1Grad1
2Grad2
3Grad3

To ćemo uraditi sledećom komandom:

val vertices=Array((1L, ("Grad1")),(2L, ("Grad2")),(3L,("Grad3")))

Potom definišemo čvorove u RDD-u.

val vRDD= sc.parallelize(vertices)

Sada ćemo definisati grane:

Polazni gradOdredišteUdaljenost
12100
23150
34120

Narednom komando pravimo grane:

val edges = Array(Edge(1L,2L,100),Edge(2L,3L,150),Edge(3L,1L,120))

Potom definišemo grane u RDD -u.

val eRDD= sc.parallelize(edges) val nowhere = "nowhere"

Potrebno da se kreira graf:

val graph = Graph(vRDD,eRDD, nowhere)

Sada možemo da manipulišemo podacima: Za početak možemo ištampati čvorove i grane:

graph.vertices.collect.foreach(println) graph.edges.collect.foreach(println)

Takođe možemo i da odštampamo udaljensoti gradova veće od 100 kilometara:

graph.edges.filter { case Edge(src, dst, prop) => prop > 100 }.collect.foreach(println)

Zaključak

Živimo u vremenu kada je potrebno obraditi veliku količinu podataka iz raličitih izvora ( Twitter, Facebook, Email-ovi, senzori, baze podataka itd.) za što kraće vreme. Apache Spark je odličan alat koji nam, uz module sa kojima dolazi, to može pružiti. Odlična dokumentacija, lakoća korišćenja, performanse i tim ljudi koji svakodnevno unapređuju ovaj alat su razlozi sve veće popularnosti ovog alata. Sa novom verzijom u julu 2.0, Spark je optimizovan i postiže bolje rezultate. Na sajtu stack overflow je izvršeno istraživanje o stanju u programerskoj sferi, a RDD Spark se nalazi na drugom mestu po kriterijumu tehnologija koje se sve više koriste, što govori da ovom alatu tek predostoji svetla budućnost.