Этот конвейер описывает, как использовать условный оператор kfp.dsl вместе с аргументом конвейера среды выполнения для разветвления конвейера. Kubeflow DSL не поддерживает использование обычных операторов if:else для определения графа конвейера. Для этого мы можем передать решающий аргумент, значение которого определяет поток выполнения конвейерного графа.
Это показано ниже на примере использования простых функций для вычисления сложения или деления по отношению к требуемой переменной вычисления во время выполнения, переданной в конвейер.

import kfp
import kfp.components as comp

Определение трех простых функций

def print_fun(calculation: str) -> str:
"""
simple print function
"""
	print("Calculation Type: ",calculation)
	return calculation
def add(calculation: str, a: float, b: float) -> float:
"""
calculates sum of two arguments
"""
	print("Calculation Type: ",calculation)
	print("Sum: ",a+b)
	return a + b
def div(calculation: str, dividend: float, divisor: float) -> float:
"""
Divides two numbers and calculates the quotient
"""
	print("Calculation Type: ",calculation)
	quotient = dividend/divisor
	print("Quotient: ",quotient)
	return quotient

Преобразование функций в операторы конвейера

print_op = comp.func_to_container_op(print_fun)
add_op = comp.func_to_container_op(add)
divmod_op = comp.func_to_container_op(div)

Определите конвейер

Функция конвейера должна быть украшена декоратором @dsl.pipeline вместе с dsl.Condition, используемым внутри функции конвейера для создания графа конвейера.

Использование операторов if:else не будет работать при определении того, как должно выполняться выполнение конвейера.

import kfp.dsl as dsl
@dsl.pipeline(
	name="calculation pipeline",
	description="A toy pipeline that performs arithmetic calculations."
)
def calculation_pipeline(calculation: str, a: float, b: float) -> None:
"""
This function defines the pipeline graph and uses dsl conditions that
need to be evaluated on runtime to decide the execution flow of the pipeline
"""
print_task = print_op(calculation)
# defining the branching condition
with dsl.Condition(print_task.output=="add"):
	add_task = add_op(print_task.output, a, 4).after(print_task)
with dsl.Condition(print_task.output=="div"):
	divmod_task = divmod_op(print_task.output, a, b).after(print_task)

Здесь следует задуматься над тем, что обеим операциям ветвления должна предшествовать одна и та же операция. Это гарантирует, что сформированный граф имеет 2 листовых узла, генерируемых из корневого узла.

С операторами dsl.condition и переданным динамическим аргументом мы можем быть уверены, что будет выполнен только один конечный узел. Это эмулирует выполнение простого оператора if:elif, но с использованием оператора dsl.condition.

Отправить конвейер на выполнение. Набор аргументов 01.

#Specify pipeline argument values
arguments = {"calculation": "add" ,"a": "7", "b": "8"}
#Submit a pipeline run
kfp.Client().create_run_from_pipeline_func(calculation_pipeline, arguments=arguments)

Отправить конвейер на выполнение. Набор аргументов 02.

#Specify pipeline argument values
arguments = {"calculation": "div" ,"a": "7", "b": "8"}
#Submit a pipeline run
kfp.Client().create_run_from_pipeline_func(calculation_pipeline, arguments=arguments)

Выход конвейера

Найдите скриншот запуска конвейера для обоих аргументов следующим образом: