rules.py (6706B)
# standard imports
import logging
import uuid

# external imports
from chainlib.eth.address import is_same_address
from hexathon import same as same_hex

# local imports
from .error import RuleFail

logg = logging.getLogger()


class RuleData:
    """Rule that matches when transaction data contains given fragments.

    :param fragments: Fragments to look for inside the transaction data.
    :param description: Label used in log output and in ``__str__``. A
        random UUID string is generated when omitted.
    :param match_all: If True, every fragment must be found for the rule
        to match. If False, the first fragment found is enough.
    """

    def __init__(self, fragments, description=None, match_all=False):
        self.fragments = fragments
        self.description = description
        if self.description is None:
            self.description = str(uuid.uuid4())
        self.match_all = match_all


    def check(self, sender, recipient, data, tx_hash):
        """Test the transaction data against the fragments.

        :param sender: Unused by this rule.
        :param recipient: Unused by this rule.
        :param data: Transaction data to search. Must support the ``in``
            operator with the fragment type (presumably hex strings --
            confirm against caller). ``None`` is treated as no match.
        :param tx_hash: Transaction hash, used for log output only.
        :returns: True if the rule matches, False otherwise. Always False
            when there are no fragments.
        """
        if data is None or not self.fragments:
            return False

        have_match = False
        for fragment in self.fragments:
            # No explicit length check: a fragment longer than the data
            # simply fails the membership test. This matters for
            # match_all, where such a fragment must fail the rule instead
            # of being silently skipped.
            if fragment in data:
                logg.debug('tx {} rule {} match in DATA FRAGMENT {}'.format(tx_hash, self.description, fragment))
                if not self.match_all:
                    return True
                have_match = True
            else:
                logg.debug('data match all {}'.format(self.match_all))
                if self.match_all:
                    return False

        return have_match


    def __str__(self):
        return 'Fragment ' + self.description + ' {}'.format(
                self.fragments,
                )


class RuleMethod:
    """Rule that matches when transaction data starts with a given method.

    :param methods: Method prefixes to compare against the start of the
        transaction data.
    :param description: Label used in log output and in ``__str__``. A
        random UUID string is generated when omitted.
    :param match_all: Ignored by this rule; a warning is logged if set.
    """

    def __init__(self, methods, description=None, match_all=False):
        self.methods = methods
        self.description = description
        if self.description is None:
            self.description = str(uuid.uuid4())
        if match_all:
            logg.warning('match_all ignord for RuleMethod rule')


    def check(self, sender, recipient, data, tx_hash):
        """Test whether the transaction data starts with any method.

        :param sender: Unused by this rule.
        :param recipient: Unused by this rule.
        :param data: Transaction data; must support slicing. ``None`` is
            treated as no match.
        :param tx_hash: Transaction hash, used for log output only.
        :returns: True on the first method that prefixes the data, False
            if none does or if there are no methods.
        """
        if data is None or not self.methods:
            return False

        for method in self.methods:
            # Slicing data shorter than the method yields a shorter
            # sequence, which cannot compare equal, so no length guard
            # is needed.
            if data[:len(method)] == method:
                logg.debug('tx {} rule {} match in DATA {}'.format(tx_hash, self.description, method))
                return True

        return False


    def __str__(self):
        return 'Method ' + self.description + ' {}'.format(
                self.methods,
                )


class RuleSimple:
    """Rule that matches on sender and recipient addresses.

    Entries in ``outputs`` are compared with the sender; entries in
    ``inputs`` and ``executables`` are compared with the recipient. The
    comparison is done by ``is_same_address``.

    :param outputs: Addresses to compare with the transaction sender.
    :param inputs: Addresses to compare with the transaction recipient.
    :param executables: Addresses to compare with the transaction
        recipient, logged as EXECUTABLE matches.
    :param description: Label used in log output and in ``__str__``. A
        random UUID string is generated when omitted.
    :param match_all: If True, every entry in every list must match for
        the rule to match. If False, the first matching entry is enough.
    """

    def __init__(self, outputs, inputs, executables, description=None, match_all=False):
        self.description = description
        if self.description is None:
            self.description = str(uuid.uuid4())
        self.outputs = outputs
        self.inputs = inputs
        self.executables = executables
        self.match_all = match_all


    def check(self, sender, recipient, data, tx_hash):
        """Test the sender and recipient against the address lists.

        :param sender: Sender address of the transaction.
        :param recipient: Recipient address of the transaction, or None.
        :param data: Unused by this rule.
        :param tx_hash: Transaction hash, used for log output only.
        :returns: True if the rule matches, False otherwise. A
            ``RuleFail`` raised during matching is reported as False.
        """
        try:
            return self.__check(sender, recipient, data, tx_hash)
        except RuleFail:
            return False


    def __match_rules(self, rules, address, label, tx_hash):
        """Compare one address against one list of rule addresses.

        :param rules: Addresses to compare against; None entries never
            match.
        :param address: Address taken from the transaction.
        :param label: Name of the matched field, used in log output.
        :param tx_hash: Transaction hash, used for log output only.
        :raises RuleFail: If ``match_all`` is set and an entry does not
            match.
        :returns: True if at least one entry matched.
        """
        matched = False
        for rule in rules:
            if rule is not None and is_same_address(address, rule):
                logg.debug('tx {} rule {} match in {} {}'.format(tx_hash, self.description, label, address))
                matched = True
                if not self.match_all:
                    # A single hit settles the outcome in match-any mode.
                    break
            elif self.match_all:
                raise RuleFail(rule)
        return matched


    def __check(self, sender, recipient, data, tx_hash):
        """Run the address comparisons; see ``check`` for the contract.

        :raises RuleFail: If ``match_all`` is set and an entry does not
            match.
        """
        have_match = self.__match_rules(self.outputs, sender, 'SENDER', tx_hash)
        if have_match and not self.match_all:
            return True

        # Without a recipient the rule never matches past this point,
        # even if the sender matched in match_all mode.
        if recipient is None:
            return False

        for rules, label in (
                (self.inputs, 'RECIPIENT'),
                (self.executables, 'EXECUTABLE'),
                ):
            if self.__match_rules(rules, recipient, label, tx_hash):
                if not self.match_all:
                    return True
                have_match = True

        return have_match


    def __str__(self):
        return 'Simple ' + self.description + ' outputs {} inputs {} execs {}'.format(
                self.outputs,
                self.inputs,
                self.executables,
                )


