Как сделать несколько параллельных считывателей для экспорта данных с помощью Google Spanner?

Внешние резервные копии / снимки для Google Cloud Spanner рекомендует использовать запросы с отметкой времени. границы для создания снимков для экспорта. Внизу документации Timestamp Bounds говорится:

Cloud Spanner постоянно собирает мусор в фоновом режиме, чтобы освободить место для хранения. Этот процесс известен как версия GC. По умолчанию сборщик мусора версии восстанавливает версии по истечении одного часа. Из-за этого Cloud Spanner не может выполнять чтение с отметкой времени чтения более одного часа назад.

Таким образом, любой экспорт должен быть завершен в течение часа. Один считыватель (т.е. select * from table; с меткой времени X) не сможет экспортировать всю таблицу в течение часа.

Как можно реализовать несколько параллельных считывателей с помощью гаечного ключа?


Примечание. В одном из комментариев упоминается, что скоро появится поддержка Apache Beam, но похоже, что для этого используется один считыватель:

/** A simplest read function implementation. Parallelism support is coming. */

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java#L26

Есть ли способ сделать параллельный считыватель, который сегодня требуется для луча, с использованием существующих API-интерфейсов? Или Beam нужно будет использовать что-то, что еще не выпущено для гаечного ключа Google?


person onionjake    schedule 30.06.2017    source источник
comment
Для простого запроса, такого как select * from table, если вы знаете границы и распределение первичного ключа, вы можете написать код, запускающий несколько процессов, считывающих данные из одной таблицы. Это становится сложнее для более сложных запросов. Is there a way to do the parallel reader that beam requires today using existing APIs? Or will Beam need to use something that isn't released yet on google spanner? Мы работаем над более производительной реализацией, которая также включает новый API. Он также решит проблему с версией GC, о которой вы упомянули.   -  person Mairbek Khadikov    schedule 30.06.2017
comment
@MairbekKhadikov Я не знаю распределения границ первичного ключа. Я надеюсь, что скоро мы получим другие API.   -  person onionjake    schedule 07.08.2017


Ответы (2)


Можно параллельно читать данные из Cloud Spanner с классом BatchClient. Следуйте read_data_in_parallel для получения дополнительной информации.

Если вы хотите экспортировать данные из Cloud Spanner, я бы порекомендовал вам использовать Cloud Dataflow (см. Подробности интеграции здесь), поскольку он обеспечивает абстракции более высокого уровня и заботится о деталях обработки данных, таких как масштабирование и обработка сбоев.

person Mairbek Khadikov    schedule 27.03.2018

Изменить 2018-03-30 - Пример проекта был обновлен для использования BatchClient, предлагаемого Google Cloud Spanner

После выпуска BatchClient для чтения / загрузки больших объемов данных приведенный ниже пример проекта был обновлен для использования нового пакетного клиента вместо стандартного клиента базы данных. Основная идея проекта осталась прежней: копирование данных в / из Cloud Spanner и любой другой базы данных с использованием стандартных функций jdbc. Следующий фрагмент кода устанавливает соединение jdbc в режиме пакетного чтения:

if (source.isWrapperFor(ICloudSpannerConnection.class))
{
    ICloudSpannerConnection con = source.unwrap(ICloudSpannerConnection.class);
    // Make sure no transaction is running
    if (!con.isBatchReadOnly())
    {
        if (con.getAutoCommit())
        {
            con.setAutoCommit(false);
        }
        else
        {
            con.commit();
        }
        con.setBatchReadOnly(true);
    }
}

Когда соединение находится в «пакетном режиме только для чтения», соединение будет использовать BatchClient из Google Cloud Spanner вместо стандартного клиента базы данных. Когда вызывается один из методов Statement#execute(String) или PreparedStatement#execute() (поскольку они позволяют возвращать несколько наборов результатов), драйвер jdbc создает секционированный запрос вместо обычного запроса. Результатом этого секционированного запроса будет ряд наборов результатов (по одному на секцию), которые могут быть получены методами Statement # getResultSet () и Statement # getMoreResults (int).

Statement statement = source.createStatement();
boolean hasResults = statement.execute(select);
int workerNumber = 0;
while (hasResults)
{
    ResultSet rs = statement.getResultSet();
    PartitionWorker worker = new PartitionWorker("PartionWorker-" + workerNumber, config, rs, tableSpec, table, insertCols);
    workers.add(worker);
    hasResults = statement.getMoreResults(Statement.KEEP_CURRENT_RESULT);
    workerNumber++;
}

Наборы результатов, возвращаемые Statement#execute(String), не выполняются напрямую, а только после первого вызова ResultSet#next(). Передача этих наборов результатов в отдельные рабочие потоки обеспечивает параллельную загрузку и копирование данных.


Оригинальный ответ:

Этот проект изначально создавался для преобразования в другом направлении (из локальной базы данных в облако Spanner), но поскольку он использует JDBC как для источника, так и для назначения, его также можно использовать и наоборот: преобразование базы данных Cloud Spanner в локальную базу данных PostgreSQL. Большие таблицы преобразуются параллельно с использованием пула потоков.

