【Databricks】PythonでNotebookのフォルダ移動を実現してみた

はじめに

弊社ではDatabricksを運用していく中で徐々にNotebookが増え、整理する必要が出てきました。
そこで、手始めに一部共有フォルダ内の50件ほどのNotebookをフォルダ間移動、一部のリネームを行うこととなりました。

一般的にDatabricksでのNotebookのフォルダ間移動はUI上で行いますが、
この大量のNotebookをひとつひとつ移動させるにはかなりの時間と手間がかかってしまいますし、
今後棚卸しを定期的に行うとして手作業ではかなりの工数がかかってしまいます。

どうにかフォルダ間移動、リネームを一部自動化できないかと考えPythonでのコーディングを駆使して実現することができました。
その際、同じような事例に関する資料がなかなか見つからなったので、
今回工夫を重ねて実践したことを備忘録として記事にすることにしました。

利用するDatabricks Rest API

注意点

  • Notebookに関するライブラリでは移動はサポートされておらず、エクスポート&インポートを実施することで擬似的に実現します。
  • また、エクスポート&インポート時、元Notebookは残ってしまうため、どこかのタイミングで削除する必要がありますので頭の片隅に入れておきましょう。
  • さらに、Notebookに紐づけられたジョブやDLTパイプラインは当ブログ投稿時点では自動で変更されないため、手動やコードで変更する必要があります。
  • 前提として、ある程度Pythonの知識とRestAPIの知識があると良いです。

実践

さて、実際にコードを書きながら実践してみましょう。
まずは、複数件で行う前に1件のNotebookを移動させるコードを書いてみましょう。
具体的な動きは以下になります。

  • Workspaceにある「sample1」フォルダ内の「notebook_sample1」を「sample2」に移動させる

前述の注意書きにも書いた通り、Notebookのフォルダ間移動をコードで実現させるためには、
エクスポート&インポートを行う必要がありますので順番に見てきましょう。

Notebookのエクスポート編

まず初めにNotebookを1件エクスポートする方法を見ていきましょう。

エクスポートを行うライブラリは以下です。

【Export a workspace object | Workspace API | REST API reference | Databricks on AWS】

完成形

まず初めに、上記ライブラリを利用した記述の完成形を見てみましょう。

# 指定のNotebookをエクスポート
import requests
import json

# 対象のNotebookパス(/Workspaceから指定すること)
notebook_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample1"

# Databricksのホスト名とパーソナルアクセストークン
databricks_host = "xxxxxx"
databricks_token = "xxxxxx"

# エクスポート用のエンドポイント
url_export = f"https://{databricks_host}/api/2.0/workspace/export"

# 認証ヘッダーを設定
headers = {
    'Authorization': f'Bearer {databricks_token}',
    'Content-Type': 'application/json'
}

# エクスポート用のパラメータ(formatはSOURCE、HTML、JUPYTER、DBC、R_MARKDOWN、AUTOが選べます)
export_param = {
    'path': notebook_path,
    'format': 'AUTO'
}

notebook_data = requests.get(url_export, headers=headers, params=export_payload).json()
print(notebook_data)

上記を実行すると、エクスポート実行した際のレスポンスとして、対象Notebookの情報をエンコードした文字列が返されてきます。

{'content': 'IyBEYXRhYnJpY2tzIG5vdGVib29rIHNvdXJjZQojIE1BR0lDICVtZAojIE1BR0lDIE5vdGVib29r44Gu56e75YuV44K144Oz44OX44Or77yRCiMgTUFHSUMKCiMgQ09NTUFORCAtLS0tLS0tLS0tCgpwcmludCgiSGVsbG8gV29ybGQiKQ==', 'file_type': 'py'}
ライブラリのインポート

それでは、上記完成形コードを細かく見ていきましょう。
まずは、ライブラリのインポートですが、このコードではRestAPIを呼び出すための「requests」、返ってきたレスポンスをJson形式に変換する「json」をインポートしています。

# 指定のNotebookをエクスポート
import requests
import json
変数定義

次に、以下部分では実行に必要な情報を変数に用意しています。

# 対象のNotebookパス(/Workspaceから指定すること)
notebook_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample1"

→エクスポート元のNotebookのパスです。「/Workspace」から指定することがポイントです。
 Notebookのパス取得方法については以下公式ドキュメントを参考にしてください。
