From 61b48d0ca1e5e7bbde46427172b9f6321283ddeb Mon Sep 17 00:00:00 2001 From: Renzo Date: Sat, 30 Jul 2016 00:16:24 -0500 Subject: [PATCH] Hopefully fixes #98 and all job tasks bugs --- app/models/JobTask.php | 91 +++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 42 deletions(-) diff --git a/app/models/JobTask.php b/app/models/JobTask.php index fce0814..fbac654 100755 --- a/app/models/JobTask.php +++ b/app/models/JobTask.php @@ -2,7 +2,7 @@ class JobTask extends Rails\ActiveRecord\Base { protected $data; - + static public function execute_once() { foreach (self::where('status = "pending" AND task_type IN (?)', CONFIG()->active_job_tasks)->order("id desc")->take() as $task) { @@ -10,7 +10,7 @@ class JobTask extends Rails\ActiveRecord\Base sleep(1); } } - + public function pretty_data() { switch ($this->task_type) { @@ -18,10 +18,10 @@ class JobTask extends Rails\ActiveRecord\Base $start = $this->data["start_tags"]; $result = $this->data["result_tags"]; $user = User::find_name($this->data["updater_id"]); - + return "start: ".$start.", result: ".$result.", user: ".$user; break; - + case "approve_tag_alias": $ta = TagAlias::where('id', $this->data->id)->first(); if (!$ta) { @@ -31,7 +31,7 @@ class JobTask extends Rails\ActiveRecord\Base } return "start: " . $ta->name . ", result: " . $ta->alias_name(); break; - + case "approve_tag_implication": $ti = TagImplication::where('id', $this->data->id)->first(); if (!$ti) { @@ -41,7 +41,7 @@ class JobTask extends Rails\ActiveRecord\Base } return "start: " . $ti->predicate->name . ", result: " . $ti->consequent->name; break; - + case "calculate_tag_subscriptions": return "last run: " . (isset($this->data->last_run) ? $this->data->last_run : 'never'); break; @@ -57,7 +57,7 @@ class JobTask extends Rails\ActiveRecord\Base // end // ret << (" (%i left) " % data["left"]) if data["left"] // ret - + case "periodic_maintenance": if ($this->status == "processing") return !empty($this->data->step) ? $this->data->step : 'unknown'; @@ -75,7 +75,7 @@ class JobTask extends Rails\ActiveRecord\Base case "external_data_search": return 'last updated post id: ' . (isset($this->data->last_post_id) ? $this->data->last_post_id : '(none)'); break; - + case "upload_batch_posts": if ($this->status == "pending") return "idle"; @@ -93,28 +93,35 @@ class JobTask extends Rails\ActiveRecord\Base // end } } - + public function execute() { if ($this->repeat_count > 0) $count = $this->repeat_count - 1; else $count = $this->repeat_count; - + Rails::systemExit()->register(function(){ if ($this->status == 'processing') $this->updateAttribute('status', 'pending'); }, 'job_task'); - + try { $this->updateAttribute('status', "processing"); $task_method = 'execute_'.$this->task_type; $this->$task_method(); - + if ($count == 0) $this->updateAttribute('status', "finished"); - else + else { + // This is necessary due to a bug with Rails that won't clear changed attributes, + // so when 'status' is changed back to 'pending', the system will think the attribute + // is being reversed to its previous value, and will remove it from the changedAttributes, + // array, therefore the new value 'pending' won't be set and will stay as 'processing'. + $this->clearChangedAttributes(); + $this->updateAttributes(array('status' => "pending", 'repeat_count' => $count)); + } } catch (Exception $x) { $text = ""; $text .= "Error executing job: " . $this->task_type . "\n"; @@ -126,34 +133,34 @@ class JobTask extends Rails\ActiveRecord\Base throw $x; } } - + public function execute_periodic_maintenance() { if (!empty($this->data->next_run) && $this->data->next_run > time('Y-m-d H:i:s')) return; - + $this->update_data(array("step" => "recalculating post count")); Post::recalculate_row_count(); $this->update_data(array("step" => "recalculating tag post counts")); Tag::recalculate_post_count(); $this->update_data(array("step" => "purging old tags")); Tag::purge_tags(); - + $next_run = strtotime('+6 hours'); $this->update_data(array("next_run" => date('Y-m-d H:i:s', $next_run), "step" => null)); } - + public function execute_external_data_search() { # current_user will be needed to save post history. # Set the first admin as current user. User::set_current_user(User::where('level = ?', CONFIG()->user_levels['Admin'])->first()); - + if (empty($this->data->last_post_id)) $this->data->last_post_id = 0; - + $post_id = $this->data->last_post_id + 1; - + $config = array_merge([ 'servers' => [], 'interval' => 3, @@ -164,7 +171,7 @@ class JobTask extends Rails\ActiveRecord\Base 'exclude_tags' => [], 'similarity' => 90 ], CONFIG()->external_data_search_config); - + $limit = $config['limit']; $interval = $config['interval']; $search_options = [ @@ -173,62 +180,62 @@ class JobTask extends Rails\ActiveRecord\Base 'services' => $config['servers'], 'threshold' => $config['similarity'] ]; - + $post_count = !$limit ? -1 : 0; - + while ($post_count < $limit) { if (!$post = Post::where('id >= ? AND status != "deleted"', $post_id)->order('id ASC')->first()) { break; } - + $search_options['source'] = $post; $new_tags = []; $source = null; - + $external_posts = SimilarImages::similar_images($search_options)['posts_external']; - + $rating_set = false; foreach ($external_posts as $ep) { if (!$rating_set && $config['set_rating'] && $ep->rating) { $post->rating = $ep->rating; $rating_set = true; } - + if ($config['source'] && !$source && $ep->source) { $source = $ep->source; } $new_tags = array_merge($new_tags, explode(' ', $ep->tags)); } - + # Exclude tags. $new_tags = array_diff($new_tags, $config['exclude_tags']); - + if ($config['merge_tags']) { $new_tags = array_merge($new_tags, $post->tags); } - + $new_tags = array_filter(array_unique($new_tags)); $post->new_tags = $new_tags; - + if ($source); { $post->source = $source; } - + $post->save(); - + if ($limit) { $post_count++; } - + $this->update_data(['last_post_id' => $post->id]); $post_id = $post->id + 1; - + if ($config['interval']) { sleep($config['interval']); } } } - + public function execute_upload_batch_posts() { $upload = BatchUpload::where("status = 'pending'")->order("id ASC")->first(); @@ -238,7 +245,7 @@ class JobTask extends Rails\ActiveRecord\Base $this->updateAttributes(['data' => ['id' => $upload->id, 'user_id' => $upload->user_id, 'url' => $upload->url]]); $upload->run(); } - + public function execute_approve_tag_alias() { $ta = TagAlias::find($this->data->id); @@ -246,7 +253,7 @@ class JobTask extends Rails\ActiveRecord\Base $updater_ip_addr = $this->data->updater_ip_addr; $ta->approve($updater_id, $updater_ip_addr); } - + public function execute_approve_tag_implication() { $ti = TagImplication::find($this->data->id); @@ -254,7 +261,7 @@ class JobTask extends Rails\ActiveRecord\Base $updater_ip_addr = $this->data->updater_ip_addr; $ti->approve($updater_id, $updater_ip_addr); } - + public function execute_calculate_tag_subscriptions() { if (Rails::cache()->read("delay-tag-sub-calc")) { @@ -264,12 +271,12 @@ class JobTask extends Rails\ActiveRecord\Base TagSubscription::process_all(); $this->updateAttributes(['data' => ['last_run' => date('Y-m-d H:i')]]); } - + protected function init() { $this->setData($this->data_as_json ? json_decode($this->data_as_json) : new stdClass()); } - + public function setData($data) { $this->data_as_json = json_encode($data); @@ -281,4 +288,4 @@ class JobTask extends Rails\ActiveRecord\Base $data = array_merge((array)$this->data, $data); $this->updateAttributes(array('data' => $data)); } -} \ No newline at end of file +}