Coder Social home page Coder Social logo

浅尝Hazelcast about blog HOT 14 OPEN

CharLemAznable avatar CharLemAznable commented on August 30, 2024
浅尝Hazelcast

from blog.

Comments (14)

CharLemAznable avatar CharLemAznable commented on August 30, 2024

参考来源:

原文链接  
【Hazelcast系列  一】Hazelcast 概览
【Hazelcast系列  二】创建Hazelcast集群
【Hazelcast系列  三】分布式数据结构简介
【Hazelcast系列  四】分布式map
【Hazelcast系列  五】分布式queue
【Hazelcast系列  六】分布式MultiMap
【Hazelcast系列  七】分布式Set
【Hazelcast系列  八】分布式List
【Hazelcast系列  九】主题(Topic)
【Hazelcast系列  十】可靠主题(Reliable Topic)
【Hazelcast系列 十一】分布式事件
【Hazelcast系列 十二】分布式计算

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

概览

Hazelcast是基于内存的数据网格开源项目,同时也是该公司的名称。

Hazelcast提供弹性可扩展的分布式内存计算,被公认是提高应用程序性能和扩展性最好的方案。

Hazelcast以Jar包的方式发布,可以轻松地内嵌已有的项目或应用中,并提供分布式数据结构和分布式计算工具。

Hazelcast具有高可扩展性和高可用性(100%可用,从不失败)。

Hazelcast是基于内存的、高速的、可弹性扩展的、对开发者友好的NoSQL

Hazelcast集群中的节点是对等的,因此Hazelcast无单点故障问题。

Hazelcast被设计为可以扩展到成百上千个节点,集群的内存存储能力和计算能力可以维持线性增加。集群内每两个节点之间都有一条TCP连接,所有的交互都通过该TCP连接。

Hazelcast集群有两种部署模式:内嵌模式,客户端/服务器模式。

  • 内嵌模式
    在内嵌部署模式下,Hazelcast集群中的一个节点包括:应用程序,Hazelcast分区数据,Hazelcast服务三部分。内嵌部署模式的优势是读取数据延迟低。
  • 客户端/服务器部署模式
    可以部署一个提供服务的独立Hazelcast集群,服务集群可以独立创建,独立扩展。客户端通过和集群中的节点交互来获取Hazelcast数据和服务。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

创建Hazelcast集群

集群发现机制

Hazelcast支持以下发现机制:

  • TCP
  • 多播
  • GCP Cloud Discovery
  • AWS Cloud Discovery
  • Apache jclouds Cloud Discovery
  • Azure Cloud Discovery
  • Zookeeper
  • Consul
  • Etcd
  • PCF
  • OpenShift
  • Eureka
  • Heroku
  • Kubernetes

当Hazelcast集群建立完成,集群间成员使用TCP/IP互相通信,这不受集群发现机制的约束。

TCP发现机制

可以把Hazelcast配置成一个完整的TPC/IP集群(发现和通信都使用TCP/IP协议)。

无需列出所有集群成员,但是当新成员加入时,至少有一个列出的成员必须在集群中处于活跃状态。

多播发现机制

Hazelcast允许集群成员间使用多播通信来相互发现。

多播是否可用或允许使用依赖于环境,使用前请谨慎确认,避免环境不允许使用而导致功能异常。

创建集群

通过cluster-name是创建集群最简单的方法,cluster-name也是对集群隔离分组的简单方式。

一个JVM可以运行多个Hazelcast实例,每个实例只能加入一个组(集群),每个实例只和所属集群的实例交互,与其他集群的实例隔离。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

加载Hazelcast配置时, 可使用:

  • 系统配置参数(system property)指定XML/YAML配置文件的加载路径
  • 搜寻当前classpath路径寻找hazelcast.xml
  • 使用默认的hazelcast-default.xml

