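Hi there, hello!
This is the Day 8 post for the TROCCO Advent Calendar!
I was wondering what to write, and thought a "my first Databricks integration" kind of post might be nice, but I decided against it! (what?)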
2024-12-01 12:55:09.576 +0000 Preparing your trocco environment...
2024-12-01 12:55:09.617 +0000 cpu_request=3.5, memory_request=15.0Gi, cpu_limit=3.5, memory_limit=15.0Gi, disk=200.0Gi
Successfully created your environment
Loading Java Agent version 1 (using ASM9).
2024-12-01 12:55:19.341 +0000: Embulk v0.9.26
2024-12-01 12:55:19.786 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2024-12-01 12:55:20.862 +0000 [INFO] (main): BUNDLE_GEMFILE is being set: "/work/embulk_bundle/Gemfile"
2024-12-01 12:55:20.862 +0000 [INFO] (main): Gem's home and path are
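(rest omitted for now)
OK, so what is this? It's the log from loading data from S3 into Databricks!
The idea is to go through what this log is actually doing! (Meaning what, exactly?)
Let's look at the log!
Going through all of it would be a lot, so let's pick out the parts that look important!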
2024-12-01 12:55:22.191 +0000 [INFO] (main): Started Embulk v0.9.26
2024-12-01 12:55:22.352 +0000 [INFO] (0001:transaction): Loaded plugin embulk-input-s3 (0.3.10)
2024-12-01 12:55:22.424 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-databricks (1.1.1)
2024-12-01 12:55:22.474 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-speedometer (0.3.6)
2024-12-01 12:55:22.506 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-column (0.7.1)
2024-12-01 12:55:22.516 +0000 [INFO] (0001:transaction): Loaded plugin embulk-filter-typecast (0.2.2)
This part is Embulk loading its plugins. I'd like to dig into these at some point too, but I'll set them aside for now.
2024-12-01 12:55:22.723 +0000 [INFO] (0001:transaction): Start listing file with prefix [torocco-dbx/]
2024-12-01 12:55:23.330 +0000 [INFO] (0001:transaction): Found total [1] files
This is the S3 prefix: it says there is one file under [torocco-dbx/].
The important point here is that I typoed it as "torocco" (my apologies 😭).
2024-12-01 12:55:25.035 +0000 [INFO] (0001:transaction): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:25.442 +0000 [INFO] (0001:transaction): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:25.762 +0000 [INFO] (0001:transaction): TransactionIsolation=read_uncommitted
This sets the catalog and the schema.
2024-12-01 12:55:29.353 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE `trocco-test`.`test-table_00000193824a5348_embulk000` (`c0` STRING, `c1` STRING)
2024-12-01 12:55:31.598 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE `trocco-test`.`test-table_00000193824a5348_embulk001` (`c0` STRING, `c1` STRING)
2024-12-01 12:55:33.168 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE `trocco-test`.`test-table_00000193824a5348_embulk002` (`c0` STRING, `c1` STRING)
2024-12-01 12:55:34.641 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE `trocco-test`.`test-table_00000193824a5348_embulk003` (`c0` STRING, `c1` STRING)
This is where it starts to get interesting: it creates four temp (intermediate) tables, embulk000 through embulk003.
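One detail in those temp table names: the 16-hex-digit part in the middle looks like an epoch timestamp in milliseconds. That is my reading of the log, not something the plugin documents here. The sketch below decodes it under that assumption; `decode_suffix` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

# Temp table name copied from the CREATE TABLE log line above.
TEMP_TABLE = "test-table_00000193824a5348_embulk000"


def decode_suffix(table_name: str) -> datetime:
    """Read the hex part of the name as epoch milliseconds (an assumption)."""
    # "<target>_<16 hex digits>_embulkNNN" -> take the middle piece.
    hex_part = table_name.rsplit("_", 2)[1]
    millis = int(hex_part, 16)
    # Add an exact timedelta rather than dividing by 1000 as a float.
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=millis)


print(decode_suffix(TEMP_TABLE).isoformat(timespec="milliseconds"))
# → 2024-12-01T12:55:29.352+00:00
```

The decoded time lands within a millisecond of the first CREATE TABLE log line (12:55:29.353), which fits the guess that the name is derived from the time the run started creating tables.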
2024-12-01 12:55:36.299 +0000 [INFO] (0016:task-0000): Connecting to jdbc:databricks://example.cloud.databricks.com:443 options {ConnSchema=trocco-test, UserAgentEntry=primeNumber_TROCCO/1.0.0, UID=token, httpPath=/sql/1.0/warehouses/475b94ddc7cd5211, PWD=***, ConnCatalog=kuwano_catalog, AuthMech=3, SSL=1}
2024-12-01 12:55:36.737 +0000 [INFO] (0016:task-0000): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:37.018 +0000 [INFO] (0016:task-0000): > 0.28 seconds (-1 rows)
2024-12-01 12:55:37.018 +0000 [INFO] (0016:task-0000): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:37.273 +0000 [INFO] (0016:task-0000): > 0.26 seconds (-1 rows)
2024-12-01 12:55:37.274 +0000 [INFO] (0016:task-0000): TransactionIsolation=read_uncommitted
2024-12-01 12:55:37.277 +0000 [INFO] (0016:task-0000): Connecting to jdbc:databricks://example.cloud.databricks.com:443 options {ConnSchema=trocco-test, UserAgentEntry=primeNumber_TROCCO/1.0.0, UID=token, httpPath=/sql/1.0/warehouses/475b94ddc7cd5211, PWD=***, ConnCatalog=kuwano_catalog, AuthMech=3, SSL=1}
2024-12-01 12:55:37.710 +0000 [INFO] (0016:task-0000): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:37.968 +0000 [INFO] (0016:task-0000): > 0.26 seconds (-1 rows)
2024-12-01 12:55:37.968 +0000 [INFO] (0016:task-0000): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:38.198 +0000 [INFO] (0016:task-0000): > 0.23 seconds (-1 rows)
2024-12-01 12:55:38.198 +0000 [INFO] (0016:task-0000): TransactionIsolation=read_uncommitted
2024-12-01 12:55:38.200 +0000 [INFO] (0016:task-0000): Connecting to jdbc:databricks://example.cloud.databricks.com:443 options {ConnSchema=trocco-test, UserAgentEntry=primeNumber_TROCCO/1.0.0, UID=token, httpPath=/sql/1.0/warehouses/475b94ddc7cd5211, PWD=***, ConnCatalog=kuwano_catalog, AuthMech=3, SSL=1}
2024-12-01 12:55:38.633 +0000 [INFO] (0016:task-0000): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:38.889 +0000 [INFO] (0016:task-0000): > 0.26 seconds (-1 rows)
2024-12-01 12:55:38.889 +0000 [INFO] (0016:task-0000): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:39.104 +0000 [INFO] (0016:task-0000): > 0.21 seconds (-1 rows)
2024-12-01 12:55:39.104 +0000 [INFO] (0016:task-0000): TransactionIsolation=read_uncommitted
2024-12-01 12:55:39.106 +0000 [INFO] (0016:task-0000): Connecting to jdbc:databricks://example.cloud.databricks.com:443 options {ConnSchema=trocco-test, UserAgentEntry=primeNumber_TROCCO/1.0.0, UID=token, httpPath=/sql/1.0/warehouses/475b94ddc7cd5211, PWD=***, ConnCatalog=kuwano_catalog, AuthMech=3, SSL=1}
2024-12-01 12:55:39.550 +0000 [INFO] (0016:task-0000): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:39.804 +0000 [INFO] (0016:task-0000): > 0.25 seconds (-1 rows)
2024-12-01 12:55:39.804 +0000 [INFO] (0016:task-0000): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:40.057 +0000 [INFO] (0016:task-0000): > 0.25 seconds (-1 rows)
2024-12-01 12:55:40.057 +0000 [INFO] (0016:task-0000): TransactionIsolation=read_uncommitted
2024-12-01 12:55:40.079 +0000 [INFO] (0016:task-0000): {speedometer: {active: 0, total: 0.0b, sec: 0.00, speed: 0.0b/s, records: 0, record-speed: 0/s}}
2024-12-01 12:55:40.600 +0000 [INFO] (0016:task-0000): Open S3Object with bucket [bucket-name], key [torocco-dbx/test0001.csv], with size [34]
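The same connect, USE CATALOG, USE SCHEMA sequence repeats four times here. I can't fully tell from the log alone what it is doing, but you can see that at the end it fetches the CSV from S3.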
2024-12-01 12:55:40.607 +0000 [INFO] (pool-4-thread-1): Uploading file /Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7 to managed volume (35 bytes 4 rows)
2024-12-01 12:55:40.609 +0000 [INFO] (pool-4-thread-1): Trying pat auth
2024-12-01 12:55:41.226 +0000 [INFO] (pool-4-thread-1): Uploaded file /Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7 (0.62 seconds)
2024-12-01 12:55:41.226 +0000 [INFO] (pool-4-thread-2): Running COPY from file /Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7
2024-12-01 12:55:41.667 +0000 [INFO] (pool-4-thread-2): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:41.923 +0000 [INFO] (pool-4-thread-2): > 0.26 seconds (-1 rows)
2024-12-01 12:55:41.923 +0000 [INFO] (pool-4-thread-2): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:42.157 +0000 [INFO] (pool-4-thread-2): > 0.23 seconds (-1 rows)
2024-12-01 12:55:42.157 +0000 [INFO] (pool-4-thread-2): TransactionIsolation=read_uncommitted
2024-12-01 12:55:42.157 +0000 [INFO] (pool-4-thread-2): SQL: COPY INTO `trocco-test`.`test-table_00000193824a5348_embulk000` FROM ( SELECT _c0::STRING c0 , _c1::STRING c1 FROM "/Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7" ) FILEFORMAT = CSV FORMAT_OPTIONS ( 'nullValue' = '\\N' , 'delimiter' = '\t' )
2024-12-01 12:55:45.445 +0000 [INFO] (pool-4-thread-2): > 3.29 seconds (4 rows)
2024-12-01 12:55:45.445 +0000 [INFO] (pool-4-thread-2): Loaded file /Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7 (3.29 seconds for COPY)
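Next, it uploads the file to a managed volume under /Volumes, then uses COPY INTO to load the data from there into one of the temp tables created earlier (embulk000).
I wonder if this is where any necessary preprocessing gets done?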
2024-12-01 12:55:46.961 +0000 [INFO] (0001:transaction): SQL: USE CATALOG `kuwano_catalog`
2024-12-01 12:55:47.209 +0000 [INFO] (0001:transaction): > 0.25 seconds (-1 rows)
2024-12-01 12:55:47.209 +0000 [INFO] (0001:transaction): SQL: USE SCHEMA `trocco-test`
2024-12-01 12:55:47.433 +0000 [INFO] (0001:transaction): > 0.22 seconds (-1 rows)
2024-12-01 12:55:47.433 +0000 [INFO] (0001:transaction): TransactionIsolation=read_uncommitted
2024-12-01 12:55:47.433 +0000 [INFO] (0001:transaction): SQL: INSERT INTO `trocco-test`.`test-table` (`c0`, `c1`) SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk000` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk001` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk002` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk003`
2024-12-01 12:55:51.926 +0000 [INFO] (0001:transaction): > 4.49 seconds (4 rows)
Then it writes from the temp tables into the target table with a single INSERT INTO that combines them with UNION ALL.
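To make the shape of that statement easier to see, here is a minimal sketch that rebuilds the same INSERT from a list of temp table names. It only mimics the SQL text in the log; it is not the plugin's actual code, and `quote` and `build_insert` are hypothetical helper names.

```python
def quote(identifier: str) -> str:
    """Wrap an identifier in backticks, as the logged SQL does."""
    return f"`{identifier}`"


def build_insert(schema: str, target: str, temp_tables: list[str], columns: list[str]) -> str:
    """Build one INSERT that appends every temp table to the target via UNION ALL."""
    cols = ", ".join(quote(c) for c in columns)
    selects = [
        f"SELECT {cols} FROM {quote(schema)}.{quote(t)}" for t in temp_tables
    ]
    return (
        f"INSERT INTO {quote(schema)}.{quote(target)} ({cols}) "
        + " UNION ALL ".join(selects)
    )


# Names taken from the log: four temp tables, embulk000 to embulk003.
temps = [f"test-table_00000193824a5348_embulk{i:03d}" for i in range(4)]
print(build_insert("trocco-test", "test-table", temps, ["c0", "c1"]))
```

Only embulk000 shows a COPY INTO in this log, so the other three temp tables are presumably empty and contribute no rows, which matches the "4 rows" reported for the INSERT.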
2024-12-01 12:55:53.146 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `trocco-test`.`test-table_00000193824a5348_embulk000`
2024-12-01 12:55:53.565 +0000 [INFO] (0001:transaction): > 0.42 seconds (-1 rows)
2024-12-01 12:55:53.566 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `trocco-test`.`test-table_00000193824a5348_embulk001`
2024-12-01 12:55:53.911 +0000 [INFO] (0001:transaction): > 0.35 seconds (-1 rows)
2024-12-01 12:55:53.911 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `trocco-test`.`test-table_00000193824a5348_embulk002`
2024-12-01 12:55:54.290 +0000 [INFO] (0001:transaction): > 0.38 seconds (-1 rows)
2024-12-01 12:55:54.290 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `trocco-test`.`test-table_00000193824a5348_embulk003`
2024-12-01 12:55:54.703 +0000 [INFO] (0001:transaction): > 0.41 seconds (-1 rows)
2024-12-01 12:55:54.835 +0000 [INFO] (main): Committed.
Finally it drops the temp tables, and that's it!
As a super simple diagram, it looks like this!
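If you want to trace the same flow in your own job log, a small script can pull out just the SQL lines and show which thread ran what. This is a rough sketch that assumes the line layout seen in this log (timestamp, `+0000`, `[INFO]`, thread name, `SQL:`); `LOG_LINE` and `summarize` are hypothetical names, and the sample lines are copied from the log above.

```python
import re

# Matches lines such as: "<date> <time> +0000 [INFO] (<thread>): SQL: <statement>"
LOG_LINE = re.compile(
    r"^\S+ \S+ \+0000 \[INFO\] \((?P<thread>[^)]+)\): SQL: (?P<sql>.+)$"
)

# One line per phase, copied verbatim from the log in this post.
SAMPLE = r"""
2024-12-01 12:55:29.353 +0000 [INFO] (0001:transaction): SQL: CREATE TABLE `trocco-test`.`test-table_00000193824a5348_embulk000` (`c0` STRING, `c1` STRING)
2024-12-01 12:55:42.157 +0000 [INFO] (pool-4-thread-2): SQL: COPY INTO `trocco-test`.`test-table_00000193824a5348_embulk000` FROM ( SELECT _c0::STRING c0 , _c1::STRING c1 FROM "/Volumes/kuwano_catalog/trocco-test/embulk_output_databricks_20241201125523TEST_4d4b274057ae449784e74fcbafaa561c/20241201125540TEST_38c792c352684db3b727bada5e12b2d7" ) FILEFORMAT = CSV FORMAT_OPTIONS ( 'nullValue' = '\\N' , 'delimiter' = '\t' )
2024-12-01 12:55:47.433 +0000 [INFO] (0001:transaction): SQL: INSERT INTO `trocco-test`.`test-table` (`c0`, `c1`) SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk000` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk001` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk002` UNION ALL SELECT `c0`, `c1` FROM `trocco-test`.`test-table_00000193824a5348_embulk003`
2024-12-01 12:55:53.146 +0000 [INFO] (0001:transaction): SQL: DROP TABLE IF EXISTS `trocco-test`.`test-table_00000193824a5348_embulk000`
2024-12-01 12:55:54.835 +0000 [INFO] (main): Committed.
""".strip().splitlines()


def summarize(lines: list[str]) -> list[tuple[str, str]]:
    """Return (thread, first two SQL keywords) for every SQL log line."""
    steps = []
    for line in lines:
        match = LOG_LINE.match(line)
        if match:  # non-SQL lines such as "Committed." are skipped
            verb = " ".join(match["sql"].split()[:2])
            steps.append((match["thread"], verb))
    return steps


for thread, verb in summarize(SAMPLE):
    print(f"{thread:18} {verb}")
```

On the sample this lists CREATE TABLE, COPY INTO, INSERT INTO and DROP TABLE in order, with COPY INTO on a pool thread and the rest on the transaction thread.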

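So that's how it works!
Actually tracing through the log shows you how things run and gives you a much sharper picture! There were some parts I didn't fully understand, so I plan to look into them a bit more later!
That's how cleanly TROCCO and Databricks work together! Do give it a try!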




