名称未設定-2.fw

弊社では、一部プロジェクトでGoogle Cloud Platform(以下GCP)を利用しています。GCPの機能の一つとしてGoogle Cloud Dataflow(以下Dataflow)というものがあります。
Dataflowはフルマネージドなデータ処理サービスです。Dataflowを利用すると、データソースに捉われることなくストリーム処理とバッチ処理のプログラミングモデルの切り替えをスムーズに実現でき、リソースのプロビジョニングが自動的に行われるため、ビジネスにとって最も重要なデータ処理のアルゴリズムに集中できるようになります。

今回はDataflowの処理をテンプレート化する方法をご紹介します。

テンプレートを作成するメリット

Dataflowでは、テンプレートを作成することができます。
テンプレートを利用することで、Dataflowジョブを実行する場所に選択肢が増えるというメリットが得られます。
プログラムを開発する環境から実行ファイルとテンプレートをあらかじめGoogle Cloud Storageにアップロードしておくことで、実行時には別の場所からそれらを利用した実行をDataflowに命ずるだけで良くなります。

ジョブ実行環境にDataflow SDKなどを用意する必要がなくなり、gcloudまたはREST APIを利用できさえすれば良くなります。

テンプレートの作成と実行

Dataflowでは、パラメータの指定が可能なテンプレートを作成することができます。
パラメータを利用しない場合は、既存のコードをステップ3.と4.に従ってコンパイル、実行するだけで良いです。

次のようなパラメータ指定可能なDataflowプログラムを、Template化した際にパラメータ指定可能にしてみます。
プログラムをパラメータ指定可能にする方法は、Dataflowで実行パラメータを指定するを参考にしてください。

public interface MyOptions extends PipelineOptions {
    @Description("Path of the file to read from.")
    @Default.String("gs://bucket/path/to/file")
    String getInputFile();
    void setInputFile(String myCustomOption);
}

public static void main(String[] args) {
    PipelineOptionsFactory.register(MyOptions.class);
    MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
    .apply(..略..);

    p.run();
}

1. ランタイムパラメータを利用可能にする

オプションインターフェイス内で定義しているオプションの型をValueProviderに変更します。

public interface MyOptions extends PipelineOptions {
    @Description("Path of the file to read from.")
    @Default.String("gs://bucket/path/to/file")
    ValueProvider getInputFile();
    void setInputFile(ValueProvider myCustomOption);
}

2. withoutValidatetion()を追加する

TextIO.Read.named()を含む幾つかのPipeline I/Oは、ランタイムパラメータに対応しています。
執筆時点の公式のドキュメントでは特に変更が必要ないとのことですが、筆者の環境では.withoutValidation()を付与しなければコンパイルできませんでした。(発生したエラーを参考までに下記に示します。)

TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation()

ただし、上記を付与しても警告が発生しますが、内容を見る限り問題は無さそうです。(警告の内容は下記に示します。)

ここまででコードの修正は完了です。
変更後のコードは最後に示します。

withoutValidatetion()なしで発生するエラー(引数なし実行時)

Exception in thread "main" java.lang.IllegalStateException: Cannot validate with a filepattern provided at runtime.

withoutValidatetion()なしで発生するエラー(テンプレート作成時)

[WARNING]
java.lang.reflect.InvocationTargetException
...
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.4.0:java (default-cli) on project my-project: An exception occured while executing the Java class. null: InvocationTargetException: Cannot validate with a filepattern provided at runtime. -> [Help 1]
...

withoutValidatetion()ありで発生する警告

警告: Size estimation of the source failed: RuntimeValueProvider{propertyName=inputFile, default=gs://bucket/path/to/file, value=null}
java.lang.IllegalStateException: Size estimation should be done at execution time.
...

3. テンプレートのコンパイル

--runnerTemplatingDataflowPipelineRunnerを指定することでテンプレートを作成できます。
ここでオプションを指定してしまうと、実行時にオプションの値が固定されてしまうので気を付けましょう。

$ mvn compile exec:java \
 -Dexec.mainClass=your.package.MainClass \
 -Dexec.args="--project=your-project \
              --stagingLocation=gs://your-bucket/path/to/staging \
              --dataflowJobFile=gs://your-bucket/path/to/template \
              --runner=TemplatingDataflowPipelineRunner"

4. テンプレートの実行

--parametersにオプションを指定して実行できます。

$ gcloud beta dataflow jobs run $JOB_NAME \
        --gcs-location gs://your-bucket/path/to/template \
        --parameters=inputFile=gs://your-bucket/path/to/input/file

複数のオプションがある場合は、次のようにします。

--parameters=p1=v1,p2=v2

変更後のコード

public interface MyOptions extends PipelineOptions {
    @Description("Path of the file to read from.")
    @Default.String("gs://bucket/path/to/file")
    ValueProvider getInputFile();
    void setInputFile(ValueProvider myCustomOption);
}

public static void main(String[] args) {
    PipelineOptionsFactory.register(MyOptions.class);
    MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()).withoutValidation())
    .apply(..略..);

    p.run();
}

参考