H

ashMap是Java集合框架中最常用的数据结构之一,它提供了高效的键值对存储和检索功能。在Java8中,HashMap引入了一系列新的原子性更新方法,包括compute()computeIfAbsent()computeIfPresent()等,这些方法极大地简化了在Map中进行复杂更新操作的代码。本文将详细介绍这些方法,包括它们的用法、示例和实际应用场景,并特别探讨它们在Kafka Stream数据处理中的实际应用。

在这里插入图片描述

1. compute()方法

方法签名

default V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

功能说明

compute()方法用于根据指定的键和其当前映射值(如果没有当前映射值则为null)计算一个新的映射值。这个方法是原子性的,意味着在多线程环境下可以安全使用。

参数

  • key: 要计算的键
  • remappingFunction: 接受键和当前值作为参数,返回新值的函数

返回值

  • 返回与键关联的新值,如果没有值与键关联(且remappingFunction返回null),则返回null

示例

import java.util.HashMap;
import java.util.Map;public class ComputeExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用compute方法增加apple的数量map.compute("apple", (k, v) -> v + 1);System.out.println(map); // 输出: {apple=2, banana=2}// 对不存在的键使用compute方法map.compute("orange", (k, v) -> v == null ? 1 : v + 1);System.out.println(map); // 输出: {apple=2, banana=2, orange=1}// 使用compute方法删除条目(返回null)map.compute("banana", (k, v) -> null);System.out.println(map); // 输出: {apple=2, orange=1}}
}

用途

  • 当需要基于当前值计算新值时(如计数器增加)
  • 当需要根据键和当前值决定是否保留、更新或删除条目时
  • 替代传统的"检查是否存在,然后put"模式

2. computeIfAbsent()方法

方法签名

default V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction)

功能说明

computeIfAbsent()方法仅在指定的键尚未与值关联(或映射为null)时计算一个新值并将其放入Map中。

参数

  • key: 要检查的键
  • mappingFunction: 接受键作为参数,返回新值的函数

返回值

  • 返回与键关联的当前(现有或计算的)值,如果没有值与键关联,则返回null

示例

import java.util.HashMap;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;public class ComputeIfAbsentExample {public static void main(String[] args) {Map<String, List<String>> map = new HashMap<>();// 使用computeIfAbsent初始化列表map.computeIfAbsent("fruits", k -> new ArrayList<>()).add("apple");map.computeIfAbsent("fruits", k -> new ArrayList<>()).add("banana");map.computeIfAbsent("vegetables", k -> new ArrayList<>()).add("carrot");System.out.println(map); // 输出: {fruits=[apple, banana], vegetables=[carrot]}// 对已存在的键不会重新计算List<String> fruits = map.computeIfAbsent("fruits", k -> new ArrayList<>());fruits.add("orange");System.out.println(map); // 输出: {fruits=[apple, banana, orange], vegetables=[carrot]}}
}

用途

  • 延迟初始化(如上面的列表示例)
  • 缓存实现(当需要时才计算值)
  • 避免重复计算相同的键

3. computeIfPresent()方法

方法签名

default V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

功能说明

computeIfPresent()方法仅在指定的键已与值关联时计算一个新值并将其放入Map中。

参数

  • key: 要检查的键
  • remappingFunction: 接受键和当前值作为参数,返回新值的函数

返回值

  • 返回与键关联的新值,如果没有值与键关联,则返回null

示例

import java.util.HashMap;
import java.util.Map;public class ComputeIfPresentExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用computeIfPresent增加apple的数量map.computeIfPresent("apple", (k, v) -> v + 1);System.out.println(map); // 输出: {apple=2, banana=2}// 对不存在的键使用computeIfPresent不会有任何效果map.computeIfPresent("orange", (k, v) -> v + 1);System.out.println(map); // 输出: {apple=2, banana=2}// 使用computeIfPresent删除条目(返回null)map.computeIfPresent("banana", (k, v) -> null);System.out.println(map); // 输出: {apple=2}}
}

用途

  • 当需要基于现有值更新值时(如计数器增加)
  • 当需要根据条件删除条目时
  • 替代传统的"检查是否存在,然后更新"模式

4. merge()方法

虽然不是严格意义上的compute方法,但merge()方法与这些方法功能相似,也值得介绍。

方法签名

default V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction)

功能说明

merge()方法将指定的值与键的当前值(如果存在)合并,使用提供的合并函数。如果键没有当前映射,则直接将键与指定值关联。

