Skip to content

Commit

Permalink
Merge pull request #1367 from bosch-io/bugfix/IlleaglArgumentExceptio…
Browse files Browse the repository at this point in the history
…n_in_OutboundMappingProcessorActor

Fixed illegal argument exception in outbound mapping processor actor
  • Loading branch information
thjaeckle committed Apr 14, 2022
2 parents 5215fb8 + 9ac512e commit 9f2714f
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ List<MappingOutcome<OutboundSignal.Mapped>> process(final OutboundSignal outboun
.stream()
.map(e -> OutboundSignalFactory.newMappableOutboundSignal(outboundSignal.getSource(), e.getValue(),
e.getKey()))
.collect(Collectors.toList());
.toList();
}
return processMappableSignals(outboundSignal, mappableSignals);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,12 @@ public static void issueWeakAcknowledgements(final Signal<?> signal,
final List<AcknowledgementLabel> weakAckLabels = requestedAcks.stream()
.map(AcknowledgementRequest::getLabel)
.filter(isWeakAckLabel)
.collect(Collectors.toList());
.toList();
if (!weakAckLabels.isEmpty()) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final List<Acknowledgement> ackList = weakAckLabels.stream()
.map(label -> weakAck(label, entityIdWithType.get(), dittoHeaders))
.collect(Collectors.toList());
.toList();
final Acknowledgements weakAcks = Acknowledgements.of(ackList, dittoHeaders);
sender.tell(weakAcks, ActorRef.noSender());
}
Expand All @@ -219,12 +219,12 @@ private static void issueFailedAcknowledgements(final Signal<?> signal,
final List<AcknowledgementLabel> failedAckLabels = requestedAcks.stream()
.map(AcknowledgementRequest::getLabel)
.filter(isFailedAckLabel)
.collect(Collectors.toList());
.toList();
if (!failedAckLabels.isEmpty()) {
final DittoHeaders dittoHeaders = signal.getDittoHeaders();
final List<Acknowledgement> ackList = failedAckLabels.stream()
.map(label -> failedAck(label, entityIdWithType.get(), dittoHeaders, dre))
.collect(Collectors.toList());
.toList();
final Acknowledgements failedAcks = Acknowledgements.of(ackList, dittoHeaders);
sender.tell(failedAcks, ActorRef.noSender());
}
Expand Down Expand Up @@ -312,9 +312,9 @@ private Object mapDittoRuntimeException(final DittoRuntimeException exception) {

@Override
protected OutboundSignalWithSender mapMessage(final OutboundSignal message) {
if (message instanceof OutboundSignalWithSender) {
if (message instanceof OutboundSignalWithSender outboundSignalWithSender) {
// message contains original sender already
return (OutboundSignalWithSender) message;
return outboundSignalWithSender;
} else {
return OutboundSignalWithSender.of(message, getSender());
}
Expand Down Expand Up @@ -382,8 +382,7 @@ private static Flow<OutboundSignalWithSender, Pair<OutboundSignalWithSender, Fil
Collections.singletonList(targetAndSelector.first())),
targetAndSelector.second()));

return Stream.concat(outboundSignalWithoutExtraFields, outboundSignalWithExtraFields)
.collect(Collectors.toList());
return Stream.concat(outboundSignalWithoutExtraFields, outboundSignalWithExtraFields).toList();
});
}

Expand Down Expand Up @@ -599,8 +598,7 @@ private Object handleSignal(final Signal<?> signal, final ActorRef sender) {
return Source.empty();
})
.onError((mapperId, exception, topicPath, unused) -> {
if (exception instanceof DittoRuntimeException) {
final DittoRuntimeException e = (DittoRuntimeException) exception;
if (exception instanceof DittoRuntimeException e) {
monitorsForOther.forEach(monitor ->
monitor.getLogger().failure(infoProvider, e));
logger.withCorrelationId(e)
Expand Down Expand Up @@ -675,7 +673,7 @@ private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappe
final List<Target> targetsToPublishAt = outboundSignals.stream()
.map(OutboundSignal::getTargets)
.flatMap(List::stream)
.collect(Collectors.toList());
.toList();
final Predicate<AcknowledgementLabel> willPublish =
ConnectionValidator.getTargetIssuedAcknowledgementLabels(connection.getId(),
targetsToPublishAt)
Expand All @@ -684,10 +682,13 @@ private <T> CompletionStage<Collection<OutboundSignal.MultiMapped>> toMultiMappe
filterFailedEnrichments(outboundSignals, willPublish);
final List<Mapped> mappedSignals = signalsWithoutEnrichmentFailures
.map(OutboundSignalWithSender::asMapped)
.collect(Collectors.toList());
.toList();
issueWeakAcknowledgements(outbound.getSource(),
willPublish.negate().and(outboundMappingProcessor::isTargetIssuedAck),
sender);
if (mappedSignals.isEmpty()) {
return List.of();
}
return List.of(OutboundSignalFactory.newMultiMappedOutboundSignal(mappedSignals, sender));
}
});
Expand Down

0 comments on commit 9f2714f

Please sign in to comment.