编码方式配置Hazelcast时, 可使用:

  • 创建com.hazelcast.config.Config类型对象, 包括:
    XML格式配置:
    com.hazelcast.config.ClasspathXmlConfig
    com.hazelcast.config.FileSystemXmlConfig
    com.hazelcast.config.InMemoryXmlConfig
    com.hazelcast.config.UrlXmlConfig
    YAML格式配置:
    com.hazelcast.config.ClasspathYamlConfig
    com.hazelcast.config.FileSystemYamlConfig
    com.hazelcast.config.InMemoryYamlConfig
    com.hazelcast.config.UrlYamlConfig
  • 创建com.hazelcast.config.ConfigBuilder类型对象并调用build方法, 包括:
    XML格式配置:
    com.hazelcast.config.XmlConfigBuilder
    YAML格式配置:
    com.hazelcast.config.YamlConfigBuilder

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式数据结构简介

  • 标准集合
    • Map: java.util.Map 的分布式实现。
    • Queue: java.util.concurrent.BlockingQueue 的分布式实现。
    • Ringbuffer:Java 中没有对应的数据结构,Ringbuffer 通常用于可靠的事件系统。
    • Set: java.util.Set 的分布式并发实现。
    • List: java.util.List 的分布式并发实现。
    • Multimap: com.google.common.collect.Multimap 的分布式实现,支持存储重复键。
    • Replicated Map:不支持分区的 Map 数据结构,集群所有成员都有全量数据。
    • Cardinality Estimator :实现了 HyperLogLog 算法的数据结构。(参见:#19
  • 主题
    主题是用于多个订阅者的分布式消息分发机制,Hazelcast支持通过主题进行消息的可靠分发。
  • 并发工具
    • FencedLock: java.util.concurrent.locks.Lock 的分布式实现,可以保证集群中只有一个线程可以获得锁。
    • ISemaphore: java.util.concurrent.Semaphore 的分布式实现。
    • IAtomicLong: java.util.concurrent.atomic.AtomicLong 的分布式实现。
    • IAtomicReference: java.util.concurrent.atomic.AtomicReference 的分布式实现。
    • FlakeIdGenerator:用于生产集群范围内的唯一标识符。
    • ICountdownLatch: java.util.concurrent.CountDownLatch 的分布式实现。
    • PN counter:一个分布式数据结构,其中每个Hazelcast实例都可以递增和递减计数器值,并将这些更新传播到所有副本。
  • Event Journal:是一种分布式数据结构,用于存储map或缓存上操作的历史记录。

基于分区策略,Hazelcast有两种类型的分布式数据结构:分区数据结构非分区数据结构

分区数据结构包括:

  • Map
  • MultiMap
  • Cache
  • Event Journal

非分区数据结构包括:

  • Queue
  • Set
  • List
  • Ringbuffer
  • FencedLock
  • ISemaphore
  • IAtomicLong
  • IAtomicReference
  • FlakeldGenerator
  • ICountdownLatch
  • Cardinality Estimator
  • PN Counter
分布式对象的加载和销毁

一个分布式对象的加载,首先需要创建一个Hazelcast实例,然后通过Hazelcast实例调用对应的get方法获取。

Hazelcast中的大多数分布式对象都是懒加载,只有在第一次操作对象时才创建对象。使用已经创建的对象,可以直接通过对象的引用访问,无需重新加载一次。

销毁分布式对象可以使用destory方法,该方法会清理和释放对象的所有资源。

使用该方法时需要十分谨慎:Hazelcast中所有的分布式数据结构都被设计为当访问对象的时刻才创建对象,因此即便已经销毁了对象,只要对该对象还有访问,对象就会被重新创建。

控制分区

Hazelcast使用分布式对象的名字决定该对象应该存储在哪个分区。

如果想将两个分布式对象存储在同一个分区,可以使用@符设置分区key,从而将两个队列存储在同一个分区。

以分布式队列为例:

IQueue<String> queue1 = instance1.getQueue("q1@test");
IQueue<String> queue2 = instance1.getQueue("q2@test");

Hazelcast提供了getPartitionKey方法用于获取分区key的信息,这将有助于创建一个和已有对象存储在同一个分区的新对象。

以分布式队列为例:

System.out.println("queue1 partition key = " + queue1.getPartitionKey());
System.out.println("queue2 partition key = " + queue2.getPartitionKey());

输出:

queue1 partition key = test
queue2 partition key = test
高可用性(HA)

当集群中有一个成员发生故障时,保存相同数据的备份副本会将包括权限、锁在内的所有数据分配给集群内其他成员,无数据丢失。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式map

Hazelcast对分布式map的数据和备份数据进行分区,并将数据平均分配给集群所有的节点。

分布式map的默认备份数为一。Hazelcast中的map有两种类型的备份:同步和异步。

Hazelcast通过MapStore可以从持久存储中加载数据到map/将map数据写入持久存储。

同步备份

创建同步备份只需要设置backup-count

<hazelcast>
    ...
    <map name="xxx">
        <backup-count>1</backup-count>
    </map>
    ...
</hazelcast>

backup-count可设置的最大值为6。

在同步备份场景下,备份操作会阻塞其他操作直到备份数据已经同步到集群节点并收到确认,因此备份会在put操作完成前更新,从而提供了集群数据更高的可靠性,但可能带来额外的阻塞成本并产生时延问题。

异步备份

异步备份不会阻塞对map的操作,不需要确认,而且会在特定的时间点执行。

可以通过参数asyn-backup-count设置异步备份。

<hazelcast>
    ...
    <map name="xxx">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
    </map>
    ...
</hazelcast>

map可以同时拥有同步备份和异步备份。

备份可读

在某个节点上调用map.get(key)方法时,默认会从真正拥有该数据的节点读取数据。

通过设置read-backup-data为true,可以允许备份数据可读:

<hazelcast>
    ...
    <map name="xxx">
        <backup-count>0</backup-count>
        <async-backup-count>1</async-backup-count>
        <read-backup-data>true</read-backup-data>
    </map>
    ...
</hazelcast>

该参数的默认值为false。备份数据可读可以提高读操作性能,但是可能产生脏读问题。

驱逐map数据

map中的数据会一直存在除非手动删除或使用驱逐策略驱逐。

<hazelcast>
    ...
    <map name="xxx">
        <time-to-live-seconds>0</time-to-live-seconds>
        <max-idle-seconds>0</max-idle-seconds>
        <eviction eviction-policy="LRU" max-size-policy="PER_NODE" size="5000"/>
    </map>
    ...
</hazelcast>
  • time-to-live-seconds:设置每条数据在map中的保存时间
  • max-idle-seconds:设置数据在map中的最大空闲时间
  • eviction-policy:数据量超过设置的最大值时采用的驱逐策略
    • NONE:默认策略,不会驱逐数据
    • LRU:最近最少使用策略
    • LFU:最少使用策略
  • max-size-policysize:map存储数据的最大值
    • PER_NODE:集群节点存储的最大数据量,默认策略
    • PER_PARTITION:每个分区存储的最大数据量
    • USED_HEAP_SIZE:每个Hazelcast实例使用的最大堆大小(MB)
    • USED_HEAP_PERCENTAGE:每个Hazelcast实例使用的堆内存大小比例
    • FREE_HEAP_SIZE:最小空闲堆内存(MB)
    • FREE_HEAP_PERCENTAGE:最小空闲内存比例
    • USED_NATIVE_MEMORY_SIZE:每个Hazelcast使用的最大直接内存(MB)
    • USED_NATIVE_MEMORY_PERCENTAGE:每个实例使用的最大直接内存比例
    • FREE_NATIVE_MEMORY_SIZE:每个实例的最小空闲直接内存(MB)
    • FREE_NATIVE_MEMORY_PERCENTAGE:每个实例的直接内存最小空闲比例

每条数据可以设置自己的time-to-live-secondsmax-idle-seconds参数,如果不设置则使用map的参数值。

V put(K key, V value, long ttl, TimeUnit ttlUnit);
V put(K key, V value, long ttl, TimeUnit ttlUnit, long maxIdle, TimeUnit maxIdleUnit);

内存存储格式

Hazelcast默认在内存中以二进制的格式存储数据。有时候以对象的形式存储数据可以加速本地处理,尤其对于查询操作。

Hazelcast支持以下三种数据格式:

  • BINARY:数据包括key和value都是序列化的二进制格式存储在内存中。
  • OBJECT:数据以对象存储在内存中。可以减少反序列化的开销,适用于数据复杂和需要处理大量数据的场景。
  • NATIVE:Hazelcast 企业版特性,这种格式和BINARY格式类似,但是存储在直接内存中。

元数据策略

IMap可以在更新时自动预处理多种数据类型,以加速对数据的查询,当前只有HazelcastJsonValue这种类型支持。启用创建元数据创建后,IMap会创建有关受支持类型对象的元数据,并在查询时使用此元数据。这不影响除支持的类型外,操作其他任何类型的对象的时延和吞吐量。

默认开启,可关闭该功能:

<hazelcast>
    ...
    <map name="xxx">
        <metadata-policy>OFF</metadata-policy>
    </map>
    ...
</hazelcast>

IMap实现是线程安全的,也可以使用Hazelcast提供的悲观锁和乐观锁来解决"竞态"问题。

悲观锁

锁住要操作map的entry直到操作完成。要使用悲观锁可以调用IMap提供的map.lockmap.unlock方法。

map.lock( key );
try {
    Value value = map.get( key );
    Thread.sleep( 10 );
    value.amount++;
    map.put( key, value );
} finally {
    map.unlock( key );
}

IMap的悲观锁是 可重入 的但是 不是公平锁

乐观锁

IMap的replace方法使用乐观锁。

replace根据数据在内存的存储格式比较值。

如果想使用自定义的equals方法进行相等性比较,数据在内存中的存储格式必须是OBJECT,否Hazelcast首先将数据序列化然后进行比较。

Value oldValue = map.get( key );
Value newValue = new Value( oldValue );
Thread.sleep( 10 );
newValue.amount++;
map.replace( key, oldValue, newValue );
悲观锁 vs. 乐观锁

悲观锁和乐观锁没有绝对的优劣,需要根据业务场景选择不同的锁策略。

对于大多数只读系统,乐观锁更加合适,和悲观锁相比乐观锁有更高的性能。

对于同一个key存在大量更新的场景使用悲观锁更好,从数据一致性来看悲观锁比乐观锁更加可靠。

解决 ABA 问题

乐观锁的CAS(Compare and Swap)机制导致ABA问题

3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。
一个线程one从内存位置V中取出A,
这时候另一个线程two也从内存中取出A,并且two进行了一些操作变成了B,然后two又将V位置的数据变成A,
这时候线程one进行CAS操作发现内存中仍然是A,然后one操作成功。

为了解决这类问题,可以给每个数据增加一个版本,在操作之前检查版本以确保数据没有被更改。

使用悲观锁避免锁脑裂
在"双机热备"高可用(HA)系统中,当联系两个节点的"心跳线"断开时(即两个节点断开联系时),本来为一个整体、动作协调的HA系统,就分裂成为两个独立的节点(即两个独立的个体)。由于相互失去了联系,都以为是对方出了故障,两个节点上的HA软件像"裂脑人"一样,"本能"地争抢"共享资源"、争起"应用服务"。就会发生严重后果:1)或者共享资源被瓜分、两边"服务"都起不来了;2)或者两边"服务"都起来了,但同时读写"共享存储",导致数据损坏(常见如数据库轮询着的联机日志出错)。

