पृथ्वीराज बोस द्वारा योगदान दिया गया
अपने पिछले ब्लॉग में मैंने अपाचे स्पार्क स्ट्रीमिंग की विंडोिंग अवधारणा का उपयोग करते हुए स्टेटफुल ट्रांसफॉर्मेशन पर चर्चा की है। आप इसे पढ़ सकते हैं यहाँ ।
इस पोस्ट में मैं अपाचे स्पार्क स्ट्रीमिंग में संचयी स्टेटफुल ऑपरेशंस पर चर्चा करने जा रहा हूं। यदि आप स्पार्क स्ट्रीमिंग के लिए नए हैं, तो मैं आपको यह समझने के लिए अपने पिछले ब्लॉग को पढ़ने के लिए दृढ़ता से सलाह देता हूं कि विंडो कैसे काम करती है।
स्पार्क स्ट्रीमिंग में स्टेटफुल ट्रांसफॉर्मेशन के प्रकार (जारी…)
> संचयी ट्रैकिंग
हमने इस्तेमाल किया था कम करेंबायएंडविंडो (…) एपीआई कुंजी की स्थिति को ट्रैक करने के लिए, हालांकि विंडोिंग कुछ उपयोग मामलों के लिए सीमाएं बनती है। क्या होगा यदि हम समय विंडो में इसे सीमित करने के बजाय कुंजी की अवस्थाओं को संचित करना चाहते हैं? उस मामले में हमें उपयोग करने की आवश्यकता होगी updateStateByKey (…) आग।
इस एपीआई को स्पार्क 1.3.0 में पेश किया गया था और यह बहुत लोकप्रिय रहा है। हालाँकि इस API में कुछ प्रदर्शन ओवरहेड हैं, समय के साथ राज्यों के आकार में वृद्धि के साथ इसका प्रदर्शन घटता जाता है। मैंने इस API का उपयोग दिखाने के लिए एक नमूना लिखा है। आप कोड पा सकते हैं यहाँ ।
स्पार्क 1.6.0 ने एक नया एपीआई पेश किया MapWithState (…) जो प्रदर्शन ओवरहेड्स द्वारा हल किए गए हल करता है updateStateByKey (…) । इस ब्लॉग में मैं इस विशेष एपीआई पर चर्चा करने जा रहा हूं जिसमें मैंने एक नमूना कार्यक्रम लिखा है। आप कोड पा सकते हैं यहाँ ।
इससे पहले कि मैं एक कोड के माध्यम से चलूं, चेकपॉइंटिंग पर कुछ शब्दों को छोड़ दूं। किसी भी राज्य परिवर्तन के लिए, चेकपॉइंटिंग अनिवार्य है। ड्राइवर प्रोग्राम विफल होने की स्थिति में चेकपॉइंटिंग कुंजी की स्थिति को पुनर्स्थापित करने के लिए एक तंत्र है। जब ड्राइवर पुनरारंभ होता है, तो चाबियों की स्थिति को चेकपॉइंटिंग फ़ाइलों से पुनर्स्थापित किया जाता है। चेकपॉइंट स्थान आमतौर पर एचडीएफएस या अमेज़ॅन एस 3 या कोई विश्वसनीय भंडारण हैं। कोड का परीक्षण करते समय, कोई स्थानीय फ़ाइल सिस्टम में भी स्टोर कर सकता है।
नमूना कार्यक्रम में, हम होस्ट = लोकलहोस्ट और पोर्ट = 9999 पर सॉकेट टेक्स्ट स्ट्रीम को सुनते हैं। यह आने वाली स्ट्रीम को (शब्द, सं। की संख्या) में टोकन करता है और 1.6.0 एपीआई का उपयोग करके शब्द गणना को ट्रैक करता है। MapWithState (…) । इसके अतिरिक्त, बिना अपडेट वाली कुंजियों का उपयोग करके हटाया जाता है StateSpec.timeout एपीआई। हम HDFS में चेकपॉइंट कर रहे हैं और चेकपॉइंटिंग आवृत्ति हर 20 सेकंड में है।
जावा प्रोग्राम कैसे संकलित करें
पहले स्पार्क स्ट्रीमिंग सत्र बनाएं।
हम एक बनाते हैं चेकपॉइंट एचडीएफएस में और फिर ऑब्जेक्ट विधि को कॉल करें getOrCreate (…) । द getOrCreate एपीआई जाँच करता है चेकपॉइंट यह देखने के लिए कि क्या कोई पूर्ववर्ती स्थिति बहाल करने के लिए है, यदि वह मौजूद है, तो यह स्पार्क स्ट्रीमिंग सत्र को फिर से बनाता है और नए डेटा के साथ आगे बढ़ने से पहले फ़ाइलों में संग्रहीत डेटा से कुंजियों की स्थिति को अपडेट करता है। अन्यथा यह एक नया स्पार्क स्ट्रीमिंग सत्र बनाता है।
द getOrCreate चेकपॉइंट निर्देशिका नाम और एक फ़ंक्शन (जिसे हमने नाम दिया है) लेता है CreateFunc ) किसका हस्ताक्षर होना चाहिए () => StreamingContext ।
अंदर कोड की जांच करते हैं CreateFunc ।
पंक्ति # 2: हम 'TestMapWithStateJob' और बैच अंतराल = 5 सेकंड के लिए नौकरी के नाम के साथ एक स्ट्रीमिंग संदर्भ बनाते हैं।
लाइन # 5: चेकपॉइंट डायरेक्टरी को सेट करें।
लाइन # 8: कक्षा का उपयोग करके राज्य विनिर्देश सेट करें org.apache.streaming.StateSpec वस्तु। हम पहले उस फ़ंक्शन को सेट करते हैं जो राज्य को ट्रैक करेगा, फिर हम परिणामी DStreams के लिए विभाजन की संख्या निर्धारित करते हैं जिन्हें बाद के परिवर्तनों के दौरान उत्पन्न किया जाना है। अंत में हम टाइमआउट (30 सेकंड) सेट करते हैं, जहां यदि किसी कुंजी के लिए कोई अपडेट 30 सेकंड में प्राप्त नहीं होता है, तो कुंजी स्थिति को हटा दिया जाएगा।
पंक्ति 12 #: सॉकेट स्ट्रीम को सेटअप करें, आने वाले बैच डेटा को समतल करें, कुंजी-मूल्य जोड़ी बनाएं, कॉल करें mapWithState , 20 के लिए चेकपॉइंटिंग अंतराल सेट करें और अंत में परिणाम प्रिंट करें।
स्पार्क फ्रेमवर्क को वें कहते हैं e createFunc पिछले मूल्य और वर्तमान स्थिति के साथ हर कुंजी के लिए। हम योग की गणना करते हैं और राज्य को संचयी राशि से अपडेट करते हैं और अंत में हम कुंजी के लिए राशि वापस करते हैं।
उदाहरण के साथ जावा में mvc वास्तुकला
जीथूब सूत्र -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala
क्या आप हमसे कोई प्रश्न पूछना चाहते हैं? कृपया टिप्पणी अनुभाग में इसका उल्लेख करें और हम आपके पास वापस आ जाएंगे।
संबंधित पोस्ट:
Apache Spark & Scala से शुरू करें
स्पार्क स्ट्रीमिंग में वाइंडिंग के साथ स्टेटफुल ट्रांसफॉर्मेशन