Изменить 2018-03-30 - Пример проекта был обновлен для использования BatchClient, предлагаемого Google Cloud Spanner
После выпуска BatchClient для чтения / загрузки больших объемов данных приведенный ниже пример проекта был обновлен для использования нового пакетного клиента вместо стандартного клиента базы данных. Основная идея проекта осталась прежней: копирование данных в / из Cloud Spanner и любой другой базы данных с использованием стандартных функций jdbc. Следующий фрагмент кода устанавливает соединение jdbc в режиме пакетного чтения:
if (source.isWrapperFor(ICloudSpannerConnection.class))
{
ICloudSpannerConnection con = source.unwrap(ICloudSpannerConnection.class);
// Make sure no transaction is running
if (!con.isBatchReadOnly())
{
if (con.getAutoCommit())
{
con.setAutoCommit(false);
}
else
{
con.commit();
}
con.setBatchReadOnly(true);
}
}
Когда соединение находится в «пакетном режиме только для чтения», соединение будет использовать BatchClient из Google Cloud Spanner вместо стандартного клиента базы данных. Когда вызывается один из методов Statement#execute(String)
или PreparedStatement#execute()
(поскольку они позволяют возвращать несколько наборов результатов), драйвер jdbc создает секционированный запрос вместо обычного запроса. Результатом этого секционированного запроса будет ряд наборов результатов (по одному на секцию), которые могут быть получены методами Statement # getResultSet () и Statement # getMoreResults (int).
Statement statement = source.createStatement();
boolean hasResults = statement.execute(select);
int workerNumber = 0;
while (hasResults)
{
ResultSet rs = statement.getResultSet();
PartitionWorker worker = new PartitionWorker("PartionWorker-" + workerNumber, config, rs, tableSpec, table, insertCols);
workers.add(worker);
hasResults = statement.getMoreResults(Statement.KEEP_CURRENT_RESULT);
workerNumber++;
}
Наборы результатов, возвращаемые Statement#execute(String)
, не выполняются напрямую, а только после первого вызова ResultSet#next()
. Передача этих наборов результатов в отдельные рабочие потоки обеспечивает параллельную загрузку и копирование данных.
Оригинальный ответ:
Этот проект изначально создавался для преобразования в другом направлении (из локальной базы данных в облако Spanner), но поскольку он использует JDBC как для источника, так и для назначения, его также можно использовать и наоборот: преобразование базы данных Cloud Spanner в локальную базу данных PostgreSQL. Большие таблицы преобразуются параллельно с использованием пула потоков.
В проекте используется этот драйвер JDBC с открытым исходным кодом вместо драйвера JDBC, поставляемого Google. Исходное соединение JDBC Cloud Spanner установлено в режим только для чтения и autocommit = false. Это гарантирует, что соединение автоматически создает транзакцию только для чтения с использованием текущего времени в качестве отметки времени при первом выполнении запроса. Все последующие запросы в рамках одной транзакции (только для чтения) будут использовать одну и ту же метку времени, что даст вам согласованный снимок вашей базы данных Google Cloud Spanner.
Это работает следующим образом:
- Установите для исходной базы данных транзакционный режим только для чтения.
- Метод convert (String catalog, String schema) выполняет итерацию по всем таблицам в исходной базе данных (Cloud Spanner).
- Для каждой таблицы определяется количество записей, и в зависимости от размера таблицы таблица копируется с использованием либо основного потока приложения, либо рабочего пула.
- За параллельное копирование отвечает класс UploadWorker. Каждому исполнителю назначается диапазон записей из таблицы (например, строки с 1 по 2400). Диапазон выбирается оператором select в следующем формате: 'SELECT * FROM $ TABLE ORDER BY $ PK_COLUMNS LIMIT $ BATCH_SIZE OFFSET $ CURRENT_OFFSET'
- Зафиксируйте транзакцию только для чтения в исходной базе данных после преобразования ВСЕХ таблиц.
Ниже приведен фрагмент кода наиболее важных частей.
public void convert(String catalog, String schema) throws SQLException
{
int batchSize = config.getBatchSize();
destination.setAutoCommit(false);
// Set the source connection to transaction mode (no autocommit) and read-only
source.setAutoCommit(false);
source.setReadOnly(true);
try (ResultSet tables = destination.getMetaData().getTables(catalog, schema, null, new String[] { "TABLE" }))
{
while (tables.next())
{
String tableSchema = tables.getString("TABLE_SCHEM");
if (!config.getDestinationDatabaseType().isSystemSchema(tableSchema))
{
String table = tables.getString("TABLE_NAME");
// Check whether the destination table is empty.
int destinationRecordCount = getDestinationRecordCount(table);
if (destinationRecordCount == 0 || config.getDataConvertMode() == ConvertMode.DropAndRecreate)
{
if (destinationRecordCount > 0)
{
deleteAll(table);
}
int sourceRecordCount = getSourceRecordCount(getTableSpec(catalog, tableSchema, table));
if (sourceRecordCount > batchSize)
{
convertTableWithWorkers(catalog, tableSchema, table);
}
else
{
convertTable(catalog, tableSchema, table);
}
}
else
{
if (config.getDataConvertMode() == ConvertMode.ThrowExceptionIfExists)
throw new IllegalStateException("Table " + table + " is not empty");
else if (config.getDataConvertMode() == ConvertMode.SkipExisting)
log.info("Skipping data copy for table " + table);
}
}
}
}
source.commit();
}
private void convertTableWithWorkers(String catalog, String schema, String table) throws SQLException
{
String tableSpec = getTableSpec(catalog, schema, table);
Columns insertCols = getColumns(catalog, schema, table, false);
Columns selectCols = getColumns(catalog, schema, table, true);
if (insertCols.primaryKeyCols.isEmpty())
{
log.warning("Table " + tableSpec + " does not have a primary key. No data will be copied.");
return;
}
log.info("About to copy data from table " + tableSpec);
int batchSize = config.getBatchSize();
int totalRecordCount = getSourceRecordCount(tableSpec);
int numberOfWorkers = calculateNumberOfWorkers(totalRecordCount);
int numberOfRecordsPerWorker = totalRecordCount / numberOfWorkers;
if (totalRecordCount % numberOfWorkers > 0)
numberOfRecordsPerWorker++;
int currentOffset = 0;
ExecutorService service = Executors.newFixedThreadPool(numberOfWorkers);
for (int workerNumber = 0; workerNumber < numberOfWorkers; workerNumber++)
{
int workerRecordCount = Math.min(numberOfRecordsPerWorker, totalRecordCount - currentOffset);
UploadWorker worker = new UploadWorker("UploadWorker-" + workerNumber, selectFormat, tableSpec, table,
insertCols, selectCols, currentOffset, workerRecordCount, batchSize, source,
config.getUrlDestination(), config.isUseJdbcBatching());
service.submit(worker);
currentOffset = currentOffset + numberOfRecordsPerWorker;
}
service.shutdown();
try
{
service.awaitTermination(config.getUploadWorkerMaxWaitInMinutes(), TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
log.severe("Error while waiting for workers to finish: " + e.getMessage());
throw new RuntimeException(e);
}
}
public class UploadWorker implements Runnable
{
private static final Logger log = Logger.getLogger(UploadWorker.class.getName());
private final String name;
private String selectFormat;
private String sourceTable;
private String destinationTable;
private Columns insertCols;
private Columns selectCols;
private int beginOffset;
private int numberOfRecordsToCopy;
private int batchSize;
private Connection source;
private String urlDestination;
private boolean useJdbcBatching;
UploadWorker(String name, String selectFormat, String sourceTable, String destinationTable, Columns insertCols,
Columns selectCols, int beginOffset, int numberOfRecordsToCopy, int batchSize, Connection source,
String urlDestination, boolean useJdbcBatching)
{
this.name = name;
this.selectFormat = selectFormat;
this.sourceTable = sourceTable;
this.destinationTable = destinationTable;
this.insertCols = insertCols;
this.selectCols = selectCols;
this.beginOffset = beginOffset;
this.numberOfRecordsToCopy = numberOfRecordsToCopy;
this.batchSize = batchSize;
this.source = source;
this.urlDestination = urlDestination;
this.useJdbcBatching = useJdbcBatching;
}
@Override
public void run()
{
// Connection source = DriverManager.getConnection(urlSource);
try (Connection destination = DriverManager.getConnection(urlDestination))
{
log.info(name + ": " + sourceTable + ": Starting copying " + numberOfRecordsToCopy + " records");
destination.setAutoCommit(false);
String sql = "INSERT INTO " + destinationTable + " (" + insertCols.getColumnNames() + ") VALUES \n";
sql = sql + "(" + insertCols.getColumnParameters() + ")";
PreparedStatement statement = destination.prepareStatement(sql);
int lastRecord = beginOffset + numberOfRecordsToCopy;
int recordCount = 0;
int currentOffset = beginOffset;
while (true)
{
int limit = Math.min(batchSize, lastRecord - currentOffset);
String select = selectFormat.replace("$COLUMNS", selectCols.getColumnNames());
select = select.replace("$TABLE", sourceTable);
select = select.replace("$PRIMARY_KEY", selectCols.getPrimaryKeyColumns());
select = select.replace("$BATCH_SIZE", String.valueOf(limit));
select = select.replace("$OFFSET", String.valueOf(currentOffset));
try (ResultSet rs = source.createStatement().executeQuery(select))
{
while (rs.next())
{
int index = 1;
for (Integer type : insertCols.columnTypes)
{
Object object = rs.getObject(index);
statement.setObject(index, object, type);
index++;
}
if (useJdbcBatching)
statement.addBatch();
else
statement.executeUpdate();
recordCount++;
}
if (useJdbcBatching)
statement.executeBatch();
}
destination.commit();
log.info(name + ": " + sourceTable + ": Records copied so far: " + recordCount + " of "
+ numberOfRecordsToCopy);
currentOffset = currentOffset + batchSize;
if (recordCount >= numberOfRecordsToCopy)
break;
}
}
catch (SQLException e)
{
log.severe("Error during data copy: " + e.getMessage());
throw new RuntimeException(e);
}
log.info(name + ": Finished copying");
}
}
person
Knut Olav Løite
schedule
09.07.2017
select * from table
, если вы знаете границы и распределение первичного ключа, вы можете написать код, запускающий несколько процессов, считывающих данные из одной таблицы. Это становится сложнее для более сложных запросов.Is there a way to do the parallel reader that beam requires today using existing APIs? Or will Beam need to use something that isn't released yet on google spanner?
Мы работаем над более производительной реализацией, которая также включает новый API. Он также решит проблему с версией GC, о которой вы упомянули. - person Mairbek Khadikov   schedule 30.06.2017