可以配置在锁之前先检查集群成员数,如果检查失败,锁操作抛出SplitBrainProtectionException并失败。

配置如下:

<hazelcast>
    ...
    <map name="xxx">
        <split-brain-protection-ref>map-actions-split-brain-protection</split-brain-protection-ref>
    </map>
    <lock name="xxx">
        <split-brain-protection-ref>map-lock-actions-split-brain-protection</split-brain-protection-ref>
    </lock>
    ...
</hazelcast>

其中:
map-lock-actions-split-brain-protection配置用于map锁定,
map-actions-split-brain-protection用于其他map操作。

获取map统计信息

可以使用getLocalMapStats()方法获取map的统计信息,比如entry的主备数量,最后更新时间以及被锁的entry数量。

如果需要集群范围内的map统计信息,需要获取每个集群成员的map统计信息并将信息合并,或者从Hazelcast管理中心获取。

需要配置statistics-enabled的值为true:

<hazelcast>
    ...
    <map name="xxx">
        <statistics-enabled>true</statistics-enabled>
    </map>
    ...
</hazelcast>

可以使用getEntryView(key)方法获取map中entry的统计信息:

EntryView entry = map.getEntryView("first order");
System.out.println ( "size in memory  : " + entry.getCost() );
System.out.println ( "creationTime    : " + entry.getCreationTime() );
System.out.println ( "expirationTime  : " + entry.getExpirationTime() );
System.out.println ( "number of hits  : " + entry.getHits() );
System.out.println ( "lastAccessedTime: " + entry.getLastAccessTime() );
System.out.println ( "lastUpdateTime  : " + entry.getLastUpdateTime() );
System.out.println ( "version         : " + entry.getVersion() );
System.out.println ( "key             : " + entry.getKey() );
System.out.println ( "value           : " + entry.getValue() );

