Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
PET-rating
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Requirements
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Terraform modules
Monitor
Incidents
Service Desk
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Timo Heikkilä
PET-rating
Commits
72c5ed22
Commit
72c5ed22
authored
5 years ago
by
Ossi Laine
Browse files
Options
Downloads
Patches
Plain Diff
Handle session correctly when moving between question types (embody -> slider)
parent
4e38530e
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
app/task/views.py
+84
-98
84 additions, 98 deletions
app/task/views.py
with
84 additions
and
98 deletions
app/task/views.py
+
84
−
98
View file @
72c5ed22
...
...
@@ -17,6 +17,7 @@ from flask import (
)
from
sqlalchemy
import
and_
from
sqlalchemy
import
exc
from
flask_babel
import
_
,
lazy_gettext
as
_l
from
app
import
db
...
...
@@ -44,13 +45,12 @@ def get_randomized_page(page_id):
return
randomized_page
def
add_slider_answer
(
key
,
value
,
randomized_
page_id
=
None
):
def
add_slider_answer
(
key
,
value
,
page_id
=
None
):
'''
Insert slider value to database. If trial randomization is set to
'
Off
'
the values are inputted for session[
'
current_idpage
'
]. Otherwise the values
are set for the corresponding id found in the trial randomization table
'''
page_idpage
=
session
[
'
current_idpage
'
]
if
session
[
'
randomization
'
]
==
'
Off
'
else
randomized_page_id
participant_answer
=
answer
(
question_idquestion
=
key
,
answer_set_idanswer_set
=
session
[
'
answer_set
'
],
answer
=
value
,
page_idpage
=
page_idpage
)
participant_answer
=
answer
(
question_idquestion
=
key
,
answer_set_idanswer_set
=
session
[
'
answer_set
'
],
answer
=
value
,
page_idpage
=
page_id
)
db
.
session
.
add
(
participant_answer
)
db
.
session
.
commit
()
...
...
@@ -72,37 +72,23 @@ def update_answer_set_type(answer_type):
the_time
=
datetime
.
now
()
the_time
=
the_time
.
replace
(
microsecond
=
0
)
print
(
"
UPDATE TYPE
"
)
updated_answer_set
=
answer_set
.
query
.
filter_by
(
idanswer_set
=
session
[
'
answer_set
'
]).
first
()
updated_answer_set
.
answer_type
=
answer_type
updated_answer_set
.
last_answer_time
=
the_time
db
.
session
.
commit
()
def
slider_question_has_answers
(
user
,
page_id
):
'''
This should return true IF there are questions from certain page and no answers
'''
answer_set_id
=
answer_set
.
query
.
filter_by
(
session
=
user
).
first
().
idanswer_set
experiment_id
=
answer_set
.
query
.
filter_by
(
session
=
user
).
first
().
experiment_idexperiment
if
session
[
'
randomization
'
]
==
'
On
'
:
randomized_page_id
=
get_randomized_page
(
page_id
).
randomized_idpage
answers
=
answer
.
query
.
filter_by
(
answer_set_idanswer_set
=
answer_set_id
,
page_idpage
=
randomized_page_id
).
all
()
else
:
answers
=
answer
.
query
.
filter_by
(
answer_set_idanswer_set
=
answer_set_id
,
page_idpage
=
page_id
).
all
()
questions
=
question
.
query
.
filter_by
(
experiment_idexperiment
=
experiment_id
).
all
()
# TODO: should return true if there are no slider questions!!!!
return
(
True
if
(
len
(
answers
)
==
0
and
len
(
questions
)
>
0
)
else
False
)
def
select_form_type
():
"""
Select form type based on the value in answer_set->answer_type
"""
form
=
None
answer_set_type
=
answer_set
.
query
.
filter_by
(
idanswer_set
=
session
[
'
answer_set
'
]).
first
().
answer_type
print
(
session
[
'
answer_set
'
])
print
(
answer_set_type
)
if
answer_set_type
==
'
slider
'
:
form
=
TaskForm
()
...
...
@@ -118,36 +104,56 @@ def select_form_type():
return
form
def
check_if_answer_exists
(
answer_type
,
page_id
):
"""
Check if there is already answer on certain experiment->page
"""
check_answer
=
None
if
answer_type
==
'
embody
'
:
check_answer
=
embody_answer
.
query
.
filter
(
and_
(
embody_answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
embody_answer
.
page_idpage
==
session
[
'
current_id
page
'
]
)).
first
()
check_answer
=
embody_answer
.
query
.
filter
(
and_
(
embody_answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
embody_answer
.
page_idpage
==
page
_id
)).
first
()
elif
answer_type
==
'
slider
'
:
check_answer
=
answer
.
query
.
filter
(
and_
(
answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
answer
.
page_idpage
==
session
[
'
current_id
page
'
]
)).
first
()
check_answer
=
answer
.
query
.
filter
(
and_
(
answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
answer
.
page_idpage
==
page
_id
)).
first
()
return
(
check_answer
,
None
)
return
check_answer
def
check_if_randomized_answer_exists
(
answer_type
,
page_id
):
"""
Check if there is already answer on certain experiment->page if the pages are in randomized order
"""
check_answer
=
randomized_page_id
=
None
if
answer_type
==
'
embody
'
:
randomized_page_id
=
get_randomized_page
(
page_id
).
randomized_idpage
check_answer
=
embody_answer
.
query
.
filter
(
and_
(
embody_answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
embody_answer
.
page_idpage
==
randomized_page_id
)).
first
()
elif
answer_type
==
'
slider
'
:
randomized_page_id
=
get_randomized_page
(
page_id
).
randomized_idpage
check_answer
=
answer
.
query
.
filter
(
and_
(
answer
.
answer_set_idanswer_set
==
session
[
'
answer_set
'
],
answer
.
page_idpage
==
randomized_page_id
)).
first
()
def
next_page
(
pages
):
# If no more pages left -> complete task
if
not
pages
.
has_next
:
return
redirect
(
url_for
(
'
task.completed
'
))
return
(
check_answer
,
randomized_page_id
)
# If user has answered to all questions, then move to next page
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
pages
.
next_num
))
def
get_experiment_info
():
"""
Return experiment information from current session
"""
try
:
return
experiment
.
query
.
filter_by
(
idexperiment
=
session
[
'
exp_id
'
]).
first
()
except
exc
.
KeyError
as
err
:
flash
(
"
No valid session found
"
)
return
redirect
(
'
/
'
)
def
embody_on
():
"""
Check from session[exp_id] if embody questions are enabled
"""
experiment_info
=
get_experiment_info
()
if
experiment_info
.
embody_enabled
:
return
True
else
:
return
False
################################
### ###
### ROUTES ###
### ###
################################
def
slider_on
():
"""
Check from session[exp_id] if there are slider questions in this session
"""
experiment_info
=
get_experiment_info
()
questions
=
question
.
query
.
filter_by
(
experiment_idexperiment
=
experiment_info
.
idexperiment
).
all
()
print
(
questions
)
if
len
(
questions
)
==
0
:
return
False
return
True
@task_blueprint.route
(
'
/embody/<int:page_num>
'
,
methods
=
[
'
POST
'
])
def
task_embody
(
page_num
):
...
...
@@ -163,36 +169,27 @@ def task_embody(page_num):
# Check if randomization ON and if user has already answered to embody question
if
session
[
'
randomization
'
]
==
'
On
'
:
check_answer
,
randomized_page
_id
=
check_if_randomized_answer_exists
(
'
embody
'
,
page_id
)
else
:
check_answer
,
randomized_page_id
=
check_if_answer_exists
(
'
embody
'
,
page_id
)
page_id
=
get_
randomized_page
(
page_id
).
randomized_idpage
check_answer
=
check_if_answer_exists
(
'
embody
'
,
page_id
)
# Add answer to DB
if
check_answer
is
None
:
page_idpage
=
session
[
'
current_idpage
'
]
if
session
[
'
randomization
'
]
==
'
Off
'
else
randomized_page_id
# Add new embody answer
participant_answer
=
embody_answer
(
answer_set_idanswer_set
=
session
[
'
answer_set
'
],
coordinates
=
data
[
'
coordinates
'
],
page_idpage
=
page_id
page
)
participant_answer
=
embody_answer
(
answer_set_idanswer_set
=
session
[
'
answer_set
'
],
coordinates
=
data
[
'
coordinates
'
],
page_idpage
=
page_id
)
db
.
session
.
add
(
participant_answer
)
# Update answer set type
answer_set
.
query
.
filter_by
(
idanswer_set
=
session
[
'
answer_set
'
]).
first
().
answer_type
=
'
slider
'
db
.
session
.
commit
()
else
:
flash
(
"
Page has been answered already. Answers discarded
"
)
# Check if there are unanswered slider questions
if
slider_question_has_answers
(
session
[
'
user
'
],
page_id
):
# Check if there are unanswered slider questions -> if true redirect to same page
if
slider_on
():
print
(
"
SLIDER ON
"
)
update_answer_set_type
(
'
slider
'
)
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
page_num
))
if
not
pages
.
has_next
:
return
redirect
(
url_for
(
'
task.completed
'
))
# If user has answered to all questions, then move to next page
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
pages
.
next_num
))
update_answer_set_page
()
return
next_page
(
pages
)
@task_blueprint.route
(
'
/question/<int:page_num>
'
,
methods
=
[
'
POST
'
])
...
...
@@ -207,44 +204,31 @@ def task_answer(page_num):
# Check if randomization ON and if user has already answered to slider question
if
session
[
'
randomization
'
]
==
'
On
'
:
check_answer
,
randomized_page
_id
=
check_if_randomized_answer_exists
(
'
slider
'
,
page_id
)
else
:
check_answer
,
randomized_page_id
=
check_if_answer_exists
(
'
slider
'
,
page_id
)
page_id
=
get_
randomized_page
(
page_id
).
randomized_idpage
check_answer
=
check_if_answer_exists
(
'
slider
'
,
page_id
)
if
check_answer
is
None
:
data
=
request
.
form
.
to_dict
()
for
key
,
value
in
data
.
items
():
add_slider_answer
(
key
,
value
,
randomized_
page_id
)
add_slider_answer
(
key
,
value
,
page_id
)
else
:
flash
(
"
Page has been answered already. Answers discarded
"
)
page_num
=
pages
.
next_num
if
not
pages
.
has_next
:
return
redirect
(
url_for
(
'
task.completed
'
))
update_answer_set_page
()
# If embody in use -> change the answer set type
exp_status
=
experiment
.
query
.
filter_by
(
idexperiment
=
session
[
'
exp_id
'
]).
first
()
if
exp_status
.
embody_enabled
:
if
embody_on
():
update_answer_set_type
(
'
embody
'
)
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
pages
.
next_num
))
# Always redirect to next page from sliders
update_answer_set_page
()
return
next_page
(
pages
)
@task_blueprint.route
(
'
/<int:page_num>
'
,
methods
=
[
'
GET
'
])
def
task
(
page_num
):
"""
Get selected task page
"""
randomized_stimulus
=
""
try
:
experiment_info
=
experiment
.
query
.
filter_by
(
idexperiment
=
session
[
'
exp_id
'
]).
first
()
except
KeyError
as
err
:
print
(
err
)
flash
(
"
No valid session found
"
)
return
redirect
(
'
/
'
)
experiment_info
=
get_experiment_info
()
#for text stimuli the size needs to be calculated since the template element utilises h1-h6 tags.
#A value of stimulus size 12 gives h1 and value of 1 gives h6
stimulus_size
=
experiment_info
.
stimulus_size
...
...
@@ -253,6 +237,7 @@ def task(page_num):
pages
=
page
.
query
.
filter_by
(
experiment_idexperiment
=
session
[
'
exp_id
'
]).
paginate
(
per_page
=
1
,
page
=
page_num
,
error_out
=
True
)
page_id
=
pages
.
items
[
0
].
idpage
progress_bar_percentage
=
round
((
pages
.
page
/
pages
.
pages
)
*
100
)
session
[
'
current_idpage
'
]
=
page_id
# if trial randomization is on we will still use the same functionality that is used otherwise
# but we will pass the randomized pair of the page_id from trial randomization table to the task.html
...
...
@@ -260,8 +245,16 @@ def task(page_num):
randomized_page_id
=
get_randomized_page
(
page_id
).
randomized_idpage
randomized_stimulus
=
page
.
query
.
filter_by
(
idpage
=
randomized_page_id
).
first
()
# OLD
for
p
in
pages
.
items
:
session
[
'
current_idpage
'
]
=
p
.
idpage
#session['current_idpage'] = p.idpage
print
(
'
p.idpage -loop:
'
,
p
.
idpage
)
# TODO: if no form found -> go to next page
# TODO: check if embody ON and no answer
return
render_template
(
'
task.html
'
,
...
...
@@ -292,30 +285,30 @@ def completed():
@task_blueprint.route
(
'
/continue
'
,
methods
=
[
'
GET
'
,
'
POST
'
])
def
continue_task
():
"""
Continue unfinished task
"""
# Set experience if to session
exp_id
=
request
.
args
.
get
(
'
exp_id
'
,
None
)
session
[
'
exp_id
'
]
=
exp_id
form
=
ContinueTaskForm
()
if
form
.
validate_on_submit
():
#check if participant ID is found from db and that the answer set is linked to the correct experiment
participant
=
answer_set
.
query
.
filter
(
and_
(
answer_set
.
session
==
form
.
participant_id
.
data
,
answer_set
.
experiment_idexperiment
==
exp_id
)).
first
()
if
participant
is
None
:
flash
(
'
Invalid ID
'
)
return
redirect
(
url_for
(
'
task.continue_task
'
,
exp_id
=
exp_id
))
#flash('Login requested for participant {}'.format(form.participant_id.data))
#if correct participant_id is found with the correct experiment ID; start session for that user
session
[
'
exp_id
'
]
=
exp_id
exp
=
get_experiment_info
()
session
[
'
user
'
]
=
form
.
participant_id
.
data
session
[
'
answer_set
'
]
=
participant
.
idanswer_set
mediatype
=
page
.
query
.
filter_by
(
experiment_idexperiment
=
session
[
'
exp_id
'
]).
first
()
exp
=
experiment
.
query
.
filter_by
(
idexperiment
=
session
[
'
exp_id
'
]).
first
()
session
[
'
randomization
'
]
=
exp
.
randomization
update_answer_set_type
(
participant
.
answer_type
)
mediatype
=
page
.
query
.
filter_by
(
experiment_idexperiment
=
session
[
'
exp_id
'
]).
first
()
if
mediatype
:
session
[
'
type
'
]
=
mediatype
.
type
else
:
...
...
@@ -324,20 +317,13 @@ def continue_task():
#If participant has done just the registration redirect to the first page of the experiment
if
participant
.
answer_counter
==
0
:
#flash("Ei vastauksia ohjataan ekalle sivulle")
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
1
))
redirect_to_page
=
participant
.
answer_counter
+
1
#flash("redirect to page:")
#flash(redirect_to_page)
experiment_page_count
=
db
.
session
.
query
(
page
).
filter_by
(
experiment_idexperiment
=
session
[
'
exp_id
'
]).
count
()
#If participant has ansvered all pages allready redirect to task completed page
if
experiment_page_count
==
participant
.
answer_counter
:
return
redirect
(
url_for
(
'
task.completed
'
))
return
redirect
(
url_for
(
'
task.task
'
,
page_num
=
redirect_to_page
))
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment