как называется ОПЕРАЦИЯ в операциях gcloud ai-platform wait ОПЕРАЦИЯ?

Мне нужно дождаться завершения работы по обучению машинному обучению, прежде чем продолжить работу.

Я использую Composer / Airflow для управления своими задачами. Моя первая задача - запустить обучение машинному обучению на платформе AI, затем мне нужно дождаться завершения этого обучения, прежде чем перейти к следующей задаче.

У меня проблемы с пониманием документации, здесь, в которой объясняется как можно дождаться завершения операции ML.

В документации указано: gcloud ai-platform operations wait OPERATION

В настоящее время мой код:

   gcloud ai-platform operations wait {{ params.JOB_NAME }}

и я получаю сообщение об ошибке:

Running command: gcloud ai-platform operations wait ai_composer_20191119_201848
[2019-11-19 20:18:48,648] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:48,647] {bash_operator.py:97} INFO - Output:
[2019-11-19 20:18:49,811] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,810] {bash_operator.py:101} INFO - ERROR: (gcloud.ai-platform.operations.wait) NOT_FOUND: Field: name Error: The specified job was not found.
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - '@type': type.googleapis.com/google.rpc.BadRequest
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - fieldViolations:
[2019-11-19 20:18:49,812] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - - description: The specified job was not found.
[2019-11-19 20:18:49,813] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,812] {bash_operator.py:101} INFO - field: name
[2019-11-19 20:18:49,853] {base_task_runner.py:98} INFO - Subtask: [2019-11-19 20:18:49,852] {bash_operator.py:105} INFO - Command exited with return code 1
[2019-11-19 20:18:49,883] {models.py:1595} ERROR - Bash command failed

Что должно быть в поле OPERATION?

Для большего контекста мой даг выглядит так:


with dag:
    env = {}
    env['BUCKET'] = models.Variable.get('bucket_name')
    env['JOB_NAME'] = "ai_composer_{}".format(datetime.now().strftime("%Y%m%d_%H%M%S"))
    env['JOB_DIR'] = "gs://{bucket}/jobs/{job_name}".format(bucket=env['BUCKET'], job_name=env['JOB_NAME'])
    env['REGION'] = models.Variable.get('ai_region')
    env['PACKAGE_PATH'] = models.Variable.get('ai_training_package')
    env['CONFIG'] = models.Variable.get('train_config_path', deserialize_json=True)
    env['OUTPUT_FOLDER'] = "{job_dir}/model/".format(job_dir=env['JOB_DIR'])
    env['DUMMY_TRAINING_FILE'] = models.Variable.get('dummy_training_file')


    test_training = BashOperator(
        task_id='test_training',
        xcom_push=True,
        bash_command='gcloud ai-platform jobs submit training {{ params.JOB_NAME }} \
                --region {{ params.REGION }} \
                --scale-tier=CUSTOM \
                --python-version 3.5 \
                --runtime-version 1.13 \
                --master-machine-type n1-highcpu-16 \
                --staging-bucket gs://{{ params.BUCKET }} \
                --job-dir {{ params.JOB_DIR }} \
                --module-name trainer.task \
                --packages {{ params.PACKAGE_PATH }} \
                -- \
                --gcs-bucket gs://{{ params.BUCKET }} \
                --train-file {{ params.DUMMY_TRAINING_FILE }}\
                --verbose-logging \
                --data-type web_views \
                --delimiter , \
                && echo "{{ params.JOB_NAME }}"',
        params=env
    )

    get_ml_status = BashOperator(
        task_id='get_ml_status',
        xcom_push=True,
        bash_command='gcloud ai-platform operations wait {{ params.JOB_NAME }}',
        params=env
    )

test_training >> get_ml_status

Здесь test_training успешна, поэтому задание на обучение запускается до запуска задачи get_ml_status.


person AntsaR    schedule 19.11.2019    source источник
comment
Вы можете позвонить в GetJob, чтобы проверить состояние задания.   -  person Guoqing Xu    schedule 21.11.2019


Ответы (1)


В итоге я нашел обходной путь, проверив статус работы с описанием вакансий.

output = sh.gcloud("ai-platform", "jobs", "describe", job_name)

Я выполняю цикл while, пока состояние не станет SUCCEEDED или FAILED.

person AntsaR    schedule 20.11.2019