settings.py (15166B)
# standard imports
import logging
import os
import uuid
import importlib
import tempfile

# external imports
from chainlib.settings import ChainSettings
from chainlib.eth.connection import EthHTTPConnection
from chainsyncer.settings import *
from eth_monitor.chain import EthChainInterface
from chainlib.eth.address import is_address
from eth_cache.rpc import CacheRPC
from eth_cache.store.file import FileStore
from chainsyncer.settings import process_sync_range


# local imports
from eth_monitor.rules import (
    AddressRules,
    RuleSimple,
    RuleMethod,
    RuleData,
    RuleHash,
)
from eth_monitor.cli.rules import to_config_names
from eth_monitor.callback import (
    state_change_callback,
    filter_change_callback,
    BlockCallbackFilter,
)
from eth_monitor.filters import RuledFilter
from eth_monitor.filters.cache import Filter as CacheFilter
from eth_monitor.config import override, list_from_prefix
from eth_monitor.filters.out import OutFilter
from eth_monitor.filters.block import Filter as BlockFilter
from eth_monitor.filters.run import Filter as RunFilter
from eth_monitor.cache import from_cache_spec

logg = logging.getLogger(__name__)


def process_monitor_session(settings, config):
    """Resolve the session id and record it in settings.

    An explicit ``_SESSION_ID`` config value is used as-is. Without one, a
    random uuid4 string is generated when ``_SINGLE`` is truthy, otherwise
    the literal 'default' is used.

    Sets ``SESSION_ID`` and ``SESSION_OK`` and returns the settings object.
    """
    session_id = config.get('_SESSION_ID')
    if session_id is None:
        if config.get('_SINGLE'):
            session_id = str(uuid.uuid4())
        else:
            session_id = 'default'

    settings.set('SESSION_ID', session_id)
    settings.set('SESSION_OK', True)
    return settings


def process_monitor_rundir(settings, config):
    """Claim the run directory named by ``_RUN_DIR``, if any.

    ``RUN_OUT`` is set to False up front and only flipped to True (together
    with ``RUN_DIR``) when the directory exists and its ``.lock`` file could
    be created exclusively. Failure to create the directory, or an already
    existing lock file, is logged and leaves run output deactivated.
    """
    settings.set('RUN_OUT', False)
    run_dir = config.get('_RUN_DIR')
    if run_dir is None:
        return settings

    try:
        os.makedirs(run_dir, exist_ok=True)
    except Exception as e:
        logg.error('could not create run dir, deactivating run output: ' + str(e))
        return settings

    lockfile = os.path.join(run_dir, '.lock')
    try:
        # mode 'x' fails with FileExistsError if the lock is already there
        with open(lockfile, 'x'):
            pass
    except FileExistsError:
        logg.error('run dir {} is already in use, deactivating run output'.format(run_dir))
        return settings

    settings.set('RUN_OUT', True)
    settings.set('RUN_DIR', run_dir)
    return settings


def process_monitor_session_dir(settings, config):
    """Instantiate the syncer state store selected by ``SYNCER_BACKEND``.

    'mem' uses ``chainsyncer.store.mem.SyncMemStore`` and needs no directory.
    'fs' and 'rocksdb' map to the corresponding chainsyncer store classes;
    any other value is imported as a module path that must expose a
    ``SyncStore`` class. Non-memory stores are created in
    ``<ETHMONITOR_STATE_DIR>/<backend>/<session id>``, and that path is
    stored as ``SESSION_DIR``.

    Sets ``STATE_DIR`` (None for the memory backend) and ``SYNC_STORE``.
    """
    backend = config.get('SYNCER_BACKEND')
    session_id = settings.get('SESSION_ID')
    state_dir = None

    if backend == 'mem':
        syncer_store_module = importlib.import_module('chainsyncer.store.mem')
        syncer_store_class = getattr(syncer_store_module, 'SyncMemStore')
        sync_store = syncer_store_class(
            session_id=session_id,
            state_event_callback=state_change_callback,
            filter_state_event_callback=filter_change_callback,
        )
    else:
        if backend == 'fs':
            syncer_store_module = importlib.import_module('chainsyncer.store.fs')
            syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
        elif backend == 'rocksdb':
            syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
            syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
        else:
            syncer_store_module = importlib.import_module(backend)
            syncer_store_class = getattr(syncer_store_module, 'SyncStore')

        state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), backend)
        os.makedirs(state_dir, exist_ok=True)
        session_dir = os.path.join(state_dir, session_id)
        sync_store = syncer_store_class(
            session_dir,
            session_id=session_id,
            state_event_callback=state_change_callback,
            filter_state_event_callback=filter_change_callback,
        )
        settings.set('SESSION_DIR', session_dir)

    logg.info('using engine {} module {}.{}'.format(backend, syncer_store_module.__file__, syncer_store_class.__name__))

    settings.set('STATE_DIR', state_dir)
    settings.set('SYNC_STORE', sync_store)

    return settings


def process_address_arg_rules(settings, config):
    """Build include/exclude address rules from the config address lists.

    For each of 'input', 'output' and 'exec', ``to_config_names`` yields the
    include and exclude config keys. Every address found under them is
    validated with ``is_address``.

    :raises ValueError: if any configured address fails validation.
    """
    rules = settings.get('RULES')
    category = {}
    for rules_arg in ('input', 'output', 'exec'):
        category[rules_arg] = {'i': [], 'x': []}
        (vy, vn) = to_config_names(rules_arg)
        for (config_key, bucket) in ((vy, 'i'), (vn, 'x')):
            for address in config.get(config_key):
                if not is_address(address):
                    raise ValueError('invalid address in config {}: {}'.format(config_key, address))
                category[rules_arg][bucket].append(address)

    # RuleSimple takes its address lists in the order output, input, exec
    includes = RuleSimple(
        category['output']['i'],
        category['input']['i'],
        category['exec']['i'],
        description='INCLUDE',
        match_all=settings.get('MATCH_ALL'),
    )
    rules.include(includes)

    excludes = RuleSimple(
        category['output']['x'],
        category['input']['x'],
        category['exec']['x'],
        description='EXCLUDE',
    )
    rules.exclude(excludes)

    return settings


def process_data_arg_rules(settings, config):
    """Register method and data rules from config.

    ``ETHMONITOR_DATA`` / ``ETHMONITOR_X_DATA`` become ``RuleMethod``
    include/exclude rules; ``ETHMONITOR_DATA_IN`` / ``ETHMONITOR_X_DATA_IN``
    become ``RuleData`` include/exclude rules. All values are lower-cased
    before being handed to the rule.
    """
    rules = settings.get('RULES')

    include_data = [v.lower() for v in config.get('ETHMONITOR_DATA')]
    exclude_data = [v.lower() for v in config.get('ETHMONITOR_X_DATA')]
    rules.include(RuleMethod(include_data, description='INCLUDE'))
    rules.exclude(RuleMethod(exclude_data, description='EXCLUDE'))

    include_data = [v.lower() for v in config.get('ETHMONITOR_DATA_IN')]
    exclude_data = [v.lower() for v in config.get('ETHMONITOR_X_DATA_IN')]
    rules.include(RuleData(include_data, description='INCLUDE', match_all=settings.get('MATCH_ALL')))
    rules.exclude(RuleData(exclude_data, description='EXCLUDE'))

    return settings


def process_tx_hash_rules(settings, config):
    """Register transaction hash include/exclude rules from config.

    Values from ``ETHMONITOR_TXHASH`` and ``ETHMONITOR_X_TXHASH`` are
    lower-cased and wrapped in ``RuleHash`` rules.
    """
    rules = settings.get('RULES')

    include_tx = [v.lower() for v in config.get('ETHMONITOR_TXHASH')]
    exclude_tx = [v.lower() for v in config.get('ETHMONITOR_X_TXHASH')]
    rules.include(RuleHash(include_tx, description='INCLUDE'))
    rules.exclude(RuleHash(exclude_tx, description='EXCLUDE'))

    return settings


def _parse_address_rule_line(line):
    """Split one rule file line into (sender, recipient, executable) lists.

    Columns are tab-separated and each column is a comma-separated address
    list. Missing or empty columns yield an empty list.
    """
    columns = line.rstrip().split("\t")
    parsed = []
    for i in range(3):
        if i < len(columns) and columns[i] != '':
            parsed.append(columns[i].split(','))
        else:
            parsed.append([])
    return tuple(parsed)


def process_address_file_rules(settings, config):
    """Load address rules from the optional includes and excludes files.

    Each line of ``ETHMONITOR_INCLUDES_FILE`` becomes an included
    ``RuleSimple`` (honouring ``MATCH_ALL``), and each line of
    ``ETHMONITOR_EXCLUDES_FILE`` becomes an excluded ``RuleSimple``. Both
    files share the same format and the same tolerant parsing: up to three
    tab-separated columns of comma-separated addresses.
    """
    rules = settings.get('RULES')

    includes_file = config.get('ETHMONITOR_INCLUDES_FILE')
    if includes_file is not None:
        with open(includes_file, 'r') as f:
            logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
            for line in f:
                (sender, recipient, executable) = _parse_address_rule_line(line)
                rule = RuleSimple(sender, recipient, executable, match_all=settings.get('MATCH_ALL'))
                rules.include(rule)

    excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
    if excludes_file is not None:
        # the excludes file itself must be read here, not the includes file
        with open(excludes_file, 'r') as f:
            logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
            for line in f:
                (sender, recipient, executable) = _parse_address_rule_line(line)
                rule = RuleSimple(sender, recipient, executable)
                rules.exclude(rule)

    return settings


