Hopefully fixes #98 and all job tasks bugs

This commit is contained in:
Renzo 2016-07-30 00:16:24 -05:00
parent 536a1f907b
commit 61b48d0ca1

View File

@ -2,7 +2,7 @@
class JobTask extends Rails\ActiveRecord\Base class JobTask extends Rails\ActiveRecord\Base
{ {
protected $data; protected $data;
static public function execute_once() static public function execute_once()
{ {
foreach (self::where('status = "pending" AND task_type IN (?)', CONFIG()->active_job_tasks)->order("id desc")->take() as $task) { foreach (self::where('status = "pending" AND task_type IN (?)', CONFIG()->active_job_tasks)->order("id desc")->take() as $task) {
@ -10,7 +10,7 @@ class JobTask extends Rails\ActiveRecord\Base
sleep(1); sleep(1);
} }
} }
public function pretty_data() public function pretty_data()
{ {
switch ($this->task_type) { switch ($this->task_type) {
@ -18,10 +18,10 @@ class JobTask extends Rails\ActiveRecord\Base
$start = $this->data["start_tags"]; $start = $this->data["start_tags"];
$result = $this->data["result_tags"]; $result = $this->data["result_tags"];
$user = User::find_name($this->data["updater_id"]); $user = User::find_name($this->data["updater_id"]);
return "start: ".$start.", result: ".$result.", user: ".$user; return "start: ".$start.", result: ".$result.", user: ".$user;
break; break;
case "approve_tag_alias": case "approve_tag_alias":
$ta = TagAlias::where('id', $this->data->id)->first(); $ta = TagAlias::where('id', $this->data->id)->first();
if (!$ta) { if (!$ta) {
@ -31,7 +31,7 @@ class JobTask extends Rails\ActiveRecord\Base
} }
return "start: " . $ta->name . ", result: " . $ta->alias_name(); return "start: " . $ta->name . ", result: " . $ta->alias_name();
break; break;
case "approve_tag_implication": case "approve_tag_implication":
$ti = TagImplication::where('id', $this->data->id)->first(); $ti = TagImplication::where('id', $this->data->id)->first();
if (!$ti) { if (!$ti) {
@ -41,7 +41,7 @@ class JobTask extends Rails\ActiveRecord\Base
} }
return "start: " . $ti->predicate->name . ", result: " . $ti->consequent->name; return "start: " . $ti->predicate->name . ", result: " . $ti->consequent->name;
break; break;
case "calculate_tag_subscriptions": case "calculate_tag_subscriptions":
return "last run: " . (isset($this->data->last_run) ? $this->data->last_run : 'never'); return "last run: " . (isset($this->data->last_run) ? $this->data->last_run : 'never');
break; break;
@ -57,7 +57,7 @@ class JobTask extends Rails\ActiveRecord\Base
// end // end
// ret << (" (%i left) " % data["left"]) if data["left"] // ret << (" (%i left) " % data["left"]) if data["left"]
// ret // ret
case "periodic_maintenance": case "periodic_maintenance":
if ($this->status == "processing") if ($this->status == "processing")
return !empty($this->data->step) ? $this->data->step : 'unknown'; return !empty($this->data->step) ? $this->data->step : 'unknown';
@ -75,7 +75,7 @@ class JobTask extends Rails\ActiveRecord\Base
case "external_data_search": case "external_data_search":
return 'last updated post id: ' . (isset($this->data->last_post_id) ? $this->data->last_post_id : '(none)'); return 'last updated post id: ' . (isset($this->data->last_post_id) ? $this->data->last_post_id : '(none)');
break; break;
case "upload_batch_posts": case "upload_batch_posts":
if ($this->status == "pending") if ($this->status == "pending")
return "idle"; return "idle";
@ -93,28 +93,35 @@ class JobTask extends Rails\ActiveRecord\Base
// end // end
} }
} }
public function execute() public function execute()
{ {
if ($this->repeat_count > 0) if ($this->repeat_count > 0)
$count = $this->repeat_count - 1; $count = $this->repeat_count - 1;
else else
$count = $this->repeat_count; $count = $this->repeat_count;
Rails::systemExit()->register(function(){ Rails::systemExit()->register(function(){
if ($this->status == 'processing') if ($this->status == 'processing')
$this->updateAttribute('status', 'pending'); $this->updateAttribute('status', 'pending');
}, 'job_task'); }, 'job_task');
try { try {
$this->updateAttribute('status', "processing"); $this->updateAttribute('status', "processing");
$task_method = 'execute_'.$this->task_type; $task_method = 'execute_'.$this->task_type;
$this->$task_method(); $this->$task_method();
if ($count == 0) if ($count == 0)
$this->updateAttribute('status', "finished"); $this->updateAttribute('status', "finished");
else else {
// This is necessary due to a bug with Rails that won't clear changed attributes,
// so when 'status' is changed back to 'pending', the system will think the attribute
// is being reversed to its previous value, and will remove it from the changedAttributes,
// array, therefore the new value 'pending' won't be set and will stay as 'processing'.
$this->clearChangedAttributes();
$this->updateAttributes(array('status' => "pending", 'repeat_count' => $count)); $this->updateAttributes(array('status' => "pending", 'repeat_count' => $count));
}
} catch (Exception $x) { } catch (Exception $x) {
$text = ""; $text = "";
$text .= "Error executing job: " . $this->task_type . "\n"; $text .= "Error executing job: " . $this->task_type . "\n";
@ -126,34 +133,34 @@ class JobTask extends Rails\ActiveRecord\Base
throw $x; throw $x;
} }
} }
public function execute_periodic_maintenance() public function execute_periodic_maintenance()
{ {
if (!empty($this->data->next_run) && $this->data->next_run > time('Y-m-d H:i:s')) if (!empty($this->data->next_run) && $this->data->next_run > time('Y-m-d H:i:s'))
return; return;
$this->update_data(array("step" => "recalculating post count")); $this->update_data(array("step" => "recalculating post count"));
Post::recalculate_row_count(); Post::recalculate_row_count();
$this->update_data(array("step" => "recalculating tag post counts")); $this->update_data(array("step" => "recalculating tag post counts"));
Tag::recalculate_post_count(); Tag::recalculate_post_count();
$this->update_data(array("step" => "purging old tags")); $this->update_data(array("step" => "purging old tags"));
Tag::purge_tags(); Tag::purge_tags();
$next_run = strtotime('+6 hours'); $next_run = strtotime('+6 hours');
$this->update_data(array("next_run" => date('Y-m-d H:i:s', $next_run), "step" => null)); $this->update_data(array("next_run" => date('Y-m-d H:i:s', $next_run), "step" => null));
} }
public function execute_external_data_search() public function execute_external_data_search()
{ {
# current_user will be needed to save post history. # current_user will be needed to save post history.
# Set the first admin as current user. # Set the first admin as current user.
User::set_current_user(User::where('level = ?', CONFIG()->user_levels['Admin'])->first()); User::set_current_user(User::where('level = ?', CONFIG()->user_levels['Admin'])->first());
if (empty($this->data->last_post_id)) if (empty($this->data->last_post_id))
$this->data->last_post_id = 0; $this->data->last_post_id = 0;
$post_id = $this->data->last_post_id + 1; $post_id = $this->data->last_post_id + 1;
$config = array_merge([ $config = array_merge([
'servers' => [], 'servers' => [],
'interval' => 3, 'interval' => 3,
@ -164,7 +171,7 @@ class JobTask extends Rails\ActiveRecord\Base
'exclude_tags' => [], 'exclude_tags' => [],
'similarity' => 90 'similarity' => 90
], CONFIG()->external_data_search_config); ], CONFIG()->external_data_search_config);
$limit = $config['limit']; $limit = $config['limit'];
$interval = $config['interval']; $interval = $config['interval'];
$search_options = [ $search_options = [
@ -173,62 +180,62 @@ class JobTask extends Rails\ActiveRecord\Base
'services' => $config['servers'], 'services' => $config['servers'],
'threshold' => $config['similarity'] 'threshold' => $config['similarity']
]; ];
$post_count = !$limit ? -1 : 0; $post_count = !$limit ? -1 : 0;
while ($post_count < $limit) { while ($post_count < $limit) {
if (!$post = Post::where('id >= ? AND status != "deleted"', $post_id)->order('id ASC')->first()) { if (!$post = Post::where('id >= ? AND status != "deleted"', $post_id)->order('id ASC')->first()) {
break; break;
} }
$search_options['source'] = $post; $search_options['source'] = $post;
$new_tags = []; $new_tags = [];
$source = null; $source = null;
$external_posts = SimilarImages::similar_images($search_options)['posts_external']; $external_posts = SimilarImages::similar_images($search_options)['posts_external'];
$rating_set = false; $rating_set = false;
foreach ($external_posts as $ep) { foreach ($external_posts as $ep) {
if (!$rating_set && $config['set_rating'] && $ep->rating) { if (!$rating_set && $config['set_rating'] && $ep->rating) {
$post->rating = $ep->rating; $post->rating = $ep->rating;
$rating_set = true; $rating_set = true;
} }
if ($config['source'] && !$source && $ep->source) { if ($config['source'] && !$source && $ep->source) {
$source = $ep->source; $source = $ep->source;
} }
$new_tags = array_merge($new_tags, explode(' ', $ep->tags)); $new_tags = array_merge($new_tags, explode(' ', $ep->tags));
} }
# Exclude tags. # Exclude tags.
$new_tags = array_diff($new_tags, $config['exclude_tags']); $new_tags = array_diff($new_tags, $config['exclude_tags']);
if ($config['merge_tags']) { if ($config['merge_tags']) {
$new_tags = array_merge($new_tags, $post->tags); $new_tags = array_merge($new_tags, $post->tags);
} }
$new_tags = array_filter(array_unique($new_tags)); $new_tags = array_filter(array_unique($new_tags));
$post->new_tags = $new_tags; $post->new_tags = $new_tags;
if ($source); { if ($source); {
$post->source = $source; $post->source = $source;
} }
$post->save(); $post->save();
if ($limit) { if ($limit) {
$post_count++; $post_count++;
} }
$this->update_data(['last_post_id' => $post->id]); $this->update_data(['last_post_id' => $post->id]);
$post_id = $post->id + 1; $post_id = $post->id + 1;
if ($config['interval']) { if ($config['interval']) {
sleep($config['interval']); sleep($config['interval']);
} }
} }
} }
public function execute_upload_batch_posts() public function execute_upload_batch_posts()
{ {
$upload = BatchUpload::where("status = 'pending'")->order("id ASC")->first(); $upload = BatchUpload::where("status = 'pending'")->order("id ASC")->first();
@ -238,7 +245,7 @@ class JobTask extends Rails\ActiveRecord\Base
$this->updateAttributes(['data' => ['id' => $upload->id, 'user_id' => $upload->user_id, 'url' => $upload->url]]); $this->updateAttributes(['data' => ['id' => $upload->id, 'user_id' => $upload->user_id, 'url' => $upload->url]]);
$upload->run(); $upload->run();
} }
public function execute_approve_tag_alias() public function execute_approve_tag_alias()
{ {
$ta = TagAlias::find($this->data->id); $ta = TagAlias::find($this->data->id);
@ -246,7 +253,7 @@ class JobTask extends Rails\ActiveRecord\Base
$updater_ip_addr = $this->data->updater_ip_addr; $updater_ip_addr = $this->data->updater_ip_addr;
$ta->approve($updater_id, $updater_ip_addr); $ta->approve($updater_id, $updater_ip_addr);
} }
public function execute_approve_tag_implication() public function execute_approve_tag_implication()
{ {
$ti = TagImplication::find($this->data->id); $ti = TagImplication::find($this->data->id);
@ -254,7 +261,7 @@ class JobTask extends Rails\ActiveRecord\Base
$updater_ip_addr = $this->data->updater_ip_addr; $updater_ip_addr = $this->data->updater_ip_addr;
$ti->approve($updater_id, $updater_ip_addr); $ti->approve($updater_id, $updater_ip_addr);
} }
public function execute_calculate_tag_subscriptions() public function execute_calculate_tag_subscriptions()
{ {
if (Rails::cache()->read("delay-tag-sub-calc")) { if (Rails::cache()->read("delay-tag-sub-calc")) {
@ -264,12 +271,12 @@ class JobTask extends Rails\ActiveRecord\Base
TagSubscription::process_all(); TagSubscription::process_all();
$this->updateAttributes(['data' => ['last_run' => date('Y-m-d H:i')]]); $this->updateAttributes(['data' => ['last_run' => date('Y-m-d H:i')]]);
} }
protected function init() protected function init()
{ {
$this->setData($this->data_as_json ? json_decode($this->data_as_json) : new stdClass()); $this->setData($this->data_as_json ? json_decode($this->data_as_json) : new stdClass());
} }
public function setData($data) public function setData($data)
{ {
$this->data_as_json = json_encode($data); $this->data_as_json = json_encode($data);
@ -281,4 +288,4 @@ class JobTask extends Rails\ActiveRecord\Base
$data = array_merge((array)$this->data, $data); $data = array_merge((array)$this->data, $data);
$this->updateAttributes(array('data' => $data)); $this->updateAttributes(array('data' => $data));
} }
} }