Автор: Джошан Котни

Введение

При работе с данными основным фактором, который следует учитывать, является качество данных. Особенно в среде больших данных качество данных имеет решающее значение. Наличие неточных или ошибочных данных приведет к неверным результатам в анализе данных. Многие разработчики проверяют данные вручную, прежде чем обучать свою модель на доступных данных. Это отнимает много времени, и есть вероятность совершения ошибок.

Дикью

Deequ — это фреймворк с открытым исходным кодом для тестирования качества данных. Он построен на основе Apache Spark и предназначен для масштабирования до больших наборов данных. Deequ разработан и используется в Amazon для проверки качества многих крупных производственных наборов данных. Система регулярно вычисляет показатели качества данных, проверяет ограничения, установленные производителями наборов данных, и в случае успеха публикует наборы данных для потребителей.

Deequ позволяет нам рассчитать показатели качества данных в нашем наборе данных, а также позволяет нам определять и проверять ограничения качества данных. Он также указывает, какие проверки ограничений должны выполняться для ваших данных. В Deequ есть три важных компонента. Это предложение ограничений, проверка ограничений и вычисление метрик. Это изображено на изображении ниже.

Этот блог содержит подробное объяснение этих трех компонентов с помощью практических примеров.

1.Проверка ограничений.Вы можете указать собственный набор ограничений качества данных, которые вы хотите проверить на своих данных. Deequ проверяет все предоставленные ограничения и сообщает статус каждой проверки.

2. Предложение по ограничению.Если вы не знаете, какие ограничения тестировать на своих данных, вы можете воспользоваться предложением по ограничению Deequ. Предложение ограничений предоставляет список возможных ограничений, которые вы можете протестировать на своих данных. Вы можете использовать эти предложенные ограничения и передать их в проверку ограничений Deequ, чтобы проверить качество ваших данных.

3. Вычисление показателей.Вы также можете регулярно вычислять показатели качества данных.

Теперь давайте реализуем их с некоторыми примерами данных.

Для этого примера мы загрузили образец CSV-файла со 100 записями. Код запускается с помощью Intellij IDE (также можно использовать Spark Shell).

Добавить библиотеку Deequ

Вы можете добавить приведенную ниже зависимость в свой pom.xml.

‹зависимость›
‹groupId›com.amazon.deequ‹/groupId›
‹artifactId›deequ‹/artifactId›
‹версия›1.0.2‹/версия›
‹ /зависимость›

Если вы используете Spark Shell, вы можете загрузить банку Deequ, как показано ниже:

wget http://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.1/deequ-1.0.2.jar

Теперь давайте запустим сеанс Spark и загрузим файл csv в фрейм данных.

val spark= SparkSession.builder()
.master(“local”)
.appName(“deequ-Tests”).getOrCreate()
val data = spark.read.format("csv")
.option("header",true)
.option ("разделитель", "")
.option("inferschema",true)
.load("C:/Users/Downloads/100SalesRecords.csv")
data.show( 5)

Теперь данные загружены во фрейм данных.

Примечание.Убедитесь, что в именах столбцов нет пробелов. Deequ выдаст ошибку, если в именах столбцов есть пробелы.

Проверка ограничений
Давайте проверим наши данные, определив набор ограничений качества данных.

Здесь мы дали проверку дублирования (isUnique), проверку количества (hasSize), проверку типа данных (hasDataType) и т. д. для столбцов, которые мы хотим протестировать.

Мы должны импортировать пакет проверки Deequ и передать наши данные в этот пакет. Только тогда мы можем указать проверки, которые мы хотим протестировать на наших данных.

