חזרה לבלוג
full stack·22 בינואר 2026·8 דק' קריאה·מאת יהונתן סעדיה

בניית Data Pipeline (ETL) אמין לדאטה מגירוד

איך אני בונה data pipeline לדאטה מגירוד: extract, validation, ניקוי, deduplication, העשרה ואחסון. ETL pipeline ששורד דאטה מלוכלכת ומשתנה.

סקרייפר שמחזיר שורות הוא לא מוצר דאטה. בפעם הראשונה שלקוח מוסר לי תיקייה של קבצי CSV מקראול חד-פעמי, הדאטה נראית שמישה בערך עשר דקות - עד שמבחינים בחברות הכפולות, במחירים השמורים כמחרוזות "$1,299.00", בריצות שנקטעו באמצע, ובשדות ששינו צורה בשקט ביום שלישי האחרון. כדי להפוך פלט גולמי של גירוד למשהו שעסק באמת יכול לסמוך עליו צריך data pipeline לדאטה מגירוד אמיתי: ETL pipeline חוזר שמבצע extract, validation, ניקוי, deduplication, העשרה, אחסון, scheduling וניטור של עצמו. זה החלק בעבודה שמבדיל בין סקריפט של סוף שבוע לבין תשתית שאפשר לבנות עליה.

בניתי כאלה למאגרי לידים, לפידים של ניטור מחירים, ולקטלוגים של מתחרים. הגירוד עצמו לרוב לא החלק הקשה - אם מעניין אתכם הצד שלפני, ראו את ההערות שלי על איך לגרד אתרים בלי להיחסם. המאמר הזה עוסק בכל מה שקורה אחרי שה-HTML חוזר.

למה דאטה גולמית מגירוד צריכה pipeline

דאטה מהאינטרנט עוינת כברירת מחדל. אתרים משנים markup בלי התראה, אותה ישות מופיעה תחת שלושה שמות שונים במקצת, שדות אופציונליים חסרים בחצי מהדפים, וכל קראול גדול ייכשל באמצע לפחות חלק מהזמן. אם טוענים את זה ישירות למסד נתונים, כל צרכן בהמשך הצינור יורש את הבלגן. ה-pipeline קיים כדי לספוג את הכאוס במקום אחד, כך שמה שנוחת באחסון נקי, מטופס, מנוקה מכפילויות וניתן לתשאול. הכלל שאני עובד לפיו: לעולם אל תיתנו לדאטה גולמית לגעת ב-system of record בלי לעבור validation קודם.

השלבים של ETL pipeline

כל pipeline שאני בונה, ללא קשר למקור, עוקב אחר אותו שלד. לכל שלב תפקיד יחיד והוא מעביר ארטיפקט נקי במעט לשלב הבא.

שלבמה הוא עושההכשל שמפניו אני מגן
Extractמשיכת HTML/JSON גולמי, שמירה ללא נגיעה (raw layer)גירוד מחדש כשמתגלה באג בפרסר מאוחר יותר
Validationבדיקת טיפוסים, שדות חובה וטווחים מול schemaזבל שנכנס למסד הנתונים בשקט
ניקוי / נרמולtrim, המרת טיפוסים, תקנון יחידות, מטבעות, casing"$1,299" מול 1299.0 מול "1299 USD"
Deduplicationאיחוד רשומות למפתח עסקי יציבאותה חברה נספרת חמש פעמים
העשרהgeocoding, הסקת קטגוריה, join לדאטת ייחוסרשומות דלות שאי אפשר לפעול לפיהן
אחסוןidempotent upsert ל-system of recordשורות כפולות בריצות חוזרות
Schedulingהרצה ב-cron, incremental או fullדאטה ישנה, טיפול ידני
ניטורספירת שורות, freshness, שיעורי שגיאה, התראותתקלה שקטה שאף אחד לא מבחין בה שבועות

Extract: שמרו את ה-raw layer

ההרגל היחיד הכי בעל ערך הוא לשמור את ה-payload הגולמי לפני שמפרסרים משהו. דיסק זול; קראול מחדש של אתר מוגן לא. כשאני מוצא באג בפרסר שלושה שבועות אחר כך, אני רוצה להריץ extraction מחדש מעל ה-HTML השמור, לא להלום שוב במקור. אני מחזיק raw layer (object storage או collection בשם raw_pages) ממופתח לפי URL + חותמת זמן של fetch, ומתייחס לפרסור כפונקציה טהורה מעל השכבה הזו.

