Я использую связыватель Spring Cloud Stream Kinesis (версия 2.1.0)
По соображениям безопасности у меня должен быть один набор учетных данных для Kinesis и другой набор учетных данных для DynamoDB и CloudWatch.
Все работает нормально, если для spring.cloud.stream.kinesis.binder.kplKclEnabled
установлено значение false. Но если для него установлено значение true, у меня будет исключение.
com.amazonaws.services.kinesis.model.ResourceNotFoundException: Stream {my_stream} under account {my_account} not found
Трассировка всего стека доступна по адресу https://pastebin.com/bjvKSzrg
Я хотел бы включить KCL, знает ли кто-нибудь, как избежать этой ошибки?
Я знаю, что эта ошибка возникает из-за того, что учетные данные пользователя для cloudwatch и Dynamodb не видят упомянутый поток Kinesis. Но зачем им это видеть? Кроме того, если KCL отключен, он работает должным образом. так что не понимаю, почему это не работает с включенным KCL
Вот мой файл свойств
spring.main.allow-bean-definition-overriding=true
spring.cloud.stream.bindings.input.destination=streamName
spring.cloud.stream.bindings.input.group=worker
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.kinesis.bindings.input.consumer.listener-mode=batch
spring.cloud.stream.bindings.input.binder=kinesisConsumer
spring.cloud.stream.binders.kinesisConsumer.type=kinesis
spring.cloud.stream.binders.kinesisConsumer.defaultCandidate=false
spring.cloud.stream.binders.kinesisConsumer.environment.spring.main.sources=com.philips.ka.oneka.kinesis.config.KinesisOutputConfiguration
cloud.aws.stack.auto=false
cloud.aws.credentials.useDefaultAwsCredentialsChain=false
cloud.aws.credentials.instanceProfile=true
spring.cloud.stream.kinesis.binder.kplKclEnabled=true
Упомянутый класс конфигурации
@Configuration
@EnableConfigurationProperties(AwsProperties.class)
public class KinesisOutputConfiguration {
AwsProperties.Properties properties;
public KinesisOutputConfiguration(AwsProperties awsProperties) {
this.properties = awsProperties.getStreamType().get(AwsProperties.StreamType.SPECTRE);
}
@Bean(destroyMethod = "shutdown")
public AmazonKinesisAsync amazonKinesis() {
RefreshingCredentials refreshingCredentials = new RefreshingCredentials(this.properties.getRefreshed.getUrl(), this.properties.getHsdp().getClientId(),
this.properties.getRefreshed().getClientSecret(), this.properties.getRefreshed().getUsername(), this.properties.getRefreshed().getPassword(),
this.properties.getRefreshed().getDiscoveryUrl(), new UriTemplate("{databroker_url}/Stream/$getaccessdetails"),
new RestTemplate());
return AmazonKinesisAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("eu-west-1").build();
}
@Bean(destroyMethod = "shutdown")
public AmazonCloudWatchAsync cloudWatch() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonCloudWatchAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
@Bean(destroyMethod = "shutdown")
@Primary
public AmazonDynamoDBAsync dynamoDBAsync() {
AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(this.properties.getAccessKey(),
this.properties.getSecretKey()));
return AmazonDynamoDBAsyncClientBuilder.standard().withCredentials(credentialsProvider).withRegion("us-east-2").build();
}
}