В проекте используется этот драйвер JDBC с открытым исходным кодом вместо драйвера JDBC, поставляемого Google. Исходное соединение JDBC Cloud Spanner установлено в режим только для чтения и autocommit = false. Это гарантирует, что соединение автоматически создает транзакцию только для чтения с использованием текущего времени в качестве отметки времени при первом выполнении запроса. Все последующие запросы в рамках одной транзакции (только для чтения) будут использовать одну и ту же метку времени, что даст вам согласованный снимок вашей базы данных Google Cloud Spanner.

Это работает следующим образом:

  1. Установите для исходной базы данных транзакционный режим только для чтения.
  2. Метод convert (String catalog, String schema) выполняет итерацию по всем таблицам в исходной базе данных (Cloud Spanner).
  3. Для каждой таблицы определяется количество записей, и в зависимости от размера таблицы таблица копируется с использованием либо основного потока приложения, либо рабочего пула.
  4. За параллельное копирование отвечает класс UploadWorker. Каждому исполнителю назначается диапазон записей из таблицы (например, строки с 1 по 2400). Диапазон выбирается оператором select в следующем формате: 'SELECT * FROM $ TABLE ORDER BY $ PK_COLUMNS LIMIT $ BATCH_SIZE OFFSET $ CURRENT_OFFSET'
  5. Зафиксируйте транзакцию только для чтения в исходной базе данных после преобразования ВСЕХ таблиц.

Ниже приведен фрагмент кода наиболее важных частей.

public void convert(String catalog, String schema) throws SQLException
{
    int batchSize = config.getBatchSize();
    destination.setAutoCommit(false);
    // Set the source connection to transaction mode (no autocommit) and read-only
    source.setAutoCommit(false);
    source.setReadOnly(true);
    try (ResultSet tables = destination.getMetaData().getTables(catalog, schema, null, new String[] { "TABLE" }))
    {
        while (tables.next())
        {
            String tableSchema = tables.getString("TABLE_SCHEM");
            if (!config.getDestinationDatabaseType().isSystemSchema(tableSchema))
            {
                String table = tables.getString("TABLE_NAME");
                // Check whether the destination table is empty.
                int destinationRecordCount = getDestinationRecordCount(table);
                if (destinationRecordCount == 0 || config.getDataConvertMode() == ConvertMode.DropAndRecreate)
                {
                    if (destinationRecordCount > 0)
                    {
                        deleteAll(table);
                    }
                    int sourceRecordCount = getSourceRecordCount(getTableSpec(catalog, tableSchema, table));
                    if (sourceRecordCount > batchSize)
                    {
                        convertTableWithWorkers(catalog, tableSchema, table);
                    }
                    else
                    {
                        convertTable(catalog, tableSchema, table);
                    }
                }
                else
                {
                    if (config.getDataConvertMode() == ConvertMode.ThrowExceptionIfExists)
                        throw new IllegalStateException("Table " + table + " is not empty");
                    else if (config.getDataConvertMode() == ConvertMode.SkipExisting)
                        log.info("Skipping data copy for table " + table);
                }
            }
        }
    }
    source.commit();
}

private void convertTableWithWorkers(String catalog, String schema, String table) throws SQLException
{
    String tableSpec = getTableSpec(catalog, schema, table);
    Columns insertCols = getColumns(catalog, schema, table, false);
    Columns selectCols = getColumns(catalog, schema, table, true);
    if (insertCols.primaryKeyCols.isEmpty())
    {
        log.warning("Table " + tableSpec + " does not have a primary key. No data will be copied.");
        return;
    }
    log.info("About to copy data from table " + tableSpec);

    int batchSize = config.getBatchSize();
    int totalRecordCount = getSourceRecordCount(tableSpec);
    int numberOfWorkers = calculateNumberOfWorkers(totalRecordCount);
    int numberOfRecordsPerWorker = totalRecordCount / numberOfWorkers;
    if (totalRecordCount % numberOfWorkers > 0)
        numberOfRecordsPerWorker++;
    int currentOffset = 0;
    ExecutorService service = Executors.newFixedThreadPool(numberOfWorkers);
    for (int workerNumber = 0; workerNumber < numberOfWorkers; workerNumber++)
    {
        int workerRecordCount = Math.min(numberOfRecordsPerWorker, totalRecordCount - currentOffset);
        UploadWorker worker = new UploadWorker("UploadWorker-" + workerNumber, selectFormat, tableSpec, table,
                insertCols, selectCols, currentOffset, workerRecordCount, batchSize, source,
                config.getUrlDestination(), config.isUseJdbcBatching());
        service.submit(worker);
        currentOffset = currentOffset + numberOfRecordsPerWorker;
    }
    service.shutdown();
    try
    {
        service.awaitTermination(config.getUploadWorkerMaxWaitInMinutes(), TimeUnit.MINUTES);
    }
    catch (InterruptedException e)
    {
        log.severe("Error while waiting for workers to finish: " + e.getMessage());
        throw new RuntimeException(e);
    }

}

