Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Siu, Pui Chung
bioinformatics algorithms
Commits
c1219a39
Commit
c1219a39
authored
Jul 22, 2021
by
Siu, Pui Chung
Browse files
Upload New File
parent
c6204ff9
Changes
1
Hide whitespace changes
Inline
Sidebyside
clustering: kmean clustering/assignment_kmeans_skeleton_python3.py
0 → 100644
View file @
c1219a39
#!/usr/bin/env python3
"""
Author: Jacky Siu Pui Chung
Student number = 1047527
Description: Implementation of the kmeans clustering algorithm
Hints:
 write a function to obtain Euclidean distance between two points.
 write a function to initialize centroids by randomly selecting points
from the initial set of points. You can use the random.sample() method
 write a function to find the closest centroid to each point and assign
the points to the clusters.
 write a function to calculate the centroids given the clusters
 write a function to implement kmeans
 write a function to calculate WGSS given the clusters and the points
 write a function to calculate BGSS given the clusters and the points
"""
# import statements
import
random
import
math
import
copy
from
statistics
import
mean
from
statistics
import
stdev
def
csv_parser
(
lines
):
"""Return list of point coordinates as [[x1,y1,z1,...],[x2,y2,z2,...]]
lines: open file or list of lines. Expected format:
The file has a single header line.
Each line contains the coordinates for one data point, starting
with a label. A data point can be specified in arbitrary dimensions.
Output: List of lists with the coordinates of the points.
This function does not capture the labels of the data points. In case
they are needed later, this function should be adjusted.
"""
data_points
=
[]
for
line
in
lines
:
items
=
line
.
strip
().
split
(
","
)
try
:
#will fail on header line in file
data_points
.
append
(
list
(
map
(
float
,
items
[
1
:])))
#skip label
except
ValueError
:
#must be the header
continue
return
data_points
def
findeuclideandistance
(
pt1
,
pt2
):
"""
Description: calculate euclidean distance between two data points
Input: pt1, list of a data point, length equal its dimension
pt2, list of a data point, length equal its dimension
Output: d, int of euclidean distance between pt1 and pt2
"""
d
=
math
.
sqrt
(
sum
([(
a

b
)
**
2
for
a
,
b
in
zip
(
pt1
,
pt2
)]))
return
d
def
centroidinit
(
pointslist
,
k
):
"""
Description: initialize centroid by randomly sampling from the dataset
Input: pointslist, list of list containing data points with same number
of dimensions
k, int of number of clusters
Output: init_centroid_list, list of list containing data points of the
initialized centroids
clusters, dictionary with the key as 0, 1, 2,..... and values
as the respective initialized centroid
"""
init_centroid_list
=
random
.
sample
(
pointslist
,
k
)
clusters
=
{
clusternum
:[
init_centroid
]
for
clusternum
,
init_centroid
in
\
enumerate
(
init_centroid_list
)}
return
init_centroid_list
,
clusters
def
findclosestcentroid
(
pointslist
,
clustersdict
):
"""
Description: for each data point, find closest centroid
Input: pointslist, list of list containing data points with same number
of dimensions
clustersdict, dictionary with the key as 0, 1, 2,..... and values
as list of list, where list[0] are respective initialized centroid
for that cluster
Output: clustersdict, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
labellist, list that is the same length as the pointslist, where
each point is associated with a cluster
"""
labellist
=
[]
clusterslist
=
[
init_centroid
[
0
]
for
init_centroid
in
clustersdict
.
values
()]
#removal of centroid from cluster list during initialization
pointslist_init
=
[
datapoints
for
datapoints
in
pointslist
if
datapoints
\
not
in
clusterslist
]
for
unassigned_points
in
pointslist_init
:
dis
=
[]
for
i
in
range
(
len
(
clusterslist
)):
dis
.
append
(
findeuclideandistance
(
clusterslist
[
i
],
unassigned_points
))
clustersdict
[
dis
.
index
(
min
(
dis
))].
append
(
unassigned_points
)
labellist
.
append
(
dis
.
index
(
min
(
dis
)))
return
clustersdict
,
labellist
def
reassigncentroid
(
clustersdict
):
"""
Description: reassign the centroid for each cluster by calculating the mean
between all data points within the cluster
Input: clustersdict, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
Output:
clustersdict, clustersdict, dictionary with the key as cluster
number and values as list of list, where list[0]
are respective reassigned centroid for that cluster
"""
for
cluster
,
data_points_in_cluster
in
list
(
clustersdict
.
items
()):
avg
=
[
mean
(
dimension
)
for
dimension
in
zip
(
*
data_points_in_cluster
)]
clustersdict
[
cluster
]
=
[
avg
]
return
clustersdict
def
checkconvergence
(
clustersdictbefore
,
clustersdictafter
):
"""
Description: compare two clustersdict to check if they have the same
centroid at each cluster
Input: clustersdictbefore, dictionary with keys as the cluster number and
values as a list of list where list[0]
are respective reassigned centroid for that cluster
clustersdictafter, dictionary with keys as the cluster number and
values as a list of list where list[0]
are respective reassigned centroid for that cluster
Output:
True when they share same centroid at each cluster, false when
they do not
"""
for
keys
in
clustersdictbefore
.
keys
():
if
clustersdictbefore
[
keys
]
!=
clustersdictafter
[
keys
]:
return
False
return
True
def
checkemptycluster
(
clustersdict
):
"""
Description: check whether if a cluster has no data points associated
Input: clustersdict, dictionary with keys as the cluster number and
values as a list of list where list[0]
are respective reassigned centroid for that cluster
Output:
True when they share same centroid at each cluster, false when
they do not
"""
for
keys
in
clustersdict
.
keys
():
#cluster only contain centroid
if
len
(
clustersdict
[
keys
])
==
1
:
return
True
return
False
def
kmeansclusteringwrapper
(
pointslist
,
k
):
"""
Description: wrapper that connect the k mean clustering algorithm
Input: pointslist, list of list containing data points with same number
of dimensions
k, int of number of clusters
Output:
finallabellist, list that associate each point with a cluster
clustersdictfinal, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
numtoconvergence, int of number of times it take to converge
"""
# number of times it takes to converge, minimum once
numtoconvergence
=
1
init_centroid_list
,
clustersdict
=
centroidinit
(
pointslist
,
k
)
#deep copy
clusersdictafter
=
copy
.
deepcopy
(
clustersdict
)
clustersdictafter
,
updatedlabellist
=
findclosestcentroid
(
pointslist
,
\
clusersdictafter
)
#if centroid has no data points associated, reinitiate centroid
if
checkemptycluster
(
clustersdictafter
):
kmeansclusteringwrapper
(
pointslist
,
k
)
while
checkconvergence
(
clustersdict
,
clustersdictafter
)
==
False
:
numtoconvergence
+=
1
clustersdict
=
copy
.
deepcopy
(
clustersdictafter
)
clustersdictafter
=
copy
.
deepcopy
(
clustersdictafter
)
clustersdictafter
,
updatedlabellist
=
findclosestcentroid
(
pointslist
,
\
clustersdictafter
)
clustersdictafter
=
reassigncentroid
(
clustersdictafter
)
#when converges
clustersdictfinal
,
finallabellist
=
findclosestcentroid
(
pointslist
,
\
clustersdictafter
)
if
checkemptycluster
(
clustersdictafter
):
kmeansclusteringwrapper
(
pointslist
,
k
)
return
finallabellist
,
clustersdictfinal
,
numtoconvergence
def
WGSS
(
clustersdict
):
"""
Description: calculate WGSS for clusters
Input: clustersdict, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
Output:
WGSS_total, int of the WGSS score of the cluster
"""
WGSS_total
=
0
for
keys
in
clustersdict
.
keys
():
data_points
=
clustersdict
[
keys
]
centroid
=
data_points
[
0
]
WGSS
=
[
findeuclideandistance
(
data_point
,
centroid
)
**
2
for
\
data_point
in
data_points
]
WGSS
=
mean
(
WGSS
)
WGSS_total
+=
WGSS
return
WGSS_total
def
BGSS
(
clustersdict
,
pointlist
):
"""
Description: calculate BGSS for clusters
Input: clustersdict, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
pointlist:list of list containing data points with same number
of dimensions
Output:
BGSS_total, int of the WGSS score of the cluster
"""
BGSS_total
=
0
mean_centroid
=
[
mean
(
dimension
)
for
dimension
in
zip
(
*
pointlist
)]
for
key
in
clustersdict
.
keys
():
centroid
=
clustersdict
[
key
][
0
]
d
=
findeuclideandistance
(
centroid
,
mean_centroid
)
n
=
len
(
clustersdict
[
key
])