使用谓词(Predicate)监听map数据

可以监听map中对特定数据的操作,Hazelcast提供了hazelcast.map.entry.filtering.natural.event.types属性,下表展示配置参数的值为true和不配置参数或值为false时Hazelcast行为区别:

  Default True
旧值满足谓词,新值不满足谓词 无事件发送 发送REMOVED
新旧值均满足谓词 发送UPDATED 事件 发送UPDATED 事件
新旧值均不满足谓词 无事件发送 无事件发送
旧值不满足谓词,新值满足谓词 发送UPDATED 事件 发送ADDED 事件

实现EntryAddedListenerEntryUpdatedListenerEntryRemovedListener监听事件。

public class CustomizeEntryListener implements EntryAddedListener<String, Order>,
        EntryUpdatedListener<String, Order>,
        EntryRemovedListener<String, Order> {
}
Config config = new Config();
config.setProperty("hazelcast.map.entry.filtering.natural.event.types", "true");
HazelcastInstance instance = Hazelcast.newHazelcastInstance();
IMap<String, Order> map = instance.getMap("data");
map.addEntryListener(new CustomizeEntryListener(), (Predicate<String, Order>) entry -> "car".equals(entry.getValue().getName()), true);
map.put("first order", new Order("car"));
map.put("first order", new Order("car"));
map.remove("first order");

使用谓词(Predicate)批量删除

Hazelcast提供了removeAll()方法以实现根据谓词删除所有数据:

void removeAll(Predicate<K, V> predicate);

添加拦截器

拦截器是同步的,可以修改操作的行为,改变操作的值甚至取消操作(通过抛出一个异常)。

map的拦截器组成一个链,因此多次添加一个拦截器会导致相同的拦截器被执行多次。在成员初始化的时候添加拦截器会轻易的导致这种场景,因为多个成员会添加相同的拦截器。

IMap提供了两个方法用于添加和删除拦截器:addInterceptorremoveInterceptor

防止内存溢出

使用map的查询方法很容易触发内存溢出异常,尤其在集群规模较大或堆很大的条件下。例如IMap.entrySet()IMap.values()或谓词选择错误的查询。

为了阻止这种异常的发生,可以配置每个查询返回的最大数据量。不同于SQL中的limit,基于查询的操作的最大结果限制旨在作为最后一道防线,以防止检索超出其处理能力的数据。

使用QueryResultSizeLimiter组件启用限制。每个QueryResultSizeLimiter组件运行在集群成员所有的分区之上,因此只要集群成员没有超过限制组件就会一直收集信息。如果超过限制会返回客户端一个QueryResultSizeExceededException异常。

可以通过下面的两个系统属性配置查询结果的大小限制:

  • hazelcast.query.result.size.limit:map查询返回结果的最大值。该值定义了单次查询返回的最大数据量,如果单次查询返回的数据量超过了该值则会抛出一个QueryResultSizeExceededException异常。
  • hazelcast.query.max.local.partition.limit.for.precheck:本地分区最大值。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式queue

Hazelcast queue是java.util.concurrent.BlockingQueue的分布式实现,Hazelcast的queue允许集群所有成员和其交互。

集群内所有的操作都满足FIFO的顺序,用户自定义的对象出队和入队都需要序列化。

所有的元素都会拷贝到实例本地,实例对queue的遍历都是本地遍历。可以使用ItemListener监听queue中元素的添加和移除。

向queue添加item时,Hazelcast会给每个item分配一个有序递增的itemId 。

通过max-size设置容量即可将Hazelcast中的一个分布式队列变为有界队列。max-size定义了queue可以存储的最大数据量。一旦queue中的数据量达到该值,put操作将会被阻塞直到queue的数据量低于max-size

