DBInputFormat andmete edastamiseks SQL-ist NoSQL-i andmebaasi



Selle ajaveebi eesmärk on õppida, kuidas andmeid SQL-andmebaasidest HDFS-i edastada, kuidas andmeid SQL-andmebaasidest NoSQL-i andmebaasidesse edastada.

Selles blogis uurime Hadoopi tehnoloogia ühe olulisema komponendi, st MapReduce, võimalusi ja võimalusi.

Täna võtavad ettevõtted Hadoopi raamistiku oma esimese valikuna andmete salvestamiseks, kuna see suudab tõhusalt töödelda suuri andmeid. Kuid me teame ka, et andmed on mitmekülgsed ning eksisteerivad erinevates struktuurides ja vormingutes. Nii suure hulga andmete ja nende erinevate vormingute kontrollimiseks peaks olema olemas mehhanism, mis mahutaks kõik sordid ja annaks siiski tõhusa ja järjepideva tulemuse.





Hadoopi raamistiku kõige võimsam komponent on MapReduce, mis tagab andmete ja nende struktuuri juhtimise paremini kui teised analoogid. Ehkki see nõuab õppimiskõvera lisakulusid ja programmeerimise keerukust, saate nende keerukustega toimetulekul Hadoopiga kindlasti hakkama saada igasuguste andmetega.

MapReduce raamistik jagab kõik töötlemisülesanded põhimõtteliselt kaheks faasiks: Map ja Reduce.



Nende etappide jaoks algandmete ettevalmistamine nõuab mõnede põhiklasside ja liideste mõistmist. Sellise ümbertöötlemise superklass on Sisendvorming.

The Sisendvorming klass on üks Hadoop MapReduce API põhiklassidest. See klass vastutab kahe peamise asja määratlemise eest:

  • Andmed jagunevad
  • Plaadilugeja

Andmete jagamine on Hadoop MapReduce raamistiku põhimõiste, mis määratleb nii üksikute kaardiülesannete suuruse kui ka selle potentsiaalse käivitusserveri. The Plaadilugeja vastutab sisendfailist kirjete tegeliku lugemise ja nende (võtme / väärtuspaaridena) kaardistajale esitamise eest.



Kaardistajate arv otsustatakse jaotuste arvu põhjal. Jaotuste loomine on InputFormati ülesanne. Enamasti on jaotuse suurus võrdne ploki suurusega, kuid alati ei looda jaotusi HDFS-i ploki suuruse põhjal. See sõltub täielikult sellest, kuidas teie InputFormati meetod getSplits () on alistatud.

MR-i jagamise ja HDFS-i blokeerimise vahel on põhimõtteline erinevus. Plokk on füüsiline andmepakk, samal ajal kui jagamine on lihtsalt loogiline tükike, mida kaardistaja loeb. Jagamine ei sisalda sisendandmeid, see sisaldab lihtsalt andmete viidet või aadressi. Jagamisel on põhimõtteliselt kaks asja: pikkus baitides ja hulk salvestuskohti, mis on vaid stringid.

Selle paremaks mõistmiseks võtame ühe näite: MySQL-i salvestatud andmete töötlemine MR-i abil. Kuna antud juhul pole plokkide kontseptsiooni, on teooria: 'splitid luuakse alati HDFS-i ploki põhjal',ebaõnnestub. Üks võimalus on luua jaotised MySQL-i tabeli ridade vahemike põhjal (ja seda teeb DBInputFormat, sisendvorming relatsioonide andmebaasidest andmete lugemiseks). Meil võib olla k arv split, mis koosneb n-st reast.

Ainult FileInputFormati (InputFormati failides salvestatud andmete töötlemiseks) põhiste InputFormatsi jaoks luuakse jaotused sisendfailide kogumahu baiti baasil. Sisendfailide failisuurust FileSystem käsitletakse aga sisendjaotuste ülemise piirina. Kui teil on HDFS-i ploki suurusest väiksem fail, saate selle faili jaoks ainult ühe kaardistaja. Kui soovite, et käitumine oleks erinev, võite kasutada mapred.min.split.size. Kuid see sõltub jällegi ainult teie InputFormati getSplitsist ().