【ノートブックの管理|Databricks on AWS】

# Databricksのホスト名とパーソナルアクセストークン
databricks_host = "xxxxxx"
databricks_token = "xxxxxx"

→RestAPIを呼び出すために必要な情報です。トークンの詳細については以下公式ドキュメントを参照してください。
【Databricks個人用アクセストークン認証 | Databricks on AWS】

# エクスポート用のエンドポイント
url_export = f"https://{databricks_host}/api/2.0/workspace/export"

→エクスポートAPIのエンドポイントです。ブログ投稿時点での最新バージョンは2.0です。

# 認証ヘッダーを設定
headers = {
    'Authorization': f'Bearer {databricks_token}',
    'Content-Type': 'application/json'
}

→API接続するための認証ヘッダーです。これらの設定をしないと正しくAPIに接続できません。

# エクスポート用のパラメータ(formatはSOURCE、HTML、JUPYTER、DBC、R_MARKDOWN、AUTOが選べます)
export_param = {
    'path': notebook_path,
    'format': 'AUTO'
}

→APIリクエストをする際に必要な情報です。

項目名 説明
path エクスポート元のNotebookのパス
format エクスポート元のNotebookのフォーマット
SOURCE、HTML、JUPYTER、DBC、R_MARKDOWN、AUTOが選択可能
AUTOを選択しておくと安全
リクエストの記述

最後に、リクエスト部分の記述です。
requestライブラリのget関数の引数にそれぞれ定義した変数を設定します。
また、json関数もつけることで、返ってきたレスポンスをJson形式に変換し、後々の処理をしやすいようにしています。

notebook_data = requests.get(url_export, headers=headers, params=export_payload).json()
print(notebook_data)

Notebookのインポート編

続いて、Notebookを1件インポートする方法を見ていきましょう。

インポートを行うライブラリは以下です。
【Import a workspace object | Workspace API | REST API reference | Databricks on AWS】

完成形

まず初めに、上記ライブラリを利用した記述の完成形を見てみましょう。

# 指定のNotebookをエクスポート
import requests
import json

# インポート先のパス(末尾はファイル名となる)
notebook_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample2"
# 対象のNotebookのバイナリデータ
notebook_binary = "IyBEYXRhYnJpY2tzIG5vdGVib29rIHNvdXJjZQojIE1BR0lDICVtZAojIE1BR0lDIE5vdGVib29r44Gu56e75YuV44K144Oz44OX44Or77yRCiMgTUFHSUMKCiMgQ09NTUFORCAtLS0tLS0tLS0tCgpwcmludCgiSGVsbG8gV29ybGQiKQ=="
# Notebookの使用言語
notebook_language = "PYTHON"

# Databricksのホスト名とパーソナルアクセストークン
databricks_host = "xxxxxx"
databricks_token = "xxxxxx"

# インポート用のエンドポイント
url_import = f"https://{databricks_host}/api/2.0/workspace/import"

# 認証ヘッダーを設定
headers = {
    'Authorization': f'Bearer {databricks_token}',
    'Content-Type': 'application/json'
}

# インポートのパラメータ
import_payload = {
    'path': notebook_path,
    'format': 'SOURCE',
    "overwrite": 'true',
    'content': notebook_binary,
    'language': notebook_language
}
import_response = requests.post(url_import, headers=headers, data=json.dumps(import_payload))

# 結果を表示
print(import_response)

上記を実行すると、インポート実行した際のレスポンスとして、NotebookのIDが返されてきます。

{'object_id': 242391193179958}
ライブラリのインポート

それでは、上記完成形コードを細かく見ていきましょう。
まずは、ライブラリのインポートですが、エクスポート編と全く同様なので詳細は割愛します。

変数定義

次に、以下部分では実行に必要な情報を変数に用意しています。
エクスポート編と異なる部分や注意が必要なものについて説明していきます。

# インポート先のパス(末尾はファイル名となる)
notebook_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample2"

→インポート先のパスです。「/Workspace」から指定することがポイントです。
 また、末尾はファイル名になり、インポート元と違う名前でインポートすることも可能です。

 DatabricksではNotebookは同じフォルダに同じ名前のものは保存できないため、基本的な動きは以下のようになります。
 フォルダが同じでファイル名も同じ場合:インポート元を上書きする形もしくは何も起きない(詳細はパラメータ設定で説明します)
 上記以外の場合:指定名で指定のフォルダにコピーされる形

