eth-monitor

Monitor and cache ethereum transactions with match filters
git clone git://git.defalsify.org/eth-monitor.git
Info | Log | Files | Refs | README | LICENSE

settings.py (15166B)


      1 # standard imports
      2 import logging
      3 import os
      4 import uuid
      5 import importlib
      6 import tempfile
      7 
      8 # external imports
      9 from chainlib.settings import ChainSettings
     10 from chainlib.eth.connection import EthHTTPConnection
     11 from chainsyncer.settings import *
     12 from eth_monitor.chain import EthChainInterface
     13 from chainlib.eth.address import is_address
     14 from eth_cache.rpc import CacheRPC
     15 from eth_cache.store.file import FileStore
     16 from chainsyncer.settings import process_sync_range
     17 
     18 
     19 # local imports
     20 from eth_monitor.rules import (
     21         AddressRules,
     22         RuleSimple,
     23         RuleMethod,
     24         RuleData,
     25         RuleHash,
     26         )
     27 from eth_monitor.cli.rules import to_config_names
     28 from eth_monitor.callback import (
     29         state_change_callback,
     30         filter_change_callback,
     31         BlockCallbackFilter,
     32         )
     33 from eth_monitor.filters import RuledFilter
     34 from eth_monitor.filters.cache import Filter as CacheFilter
     35 from eth_monitor.config import override, list_from_prefix
     36 from eth_monitor.filters.out import OutFilter
     37 from eth_monitor.filters.block import Filter as BlockFilter
     38 from eth_monitor.filters.run import Filter as RunFilter
     39 from eth_monitor.cache import from_cache_spec
     40 
     41 logg = logging.getLogger(__name__)
     42 
     43 
     44 def process_monitor_session(settings, config):
     45     session_id = config.get('_SESSION_ID')
     46     if session_id == None:
     47         if config.get('_SINGLE'):
     48             session_id = str(uuid.uuid4())
     49         else:
     50             session_id = 'default'
     51     
     52     settings.set('SESSION_ID', session_id)
     53     settings.set('SESSION_OK', True)
     54     return settings
     55 
     56 
     57 def process_monitor_rundir(settings, config):
     58     settings.set('RUN_OUT', False)
     59     if config.get('_RUN_DIR') == None:
     60         return settings
     61 
     62     run_dir = config.get('_RUN_DIR')
     63     try:
     64         os.makedirs(run_dir, exist_ok=True)
     65     except Exception as e:
     66         logg.error('could not create run dir, deactivating run output: ' + str(e))
     67         return settings
     68  
     69     lockfile = os.path.join(run_dir, '.lock')
     70     try:
     71         f = open(lockfile, 'x')
     72         f.close()
     73     except FileExistsError:
     74         logg.error('run dir {} is already in use, deactivating run output'.format(run_dir))
     75         return settings
     76   
     77     settings.set('RUN_OUT', True)
     78     settings.set('RUN_DIR', run_dir)
     79     return settings
     80 
     81 
     82 def process_monitor_session_dir(settings, config):
     83     syncer_store_module = None
     84     syncer_store_class = None
     85     sync_store = None
     86     session_id = settings.get('SESSION_ID')
     87     state_dir = None
     88     if config.get('SYNCER_BACKEND') == 'mem':
     89         syncer_store_module = importlib.import_module('chainsyncer.store.mem')
     90         syncer_store_class = getattr(syncer_store_module, 'SyncMemStore')
     91         sync_store = syncer_store_class(
     92             session_id=session_id,
     93             state_event_callback=state_change_callback,
     94             filter_state_event_callback=filter_change_callback,
     95             )
     96 
     97     else:
     98         if config.get('SYNCER_BACKEND') == 'fs': 
     99             syncer_store_module = importlib.import_module('chainsyncer.store.fs')
    100             syncer_store_class = getattr(syncer_store_module, 'SyncFsStore')
    101         elif config.get('SYNCER_BACKEND') == 'rocksdb':
    102             syncer_store_module = importlib.import_module('chainsyncer.store.rocksdb')
    103             syncer_store_class = getattr(syncer_store_module, 'SyncRocksDbStore')
    104         else:
    105             syncer_store_module = importlib.import_module(config.get('SYNCER_BACKEND'))
    106             syncer_store_class = getattr(syncer_store_module, 'SyncStore')
    107         state_dir = os.path.join(config.get('ETHMONITOR_STATE_DIR'), config.get('SYNCER_BACKEND'))
    108         os.makedirs(state_dir, exist_ok=True)
    109         session_dir = os.path.join(state_dir, session_id)
    110         sync_store = syncer_store_class(
    111                 session_dir,
    112                 session_id=session_id,
    113                 state_event_callback=state_change_callback,
    114                 filter_state_event_callback=filter_change_callback,
    115                 )
    116         settings.set('SESSION_DIR', session_dir)
    117 
    118     logg.info('using engine {} moduleĀ {}.{}'.format(config.get('SYNCER_BACKEND'), syncer_store_module.__file__, syncer_store_class.__name__))
    119 
    120     settings.set('STATE_DIR', state_dir)
    121     settings.set('SYNC_STORE', sync_store)
    122 
    123     return settings
    124 
    125 
    126 def process_address_arg_rules(settings, config):
    127     rules = settings.get('RULES')
    128     category = {
    129         'input': {
    130             'i': [],
    131             'x': [],
    132             },
    133         'output': {
    134             'i': [],
    135             'x': [],
    136             },
    137         'exec':  {
    138             'i': [],
    139             'x': [],
    140             },
    141         'hash': {
    142             'i': [],
    143             'x': [],
    144             },
    145         }
    146     for rules_arg in [
    147             'input',
    148             'output',
    149             'exec',
    150             ]:
    151         (vy, vn) = to_config_names(rules_arg)
    152         for address in config.get(vy):
    153             if not is_address(address):
    154                 raise ValueError('invalid address in config {}: {}'.format(vy, address))
    155             category[rules_arg]['i'].append(address)
    156         for address in config.get(vn):
    157             if not is_address(address):
    158                 raise ValueError('invalid address in config {}: {}'.format(vn, address))
    159             category[rules_arg]['x'].append(address)
    160 
    161     includes = RuleSimple(
    162             category['output']['i'],
    163             category['input']['i'],
    164             category['exec']['i'],
    165             description='INCLUDE',
    166             match_all=settings.get('MATCH_ALL'),
    167             )
    168     rules.include(includes)
    169 
    170     excludes = RuleSimple(
    171             category['output']['x'],
    172             category['input']['x'],
    173             category['exec']['x'],
    174             description='EXCLUDE',
    175             )
    176     rules.exclude(excludes)
    177 
    178     return settings
    179 
    180 
    181 def process_data_arg_rules(settings, config):
    182     rules = settings.get('RULES')
    183 
    184     include_data = []
    185     for v in config.get('ETHMONITOR_DATA'):
    186         include_data.append(v.lower())
    187     exclude_data = []
    188     for v in config.get('ETHMONITOR_X_DATA'):
    189         exclude_data.append(v.lower())
    190 
    191     includes = RuleMethod(include_data, description='INCLUDE')
    192     rules.include(includes)
    193    
    194     excludes = RuleMethod(exclude_data, description='EXCLUDE')
    195     rules.exclude(excludes)
    196 
    197     include_data = []
    198     for v in config.get('ETHMONITOR_DATA_IN'):
    199         include_data.append(v.lower())
    200     exclude_data = []
    201     for v in config.get('ETHMONITOR_X_DATA_IN'):
    202         exclude_data.append(v.lower())
    203 
    204     includes = RuleData(include_data, description='INCLUDE', match_all=settings.get('MATCH_ALL'))
    205     rules.include(includes)
    206    
    207     excludes = RuleData(exclude_data, description='EXCLUDE')
    208     rules.exclude(excludes)
    209 
    210     return settings
    211 
    212 
    213 def process_tx_hash_rules(settings, config):
    214     rules = settings.get('RULES')
    215 
    216     include_tx = []
    217     for v in config.get('ETHMONITOR_TXHASH'):
    218         include_tx.append(v.lower())
    219     exclude_tx = []
    220     for v in config.get('ETHMONITOR_X_TXHASH'):
    221         exclude_tx.append(v.lower())
    222 
    223     includes = RuleHash(include_tx, description='INCLUDE')
    224     rules.include(includes)
    225    
    226     excludes = RuleHash(exclude_tx, description='EXCLUDE')
    227     rules.exclude(excludes)
    228 
    229     return settings
    230 
    231 
    232 def process_address_file_rules(settings, config): #rules, includes_file=None, excludes_file=None, include_default=False, include_block_default=False):
    233     rules = settings.get('RULES')
    234     includes_file = config.get('ETHMONITOR_INCLUDES_FILE')
    235     if includes_file != None:
    236         f = open(includes_file, 'r')
    237         logg.debug('reading includes rules from {}'.format(os.path.realpath(includes_file)))
    238         while True:
    239             r = f.readline()
    240             if r == '':
    241                 break
    242             r = r.rstrip()
    243             v = r.split("\t")
    244 
    245             sender = []
    246             recipient = []
    247             executable = []
    248 
    249             try:
    250                 if v[0] != '':
    251                     sender = v[0].split(',')
    252             except IndexError:
    253                 pass
    254 
    255             try:
    256                 if v[1] != '':
    257                     recipient = v[1].split(',')
    258             except IndexError:
    259                 pass
    260 
    261             try:
    262                 if v[2] != '':
    263                     executable = v[2].split(',')
    264             except IndexError:
    265                 pass
    266 
    267             rule = RuleSimple(sender, recipient, executable, match_all=settings.get('MATCH_ALL'))
    268             rules.include(rule)
    269 
    270     excludes_file = config.get('ETHMONITOR_EXCLUDES_FILE')
    271     if excludes_file != None:
    272         f = open(includes_file, 'r')
    273         logg.debug('reading excludes rules from {}'.format(os.path.realpath(excludes_file)))
    274         while True:
    275             r = f.readline()
    276             if r == '':
    277                 break
    278             r = r.rstrip()
    279             v = r.split("\t")
    280 
    281             sender = None
    282             recipient = None
    283             executable = None
    284 
    285             if v[0] != '':
    286                 sender = v[0].strip(',')
    287             if v[1] != '':
    288                 recipient = v[1].strip(',')
    289             if v[2] != '':
    290                 executable = v[2].strip(',')
    291 
    292             rule = RuleSimple(sender, recipient, executable)
    293             rules.exclude(rule)
    294     return settings
    295 
    296 
    297 def process_arg_rules(settings, config):
    298     address_rules = AddressRules(include_by_default=config.get('ETHMONITOR_INCLUDE_DEFAULT'))
    299     settings.set('MATCH_ALL', config.true('ETHMONITOR_MATCH_ALL'))
    300     settings.set('RULES', address_rules)
    301     settings = process_address_arg_rules(settings, config)
    302     settings = process_data_arg_rules(settings, config)
    303     settings = process_address_file_rules(settings, config)
    304     settings = process_tx_hash_rules(settings, config)
    305     return settings
    306 
    307 
    308 def process_cache_store(settings, config):
    309     cache_spec = config.get('_CACHE_SPEC')
    310     store = from_cache_spec(settings.get('CHAIN_SPEC'), cache_spec)
    311     if store == None:
    312         cache_dir = config.get('_CACHE_DIR')
    313         if cache_dir == None:
    314             logg.warning('no cache dir specified, will discard everything!!')
    315             from eth_cache.store.null import NullStore
    316             store = NullStore()
    317         else:
    318             store = FileStore(settings.get('CHAIN_SPEC'), cache_dir)
    319             cache_dir = os.path.realpath(cache_dir)
    320             if cache_dir == None:
    321                 import tempfile
    322                 cache_dir = tempfile.mkdtemp()
    323 
    324     logg.info('using cache store {}'.format(store))
    325     settings.set('CACHE_STORE', store)
    326 
    327     return settings
    328 
    329 
    330 def process_cache_filter(settings, config):
    331     cache_store = settings.get('CACHE_STORE')
    332     cache_rules = AddressRules(include_by_default=True)
    333     if str(cache_store) != 'Nullstore':
    334         cache_rules = settings.o['RULES'] 
    335     fltr = CacheFilter(cache_store, rules_filter=cache_rules, include_tx_data=config.true('ETHCACHE_STORE_TX'))
    336     sync_store = settings.get('SYNC_STORE')
    337     sync_store.register(fltr)
    338     
    339     fltr = BlockFilter(cache_store, include_block_data=config.true('ETHCACHE_STORE_BLOCK'))
    340     hndlr = settings.get('BLOCK_HANDLER')
    341     hndlr.register(fltr)
    342     
    343     return settings
    344 
    345 
    346 def process_run_filter(settings, config):
    347     if not settings.get('RUN_OUT'):
    348         return settings
    349     fltr = RunFilter(settings.get('RUN_DIR'))
    350     hndlr = settings.get('BLOCK_HANDLER')
    351     hndlr.register(fltr)
    352     return settings
    353 
    354 def process_tx_filter(settings, config):
    355     for fltr in list_from_prefix(config, 'filter'):
    356         m = importlib.import_module(fltr)
    357         fltr_object = m.Filter(rules_filter=settings.get('RULES'))
    358         store = settings.get('SYNC_STORE')
    359         store.register(fltr_object)
    360         logg.info('using filter module {}'.format(fltr))
    361     return settings
    362 
    363 
    364 def process_block_filter(settings, config):
    365     block_filter_handler = BlockCallbackFilter()
    366     for block_filter in list_from_prefix(config, 'block_filter'):
    367         m = importlib.import_module(block_filter)
    368         block_filter_handler.register(m)
    369         logg.info('using block filter module {}'.format(block_filter))
    370 
    371     settings.set('BLOCK_HANDLER', block_filter_handler)
    372     return settings
    373 
    374 
    375 def process_out_filter(settings, config):
    376     out_filter = OutFilter(
    377         settings.o['CHAIN_SPEC'],
    378         rules_filter=settings.o['RULES'],
    379         renderers=settings.o['RENDERER'],
    380         )
    381     store = settings.get('SYNC_STORE')
    382     store.register(out_filter)
    383     return settings
    384 
    385 
    386 def process_arg_filter(settings, config):
    387     store = settings.get('SYNC_STORE')
    388     if config.get('ETHMONITOR_FILTER') != None:
    389         for k in config.get('ETHMONITOR_FILTER'):
    390             m = importlib.import_module(k)
    391             fltr = m.Filter()
    392             store.register(fltr)
    393     return settings
    394 
    395 
    396 def process_filter(settings, config):
    397     settings.set('FILTER', [])
    398     settings = process_renderer(settings, config)
    399     settings = process_block_filter(settings, config)
    400     settings = process_cache_filter(settings, config)
    401     settings = process_run_filter(settings, config)
    402     settings = process_tx_filter(settings, config)
    403     settings = process_out_filter(settings, config)
    404     settings = process_arg_filter(settings, config)
    405     return settings
    406 
    407 
    408 def process_renderer(settings, config):
    409     renderers_mods = []
    410     for renderer in config.get('ETHMONITOR_RENDERER'):
    411         m = importlib.import_module(renderer)
    412         renderers_mods.append(m)
    413         logg.info('using renderer module {}'.format(renderer))
    414     settings.set('RENDERER', renderers_mods)
    415     return settings
    416 
    417 
    418 def process_cache_rpc(settings, config):
    419     if str(settings.get('CACHE_STORE')) == 'Nullstore':
    420         logg.debug('cache store is null, cache rpc proxy will be deactivated')
    421         return settings
    422     if not config.true('_FRESH'):
    423         rpc = CacheRPC(settings.get('CONN'), settings.get('CACHE_STORE'))
    424         settings.set('CONN', rpc)
    425     return settings
    426 
    427 
    428 def process_sync(settings, config):
    429     dialect_filter = settings.get('RPC_DIALECT_FILTER')
    430     settings.set('SYNCER_INTERFACE', EthChainInterface(dialect_filter=dialect_filter, batch_limit=settings.get('RPC_BATCH_LIMIT')))
    431     settings = process_sync_range(settings, config)
    432     return settings
    433 
    434 
    435 def process_cache(settings, config):
    436     settings = process_cache_store(settings, config)
    437     settings = process_cache_rpc(settings, config)
    438     return settings
    439 
    440 
    441 def process_user_context(settings, config):
    442     ctx_usr = {}
    443     ks = config.get('ETHMONITOR_CONTEXT_KEY')
    444     if ks != None:
    445         for kv in ks:
    446             (k, v) = kv.split('=', 1)
    447             ctx_usr[k] = v
    448     ctx = {
    449         'driver': 'eth-monitor',
    450         'rundir': settings.get('RUN_DIR'),
    451         'usr': ctx_usr,
    452             }
    453     settings.set('SYNCER_CONTEXT', ctx)
    454     return settings
    455 
    456 
    457 def process_settings(settings, config):
    458     settings = process_monitor_session(settings, config)
    459     settings = process_monitor_session_dir(settings, config)
    460     settings = process_monitor_rundir(settings, config)
    461     settings = process_arg_rules(settings, config)
    462     settings = process_sync(settings, config)
    463     settings = process_cache(settings, config)
    464     settings = process_user_context(settings, config)
    465     settings = process_filter(settings, config)
    466     return settings