cron.py 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. #
  2. # Copyright 2019 The FATE Authors. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import threading
  17. import time
  18. import random
  19. class Cron(threading.Thread):
  20. def __init__(self, interval, run_second=None, rand_size=None, title='', logger=None, lock=None):
  21. """
  22. :param interval: interval by millisecond
  23. :param run_second:
  24. :param rand_size:
  25. :param title:
  26. :param logger:
  27. :param lock:
  28. """
  29. super(Cron, self).__init__()
  30. self.interval = interval
  31. self.run_second = run_second
  32. self.rand_size = rand_size
  33. self.finished = threading.Event()
  34. self.title = title
  35. self.logger = logger
  36. self.lock = lock
  37. def cancel(self):
  38. self.finished.set()
  39. def run(self):
  40. def do():
  41. try:
  42. if not self.lock or self.lock.acquire(0):
  43. self.run_do()
  44. except Exception as e:
  45. if self.logger:
  46. self.logger.exception(e)
  47. else:
  48. raise e
  49. finally:
  50. if self.lock and self.lock.locked():
  51. self.lock.release()
  52. try:
  53. if self.logger and self.title:
  54. self.logger.info('%s cron start.' % self.title)
  55. if self.run_second is None:
  56. first_interval = self.interval
  57. else:
  58. now = int(round(time.time()*1000))
  59. delta = -now % 60000 + self.run_second*1000
  60. if delta > 0:
  61. first_interval = delta
  62. else:
  63. first_interval = 60*1000 + delta
  64. self.finished.wait(first_interval/1000)
  65. if not self.finished.is_set():
  66. do()
  67. while True:
  68. self.finished.wait((self.interval if self.rand_size is None else self.interval - random.randint(0, self.rand_size))/1000)
  69. if not self.finished.is_set():
  70. do()
  71. except Exception as e:
  72. if self.logger:
  73. self.logger.exception(e)
  74. else:
  75. raise e
  76. def run_do(self):
  77. pass