class RuleHash:
    """Rule that matches on the beginning of the transaction hash.

    :param hashes: Hashes or hash prefixes to compare with the start of
        the transaction hash.
    :param description: Label used in ``__str__``. A random UUID string
        is generated when omitted.
    """

    def __init__(self, hashes, description=None):
        self.hashes = hashes
        self.description = description
        if self.description is None:
            self.description = str(uuid.uuid4())


    def check(self, sender, recipient, data, tx_hash):
        """Test whether the transaction hash starts with any stored hash.

        :param sender: Unused by this rule.
        :param recipient: Unused by this rule.
        :param data: Unused by this rule.
        :param tx_hash: Transaction hash to test.
        :returns: True if the leading part of ``tx_hash`` equals one of
            the stored hashes according to ``same_hex``, False otherwise.
        """
        for h in self.hashes:
            # Entries longer than the transaction hash can never match.
            if len(tx_hash) >= len(h) and same_hex(tx_hash[:len(h)], h):
                return True
        return False


    def __str__(self):
        return 'Hash ' + self.description + ' hashes {}'.format(
                self.hashes,
                )


class AddressRules:
    """Collection of include and exclude rules applied to transactions.

    :param include_by_default: Result used when no include rule decides
        the outcome.
    :param match_all: If True, every include rule must match for the
        transaction to be included. If False, one matching include rule
        is enough.
    """

    def __init__(self, include_by_default=False, match_all=False):
        self.excludes = []
        self.includes = []
        self.include_by_default = include_by_default
        self.match_all = match_all


    def exclude(self, rule):
        """Add a rule that excludes the transactions it matches.

        :param rule: Object providing ``check(sender, recipient, data, tx_hash)``.
        """
        self.excludes.append(rule)
        logg.info('cache filter added EXCLUDE rule {}'.format(rule))


    def include(self, rule):
        """Add a rule that includes the transactions it matches.

        :param rule: Object providing ``check(sender, recipient, data, tx_hash)``.
        """
        self.includes.append(rule)
        logg.info('cache filter added INCLUDE rule {}'.format(rule))


    def apply_rules(self, tx):
        """Apply the rules to a transaction object.

        The first entry of ``tx.outputs`` is used as sender and the first
        entry of ``tx.inputs`` as recipient.

        :param tx: Object providing ``outputs``, ``inputs``, ``payload``
            and ``hash``.
        :returns: Result of ``apply_rules_detail``.
        """
        return self.apply_rules_detail(tx.outputs[0], tx.inputs[0], tx.payload, tx.hash)


    def apply_rules_detail(self, sender, recipient, data, tx_hash):
        """Apply include rules, then exclude rules, to transaction fields.

        :param sender: Sender address passed on to each rule.
        :param recipient: Recipient address passed on to each rule.
        :param data: Transaction data passed on to each rule.
        :param tx_hash: Transaction hash passed on to each rule.
        :returns: Truthy if the transaction should be included. Exclude
            rules are only consulted when the include stage passed, and
            any matching exclude rule overrides the include.
        """
        included = self.include_by_default

        for rule in self.includes:
            if rule.check(sender, recipient, data, tx_hash):
                included = True
                logg.info('match in includes rule: {}'.format(rule))
                if not self.match_all:
                    break
            elif self.match_all:
                # One failed include rule rejects the transaction.
                included = False
                break

        if not included:
            return included

        for rule in self.excludes:
            if rule.check(sender, recipient, data, tx_hash):
                included = False
                logg.info('match in excludes rule: {}'.format(rule))
                # In match_all mode the remaining exclude rules are still
                # evaluated (and logged); the outcome stays False.
                if not self.match_all:
                    break

        return included