Meil on paketi org.apache.hadoop.mapreduce.lib.input all saadaval nii palju olemasolevaid sisendvorminguid.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

mis on andmeteaduse kursus

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Vaikimisi on TextInputFormat.

Samamoodi on meil nii palju väljundvorminguid, mis loevad reduktorite andmeid ja salvestavad need HDFS-i:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

mis on kontekstifiltrid tabelis

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Vaikimisi on TextOutputFormat.

Selle ajaveebi lugemise lõpetamiseks oleksite õppinud:

  • Kuidas kirjutada kaardi vähendamise programm
  • Mapreduce'is saadaval olevate erinevat tüüpi InputFormatside kohta
  • Mis on InputFormatsi vajadus
  • Kuidas kirjutada kohandatud InputFormats
  • Andmete ülekandmine SQL-i andmebaasidest HDFS-i
  • Andmete ülekandmine SQL-i (siin MySQL) andmebaasidest NoSQL-i andmebaasidesse (siin Hbase)
  • Kuidas edastada andmeid ühest SQL-i andmebaasist teise SQL-i andmebaaside tabelisse (võib-olla pole see nii oluline, kui teeme seda samas SQL-i andmebaasis. Samas pole midagi halba, kui teil on samad teadmised. Sa ei või kunagi teada kuidas seda saab kasutada

Eeldus:

  • Hadoop on eelinstallitud
  • SQL on eelinstallitud
  • Hbase on eelinstallitud
  • Java põhiteadmised
  • MapReduce teadmisi
  • Hadoopi raamistiku põhiteadmised

Mõistame probleemi lahendust, mille me siin lahendame:

Meie relatsioonide andmebaasis Edureka on MySQL DB-s töötajate tabel. Vastavalt ärinõuetele peame kogu suhtelises DB-s saadaolevad andmed teisaldama Hadoopi failisüsteemi, st HDFS, NoSQL DB, mida nimetatakse Hbase'iks.

Selle ülesande täitmiseks on meil palju võimalusi:

  • Sqoop
  • Flume
  • MapReduce

Nüüd ei soovi te selle toimingu jaoks ühtegi muud tööriista installida ega konfigureerida. Teile jääb ainult üks võimalus, mis on Hadoopi töötlusraamistik MapReduce. MapReduce raamistik annaks teile andmete edastamise ajal täieliku kontrolli. Veergudega saate manipuleerida ja panna otse ükskõik millisesse sihtkohta.

Märge:

  • MySQL-i tabelite tabelite toomiseks peame MySQL-konnektori alla laadima ja panema Hadoopi klassirajale. Selleks laadige alla pistikühendus com.mysql.jdbc_5.1.5.jar ja hoidke seda kataloogis Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Allalaadimised / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Samuti pange kõik Hbase'i purgid Hadoopi klassiraja alla, et teie MR-programm saaks juurdepääsu Hbase'ile. Selleks käivitage järgmine käsk :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Tarkvaraversioonid, mida olen selle ülesande täitmisel kasutanud, on:

  • Hadooop-2.3.0
  • HBase 0,98,9-Hadoop2
  • Varjutatud Kuu

Programmi vältimiseks ühilduvusprobleemides määran oma lugejatele käsu käitada sarnase keskkonnaga.

Kohandatud DBInputWritable:

pakett com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementates Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // Objekt Resultset esindab SQL-i lausest tagastatud andmeid {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throws IOException { } public void write (PreparedStatement ps) viskab SQLException {ps.setInt (1, id) ps.setString (2, nimi) ps.setString (3, osakond)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

Kohandatud DBOutputWritable:

pakett com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementates Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = nimi this.id = id this.dept = dept} public void readFields (DataInput in) viskab IOException {} public void readFields (ResultSet rs) viskab SQLException {} public void write (DataOutput out) viskab IOException {} public void write (PreparedStatement ps) viskab SQLException {ps.setString (1, nimi) ps.setInt (2, id) ps.setString (3, osakond)}}

