अपाचे स्पार्क स्ट्रीमिंग में संचयी स्टेटफुल ट्रांसफॉर्मेशन



यह ब्लॉग पोस्ट स्पार्क स्ट्रीमिंग में स्टेटफुल ट्रांसफॉर्मेशन पर चर्चा करता है। एक Hadoop स्पार्क कैरियर के लिए संचयी ट्रैकिंग और अप-कौशल के बारे में सभी जानें।

पृथ्वीराज बोस द्वारा योगदान दिया गया

अपने पिछले ब्लॉग में मैंने अपाचे स्पार्क स्ट्रीमिंग की विंडोिंग अवधारणा का उपयोग करते हुए स्टेटफुल ट्रांसफॉर्मेशन पर चर्चा की है। आप इसे पढ़ सकते हैं यहाँ





इस पोस्ट में मैं अपाचे स्पार्क स्ट्रीमिंग में संचयी स्टेटफुल ऑपरेशंस पर चर्चा करने जा रहा हूं। यदि आप स्पार्क स्ट्रीमिंग के लिए नए हैं, तो मैं आपको यह समझने के लिए अपने पिछले ब्लॉग को पढ़ने के लिए दृढ़ता से सलाह देता हूं कि विंडो कैसे काम करती है।

स्पार्क स्ट्रीमिंग में स्टेटफुल ट्रांसफॉर्मेशन के प्रकार (जारी…)

> संचयी ट्रैकिंग

हमने इस्तेमाल किया था कम करेंबायएंडविंडो (…) एपीआई कुंजी की स्थिति को ट्रैक करने के लिए, हालांकि विंडोिंग कुछ उपयोग मामलों के लिए सीमाएं बनती है। क्या होगा यदि हम समय विंडो में इसे सीमित करने के बजाय कुंजी की अवस्थाओं को संचित करना चाहते हैं? उस मामले में हमें उपयोग करने की आवश्यकता होगी updateStateByKey (…) आग।



इस एपीआई को स्पार्क 1.3.0 में पेश किया गया था और यह बहुत लोकप्रिय रहा है। हालाँकि इस API में कुछ प्रदर्शन ओवरहेड हैं, समय के साथ राज्यों के आकार में वृद्धि के साथ इसका प्रदर्शन घटता जाता है। मैंने इस API का उपयोग दिखाने के लिए एक नमूना लिखा है। आप कोड पा सकते हैं यहाँ

स्पार्क 1.6.0 ने एक नया एपीआई पेश किया MapWithState (…) जो प्रदर्शन ओवरहेड्स द्वारा हल किए गए हल करता है updateStateByKey (…) । इस ब्लॉग में मैं इस विशेष एपीआई पर चर्चा करने जा रहा हूं जिसमें मैंने एक नमूना कार्यक्रम लिखा है। आप कोड पा सकते हैं यहाँ

इससे पहले कि मैं एक कोड के माध्यम से चलूं, चेकपॉइंटिंग पर कुछ शब्दों को छोड़ दूं। किसी भी राज्य परिवर्तन के लिए, चेकपॉइंटिंग अनिवार्य है। ड्राइवर प्रोग्राम विफल होने की स्थिति में चेकपॉइंटिंग कुंजी की स्थिति को पुनर्स्थापित करने के लिए एक तंत्र है। जब ड्राइवर पुनरारंभ होता है, तो चाबियों की स्थिति को चेकपॉइंटिंग फ़ाइलों से पुनर्स्थापित किया जाता है। चेकपॉइंट स्थान आमतौर पर एचडीएफएस या अमेज़ॅन एस 3 या कोई विश्वसनीय भंडारण हैं। कोड का परीक्षण करते समय, कोई स्थानीय फ़ाइल सिस्टम में भी स्टोर कर सकता है।



