こんな時間からBlogを書き出してしまった、、、akuwanoです。

データパイプラインの面倒を見ている方、、、こんな事はありませんか?
「あれ、このデータ、なんか変な値が入ってるじゃん、、、」とか「ストリーミングでじゃんじゃんデータが流れてくるのに品質チェックってどうやるの?」

そう、今日はそんな悩みを解決してくれるかもしれない、DatabricksのLabのOSS「DQX」についてご紹介!

データ品質チェックの理想と現実

データエンジニアの皆さん、データ品質のチェックってどうやってます?

「まずはデータを取り込んで、、、後で集計してチェックせな、、、あ、問題だ、、、遡って調査しますね、、、」というのはキツイですよね!

そんな時こう思うわけです「データが入ってくるタイミングでリアルタイムに品質チェックできたらいいのになぁ」

そんなときこそDQXです!

DQX参上!

DQXはDatabricks Labsが開発したPySparkデータフレーム向けのデータ品質チェックフレームワークとなっています。

従来のデータ品質フレームワークと何が違うかというとこれ、これがでかい。

「データが処理される瞬間にリアルタイムで品質チェックができる」

要するに、「悪いデータが下流に流れる前に食い止められる」です。
これは地味にメチャクチャ重要で、後から問題を見つけて修正するより、最初から悪いデータをブロックして後でゆっくり対応したほうが圧倒的に効率的な訳です。

DQXのちょっと良いとこ見てみたい!

飲みませんw
いいとこいっていきますw

リアルタイムでのデータ品質チェック

ストリーミングデータに対してもバッチデータに対してもリアルタイムで品質チェック可能!これまでのツールって「完全なデータセット」を前提にしているものが多くて、ストリーミングワークロードには使いづらいパターンも多かったと思います。

隔離機能(Quarantine)

問題のあるデータを自動で隔離して、正常なデータだけを下流に流すことができます。これがDead-Letter patternっていうやつです。

公式絵を訳したものですが、図で表すとこんな感じですね。

サンプルコードで説明してこ!

基本的なセットアップから見ていきましょう!

インストールと基本セットアップ

# DQXのインストール
%pip install databricks-labs-dqx
# 必要なライブラリをインポート
from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
import yaml
# WorkspaceClientを初期化(共通で使う)
workspace_client = WorkspaceClient()
# エンジンの初期化
dq_engine = DQEngine(workspace_client)

サンプルデータです

問題のあるデータを含んだサンプルデータフレームを作ってみます。

# 問題のあるサンプルデータを作成
df = spark.createDataFrame([
    (1, "Alice", "alice@example.com", 30, "2022-01-15", "Female"),
    (2, "Bob", "bob@example.com", 25, "2022-14-01", "Male"),  # 不正な日付
    (3, "Charlie", "charlie@example.com", None, "2022-03-01", "Femail"),  # 年齢がNull、性別にタイポ
    (4, "Joanna", "joice@example.com", 30, "2022-01-15", "Female"),
    (5, "Eve", None, 200, "2022-02-30", "Female"),  # メールがNull、年齢が異常、不正な日付
    (None, "Frank", "frank@example.com", 28, "2022-05-20", "M"),  # IDがNull
], ["id", "name", "email", "age", "signup_date", "gender"])

データプロファイリングで自動ルール生成

ここもDQXの面白いところで、データを分析して自動でバリデーションルールを生成してくれたりもします。

# WorkspaceClientを共通で使用
workspace_client = WorkspaceClient()
# データプロファイリング(正しいメソッド名はprofile)
profiler = DQProfiler(workspace_client)
summary_stats, profiles = profiler.profile(df)
# バリデーションルールの自動生成(正しいメソッド名はgenerate_dq_rules)
generator = DQGenerator(workspace_client)
checks = generator.generate_dq_rules(profiles)
print("自動生成されたルール:")
print(checks)

以下のようなルールが自動生成されます。is_not_nullとかは自動で全部のカラムに入ったりしますね。

「この列はNullチェックが必要だな」って自動で判断してくれる処理がはいってくるんすわ!勿論ここから手動で更新していってもいいですが、シュッと導入できていいですよね。

# 自動生成されたルールの例
[
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'id'}
        }, 
        'name': 'id_is_null', 
        'criticality': 'error'
    },
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'name'}
        }, 
        'name': 'name_is_null', 
        'criticality': 'error'
    },
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'email'}
        }, 
        'name': 'email_is_null', 
        'criticality': 'error'
    },
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'age'}
        }, 
        'name': 'age_is_null', 
        'criticality': 'error'
    },
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'signup_date'}
        }, 
        'name': 'signup_date_is_null', 
        'criticality': 'error'
    },
    {
        'check': {
            'function': 'is_not_null', 
            'arguments': {'column': 'gender'}
        }, 
        'name': 'gender_is_null', 
        'criticality': 'error'
    }
]

パターン1: 悪いデータを隔離(Quarantine)

問題のあるデータを別のテーブルに隔離して、正常なデータだけを下流に流してみましょう。

# 生成されたルールで品質チェックを実行(隔離パターン)
# 辞書形式のchecksの場合は apply_checks_by_metadata_and_split を使用
valid_data, invalid_data = dq_engine.apply_checks_by_metadata_and_split(df, checks)
print(f"正常データ件数: {valid_data.count()}")
print(f"問題データ件数: {invalid_data.count()}")
# 問題のあるデータの詳細確認
if invalid_data.count() > 0:
    invalid_data.select("id", "name", "_errors").show(truncate=False)

