کیس 4: اسپارک اسٹریمنگ کی اصل لڑائی میں ونڈو فنکشن کا استعمال:
ونڈو میکانزم: ایک مدت کے اندر اندر ڈیٹا پروسیسنگ باقاعدگی سے کی جاتی ہے ، اور وقت کی مدت اوورلیپ ہوسکتی ہے۔ نیچے دیئے گئے اعداد و شمار کو دیکھیں:
اور
ونڈو کی لمبائی: ونڈو کی لمبائی ، مندرجہ بالا اعداد و شمار میں ونڈو کی لمبائی 3 ہے۔
ونڈو وقفہ: ونڈو کا وقفہ ، مندرجہ بالا اعداد و شمار میں ونڈو کا وقفہ 2 ہے۔
یہ دونوں پیرامیٹرز کا ذکر پہلے ہی بیچ سائز سے ہے۔ کتنی بار ہم ونڈو کا استعمال کرتے ہوئے کسی خاص حد میں اعداد و شمار کا حساب لگاسکتے ہیں ، مثال کے طور پر ، ہم ہر 10 سیکنڈ میں پچھلے 10 منٹ میں ڈیٹا کا حساب لگاسکتے ہیں (ڈیٹا یقینی طور پر اوورلیپ ہوجائے گا)۔
/ ہر 10 سیکنڈ میں آخری 30 سیکنڈ کا ڈیٹا کم کریں ویل ونڈوورڈکاؤنٹ = جوڑے۔ریڈس بائکیی اینڈ ونڈو ((ا: انٹ ، بی: انٹ) > (a + b) ، سیکنڈ (30) ، سیکنڈ (10)کیس 5: بلیک لسٹ فلٹرنگ اصل اسپارک اسٹرریمنگ جنگی میں:
تقاضے: ڈیٹا اسٹریم سے مخصوص ڈیٹا کو فلٹر کریں
اگر درج ذیل تک رسائی لاگ (DStream) موجود ہے تو:
20180808 ، زانگسان
20180808 ، لسی
20180808 ، وانگ وو
بلیک لسٹ ٹیبل (RDD):
زنگسان
لسی
بلیک لسٹ میں ناموں کو فلٹر کرنے کے بعد ، صرف 20180808 ، وانگ وو آؤٹ پٹ ہے
خیالات:
پہلے ، رسائی لاگ (DStream) پر درج ذیل کاروائیاں کریں:
== > (جانگسان: 20180808 ، زانگسان) (لیزی: 20180808 ، لسی) (وانگ وو: 20180808 ، وانگ وو) کی-ویلیو کی شکل میں لکھا گیا ہے ، جہاں کلید نام ہے اور ویلیو اصل رسائی لاگ ہے
پھر بلیک لسٹ (آر ڈی ڈی) پر بھی اسی طرح کی کاروائیاں کریں:
== > (زنسان: سچ) (لسی: سچ)
پھر ، ہم استعمال کرتے ہیں بائیں جوڑ ، درج ذیل نتائج حاصل کریں:
(زنگسان :)
(لسی :)
(وانگو :)
آخر میں ، ہمیں صرف دوسرا فلٹر کرنے کی ضرورت ہے جو سچ ہے ، اور باقی وہ نتیجہ ہے جو ہم چاہتے ہیں۔
org.apache.spark.SparkConf درآمد کریں org.apache.spark.streaming درآمد کریں۔ s سیکنڈز ، اسٹریمکنگ ٹیکسٹ} / ** *Author YuZhansheng * @ ڈیسک بلیک لسٹ فلٹرنگ *Create 2019-02-2015:33 * / آبجیکٹ ٹرانسفارم ایپ { ڈیف مین (آرگس: سرنی): یونٹ = { ویل اسپارککونف = نیا اسپارککونف (). سیٹ ماسٹر ("مقامی")۔ سیٹ ایپ نام ("TransforApp") // StreamingContext کو دو پیرامیٹرز کی ضرورت ہوتی ہے بنائیں: اسپارک کانف اور بیچ کا وقفہ ویل ایس ایس سی = نیا اسٹریمنگکونسٹسٹ (اسپارککونف ، سیکنڈز (5)) // بلیک لسٹ بنائیں ویل کالوں = فہرست ("زانگسان" ، "لسی") ویل کالیکس آر ڈی ڈی = ssc.sparkContext.parallelize (کالوں). نقشہ (x = > (x ، سچ)) ویل لائنز = ssc.sketTextStream ("لوکل ہوسٹ" ، 6789) ویل کلکلوگ = لائنز۔ماپ (x = > (x.split ("،") (1)، x)) تبدیل کریں (rdd = > { rdd.leftOuterJoin (کالوں آرڈیڈی) .فلٹر (x = > x._2._2.getOrElse (جھوٹا)! = سچ) .map (x = > x._2._1) }) پر کلک کریں۔ ssc.start () ssc.awaitTration () } }ٹیسٹ: اعداد و شمار بھیجنے کے لئے موکل پر NC -lk 6789 کمانڈ استعمال کریں ، اور کنسول کے ذریعہ تصدیق کے لئے پرنٹ شدہ نتیجہ دیکھیں۔
کیس 6: اسپارک اسٹریمنگ اسپارک ایس کیو ایل کو ورڈ فریکوئنسی کے اعدادو شمار کو مکمل کرنے کے لئے ضم کرتی ہے
پہلے انحصار شامل کریں
< ! - سپارک ایس کیو ایل انحصار-- > < انحصار > < گروپ آئی ڈی > org.apache.spark < / groupId > < artifactId > چنگاری sql_2.11 < / artifactId > < ورژن > ark ark spark.version} < / ورژن > < / انحصار > java.beans.Transient درآمد کریں org.apache.spark.SparkConf درآمد کریں org.apache.spark.rdd.RDD درآمد کریں org.apache.spark.sql.SparkSession درآمد کریں org.apache.spark.streaming درآمد کریں۔ s سیکنڈز ، اسٹریمنگکون ٹیکسٹ ، ٹائم} / ** *Author YuZhansheng * @ ڈیسک سپارک اسٹریمنگ اسپارڈ ایس کیو ایل کو ورڈ فریکوئنسی کے اعدادو شمار کو مکمل کرنے کے لئے ضم کرتی ہے *Create 2019-02-2016:03 * / آبجیکٹ اسکیل نیٹ ورک ورک کارڈ { ڈیف مین (آرگس: سرنی): یونٹ = { ویل اسپارککونف = نیا اسپارککونف (). سیٹ ماسٹر ("مقامی")۔ سیٹ ایپ نام ("سکل نیٹ ورک ورک کارڈ") // StreamingContext کو دو پیرامیٹرز کی ضرورت ہوتی ہے بنائیں: اسپارک کانف اور بیچ کا وقفہ ویل ایس ایس سی = نیا اسٹریمنگکونسٹسٹ (اسپارککونف ، سیکنڈز (5)) ویل لائنز = ssc.sketTextStream ("لوکل ہوسٹ" ، 6789) ویل الفاظ = لائن.flatMap (_. تقسیم ("")) words.foreachRDD {(rdd: RDD، وقت: وقت) = > ویل اسپارک = اسپارک سیشنسلیٹن ڈاٹ گیٹ انسٹینس (rdd.sparkContext.getConf) درآمد spark.implicits._ ویل الفاظڈٹا فریم = rdd.map (ڈبلیو = > ریکارڈ (ڈبلیو). ٹو ڈی ایف () wordsDataFrame.createOrReplaceTempView ("الفاظ") ویل ورڈکاؤنٹس ڈیٹا فریم = سپارک.سقیل ("لفظ کو منتخب کریں ، گنتی (*) لفظ کے حساب سے گروپ کے مجموعی طور پر") پرنٹ لین (ے "======= $ وقت =======") ورڈکونٹس ڈیٹا فریم.شو () } ssc.start () ssc.awaitTration () } کیس کلاس ریکارڈ (لفظ: سٹرنگ) آبجیکٹ اسپارک سیشن سنگلٹن { @ ٹرانسینٹ نجی وار مثال: اسپارک سیشن = _ Def getInstance (sparkConf: SparkConf): سپارک سیشن = { اگر (مثال کے طور پر == null) { مثال = اسپارک سیشن . بلڈر () .config (sparkConf) .getOrCreate () } مثال } } }ٹیسٹ: اعداد و شمار بھیجنے کے لئے موکل پر NC -lk 6789 کمانڈ استعمال کریں ، اور کنسول کے ذریعہ تصدیق کے لئے پرنٹ شدہ نتیجہ دیکھیں۔
+ ---- + ----- +
| لفظ | کل |
+ ---- + ----- +
| ڈی ایس | 2 |
| ای | 1 |
| d | 1 |
| a | 2 |
| جیسا | 1 |
+ ---- + ----- +
(اس معاملے کو چلانے کے دوران مزید نوشتہ جات چھپی ہوں گے۔)