Sisendtabel:

luua andmebaas edureka
luua tabeli emp (empid int not null, nimi varchar (30), dept varchar (20), esmane võti (empid))
sisestage empi väärtustesse (1, 'abhay', 'areng'), (2, 'brundesh', 'test')
vali * emp

1. juhtum: teisaldamine MySQL-ist HDFS-i

pakett com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Konfiguratsiooni import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) viskab erandi {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // draiveriklass' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // kasutajanimi' root ') // parool Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.utormorm uus tee (args [0])) DBInputFormat.setInput (töö, DBInputWritable.class, 'emp', // sisendtabeli nimi null, null, uus string [] {'empid', 'name', 'dept'} / / tabeli veerud) Tee p = uus tee (args [0]) FileSystem fs = FileSystem.get (uus URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

See koodilõik võimaldab meil valmistada ette või konfigureerida sisendvormi, et pääseda juurde meie lähte-SQL-i DB-le. Parameeter sisaldab draiveriklassi, URL-il on SQL-i andmebaasi aadress, selle kasutajanimi ja parool.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draiveriklass 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // kasutajanimi 'root') // parool

See koodijupp võimaldab meil edastada andmebaasis olevate tabelite üksikasjad ja määrata need tööobjektiks. Parameetrid hõlmavad loomulikult tööeksemplari, kohandatud kirjutatavat klassi, mis peab juurutama DBWritable'i liidese, lähtetabeli nime, tingimust kui on veel null, mis tahes sorteerimisparameetreid muu null, vastavalt tabeliveergude loendit.

DBInputFormat.setInput (töö, DBInputWritable.class, 'emp', // sisendtabeli nimi null, null, uus string [] {'empid', 'nimi', 'dept'} // tabeli veerud)

Kaardistaja

pakett com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable avaliku klassi kaart laiendab Mapperit {
kaitstud tühine kaart (LongWritable võti, DBInputWritable väärtus, Context ctx) {proovige {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (uus tekst (nimi + '' + id + '' + osakond), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduktor: kasutatud identiteedi reduktorit

Käsk käivitada:

hadoop purk dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Väljund: MySQL-i tabel teisaldati HDFS-i

hadoop dfs -ls / dbtohdfs / *

2. juhtum: teisaldamine MySQL-i ühelt tabelilt teisele MySQL-is

väljundtabeli loomine MySQL-is

luua tabeli töötaja1 (nimi varchar (20), id int, dept varchar (20))

pakett com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) viskab erandi {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draiveriklass 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // kasutajanimi' root ') // parool Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) töö .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (töö, DBInputWritable.class, 'emp', // sisendtabeli nimi null, null, uus string [] {'empid ',' nimi ',' osakond '} // tabeli veerud) DBOutputFormat.setOutput (töö,' töötaja1 ', // väljundtabeli nimi uus string [] {' nimi ',' id ',' osakond '} // tabel veerud) System.exit (job.waitForCompletion (true)? 0: 1)}}

Selle kooditüki abil saame konfigureerida väljundtabeli nime SQL DB-s. Parameetrid on vastavalt tööeksemplari, väljundtabeli nimi ja väljundveeru nimed.

DBOutputFormat.setOutput (töö, 'töötaja1', // väljundtabeli nimi uus string [] {'nimi', 'id', 'osakond} // tabeli veerud)

Kaardistaja: sama mis juhtumil 1

Reduktor:

pakett com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reduceri import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable avalik klass Reduce laiendab Reducerit {kaitstud void reduc (tekstivõti, korduvad väärtused, kontekst ctx) {int summa = 0 stringirida [] = key.toString (). Split ('') proovige {ctx.write (uus DBOutputWritable (rida [0] .toString (), Integer.parseInt (rida [1] .toString ()), rida [2] .toString ()), NullWritable.get ())} saak (IOException e) {e.printStackTrace ()} saak (InterruptedException e) {e.printStackTrace ()}}}

Käsk käivitamiseks:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Väljund: andmete edastamine MySQL-i EMP-tabelist teisele MySQL-i tabeli töötajale1

3. juhtum: teisaldamine MySQL-i tabelilt NoSQL-i (Hbase) tabelisse

Hbase'i tabeli loomine SQL-tabeli väljundi majutamiseks:

luua 'töötaja', 'ametlik_info'

Juhi klass:

pakett Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) viskab erandi {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // draiveriklass 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'juur', // kasutajanimi 'juur') // parool Tööülesanne = uus töö (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('töötaja', Reduce.class, job) job.setInputFormFormTormFormFormTormFormTormFormATsetFormFormATormFormTormFormTormFormTormFormTormFormatFormATsetFormFormatClass (Job.setMassOutFormatClass) klass) DBInputFormat.setInput (töö, DBInputWritable.class, 'emp', // sisendtabeli nimi null, null, uus string [] {'empid', 'nimi', 'dept'} // tabeli veerud) System.exit (job.waitForCompletion (tõene)? 0: 1)}}

See koodijupp võimaldab teil konfigureerida väljundvõti klassi, mis hbase'i korral on ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Siin edastame tabelis tegutsemiseks hbase tabeli nime ja reduktori.

TableMapReduceUtil.initTableReducerJob ('töötaja', Reduce.class, töö)

Kaardistaja:

pakett Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map pikendab Mapperit {private IntWritable one = new IntWritable (1) kaitstud tühikaart (LongWritable id, DBInputWritable value, Context context) {proovige {String line = value.getName () String cd = value.getId () + 'String dept = value.getDept () context.write (uus ImmutableBytesWritable (Bytes.toBytes (cd)), uus tekst (rida + ') '+ dept))} saak (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Selles koodijupis võtame väärtused klassi DBinputwritable algoritmidest ja edastame need seejärel
ImmutableBytesWritable nii, et nad jõuaksid reduktorini bytewriatble kujul, millest Hbase aru saab.

Stringirida = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (uus ImmutableBytesWritable (Bytes.toBytes (cd)), uus tekst (rida + '' + dept ))

Reduktor:

pakett Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce laiendab TableReducer {public void reduc (ImmutableBytesWritable key, Iterable values, Context context) viskab IOException, InterruptedException {String [] põhjus = null // Loop väärtused for (Teksti val: väärtused) {põhjus = val.toString (). split ('')} // HBase-i panemine Put put = new Put (key.get ()) put.add (Bytes.toBytes ('ametlik_info') ), Bytes.toBytes ('nimi'), Bytes.toBytes (põhjus [0])) put.add (Bytes.toBytes ('ametlik_info'), Bytes.toBytes ('osakond'), Bytes.toBytes (põhjus [1 ])) context.write (võti, pane)}}

See koodijupp võimaldab meil otsustada täpse rea ja veeru, kuhu reduktori väärtused salvestame. Siin hoiustame iga empiidi eraldi reas, kui tegime empid rea võtmena, mis oleks ainulaadne. Igas reas hoiame töötajate ametlikku teavet veergude perekonna “ametlik_info” all vastavalt veergude “nimi” ja “osakond” all.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('ametlik_info'), Bytes.toBytes ('nimi'), Bytes.toBytes (põhjus [0])) put.add (Bytes. toBytes ('ametlik_info'), Bytes.toBytes ('osakond'), Bytes.toBytes (põhjus [1])) context.write (võti, pane)

Edastatud andmed Hbase'is:

skaneeri töötaja

Nagu näeme, suutsime edukalt täita oma äriandmete üleviimise relatsioonilisest SQL-i andmebaasist NoSQL-i DB-ks.

Järgmises ajaveebis õpime teiste sisend- ja väljundvormingute koodide kirjutamist ja täitmist.

Postitage oma kommentaare, küsimusi või tagasisidet. Tahaksin sinust hea meelega kuulda.

Kas teil on meile küsimus? Palun mainige seda kommentaaride jaotises ja võtame teiega ühendust.

Seonduvad postitused: