序
本文主要研究一下sentinel的DataSource
DataSource
sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/DataSource.java
public interface DataSource{ /** * Load data data source as the target type. * * @return the target data. * @throws Exception */ T loadConfig() throws Exception; /** * Read original data from the data source. * * @return the original data. * @throws Exception */ S readSource() throws Exception; /** * Get {@link SentinelProperty} of the data source. * * @return the property. */ SentinelPropertygetProperty(); /** * Write the {@code values} to the data source. * * @param values * @throws Exception */ void writeDataSource(T values) throws Exception; /** * Close the data source. * * @throws Exception */ void close() throws Exception;}复制代码
- 定义了loadConfig、readSource、writeDataSource等方法
- 有个抽象子类AbstractDataSource
AbstractDataSource
sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AbstractDataSource.java
public abstract class AbstractDataSourceimplements DataSource{ protected final ConfigParserparser; protected final SentinelPropertyproperty; public AbstractDataSource(ConfigParser parser) { if (parser == null) { throw new IllegalArgumentException("parser can't be null"); } this.parser = parser; this.property = new DynamicSentinelProperty(); } @Override public T loadConfig() throws Exception { S readValue = readSource(); T value = parser.parse(readValue); return value; } public T loadConfig(S conf) throws Exception { T value = parser.parse(conf); return value; } @Override public SentinelProperty getProperty() { return property; } @Override public void writeDataSource(T values) throws Exception { throw new UnsupportedOperationException(); }}复制代码
- 定义了ConfigParser属性,使用它来解析数据源
- 它有一个抽象子类为AutoRefreshDataSource
AutoRefreshDataSource
sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/AutoRefreshDataSource.java
/** * A {@link DataSource} automatically fetches the backend data. * * @paramsource data type * @paramtarget data type * @author Carpenter Lee */public abstract class AutoRefreshDataSource extends AbstractDataSource{ private ScheduledExecutorService service; protected long recommendRefreshMs = 3000; public AutoRefreshDataSource(ConfigParserconfigParser) { super(configParser); startTimerService(); } public AutoRefreshDataSource(ConfigParserconfigParser, final long recommendRefreshMs) { super(configParser); if (recommendRefreshMs <= 0) { throw new IllegalArgumentException("recommendRefreshMs must > 0, but " + recommendRefreshMs + " get"); } this.recommendRefreshMs = recommendRefreshMs; startTimerService(); } private void startTimerService() { service = Executors.newScheduledThreadPool(1, new NamedThreadFactory("sentinel-datasource-auto-refresh-task", true)); service.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { T newValue = loadConfig(); getProperty().updateValue(newValue); } catch (Throwable e) { RecordLog.info("loadConfig exception", e); } } }, recommendRefreshMs, recommendRefreshMs, TimeUnit.MILLISECONDS); } @Override public void close() throws Exception { if (service != null) { service.shutdownNow(); service = null; } }}复制代码
- 创建了ScheduledExecutorService,然后定时调度执行loadConfig方法,然后将获取的数据更新到property
- 它有一个子类为FileRefreshableDataSource
FileRefreshableDataSource
sentinel-datasource-extension-0.1.1-sources.jar!/com/alibaba/csp/sentinel/datasource/FileRefreshableDataSource.java
/** ** A {@link DataSource} based on file. This class will automatically fetches the backend file every 3 seconds. *
** Limitations: default read buffer size is 1MB, if file size is greater than buffer size, exceeding bytes will * be ignored. Default charset is UTF8. *
* * @author Carpenter Lee */public class FileRefreshableDataSourceextends AutoRefreshDataSource { private static final int MAX_SIZE = 1024 * 1024 * 4; private static final long DEFAULT_REFRESH_MS = 3000; private static final int DEFAULT_BUF_SIZE = 1024 * 1024; private static final Charset DEFAULT_CHAR_SET = Charset.forName("utf-8"); private byte[] buf; private Charset charset; private File file; /** * Create a file based {@link DataSource} whose read buffer size is 1MB, charset is UTF8, * and read interval is 3 seconds. * * @param file the file to read. * @param configParser the config parser. */ public FileRefreshableDataSource(File file, ConfigParser configParser) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); } public FileRefreshableDataSource(String fileName, ConfigParser configParser) throws FileNotFoundException { this(new File(fileName), configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, DEFAULT_CHAR_SET); //System.out.println(file.getAbsoluteFile()); } public FileRefreshableDataSource(File file, ConfigParser configParser, int bufSize) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, bufSize, DEFAULT_CHAR_SET); } public FileRefreshableDataSource(File file, ConfigParser configParser, Charset charset) throws FileNotFoundException { this(file, configParser, DEFAULT_REFRESH_MS, DEFAULT_BUF_SIZE, charset); } public FileRefreshableDataSource(File file, ConfigParser configParser, long recommendRefreshMs, int bufSize, Charset charset) throws FileNotFoundException { super(configParser, recommendRefreshMs); if (bufSize <= 0 || bufSize > MAX_SIZE) { throw new IllegalArgumentException("bufSize must between (0, " + MAX_SIZE + "], but " + bufSize + " get"); } if (file == null) { throw new IllegalArgumentException("file can't be null"); } if (charset == null) { throw new IllegalArgumentException("charset can't be null"); } this.buf = new byte[bufSize]; this.file = file; 
this.charset = charset; firstLoad(); } private void firstLoad() { try { T newValue = loadConfig(); getProperty().updateValue(newValue); } catch (Throwable e) { RecordLog.info("loadConfig exception", e); } } @Override public String readSource() throws Exception { FileInputStream inputStream = null; try { inputStream = new FileInputStream(file); FileChannel channel = inputStream.getChannel(); if (channel.size() > buf.length) { throw new RuntimeException(file.getAbsolutePath() + " file size=" + channel.size() + ", is bigger than bufSize=" + buf.length + ". Can't read"); } int len = inputStream.read(buf); return new String(buf, 0, len, charset); } finally { if (inputStream != null) { try { inputStream.close(); } catch (Exception ignore) { } } } } @Override public void close() throws Exception { super.close(); buf = null; } @Override public void writeDataSource(T values) throws Exception { throw new UnsupportedOperationException(); }}复制代码
- 从文件读取数据,但是writeDataSource目前还不支持
小结
- sentinel-datasource-extension默认提供FileRefreshableDataSource,另外有zookeeper、nacos、apollo的扩展实现。
- 如果要自己扩展的话,使用拉模式直接继承AutoRefreshDataSource实现readSource();推模式的话直接继承AbstractDataSource,自己构造监听方法实现readSource()