Validation לפני כל דבר אחר

ה-validation הוא המקום שבו אני מגדיר מה זה "טוב". מודל Pydantic או JSON Schema מתאר את שדות החובה, הטיפוסים שלהם וטווחים הגיוניים - מחיר חייב להיות מספר חיובי, אימייל חייב להיראות כמו אימייל, מדינה חייבת להיות ברשימה ידועה. רשומות שנכשלות הולכות לטבלת quarantine עם הסיבה מצורפת, ולעולם לא נזרקות בשקט. ה-quarantine הזה הוא זהב: הוא מספר לכם בדיוק איך המקור השתנה.

ניקוי, נרמול, deduplication

הניקוי הוא הליבה הלא-זוהרת. הסירו רווחים, המירו "$1,299.00" ל-float, נרמלו מספרי טלפון ל-E.164, הורידו אימיילים ל-lowercase, תקננו קודי מדינה. ה-deduplication אז צריך מפתח עסקי יציב - לא ה-_id של מונגו, אלא משהו שנגזר מהדאטה: domain מנורמל, מספר טלפון, או hash של (name, city). המפתח הזה הוא העוגן לכתיבות idempotent.

עיצוב schema ו-idempotent upserts

החלטת העיצוב הכי חשובה ב-data pipeline לדאטה מגירוד היא מפתח ה-deduplication, כי הוא הופך את הכתיבות שלכם ל-idempotent. Idempotent אומר שאפשר להריץ את אותו batch פעמיים ומסד הנתונים מגיע למצב זהה - בלי כפילויות, בלי הפתעות. מגיעים לזה באמצעות upsert על המפתח העסקי במקום insert עיוור.

הנה התבנית שאני משתמש בה ב-MongoDB. שדות המפתח מגדירים זהות, $set מעדכן את השדות המשתנים, $setOnInsert חותם זמן יצירה רק בכתיבה הראשונה, ו-bulk write לא מסודר מונע מרשומה רעה אחת להרוג את כל ה-batch.

from pymongo import UpdateOne

def business_key(rec: dict) -> dict:
    # זהות יציבה שנגזרת מהדאטה, לא id אקראי
    return {"domain": rec["domain"].lower().strip()}

def upsert_batch(col, records: list[dict]):
    ops = []
    for rec in records:
        ops.append(UpdateOne(
            business_key(rec),
            {
                "$set": {
                    "name": rec["name"],
                    "price": rec["price"],
                    "updated_at": rec["fetched_at"],
                },
                "$setOnInsert": {"created_at": rec["fetched_at"]},
            },
            upsert=True,
        ))
    if ops:
        # unordered: כשל אחד לא מבטל את השאר
        return col.bulk_write(ops, ordered=False)

גבו את זה עם index ייחודי על המפתח העסקי (db.companies.create_index("domain", unique=True)) כך שמסד הנתונים עצמו יאכוף ייחודיות גם תחת ריצות מקבילות. ב-PostgreSQL המקבילה היא INSERT ... ON CONFLICT (domain) DO UPDATE. כך או כך, החוזה זהה: הרצה מחדש תמיד בטוחה.

טיפול בדאטה מלוכלכת וכשלים חלקיים

שני מצבי כשל שולטים. ראשית, דאטה מלוכלכת ומשתנה: הניחו שכל שדה יכול להיות חסר, פגום, או ששמו השתנה זה עתה. בצעו validation בגבול, העבירו את הנדחים ל-quarantine, והתריעו כששיעור הדחייה קופץ - קפיצה פתאומית בדרך כלל אומרת שהאתר השתנה והפרסר צריך טיפול, לא שהדאטה הפכה לגרועה יותר. שנית, כשלים חלקיים: קראול של 50,000 דפים יאבד חיבורים, ייתקל ב-rate limits, ויעבור timeout. אני הופך את ה-extraction לבר-המשך על ידי מעקב סטטוס per-URL (pending, fetched, failed, parsed) כך שהרצה מחדש מרימה רק את מה שנשאר. כיוון שה-upsert הוא idempotent, עיבוד מחדש של רשומות שכבר טופלו לא עולה כלום. לעולם אל תעטפו את כל ה-batch בטרנזקציה אחת ששורה רעה אחת יכולה לבצע לה rollback - עבדו ב-chunks ותנו לכל chunk להצליח או להיכשל באופן עצמאי.

