Как да използваме Neo4j Streams и да изградим точно навреме склад за данни с Apache Kafka

Снимка на Ванеса Охоторена на Unsplash

В тази статия ще покажем как да създадем Just-in-Time Data Warehouse, като използваме Neo4j и модула Neo4j Streams с структурираното поточно предаване на Apache Spark Apis и Apache Kafka.

За да покажа как да ги интегрирате, да опростите интеграцията и да ви позволя да тествате целия проект на ръка, ще използвам Apache Zeppelin бегач за преносими компютри, който позволява просто да взаимодействат с Neo4j.

Крайният резултат: как Apache Spark се събира кафка събитие, предавано от Neo4j

Използване на Neo4j потоци

Проектът Neo4j Streams е съставен от три основни стълба:

  • Променете Заснемането на данни (предмет на тази първа статия), която ни позволява да предаваме промените в базата данни по теми на Kafka
  • Мивката, която позволява консумация на потоци от данни от темата Kafka
  • Набор от процедури, които ни позволяват да произвеждаме / консумираме данни от / от Kafka Topics

Какво е заснемане на промяна на данни?

Това е система, която автоматично улавя промените от изходна система (например база данни) и автоматично предоставя тези промени в низходящи системи за различни случаи на използване.

CDC обикновено представлява част от тръбопровода ETL. Това е важен компонент, който гарантира, че Складовете на данни (DWH) се актуализират с всички промени в записа.

Също така традиционно CDC приложения, използвани за отработване на регистрационните файлове на транзакциите, като по този начин ни позволяват да репликираме бази данни, без да оказваме голямо влияние върху производителността върху нейната работа.

Как CDC модулът Neo4j Streams се справя с промените в базата данни?

Всяка транзакция вътре в Neo4j се улавя и трансформира, за да поточно атомен елемент от транзакцията.

Да предположим, че имаме просто създаване на два възли и една връзка между тях:

СЪЗДАВАНЕ (andrea: Лице {name: "Andrea"}) - [знае: ЗНАЕ {от: 2014}] -> (michael: Person {name: "Michael"})

CDC модулът ще преобразува тази транзакция в 3 събития (създаване на 2 възли, създаване на 1 връзка).

Структурата на събитието е вдъхновена от Debezium формат и има следната обща структура:

{
  "meta": {/ * метаданни за транзакции * /},
  "полезен товар": {/ * данните, свързани с транзакцията * /
    "преди": {/ * данните преди транзакцията * /},
    "след": {/ * данните след транзакцията * /}
  }
}

Източник на възел (andrea):

{
  "мета": {
    „времева марка“: 1532597182604,
    "потребителско име": "neo4j",
    "tx_id": 1,
    "tx_event_id": 0,
    "tx_events_count": 3,
    "операция": "създаден",
    "източник": {
      "име на хост": "neo4j.mycompany.com"
    }
  }
  "полезен товар": {
    "id": "1004",
    "type": "възел",
    "след": {
      "етикети": ["Лице"],
      "Имоти": {
        "име": "Андреа"
      }
    }
  }
}

Целева точка на възела (Майкъл):

{
  "мета": {
    „времева марка“: 1532597182604,
    "потребителско име": "neo4j",
    "tx_id": 1,
    "tx_event_id": 1,
    "tx_events_count": 3,
    "операция": "създаден",
    "източник": {
      "име на хост": "neo4j.mycompany.com"
    }
  }
  "полезен товар": {
    "id": "1006",
    "type": "възел",
    "след": {
      "етикети": ["Лице"],
      "Имоти": {
        "име": "Майкъл"
      }
    }
  }
}

Връзката знае:

{
  "мета": {
    „времева марка“: 1532597182604,
    "потребителско име": "neo4j",
    "tx_id": 1,
    "tx_event_id": 2,
    "tx_events_count": 3,
    "операция": "създаден",
    "източник": {
      "име на хост": "neo4j.mycompany.com"
    }
  }
  "полезен товар": {
    "id": "1007",
    "type": "връзка",
    "етикет": "ЗНАЕ",
    "старт": {
      "етикети": ["Лице"],
      "id": "1005"
    }
    "край": {
      "етикети": ["Лице"],
      "id": "106"
    }
    "след": {
      "Имоти": {
        "от": 2014г
      }
    }
  }
}

По подразбиране всички данни ще бъдат предавани на тема neo4j. CDC модулът позволява да се контролират кои възли се изпращат до Kafka и кои от техните свойства искате да изпратите към темата:

