# test_umat.py
  1. #!/usr/bin/env python
  2. from __future__ import print_function
  3. import numpy as np
  4. import cv2 as cv
  5. import os
  6. from tests_common import NewOpenCVTests
  7. def load_exposure_seq(path):
  8. images = []
  9. times = []
  10. with open(os.path.join(path, 'list.txt'), 'r') as list_file:
  11. for line in list_file.readlines():
  12. name, time = line.split()
  13. images.append(cv.imread(os.path.join(path, name)))
  14. times.append(1. / float(time))
  15. return images, times
class UMat(NewOpenCVTests):
    """Tests for the cv.UMat Python bindings (OpenCV's UMat container).

    Each test exercises a cv.UMat code path and, where a result is
    produced, compares it against the equivalent ndarray (Mat) path.
    """

    def test_umat_construct(self):
        """Check the three UMat constructor forms and the continuity/submatrix flags."""
        # 512x512 float64 source data shared by the constructor checks below.
        data = np.random.random([512, 512])
        # UMat constructors
        data_um = cv.UMat(data)  # from ndarray
        data_sub_um = cv.UMat(data_um, (128, 256), (128, 256))  # from UMat (row/col ranges)
        data_dst_um = cv.UMat(128, 128, cv.CV_64F)  # from size/type
        # test continuous and submatrix flags
        assert data_um.isContinuous() and not data_um.isSubmatrix()
        assert not data_sub_um.isContinuous() and data_sub_um.isSubmatrix()
        # test operation on submatrix: result must equal 2x the same
        # region of the original ndarray.
        cv.multiply(data_sub_um, 2., dst=data_dst_um)
        assert np.allclose(2. * data[128:256, 128:256], data_dst_um.get())

    def test_umat_handle(self):
        """Smoke-test the low-level handle accessors; only checks the calls succeed."""
        a_um = cv.UMat(256, 256, cv.CV_32F)
        _ctx_handle = cv.UMat.context()  # obtain context handle
        _queue_handle = cv.UMat.queue()  # obtain queue handle
        _a_handle = a_um.handle(cv.ACCESS_READ)  # obtain buffer handle
        _offset = a_um.offset  # obtain buffer offset

    def test_umat_matching(self):
        """ORB + BFMatcher on UMat inputs must match the ndarray path's match count."""
        img1 = self.get_sample("samples/data/right01.jpg")
        img2 = self.get_sample("samples/data/right02.jpg")
        orb = cv.ORB_create()
        img1, img2 = cv.UMat(img1), cv.UMat(img2)
        # detectAndCompute on UMat images should yield UMat descriptors.
        ps1, descs_umat1 = orb.detectAndCompute(img1, None)
        ps2, descs_umat2 = orb.detectAndCompute(img2, None)
        self.assertIsInstance(descs_umat1, cv.UMat)
        self.assertIsInstance(descs_umat2, cv.UMat)
        self.assertGreater(len(ps1), 0)
        self.assertGreater(len(ps2), 0)
        bf = cv.BFMatcher(cv.NORM_HAMMING, crossCheck=True)
        # Match once with UMat descriptors and once with their ndarray
        # copies; both paths must produce the same number of matches.
        res_umats = bf.match(descs_umat1, descs_umat2)
        res = bf.match(descs_umat1.get(), descs_umat2.get())
        self.assertGreater(len(res), 0)
        self.assertEqual(len(res_umats), len(res))

    def test_umat_optical_flow(self):
        """Pyramidal LK optical flow must agree across Mat/UMat input combinations."""
        img1 = self.get_sample("samples/data/right01.jpg", cv.IMREAD_GRAYSCALE)
        img2 = self.get_sample("samples/data/right02.jpg", cv.IMREAD_GRAYSCALE)
        # Note, that if you want to see performance boost by OCL implementation - you need enough data
        # For example you can increase maxCorners param to 10000 and increase img1 and img2 in such way:
        # img = np.hstack([np.vstack([img] * 6)] * 6)
        feature_params = dict(maxCorners=239,
                              qualityLevel=0.3,
                              minDistance=7,
                              blockSize=7)
        p0 = cv.goodFeaturesToTrack(img1, mask=None, **feature_params)
        p0_umat = cv.goodFeaturesToTrack(cv.UMat(img1), mask=None, **feature_params)
        self.assertEqual(p0_umat.get().shape, p0.shape)
        # Sort both feature sets by coordinate so they can be compared
        # element-wise regardless of the order each path returned them in.
        p0 = np.array(sorted(p0, key=lambda p: tuple(p[0])))
        p0_umat = cv.UMat(np.array(sorted(p0_umat.get(), key=lambda p: tuple(p[0]))))
        self.assertTrue(np.allclose(p0_umat.get(), p0))
        # Run LK flow with each Mat/UMat combination of the two images.
        _p1_mask_err = cv.calcOpticalFlowPyrLK(img1, img2, p0, None)
        _p1_mask_err_umat0 = list(map(lambda umat: umat.get(), cv.calcOpticalFlowPyrLK(img1, img2, p0_umat, None)))
        _p1_mask_err_umat1 = list(map(lambda umat: umat.get(), cv.calcOpticalFlowPyrLK(cv.UMat(img1), img2, p0_umat, None)))
        _p1_mask_err_umat2 = list(map(lambda umat: umat.get(), cv.calcOpticalFlowPyrLK(img1, cv.UMat(img2), p0_umat, None)))
        # Every UMat variant must match the pure-Mat result in shape and dtype.
        for _p1_mask_err_umat in [_p1_mask_err_umat0, _p1_mask_err_umat1, _p1_mask_err_umat2]:
            for data, data_umat in zip(_p1_mask_err, _p1_mask_err_umat):
                self.assertEqual(data.shape, data_umat.shape)
                self.assertEqual(data.dtype, data_umat.dtype)
        # The UMat variants must agree with each other numerically on the
        # first two outputs (the third output is deliberately not compared).
        for _p1_mask_err_umat in [_p1_mask_err_umat1, _p1_mask_err_umat2]:
            for data_umat0, data_umat in zip(_p1_mask_err_umat0[:2], _p1_mask_err_umat[:2]):
                self.assertTrue(np.allclose(data_umat0, data_umat))

    def test_umat_merge_mertens(self):
        """Mertens exposure fusion must give the same result for Mat and UMat inputs."""
        if self.extraTestDataPath is None:
            self.fail('Test data is not available')
        test_data_path = os.path.join(self.extraTestDataPath, 'cv', 'hdr')
        images, _ = load_exposure_seq(os.path.join(test_data_path, 'exposures'))
        # As we want to test mat vs. umat here, we temporarily set only one worker-thread to achieve
        # deterministic summations inside mertens' parallelized process.
        num_threads = cv.getNumThreads()
        cv.setNumThreads(1)
        merge = cv.createMergeMertens()
        mat_result = merge.process(images)
        umat_images = [cv.UMat(img) for img in images]
        umat_result = merge.process(umat_images)
        cv.setNumThreads(num_threads)  # restore the previous thread count
        self.assertTrue(np.allclose(umat_result.get(), mat_result))
if __name__ == '__main__':
    # Run the tests through the shared NewOpenCVTests entry point.
    NewOpenCVTests.bootstrap()