# インポート用のエンドポイント
url_export = f"https://{databricks_host}/api/2.0/workspace/import"

→インポートAPIのエンドポイントです。ブログ投稿時点での最新バージョンは2.0です。

# インポートのパラメータ
import_payload = {
    'path': notebook_path,
    'format': 'SOURCE',
    "overwrite": 'true',
    'content': notebook_binary,
    'language': notebook_language
}

→APIリクエストをする際に必要な情報です。
 特に注意点として、前後で形式が変わってしまうと実行できなくなったりしてしまうので確認は徹底しましょう

項目名 説明
path インポート先のパス(末尾はファイル名)
format インポート元のNotebookのフォーマット
SOURCE、HTML、JUPYTER、DBC、R_MARKDOWN、AUTOが選択可能
overwrite パス、ファイル名が同じ時に上書きをするかどうかの設定
trueだと上書きする、falseだと上書きしない
content インポート元のNotebookのバイナリデータを設定
language 該当Notebookの使用言語を設定
SCALA、PYTHON、SQL、Rが選択可能
リクエストの記述

最後に、リクエスト部分の記述です。
requestライブラリのget関数の引数にそれぞれ定義した変数を設定します。
パラメータを設定する際はJsonDumpすることに注意しましょう。
また、json関数もつけることで、返ってきたレスポンスをJson形式に変換し、後々の処理をしやすいようにしています。

import_response = requests.post(url_import, headers=headers, data=json.dumps(import_payload))
print(import_response)

エクスポートとインポートを組み合わせてみる編

さて、つづいて上述したエクスポートとインポートを組み合わせてみましょう。

完成形

まず初めに、二つを組み合わせた完成形をみてみましょう。

# 指定のNotebookをエクスポート
import requests
import json

# Notebookパス
export_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample1"
import_path = "/Workspace/Users/xxxxxxxxxxxxxx/sample1/notebook_sample2"

# Databricksのホスト名とパーソナルアクセストークン
databricks_host = "xxxxxxxxxxxxxx"
databricks_token = "xxxxxxxxxxxxxx"

# エンドポイント
url_export = f"https://{databricks_host}/api/2.0/workspace/export"
url_import = f"https://{databricks_host}/api/2.0/workspace/import"

# 認証ヘッダーを設定
headers = {
    'Authorization': f'Bearer {databricks_token}',
    'Content-Type': 'application/json'
}

# エクスポートの実施
export_param = {
    'path': notebook_path,
    'format': 'AUTO'
}
notebook_data = requests.get(url_export, headers=headers, params=export_param).json()

# Notebookのバイナリーデータ
content = notebook_data["content"]
# Notebookの使用言語
file_type = request_data["file_type"]
if file_type == "py":
  language = "PYTHON"
elif file_type == "scala":
  language = "SCALA"
elif file_type == "r":
  language = "R"
elif file_type == "sql":
  language = "SQL"


# インポートの実施
import_payload = {
    'path': notebook_path,
    'format': 'SOURCE',
    "overwrite": 'true',
    'content': content,
    'language': language
}
import_response = requests.post(url_import, headers=headers, data=json.dumps(import_payload)).json()

このように組み合わせることができます。

ポイント

特にポイントとなるところは、エクスポートしたNotebookの情報をインポートのパラメータに設定する部分です。

notebook_data = requests.get(url_export, headers=headers, params=export_param).json()

# Notebookのバイナリーデータ
content = notebook_data["content"]
# Notebookの使用言語
file_type = request_data["file_type"]
if file_type == "py":
  language = "PYTHON"
elif file_type == "scala":
  language = "SCALA"
elif file_type == "r":
  language = "R"
elif file_type == "sql":
  language = "SQL"

# インポートの実施
import_payload = {
    'path': notebook_path,
    'format': 'SOURCE',
    "overwrite": 'true',
    'content': content,
    'language': language
}

notebook_dataはエクスポートして返ってきたレスポンスが格納されます。
おさらいですが、このレスポンスは以下のような形で入ってきます。