streams.source.topic.nodes.  = <ОБРАЗЕЦ>

Със следния пример:

streams.source.topic.nodes.products = Продукт {име, код}

CDC модулът ще изпрати на темата за продуктите всички възли, които имат етикета Product. След това изпраща към тази тема само промените относно свойствата на името и кода. Моля, посетете официалната документация за пълно описание как работи филтрирането на етикети.

За по-задълбочено описание на проекта Neo4j Streams и как / защо ние от LARUS и Neo4j го изградихме, вижте тази статия, която предоставя подробно описание.

Отвъд традиционния склад за данни

Традиционната DWH изисква екипи от данни постоянно да изграждат множество скъпи и отнемащи време тръбопроводи за извличане на трансформация (ETL), за да получат в крайна сметка бизнес информация.

Една от най-големите болкови точки е, че поради своята твърда архитектура, която е трудно да се промени, Enterprise Data Warehouses са присъщи твърди. Това е защото:

  • те се базират на архитектурата Schema-On-Write: първо определяте схемата си, след това пишете данните си, след това четете данните си и тя се връща в схемата, която сте дефинирали отпред
  • те се базират на (скъпи) партидни / планирани работни места

Това води до необходимост от изграждане на скъпи и отнемащи време ETL тръбопроводи за достъп и манипулиране на данните. И с въвеждането на нови типове данни и източници, необходимостта от разширяване на вашите тръбопроводи ETL изостря проблема.

Благодарение на комбинацията от обработка на данни в потока с CDC модула Neo4j Streams и подхода Schema-On-Read, осигурен от Apache Spark, можем да преодолеем тази твърдост и да изградим нов вид (гъвкава) DWH.

Смяна на парадигмата: Време за съхранение на данни

Решението JIT-DWH е проектирано така, че лесно да обработва по-голямо разнообразие от данни от различни източници и започва от различен подход за това как да се справяме и управляваме данните: Schema-On-Read.

Schema-On-прочетен

Schema-On-Read следва различна последователност: тя просто зарежда данните такива, каквито са и прилага собствената си леща към данните, когато ги прочетете обратно. С този вид подход можете да представите данни в схема, която е адаптирана най-добре към изпратените заявки. Не сте залепени със схема с един размер за всички. С четене на схема можете да представите данните обратно в схема, която е най-подходяща за задачата, която се изпълнява.

Настройка на околната среда

Отивайки на следното репортаж на Github, ще намерите всичко необходимо, за да повторите това, което представям в тази статия. Това, което ще трябва да започнете, е Докер. След това можете просто да въртите стека, като въведете директорията и от терминала, изпълнявайки следната команда:

$ docker-композиране

Това ще стартира цялата среда, която включва:

  • Neo4j + Neo4j Streams модул + APOC процедури
  • Апаче Кафка
  • Apache Spark
  • Apache Zeppelin
Цялата архитектура, базирана на Docker контейнери

Влизайки в Apache Zeppelin @ http: // localhost: 8080 ще намерите в директорията Medium / Part 1 два тефтера:

  • Създайте хранилище за данни точно в момента: в тази тетрадка ще изградим JIT-DWH
  • Запитване на JIT-DWH: в този бележник ще изпълним някои заявки по JIT-DWH

Случаят на използване:

Ще създадем фалшива социална мрежа като набор от данни. Това ще активира CDC модула на Neo4j Stream и чрез Apache Spark ще прихванем това събитие и ще ги поддържаме във файловата система като JSON.

Тогава ще демонстрираме как новите полета, добавени в нашите възли, ще бъдат автоматично добавени към нашия JIT-DWL без изменение на тръбопровода ETL, благодарение на подхода Schema-On-Read.

Ще изпълним следните стъпки:

  1. Създайте фалшив набор от данни
  2. Изградете нашия тръбопровод за данни, който пресича събитията от Kafka, публикувани от CDC модула Neo4j Streams
  3. Направете първото запитване на нашия JIT-DWH на Spark
  4. Добавете ново поле в нашия графичен модел
  5. Покажете как новото поле се излага автоматично в реално време, благодарение на CDC модула Neo4j Streams (без нужда от промени в нашия тръбопровод ETL благодарение на подхода Schema-On-Read).

Бележник 1: Създайте хранилище за данни в момента

Ще създадем фалшива социална мрежа, като използваме процедурата APOC apoc.periodic.repeat, която изпълнява тази заявка на всеки 15 секунди:

С ["M", "F", ""] AS пол
UNWIND обхват (1, 10) AS id
CREATE (p: Лице {id: apoc.create.uuid (), име: "Name-" + apoc.text.random (10), възраст: кръг (rand () * 100), индекс: id, пол: пол [toInteger (размер (пол) * rand ())]})
С събиране (p) КАК хора
НЕИЗВЕСТВАЩИ хора като p1
UNWIND обхват (1, 3) AS приятел
С p1, хора [(p1.index + приятел)% размер (хора)] AS p2
СЪЗДАВАНЕ (p1) - [: ЗНАЕ {години: кръг (rand () * 10), ангажиран: (rand ()> 0.5)}] -> (p2)

Ако имате нужда от повече подробности за проекта APOC, моля, следвайте този линк.

Така че полученият графичен модел е доста ясен:

Графичният модел

Нека създадем индекс за възела Person:

% neo4j
СЪЗДАВАНЕ НА ИНДЕКС: Лице (идентификатор)

Сега нека да зададем Background Job в Neo4j:

% neo4j
CALL apoc.periodic.repeat ('create-fake-social-data', 'WITH ["M", "F", "X"] AS gender UNWIND range (1, 10) AS id CREATE (p: Person {id : apoc.create.uuid (), име: "Name-" + apoc.text.random (10), възраст: кръг (rand () * 100), индекс: id, пол: пол [toInteger (размер (пол)) * rand ())]}) С събиране (p) КАК хора НЕИЗВЕСТВЕНИ хора КАК p1 НЕИЗВЕСТЕН ​​диапазон (1, 3) КАК приятел С p1, хора [(p1.index + приятел)% размер (хора)] КАК p2 СЪЗДАВАНЕ ( p1) - [: ЗНАЕ {години: кръг (rand () * 10), ангажиран: (rand ()> 0.5)}] -> (p2) ', 15) YIELD име
ВРЪЩА име AS AS created

Тази фонова заявка носи CDC модула Neo4j-Streams, за да предава свързани събития по темата „neo4j“ Kafka (темата по подразбиране на CDC).

Сега нека създадем Структуриран набор от поточни данни, който консумира данните от темата „neo4j“:

val kafkaStreamingDF = (искра
    .readStream
    .format ( "Кафка")
    .option ("kafka.bootstrap.servers", "брокер: 9093")
    .option ("начални компенсации", "най-рано")
    .option ("абонирайте се", "neo4j")
    .load ())

Dataframe kafkaStreamingDF е основно представяне на ProducerRecord. И всъщност схемата му е:

корен
| - ключ: двоичен (нулируем = вярно)
| - стойност: двоична (нулируема = вярна)
| - тема: низ (нулируем = вярно)
| - дял: цяло число (нула = вярно)
| - отместване: дълго (нулиране = вярно)
| - timestamp: timestamp (нулируем = true)
| - timestampType: integer (nullable = true)

Сега нека създадем Структурата на данните, излъчвани от CDC, използвайки API на Spark, за да прочетете поточните данни:

val cdcMetaSchema = (нов StructType ()
    .add („времева марка“, LongType)
    .add ("потребителско име", StringType)
    .add ("операция", StringType)
    .add ("източник", MapType (StringType, StringType, true)))
    
val cdcPayloadSchemaBeforeAfter = (нов StructType ()
    .add ("етикети", ArrayType (StringType, невярно))
    .add ("свойства", MapType (StringType, StringType, true)))
    
val cdcPayloadSchema = (нов StructType ()
    .add ("id", StringType)
    .add ("тип", StringType)
    .add ("етикет", StringType)
    .add ("старт", MapType (StringType, StringType, true))
    .add ("край", MapType (StringType, StringType, true))
    .add ("преди", cdcPayloadSchemaBeforeAfter)
    .add ("след", cdcPayloadSchemaBeforeAfter))
    
val cdcSchema = (нов StructType ()
    .add ("мета", cdcMetaSchema)
    .add ("полезен товар", cdcPayloadSchema))

CdcSchema е подходящ както за събития, така и за връзки.

Това, от което се нуждаем сега, е да извлечем само събитието CDC от Dataframe, така че нека изпълним просто запитване за трансформация през Spark:

val cdcDataFrame = (kafkaStreamingDF)
    .selectExpr ("CAST (стойност AS STRING) КАТО СТОЙНОСТ")
    .select (from_json ('VALUE, cdcSchema) като' JSON))

CdcDataFrame съдържа само една колона JSON, която е данните, излъчвани от CDC модула Neo4j-Streams.

Нека да извършим проста ETL заявка, за да извлечем интересни полета:

val dataWarehouseDataFrame = (cdcDataFrame
    .where ("json.payload.type = 'възел" и (array_contains (nvl (json.payload.after.labels, json.payload.before.labels), "Person")) ")
    .selectExpr ("json.payload.id AS neo_id", "CAST (json.meta.timestamp / 1000 AS Timestamp) AS timestamp",
        "json.meta.source.hostname AS хост",
        "json.meta.operation AS операция",
        "nvl (json.payload.after.labels, json.payload.before.labels) AS етикети",
        "Експлодира (json.payload.after.properties)"))

Тази заявка е доста важна, защото представя как данните ще бъдат упорити във файловата система. Всеки възел ще бъде експлодиран в редица фрагменти от JSON, по един за всяко свойство на възел, точно така:

{ "Neo_id": "35340", "клеймото": "2018-12-19T23: 07: 10.465Z", "гостоприемник": "neo4j", "работа": "създаден", "маркери": [ "човек" ], "ключ": "име", "стойност": "име-5wc62uKO5l"}
{ "Neo_id": "35340", "клеймото": "2018-12-19T23: 07: 10.465Z", "гостоприемник": "neo4j", "работа": "създаден", "маркери": [ "човек" ] ", клавиш": "индекс", "стойност": "8"}
{ "Neo_id": "35340", "клеймото": "2018-12-19T23: 07: 10.465Z", "гостоприемник": "neo4j", "работа": "създаден", "маркери": [ "човек" ] ", клавиш": "ID", "стойност": "944e58bf-0cf7-49cf-af4a-c803d44f222a"}
{ "Neo_id": "35340", "клеймото": "2018-12-19T23: 07: 10.465Z", "гостоприемник": "neo4j", "работа": "създаден", "маркери": [ "човек" ], "ключ": "пол", "стойност": "F"}

Този вид структура може лесно да се превърне в таблично представяне (ще видим в следващите няколко стъпки как да направите това).

Сега нека напишем запитване за непрекъснато стриймиране Spark, което запазва данните във файловата система като JSON:

val writeOnDisk = (dataWarehouseDataFrame
    .writeStream
    .format ( "JSON")
    .option ("checkpointLocation", "/ zeppelin / spark-склад / jit-dwh / checkpoint")
    .option ("path", "/ zeppelin / spark-склад / jit-dwh")
    .queryName ( "възли")
    .start ())

Сега създадохме обикновен JIT-DWH. Във втория тефтер ще научим как да го запитваме и колко е лесно да се справим с динамичните промени в структурите на данните благодарение на схемата за четене.

Бележник 2: Запитване на JIT-DWH

Първият параграф нека да попитаме и да покажем JIT-DWH

val flattenedDF = (spark.read.format ("json"). load ("/ zeppelin / spark-склад / jit-dwh / **")
    .where ("neo_id не е нула")
    .groupBy ("neo_id", "timetamp", "host", "labels", "операция")
    .pivot ( "ключ")
    .agg (първа ($ "стойност")))
z.show (flattenedDF)

Спомняте ли си как запазихме данните в JSON някакъв ред по-горе? Изравненият DF просто завърта JSON над ключовото поле, като по този начин групира данните в 5 колони, които представляват „уникален ключ“ („neo_id“, „timetamp“, „host“, „labels“, „операция“). Това ни позволява да имаме това таблично представяне на изходните данни, както следва:

Резултатът от заявката

А сега си представете, че нашият набор от данни на Person получава ново поле: раждане. Нека добавим това ново поле към един възел; в този случай трябва да изберете идентификатор от вашия набор от данни и да го актуализирате със следния параграф:

Просто попълнете формата с вашите данни и изпълнете абзаца

Сега последната стъпка: използвайте отново същата заявка и филтрирайте DWH по идентификатора, който преди променихме, за да проверим как се промени нашият набор от данни според промените, направени през Neo4j.

Полето за раждане присъства без промени в нашите заявки

Заключения

В тази първа част научихме как да използваме събитията, произведени от CDC модула Neo4j Stream, за да се изгради прост (в реално време) JIT-DWL, който използва подхода Schema-On-Read.

В част 2 ще разберем как да използваме модула Sink, за да поемаме данни в Neo4j директно от Kafka.

Ако вече сте тествали модула Neo4j-Streams или сте го тествали чрез тези тетрадки, моля, попълнете нашето проучване за обратна връзка.

Ако се сблъскате с някакви проблеми или имате мисли за подобряване на нашата работа, моля, повдигнете проблем с GitHub.