<hazelcast>
    ...
    <queue name="queue">
        <max-size>10</max-size>
    </queue>
    ...
</hazelcast>

Hazelcast通过QueueStore可以从持久存储中加载数据到queue/将queue数据写入持久存储。

脑裂保护

可以配置在应用queue操作前检查集群最小可用节点数,检查可以避免在网络分区产生的场景下queue的操作可以在所有集群都操作成功。

下面的操作支持脑裂保护检查:

  • Collection.addAll()
  • Collection.removeAll(), Collection.retainAll()
  • BlockingQueue.offer(), BlockingQueue.add(), BlockingQueue.put()
  • BlockingQueue.drainTo()
  • IQueue.poll(), Queue.remove(), IQueue.take()
  • BlockingQueue.remove()
  • Collection.clear()
  • Collection.containsAll(), BlockingQueue.contains()
  • Collection.isEmpty()
  • Collection.iterator(), Collection.toArray()
  • Queue.peek(), Queue.element()
  • Collection.size()
  • BlockingQueue.remainingCapacity()

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式MultiMap

Hazelcast中的MultiMap是一个key可以有多个值的特殊map。和其他数据结构一样,MultiMap也是分布式和线程安全的。

MultiMap不是java.util.Map的分布式实现,MultiMap支持Map的多数功能,但是不支持索引,谓词和加载/存储。

MultiMap支持通过EntryListener监听数据变化。

在使用MultiMap时,可以通过valueCollectionType配置值的集合类型是Set或List。

  • 如果选择Set重复值和空置是不允许的而且顺序是无关紧要的。
  • 如果选择使用List排序是相关的而且可以允许存储重复的值。

通过statisticsEnabled配置可以开启对MultiMap的统计,使用getLocalMultiMapStats()方法获取统计信息。

目前,MultiMap不支持数据驱逐。

脑裂保护

MultiMapTransactionalMultiMap支持配置在应用操作前检查集群可用节点数。

支持脑裂保护的方法:

MultiMap

  • clear
  • forceUnlock
  • lock
  • put
  • remove
  • tryLock
  • unlock
  • containsEntry
  • containsKey
  • containsValue
  • entrySet
  • get
  • isLocked
  • keySet
  • localKeySet
  • size
  • valueCount
  • values

TransactionalMultiMap

  • put
  • remove
  • size
  • get
  • valueCount

配置脑裂保护:

<hazelcast>
    ...
    <multimap name="xxx">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </multimap>
    ...
</hazelcast>

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式Set

Hazelcast中的ISetjava.util.Set的分布式和并发实现。

  • 不允许重复元素。
  • 不保证元素顺序。
  • 不是分区数据结构。
  • 扩容不能超过单节点容量限制。
  • 备份数据存储在另外节点的一个分区中。
  • 所有数据被拷贝到本地,本地数据遍历。
  • 使用对象序列化的字节版本实现相等比较。

ISet可以通过ItemListener来监听元素的添加的和移除。

脑裂保护

ISet&TransactionalSet支持配置在应用操作前检查集群节点最小值,以保证在网络分区发生时,操作不会在所有集群都操作成功。

支持脑裂保护的方法:

ISet:

  • add
  • addAll
  • clear
  • remove
  • removeAll
  • contains
  • containsAll
  • isEmpty
  • iterator
  • size
  • toArray

TransactionalSet:

  • add
  • remove
  • size

配置脑裂保护:

<hazelcast>
    ...
    <set name="xxx">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </set>
    ...
</hazelcast>

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式List

IListISet类似,但是IList支持存储重复元素。

  • 除了支持存储重复元素,List还能保证元素的顺序。
  • List也是非分区数据结构。
  • List容量扩展不能超过单节点容量限制。
  • 数据被拷贝到本地,本地遍历。

可使用ItemListener监听数据的添加和移除。

脑裂保护

IList&TransactionalList支持配置在应用操作前检查集群节点最小值,以包装在网络分区发生时,操作不会在所有集群都操作成功。

支持脑裂保护的方法:

IList:

  • add
  • addAll
  • clear
  • remove
  • removeAll
  • set
  • add
  • contains
  • containsAll
  • get
  • indexOf
  • isEmpty
  • iterator
  • lastIndexOf
  • listIterator
  • size
  • subList
  • toArray

TransactionalList:

  • add
  • remove
  • size

配置脑裂保护:

<hazelcast>
    ...
    <list name="xxx">
        <split-brain-protection-ref>splitbrainprotection-name</split-brain-protection-ref>
    </list>
    ...
</hazelcast>

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

主题(Topic)

Hazelcast支持常用的publish/subscribe消息模型。
发布和订阅都是集群级操作,一个member订阅主题,其实是订阅了集群内所有member发布到主题的消息,即使member是在订阅之后加入集群。

主题两个统计量:发布的消息数和接收的消息数(member启动之后),只反映本节点的统计信息,非全局统计信息。
统计数据没有备份,如果member下线,统计数据会丢失。

集群的所有成员都有集群内订阅的列表。当一个新成员订阅一个主题时,该成员会向集群所有的成员发送该订阅消息,如果一个新成员加入集群,新成员也会收到目前集群内所有的订阅信息。

