3

Hazelcast服务及注册中心实现

 2 years ago
source link: https://perkins4j2.github.io/posts/5377900/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

分区和复制

//backup
new MapConfig()
.setBackupCount(0)
.setAsyncBackupCount(1)
.setReadBackupData(true)

默认情况是同步复制一份,以上为异步复制1份,也可同步+异步。

  • 分区采用一致性Hash算法,便于节点变更复制
  • 计算分区,MOD(hash result, partition count) ,其中将key取byte数组,再取hash值,然后对分区默认271数取模,则为分区位置
  • 如果不指定备份数,则没有复制,节点下线后会丢失数据
  • 若有备份,则节点数据对等,单节点宕机,备份会成为查询节点
  • 若有备份,新增节点时,分区数据因一致性算法则最小化复制扩容
  • 读写操作时,会根据key将请求分发至该分区
  • 区分表由最早启动的节点告知其他节点,包括后续的变更通知以便重新分区,通知周期默认15s
  • 同步复制可防止脏数据,性能不如异步
  • 备份数据也可提供读,节省查询其他节点的分区的网络开销,但可能存在脏数据,且不计入主分区的命中次数,也不影响空闲过期时间
  • 通过MapStore实现单个、批量的读写持久化数据,MapStore继承了LoadStore。
  • 持久化应是中心化数据库,因为多个节点情况下,查询执行在主分区上,导致不同节点的数据可能不一致;在节点变化时,主分区变更至其他节点,则单点持久化数据失效。

    读时持久化

    如果配置和开启MapStore, IMap.get分区查询内存数据不存在时,调用MapStore.load查询并加载到内存;但不会新增备份。

    写时持久化

    write-delay-seconds为0时,map.put则MapStore.store同步写入数据至持久化,且可同步备份。

    写后持久化

  • write-delay-seconds大于0,map.put异步延迟持久化数据。
  • 该数据会标记为脏数据,存入队列中,直至被持久化。
  • 当write-coalescing即保留所有变更时,需要设置hazelcast.map.write.behind.queue.capacity以防脏数据在内存中过多,导致OOM。

Map及NearCache配置

Config config = new Config();

config.setClusterName("cluster");

//Network
config.setNetworkConfig(new NetworkConfig()
.setJoin(new JoinConfig()
.setMulticastConfig(
new MulticastConfig()
.setEnabled(false))
.setEurekaConfig(
new EurekaConfig()
.setEnabled(true)
.setProperty("self-registration", "true")
.setProperty("namespace", "hazelcast"))
)
);

//Serialize
config.setSerializationConfig(
new SerializationConfig()
.addDataSerializableFactory(SerializableFactory.FACTORY_ID, new SerializableFactory())
);

//backpressure
config.setProperty("hazelcast.backpressure.enabled", "true");
config.setProperty("hazelcast.backpressure.max.concurrent.invocations.per.partition", "100");
config.setProperty("hazelcast.backpressure.syncwindow", "500");
config.setProperty("hazelcast.backpressure.backoff.timeout.millis", "60000");
config.setProperty("hazelcast.operation.backup.timeout.millis", "60000");
config.setProperty("hazelcast.client.max.concurrent.invocations", "200");

//thread 3*3(accept request+read data from other+write data to other)
config.setProperty("hazelcast.io.thread.count", "3");
config.setProperty("hazelcast.operation.thread.count", "100");

//slow invalidation
config.setProperty("hazelcast.slow.operation.detector.enabled", "true");
config.setProperty("hazelcast.slow.operation.detector.stacktrace.logging.enabled", "true");
config.setProperty("hazelcast.slow.operation.detector.log.purge.interval.seconds", "600");
config.setProperty("hazelcast.slow.operation.detector.log.retention.seconds", "600");
config.setProperty("hazelcast.slow.operation.detector.threshold.millis", "300");

//near cache invalidation
config.setProperty("hazelcast.map.invalidation.batch.enabled", "true");
config.setProperty("hazelcast.map.invalidation.batch.size", "100");
config.setProperty("hazelcast.map.invalidation.batchfrequency.seconds", "5");

//当前 write-coalescing is false即保留全部变更, set per-node maximum capacity
config.setProperty("hazelcast.map.write.behind.queue.capacity", "5000");

//map config
List<CacheConfig> list = new ArrayList<>();
list.add(new CacheConfig("cache", 900, 50000));

list.forEach(e -> {
config.addMapConfig(new MapConfig()
.setName(e.getName())
.setInMemoryFormat(InMemoryFormat.BINARY)
.setMetadataPolicy(MetadataPolicy.OFF)
//backup
.setBackupCount(0)
.setAsyncBackupCount(1)
//timeout
.setTimeToLiveSeconds(e.getTimeout())
.setMaxIdleSeconds(e.getTimeout() / 2)
//evict
.setEvictionConfig(new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LFU)
.setMaxSizePolicy(MaxSizePolicy.PER_NODE)
.setSize(e.getSize()))
//backup read
.setReadBackupData(true)
//partition
.setPartitioningStrategyConfig(new PartitioningStrategyConfig()
.setPartitioningStrategy(new StringPartitioningStrategy()))
//near cache
.setNearCacheConfig(new NearCacheConfig()
.setName(e.getName() + "_nc")
.setInMemoryFormat(InMemoryFormat.OBJECT)
.setCacheLocalEntries(true)
.setInvalidateOnChange(true)
.setLocalUpdatePolicy(NearCacheConfig.LocalUpdatePolicy.INVALIDATE)
.setMaxIdleSeconds(e.getTimeout() / 10)
.setEvictionConfig(new EvictionConfig()
.setEvictionPolicy(EvictionPolicy.LFU)
.setMaxSizePolicy(MaxSizePolicy.ENTRY_COUNT)
.setSize(e.getSize() / 100))
)
//map store
.setMapStoreConfig(new MapStoreConfig()
.setEnabled(true)
//保留最新,而不是全部变更
.setWriteCoalescing(true)
)
);


});

return Hazelcast.newHazelcastInstance(config);

Eureka注册中心

config.setNetworkConfig(new NetworkConfig()
.setJoin(new JoinConfig()
.setMulticastConfig(
new MulticastConfig()
.setEnabled(false))
.setEurekaConfig(
new EurekaConfig()
.setEnabled(true)
.setProperty("self-registration", "true")
.setProperty("namespace", "hazelcast"))
)
);

eureka-client.properties,默认配置名称,路径在classpath下。

hazelcast.shouldUseDns=false
hazelcast.name=hz-instance
hazelcast.serviceUrl.default=,http://192.168.1.100:8000/eureka/

以上可实现复用eureka配置中心,实现各hazelcast节点同步,无需广播。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK