arrow_back

Cloud Composer: Qwik Start - Console

参加 ログイン
Test and share your knowledge with our community!
done
Get access to over 700 hands-on labs, skill badges, and courses

Cloud Composer: Qwik Start - Console

Lab 1時間 universal_currency_alt クレジット: 1 show_chart 入門
Test and share your knowledge with our community!
done
Get access to over 700 hands-on labs, skill badges, and courses

GSP261

Google Cloud セルフペース ラボ

概要

ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。

このラボでは、Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルが存在することを確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、Cloud Dataproc クラスタを削除します。

目標

このラボでは、次のタスクの実行方法について学びます。

  • Cloud コンソールを使用して Cloud Composer 環境を作成する
  • Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する
  • 保存されたワードカウント ジョブの結果を表示する

設定と要件

[ラボを開始] ボタンをクリックする前に

こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

このハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
注: このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウント間の競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。
  • ラボを完了するために十分な時間を確保してください。ラボをいったん開始すると一時停止することはできません。
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、このラボでは使用しないでください。アカウントへの追加料金が発生する可能性があります。

ラボを開始して Google Cloud コンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。

    • [Google コンソールを開く] ボタン
    • 残り時間
    • このラボで使用する必要がある一時的な認証情報
    • このラボを行うために必要なその他の情報(ある場合)
  2. [Google コンソールを開く] をクリックします。 ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。

    ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。

    注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
  3. 必要に応じて、[ラボの詳細] パネルから [ユーザー名] をコピーして [ログイン] ダイアログに貼り付けます。[次へ] をクリックします。

  4. [ラボの詳細] パネルから [パスワード] をコピーして [ようこそ] ダイアログに貼り付けます。[次へ] をクリックします。

    重要: 認証情報は左側のパネルに表示されたものを使用してください。Google Cloud Skills Boost の認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
  5. その後次のように進みます。

    • 利用規約に同意してください。
    • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
    • 無料トライアルには登録しないでください。

その後このタブで Cloud Console が開きます。

注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。 ナビゲーション メニュー アイコン

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン 「Cloud Shell をアクティブにする」アイコン をクリックします。

接続した時点で認証が完了しており、プロジェクトに各自の PROJECT_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。