public class UploadWorker implements Runnable
{
private static final Logger log = Logger.getLogger(UploadWorker.class.getName());

private final String name;

private String selectFormat;

private String sourceTable;

private String destinationTable;

private Columns insertCols;

private Columns selectCols;

private int beginOffset;

private int numberOfRecordsToCopy;

private int batchSize;

private Connection source;

private String urlDestination;

private boolean useJdbcBatching;

UploadWorker(String name, String selectFormat, String sourceTable, String destinationTable, Columns insertCols,
        Columns selectCols, int beginOffset, int numberOfRecordsToCopy, int batchSize, Connection source,
        String urlDestination, boolean useJdbcBatching)
{
    this.name = name;
    this.selectFormat = selectFormat;
    this.sourceTable = sourceTable;
    this.destinationTable = destinationTable;
    this.insertCols = insertCols;
    this.selectCols = selectCols;
    this.beginOffset = beginOffset;
    this.numberOfRecordsToCopy = numberOfRecordsToCopy;
    this.batchSize = batchSize;
    this.source = source;
    this.urlDestination = urlDestination;
    this.useJdbcBatching = useJdbcBatching;
}

@Override
public void run()
{
    // Connection source = DriverManager.getConnection(urlSource);
    try (Connection destination = DriverManager.getConnection(urlDestination))
    {
        log.info(name + ": " + sourceTable + ": Starting copying " + numberOfRecordsToCopy + " records");

        destination.setAutoCommit(false);
        String sql = "INSERT INTO " + destinationTable + " (" + insertCols.getColumnNames() + ") VALUES \n";
        sql = sql + "(" + insertCols.getColumnParameters() + ")";
        PreparedStatement statement = destination.prepareStatement(sql);

        int lastRecord = beginOffset + numberOfRecordsToCopy;
        int recordCount = 0;
        int currentOffset = beginOffset;
        while (true)
        {
            int limit = Math.min(batchSize, lastRecord - currentOffset);
            String select = selectFormat.replace("$COLUMNS", selectCols.getColumnNames());
            select = select.replace("$TABLE", sourceTable);
            select = select.replace("$PRIMARY_KEY", selectCols.getPrimaryKeyColumns());
            select = select.replace("$BATCH_SIZE", String.valueOf(limit));
            select = select.replace("$OFFSET", String.valueOf(currentOffset));
            try (ResultSet rs = source.createStatement().executeQuery(select))
            {
                while (rs.next())
                {
                    int index = 1;
                    for (Integer type : insertCols.columnTypes)
                    {
                        Object object = rs.getObject(index);
                        statement.setObject(index, object, type);
                        index++;
                    }
                    if (useJdbcBatching)
                        statement.addBatch();
                    else
                        statement.executeUpdate();
                    recordCount++;
                }
                if (useJdbcBatching)
                    statement.executeBatch();
            }
            destination.commit();
            log.info(name + ": " + sourceTable + ": Records copied so far: " + recordCount + " of "
                    + numberOfRecordsToCopy);
            currentOffset = currentOffset + batchSize;
            if (recordCount >= numberOfRecordsToCopy)
                break;
        }
    }
    catch (SQLException e)
    {
        log.severe("Error during data copy: " + e.getMessage());
        throw new RuntimeException(e);
    }
    log.info(name + ": Finished copying");
}

}
person Knut Olav Løite    schedule 09.07.2017
comment
Я провел несколько тестов, используя чтение пределов и смещений. Используя ограничение / смещение, анализатор запросов сообщает, что он выполняет последовательное сканирование всех строк, что означает, что время запроса будет линейно масштабироваться. Это проблема, потому что в таблице, которую я использую, будут миллиарды строк. - person onionjake; 07.08.2017
comment
Я думаю, что анализатор запросов вас немного обманывает. У меня очень простая таблица с двумя столбцами: номер INT64 и имя STRING (100), содержащее 1000000 кортежей, таких как (1, 'one'), (2, 'two') ... При выполнении выбора с ограничением и упорядоченным смещением по первичному ключу (номеру) запрос занимает не более 500 мс. При выполнении того же запроса по другому столбцу (имени) запрос занимает 15-20 секунд для больших смещений. В обоих случаях анализатор запросов утверждает, что выполняет сканирование таблицы (планы запросов, возвращаемые Cloud Spanner, одинаковы для обоих запросов). - person Knut Olav Løite; 09.08.2017
comment
Я провел несколько тестов с LIMIT и OFFSET. Даже когда я заказываю по первичному ключу, время запроса линейно масштабируется с помощью OFFSET, поэтому для выполнения OFFSET 300000 требуется около 20 секунд. - person onionjake; 12.03.2018
comment
Cloud Spanner не показывает границы разделов, мы хотели бы быть гибкими, чтобы обновить реализацию. Запросы LIMIT / OFFSET - это не то приближение, которое вам нужно, потому что оно не масштабируется. Вы можете спроектировать свою схему так, чтобы можно было отслеживать приблизительные разделы, но это не работает для произвольных запросов. Рекомендую использовать BatchClient. См. cloud.google.com/spanner/docs/reads#read_data_in_parallel. - person Mairbek Khadikov; 27.03.2018