参数

  • key: 要合并的键
  • value: 要合并的值
  • remappingFunction: 接受当前值和指定值作为参数,返回合并后的值的函数

返回值

  • 返回与键关联的新值,如果没有值与键关联,则返回指定的值

示例

import java.util.HashMap;
import java.util.Map;public class MergeExample {public static void main(String[] args) {Map<String, Integer> map = new HashMap<>();map.put("apple", 1);map.put("banana", 2);// 使用merge方法增加apple的数量map.merge("apple", 1, (oldValue, newValue) -> oldValue + newValue);System.out.println(map); // 输出: {apple=2, banana=2}// 对不存在的键使用merge方法直接添加map.merge("orange", 3, (oldValue, newValue) -> oldValue + newValue);System.out.println(map); // 输出: {apple=2, banana=2, orange=3}// 使用merge方法删除条目(合并函数返回null)map.merge("banana", 1, (oldValue, newValue) -> null);System.out.println(map); // 输出: {apple=2, orange=3}}
}

用途

  • 合并两个值(如计数器累加)
  • 当需要基于现有值和新值计算新值时
  • 替代传统的"检查是否存在,然后合并"模式

5. 方法对比

方法触发条件参数典型用途
compute()总是执行键和BiFunction(键,当前值→新值)基于键和当前值计算新值
computeIfAbsent()键不存在或值为null键和Function(键→新值)延迟初始化,避免重复计算
computeIfPresent()键存在且值不为null键和BiFunction(键,当前值→新值)基于现有值更新值
merge()总是执行键、值和BiFunction(当前值,新值→合并值)合并两个值

6. 实际应用场景

6.1 缓存实现

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;public class CacheExample {private final Map<String, String> cache = new HashMap<>();public String get(String key, Function<String, String> loader) {return cache.computeIfAbsent(key, loader);}public static void main(String[] args) {CacheExample cache = new CacheExample();String value = cache.get("data", key -> {// 模拟从数据库加载数据System.out.println("Loading data for " + key);return "Value for " + key;});System.out.println(value);// 再次获取相同key不会重新加载value = cache.get("data", key -> {System.out.println("This won't be printed");return "New value";});System.out.println(value);}
}

6.2 计数器

import java.util.HashMap;
import java.util.Map;public class CounterExample {public static void main(String[] args) {Map<String, Integer> wordCounts = new HashMap<>();String[] words = {"apple", "banana", "apple", "orange", "banana", "apple"};for (String word : words) {wordCounts.merge(word, 1, Integer::sum);}System.out.println(wordCounts); // 输出: {orange=1, banana=2, apple=3}}
}

6.3 配置合并

import java.util.HashMap;
import java.util.Map;public class ConfigMergeExample {public static void main(String[] args) {Map<String, String> defaultConfig = new HashMap<>();defaultConfig.put("timeout", "1000");defaultConfig.put("retries", "3");Map<String, String> userConfig = new HashMap<>();userConfig.put("timeout", "2000");// 合并配置,用户配置优先userConfig.forEach((key, value) -> defaultConfig.merge(key, value, (oldVal, newVal) -> newVal));System.out.println(defaultConfig); // 输出: {timeout=2000, retries=3}}
}

7. Kafka Stream中的HashMap compute方法应用

Kafka Stream是一个用于构建流处理应用的Java库,它提供了高级抽象来处理数据流。在Kafka Stream应用中,我们经常需要维护状态(如计数器、聚合结果等),而HashMap及其compute方法家族非常适合这种场景。

7.1 Kafka Stream状态存储基础

Kafka Stream提供了KeyValueStore接口用于状态存储,但底层实现通常基于HashMap或其他高效的数据结构。当我们需要在Kafka Stream应用中维护自定义状态时,compute方法家族可以发挥巨大作用。

7.2 实时计数器示例

假设我们有一个Kafka Stream应用,需要统计每个产品的购买次数:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class ProductCounterProcessor implements Processor<String, String, String, Long> {private final Map<String, Long> productCounts = new HashMap<>();@Overridepublic void init(ProcessorContext<String, Long> context) {// 初始化代码}@Overridepublic void process(Record<String, String> record) {String productId = record.key();// 使用compute方法原子性地增加计数器productCounts.compute(productId, (k, v) -> v == null ? 1L : v + 1L);// 可以定期将状态写入Kafka状态存储或发送到下游// 这里简化处理,直接转发结果context.forward(new Record<>(productId, productCounts.get(productId), record.timestamp()));}@Overridepublic void close() {// 清理代码}
}

在这个例子中,compute()方法确保了即使在高并发环境下,计数器也能正确更新,避免了传统的"检查-然后-更新"模式可能导致的竞态条件。

7.3 会话窗口聚合

在Kafka Stream中处理会话窗口时,我们经常需要维护会话状态。computeIfPresent()方法非常适合这种场景:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class SessionAggregatorProcessor implements Processor<String, UserEvent, String, SessionSummary> {private final Map<String, SessionSummary> activeSessions = new HashMap<>();@Overridepublic void init(ProcessorContext<String, SessionSummary> context) {// 初始化代码}@Overridepublic void process(Record<String, UserEvent> record) {String userId = record.key();UserEvent event = record.value();// 使用computeIfPresent更新现有会话activeSessions.computeIfPresent(userId, (k, session) -> {session.addEvent(event);if (session.isExpired()) {// 会话过期,发送结果并移除context.forward(new Record<>(userId, session.toSummary(), record.timestamp()));return null; // 返回null会删除该条目}return session;});// 使用computeIfAbsent创建新会话activeSessions.computeIfAbsent(userId, k -> {SessionSummary newSession = new SessionSummary(event);return newSession;});}@Overridepublic void punctuate(long timestamp) {// 定期检查并关闭过期会话activeSessions.entrySet().removeIf(entry -> {if (entry.getValue().isExpired()) {context.forward(new Record<>(entry.getKey(), entry.getValue().toSummary(), timestamp));return true;}return false;});}@Overridepublic void close() {// 清理代码}
}

在这个例子中,我们结合使用了computeIfPresent()computeIfAbsent()方法来高效地管理会话状态,确保会话的正确创建、更新和过期处理。

7.4 窗口化聚合

对于基于时间的窗口聚合,merge()方法特别有用:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import java.util.HashMap;
import java.util.Map;public class WindowedAggregatorProcessor implements Processor<String, SalesEvent, String, SalesSummary> {private final Map<String, SalesSummary> windowSums = new HashMap<>();@Overridepublic void init(ProcessorContext<String, SalesSummary> context) {// 初始化代码}@Overridepublic void process(Record<String, SalesEvent> record) {String productId = record.key();SalesEvent event = record.value();// 使用merge方法合并销售事件到窗口汇总windowSums.merge(productId, new SalesSummary(event), (existingSum, newEvent) -> existingSum.merge(newEvent));// 定期发送窗口汇总结果if (shouldSendWindowResult()) {windowSums.forEach((k, v) -> context.forward(new Record<>(k, v, record.timestamp())));windowSums.clear(); // 清空窗口}}@Overridepublic void close() {// 清理代码}private boolean shouldSendWindowResult() {// 实现窗口触发逻辑return false;}
}

在这个例子中,merge()方法简化了窗口内销售事件的聚合过程,使我们能够高效地计算每个产品在当前窗口内的销售汇总。

8. 性能考虑与Kafka Stream集成

在Kafka Stream应用中使用这些compute方法时,需要注意以下几点:

  1. 线程安全性:Kafka Stream处理器通常是单线程处理每个分区,因此不需要额外的同步措施。但如果在多线程环境中使用HashMap,应考虑使用ConcurrentHashMap及其原子性方法。
  2. 状态存储:对于需要持久化的状态,Kafka Stream提供了Stores工厂类来创建持久化状态存储。这些存储底层可能使用类似HashMap的结构,但提供了容错能力。
  3. 内存管理:在处理大规模数据时,要注意HashMap的内存使用情况,避免OOM错误。可以考虑使用更高效的数据结构或定期清理过期状态。
  4. 容错性:虽然compute方法提供了原子性操作,但在分布式环境中,还需要考虑Kafka Stream提供的检查点机制来确保状态的一致性。

9. 总结

Java 8引入的compute()computeIfAbsent()computeIfPresent()merge()等方法极大地增强了HashMap的功能,使开发者能够以更简洁、更安全的方式执行复杂的Map更新操作。这些方法特别适合以下场景:

  • 需要基于当前值计算新值时(如计数器增加)
  • 当需要根据键和当前值决定是否保留、更新或删除条目时
  • 延迟初始化或缓存实现
  • 合并值的场景

在Kafka Stream数据处理中,这些方法特别有价值,因为它们:

  1. 简化了状态管理代码
  2. 提供了原子性操作,避免了竞态条件
  3. 使实时聚合和计数实现更加简洁
  4. 与Kafka Stream的处理器API完美配合

掌握这些方法可以显著提高Kafka Stream应用的开发效率和代码质量,特别是在需要维护复杂状态的应用场景中。无论是简单的计数器还是复杂的会话窗口聚合,这些compute方法都能提供优雅的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/93448.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/93448.shtml
英文地址,请注明出处:http://en.pswp.cn/diannao/93448.shtml

如若内容造成侵权/违法违规/事实不符,请联系英文站点网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【php中ssti模板注入讲解】

php中场景模板 1. Smarty 使用安全模式来执行不信任的模板,只运行PHP白名单里的函数。 2. Twig 与Smarty类似,不过无法利用该模板的SSTI调用静函数。 php常见模板入门 Smarty 不使用预先准备好的模板 使用预先准备好的模板 对值进行拼接后使用模板展示 设置在模板中…

Redis学习07-Redis的过期策略

Redis 过期策略 什么是过期策略 Redis 的过期策略用于管理设置了过期时间&#xff08;TTL&#xff09;的键&#xff0c;确保在键过期后能够被及时删除&#xff0c;从而释放内存 整体策略 Redis 采用的是定期删除惰性删除的组合策略 1. 定期删除 原理&#xff1a;周期性的从过期…

深入解读c++(命名空间)

目录 1关于命名空间 1.1是什么 1.2解决了什么问题 2.命名空间的定义 2.2命名空间的嵌套定义 3命名空间的特点 3.1命名空间不会影响生命周期 3.2命名空间只能在全局域里定义&#xff0c;当然嵌套定义时例外。 3.3在不同文件中定义相同名称的命名空间 4.命名空间的使用 …

ClickHouse高性能实时分析数据库-高性能的模式设计

告别等待&#xff0c;秒级响应&#xff01;这不只是教程&#xff0c;这是你驾驭PB级数据的超能力&#xff01;我的ClickHouse视频课&#xff0c;凝练十年实战精华&#xff0c;从入门到精通&#xff0c;从单机到集群。点开它&#xff0c;让数据处理速度快到飞起&#xff0c;让你…

ArkTS懒加载LazyForEach的基本使用

在 ArkTS 的开发中&#xff0c;如果你要渲染一个很长的列表&#xff0c;比如商品列表、评论列表或者朋友圈动态&#xff0c;用传统的循环结构&#xff08;比如 ForEach&#xff09;很容易导致性能问题&#xff0c;尤其是加载慢、卡顿甚至内存暴涨。 这时候就要用到 懒加载渲染组…

动态规划:从入门到精通

本文全章节一共一万七千多字&#xff0c;详细介绍动态规划基础与进阶技巧&#xff0c;全篇以代码为主&#xff0c;认真读完理解&#xff0c;你对动态规划的理解一定会有一个质的飞跃。一、动态规划简介: 动态规划&#xff08;Dynamic Programming&#xff0c;简称DP&…

八股训练营 40 天心得:一场结束,也是一场新的开始

八股训练营 40 天心得&#xff1a;一场结束&#xff0c;也是一场新的开始 感谢卡哥的训练营组织卡码笔记&#xff0c;对即将参加秋招的我们帮助了很多&#xff0c;感谢卡哥的开源代码随想录代码随想录 四十天前&#xff0c;我带着一颗不安却坚定的心&#xff0c;踏入了这场“…

STM32系统定时器(SysTick)详解:从原理到实战的精确延时与任务调度

前言&#xff1a;为什么SysTick是嵌入式开发的"瑞士军刀"&#xff1f; 在STM32开发中&#xff0c;我们经常需要精确的延时功能&#xff08;如毫秒级延时控制LED闪烁&#xff09;或周期性任务调度&#xff08;如定时采集传感器数据&#xff09;。实现这些功能的方式有…

【微信小程序】12、生物认证能力

1、生物认证 生物认证 是一种基于个体独特生理或行为特征进行身份验证的技术,广泛应用于安全、金融、医疗等领域。 小程序目前暂时只支持指纹识别认证。 2、查询支持的生物认证方式 获取本机支持的 SOTER 生物认证方式&#xff0c;文档 onLoad(options) {wx.checkIsSuppor…

高级机器学习

机器学习常见方法涉及方法&#xff1a;2.半监督学习3.无监督学习4.度量学习5.迁移学习6.多示例多标记学习7.在线学习8.元学习9.联邦学习10.强化学习11.概率图模型独立同分布独立指的是&#xff0c;样本集包括训练集测试集的任意两个样本之间都是不相关的。在表示样本的特征确定…

Chrome 提示 “此扩展程序不再受支持”(MacOS/Windows)

原因 最新 Chrome 使用 Manifest V3, 并在新版浏览器中 停止 V2 支持 处理方法 MacOS 新建一个后缀为 .mobileconfig 的文件, 内容参考 <?xml version"1.0" encoding"UTF-8"?> <!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN&…

C++20协程实战:高效网络库、手机终端、多媒体开发开发指南

基于C++协程和事件循环的网络库 以下是基于C++协程和事件循环的网络库实例,涵盖常见场景和功能实现。示例基于libuv、Boost.Asio或自定义事件循环,结合C++20协程(如std::coroutine)或其他协程库(如cppcoro)实现。 基础TCP服务器 #include <cppcoro/task.hpp> #in…

数据库4.0

索引 事务 JDBC~ 目录 一、MySQL索引 1.0 概述 2.0 相关操作 3.0 注意 4.0 索引背后的原理的理解 二、 事务 1.0 原子性 2.0 隔离性 (1)并发执行 (2) 出现的问题 3.0 使用 三、JDBC编程 1.0 概述 2.0 如何下载驱动包 3.0 jar如何引入到项目之中 4.0 jdbc…

HarmonyOS-ArkUI Web控件基础铺垫6--TCP协议- 流量控制算法与拥塞控制算法

HarmonyOS-ArkUI Web控件基础铺垫1-HTTP协议-数据包内容-CSDN博客 HarmonyOS-ArkUI Web控件基础铺垫2-DNS解析-CSDN博客 HarmonyOS-ArkUI Web控件基础铺垫3--TCP协议- 从规则本质到三次握手-CSDN博客 HarmonyOS-ArkUI Web控件基础铺垫4--TCP协议- 断联-四次挥手解析-CSDN博客…

Dify 从入门到精通(2/100 篇):Dify 的核心组件 —— 从节点到 RAG 管道

Dify 的核心组件&#xff1a;从节点到 RAG 管道 引言 在 Dify 博客系列&#xff1a;从入门到精通&#xff08;100 篇&#xff09; 的第一篇《Dify 究竟是什么&#xff1f;真能开启低代码 AI 应用开发的未来&#xff1f;》中&#xff0c;我们全面介绍了 Dify 的定位、核心特点…

在线培训、远程示教——医疗器械行业的直播解决方案

文章目录前言一、医疗器械直播应用的两大核心场景二、直播平台在医疗场景中的关键技术支持点三、典型功能实现原理总结前言 医疗器械行业对“培训”和“示教”的专业性要求极高&#xff0c;传统的线下模式常因时间、空间、人员成本等受限而效率低下。而随着高清低延迟视频技术…

Mqttnet的MqttClientTlsOptions.CertificateValidationHandler详解

MqttClientTlsOptions.CertificateValidationHandler 是 MQTTnet 库中用于自定义 TLS 证书验证逻辑的关键回调函数。在 MQTT 客户端与服务器建立 TLS 连接时&#xff0c;该回调允许你覆盖默认的证书验证流程&#xff0c;实现自定义的安全策略。核心作用当 MQTT 客户端通过 TLS …

【图像噪点消除】——图像预处理(OpenCV)

目录 1 均值滤波 2 方框滤波 3 高斯滤波 4 中值滤波 5 双边滤波 6 小结 噪声&#xff1a;图像中的一些干扰因素。通常是由于图像采集设备、传输信道等因素造成的&#xff0c;表现为图像中随机的亮度。常见的噪声类型有高斯噪声和椒盐噪声。高斯噪声是一种分布符合正态分布…

Vulnhub napping-1.0.1靶机渗透攻略详解

一、下载靶机 下载地址&#xff1a;https://download.vulnhub.com/napping/napping-1.0.1.ova 下载好后使用VM打开&#xff0c;将网络配置模式改为net&#xff0c;防止桥接其他主机干扰&#xff08;桥接Mac地址也可确定主机&#xff09;。 二、发现主机 使用nmap扫描没有相应…

Kubernetes自动扩容方案

Kubernetes 自动扩容可以概括为 “三层六类”&#xff1a;层级类型触发维度官方/社区方案一句话说明Pod 级HPACPU / 内存 / 自定义 / 外部指标内置副本数横向扩缩&#xff0c;最常用VPACPU / 内存社区组件单 Pod 资源竖向扩缩&#xff0c;不改副本数KEDA任意事件&#xff08;队…