消息顺序和发布顺序

如果globalOrderEnabled设置为false,消息不会被排序,监听器将会按照消息发布的顺序处理消息。
即:只能保证每个member发布的消息按顺序处理,不能保证不同member消息之间的处理顺序。

如果globalOrderEnabled设置为true,监听同一个主题的所有member得到的消息的顺序是一致。

StripeExecutor负责消息的分发和接收。StripeExecutor中线程的数量由参数hazelcast.event.thread.count配置,默认线程数为5。

配置主题

主题配置支持以下元素:

  • statistics-enabled:是否收集统计信息,默认为true。
  • global-ordering-enabled:是否保证全局有序,默认值false。
  • message-listeners:主题监听器。

除去上面的配置下面的系统属性也和主题相关,但是不是主题所特有的配置:

  • hazelcast.event.queue.capacity:默认值 1,000,000
  • hazelcast.event.queue.timeout.millis:默认值 250
  • hazelcast.event.thread.count:默认值 5

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

可靠主题(Reliable Topic)

主题中的事件可能丢失,为了提高数据可靠性Hazelcast提供了可靠主题。
为了保证数据的可靠性,可靠主题使用Ringbuffer数据结构备份主题的事件。

和普通主题相比,可靠主题有以下优势:

  • 主题中的事件不会丢失。Ringbuffer默认有一个同步备份。
  • 每个可靠主题使用独立的Ringbuffer,各个主题之间互不影响。
  • 在脑裂环境中可靠主题无法工作。

缓慢的消费者

可靠主题提供了对消费速度慢的消费的控制和管理方法。
因为不知道速度慢的消费者何时能赶上,因此将对应的事件无限期保存在内存中是不明智的。
可以通过Ringbuffer的容量来对内存中的数量进行限制。
当Ringbuffer存储的数据量超过容量限制,可以选择下面的四种策略进行处理:

  • DISCARD_OLDEST:丢弃老数据,即使设置了TTL。
  • DISCARD_NEWEST:丢弃新数据。
  • BLOCK:阻塞直到有数据过期。
  • ERROR:立即抛出TopicOverloadException异常。

配置可靠主题

可靠主题的配置参数有以下四项:

  • statistics-enabled:默认值true,是否开启统计。
  • message-listener:事件监听器。
  • read-batch-size:批量读大小,默认10。
  • topic-overload-policy:事件超过容量限制时的处理策略。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式事件

如果想感知某些事件,可以向Hazelcast实例注册监听器。
监听器在集群范围内有效,向集群内的一个成员注册监听器,实际上是向集群内所有的成员注册了监听器。
新加入成员产生的事件也会被发送给监听器。
Hazelcast只生成注册了监听器的事件。
如果在注册监听器时提供了谓词,事件在发送给监听器时首先要通过谓词,只有通过谓词的事件才能最终发送给监听器。

集群事件

  • 成员关系监听器MembershipListener
  • 分布式对象监听器DistributedObjectListener
  • 迁移监听器MigrationListener
  • 分区监听器PartitionLostListener
  • 生命周期监听器LifecycleListener
  • 客户端监听器ClientListener
  • 分布式对象Map事件监听器EntryAddedListener MapClearedListener MapPartitionLostListener
  • 分布式对象MultiMap事件监听器EntryListener
  • 分布式对象IQueue,ISet和IList元素监听器ItemListener
  • ITopic消息监听器MessageListener

分布式对象监听器属性:

  • include-value,事件是否包含value,默认true
  • local,是否只监听本地事件,默认值false

配置方式

  • 直接配置Hazelcast实例
  • 配置Hazelcast Config
  • Hazelcast配置文件

后两种方式可保证创建实例和注册监听之间的事件不会丢失。

全局事件配置

  • hazelcast.event.queue.capacity:默认值1000000
  • hazelcast.event.queue.timeout.millis:默认值 250
  • hazelcast.event.thread.count:默认值 5

如果事件队列达到容量限制(hazelcast.event.queue.capacity),最后一个事件无法在hazelcast.event.queue.timeout.millis内插入事件队列,这些事件将会被丢弃并发出一个警告信息EventQueue overload

如果监听器执行的计算非常耗时,这有可能导致事件队列达到容量限制并丢失事件。
因此必须把负载重的工作提交到其他线程中处理

对于Hazelcast中的所有事件,对于给定的key,可以保证事件生成的顺序和事件发布的顺序一致。
对于map和multimap来说,对于同一个key的操作顺序可以保证。
对于list,set,topic和queue,事件的顺序和操作的顺序一致。

from blog.

CharLemAznable avatar CharLemAznable commented on August 30, 2024

分布式计算

Executor Service

Hazelcast提供了IExecutorService用于在分布式环境中执行异步任务,并且实现了java.util.concurrent.ExecutorService接口,为需要计算和数据处理能力的应用程序提供服务。
因为任务都被分布式执行,因此任务必须支持序列化以支持在不同JVM之间通过网络传输。

实现Callable Task

在Hazelcast中,实现一个类似java.util.concurrent.Callable的任务需要实现两个接口:CallableSerializable