def process_arg_rules(settings, config):
    """Create the rule set and populate it from all config rule sources.

    Sets ``MATCH_ALL`` and ``RULES`` before running the address argument,
    data, address file and tx hash rule processors, which all read them.
    """
    address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
    settings.set('MATCH_ALL', config.true('ETHMONITOR_MATCH_ALL'))
    settings.set('RULES', address_rules)
    settings = process_address_arg_rules(settings, config)
    settings = process_data_arg_rules(settings, config)
    settings = process_address_file_rules(settings, config)
    settings = process_tx_hash_rules(settings, config)
    return settings


def process_cache_store(settings, config):
    """Select the cache store and record it as ``CACHE_STORE``.

    A store resolved by ``from_cache_spec`` from ``_CACHE_SPEC`` takes
    precedence. Otherwise a ``FileStore`` on ``_CACHE_DIR`` is used, or a
    ``NullStore`` (with a warning) when no cache dir is configured.
    """
    cache_spec = config.get('_CACHE_SPEC')
    store = from_cache_spec(settings.get('CHAIN_SPEC'), cache_spec)
    if store is None:
        cache_dir = config.get('_CACHE_DIR')
        if cache_dir is None:
            logg.warning('no cache dir specified, will discard everything!!')
            from eth_cache.store.null import NullStore
            store = NullStore()
        else:
            store = FileStore(settings.get('CHAIN_SPEC'), cache_dir)

    logg.info('using cache store {}'.format(store))
    settings.set('CACHE_STORE', store)

    return settings


def process_cache_filter(settings, config):
    """Register the cache filters for transactions and blocks.

    The tx cache filter is registered on ``SYNC_STORE`` and the block cache
    filter on ``BLOCK_HANDLER``. The configured ``RULES`` are applied to the
    tx cache filter unless the store's string form is 'Nullstore', in which
    case an include-by-default rule set is used instead.
    """
    cache_store = settings.get('CACHE_STORE')
    cache_rules = AddressRules(include_by_default=True)
    if str(cache_store) != 'Nullstore':
        cache_rules = settings.o['RULES']
    fltr = CacheFilter(cache_store, rules_filter=cache_rules, include_tx_data=config.true('ETHCACHE_STORE_TX'))
    sync_store = settings.get('SYNC_STORE')
    sync_store.register(fltr)

    fltr = BlockFilter(cache_store, include_block_data=config.true('ETHCACHE_STORE_BLOCK'))
    hndlr = settings.get('BLOCK_HANDLER')
    hndlr.register(fltr)

    return settings


def process_run_filter(settings, config):
    """Register the run dir filter on ``BLOCK_HANDLER`` when ``RUN_OUT`` is set."""
    if not settings.get('RUN_OUT'):
        return settings
    fltr = RunFilter(settings.get('RUN_DIR'))
    hndlr = settings.get('BLOCK_HANDLER')
    hndlr.register(fltr)
    return settings


def process_tx_filter(settings, config):
    """Import each configured 'filter' module and register its ``Filter``.

    Every module listed by ``list_from_prefix(config, 'filter')`` must expose
    a ``Filter`` class accepting a ``rules_filter`` keyword argument.
    """
    for fltr in list_from_prefix(config, 'filter'):
        m = importlib.import_module(fltr)
        fltr_object = m.Filter(rules_filter=settings.get('RULES'))
        store = settings.get('SYNC_STORE')
        store.register(fltr_object)
        logg.info('using filter module {}'.format(fltr))
    return settings


def process_block_filter(settings, config):
    """Create the block callback handler and attach configured modules.

    Each module listed by ``list_from_prefix(config, 'block_filter')`` is
    imported and the module object itself is registered on the handler,
    which is then stored as ``BLOCK_HANDLER``.
    """
    block_filter_handler = BlockCallbackFilter()
    for block_filter in list_from_prefix(config, 'block_filter'):
        m = importlib.import_module(block_filter)
        block_filter_handler.register(m)
        logg.info('using block filter module {}'.format(block_filter))

    settings.set('BLOCK_HANDLER', block_filter_handler)
    return settings