{'content': 'IyBEYXRhYnJpY2tzIG5vdGVib29rIHNvdXJjZQojIE1BR0lDICVtZAojIE1BR0lDIE5vdGVib29r44Gu56e75YuV44K144Oz44OX44Or77yRCiMgTUFHSUMKCiMgQ09NTUFORCAtLS0tLS0tLS0tCgpwcmludCgiSGVsbG8gV29ybGQiKQ==', 'file_type': 'py'}

パラメータのcontentに設定しなければいけないのはバイナリデータのみなので、
notebook_data["content"]という形でレスポンス内のcontentに設定します。
また、languageは「SCALA、PYTHON、SQL、R」のいずれかを設定する必要があるため、
notebook_data["file_type"]から条件分岐を使って選択していきます。

複数件実施するには?

さて、ここまでは1件のNotebookを対象としてきました。
ここから複数件のNotebookを対象にしたい場合どうするのかをみていきましょう。

サンプルとして、以下3件のNotebookの情報をDataFrameに用意しておきます。

before_path after_path
/Workspace/Users/xxx/test/notebook1 /Workspace/Users/xxx/test/notebook2
/Workspace/Users/xxx/test/notebook3 /Workspace/Users/xxx/test/notebook4
/Workspace/Users/xxx/test/notebook5 /Workspace/Users/xxx/test/notebook6

今回はコードにベタ書きしますが、実際の案件ではCSVを読み込むなどしてメンテナンスがしやすいようにしておきましょう。

完成形

まずは、複数件実施の場合の完成形をみてみましょう。

# 指定のNotebookをエクスポート
import requests
import json

# Dataframeの定義
df = spark.createDataFrame([
  (
    '/Workspace/Users/xxx/test/notebook1', 
    '/Workspace/Users/xxx/test/notebook2'
  ), (
    '/Workspace/Users/xxx/test/notebook3', 
    '/Workspace/Users/xxx/test/notebook4'
  ), (
    '/Workspace/Users/xxx/test/notebook5', 
    '/Workspace/Users/xxx/test/notebook6'
  )
  ], ['before_path', 'after_path'])

# Databricksのホスト名とパーソナルアクセストークン
databricks_host = "xxxxxxxxxxxxxx"
databricks_token = "xxxxxxxxxxxxxx"

# エンドポイント
url_export = f"https://{databricks_host}/api/2.0/workspace/export"
url_import = f"https://{databricks_host}/api/2.0/workspace/import"

# 認証ヘッダーを設定
headers = {
    'Authorization': f'Bearer {databricks_token}',
    'Content-Type': 'application/json'
}

for row in df.collect():
  # エクスポートの実施
  export_param = {
    'path': row['before_path'],
    'format': 'AUTO'
  }
  notebook_data = requests.get(url_export, headers=headers, params=export_param).json()
  # Notebookのバイナリーデータ
  content = notebook_data["content"]
  # Notebookの使用言語
  file_type = request_data["file_type"]
  if file_type == "py":
    language = "PYTHON"
  elif file_type == "scala":
    language = "SCALA"
  elif file_type == "r":
    language = "R"
  elif file_type == "sql":
    language = "SQL"

  # インポートの実施
  import_payload = {
    'path': row['after_path'],
    'format': 'SOURCE',
    "overwrite": 'true',
    'content': content,
    'language': language
  }
  import_response = requests.post(url_import, headers=headers, data=json.dumps(import_payload)).json()
ポイント

ポイントとなる点は以下になります。

for row in df.collect():

DatabricksのAPIでは、複数件を一気に処理するAPIは用意されていないため、
Dataframeの一行ずつ処理を行う必要があるのです。

さいごに

いかがだったでしょうか?
今回は、エクスポートとインポートを駆使して擬似的にNotebookを一気にフォルダ間移動する方法を記述してみました。
一見コーディングが必要なさそうなGUI操作も、作業効率や保守の観点からコーディングした方が最善の方法の場合もあるのです。
特に大量のNotebookを一度に整理する場合は手動操作よりもはるかに効率的です。

また、こうした自動化の工夫をこらすことで、作業効率化だけではなくケアレスミスを防ぐこともできるというメリットもあります。
今後も環境や状況に応じた工夫を取り入れ、より良いデータ運用を目指していきたいですね。

このブログが皆さんの作業環境に少しでも役立つ情報となれば幸いです!