在Hazelcast中执行一个Callable Task的执行流程如下:

  • HazelcastInstance获取Executor
  • 提交task并拿到返回值Future
  • 当任务执行完成,使用Future对象获取结果。
实现Runnable Task

和Callable Task类似,Hazelcast中的Runnable Task需要实现两个接口:RunnableSerializable

在Hazelcast中执行Runnable Task只需要以下两步:

  • HazelcastInstance获取Executor
  • 将task提交到Executor
扩容Executor Service

垂直扩容:

  • pool-size
  • CPU
  • 内存

水平扩容:

  • 增加Hazelcast的数量
Executing Code in the Cluster

Hazelcast的IExecutorService允许在Hazelcast集群执行代码。
执行场景主要有:

  • 在指定member执行:IExecutorService#submitToMember
  • 在拥有指定key的member执行:IExecutorService#submitToKeyOwner
  • 随机选择一个member执行:IExecutorService#submit
  • 在所有member执行:IExecutorService#submitToAllMembers
取消任务

在Hazelcast中可以使用标准的Java APIFuture#cancel来停止或取消任务。

任务回调

可以使用Hazelcast提供的ExecutionCallback实现在任务完成时的异步回调。
任务完成且没有错误时回调,需要实现onResponse方法,任务完成但是有错误回调,需要实现onFailure方法。

class CallbackTask : Callable<String>, Serializable {
    override fun call(): String {
        TimeUnit.SECONDS.sleep(10)
        return "callback"
    }
}

class TaskCallback : ExecutionCallback<String> {
    override fun onFailure(t: Throwable?) {
        println("failure ${t?.printStackTrace()}")
    }

    override fun onResponse(response: String?) {
        println("response: $response")
    }
}

fun main() = runBlocking<Unit> {
    val instance = Hazelcast.newHazelcastInstance()
    val executor = instance.getExecutorService("default")
    executor.submit(CallbackTask(), TaskCallback())
}
为Task选择members

如果想控制执行任务的member,可以使用MemberSelector接口,该接口的select(Member)方法会在每一个member上执行已确认当前member是否可以执行任务。

Hazelcast提供了四个默认的MemberSelector实例:DATA_MEMBER_SELECTORLITE_MEMBER_SELECTORLOCAL_MEMBER_SELECTORNON_LOCAL_MEMBER_SELECTOR

配置Executor Service

Hazelcast支持对线程池大小,任务队列大小,是否开启统计和脑裂保护进行配置。
Hazelcast支持以配置文件或代码方式配置Executor Service。

脑裂保护

受脑裂保护的操作:

  • execute
  • executeOnAllMembers
  • executeOnKeyOwner
  • executeOnMember
  • executeOnMembers
  • shutdown
  • shutdownNow
  • submit
  • submitToAllMembers
  • submitToKeyOwner
  • submitToMember
  • submitToMembers

Durable Executor Service

Durable Executor Service是一种将任务存储在执行member及备份member,保证在member或任务提交者故障的场景中任务不丢失的数据结构。

执行任务的步骤:

  • 将任务发送到主分区member及备份member,然后执行任务。
  • 获取任务执行结果。

Hazelcast将任务存储在主备member之后然后执行任务,保证了在返回Future之前任务已经被执行,并且可以使用一个唯一的ID跟踪已提交任务的结果。

Hazelcast使用Ringbuffer存储任务并生成一个任务对应的序列返回调用端。
客户端从序列中获取任务的执行结果。如果结果已经可用,则将立即返回,否则将一直等待直到收到队列通知。
任务执行完成,Ringbuffer会用任务的结果替换序列中的任务,并通知等待在序列上的操作。

配置Durable Executor Service

Hazelcast支持以配置文件或代码方式配置Durable Executor Service。

配置各参数的含义如下:

  • name:executor的名字。
  • pool-size:executor线程池大小。
  • durability:任务备份数,默认值1。
  • capacity:Executor 的队列大小。
  • split-brain-protection-ref:脑裂保护配置的名字。
脑裂保护

支持脑裂保护的操作:

  • disposeResult
  • execute
  • executeOnKeyOwner
  • retrieveAndDisposeResult
  • shutdown
  • shutdownNow
  • submit
  • submitToKeyOwner
  • retrieveResult

Scheduled Executor Service

Hazelcast的IScheduledExecutorService是部分实现了java.util.concurrent.ScheduledExecutorService接口的数据结构。
部分实现指的是IScheduledExecutorService仅支持以_固定速率_执行任务而不支持以_固定间隔_执行任务。

IScheduledExecutorService除去支持通用的调度方法,还支持以下额外的方法:

  • scheduleOnMember:在一个特定的member执行。
  • scheduleOnKeyOwner:拥有key的member执行。
  • scheduleOnAllMembers:在集群所有member执行。
  • scheduleOnAllMembers:在所有给定的member上执行。

有两种可用调度模式:

  • 可用分区调度。任务存储在主分区和N个备份分区。在member故障场景,会有一个或多个备份接管任务,这可能会导致任务延迟。
  • 可用member调度。任务只存储在member上,如果member故障任务会丢失。