Your Cloud Platform project in this session is set to YOUR_PROJECT_ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  1. (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
  1. [承認] をクリックします。

  2. 出力は次のようになります。

出力:

ACTIVE: * ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project = <project_ID>

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: Google Cloud における gcloud ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。

タスク 1. Cloud Composer 環境を作成する

このセクションでは、Cloud Composer 環境を作成します。

  1. ナビゲーション メニュー (ナビゲーション メニュー アイコン) > [Composer] に移動します。

  2. [環境の作成] をクリックしてプルダウンから [Composer 1] を選択します。

  3. 環境を以下のように設定します。

    • 名前: highcpu

    • ロケーション:

    • イメージのバージョン: 最新バージョンを選択

    • [ゾーン]:

    • マシンタイプ: e2-standard-2

その他の設定はすべてデフォルトのままにします。

  1. [作成] をクリックします。

Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されると、環境作成プロセスが完了したことを表します。

設定プロセスが完了するまで 10~15 分かかることがあります。その間にラボの作業を進めてください。

Cloud Storage バケットを作成する

プロジェクトの Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力に使用されます。

  1. ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[作成] をクリックします。

  2. バケットに という名前を付け、 リージョンに設定して、[作成] をクリックします。

[公開アクセスの防止] が表示されたら、[確認] をクリックします。

この Cloud Storage バケット名(プロジェクト ID)は後ほど Airflow 変数として使用するため、覚えておいてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Storage バケットを作成する

タスク 2. Airflow と主要なコンセプト

Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しましょう。

Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラムで行うためのプラットフォームです。

Airflow を使用して、ワークフローをタスクの有向非循環グラフ(DAG)として作成します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。

基本コンセプト

DAG

有向非循環グラフとは、実行したいすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。

オペレーター

単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドの実行に使用されます。

タスク

オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。

タスク インスタンス

特定のタスクの実行です。DAG、タスク、ある時点を表し、実行中、成功、失敗、スキップなどの状態を示します。

コンセプトの詳細については、Concepts のドキュメントをご覧ください。

タスク 3. ワークフローの定義

これから使用するワークフローについて説明します。Cloud Composer ワークフローは DAG(有向非巡回グラフ)で構成されます。DAG の定義には標準 Python ファイルを使用し、これらのファイルは Airflow の DAG_FOLDER に配置されます。Airflow では各ファイル内のコードを実行して動的に DAG オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローは 1 対 1 で対応している必要があります。

  1. Cloud コンソール の右上にある Cloud Shell をアクティブにするアイコンをクリックして、新しい Cloud Shell ウィンドウを開きます。

  2. Cloud Shell で、nano(コードエディタの一種)を使って hadoop_tutorial.py というファイルを作成します。

nano hadoop_tutorial.py
  1. 次のコードは hadoop_tutorial.py ワークフロー(DAG)です。hadoop_tutorial.py ファイル内に次のコードを貼り付けます。
# [START composer_hadoop_tutorial] """Airflow DAG の例。Cloud Dataproc クラスタを作成し、Hadoop のワードカウントの例を実行し、クラスタを削除します。 この DAG は 3 つの Airflow 変数に依存します https://airflow.apache.org/concepts.html#variables * gcp_project - Cloud Dataproc クラスタで使用する Google Cloud プロジェクト * gce_zone - Cloud Dataproc クラスタを作成する Google Compute Engine ゾーン * gcs_bucket - Hadoop ジョブの結果に使用する Google Cloud Storage バケット バケットの作成方法については https://cloud.google.com/storage/docs/creating-buckets を参照 """ import datetime import os from airflow import models from airflow.contrib.operators import dataproc_operator from airflow.utils import trigger_rule # Cloud Dataproc ジョブの出力ファイル output_file = os.path.join( models.Variable.get('gcs_bucket'), 'wordcount', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep # すべての Dataproc クラスタで利用可能な、Hadoop のワードカウントの例へのパス WORDCOUNT_JAR = ( 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar' ) # Cloud Dataproc ジョブに渡す引数 input_file = 'gs://pub/shakespeare/rose.txt' wordcount_args = ['wordcount', input_file, output_file] yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_dag_args = { # 開始日を前日にすると、Cloud Storage バケットで # 検出され次第 DAG が実行されます 'start_date': yesterday, # 失敗時や再試行時にメールを送信するには、「email」引数を # 自分のアドレスに設定してメールを有効にします 'email_on_failure': False, 'email_on_retry': False, # タスクの失敗時には、5 分以上待ってから 1 回再試行します 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), 'project_id': models.Variable.get('gcp_project') } # [START composer_hadoop_schedule] with models.DAG( 'composer_hadoop_tutorial', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag: # [END composer_hadoop_schedule] # Cloud Dataproc クラスタを作成します create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator( task_id='create_dataproc_cluster', # 設定された日付を加えてクラスタに一意の名前を付けます # https://airflow.apache.org/code.html#default-variables を参照 cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', num_workers=2, region='{{{project_0.default_region|region}}}', zone=models.Variable.get('gce_zone'), image_version='2.0', master_machine_type='e2-standard-2', worker_machine_type='e2-standard-2') # Cloud Dataproc クラスタのマスターノードにインストール # されている Hadoop のワードカウントの例を実行します run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator( task_id='run_dataproc_hadoop', region='{{{project_0.default_region|place_holder_text}}}', main_jar=WORDCOUNT_JAR, cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', arguments=wordcount_args) # Cloud Dataproc クラスタを削除します delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator( task_id='delete_dataproc_cluster', region='{{{project_0.default_region|place_holder_text}}}', cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', # trigger_rule を ALL_DONE に設定すると、Dataproc ジョブが # 失敗した場合でもクラスタは削除されます trigger_rule=trigger_rule.TriggerRule.ALL_DONE) # [START composer_hadoop_steps] # DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster # [END composer_hadoop_steps] # [END composer_hadoop_tutorial]
  1. Ctrl+O キー、Enter キー、Ctrl+X キーの順番に押して、編集内容を保存してエディタを終了します。

3 つのワークフロー タスクをオーケストレーションするために、DAG は次のオペレーターをインポートします。

  • DataprocClusterCreateOperator: Cloud Dataproc クラスタを作成します。
  • DataProcHadoopOperator: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。
  • DataprocClusterDeleteOperator: クラスタを削除して、Compute Engine の利用料金が発生しないようにします。

タスクは順番に実行されます。ファイルの次の部分で確認できます。

# DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

この DAG の名前は hadoop_tutorial で、1 日 1 回実行されます。

with models.DAG( 'composer_hadoop_tutorial', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:

default_dag_args に渡される start_dateyesterday に設定されているので、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。

タスク 4. 環境情報を表示する

  1. Composer に戻り、環境のステータスを確認します。

  2. 環境が作成されたら、環境の名前(highcpu)をクリックして詳細を確認します。

[環境の詳細] で、Airflow ウェブ インターフェース URL、Kubernetes Engine クラスタ ID、バケットに保存されている DAG フォルダへのリンクなどの情報を確認できます。

注: Cloud Composer がスケジュール設定を行うのは、/dags フォルダ内のワークフローのみです。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Composer 環境を作成する

タスク 5. Airflow UI の使用

Cloud コンソールで Airflow ウェブ インターフェースにアクセスするには:

  1. [環境] ページに戻ります。
  2. 環境の [Airflow ウェブサーバー] 列で、[Airflow] をクリックします。
  3. ラボの認証情報をクリックします。
  4. 新しいウィンドウに Airflow ウェブ インターフェースが表示されます。

タスク 6. Airflow 変数の設定

Airflow 変数は Airflow 固有の概念であり、環境変数とは異なります。

  1. Airflow メニューバーで [Admin] > [Variables] を選択し、+新しいレコードの追加)をクリックします。

  2. gcp_projectgcs_bucketgce_zone の 3 つの Airflow 変数を作成します。

キー

詳細

gcp_project

このクイックスタートで使用する Google Cloud プロジェクトです。

gcs_bucket

gs://

先ほど作成した Cloud Storage バケットの名前。変更していない限りプロジェクト ID と同じになります。このバケットに Dataproc の Hadoop ジョブの出力が保存されます。

gce_zone

Cloud Dataproc クラスタを作成する Compute Engine ゾーン。別のゾーンを選択するには、使用可能なリージョンとゾーンをご覧ください。

注: レコードを入力したら、[Save] をクリックしてからもう一度 + をクリックし、新しいレコードを入力します。

完了すると、Variables のテーブルは次のようになります。

gce_zone、gcp_project、gcs_bucket の 3 つのキーと対応する値、説明、暗号化の有無が List Variables のテーブルに表示されています。

タスク 7. DAG を Cloud Storage にアップロードする

DAG をアップロードするには、hadoop_tutorial.py ファイルのコピーを Cloud Storage バケットにアップロードします。このバケットは、環境を作成したときに自動的に作成されています。

  1. これを確認するには、[Composer] > [環境] に移動します。

  2. 先ほど作成した環境をクリックします。作成した環境の詳細が表示されます。

  3. [環境の構成] をクリックします。

  4. [DAG のフォルダ] を見つけて値をコピーし、次のコマンドの <DAGs_folder_path> をその値に置き換え、Cloud Shell でコマンドを実行します。

gsutil cp hadoop_tutorial.py <DAGs_folder_path>

dags_folder

Cloud Composer により、DAG が Airflow に追加されて自動的にスケジュール設定されます。DAG の変更が 3~5 分で行われます。これ以降、このワークフローを composer_hadoop_tutorial と呼びます。

このタスクのステータスは、Airflow ウェブ インターフェースで確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG を Cloud Storage にアップロードする

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。

Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報を表示してください。

  1. Airflow で [composer_hadoop_tutorial] をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が図で示されています。

  2. ツールバーの [Graph] をクリックしてグラフを表示します。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

  3. 必要に応じて [Auto refresh] をオンにするか [Refresh] アイコンをクリックして最新の情報に更新します。プロセスを囲む線の色がステータスに応じて変化するのを確認できます。

create_dataproc_cluster を囲む線の色が変化し、ステータスが「running」(実行中)以外になったら、[Graph] から再度ワークフローを実行します。ステータスが「running」の場合は、以下の 3 つのステップは実行不要です。

  1. [create_dataproc_cluster] グラフィックをクリックします。
  2. [Clear] をクリックして 3 つのタスクをリセットします。
  3. 次に、[OK] をクリックして確定します。

create_dataproc_cluster を囲む線の色が変化し、ステータスが「Running」(実行中)になったことに注目してください。

Cloud コンソール でプロセスをモニタリングすることもできます。

  1. create_dataproc_cluster のステータスが「Running」に変わったら、ナビゲーション メニュー > [Dataproc] に移動し、次の操作を行います。
  • [Clusters] をクリックして、クラスタの作成と削除をモニタリングします。ワークフローによって作成されたクラスタは一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクで削除されます。
  • [Jobs] をクリックして、Apache Hadoop ワードカウント ジョブをモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  1. Dataproc のステータスが「Running」になったら Airflow に戻って [Refresh] をクリックすると、クラスタが完成したことを確認できます。

  2. run_dataproc_hadoop プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount フォルダでワードカウントの結果を確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG の実行状況を確認する

理解度テスト

クイズに挑戦して Google Cloud Platform に関するご自分の知識をチェックしましょう。

Cloud Composer 環境を削除する

  1. [Composer] の [環境] ページに戻ります。
  2. Composer 環境の横にあるチェックボックスを選択します。
  3. [削除] をクリックします。
  4. もう一度 [削除] をクリックして、ポップアップを確認します。

お疲れさまでした

これで完了です。このラボでは、Cloud Composer 環境を作成して Cloud Storage に DAG をアップロードし、ワークフローを実行して、Cloud Dataproc クラスタの作成、Hadoop ワードカウント ジョブの実行、クラスタの削除を行いました。また、Airflow と基本コンセプトについて学び、Airflow ウェブ インターフェースを操作しました。Cloud Composer を使用して、独自のワークフローを作成して管理できるようになりました。

次のステップ

このラボは、Google Cloud の多くの機能を体験できる「Qwik Start」と呼ばれるラボシリーズの一部です。ラボカタログで「Qwik Start」を検索し、興味のあるラボを探してみてください。

Google Cloud トレーニングと認定資格

Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。

マニュアルの最終更新日: 2023 年 12 月 18 日

ラボの最終テスト日: 2023 年 12 月 18 日

Copyright 2024 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。