בחירת אחסון: MongoDB מול PostgreSQL מול warehouse

אין תשובה אוניברסלית, רק התאמה. אני בוחר לפי צורת הדאטה ואיך היא תתושאל.

  • MongoDB - ברירת המחדל שלי כשה-schema של המקור מבולגן או מתפתח. דפים מגורדים נדירות בעלי צורה קבועה, ומודל מסמכים גמיש מאפשר לי לקלוט קודם ולהדק את ה-schema אחר כך עם כללי validation. מצוין לדאטה מקוננת (מוצר עם וריאנטים, חברה עם כמה אנשי קשר).
  • PostgreSQL - כשיחסים ושלמות חשובים, או כשהצרכן הוא כלי BI שמצפה ל-SQL. constraints חזקים, upserts עם ON CONFLICT ועמודות JSONB נותנים לכם דרך אמצע גמישה-אך-מחמירה.
  • warehouse (BigQuery, Snowflake, Redshift) - כשהמטרה היא analytics בקנה מידה גדול מעל snapshots היסטוריים ולא שירות לאפליקציה. בדרך כלל אני נוחת עם דאטה נקייה ב-Mongo או Postgres לצד התפעולי ומסנכרן אגרגטים ל-warehouse לניתוח.

Incremental מול full refresh

full refresh מגרד מחדש ובונה הכל מאפס. זה פשוט ומרפא את עצמו - כל טעות מהעבר מתוקנת - אבל זה יקר ואיטי, והוא הולם במקור. טעינה incremental מושכת רק את מה שהשתנה מאז הריצה האחרונה, באמצעות cursor כמו חותמת זמן, watermark של ID, או lastmod מ-sitemap. incremental זול ועדין יותר על אתר היעד, אבל הוא יכול לסטות אם מפספסים עדכונים. הפשרה הרגילה שלי: incremental בלוח זמנים תכוף (כל שעה או יום) בתוספת full refresh תקופתי (שבועי או חודשי) כרשת ביטחון שתתפוס כל מה שהריצה ה-incremental פספסה.

Scheduling ו-orchestration

ל-pipeline בודד, cron job שקורא ל-entrypoint של פייתון הוא באמת בסדר גמור - אל תושיטו יד לכלים כבדים לפני שצריך. ברגע שיש כמה שלבים תלויים, retries ו-backfills, orchestrator מצדיק את עצמו: Apache Airflow, Prefect או Dagster נותנים לכם DAG של משימות, retries אוטומטיים עם backoff, ו-UI שמראה מה רץ ומתי. אני מתייחס ל-orchestration כאותה דיסציפלינה כמו שאר עבודת האוטומציה שלי - העקרונות עוברים ישירות מאוטומציית תהליכים עסקיים עם פייתון. כל משימה צריכה להיות idempotent ובת-retry עצמאי, כך שכשל בשלב ארבע לעולם לא יכריח אתכם להריץ מחדש את שלבים אחת עד שלוש.

Observability ובדיקות איכות דאטה

pipeline שאי אפשר לראות לתוכו ייכשל בשקט, וכשל שקט הוא הסוג הגרוע ביותר כי אתם סומכים על דאטה ישנה בלי לדעת. אני מכשיר כל ריצה עם המטריקות המשעממות שבאמת תופסות בעיות: שורות נכנסות מול שורות יוצאות, שיעור דחיית validation, שיעור כפילויות, ו-freshness (בן כמה הרשומה החדשה ביותר). מעל אלה אני מוסיף assertions של איכות דאטה - ספירת השורות בתוך טווח צפוי, לא יותר מ-X אחוז של שדה חובה הוא null, מחירים נופלים בטווח סביר. כש-assertion נכשל, ה-pipeline מתריע, ובהתאם לחומרה, עוצר במקום לדרוס דאטה טובה ברעה. המטרה היא שאני אלמד שהמקור נשבר מההתראה שלי, לא מהלקוח.

ארכיטקטורת ייחוס