实现NamedTask可以自定义任务名,如果未定义则使用一个随机的UUID作为名字。
通过返回的IScheduleFuture可以获取任务的handler和任务的运行时统计信息。
每一个任务都有一个Future与之关联,任务完成可以调用Future#dispose方法释放任务占用的资源,或取消即将执行的任务。
任务handler是一个用来存储IScheduleFuture信息的描述类,包括任务的名字、任务的所有者和scheduler的名字,使用这些信息可以定位集群中的任务。
Scheduled Executor Service支持有状态任务的调度。任务状态必须与任务一起持久化。可以通过实现StatefulTask接口来创建一个有状态任务,实现还需要提供存储和加载状态的方法。如果分区丢失,任务被重新调度并在执行之前需要重新加载之前存储的状态。

配置Scheduled Executor Service

Hazelcast支持以配置文件或代码方式配置Scheduled Executor Service。

配置各参数的含义如下:

  • name:executor的名字。
  • pool-size:executor线程池大小。
  • durability:executor的备份数。
  • capacity:Executor 的队列大小。
  • split-brain-protection-ref:脑裂保护配置的名字。
脑裂保护

支持脑裂保护的操作:

  • schedule
  • scheduleAtFixedRate
  • scheduleOnAllMembers
  • scheduleOnAllMembersAtFixedRate
  • scheduleOnKeyOwner
  • scheduleOnKeyOwnerAtFixedRate
  • scheduleOnMember
  • scheduleOnMemberAtFixedRate
  • scheduleOnMembers
  • scheduleOnMembersAtFixedRate
  • shutdown
  • getAllScheduledFutures

Entry Processor

Entry Processor是在map entry上原子执行函数的一种方法。
IMap上执行批量处理,应该考虑一下使用entry processor。entry processor在存储数据的member上执行数据的读写操作,避免了不必要的两次网络请求:获取值和修改值并写回。
entry processor可以内存中执行更快的IMap操作而无需担心并发问题。
entry processor选择entry时支持使用谓词。
使用entry processor无需显示的锁操作,Hazelcast将entry processor分发到各个成员执行,因此增加成员会提高处理效率。
entry processor支持索引,如果map已经创建索引,entry processor会自动使用索引。
如果entry processor是map的主要操作而且map包含复杂对象,应该考虑使用OJBECT的内存格式来减少序列化开销。

IMap接口提供了以下方法处理entry:

  • executeOnKey
  • executeOnKeys
  • submitToKey
  • executeOnEntries(EntryProcessor)
  • executeOnEntries(EntryProcessor, Predicate)

entry processor在处理单个key时会使用锁,包括executeOnKeysubmitToKey

通常情况下如果代码修改了数据,备份数据也需要修改。如果希望在备份节点执行不同的处理逻辑,需要覆写getBackupProcessor方法。方法返回一个在备节点执行的EntryProcessor实例。方法可以返回null,以表示在备节点不执行任何操作。

创建Entry Processor

实现EntryProcessor接口,主备entry都会执行process方法。

Entry Processor性能优化

entry processor是以假定用户的代码可以在process方法内快速执行为前提进行设计。

Hazelcast提供了慢代码检测器,并可以根据下面的配置记录告警日志:

  • hazelcast.slow.operation.detector.enabled (默认值true)
  • hazelcast.slow.operation.detector.threshold.millis (默认值: 10000)
    在开发过程中应该将值设置的更低以检测在生成环境中可能成为瓶颈的processor。

Hazelcast提供的优化建议:

  • Offloadable 使用executor线程而不是分区线程执行。
  • ReadOnly 避免获取key的锁。
Offloadable Entry Processor

如果一个entry processor实现了Offloadable接口,process方法的执行将由getExecutorName获取的executor执行。
Offloading不阻塞分区线程,在处理期间key将被锁住以避免写冲突。
线程之间的合作关系如下:

  • partition thread获取entry并锁住key。
  • execution thread执行processor的process方法。
  • partition thread更新值并解锁key,如果值没有更新则只解锁key。

Hazelcast已经提供了Offloadable接口的两个实现:

  • NO_OFFLOADING:和没有实现Offloadable接口一样。
  • OFFLOADABLE_EXECUTOR:使用默认的ExecutionService.OFFLOADABLE_EXECUTOR

如果getExecutorName无法找到executor,将会使用默认的executor:

<hazelcast>
    ...
    <executor-service name="default">
        <pool-size>16</pool-size>
        <queue-capacity>0</queue-capacity>
    </executor-service>
    ...
</hazelcast>
只读Entry Processor

默认如果一个key上有锁entry processor则不会执行,直到key上的锁被释放为止。但是如果entry processor实现了Readonly接口,entry processor则不会尝试做任何修改,即不会感知key上是否有锁,也不会尝试获取key的锁。如果尝试修改entry,会抛出 UnsupportedOperationException异常。

如果一个entry processor实现了Readonly接口但是并未实现Offloadable接口,entry的处理不会委托给其他executor。

如果一个entry processor实现了ReadOnlyOffloadable两个接口,processor的process方法会在自定义的executor中执行,而且不感知key上的锁也不尝试获取锁。
线程之间的合作关系如下:

  • partition thread获取entry。
  • execution thread处理entry。
    在此情况下EntryProcessor#getBackupProcessor必须返回null否则会抛出IllegalArgumentException异常,修改entry会抛出UnsupportedOperationException异常。

from blog.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.