感谢,已找到问题原因,这个provider变量应该放到setHttpClientConfigCallback内部,之前是作为私有成员变量transient声明的,会导致taskmanager无法拿到认证信息
String user = pt.get("es.user.name");
String password = pt.get("es.user.password");
esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setHttpClientConfigCallback(httpClientBuilder
->
{
CredentialsProvider provider = new
BasicCredentialsProvider();
provider.setCredentials(AuthScope.ANY,
new
UsernamePasswordCredentials(user, password));
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);
在2020年7月13日 15:33,Yangze Guo<[email protected]> 写道:
Hi,
请问您有检查过pt.get("es.user.name"),
pt.get("es.user.password")这两个参数读出来是否都是正确的,另外更完整的错误栈方便提供下么?
Best,
Yangze Guo
On Mon, Jul 13, 2020 at 3:10 PM 李宇彬 <[email protected]> wrote:
各位好,
请教一个问题
我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink
里就报错了
org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/],
status line [HTTP/1.1 401 Unauthorized]
ParameterTool pt = ParameterTool.fromArgs(args);
String confFile = pt.get("confFile");
pt = ParameterTool.fromPropertiesFile(new File(confFile));
provider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(pt.get("es.user.name"),
pt.get("es.user.password")));
esSinkBuilder.setRestClientFactory(
(RestClientBuilder restClientBuilder) ->
restClientBuilder
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder.setSocketTimeout(180000)
.setConnectionRequestTimeout(10000)
)
.setHttpClientConfigCallback(httpClientBuilder ->
{
httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证
return httpClientBuilder.setDefaultCredentialsProvider(provider);
}
)
);