1
BGSS_total
+=
d
**
2
*
n
return
BGSS_total
def
clusterevaulation
(
clustersdict
,
pointlist
):
"""
Description: wrapper that calculate W score for clusters
Input: clustersdict, dictionary with keys as the cluster number and
values as a list of list containing all data points associated
with that cluster
pointlist:list of list containing data points with same number
of dimensions
Output:
W, int of the W score of the cluster
"""
clusters_for_WGSS
=
copy
.
deepcopy
(
clustersdict
)
clusters_for_BGSS
=
copy
.
deepcopy
(
clustersdict
)
WGSS_score
=
WGSS
(
clusters_for_WGSS
)
BGSS_score
=
BGSS
(
clusters_for_BGSS
,
pointlist
)
W
=
WGSS_score
/
BGSS_score
return
W
def
statistical_calc
(
pointlist
,
k
,
repetition
):
"""
Description: wrapper that calculate W score, niter and final labels
for clusters in a given repitition
Input: pointlist:list of list containing data points with same number
of dimensions
k, int of number of clusters
repitition: number of times to perform k mean clustering
on the pointlist with given k
Output:
W_list, list of int of the W score of the cluster
numtoconvergence_list: list of int of the niter of each k mean
clustering
finallabellist: list of list of the label for each point
in the pointlist
"""
W_list
=
[]
numtoconvergence_list
=
[]
finallabellist
=
[]
while
len
(
W_list
)
<
repetition
:
finallabels
,
clustersdictfinal
,
numtoconvergence
=
\
(
kmeansclusteringwrapper
(
pointlist
,
k
))
W
=
clusterevaulation
(
clustersdictfinal
,
pointlist
)
W_list
.
append
(
W
)
numtoconvergence_list
.
append
(
numtoconvergence
)
finallabellist
.
append
(
finallabels
)
return
W_list
,
numtoconvergence_list
,
finallabellist
if
__name__
==
"__main__"
:
data_points_2dtest_2021
=
csv_parser
(
open
(
"datasets/2dtest_2021.csv"
))
# the code below should produce the results necessary to answer
# the questions. In other words, if we run your code, we should see
# the data that you used to answer the questions.
W_list_2dtest_2021
,
numtoconvergence_list_2dtest_2021
,
\
finallabellist_2dtest_2021
=
statistical_calc
(
data_points_2dtest_2021
,
3
,
20
)
print
(
"1a)"
,
mean
(
numtoconvergence_list_2dtest_2021
),
\
stdev
(
numtoconvergence_list_2dtest_2021
))
print
(
"1b) lowest W = "
,
min
(
W_list_2dtest_2021
),
"label list = "
,
\
finallabellist_2dtest_2021
[
\
W_list_2dtest_2021
.
index
(
min
(
W_list_2dtest_2021
))])
print
(
"1c)"
,
mean
(
W_list_2dtest_2021
),
stdev
(
W_list_2dtest_2021
))
print
(
"1d)"
,
min
(
W_list_2dtest_2021
))
k_list
=
[
3
,
4
,
5
,
6
]
data_points_LargeSet_1_2021
=
csv_parser
(
open
(
"datasets/LargeSet_1_2021.csv"
))
LargeSet_1_2021_W_list
=
[]
LargeSet_1_2021_ntc_list
=
[]
for
k
in
k_list
:
W_list
,
numtoconvergence_list
,
finallabellist
=
\
statistical_calc
(
data_points_2dtest_2021
,
k
,
50
)
LargeSet_1_2021_W_list
.
append
((
mean
(
W_list
),
stdev
(
W_list
)))
LargeSet_1_2021_ntc_list
.
append
(
\
(
mean
(
numtoconvergence_list
),
stdev
(
numtoconvergence_list
)))
print
(
"2a)"
)
for
i
in
range
(
len
(
LargeSet_1_2021_ntc_list
)):
print
(
"For niter, k = "
,
i
+
3
,
": mean = "
,
LargeSet_1_2021_ntc_list
[
i
][
0
],
\
"sd = "
,
LargeSet_1_2021_ntc_list
[
i
][
1
])
print
(
"2b)"
)
for
i
in
range
(
len
(
LargeSet_1_2021_W_list
)):
print
(
"For W, k = "
,
i
+
3
,
": mean = "
,
LargeSet_1_2021_W_list
[
i
][
0
],
\
"sd = "
,
LargeSet_1_2021_W_list
[
i
][
1
])
print
(
"2c)"
)
for
k
in
k_list
:
W_list
,
numtoconvergence_list
,
finallabellist
=
statistical_calc
\
(
data_points_2dtest_2021
,
k
,
50
)
print
(
"for k = {0}"
.
format
(
k
))
print
(
"smallest W value = "
,
min
(
W_list
))
print
(
finallabellist
[
W_list
.
index
(
min
(
W_list
))])
print
(
"2d) The lowest W value was selected because it means it has the
\
strongest cluster structure"
)
print
(
"2e) Selecting the lowest W value is better as not all convergence
\
lead to the global minimum (or at least lowest known)"
)
print
(
"3) K mean clustering assume that the variance of
\
the distribution within a cluster is spherical,
\
which is not applicable in the dataset"
)
print
(
"4) For each value of k, a range of repitition can
\
be used for cross validation to find the lowest mean W"
)
print
(
"5) with an outlier in a cluster, it dramatically shifts
\
the reassignment of the centroid as the mean
\
is sensitive to the outlier's position"
)
print
(
"6) the k mean clustering algorithm is not deterministic,
\
as the initialization of the centroid are
\
randomized"
)
print
(
"7) O(N(D+K)), N being the number of data points
\
in the dataset, D being the dimension, and K
\
being the number of clusters"
)
print
(
"8) the time complexity = O(NKI), N being the number
\
of data points in the dataset, K being
\
the number of clusters, and I being the number of iterations til convergence."
)
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment