Всем привет.
Использую Camel для роутинга сообщений между очередями в SQS(amazon)
Мой роут конфиг:
1.
2.
3.
4.
5.
from("aws-sqs://test-camel-start?amazonSQSClient=#sqsClient&concurrentConsumers=2&maxMessagesPerPoll=1")
.unmarshal()
.base64()
.process(new BaseProcessor())
.to("aws-sqs://test-camel-success?amazonSQSClient=#sqsClient").end();
А это простой процесор для обработки сообщения из очереди:
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
String header = (String) exchange.getIn().getHeader("CamelAwsSqsMessageId");
String message = exchange.getIn().getBody(String.class);
MDC.put("message-id", header);
Thread.sleep(5 * 1000);
LOG.info("Async ping");
LOG.info(message);
} catch (Throwable e) {
LOG.error(e.getMessage(), e);
exchange.setException(e);
}
callback.done(false);
return false;
}
Так вот, Camel успешно читает сообщение с очереди test-camel-start, обрабатывает его и дает ошибку при попытке доставить сообщение в test-camel-success:
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
40.
41.
42.
43.
44.
45.
46.
47.
48.
49.
50.
51.
52.
53.
54.
637435 [Camel (SavvyMoneyCamelSQS) thread #2 - aws-sqs://test-camel-start] ERROR o.a.c.processor.DefaultErrorHandler cr.id=18b85e94-bc2f-44dd-baef-4bcd4fb79e0c - Failed delivery for (MessageId: ID-DESKTOP-5SBC4FA-1522239559145-0-6 on ExchangeId: ID-DESKTOP-5SBC4FA-1522239559145-0-5). Exhausted after delivery attempt: 1 caught: com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageBody. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: 85604fc2-0724-5d16-af6b-1cf78a24e8b0)
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [aws-sqs://test-camel-start?amazonSQSClient=%23sqsClient&concurrentConsumers=2&] [ 7591]
[route1 ] [unmarshal1 ] [unmarshal[org.apache.camel.model.dataformat.Base64DataFormat@237080aa] ] [ 1]
[route1 ] [process1 ] [Processor@0x5007bde9 ] [ 7417]
[route1 ] [to1 ] [aws-sqs://test-camel-success?amazonSQSClient=#sqsClient ] [ 172]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
com.amazonaws.services.sqs.model.AmazonSQSException: The request must contain the parameter MessageBody. (Service: AmazonSQS; Status Code: 400; Error Code: MissingParameter; Request ID: 85604fc2-0724-5d16-af6b-1cf78a24e8b0)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1638)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1303)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1055)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
at com.amazonaws.services.sqs.AmazonSQSClient.doInvoke(AmazonSQSClient.java:2013)
at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:1989)
at com.amazonaws.services.sqs.AmazonSQSClient.executeSendMessage(AmazonSQSClient.java:1594)
at com.amazonaws.services.sqs.AmazonSQSClient.sendMessage(AmazonSQSClient.java:1571)
at org.apache.camel.component.aws.sqs.SqsProducer.process(SqsProducer.java:62)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)
at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:157)
at org.apache.camel.processor.CamelInternalProcessor$InternalCallback.done(CamelInternalProcessor.java:262)
at org.apache.camel.processor.RedeliveryErrorHandler$2.done(RedeliveryErrorHandler.java:560)
at com.home.camel_poligon.BaseProcessor.process(BaseProcessor.java:35)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:110)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101)
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201)
at org.apache.camel.component.aws.sqs.SqsConsumer.processBatch(SqsConsumer.java:206)
at org.apache.camel.component.aws.sqs.SqsConsumer.poll(SqsConsumer.java:111)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:174)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:101)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Как видно в логах отсутствует MessageBody.