2020-12-31

SqsAckSink makes the Akka stream hang forever, causing graph restarts

I'm trying to implement a simple workflow with an SQS source and test it with Localstack. I cannot make it work once I add SqsAckSink, and it does not work with SqsAckFlow either. If I remove SqsAckSink and just use Sink.seq(), the test passes. With SqsAckSink or SqsAckFlow in place, the test hangs forever. I have also enabled debug logging in the test and see the same error repeating over and over, restarting the graph each time, but it doesn't make much sense to me. The error messages are posted below, after the code snippets.

The code is:

public class DefaultWorkflow implements Workflow {

  private Function<Throwable, Supervision.Directive> errorStrategy;
  private final ActorSystem actorSystem;
  private FlowMonitor<ProvisioningResult> workflowMonitor;
  private final String queueUrl;
  private final SqsAsyncClient asyncClient;

  @Inject
  public DefaultWorkflow(ActorSystem actorSystem, String queueUrl) {

    this.errorStrategy = exc -> (Supervision.Directive) Supervision.resume();
    this.actorSystem = actorSystem;
    this.queueUrl = queueUrl;

    asyncClient =
        SqsAsyncClient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(actorSystem).build())
            .endpointOverride(new URI(queueUrl))
            .build();
    doWork();
  }

  private Flow<Message, ProvisioningResult, NotUsed> buildFlow() {
    return Flow.of(Message.class)
        .via(Flow.of(Message.class).map(m -> ProvisioningResult.builder().body(m.body()).build()));
  }

  @Override
  public Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> getSource() {
    Source<Message, NotUsed> sqsSource =
        RestartSource.onFailuresWithBackoff(
            Duration.ofSeconds(1), Duration.ofSeconds(2), 0.1, this::createSQSSource);
    return sqsSource
        .via(buildFlow())
        .withAttributes(ActorAttributes.withSupervisionStrategy(errorStrategy))
        .monitorMat(Keep.right());
  }

  private Source<Message, NotUsed> createSQSSource() {
    SqsSourceSettings sqsSourceSettings = SqsSourceSettings.create().withMaxBatchSize(1);
    return SqsSource.create(queueUrl, sqsSourceSettings, asyncClient);
  }

  @Override
  public FlowMonitor<ProvisioningResult> getWorkflowMonitor() {
    return this.workflowMonitor;
  }

  private void doWork() {
    Pair<FlowMonitor<ProvisioningResult>, CompletionStage<Done>> run =
        getSource()
            .toMat(SqsAckSink.create(queueUrl, SqsAckSettings.create(), asyncClient), Keep.both())
            .run(actorSystem);
    workflowMonitor = run.first();
  }
}
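
One difference between the two clients in this question may be worth ruling out: the workflow's client passes the full queue URL to `endpointOverride`, while the test client passes `Localstack.INSTANCE.getEndpointSQS()` and also sets a credentials provider. The sketch below uses only `java.net.URI` to derive a bare service endpoint (scheme, host, port) from a queue URL, so that both clients could be pointed at the same address. The class name `SqsEndpoints`, the method `serviceEndpoint`, and the sample queue URL are hypothetical, and nothing here establishes that the endpoint is what triggers the 500 responses.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper: reduces a full queue URL to scheme://host[:port].
final class SqsEndpoints {
  private SqsEndpoints() {}

  static URI serviceEndpoint(String queueUrl) {
    URI full = URI.create(queueUrl);
    try {
      // Keep scheme, host and port only; path, query and fragment are dropped.
      return new URI(full.getScheme(), null, full.getHost(), full.getPort(), null, null, null);
    } catch (URISyntaxException e) {
      throw new IllegalArgumentException("Not a valid queue URL: " + queueUrl, e);
    }
  }

  public static void main(String[] args) {
    // Assumed example of a Localstack queue URL, for illustration only.
    String queueUrl = "http://localhost:4566/000000000000/sqs2";
    System.out.println(serviceEndpoint(queueUrl));
  }
}
```

The result could then be passed to `endpointOverride(...)` in place of `new URI(queueUrl)`, while the unchanged queue URL is still what `SqsSource` and `SqsAckSink` receive.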

The test looks like this:

  @Test
  public void getSource_givenMessage_shouldProduceResult()
      throws InterruptedException, ExecutionException, TimeoutException, URISyntaxException {
    String sqsName = "sqs2";
    String messageBody = "someMessage";
    String sqsUrl = initSQS(sqsName);
    generateSourceData(sqsUrl, messageBody);

    this.defaultWorkflow = new DefaultWorkflow(this.actorSystem, sqsUrl);

    Source<ProvisioningResult, FlowMonitor<ProvisioningResult>> source =
        defaultWorkflow.getSource();
    final CompletionStage<List<ProvisioningResult>> future =
        source.take(1).runWith(Sink.seq(), materializer);
    final List<ProvisioningResult> result = future.toCompletableFuture().join();
    assertEquals(1, result.size());
    assertEquals(messageBody, result.get(0).getBody());
  }

  public void generateSourceData(String queueUrl, String messageBody) {
    client
        .sendMessage(
            SendMessageRequest.builder().queueUrl(queueUrl).messageBody(messageBody).build())
        .join();
  }

  private void initClient() throws URISyntaxException {
    System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
    AwsCredentials credentials = AwsBasicCredentials.create("somekey", "somevalue");
    StaticCredentialsProvider provider = StaticCredentialsProvider.create(credentials);

    client =
        SqsAsyncClient.builder()
            .region(Region.of(Localstack.getDefaultRegion()))
            .httpClient(AkkaHttpClient.builder().withActorSystem(ActorSystem.create()).build())
            .credentialsProvider(provider)
            .endpointOverride(new URI(Localstack.INSTANCE.getEndpointSQS()))
            .build();
  }

  protected String initSQS(String queueName) throws URISyntaxException {
    initClient();
    client
        .createQueue(CreateQueueRequest.builder().queueName(queueName).build())
        .join()
        .queueUrl();
    GetQueueUrlResponse response = client.getQueueUrl(GetQueueUrlRequest.builder().queueName(queueName).build()).join();
    System.out.println("Using queue " + response);
    log.info("Using queue {}", response);
    return response.queueUrl();
  }
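
The test method above declares `TimeoutException`, but `join()` never throws it, so a stream that emits nothing blocks the test indefinitely. A minimal sketch of a bounded wait, using only `java.util.concurrent`, is shown here; the class name `BoundedAwait` and the helper `await` are hypothetical and not part of the original test.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: waits for a CompletionStage, but only up to a deadline.
final class BoundedAwait {
  private BoundedAwait() {}

  static <T> T await(CompletionStage<T> stage, long timeout, TimeUnit unit)
      throws InterruptedException, ExecutionException, TimeoutException {
    // get(timeout, unit) throws TimeoutException instead of blocking forever.
    return stage.toCompletableFuture().get(timeout, unit);
  }

  public static void main(String[] args) throws Exception {
    // A stage that is already complete returns its value immediately.
    CompletionStage<List<String>> done =
        CompletableFuture.completedFuture(List.of("someMessage"));
    System.out.println(await(done, 5, TimeUnit.SECONDS));

    // A stage that never completes fails fast instead of hanging.
    CompletionStage<List<String>> never = new CompletableFuture<>();
    try {
      await(never, 100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      System.out.println("timed out instead of hanging");
    }
  }
}
```

In the test, `future.toCompletableFuture().join()` could be swapped for a call such as `BoundedAwait.await(future, 10, TimeUnit.SECONDS)`; the 10-second limit is an arbitrary choice. This does not fix the stream, it only turns the hang into a test failure.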

When I enable debug logging, I see this error repeated forever:

[info] [debug] s.a.a.c.i.ExecutionInterceptorChain - Creating an interceptor chain that will apply interceptors in the following order: [software.amazon.awssdk.awscore.interceptor.HelpfulUnknownHostExceptionInterceptor@41a58812, software.amazon.awssdk.services.sqs.internal.MessageMD5ChecksumInterceptor@4f626e56, software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362]
[info] [debug] s.a.a.c.i.ExecutionInterceptorChain - Interceptor 'software.amazon.awssdk.protocols.query.interceptor.QueryParametersToBodyInterceptor@4b5de362' modified the message with its modifyHttpRequest method.
[info] [debug] s.a.a.request - Sending Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 5e95b0e67072e57434cb0da9516ef2ef4c747760bda87e9207161f9834d6dc01
[info] [debug] s.a.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected. Will retry in 47ms. Request attempt number 2
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] ebce46865d8fface363b83481672fa6a3b7a11f584f2ea7c1e2b56e381e33afc
[info] [debug] s.a.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected. Will retry in 51ms. Request attempt number 3
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 4085d30b4fa9cce89c435ed7a6404539a665bc2bfd4322fafd4095d2cb58fab1
[info] [debug] s.a.a.request - Received error response: 500
[info] [debug] s.a.a.request - Retryable error detected. Will retry in 230ms. Request attempt number 4
[info] [debug] s.a.a.request - Retrying Request: DefaultSdkHttpFullRequest(httpMethod=POST, protocol=http, host=localhost, port=4566, encodedPath=, headers=[amz-sdk-invocation-id, Content-Length, Content-Type, User-Agent], queryParameters=[])
[info] [debug] s.a.a.a.s.Aws4Signer - AWS4 String to sign: AWS4-HMAC-SHA256
[info] 20201230T233655Z
[info] 20201230/us-east-1/sqs/aws4_request
[info] 618037af1ce6008ce721fd3ca9cddde2e3f2893d50bd9ff5a241cc2061e1d67f
[info] [debug] s.a.a.request - Received error response: 500
[info] [warn] a.s.s.RestartWithBackoffSource - Restarting graph due to failure. stack_trace:
[info] java.util.concurrent.CompletionException: software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 500, Request ID: null)
[info]   at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:60)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:51)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
[info]   at software.amazon.awssdk.utils.CompletableFutureUtils.lambda$forwardExceptionTo$0(CompletableFutureUtils.java:74)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeAttemptExecute(AsyncRetryableStage.java:85)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.maybeRetryExecute(AsyncRetryableStage.java:144)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryingExecutor.lambda$attemptExecute$1(AsyncRetryableStage.java:133)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
[info]   at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage.lambda$executeHttpRequest$1(MakeAsyncHttpRequestStage.java:167)
[info]   at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
[info]   at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[info]   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[info]   at java.base/java.lang.Thread.run(Thread.java:834)
[info] Caused by: software.amazon.awssdk.services.sqs.model.SqsException: null (Service: Sqs, Status Code: 500, Request ID: null)
[info]   at software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:95)
[info]   at software.amazon.awssdk.services.sqs.model.SqsException$BuilderImpl.build(SqsException.java:55)
[info]   at software.amazon.awssdk.protocols.query.internal.unmarshall.AwsXmlErrorUnmarshaller.unmarshall(AwsXmlErrorUnmarshaller.java:97)
[info]   at software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:102)
[info]   at software.amazon.awssdk.protocols.query.unmarshall.AwsXmlErrorProtocolUnmarshaller.handle(AwsXmlErrorProtocolUnmarshaller.java:82)
[info]   at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler.lambda$prepare$0(AsyncResponseHandler.java:88)
[info]   at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
[info]   at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
[info]   at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
[info]   at software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler$BaosSubscriber.onComplete(AsyncResponseHandler.java:129)
[info]   at akka.stream.impl.ReactiveStreamsCompliance$.tryOnComplete(ReactiveStreamsCompliance.scala:114)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.complete(ActorGraphInterpreter.scala:390)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$ActorOutputBoundary.onUpstreamFinish(ActorGraphInterpreter.scala:416)
[info]   at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:523)
[info]   at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
[info]   at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:769)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
[info]   at akka.actor.Actor.aroundReceive(Actor.scala:537)
[info]   at akka.actor.Actor.aroundReceive$(Actor.scala:535)
[info]   at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
[info]   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
[info]   at akka.actor.ActorCell.invoke(ActorCell.scala:547)
[info]   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
[info]   at akka.dispatch.Mailbox.run(Mailbox.scala:231)
[info]   at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
[info]   at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info]   at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
[info]   at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
[info]   at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
[info]   at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
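
The log shows four request attempts, each answered with 500, before `RestartWithBackoffSource` restarts the graph. Since `RestartSource.onFailuresWithBackoff` is configured without a restart limit, that cycle can continue indefinitely. The sketch below estimates the pause between restarts for the settings used in `getSource()` (1 s minimum, 2 s maximum, random factor 0.1). It assumes the exponential backoff formula `min(maxBackoff, minBackoff * 2^n)` followed by jitter of up to `randomFactor`, which is how Akka's backoff is commonly described; the class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper: estimates restart delays under an assumed backoff formula.
final class RestartBackoff {
  private RestartBackoff() {}

  // Delay before restart number n (starting at 0), without jitter:
  // min(maxBackoff, minBackoff * 2^n).
  static Duration baseDelay(Duration min, Duration max, int restartCount) {
    double scaled = min.toMillis() * Math.pow(2, restartCount);
    return Duration.ofMillis((long) Math.min((double) max.toMillis(), scaled));
  }

  // Largest possible delay once jitter is added: base * (1 + randomFactor).
  static Duration maxDelay(Duration min, Duration max, double randomFactor, int restartCount) {
    long base = baseDelay(min, max, restartCount).toMillis();
    return Duration.ofMillis(Math.round(base * (1.0 + randomFactor)));
  }

  public static void main(String[] args) {
    Duration min = Duration.ofSeconds(1);
    Duration max = Duration.ofSeconds(2);
    for (int n = 0; n < 4; n++) {
      System.out.printf(
          "restart %d: between %d and %d ms%n",
          n, baseDelay(min, max, n).toMillis(), maxDelay(min, max, 0.1, n).toMillis());
    }
  }
}
```

Under that assumption the first restart waits about 1 s and every later one about 2 s, which would match a log that keeps repeating the same retry sequence every couple of seconds.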



from Recent Questions - Stack Overflow https://ift.tt/3pzxrtu