import.py (5735B)
1 # standard imports 2 import argparse 3 import logging 4 import sys 5 import os 6 import time 7 8 # external imports 9 from chainlib.encode import TxHexNormalizer 10 from chainlib.eth.connection import EthHTTPConnection 11 from chainlib.chain import ChainSpec 12 from eth_cache.store.file import FileStore 13 14 # local imports 15 from eth_monitor.filters.cache import Filter as CacheFilter 16 from eth_monitor.filters import RuledFilter 17 from eth_monitor.rules import ( 18 AddressRules, 19 RuleSimple, 20 ) 21 22 logging.basicConfig(level=logging.WARNING) 23 logg = logging.getLogger() 24 25 normalize_address = TxHexNormalizer().wallet_address 26 27 services = [ 28 'etherscan', 29 ] 30 31 argparser = argparse.ArgumentParser('master eth events monitor') 32 argparser.add_argument('--api-key-file', dest='api_key_file', type=str, help='File to read API key from') 33 argparser.add_argument('--cache-dir', dest='cache_dir', type=str, default='.eth-monitor/cache', help='Directory to store tx data') 34 argparser.add_argument('--store-tx-data', dest='store_tx_data', action='store_true', help='Include all transaction data objects by default') 35 argparser.add_argument('--store-block-data', dest='store_block_data', action='store_true', help='Include all block data objects by default') 36 argparser.add_argument('-i', '--chain-spec', dest='i', type=str, default='evm:ethereum:1', help='Chain specification string') 37 argparser.add_argument('--address-file', dest='address_file', default=[], type=str, action='append', help='Add addresses from file') 38 argparser.add_argument('--list-services', dest='list', action='store_true', help='List all supported services') 39 argparser.add_argument('-a', '--address', default=[], type=str, action='append', help='Add address') 40 argparser.add_argument('--socks-host', dest='socks_host', type=str, help='Conect through socks host') 41 argparser.add_argument('--socks-port', dest='socks_port', type=int, help='Conect through socks port') 42 argparser.add_argument('--delay', type=float, default=0.2, help='Seconds to wait between each retrieval from importer') 43 argparser.add_argument('-v', action='store_true', help='Be verbose') 44 argparser.add_argument('-vv', action='store_true', help='Be more verbose') 45 argparser.add_argument('-p', type=str, help='RPC provider') 46 argparser.add_argument('service', nargs='?', type=str, help='Index service to import from') 47 args = argparser.parse_args(sys.argv[1:]) 48 49 if args.list: 50 for s in services: 51 sys.stdout.write('{}\n'.format(s)) 52 sys.exit(0) 53 54 if not args.service: 55 argparser.error('the following arguments are required: service') 56 sys.exit(1) 57 58 if args.vv: 59 logg.setLevel(logging.DEBUG) 60 elif args.v: 61 logg.setLevel(logging.INFO) 62 63 api_key = os.environ.get('API_KEY') 64 if args.api_key_file != None: 65 f = open(args.api_key_file, 'r') 66 api_key = f.read() 67 f.close() 68 69 rpc = EthHTTPConnection(args.p) 70 71 chain_spec = ChainSpec.from_chain_str(args.i) 72 73 def conn_socks(host, port): 74 import socks 75 import socket 76 77 socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS4, host, port, True) 78 socket.socket = socks.socksocket 79 80 81 def collect_addresses(addresses=[], address_files=[]): 82 address_collection = [] 83 for a in addresses: 84 a = normalize_address(a) 85 if a in address_collection: 86 logg.debug('skipping duplicate address {}'.format(a)) 87 address_collection.append(a) 88 logg.info('added address {}'.format(a)) 89 90 for fp in address_files: 91 logg.debug('processing file ' + fp) 92 f = open(fp, 'r') 93 while True: 94 a = f.readline() 95 if a == '': 96 break 97 a = a.rstrip() 98 a = normalize_address(a) 99 if a in address_collection: 100 logg.debug('skipping duplicate address {}'.format(a)) 101 address_collection.append(a) 102 logg.info('added address {}'.format(a)) 103 f.close() 104 105 return address_collection 106 107 108 def setup_address_rules(addresses): 109 rules = AddressRules() 110 outputs = [] 111 inputs = [] 112 execs = [] 113 for address in addresses: 114 outputs.append(address) 115 inputs.append(address) 116 execs.append(address) 117 rule = RuleSimple(outputs, inputs, execs, description='etherscan import') 118 rules.include(rule) 119 return rules 120 121 122 def setup_filter(chain_spec, cache_dir, include_tx_data, include_block_data, address_rules): 123 store = FileStore(chain_spec, cache_dir, address_rules=address_rules) 124 cache_dir = os.path.realpath(cache_dir) 125 if cache_dir == None: 126 import tempfile 127 cache_dir = tempfile.mkdtemp() 128 logg.info('using chain spec {} and dir {}'.format(chain_spec, cache_dir)) 129 RuledFilter.init(store, include_tx_data=include_tx_data, include_block_data=include_block_data) 130 131 132 def main(): 133 if args.socks_host != None: 134 conn_socks(args.socks_host, args.socks_port) 135 addresses = collect_addresses(args.address, args.address_file) 136 137 from eth_monitor.importers.etherscan import Importer as EtherscanImporter 138 139 address_rules = setup_address_rules(args.address) 140 141 setup_filter( 142 chain_spec, 143 args.cache_dir, 144 bool(args.store_tx_data), 145 bool(args.store_block_data), 146 address_rules, 147 ) 148 149 cache_filter = CacheFilter( 150 rules_filter=address_rules, 151 ) 152 153 filters = [ 154 cache_filter, 155 ] 156 157 importer = [] 158 if args.service == 'etherscan': 159 importer = EtherscanImporter(rpc, api_key, filters=filters, block_callback=RuledFilter.block_callback) 160 else: 161 raise ValueError('invalid service: {}'.format(args.service)) 162 for a in addresses: 163 importer.get(a) 164 time.sleep(args.delay) 165 166 167 if __name__ == '__main__': 168 main()