नमूना कार्यक्रम में, हम होस्ट = लोकलहोस्ट और पोर्ट = 9999 पर सॉकेट टेक्स्ट स्ट्रीम को सुनते हैं। यह आने वाली स्ट्रीम को (शब्द, सं। की संख्या) में टोकन करता है और 1.6.0 एपीआई का उपयोग करके शब्द गणना को ट्रैक करता है। MapWithState (…) । इसके अतिरिक्त, बिना अपडेट वाली कुंजियों का उपयोग करके हटाया जाता है StateSpec.timeout एपीआई। हम HDFS में चेकपॉइंट कर रहे हैं और चेकपॉइंटिंग आवृत्ति हर 20 सेकंड में है।

जावा प्रोग्राम कैसे संकलित करें

पहले स्पार्क स्ट्रीमिंग सत्र बनाएं।

Spark-streaming-session

हम एक बनाते हैं चेकपॉइंट एचडीएफएस में और फिर ऑब्जेक्ट विधि को कॉल करें getOrCreate (…) । द getOrCreate एपीआई जाँच करता है चेकपॉइंट यह देखने के लिए कि क्या कोई पूर्ववर्ती स्थिति बहाल करने के लिए है, यदि वह मौजूद है, तो यह स्पार्क स्ट्रीमिंग सत्र को फिर से बनाता है और नए डेटा के साथ आगे बढ़ने से पहले फ़ाइलों में संग्रहीत डेटा से कुंजियों की स्थिति को अपडेट करता है। अन्यथा यह एक नया स्पार्क स्ट्रीमिंग सत्र बनाता है।

getOrCreate चेकपॉइंट निर्देशिका नाम और एक फ़ंक्शन (जिसे हमने नाम दिया है) लेता है CreateFunc ) किसका हस्ताक्षर होना चाहिए () => StreamingContext

अंदर कोड की जांच करते हैं CreateFunc

पंक्ति # 2: हम 'TestMapWithStateJob' और बैच अंतराल = 5 सेकंड के लिए नौकरी के नाम के साथ एक स्ट्रीमिंग संदर्भ बनाते हैं।

लाइन # 5: चेकपॉइंट डायरेक्टरी को सेट करें।

लाइन # 8: कक्षा का उपयोग करके राज्य विनिर्देश सेट करें org.apache.streaming.StateSpec वस्तु। हम पहले उस फ़ंक्शन को सेट करते हैं जो राज्य को ट्रैक करेगा, फिर हम परिणामी DStreams के लिए विभाजन की संख्या निर्धारित करते हैं जिन्हें बाद के परिवर्तनों के दौरान उत्पन्न किया जाना है। अंत में हम टाइमआउट (30 सेकंड) सेट करते हैं, जहां यदि किसी कुंजी के लिए कोई अपडेट 30 सेकंड में प्राप्त नहीं होता है, तो कुंजी स्थिति को हटा दिया जाएगा।

पंक्ति 12 #: सॉकेट स्ट्रीम को सेटअप करें, आने वाले बैच डेटा को समतल करें, कुंजी-मूल्य जोड़ी बनाएं, कॉल करें mapWithState , 20 के लिए चेकपॉइंटिंग अंतराल सेट करें और अंत में परिणाम प्रिंट करें।

स्पार्क फ्रेमवर्क को वें कहते हैं e createFunc पिछले मूल्य और वर्तमान स्थिति के साथ हर कुंजी के लिए। हम योग की गणना करते हैं और राज्य को संचयी राशि से अपडेट करते हैं और अंत में हम कुंजी के लिए राशि वापस करते हैं।

उदाहरण के साथ जावा में mvc वास्तुकला

जीथूब सूत्र -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

क्या आप हमसे कोई प्रश्न पूछना चाहते हैं? कृपया टिप्पणी अनुभाग में इसका उल्लेख करें और हम आपके पास वापस आ जाएंगे।

संबंधित पोस्ट:

Apache Spark & ​​Scala से शुरू करें

स्पार्क स्ट्रीमिंग में वाइंडिंग के साथ स्टेटफुल ट्रांसफॉर्मेशन