Kumulatiivne olekuline teisendus Apache Sparki voogesituses



Selles blogipostituses käsitletakse Sparki voogesituse olekulisi muutusi. Siit saate teada Hadoop Sparki karjääri kumulatiivse jälgimise ja oskuste suurendamise kohta.

Kaastööd tegi Prithviraj Bose

Oma eelmises ajaveebis olen arutanud olekulisi teisendusi, kasutades Apache Spark Streamingi jõulist kontseptsiooni. Saate seda lugeda siin .





Selles postituses käsitlen Apache Sparki voogesituse kumulatiivseid olekulisi toiminguid. Kui olete Sparki voogesituse uus kasutaja, soovitan teil tungivalt lugeda minu eelmist ajaveebi, et aru saada, kuidas tuuletamine töötab.

Olekuliste muutuste tüübid säde voogesituses (jätkub ...)

> Kumulatiivne jälgimine

Olime seda kasutanud reducByKeyAndWindow (…) API võtmete olekute jälgimiseks, kuid murdmine seab teatud kasutusjuhtumite korral piirangud. Mis siis, kui me tahame võtmete olekuid kogu aja jooksul akumuleerida, mitte piirduda ajaaknaga? Sel juhul peaksime seda kasutama updateStateByKey (…) TULI.



See API võeti kasutusele Spark 1.3.0-s ja on olnud väga populaarne. Kuid sellel API-l on mõningane jõudlus, selle jõudlus väheneb, kuna olekute suurus aja jooksul suureneb. Selle API kasutamise näitamiseks olen kirjutanud näidise. Koodi leiate siin .

Spark 1.6.0 tutvustas uut API-d mapWithState (…) mis lahendab esinemise üldkulud updateStateByKey (…) . Selles blogis käsitlen seda konkreetset API-d, kasutades minu kirjutatud näidisprogrammi. Koodi leiate siin .

Enne kui sukeldun koodijuhendisse, säästame paar sõna kontrollpunkti määramise kohta. Mis tahes olekulises teisenduses on kontrollpunktide määramine kohustuslik. Kontrollpunktimine on klahvide oleku taastamise mehhanism juhujuhi tõrke korral. Kui draiver taaskäivitub, taastatakse kontrollpunktide failidest võtmete olek. Kontrollpunkti asukohad on tavaliselt HDFS või Amazon S3 või mis tahes usaldusväärne salvestusruum. Koodi testimise ajal saab salvestada ka kohalikku failisüsteemi.



Näidisprogrammis kuulame sokli tekstivoogu hostil = localhost ja port = 9999. See tokeniseerib sissetuleva voo (sõnad, esinemiste arv) ja jälgib sõnade arvu, kasutades 1.6.0 API-d mapWithState (…) . Lisaks eemaldatakse klahvid ilma värskendusteta StateSpec. Timeout API. Kontrollime HDFS-is ja kontrollpunkti sagedus on iga 20 sekundi tagant.

mis on reavahetus HTML-is

Loome kõigepealt Sparki voogesituse seansi,

Spark-streaming-session

Loome a kontrollpunktDir HDFS-is ja seejärel kutsuge objekti meetod getOrCreate (…) . The getOrCreate API kontrollib kontrollpunktDir et näha, kas on varasemaid olekuid, mida taastada, kui see on olemas, loob see uuesti sädeme voogesituse seansi ja värskendab võtmete olekut failidesse salvestatud andmetest enne uute andmetega edasi liikumist. Vastasel juhul loob see uue Sparki voogesituse seansi.

The getOrCreate võtab kontrollpunkti kataloogi nime ja funktsiooni (mille oleme nimetanud createFunc ) kelle allkiri peaks olema () => StreamingContext .

Uurime sees olevat koodi createFunc .

Rida nr 2: loome voogesituse konteksti, kus töö nimi on „TestMapWithStateJob” ja pakkide intervall = 5 sekundit.

Rida nr 5: määrake kontrollpunktide kataloog.

Rida nr 8: määrake klassi spetsifikatsioon org.apache.streaming.StateSpec objekt. Kõigepealt seadistame oleku jälgimise funktsiooni, seejärel määrame tulemuseks olevate DStreamide partitsioonide arvu, mis tuleb luua järgnevate teisenduste ajal. Lõpuks seadsime ajalõpu (30 sekundiks), kus kui võtme värskendust ei saada 30 sekundi jooksul, eemaldatakse võtme olek.

Rida 12 #: seadistage pistikupesa voog, tasandage sissetulevad partiiandmed, looge võtme-väärtuste paar, helistage mapWithState , määrake kontrollpunkti intervalliks 20s ja printige tulemused lõpuks välja.

Sparki raamistik kutsub th e createFunc iga võtme jaoks, millel on eelmine väärtus ja praegune olek. Arvutame summa ja värskendame olekut kumulatiivse summaga ning lõpuks tagastame võtme summa.

kuidas teha javas esemete massiivi

Githubi allikad -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Kas teil on meile küsimus? Palun mainige seda kommentaaride jaotises ja võtame teiega ühendust.

Seonduvad postitused:

Alustage Apache Sparki ja Scalaga

Olekulised muutused koos sädemete voogesitusega