def process_out_filter(settings, config):
    """Register the output filter on ``SYNC_STORE``.

    Requires ``CHAIN_SPEC``, ``RULES`` and ``RENDERER`` to be present in
    settings already; they are read by direct key access.
    """
    out_filter = OutFilter(
        settings.o['CHAIN_SPEC'],
        rules_filter=settings.o['RULES'],
        renderers=settings.o['RENDERER'],
    )
    store = settings.get('SYNC_STORE')
    store.register(out_filter)
    return settings


def process_arg_filter(settings, config):
    """Register filters from the modules listed in ``ETHMONITOR_FILTER``.

    Each module is imported and its ``Filter`` class instantiated without
    arguments.
    """
    store = settings.get('SYNC_STORE')
    filter_modules = config.get('ETHMONITOR_FILTER')
    if filter_modules is not None:
        for k in filter_modules:
            m = importlib.import_module(k)
            fltr = m.Filter()
            store.register(fltr)
    return settings


def process_filter(settings, config):
    """Set up renderers and all filters.

    Order matters: renderers are needed by the out filter, and the block
    handler must exist before the cache and run filters register on it.
    """
    settings.set('FILTER', [])
    settings = process_renderer(settings, config)
    settings = process_block_filter(settings, config)
    settings = process_cache_filter(settings, config)
    settings = process_run_filter(settings, config)
    settings = process_tx_filter(settings, config)
    settings = process_out_filter(settings, config)
    settings = process_arg_filter(settings, config)
    return settings


def process_renderer(settings, config):
    """Import the modules in ``ETHMONITOR_RENDERER`` and store them as ``RENDERER``."""
    renderers_mods = []
    for renderer in config.get('ETHMONITOR_RENDERER'):
        m = importlib.import_module(renderer)
        renderers_mods.append(m)
        logg.info('using renderer module {}'.format(renderer))
    settings.set('RENDERER', renderers_mods)
    return settings


def process_cache_rpc(settings, config):
    """Wrap the RPC connection in a ``CacheRPC`` proxy when caching is active.

    The proxy is skipped when the cache store's string form is 'Nullstore'
    or when ``_FRESH`` is set in config.
    """
    if str(settings.get('CACHE_STORE')) == 'Nullstore':
        logg.debug('cache store is null, cache rpc proxy will be deactivated')
        return settings
    if not config.true('_FRESH'):
        rpc = CacheRPC(settings.get('CONN'), settings.get('CACHE_STORE'))
        settings.set('CONN', rpc)
    return settings


def process_sync(settings, config):
    """Create the chain interface and resolve the sync range.

    Stores an ``EthChainInterface`` built from ``RPC_DIALECT_FILTER`` and
    ``RPC_BATCH_LIMIT`` as ``SYNCER_INTERFACE``, then delegates to
    chainsyncer's ``process_sync_range``.
    """
    dialect_filter = settings.get('RPC_DIALECT_FILTER')
    settings.set('SYNCER_INTERFACE', EthChainInterface(dialect_filter=dialect_filter, batch_limit=settings.get('RPC_BATCH_LIMIT')))
    settings = process_sync_range(settings, config)
    return settings


def process_cache(settings, config):
    """Set up the cache store, then the cache rpc proxy that depends on it."""
    settings = process_cache_store(settings, config)
    settings = process_cache_rpc(settings, config)
    return settings


def process_user_context(settings, config):
    """Build the syncer context dict and store it as ``SYNCER_CONTEXT``.

    Entries of ``ETHMONITOR_CONTEXT_KEY`` are ``key=value`` strings, split on
    the first '=' only, and end up under the 'usr' key of the context.

    :raises ValueError: if an entry contains no '='.
    """
    ctx_usr = {}
    ks = config.get('ETHMONITOR_CONTEXT_KEY')
    if ks is not None:
        for kv in ks:
            (k, v) = kv.split('=', 1)
            ctx_usr[k] = v
    ctx = {
        'driver': 'eth-monitor',
        'rundir': settings.get('RUN_DIR'),
        'usr': ctx_usr,
    }
    settings.set('SYNCER_CONTEXT', ctx)
    return settings


def process_settings(settings, config):
    """Run every settings processor in dependency order and return settings.

    Session and state store come first, rules before sync and cache, and
    filters last since they read the rules, stores and handlers set up by
    the earlier steps.
    """
    settings = process_monitor_session(settings, config)
    settings = process_monitor_session_dir(settings, config)
    settings = process_monitor_rundir(settings, config)
    settings = process_arg_rules(settings, config)
    settings = process_sync(settings, config)
    settings = process_cache(settings, config)
    settings = process_user_context(settings, config)
    settings = process_filter(settings, config)
    return settings