eth-monitor

Monitor and cache ethereum transactions with match filters
git clone git://git.defalsify.org/eth-monitor.git
Info | Log | Files | Refs | README | LICENSE

import.py (5735B)


      1 # standard imports
      2 import argparse
      3 import logging
      4 import sys
      5 import os
      6 import time
      7 
      8 # external imports
      9 from chainlib.encode import TxHexNormalizer
     10 from chainlib.eth.connection import EthHTTPConnection
     11 from chainlib.chain import ChainSpec
     12 from eth_cache.store.file import FileStore
     13 
     14 # local imports
     15 from eth_monitor.filters.cache import Filter as CacheFilter
     16 from eth_monitor.filters import RuledFilter
     17 from eth_monitor.rules import (
     18         AddressRules,
     19         RuleSimple,
     20         )
     21 
     22 logging.basicConfig(level=logging.WARNING)
     23 logg = logging.getLogger()
     24 
     25 normalize_address = TxHexNormalizer().wallet_address
     26 
     27 services = [
     28     'etherscan',
     29         ]
     30 
     31 argparser = argparse.ArgumentParser('master eth events monitor')
     32 argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from')
     33 argparser.add_argument('--cache-dir', dest='cache_dir', type=str, default='.eth-monitor/cache', help='Directory to store tx data')
     34 argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default')
     35 argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default')
     36 argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string')
     37 argparser.add_argument('--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file')
     38 argparser.add_argument('--list-services', dest='list', action='store_true', help='List all supported services')
     39 argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address')
     40 argparser.add_argument('--socks-host', dest='socks_host', type=str, help='Conect through socks host')
     41 argparser.add_argument('--socks-port', dest='socks_port', type=int, help='Conect through socks port')
     42 argparser.add_argument('--delay', type=float, default=0.2, help='Seconds to wait between each retrieval from importer')
     43 argparser.add_argument('-v', action='store_true', help='Be verbose')
     44 argparser.add_argument('-vv', action='store_true', help='Be more verbose')
     45 argparser.add_argument('-p', type=str, help='RPC provider')
     46 argparser.add_argument('service', nargs='?', type=str, help='Index service to import from')
     47 args = argparser.parse_args(sys.argv[1:])
     48 
     49 if args.list:
     50     for s in services:
     51         sys.stdout.write('{}\n'.format(s))
     52     sys.exit(0)
     53 
     54 if not args.service:
     55     argparser.error('the following arguments are required: service')
     56     sys.exit(1)
     57 
     58 if args.vv:
     59     logg.setLevel(logging.DEBUG)
     60 elif args.v:
     61     logg.setLevel(logging.INFO)
     62 
     63 api_key = os.environ.get('API_KEY')
     64 if args.api_key_file != None:
     65     f = open(args.api_key_file, 'r')
     66     api_key = f.read()
     67     f.close()
     68 
     69 rpc = EthHTTPConnection(args.p)
     70 
     71 chain_spec = ChainSpec.from_chain_str(args.i)
     72 
     73 def conn_socks(host, port):
     74     import socks
     75     import socket
     76 
     77     socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS4, host, port, True)
     78     socket.socket = socks.socksocket
     79 
     80 
     81 def collect_addresses(addresses=[], address_files=[]):
     82     address_collection = []
     83     for a in addresses:
     84         a = normalize_address(a)
     85         if a in address_collection:
     86             logg.debug('skipping duplicate address {}'.format(a))
     87         address_collection.append(a)
     88         logg.info('added address {}'.format(a))
     89 
     90     for fp in address_files:
     91         logg.debug('processing file ' + fp)
     92         f = open(fp, 'r')
     93         while True:
     94             a = f.readline()
     95             if a == '':
     96                 break
     97             a = a.rstrip()
     98             a = normalize_address(a)
     99             if a in address_collection:
    100                 logg.debug('skipping duplicate address {}'.format(a))
    101             address_collection.append(a)
    102             logg.info('added address {}'.format(a))
    103         f.close()
    104 
    105     return address_collection
    106 
    107 
    108 def setup_address_rules(addresses):
    109     rules = AddressRules()
    110     outputs = []
    111     inputs = []
    112     execs = []
    113     for address in addresses:
    114         outputs.append(address)
    115         inputs.append(address)
    116         execs.append(address)
    117     rule = RuleSimple(outputs, inputs, execs, description='etherscan import')
    118     rules.include(rule)
    119     return rules
    120 
    121 
    122 def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data, address_rules):
    123     store = FileStore(chain_spec, cache_dir, address_rules=address_rules)
    124     cache_dir = os.path.realpath(cache_dir)
    125     if cache_dir == None:
    126         import tempfile
    127         cache_dir = tempfile.mkdtemp()
    128     logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir))
    129     RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data)
    130 
    131 
    132 def main():
    133     if args.socks_host != None:
    134         conn_socks(args.socks_host, args.socks_port)
    135     addresses = collect_addresses(args.address, args.address_file)
    136 
    137     from eth_monitor.importers.etherscan import Importer as EtherscanImporter
    138   
    139     address_rules = setup_address_rules(args.address)
    140 
    141     setup_filter(
    142             chain_spec,
    143             args.cache_dir,
    144             bool(args.store_tx_data),
    145             bool(args.store_block_data),
    146             address_rules,
    147             )
    148 
    149     cache_filter = CacheFilter(
    150             rules_filter=address_rules,
    151             )
    152 
    153     filters = [
    154             cache_filter,
    155             ]
    156 
    157     importer = []
    158     if args.service == 'etherscan':
    159         importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback)
    160     else:
    161         raise ValueError('invalid service: {}'.format(args.service))
    162     for a in addresses:
    163         importer.get(a)
    164         time.sleep(args.delay)
    165 
    166 
    167 if __name__ == '__main__':
    168     main()