כשמחברים הכל, הנה הצורה שאני חוזר אליה שוב ושוב:

  1. סקרייפרים כותבים payloads גולמיים ל-raw layer (object storage או raw_pages), מתויגים ב-URL ובחותמת זמן.
  2. פרסר קורא את ה-raw layer ופולט רשומות מובנות - פונקציה טהורה ובת-הרצה-מחדש.
  3. שער validation: רשומות תקינות ממשיכות, נדחים הולכים ל-quarantine עם סיבה.
  4. Transform: ניקוי, נרמול, deduplication למפתח עסקי, העשרה.
  5. Idempotent upsert ל-system of record (Mongo או Postgres) מאחורי index ייחודי.
  6. Orchestrator (cron או Airflow/Prefect) מריץ טעינות incremental בתכיפות ו-full refresh תקופתית.
  7. ניטור עוקב אחר ספירות, freshness ו-assertions של איכות, ומתריע על drift.

סיכום

ההבדל בין סקרייפר ל-data pipeline הוא ההבדל בין דאטה שאתם מקווים שהיא נכונה לבין דאטה שאפשר לבנות עליה עסק. שמרו את ה-raw layer, בצעו validation בגבול, בצעו deduplication על מפתח יציב כך שכל כתיבה היא idempotent, בחרו אחסון שמתאים לצורת הדאטה, שלבו טעינות incremental עם full refresh תקופתי, ועקבו אחר freshness ואיכות בלי הרף. עשו זאת, והרצה מחדש תמיד בטוחה ומקור שבור מכריז על עצמו במקום להרעיל בשקט את המספרים שלכם.

אם יש לכם דאטה מגורדת שצריכה להפוך לפיד אמין ומתוזמן שהצוות שלכם יכול לסמוך עליו, אני יכול לעזור לכם לעצב ולבנות את ה-pipeline מקצה לקצה. קבעו שיחה וספרו לי עם מה אתם עובדים, או דרך טופס הקשר.

#data pipeline#etl#web scraping#mongodb#data engineering

שאלות נפוצות

מה ההבדל בין data pipeline לבין סתם הרצת סקרייפר?

סקרייפר מחזיר שורות גולמיות; pipeline מבצע validation, ניקוי, deduplication, העשרה, אחסון וניטור של הדאטה בלוח זמנים כך שהיא נשארת אמינה. ה-pipeline סופג דאטה מלוכלכת ומשתנה במקום אחד כך שצרכנים בהמשך תמיד מקבלים רשומות נקיות, מטופסות וניתנות לתשאול.

איך הופכים כתיבות של דאטה מגירוד ל-idempotent?

גזרו מפתח עסקי יציב מהדאטה עצמה (domain מנורמל, טלפון, או hash של name+city), ואז בצעו upsert על המפתח עם index ייחודי מאחוריו. ב-MongoDB השתמשו ב-UpdateOne עם upsert=True ו-$setOnInsert; ב-PostgreSQL השתמשו ב-INSERT ... ON CONFLICT DO UPDATE. הרצה חוזרת של אותו batch משאירה את מסד הנתונים זהה.

כדאי לאחסן דאטה מגירוד ב-MongoDB או ב-PostgreSQL?

השתמשו ב-MongoDB כשה-schema של המקור מבולגן או מתפתח והדאטה מקוננת, כי אפשר לקלוט קודם ולהדק validation אחר כך. השתמשו ב-PostgreSQL כשיחסים, constraints מחמירים, או כלי BI מבוססי SQL חשובים. ל-analytics היסטורי בקנה מידה גדול, סנכרנו דאטה נקייה ל-warehouse כמו BigQuery או Snowflake.

מתי להשתמש בטעינות incremental לעומת full refresh?

השתמשו בטעינות incremental בלוח זמנים תכוף כדי למשוך רק את מה שהשתנה, מה שזול ועדין יותר על המקור. הוסיפו full refresh תקופתי (שבועי או חודשי) כרשת ביטחון שתתקן drift או עדכונים שהריצה ה-incremental פספסה. שילוב של שניהם נותן יעילות עלות וגם ריפוי עצמי.

להמשך קריאה

יש לך פרויקט דומה?

ספר לי מה אתה מנסה להפוך לאוטומטי או לבנות, ואומר לך מהי הדרך המהירה והאמינה ביותר ליישם את זה.