import os
import re
import shutil
import subprocess
import sys
import traceback

from constants import *
from featuresStruct import features
from command import *
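def _shell_quote(arg):
    """Quote *arg* so a POSIX shell treats it as one literal word.

    The FlowDroid invocation is still assembled as a shell string (the
    project's ``Command`` helper takes one), so every token that is not a
    trusted constant must be quoted.  Wrapping the APK path in plain double
    quotes, as the old code did, still let ``"``, backticks and ``$(...)`` in
    a sample's file name execute as the analysis user.
    """
    try:
        from shlex import quote  # Python 3
    except ImportError:  # Python 2
        from pipes import quote
    return quote(arg)


def _load_base_ngrams(path):
    """Read the reference n-grams: one signature per line, blank line ends a group.

    Returns a list of groups (each a list of stripped lines).  A final group
    that is not followed by a blank line is kept too; the old loop silently
    dropped it.  Consecutive blank lines produce empty groups, which can never
    match a window and are harmless.
    """
    groups = []
    current = []
    with open(path, "r") as handle:
        for raw in handle:
            line = raw.strip()
            if line:
                current.append(line)
            else:
                groups.append(current)
                current = []
    if current:
        groups.append(current)
    return groups


def _load_flowdroid_sources(path):
    """Return the set of source signatures listed in FlowDroid's SourcesAndSinks.txt.

    Only lines containing both ``<`` and ``>`` are considered; the text between
    the first ``<`` and the next ``>`` is the signature, and a line is a source
    when it contains ``_SOURCE_``.  Sink lines are not needed by the caller.
    """
    sources = set()
    with open(path, "r") as handle:
        for line in handle:
            if "<" not in line or ">" not in line:
                continue
            if "_SOURCE_" in line:
                sources.add(line.split("<")[1].split(">")[0])
    return sources


def _quarantine(apk, family):
    """Move *apk* into ``<PROJECT_PATH>quarantine/<family>/`` (best effort).

    Replaces ``os.system("mv " + apk + ...)``: no shell is involved, so
    metacharacters in the file name cannot be executed.  *family* becomes a
    directory name, so anything that is not a single path component (``..``,
    ``a/b``) is refused rather than allowed to escape the quarantine tree.
    Failures are printed, not raised, matching the ignored exit status of the
    old ``mv``.
    """
    if not family or family in (".", "..") or os.path.basename(family) != family:
        print("Refusing to quarantine " + apk + ": bad family name " + repr(family))
        return
    dest_dir = os.path.join(PROJECT_PATH, "quarantine", family)
    if not os.path.isdir(dest_dir):
        print("Quarantine directory missing: " + dest_dir)
        return
    try:
        shutil.move(apk, dest_dir)
    except (EnvironmentError, shutil.Error) as exc:
        print("Could not move " + apk + " to " + dest_dir + ": " + str(exc))


def _suffix_from_first_source(api_path, sources):
    """Return *api_path* from its first known source onwards, or None if none."""
    for index, api in enumerate(api_path):
        if api in sources:
            return api_path[index:]
    return None


def _sequences_from_flowdroid(output, sources, name):
    """Parse FlowDroid's textual report into ``{sequence: sample_names}``.

    Each leak is reported as ``Found a flow to sink <sig> ... from the
    following sources:`` followed by ``on Path [stmt, stmt, ...]`` entries.
    For every path the ``<signature>`` of each ``invoke`` statement is
    collected; the sequence is the suffix starting at the first signature that
    is a known source, then the sink, every element followed by a comma.
    Paths without a known source are ignored.  Sequences seen more than once
    accumulate the sample name, comma separated.

    Raises IndexError (handled by the caller) when a report lacks the expected
    markers or ``<...>`` signature.
    """
    sequences = {}
    for report in output.split("Found a flow to sink")[1:]:
        parts = report.split("from the following sources:")
        sink = re.findall(r"<(.*?)>", parts[0][:-1])[0]
        for path_text in parts[1].split("on Path "):
            if "[" not in path_text or "]" not in path_text:
                continue
            statements = path_text.split("[")[1].split("]")[0]
            api_path = []
            # NOTE(review): splitting on "," also splits inside Jimple
            # parameter lists such as "foo(int,int)", so multi-argument
            # methods never match "<...>" and drop out of the path.  Kept
            # as-is because the reference n-grams were built the same way.
            for stmt in statements.split(","):
                if "invoke" in stmt:
                    api_path.extend(re.findall(r"<(.*?)>", stmt))
            print("Sink is " + sink)
            tail = _suffix_from_first_source(api_path, sources)
            if tail is None:
                continue
            sequence = "".join(api + "," for api in tail) + sink + ","
            if sequence in sequences:
                sequences[sequence] = sequences[sequence] + "," + name
            else:
                sequences[sequence] = name
    return sequences


def _window_ngrams(sequences, n):
    """Return the distinct length-*n* windows over every sequence.

    A sequence is split on commas (empty tokens dropped) and a window of *n*
    tokens slides over it one token at a time.  Windows with the same members
    in a different order count as one n-gram; the first occurrence is kept.
    The window is reset for every sequence so that no n-gram straddles two
    unrelated source->sink paths (the old code carried its buffer across
    dictionary entries, producing windows that depended on iteration order).
    """
    seen = set()
    ngrams = []
    for sequence in sequences:
        tokens = [tok for tok in sequence.split(",") if tok.strip()]
        for start in range(len(tokens) - n + 1):
            window = tokens[start:start + n]
            members = frozenset(window)
            if members not in seen:
                seen.add(members)
                ngrams.append(window)
    return ngrams


def _mark_matching_features(ngrams, base_ngrams):
    """Set ``features[label] = 1`` for every reference n-gram a window matches.

    Matching is order-insensitive (set equality of the members).  The label
    is the comma-joined reference group with a single trailing comma removed,
    which only matters when a reference line itself ends in a comma.
    """
    by_members = {}
    for group in base_ngrams:
        by_members.setdefault(frozenset(group), []).append(",".join(group))
    for window in ngrams:
        for label in by_members.get(frozenset(window), ()):
            if label.endswith(","):
                label = label[:-1]
            features[label] = 1


def nGramsExtractor(apk, family):
    """Run FlowDroid on *apk* and mark the API n-gram features it exhibits.

    Steps:
      1. load the reference n-grams from ``ngrams.txt`` and the source
         signatures from ``<PROJECT_PATH>tools/flowdroid/SourcesAndSinks.txt``
         (both before the slow FlowDroid run, so a missing file fails fast);
      2. run FlowDroid through the project's ``Command`` helper with a
         600-second timeout; a ``"failed"`` result moves the APK into
         ``quarantine/<family>/`` and returns ``False``;
      3. parse every "Found a flow to sink" report into an API sequence that
         starts at the first known source and ends at the sink;
      4. slide a window of 4 over each sequence and set
         ``features[<reference n-gram>] = 1`` for every window whose members
         equal a reference group.

    Parameters
    ----------
    apk : str
        Path to the APK; its last path component is the sample name.  Sample
        file names are attacker-chosen, so the path is shell-quoted before it
        is interpolated into the FlowDroid command line and never passed to a
        shell for the quarantine move.
    family : str
        Family label used as the quarantine sub-directory; must be a single
        path component.

    Returns
    -------
    bool
        ``True`` on success; ``False`` when FlowDroid failed or its output did
        not match the expected layout (the traceback is printed to stdout).

    Raises
    ------
    IOError / OSError
        If ``ngrams.txt`` or ``SourcesAndSinks.txt`` cannot be read.
    """
    name = apk.split("/")[-1]
    flowdroid_dir = PROJECT_PATH + "tools/flowdroid/"

    base_ngrams = _load_base_ngrams("ngrams.txt")
    sources = _load_flowdroid_sources(flowdroid_dir + "SourcesAndSinks.txt")

    classpath = ":".join([
        "soot-trunk.jar",
        "soot-infoflow.jar",
        "soot-infoflow-android.jar",
        "slf4j-api-1.7.5.jar",
        "slf4j-simple-1.7.5.jar",
        "axml-2.0.jar",
    ])
    # PROJECT_PATH is a trusted constant and is left unquoted as before; the
    # APK path is the only caller-controlled token and is quoted.  The
    # platforms directory is still hard-coded to /root/FYP, as in the
    # original command.
    cmd = (
        "cd " + flowdroid_dir + " && java -Xmx4g -cp " + classpath
        + " soot.jimple.infoflow.android.TestApps.Test " + _shell_quote(apk)
        + " /root/FYP/android-platforms --pathalgo contextsensitive"
    )
    print("\nCommand is " + cmd + "\n")

    try:
        output = Command(cmd).run(capture=True, timeout=600)
        if output == "failed":
            print("Moving apk to quarantine")
            _quarantine(apk, family)
            return False
        sequences = _sequences_from_flowdroid("\n".join(output), sources, name)
    except Exception as e:
        # Boundary handler: output that does not match the expected report
        # layout is reported and the sample is skipped rather than aborting
        # the whole batch.
        traceback.print_exc(file=sys.stdout)
        print(str(e) + " for " + name)
        return False

    ngrams = _window_ngrams(sequences, 4)
    _mark_matching_features(ngrams, base_ngrams)
    return True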
def onBangcleDetected(evidence, sh, startColumn):
    """Record that the Bangcle packer was detected by setting ``features["BANGCLE"] = 1``.

    ``evidence``, ``sh`` and ``startColumn`` are accepted so the callback has
    the same signature as the other ``on*Detected`` hooks; they are not read
    here (the worksheet output that once used them is disabled).
    """
    key = "BANGCLE"
    features[key] = 1
def onDexGuardDetected(evidence, sh, startColumn):
    """Record that DexGuard obfuscation was detected by setting ``features["DEXGUARD"] = 1``.

    ``evidence``, ``sh`` and ``startColumn`` are accepted so the callback has
    the same signature as the other ``on*Detected`` hooks; they are not read
    here (the worksheet output that once used them is disabled).
    """
    key = "DEXGUARD"
    features[key] = 1
def onHose2JarDetected(evidence, sh, startColumn):
    """Record that Hose2Jar was detected by setting ``features["HOSE2JAR"] = 1``.

    ``evidence``, ``sh`` and ``startColumn`` are accepted so the callback has
    the same signature as the other ``on*Detected`` hooks; they are not read
    here (the worksheet output that once used them is disabled).
    """
    key = "HOSE2JAR"
    features[key] = 1
def onAPKProtectDetected(evidence, sh, startColumn):
    """Record that APKProtect was detected by setting ``features["APKPROTECT"] = 1``.

    ``evidence``, ``sh`` and ``startColumn`` are accepted so the callback has
    the same signature as the other ``on*Detected`` hooks; they are not read
    here (the worksheet output that once used them is disabled).
    """
    key = "APKPROTECT"
    features[key] = 1