結果はこれ。エラーに引っかかった物が隔離されてるのがわかります。

正常データ件数: 3
問題データ件数: 3
+----+-------+-----------------------------------------------------------------------------------------------------------+
|id  |name   |_errors                                                                                                    |
+----+-------+-----------------------------------------------------------------------------------------------------------+
|3   |Charlie|[{age_is_null, Column 'age' value is null, [age], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]      |
|5   |Eve    |[{email_is_null, Column 'email' value is null, [email], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]|
|NULL|Frank  |[{id_is_null, Column 'id' value is null, [id], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]         |
+----+-------+-----------------------------------------------------------------------------------------------------------+

パターン2: 悪いデータにアノテーション

問題のあるデータに「なぜ問題なのか」の情報を追加して、全データを保持する方法です。

# 全データを保持してアノテーション
# 辞書形式のchecksの場合は apply_checks_by_metadata を使用
annotated_data = dq_engine.apply_checks_by_metadata(df, checks)
# エラー情報付きのデータを確認
annotated_data.select("id", "name", "email", "_errors", "_warnings").show(truncate=False)

結果はこれ。エラーが出ているものは_errorsにエラーの理由がアノテーションされてますね。

+----+-------+-------------------+-----------------------------------------------------------------------------------------------------------+---------+
|id  |name   |email              |_errors                                                                                                    |_warnings|
+----+-------+-------------------+-----------------------------------------------------------------------------------------------------------+---------+
|1   |Alice  |alice@example.com  |NULL                                                                                                       |NULL     |
|2   |Bob    |bob@example.com    |NULL                                                                                                       |NULL     |
|3   |Charlie|charlie@example.com|[{age_is_null, Column 'age' value is null, [age], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]      |NULL     |
|4   |Joanna |joice@example.com  |NULL                                                                                                       |NULL     |
|5   |Eve    |NULL               |[{email_is_null, Column 'email' value is null, [email], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]|NULL     |
|NULL|Frank  |frank@example.com  |[{id_is_null, Column 'id' value is null, [id], NULL, is_not_null, 2025-09-28 14:03:09.079234, {}}]         |NULL     |
+----+-------+-------------------+-----------------------------------------------------------------------------------------------------------+---------+

コードベースでルール定義

YAMLじゃなくてPythonのコードでルールを定義することも可能です!
コードで定義したい場合はこちらで。

# まず利用可能なクラスを確認
from databricks.labs.dqx import rule
print("利用可能なクラス:", dir(rule))
# 実際の例
from databricks.labs.dqx.rule import DQRowRule
from databricks.labs.dqx import check_funcs
# Pythonでルール定義
rules = [
    DQRowRule(
        name="id_not_null",
        criticality="error",
        check_func=check_funcs.is_not_null_and_not_empty,
        column="id"
    ),
    DQRowRule(
        name="valid_gender",
        criticality="error", 
        check_func=check_funcs.is_in_list,
        column="gender",
        check_func_kwargs={"allowed": ["Male", "Female"]}
    ),
    DQRowRule(
        name="age_range",
        criticality="warn",  
        check_func=check_funcs.is_in_range,
        column="age",
        check_func_kwargs={"min_limit": 0, "max_limit": 120}
    )
]
# ルールを適用
valid_data, invalid_data = dq_engine.apply_checks_and_split(df, rules)

レイクハウスアーキテクチャでの活用

データレイクハウスアーキテクチャの中では、Bronze(生データ)からSilver(整理されたデータ)レイヤーにデータが移る段階で品質チェックを行うのがベストプラクティスです。

DQXを使えば、悪いデータが後続のレイヤー(Silver、Gold)に伝播することを防げるし、隔離されたデータは後でキュレーションして再取り込みすることも可能です。

要するに「汚いデータを下流に流さない」仕組みを作れるわけです。

いつDQXを使うべき?

こんな場面でDQXが威力を発揮すると思います!

DQXを使うべき場面、使うべきではない場面:

DQXを使うべき場面については以下のようなパターンが有るかと思います。

  • データがターゲットテーブルに書き込まれる前にチェックしたい(プロアクティブな監視)
  • ストリーミングデータの品質チェックが必要
  • カスタムの品質ルールを定義したい

逆に使うべきではない、というか他のサービスのほうが適切な部分としては以下です。

  • すでにDelta Tableに保存されているデータの監視→ Databricks Lakehouse Monitoring
  • Lakeflow 宣言的パイプライン内での品質チェック→ まずはExpectationsを検討

まとめ

データ品質管理って「後から気づいて火消し」になりがちですが、DQXを使えば「事前に防ぐ」アプローチに切り替える事が可能です!

特にストリーミングデータを扱っている方や、データ品質の問題でつらい思いをされた方は、ぜひ試してもらいたいです!

データ品質の問題は、早期発見・早期対応が鉄則です。DQXでプロアクティブなデータ品質管理を始めてみませんか?

そんなこんなで、データ品質に悩むエンジニアの皆さんの一助となれば幸いです!

DQXはDatabricksのすべてのエンジン(Spark Core、Spark Structured Streaming、Lakeflow 宣言的パイプライン)とクラスタタイプ(標準、サーバーレス)で動作します。

既存のDatabricksワークフローにサッと組み込めるのは実用性の観点で重要ですよね!と、宣伝もしつつ、、、ではでは!

“データ品質チェックを簡単にやってこー。DQXでスマートなデータ品質管理を!”. への1件のコメント

コメントを残す

Trending