import com.amazon.deequ.VerificationResult.checkResultsAsDataFrame
import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.constraints.ConstrainableDataTypes< br /> import com.amazon.deequ.{VerificationResult, VerificationSuite}//Проверка ограничений
val verificationResult: VerificationResult = {
VerificationSuite()
.onData(data) //наши входные данные для проверки
//проверка качества данных
.addCheck(
Check(CheckLevel.Ошибка, "Проверка проверки")
.isUnique("OrderID")
.hasSize(_ == 100)
.hasDataType("UnitPrice",ConstrainableDataTypes.String)
.hasNumberOfDistinctValues("OrderDate",_›=5)
.isNonNegative("UnitCost"))
.run()
}

При успешном выполнении он отображает следующий результат. Он покажет каждый статус проверки и предоставит сообщение, если какое-либо ограничение не работает.

Используя приведенный ниже код, мы можем преобразовать результаты проверки во фрейм данных.

//Преобразование результатов проверки в DataFrame val VerificationResultDf =
checkResultsAsDataFrame(spark, VerificationResult)
VerificationResultDf.show(false)

Предложение об ограничении

Deequ может предоставить возможные проверки ограничений качества данных для проверки ваших данных. Для этого нам нужно импортировать ConstraintSuggestionRunner и передать ему наши данные.

import com.amazon.deequ.checks.{Check, CheckLevel}
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}//Constraint Suggestion
val OfferingResult = {
ConstraintSuggestionRunner()
.onData(data)
.addConstraintRules(Rules.DEFAULT)
.run()
}

Теперь мы можем исследовать ограничения, предложенные Дику.

импортировать spark.implicits._
val OfferingDataFrame = предложениеResult.constraintSuggestions.flatMap {
случай (столбец, предложения) =›
предложения.карта { ограничение =›< br /> (столбец, ограничение.описание, ограничение.codeForConstraint)
}
}.toSeq.toDS()

При успешном выполнении мы можем увидеть результат, как показано ниже. Он предоставляет автоматические предложения по вашим данным.

suggestionDataFrame.show(4)

Вы также можете передать эти предложенные Deequ ограничения в VerificationSuite, чтобы выполнить все проверки, предоставляемые SuggestionRunner. Это показано в следующем коде.

val allConstraints = предложениеResult.constraintSuggestions
.flatMap { case (_, предложения) => предложения.карта { _.constraint }}
.toSeq
val generateCheck = Check(CheckLevel. Ошибка, «сгенерированные ограничения», allConstraints)//передача сгенерированных проверок в VerificationSuite
val VerificationResult = VerificationSuite()
.onData(data)
.addChecks(Seq(generatedCheck))
.run()
val resultDf = checkResultsAsDataFrame(spark, verifyResult )

Запуск приведенного выше кода проверит все ограничения, которые предложил Deequ, и предоставит статус каждой проверки ограничений, как показано ниже.

результатDf.show(4)

Вычисление показателей

Вы можете вычислять метрики с помощью Deequ. Для этого вам нужно импортировать AnalyzerContext.

import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.AnalyzerContext.successMetricsAsDataFrame
import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}// Расчет показателей
val analysisResult: AnalyzerContext = {
AnalysisRunner
// данные для выполнения анализа
.onData(data)
/ / определить анализаторы, вычисляющие метрики Maximum("TotalRevenue"))))
.run()
}
val metricsDf = successMetricsAsDataFrame(spark, analysisResult)

После успешного запуска вы можете увидеть результаты, как показано ниже.

metricsDf.show(false)

Вы также можете сохранить метрики, которые вы вычислили для своих данных. Для этого вы можете использовать репозиторий метрик Deequ. Чтобы узнать больше об этом репозитории, нажмите здесь.

Заключение

В целом, Deequ имеет много преимуществ. Мы можем рассчитать метрики данных, определить и проверить ограничения качества данных. Даже большие наборы данных, состоящие из миллиардов строк данных, можно легко проверить с помощью Deequ. Помимо проверки качества данных, Deequ также обеспечивает обнаружение анамолии и вычисление дополнительных показателей.

Первоначально опубликовано на https://www.tigeranalytics.com 3 сентября 2020 г.