У меня возникла проблема с заданием Azure Stream Analytics (ASA), которое должно вызывать функцию службы машинного обучения Azure для оценки предоставленных входных данных. Запрос был разработан и протестирован в Visual Studio (VS) 2019 с расширением Azure Data Lake и Stream Analytics Tools. В качестве входных данных задание использует Azure IoT-Hub, а в качестве выходных — локальные выходные данные VS для целей тестирования (а позже — даже с Blobstorage). В этой среде все работает нормально, вызов функции службы ML выполняется успешно и возвращает желаемый ответ. При использовании того же запроса, определяемых пользователем функций и агрегатов, как и в VS в облачном задании, выходные события не генерируются (ни Blobstorage, ни Power BI в качестве выходных данных). В ML Webservice видно, что ASA успешно вызывает функцию, но почему-то не возвращает данных ответа. Удаление вызова функции ML из запроса приводит к успешному выполнению задания с выходными событиями.
Для развертывания веб-службы ML я попробовал следующее (работает для VS, без вывода в облаке):
ACI (1 ЦП, 1 ГБ ОЗУ)
Разработка/тестирование AKS (виртуальная машина Standard_B2s)
Производство AKS (виртуальная машина Standard_D3_v2)
Схема функции сценария вывода:
ввод: массив
вывод: запись
Схема ввода скрипта вывода выглядит так:
@input_schema('data', NumpyParameterType(input_sample, enforce_shape=False))
@output_schema(NumpyParameterType(output_sample)) # other parameter type for record caused error in ASA
def run(data):
response = {'score1': 0,
'score2': 0,
'score3': 0,
'score4': 0,
'score5': 0,
'highest_score': None}
И возвращаемое значение:
return [response]
Подзапрос задания ASA с вызовом функции ML:
with raw_scores as (
select
time, udf.HMMscore(udf.numpyfySeq(Sequence)) as score
from Sequence
)
и UDF numpyfySeq, например:
// creates a N x 18 size array
function numpyfySeq(Sequence) {
'use strict';
var transpose = m => m[0].map((x, i) => m.map(x => x[i]));
var array = [];
for (var feature in Sequence) {
if (feature != "time") {
array.push(Sequence[feature])
}
}
return transpose(array);
}
Последовательность — это подзапрос, который объединяет данные в последовательности (массивы) с помощью определяемого пользователем агрегата.
В VS данные поступают из IoT-Hub (выбран облачный ввод). Сигнатура функции правильно распознается на портале, как показано на изображении: Сигнатура функции
Я надеюсь, что предоставленной информации достаточно, и вы можете мне помочь.
Изменить:
Аутентификация для веб-службы Azure ML основана на ключе. В ASA при выборе использования функции службы машинного обучения Azure она автоматически обнаруживает и использует ключи из развернутой модели машинного обучения в рамках подписки и рабочей области машинного обучения. Используемый код развертывания (в этом примере для ACI, но выглядит почти так же для развертывания AKS):
from azureml.core.model import InferenceConfig, Model
from azureml.core.environment import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.webservice import AciWebservice
ws = Workspace.from_config()
env = Environment(name='scoring_env')
deps = CondaDependencies(conda_dependencies_file_path='./deps')
env.python.conda_dependencies = deps
inference_config = InferenceConfig(source_directory='./prediction/',
entry_script='score.py',
environment=env)
deployment_config = AciWebservice.deploy_configuration(auth_enabled=True, cpu_cores=1,
memory_gb=1)
model = Model(ws, 'HMM')
service = Model.deploy(ws, 'hmm-scoring', models,
inference_config,
deployment_config,
overwrite=True,)
service.wait_for_deployment(show_output=True)
с conda_dependencies:
name: project_environment
dependencies:
# The python interpreter version.
# Currently Azure ML only supports 3.5.2 and later.
- python=3.7.5
- pip:
- sklearn
- azureml-core
- azureml-defaults
- inference-schema[numpy-support]
- hmmlearn
- numpy
- pip
channels:
- anaconda
- conda-forge
Код, используемый в score.py, представляет собой обычную операцию оценки с загруженными моделями и форматированием, например:
score1 = model1.score(data)
score2 = model2.score(data)
score3 = model3.score(data)
# Same scoring with model4 and model5
# scaling of the scores to a defined interval and determination of model that delivered highest score
response['score1'] = score1
response['score2'] = score2
# and so on