Apache Flink: проблема с производительностью при выполнении большого количества заданий

При большом количестве запросов Flink SQL (100 из нижеприведенных) клиент командной строки Flink дает сбой «JobManager не ответил в течение 600000 мс» в кластере Yarn, т.е. задание никогда не запускается в кластере.

  • Журналы JobManager не содержат ничего после последнего запуска TaskManager, кроме журналов DEBUG с «заданием с идентификатором 5cd95f89ed7a66ec44f2d19eca0592f7, не найденным в JobManager», что указывает на его вероятное застревание (создание ExecutionGraph?).
  • То же самое работает как автономная Java-программа локально (изначально высокая загрузка ЦП)
  • Примечание. Каждая строка в structStream содержит 515 столбцов (многие заканчиваются нулевым значением), включая столбец с необработанным сообщением.
  • В кластере YARN мы указываем 18 ГБ для TaskManager, 18 ГБ для JobManager, 5 слотов каждый и параллелизм 725 (разделы в нашем исходном коде Kafka).

Запрос SQL Flink:

select count (*), 'idnumber' as criteria, Environment, CollectedTimestamp, 
       EventTimestamp, RawMsg, Source 
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' 
      and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment, 
         CollectedTimestamp, EventTimestamp, RawMsg, Source

Код

public static void main(String[] args) throws Exception {
    FileSystems.newFileSystem(KafkaReadingStreamingJob.class
                             .getResource(WHITELIST_CSV).toURI(), new HashMap<>());

    final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);

    final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
    tableEnv.registerDataStream("structStream", structStream);
    tableEnv.scan("structStream").printSchema();

    for (int i = 0; i < 100; i++) {
        for (String query : Queries.sample) {
            // Queries.sample has one query that is above. 
            Table selectQuery = tableEnv.sqlQuery(query);

            DataStream<Row> selectQueryStream =                                                 
                               tableEnv.toAppendStream(selectQuery, Row.class);
            selectQueryStream.print();
        }
    }

    // execute program
    streamingEnvironment.execute("Kafka Streaming SQL");
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();

    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message. 
    FlinkKafkaConsumer011 consumer = new         
         FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);

    return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields. 
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");

    // Fill typeInformationArray with StringType to all but the last field which is of type Time
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env =                      
    StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}

private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();

    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message. 
    FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(KAFKA_TOPIC_TO_CONSUME, new  TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);

    return stream;
}

private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields. 
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");

    // Fill typeInformationArray with StringType to all but the last field which is of type Time
    .....
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}

private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env =     StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}

person Subramanya Suresh    schedule 13.04.2018    source источник
comment
Из-за тайм-аута клиента командной строки и зависания JobManager задание никогда не выходит за пределы фазы инициализации, поэтому не может выполнять указанные запросы. Я попытался уточнить это в вопросе, спасибо.   -  person Subramanya Suresh    schedule 13.04.2018
comment
Во Flink также есть билет JIRA с дополнительной информацией: issues.apache.org/jira/ просмотр / FLINK-9166   -  person Robert Metzger    schedule 19.04.2018


Ответы (1)


Мне кажется, что JobManager перегружен слишком большим количеством одновременно выполняемых заданий. Я бы предложил распределить задания по большему количеству кластеров JobManager / Flink.

person Fabian Hueske    schedule 16.04.2018