本节介绍如何发送消息。

Using KafkaTemplate

本节介绍如何使用KafkaTemplate发送消息。

Overview

KafkaTemplate封装了一个生产者,并提供了向Kafka主题发送数据的便利方法。以下列表显示了KafkaTemplate的相关方法:

CompletableFuture<SendResult<K, V>> sendDefault(V data);CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, V data);CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);CompletableFuture<SendResult<K, V>> send(Message<?> message);Map<MetricName, ? extends Metric> metrics();List<PartitionInfo> partitionsFor(String topic);<T> T execute(ProducerCallback<K, V, T> callback);<T> T executeInTransaction(OperationsCallback<K, V, T> callback);// Flush the producer.
void flush();interface ProducerCallback<K, V, T> {T doInKafka(Producer<K, V> producer);}interface OperationsCallback<K, V, T> {T doInOperations(KafkaOperations<K, V> operations);}

有关更多详细信息,请参阅Javadoc。

sendDefault API要求已向模板提供默认主题。

API将时间戳作为参数,并将此时间戳存储在记录中。如何存储用户提供的时间戳取决于Kafka主题上配置的时间戳类型。如果主题配置为使用CREATE_TIME,则记录用户指定的时间戳(如果未指定,则生成)。如果主题配置为使用LOG_APPEND_TIME,则用户指定的时间戳将被忽略,代理将添加本地代理时间。

度量和分区For方法委托给底层Producer上的相同方法。execute方法提供对底层Producer的直接访问。

要使用该模板,您可以配置生产者工厂并在模板的构造函数中提供它。以下示例显示了如何执行此操作:

@Bean
public ProducerFactory<Integer, String> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs());
}@Bean
public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);// See https://kafka.apache.org/documentation/#producerconfigs for more propertiesreturn props;
}@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {return new KafkaTemplate<Integer, String>(producerFactory());
}

从2.5版本开始,您现在可以覆盖工厂的ProducerConfig属性,从同一工厂创建具有不同生产者配置的模板。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {return new KafkaTemplate<>(pf);
}@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {return new KafkaTemplate<>(pf,Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

请注意,ProducerFactory类型的bean<?,?>(如Spring Boot自动配置的)可以用不同的狭义泛型类型引用。

您还可以使用标准的<bean/>定义来配置模板。

然后,要使用该模板,您可以调用它的一个方法。

当您将这些方法用于Message<?>参数,主题、分区、密钥和时间戳信息在消息头中提供,消息头包括以下项目:

卡夫卡标头。主题

卡夫卡标头。隔板

卡夫卡标头。钥匙

卡夫卡标头。时间戳

消息有效载荷是数据。

您可以选择使用ProducerListener配置KafkaTemplate,以获得包含发送结果(成功或失败)的异步回调,而不是等待Future完成。以下清单显示了ProducerListener接口的定义:

public interface ProducerListener<K, V> {default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {}}

默认情况下,模板配置了LoggingProductListener,它记录错误,在发送成功时不做任何事情。

为了方便起见,如果您只想实现其中一个方法,则提供默认方法实现。

请注意,send方法返回一个CompletableFuture<SendResult>。您可以向侦听器注册一个回调,以异步接收发送的结果。以下示例显示了如何执行此操作:

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {...
});

SendResult有两个属性,ProducerRecord和RecordMetadata。有关这些对象的信息,请参阅Kafka API文档。

Throwable可以转换为KafkaProducerException;其producerRecord属性包含失败的记录。

如果你想阻止发送线程等待结果,你可以调用future的get()方法;建议使用带超时的方法。如果您设置了linger.ms,您可能希望在等待之前调用flush(),或者为了方便起见,模板有一个带有autoFlush参数的构造函数,该参数使模板在每次发送时都会刷新()。只有当您设置了linger.ms生产者属性并希望立即发送部分批次时,才需要刷新。

Examples

本节展示了向Kafka发送消息的示例:

public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);CompletableFuture<SendResult<String, String>> future = template.send(record);future.whenComplete((result, ex) -> {if (ex == null) {handleSuccess(data);}else {handleFailure(data, record, ex);}});
}
public void sendToKafka(final MyOutputData data) {final ProducerRecord<String, String> record = createRecord(data);try {template.send(record).get(10, TimeUnit.SECONDS);handleSuccess(data);}catch (ExecutionException e) {handleFailure(data, record, e.getCause());}catch (TimeoutException | InterruptedException e) {handleFailure(data, record, e);}
}

请注意,ExecutionException的原因是具有producerRecord属性的KafkaProducerException。

Using RoutingKafkaTemplate

从2.5版本开始,您可以使用RoutingKafkaTemplate在运行时根据目标主题名称选择生产者。

路由模板不支持事务、执行、刷新或度量操作,因为这些操作的主题未知。

该模板需要java.util.regex的映射。ProducerFactory<Object,Object>实例的模式。这个映射应该是有序的(例如LinkedHashMap),因为它是按顺序遍历的;您应该在开始时添加更具体的模式。

以下简单的Spring Boot应用程序提供了一个示例,说明如何使用相同的模板发送到不同的主题,每个主题都使用不同的值序列化器。

@SpringBootApplication
public class Application {public static void main(String[] args) {SpringApplication.run(Application.class, args);}@Beanpublic RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,ProducerFactory<Object, Object> pf) {// Clone the PF with a different Serializer, register with Spring for shutdownMap<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();map.put(Pattern.compile("two"), bytesPF);map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializerreturn new RoutingKafkaTemplate(map);}@Beanpublic ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {return args -> {routingTemplate.send("one", "thing1");routingTemplate.send("two", "thing2".getBytes());};}}

此示例的相应@KafkaListeners显示在注释属性中。

有关实现类似结果的另一种技术,但具有向同一主题发送不同类型的附加功能,请参阅委派序列化程序和解序列化程序。

Using DefaultKafkaProducerFactory

如使用KafkaTemplate中所示,ProducerFactory用于创建生产者。

当不使用事务时,默认情况下,DefaultKafkaProducerFactory会创建一个供所有客户端使用的单例生产者,如KafkaProducierJavaDocs中所建议的。但是,如果在模板上调用flush(),这可能会导致使用同一生成器的其他线程延迟。从2.3版本开始,DefaultKafkaProducerFactory有一个新的属性producerPerThread。当设置为true时,工厂将为每个线程创建(并缓存)一个单独的生产者,以避免此问题。

当producerPerThread为true时,当不再需要生产者时,用户代码必须在工厂上调用closeThreadBoundProducer()。这将物理关闭生产者并将其从ThreadLocal中删除。调用reset()或destroy()不会清理这些生产者。

另请参阅KafkaTemplate事务性和非事务性发布。

创建DefaultKafkaProducerFactory时,可以通过调用仅接受属性映射的构造函数从配置中获取键和/或值序列化器类(请参阅Using KafkaTemplate中的示例),或者可以将序列化器实例传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有生产者共享相同的实例)。或者,您可以提供供应商<Serializer>(从2.3版本开始),用于为每个生产商获取单独的序列化程序实例:

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从2.5.10版本开始,您现在可以在创建工厂后更新生产者属性。例如,如果您必须在凭据更改后更新SSL密钥/信任存储位置,这可能很有用。这些更改不会影响现有的生产者实例;调用reset()关闭任何现有的生产者,以便使用新属性创建新的生产者。

您不能将事务生产者工厂更改为非事务生产者工厂,反之亦然。

现在提供了两种新方法:

void updateConfigs(Map<String, Object> updates);void removeConfig(String configKey);

从2.8版本开始,如果你提供序列化器作为对象(在构造函数中或通过setter),工厂将调用configure()方法来配置它们的配置属性。

Using ReplyingKafkaTemplate

2.1.3版本引入了KafkaTemplate的一个子类来提供请求/回复语义。该类名为ReplyingKafkaTemplate,有两个附加方法;下面显示了方法签名:

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,Duration replyTimeout);

(另请参阅带消息的请求/回复)。

结果是一个CompletableFuture,它异步填充了结果(或超时的异常)。结果还有一个sendFuture属性,这是调用KafkaTemplate.send()的结果。您可以使用此未来来确定发送操作的结果。

如果使用第一种方法,或者replyTimeout参数为null,则使用模板的defaultReplyTimeout属性(默认为5秒)。

从2.8.8版本开始,模板有一个新方法waitForAssignment。如果回复容器配置了auto.coffset.reset=latest,这将非常有用,以避免在容器初始化之前发送请求和回复。

使用手动分区分配(无组管理)时,等待时间必须大于容器的pollTimeout属性,因为在第一次轮询完成之前不会发送通知。

以下Spring Boot应用程序显示了如何使用该功能的示例:

@SpringBootApplication
public class KRequestingApplication {public static void main(String[] args) {SpringApplication.run(KRequestingApplication.class, args).close();}@Beanpublic ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {return args -> {if (!template.waitForAssignment(Duration.ofSeconds(10))) {throw new IllegalStateException("Reply container did not initialize");}ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);System.out.println("Sent ok: " + sendResult.getRecordMetadata());ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);System.out.println("Return value: " + consumerRecord.value());};}@Beanpublic ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf,ConcurrentMessageListenerContainer<String, String> repliesContainer) {return new ReplyingKafkaTemplate<>(pf, repliesContainer);}@Beanpublic ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> repliesContainer =containerFactory.createContainer("kReplies");repliesContainer.getContainerProperties().setGroupId("repliesGroup");repliesContainer.setAutoStartup(false);return repliesContainer;}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Beanpublic NewTopic kReplies() {return TopicBuilder.name("kReplies").partitions(10).replicas(2).build();}}

请注意,我们可以使用Boot的自动配置容器工厂来创建回复容器。

如果将非平凡的反序列化器用于回复,请考虑使用ErrorHandlingDeserializer,该反序列化器委托给您配置的反序列化程序。如此配置后,RequestReplyFuture将异常完成,您可以捕获ExecutionException,并在其cause属性中包含反序列化Exception。

从2.6.7版本开始,除了检测反序列化异常外,模板还将调用replyErrorChecker函数(如果提供)。如果它返回异常,则未来将异常完成。

以下是一个示例:

template.setReplyErrorChecker(record -> {Header error = record.headers().lastHeader("serverSentAnError");if (error != null) {return new MyException(new String(error.value()));}else {return null;}
});...RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {future.getSendFuture().get(10, TimeUnit.SECONDS); // send okConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);...
}
catch (InterruptedException e) {...
}
catch (ExecutionException e) {if (e.getCause() instanceof MyException) {...}
}
catch (TimeoutException e) {...
}

模板设置了一个标头(默认情况下名为KafkaHeaders.CORRELATION_ID),该标头必须由服务器端回显。

在这种情况下,以下@KafkaListener应用程序会响应:

@SpringBootApplication
public class KReplyingApplication {public static void main(String[] args) {SpringApplication.run(KReplyingApplication.class, args);}@KafkaListener(id="server", topics = "kRequests")@SendTo // use default replyTo expressionpublic String listen(String in) {System.out.println("Server received: " + in);return in.toUpperCase();}@Beanpublic NewTopic kRequests() {return TopicBuilder.name("kRequests").partitions(10).replicas(2).build();}@Bean // not required if Jackson is on the classpathpublic MessagingMessageConverter simpleMapperConverter() {MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());return messagingMessageConverter;}}

@KafkaListener基础结构响应关联ID并确定回复主题。

有关发送回复的更多信息,请参阅使用@SendTo转发侦听器结果。该模板使用默认标头KafKaHeaders。REPLY_TOPIC表示回复所针对的主题。

从2.2版本开始,模板尝试从配置的回复容器中检测回复主题或分区。如果容器配置为监听单个主题或单个TopicPartitionOffset,则用于设置回复标头。如果容器配置为其他方式,则用户必须设置回复标头。在这种情况下,初始化期间会写入INFO日志消息。以下示例使用KafkaHeaders。REPLY_TOPIC:

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

当您使用单个回复TopicPartitionOffset进行配置时,您可以对多个模板使用相同的回复主题,只要每个实例都在不同的分区上侦听。当配置单个回复主题时,每个实例必须使用不同的group.id。在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到相关id。这可能有助于自动扩展,但会产生额外的网络流量开销,丢弃每个不需要的回复的成本很低。使用此设置时,我们建议您将模板的sharedReplyTopic设置为true,这会降低对DEBUG的意外回复的日志记录级别,而不是默认的ERROR。

以下是一个配置回复容器以使用相同共享回复主题的示例:

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // uniqueProperties props = new Properties();props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old repliescontainer.getContainerProperties().setKafkaConsumerProperties(props);return container;
}

如果您有多个客户端实例,并且没有如前一段所述对其进行配置,则每个实例都需要一个专门的回复主题。另一种方法是设置KafkaHeaders。REPLY_PARTITION并为每个实例使用专用分区。Header包含一个四字节的int(大端序)。服务器必须使用此标头将回复路由到正确的分区(@KafkaListener执行此操作)。然而,在这种情况下,回复容器不得使用Kafka的组管理功能,并且必须配置为在固定分区上侦听(通过在其ContainerProperties构造函数中使用TopicPartitionOffset)。

DefaultKafkaHeaderMapper要求Jackson在类路径上(对于@KafkaListener)。如果它不可用,则消息转换器没有标头映射器,因此您必须使用SimpleKafkaHeaderMapper配置MessagingMessageConverter,如前所示。

默认情况下,使用3个标头:

卡夫卡标头。CORRELATION_ID-用于将回复与请求相关联

卡夫卡标头。REPLY_TOPIC-用于告诉服务器在哪里回复

卡夫卡标头。REPLY_PARTITION-(可选)用于告诉服务器要回复哪个分区

@KafkaListener基础结构使用这些标头名称来路由回复。

从2.3版本开始,您可以自定义标头名称-该模板有3个属性,分别是HeaderName、replyTopicHeaderName和replyPartitionHeaderName。如果您的服务器不是Spring应用程序(或不使用@KafkaListener),这很有用。

相反,如果请求的应用程序不是spring应用程序,并且将相关性信息放在不同的标头中,从3.0版本开始,您可以在侦听器容器工厂上配置自定义correlationHeaderName,该标头将被回显。以前,侦听器必须回显自定义相关性标头。

Request/Reply with Message<?>s

2.7版本在ReplyingKafkaTemplate中添加了发送和接收春季消息的方法?>抽象:

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,ParameterizedTypeReference<P> returnType);

这些将使用模板的默认replyTimeout,也有重载版本可以在方法调用中超时。

如果消费者的反序列化器或模板的MessageConverter可以通过配置或回复消息中的类型元数据转换有效载荷,而无需任何额外信息,则使用第一种方法。

如果需要为返回类型提供类型信息,请使用第二种方法来帮助消息转换器。这也允许同一模板接收不同的类型,即使回复中没有类型元数据,例如当服务器端不是Spring应用程序时。以下是后者的一个例子:

@Bean
ReplyingKafkaTemplate<String, String, String> template(ProducerFactory<String, String> pf,ConcurrentKafkaListenerContainerFactory<String, String> factory) {ConcurrentMessageListenerContainer<String, String> replyContainer =factory.createContainer("replies");replyContainer.getContainerProperties().setGroupId("request.replies");ReplyingKafkaTemplate<String, String, String> template =new ReplyingKafkaTemplate<>(pf, replyContainer);template.setMessageConverter(new ByteArrayJsonMessageConverter());template.setDefaultTopic("requests");return template;
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));

Reply Type Message<?>

当@KafkaListener返回消息时<?>对于2.5之前的版本,有必要填充回复主题和相关性id标头。在这个例子中,我们使用请求中的回复主题头:

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, replyTo).setHeader(KafkaHeaders.KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, correlation).build();
}

这也显示了如何在回复记录上设置密钥。

从2.5版本开始,框架将检测这些标头是否丢失,并用主题填充它们——要么是根据@SendTo值确定的主题,要么是传入的KafkaHeaders。REPLY_TOPIC标头(如果存在)。它还将对传入的KafkaHeaders进行回声处理。CORRELATION_ID和KafkaHeaders。REPLY_PARTITION(如果存在)。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {return MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.KEY, 42).build();
}

Original Record Key in Reply

从3.3版本开始,传入请求中的Kafka记录键(如果存在)将保留在回复记录中。这仅适用于单记录请求/回复场景。当侦听器是批处理或返回类型是集合时,由应用程序通过将回复记录包装在消息类型中来指定要使用的键。

Aggregating Multiple Replies

Using ReplyingKafkaTemplate中的模板仅适用于单个请求/回复场景。对于单个消息的多个接收者返回回复的情况,您可以使用AggregatingReplyingKafka template。这是Scatter-Garger企业集成模式客户端的实现。

与ReplyingKafkaTemplate一样,AggregatingReplyingKafka Template构造函数需要一个生产者工厂和一个监听器容器来接收回复;它有第三个参数BiPredicate<List<ConsumerRecord<K,R>,Boolean>releaseStrategy,每次收到回复时都会参考该参数;当谓词返回true时,ConsumerRecords的集合用于完成sendAndReceive方法返回的Future。

还有一个额外的属性returnPartialOnTimeout(默认值为false)。当此设置为true时,不使用KafkaReplyTimeoutException完成未来,而是使用部分结果正常完成未来(只要至少收到一条回复记录)。

从2.3.5版本开始,谓词也会在超时后调用(如果returnPartialOnTimeout为true)。第一个参数是当前记录列表;如果此调用是由于超时引起的,则第二个为真。谓词可以修改记录列表。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =new AggregatingReplyingKafkaTemplate<>(producerFactory, container,coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =future.get(30, TimeUnit.SECONDS);

请注意,返回类型是ConsumerRecord,其值是ConsumerRecords的集合。“外部”ConsumerRecord不是“真实”记录,它是由模板合成的,作为请求收到的实际回复记录的持有者。当正常发布发生时(发布策略返回true),主题设置为aggregatedResults;如果returnPartialOnTimeout为true,并且发生超时(并且至少收到一条回复记录),则主题设置为partialResultsAfterTimeout。该模板为这些“主题”名称提供了恒定的静态变量:

/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a normal release by the release strategy.*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";/*** Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated* results in its value after a timeout.*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

集合中的真实ConsumerRecords包含收到回复的实际主题。

答复的侦听器容器必须配置为AckMode。手动或确认模式。手动即时;消费者属性enable.auto.commit必须为false(自2.3版本以来的默认值)。为了避免丢失消息的任何可能性,模板仅在有零个未完成的请求时提交偏移量,即当发布策略释放最后一个未完成请求时。重新平衡后,可能会出现重复的回复交付;对于任何飞行中的请求,这些都将被忽略;当收到已发布回复的重复回复时,您可能会看到错误日志消息。

如果将ErrorHandlingDeserializer与此聚合模板一起使用,则框架将不会自动检测到反序列化异常。相反,记录(具有null值)将原封不动地返回,头中有反序列化异常。建议应用程序调用实用程序方法ReplyingKafkaTemplate.checkDeserialize()方法,以确定是否发生了反序列化异常。有关更多信息,请参阅其JavaDocs。此聚合模板也不需要replyErrorChecker;您应该对回复的每个元素进行检查。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/88202.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/88202.shtml
英文地址,请注明出处:http://en.pswp.cn/bicheng/88202.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS长度单位问题

在 CSS 中&#xff0c;100px 的逻辑长度在不同分辨率的手机屏幕上是否表现一致&#xff0c;取决于 设备的像素密度&#xff08;devicePixelRatio&#xff09; 和 视口&#xff08;viewport&#xff09;的缩放设置。以下是详细分析&#xff1a;1. 核心概念 CSS 像素&#xff08;…

基于Java+SpringBoot的图书管理系统

源码编号&#xff1a;S606源码名称&#xff1a;基于SpringBoot的图书管理系统用户类型&#xff1a;双角色&#xff0c;用户、管理员数据库表数量&#xff1a;12 张表主要技术&#xff1a;Java、Vue、ElementUl 、SpringBoot、Maven运行环境&#xff1a;Windows/Mac、JDK1.8及以…

XTOM工业级蓝光三维扫描仪用于笔记本电脑背板模具全尺寸检测

镁合金具有密度小、强度高、耐腐蚀性好等优点&#xff0c;成为笔记本电脑外壳主流材料。冲压模具作为批量生产笔记本电脑镁合金背板的核心工具&#xff0c;其精度直接决定了产品的尺寸一致性、结构可靠性与外观品质。微米级模具误差可能在冲压过程中被放大至毫米级&#xff08;…

运维打铁: MongoDB 数据库集群搭建与管理

文章目录思维导图一、集群基础概念1. 分片集群2. 副本集二、集群搭建1. 环境准备2. 配置副本集步骤 1&#xff1a;修改配置文件步骤 2&#xff1a;启动 MongoDB 服务步骤 3&#xff1a;初始化副本集3. 配置分片集群步骤 1&#xff1a;配置配置服务器副本集步骤 2&#xff1a;启…

HCIP-Datacom Core Technology V1.0_5 OSPF特殊区域及其他特性

在前面的章节中&#xff0c;OSPF可以划分区域&#xff0c;减轻单区域里面LSDB的规模&#xff0c;从而减轻路由器的负荷&#xff0c;虽然OSPF能够划分区域&#xff0c;但是依旧需要维护域间路由和外部路由&#xff0c;这样随着网络规模的不断扩大&#xff0c;路由器所维护的LSDB…

实时开发IDE部署指南

&#x1f525;&#x1f525; AllData大数据产品是可定义数据中台&#xff0c;以数据平台为底座&#xff0c;以数据中台为桥梁&#xff0c;以机器学习平台为中层框架&#xff0c;以大模型应用为上游产品&#xff0c;提供全链路数字化解决方案。 ✨杭州奥零数据科技官网&#xff…

深入解析 RAGFlow:文件上传到知识库的完整流程

在 RAGFlow 这样的检索增强生成&#xff08;RAG&#xff09;系统中&#xff0c;知识库是其核心。用户上传的文档如何高效、可靠地转化为可检索的知识&#xff0c;是系统稳定运行的关键。今天&#xff0c;我们就来深入探讨 RAGFlow 中文件上传到知识库的完整流程&#xff0c;揭秘…

cad_recognition 笔记

Hubch/cad_recognition | DeepWiki https://github.com/Hubch/cad_recognition winget install python.python.3.10 python -m venv venv micromamba activate ./venv pip install paddleocr2.9.0 pip install poetry pip install moviepy1.0.3 下次要用conda建环境 或者…

基于odoo17的设计模式详解---构建模式

大家好&#xff0c;我是你的Odoo技术伙伴。在Odoo开发中&#xff0c;创建一个简单的记录可能只需要一行 self.env[res.partner].create({name: New Partner})。但如果我们要创建一个复杂的对象&#xff0c;比如一个包含了特定上下文、具有多个可选配置、并且需要执行一系列关联…

暑假算法日记第四天

目标​&#xff1a;刷完灵神专题训练算法题单 阶段目标&#x1f4cc;&#xff1a;【算法题单】滑动窗口与双指针 LeetCode题目:2953. 统计完全子字符串1016. 子串能表示从 1 到 N 数字的二进制串其他: 今日总结 往期打卡 2953. 统计完全子字符串 跳转: 2953. 统计完全子字符串…

Linux 常用命令大全(2025简明版)

&#x1f9ed; 一、文件和目录操作命令说明ls列出目录内容ls -l以列表形式显示&#xff08;含权限&#xff09;cd /path切换目录pwd显示当前路径mkdir dir创建目录mkdir -p dir/subdir递归创建目录rm file删除文件rm -r dir删除目录&#xff08;递归&#xff09;rm -rf dir强制…

React Ref 指南:原理、实现与实践

前言 React Ref&#xff08;引用&#xff09;是React中一个强大而重要的概念&#xff0c;它为我们提供了直接访问DOM元素或组件实例的能力。虽然React推崇声明式编程和数据驱动的理念&#xff0c;但在某些场景下&#xff0c;我们仍需要直接操作DOM或访问组件实例。本文将深入探…

4.权重衰减(weight decay)

4.1 手动实现权重衰减 import torch from torch import nn from torch.utils.data import TensorDataset,DataLoader import matplotlib.pyplot as plt def synthetic_data(w,b,num_inputs):Xtorch.normal(0,1,size(num_inputs,w.shape[0]))yXwbytorch.normal(0,0.1,sizey.shap…

OpenCV开发-初始概念

第一章 OpenCV核心架构解析1.1 计算机视觉的基石OpenCV&#xff08;Open Source Computer Vision Library&#xff09;作为跨平台计算机视觉库&#xff0c;自1999年由Intel发起&#xff0c;已成为图像处理领域的标准工具。其核心价值体现在&#xff1a;跨平台性&#xff1a;支持…

LeetCode 930.和相同的二元子数组

给你一个二元数组 nums &#xff0c;和一个整数 goal &#xff0c;请你统计并返回有多少个和为 goal 的 非空 子数组。 子数组 是数组的一段连续部分。 示例 1&#xff1a; 输入&#xff1a;nums [1,0,1,0,1], goal 2 输出&#xff1a;4 解释&#xff1a; 有 4 个满足题目要求…

【论文解读】Referring Camouflaged Object Detection

论文信息 论文题目&#xff1a;Referring Camouflaged Object Detection 论文链接&#xff1a;https://arxiv.org/pdf/2306.07532 代码链接&#xff1a;https://github.com/zhangxuying1004/RefCOD 录用期刊&#xff1a;TPAMI 2025 论文单位&#xff1a;南开大学 ps&#xff1a…

Spring中过滤器和拦截器的区别及具体实现

在 Spring 框架中&#xff0c;过滤器&#xff08;Filter&#xff09; 和 拦截器&#xff08;Interceptor&#xff09; 都是用于处理 HTTP 请求的中间件&#xff0c;但它们在作用范围、实现方式和生命周期上有显著区别。以下是详细对比和实现方式&#xff1a;核心区别特性过滤器…

CANFD 数据记录仪在新能源汽车售后维修中的应用

一、前言随着新能源汽车市场如火如荼和新能源汽车电子系统的日益复杂&#xff0c;传统维修手段在面对复杂和偶发故障时往往捉襟见肘&#xff0c;CANFD 数据记录仪则凭借其独特优势&#xff0c;为售后维修带来新的解决方案。二、 详细介绍在新能源汽车领域&#xff0c;CANFD 数据…

某当CRM XlsFileUpload存在任意文件上传(CNVD-2025-10982)

免责声明 本文档所述漏洞详情及复现方法仅限用于合法授权的安全研究和学术教育用途。任何个人或组织不得利用本文内容从事未经许可的渗透测试、网络攻击或其他违法行为。使用者应确保其行为符合相关法律法规,并取得目标系统的明确授权。 前言: 我们建立了一个更多,更全的…

自然语言处理与实践

文章目录Lesson1&#xff1a;Introduction to NLP、NLP 基础与文本预处理1.教材2.自然语言处理概述(1)NLP 的定义、发展历程与应用场景(2)NLP 的主要任务&#xff1a;分词、词性标注、命名实体识别、句法分析等2.文本预处理3.文本表示方法&#xff1a;词向量表示/词表征Lesson2…